Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
To: tarantool-patches@freelists.org,
	Konstantin Osipov <kostja@tarantool.org>
Subject: [tarantool-patches] Re: [PATCH 1/1] swim: introduce SWIM's anti-entropy component
Date: Thu, 7 Mar 2019 15:33:53 +0300	[thread overview]
Message-ID: <7fe095c9-bcab-f5dc-54e4-3ac326d27c02@tarantool.org> (raw)
In-Reply-To: <20190307102723.GD5263@chai>



On 07/03/2019 13:27, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/03/07 11:19]:
>> Sorry again - ignore the whole email. I've found a bug
>> which required drastic changes in the event loop
>> implementation.
> 
> Please submit the fix in a follow up patch. I will not review the
> entire several thousands of lines once again.

As I said, I mostly changed swim_test_transport.c and swim_test_ev.c.
Swim.c gains only 21 lines in total. It is not about 1000 new lines.

But as you wish. I attached a diff below.

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 19d7935b0..4b401800a 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -175,6 +175,14 @@ swim_uuid_str(const struct tt_uuid *uuid)
  	return buf;
  }
  
+/** Check if two AF_INET addresses are equal. */
+static bool
+swim_sockaddr_in_eq(const struct sockaddr_in *a1, const struct sockaddr_in *a2)
+{
+	return a1->sin_port == a2->sin_port &&
+	       a1->sin_addr.s_addr == a2->sin_addr.s_addr;
+}
+
  /**
   * A cluster member description. This structure describes the
   * last known state of an instance. This state is updated
@@ -268,20 +276,18 @@
-/**
- * SWIM fd mainly is needed to be printed into the logs in order
- * to distinguish between different SWIM instances logs.
- */
-static inline int
+int
  swim_fd(const struct swim *swim)
  {
  	return swim->scheduler.transport.fd;
@@ -575,8 +579,17 @@ swim_complete_step(struct swim_task *task,
  	(void) task;
  	struct swim *swim = swim_by_scheduler(scheduler);
  	swim_ev_timer_start(loop(), &swim->round_tick);
-	rlist_shift_entry(&swim->round_queue, struct swim_member,
-			  in_round_queue);
+	/*
+	 * It is possible that the original member was deleted
+	 * manually during the task execution.
+	 */
+	if (rlist_empty(&swim->round_queue))
+		return;
+	struct swim_member *m =
+		rlist_first_entry(&swim->round_queue, struct swim_member,
+				  in_round_queue);
+	if (swim_sockaddr_in_eq(&m->addr, &task->dst))
+		rlist_shift(&swim->round_queue);
  }
  
  /**
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 8b82df051..744214371 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -80,6 +80,14 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
  void
  swim_delete(struct swim *swim);
  
+/**
+ * SWIM fd is mainly needed to be printed into the logs in order
+ * to distinguish between the logs of different SWIM instances,
+ * and for unit testing.
+ */
+int
+swim_fd(const struct swim *swim);
+
  /** Add a new member. */
  int
  swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid);
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 78a69b7c6..ebb9418d4 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -149,7 +149,7 @@ swim_test_cfg(void)
  static void
  swim_test_add_remove(void)
  {
-	swim_start_test(12);
+	swim_start_test(13);
  
  	struct swim_cluster *cluster = swim_cluster_new(2);
  	swim_cluster_add_link(cluster, 0, 1);
@@ -189,6 +189,18 @@ swim_test_add_remove(void)
  	   "after removal the cluster is not in fullmesh");
  	is(swim_cluster_wait_fullmesh(cluster, 1), 0,
  	   "but it is back in 1 step");
+
+	/*
+	 * On each step s1 sends itself to s2. However s2 can be
+	 * removed from s1 after the message is scheduled but
+	 * before its completion.
+	 */
+	swim_cluster_block_io(cluster, 0, 2);
+	swim_do_loop_step(loop());
+	swim_remove_member(s1, swim_member_uuid(s2_self));
+	is(swim_cluster_wait_fullmesh(cluster, 1), 0,
+	   "back in fullmesh after a member removal in the middle of a step");
+
  	swim_cluster_delete(cluster);
  
  	swim_finish_test();
@@ -200,7 +212,6 @@ main_f(va_list ap)
  	swim_start_test(5);
  
  	(void) ap;
-	struct ev_loop *loop = loop();
  	swim_test_ev_init();
  	swim_test_transport_init();
  
