Tarantool development patches archive
* [tarantool-patches] [PATCH v2 0/5] SWIM on_member_update
@ 2019-06-08 10:31 Vladislav Shpilevoy
  2019-06-08 10:31 ` [tarantool-patches] [PATCH v2 1/5] test: create isolated ev_loop for swim unit tests Vladislav Shpilevoy
                   ` (4 more replies)
  0 siblings, 5 replies; 11+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-08 10:31 UTC (permalink / raw)
  To: tarantool-patches; +Cc: kostja

This patchset introduces an API to set triggers on SWIM member updates,
which makes Lua SWIM asynchronous.

V1: https://www.freelists.org/post/tarantool-patches/PATCH-05-SWIM-on-member-update
Changes in V2:
- Renames.
- swim:delete() now waits for worker fiber join.

Branch: http://github.com/tarantool/tarantool/tree/gerold103/gh-4250-swim-triggers
Issue: https://github.com/tarantool/tarantool/issues/4250

Vladislav Shpilevoy (5):
  test: create isolated ev_loop for swim unit tests
  swim: fix a 'use after free' in SWIM tests
  swim: allow to set triggers on member updates
  swim: call swim:new/delete via Lua C, not via FFI
  swim: expose Lua triggers on member events

 extra/exports                   |   2 -
 src/CMakeLists.txt              |   3 +-
 src/lib/swim/swim.c             | 232 ++++++++++++++++++++++++----
 src/lib/swim/swim.h             |  47 +++++-
 src/lib/swim/swim_ev.c          |   6 +
 src/lib/swim/swim_ev.h          |  18 +++
 src/lib/swim/swim_io.c          |  15 +-
 src/lua/init.c                  |   2 +
 src/lua/swim.c                  | 106 +++++++++++++
 src/lua/swim.h                  |  34 +++++
 src/lua/swim.lua                | 101 +++++++++++-
 test/swim/swim.result           | 263 ++++++++++++++++++++++++++++++++
 test/swim/swim.test.lua         |  85 +++++++++++
 test/unit/swim.c                | 146 +++++++++++++++++-
 test/unit/swim.result           |  28 +++-
 test/unit/swim_test_ev.c        |  16 ++
 test/unit/swim_test_transport.c |   1 +
 test/unit/swim_test_utils.c     |  46 +++++-
 test/unit/swim_test_utils.h     |  12 ++
 19 files changed, 1107 insertions(+), 56 deletions(-)
 create mode 100644 src/lua/swim.c
 create mode 100644 src/lua/swim.h

-- 
2.20.1 (Apple Git-117)


* [tarantool-patches] [PATCH v2 1/5] test: create isolated ev_loop for swim unit tests
  2019-06-08 10:31 [tarantool-patches] [PATCH v2 0/5] SWIM on_member_update Vladislav Shpilevoy
@ 2019-06-08 10:31 ` Vladislav Shpilevoy
  2019-06-08 14:32   ` [tarantool-patches] " Konstantin Osipov
  2019-06-08 10:31 ` [tarantool-patches] [PATCH v2 2/5] swim: fix a 'use after free' in SWIM tests Vladislav Shpilevoy
                   ` (3 subsequent siblings)
  4 siblings, 1 reply; 11+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-08 10:31 UTC (permalink / raw)
  To: tarantool-patches; +Cc: kostja

The SWIM unit test code, with its fake events and time, does lots
of forbidden things: it manually invokes pending watcher
callbacks, manages global time without a kernel, and puts
nonexistent descriptors into the loop. These foul blows open the
gates to full control over IO events, descriptors, and virtual
time. Hundreds of virtual seconds pass in milliseconds of real
time, which keeps the SWIM unit tests fast despite their complex
logic.

None of these actions affect the loop until a yield. On a yield
the scheduler fiber wakes up and two things go wrong:

    1) EV_READ is generated infinitely on the nonexistent
       descriptors, because the kernel considers them closed;

    2) manual invocation of pending callbacks fails an assertion,
       because it is not allowed for non-scheduler fibers.

To avoid these problems a new isolated loop is created, not
visible to the scheduler. Here the fake events library can wreck
and ruin whatever it wants.

Needed for #4250
---
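A minimal sketch of the substitution described in the commit message, for
readers who want to see the idea in isolation. It does not use libev:
struct fake_loop, main_loop, test_loop, swim_start_round_timer() and the
SWIM_UNIT_TEST macro are all hypothetical names made up for this
illustration. In the patch itself the choice is made at link time (the
core links swim_ev.c, the unit tests link swim_test_ev.c), not by a macro;
the macro is only an assumption that lets the sketch fit into one file.

```c
#include <assert.h>
#include <stddef.h>

/* Normally the build would decide this; set here for the demo only. */
#define SWIM_UNIT_TEST 1

/* Hypothetical stand-in for libev's struct ev_loop. */
struct fake_loop {
	const char *name;
	int timers_started;
};

/* "Header" part: the only things SWIM code is allowed to see. */
struct fake_loop *loop(void);
struct fake_loop *swim_loop(void);
void swim_start_round_timer(void);

/* The loop owned by the scheduler, as returned by loop(). */
static struct fake_loop main_loop = { "main", 0 };

struct fake_loop *
loop(void)
{
	return &main_loop;
}

#if SWIM_UNIT_TEST
/* Test build: an isolated loop the scheduler never looks at. */
static struct fake_loop test_loop = { "test", 0 };

struct fake_loop *
swim_loop(void)
{
	return &test_loop;
}
#else
/* Core build: swim_loop() is just an alias for loop(). */
struct fake_loop *
swim_loop(void)
{
	return loop();
}
#endif

/* SWIM code never names a concrete loop, it only asks swim_loop(). */
void
swim_start_round_timer(void)
{
	assert(swim_loop() != NULL);
	swim_loop()->timers_started++;
}
```

With the macro set, a call to swim_start_round_timer() touches only the
test loop and leaves the scheduler's loop untouched, which is the property
the unit tests rely on.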
 src/lib/swim/swim.c         | 21 ++++++++++++---------
 src/lib/swim/swim_ev.c      |  6 ++++++
 src/lib/swim/swim_ev.h      | 18 ++++++++++++++++++
 src/lib/swim/swim_io.c      | 15 ++++++++-------
 test/unit/swim_test_ev.c    | 16 ++++++++++++++++
 test/unit/swim_test_utils.c |  2 +-
 6 files changed, 61 insertions(+), 17 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index f7a885b76..46b76731d 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -519,7 +519,7 @@ swim_wait_ack(struct swim *swim, struct swim_member *member,
 			timeout *= 2;
 		member->ping_deadline = swim_time() + timeout;
 		wait_ack_heap_insert(&swim->wait_ack_heap, member);
-		swim_ev_timer_again(loop(), &swim->wait_ack_tick);
+		swim_ev_timer_again(swim_loop(), &swim->wait_ack_tick);
 	}
 }
 
@@ -802,7 +802,7 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 		return NULL;
 	}
 	if (mh_size(swim->members) > 1)
-		swim_ev_timer_again(loop(), &swim->round_tick);
+		swim_ev_timer_again(swim_loop(), &swim->round_tick);
 
 	/* Dissemination component. */
 	swim_on_member_update(swim, member);
@@ -1122,7 +1122,7 @@ swim_complete_step(struct swim_task *task,
 	 * It could be stopped by the step begin function, if the
 	 * sending was too long.
 	 */
-	swim_ev_timer_again(loop(), &swim->round_tick);
+	swim_ev_timer_again(swim_loop(), &swim->round_tick);
 	/*
 	 * It is possible that the original member was deleted
 	 * manually during the task execution.
@@ -1813,16 +1813,17 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 		addr = swim->self->addr;
 	}
 	struct ev_timer *t = &swim->round_tick;
+	struct ev_loop *l = swim_loop();
 	if (t->repeat != heartbeat_rate && heartbeat_rate > 0) {
 		swim_ev_timer_set(t, 0, heartbeat_rate);
 		if (swim_ev_is_active(t))
-			swim_ev_timer_again(loop(), t);
+			swim_ev_timer_again(l, t);
 	}
 	t = &swim->wait_ack_tick;
 	if (t->repeat != ack_timeout && ack_timeout > 0) {
 		swim_ev_timer_set(t, 0, ack_timeout);
 		if (swim_ev_is_active(t))
-			swim_ev_timer_again(loop(), t);
+			swim_ev_timer_again(l, t);
 	}
 
 	if (new_self != NULL) {
@@ -1953,9 +1954,10 @@ swim_size(const struct swim *swim)
 void
 swim_delete(struct swim *swim)
 {
+	struct ev_loop *l = swim_loop();
 	swim_scheduler_destroy(&swim->scheduler);
-	swim_ev_timer_stop(loop(), &swim->round_tick);
-	swim_ev_timer_stop(loop(), &swim->wait_ack_tick);
+	swim_ev_timer_stop(l, &swim->round_tick);
+	swim_ev_timer_stop(l, &swim->wait_ack_tick);
 	mh_int_t node;
 	mh_foreach(swim->members, node) {
 		struct swim_member *m =
@@ -2018,8 +2020,9 @@ void
 swim_quit(struct swim *swim)
 {
 	assert(swim_is_configured(swim));
-	swim_ev_timer_stop(loop(), &swim->round_tick);
-	swim_ev_timer_stop(loop(), &swim->wait_ack_tick);
+	struct ev_loop *l = swim_loop();
+	swim_ev_timer_stop(l, &swim->round_tick);
+	swim_ev_timer_stop(l, &swim->wait_ack_tick);
 	swim_scheduler_stop_input(&swim->scheduler);
 	/* Start the last round - quiting. */
 	swim_new_round(swim);
diff --git a/src/lib/swim/swim_ev.c b/src/lib/swim/swim_ev.c
index 49c8c273b..82668d41d 100644
--- a/src/lib/swim/swim_ev.c
+++ b/src/lib/swim/swim_ev.c
@@ -55,3 +55,9 @@ swim_ev_timer_stop(struct ev_loop *loop, struct ev_timer *watcher)
 {
 	ev_timer_stop(loop, watcher);
 }
+
+struct ev_loop *
+swim_loop(void)
+{
+	return loop();
+}
diff --git a/src/lib/swim/swim_ev.h b/src/lib/swim/swim_ev.h
index 1bd81306f..37e743d45 100644
--- a/src/lib/swim/swim_ev.h
+++ b/src/lib/swim/swim_ev.h
@@ -52,6 +52,24 @@ swim_ev_timer_again(struct ev_loop *loop, struct ev_timer *watcher);
 void
 swim_ev_timer_stop(struct ev_loop *loop, struct ev_timer *watcher);
 
+/**
+ * The unit tests code with the fake events and time does lots of
+ * forbidden things: it manually invokes pending watcher
+ * callbacks; manages global time without a kernel; puts
+ * nonexistent descriptors into the loop. All these actions do not
+ * affect the loop until yield. On yield a scheduler fiber wakes
+ * up and 1) infinitely generates EV_READ on not existing
+ * descriptors because considers them closed; 2) manual pending
+ * callbacks invocation asserts, because it is not allowed for
+ * non-scheduler fibers. To avoid these problems a new isolated
+ * loop is created, not visible for the scheduler. Here the fake
+ * events library can rack and ruin whatever it wants. This
+ * function is supposed to be an alias for 'loop()' in the
+ * Tarantool core, but be an isolated object in tests.
+ */
+struct ev_loop *
+swim_loop(void);
+
 #define swim_ev_is_active ev_is_active
 
 #define swim_ev_init ev_init
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index c55c276cb..e7ff321d4 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -148,7 +148,7 @@ swim_task_schedule(struct swim_task *task, struct swim_scheduler *scheduler)
 {
 	assert(! swim_task_is_scheduled(task));
 	rlist_add_tail_entry(&scheduler->queue_output, task, in_queue_output);
-	swim_ev_io_start(loop(), &scheduler->output);
+	swim_ev_io_start(swim_loop(), &scheduler->output);
 }
 
 void
@@ -289,16 +289,17 @@ int
 swim_scheduler_bind(struct swim_scheduler *scheduler,
 		    const struct sockaddr_in *addr)
 {
-	swim_ev_io_stop(loop(), &scheduler->input);
-	swim_ev_io_stop(loop(), &scheduler->output);
+	struct ev_loop *l = swim_loop();
+	swim_ev_io_stop(l, &scheduler->input);
+	swim_ev_io_stop(l, &scheduler->output);
 	struct swim_transport *t = &scheduler->transport;
 	int rc = swim_transport_bind(t, (const struct sockaddr *) addr,
 				     sizeof(*addr));
 	if (t->fd >= 0) {
 		swim_ev_io_set(&scheduler->output, t->fd, EV_WRITE);
 		swim_ev_io_set(&scheduler->input, t->fd, EV_READ);
-		swim_ev_io_start(loop(), &scheduler->input);
-		swim_ev_io_start(loop(), &scheduler->output);
+		swim_ev_io_start(l, &scheduler->input);
+		swim_ev_io_start(l, &scheduler->output);
 	}
 	return rc;
 }
@@ -306,7 +307,7 @@ swim_scheduler_bind(struct swim_scheduler *scheduler,
 void
 swim_scheduler_stop_input(struct swim_scheduler *scheduler)
 {
-	swim_ev_io_stop(loop(), &scheduler->input);
+	swim_ev_io_stop(swim_loop(), &scheduler->input);
 }
 
 void
@@ -323,7 +324,7 @@ swim_scheduler_destroy(struct swim_scheduler *scheduler)
 			t->cancel(t, scheduler, -1);
 	}
 	swim_transport_destroy(&scheduler->transport);
-	swim_ev_io_stop(loop(), &scheduler->output);
+	swim_ev_io_stop(swim_loop(), &scheduler->output);
 	swim_scheduler_stop_input(scheduler);
 }
 
diff --git a/test/unit/swim_test_ev.c b/test/unit/swim_test_ev.c
index a4ffa2fc8..23d909b05 100644
--- a/test/unit/swim_test_ev.c
+++ b/test/unit/swim_test_ev.c
@@ -62,6 +62,19 @@ struct swim_event;
 typedef void (*swim_event_process_f)(struct swim_event *, struct ev_loop *);
 typedef void (*swim_event_delete_f)(struct swim_event *);
 
+/**
+ * An isolated event loop not visible to the fiber scheduler,
+ * where it is safe to use fake file descriptors, manually invoke
+ * callbacks etc.
+ */
+static struct ev_loop *test_loop;
+
+struct ev_loop *
+swim_loop(void)
+{
+	return test_loop;
+}
+
 /**
  * Base event. It is stored in the event heap and virtualizes
  * other events.
@@ -330,6 +343,8 @@ swim_test_ev_init(void)
 	events_hash = mh_i64ptr_new();
 	assert(events_hash != NULL);
 	event_heap_create(&event_heap);
+	test_loop = ev_loop_new(0);
+	assert(test_loop != NULL);
 }
 
 void
@@ -338,4 +353,5 @@ swim_test_ev_free(void)
 	swim_test_ev_reset();
 	event_heap_destroy(&event_heap);
 	mh_i64ptr_delete(events_hash);
+	ev_loop_destroy(test_loop);
 }
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index ffd42cbd0..f72fa2450 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -607,7 +607,7 @@ swim_wait_timeout(double timeout, struct swim_cluster *cluster,
 {
 	swim_ev_set_brk(timeout);
 	double deadline = swim_time() + timeout;
-	struct ev_loop *loop = loop();
+	struct ev_loop *loop = swim_loop();
 	/*
 	 * There can be pending out of bound IO events, affecting
 	 * the result. For example, 'quit' messages, which are
-- 
2.20.1 (Apple Git-117)


* [tarantool-patches] [PATCH v2 2/5] swim: fix a 'use after free' in SWIM tests
  2019-06-08 10:31 [tarantool-patches] [PATCH v2 0/5] SWIM on_member_update Vladislav Shpilevoy
  2019-06-08 10:31 ` [tarantool-patches] [PATCH v2 1/5] test: create isolated ev_loop for swim unit tests Vladislav Shpilevoy
@ 2019-06-08 10:31 ` Vladislav Shpilevoy
  2019-06-08 14:32   ` [tarantool-patches] " Konstantin Osipov
  2019-06-08 10:31 ` [tarantool-patches] [PATCH v2 3/5] swim: allow to set triggers on member updates Vladislav Shpilevoy
                   ` (2 subsequent siblings)
  4 siblings, 1 reply; 11+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-08 10:31 UTC (permalink / raw)
  To: tarantool-patches; +Cc: kostja

It is a miracle, but somehow it worked until I changed a couple
of places. Here objects stored in an rlist are freed, but not
deleted from the list. The list is reused after that.
---
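The class of bug fixed here can be sketched with a simplified intrusive
list. This is not Tarantool's rlist: struct list_link, struct packet and
all the function names below are hypothetical stand-ins written for this
illustration. The point is only the order of operations in the destructor:
unlink first, free second. Without the unlink, the neighbours and the list
head keep pointing at freed memory, and the next reuse of the list reads it.

```c
#include <assert.h>
#include <stdlib.h>

/* Minimal circular intrusive list, a simplified stand-in for rlist. */
struct list_link {
	struct list_link *prev, *next;
};

static inline void
list_create(struct list_link *head)
{
	head->prev = head->next = head;
}

static inline void
list_add_tail(struct list_link *head, struct list_link *item)
{
	item->prev = head->prev;
	item->next = head;
	head->prev->next = item;
	head->prev = item;
}

static inline void
list_del(struct list_link *item)
{
	item->prev->next = item->next;
	item->next->prev = item->prev;
	item->prev = item->next = item;
}

static inline int
list_empty(const struct list_link *head)
{
	return head->next == head;
}

/* Hypothetical packet that lives in a queue via an embedded link. */
struct packet {
	int size;
	struct list_link in_queue;
};

static inline struct packet *
packet_new(struct list_link *queue, int size)
{
	struct packet *p = malloc(sizeof(*p));
	if (p == NULL)
		return NULL;
	p->size = size;
	list_add_tail(queue, &p->in_queue);
	return p;
}

static inline void
packet_delete(struct packet *p)
{
	assert(p != NULL);
	/* Unlink before free, so the queue never references freed memory. */
	list_del(&p->in_queue);
	free(p);
}
```

After packet_delete() the queue head and the remaining entries only point
at live objects, so the queue can be safely iterated or reused.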
 test/unit/swim_test_transport.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
index c4a1dd774..334ac926e 100644
--- a/test/unit/swim_test_transport.c
+++ b/test/unit/swim_test_transport.c
@@ -85,6 +85,7 @@ swim_test_packet_new(const char *data, int size, const struct sockaddr_in *src,
 static inline void
 swim_test_packet_delete(struct swim_test_packet *p)
 {
+	rlist_del_entry(p, in_queue);
 	free(p);
 }
 
-- 
2.20.1 (Apple Git-117)


* [tarantool-patches] [PATCH v2 3/5] swim: allow to set triggers on member updates
  2019-06-08 10:31 [tarantool-patches] [PATCH v2 0/5] SWIM on_member_update Vladislav Shpilevoy
  2019-06-08 10:31 ` [tarantool-patches] [PATCH v2 1/5] test: create isolated ev_loop for swim unit tests Vladislav Shpilevoy
  2019-06-08 10:31 ` [tarantool-patches] [PATCH v2 2/5] swim: fix a 'use after free' in SWIM tests Vladislav Shpilevoy
@ 2019-06-08 10:31 ` Vladislav Shpilevoy
  2019-06-08 14:35   ` [tarantool-patches] " Konstantin Osipov
  2019-06-08 10:31 ` [tarantool-patches] [PATCH v2 4/5] swim: call swim:new/delete via Lua C, not via FFI Vladislav Shpilevoy
  2019-06-08 11:04 ` [tarantool-patches] Re: [PATCH v2 0/5] SWIM on_member_update Vladislav Shpilevoy
  4 siblings, 1 reply; 11+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-08 10:31 UTC (permalink / raw)
  To: tarantool-patches; +Cc: kostja

SWIM is asynchronous by design. This means that there are two
ways of getting updates of a member table and individual members:
polling and triggers. Polling is terrible, which is why libev
with select(), poll(), epoll(), and kevent() appeared. The only
acceptable solution is triggers.

This commit allows setting triggers on member updates.

Part of #4250
---
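A rough, self-contained sketch of how events are coalesced per member
before they reach the triggers. It leaves out fibers, reference counting
and the trigger list itself; struct member, struct event_queue,
queue_event(), pop_event() and the DEMO_EV_* constants are hypothetical
names for this illustration, with values chosen to resemble the
swim_ev_mask enum. Two rules are shown: a member enters the queue only on
its first pending event while later events are OR-ed into its mask, and a
member that was both created and dropped before delivery is not shown to
the triggers at all.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum {
	DEMO_EV_NEW         = 0x01,
	DEMO_EV_NEW_STATUS  = 0x02,
	DEMO_EV_NEW_PAYLOAD = 0x10,
	DEMO_EV_DROP        = 0x20,
};

struct member {
	/* Events accumulated since the previous delivery. */
	unsigned events;
	/* Link in the event queue. */
	struct member *next;
};

struct event_queue {
	struct member *first, *last;
};

static inline void
queue_event(struct event_queue *q, struct member *m, unsigned events)
{
	assert(m != NULL);
	/* Enqueue only once, when the first pending event arrives. */
	if (m->events == 0 && events != 0) {
		m->next = NULL;
		if (q->last != NULL)
			q->last->next = m;
		else
			q->first = m;
		q->last = m;
	}
	m->events |= events;
}

/*
 * Pop the oldest member into *m (NULL if the queue is empty).
 * Return true and fill *mask when the events should be passed
 * to triggers; return false when the queue is empty or when the
 * member was created and dropped before anyone could see it.
 */
static inline bool
pop_event(struct event_queue *q, struct member **m, unsigned *mask)
{
	*m = q->first;
	if (*m == NULL)
		return false;
	q->first = (*m)->next;
	if (q->first == NULL)
		q->last = NULL;
	unsigned events = (*m)->events;
	/* Start collecting a fresh mask for the next delivery. */
	(*m)->events = 0;
	if ((events & DEMO_EV_NEW) != 0 && (events & DEMO_EV_DROP) != 0)
		return false;
	*mask = events;
	return true;
}
```

The consequence for trigger authors is that one invocation may carry
several event bits at once, so a trigger should test individual bits of
the mask instead of comparing the mask for equality.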
 src/lib/swim/swim.c         | 211 ++++++++++++++++++++++++++++++++----
 src/lib/swim/swim.h         |  47 +++++++-
 test/unit/swim.c            | 146 ++++++++++++++++++++++++-
 test/unit/swim.result       |  28 ++++-
 test/unit/swim_test_utils.c |  44 +++++++-
 test/unit/swim_test_utils.h |  12 ++
 6 files changed, 460 insertions(+), 28 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 46b76731d..00234d1df 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -37,6 +37,7 @@
 #include "msgpuck.h"
 #include "assoc.h"
 #include "sio.h"
+#include "trigger.h"
 #define HEAP_FORWARD_DECLARATION
 #include "salad/heap.h"
 
@@ -334,6 +335,19 @@ struct swim_member {
 	 * time.
 	 */
 	struct rlist in_dissemination_queue;
+	/**
+	 * Each time a member is updated, or created, or dropped,
+	 * it is added to an event queue. Members from this queue
+	 * are dispatched into user defined triggers.
+	 */
+	struct stailq_entry in_event_queue;
+	/**
+	 * Mask of events happened with this member since a
+	 * previous trigger invocation. Once the events are
+	 * delivered into a trigger, the mask is nullified and
+	 * starts collecting new events.
+	 */
+	enum swim_ev_mask events;
 	/**
 	 *
 	 *               Failure detection component
@@ -455,6 +469,17 @@ struct swim {
 	 * as long as the event TTD is non-zero.
 	 */
 	struct rlist dissemination_queue;
+	/**
+	 * Queue of updated, new, and dropped members to deliver
+	 * the events to triggers. Dropped members are also kept
+	 * here until they are handled by a trigger.
+	 */
+	struct stailq event_queue;
+	/**
+	 * List of triggers to call on each new, dropped, and
+	 * updated member.
+	 */
+	struct rlist on_member_event;
 	/**
 	 * Members to which a message should be sent next during
 	 * this round.
@@ -472,6 +497,13 @@ struct swim {
 	 * the beginning of each round.
 	 */
 	struct swim_member **shuffled;
+	/**
+	 * Fiber to serve member event triggers. This task is
+	 * being done in a separate fiber, because user triggers
+	 * can yield and libev callbacks, processing member
+	 * events, are not allowed to yield.
+	 */
+	struct fiber *event_handler;
 	/**
 	 * Single round step task. It is impossible to have
 	 * multiple round steps in the same SWIM instance at the
@@ -549,10 +581,41 @@ swim_register_event(struct swim *swim, struct swim_member *member)
  * change of its status, or incarnation, or both.
  */
 static void
-swim_on_member_update(struct swim *swim, struct swim_member *member)
+swim_on_member_update(struct swim *swim, struct swim_member *member,
+		      enum swim_ev_mask events)
 {
 	member->unacknowledged_pings = 0;
 	swim_register_event(swim, member);
+	/*
+	 * Member event should be delivered to triggers only if
+	 * there is at least one trigger.
+	 */
+	if (! rlist_empty(&swim->on_member_event)) {
+		/*
+		 * Member is referenced and added to a queue only
+		 * once. That moment can be detected when a first
+		 * event happens.
+		 */
+		if (member->events == 0 && events != 0) {
+			swim_member_ref(member);
+			stailq_add_tail_entry(&swim->event_queue, member,
+					      in_event_queue);
+			fiber_wakeup(swim->event_handler);
+		}
+		member->events |= events;
+	}
+}
+
+struct rlist *
+swim_trigger_list_on_member_event(struct swim *swim)
+{
+	return &swim->on_member_event;
+}
+
+bool
+swim_has_pending_events(struct swim *swim)
+{
+	return ! stailq_empty(&swim->event_queue);
 }
 
 /**
@@ -577,13 +640,17 @@ swim_update_member_inc_status(struct swim *swim, struct swim_member *member,
 	 */
 	assert(member != swim->self);
 	if (member->incarnation < incarnation) {
-		member->status = new_status;
+		enum swim_ev_mask events = SWIM_EV_NEW_INCARNATION;
+		if (new_status != member->status) {
+			events |= SWIM_EV_NEW_STATUS;
+			member->status = new_status;
+		}
 		member->incarnation = incarnation;
-		swim_on_member_update(swim, member);
+		swim_on_member_update(swim, member, events);
 	} else if (member->incarnation == incarnation &&
 		   member->status < new_status) {
 		member->status = new_status;
-		swim_on_member_update(swim, member);
+		swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS);
 	}
 }
 
@@ -626,7 +693,7 @@ swim_update_member_payload(struct swim *swim, struct swim_member *member,
 	member->payload_size = payload_size;
 	member->payload_ttd = mh_size(swim->members);
 	member->is_payload_up_to_date = true;
-	swim_on_member_update(swim, member);
+	swim_on_member_update(swim, member, SWIM_EV_NEW_PAYLOAD);
 	return 0;
 }
 
@@ -734,7 +801,6 @@ swim_delete_member(struct swim *swim, struct swim_member *member)
 	mh_int_t rc = mh_swim_table_find(swim->members, key, NULL);
 	assert(rc != mh_end(swim->members));
 	mh_swim_table_del(swim->members, rc, NULL);
-	swim_cached_round_msg_invalidate(swim);
 	rlist_del_entry(member, in_round_queue);
 
 	/* Failure detection component. */
@@ -742,6 +808,7 @@ swim_delete_member(struct swim *swim, struct swim_member *member)
 		wait_ack_heap_delete(&swim->wait_ack_heap, member);
 
 	/* Dissemination component. */
+	swim_on_member_update(swim, member, SWIM_EV_DROP);
 	rlist_del_entry(member, in_dissemination_queue);
 
 	swim_member_delete(member);
@@ -805,7 +872,7 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 		swim_ev_timer_again(swim_loop(), &swim->round_tick);
 
 	/* Dissemination component. */
-	swim_on_member_update(swim, member);
+	swim_on_member_update(swim, member, SWIM_EV_NEW);
 	if (payload_size >= 0 &&
 	    swim_update_member_payload(swim, member, payload,
 				       payload_size) != 0) {
@@ -1299,14 +1366,15 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events)
 			if (m->unacknowledged_pings < NO_ACKS_TO_SUSPECT)
 				break;
 			m->status = MEMBER_SUSPECTED;
-			swim_on_member_update(swim, m);
+			swim_on_member_update(swim, m, SWIM_EV_NEW_STATUS);
 			if (swim_send_indirect_pings(swim, m) != 0)
 				diag_log();
 			break;
 		case MEMBER_SUSPECTED:
 			if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) {
 				m->status = MEMBER_DEAD;
-				swim_on_member_update(swim, m);
+				swim_on_member_update(swim, m,
+						      SWIM_EV_NEW_STATUS);
 			}
 			break;
 		case MEMBER_DEAD:
@@ -1332,7 +1400,7 @@ swim_update_member_addr(struct swim *swim, struct swim_member *member,
 {
 	assert(! swim_inaddr_eq(&member->addr, addr));
 	member->addr = *addr;
-	swim_on_member_update(swim, member);
+	swim_on_member_update(swim, member, SWIM_EV_NEW_URI);
 }
 
 /**
@@ -1354,12 +1422,10 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def,
 	if (def->incarnation > member->incarnation) {
 		if (! swim_inaddr_eq(&def->addr, &member->addr))
 			swim_update_member_addr(swim, member, &def->addr);
-		if (def->payload_size >= 0) {
+		if (def->payload_size >= 0)
 			update_payload = true;
-		} else if (member->is_payload_up_to_date) {
+		else if (member->is_payload_up_to_date)
 			member->is_payload_up_to_date = false;
-			swim_on_member_update(swim, member);
-		}
 	} else if (! member->is_payload_up_to_date && def->payload_size >= 0) {
 		update_payload = true;
 	}
@@ -1430,7 +1496,7 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def,
 	 */
 	if (self->incarnation < def->incarnation) {
 		self->incarnation = def->incarnation;
-		swim_on_member_update(swim, self);
+		swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION);
 	}
 	if (def->status != MEMBER_ALIVE &&
 	    def->incarnation == self->incarnation) {
@@ -1440,7 +1506,7 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def,
 		 * with a bigger incarnation.
 		 */
 		self->incarnation++;
-		swim_on_member_update(swim, self);
+		swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION);
 	}
 	return 0;
 skip:
@@ -1532,7 +1598,7 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
 	if (def.incarnation == member->incarnation &&
 	    member->status != MEMBER_ALIVE) {
 		member->status = MEMBER_ALIVE;
-		swim_on_member_update(swim, member);
+		swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS);
 	}
 
 	switch (def.type) {
@@ -1604,7 +1670,7 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end,
 		swim_update_member_inc_status(swim, m, MEMBER_LEFT, tmp);
 	} else if (tmp >= m->incarnation) {
 		m->incarnation = tmp + 1;
-		swim_on_member_update(swim, m);
+		swim_on_member_update(swim, m, SWIM_EV_NEW_INCARNATION);
 	}
 	return 0;
 }
@@ -1668,6 +1734,55 @@ error:
 	diag_log();
 }
 
+/**
+ * Event handler. At this moment its only task is dispatching
+ * member events to user defined triggers. Generally, because
+ * SWIM is fully IO driven, that fiber should be used only for
+ * yielding tasks not related to SWIM core logic. For all the
+ * other tasks libev callbacks are ok. Unfortunately, yields are
+ * not allowed directly in libev callbacks, because they are
+ * invoked by a cord scheduler fiber prohibited for manual yields.
+ */
+static int
+swim_event_handler_f(va_list va)
+{
+	struct swim *s = va_arg(va, struct swim *);
+	struct swim_on_member_event_ctx ctx;
+	while (! fiber_is_cancelled()) {
+		if (stailq_empty(&s->event_queue)) {
+			fiber_yield();
+			continue;
+		}
+		/*
+		 * Can't be empty. SWIM deletes members from
+		 * event queue only on SWIM deletion, but then
+		 * the fiber would be stopped already.
+		 */
+		assert(! stailq_empty(&s->event_queue));
+		struct swim_member *m =
+			stailq_shift_entry(&s->event_queue, struct swim_member,
+					   in_event_queue);
+		/*
+		 * It is possible, that a member was added and
+		 * removed before firing a trigger. It happens,
+		 * if a previous event was being handled too
+		 * long, for example. There is a convention not to
+		 * show such easy riders.
+		 */
+		if ((m->events & SWIM_EV_NEW) == 0 ||
+		    (m->events & SWIM_EV_DROP) == 0) {
+			ctx.member = m;
+			ctx.events = m->events;
+			m->events = 0;
+			if (trigger_run(&s->on_member_event, (void *) &ctx))
+				diag_log();
+		}
+		swim_member_unref(m);
+	}
+	return 0;
+}
+
+
 struct swim *
 swim_new(void)
 {
@@ -1700,7 +1815,16 @@ swim_new(void)
 
 	/* Dissemination component. */
 	rlist_create(&swim->dissemination_queue);
-
+	rlist_create(&swim->on_member_event);
+	stailq_create(&swim->event_queue);
+	swim->event_handler = fiber_new("SWIM event handler",
+					swim_event_handler_f);
+	if (swim->event_handler == NULL) {
+		swim_delete(swim);
+		return NULL;
+	}
+	fiber_set_joinable(swim->event_handler, true);
+	fiber_start(swim->event_handler, swim);
 	return swim;
 }
 
@@ -1809,6 +1933,9 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 		 * specified.
 		 */
 		addr = swim->scheduler.transport.addr;
+		fiber_set_name(swim->event_handler,
+			       tt_sprintf("SWIM event handler %d",
+					  swim_fd(swim)));
 	} else {
 		addr = swim->self->addr;
 	}
@@ -1828,7 +1955,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 
 	if (new_self != NULL) {
 		swim->self->status = MEMBER_LEFT;
-		swim_on_member_update(swim, swim->self);
+		swim_on_member_update(swim, swim->self, SWIM_EV_NEW_STATUS);
 		swim->self = new_self;
 	}
 	if (! swim_inaddr_eq(&addr, &swim->self->addr)) {
@@ -1866,7 +1993,7 @@ swim_set_payload(struct swim *swim, const char *payload, int payload_size)
 	if (swim_update_member_payload(swim, self, payload, payload_size) != 0)
 		return -1;
 	self->incarnation++;
-	swim_on_member_update(swim, self);
+	swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION);
 	return 0;
 }
 
@@ -1951,17 +2078,44 @@ swim_size(const struct swim *swim)
 	return mh_size(swim->members);
 }
 
+/**
+ * Cancel and wait finish of an event handler fiber. That
+ * operation is not inlined in the SWIM destructor, because there
+ * is one more place, when the handler should be stopped even
+ * before SWIM deletion - quit. Quit deletes the instance only
+ * when all the 'I left' messages are sent, but it happens in a
+ * libev callback in the scheduler fiber where it is impossible
+ * to yield. So to that moment the handler should be dead already.
+ */
+static inline void
+swim_kill_event_handler(struct swim *swim)
+{
+	struct fiber *f = swim->event_handler;
+	/*
+	 * Nullify so as not to keep pointer at a fiber when it is
+	 * reused.
+	 */
+	swim->event_handler = NULL;
+	fiber_wakeup(f);
+	fiber_cancel(f);
+	fiber_join(f);
+}
+
 void
 swim_delete(struct swim *swim)
 {
+	if (swim->event_handler != NULL)
+		swim_kill_event_handler(swim);
 	struct ev_loop *l = swim_loop();
 	swim_scheduler_destroy(&swim->scheduler);
 	swim_ev_timer_stop(l, &swim->round_tick);
 	swim_ev_timer_stop(l, &swim->wait_ack_tick);
+	struct swim_member *m, *tmp;
+	stailq_foreach_entry_safe(m, tmp, &swim->event_queue, in_event_queue)
+		swim_member_unref(m);
 	mh_int_t node;
 	mh_foreach(swim->members, node) {
-		struct swim_member *m =
-			*mh_swim_table_node(swim->members, node);
+		m = *mh_swim_table_node(swim->members, node);
 		rlist_del_entry(m, in_round_queue);
 		if (! heap_node_is_stray(&m->in_wait_ack_heap))
 			wait_ack_heap_delete(&swim->wait_ack_heap, m);
@@ -1975,6 +2129,7 @@ swim_delete(struct swim *swim)
 	swim_task_destroy(&swim->round_step_task);
 	wait_ack_heap_destroy(&swim->wait_ack_heap);
 	mh_swim_table_delete(swim->members);
+	trigger_destroy(&swim->on_member_event);
 	free(swim->shuffled);
 	free(swim);
 }
@@ -1991,6 +2146,11 @@ swim_quit_step_complete(struct swim_task *task,
 	(void) rc;
 	struct swim *swim = swim_by_scheduler(scheduler);
 	if (rlist_empty(&swim->round_queue)) {
+		/*
+		 * The handler should be dead - can't yield here,
+		 * it is the scheduler fiber.
+		 */
+		assert(swim->event_handler == NULL);
 		swim_delete(swim);
 		return;
 	}
@@ -2020,6 +2180,11 @@ void
 swim_quit(struct swim *swim)
 {
 	assert(swim_is_configured(swim));
+	/*
+	 * Kill the handler now. Later it will be impossible to do
+	 * from the scheduler fiber.
+	 */
+	swim_kill_event_handler(swim);
 	struct ev_loop *l = swim_loop();
 	swim_ev_timer_stop(l, &swim->round_tick);
 	swim_ev_timer_stop(l, &swim->wait_ack_tick);
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 6db12ba9e..a42ace7c6 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -40,6 +40,7 @@ extern "C" {
 #endif
 
 struct swim;
+struct rlist;
 struct tt_uuid;
 struct swim_iterator;
 struct swim_member;
@@ -63,7 +64,8 @@ enum swim_gc_mode {
 
 /**
  * Create a new SWIM instance. Do not bind to a port or set any
- * parameters. Allocation and initialization only.
+ * parameters. Allocation and initialization only. The function
+ * yields.
  */
 struct swim *
 swim_new(void);
@@ -124,7 +126,8 @@ swim_set_codec(struct swim *swim, enum crypto_algo algo, enum crypto_mode mode,
 
 /**
  * Stop listening and broadcasting messages, cleanup all internal
- * structures, free memory.
+ * structures, free memory. The function yields. Actual deletion
+ * happens after currently working triggers are done.
  */
 void
 swim_delete(struct swim *swim);
@@ -272,6 +275,46 @@ swim_member_unref(struct swim_member *member);
 bool
 swim_member_is_dropped(const struct swim_member *member);
 
+enum swim_ev_mask {
+	SWIM_EV_NEW             = 0b00000001,
+	SWIM_EV_NEW_STATUS      = 0b00000010,
+	SWIM_EV_NEW_URI         = 0b00000100,
+	SWIM_EV_NEW_INCARNATION = 0b00001000,
+	SWIM_EV_NEW_PAYLOAD     = 0b00010000,
+	/* Shortcut to check for any update. */
+	SWIM_EV_UPDATE          = 0b00011110,
+	SWIM_EV_DROP            = 0b00100000,
+};
+
+/** On member event trigger context. */
+struct swim_on_member_event_ctx {
+	/** New, dropped, or updated member. */
+	struct swim_member *member;
+	/** Mask of happened events. */
+	enum swim_ev_mask events;
+};
+
+/**
+ * A list of member event processing triggers. A couple of words
+ * about such a strange API, instead of something like
+ * "add_trigger", "drop_trigger". A main motivation is that some
+ * functions need a whole trigger list. For example, a function
+ * adding Lua functions as triggers. At the time of this writing
+ * it was lbox_trigger_reset. There is a convention, that any
+ * Tarantool trigger exposed to Lua should provide a way to add
+ * one, drop one, replace one, return all - lbox_trigger_reset
+ * does all of this.
+ */
+struct rlist *
+swim_trigger_list_on_member_event(struct swim *swim);
+
+/**
+ * Check if a SWIM instance has pending events. Is not a public
+ * one, used by tests.
+ */
+bool
+swim_has_pending_events(struct swim *swim);
+
 #if defined(__cplusplus)
 }
 #endif
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 2ba9820d8..0e33d691c 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -41,6 +41,7 @@
 #include "swim_test_transport.h"
 #include "swim_test_ev.h"
 #include "swim_test_utils.h"
+#include "trigger.h"
 #include <fcntl.h>
 
 /**
@@ -950,10 +951,152 @@ swim_test_slow_net(void)
 	swim_finish_test();
 }
 
+struct trigger_ctx {
+	int counter;
+	bool is_deleted;
+	bool need_sleep;
+	struct fiber *f;
+	struct swim_on_member_event_ctx ctx;
+};
+
+static void
+swim_on_member_event_save(struct trigger *t, void *event)
+{
+	struct trigger_ctx *c = (struct trigger_ctx *) t->data;
+	++c->counter;
+	if (c->ctx.member != NULL)
+		swim_member_unref(c->ctx.member);
+	c->ctx = *((struct swim_on_member_event_ctx *) event);
+	swim_member_ref(c->ctx.member);
+}
+
+static void
+swim_on_member_event_yield(struct trigger *t, void *event)
+{
+	struct trigger_ctx *c = (struct trigger_ctx *) t->data;
+	++c->counter;
+	c->f = fiber();
+	while (c->need_sleep)
+		fiber_yield();
+}
+
+static void
+swim_trigger_destroy_cb(struct trigger *t)
+{
+	((struct trigger_ctx *) t->data)->is_deleted = true;
+}
+
+static int
+swim_cluster_delete_f(va_list ap)
+{
+	struct swim_cluster *c = (struct swim_cluster *)
+		va_arg(ap, struct swim_cluster *);
+	swim_cluster_delete(c);
+	return 0;
+}
+
+static void
+swim_test_triggers(void)
+{
+	swim_start_test(21);
+	struct swim_cluster *cluster = swim_cluster_new(2);
+	swim_cluster_set_ack_timeout(cluster, 1);
+	struct trigger_ctx tctx, tctx2;
+	memset(&tctx, 0, sizeof(tctx));
+	memset(&tctx2, 0, sizeof(tctx2));
+	struct trigger *t1 = (struct trigger *) malloc(sizeof(*t1));
+	assert(t1 != NULL);
+	trigger_create(t1, swim_on_member_event_save, (void *) &tctx,
+		       swim_trigger_destroy_cb);
+
+	/* Skip 'new self' events. */
+	swim_cluster_run_triggers(cluster);
+
+	struct swim *s1 = swim_cluster_member(cluster, 0);
+	trigger_add(swim_trigger_list_on_member_event(s1), t1);
+	swim_cluster_interconnect(cluster, 0, 1);
+	swim_cluster_run_triggers(cluster);
+
+	is(tctx.counter, 1, "trigger is fired");
+	ok(! tctx.is_deleted, "is not deleted");
+	is(tctx.ctx.member, swim_cluster_member_view(cluster, 0, 1),
+	   "ctx.member is set");
+	is(tctx.ctx.events, SWIM_EV_NEW, "ctx.events is set");
+
+	swim_run_for(1);
+	swim_cluster_run_triggers(cluster);
+	is(tctx.counter, 2, "payload is delivered, trigger caught that");
+	is(tctx.ctx.member, swim_cluster_member_view(cluster, 0, 1),
+	   "S1 got S2' payload");
+	is(tctx.ctx.events, SWIM_EV_NEW_PAYLOAD, "mask says that");
+
+	swim_cluster_member_set_payload(cluster, 0, "123", 3);
+	swim_cluster_run_triggers(cluster);
+	is(tctx.counter, 3, "self payload is updated");
+	is(tctx.ctx.member, swim_self(s1), "self is set as a member");
+	is(tctx.ctx.events, SWIM_EV_NEW_PAYLOAD | SWIM_EV_NEW_INCARNATION,
+	   "both incarnation and payload events are presented");
+
+	swim_cluster_set_drop(cluster, 1, 100);
+	fail_if(swim_cluster_wait_status(cluster, 0, 1,
+					 MEMBER_SUSPECTED, 3) != 0);
+	swim_cluster_run_triggers(cluster);
+	is(tctx.counter, 4, "suspicion fired a trigger");
+	is(tctx.ctx.events, SWIM_EV_NEW_STATUS, "status suspected");
+
+	fail_if(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 3) != 0);
+	swim_cluster_run_triggers(cluster);
+	is(tctx.counter, 5, "death fired a trigger");
+	is(tctx.ctx.events, SWIM_EV_NEW_STATUS, "status dead");
+
+	fail_if(swim_cluster_wait_status(cluster, 0, 1,
+					 swim_member_status_MAX, 2) != 0);
+	swim_cluster_run_triggers(cluster);
+	is(tctx.counter, 6, "drop fired a trigger");
+	is(tctx.ctx.events, SWIM_EV_DROP, "status dropped");
+	is(swim_cluster_member_view(cluster, 0, 1), NULL,
+	   "dropped member is not presented in the member table");
+	isnt(tctx.ctx.member, NULL, "but is in the event context");
+	/*
+	 * There is a complication about yields. If a trigger
+	 * yields, other triggers wait for its finish. And all
+	 * the triggers should be ready to SWIM deletion in the
+	 * middle of an event processing. SWIM object should not
+	 * be deleted, until all the triggers are done.
+	 */
+	struct trigger *t2 = (struct trigger *) malloc(sizeof(*t2));
+	assert(t2 != NULL);
+	tctx2.need_sleep = true;
+	trigger_create(t2, swim_on_member_event_yield, (void *) &tctx2, NULL);
+	trigger_add(swim_trigger_list_on_member_event(s1), t2);
+	swim_cluster_add_link(cluster, 0, 1);
+	swim_cluster_run_triggers(cluster);
+	is(tctx2.counter, 1, "yielding trigger is fired");
+	is(tctx.counter, 6, "non-yielding still is not");
+
+	struct fiber *async_delete_fiber =
+		fiber_new("async delete", swim_cluster_delete_f);
+	fiber_start(async_delete_fiber, cluster);
+	ok(! tctx.is_deleted, "trigger is not deleted until all currently "\
+	   "sleeping triggers are finished");
+	tctx2.need_sleep = false;
+	fiber_wakeup(tctx2.f);
+	while (! tctx.is_deleted)
+		fiber_sleep(0);
+	note("now all the triggers are done and deleted");
+
+	free(t1);
+	free(t2);
+	if (tctx.ctx.member != NULL)
+		swim_member_unref(tctx.ctx.member);
+
+	swim_finish_test();
+}
+
 static int
 main_f(va_list ap)
 {
-	swim_start_test(20);
+	swim_start_test(21);
 
 	(void) ap;
 	swim_test_ev_init();
@@ -979,6 +1122,7 @@ main_f(va_list ap)
 	swim_test_indirect_ping();
 	swim_test_encryption();
 	swim_test_slow_net();
+	swim_test_triggers();
 
 	swim_test_transport_free();
 	swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index 25fdb8833..3ebfe7ea0 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
 	*** main_f ***
-1..20
+1..21
 	*** swim_test_one_link ***
     1..6
     ok 1 - no rounds - no fullmesh
@@ -200,4 +200,30 @@ ok 19 - subtests
     # slow network leads to idle round steps, they should not produce a new message
 ok 20 - subtests
 	*** swim_test_slow_net: done ***
+	*** swim_test_triggers ***
+    1..21
+    ok 1 - trigger is fired
+    ok 2 - is not deleted
+    ok 3 - ctx.member is set
+    ok 4 - ctx.events is set
+    ok 5 - payload is delivered, trigger caught that
+    ok 6 - S1 got S2' payload
+    ok 7 - mask says that
+    ok 8 - self payload is updated
+    ok 9 - self is set as a member
+    ok 10 - both incarnation and payload events are presented
+    ok 11 - suspicion fired a trigger
+    ok 12 - status suspected
+    ok 13 - death fired a trigger
+    ok 14 - status dead
+    ok 15 - drop fired a trigger
+    ok 16 - status dropped
+    ok 17 - dropped member is not presented in the member table
+    ok 18 - but is in the event context
+    ok 19 - yielding trigger is fired
+    ok 20 - non-yielding still is not
+    ok 21 - trigger is not deleted until all currently sleeping triggers are finished
+    # now all the triggers are done and deleted
+ok 21 - subtests
+	*** swim_test_triggers: done ***
 	*** main_f: done ***
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index f72fa2450..463c62390 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -36,6 +36,7 @@
 #include "trivia/util.h"
 #include "fiber.h"
 #include "msgpuck.h"
+#include "trigger.h"
 
 /**
  * Drop rate packet filter to drop packets with a certain
@@ -191,12 +192,32 @@ swim_cluster_id_to_uri(char *buffer, int id)
 	sprintf(buffer, "127.0.0.1:%d", id + 1);
 }
 
+/**
+ * A trigger to check correctness of event context, and ability
+ * to yield.
+ */
+void
+swim_test_event_cb(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	struct swim_on_member_event_ctx *ctx =
+		(struct swim_on_member_event_ctx *) event;
+	assert(ctx->events != 0);
+	assert((ctx->events & SWIM_EV_NEW) == 0 ||
+	       (ctx->events & SWIM_EV_DROP) == 0);
+	fiber_sleep(0);
+}
+
 /** Create a SWIM cluster node @a n with a 0-based @a id. */
 static inline void
 swim_node_create(struct swim_node *n, int id)
 {
 	n->swim = swim_new();
 	assert(n->swim != NULL);
+	struct trigger *t = (struct trigger *) malloc(sizeof(*t));
+	trigger_create(t, swim_test_event_cb, NULL, (trigger_f0) free);
+	trigger_add(swim_trigger_list_on_member_event(n->swim), t);
+
 	char uri[128];
 	swim_cluster_id_to_uri(uri, id);
 	n->uuid = uuid_nil;
@@ -303,7 +324,7 @@ swim_cluster_add_link(struct swim_cluster *cluster, int to_id, int from_id)
 			       swim_member_uri(from), swim_member_uuid(from));
 }
 
-static const struct swim_member *
+const struct swim_member *
 swim_cluster_member_view(struct swim_cluster *cluster, int node_id,
 			 int member_id)
 {
@@ -615,6 +636,8 @@ swim_wait_timeout(double timeout, struct swim_cluster *cluster,
 	 * whatsoever.
 	 */
 	swim_test_transport_do_loop_step(loop);
+	if (cluster != NULL)
+		swim_cluster_run_triggers(cluster);
 	while (! check(cluster, data)) {
 		if (swim_time() >= deadline)
 			return -1;
@@ -625,6 +648,8 @@ swim_wait_timeout(double timeout, struct swim_cluster *cluster,
 		 * too.
 		 */
 		swim_test_transport_do_loop_step(loop);
+		if (cluster != NULL)
+			swim_cluster_run_triggers(cluster);
 	}
 	return 0;
 }
@@ -868,6 +893,23 @@ swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster,
 				 swim_loop_check_member_everywhere, &t);
 }
 
+void
+swim_cluster_run_triggers(struct swim_cluster *cluster)
+{
+	bool has_events;
+	do {
+		has_events = false;
+		struct swim_node *n = cluster->node;
+		for (int i = 0; i < cluster->size; ++i, ++n) {
+			if (n->swim != NULL &&
+			    swim_has_pending_events(n->swim)) {
+				has_events = true;
+				fiber_sleep(0);
+			}
+		}
+	} while (has_events);
+}
+
 bool
 swim_error_check_match(const char *msg)
 {
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 0e6f29d80..fde84e39b 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -168,6 +168,14 @@ int
 swim_cluster_member_set_payload(struct swim_cluster *cluster, int i,
 				const char *payload, int size);
 
+/**
+ * Get a member object stored in a SWIM node @a node_id and
+ * showing a known state of a SWIM node @a member_id.
+ */
+const struct swim_member *
+swim_cluster_member_view(struct swim_cluster *cluster, int node_id,
+			 int member_id);
+
 /**
  * Check if in the cluster every instance knowns the about other
  * instances.
@@ -229,6 +237,10 @@ swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster,
 				     int member_id, const char *payload,
 				     int payload_size, double timeout);
 
+/** Run all pending triggers in the cluster. */
+void
+swim_cluster_run_triggers(struct swim_cluster *cluster);
+
 /** Process SWIM events for @a duration fake seconds. */
 void
 swim_run_for(double duration);
-- 
2.20.1 (Apple Git-117)


* [tarantool-patches] [PATCH v2 4/5] swim: call swim:new/delete via Lua C, not via FFI
  2019-06-08 10:31 [tarantool-patches] [PATCH v2 0/5] SWIM on_member_update Vladislav Shpilevoy
                   ` (2 preceding siblings ...)
  2019-06-08 10:31 ` [tarantool-patches] [PATCH v2 3/5] swim: allow to set triggers on member updates Vladislav Shpilevoy
@ 2019-06-08 10:31 ` Vladislav Shpilevoy
  2019-06-08 14:35   ` [tarantool-patches] " Konstantin Osipov
  2019-06-08 11:04 ` [tarantool-patches] Re: [PATCH v2 0/5] SWIM on_member_update Vladislav Shpilevoy
  4 siblings, 1 reply; 11+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-08 10:31 UTC (permalink / raw)
  To: tarantool-patches; +Cc: kostja

These functions are going to yield in the scope of #4250, because
swim:new() will start a fiber, while swim:delete() will cancel
that fiber and yield control to it.

Needed for #4250
---
 extra/exports      |  2 --
 src/CMakeLists.txt |  3 +-
 src/lua/init.c     |  2 ++
 src/lua/swim.c     | 81 ++++++++++++++++++++++++++++++++++++++++++++++
 src/lua/swim.h     | 34 +++++++++++++++++++
 src/lua/swim.lua   | 13 +++-----
 6 files changed, 123 insertions(+), 12 deletions(-)
 create mode 100644 src/lua/swim.c
 create mode 100644 src/lua/swim.h

diff --git a/extra/exports b/extra/exports
index 0b1102d03..b8c42c0df 100644
--- a/extra/exports
+++ b/extra/exports
@@ -89,12 +89,10 @@ crypto_stream_delete
 
 lua_static_aligned_alloc
 
-swim_new
 swim_is_configured
 swim_cfg
 swim_set_payload
 swim_set_codec
-swim_delete
 swim_add_member
 swim_remove_member
 swim_probe_member
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 68674d06a..64ea95c17 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -117,6 +117,7 @@ set (server_sources
      lua/info.c
      lua/string.c
      lua/buffer.c
+     lua/swim.c
      ${lua_sources}
      ${PROJECT_SOURCE_DIR}/third_party/lua-yaml/lyaml.cc
      ${PROJECT_SOURCE_DIR}/third_party/lua-yaml/b64.c
@@ -169,7 +170,7 @@ target_link_libraries(server core coll http_parser bit uri uuid swim swim_udp
 
 # Rule of thumb: if exporting a symbol from a static library, list the
 # library here.
-set (reexport_libraries server core misc bitset csv swim
+set (reexport_libraries server core misc bitset csv swim swim_udp swim_ev
      ${LUAJIT_LIBRARIES} ${MSGPUCK_LIBRARIES} ${ICU_LIBRARIES})
 
 set (common_libraries
diff --git a/src/lua/init.c b/src/lua/init.c
index 5ddc5a4d8..d8b3501be 100644
--- a/src/lua/init.c
+++ b/src/lua/init.c
@@ -58,6 +58,7 @@
 #include "lua/fio.h"
 #include "lua/httpc.h"
 #include "lua/utf8.h"
+#include "lua/swim.h"
 #include "digest.h"
 #include <small/ibuf.h>
 
@@ -450,6 +451,7 @@ tarantool_lua_init(const char *tarantool_bin, int argc, char **argv)
 	tarantool_lua_socket_init(L);
 	tarantool_lua_pickle_init(L);
 	tarantool_lua_digest_init(L);
+	tarantool_lua_swim_init(L);
 	luaopen_http_client_driver(L);
 	lua_pop(L, 1);
 	luaopen_msgpack(L);
diff --git a/src/lua/swim.c b/src/lua/swim.c
new file mode 100644
index 000000000..3b9e229be
--- /dev/null
+++ b/src/lua/swim.c
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "swim/swim.h"
+#include "trigger.h"
+#include "diag.h"
+#include "lua/utils.h"
+
+static uint32_t ctid_swim_ptr;
+
+/**
+ * Create a new SWIM instance. SWIM is not created via FFI,
+ * because this operation yields.
+ * @retval 1 Success. A SWIM instance pointer is on the stack.
+ * @retval 2 Error. Nil and an error object are pushed.
+ */
+static int
+lua_swim_new(struct lua_State *L)
+{
+	struct swim *s = swim_new();
+	*(struct swim **) luaL_pushcdata(L, ctid_swim_ptr) = s;
+	if (s != NULL)
+		return 1;
+	luaT_pusherror(L, diag_last_error(diag_get()));
+	return 2;
+}
+
+/**
+ * Delete a SWIM instance. SWIM is not deleted via FFI, because
+ * this operation yields.
+ */
+static int
+lua_swim_delete(struct lua_State *L)
+{
+	uint32_t ctypeid;
+	struct swim *s = *(struct swim **) luaL_checkcdata(L, 1, &ctypeid);
+	swim_delete(s);
+	return 0;
+}
+
+void
+tarantool_lua_swim_init(struct lua_State *L)
+{
+	luaL_cdef(L, "struct swim;");
+	ctid_swim_ptr = luaL_ctypeid(L, "struct swim *");
+
+	static const struct luaL_Reg lua_swim_internal_methods [] = {
+		{"swim_new", lua_swim_new},
+		{"swim_delete", lua_swim_delete},
+		{NULL, NULL}
+	};
+	luaL_register_module(L, "swim", lua_swim_internal_methods);
+	lua_pop(L, 1);
+}
diff --git a/src/lua/swim.h b/src/lua/swim.h
new file mode 100644
index 000000000..b60bb76b7
--- /dev/null
+++ b/src/lua/swim.h
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+struct lua_State;
+
+void
+tarantool_lua_swim_init(struct lua_State *L);
diff --git a/src/lua/swim.lua b/src/lua/swim.lua
index 4893d5767..527299284 100644
--- a/src/lua/swim.lua
+++ b/src/lua/swim.lua
@@ -3,6 +3,7 @@ local uuid = require('uuid')
 local buffer = require('buffer')
 local msgpack = require('msgpack')
 local crypto = require('crypto')
+local internal = require('swim')
 
 ffi.cdef[[
     struct swim;
@@ -23,9 +24,6 @@ ffi.cdef[[
         MEMBER_LEFT,
     };
 
-    struct swim *
-    swim_new(void);
-
     bool
     swim_is_configured(const struct swim *swim);
 
@@ -41,9 +39,6 @@ ffi.cdef[[
     swim_set_codec(struct swim *swim, enum crypto_algo algo,
                    enum crypto_mode mode, const char *key, int key_size);
 
-    void
-    swim_delete(struct swim *swim);
-
     int
     swim_add_member(struct swim *swim, const char *uri,
                     const struct tt_uuid *uuid);
@@ -466,7 +461,7 @@ local swim_mt_deleted = {
 --
 local function swim_delete(s)
     local ptr = swim_check_instance(s, 'swim:delete')
-    capi.swim_delete(ffi.gc(ptr, nil))
+    internal.swim_delete(ffi.gc(ptr, nil))
     s.ptr = nil
     setmetatable(s, swim_mt_deleted)
 end
@@ -821,11 +816,11 @@ local cache_table_mt = { __mode = 'v' }
 -- provided.
 --
 local function swim_new(cfg)
-    local ptr = capi.swim_new()
+    local ptr = internal.swim_new()
     if ptr == nil then
         return nil, box.error.last()
     end
-    ffi.gc(ptr, capi.swim_delete)
+    ffi.gc(ptr, internal.swim_delete)
     local s = setmetatable({
         ptr = ptr,
         cfg = setmetatable({index = {}}, swim_cfg_not_configured_mt),
-- 
2.20.1 (Apple Git-117)


* [tarantool-patches] Re: [PATCH v2 0/5] SWIM on_member_update
  2019-06-08 10:31 [tarantool-patches] [PATCH v2 0/5] SWIM on_member_update Vladislav Shpilevoy
                   ` (3 preceding siblings ...)
  2019-06-08 10:31 ` [tarantool-patches] [PATCH v2 4/5] swim: call swim:new/delete via Lua C, not via FFI Vladislav Shpilevoy
@ 2019-06-08 11:04 ` Vladislav Shpilevoy
  2019-06-08 11:06   ` Vladislav Shpilevoy
  4 siblings, 1 reply; 11+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-08 11:04 UTC (permalink / raw)
  To: tarantool-patches; +Cc: kostja

From 9e33c61c00eefab6866225ed7ab904f84f65a65f Mon Sep 17 00:00:00 2001
Message-Id: <9e33c61c00eefab6866225ed7ab904f84f65a65f.1559989748.git.v.shpilevoy@tarantool.org>
In-Reply-To: <cover.1559989748.git.v.shpilevoy@tarantool.org>
References: <cover.1559989748.git.v.shpilevoy@tarantool.org>
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Date: Sat, 1 Jun 2019 20:40:21 +0200
Subject: [PATCH v2 5/5] swim: expose Lua triggers on member events

SWIM as a monitoring module is hard to use without the ability to
subscribe to events. Without it a user would have to poll the SWIM
member table for updates, which is too inefficient.

This commit exposes the ability to set Lua triggers.

Closes #4250

@TarantoolBot document
Title: SWIM: swim:on_member_event

Now a user can set triggers on member table updates. There is a
function for that, which can be used in one of several ways:
```Lua
swim:on_member_event(new_trigger[, ctx])
```
Add a new trigger on member table updates. The function
`new_trigger` will be called when a new member appears, when an
existing member is dropped, and when a member is updated. It
should take 3 arguments: the first is the affected SWIM member,
the second is an events object, the third is `ctx` passed as is.

The events object has methods that help a user determine which
events have happened.
```Lua
local function on_event(member, events, ctx)
    if events:is_new() then
        ...
    elseif events:is_drop() then
        ...
    end

    if events:is_update() then
        -- Several update events can be set at once, so each one
        -- is checked independently.
        if events:is_new_status() then
            ...
        end
        if events:is_new_uri() then
            ...
        end
        if events:is_new_incarnation() then
            ...
        end
        if events:is_new_payload() then
            ...
        end
    end
end

s:on_member_event(on_event, ctx)
```
Note that multiple events can happen simultaneously, and a user
should be ready for that. 'New' and 'drop' never happen together,
but either of them can easily be combined with 'update',
especially if there are lots of events and the triggers work too
slowly. Then a member can be added and updated a while later, all
before the 'new' event reaches a trigger.

A notable case is 'new' + 'new payload'. This case does not
depend on trigger speed. The thing is that payload absence and a
payload of size 0 are not the same, and sometimes it happens that
a member is added without a payload. For example, a ping was
received, and pings do not carry a payload. In such a case the
missing payload is eventually received later. If that matters for
a user's application, it should be ready for that: 'new' does not
mean that the member already has a payload, and the payload size
says nothing about its presence or absence.
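
The relation between the event bits can be sketched in C. This is
an illustration only: the enum values repeat the `swim_ev_mask`
definition from this patch (written in hex), while the `ev_*`
helpers are hypothetical names that mirror the Lua `events:is_*()`
methods and are not part of the SWIM API.
```c
#include <assert.h>
#include <stdbool.h>

/* Same values as enum swim_ev_mask in this patch, written in hex. */
enum swim_ev_mask {
	SWIM_EV_NEW             = 0x01,
	SWIM_EV_NEW_STATUS      = 0x02,
	SWIM_EV_NEW_URI         = 0x04,
	SWIM_EV_NEW_INCARNATION = 0x08,
	SWIM_EV_NEW_PAYLOAD     = 0x10,
	/* Shortcut to check for any update: all four NEW_* bits. */
	SWIM_EV_UPDATE          = 0x1e,
	SWIM_EV_DROP            = 0x20,
};

/* Hypothetical helper: 'new' bit is set. */
static inline bool
ev_is_new(int events)
{
	return (events & SWIM_EV_NEW) != 0;
}

/* Hypothetical helper: at least one update bit is set. */
static inline bool
ev_is_update(int events)
{
	return (events & SWIM_EV_UPDATE) != 0;
}

/*
 * Hypothetical helper: a mask is never empty, and 'new' and 'drop'
 * are never set together.
 */
static inline bool
ev_is_valid(int events)
{
	bool is_new = (events & SWIM_EV_NEW) != 0;
	bool is_drop = (events & SWIM_EV_DROP) != 0;
	return events != 0 && !(is_new && is_drop);
}

/* Hypothetical helper: count how many update kinds are set at once. */
static inline int
ev_update_count(int events)
{
	assert(ev_is_valid(events));
	int count = 0;
	for (int bit = SWIM_EV_NEW_STATUS; bit <= SWIM_EV_NEW_PAYLOAD;
	     bit <<= 1) {
		if ((events & bit) != 0)
			++count;
	}
	return count;
}
```
With these helpers a mask of `SWIM_EV_NEW | SWIM_EV_NEW_PAYLOAD` is
both 'new' and 'update', and
`ev_update_count(SWIM_EV_NEW_PAYLOAD | SWIM_EV_NEW_INCARNATION)`
returns 2. A trigger that tests only one update kind per call would
miss the other one.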

Second usage case:
```Lua
swim:on_member_event(nil, old_trigger)
```
Drop an old trigger.

Third usage case:
```Lua
swim:on_member_event(new_trigger, old_trigger[, ctx])
```
Replace an existing trigger in place, keeping its position in
the trigger list.

Fourth usage case:
```Lua
swim:on_member_event()
```
Get a list of triggers.

Care is needed when dropping or replacing a trigger. The
following code does not work:
```Lua
tr = function() ... end
-- Add a trigger.
s:on_member_event(tr)
...
-- Drop a trigger.
s:on_member_event(nil, tr)
```
More precisely, the last line does not work. This is because SWIM
wraps user triggers in an internal closure for parameter
preprocessing. To drop a trigger, a user should save the result
of the first on_member_event() call.

This code works:
```Lua
tr = function() ... end
-- Add a trigger.
tr_id = s:on_member_event(tr)
...
-- Drop a trigger.
s:on_member_event(nil, tr_id)
```

The triggers are executed one by one in a separate fiber, and
they can yield. These two facts mean that if one trigger sleeps
for too long, the other triggers wait. This does not block SWIM
from doing its routine operations, but it does block the other
triggers.

The last point to remember is that if a member was added and
dropped before its appearance reached a trigger, then such a
member does not fire triggers at all. A user will not notice
such a short-lived member.
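
One way to picture why such a member stays unnoticed is a
per-member mask of not yet delivered events. The sketch below is a
simplified, hypothetical model in C, not the actual SWIM
implementation: `struct member_model`, `model_on_event` and
`model_deliver` are made-up names, and only the enum values come
from this patch.
```c
#include <assert.h>

/* Same values as enum swim_ev_mask in this patch, written in hex. */
enum swim_ev_mask {
	SWIM_EV_NEW             = 0x01,
	SWIM_EV_NEW_STATUS      = 0x02,
	SWIM_EV_NEW_URI         = 0x04,
	SWIM_EV_NEW_INCARNATION = 0x08,
	SWIM_EV_NEW_PAYLOAD     = 0x10,
	SWIM_EV_UPDATE          = 0x1e,
	SWIM_EV_DROP            = 0x20,
};

/* Hypothetical model of one member's undelivered events. */
struct member_model {
	/* Events accumulated since the last trigger run. */
	int pending;
};

/* Register events that happened before the triggers could run. */
static inline void
model_on_event(struct member_model *m, int events)
{
	assert(events != 0);
	if ((events & SWIM_EV_DROP) != 0 &&
	    (m->pending & SWIM_EV_NEW) != 0) {
		/*
		 * Added and dropped before any trigger saw the
		 * member: nothing is left to report.
		 */
		m->pending = 0;
		return;
	}
	/* Otherwise events merge into one mask. */
	m->pending |= events;
}

/* Hand the accumulated mask to triggers; 0 means nothing to fire. */
static inline int
model_deliver(struct member_model *m)
{
	int events = m->pending;
	m->pending = 0;
	return events;
}
```
In this model a 'new' followed by a 'new payload' is delivered as
one combined mask, while a 'new' followed by a 'drop' delivers
nothing, so no trigger would be called for that member.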
---
 src/lua/swim.c          |  27 ++++-
 src/lua/swim.lua        |  90 ++++++++++++++
 test/swim/swim.result   | 263 ++++++++++++++++++++++++++++++++++++++++
 test/swim/swim.test.lua |  85 +++++++++++++
 4 files changed, 464 insertions(+), 1 deletion(-)

diff --git a/src/lua/swim.c b/src/lua/swim.c
index 3b9e229be..c3a0a9911 100644
--- a/src/lua/swim.c
+++ b/src/lua/swim.c
@@ -33,8 +33,31 @@
 #include "diag.h"
 #include "lua/utils.h"
 
+static uint32_t ctid_swim_member_ptr;
 static uint32_t ctid_swim_ptr;
 
+/** Push member event context into a Lua stack. */
+static int
+lua_swim_member_event_push(struct lua_State *L, void *event)
+{
+	struct swim_on_member_event_ctx *ctx =
+		(struct swim_on_member_event_ctx *) event;
+	*(struct swim_member **) luaL_pushcdata(L, ctid_swim_member_ptr) =
+		ctx->member;
+	lua_pushinteger(L, ctx->events);
+	return 2;
+}
+
+/** Set or/and delete a trigger on a SWIM member event. */
+static int
+lua_swim_on_member_event(struct lua_State *L)
+{
+	uint32_t ctypeid;
+	struct swim *s = *(struct swim **) luaL_checkcdata(L, 1, &ctypeid);
+	return lbox_trigger_reset(L, 3, swim_trigger_list_on_member_event(s),
+				  lua_swim_member_event_push, NULL);
+}
+
 /**
  * Create a new SWIM instance. SWIM is not created via FFI,
  * because this operation yields.
@@ -68,12 +91,14 @@ lua_swim_delete(struct lua_State *L)
 void
 tarantool_lua_swim_init(struct lua_State *L)
 {
-	luaL_cdef(L, "struct swim;");
+	luaL_cdef(L, "struct swim_member; struct swim;");
+	ctid_swim_member_ptr = luaL_ctypeid(L, "struct swim_member *");
 	ctid_swim_ptr = luaL_ctypeid(L, "struct swim *");
 
 	static const struct luaL_Reg lua_swim_internal_methods [] = {
 		{"swim_new", lua_swim_new},
 		{"swim_delete", lua_swim_delete},
+		{"swim_on_member_event", lua_swim_on_member_event},
 		{NULL, NULL}
 	};
 	luaL_register_module(L, "swim", lua_swim_internal_methods);
diff --git a/src/lua/swim.lua b/src/lua/swim.lua
index 527299284..a7d5caab3 100644
--- a/src/lua/swim.lua
+++ b/src/lua/swim.lua
@@ -24,6 +24,16 @@ ffi.cdef[[
         MEMBER_LEFT,
     };
 
+    enum swim_ev_mask {
+        SWIM_EV_NEW             = 0b00000001,
+        SWIM_EV_NEW_STATUS      = 0b00000010,
+        SWIM_EV_NEW_URI         = 0b00000100,
+        SWIM_EV_NEW_INCARNATION = 0b00001000,
+        SWIM_EV_NEW_PAYLOAD     = 0b00010000,
+        SWIM_EV_UPDATE          = 0b00011110,
+        SWIM_EV_DROP            = 0b00100000,
+    };
+
     bool
     swim_is_configured(const struct swim *swim);
 
@@ -689,6 +699,84 @@ local function swim_pairs(s)
     return swim_pairs_next, {swim = s, iterator = iterator}, nil
 end
 
+local swim_member_event_index = {
+    is_new = function(self)
+        return bit.band(self[1], capi.SWIM_EV_NEW) ~= 0
+    end,
+    is_drop = function(self)
+        return bit.band(self[1], capi.SWIM_EV_DROP) ~= 0
+    end,
+    is_update = function(self)
+        return bit.band(self[1], capi.SWIM_EV_UPDATE) ~= 0
+    end,
+    is_new_status = function(self)
+        return bit.band(self[1], capi.SWIM_EV_NEW_STATUS) ~= 0
+    end,
+    is_new_uri = function(self)
+        return bit.band(self[1], capi.SWIM_EV_NEW_URI) ~= 0
+    end,
+    is_new_incarnation = function(self)
+        return bit.band(self[1], capi.SWIM_EV_NEW_INCARNATION) ~= 0
+    end,
+    is_new_payload = function(self)
+        return bit.band(self[1], capi.SWIM_EV_NEW_PAYLOAD) ~= 0
+    end,
+}
+
+local swim_member_event_mt = {
+    __index = swim_member_event_index,
+    __serialize = function(self)
+        local res = {}
+        for k, v in pairs(swim_member_event_index) do
+            v = v(self)
+            if v then
+                res[k] = v
+            end
+        end
+        return res
+    end,
+}
+
+--
+-- Create a closure function for preprocessing raw SWIM member
+-- event trigger parameters.
+-- @param s SWIM instance.
+-- @param callback User functions to call.
+-- @param ctx An optional parameter for @a callback passed as is.
+-- @return A function to set as a trigger.
+--
+local function swim_on_member_event_new(s, callback, ctx)
+    return function(member_ptr, event_mask)
+        local m = swim_wrap_member(s, member_ptr)
+        local event = setmetatable({event_mask}, swim_member_event_mt)
+        return callback(m, event, ctx)
+    end
+end
+
+--
+-- Add or/and delete a trigger on member event. Possible usages:
+--
+-- * on_member_event(new[, ctx]) - add a new trigger. It should
+--   accept 3 arguments: an updated member, an events object, an
+--   optional @a ctx parameter passed as is.
+--
+-- * on_member_event(new, old[, ctx]) - add a new trigger @a new
+--   if not nil, in place of @a old trigger.
+--
+-- * on_member_event() - get a list of triggers.
+--
+local function swim_on_member_event(s, new, old, ctx)
+    local ptr = swim_check_instance(s, 'swim:on_member_event')
+    if type(old) ~= 'function' then
+        ctx = old
+        old = nil
+    end
+    if new ~= nil then
+        new = swim_on_member_event_new(s, new, ctx)
+    end
+    return internal.swim_on_member_event(ptr, new, old)
+end
+
 --
 -- Normal metatable of a configured SWIM instance.
 --
@@ -708,6 +796,7 @@ local swim_mt = {
         set_payload = swim_set_payload,
         set_codec = swim_set_codec,
         pairs = swim_pairs,
+        on_member_event = swim_on_member_event,
     },
     __serialize = swim_serialize
 }
@@ -800,6 +889,7 @@ local swim_not_configured_mt = {
         delete = swim_delete,
         is_configured = swim_is_configured,
         set_codec = swim_set_codec,
+        on_member_event = swim_on_member_event,
     },
     __serialize = swim_serialize
 }
diff --git a/test/swim/swim.result b/test/swim/swim.result
index 436d4e579..0196837c8 100644
--- a/test/swim/swim.result
+++ b/test/swim/swim.result
@@ -1216,6 +1216,269 @@ s1:delete()
 s2:delete()
 ---
 ...
+--
+-- gh-4250: allow to set triggers on a new member appearance, old
+-- member drop, member update.
+--
+s1 = swim.new()
+---
+...
+s1.on_member_event()
+---
+- error: 'builtin/swim.lua:<line>: swim:on_member_event: first argument is not a SWIM
+    instance'
+...
+m_list = {}
+---
+...
+e_list = {}
+---
+...
+ctx_list = {}
+---
+...
+f = nil
+---
+...
+f_need_sleep = false
+---
+...
+_ = test_run:cmd("setopt delimiter ';'")
+---
+...
+t_save_event = function(m, e, ctx)
+    table.insert(m_list, m)
+    table.insert(e_list, e)
+    table.insert(ctx_list, ctx)
+end;
+---
+...
+t_yield = function(m, e, ctx)
+    f = fiber.self()
+    t_save_event(m, e, ctx)
+    while f_need_sleep do fiber.sleep(10000) end
+end;
+---
+...
+_ = test_run:cmd("setopt delimiter ''");
+---
+...
+t_save_event_id = s1:on_member_event(t_save_event, 'ctx')
+---
+...
+-- Not equal, because SWIM wraps user triggers with a closure for
+-- context preprocessing.
+t_save_event_id ~= t_save_event
+---
+- true
+...
+s1:cfg{uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01}
+---
+- true
+...
+while #m_list < 1 do fiber.sleep(0) end
+---
+...
+m_list
+---
+- - uri: 127.0.0.1:<port>
+    status: alive
+    incarnation: 1
+    uuid: 00000000-0000-1000-8000-000000000001
+    payload_size: 0
+...
+e_list
+---
+- - is_new_payload: true
+    is_new_uri: true
+    is_new: true
+    is_update: true
+...
+ctx_list
+---
+- - ctx
+...
+m_list = {} e_list = {} ctx_list = {}
+---
+...
+t_yield_id = s1:on_member_event(t_yield, 'ctx2')
+---
+...
+f_need_sleep = true
+---
+...
+s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
+---
+...
+s2:add_member({uuid = s1:self():uuid(), uri = s1:self():uri()})
+---
+- true
+...
+while s1:size() ~= 2 do fiber.sleep(0.01) end
+---
+...
+-- Only first trigger worked. Second is waiting, because first
+-- sleeps.
+m_list
+---
+- - uri: 127.0.0.1:<port>
+    status: alive
+    incarnation: 1
+    uuid: 00000000-0000-1000-8000-000000000002
+    payload_size: 0
+...
+e_list
+---
+- - is_new_payload: true
+    is_new: true
+    is_update: true
+...
+ctx_list
+---
+- - ctx2
+...
+m_list = {} e_list = {} ctx_list = {}
+---
+...
+-- But it does not prevent normal SWIM operation.
+s1:set_payload('payload')
+---
+- true
+...
+while not s2:member_by_uuid(s1:self():uuid()):payload() do fiber.sleep(0.01) end
+---
+...
+s2:member_by_uuid(s1:self():uuid()):payload()
+---
+- payload
+...
+f_need_sleep = false
+---
+...
+fiber.wakeup(f)
+---
+...
+while #m_list ~= 3 do fiber.sleep(0.01) end
+---
+...
+m_list
+---
+- - uri: 127.0.0.1:<port>
+    status: alive
+    incarnation: 1
+    uuid: 00000000-0000-1000-8000-000000000002
+    payload_size: 0
+  - uri: 127.0.0.1:<port>
+    status: alive
+    incarnation: 2
+    uuid: 00000000-0000-1000-8000-000000000001
+    payload_size: 8
+  - uri: 127.0.0.1:<port>
+    status: alive
+    incarnation: 2
+    uuid: 00000000-0000-1000-8000-000000000001
+    payload_size: 8
+...
+e_list
+---
+- - is_new_payload: true
+    is_new: true
+    is_update: true
+  - is_new_payload: true
+    is_update: true
+    is_new_incarnation: true
+  - is_new_payload: true
+    is_update: true
+    is_new_incarnation: true
+...
+ctx_list
+---
+- - ctx
+  - ctx2
+  - ctx
+...
+m_list = {} e_list = {} ctx_list = {}
+---
+...
+#s1:on_member_event()
+---
+- 2
+...
+s1:on_member_event(nil, t_yield_id)
+---
+...
+s2:quit()
+---
+...
+while s1:size() ~= 1 do fiber.sleep(0.01) end
+---
+...
+-- Process event.
+fiber.sleep(0)
+---
+...
+-- Two events - status update to 'left', and 'drop'.
+m_list
+---
+- - uri: 127.0.0.1:<port>
+    status: left
+    incarnation: 1
+    uuid: 00000000-0000-1000-8000-000000000002
+    payload_size: 0
+  - uri: 127.0.0.1:<port>
+    status: left
+    incarnation: 1
+    uuid: 00000000-0000-1000-8000-000000000002
+    payload_size: 0
+...
+e_list
+---
+- - is_new_status: true
+    is_update: true
+  - is_drop: true
+...
+ctx_list
+---
+- - ctx
+  - ctx
+...
+m = m_list[1]
+---
+...
+-- Cached member table works even when a member is deleted.
+m_list[1] == m_list[2]
+---
+- true
+...
+m_list = {} e_list = {} ctx_list = {}
+---
+...
+s1:on_member_event(nil, t_save_event_id)
+---
+...
+s1:add_member({uuid = m:uuid(), uri = m:uri()})
+---
+- true
+...
+fiber.sleep(0)
+---
+...
+-- No events - all the triggers are dropped.
+m_list
+---
+- []
+...
+e_list
+---
+- []
+...
+ctx_list
+---
+- []
+...
+s1:delete()
+---
+...
 test_run:cmd("clear filter")
 ---
 - true
diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua
index a3eac9b46..458e349e0 100644
--- a/test/swim/swim.test.lua
+++ b/test/swim/swim.test.lua
@@ -413,4 +413,89 @@ s2:member_by_uuid(s1:self():uuid())
 s1:delete()
 s2:delete()
 
+--
+-- gh-4250: allow to set triggers on a new member appearance, old
+-- member drop, member update.
+--
+s1 = swim.new()
+s1.on_member_event()
+
+m_list = {}
+e_list = {}
+ctx_list = {}
+f = nil
+f_need_sleep = false
+_ = test_run:cmd("setopt delimiter ';'")
+t_save_event = function(m, e, ctx)
+    table.insert(m_list, m)
+    table.insert(e_list, e)
+    table.insert(ctx_list, ctx)
+end;
+t_yield = function(m, e, ctx)
+    f = fiber.self()
+    t_save_event(m, e, ctx)
+    while f_need_sleep do fiber.sleep(10000) end
+end;
+_ = test_run:cmd("setopt delimiter ''");
+t_save_event_id = s1:on_member_event(t_save_event, 'ctx')
+-- Not equal, because SWIM wraps user triggers with a closure for
+-- context preprocessing.
+t_save_event_id ~= t_save_event
+
+s1:cfg{uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01}
+while #m_list < 1 do fiber.sleep(0) end
+m_list
+e_list
+ctx_list
+m_list = {} e_list = {} ctx_list = {}
+
+t_yield_id = s1:on_member_event(t_yield, 'ctx2')
+f_need_sleep = true
+s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
+s2:add_member({uuid = s1:self():uuid(), uri = s1:self():uri()})
+while s1:size() ~= 2 do fiber.sleep(0.01) end
+-- Only first trigger worked. Second is waiting, because first
+-- sleeps.
+m_list
+e_list
+ctx_list
+m_list = {} e_list = {} ctx_list = {}
+-- But it does not prevent normal SWIM operation.
+s1:set_payload('payload')
+while not s2:member_by_uuid(s1:self():uuid()):payload() do fiber.sleep(0.01) end
+s2:member_by_uuid(s1:self():uuid()):payload()
+
+f_need_sleep = false
+fiber.wakeup(f)
+while #m_list ~= 3 do fiber.sleep(0.01) end
+m_list
+e_list
+ctx_list
+m_list = {} e_list = {} ctx_list = {}
+#s1:on_member_event()
+
+s1:on_member_event(nil, t_yield_id)
+s2:quit()
+while s1:size() ~= 1 do fiber.sleep(0.01) end
+-- Process event.
+fiber.sleep(0)
+-- Two events - status update to 'left', and 'drop'.
+m_list
+e_list
+ctx_list
+m = m_list[1]
+-- Cached member table works even when a member is deleted.
+m_list[1] == m_list[2]
+m_list = {} e_list = {} ctx_list = {}
+
+s1:on_member_event(nil, t_save_event_id)
+s1:add_member({uuid = m:uuid(), uri = m:uri()})
+fiber.sleep(0)
+-- No events - all the triggers are dropped.
+m_list
+e_list
+ctx_list
+
+s1:delete()
+
 test_run:cmd("clear filter")
-- 
2.20.1 (Apple Git-117)


* [tarantool-patches] Re: [PATCH v2 0/5] SWIM on_member_update
  2019-06-08 11:04 ` [tarantool-patches] Re: [PATCH v2 0/5] SWIM on_member_update Vladislav Shpilevoy
@ 2019-06-08 11:06   ` Vladislav Shpilevoy
  0 siblings, 0 replies; 11+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-08 11:06 UTC (permalink / raw)
  To: tarantool-patches; +Cc: kostja

Sorry, freelists has problems when the word 'subscribe' is at
the beginning of a line. So I sent the email manually
and moved 'subscribe' to the previous line.

On 08/06/2019 14:04, Vladislav Shpilevoy wrote:
> From 9e33c61c00eefab6866225ed7ab904f84f65a65f Mon Sep 17 00:00:00 2001
> Message-Id: <9e33c61c00eefab6866225ed7ab904f84f65a65f.1559989748.git.v.shpilevoy@tarantool.org>
> In-Reply-To: <cover.1559989748.git.v.shpilevoy@tarantool.org>
> References: <cover.1559989748.git.v.shpilevoy@tarantool.org>
> From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
> Date: Sat, 1 Jun 2019 20:40:21 +0200
> Subject: [PATCH v2 5/5] swim: expose Lua triggers on member events
> 
> SWIM as a monitoring module is hard to use without the ability to subscribe
> to events. Otherwise a user would have to poll the SWIM
> member table for updates, which would be too inefficient.
> 
> This commit exposes the ability to set Lua triggers.
> 
> Closes #4250
> 
> @TarantoolBot document
> Title: SWIM: swim:on_member_event
> 
> Now a user can set triggers on member table updates. There is a
> function for that, which can be used in one of several ways:
> ```Lua
> swim:on_member_event(new_trigger[, ctx])
> ```
> Add a new trigger on member table updates. The function
> `new_trigger` is called each time a new member appears, an
> existing member is dropped, or a member is updated. It should take
> 3 arguments: the first is the updated SWIM member, the second is an
> events object, the third is `ctx` passed as is.
> 
> The events object has methods that help a user determine which
> events have happened.
> ```Lua
> local function on_event(member, events, ctx)
>     if events:is_new() then
>         ...
>     elseif events:is_drop() then
>         ...
>     end
> 
>     if events:is_update() then
>         if events:is_new_status() then
>             ...
>         elseif events:is_new_uri() then
>             ...
>         elseif events:is_new_incarnation() then
>             ...
>         elseif events:is_new_payload() then
>             ...
>         end
>     end
> end
> 
> s:on_member_event(on_event, ctx)
> ```
> Note that multiple events can happen simultaneously, and a user
> should be ready for that. 'new' and 'drop' never happen together,
> but either of them can easily happen together with 'update',
> especially if there are lots of events and triggers work too
> slowly. In that case a member can be added and then updated
> before its first event reaches a trigger.
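
Because several flags can be set on one events object, a handler that tests them with independent `if` statements sees all of them, whereas an `elseif` chain stops at the first match. A minimal sketch of that pattern follows. `fake_events` and `describe` are hypothetical names used only for illustration, and `fake_events` merely stands in for the real events object so that the snippet runs outside Tarantool:

```lua
-- Hypothetical stand-in for the events object, for illustration only:
-- every is_*() method reports whether the flag of the same name was set.
local function fake_events(flags)
    local obj = {}
    local names = {'is_new', 'is_drop', 'is_update', 'is_new_status',
                   'is_new_uri', 'is_new_incarnation', 'is_new_payload'}
    for _, name in ipairs(names) do
        obj[name] = function() return flags[name] == true end
    end
    return obj
end

-- Collect every change that is reported instead of stopping at the
-- first one that matches.
local function describe(events)
    local seen = {}
    if events:is_new() then table.insert(seen, 'new') end
    if events:is_drop() then table.insert(seen, 'drop') end
    if events:is_new_status() then table.insert(seen, 'status') end
    if events:is_new_uri() then table.insert(seen, 'uri') end
    if events:is_new_incarnation() then table.insert(seen, 'incarnation') end
    if events:is_new_payload() then table.insert(seen, 'payload') end
    return table.concat(seen, ',')
end
```

The first event in the test output of this patch carries `is_new`, `is_update`, `is_new_uri` and `is_new_payload` at once, so a handler written this way would react to every one of those changes.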
> 
> A remarkable case is 'new' + 'new payload'. This case does not
> depend on trigger speed. The thing is that payload absence
> and a payload of size 0 are not the same. Sometimes it happens
> that a member is added without a payload. For example, a ping
> was received, and pings do not carry a payload. In such a case the
> missing payload is eventually received later. If that matters for
> a user's application, it should be ready for that: 'new' does not
> mean that the member already has a payload, and payload size
> says nothing about its presence or absence.
> 
> Second usage case:
> ```Lua
> swim:on_member_event(nil, old_trigger)
> ```
> Drop an old trigger.
> 
> Third usage case:
> ```Lua
> swim:on_member_event(new_trigger, old_trigger[, ctx])
> ```
> Replace an existing trigger in place, keeping its position
> in the trigger list.
> 
> Fourth usage case:
> ```Lua
> swim:on_member_event()
> ```
> Get a list of triggers.
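
The call forms above are told apart by the type of the second argument, as `swim_on_member_event()` in this patch does: anything that is not a function is treated as `ctx`. A minimal sketch of just that dispatch step, with the hypothetical name `normalize_args`:

```lua
-- Hypothetical helper reproducing only the argument dispatch of
-- swim_on_member_event() from the patch.
local function normalize_args(new, old, ctx)
    -- A non-function second argument is the context, not an old trigger.
    if type(old) ~= 'function' then
        ctx = old
        old = nil
    end
    return new, old, ctx
end
```

One consequence of this rule, as far as the code in the patch shows, is that a function passed as `ctx` in the two-argument form would be taken for an old trigger.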
> 
> When dropping or replacing a trigger, a user should be careful: the
> following code does not work:
> ```Lua
> tr = function() ... end
> -- Add a trigger.
> s:on_member_event(tr)
> ...
> -- Drop a trigger.
> s:on_member_event(nil, tr)
> ```
> To be precise, the last line does not work. This is because SWIM wraps
> user triggers with an internal closure for parameter preprocessing.
> To drop a trigger, a user should save the result of the first
> on_member_event() call.
> 
> This code works:
> ```Lua
> tr = function() ... end
> -- Add a trigger.
> tr_id = s:on_member_event(tr)
> ...
> -- Drop a trigger.
> s:on_member_event(nil, tr_id)
> ```
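
Why the identifier differs from the callback can be shown without SWIM at all. The sketch below is a hypothetical miniature registry (`triggers`, `add_trigger` and `drop_trigger` are illustration names, not part of the module) that stores a wrapping closure the same way `swim_on_member_event_new()` creates one:

```lua
-- Hypothetical miniature trigger registry, for illustration only.
local triggers = {}

-- Like the wrapper in the patch, the stored function is a new closure
-- around the user's callback, not the callback itself.
local function add_trigger(callback, ctx)
    local wrapped = function(member, events)
        return callback(member, events, ctx)
    end
    table.insert(triggers, wrapped)
    return wrapped
end

-- Remove a trigger by identity; return true if something was removed.
local function drop_trigger(id)
    for i, t in ipairs(triggers) do
        if t == id then
            table.remove(triggers, i)
            return true
        end
    end
    return false
end

local tr = function() end
local tr_id = add_trigger(tr, 'ctx')
```

Here `drop_trigger(tr)` finds nothing, because only `tr_id` is stored in the list. The sketch simply returns `false` in that case; how the real call reports an unknown trigger is not shown in this message.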
> 
> The triggers are executed one by one in a separate fiber, and
> they can yield. These two facts mean that if one trigger sleeps
> too long, the other triggers wait. This does not block SWIM from doing
> its routine operations, but it does block the other triggers.
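
One way to keep a slow handler from delaying the other triggers is to make the trigger itself only record the event and leave the heavy work to a separate consumer. The sketch below uses a plain Lua table as the queue so that it runs anywhere. `queue`, `on_event` and `drain` are hypothetical names, and the assumption is that in Tarantool the drain loop would run in a fiber of its own:

```lua
-- Hypothetical event queue, for illustration only.
local queue = {}

-- Trigger body: remember the event and return at once, without yielding.
local function on_event(member, events, ctx)
    table.insert(queue, {member = member, events = events, ctx = ctx})
end

-- Consumer step: handle everything queued so far, in arrival order,
-- and return the number of processed events.
local function drain(handle)
    local n = 0
    while #queue > 0 do
        local item = table.remove(queue, 1)
        handle(item.member, item.events, item.ctx)
        n = n + 1
    end
    return n
end
```

The trade-off is that events are handled later than they are delivered, so the consumer should expect the member to have changed again in the meantime.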
> 
> The last point to remember is that if a member was added and
> dropped before its appearance reached a trigger, then such
> a member does not fire triggers at all. A user will not notice
> such a short-lived member.
> ---
>  src/lua/swim.c          |  27 ++++-
>  src/lua/swim.lua        |  90 ++++++++++++++
>  test/swim/swim.result   | 263 ++++++++++++++++++++++++++++++++++++++++
>  test/swim/swim.test.lua |  85 +++++++++++++
>  4 files changed, 464 insertions(+), 1 deletion(-)
> 
> diff --git a/src/lua/swim.c b/src/lua/swim.c
> index 3b9e229be..c3a0a9911 100644
> --- a/src/lua/swim.c
> +++ b/src/lua/swim.c
> @@ -33,8 +33,31 @@
>  #include "diag.h"
>  #include "lua/utils.h"
>  
> +static uint32_t ctid_swim_member_ptr;
>  static uint32_t ctid_swim_ptr;
>  
> +/** Push member event context into a Lua stack. */
> +static int
> +lua_swim_member_event_push(struct lua_State *L, void *event)
> +{
> +	struct swim_on_member_event_ctx *ctx =
> +		(struct swim_on_member_event_ctx *) event;
> +	*(struct swim_member **) luaL_pushcdata(L, ctid_swim_member_ptr) =
> +		ctx->member;
> +	lua_pushinteger(L, ctx->events);
> +	return 2;
> +}
> +
> +/** Set or/and delete a trigger on a SWIM member event. */
> +static int
> +lua_swim_on_member_event(struct lua_State *L)
> +{
> +	uint32_t ctypeid;
> +	struct swim *s = *(struct swim **) luaL_checkcdata(L, 1, &ctypeid);
> +	return lbox_trigger_reset(L, 3, swim_trigger_list_on_member_event(s),
> +				  lua_swim_member_event_push, NULL);
> +}
> +
>  /**
>   * Create a new SWIM instance. SWIM is not created via FFI,
>   * because this operation yields.
> @@ -68,12 +91,14 @@ lua_swim_delete(struct lua_State *L)
>  void
>  tarantool_lua_swim_init(struct lua_State *L)
>  {
> -	luaL_cdef(L, "struct swim;");
> +	luaL_cdef(L, "struct swim_member; struct swim;");
> +	ctid_swim_member_ptr = luaL_ctypeid(L, "struct swim_member *");
>  	ctid_swim_ptr = luaL_ctypeid(L, "struct swim *");
>  
>  	static const struct luaL_Reg lua_swim_internal_methods [] = {
>  		{"swim_new", lua_swim_new},
>  		{"swim_delete", lua_swim_delete},
> +		{"swim_on_member_event", lua_swim_on_member_event},
>  		{NULL, NULL}
>  	};
>  	luaL_register_module(L, "swim", lua_swim_internal_methods);
> diff --git a/src/lua/swim.lua b/src/lua/swim.lua
> index 527299284..a7d5caab3 100644
> --- a/src/lua/swim.lua
> +++ b/src/lua/swim.lua
> @@ -24,6 +24,16 @@ ffi.cdef[[
>          MEMBER_LEFT,
>      };
>  
> +    enum swim_ev_mask {
> +        SWIM_EV_NEW             = 0b00000001,
> +        SWIM_EV_NEW_STATUS      = 0b00000010,
> +        SWIM_EV_NEW_URI         = 0b00000100,
> +        SWIM_EV_NEW_INCARNATION = 0b00001000,
> +        SWIM_EV_NEW_PAYLOAD     = 0b00010000,
> +        SWIM_EV_UPDATE          = 0b00011110,
> +        SWIM_EV_DROP            = 0b00100000,
> +    };
> +
>      bool
>      swim_is_configured(const struct swim *swim);
>  
> @@ -689,6 +699,84 @@ local function swim_pairs(s)
>      return swim_pairs_next, {swim = s, iterator = iterator}, nil
>  end
>  
> +local swim_member_event_index = {
> +    is_new = function(self)
> +        return bit.band(self[1], capi.SWIM_EV_NEW) ~= 0
> +    end,
> +    is_drop = function(self)
> +        return bit.band(self[1], capi.SWIM_EV_DROP) ~= 0
> +    end,
> +    is_update = function(self)
> +        return bit.band(self[1], capi.SWIM_EV_UPDATE) ~= 0
> +    end,
> +    is_new_status = function(self)
> +        return bit.band(self[1], capi.SWIM_EV_NEW_STATUS) ~= 0
> +    end,
> +    is_new_uri = function(self)
> +        return bit.band(self[1], capi.SWIM_EV_NEW_URI) ~= 0
> +    end,
> +    is_new_incarnation = function(self)
> +        return bit.band(self[1], capi.SWIM_EV_NEW_INCARNATION) ~= 0
> +    end,
> +    is_new_payload = function(self)
> +        return bit.band(self[1], capi.SWIM_EV_NEW_PAYLOAD) ~= 0
> +    end,
> +}
> +
> +local swim_member_event_mt = {
> +    __index = swim_member_event_index,
> +    __serialize = function(self)
> +        local res = {}
> +        for k, v in pairs(swim_member_event_index) do
> +            v = v(self)
> +            if v then
> +                res[k] = v
> +            end
> +        end
> +        return res
> +    end,
> +}
> +
> +--
> +-- Create a closure function for preprocessing raw SWIM member
> +-- event trigger parameters.
> +-- @param s SWIM instance.
> +-- @param callback User functions to call.
> +-- @param ctx An optional parameter for @a callback passed as is.
> +-- @return A function to set as a trigger.
> +--
> +local function swim_on_member_event_new(s, callback, ctx)
> +    return function(member_ptr, event_mask)
> +        local m = swim_wrap_member(s, member_ptr)
> +        local event = setmetatable({event_mask}, swim_member_event_mt)
> +        return callback(m, event, ctx)
> +    end
> +end
> +
> +--
> +-- Add or/and delete a trigger on member event. Possible usages:
> +--
> +-- * on_member_event(new[, ctx]) - add a new trigger. It should
> +--   accept 3 arguments: an updated member, an events object, an
> +--   optional @a ctx parameter passed as is.
> +--
> +-- * on_member_event(new, old[, ctx]) - add a new trigger @a new
> +--   if not nil, in place of @a old trigger.
> +--
> +-- * on_member_event() - get a list of triggers.
> +--
> +local function swim_on_member_event(s, new, old, ctx)
> +    local ptr = swim_check_instance(s, 'swim:on_member_event')
> +    if type(old) ~= 'function' then
> +        ctx = old
> +        old = nil
> +    end
> +    if new ~= nil then
> +        new = swim_on_member_event_new(s, new, ctx)
> +    end
> +    return internal.swim_on_member_event(ptr, new, old)
> +end
> +
>  --
>  -- Normal metatable of a configured SWIM instance.
>  --
> @@ -708,6 +796,7 @@ local swim_mt = {
>          set_payload = swim_set_payload,
>          set_codec = swim_set_codec,
>          pairs = swim_pairs,
> +        on_member_event = swim_on_member_event,
>      },
>      __serialize = swim_serialize
>  }
> @@ -800,6 +889,7 @@ local swim_not_configured_mt = {
>          delete = swim_delete,
>          is_configured = swim_is_configured,
>          set_codec = swim_set_codec,
> +        on_member_event = swim_on_member_event,
>      },
>      __serialize = swim_serialize
>  }
> diff --git a/test/swim/swim.result b/test/swim/swim.result
> index 436d4e579..0196837c8 100644
> --- a/test/swim/swim.result
> +++ b/test/swim/swim.result
> @@ -1216,6 +1216,269 @@ s1:delete()
>  s2:delete()
>  ---
>  ...
> +--
> +-- gh-4250: allow to set triggers on a new member appearance, old
> +-- member drop, member update.
> +--
> +s1 = swim.new()
> +---
> +...
> +s1.on_member_event()
> +---
> +- error: 'builtin/swim.lua:<line>: swim:on_member_event: first argument is not a SWIM
> +    instance'
> +...
> +m_list = {}
> +---
> +...
> +e_list = {}
> +---
> +...
> +ctx_list = {}
> +---
> +...
> +f = nil
> +---
> +...
> +f_need_sleep = false
> +---
> +...
> +_ = test_run:cmd("setopt delimiter ';'")
> +---
> +...
> +t_save_event = function(m, e, ctx)
> +    table.insert(m_list, m)
> +    table.insert(e_list, e)
> +    table.insert(ctx_list, ctx)
> +end;
> +---
> +...
> +t_yield = function(m, e, ctx)
> +    f = fiber.self()
> +    t_save_event(m, e, ctx)
> +    while f_need_sleep do fiber.sleep(10000) end
> +end;
> +---
> +...
> +_ = test_run:cmd("setopt delimiter ''");
> +---
> +...
> +t_save_event_id = s1:on_member_event(t_save_event, 'ctx')
> +---
> +...
> +-- Not equal, because SWIM wraps user triggers with a closure for
> +-- context preprocessing.
> +t_save_event_id ~= t_save_event
> +---
> +- true
> +...
> +s1:cfg{uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01}
> +---
> +- true
> +...
> +while #m_list < 1 do fiber.sleep(0) end
> +---
> +...
> +m_list
> +---
> +- - uri: 127.0.0.1:<port>
> +    status: alive
> +    incarnation: 1
> +    uuid: 00000000-0000-1000-8000-000000000001
> +    payload_size: 0
> +...
> +e_list
> +---
> +- - is_new_payload: true
> +    is_new_uri: true
> +    is_new: true
> +    is_update: true
> +...
> +ctx_list
> +---
> +- - ctx
> +...
> +m_list = {} e_list = {} ctx_list = {}
> +---
> +...
> +t_yield_id = s1:on_member_event(t_yield, 'ctx2')
> +---
> +...
> +f_need_sleep = true
> +---
> +...
> +s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
> +---
> +...
> +s2:add_member({uuid = s1:self():uuid(), uri = s1:self():uri()})
> +---
> +- true
> +...
> +while s1:size() ~= 2 do fiber.sleep(0.01) end
> +---
> +...
> +-- Only first trigger worked. Second is waiting, because first
> +-- sleeps.
> +m_list
> +---
> +- - uri: 127.0.0.1:<port>
> +    status: alive
> +    incarnation: 1
> +    uuid: 00000000-0000-1000-8000-000000000002
> +    payload_size: 0
> +...
> +e_list
> +---
> +- - is_new_payload: true
> +    is_new: true
> +    is_update: true
> +...
> +ctx_list
> +---
> +- - ctx2
> +...
> +m_list = {} e_list = {} ctx_list = {}
> +---
> +...
> +-- But it does not prevent normal SWIM operation.
> +s1:set_payload('payload')
> +---
> +- true
> +...
> +while not s2:member_by_uuid(s1:self():uuid()):payload() do fiber.sleep(0.01) end
> +---
> +...
> +s2:member_by_uuid(s1:self():uuid()):payload()
> +---
> +- payload
> +...
> +f_need_sleep = false
> +---
> +...
> +fiber.wakeup(f)
> +---
> +...
> +while #m_list ~= 3 do fiber.sleep(0.01) end
> +---
> +...
> +m_list
> +---
> +- - uri: 127.0.0.1:<port>
> +    status: alive
> +    incarnation: 1
> +    uuid: 00000000-0000-1000-8000-000000000002
> +    payload_size: 0
> +  - uri: 127.0.0.1:<port>
> +    status: alive
> +    incarnation: 2
> +    uuid: 00000000-0000-1000-8000-000000000001
> +    payload_size: 8
> +  - uri: 127.0.0.1:<port>
> +    status: alive
> +    incarnation: 2
> +    uuid: 00000000-0000-1000-8000-000000000001
> +    payload_size: 8
> +...
> +e_list
> +---
> +- - is_new_payload: true
> +    is_new: true
> +    is_update: true
> +  - is_new_payload: true
> +    is_update: true
> +    is_new_incarnation: true
> +  - is_new_payload: true
> +    is_update: true
> +    is_new_incarnation: true
> +...
> +ctx_list
> +---
> +- - ctx
> +  - ctx2
> +  - ctx
> +...
> +m_list = {} e_list = {} ctx_list = {}
> +---
> +...
> +#s1:on_member_event()
> +---
> +- 2
> +...
> +s1:on_member_event(nil, t_yield_id)
> +---
> +...
> +s2:quit()
> +---
> +...
> +while s1:size() ~= 1 do fiber.sleep(0.01) end
> +---
> +...
> +-- Process event.
> +fiber.sleep(0)
> +---
> +...
> +-- Two events - status update to 'left', and 'drop'.
> +m_list
> +---
> +- - uri: 127.0.0.1:<port>
> +    status: left
> +    incarnation: 1
> +    uuid: 00000000-0000-1000-8000-000000000002
> +    payload_size: 0
> +  - uri: 127.0.0.1:<port>
> +    status: left
> +    incarnation: 1
> +    uuid: 00000000-0000-1000-8000-000000000002
> +    payload_size: 0
> +...
> +e_list
> +---
> +- - is_new_status: true
> +    is_update: true
> +  - is_drop: true
> +...
> +ctx_list
> +---
> +- - ctx
> +  - ctx
> +...
> +m = m_list[1]
> +---
> +...
> +-- Cached member table works even when a member is deleted.
> +m_list[1] == m_list[2]
> +---
> +- true
> +...
> +m_list = {} e_list = {} ctx_list = {}
> +---
> +...
> +s1:on_member_event(nil, t_save_event_id)
> +---
> +...
> +s1:add_member({uuid = m:uuid(), uri = m:uri()})
> +---
> +- true
> +...
> +fiber.sleep(0)
> +---
> +...
> +-- No events - all the triggers are dropped.
> +m_list
> +---
> +- []
> +...
> +e_list
> +---
> +- []
> +...
> +ctx_list
> +---
> +- []
> +...
> +s1:delete()
> +---
> +...
>  test_run:cmd("clear filter")
>  ---
>  - true
> diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua
> index a3eac9b46..458e349e0 100644
> --- a/test/swim/swim.test.lua
> +++ b/test/swim/swim.test.lua
> @@ -413,4 +413,89 @@ s2:member_by_uuid(s1:self():uuid())
>  s1:delete()
>  s2:delete()
>  
> +--
> +-- gh-4250: allow to set triggers on a new member appearance, old
> +-- member drop, member update.
> +--
> +s1 = swim.new()
> +s1.on_member_event()
> +
> +m_list = {}
> +e_list = {}
> +ctx_list = {}
> +f = nil
> +f_need_sleep = false
> +_ = test_run:cmd("setopt delimiter ';'")
> +t_save_event = function(m, e, ctx)
> +    table.insert(m_list, m)
> +    table.insert(e_list, e)
> +    table.insert(ctx_list, ctx)
> +end;
> +t_yield = function(m, e, ctx)
> +    f = fiber.self()
> +    t_save_event(m, e, ctx)
> +    while f_need_sleep do fiber.sleep(10000) end
> +end;
> +_ = test_run:cmd("setopt delimiter ''");
> +t_save_event_id = s1:on_member_event(t_save_event, 'ctx')
> +-- Not equal, because SWIM wraps user triggers with a closure for
> +-- context preprocessing.
> +t_save_event_id ~= t_save_event
> +
> +s1:cfg{uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01}
> +while #m_list < 1 do fiber.sleep(0) end
> +m_list
> +e_list
> +ctx_list
> +m_list = {} e_list = {} ctx_list = {}
> +
> +t_yield_id = s1:on_member_event(t_yield, 'ctx2')
> +f_need_sleep = true
> +s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
> +s2:add_member({uuid = s1:self():uuid(), uri = s1:self():uri()})
> +while s1:size() ~= 2 do fiber.sleep(0.01) end
> +-- Only first trigger worked. Second is waiting, because first
> +-- sleeps.
> +m_list
> +e_list
> +ctx_list
> +m_list = {} e_list = {} ctx_list = {}
> +-- But it does not prevent normal SWIM operation.
> +s1:set_payload('payload')
> +while not s2:member_by_uuid(s1:self():uuid()):payload() do fiber.sleep(0.01) end
> +s2:member_by_uuid(s1:self():uuid()):payload()
> +
> +f_need_sleep = false
> +fiber.wakeup(f)
> +while #m_list ~= 3 do fiber.sleep(0.01) end
> +m_list
> +e_list
> +ctx_list
> +m_list = {} e_list = {} ctx_list = {}
> +#s1:on_member_event()
> +
> +s1:on_member_event(nil, t_yield_id)
> +s2:quit()
> +while s1:size() ~= 1 do fiber.sleep(0.01) end
> +-- Process event.
> +fiber.sleep(0)
> +-- Two events - status update to 'left', and 'drop'.
> +m_list
> +e_list
> +ctx_list
> +m = m_list[1]
> +-- Cached member table works even when a member is deleted.
> +m_list[1] == m_list[2]
> +m_list = {} e_list = {} ctx_list = {}
> +
> +s1:on_member_event(nil, t_save_event_id)
> +s1:add_member({uuid = m:uuid(), uri = m:uri()})
> +fiber.sleep(0)
> +-- No events - all the triggers are dropped.
> +m_list
> +e_list
> +ctx_list
> +
> +s1:delete()
> +
>  test_run:cmd("clear filter")
> 


* [tarantool-patches] Re: [PATCH v2 1/5] test: create isolated ev_loop for swim unit tests
  2019-06-08 10:31 ` [tarantool-patches] [PATCH v2 1/5] test: create isolated ev_loop for swim unit tests Vladislav Shpilevoy
@ 2019-06-08 14:32   ` Konstantin Osipov
  0 siblings, 0 replies; 11+ messages in thread
From: Konstantin Osipov @ 2019-06-08 14:32 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/08 17:11]:
> The SWIM unit test code with the fake events and time does lots
> of forbidden things: it manually invokes pending watcher
> callbacks; manages global time without a kernel; puts
> nonexistent descriptors into the loop. These foul blows open the
> gates to the full control over IO events, descriptors, virtual
> time. Hundreds of virtual seconds pass in milliseconds in
> reality, it makes SWIM unit tests fast despite complex logic.
> 
> None of these actions affects the loop until a yield. On yield
> a scheduler fiber wakes up, and
> 
>     1) it infinitely generates EV_READ on nonexistent descriptors,
>        because a kernel considers them closed;
> 
>     2) manual invocation of pending callbacks asserts, because it is
>        not allowed for non-scheduler fibers.
> 
> To avoid these problems a new isolated loop is created, not
> visible to the scheduler. Here the fake events library can wreck
> and ruin whatever it wants.
> 
> Needed for #4250

OK to push.


-- 
Konstantin Osipov, Moscow, Russia


* [tarantool-patches] Re: [PATCH v2 2/5] swim: fix a 'use after free' in SWIM tests
  2019-06-08 10:31 ` [tarantool-patches] [PATCH v2 2/5] swim: fix a 'use after free' in SWIM tests Vladislav Shpilevoy
@ 2019-06-08 14:32   ` Konstantin Osipov
  0 siblings, 0 replies; 11+ messages in thread
From: Konstantin Osipov @ 2019-06-08 14:32 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/08 17:11]:
> It is a miracle, but somehow it worked until I changed a couple
> of places. Here objects stored in an rlist are freed, but not
> deleted from the list. The list is reused after that.

OK to push.


-- 
Konstantin Osipov, Moscow, Russia


* [tarantool-patches] Re: [PATCH v2 3/5] swim: allow to set triggers on member updates
  2019-06-08 10:31 ` [tarantool-patches] [PATCH v2 3/5] swim: allow to set triggers on member updates Vladislav Shpilevoy
@ 2019-06-08 14:35   ` Konstantin Osipov
  0 siblings, 0 replies; 11+ messages in thread
From: Konstantin Osipov @ 2019-06-08 14:35 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/08 17:11]:
> SWIM is asynchronous by design. It means, that there are two
> ways of getting updates of a member table and individual members:
> polling and triggers. Polling is terrible - this is why libev
> with select(), poll(), epoll(), kevent() have appeared. The only
> acceptable solution is triggers.

LGTM


-- 
Konstantin Osipov, Moscow, Russia


* [tarantool-patches] Re: [PATCH v2 4/5] swim: call swim:new/delete via Lua C, not via FFI
  2019-06-08 10:31 ` [tarantool-patches] [PATCH v2 4/5] swim: call swim:new/delete via Lua C, not via FFI Vladislav Shpilevoy
@ 2019-06-08 14:35   ` Konstantin Osipov
  0 siblings, 0 replies; 11+ messages in thread
From: Konstantin Osipov @ 2019-06-08 14:35 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/08 17:11]:
> These functions are going to yield in the scope of #4250, because
> swim:new() will start a fiber, while swim:delete() cancels it and
> gives it control.
> 
LGTM


-- 
Konstantin Osipov, Moscow, Russia


end of thread, other threads:[~2019-06-08 14:35 UTC | newest]

Thread overview: 11+ messages
2019-06-08 10:31 [tarantool-patches] [PATCH v2 0/5] SWIM on_member_update Vladislav Shpilevoy
2019-06-08 10:31 ` [tarantool-patches] [PATCH v2 1/5] test: create isolated ev_loop for swim unit tests Vladislav Shpilevoy
2019-06-08 14:32   ` [tarantool-patches] " Konstantin Osipov
2019-06-08 10:31 ` [tarantool-patches] [PATCH v2 2/5] swim: fix a 'use after free' in SWIM tests Vladislav Shpilevoy
2019-06-08 14:32   ` [tarantool-patches] " Konstantin Osipov
2019-06-08 10:31 ` [tarantool-patches] [PATCH v2 3/5] swim: allow to set triggers on member updates Vladislav Shpilevoy
2019-06-08 14:35   ` [tarantool-patches] " Konstantin Osipov
2019-06-08 10:31 ` [tarantool-patches] [PATCH v2 4/5] swim: call swim:new/delete via Lua C, not via FFI Vladislav Shpilevoy
2019-06-08 14:35   ` [tarantool-patches] " Konstantin Osipov
2019-06-08 11:04 ` [tarantool-patches] Re: [PATCH v2 0/5] SWIM on_member_update Vladislav Shpilevoy
2019-06-08 11:06   ` Vladislav Shpilevoy
