[tarantool-patches] [PATCH v2 3/5] swim: allow to set triggers on member updates

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat Jun 8 13:31:19 MSK 2019


SWIM is asynchronous by design. It means that there are two
ways of getting updates of a member table and individual members:
polling and triggers. Polling is terrible - this is why libev
with select(), poll(), epoll(), kevent() has appeared. The only
acceptable solution is triggers.

This commit allows setting triggers on member updates.

Part of #4250
---
 src/lib/swim/swim.c         | 211 ++++++++++++++++++++++++++++++++----
 src/lib/swim/swim.h         |  47 +++++++-
 test/unit/swim.c            | 146 ++++++++++++++++++++++++-
 test/unit/swim.result       |  28 ++++-
 test/unit/swim_test_utils.c |  44 +++++++-
 test/unit/swim_test_utils.h |  12 ++
 6 files changed, 460 insertions(+), 28 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 46b76731d..00234d1df 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -37,6 +37,7 @@
 #include "msgpuck.h"
 #include "assoc.h"
 #include "sio.h"
+#include "trigger.h"
 #define HEAP_FORWARD_DECLARATION
 #include "salad/heap.h"
 
@@ -334,6 +335,19 @@ struct swim_member {
 	 * time.
 	 */
 	struct rlist in_dissemination_queue;
+	/**
+	 * Each time a member is updated, or created, or dropped,
+	 * it is added to an event queue. Members from this queue
+	 * are dispatched into user defined triggers.
+	 */
+	struct stailq_entry in_event_queue;
+	/**
+	 * Mask of events that have happened to this member since
+	 * the previous trigger invocation. Once the events are
+	 * delivered to a trigger, the mask is nullified and
+	 * starts collecting new events.
+	 */
+	enum swim_ev_mask events;
 	/**
 	 *
 	 *               Failure detection component
@@ -455,6 +469,17 @@ struct swim {
 	 * as long as the event TTD is non-zero.
 	 */
 	struct rlist dissemination_queue;
+	/**
+	 * Queue of updated, new, and dropped members to deliver
+	 * the events to triggers. Dropped members are also kept
+	 * here until they are handled by a trigger.
+	 */
+	struct stailq event_queue;
+	/**
+	 * List of triggers to call on each new, dropped, and
+	 * updated member.
+	 */
+	struct rlist on_member_event;
 	/**
 	 * Members to which a message should be sent next during
 	 * this round.
@@ -472,6 +497,13 @@ struct swim {
 	 * the beginning of each round.
 	 */
 	struct swim_member **shuffled;
+	/**
+	 * Fiber to serve member event triggers. It is done in a
+	 * separate fiber, because user triggers can yield, while
+	 * libev callbacks, which process member events, are not
+	 * allowed to yield.
+	 */
+	struct fiber *event_handler;
 	/**
 	 * Single round step task. It is impossible to have
 	 * multiple round steps in the same SWIM instance at the
@@ -549,10 +581,41 @@ swim_register_event(struct swim *swim, struct swim_member *member)
  * change of its status, or incarnation, or both.
  */
 static void
-swim_on_member_update(struct swim *swim, struct swim_member *member)
+swim_on_member_update(struct swim *swim, struct swim_member *member,
+		      enum swim_ev_mask events)
 {
 	member->unacknowledged_pings = 0;
 	swim_register_event(swim, member);
+	/*
+	 * Member event should be delivered to triggers only if
+	 * there is at least one trigger.
+	 */
+	if (! rlist_empty(&swim->on_member_event)) {
+		/*
+		 * Member is referenced and added to a queue only
+		 * once. That moment can be detected when a first
+		 * event happens.
+		 */
+		if (member->events == 0 && events != 0) {
+			swim_member_ref(member);
+			stailq_add_tail_entry(&swim->event_queue, member,
+					      in_event_queue);
+			fiber_wakeup(swim->event_handler);
+		}
+		member->events |= events;
+	}
+}
+
+struct rlist *
+swim_trigger_list_on_member_event(struct swim *swim)
+{
+	return &swim->on_member_event;
+}
+
+bool
+swim_has_pending_events(struct swim *swim)
+{
+	return ! stailq_empty(&swim->event_queue);
 }
 
 /**
@@ -577,13 +640,17 @@ swim_update_member_inc_status(struct swim *swim, struct swim_member *member,
 	 */
 	assert(member != swim->self);
 	if (member->incarnation < incarnation) {
-		member->status = new_status;
+		enum swim_ev_mask events = SWIM_EV_NEW_INCARNATION;
+		if (new_status != member->status) {
+			events |= SWIM_EV_NEW_STATUS;
+			member->status = new_status;
+		}
 		member->incarnation = incarnation;
-		swim_on_member_update(swim, member);
+		swim_on_member_update(swim, member, events);
 	} else if (member->incarnation == incarnation &&
 		   member->status < new_status) {
 		member->status = new_status;
-		swim_on_member_update(swim, member);
+		swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS);
 	}
 }
 