diff --git a/test/unit/swim.result b/test/unit/swim.result
index 73e1504a3..e71d6cfc2 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -38,7 +38,7 @@ ok 3 - subtests
  ok 4 - subtests
  	*** swim_test_cfg: done ***
  	*** swim_test_add_remove ***
-    1..12
+    1..13
      ok 1 - can not add an existing member
      ok 2 - diag says 'already exists'
      ok 3 - can not add a invalid uri
@@ -51,6 +51,7 @@ ok 4 - subtests
      ok 10 - remove of a not existing member
      ok 11 - after removal the cluster is not in fullmesh
      ok 12 - but it is back in 1 step
+    ok 13 - back in fullmesh after a member removal in the middle of a step
  ok 5 - subtests
  	*** swim_test_add_remove: done ***
  	*** main_f: done ***
diff --git a/test/unit/swim_test_ev.c b/test/unit/swim_test_ev.c
index b70a8b9c8..950784aec 100644
--- a/test/unit/swim_test_ev.c
+++ b/test/unit/swim_test_ev.c
@@ -49,31 +49,42 @@ static double watch = 0;
  static int event_id = 0;
  
  /**
- * Timed event's description and attributes used to process it.
- * Note, that IO events are not wrapped with that and processed
- * separately in test transport module.
+ * The SWIM testing event loop has two event types: natural libev
+ * events like timers, and artificial ones like fake socket blocking.
   */
