Tarantool development patches archive
* [tarantool-patches] [PATCH 0/5] SWIM on_member_update
@ 2019-06-02  0:00 Vladislav Shpilevoy
  2019-06-02  0:00 ` [tarantool-patches] [PATCH 1/5] test: create isolated ev_loop for swim unit tests Vladislav Shpilevoy
                   ` (5 more replies)
  0 siblings, 6 replies; 20+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-02  0:00 UTC
  To: tarantool-patches; +Cc: kostja

This patchset introduces an API to set triggers on SWIM member updates,
which makes Lua SWIM asynchronous.
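
For readers new to the trigger pattern, here is a minimal sketch of
a member update trigger list. It is not Tarantool's trigger.h: every
name below (demo_trigger, demo_trigger_add, demo_trigger_run,
demo_ctx, the DEMO_EV_* bits) is hypothetical. It only illustrates
registering callbacks once and having each of them receive a member
plus an event mask whenever that member changes.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical event bits, loosely modelled on enum swim_ev_mask. */
enum demo_ev_mask {
	DEMO_EV_NEW    = 1 << 0,
	DEMO_EV_UPDATE = 1 << 1,
	DEMO_EV_DROP   = 1 << 2,
};

struct demo_member {
	int id;
};

/* Context handed to every trigger: which member, what happened. */
struct demo_ctx {
	struct demo_member *member;
	enum demo_ev_mask events;
};

typedef void (*demo_trigger_f)(void *data, const struct demo_ctx *ctx);

struct demo_trigger {
	demo_trigger_f run;
	void *data;
	struct demo_trigger *next;
};

/* Hypothetical stand-in for the list owned by a SWIM instance. */
struct demo_trigger_list {
	struct demo_trigger *head;
};

static void
demo_trigger_add(struct demo_trigger_list *list, struct demo_trigger *t)
{
	t->next = list->head;
	list->head = t;
}

/* Invoke every registered trigger with the same context. */
static void
demo_trigger_run(struct demo_trigger_list *list, const struct demo_ctx *ctx)
{
	for (struct demo_trigger *t = list->head; t != NULL; t = t->next)
		t->run(t->data, ctx);
}

/* Example trigger: counts calls and remembers the last event mask. */
struct demo_counter {
	int calls;
	enum demo_ev_mask last;
};

static void
demo_count_events(void *data, const struct demo_ctx *ctx)
{
	struct demo_counter *c = data;
	c->calls++;
	c->last = ctx->events;
}
```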

Branch: http://github.com/tarantool/tarantool/tree/gerold103/gh-4250-swim-triggers
Issue: https://github.com/tarantool/tarantool/issues/4250

Vladislav Shpilevoy (5):
  test: create isolated ev_loop for swim unit tests
  swim: fix a 'use after free' in SWIM tests
  swim: allow to hang triggers on member updates
  swim: call swim:new/delete via Lua C, not via FFI
  swim: expose Lua triggers on member update

 extra/exports                   |   2 -
 src/CMakeLists.txt              |   3 +-
 src/lib/swim/swim.c             | 220 +++++++++++++++++++++-----
 src/lib/swim/swim.h             |  47 +++++-
 src/lib/swim/swim_ev.c          |   6 +
 src/lib/swim/swim_ev.h          |   3 +
 src/lib/swim/swim_io.c          |  15 +-
 src/lua/init.c                  |   2 +
 src/lua/swim.c                  | 106 +++++++++++++
 src/lua/swim.h                  |  34 +++++
 src/lua/swim.lua                | 101 +++++++++++-
 test/swim/swim.result           | 263 ++++++++++++++++++++++++++++++++
 test/swim/swim.test.lua         |  85 +++++++++++
 test/unit/swim.c                | 135 +++++++++++++++-
 test/unit/swim.result           |  28 +++-
 test/unit/swim_test_ev.c        |  24 +++
 test/unit/swim_test_transport.c |   1 +
 test/unit/swim_test_utils.c     |  46 +++++-
 test/unit/swim_test_utils.h     |  12 ++
 19 files changed, 1075 insertions(+), 58 deletions(-)
 create mode 100644 src/lua/swim.c
 create mode 100644 src/lua/swim.h

-- 
2.20.1 (Apple Git-117)


* [tarantool-patches] [PATCH 1/5] test: create isolated ev_loop for swim unit tests
  2019-06-02  0:00 [tarantool-patches] [PATCH 0/5] SWIM on_member_update Vladislav Shpilevoy
@ 2019-06-02  0:00 ` Vladislav Shpilevoy
  2019-06-05  6:51   ` [tarantool-patches] " Konstantin Osipov
  2019-06-02  0:00 ` [tarantool-patches] [PATCH 2/5] swim: fix a 'use after free' in SWIM tests Vladislav Shpilevoy
                   ` (4 subsequent siblings)
  5 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-02  0:00 UTC
  To: tarantool-patches; +Cc: kostja

The SWIM unit test code with fake events and time does lots of
forbidden things: it manually invokes pending watcher callbacks,
manages global time without a kernel, and puts nonexistent
descriptors into the loop. These foul blows open the gates to
full control over IO events, descriptors, and virtual time.
Hundreds of virtual seconds pass in milliseconds of real time,
which makes the SWIM unit tests fast despite their complex logic.

None of these actions affects the loop until a yield. On yield, a
scheduler fiber wakes up, and then:

    1) the scheduler infinitely generates EV_READ on nonexistent
       descriptors, because the kernel considers them closed;

    2) the manual invocation of pending callbacks fails an
       assertion, because it is not allowed for non-scheduler
       fibers.

To avoid these problems, a new isolated loop is created that is
not visible to the scheduler. There the fake events library can
wreak whatever havoc it wants.
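
The isolation relies on all SWIM code fetching its loop through a
single accessor, swim_loop(), which production code and the unit
tests define differently: the tests link their own definition that
returns a loop obtained from ev_loop_new(0). Below is a minimal
sketch of that idea. It uses a hypothetical struct demo_loop instead
of libev, and an #ifdef DEMO_UNIT_TEST switch (also hypothetical)
stands in for the link-time substitution so it fits in one file.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for libev's struct ev_loop. */
struct demo_loop {
	const char *name;
};

/* The loop that the cord scheduler fiber would run. */
static struct demo_loop default_loop = { "default loop" };

/* All library code obtains its loop only through this accessor. */
struct demo_loop *
demo_swim_loop(void);

#ifdef DEMO_UNIT_TEST
/* Test build: a private loop that the scheduler never polls. */
static struct demo_loop test_loop = { "isolated test loop" };

struct demo_loop *
demo_swim_loop(void)
{
	return &test_loop;
}
#else
/* Production build: the shared default loop. */
struct demo_loop *
demo_swim_loop(void)
{
	return &default_loop;
}
#endif

/* Library code never names a concrete loop, so tests can swap it. */
static int
demo_loop_is_isolated(void)
{
	return demo_swim_loop() != &default_loop;
}
```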

Needed for #4250
---
 src/lib/swim/swim.c         | 21 ++++++++++++---------
 src/lib/swim/swim_ev.c      |  6 ++++++
 src/lib/swim/swim_ev.h      |  3 +++
 src/lib/swim/swim_io.c      | 15 ++++++++-------
 test/unit/swim_test_ev.c    | 24 ++++++++++++++++++++++++
 test/unit/swim_test_utils.c |  2 +-
 6 files changed, 54 insertions(+), 17 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index f7a885b76..46b76731d 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -519,7 +519,7 @@ swim_wait_ack(struct swim *swim, struct swim_member *member,
 			timeout *= 2;
 		member->ping_deadline = swim_time() + timeout;
 		wait_ack_heap_insert(&swim->wait_ack_heap, member);
-		swim_ev_timer_again(loop(), &swim->wait_ack_tick);
+		swim_ev_timer_again(swim_loop(), &swim->wait_ack_tick);
 	}
 }
 
@@ -802,7 +802,7 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 		return NULL;
 	}
 	if (mh_size(swim->members) > 1)
-		swim_ev_timer_again(loop(), &swim->round_tick);
+		swim_ev_timer_again(swim_loop(), &swim->round_tick);
 
 	/* Dissemination component. */
 	swim_on_member_update(swim, member);
@@ -1122,7 +1122,7 @@ swim_complete_step(struct swim_task *task,
 	 * It could be stopped by the step begin function, if the
 	 * sending was too long.
 	 */
-	swim_ev_timer_again(loop(), &swim->round_tick);
+	swim_ev_timer_again(swim_loop(), &swim->round_tick);
 	/*
 	 * It is possible that the original member was deleted
 	 * manually during the task execution.
@@ -1813,16 +1813,17 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 		addr = swim->self->addr;
 	}
 	struct ev_timer *t = &swim->round_tick;
+	struct ev_loop *l = swim_loop();
 	if (t->repeat != heartbeat_rate && heartbeat_rate > 0) {
 		swim_ev_timer_set(t, 0, heartbeat_rate);
 		if (swim_ev_is_active(t))
-			swim_ev_timer_again(loop(), t);
+			swim_ev_timer_again(l, t);
 	}
 	t = &swim->wait_ack_tick;
 	if (t->repeat != ack_timeout && ack_timeout > 0) {
 		swim_ev_timer_set(t, 0, ack_timeout);
 		if (swim_ev_is_active(t))
-			swim_ev_timer_again(loop(), t);
+			swim_ev_timer_again(l, t);
 	}
 
 	if (new_self != NULL) {
@@ -1953,9 +1954,10 @@ swim_size(const struct swim *swim)
 void
 swim_delete(struct swim *swim)
 {
+	struct ev_loop *l = swim_loop();
 	swim_scheduler_destroy(&swim->scheduler);
-	swim_ev_timer_stop(loop(), &swim->round_tick);
-	swim_ev_timer_stop(loop(), &swim->wait_ack_tick);
+	swim_ev_timer_stop(l, &swim->round_tick);
+	swim_ev_timer_stop(l, &swim->wait_ack_tick);
 	mh_int_t node;
 	mh_foreach(swim->members, node) {
 		struct swim_member *m =
@@ -2018,8 +2020,9 @@ void
 swim_quit(struct swim *swim)
 {
 	assert(swim_is_configured(swim));
-	swim_ev_timer_stop(loop(), &swim->round_tick);
-	swim_ev_timer_stop(loop(), &swim->wait_ack_tick);
+	struct ev_loop *l = swim_loop();
+	swim_ev_timer_stop(l, &swim->round_tick);
+	swim_ev_timer_stop(l, &swim->wait_ack_tick);
 	swim_scheduler_stop_input(&swim->scheduler);
 	/* Start the last round - quiting. */
 	swim_new_round(swim);
diff --git a/src/lib/swim/swim_ev.c b/src/lib/swim/swim_ev.c
index 49c8c273b..82668d41d 100644
--- a/src/lib/swim/swim_ev.c
+++ b/src/lib/swim/swim_ev.c
@@ -55,3 +55,9 @@ swim_ev_timer_stop(struct ev_loop *loop, struct ev_timer *watcher)
 {
 	ev_timer_stop(loop, watcher);
 }
+
+struct ev_loop *
+swim_loop(void)
+{
+	return loop();
+}
diff --git a/src/lib/swim/swim_ev.h b/src/lib/swim/swim_ev.h
index 1bd81306f..900be150f 100644
--- a/src/lib/swim/swim_ev.h
+++ b/src/lib/swim/swim_ev.h
@@ -52,6 +52,9 @@ swim_ev_timer_again(struct ev_loop *loop, struct ev_timer *watcher);
 void
 swim_ev_timer_stop(struct ev_loop *loop, struct ev_timer *watcher);
 
+struct ev_loop *
+swim_loop(void);
+
 #define swim_ev_is_active ev_is_active
 
 #define swim_ev_init ev_init
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index c55c276cb..e7ff321d4 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -148,7 +148,7 @@ swim_task_schedule(struct swim_task *task, struct swim_scheduler *scheduler)
 {
 	assert(! swim_task_is_scheduled(task));
 	rlist_add_tail_entry(&scheduler->queue_output, task, in_queue_output);
-	swim_ev_io_start(loop(), &scheduler->output);
+	swim_ev_io_start(swim_loop(), &scheduler->output);
 }
 
 void
@@ -289,16 +289,17 @@ int
 swim_scheduler_bind(struct swim_scheduler *scheduler,
 		    const struct sockaddr_in *addr)
 {
-	swim_ev_io_stop(loop(), &scheduler->input);
-	swim_ev_io_stop(loop(), &scheduler->output);
+	struct ev_loop *l = swim_loop();
+	swim_ev_io_stop(l, &scheduler->input);
+	swim_ev_io_stop(l, &scheduler->output);
 	struct swim_transport *t = &scheduler->transport;
 	int rc = swim_transport_bind(t, (const struct sockaddr *) addr,
 				     sizeof(*addr));
 	if (t->fd >= 0) {
 		swim_ev_io_set(&scheduler->output, t->fd, EV_WRITE);
 		swim_ev_io_set(&scheduler->input, t->fd, EV_READ);
-		swim_ev_io_start(loop(), &scheduler->input);
-		swim_ev_io_start(loop(), &scheduler->output);
+		swim_ev_io_start(l, &scheduler->input);
+		swim_ev_io_start(l, &scheduler->output);
 	}
 	return rc;
 }
@@ -306,7 +307,7 @@ swim_scheduler_bind(struct swim_scheduler *scheduler,
 void
 swim_scheduler_stop_input(struct swim_scheduler *scheduler)
 {
-	swim_ev_io_stop(loop(), &scheduler->input);
+	swim_ev_io_stop(swim_loop(), &scheduler->input);
 }
 
 void
@@ -323,7 +324,7 @@ swim_scheduler_destroy(struct swim_scheduler *scheduler)
 			t->cancel(t, scheduler, -1);
 	}
 	swim_transport_destroy(&scheduler->transport);
-	swim_ev_io_stop(loop(), &scheduler->output);
+	swim_ev_io_stop(swim_loop(), &scheduler->output);
 	swim_scheduler_stop_input(scheduler);
 }
 
diff --git a/test/unit/swim_test_ev.c b/test/unit/swim_test_ev.c
index a4ffa2fc8..fb25ac9e4 100644
--- a/test/unit/swim_test_ev.c
+++ b/test/unit/swim_test_ev.c
@@ -62,6 +62,27 @@ struct swim_event;
 typedef void (*swim_event_process_f)(struct swim_event *, struct ev_loop *);
 typedef void (*swim_event_delete_f)(struct swim_event *);
 
+/**
+ * The unit test code with fake events and time does lots of
+ * forbidden things: it manually invokes pending watcher
+ * callbacks, manages global time without a kernel, and puts
+ * nonexistent descriptors into the loop. None of these actions
+ * affects the loop until a yield. On yield a scheduler fiber wakes
+ * up, and 1) it infinitely generates EV_READ on nonexistent
+ * descriptors, because the kernel considers them closed; 2) the
+ * manual invocation of pending callbacks fails an assertion, as it
+ * is not allowed for non-scheduler fibers. To avoid these problems,
+ * a new isolated loop is created, invisible to the scheduler. There
+ * the fake events library can wreak whatever havoc it wants.
+ */
+static struct ev_loop *test_loop;
+
+struct ev_loop *
+swim_loop(void)
+{
+	return test_loop;
+}
+
 /**
  * Base event. It is stored in the event heap and virtualizes
  * other events.
@@ -330,6 +351,8 @@ swim_test_ev_init(void)
 	events_hash = mh_i64ptr_new();
 	assert(events_hash != NULL);
 	event_heap_create(&event_heap);
+	test_loop = ev_loop_new(0);
+	assert(test_loop != NULL);
 }
 
 void
@@ -338,4 +361,5 @@ swim_test_ev_free(void)
 	swim_test_ev_reset();
 	event_heap_destroy(&event_heap);
 	mh_i64ptr_delete(events_hash);
+	ev_loop_destroy(test_loop);
 }
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index ffd42cbd0..f72fa2450 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -607,7 +607,7 @@ swim_wait_timeout(double timeout, struct swim_cluster *cluster,
 {
 	swim_ev_set_brk(timeout);
 	double deadline = swim_time() + timeout;
-	struct ev_loop *loop = loop();
+	struct ev_loop *loop = swim_loop();
 	/*
 	 * There can be pending out of bound IO events, affecting
 	 * the result. For example, 'quit' messages, which are
-- 
2.20.1 (Apple Git-117)


* [tarantool-patches] [PATCH 2/5] swim: fix a 'use after free' in SWIM tests
  2019-06-02  0:00 [tarantool-patches] [PATCH 0/5] SWIM on_member_update Vladislav Shpilevoy
  2019-06-02  0:00 ` [tarantool-patches] [PATCH 1/5] test: create isolated ev_loop for swim unit tests Vladislav Shpilevoy
@ 2019-06-02  0:00 ` Vladislav Shpilevoy
  2019-06-05  6:52   ` [tarantool-patches] " Konstantin Osipov
  2019-06-02  0:00 ` [tarantool-patches] [PATCH 3/5] swim: allow to hang triggers on member updates Vladislav Shpilevoy
                   ` (3 subsequent siblings)
  5 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-02  0:00 UTC
  To: tarantool-patches; +Cc: kostja

It is a miracle that it worked at all until I changed a couple
of places. Objects stored in an rlist were freed but not removed
from the list, and the list was reused after that.
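
This class of bug is easy to reproduce with any intrusive list:
freeing a node that is still linked leaves its neighbours pointing
into freed memory. A minimal sketch, using a hypothetical
hand-written list (dlist) and packet type in place of rlist and
swim_test_packet:

```c
#include <assert.h>
#include <stdlib.h>

/* Minimal intrusive circular doubly-linked list, in the spirit of rlist. */
struct dlist {
	struct dlist *prev, *next;
};

static void
dlist_create(struct dlist *h)
{
	h->prev = h->next = h;
}

static int
dlist_empty(const struct dlist *h)
{
	return h->next == h;
}

static void
dlist_add_tail(struct dlist *h, struct dlist *n)
{
	n->prev = h->prev;
	n->next = h;
	h->prev->next = n;
	h->prev = n;
}

static void
dlist_del(struct dlist *n)
{
	n->prev->next = n->next;
	n->next->prev = n->prev;
	n->prev = n->next = n;
}

/* Hypothetical packet with an embedded link, like swim_test_packet. */
struct packet {
	int size;
	struct dlist in_queue;
};

static struct packet *
packet_new(struct dlist *queue, int size)
{
	struct packet *p = malloc(sizeof(*p));
	assert(p != NULL);
	p->size = size;
	dlist_add_tail(queue, &p->in_queue);
	return p;
}

/*
 * The fix: unlink the node before freeing it. Without dlist_del()
 * the queue would keep pointers into freed memory.
 */
static void
packet_delete(struct packet *p)
{
	dlist_del(&p->in_queue);
	free(p);
}

static int
queue_length(const struct dlist *h)
{
	int n = 0;
	for (const struct dlist *it = h->next; it != h; it = it->next)
		n++;
	return n;
}
```
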
---
 test/unit/swim_test_transport.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
index c4a1dd774..334ac926e 100644
--- a/test/unit/swim_test_transport.c
+++ b/test/unit/swim_test_transport.c
@@ -85,6 +85,7 @@ swim_test_packet_new(const char *data, int size, const struct sockaddr_in *src,
 static inline void
 swim_test_packet_delete(struct swim_test_packet *p)
 {
+	rlist_del_entry(p, in_queue);
 	free(p);
 }
 
-- 
2.20.1 (Apple Git-117)


* [tarantool-patches] [PATCH 3/5] swim: allow to hang triggers on member updates
  2019-06-02  0:00 [tarantool-patches] [PATCH 0/5] SWIM on_member_update Vladislav Shpilevoy
  2019-06-02  0:00 ` [tarantool-patches] [PATCH 1/5] test: create isolated ev_loop for swim unit tests Vladislav Shpilevoy
  2019-06-02  0:00 ` [tarantool-patches] [PATCH 2/5] swim: fix a 'use after free' in SWIM tests Vladislav Shpilevoy
@ 2019-06-02  0:00 ` Vladislav Shpilevoy
  2019-06-05  7:11   ` [tarantool-patches] " Konstantin Osipov
  2019-06-02  0:00 ` [tarantool-patches] [PATCH 4/5] swim: call swim:new/delete via Lua C, not via FFI Vladislav Shpilevoy
                   ` (2 subsequent siblings)
  5 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-02  0:00 UTC
  To: tarantool-patches; +Cc: kostja

SWIM is asynchronous by design. This means there are two ways of
getting updates of the member table and of individual members:
polling and triggers. Polling is terrible; this is why select(),
poll(), epoll(), kevent(), and libev on top of them appeared. The
only acceptable solution is triggers.

This commit allows hanging triggers on member updates.
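
The dispatch scheme can be sketched in a simplified, standalone form.
All names below are hypothetical, and reference counting and fibers
are omitted: a member enters the update queue only on its first
pending event, further events are OR-ed into its mask, and a drain
loop hands each accumulated mask to a trigger. Members that were both
created and dropped before delivery are skipped, following the same
convention as the worker fiber in this patch.

```c
#include <assert.h>
#include <stddef.h>

/* Bit values copied from enum swim_ev_mask in this patch. */
enum ev_mask {
	EV_NEW             = 0x01,
	EV_NEW_STATUS      = 0x02,
	EV_NEW_URI         = 0x04,
	EV_NEW_INCARNATION = 0x08,
	EV_NEW_PAYLOAD     = 0x10,
	EV_DROP            = 0x20,
};

struct member {
	/* Pending events; 0 means the member is not queued. */
	unsigned events;
	struct member *next_update;
};

/* Singly-linked FIFO, a stand-in for struct stailq. */
struct update_queue {
	struct member *head;
	struct member **tail;
};

static void
queue_create(struct update_queue *q)
{
	q->head = NULL;
	q->tail = &q->head;
}

/* Queue a member on its first event; later events only extend the mask. */
static void
member_update(struct update_queue *q, struct member *m, unsigned events)
{
	if (m->events == 0 && events != 0) {
		m->next_update = NULL;
		*q->tail = m;
		q->tail = &m->next_update;
	}
	m->events |= events;
}

typedef void (*trigger_f)(struct member *m, unsigned events, void *arg);

/* Deliver pending masks; return how many members reached the trigger. */
static int
queue_drain(struct update_queue *q, trigger_f trigger, void *arg)
{
	int delivered = 0;
	while (q->head != NULL) {
		struct member *m = q->head;
		q->head = m->next_update;
		if (q->head == NULL)
			q->tail = &q->head;
		unsigned events = m->events;
		m->events = 0;
		/* Skip members created and dropped before delivery. */
		if ((events & EV_NEW) == 0 || (events & EV_DROP) == 0) {
			trigger(m, events, arg);
			delivered++;
		}
	}
	return delivered;
}

/* Example trigger: stores the last delivered mask into *arg. */
static void
save_last(struct member *m, unsigned events, void *arg)
{
	(void) m;
	*(unsigned *) arg = events;
}
```

Because each member appears in the queue at most once, the queue is
bounded by the number of members rather than the number of events,
and a slow trigger delays delivery without multiplying it.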

Part of #4250
---
 src/lib/swim/swim.c         | 199 +++++++++++++++++++++++++++++++-----
 src/lib/swim/swim.h         |  47 ++++++++-
 test/unit/swim.c            | 135 +++++++++++++++++++++++-
 test/unit/swim.result       |  28 ++++-
 test/unit/swim_test_utils.c |  44 +++++++-
 test/unit/swim_test_utils.h |  12 +++
 6 files changed, 435 insertions(+), 30 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 46b76731d..8507eea14 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -37,6 +37,7 @@
 #include "msgpuck.h"
 #include "assoc.h"
 #include "sio.h"
+#include "trigger.h"
 #define HEAP_FORWARD_DECLARATION
 #include "salad/heap.h"
 
@@ -334,6 +335,19 @@ struct swim_member {
 	 * time.
 	 */
 	struct rlist in_dissemination_queue;
+	/**
+	 * Each time a member is updated, or created, or dropped,
+	 * it is added to an update queue. Members from this queue
+	 * are dispatched into user defined triggers.
+	 */
+	struct stailq_entry in_update_queue;
+	/**
+	 * Mask of events that happened to this member since the
+	 * previous trigger invocation. Once the events are
+	 * delivered to a trigger, the mask is reset to zero and
+	 * starts collecting new events.
+	 */
+	enum swim_ev_mask events;
 	/**
 	 *
 	 *               Failure detection component
@@ -455,6 +469,17 @@ struct swim {
 	 * as long as the event TTD is non-zero.
 	 */
 	struct rlist dissemination_queue;
+	/**
+	 * Queue of updated, new, and dropped members to deliver
+	 * the events to triggers. Dropped members are also kept
+	 * here until they are handled by a trigger.
+	 */
+	struct stailq update_queue;
+	/**
+	 * List of triggers to call on each new, dropped, and
+	 * updated member.
+	 */
+	struct rlist on_member_update;
 	/**
 	 * Members to which a message should be sent next during
 	 * this round.
@@ -472,6 +497,13 @@ struct swim {
 	 * the beginning of each round.
 	 */
 	struct swim_member **shuffled;
+	/**
+	 * Worker fiber to serve member update triggers. This task
+	 * is done in a separate fiber because user triggers can
+	 * yield, while libev callbacks that process member
+	 * updates are not allowed to yield.
+	 */
+	struct fiber *worker;
 	/**
 	 * Single round step task. It is impossible to have
 	 * multiple round steps in the same SWIM instance at the
@@ -549,10 +581,41 @@ swim_register_event(struct swim *swim, struct swim_member *member)
  * change of its status, or incarnation, or both.
  */
 static void
-swim_on_member_update(struct swim *swim, struct swim_member *member)
+swim_on_member_update(struct swim *swim, struct swim_member *member,
+		      enum swim_ev_mask events)
 {
 	member->unacknowledged_pings = 0;
 	swim_register_event(swim, member);
+	/*
+	 * Member update should be delivered to triggers only
+	 * if there is at least one trigger.
+	 */
+	if (! rlist_empty(&swim->on_member_update)) {
+		/*
+		 * Member is referenced and added to a queue only
+		 * once. That moment can be detected when a first
+		 * event happens.
+		 */
+		if (member->events == 0 && events != 0) {
+			swim_member_ref(member);
+			stailq_add_tail_entry(&swim->update_queue, member,
+					      in_update_queue);
+			fiber_wakeup(swim->worker);
+		}
+		member->events |= events;
+	}
+}
+
+struct rlist *
+swim_trigger_list_on_member_update(struct swim *swim)
+{
+	return &swim->on_member_update;
+}
+
+bool
+swim_has_pending_events(struct swim *swim)
+{
+	return ! stailq_empty(&swim->update_queue);
 }
 
 /**
@@ -577,13 +640,17 @@ swim_update_member_inc_status(struct swim *swim, struct swim_member *member,
 	 */
 	assert(member != swim->self);
 	if (member->incarnation < incarnation) {
-		member->status = new_status;
+		enum swim_ev_mask events = SWIM_EV_NEW_INCARNATION;
+		if (new_status != member->status) {
+			events |= SWIM_EV_NEW_STATUS;
+			member->status = new_status;
+		}
 		member->incarnation = incarnation;
-		swim_on_member_update(swim, member);
+		swim_on_member_update(swim, member, events);
 	} else if (member->incarnation == incarnation &&
 		   member->status < new_status) {
 		member->status = new_status;
-		swim_on_member_update(swim, member);
+		swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS);
 	}
 }
 
@@ -626,7 +693,7 @@ swim_update_member_payload(struct swim *swim, struct swim_member *member,
 	member->payload_size = payload_size;
 	member->payload_ttd = mh_size(swim->members);
 	member->is_payload_up_to_date = true;
-	swim_on_member_update(swim, member);
+	swim_on_member_update(swim, member, SWIM_EV_NEW_PAYLOAD);
 	return 0;
 }
 
@@ -734,7 +801,6 @@ swim_delete_member(struct swim *swim, struct swim_member *member)
 	mh_int_t rc = mh_swim_table_find(swim->members, key, NULL);
 	assert(rc != mh_end(swim->members));
 	mh_swim_table_del(swim->members, rc, NULL);
-	swim_cached_round_msg_invalidate(swim);
 	rlist_del_entry(member, in_round_queue);
 
 	/* Failure detection component. */
@@ -742,6 +808,7 @@ swim_delete_member(struct swim *swim, struct swim_member *member)
 		wait_ack_heap_delete(&swim->wait_ack_heap, member);
 
 	/* Dissemination component. */
+	swim_on_member_update(swim, member, SWIM_EV_DROP);
 	rlist_del_entry(member, in_dissemination_queue);
 
 	swim_member_delete(member);
@@ -805,7 +872,7 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 		swim_ev_timer_again(swim_loop(), &swim->round_tick);
 
 	/* Dissemination component. */
-	swim_on_member_update(swim, member);
+	swim_on_member_update(swim, member, SWIM_EV_NEW);
 	if (payload_size >= 0 &&
 	    swim_update_member_payload(swim, member, payload,
 				       payload_size) != 0) {
@@ -1299,14 +1366,15 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events)
 			if (m->unacknowledged_pings < NO_ACKS_TO_SUSPECT)
 				break;
 			m->status = MEMBER_SUSPECTED;
-			swim_on_member_update(swim, m);
+			swim_on_member_update(swim, m, SWIM_EV_NEW_STATUS);
 			if (swim_send_indirect_pings(swim, m) != 0)
 				diag_log();
 			break;
 		case MEMBER_SUSPECTED:
 			if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) {
 				m->status = MEMBER_DEAD;
-				swim_on_member_update(swim, m);
+				swim_on_member_update(swim, m,
+						      SWIM_EV_NEW_STATUS);
 			}
 			break;
 		case MEMBER_DEAD:
@@ -1332,7 +1400,7 @@ swim_update_member_addr(struct swim *swim, struct swim_member *member,
 {
 	assert(! swim_inaddr_eq(&member->addr, addr));
 	member->addr = *addr;
-	swim_on_member_update(swim, member);
+	swim_on_member_update(swim, member, SWIM_EV_NEW_URI);
 }
 
 /**
@@ -1354,12 +1422,10 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def,
 	if (def->incarnation > member->incarnation) {
 		if (! swim_inaddr_eq(&def->addr, &member->addr))
 			swim_update_member_addr(swim, member, &def->addr);
-		if (def->payload_size >= 0) {
+		if (def->payload_size >= 0)
 			update_payload = true;
-		} else if (member->is_payload_up_to_date) {
+		else if (member->is_payload_up_to_date)
 			member->is_payload_up_to_date = false;
-			swim_on_member_update(swim, member);
-		}
 	} else if (! member->is_payload_up_to_date && def->payload_size >= 0) {
 		update_payload = true;
 	}
@@ -1430,7 +1496,7 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def,
 	 */
 	if (self->incarnation < def->incarnation) {
 		self->incarnation = def->incarnation;
-		swim_on_member_update(swim, self);
+		swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION);
 	}
 	if (def->status != MEMBER_ALIVE &&
 	    def->incarnation == self->incarnation) {
@@ -1440,7 +1506,7 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def,
 		 * with a bigger incarnation.
 		 */
 		self->incarnation++;
-		swim_on_member_update(swim, self);
+		swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION);
 	}
 	return 0;
 skip:
@@ -1532,7 +1598,7 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
 	if (def.incarnation == member->incarnation &&
 	    member->status != MEMBER_ALIVE) {
 		member->status = MEMBER_ALIVE;
-		swim_on_member_update(swim, member);
+		swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS);
 	}
 
 	switch (def.type) {
@@ -1604,7 +1670,7 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end,
 		swim_update_member_inc_status(swim, m, MEMBER_LEFT, tmp);
 	} else if (tmp >= m->incarnation) {
 		m->incarnation = tmp + 1;
-		swim_on_member_update(swim, m);
+		swim_on_member_update(swim, m, SWIM_EV_NEW_INCARNATION);
 	}
 	return 0;
 }
@@ -1668,6 +1734,66 @@ error:
 	diag_log();
 }
 
+/**
+ * Do the actual deletion of a SWIM instance. This destructor is
+ * called only after the worker fiber has been stopped, when
+ * triggers no longer run.
+ */
+static void
+swim_do_delete(struct swim *swim);
+
+/**
+ * Worker fiber. At the moment its only task is dispatching
+ * member updates to user-defined triggers. Since SWIM is fully
+ * IO driven, this fiber should generally be used only for
+ * yielding tasks not related to the SWIM core logic; libev
+ * callbacks are fine for everything else. Unfortunately, yields
+ * are not allowed directly in libev callbacks, because they are
+ * invoked by the cord scheduler fiber, which may not yield manually.
+ */
+static int
+swim_worker_f(va_list va)
+{
+	struct swim *s = va_arg(va, struct swim *);
+	struct swim_on_member_update_ctx ctx;
+	while (! fiber_is_cancelled()) {
+		while (stailq_empty(&s->update_queue)) {
+			fiber_yield();
+			if (fiber_is_cancelled())
+				goto end;
+		}
+		/*
+		 * Can't be empty. SWIM deletes members from the
+		 * update queue only on SWIM deletion, but then
+		 * the fiber would already be stopped.
+		 */
+		assert(! stailq_empty(&s->update_queue));
+		struct swim_member *m =
+			stailq_shift_entry(&s->update_queue, struct swim_member,
+					   in_update_queue);
+		/*
+		 * It is possible that a member was added and
+		 * removed before a trigger fired. It happens,
+		 * for example, if a previous event was being
+		 * handled for too long. By convention, such
+		 * short-lived members are not shown to triggers.
+		 */
+		if ((m->events & SWIM_EV_NEW) == 0 ||
+		    (m->events & SWIM_EV_DROP) == 0) {
+			ctx.member = m;
+			ctx.events = m->events;
+			m->events = 0;
+			if (trigger_run(&s->on_member_update, (void *) &ctx))
+				diag_log();
+		}
+		swim_member_unref(m);
+	}
+end:
+	swim_do_delete(s);
+	return 0;
+}
+
+
 struct swim *
 swim_new(void)
 {
@@ -1676,8 +1802,14 @@ swim_new(void)
 		diag_set(OutOfMemory, sizeof(*swim), "calloc", "swim");
 		return NULL;
 	}
+	swim->worker = fiber_new("SWIM worker", swim_worker_f);
+	if (swim->worker == NULL) {
+		free(swim);
+		return NULL;
+	}
 	swim->members = mh_swim_table_new();
 	if (swim->members == NULL) {
+		fiber_cancel(swim->worker);
 		free(swim);
 		diag_set(OutOfMemory, sizeof(*swim->members),
 			 "mh_swim_table_new", "members");
@@ -1700,7 +1832,10 @@ swim_new(void)
 
 	/* Dissemination component. */
 	rlist_create(&swim->dissemination_queue);
+	rlist_create(&swim->on_member_update);
+	stailq_create(&swim->update_queue);
 
+	fiber_start(swim->worker, swim);
 	return swim;
 }
 
@@ -1809,6 +1944,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 		 * specified.
 		 */
 		addr = swim->scheduler.transport.addr;
+		fiber_set_name(swim->worker, tt_sprintf("SWIM worker %d",
+							swim_fd(swim)));
 	} else {
 		addr = swim->self->addr;
 	}
@@ -1828,7 +1965,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 
 	if (new_self != NULL) {
 		swim->self->status = MEMBER_LEFT;
-		swim_on_member_update(swim, swim->self);
+		swim_on_member_update(swim, swim->self, SWIM_EV_NEW_STATUS);
 		swim->self = new_self;
 	}
 	if (! swim_inaddr_eq(&addr, &swim->self->addr)) {
@@ -1866,7 +2003,7 @@ swim_set_payload(struct swim *swim, const char *payload, int payload_size)
 	if (swim_update_member_payload(swim, self, payload, payload_size) != 0)
 		return -1;
 	self->incarnation++;
-	swim_on_member_update(swim, self);
+	swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION);
 	return 0;
 }
 
@@ -1951,17 +2088,19 @@ swim_size(const struct swim *swim)
 	return mh_size(swim->members);
 }
 
-void
-swim_delete(struct swim *swim)
+static void
+swim_do_delete(struct swim *swim)
 {
 	struct ev_loop *l = swim_loop();
 	swim_scheduler_destroy(&swim->scheduler);
 	swim_ev_timer_stop(l, &swim->round_tick);
 	swim_ev_timer_stop(l, &swim->wait_ack_tick);
+	struct swim_member *m, *tmp;
+	stailq_foreach_entry_safe(m, tmp, &swim->update_queue, in_update_queue)
+		swim_member_unref(m);
 	mh_int_t node;
 	mh_foreach(swim->members, node) {
-		struct swim_member *m =
-			*mh_swim_table_node(swim->members, node);
+		m = *mh_swim_table_node(swim->members, node);
 		rlist_del_entry(m, in_round_queue);
 		if (! heap_node_is_stray(&m->in_wait_ack_heap))
 			wait_ack_heap_delete(&swim->wait_ack_heap, m);
@@ -1975,10 +2114,19 @@ swim_delete(struct swim *swim)
 	swim_task_destroy(&swim->round_step_task);
 	wait_ack_heap_destroy(&swim->wait_ack_heap);
 	mh_swim_table_delete(swim->members);
+	trigger_destroy(&swim->on_member_update);
 	free(swim->shuffled);
 	free(swim);
 }
 
+void
+swim_delete(struct swim *swim)
+{
+	fiber_wakeup(swim->worker);
+	fiber_cancel(swim->worker);
+	fiber_sleep(0);
+}
+
 /**
  * Quit message is broadcasted in the same way as round messages,
  * step by step, with the only difference that quit round steps
@@ -1991,7 +2139,8 @@ swim_quit_step_complete(struct swim_task *task,
 	(void) rc;
 	struct swim *swim = swim_by_scheduler(scheduler);
 	if (rlist_empty(&swim->round_queue)) {
-		swim_delete(swim);
+		fiber_wakeup(swim->worker);
+		fiber_cancel(swim->worker);
 		return;
 	}
 	struct swim_member *m =
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 6db12ba9e..8a5bc9522 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -40,6 +40,7 @@ extern "C" {
 #endif
 
 struct swim;
+struct rlist;
 struct tt_uuid;
 struct swim_iterator;
 struct swim_member;
@@ -63,7 +64,8 @@ enum swim_gc_mode {
 
 /**
  * Create a new SWIM instance. Do not bind to a port or set any
- * parameters. Allocation and initialization only.
+ * parameters. Allocation and initialization only. The function
+ * yields.
  */
 struct swim *
 swim_new(void);
@@ -124,7 +126,8 @@ swim_set_codec(struct swim *swim, enum crypto_algo algo, enum crypto_mode mode,
 
 /**
  * Stop listening and broadcasting messages, cleanup all internal
- * structures, free memory.
+ * structures, free memory. The function yields. Actual deletion
+ * happens after currently working triggers are done.
  */
 void
 swim_delete(struct swim *swim);
@@ -272,6 +275,46 @@ swim_member_unref(struct swim_member *member);
 bool
 swim_member_is_dropped(const struct swim_member *member);
 
+enum swim_ev_mask {
+	SWIM_EV_NEW             = 0b00000001,
+	SWIM_EV_NEW_STATUS      = 0b00000010,
+	SWIM_EV_NEW_URI         = 0b00000100,
+	SWIM_EV_NEW_INCARNATION = 0b00001000,
+	SWIM_EV_NEW_PAYLOAD     = 0b00010000,
+	/* Shortcut to check for any update. */
+	SWIM_EV_UPDATE          = 0b00011110,
+	SWIM_EV_DROP            = 0b00100000,
+};
+
+/** On member update trigger context. */
+struct swim_on_member_update_ctx {
+	/** New, dropped, or updated member. */
+	struct swim_member *member;
+	/** Mask of happened events. */
+	enum swim_ev_mask events;
+};
+
+/**
+ * A list of member update triggers. A few words about this
+ * unusual API, instead of something like "add_trigger" and
+ * "drop_trigger": the main motivation is that some functions
+ * need the whole trigger list, for example, a function adding
+ * Lua functions as triggers. At the time of writing it was
+ * lbox_trigger_reset. By convention, any Tarantool trigger
+ * exposed to Lua should provide a way to add one, drop one,
+ * replace one, and return all of them; lbox_trigger_reset does
+ * all of this.
+ */
+struct rlist *
+swim_trigger_list_on_member_update(struct swim *swim);
+
+/**
+ * Check if a SWIM instance has pending update events. Not a
+ * public function; it is used by tests.
+ */
+bool
+swim_has_pending_events(struct swim *swim);
+
 #if defined(__cplusplus)
 }
 #endif
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 2ba9820d8..3f2d156c5 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -41,6 +41,7 @@
 #include "swim_test_transport.h"
 #include "swim_test_ev.h"
 #include "swim_test_utils.h"
+#include "trigger.h"
 #include <fcntl.h>
 
 /**
@@ -950,10 +951,141 @@ swim_test_slow_net(void)
 	swim_finish_test();
 }
 
+struct trigger_ctx {
+	int counter;
+	bool is_deleted;
+	bool need_sleep;
+	struct fiber *f;
+	struct swim_on_member_update_ctx ctx;
+};
+
+static void
+swim_on_member_update_save_event(struct trigger *t, void *event)
+{
+	struct trigger_ctx *c = (struct trigger_ctx *) t->data;
+	++c->counter;
+	if (c->ctx.member != NULL)
+		swim_member_unref(c->ctx.member);
+	c->ctx = *((struct swim_on_member_update_ctx *) event);
+	swim_member_ref(c->ctx.member);
+}
+
+static void
+swim_on_member_update_yield(struct trigger *t, void *event)
+{
+	struct trigger_ctx *c = (struct trigger_ctx *) t->data;
+	++c->counter;
+	c->f = fiber();
+	while (c->need_sleep)
+		fiber_yield();
+}
+
+static void
+swim_trigger_destroy_cb(struct trigger *t)
+{
+	((struct trigger_ctx *) t->data)->is_deleted = true;
+}
+
+static void
+swim_test_triggers(void)
+{
+	swim_start_test(21);
+	struct swim_cluster *cluster = swim_cluster_new(2);
+	swim_cluster_set_ack_timeout(cluster, 1);
+	struct trigger_ctx tctx, tctx2;
+	memset(&tctx, 0, sizeof(tctx));
+	memset(&tctx2, 0, sizeof(tctx2));
+	struct trigger *t1 = (struct trigger *) malloc(sizeof(*t1));
+	assert(t1 != NULL);
+	trigger_create(t1, swim_on_member_update_save_event, (void *) &tctx,
+		       swim_trigger_destroy_cb);
+
+	/* Skip 'new self' events. */
+	swim_cluster_run_triggers(cluster);
+
+	struct swim *s1 = swim_cluster_member(cluster, 0);
+	trigger_add(swim_trigger_list_on_member_update(s1), t1);
+	swim_cluster_interconnect(cluster, 0, 1);
+	swim_cluster_run_triggers(cluster);
+
+	is(tctx.counter, 1, "trigger is fired");
+	ok(! tctx.is_deleted, "is not deleted");
+	is(tctx.ctx.member, swim_cluster_member_view(cluster, 0, 1),
+	   "ctx.member is set");
+	is(tctx.ctx.events, SWIM_EV_NEW, "ctx.events is set");
+
+	swim_run_for(1);
+	swim_cluster_run_triggers(cluster);
+	is(tctx.counter, 2, "payload is delivered, trigger caught that");
+	is(tctx.ctx.member, swim_cluster_member_view(cluster, 0, 1),
+	   "S1 got S2' payload");
+	is(tctx.ctx.events, SWIM_EV_NEW_PAYLOAD, "mask says that");
+
+	swim_cluster_member_set_payload(cluster, 0, "123", 3);
+	swim_cluster_run_triggers(cluster);
+	is(tctx.counter, 3, "self payload is updated");
+	is(tctx.ctx.member, swim_self(s1), "self is set as a member");
+	is(tctx.ctx.events, SWIM_EV_NEW_PAYLOAD | SWIM_EV_NEW_INCARNATION,
+	   "both incarnation and payload events are presented");
+
+	swim_cluster_set_drop(cluster, 1, 100);
+	fail_if(swim_cluster_wait_status(cluster, 0, 1,
+					 MEMBER_SUSPECTED, 3) != 0);
+	swim_cluster_run_triggers(cluster);
+	is(tctx.counter, 4, "suspicion fired a trigger");
+	is(tctx.ctx.events, SWIM_EV_NEW_STATUS, "status suspected");
+
+	fail_if(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 3) != 0);
+	swim_cluster_run_triggers(cluster);
+	is(tctx.counter, 5, "death fired a trigger");
+	is(tctx.ctx.events, SWIM_EV_NEW_STATUS, "status dead");
+
+	fail_if(swim_cluster_wait_status(cluster, 0, 1,
+					 swim_member_status_MAX, 2) != 0);
+	swim_cluster_run_triggers(cluster);
+	is(tctx.counter, 6, "drop fired a trigger");
+	is(tctx.ctx.events, SWIM_EV_DROP, "status dropped");
+	is(swim_cluster_member_view(cluster, 0, 1), NULL,
+	   "dropped member is not presented in the member table");
+	isnt(tctx.ctx.member, NULL, "but is in the event context");
+	/*
+	 * There is a complication about yields. If a trigger
+	 * yields, other triggers wait for its finish. And all
+	 * the triggers should be ready to SWIM deletion in the
+	 * middle of an update processing. SWIM object should not
+	 * be deleted, until all the triggers are done.
+	 */
+	struct trigger *t2 = (struct trigger *) malloc(sizeof(*t2));
+	assert(t2 != NULL);
+	tctx2.need_sleep = true;
+	trigger_create(t2, swim_on_member_update_yield, (void *) &tctx2, NULL);
+	trigger_add(swim_trigger_list_on_member_update(s1), t2);
+	swim_cluster_add_link(cluster, 0, 1);
+	swim_cluster_run_triggers(cluster);
+	is(tctx2.counter, 1, "yielding trigger is fired");
+	is(tctx.counter, 6, "non-yielding still is not");
+
+	swim_cluster_delete(cluster);
+	ok(! tctx.is_deleted, "trigger is not deleted until all currently "\
+	   "sleeping triggers are finished");
+	tctx2.need_sleep = false;
+	fiber_wakeup(tctx2.f);
+	while (! tctx.is_deleted)
+		fiber_sleep(0);
+	note("now all the triggers are done and deleted");
+
+	free(t1);
+	free(t2);
+	if (tctx.ctx.member != NULL)
+		swim_member_unref(tctx.ctx.member);
+
+	swim_finish_test();
+}
+
 static int
 main_f(va_list ap)
 {
-	swim_start_test(20);
+	swim_start_test(21);
 
 	(void) ap;
 	swim_test_ev_init();
@@ -979,6 +1111,7 @@ main_f(va_list ap)
 	swim_test_indirect_ping();
 	swim_test_encryption();
 	swim_test_slow_net();
+	swim_test_triggers();
 
 	swim_test_transport_free();
 	swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index 25fdb8833..3ebfe7ea0 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
 	*** main_f ***
-1..20
+1..21
 	*** swim_test_one_link ***
     1..6
     ok 1 - no rounds - no fullmesh
@@ -200,4 +200,30 @@ ok 19 - subtests
     # slow network leads to idle round steps, they should not produce a new message
 ok 20 - subtests
 	*** swim_test_slow_net: done ***
+	*** swim_test_triggers ***
+    1..21
+    ok 1 - trigger is fired
+    ok 2 - is not deleted
+    ok 3 - ctx.member is set
+    ok 4 - ctx.events is set
+    ok 5 - payload is delivered, trigger caught that
+    ok 6 - S1 got S2' payload
+    ok 7 - mask says that
+    ok 8 - self payload is updated
+    ok 9 - self is set as a member
+    ok 10 - both incarnation and payload events are presented
+    ok 11 - suspicion fired a trigger
+    ok 12 - status suspected
+    ok 13 - death fired a trigger
+    ok 14 - status dead
+    ok 15 - drop fired a trigger
+    ok 16 - status dropped
+    ok 17 - dropped member is not presented in the member table
+    ok 18 - but is in the event context
+    ok 19 - yielding trigger is fired
+    ok 20 - non-yielding still is not
+    ok 21 - trigger is not deleted until all currently sleeping triggers are finished
+    # now all the triggers are done and deleted
+ok 21 - subtests
+	*** swim_test_triggers: done ***
 	*** main_f: done ***
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index f72fa2450..0b4743b2d 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -36,6 +36,7 @@
 #include "trivia/util.h"
 #include "fiber.h"
 #include "msgpuck.h"
+#include "trigger.h"
 
 /**
  * Drop rate packet filter to drop packets with a certain
@@ -191,12 +192,32 @@ swim_cluster_id_to_uri(char *buffer, int id)
 	sprintf(buffer, "127.0.0.1:%d", id + 1);
 }
 
+/**
+ * A trigger to check correctness of event context, and ability
+ * to yield.
+ */
+void
+swim_test_event_cb(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	struct swim_on_member_update_ctx *ctx =
+		(struct swim_on_member_update_ctx *) event;
+	assert(ctx->events != 0);
+	assert((ctx->events & SWIM_EV_NEW) == 0 ||
+	       (ctx->events & SWIM_EV_DROP) == 0);
+	fiber_sleep(0);
+}
+
 /** Create a SWIM cluster node @a n with a 0-based @a id. */
 static inline void
 swim_node_create(struct swim_node *n, int id)
 {
 	n->swim = swim_new();
 	assert(n->swim != NULL);
+	struct trigger *t = (struct trigger *) malloc(sizeof(*t));
+	trigger_create(t, swim_test_event_cb, NULL, (trigger_f0) free);
+	trigger_add(swim_trigger_list_on_member_update(n->swim), t);
+
 	char uri[128];
 	swim_cluster_id_to_uri(uri, id);
 	n->uuid = uuid_nil;
@@ -303,7 +324,7 @@ swim_cluster_add_link(struct swim_cluster *cluster, int to_id, int from_id)
 			       swim_member_uri(from), swim_member_uuid(from));
 }
 
-static const struct swim_member *
+const struct swim_member *
 swim_cluster_member_view(struct swim_cluster *cluster, int node_id,
 			 int member_id)
 {
@@ -615,6 +636,8 @@ swim_wait_timeout(double timeout, struct swim_cluster *cluster,
 	 * whatsoever.
 	 */
 	swim_test_transport_do_loop_step(loop);
+	if (cluster != NULL)
+		swim_cluster_run_triggers(cluster);
 	while (! check(cluster, data)) {
 		if (swim_time() >= deadline)
 			return -1;
@@ -625,6 +648,8 @@ swim_wait_timeout(double timeout, struct swim_cluster *cluster,
 		 * too.
 		 */
 		swim_test_transport_do_loop_step(loop);
+		if (cluster != NULL)
+			swim_cluster_run_triggers(cluster);
 	}
 	return 0;
 }
@@ -868,6 +893,23 @@ swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster,
 				 swim_loop_check_member_everywhere, &t);
 }
 
+void
+swim_cluster_run_triggers(struct swim_cluster *cluster)
+{
+	bool has_events;
+	do {
+		has_events = false;
+		struct swim_node *n = cluster->node;
+		for (int i = 0; i < cluster->size; ++i, ++n) {
+			if (n->swim != NULL &&
+			    swim_has_pending_events(n->swim)) {
+				has_events = true;
+				fiber_sleep(0);
+			}
+		}
+	} while (has_events);
+}
+
 bool
 swim_error_check_match(const char *msg)
 {
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 0e6f29d80..fde84e39b 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -168,6 +168,14 @@ int
 swim_cluster_member_set_payload(struct swim_cluster *cluster, int i,
 				const char *payload, int size);
 
+/**
+ * Get a member object stored in a SWIM node @a node_id and
+ * showing a known state of a SWIM node @a member_id.
+ */
+const struct swim_member *
+swim_cluster_member_view(struct swim_cluster *cluster, int node_id,
+			 int member_id);
+
 /**
  * Check if in the cluster every instance knowns the about other
  * instances.
@@ -229,6 +237,10 @@ swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster,
 				     int member_id, const char *payload,
 				     int payload_size, double timeout);
 
+/** Run all pending triggers in the cluster. */
+void
+swim_cluster_run_triggers(struct swim_cluster *cluster);
+
 /** Process SWIM events for @a duration fake seconds. */
 void
 swim_run_for(double duration);
-- 
2.20.1 (Apple Git-117)

* [tarantool-patches] [PATCH 4/5] swim: call swim:new/delete via Lua C, not via FFI
  2019-06-02  0:00 [tarantool-patches] [PATCH 0/5] SWIM on_member_update Vladislav Shpilevoy
                   ` (2 preceding siblings ...)
  2019-06-02  0:00 ` [tarantool-patches] [PATCH 3/5] swim: allow to hang triggers on member updates Vladislav Shpilevoy
@ 2019-06-02  0:00 ` Vladislav Shpilevoy
  2019-06-08 14:24   ` [tarantool-patches] " Konstantin Osipov
  2019-06-02  0:10 ` [tarantool-patches] [PATCH 5/5] swim: expose Lua triggers on member update Vladislav Shpilevoy
       [not found] ` <12b8ea76f7c1cd100a80ddcea3c29d20354e073e.1559433539.git.v.shpilevoy@tarantool.org>
  5 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-02  0:00 UTC (permalink / raw)
  To: tarantool-patches; +Cc: kostja

These functions are going to yield as part of #4250: swim:new()
will start a fiber, and swim:delete() will cancel that fiber and
pass control to it. A function called via FFI must not yield, so
both are now called via the Lua C API.

Needed for #4250
---
 extra/exports      |  2 --
 src/CMakeLists.txt |  3 +-
 src/lua/init.c     |  2 ++
 src/lua/swim.c     | 81 ++++++++++++++++++++++++++++++++++++++++++++++
 src/lua/swim.h     | 34 +++++++++++++++++++
 src/lua/swim.lua   | 13 +++-----
 6 files changed, 123 insertions(+), 12 deletions(-)
 create mode 100644 src/lua/swim.c
 create mode 100644 src/lua/swim.h

diff --git a/extra/exports b/extra/exports
index 0b1102d03..b8c42c0df 100644
--- a/extra/exports
+++ b/extra/exports
@@ -89,12 +89,10 @@ crypto_stream_delete
 
 lua_static_aligned_alloc
 
-swim_new
 swim_is_configured
 swim_cfg
 swim_set_payload
 swim_set_codec
-swim_delete
 swim_add_member
 swim_remove_member
 swim_probe_member
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 68674d06a..64ea95c17 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -117,6 +117,7 @@ set (server_sources
      lua/info.c
      lua/string.c
      lua/buffer.c
+     lua/swim.c
      ${lua_sources}
      ${PROJECT_SOURCE_DIR}/third_party/lua-yaml/lyaml.cc
      ${PROJECT_SOURCE_DIR}/third_party/lua-yaml/b64.c
@@ -169,7 +170,7 @@ target_link_libraries(server core coll http_parser bit uri uuid swim swim_udp
 
 # Rule of thumb: if exporting a symbol from a static library, list the
 # library here.
-set (reexport_libraries server core misc bitset csv swim
+set (reexport_libraries server core misc bitset csv swim swim_udp swim_ev
      ${LUAJIT_LIBRARIES} ${MSGPUCK_LIBRARIES} ${ICU_LIBRARIES})
 
 set (common_libraries
diff --git a/src/lua/init.c b/src/lua/init.c
index 5ddc5a4d8..d8b3501be 100644
--- a/src/lua/init.c
+++ b/src/lua/init.c
@@ -58,6 +58,7 @@
 #include "lua/fio.h"
 #include "lua/httpc.h"
 #include "lua/utf8.h"
+#include "lua/swim.h"
 #include "digest.h"
 #include <small/ibuf.h>
 
@@ -450,6 +451,7 @@ tarantool_lua_init(const char *tarantool_bin, int argc, char **argv)
 	tarantool_lua_socket_init(L);
 	tarantool_lua_pickle_init(L);
 	tarantool_lua_digest_init(L);
+	tarantool_lua_swim_init(L);
 	luaopen_http_client_driver(L);
 	lua_pop(L, 1);
 	luaopen_msgpack(L);
diff --git a/src/lua/swim.c b/src/lua/swim.c
new file mode 100644
index 000000000..3b9e229be
--- /dev/null
+++ b/src/lua/swim.c
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "swim/swim.h"
+#include "trigger.h"
+#include "diag.h"
+#include "lua/utils.h"
+
+static uint32_t ctid_swim_ptr;
+
+/**
+ * Create a new SWIM instance. SWIM is not created via FFI,
+ * because this operation yields.
+ * @retval 1 Success. A SWIM instance pointer is on the stack.
+ * @retval 2 Error. Nil and an error object are pushed.
+ */
+static int
+lua_swim_new(struct lua_State *L)
+{
+	struct swim *s = swim_new();
+	*(struct swim **) luaL_pushcdata(L, ctid_swim_ptr) = s;
+	if (s != NULL)
+		return 1;
+	luaT_pusherror(L, diag_last_error(diag_get()));
+	return 2;
+}
+
+/**
+ * Delete a SWIM instance. SWIM is not deleted via FFI, because
+ * this operation yields.
+ */
+static int
+lua_swim_delete(struct lua_State *L)
+{
+	uint32_t ctypeid;
+	struct swim *s = *(struct swim **) luaL_checkcdata(L, 1, &ctypeid);
+	swim_delete(s);
+	return 0;
+}
+
+void
+tarantool_lua_swim_init(struct lua_State *L)
+{
+	luaL_cdef(L, "struct swim;");
+	ctid_swim_ptr = luaL_ctypeid(L, "struct swim *");
+
+	static const struct luaL_Reg lua_swim_internal_methods [] = {
+		{"swim_new", lua_swim_new},
+		{"swim_delete", lua_swim_delete},
+		{NULL, NULL}
+	};
+	luaL_register_module(L, "swim", lua_swim_internal_methods);
+	lua_pop(L, 1);
+}
diff --git a/src/lua/swim.h b/src/lua/swim.h
new file mode 100644
index 000000000..b60bb76b7
--- /dev/null
+++ b/src/lua/swim.h
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+struct lua_State;
+
+void
+tarantool_lua_swim_init(struct lua_State *L);
diff --git a/src/lua/swim.lua b/src/lua/swim.lua
index 4893d5767..527299284 100644
--- a/src/lua/swim.lua
+++ b/src/lua/swim.lua
@@ -3,6 +3,7 @@ local uuid = require('uuid')
 local buffer = require('buffer')
 local msgpack = require('msgpack')
 local crypto = require('crypto')
+local internal = require('swim')
 
 ffi.cdef[[
     struct swim;
@@ -23,9 +24,6 @@ ffi.cdef[[
         MEMBER_LEFT,
     };
 
-    struct swim *
-    swim_new(void);
-
     bool
     swim_is_configured(const struct swim *swim);
 
@@ -41,9 +39,6 @@ ffi.cdef[[
     swim_set_codec(struct swim *swim, enum crypto_algo algo,
                    enum crypto_mode mode, const char *key, int key_size);
 
-    void
-    swim_delete(struct swim *swim);
-
     int
     swim_add_member(struct swim *swim, const char *uri,
                     const struct tt_uuid *uuid);
@@ -466,7 +461,7 @@ local swim_mt_deleted = {
 --
 local function swim_delete(s)
     local ptr = swim_check_instance(s, 'swim:delete')
-    capi.swim_delete(ffi.gc(ptr, nil))
+    internal.swim_delete(ffi.gc(ptr, nil))
     s.ptr = nil
     setmetatable(s, swim_mt_deleted)
 end
@@ -821,11 +816,11 @@ local cache_table_mt = { __mode = 'v' }
 -- provided.
 --
 local function swim_new(cfg)
-    local ptr = capi.swim_new()
+    local ptr = internal.swim_new()
     if ptr == nil then
         return nil, box.error.last()
     end
-    ffi.gc(ptr, capi.swim_delete)
+    ffi.gc(ptr, internal.swim_delete)
     local s = setmetatable({
         ptr = ptr,
         cfg = setmetatable({index = {}}, swim_cfg_not_configured_mt),
-- 
2.20.1 (Apple Git-117)

* [tarantool-patches] [PATCH 5/5] swim: expose Lua triggers on member update
  2019-06-02  0:00 [tarantool-patches] [PATCH 0/5] SWIM on_member_update Vladislav Shpilevoy
                   ` (3 preceding siblings ...)
  2019-06-02  0:00 ` [tarantool-patches] [PATCH 4/5] swim: call swim:new/delete via Lua C, not via FFI Vladislav Shpilevoy
@ 2019-06-02  0:10 ` Vladislav Shpilevoy
  2019-06-05 21:54   ` [tarantool-patches] " Vladislav Shpilevoy
       [not found] ` <12b8ea76f7c1cd100a80ddcea3c29d20354e073e.1559433539.git.v.shpilevoy@tarantool.org>
  5 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-02  0:10 UTC (permalink / raw)
  To: tarantool-patches; +Cc: kostja

SWIM as a monitoring module is hard to use without the ability to
subscribe to events. Otherwise a user has to poll the SWIM member
table for updates, which is too inefficient.

This commit makes it possible to set Lua triggers on member updates.

Closes #4250

@TarantoolBot document
Title: SWIM: swim:on_member_update

Now a user can set triggers on member table updates. There is one
function for that, which can be used in several ways:
```Lua
swim:on_member_update(new_trigger[, ctx])
```
Add a new trigger on member table updates. The function
`new_trigger` is called whenever a new member appears, an existing
member is dropped, or a member is updated. It takes 3 arguments:
the updated SWIM member, an events object, and `ctx` passed as is.

The events object has methods that help a user determine which
update has happened:
```Lua
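local function on_update(member, events, ctx)
    if events:is_new() then
        -- A new member has appeared.
    elseif events:is_drop() then
        -- The member has been dropped.
    end

    -- Several update flags can be set at once, so check each of
    -- them on its own rather than in an elseif chain.
    if events:is_update() then
        if events:is_new_status() then
            -- The status has changed.
        end
        if events:is_new_uri() then
            -- The URI has changed.
        end
        if events:is_new_incarnation() then
            -- The incarnation has changed.
        end
        if events:is_new_payload() then
            -- The payload has changed.
        end
    end
end

s:on_member_update(on_update, ctx)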
```
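Under the hood the events object is a thin wrapper over an integer bit mask: the Lua wrapper stores the mask and tests each flag with `bit.band`. A minimal C sketch of the same checks follows, assuming the bit values from the `enum swim_ev_mask` declaration this patch adds to `swim.lua` (copied here so the sketch stands alone). The helpers `swim_ev_has` and `swim_ev_describe` are hypothetical and exist only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/*
 * Standalone copy of the bit values from enum swim_ev_mask in
 * this patch. SWIM_EV_UPDATE is the union of the four 'new_*'
 * update flags, so is_update() is true if any of them is set.
 */
enum swim_ev_mask {
	SWIM_EV_NEW             = 1 << 0,
	SWIM_EV_NEW_STATUS      = 1 << 1,
	SWIM_EV_NEW_URI         = 1 << 2,
	SWIM_EV_NEW_INCARNATION = 1 << 3,
	SWIM_EV_NEW_PAYLOAD     = 1 << 4,
	SWIM_EV_UPDATE          = SWIM_EV_NEW_STATUS | SWIM_EV_NEW_URI |
				  SWIM_EV_NEW_INCARNATION | SWIM_EV_NEW_PAYLOAD,
	SWIM_EV_DROP            = 1 << 5,
};

/* Hypothetical C analogue of events:is_*(): is any bit of @a bits set? */
static inline bool
swim_ev_has(int events, int bits)
{
	return (events & bits) != 0;
}

/* Names of the single-bit flags, in bit order. */
static const struct {
	int bit;
	const char *name;
} swim_ev_names[] = {
	{SWIM_EV_NEW, "new"},
	{SWIM_EV_NEW_STATUS, "new_status"},
	{SWIM_EV_NEW_URI, "new_uri"},
	{SWIM_EV_NEW_INCARNATION, "new_incarnation"},
	{SWIM_EV_NEW_PAYLOAD, "new_payload"},
	{SWIM_EV_DROP, "drop"},
};

/*
 * Write the names of all single-bit flags set in @a events into
 * @a buf, separated by spaces. Each flag is checked on its own,
 * because several of them can be set in one event.
 */
static void
swim_ev_describe(int events, char *buf, size_t size)
{
	assert(size > 0);
	size_t used = 0;
	buf[0] = '\0';
	size_t count = sizeof(swim_ev_names) / sizeof(swim_ev_names[0]);
	for (size_t i = 0; i < count; ++i) {
		if (!swim_ev_has(events, swim_ev_names[i].bit))
			continue;
		int n = snprintf(buf + used, size - used, "%s%s",
				 used == 0 ? "" : " ", swim_ev_names[i].name);
		/* Stop on error or truncation; buf stays terminated. */
		if (n < 0 || (size_t) n >= size - used)
			break;
		used += (size_t) n;
	}
}
```

For a member whose incarnation and payload changed at once (as in the Lua test output of this patch), `swim_ev_describe` produces `new_incarnation new_payload`, while an `elseif` chain would only report the first flag it meets.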
Note that multiple events can be reported at once, and a user
should be ready for that. 'new' and 'drop' never happen together,
but either of them can easily come together with 'update'. This is
especially likely when there are many updates and the triggers are
too slow: a member can be added and then updated before its
appearance reaches a trigger.

A notable case is 'new' + 'new payload', which does not depend on
trigger speed. The point is that an absent payload and a payload of
size 0 are not the same thing, and sometimes a member is added
without a payload. This happens, for example, when the member is
learned from a ping, because pings do not carry a payload. In such a
case the missing payload eventually arrives later. If that matters
for an application, it should be ready for it: 'new' does not mean
that the member already has a payload, and the payload size says
nothing about whether a payload is present.
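The difference between an absent payload and an empty one can be modelled with an explicit "known" flag next to the size. The sketch below is hypothetical (the struct, the function names and the size limit are invented and do not describe SWIM's internal layout): a member learned from a ping starts with an unknown payload, and receiving an empty payload later is still a 'new payload' event even though the size stays 0.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical payload limit for this sketch only. */
enum { PAYLOAD_MODEL_MAX = 64 };

/*
 * Hypothetical model of a member's payload. The size alone cannot
 * tell "no payload received yet" from "empty payload", so a
 * separate flag records whether the payload is known at all.
 */
struct payload_model {
	bool is_known;
	int size;
	char data[PAYLOAD_MODEL_MAX];
};

/* A member learned from a ping: pings carry no payload. */
static void
payload_model_create_unknown(struct payload_model *p)
{
	p->is_known = false;
	p->size = 0;
}

/*
 * Apply a payload received from the network. Returns true when
 * this is a 'new payload' event: the payload becomes known for
 * the first time, or its content changes.
 */
static bool
payload_model_update(struct payload_model *p, const char *data,
		     int size)
{
	assert(size >= 0 && size <= PAYLOAD_MODEL_MAX);
	bool is_new = !p->is_known || p->size != size ||
		      (size > 0 && memcmp(p->data, data, size) != 0);
	if (size > 0)
		memcpy(p->data, data, size);
	p->size = size;
	p->is_known = true;
	return is_new;
}
```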

Second usage case:
```Lua
swim:on_member_update(nil, old_trigger)
```
Drop an old trigger.

Third usage case:
```Lua
swim:on_member_update(new_trigger, old_trigger[, ctx])
```
Replace an existing trigger in place, keeping its position in the
trigger list.

Fourth usage case:
```Lua
swim:on_member_update()
```
Get a list of triggers.
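The call forms above reduce to add, drop, and replace-in-place on an ordered list. A minimal C sketch of those semantics follows, assuming triggers are identified by opaque handles. `struct trigger_list` and `trigger_list_set` are hypothetical and do not reproduce Tarantool's `trigger.h` or `lbox_trigger_reset`; appending new triggers at the tail is an arbitrary choice of the sketch. The listing form corresponds to reading `items[0..count)`.

```c
#include <assert.h>
#include <stddef.h>

/* Triggers are identified by opaque handles in this sketch. */
typedef const void *trigger_ref;

enum { TRIGGER_LIST_MAX = 8 };

/* Hypothetical ordered trigger list. */
struct trigger_list {
	trigger_ref items[TRIGGER_LIST_MAX];
	int count;
};

static int
trigger_list_find(const struct trigger_list *l, trigger_ref t)
{
	for (int i = 0; i < l->count; ++i) {
		if (l->items[i] == t)
			return i;
	}
	return -1;
}

/*
 * Model of on_member_update(new_t, old_t):
 * - new_t set, old_t NULL: append new_t;
 * - new_t NULL, old_t set: drop old_t, keeping the others' order;
 * - both set: put new_t at old_t's position.
 * Returns 0 on success, -1 if old_t is not found, the list is
 * full, or both arguments are NULL.
 */
static int
trigger_list_set(struct trigger_list *l, trigger_ref new_t,
		 trigger_ref old_t)
{
	if (old_t == NULL) {
		if (new_t == NULL || l->count == TRIGGER_LIST_MAX)
			return -1;
		l->items[l->count++] = new_t;
		return 0;
	}
	int i = trigger_list_find(l, old_t);
	if (i < 0)
		return -1;
	if (new_t != NULL) {
		l->items[i] = new_t;
		return 0;
	}
	/* Drop: shift the tail left so the order is preserved. */
	for (int j = i; j + 1 < l->count; ++j)
		l->items[j] = l->items[j + 1];
	--l->count;
	return 0;
}
```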

When dropping or replacing a trigger, be careful: the following
code does not work:
```Lua
tr = function() ... end
-- Add a trigger.
s:on_member_update(tr)
...
-- Drop a trigger.
s:on_member_update(nil, tr)
```
To be precise, the last line does not work. SWIM wraps user
triggers in an internal closure that preprocesses the parameters,
so `tr` itself is never in the trigger list. To drop a trigger, a
user should save the result of the first on_member_update() call.

This code works:
```Lua
tr = function() ... end
-- Add a trigger.
tr_id = s:on_member_update(tr)
...
-- Drop a trigger.
s:on_member_update(nil, tr_id)
```

The triggers are executed one by one in a separate fiber, and they
are allowed to yield. Together, these two facts mean that if one
trigger sleeps for too long, the other triggers wait. This does not
prevent SWIM from doing its routine work, but it does block the
other triggers.

The last point to remember: if a member was added and then dropped
before its appearance reached the triggers, it does not fire any
triggers at all. A user will not notice such a short-lived member.
---
 src/lua/swim.c          |  27 ++++-
 src/lua/swim.lua        |  90 ++++++++++++++
 test/swim/swim.result   | 263 ++++++++++++++++++++++++++++++++++++++++
 test/swim/swim.test.lua |  85 +++++++++++++
 4 files changed, 464 insertions(+), 1 deletion(-)

diff --git a/src/lua/swim.c b/src/lua/swim.c
index 3b9e229be..17441e58c 100644
--- a/src/lua/swim.c
+++ b/src/lua/swim.c
@@ -33,8 +33,31 @@
 #include "diag.h"
 #include "lua/utils.h"
 
+static uint32_t ctid_swim_member_ptr;
 static uint32_t ctid_swim_ptr;
 
+/** Push member update context into a Lua stack. */
+static int
+lua_swim_on_member_update_push(struct lua_State *L, void *event)
+{
+	struct swim_on_member_update_ctx *ctx =
+		(struct swim_on_member_update_ctx *) event;
+	*(struct swim_member **) luaL_pushcdata(L, ctid_swim_member_ptr) =
+		ctx->member;
+	lua_pushinteger(L, ctx->events);
+	return 2;
+}
+
+/** Hang or/and delete a trigger on a SWIM member update. */
+static int
+lua_swim_on_member_update(struct lua_State *L)
+{
+	uint32_t ctypeid;
+	struct swim *s = *(struct swim **) luaL_checkcdata(L, 1, &ctypeid);
+	return lbox_trigger_reset(L, 3, swim_trigger_list_on_member_update(s),
+				  lua_swim_on_member_update_push, NULL);
+}
+
 /**
  * Create a new SWIM instance. SWIM is not created via FFI,
  * because this operation yields.
@@ -68,12 +91,14 @@ lua_swim_delete(struct lua_State *L)
 void
 tarantool_lua_swim_init(struct lua_State *L)
 {
-	luaL_cdef(L, "struct swim;");
+	luaL_cdef(L, "struct swim_member; struct swim;");
+	ctid_swim_member_ptr = luaL_ctypeid(L, "struct swim_member *");
 	ctid_swim_ptr = luaL_ctypeid(L, "struct swim *");
 
 	static const struct luaL_Reg lua_swim_internal_methods [] = {
 		{"swim_new", lua_swim_new},
 		{"swim_delete", lua_swim_delete},
+		{"swim_on_member_update", lua_swim_on_member_update},
 		{NULL, NULL}
 	};
 	luaL_register_module(L, "swim", lua_swim_internal_methods);
diff --git a/src/lua/swim.lua b/src/lua/swim.lua
index 527299284..9b8ff1c29 100644
--- a/src/lua/swim.lua
+++ b/src/lua/swim.lua
@@ -24,6 +24,16 @@ ffi.cdef[[
         MEMBER_LEFT,
     };
 
+    enum swim_ev_mask {
+        SWIM_EV_NEW             = 0b00000001,
+        SWIM_EV_NEW_STATUS      = 0b00000010,
+        SWIM_EV_NEW_URI         = 0b00000100,
+        SWIM_EV_NEW_INCARNATION = 0b00001000,
+        SWIM_EV_NEW_PAYLOAD     = 0b00010000,
+        SWIM_EV_UPDATE          = 0b00011110,
+        SWIM_EV_DROP            = 0b00100000,
+    };
+
     bool
     swim_is_configured(const struct swim *swim);
 
@@ -689,6 +699,84 @@ local function swim_pairs(s)
     return swim_pairs_next, {swim = s, iterator = iterator}, nil
 end
 
+local swim_on_member_update_index = {
+    is_new = function(self)
+        return bit.band(self[1], capi.SWIM_EV_NEW) ~= 0
+    end,
+    is_drop = function(self)
+        return bit.band(self[1], capi.SWIM_EV_DROP) ~= 0
+    end,
+    is_update = function(self)
+        return bit.band(self[1], capi.SWIM_EV_UPDATE) ~= 0
+    end,
+    is_new_status = function(self)
+        return bit.band(self[1], capi.SWIM_EV_NEW_STATUS) ~= 0
+    end,
+    is_new_uri = function(self)
+        return bit.band(self[1], capi.SWIM_EV_NEW_URI) ~= 0
+    end,
+    is_new_incarnation = function(self)
+        return bit.band(self[1], capi.SWIM_EV_NEW_INCARNATION) ~= 0
+    end,
+    is_new_payload = function(self)
+        return bit.band(self[1], capi.SWIM_EV_NEW_PAYLOAD) ~= 0
+    end,
+}
+
+local swim_on_member_update_event_mt = {
+    __index = swim_on_member_update_index,
+    __serialize = function(self)
+        local res = {}
+        for k, v in pairs(swim_on_member_update_index) do
+            v = v(self)
+            if v then
+                res[k] = v
+            end
+        end
+        return res
+    end,
+}
+
+--
+-- Create a closure function for preprocessing raw SWIM member
+-- update trigger parameters.
+-- @param s SWIM instance.
+-- @param callback User functions to call.
+-- @param ctx An optional parameter for @a callback passed as is.
+-- @return A function to set as a trigger.
+--
+local function swim_on_member_update_new(s, callback, ctx)
+    return function(member_ptr, event_mask)
+        local m = swim_wrap_member(s, member_ptr)
+        local event = setmetatable({event_mask}, swim_on_member_update_event_mt)
+        return callback(m, event, ctx)
+    end
+end
+
+--
+-- Add or/and delete a trigger on member update. Possible usages:
+--
+-- * on_member_update(new[, ctx]) - add a new trigger. It should
+--   accept 3 arguments: an updated member, an events object, an
+--   optional @a ctx parameter passed as is.
+--
+-- * on_member_update(new, old[, ctx]) - add a new trigger @a new
+--   if not nil, in place of @a old trigger.
+--
+-- * on_member_update() - get a list of triggers.
+--
+local function swim_on_member_update(s, new, old, ctx)
+    local ptr = swim_check_instance(s, 'swim:on_member_update')
+    if type(old) ~= 'function' then
+        ctx = old
+        old = nil
+    end
+    if new ~= nil then
+        new = swim_on_member_update_new(s, new, ctx)
+    end
+    return internal.swim_on_member_update(ptr, new, old)
+end
+
 --
 -- Normal metatable of a configured SWIM instance.
 --
@@ -708,6 +796,7 @@ local swim_mt = {
         set_payload = swim_set_payload,
         set_codec = swim_set_codec,
         pairs = swim_pairs,
+        on_member_update = swim_on_member_update,
     },
     __serialize = swim_serialize
 }
@@ -800,6 +889,7 @@ local swim_not_configured_mt = {
         delete = swim_delete,
         is_configured = swim_is_configured,
         set_codec = swim_set_codec,
+        on_member_update = swim_on_member_update,
     },
     __serialize = swim_serialize
 }
diff --git a/test/swim/swim.result b/test/swim/swim.result
index 436d4e579..1de903b04 100644
--- a/test/swim/swim.result
+++ b/test/swim/swim.result
@@ -1216,6 +1216,269 @@ s1:delete()
 s2:delete()
 ---
 ...
+--
+-- gh-4250: allow to set triggers on a new member appearance, old
+-- member drop, member update.
+--
+s1 = swim.new()
+---
+...
+s1.on_member_update()
+---
+- error: 'builtin/swim.lua:<line>: swim:on_member_update: first argument is not a SWIM
+    instance'
+...
+m_list = {}
+---
+...
+e_list = {}
+---
+...
+ctx_list = {}
+---
+...
+f = nil
+---
+...
+f_need_sleep = false
+---
+...
+_ = test_run:cmd("setopt delimiter ';'")
+---
+...
+t_save_event = function(m, e, ctx)
+    table.insert(m_list, m)
+    table.insert(e_list, e)
+    table.insert(ctx_list, ctx)
+end;
+---
+...
+t_yield = function(m, e, ctx)
+    f = fiber.self()
+    t_save_event(m, e, ctx)
+    while f_need_sleep do fiber.sleep(10000) end
+end;
+---
+...
+_ = test_run:cmd("setopt delimiter ''");
+---
+...
+t_save_event_id = s1:on_member_update(t_save_event, 'ctx')
+---
+...
+-- Not equal, because SWIM wraps user triggers with a closure for
+-- context preprocessing.
+t_save_event_id ~= t_save_event
+---
+- true
+...
+s1:cfg{uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01}
+---
+- true
+...
+while #m_list < 1 do fiber.sleep(0) end
+---
+...
+m_list
+---
+- - uri: 127.0.0.1:<port>
+    status: alive
+    incarnation: 1
+    uuid: 00000000-0000-1000-8000-000000000001
+    payload_size: 0
+...
+e_list
+---
+- - is_new_payload: true
+    is_new_uri: true
+    is_new: true
+    is_update: true
+...
+ctx_list
+---
+- - ctx
+...
+m_list = {} e_list = {} ctx_list = {}
+---
+...
+t_yield_id = s1:on_member_update(t_yield, 'ctx2')
+---
+...
+f_need_sleep = true
+---
+...
+s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
+---
+...
+s2:add_member({uuid = s1:self():uuid(), uri = s1:self():uri()})
+---
+- true
+...
+while s1:size() ~= 2 do fiber.sleep(0.01) end
+---
+...
+-- Only first trigger worked. Second is waiting, because first
+-- sleeps.
+m_list
+---
+- - uri: 127.0.0.1:<port>
+    status: alive
+    incarnation: 1
+    uuid: 00000000-0000-1000-8000-000000000002
+    payload_size: 0
+...
+e_list
+---
+- - is_new_payload: true
+    is_new: true
+    is_update: true
+...
+ctx_list
+---
+- - ctx2
+...
+m_list = {} e_list = {} ctx_list = {}
+---
+...
+-- But it does not prevent normal SWIM operation.
+s1:set_payload('payload')
+---
+- true
+...
+while not s2:member_by_uuid(s1:self():uuid()):payload() do fiber.sleep(0.01) end
+---
+...
+s2:member_by_uuid(s1:self():uuid()):payload()
+---
+- payload
+...
+f_need_sleep = false
+---
+...
+fiber.wakeup(f)
+---
+...
+while #m_list ~= 3 do fiber.sleep(0.01) end
+---
+...
+m_list
+---
+- - uri: 127.0.0.1:<port>
+    status: alive
+    incarnation: 1
+    uuid: 00000000-0000-1000-8000-000000000002
+    payload_size: 0
+  - uri: 127.0.0.1:<port>
+    status: alive
+    incarnation: 2
+    uuid: 00000000-0000-1000-8000-000000000001
+    payload_size: 8
+  - uri: 127.0.0.1:<port>
+    status: alive
+    incarnation: 2
+    uuid: 00000000-0000-1000-8000-000000000001
+    payload_size: 8
+...
+e_list
+---
+- - is_new_payload: true
+    is_new: true
+    is_update: true
+  - is_new_payload: true
+    is_update: true
+    is_new_incarnation: true
+  - is_new_payload: true
+    is_update: true
+    is_new_incarnation: true
+...
+ctx_list
+---
+- - ctx
+  - ctx2
+  - ctx
+...
+m_list = {} e_list = {} ctx_list = {}
+---
+...
+#s1:on_member_update()
+---
+- 2
+...
+s1:on_member_update(nil, t_yield_id)
+---
+...
+s2:quit()
+---
+...
+while s1:size() ~= 1 do fiber.sleep(0.01) end
+---
+...
+-- Process update.
+fiber.sleep(0)
+---
+...
+-- Two events - status update to 'left', and 'drop'.
+m_list
+---
+- - uri: 127.0.0.1:<port>
+    status: left
+    incarnation: 1
+    uuid: 00000000-0000-1000-8000-000000000002
+    payload_size: 0
+  - uri: 127.0.0.1:<port>
+    status: left
+    incarnation: 1
+    uuid: 00000000-0000-1000-8000-000000000002
+    payload_size: 0
+...
+e_list
+---
+- - is_new_status: true
+    is_update: true
+  - is_drop: true
+...
+ctx_list
+---
+- - ctx
+  - ctx
+...
+m = m_list[1]
+---
+...
+-- Cached member table works even when a member is deleted.
+m_list[1] == m_list[2]
+---
+- true
+...
+m_list = {} e_list = {} ctx_list = {}
+---
+...
+s1:on_member_update(nil, t_save_event_id)
+---
+...
+s1:add_member({uuid = m:uuid(), uri = m:uri()})
+---
+- true
+...
+fiber.sleep(0)
+---
+...
+-- No updates - all the triggers are dropped.
+m_list
+---
+- []
+...
+e_list
+---
+- []
+...
+ctx_list
+---
+- []
+...
+s1:delete()
+---
+...
 test_run:cmd("clear filter")
 ---
 - true
diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua
index a3eac9b46..25eb5a7d0 100644
--- a/test/swim/swim.test.lua
+++ b/test/swim/swim.test.lua
@@ -413,4 +413,89 @@ s2:member_by_uuid(s1:self():uuid())
 s1:delete()
 s2:delete()
 
+--
+-- gh-4250: allow to set triggers on a new member appearance, old
+-- member drop, member update.
+--
+s1 = swim.new()
+s1.on_member_update()
+
+m_list = {}
+e_list = {}
+ctx_list = {}
+f = nil
+f_need_sleep = false
+_ = test_run:cmd("setopt delimiter ';'")
+t_save_event = function(m, e, ctx)
+    table.insert(m_list, m)
+    table.insert(e_list, e)
+    table.insert(ctx_list, ctx)
+end;
+t_yield = function(m, e, ctx)
+    f = fiber.self()
+    t_save_event(m, e, ctx)
+    while f_need_sleep do fiber.sleep(10000) end
+end;
+_ = test_run:cmd("setopt delimiter ''");
+t_save_event_id = s1:on_member_update(t_save_event, 'ctx')
+-- Not equal, because SWIM wraps user triggers with a closure for
+-- context preprocessing.
+t_save_event_id ~= t_save_event
+
+s1:cfg{uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01}
+while #m_list < 1 do fiber.sleep(0) end
+m_list
+e_list
+ctx_list
+m_list = {} e_list = {} ctx_list = {}
+
+t_yield_id = s1:on_member_update(t_yield, 'ctx2')
+f_need_sleep = true
+s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
+s2:add_member({uuid = s1:self():uuid(), uri = s1:self():uri()})
+while s1:size() ~= 2 do fiber.sleep(0.01) end
+-- Only first trigger worked. Second is waiting, because first
+-- sleeps.
+m_list
+e_list
+ctx_list
+m_list = {} e_list = {} ctx_list = {}
+-- But it does not prevent normal SWIM operation.
+s1:set_payload('payload')
+while not s2:member_by_uuid(s1:self():uuid()):payload() do fiber.sleep(0.01) end
+s2:member_by_uuid(s1:self():uuid()):payload()
+
+f_need_sleep = false
+fiber.wakeup(f)
+while #m_list ~= 3 do fiber.sleep(0.01) end
+m_list
+e_list
+ctx_list
+m_list = {} e_list = {} ctx_list = {}
+#s1:on_member_update()
+
+s1:on_member_update(nil, t_yield_id)
+s2:quit()
+while s1:size() ~= 1 do fiber.sleep(0.01) end
+-- Process update.
+fiber.sleep(0)
+-- Two events - status update to 'left', and 'drop'.
+m_list
+e_list
+ctx_list
+m = m_list[1]
+-- Cached member table works even when a member is deleted.
+m_list[1] == m_list[2]
+m_list = {} e_list = {} ctx_list = {}
+
+s1:on_member_update(nil, t_save_event_id)
+s1:add_member({uuid = m:uuid(), uri = m:uri()})
+fiber.sleep(0)
+-- No updates - all the triggers are dropped.
+m_list
+e_list
+ctx_list
+
+s1:delete()
+
 test_run:cmd("clear filter")
-- 
2.20.1 (Apple Git-117)
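In the e_list output of the test result above, each delivered event carries specific flags (is_new_payload, is_new_status, is_new_incarnation, ...) plus a summary is_update flag. The final drop event carries only is_drop. Below is a minimal C sketch of how an event mask could be decoded into such flags. It illustrates the idea and is not the code from this patchset. Only SWIM_EV_DROP = 0b00100000 is visible in the patch. The other bit values, the SWIM_EV_UPDATE shortcut and struct swim_event_view are assumptions made for this example.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Event bits. Only SWIM_EV_DROP (0b00100000) is taken from the
 * patch; the other values are assumptions for this sketch. Hex is
 * used instead of the 0b prefix for portability.
 */
enum swim_ev_mask {
	SWIM_EV_NEW             = 0x01,
	SWIM_EV_NEW_STATUS      = 0x02,
	SWIM_EV_NEW_URI         = 0x04,
	SWIM_EV_NEW_INCARNATION = 0x08,
	SWIM_EV_NEW_PAYLOAD     = 0x10,
	/* Hypothetical shortcut: any attribute change. */
	SWIM_EV_UPDATE          = 0x1e,
	SWIM_EV_DROP            = 0x20,
};

/* Boolean view of a mask, named after the Lua event fields. */
struct swim_event_view {
	bool is_new;
	bool is_new_status;
	bool is_new_uri;
	bool is_new_incarnation;
	bool is_new_payload;
	bool is_update;
	bool is_drop;
};

static struct swim_event_view
swim_event_view_create(unsigned mask)
{
	/* 'New' and 'drop' are never delivered in one mask. */
	assert((mask & SWIM_EV_NEW) == 0 || (mask & SWIM_EV_DROP) == 0);
	struct swim_event_view v;
	v.is_new = (mask & SWIM_EV_NEW) != 0;
	v.is_new_status = (mask & SWIM_EV_NEW_STATUS) != 0;
	v.is_new_uri = (mask & SWIM_EV_NEW_URI) != 0;
	v.is_new_incarnation = (mask & SWIM_EV_NEW_INCARNATION) != 0;
	v.is_new_payload = (mask & SWIM_EV_NEW_PAYLOAD) != 0;
	v.is_update = (mask & SWIM_EV_UPDATE) != 0;
	v.is_drop = (mask & SWIM_EV_DROP) != 0;
	return v;
}
```

With this layout, is_update is set by any attribute change but not by a drop alone. This matches the last e_list entry of the test, where the drop carries only is_drop.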

* [tarantool-patches] Re: [PATCH 1/5] test: create isolated ev_loop for swim unit tests
  2019-06-02  0:00 ` [tarantool-patches] [PATCH 1/5] test: create isolated ev_loop for swim unit tests Vladislav Shpilevoy
@ 2019-06-05  6:51   ` Konstantin Osipov
  2019-06-05 21:53     ` Vladislav Shpilevoy
  0 siblings, 1 reply; 20+ messages in thread
From: Konstantin Osipov @ 2019-06-05  6:51 UTC
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/03 14:33]:
> --- a/src/lib/swim/swim_ev.c
> +++ b/src/lib/swim/swim_ev.c
> @@ -55,3 +55,9 @@ swim_ev_timer_stop(struct ev_loop *loop, struct ev_timer *watcher)
>  {
>  	ev_timer_stop(loop, watcher);
>  }
> +
> +struct ev_loop *
> +swim_loop(void)

The comment explaining why you need a separate loop should be
here, not in the tests, since this is the place most people will
be looking at and wondering why you need this wrapper at all.

You could hack this around with a define, but I think your
approach is cleaner, so please just add a comment.

-- 
Konstantin Osipov, Moscow, Russia

* [tarantool-patches] Re: [PATCH 2/5] swim: fix a 'use after free' in SWIM tests
  2019-06-02  0:00 ` [tarantool-patches] [PATCH 2/5] swim: fix a 'use after free' in SWIM tests Vladislav Shpilevoy
@ 2019-06-05  6:52   ` Konstantin Osipov
  0 siblings, 0 replies; 20+ messages in thread
From: Konstantin Osipov @ 2019-06-05  6:52 UTC
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/03 14:33]:

LGTM.

>  test/unit/swim_test_transport.c | 1 +
>  1 file changed, 1 insertion(+)
> 
> diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
> index c4a1dd774..334ac926e 100644
> --- a/test/unit/swim_test_transport.c
> +++ b/test/unit/swim_test_transport.c
> @@ -85,6 +85,7 @@ swim_test_packet_new(const char *data, int size, const struct sockaddr_in *src,
>  static inline void
>  swim_test_packet_delete(struct swim_test_packet *p)
>  {
> +	rlist_del_entry(p, in_queue);
>  	free(p);
>  }
>  
> -- 
> 2.20.1 (Apple Git-117)
> 

-- 
Konstantin Osipov, Moscow, Russia

* [tarantool-patches] Re: [PATCH 3/5] swim: allow to hang triggers on member updates
  2019-06-02  0:00 ` [tarantool-patches] [PATCH 3/5] swim: allow to hang triggers on member updates Vladislav Shpilevoy
@ 2019-06-05  7:11   ` Konstantin Osipov
  2019-06-05 21:53     ` Vladislav Shpilevoy
  0 siblings, 1 reply; 20+ messages in thread
From: Konstantin Osipov @ 2019-06-05  7:11 UTC
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/03 14:33]:

I like the design: elegant and lean. Some comments below.

> This commit allows to hang triggers on member updates.

"hang triggers" sounds ominous. I think you simply
meant "set triggers". See also patch subject, please change as
well.

>  	struct rlist in_dissemination_queue;
> +	/**
> +	 * Each time a member is updated, or created, or dropped,
> +	 * it is added to an update queue. Members from this queue
> +	 * are dispatched into user defined triggers.
> +	 */
> +	struct stailq_entry in_update_queue;

I think the name is a bit confusing. After all, the dissemination
queue is also an update queue. in_local_queue? in_trigger_queue?
in_event_queue? in_pending_member_update_queue? 
in_member_update? in_member_update_queue?

Please also make sure that all API names continue to agree with
each other and internal member names after the rename.

> +	/**
> +	 * Mask of events happened with this member since a
> +	 * previous trigger invocation. Once the events are
> +	 * delivered into a trigger, the mask is nullified and
> +	 * starts collecting new events.
> +	 */
> +	enum swim_ev_mask events;

Since you use 'events' for an associated mask, perhaps
in_event_queue or in_local_event_queue is good enough.

> +	/**
> +	 * Worker fiber to serve member update triggers. This task
> +	 * is being done in a separate fiber, because user
> +	 * triggers can yield and libev callbacks, processing
> +	 * member updates, are not allowed to yield.
> +	 */
> +	struct fiber *worker;

I think as long as the worker is only used for triggers, the name
should be more specific. When the worker is used for other events
or actions, it could be renamed.

> @@ -734,7 +801,6 @@ swim_delete_member(struct swim *swim, struct swim_member *member)
>  	mh_int_t rc = mh_swim_table_find(swim->members, key, NULL);
>  	assert(rc != mh_end(swim->members));
>  	mh_swim_table_del(swim->members, rc, NULL);
> -	swim_cached_round_msg_invalidate(swim);
>  	rlist_del_entry(member, in_round_queue);

Stray change? At least I don't understand it.

> +	struct swim *s = va_arg(va, struct swim *);
> +	struct swim_on_member_update_ctx ctx;
> +	while (! fiber_is_cancelled()) {
> +		while (stailq_empty(&s->update_queue)) {
> +			fiber_yield();
> +			if (fiber_is_cancelled())
> +				goto end;

> +		}

Since you use goto anyway, why a double loop?

    while (! fiber_is_cancelled()) {
        if (stailq_empty(&s->update_queue)) {
            fiber_yield();
            continue;
        }
        ... 
    }
> +	}
> +end:
> +	swim_do_delete(s);

I don't mind swim_do_delete(), but I think the worker should not
be destroying the instance. It looks like a breach of encapsulation:
this assumes the worker knows about all other uses of struct swim.

Why not cancel and join the worker from the destructor and then
safely do the delete?

> +	swim->worker = fiber_new("SWIM worker", swim_worker_f);
> +	if (swim->worker == NULL) {
> +		free(swim);
> +		return NULL;
> +	}

>  	swim->members = mh_swim_table_new();
>  	if (swim->members == NULL) {
> +		fiber_cancel(swim->worker);

Better to start the worker last, then you will have an easy time
destroying it on failure. This is a minor nit, ignore if you wish.

>  		free(swim);
>  		diag_set(OutOfMemory, sizeof(*swim->members),
>  			 "mh_swim_table_new", "members");
> @@ -1700,7 +1832,10 @@ swim_new(void)
> @@ -1951,17 +2088,19 @@ swim_size(const struct swim *swim)
>  	return mh_size(swim->members);
>  }
>  
>  
> +void
> +swim_delete(struct swim *swim)
> +{
> +	fiber_wakeup(swim->worker);
> +	fiber_cancel(swim->worker);
> +	fiber_sleep(0);

This looks like a hack; please cancel and join the worker instead.

> @@ -1991,7 +2139,8 @@ swim_quit_step_complete(struct swim_task *task,
>  	(void) rc;
>  	struct swim *swim = swim_by_scheduler(scheduler);
>  	if (rlist_empty(&swim->round_queue)) {
> -		swim_delete(swim);
> +		fiber_wakeup(swim->worker);
> +		fiber_cancel(swim->worker);
>  		return;

And now this code implicitly destroys the swim instance, which is
hard to follow when reading.
-- 
Konstantin Osipov, Moscow, Russia

* [tarantool-patches] Re: [PATCH 1/5] test: create isolated ev_loop for swim unit tests
  2019-06-05  6:51   ` [tarantool-patches] " Konstantin Osipov
@ 2019-06-05 21:53     ` Vladislav Shpilevoy
  2019-06-08 14:24       ` Konstantin Osipov
  0 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-05 21:53 UTC
  To: Konstantin Osipov; +Cc: tarantool-patches

Hi! Thanks for the review.

On 05/06/2019 09:51, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/03 14:33]:
>> --- a/src/lib/swim/swim_ev.c
>> +++ b/src/lib/swim/swim_ev.c
>> @@ -55,3 +55,9 @@ swim_ev_timer_stop(struct ev_loop *loop, struct ev_timer *watcher)
>>  {
>>  	ev_timer_stop(loop, watcher);
>>  }
>> +
>> +struct ev_loop *
>> +swim_loop(void)
> 
> The comment explaining why you need a separate loop should be
> here, not in the tests, since this is the place most people will
> be looking at and wondering why you need this wrapper at all.
> 
> You could hack this around with a define, but I think your
> approach is cleaner, so please just add a comment.
> 

It can't be solved with a define, because I need swim.o, and I
can't postpone preprocessor work until link time. Otherwise we
could just implement every function in swim_ev.h and
swim_transport.h as a macro.

The comment is moved and slightly modified.

=======================================================================

diff --git a/src/lib/swim/swim_ev.h b/src/lib/swim/swim_ev.h
index 900be150f..37e743d45 100644
--- a/src/lib/swim/swim_ev.h
+++ b/src/lib/swim/swim_ev.h
@@ -52,6 +52,21 @@ swim_ev_timer_again(struct ev_loop *loop, struct ev_timer *watcher);
 void
 swim_ev_timer_stop(struct ev_loop *loop, struct ev_timer *watcher);
 
+/**
+ * The unit tests code with the fake events and time does lots of
+ * forbidden things: it manually invokes pending watcher
+ * callbacks; manages global time without a kernel; puts not
+ * existing descriptors into the loop. All these actions does not
+ * affect the loop until yield. On yield a scheduler fiber wakes
+ * up and 1) infinitely generates EV_READ on not existing
+ * descriptors because considers them closed; 2) manual pending
+ * callbacks invocation asserts, because it is not allowed for
+ * non-scheduler fibers. To avoid these problems a new isolated
+ * loop is created, not visible for the scheduler. Here the fake
+ * events library can rack and ruin whatever it wants. This
+ * function is supposed to be an alias for 'loop()' in the
+ * Tarantool core, but be an isolated object in tests.
+ */
 struct ev_loop *
 swim_loop(void);
 
diff --git a/test/unit/swim_test_ev.c b/test/unit/swim_test_ev.c
index fb25ac9e4..23d909b05 100644
--- a/test/unit/swim_test_ev.c
+++ b/test/unit/swim_test_ev.c
@@ -63,17 +63,9 @@ typedef void (*swim_event_process_f)(struct swim_event *, struct ev_loop *);
 typedef void (*swim_event_delete_f)(struct swim_event *);
 
 /**
- * The unit tests code with the fake events and time does lots of
- * forbidden things: it manually invokes pending watcher
- * callbacks; manages global time without a kernel; puts not
- * existing descriptors into the loop. All these actions does not
- * affect the loop until yield. On yield a scheduler fiber wakes
- * up and 1) infinitely generates EV_READ on not existing
- * descriptors because considers them closed; 2) manual pending
- * callbacks invocation asserts, because it is not allowed for
- * non-scheduler fibers. To avoid these problems a new isolated
- * loop is created, not visible for the scheduler. Here the fake
- * events library can rack and ruin whatever it wants.
+ * An isolated event loop not visible to the fiber scheduler,
+ * where it is safe to use fake file descriptors, manually invoke
+ * callbacks etc.
  */
 static struct ev_loop *test_loop;
 

* [tarantool-patches] Re: [PATCH 3/5] swim: allow to hang triggers on member updates
  2019-06-05  7:11   ` [tarantool-patches] " Konstantin Osipov
@ 2019-06-05 21:53     ` Vladislav Shpilevoy
  2019-06-07 13:35       ` Konstantin Osipov
  0 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-05 21:53 UTC
  To: Konstantin Osipov; +Cc: tarantool-patches

On 05/06/2019 10:11, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/03 14:33]:
> 
> I like the design: elegant and lean. Some comments below.
> 

Thanks.

>> This commit allows to hang triggers on member updates.
> 
> "hang triggers" sounds ominous. I think you simply
> meant "set triggers". See also patch subject, please change as
> well.
> 

Yes, 'set' is the right word. Updated in all places.

>>  	struct rlist in_dissemination_queue;
>> +	/**
>> +	 * Each time a member is updated, or created, or dropped,
>> +	 * it is added to an update queue. Members from this queue
>> +	 * are dispatched into user defined triggers.
>> +	 */
>> +	struct stailq_entry in_update_queue;
> 
> I think the name is a bit confusing. After all, the dissemination
> queue is also an update queue. in_local_queue? in_trigger_queue?
> in_event_queue? in_pending_member_update_queue? 
> in_member_update? in_member_update_queue?

Indeed, 'update' is not the best name. I guess 'event_queue' is
better: it is consistent with the 'events' field in struct
swim_member. Besides, when a member is dropped, technically we
can't call it an 'update'.

But then we should rename everything, including
'swim_trigger_list_on_member_update' and the Lua part
'swim:on_member_update'.

They will be 'swim_trigger_list_on_member_event',
'swim:on_member_event' etc.

=======================================================================

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 8507eea14..d48e43761 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -337,10 +337,10 @@ struct swim_member {
 	struct rlist in_dissemination_queue;
 	/**
 	 * Each time a member is updated, or created, or dropped,
-	 * it is added to an update queue. Members from this queue
+	 * it is added to an event queue. Members from this queue
 	 * are dispatched into user defined triggers.
 	 */
-	struct stailq_entry in_update_queue;
+	struct stailq_entry in_event_queue;
 	/**
 	 * Mask of events happened with this member since a
 	 * previous trigger invocation. Once the events are
@@ -474,12 +474,12 @@ struct swim {
 	 * the events to triggers. Dropped members are also kept
 	 * here until they are handled by a trigger.
 	 */
-	struct stailq update_queue;
+	struct stailq event_queue;
 	/**
 	 * List of triggers to call on each new, dropped, and
 	 * updated member.
 	 */
-	struct rlist on_member_update;
+	struct rlist on_member_event;
 	/**
 	 * Members to which a message should be sent next during
 	 * this round.
@@ -498,10 +498,10 @@ struct swim {
 	 */
 	struct swim_member **shuffled;
 	/**
-	 * Worker fiber to serve member update triggers. This task
+	 * Worker fiber to serve member event triggers. This task
 	 * is being done in a separate fiber, because user
 	 * triggers can yield and libev callbacks, processing
-	 * member updates, are not allowed to yield.
+	 * member events, are not allowed to yield.
 	 */
 	struct fiber *worker;
 	/**
@@ -587,10 +587,10 @@ swim_on_member_update(struct swim *swim, struct swim_member *member,
 	member->unacknowledged_pings = 0;
 	swim_register_event(swim, member);
 	/*
-	 * Member update should be delivered to triggers only
-	 * if there is at least one trigger.
+	 * Member event should be delivered to triggers only if
+	 * there is at least one trigger.
 	 */
-	if (! rlist_empty(&swim->on_member_update)) {
+	if (! rlist_empty(&swim->on_member_event)) {
 		/*
 		 * Member is referenced and added to a queue only
 		 * once. That moment can be detected when a first
@@ -598,8 +598,8 @@ swim_on_member_update(struct swim *swim, struct swim_member *member,
 		 */
 		if (member->events == 0 && events != 0) {
 			swim_member_ref(member);
-			stailq_add_tail_entry(&swim->update_queue, member,
-					      in_update_queue);
+			stailq_add_tail_entry(&swim->event_queue, member,
+					      in_event_queue);
 			fiber_wakeup(swim->worker);
 		}
 		member->events |= events;
@@ -607,15 +607,15 @@ swim_on_member_update(struct swim *swim, struct swim_member *member,
 }
 
 struct rlist *
-swim_trigger_list_on_member_update(struct swim *swim)
+swim_trigger_list_on_member_event(struct swim *swim)
 {
-	return &swim->on_member_update;
+	return &swim->on_member_event;
 }
 
 bool
 swim_has_pending_events(struct swim *swim)
 {
-	return ! stailq_empty(&swim->update_queue);
+	return ! stailq_empty(&swim->event_queue);
 }
 
 /**
@@ -1744,7 +1744,7 @@ swim_do_delete(struct swim *swim);
 
 /**
  * Worker fiber. At this moment its only task is dispatching
- * member updates to user defined triggers. Generally, because
+ * member events to user defined triggers. Generally, because
  * SWIM is fully IO driven, that fiber should be used only for
  * yielding tasks not related to SWIM core logic. For all the
  * other tasks libev callbacks are ok. Unfortunately, yields are
@@ -1755,22 +1755,22 @@ static int
 swim_worker_f(va_list va)
 {
 	struct swim *s = va_arg(va, struct swim *);
-	struct swim_on_member_update_ctx ctx;
+	struct swim_on_member_event_ctx ctx;
 	while (! fiber_is_cancelled()) {
-		while (stailq_empty(&s->update_queue)) {
+		while (stailq_empty(&s->event_queue)) {
 			fiber_yield();
 			if (fiber_is_cancelled())
 				goto end;
 		}
 		/*
 		 * Can't be empty. SWIM deletes members from
-		 * update queue only on SWIM deletion, but then
+		 * event queue only on SWIM deletion, but then
 		 * the fiber would be stopped already.
 		 */
-		assert(! stailq_empty(&s->update_queue));
+		assert(! stailq_empty(&s->event_queue));
 		struct swim_member *m =
-			stailq_shift_entry(&s->update_queue, struct swim_member,
-					   in_update_queue);
+			stailq_shift_entry(&s->event_queue, struct swim_member,
+					   in_event_queue);
 		/*
 		 * It is possible, that a member was added and
 		 * removed before firing a trigger. It happens,
@@ -1783,7 +1783,7 @@ swim_worker_f(va_list va)
 			ctx.member = m;
 			ctx.events = m->events;
 			m->events = 0;
-			if (trigger_run(&s->on_member_update, (void *) &ctx))
+			if (trigger_run(&s->on_member_event, (void *) &ctx))
 				diag_log();
 		}
 		swim_member_unref(m);
@@ -1832,8 +1832,8 @@ swim_new(void)
 
 	/* Dissemination component. */
 	rlist_create(&swim->dissemination_queue);
-	rlist_create(&swim->on_member_update);
-	stailq_create(&swim->update_queue);
+	rlist_create(&swim->on_member_event);
+	stailq_create(&swim->event_queue);
 
 	fiber_start(swim->worker, swim);
 	return swim;
@@ -2096,7 +2096,7 @@ swim_do_delete(struct swim *swim)
 	swim_ev_timer_stop(l, &swim->round_tick);
 	swim_ev_timer_stop(l, &swim->wait_ack_tick);
 	struct swim_member *m, *tmp;
-	stailq_foreach_entry_safe(m, tmp, &swim->update_queue, in_update_queue)
+	stailq_foreach_entry_safe(m, tmp, &swim->event_queue, in_event_queue)
 		swim_member_unref(m);
 	mh_int_t node;
 	mh_foreach(swim->members, node) {
@@ -2114,7 +2114,7 @@ swim_do_delete(struct swim *swim)
 	swim_task_destroy(&swim->round_step_task);
 	wait_ack_heap_destroy(&swim->wait_ack_heap);
 	mh_swim_table_delete(swim->members);
-	trigger_destroy(&swim->on_member_update);
+	trigger_destroy(&swim->on_member_event);
 	free(swim->shuffled);
 	free(swim);
 }
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 8a5bc9522..a42ace7c6 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -286,8 +286,8 @@ enum swim_ev_mask {
 	SWIM_EV_DROP            = 0b00100000,
 };
 
-/** On member update trigger context. */
-struct swim_on_member_update_ctx {
+/** On member event trigger context. */
+struct swim_on_member_event_ctx {
 	/** New, dropped, or updated member. */
 	struct swim_member *member;
 	/** Mask of happened events. */
@@ -295,22 +295,22 @@ struct swim_on_member_update_ctx {
 };
 
 /**
- * A list of member update triggers. A couple of words about such
- * a strange API, instead of something like "add_trigger",
- * "drop_trigger". A main motivation is that some functions need
- * a whole trigger list. For example, a function adding Lua
- * functions as triggers. At the time of this writing it was
- * lbox_trigger_reset. There is a convention, that any Tarantool
- * trigger exposed to Lua should provide a way to add one, drop
- * one, replace one, return all - lbox_trigger_reset does all of
- * this.
+ * A list of member event processing triggers. A couple of words
+ * about such a strange API, instead of something like
+ * "add_trigger", "drop_trigger". A main motivation is that some
+ * functions need a whole trigger list. For example, a function
+ * adding Lua functions as triggers. At the time of this writing
+ * it was lbox_trigger_reset. There is a convention, that any
+ * Tarantool trigger exposed to Lua should provide a way to add
+ * one, drop one, replace one, return all - lbox_trigger_reset
+ * does all of this.
  */
 struct rlist *
-swim_trigger_list_on_member_update(struct swim *swim);
+swim_trigger_list_on_member_event(struct swim *swim);
 
 /**
- * Check if a SWIM instance has pending update events. Is not a
- * public one, used by tests.
+ * Check if a SWIM instance has pending events. Is not a public
+ * one, used by tests.
  */
 bool
 swim_has_pending_events(struct swim *swim);
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 3f2d156c5..8bf9b0eba 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -956,22 +956,22 @@ struct trigger_ctx {
 	bool is_deleted;
 	bool need_sleep;
 	struct fiber *f;
-	struct swim_on_member_update_ctx ctx;
+	struct swim_on_member_event_ctx ctx;
 };
 
 static void
-swim_on_member_update_save_event(struct trigger *t, void *event)
+swim_on_member_event_save(struct trigger *t, void *event)
 {
 	struct trigger_ctx *c = (struct trigger_ctx *) t->data;
 	++c->counter;
 	if (c->ctx.member != NULL)
 		swim_member_unref(c->ctx.member);
-	c->ctx = *((struct swim_on_member_update_ctx *) event);
+	c->ctx = *((struct swim_on_member_event_ctx *) event);
 	swim_member_ref(c->ctx.member);
 }
 
 static void
-swim_on_member_update_yield(struct trigger *t, void *event)
+swim_on_member_event_yield(struct trigger *t, void *event)
 {
 	struct trigger_ctx *c = (struct trigger_ctx *) t->data;
 	++c->counter;
@@ -997,14 +997,14 @@ swim_test_triggers(void)
 	memset(&tctx2, 0, sizeof(tctx2));
 	struct trigger *t1 = (struct trigger *) malloc(sizeof(*t1));
 	assert(t1 != NULL);
-	trigger_create(t1, swim_on_member_update_save_event, (void *) &tctx,
+	trigger_create(t1, swim_on_member_event_save, (void *) &tctx,
 		       swim_trigger_destroy_cb);
 
 	/* Skip 'new self' events. */
 	swim_cluster_run_triggers(cluster);
 
 	struct swim *s1 = swim_cluster_member(cluster, 0);
-	trigger_add(swim_trigger_list_on_member_update(s1), t1);
+	trigger_add(swim_trigger_list_on_member_event(s1), t1);
 	swim_cluster_interconnect(cluster, 0, 1);
 	swim_cluster_run_triggers(cluster);
 
@@ -1052,14 +1052,14 @@ swim_test_triggers(void)
 	 * There is a complication about yields. If a trigger
 	 * yields, other triggers wait for its finish. And all
 	 * the triggers should be ready to SWIM deletion in the
-	 * middle of an update processing. SWIM object should not
+	 * middle of an event processing. SWIM object should not
 	 * be deleted, until all the triggers are done.
 	 */
 	struct trigger *t2 = (struct trigger *) malloc(sizeof(*t2));
 	assert(t2 != NULL);
 	tctx2.need_sleep = true;
-	trigger_create(t2, swim_on_member_update_yield, (void *) &tctx2, NULL);
-	trigger_add(swim_trigger_list_on_member_update(s1), t2);
+	trigger_create(t2, swim_on_member_event_yield, (void *) &tctx2, NULL);
+	trigger_add(swim_trigger_list_on_member_event(s1), t2);
 	swim_cluster_add_link(cluster, 0, 1);
 	swim_cluster_run_triggers(cluster);
 	is(tctx2.counter, 1, "yielding trigger is fired");
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index 0b4743b2d..463c62390 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -200,8 +200,8 @@ void
 swim_test_event_cb(struct trigger *trigger, void *event)
 {
 	(void) trigger;
-	struct swim_on_member_update_ctx *ctx =
-		(struct swim_on_member_update_ctx *) event;
+	struct swim_on_member_event_ctx *ctx =
+		(struct swim_on_member_event_ctx *) event;
 	assert(ctx->events != 0);
 	assert((ctx->events & SWIM_EV_NEW) == 0 ||
 	       (ctx->events & SWIM_EV_DROP) == 0);
@@ -216,7 +216,7 @@ swim_node_create(struct swim_node *n, int id)
 	assert(n->swim != NULL);
 	struct trigger *t = (struct trigger *) malloc(sizeof(*t));
 	trigger_create(t, swim_test_event_cb, NULL, (trigger_f0) free);
-	trigger_add(swim_trigger_list_on_member_update(n->swim), t);
+	trigger_add(swim_trigger_list_on_member_event(n->swim), t);
 
 	char uri[128];
 	swim_cluster_id_to_uri(uri, id);

=======================================================================
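The enqueue logic in swim_on_member_update() above coalesces events. A member is referenced and appended to the queue only on its first event since the previous dispatch. Later events are just OR-ed into its mask, and the handler resets the mask when it delivers it. Below is a self-contained sketch of that pattern. It is a simplified model, not the SWIM code: struct member and struct event_queue are hypothetical, a hand-rolled singly linked FIFO replaces stailq, and plain integer refcounts stand in for swim_member_ref()/swim_member_unref(). The reference held by the queue is what lets a trigger still receive a member that has already been removed from the member table.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical, reduced model of struct swim_member. */
struct member {
	int refs;
	/* Events accumulated since the previous dispatch. */
	unsigned events;
	struct member *next_in_queue;
};

/* FIFO of members with undelivered events (stand-in for stailq). */
struct event_queue {
	struct member *head;
	struct member **tail;
};

static void
event_queue_create(struct event_queue *q)
{
	q->head = NULL;
	q->tail = &q->head;
}

static void
member_ref(struct member *m)
{
	++m->refs;
}

static void
member_unref(struct member *m)
{
	assert(m->refs > 0);
	if (--m->refs == 0)
		free(m);
}

/* Enqueue a member once per batch; later events extend its mask. */
static void
member_on_event(struct event_queue *q, struct member *m, unsigned events)
{
	if (m->events == 0 && events != 0) {
		/* The queue keeps the member alive until dispatch. */
		member_ref(m);
		m->next_in_queue = NULL;
		*q->tail = m;
		q->tail = &m->next_in_queue;
	}
	m->events |= events;
}

/*
 * Pop one member, reset its mask and drop the queue's reference.
 * Returns the delivered mask, or 0 when the queue is empty.
 */
static unsigned
event_queue_dispatch_one(struct event_queue *q)
{
	struct member *m = q->head;
	if (m == NULL)
		return 0;
	q->head = m->next_in_queue;
	if (q->head == NULL)
		q->tail = &q->head;
	unsigned events = m->events;
	m->events = 0;
	/* Triggers would be called with (m, events) here. */
	member_unref(m);
	return events;
}
```

For example, two events on the same member (say 0x02, then 0x10, both arbitrary here) produce a single queue entry and one delivery with mask 0x12, even if the member's owner drops its own reference before the dispatch.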

>> +	/**
>> +	 * Worker fiber to serve member update triggers. This task
>> +	 * is being done in a separate fiber, because user
>> +	 * triggers can yield and libev callbacks, processing
>> +	 * member updates, are not allowed to yield.
>> +	 */
>> +	struct fiber *worker;
> 
> I think as long as the worker is only used for triggers, the name
> should be more specific. When the worker is used for other events
> or actions, it could be renamed.

Perhaps.

=======================================================================

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index d48e43761..fbc3cc7be 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -498,12 +498,12 @@ struct swim {
 	 */
 	struct swim_member **shuffled;
 	/**
-	 * Worker fiber to serve member event triggers. This task
-	 * is being done in a separate fiber, because user
-	 * triggers can yield and libev callbacks, processing
-	 * member events, are not allowed to yield.
+	 * Fiber to serve member event triggers. This task is
+	 * being done in a separate fiber, because user triggers
+	 * can yield and libev callbacks, processing member
+	 * events, are not allowed to yield.
 	 */
-	struct fiber *worker;
+	struct fiber *event_handler;
 	/**
 	 * Single round step task. It is impossible to have
 	 * multiple round steps in the same SWIM instance at the
@@ -600,7 +600,7 @@ swim_on_member_update(struct swim *swim, struct swim_member *member,
 			swim_member_ref(member);
 			stailq_add_tail_entry(&swim->event_queue, member,
 					      in_event_queue);
-			fiber_wakeup(swim->worker);
+			fiber_wakeup(swim->event_handler);
 		}
 		member->events |= events;
 	}
@@ -1736,14 +1736,14 @@ error:
 
 /**
  * Do actual deletion of a SWIM instance. This destructor is
- * called only after worker fiber has been stopped, and triggers
- * do not work.
+ * called only after event handler fiber has been stopped, and
+ * triggers do not work.
  */
 static void
 swim_do_delete(struct swim *swim);
 
 /**
- * Worker fiber. At this moment its only task is dispatching
+ * Event handler. At this moment its only task is dispatching
  * member events to user defined triggers. Generally, because
  * SWIM is fully IO driven, that fiber should be used only for
  * yielding tasks not related to SWIM core logic. For all the
@@ -1752,7 +1752,7 @@ swim_do_delete(struct swim *swim);
  * invoked by a cord scheduler fiber prohibited for manual yields.
  */
 static int
-swim_worker_f(va_list va)
+swim_event_handler_f(va_list va)
 {
 	struct swim *s = va_arg(va, struct swim *);
 	struct swim_on_member_event_ctx ctx;
@@ -1802,14 +1802,15 @@ swim_new(void)
 		diag_set(OutOfMemory, sizeof(*swim), "calloc", "swim");
 		return NULL;
 	}
-	swim->worker = fiber_new("SWIM worker", swim_worker_f);
-	if (swim->worker == NULL) {
+	swim->event_handler = fiber_new("SWIM event handler",
+					swim_event_handler_f);
+	if (swim->event_handler == NULL) {
 		free(swim);
 		return NULL;
 	}
 	swim->members = mh_swim_table_new();
 	if (swim->members == NULL) {
-		fiber_cancel(swim->worker);
+		fiber_cancel(swim->event_handler);
 		free(swim);
 		diag_set(OutOfMemory, sizeof(*swim->members),
 			 "mh_swim_table_new", "members");
@@ -1835,7 +1836,7 @@ swim_new(void)
 	rlist_create(&swim->on_member_event);
 	stailq_create(&swim->event_queue);
 
-	fiber_start(swim->worker, swim);
+	fiber_start(swim->event_handler, swim);
 	return swim;
 }
 
@@ -1944,8 +1945,9 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 		 * specified.
 		 */
 		addr = swim->scheduler.transport.addr;
-		fiber_set_name(swim->worker, tt_sprintf("SWIM worker %d",
-							swim_fd(swim)));
+		fiber_set_name(swim->event_handler,
+			       tt_sprintf("SWIM event handler %d",
+					  swim_fd(swim)));
 	} else {
 		addr = swim->self->addr;
 	}
@@ -2122,8 +2124,8 @@ swim_do_delete(struct swim *swim)
 void
 swim_delete(struct swim *swim)
 {
-	fiber_wakeup(swim->worker);
-	fiber_cancel(swim->worker);
+	fiber_wakeup(swim->event_handler);
+	fiber_cancel(swim->event_handler);
 	fiber_sleep(0);
 }
 
@@ -2139,8 +2141,8 @@ swim_quit_step_complete(struct swim_task *task,
 	(void) rc;
 	struct swim *swim = swim_by_scheduler(scheduler);
 	if (rlist_empty(&swim->round_queue)) {
-		fiber_wakeup(swim->worker);
-		fiber_cancel(swim->worker);
+		fiber_wakeup(swim->event_handler);
+		fiber_cancel(swim->event_handler);
 		return;
 	}
 	struct swim_member *m =

=======================================================================

> 
>> @@ -734,7 +801,6 @@ swim_delete_member(struct swim *swim, struct swim_member *member)
>>  	mh_int_t rc = mh_swim_table_find(swim->members, key, NULL);
>>  	assert(rc != mh_end(swim->members));
>>  	mh_swim_table_del(swim->members, rc, NULL);
>> -	swim_cached_round_msg_invalidate(swim);
>>  	rlist_del_entry(member, in_round_queue);
> 
> Stray change? At least I don't understand it.

No, it is not stray. I've started calling swim_on_member_update()
in swim_delete_member(). The former calls
swim_cached_round_msg_invalidate() too, so I removed the now
redundant call from the latter.

> 
>> +	struct swim *s = va_arg(va, struct swim *);
>> +	struct swim_on_member_update_ctx ctx;
>> +	while (! fiber_is_cancelled()) {
>> +		while (stailq_empty(&s->update_queue)) {
>> +			fiber_yield();
>> +			if (fiber_is_cancelled())
>> +				goto end;
> 
>> +		}
> 
> Since you use goto anyway, why a double loop?
> 
>     while (! fiber_is_cancelled()) {
>         if (stailq_empty(&s->update_queue)) {
>             fiber_yield();
>             continue;
>         }
>         ... 
>     }

You are right. I forgot about 'continue', shame on me.
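
The single-loop shape can be illustrated outside Tarantool. The sketch
below is a simulation, not SWIM code: `struct handler_state`,
`fake_yield()` and the integer queue are hypothetical stand-ins for the
handler fiber, `fiber_yield()`/`fiber_is_cancelled()` and `event_queue`.
Because `continue` jumps back to the loop condition, cancellation is
checked in exactly one place and no `goto` label is needed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a SWIM instance and its handler fiber. */
struct handler_state {
	int queue[16];      /* pending events, consumed from head to tail */
	size_t head, tail;
	bool cancelled;     /* plays the role of fiber_is_cancelled() */
	int yields;         /* how many times the handler "yielded" */
	int processed;      /* sum of handled events, for checking */
};

/*
 * Simulated fiber_yield(): in this sketch the "scheduler" cancels
 * the handler on its third yield.
 */
static void
fake_yield(struct handler_state *s)
{
	if (++s->yields >= 3)
		s->cancelled = true;
}

static int
event_handler_f(struct handler_state *s)
{
	while (!s->cancelled) {
		if (s->head == s->tail) {
			fake_yield(s);
			/* Cancellation is re-checked by the loop condition. */
			continue;
		}
		s->processed += s->queue[s->head++];
	}
	return 0;
}
```

With three queued events, `{1, 2, 3}`, the handler sums them to 6 and
then yields until the simulated scheduler cancels it on the third yield.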

=======================================================================

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index fbc3cc7be..17e1e6a7b 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -1757,10 +1757,9 @@ swim_event_handler_f(va_list va)
 	struct swim *s = va_arg(va, struct swim *);
 	struct swim_on_member_event_ctx ctx;
 	while (! fiber_is_cancelled()) {
-		while (stailq_empty(&s->event_queue)) {
+		if (stailq_empty(&s->event_queue)) {
 			fiber_yield();
-			if (fiber_is_cancelled())
-				goto end;
+			continue;
 		}
 		/*
 		 * Can't be empty. SWIM deletes members from
@@ -1788,7 +1787,6 @@ swim_event_handler_f(va_list va)
 		}
 		swim_member_unref(m);
 	}
-end:
 	swim_do_delete(s);
 	return 0;
 }

=======================================================================

>> +	}
>> +end:
>> +	swim_do_delete(s);
> 
> I don't mind swim_do_delete(), but I think the worker should not
> be destroying the instance. It looks like a breach of encapsulation: it
> assumes the worker knows about all other uses of struct swim.
> 
> Why not cancel and join the worker from the destructor and then
> safely do the delete?

Yes, this is a breach. After several rewrites of the patch and a lot
of doubts, I did it this way because I wanted swim:delete() to be
fast: with fiber_join() it waits until all running triggers finish.
But I was probably overthinking it, as usual, and fiber_join() is
much simpler here.
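
The suggested ownership pattern (the destructor cancels and joins the
worker, and only then frees the instance) can be sketched with POSIX
threads standing in for Tarantool fibers, `pthread_join()` playing the
role of `fiber_join()`. This is an analogy under assumptions, not the
SWIM code: `struct instance` and all the functions are hypothetical.
The worker never frees anything, so it does not need to know about the
other users of the instance. Compile with `-pthread`.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical instance owning one worker thread. */
struct instance {
	pthread_t worker;
	pthread_mutex_t lock;
	pthread_cond_t cond;
	int pending;        /* queued events not yet handled */
	int handled;        /* events the worker has processed */
	bool cancelled;
};

static void *
worker_f(void *arg)
{
	struct instance *in = arg;
	pthread_mutex_lock(&in->lock);
	while (!in->cancelled) {
		if (in->pending == 0) {
			/* Sleep until an event or cancellation arrives. */
			pthread_cond_wait(&in->cond, &in->lock);
			continue;
		}
		in->pending--;
		in->handled++;
	}
	pthread_mutex_unlock(&in->lock);
	/* The worker never frees the instance. */
	return NULL;
}

static struct instance *
instance_new(void)
{
	struct instance *in = calloc(1, sizeof(*in));
	if (in == NULL)
		return NULL;
	pthread_mutex_init(&in->lock, NULL);
	pthread_cond_init(&in->cond, NULL);
	if (pthread_create(&in->worker, NULL, worker_f, in) != 0) {
		pthread_cond_destroy(&in->cond);
		pthread_mutex_destroy(&in->lock);
		free(in);
		return NULL;
	}
	return in;
}

static void
instance_push(struct instance *in)
{
	pthread_mutex_lock(&in->lock);
	in->pending++;
	pthread_cond_signal(&in->cond);
	pthread_mutex_unlock(&in->lock);
}

/* Destructor: cancel, wake, join, and only then free. */
static int
instance_delete(struct instance *in)
{
	pthread_mutex_lock(&in->lock);
	in->cancelled = true;
	pthread_cond_signal(&in->cond);
	pthread_mutex_unlock(&in->lock);
	pthread_join(in->worker, NULL);
	/* The worker has exited, so nobody else touches *in now. */
	int handled = in->handled;
	pthread_cond_destroy(&in->cond);
	pthread_mutex_destroy(&in->lock);
	free(in);
	return handled;
}
```

Since the join happens before the free, the worker can never observe
freed memory; the price is that deletion waits for the worker to stop.
How many queued events get handled before cancellation depends on
scheduling.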

=======================================================================

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 17e1e6a7b..6f34faa71 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -1734,14 +1734,6 @@ error:
 	diag_log();
 }
 
-/**
- * Do actual deletion of a SWIM instance. This destructor is
- * called only after event handler fiber has been stopped, and
- * triggers do not work.
- */
-static void
-swim_do_delete(struct swim *swim);
-
 /**
  * Event handler. At this moment its only task is dispatching
  * member events to user defined triggers. Generally, because
@@ -1787,7 +1779,6 @@ swim_event_handler_f(va_list va)
 		}
 		swim_member_unref(m);
 	}
-	swim_do_delete(s);
 	return 0;
 }
 
@@ -1806,6 +1797,7 @@ swim_new(void)
 		free(swim);
 		return NULL;
 	}
+	fiber_set_joinable(swim->event_handler, true);
 	swim->members = mh_swim_table_new();
 	if (swim->members == NULL) {
 		fiber_cancel(swim->event_handler);
@@ -2088,9 +2080,34 @@ swim_size(const struct swim *swim)
 	return mh_size(swim->members);
 }
 
-static void
-swim_do_delete(struct swim *swim)
+/**
+ * Cancel an event handler fiber and wait for it to finish. This
+ * operation is not inlined in the SWIM destructor, because there
+ * is one more place where the handler should be stopped even
+ * before SWIM deletion - quit. Quit deletes the instance only
+ * when all the 'I left' messages are sent, but that happens in a
+ * libev callback in the scheduler fiber, where it is impossible
+ * to yield. So by that moment the handler must already be dead.
+ */
+static inline void
+swim_kill_event_handler(struct swim *swim)
 {
+	struct fiber *f = swim->event_handler;
+	/*
+	 * Nullify, so as not to keep a pointer to a fiber that
+	 * may be reused after it dies.
+	 */
+	swim->event_handler = NULL;
+	fiber_wakeup(f);
+	fiber_cancel(f);
+	fiber_join(f);
+}
+
+void
+swim_delete(struct swim *swim)
+{
+	if (swim->event_handler != NULL)
+		swim_kill_event_handler(swim);
 	struct ev_loop *l = swim_loop();
 	swim_scheduler_destroy(&swim->scheduler);
 	swim_ev_timer_stop(l, &swim->round_tick);
@@ -2119,14 +2136,6 @@ swim_do_delete(struct swim *swim)
 	free(swim);
 }
 
-void
-swim_delete(struct swim *swim)
-{
-	fiber_wakeup(swim->event_handler);
-	fiber_cancel(swim->event_handler);
-	fiber_sleep(0);
-}
-
 /**
  * Quit message is broadcasted in the same way as round messages,
  * step by step, with the only difference that quit round steps
@@ -2139,8 +2148,12 @@ swim_quit_step_complete(struct swim_task *task,
 	(void) rc;
 	struct swim *swim = swim_by_scheduler(scheduler);
 	if (rlist_empty(&swim->round_queue)) {
-		fiber_wakeup(swim->event_handler);
-		fiber_cancel(swim->event_handler);
+		/*
+		 * The handler should be dead - can't yield here,
+		 * it is the scheduler fiber.
+		 */
+		assert(swim->event_handler == NULL);
+		swim_delete(swim);
 		return;
 	}
 	struct swim_member *m =
@@ -2169,6 +2182,11 @@ void
 swim_quit(struct swim *swim)
 {
 	assert(swim_is_configured(swim));
+	/*
+	 * Kill the handler now. Later it will be impossible to do
+	 * from the scheduler fiber.
+	 */
+	swim_kill_event_handler(swim);
 	struct ev_loop *l = swim_loop();
 	swim_ev_timer_stop(l, &swim->round_tick);
 	swim_ev_timer_stop(l, &swim->wait_ack_tick);
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 8bf9b0eba..0e33d691c 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -986,6 +986,15 @@ swim_trigger_destroy_cb(struct trigger *t)
 	((struct trigger_ctx *) t->data)->is_deleted = true;
 }
 
+static int
+swim_cluster_delete_f(va_list ap)
+{
+	struct swim_cluster *c = (struct swim_cluster *)
+		va_arg(ap, struct swim_cluster *);
+	swim_cluster_delete(c);
+	return 0;
+}
+
 static void
 swim_test_triggers(void)
 {
@@ -1065,7 +1074,9 @@ swim_test_triggers(void)
 	is(tctx2.counter, 1, "yielding trigger is fired");
 	is(tctx.counter, 6, "non-yielding still is not");
 
-	swim_cluster_delete(cluster);
+	struct fiber *async_delete_fiber =
+		fiber_new("async delete", swim_cluster_delete_f);
+	fiber_start(async_delete_fiber, cluster);
 	ok(! tctx.is_deleted, "trigger is not deleted until all currently "\
 	   "sleeping triggers are finished");
 	tctx2.need_sleep = false;

=======================================================================
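
The control flow around the new swim_kill_event_handler() can be checked
with a small model. In this sketch the types and functions are
hypothetical and no real fibers are involved: the handler is killed
exactly once on either path. instance_delete() kills it only if the
pointer is still set, while the quit path kills it up front, so the
final step (which models the scheduler callback that must not yield)
only asserts that the pointer is already NULL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins: a handler "fiber" and an instance owning it. */
struct fake_fiber {
	int cancel_count;   /* how many times it was cancelled and joined */
};

struct instance {
	struct fake_fiber *event_handler;
	bool deleted;
};

/* Cancel and "join" the handler, forgetting the pointer first. */
static void
kill_event_handler(struct instance *s)
{
	struct fake_fiber *f = s->event_handler;
	s->event_handler = NULL;
	f->cancel_count++;
}

static void
instance_delete(struct instance *s)
{
	/* The quit path may have killed the handler already. */
	if (s->event_handler != NULL)
		kill_event_handler(s);
	s->deleted = true;
}

/* Start of quit: yielding is still allowed here. */
static void
instance_quit(struct instance *s)
{
	kill_event_handler(s);
}

/* Last quit step: models a callback where yields are forbidden. */
static void
quit_last_step(struct instance *s)
{
	assert(s->event_handler == NULL);
	instance_delete(s);
}
```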

>> +	swim->worker = fiber_new("SWIM worker", swim_worker_f);
>> +	if (swim->worker == NULL) {
>> +		free(swim);
>> +		return NULL;
>> +	}
> 
>>  	swim->members = mh_swim_table_new();
>>  	if (swim->members == NULL) {
>> +		fiber_cancel(swim->worker);
> 
> Better to start the worker last; then you will have an easy time
> destroying it on failure. This is a minor nit, ignore if you wish.

=======================================================================

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index ae66ce198..00234d1df 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -1791,15 +1791,8 @@ swim_new(void)
 		diag_set(OutOfMemory, sizeof(*swim), "calloc", "swim");
 		return NULL;
 	}
-	swim->event_handler = fiber_new("SWIM event handler",
-					swim_event_handler_f);
-	if (swim->event_handler == NULL) {
-		free(swim);
-		return NULL;
-	}
 	swim->members = mh_swim_table_new();
 	if (swim->members == NULL) {
-		fiber_cancel(swim->event_handler);
 		free(swim);
 		diag_set(OutOfMemory, sizeof(*swim->members),
 			 "mh_swim_table_new", "members");
@@ -1824,7 +1817,13 @@ swim_new(void)
 	rlist_create(&swim->dissemination_queue);
 	rlist_create(&swim->on_member_event);
 	stailq_create(&swim->event_queue);
-
+	swim->event_handler = fiber_new("SWIM event handler",
+					swim_event_handler_f);
+	if (swim->event_handler == NULL) {
+		swim_delete(swim);
+		return NULL;
+	}
+	fiber_set_joinable(swim->event_handler, true);
 	fiber_start(swim->event_handler, swim);
 	return swim;
 }
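
Creating the fiber at the end of swim_new() means the earlier failure
paths have no fiber to clean up, and the last failure path can reuse
swim_delete(), which already skips killing a NULL handler. A
hypothetical model of that ordering, with plain calloc() in place of
mh_swim_table_new() and fiber_new(), and a made-up test knob that
forces the last allocation to fail:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-ins for the member table and the handler fiber. */
struct table { int size; };
struct handler { bool started; };

struct instance {
	struct table *members;
	struct handler *event_handler;
};

static int handlers_stopped;     /* counts handler teardowns */
static bool fail_handler_alloc;  /* test knob: fail the last step */

static void
instance_delete(struct instance *s)
{
	/* Tolerate a partially constructed instance. */
	if (s->event_handler != NULL) {
		handlers_stopped++;
		free(s->event_handler);
	}
	free(s->members);
	free(s);
}

static struct instance *
instance_new(void)
{
	struct instance *s = calloc(1, sizeof(*s));
	if (s == NULL)
		return NULL;
	s->members = calloc(1, sizeof(*s->members));
	if (s->members == NULL) {
		/* No handler exists yet, nothing to cancel. */
		free(s);
		return NULL;
	}
	/* Create the handler last. */
	s->event_handler = fail_handler_alloc ? NULL :
			   calloc(1, sizeof(*s->event_handler));
	if (s->event_handler == NULL) {
		/* The generic destructor copes with a NULL handler. */
		instance_delete(s);
		return NULL;
	}
	s->event_handler->started = true;
	return s;
}
```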

* [tarantool-patches] Re: [PATCH 5/5] swim: expose Lua triggers on member update
  2019-06-02  0:10 ` [tarantool-patches] [PATCH 5/5] swim: expose Lua triggers on member update Vladislav Shpilevoy
@ 2019-06-05 21:54   ` Vladislav Shpilevoy
  2019-06-08 14:29     ` Konstantin Osipov
  0 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-05 21:54 UTC (permalink / raw)
  To: tarantool-patches; +Cc: kostja

Here is the names update:

=======================================================================

diff --git a/src/lua/swim.c b/src/lua/swim.c
index 17441e58c..c3a0a9911 100644
--- a/src/lua/swim.c
+++ b/src/lua/swim.c
@@ -36,26 +36,26 @@
 static uint32_t ctid_swim_member_ptr;
 static uint32_t ctid_swim_ptr;
 
-/** Push member update context into a Lua stack. */
+/** Push member event context into a Lua stack. */
 static int
-lua_swim_on_member_update_push(struct lua_State *L, void *event)
+lua_swim_member_event_push(struct lua_State *L, void *event)
 {
-	struct swim_on_member_update_ctx *ctx =
-		(struct swim_on_member_update_ctx *) event;
+	struct swim_on_member_event_ctx *ctx =
+		(struct swim_on_member_event_ctx *) event;
 	*(struct swim_member **) luaL_pushcdata(L, ctid_swim_member_ptr) =
 		ctx->member;
 	lua_pushinteger(L, ctx->events);
 	return 2;
 }
 
-/** Hang or/and delete a trigger on a SWIM member update. */
+/** Set or/and delete a trigger on a SWIM member event. */
 static int
-lua_swim_on_member_update(struct lua_State *L)
+lua_swim_on_member_event(struct lua_State *L)
 {
 	uint32_t ctypeid;
 	struct swim *s = *(struct swim **) luaL_checkcdata(L, 1, &ctypeid);
-	return lbox_trigger_reset(L, 3, swim_trigger_list_on_member_update(s),
-				  lua_swim_on_member_update_push, NULL);
+	return lbox_trigger_reset(L, 3, swim_trigger_list_on_member_event(s),
+				  lua_swim_member_event_push, NULL);
 }
 
 /**
@@ -98,7 +98,7 @@ tarantool_lua_swim_init(struct lua_State *L)
 	static const struct luaL_Reg lua_swim_internal_methods [] = {
 		{"swim_new", lua_swim_new},
 		{"swim_delete", lua_swim_delete},
-		{"swim_on_member_update", lua_swim_on_member_update},
+		{"swim_on_member_event", lua_swim_on_member_event},
 		{NULL, NULL}
 	};
 	luaL_register_module(L, "swim", lua_swim_internal_methods);
diff --git a/src/lua/swim.lua b/src/lua/swim.lua
index 9b8ff1c29..a7d5caab3 100644
--- a/src/lua/swim.lua
+++ b/src/lua/swim.lua
@@ -699,7 +699,7 @@ local function swim_pairs(s)
     return swim_pairs_next, {swim = s, iterator = iterator}, nil
 end
 
-local swim_on_member_update_index = {
+local swim_member_event_index = {
     is_new = function(self)
         return bit.band(self[1], capi.SWIM_EV_NEW) ~= 0
     end,
@@ -723,11 +723,11 @@ local swim_on_member_update_index = {
     end,
 }
 
-local swim_on_member_update_event_mt = {
-    __index = swim_on_member_update_index,
+local swim_member_event_mt = {
+    __index = swim_member_event_index,
     __serialize = function(self)
         local res = {}
-        for k, v in pairs(swim_on_member_update_index) do
+        for k, v in pairs(swim_member_event_index) do
             v = v(self)
             if v then
                 res[k] = v
@@ -739,42 +739,42 @@ local swim_on_member_update_event_mt = {
 
 --
 -- Create a closure function for preprocessing raw SWIM member
--- update trigger parameters.
+-- event trigger parameters.
 -- @param s SWIM instance.
 -- @param callback User functions to call.
 -- @param ctx An optional parameter for @a callback passed as is.
 -- @return A function to set as a trigger.
 --
-local function swim_on_member_update_new(s, callback, ctx)
+local function swim_on_member_event_new(s, callback, ctx)
     return function(member_ptr, event_mask)
         local m = swim_wrap_member(s, member_ptr)
-        local event = setmetatable({event_mask}, swim_on_member_update_event_mt)
+        local event = setmetatable({event_mask}, swim_member_event_mt)
         return callback(m, event, ctx)
     end
 end
 
 --
--- Add or/and delete a trigger on member update. Possible usages:
+-- Add or/and delete a trigger on member event. Possible usages:
 --
--- * on_member_update(new[, ctx]) - add a new trigger. It should
+-- * on_member_event(new[, ctx]) - add a new trigger. It should
 --   accept 3 arguments: an updated member, an events object, an
 --   optional @a ctx parameter passed as is.
 --
--- * on_member_update(new, old[, ctx]) - add a new trigger @a new
+-- * on_member_event(new, old[, ctx]) - add a new trigger @a new
 --   if not nil, in place of @a old trigger.
 --
--- * on_member_update() - get a list of triggers.
+-- * on_member_event() - get a list of triggers.
 --
-local function swim_on_member_update(s, new, old, ctx)
-    local ptr = swim_check_instance(s, 'swim:on_member_update')
+local function swim_on_member_event(s, new, old, ctx)
+    local ptr = swim_check_instance(s, 'swim:on_member_event')
     if type(old) ~= 'function' then
         ctx = old
         old = nil
     end
     if new ~= nil then
-        new = swim_on_member_update_new(s, new, ctx)
+        new = swim_on_member_event_new(s, new, ctx)
     end
-    return internal.swim_on_member_update(ptr, new, old)
+    return internal.swim_on_member_event(ptr, new, old)
 end
 
 --
@@ -796,7 +796,7 @@ local swim_mt = {
         set_payload = swim_set_payload,
         set_codec = swim_set_codec,
         pairs = swim_pairs,
-        on_member_update = swim_on_member_update,
+        on_member_event = swim_on_member_event,
     },
     __serialize = swim_serialize
 }
@@ -889,7 +889,7 @@ local swim_not_configured_mt = {
         delete = swim_delete,
         is_configured = swim_is_configured,
         set_codec = swim_set_codec,
-        on_member_update = swim_on_member_update,
+        on_member_event = swim_on_member_event,
     },
     __serialize = swim_serialize
 }
diff --git a/test/swim/swim.result b/test/swim/swim.result
index 1de903b04..0196837c8 100644
--- a/test/swim/swim.result
+++ b/test/swim/swim.result
@@ -1223,9 +1223,9 @@ s2:delete()
 s1 = swim.new()
 ---
 ...
-s1.on_member_update()
+s1.on_member_event()
 ---
-- error: 'builtin/swim.lua:<line>: swim:on_member_update: first argument is not a SWIM
+- error: 'builtin/swim.lua:<line>: swim:on_member_event: first argument is not a SWIM
     instance'
 ...
 m_list = {}
@@ -1263,7 +1263,7 @@ end;
 _ = test_run:cmd("setopt delimiter ''");
 ---
 ...
-t_save_event_id = s1:on_member_update(t_save_event, 'ctx')
+t_save_event_id = s1:on_member_event(t_save_event, 'ctx')
 ---
 ...
 -- Not equal, because SWIM wraps user triggers with a closure for
@@ -1301,7 +1301,7 @@ ctx_list
 m_list = {} e_list = {} ctx_list = {}
 ---
 ...
-t_yield_id = s1:on_member_update(t_yield, 'ctx2')
+t_yield_id = s1:on_member_event(t_yield, 'ctx2')
 ---
 ...
 f_need_sleep = true
@@ -1400,11 +1400,11 @@ ctx_list
 m_list = {} e_list = {} ctx_list = {}
 ---
 ...
-#s1:on_member_update()
+#s1:on_member_event()
 ---
 - 2
 ...
-s1:on_member_update(nil, t_yield_id)
+s1:on_member_event(nil, t_yield_id)
 ---
 ...
 s2:quit()
@@ -1413,7 +1413,7 @@ s2:quit()
 while s1:size() ~= 1 do fiber.sleep(0.01) end
 ---
 ...
--- Process update.
+-- Process event.
 fiber.sleep(0)
 ---
 ...
@@ -1453,7 +1453,7 @@ m_list[1] == m_list[2]
 m_list = {} e_list = {} ctx_list = {}
 ---
 ...
-s1:on_member_update(nil, t_save_event_id)
+s1:on_member_event(nil, t_save_event_id)
 ---
 ...
 s1:add_member({uuid = m:uuid(), uri = m:uri()})
@@ -1463,7 +1463,7 @@ s1:add_member({uuid = m:uuid(), uri = m:uri()})
 fiber.sleep(0)
 ---
 ...
--- No updates - all the triggers are dropped.
+-- No events - all the triggers are dropped.
 m_list
 ---
 - []
diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua
index 25eb5a7d0..458e349e0 100644
--- a/test/swim/swim.test.lua
+++ b/test/swim/swim.test.lua
@@ -418,7 +418,7 @@ s2:delete()
 -- member drop, member update.
 --
 s1 = swim.new()
-s1.on_member_update()
+s1.on_member_event()
 
 m_list = {}
 e_list = {}
@@ -437,7 +437,7 @@ t_yield = function(m, e, ctx)
     while f_need_sleep do fiber.sleep(10000) end
 end;
 _ = test_run:cmd("setopt delimiter ''");
-t_save_event_id = s1:on_member_update(t_save_event, 'ctx')
+t_save_event_id = s1:on_member_event(t_save_event, 'ctx')
 -- Not equal, because SWIM wraps user triggers with a closure for
 -- context preprocessing.
 t_save_event_id ~= t_save_event
@@ -449,7 +449,7 @@ e_list
 ctx_list
 m_list = {} e_list = {} ctx_list = {}
 
-t_yield_id = s1:on_member_update(t_yield, 'ctx2')
+t_yield_id = s1:on_member_event(t_yield, 'ctx2')
 f_need_sleep = true
 s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
 s2:add_member({uuid = s1:self():uuid(), uri = s1:self():uri()})
@@ -472,12 +472,12 @@ m_list
 e_list
 ctx_list
 m_list = {} e_list = {} ctx_list = {}
-#s1:on_member_update()
+#s1:on_member_event()
 
-s1:on_member_update(nil, t_yield_id)
+s1:on_member_event(nil, t_yield_id)
 s2:quit()
 while s1:size() ~= 1 do fiber.sleep(0.01) end
--- Process update.
+-- Process event.
 fiber.sleep(0)
 -- Two events - status update to 'left', and 'drop'.
 m_list
@@ -488,10 +488,10 @@ m = m_list[1]
 m_list[1] == m_list[2]
 m_list = {} e_list = {} ctx_list = {}
 
-s1:on_member_update(nil, t_save_event_id)
+s1:on_member_event(nil, t_save_event_id)
 s1:add_member({uuid = m:uuid(), uri = m:uri()})
 fiber.sleep(0)
--- No updates - all the triggers are dropped.
+-- No events - all the triggers are dropped.
 m_list
 e_list
 ctx_list

* [tarantool-patches] Re: [PATCH 3/5] swim: allow to hang triggers on member updates
  2019-06-05 21:53     ` Vladislav Shpilevoy
@ 2019-06-07 13:35       ` Konstantin Osipov
  0 siblings, 0 replies; 20+ messages in thread
From: Konstantin Osipov @ 2019-06-07 13:35 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/06 10:44]:

OK, could you please resend the amalgamated patch to the list?


-- 
Konstantin Osipov, Moscow, Russia

* [tarantool-patches] Re: [PATCH 1/5] test: create isolated ev_loop for swim unit tests
  2019-06-05 21:53     ` Vladislav Shpilevoy
@ 2019-06-08 14:24       ` Konstantin Osipov
  0 siblings, 0 replies; 20+ messages in thread
From: Konstantin Osipov @ 2019-06-08 14:24 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/06 10:44]:

> Hi! Thanks for the review.
> 
> On 05/06/2019 09:51, Konstantin Osipov wrote:
> > * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/03 14:33]:
> >> --- a/src/lib/swim/swim_ev.c
> >> +++ b/src/lib/swim/swim_ev.c
> >> @@ -55,3 +55,9 @@ swim_ev_timer_stop(struct ev_loop *loop, struct ev_timer *watcher)
> >>  {
> >>  	ev_timer_stop(loop, watcher);
> >>  }
> >> +
> >> +struct ev_loop *
> >> +swim_loop(void)
> > 
> > The comment explaining why you need a separate loop should be
> > here, not in the tests, since this is the place most people will
> > be looking at and wondering why you need this wrapper at all.
> > 
> > You could hack this around with a define, but I think your
> > approach is cleaner, so please just add a comment.
> > 
> 
> It can't be solved with a define, because I need swim.o: I can't
> postpone preprocessor work until link time. Otherwise we could
> just implement every function in swim_ev.h and swim_transport.h
> as macros.
> 
> The comment is moved and slightly modified.

OK, thanks!

-- 
Konstantin Osipov, Moscow, Russia

* [tarantool-patches] Re: [PATCH 4/5] swim: call swim:new/delete via Lua C, not via FFI
  2019-06-02  0:00 ` [tarantool-patches] [PATCH 4/5] swim: call swim:new/delete via Lua C, not via FFI Vladislav Shpilevoy
@ 2019-06-08 14:24   ` Konstantin Osipov
  0 siblings, 0 replies; 20+ messages in thread
From: Konstantin Osipov @ 2019-06-08 14:24 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/03 14:33]:
> These functions are going to yield as part of #4250, because
> swim:new() will start a fiber, while swim:delete() cancels it and
> passes control to it.

LGTM

-- 
Konstantin Osipov, Moscow, Russia

* [tarantool-patches] Re: [PATCH 5/5] swim: expose Lua triggers on member update
       [not found] ` <12b8ea76f7c1cd100a80ddcea3c29d20354e073e.1559433539.git.v.shpilevoy@tarantool.org>
@ 2019-06-08 14:27   ` Konstantin Osipov
  2019-06-08 19:52     ` Vladislav Shpilevoy
  0 siblings, 1 reply; 20+ messages in thread
From: Konstantin Osipov @ 2019-06-08 14:27 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/03 14:33]:
> 
> The events object has methods that help a user determine which
> update has happened.
> ```Lua
> local function on_update(member, events, ctx)
>     if events:is_new() then

This doesn't look like proper English to me: 'events' is plural,
while is_new() is used for the singular case. It is either an
event_mask or an event list/set, but not something in between.

What about:

event_mask:has(...)?
> +    enum swim_ev_mask {
> +        SWIM_EV_NEW             = 0b00000001,
> +        SWIM_EV_NEW_STATUS      = 0b00000010,
> +        SWIM_EV_NEW_URI         = 0b00000100,
> +        SWIM_EV_NEW_INCARNATION = 0b00001000,
> +        SWIM_EV_NEW_PAYLOAD     = 0b00010000,
> +        SWIM_EV_UPDATE          = 0b00011110,
> +        SWIM_EV_DROP            = 0b00100000,
> +    };

Or simply export these objects to Lua and let users play with
them.
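
For reference, the event_mask:has(...) idea amounts to a single bitwise
test. A C sketch using the enum values quoted above, written as shifts
because `0b` literals are only standard as of C23;
swim_ev_mask_has_any() is a hypothetical helper, not part of the patch.
Note that SWIM_EV_UPDATE is exactly the union of the four NEW_*
attribute bits.

```c
#include <assert.h>
#include <stdbool.h>

/* Same bit values as in the patch, spelled as shifts. */
enum swim_ev_mask {
	SWIM_EV_NEW             = 1 << 0,
	SWIM_EV_NEW_STATUS      = 1 << 1,
	SWIM_EV_NEW_URI         = 1 << 2,
	SWIM_EV_NEW_INCARNATION = 1 << 3,
	SWIM_EV_NEW_PAYLOAD     = 1 << 4,
	/* Any attribute change of an already known member. */
	SWIM_EV_UPDATE          = SWIM_EV_NEW_STATUS | SWIM_EV_NEW_URI |
				  SWIM_EV_NEW_INCARNATION |
				  SWIM_EV_NEW_PAYLOAD,
	SWIM_EV_DROP            = 1 << 5,
};

/* Hypothetical has(): true if any of @a bits is set in @a mask. */
static inline bool
swim_ev_mask_has_any(int mask, int bits)
{
	return (mask & bits) != 0;
}
```

A caller can build the set of interesting events once, for example
`SWIM_EV_NEW_STATUS | SWIM_EV_NEW_URI`, and test it with one call.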


-- 
Konstantin Osipov, Moscow, Russia

* [tarantool-patches] Re: [PATCH 5/5] swim: expose Lua triggers on member update
  2019-06-05 21:54   ` [tarantool-patches] " Vladislav Shpilevoy
@ 2019-06-08 14:29     ` Konstantin Osipov
  0 siblings, 0 replies; 20+ messages in thread
From: Konstantin Osipov @ 2019-06-08 14:29 UTC (permalink / raw)
  To: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/06 10:44]:
> Here is the names update:

OK, this change LGTM, but please see my minor comment about
the event API itself.


-- 
Konstantin Osipov, Moscow, Russia

* [tarantool-patches] Re: [PATCH 5/5] swim: expose Lua triggers on member update
  2019-06-08 14:27   ` Konstantin Osipov
@ 2019-06-08 19:52     ` Vladislav Shpilevoy
  2019-06-09  5:15       ` Konstantin Osipov
  0 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-08 19:52 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: tarantool-patches



On 08/06/2019 17:27, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/03 14:33]:
>>
>> The events object has methods that help a user determine which
>> update has happened.
>> ```Lua
>> local function on_update(member, events, ctx)
>>     if events:is_new() then
> 
> This doesn't look like proper English to me: 'events' is plural,
> while is_new() is used for the singular case. It is either an
> event_mask or an event list/set, but not something in between.

Yes, you are probably right. I think 'eventset' or even just 'event'
would be good. See below why I avoid integers, masks, etc. in the public API.

> 
> What about:
> 
> event_mask:has(...)?
>> +    enum swim_ev_mask {
>> +        SWIM_EV_NEW             = 0b00000001,
>> +        SWIM_EV_NEW_STATUS      = 0b00000010,
>> +        SWIM_EV_NEW_URI         = 0b00000100,
>> +        SWIM_EV_NEW_INCARNATION = 0b00001000,
>> +        SWIM_EV_NEW_PAYLOAD     = 0b00010000,
>> +        SWIM_EV_UPDATE          = 0b00011110,
>> +        SWIM_EV_DROP            = 0b00100000,
>> +    };
> 
> Or simply export these objects to Lua and let users play with
> them.
> 

This is exactly what I was trying to avoid with all these
mask metamethods. In the future I want to be able to add the old
values of updated member attributes, if it becomes necessary.
That will be easy to do without breaking existing code if, from
now on, we return just an abstract 'events' object with some
metamethods.

Also, in the future we will probably not represent the events as
a mask at all. So I don't want to expose swim_ev_mask in the Lua API.

I've updated the documentation with the 'events' -> 'event' rename.
Just treat the object as a single complex event made of multiple parts.
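
The trade-off described above (an opaque event object with predicate
methods instead of a public bit mask) looks roughly like this in C.
This is a sketch under assumptions: in the patch the Lua binding wraps
the mask in a table whose `__index` provides methods such as is_new(),
while the struct, enum and function names below are hypothetical. The
accessors are the whole public contract, so the layout can change
later; the cost is that testing a combination needs one call per event
kind rather than one mask test.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical private bit layout, never shown to callers. */
enum {
	EV_NEW         = 1 << 0,
	EV_NEW_URI     = 1 << 2,
	EV_NEW_PAYLOAD = 1 << 4,
	EV_DROP        = 1 << 5,
};

/*
 * In a real library this definition would live in a .c file, and
 * callers would only see "struct member_event;" plus accessors.
 */
struct member_event {
	unsigned mask;
	/* Room to grow, e.g. old attribute values, without API breaks. */
};

static bool
member_event_is_new(const struct member_event *e)
{
	return (e->mask & EV_NEW) != 0;
}

static bool
member_event_is_new_uri(const struct member_event *e)
{
	return (e->mask & EV_NEW_URI) != 0;
}

static bool
member_event_is_new_payload(const struct member_event *e)
{
	return (e->mask & EV_NEW_PAYLOAD) != 0;
}

static bool
member_event_is_drop(const struct member_event *e)
{
	return (e->mask & EV_DROP) != 0;
}

/* A combined check costs one accessor call per event kind. */
static bool
member_event_is_new_uri_or_payload(const struct member_event *e)
{
	return member_event_is_new_uri(e) || member_event_is_new_payload(e);
}
```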

* [tarantool-patches] Re: [PATCH 5/5] swim: expose Lua triggers on member update
  2019-06-08 19:52     ` Vladislav Shpilevoy
@ 2019-06-09  5:15       ` Konstantin Osipov
  2019-06-09 16:41         ` Vladislav Shpilevoy
  0 siblings, 1 reply; 20+ messages in thread
From: Konstantin Osipov @ 2019-06-09  5:15 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/08 22:52]:
> This is exactly what I was trying to avoid with all these
> mask metamethods. In the future I want to be able to add the old
> values of updated member attributes, if it becomes necessary.
> That will be easy to do without breaking existing code if, from
> now on, we return just an abstract 'events' object with some
> metamethods.
> 
> Also, in the future we will probably not represent the events as
> a mask at all. So I don't want to expose swim_ev_mask in the Lua API.
> 
> I've updated the documentation with the 'events' -> 'event' rename.
> Just treat the object as a single complex event made of multiple parts.

OK, but what if I want to test a combination of events? With
masks, I could just pass a bit.bor() mask to the caller instead
of having to invoke :is..() multiple times.

Anyway, LGTM after rename.

-- 
Konstantin Osipov, Moscow, Russia

* [tarantool-patches] Re: [PATCH 5/5] swim: expose Lua triggers on member update
  2019-06-09  5:15       ` Konstantin Osipov
@ 2019-06-09 16:41         ` Vladislav Shpilevoy
  0 siblings, 0 replies; 20+ messages in thread
From: Vladislav Shpilevoy @ 2019-06-09 16:41 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: tarantool-patches



On 09/06/2019 08:15, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/06/08 22:52]:
>> This is exactly what I was trying to avoid with all these
>> mask metamethods. In the future I want to be able to add the old
>> values of updated member attributes, if it becomes necessary.
>> That will be easy to do without breaking existing code if, from
>> now on, we return just an abstract 'events' object with some
>> metamethods.
>>
>> Also, in the future we will probably not represent the events as
>> a mask at all. So I don't want to expose swim_ev_mask in the Lua API.
>>
>> I've updated the documentation with the 'events' -> 'event' rename.
>> Just treat the object as a single complex event made of multiple parts.
> 
> OK, but what if I want to test a combination of events? With
> masks, I could just pass a bit.bor() mask to the caller instead
> of having to invoke :is..() multiple times.

Unfortunately, that is the price of not exposing
any internal details of the 'event' object. You need to
call multiple 'is_...()' methods to check for multiple events.

> 
> Anyway, LGTM after rename.
> 

Pushed to master.

Thread overview: 20+ messages
2019-06-02  0:00 [tarantool-patches] [PATCH 0/5] SWIM on_member_update Vladislav Shpilevoy
2019-06-02  0:00 ` [tarantool-patches] [PATCH 1/5] test: create isolated ev_loop for swim unit tests Vladislav Shpilevoy
2019-06-05  6:51   ` [tarantool-patches] " Konstantin Osipov
2019-06-05 21:53     ` Vladislav Shpilevoy
2019-06-08 14:24       ` Konstantin Osipov
2019-06-02  0:00 ` [tarantool-patches] [PATCH 2/5] swim: fix a 'use after free' in SWIM tests Vladislav Shpilevoy
2019-06-05  6:52   ` [tarantool-patches] " Konstantin Osipov
2019-06-02  0:00 ` [tarantool-patches] [PATCH 3/5] swim: allow to hang triggers on member updates Vladislav Shpilevoy
2019-06-05  7:11   ` [tarantool-patches] " Konstantin Osipov
2019-06-05 21:53     ` Vladislav Shpilevoy
2019-06-07 13:35       ` Konstantin Osipov
2019-06-02  0:00 ` [tarantool-patches] [PATCH 4/5] swim: call swim:new/delete via Lua C, not via FFI Vladislav Shpilevoy
2019-06-08 14:24   ` [tarantool-patches] " Konstantin Osipov
2019-06-02  0:10 ` [tarantool-patches] [PATCH 5/5] swim: expose Lua triggers on member update Vladislav Shpilevoy
2019-06-05 21:54   ` [tarantool-patches] " Vladislav Shpilevoy
2019-06-08 14:29     ` Konstantin Osipov
     [not found] ` <12b8ea76f7c1cd100a80ddcea3c29d20354e073e.1559433539.git.v.shpilevoy@tarantool.org>
2019-06-08 14:27   ` Konstantin Osipov
2019-06-08 19:52     ` Vladislav Shpilevoy
2019-06-09  5:15       ` Konstantin Osipov
2019-06-09 16:41         ` Vladislav Shpilevoy