@@ -626,7 +693,7 @@ swim_update_member_payload(struct swim *swim, struct swim_member *member,
 	member->payload_size = payload_size;
 	member->payload_ttd = mh_size(swim->members);
 	member->is_payload_up_to_date = true;
-	swim_on_member_update(swim, member);
+	swim_on_member_update(swim, member, SWIM_EV_NEW_PAYLOAD);
 	return 0;
 }
 
@@ -734,7 +801,6 @@ swim_delete_member(struct swim *swim, struct swim_member *member)
 	mh_int_t rc = mh_swim_table_find(swim->members, key, NULL);
 	assert(rc != mh_end(swim->members));
 	mh_swim_table_del(swim->members, rc, NULL);
-	swim_cached_round_msg_invalidate(swim);
 	rlist_del_entry(member, in_round_queue);
 
 	/* Failure detection component. */
@@ -742,6 +808,7 @@ swim_delete_member(struct swim *swim, struct swim_member *member)
 		wait_ack_heap_delete(&swim->wait_ack_heap, member);
 
 	/* Dissemination component. */
+	swim_on_member_update(swim, member, SWIM_EV_DROP);
 	rlist_del_entry(member, in_dissemination_queue);
 
 	swim_member_delete(member);
@@ -805,7 +872,7 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 		swim_ev_timer_again(swim_loop(), &swim->round_tick);
 
 	/* Dissemination component. */
-	swim_on_member_update(swim, member);
+	swim_on_member_update(swim, member, SWIM_EV_NEW);
 	if (payload_size >= 0 &&
 	    swim_update_member_payload(swim, member, payload,
 				       payload_size) != 0) {
@@ -1299,14 +1366,15 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events)
 			if (m->unacknowledged_pings < NO_ACKS_TO_SUSPECT)
 				break;
 			m->status = MEMBER_SUSPECTED;
-			swim_on_member_update(swim, m);
+			swim_on_member_update(swim, m, SWIM_EV_NEW_STATUS);
 			if (swim_send_indirect_pings(swim, m) != 0)
 				diag_log();
 			break;
 		case MEMBER_SUSPECTED:
 			if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) {
 				m->status = MEMBER_DEAD;
-				swim_on_member_update(swim, m);
+				swim_on_member_update(swim, m,
+						      SWIM_EV_NEW_STATUS);
 			}
 			break;
 		case MEMBER_DEAD:
@@ -1332,7 +1400,7 @@ swim_update_member_addr(struct swim *swim, struct swim_member *member,
 {
 	assert(! swim_inaddr_eq(&member->addr, addr));
 	member->addr = *addr;
-	swim_on_member_update(swim, member);
+	swim_on_member_update(swim, member, SWIM_EV_NEW_URI);
 }
 
 /**
@@ -1354,12 +1422,10 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def,
 	if (def->incarnation > member->incarnation) {
 		if (! swim_inaddr_eq(&def->addr, &member->addr))
 			swim_update_member_addr(swim, member, &def->addr);
-		if (def->payload_size >= 0) {
+		if (def->payload_size >= 0)
 			update_payload = true;
-		} else if (member->is_payload_up_to_date) {
+		else if (member->is_payload_up_to_date)
 			member->is_payload_up_to_date = false;
-			swim_on_member_update(swim, member);
-		}
 	} else if (! member->is_payload_up_to_date && def->payload_size >= 0) {
 		update_payload = true;
 	}
@@ -1430,7 +1496,7 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def,
 	 */
 	if (self->incarnation < def->incarnation) {
 		self->incarnation = def->incarnation;
-		swim_on_member_update(swim, self);
+		swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION);
 	}
 	if (def->status != MEMBER_ALIVE &&
 	    def->incarnation == self->incarnation) {
@@ -1440,7 +1506,7 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def,
 		 * with a bigger incarnation.
 		 */
 		self->incarnation++;
-		swim_on_member_update(swim, self);
+		swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION);
 	}
 	return 0;
 skip:
@@ -1532,7 +1598,7 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
 	if (def.incarnation == member->incarnation &&
 	    member->status != MEMBER_ALIVE) {
 		member->status = MEMBER_ALIVE;
-		swim_on_member_update(swim, member);
+		swim_on_member_update(swim, member, SWIM_EV_NEW_STATUS);
 	}
 
 	switch (def.type) {
@@ -1604,7 +1670,7 @@ swim_process_quit(struct swim *swim, const char **pos, const char *end,
 		swim_update_member_inc_status(swim, m, MEMBER_LEFT, tmp);
 	} else if (tmp >= m->incarnation) {
 		m->incarnation = tmp + 1;
-		swim_on_member_update(swim, m);
+		swim_on_member_update(swim, m, SWIM_EV_NEW_INCARNATION);
 	}
 	return 0;
 }
@@ -1668,6 +1734,55 @@ error:
 	diag_log();
 }
 
+/**
+ * Event handler fiber function. At the moment its only task is
+ * dispatching member events to user defined triggers. Since SWIM
+ * is fully IO driven, this fiber should be used only for
+ * yielding tasks not related to the SWIM core logic. For all
+ * other tasks libev callbacks are ok. Yields are not allowed
+ * directly in libev callbacks, because they are invoked by the
+ * cord scheduler fiber, which is prohibited from manual yields.
+ */
+static int
+swim_event_handler_f(va_list va)
+{
+	struct swim *s = va_arg(va, struct swim *);
+	struct swim_on_member_event_ctx ctx;
+	while (! fiber_is_cancelled()) {
+		if (stailq_empty(&s->event_queue)) {
+			fiber_yield();
+			continue;
+		}
+		/*
+		 * Can't be empty. Besides this fiber, SWIM takes
+		 * members out of the event queue only on its
+		 * deletion, but the fiber is stopped before that.
+		 */
+		assert(! stailq_empty(&s->event_queue));
+		struct swim_member *m =
+			stailq_shift_entry(&s->event_queue, struct swim_member,
+					   in_event_queue);
+		/*
+		 * It is possible that a member was added and
+		 * dropped before a trigger was fired. It happens,
+		 * for example, if a previous event was being
+		 * handled for too long. By convention such
+		 * short-lived members are not shown to triggers.
+		 */
+		if ((m->events & SWIM_EV_NEW) == 0 ||
+		    (m->events & SWIM_EV_DROP) == 0) {
+			ctx.member = m;
+			ctx.events = m->events;
+			m->events = 0;
+			if (trigger_run(&s->on_member_event, (void *) &ctx))
+				diag_log();
+		}
+		swim_member_unref(m);
+	}
+	return 0;
+}
+
+
 struct swim *
 swim_new(void)
 {
@@ -1700,7 +1815,16 @@ swim_new(void)
 
 	/* Dissemination component. */
 	rlist_create(&swim->dissemination_queue);
-
+	rlist_create(&swim->on_member_event);
+	stailq_create(&swim->event_queue);
+	swim->event_handler = fiber_new("SWIM event handler",
+					swim_event_handler_f);
+	if (swim->event_handler == NULL) {
+		swim_delete(swim);
+		return NULL;
+	}
+	fiber_set_joinable(swim->event_handler, true);
+	fiber_start(swim->event_handler, swim);
 	return swim;
 }
 