-struct swim_test_event {
-	/**
-	 * Event mask with which libev watcher's callback should
-	 * be invoked on that event.
-	 */
-	int revents;
-	/**
-	 * Libev watcher. Used to store callback and to find the
-	 * event by watcher pointer. It is necessary because SWIM
-	 * operates by libev watchers.
-	 */
-	struct ev_watcher *watcher;
+enum swim_event_type {
+	SWIM_EVENT_TIMER,
+	SWIM_EVENT_FD_UNBLOCK,
+};
+
+struct swim_event;
+
+typedef void (*swim_event_process_f)(struct swim_event *, struct ev_loop *);
+typedef void (*swim_event_delete_f)(struct swim_event *);
+
+/**
+ * Base event. It is stored in the event heap and virtualizes
+ * other events.
+ */
+struct swim_event {
+	/** Type, for assertions only. */
+	enum swim_event_type type;
  	/**
  	 * When that event should be invoked according to the fake
  	 * watch.
  	 */
  	double deadline;
-	/** A link in the events heap. */
-	struct heap_node in_events_heap;
+	/** A link in the event heap. */
+	struct heap_node in_event_heap;
  	/** ID to sort events with the same deadline. */
  	int id;
+	/**
+	 * Process the event. Usually the event is deleted right
+	 * after that.
+	 */
+	swim_event_process_f process;
+	/** Just delete the event. Called on event heap reset. */
+	swim_event_delete_f delete;
  };
  
  /**
@@ -81,68 +92,162 @@ struct swim_test_event {
   * deadline and the smallest ID in that deadline.
   */
  static inline bool
-swim_test_event_less(const struct swim_test_event *e1,
-		     const struct swim_test_event *e2)
+swim_event_less(const struct swim_event *e1, const struct swim_event *e2)
  {
  	if (e1->deadline == e2->deadline)
  		return e1->id < e2->id;
  	return e1->deadline < e2->deadline;
  }
  
-#define HEAP_NAME events_heap
-#define HEAP_LESS(h, e1, e2) swim_test_event_less(e1, e2)
-#define heap_value_t struct swim_test_event
-#define heap_value_attr in_events_heap
+#define HEAP_NAME event_heap
+#define HEAP_LESS(h, e1, e2) swim_event_less(e1, e2)
+#define heap_value_t struct swim_event
+#define heap_value_attr in_event_heap
  #include "salad/heap.h"
  
-/** Events heap. Event loop pops them from here. */
-static heap_t events_heap;
+/** Event heap. Event loop pops them from here. */
+static heap_t event_heap;
  
-/** Libev watcher is matched to exactly one test event here. */
+/** Libev watcher is matched to exactly one event here. */
  static struct mh_i64ptr_t *events_hash;
  
  /**
- * Create a new event which should happen after @a delay and with
- * the specified events mask on behalf of @a watcher.
+ * Create a new event which should call @a process after @a delay
+ * fake seconds. The @a delete callback is called when the event
+ * is deleted by SWIM explicitly, and when the event heap is
+ * reset.
   */
  static void
-swim_test_event_new(struct ev_watcher *watcher, double delay, int revents)
+swim_event_create(struct swim_event *e, enum swim_event_type type, double delay,
+		  swim_event_process_f process, swim_event_delete_f delete)
  {
-	struct swim_test_event *e =
-		(struct swim_test_event *) malloc(sizeof(*e));
-	assert(e != NULL);
-	e->watcher = watcher;
  	e->deadline = swim_time() + delay;
-	e->revents = revents;
  	e->id = event_id++;
-	events_heap_insert(&events_heap, e);
-	struct mh_i64ptr_node_t old = {0, NULL}, *old_p = &old;
-	struct mh_i64ptr_node_t node = {(uint64_t) watcher, e};
-	mh_int_t rc = mh_i64ptr_put(events_hash, &node, &old_p, NULL);
-	(void) rc;
-	assert(rc != mh_end(events_hash));
-	assert(old.val == NULL && old.key == 0);
+	e->process = process;
+	e->delete = delete;
+	e->type = type;
+	event_heap_insert(&event_heap, e);
  }
  
-/** Delete event, free resources. */
+/** Destroy a basic event. */
  static inline void
-swim_test_event_delete(struct swim_test_event *e)
+swim_event_destroy(struct swim_event *e)
  {
-	events_heap_delete(&events_heap, e);
-	mh_int_t rc = mh_i64ptr_find(events_hash, (uint64_t) e->watcher, NULL);
-	assert(rc != mh_end(events_hash));
-	mh_i64ptr_del(events_hash, rc, NULL);
-	free(e);
+	event_heap_delete(&event_heap, e);
+}
+
+/** Destroy an event and free its resources. */
+static inline void
+swim_event_delete(struct swim_event *e)
+{
+	e->delete(e);
  }
  
  /** Find an event by @a watcher. */
-static struct swim_test_event *
-swim_test_event_by_ev(struct ev_watcher *watcher)
+static struct swim_event *
+swim_event_by_ev(struct ev_watcher *watcher)
  {
  	mh_int_t rc = mh_i64ptr_find(events_hash, (uint64_t) watcher, NULL);
  	if (rc == mh_end(events_hash))
  		return NULL;
-	return (struct swim_test_event *) mh_i64ptr_node(events_hash, rc)->val;
+	return (struct swim_event *) mh_i64ptr_node(events_hash, rc)->val;
+}
+
+/** Timer event generated by libev. */
+struct swim_timer_event {
+	struct swim_event base;
+	/**
+	 * Libev watcher. Used to store callback and to find the
+	 * event by watcher pointer. It is necessary because SWIM
+	 * operates by libev watchers.
+	 */
+	struct ev_watcher *watcher;
+};
+
+/** Destroy a timer event and free its resources. */
+static void
+swim_timer_event_delete(struct swim_event *e)
+{
+	assert(e->type == SWIM_EVENT_TIMER);
+	struct swim_timer_event *te = (struct swim_timer_event *) e;
+	mh_int_t rc = mh_i64ptr_find(events_hash, (uint64_t) te->watcher, NULL);
+	assert(rc != mh_end(events_hash));
+	mh_i64ptr_del(events_hash, rc, NULL);
+	swim_event_destroy(e);
+	free(te);
+}
+
+/** Process a timer event and delete it. */
+static void
+swim_timer_event_process(struct swim_event *e, struct ev_loop *loop)
+{
+	assert(e->type == SWIM_EVENT_TIMER);
+	struct ev_watcher *w = ((struct swim_timer_event *) e)->watcher;
+	swim_timer_event_delete(e);
+	ev_invoke(loop, w, EV_TIMER);
+}
+
+/** Create a new timer event. */
+static void
+swim_timer_event_new(struct ev_watcher *watcher, double delay)
+{
+	struct swim_timer_event *e =
+		(struct swim_timer_event *) malloc(sizeof(*e));
+	assert(e != NULL);
+	swim_event_create(&e->base, SWIM_EVENT_TIMER, delay,
+			  swim_timer_event_process, swim_timer_event_delete);
+	e->watcher = watcher;
+	assert(swim_event_by_ev(watcher) == NULL);
+	struct mh_i64ptr_node_t node = {(uint64_t) watcher, e};
+	mh_int_t rc = mh_i64ptr_put(events_hash, &node, NULL, NULL);
+	(void) rc;
+	assert(rc != mh_end(events_hash));
+}
+
+/**
+ * SWIM fake transport's event. It is used to block a fake file
+ * descriptor for a delay. Right after a block that event is
+ * generated to unblock the descriptor later.
+ */
+struct swim_fd_unblock_event {
+	struct swim_event base;
+	/** A fake file descriptor to unblock. */
+	int fd;
+};
+
+/** Delete a fd unblock event. */
+static void
+swim_fd_unblock_event_delete(struct swim_event *e)
+{
+	assert(e->type == SWIM_EVENT_FD_UNBLOCK);
+	swim_event_destroy(e);
+	free(e);
+}
+
+/** Process and delete a fd unblock event. */
+static void
+swim_fd_unblock_event_process(struct swim_event *e, struct ev_loop *loop)
+{
+	(void) loop;
+	assert(e->type == SWIM_EVENT_FD_UNBLOCK);
+	struct swim_fd_unblock_event *fe = (struct swim_fd_unblock_event *) e;
+	swim_test_transport_unblock_fd(fe->fd);
+	swim_fd_unblock_event_delete(e);
+}
+
+void
+swim_test_ev_block_fd(int fd, double delay)
+{
+	struct swim_fd_unblock_event *e =
+		(struct swim_fd_unblock_event *) malloc(sizeof(*e));
+	assert(e != NULL);
+	/* Block now. */
+	swim_test_transport_block_fd(fd);
+	/* Unblock after delay. */
+	swim_event_create(&e->base, SWIM_EVENT_FD_UNBLOCK, delay,
+			  swim_fd_unblock_event_process,
+			  swim_fd_unblock_event_delete);
+	e->fd = fd;
  }
  
  /** Implementation of global time visible in SWIM. */
@@ -159,24 +264,25 @@ swim_time(void)
  void
  swim_ev_timer_start(struct ev_loop *loop, struct ev_timer *base)
  {
-	if (swim_test_event_by_ev((struct ev_watcher *) base) != NULL)
+	(void) loop;
+	if (swim_event_by_ev((struct ev_watcher *) base) != NULL)
  		return;
  	/* Create the periodic watcher and one event. */
-	swim_test_event_new((struct ev_watcher *) base, base->at, EV_TIMER);
+	swim_timer_event_new((struct ev_watcher *) base, base->at);
  }
  
  /** Time stop cancels the event if the timer is active. */
  void
  swim_ev_timer_stop(struct ev_loop *loop, struct ev_timer *base)
  {
+	(void) loop;
  	/*
  	 * Delete the watcher and its events. Should be only one.
  	 */
-	struct swim_test_event *e =
-		swim_test_event_by_ev((struct ev_watcher *) base);
+	struct swim_event *e = swim_event_by_ev((struct ev_watcher *) base);
  	if (e == NULL)
  		return;
-	swim_test_event_delete(e);
+	swim_event_delete(e);
  }
  
  /** Process all the events with the next nearest deadline. */
@@ -184,17 +290,16 @@ void
  swim_do_loop_step(struct ev_loop *loop)
  {
  	say_verbose("Loop watch %f", watch);
-	struct swim_test_event *e = events_heap_top(&events_heap);
+	struct swim_event *next_e, *e = event_heap_top(&event_heap);
  	if (e != NULL) {
  		assert(e->deadline >= watch);
  		/* Multiple events can have the same deadline. */
  		watch = e->deadline;
  		do {
-			int revents = e->revents;
-			struct ev_watcher *w = e->watcher;
-			swim_test_event_delete(e);
-			ev_invoke(loop, w, revents);
-			e = events_heap_top(&events_heap);
+			e->process(e, loop);
+			next_e = event_heap_top(&event_heap);
+			assert(e != next_e);
+			e = next_e;
  		} while (e != NULL && e->deadline == watch);
  	}
  	/*
@@ -227,9 +332,9 @@ swim_do_loop_step(struct ev_loop *loop)
  void
  swim_test_ev_reset(void)
  {
-	struct swim_test_event *e;
-	while ((e = events_heap_top(&events_heap)) != NULL)
-		swim_test_event_delete(e);
+	struct swim_event *e;
+	while ((e = event_heap_top(&event_heap)) != NULL)
+		swim_event_delete(e);
  	assert(mh_size(events_hash) == 0);
  	event_id = 0;
  	watch = 0;
@@ -240,13 +345,13 @@ swim_test_ev_init(void)
  {
  	events_hash = mh_i64ptr_new();
  	assert(events_hash != NULL);
-	events_heap_create(&events_heap);
+	event_heap_create(&event_heap);
  }
  
  void
  swim_test_ev_free(void)
  {
  	swim_test_ev_reset();
-	events_heap_destroy(&events_heap);
+	event_heap_destroy(&event_heap);
  	mh_i64ptr_delete(events_hash);
  }
diff --git a/test/unit/swim_test_ev.h b/test/unit/swim_test_ev.h
index 788ac06e9..808bc510e 100644
--- a/test/unit/swim_test_ev.h
+++ b/test/unit/swim_test_ev.h
@@ -47,6 +47,10 @@ swim_test_ev_init(void);
  void
  swim_test_ev_free(void);
  
+/** Block a file descriptor @a fd for @a delay fake seconds. */
+void
+swim_test_ev_block_fd(int fd, double delay);
+
  /** Play one step of event loop, process generated events. */
  void
  swim_do_loop_step(struct ev_loop *loop);
diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
index d49b92885..ee50e3922 100644
--- a/test/unit/swim_test_transport.c
+++ b/test/unit/swim_test_transport.c
@@ -225,6 +225,22 @@ swim_transport_create(struct swim_transport *transport)
  	memset(&transport->addr, 0, sizeof(transport->addr));
  }
  
+void
+swim_test_transport_block_fd(int fd)
+{
+	struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE];
+	assert(! rlist_empty(&sfd->in_opened));
+	rlist_del_entry(sfd, in_opened);
+}
+
+void
+swim_test_transport_unblock_fd(int fd)
+{
+	struct swim_fd *sfd = &swim_fd[fd - FAKE_FD_BASE];
+	assert(rlist_empty(&sfd->in_opened));
+	rlist_add_tail_entry(&swim_fd_opened, sfd, in_opened);
+}
+
  /** Send one packet to destination's recv queue. */
  static inline void
  swim_fd_send_packet(struct swim_fd *fd)
diff --git a/test/unit/swim_test_transport.h b/test/unit/swim_test_transport.h
index c9739e250..f8066e636 100644
--- a/test/unit/swim_test_transport.h
+++ b/test/unit/swim_test_transport.h
@@ -47,6 +47,17 @@ struct ev_loop;
  void
  swim_transport_do_loop_step(struct ev_loop *loop);
  
+/**
+ * Block a file descriptor so that it can neither receive nor send
+ * any packets.
+ */
+void
+swim_test_transport_block_fd(int fd);
+
+/** Unblock a file descriptor. */
+void
+swim_test_transport_unblock_fd(int fd);
+
  /** Initialize test transport system. */
  void
  swim_test_transport_init(void);
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index 20775effb..73f8db40f 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -92,6 +92,12 @@ swim_cluster_node(struct swim_cluster *cluster, int i)
  	return cluster->node[i];
  }
  
+void
+swim_cluster_block_io(struct swim_cluster *cluster, int i, double delay)
+{
+	swim_test_ev_block_fd(swim_fd(cluster->node[i]), delay);
+}
+
  /** Check if @a s1 knows every member of @a s2's table. */
  static inline bool
  swim1_contains_swim2(struct swim *s1, struct swim *s2)
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 19eea551d..56036422d 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -56,6 +56,10 @@ swim_error_check_match(const char *msg);
  struct swim *
  swim_cluster_node(struct swim_cluster *cluster, int i);
  