@@ -1809,6 +1933,9 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 		 * specified.
 		 */
 		addr = swim->scheduler.transport.addr;
+		fiber_set_name(swim->event_handler,
+			       tt_sprintf("SWIM event handler %d",
+					  swim_fd(swim)));
 	} else {
 		addr = swim->self->addr;
 	}
@@ -1828,7 +1955,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 
 	if (new_self != NULL) {
 		swim->self->status = MEMBER_LEFT;
-		swim_on_member_update(swim, swim->self);
+		swim_on_member_update(swim, swim->self, SWIM_EV_NEW_STATUS);
 		swim->self = new_self;
 	}
 	if (! swim_inaddr_eq(&addr, &swim->self->addr)) {
@@ -1866,7 +1993,7 @@ swim_set_payload(struct swim *swim, const char *payload, int payload_size)
 	if (swim_update_member_payload(swim, self, payload, payload_size) != 0)
 		return -1;
 	self->incarnation++;
-	swim_on_member_update(swim, self);
+	swim_on_member_update(swim, self, SWIM_EV_NEW_INCARNATION);
 	return 0;
 }
 
@@ -1951,17 +2078,44 @@ swim_size(const struct swim *swim)
 	return mh_size(swim->members);
 }
 
+/**
+ * Cancel the event handler fiber and wait until it finishes. The
+ * operation is not inlined into the SWIM destructor, because
+ * there is one more place where the handler should be stopped,
+ * even before SWIM deletion - quit. Quit deletes the instance
+ * only when all the 'I left' messages are sent, and that happens
+ * in a libev callback in the scheduler fiber, where it is
+ * impossible to yield. So by then the handler must be dead.
+ */
+static inline void
+swim_kill_event_handler(struct swim *swim)
+{
+	struct fiber *f = swim->event_handler;
+	/*
+	 * Nullify so as not to keep a pointer to the fiber after
+	 * it is finished and possibly reused.
+	 */
+	swim->event_handler = NULL;
+	fiber_wakeup(f);
+	fiber_cancel(f);
+	fiber_join(f);
+}
+
 void
 swim_delete(struct swim *swim)
 {
+	if (swim->event_handler != NULL)
+		swim_kill_event_handler(swim);
 	struct ev_loop *l = swim_loop();
 	swim_scheduler_destroy(&swim->scheduler);
 	swim_ev_timer_stop(l, &swim->round_tick);
 	swim_ev_timer_stop(l, &swim->wait_ack_tick);
+	struct swim_member *m, *tmp;
+	stailq_foreach_entry_safe(m, tmp, &swim->event_queue, in_event_queue)
+		swim_member_unref(m);
 	mh_int_t node;
 	mh_foreach(swim->members, node) {
-		struct swim_member *m =
-			*mh_swim_table_node(swim->members, node);
+		m = *mh_swim_table_node(swim->members, node);
 		rlist_del_entry(m, in_round_queue);
 		if (! heap_node_is_stray(&m->in_wait_ack_heap))
 			wait_ack_heap_delete(&swim->wait_ack_heap, m);
@@ -1975,6 +2129,7 @@ swim_delete(struct swim *swim)
 	swim_task_destroy(&swim->round_step_task);
 	wait_ack_heap_destroy(&swim->wait_ack_heap);
 	mh_swim_table_delete(swim->members);
+	trigger_destroy(&swim->on_member_event);
 	free(swim->shuffled);
 	free(swim);
 }
@@ -1991,6 +2146,11 @@ swim_quit_step_complete(struct swim_task *task,
 	(void) rc;
 	struct swim *swim = swim_by_scheduler(scheduler);
 	if (rlist_empty(&swim->round_queue)) {
+		/*
+		 * The handler should be dead - can't yield here,
+		 * it is the scheduler fiber.
+		 */
+		assert(swim->event_handler == NULL);
 		swim_delete(swim);
 		return;
 	}
@@ -2020,6 +2180,11 @@ void
 swim_quit(struct swim *swim)
 {
 	assert(swim_is_configured(swim));
+	/*
+	 * Kill the handler now. Later it will be impossible to do
+	 * from the scheduler fiber.
+	 */
+	swim_kill_event_handler(swim);
 	struct ev_loop *l = swim_loop();
 	swim_ev_timer_stop(l, &swim->round_tick);
 	swim_ev_timer_stop(l, &swim->wait_ack_tick);
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 6db12ba9e..a42ace7c6 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -40,6 +40,7 @@ extern "C" {
 #endif
 
 struct swim;
+struct rlist;
 struct tt_uuid;
 struct swim_iterator;
 struct swim_member;
@@ -63,7 +64,8 @@ enum swim_gc_mode {
 
 /**
  * Create a new SWIM instance. Do not bind to a port or set any
- * parameters. Allocation and initialization only.
+ * parameters. Allocation and initialization only. The function
+ * yields.
  */
 struct swim *
 swim_new(void);
@@ -124,7 +126,8 @@ swim_set_codec(struct swim *swim, enum crypto_algo algo, enum crypto_mode mode,
 
 /**
  * Stop listening and broadcasting messages, cleanup all internal
- * structures, free memory.
+ * structures, free memory. The function yields. Actual deletion
+ * happens after currently working triggers are done.
  */
 void
 swim_delete(struct swim *swim);
@@ -272,6 +275,46 @@ swim_member_unref(struct swim_member *member);
 bool
 swim_member_is_dropped(const struct swim_member *member);
 
+enum swim_ev_mask {
+	SWIM_EV_NEW             = 0b00000001, /* Member is added. */
+	SWIM_EV_NEW_STATUS      = 0b00000010, /* Status has changed. */
+	SWIM_EV_NEW_URI         = 0b00000100, /* Address has changed. */
+	SWIM_EV_NEW_INCARNATION = 0b00001000, /* Incarnation has changed. */
+	SWIM_EV_NEW_PAYLOAD     = 0b00010000, /* Payload has changed. */
+	/* Shortcut to check for any update. */
+	SWIM_EV_UPDATE          = 0b00011110,
+	SWIM_EV_DROP            = 0b00100000, /* Member is dropped. */
+};
+
+/** On member event trigger context. */
+struct swim_on_member_event_ctx {
+	/** New, dropped, or updated member. */
+	struct swim_member *member;
+	/** Mask of happened events. */
+	enum swim_ev_mask events;
+};
+
+/**
+ * Get the list of member event processing triggers. A couple of
+ * words about why the API is so unusual, instead of something
+ * like "add_trigger", "drop_trigger". The main motivation is
+ * that some functions need the whole trigger list. For example,
+ * a function adding Lua functions as triggers. At the time of
+ * writing it was lbox_trigger_reset. There is a convention that
+ * any Tarantool trigger exposed to Lua should provide a way to
+ * add one, drop one, replace one, return all -
+ * lbox_trigger_reset does all of this.
+ */
+struct rlist *
+swim_trigger_list_on_member_event(struct swim *swim);
+
+/**
+ * Check if a SWIM instance has pending events. It is not a part
+ * of the public API, it is used by tests.
+ */
+bool
+swim_has_pending_events(struct swim *swim);
+
 #if defined(__cplusplus)
 }
 #endif
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 2ba9820d8..0e33d691c 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -41,6 +41,7 @@
 #include "swim_test_transport.h"
 #include "swim_test_ev.h"
 #include "swim_test_utils.h"
+#include "trigger.h"
 #include <fcntl.h>
 
 /**
@@ -950,10 +951,152 @@ swim_test_slow_net(void)
 	swim_finish_test();
 }
 
+struct trigger_ctx {
+	int counter;
+	bool is_deleted;
+	bool need_sleep;
+	struct fiber *f;
+	struct swim_on_member_event_ctx ctx;
+};
+
+static void
+swim_on_member_event_save(struct trigger *t, void *event)
+{
+	struct trigger_ctx *c = (struct trigger_ctx *) t->data;
+	++c->counter;
+	if (c->ctx.member != NULL)
+		swim_member_unref(c->ctx.member);
+	c->ctx = *((struct swim_on_member_event_ctx *) event);
+	swim_member_ref(c->ctx.member);
+}
+
+static void
+swim_on_member_event_yield(struct trigger *t, void *event)
+{
+	struct trigger_ctx *c = (struct trigger_ctx *) t->data;
+	++c->counter;
+	c->f = fiber();
+	while (c->need_sleep)
+		fiber_yield();
+}
+
+static void
+swim_trigger_destroy_cb(struct trigger *t)
+{
+	((struct trigger_ctx *) t->data)->is_deleted = true;
+}
+
+static int
+swim_cluster_delete_f(va_list ap)
+{
+	struct swim_cluster *c = (struct swim_cluster *)
+		va_arg(ap, struct swim_cluster *);
+	swim_cluster_delete(c);
+	return 0;
+}
+
+static void
+swim_test_triggers(void)
+{
+	swim_start_test(21);
+	struct swim_cluster *cluster = swim_cluster_new(2);
+	swim_cluster_set_ack_timeout(cluster, 1);
+	struct trigger_ctx tctx, tctx2;
+	memset(&tctx, 0, sizeof(tctx));
+	memset(&tctx2, 0, sizeof(tctx2));
+	struct trigger *t1 = (struct trigger *) malloc(sizeof(*t1));
+	assert(t1 != NULL);
+	trigger_create(t1, swim_on_member_event_save, (void *) &tctx,
+		       swim_trigger_destroy_cb);
+
+	/* Skip 'new self' events. */
+	swim_cluster_run_triggers(cluster);
+
+	struct swim *s1 = swim_cluster_member(cluster, 0);
+	trigger_add(swim_trigger_list_on_member_event(s1), t1);
+	swim_cluster_interconnect(cluster, 0, 1);
+	swim_cluster_run_triggers(cluster);
+
+	is(tctx.counter, 1, "trigger is fired");
+	ok(! tctx.is_deleted, "is not deleted");
+	is(tctx.ctx.member, swim_cluster_member_view(cluster, 0, 1),
+	   "ctx.member is set");
+	is(tctx.ctx.events, SWIM_EV_NEW, "ctx.events is set");
+
+	swim_run_for(1);
+	swim_cluster_run_triggers(cluster);
+	is(tctx.counter, 2, "payload is delivered, trigger caught that");
+	is(tctx.ctx.member, swim_cluster_member_view(cluster, 0, 1),
+	   "S1 got S2' payload");
+	is(tctx.ctx.events, SWIM_EV_NEW_PAYLOAD, "mask says that");
+
+	swim_cluster_member_set_payload(cluster, 0, "123", 3);
+	swim_cluster_run_triggers(cluster);
+	is(tctx.counter, 3, "self payload is updated");
+	is(tctx.ctx.member, swim_self(s1), "self is set as a member");
+	is(tctx.ctx.events, SWIM_EV_NEW_PAYLOAD | SWIM_EV_NEW_INCARNATION,
+	   "both incarnation and payload events are presented");
+
+	swim_cluster_set_drop(cluster, 1, 100);
+	fail_if(swim_cluster_wait_status(cluster, 0, 1,
+					 MEMBER_SUSPECTED, 3) != 0);
+	swim_cluster_run_triggers(cluster);
+	is(tctx.counter, 4, "suspicion fired a trigger");
+	is(tctx.ctx.events, SWIM_EV_NEW_STATUS, "status suspected");
+
+	fail_if(swim_cluster_wait_status(cluster, 0, 1, MEMBER_DEAD, 3) != 0);
+	swim_cluster_run_triggers(cluster);
+	is(tctx.counter, 5, "death fired a trigger");
+	is(tctx.ctx.events, SWIM_EV_NEW_STATUS, "status dead");
+
+	fail_if(swim_cluster_wait_status(cluster, 0, 1,
+					 swim_member_status_MAX, 2) != 0);
+	swim_cluster_run_triggers(cluster);
+	is(tctx.counter, 6, "drop fired a trigger");
+	is(tctx.ctx.events, SWIM_EV_DROP, "status dropped");
+	is(swim_cluster_member_view(cluster, 0, 1), NULL,
+	   "dropped member is not presented in the member table");
+	isnt(tctx.ctx.member, NULL, "but is in the event context");
+	/*
+	 * There is a complication with yields. If a trigger
+	 * yields, other triggers wait until it finishes. And all
+	 * the triggers should be ready for SWIM deletion in the
+	 * middle of event processing. A SWIM object should not
+	 * be deleted until all the triggers are done.
+	 */
+	struct trigger *t2 = (struct trigger *) malloc(sizeof(*t2));
+	assert(t2 != NULL);
+	tctx2.need_sleep = true;
+	trigger_create(t2, swim_on_member_event_yield, (void *) &tctx2, NULL);
+	trigger_add(swim_trigger_list_on_member_event(s1), t2);
+	swim_cluster_add_link(cluster, 0, 1);
+	swim_cluster_run_triggers(cluster);
+	is(tctx2.counter, 1, "yielding trigger is fired");
+	is(tctx.counter, 6, "non-yielding still is not");
+
+	struct fiber *async_delete_fiber =
+		fiber_new("async delete", swim_cluster_delete_f);
+	fiber_start(async_delete_fiber, cluster);
+	ok(! tctx.is_deleted, "trigger is not deleted until all currently "\
+	   "sleeping triggers are finished");
+	tctx2.need_sleep = false;
+	fiber_wakeup(tctx2.f);
+	while (! tctx.is_deleted)
+		fiber_sleep(0);
+	note("now all the triggers are done and deleted");
+
+	free(t1);
+	free(t2);
+	if (tctx.ctx.member != NULL)
+		swim_member_unref(tctx.ctx.member);
+
+	swim_finish_test();
+}
+
 static int
 main_f(va_list ap)
 {
-	swim_start_test(20);
+	swim_start_test(21);
 
 	(void) ap;
 	swim_test_ev_init();
@@ -979,6 +1122,7 @@ main_f(va_list ap)
 	swim_test_indirect_ping();
 	swim_test_encryption();
 	swim_test_slow_net();
+	swim_test_triggers();
 
 	swim_test_transport_free();
 	swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index 25fdb8833..3ebfe7ea0 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
 	*** main_f ***
-1..20
+1..21
 	*** swim_test_one_link ***
     1..6
     ok 1 - no rounds - no fullmesh
@@ -200,4 +200,30 @@ ok 19 - subtests
     # slow network leads to idle round steps, they should not produce a new message
 ok 20 - subtests
 	*** swim_test_slow_net: done ***
+	*** swim_test_triggers ***
+    1..21
+    ok 1 - trigger is fired
+    ok 2 - is not deleted
+    ok 3 - ctx.member is set
+    ok 4 - ctx.events is set
+    ok 5 - payload is delivered, trigger caught that
+    ok 6 - S1 got S2' payload
+    ok 7 - mask says that
+    ok 8 - self payload is updated
+    ok 9 - self is set as a member
+    ok 10 - both incarnation and payload events are presented
+    ok 11 - suspicion fired a trigger
+    ok 12 - status suspected
+    ok 13 - death fired a trigger
+    ok 14 - status dead
+    ok 15 - drop fired a trigger
+    ok 16 - status dropped
+    ok 17 - dropped member is not presented in the member table
+    ok 18 - but is in the event context
+    ok 19 - yielding trigger is fired
+    ok 20 - non-yielding still is not
+    ok 21 - trigger is not deleted until all currently sleeping triggers are finished
+    # now all the triggers are done and deleted
+ok 21 - subtests
+	*** swim_test_triggers: done ***
 	*** main_f: done ***
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index f72fa2450..463c62390 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -36,6 +36,7 @@
 #include "trivia/util.h"
 #include "fiber.h"
 #include "msgpuck.h"
+#include "trigger.h"
 
 /**
  * Drop rate packet filter to drop packets with a certain
@@ -191,12 +192,32 @@ swim_cluster_id_to_uri(char *buffer, int id)
 	sprintf(buffer, "127.0.0.1:%d", id + 1);
 }
 
+/**
+ * A trigger to check correctness of event context, and ability
+ * to yield.
+ */
+void
+swim_test_event_cb(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	struct swim_on_member_event_ctx *ctx =
+		(struct swim_on_member_event_ctx *) event;
+	assert(ctx->events != 0);
+	assert((ctx->events & SWIM_EV_NEW) == 0 ||
+	       (ctx->events & SWIM_EV_DROP) == 0);
+	fiber_sleep(0);
+}
+
 /** Create a SWIM cluster node @a n with a 0-based @a id. */
 static inline void
 swim_node_create(struct swim_node *n, int id)
 {
 	n->swim = swim_new();
 	assert(n->swim != NULL);
+	struct trigger *t = (struct trigger *) malloc(sizeof(*t));
+	trigger_create(t, swim_test_event_cb, NULL, (trigger_f0) free);
+	trigger_add(swim_trigger_list_on_member_event(n->swim), t);
+
 	char uri[128];
 	swim_cluster_id_to_uri(uri, id);
 	n->uuid = uuid_nil;
@@ -303,7 +324,7 @@ swim_cluster_add_link(struct swim_cluster *cluster, int to_id, int from_id)
 			       swim_member_uri(from), swim_member_uuid(from));
 }
 
-static const struct swim_member *
+const struct swim_member *
 swim_cluster_member_view(struct swim_cluster *cluster, int node_id,
 			 int member_id)
 {
@@ -615,6 +636,8 @@ swim_wait_timeout(double timeout, struct swim_cluster *cluster,
 	 * whatsoever.
 	 */
 	swim_test_transport_do_loop_step(loop);
+	if (cluster != NULL)
+		swim_cluster_run_triggers(cluster);
 	while (! check(cluster, data)) {
 		if (swim_time() >= deadline)
 			return -1;
@@ -625,6 +648,8 @@ swim_wait_timeout(double timeout, struct swim_cluster *cluster,
 		 * too.
 		 */
 		swim_test_transport_do_loop_step(loop);
+		if (cluster != NULL)
+			swim_cluster_run_triggers(cluster);
 	}
 	return 0;
 }
@@ -868,6 +893,23 @@ swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster,
 				 swim_loop_check_member_everywhere, &t);
 }
 
+void
+swim_cluster_run_triggers(struct swim_cluster *cluster)
+{
+	bool has_events;
+	do {
+		has_events = false;
+		struct swim_node *n = cluster->node;
+		for (int i = 0; i < cluster->size; ++i, ++n) {
+			if (n->swim != NULL &&
+			    swim_has_pending_events(n->swim)) {
+				has_events = true;
+				fiber_sleep(0);
+			}
+		}
+	} while (has_events);
+}
+
 bool
 swim_error_check_match(const char *msg)
 {
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 0e6f29d80..fde84e39b 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -168,6 +168,14 @@ int
 swim_cluster_member_set_payload(struct swim_cluster *cluster, int i,
 				const char *payload, int size);
 
+/**
+ * Get a member object stored in a SWIM node @a node_id and
+ * showing a known state of a SWIM node @a member_id.
+ */
+const struct swim_member *
+swim_cluster_member_view(struct swim_cluster *cluster, int node_id,
+			 int member_id);
+
 /**
  * Check if in the cluster every instance knowns the about other
  * instances.
@@ -229,6 +237,10 @@ swim_cluster_wait_payload_everywhere(struct swim_cluster *cluster,
 				     int member_id, const char *payload,
 				     int payload_size, double timeout);
 
+/** Run all pending triggers in the cluster. */
+void
+swim_cluster_run_triggers(struct swim_cluster *cluster);
+
 /** Process SWIM events for @a duration fake seconds. */
 void
 swim_run_for(double duration);
-- 
2.20.1 (Apple Git-117)





More information about the Tarantool-patches mailing list