+/** Block IO on a SWIM instance with id @a i. */
+void
+swim_cluster_block_io(struct swim_cluster *cluster, int i, double delay);
+
  /**
   * Explicitly add a member of id @a from_id to a member of id
   * @a to_id.


> 
> 
> -- 
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
> 

  reply	other threads:[~2019-03-07 12:33 UTC|newest]

Thread overview: 9+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-03-06 15:13 [tarantool-patches] " Vladislav Shpilevoy
2019-03-06 15:16 ` [tarantool-patches] " Vladislav Shpilevoy
2019-03-06 21:24 ` Vladislav Shpilevoy
2019-03-07 10:27   ` Konstantin Osipov
2019-03-07 12:33     ` Vladislav Shpilevoy [this message]
2019-03-07 13:20       ` Konstantin Osipov
2019-03-07 13:54         ` Vladislav Shpilevoy
2019-03-07 10:22 ` Konstantin Osipov
2019-03-07 12:33   ` Vladislav Shpilevoy

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=7fe095c9-bcab-f5dc-54e4-3ac326d27c02@tarantool.org \
    --to=v.shpilevoy@tarantool.org \
    --cc=kostja@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --subject='[tarantool-patches] Re: [PATCH 1/1] swim: introduce SWIM'\''s anti-entropy component' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox