[tarantool-patches] [PATCH 1/6] swim: follow-ups for SWIM anti-entropy

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Mar 20 13:49:14 MSK 2019


* fix some obvious errors in swim test utils;

* fix a bug with NULL URI garbage on recfg;

* fix typos in a comment and in a log message in swim.c;

* do not start any timers in swim_cfg. The round timer should
  start only when at least one member other than self is added,
  and that is already done in swim_new_member;

* log not only the round begin, but each round step - it helps
  in debugging, and does not affect production because these
  logs are emitted only at the verbose level;

* in the SWIM test event loop, log the new watch value instead
  of the old one - it turned out that the new one is more useful
  for debugging;

* log 'process <name> component' inside swim_process_<name>()
  functions. It is needed for failure detection, where a log of
  kind 'process failure detection' says nothing - much better to
  say 'process ping from', or 'process ack';

* in swim tests, instead of swim_cluster_wait_...(max_steps) use
  swim_cluster_wait_...(timeout). A step count restriction was
  useful for anti-entropy, where it equals the number of round
  steps, but this no longer holds once failure detection appears.
  Replies to failure detection requests do not depend on the SWIM
  heartbeat and affect the step count in a non-trivial way - it
  makes writing, debugging and supporting tests much harder.

Follow-up for 03b9a6e91baf246ee2bb9841d01ba3824b6768a6
---
 src/lib/swim/swim.c             | 23 ++++++++++++++---------
 src/lib/swim/swim_io.c          |  9 +++++----
 src/lib/swim/swim_io.h          |  6 +++++-
 src/lib/swim/swim_proto.c       |  1 +
 test/unit/swim.c                |  5 ++++-
 test/unit/swim.result           | 19 ++++++++++---------
 test/unit/swim_test_ev.c        |  2 +-
 test/unit/swim_test_transport.c | 13 ++++++++++---
 test/unit/swim_test_utils.c     | 13 ++++++++-----
 test/unit/swim_test_utils.h     |  6 ++----
 10 files changed, 60 insertions(+), 37 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 4b401800a..c2b2132a2 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -206,7 +206,7 @@ struct swim_member {
 	 */
 	struct tt_uuid uuid;
 	/**
-	 * Cached hash of the uuid for the members table lookups.
+	 * Cached hash of the uuid for the member table lookups.
 	 */
 	uint32_t hash;
 	/**
@@ -406,7 +406,8 @@ swim_new_member(struct swim *swim, const struct sockaddr_in *addr,
 		diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node");
 		return NULL;
 	}
-	swim_ev_timer_start(loop(), &swim->round_tick);
+	if (mh_size(swim->members) > 1)
+		swim_ev_timer_start(loop(), &swim->round_tick);
 	say_verbose("SWIM %d: member %s is added, total is %d", swim_fd(swim),
 		    swim_uuid_str(&member->uuid), mh_size(swim->members));
 	return member;
@@ -448,8 +449,9 @@ swim_new_round(struct swim *swim)
 			    swim_fd(swim));
 		return 0;
 	}
+	/* -1 for self. */
 	say_verbose("SWIM %d: start a new round with %d members", swim_fd(swim),
-		    size);
+		    size - 1);
 	swim_shuffle_members(swim);
 	rlist_create(&swim->round_queue);
 	for (int i = 0; i < size; ++i) {
@@ -550,7 +552,9 @@ swim_begin_step(struct ev_loop *loop, struct ev_timer *t, int events)
 	(void) events;
 	(void) loop;
 	struct swim *swim = (struct swim *) t->data;
-	if (rlist_empty(&swim->round_queue) && swim_new_round(swim) != 0) {
+	if (! rlist_empty(&swim->round_queue)) {
+		say_verbose("SWIM %d: continue the round", swim_fd(swim));
+	} else if (swim_new_round(swim) != 0) {
 		diag_log();
 		return;
 	}
@@ -675,6 +679,7 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def)
 static int
 swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
 {
+	say_verbose("SWIM %d: process anti-entropy", swim_fd(swim));
 	const char *prefix = "invalid anti-entropy message:";
 	uint32_t size;
 	if (swim_decode_array(pos, end, &size, prefix, "root") != 0)
@@ -726,8 +731,6 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
 			goto error;
 		switch(key) {
 		case SWIM_ANTI_ENTROPY:
-			say_verbose("SWIM %d: process anti-entropy",
-				    swim_fd(swim));
 			if (swim_process_anti_entropy(swim, &pos, end) != 0)
 				goto error;
 			break;
@@ -760,7 +763,8 @@ swim_new(void)
 	swim_ev_timer_init(&swim->round_tick, swim_begin_step,
 			   HEARTBEAT_RATE_DEFAULT, 0);
 	swim->round_tick.data = (void *) swim;
-	swim_task_create(&swim->round_step_task, swim_complete_step, NULL);
+	swim_task_create(&swim->round_step_task, swim_complete_step, NULL,
+			 "round packet");
 	swim_scheduler_create(&swim->scheduler, swim_on_input);
 	return swim;
 }
@@ -845,11 +849,12 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 		 * specified.
 		 */
 		addr = swim->scheduler.transport.addr;
+	} else {
+		addr = swim->self->addr;
 	}
 	if (swim->round_tick.at != heartbeat_rate && heartbeat_rate > 0)
 		swim_ev_timer_set(&swim->round_tick, heartbeat_rate, 0);
 
-	swim_ev_timer_start(loop(), &swim->round_tick);
 	swim_update_member_addr(swim, swim->self, &addr);
 	int rc = swim_update_member_uuid(swim, swim->self, uuid);
 	/* Reserved above. */
@@ -892,7 +897,7 @@ swim_remove_member(struct swim *swim, const struct tt_uuid *uuid)
 	assert(swim_is_configured(swim));
 	const char *prefix = "swim.remove_member:";
 	if (uuid == NULL || tt_uuid_is_nil(uuid)) {
-		diag_set(SwimError, "%s UUiD is mandatory", prefix);
+		diag_set(SwimError, "%s UUID is mandatory", prefix);
 		return -1;
 	}
 	struct swim_member *member = swim_find_member(swim, uuid);
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 9c16d1ad3..015968a0d 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -57,11 +57,12 @@ swim_packet_create(struct swim_packet *packet)
 
 void
 swim_task_create(struct swim_task *task, swim_task_f complete,
-		 swim_task_f cancel)
+		 swim_task_f cancel, const char *desc)
 {
 	memset(task, 0, sizeof(*task));
 	task->complete = complete;
 	task->cancel = cancel;
+	task->desc = desc;
 	swim_packet_create(&task->packet);
 	rlist_create(&task->in_queue_output);
 }
@@ -170,9 +171,9 @@ swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
 	struct swim_task *task =
 		rlist_shift_entry(&scheduler->queue_output, struct swim_task,
 				  in_queue_output);
-	say_verbose("SWIM %d: send to %s", swim_scheduler_fd(scheduler),
-		    sio_strfaddr((struct sockaddr *) &task->dst,
-				 sizeof(task->dst)));
+	say_verbose("SWIM %d: send %s to %s", swim_scheduler_fd(scheduler),
+		    task->desc, sio_strfaddr((struct sockaddr *) &task->dst,
+					     sizeof(task->dst)));
 	struct swim_meta_header_bin header;
 	swim_meta_header_bin_create(&header, &scheduler->transport.addr);
 	memcpy(task->packet.meta, &header, sizeof(header));
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 68fb89818..bc62a29ce 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -201,6 +201,10 @@ struct swim_task {
 	struct sockaddr_in dst;
 	/** Place in a queue of tasks. */
 	struct rlist in_queue_output;
+	/**
+	 * A short description of the packet content. For logging.
+	 */
+	const char *desc;
 };
 
 /**
@@ -213,7 +217,7 @@ swim_task_send(struct swim_task *task, const struct sockaddr_in *dst,
 /** Initialize the task, without scheduling. */
 void
 swim_task_create(struct swim_task *task, swim_task_f complete,
-		 swim_task_f cancel);
+		 swim_task_f cancel, const char *desc);
 
 /** Destroy the task, pop from the queue. */
 static inline void
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 9c0d49657..bf4c09b24 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -288,6 +288,7 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
 	if (swim_decode_map(pos, end, &size, prefix, "root") != 0)
 		return -1;
 	memset(def, 0, sizeof(*def));
+	def->src.sin_family = AF_INET;
 	for (uint32_t i = 0; i < size; ++i) {
 		uint64_t key;
 		if (swim_decode_uint(pos, end, &key, prefix, "a key") != 0)
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 29e9eb4f4..921fc8f07 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -109,7 +109,7 @@ swim_test_uuid_update(void)
 static void
 swim_test_cfg(void)
 {
-	swim_start_test(15);
+	swim_start_test(16);
 
 	struct swim *s = swim_new();
 	assert(s != NULL);
@@ -123,6 +123,9 @@ swim_test_cfg(void)
 	is(swim_cfg(s, uri, -1, &uuid), 0, "configured first time");
 	is(swim_cfg(s, NULL, -1, NULL), 0, "second time can omit URI, UUID");
 	is(swim_cfg(s, NULL, 2, NULL), 0, "hearbeat is dynamic");
+	const char *self_uri = swim_member_uri(swim_self(s));
+	is(strcmp(self_uri, uri), 0, "URI is unchanged after recfg with NULL "\
+	   "URI");
 
 	struct swim *s2 = swim_new();
 	assert(s2 != NULL);
diff --git a/test/unit/swim.result b/test/unit/swim.result
index e71d6cfc2..e8991d8d8 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -19,7 +19,7 @@ ok 2 - subtests
 ok 3 - subtests
 	*** swim_test_uuid_update: done ***
 	*** swim_test_cfg ***
-    1..15
+    1..16
     ok 1 - first cfg failed - no URI
     ok 2 - diag says 'mandatory'
     ok 3 - first cfg failed - no UUID
@@ -27,14 +27,15 @@ ok 3 - subtests
     ok 5 - configured first time
     ok 6 - second time can omit URI, UUID
     ok 7 - hearbeat is dynamic
-    ok 8 - can not use invalid URI
-    ok 9 - diag says 'invalid uri'
-    ok 10 - can not use domain names
-    ok 11 - diag says 'invalid uri'
-    ok 12 - UNIX sockets are not supported
-    ok 13 - diag says 'only IP'
-    ok 14 - can not bind to an occupied port
-    ok 15 - diag says 'bind'
+    ok 8 - URI is unchanged after recfg with NULL URI
+    ok 9 - can not use invalid URI
+    ok 10 - diag says 'invalid uri'
+    ok 11 - can not use domain names
+    ok 12 - diag says 'invalid uri'
+    ok 13 - UNIX sockets are not supported
+    ok 14 - diag says 'only IP'
+    ok 15 - can not bind to an occupied port
+    ok 16 - diag says 'bind'
 ok 4 - subtests
 	*** swim_test_cfg: done ***
 	*** swim_test_add_remove ***
diff --git a/test/unit/swim_test_ev.c b/test/unit/swim_test_ev.c
index 950784aec..ee1fcdbb7 100644
--- a/test/unit/swim_test_ev.c
+++ b/test/unit/swim_test_ev.c
@@ -289,12 +289,12 @@ swim_ev_timer_stop(struct ev_loop *loop, struct ev_timer *base)
 void
 swim_do_loop_step(struct ev_loop *loop)
 {
-	say_verbose("Loop watch %f", watch);
 	struct swim_event *next_e, *e = event_heap_top(&event_heap);
 	if (e != NULL) {
 		assert(e->deadline >= watch);
 		/* Multiple events can have the same deadline. */
 		watch = e->deadline;
+		say_verbose("Loop watch %f", watch);
 		do {
 			e->process(e, loop);
 			next_e = event_heap_top(&event_heap);
diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
index ee50e3922..d6591e969 100644
--- a/test/unit/swim_test_transport.c
+++ b/test/unit/swim_test_transport.c
@@ -78,6 +78,13 @@ swim_test_packet_new(const char *data, int size, const struct sockaddr_in *src,
 	return p;
 }
 
+/** Free packet memory. */
+static inline void
+swim_test_packet_delete(struct swim_test_packet *p)
+{
+	free(p);
+}
+
 /** Fake file descriptor. */
 struct swim_fd {
 	/** File descriptor number visible to libev. */
@@ -122,9 +129,9 @@ swim_fd_close(struct swim_fd *fd)
 {
 	struct swim_test_packet *i, *tmp;
 	rlist_foreach_entry_safe(i, &fd->recv_queue, in_queue, tmp)
-		free(i);
+		swim_test_packet_delete(i);
 	rlist_foreach_entry_safe(i, &fd->send_queue, in_queue, tmp)
-		free(i);
+		swim_test_packet_delete(i);
 	rlist_del_entry(fd, in_opened);
 }
 
@@ -188,7 +195,7 @@ swim_transport_recv(struct swim_transport *transport, void *buffer, size_t size,
 	*addr_size = sizeof(p->src);
 	ssize_t result = MIN((size_t) p->size, size);
 	memcpy(buffer, p->data, result);
-	free(p);
+	swim_test_packet_delete(p);
 	return result;
 }
 
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index 73f8db40f..a92e55233 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -31,6 +31,7 @@
 #include "swim_test_utils.h"
 #include "swim_test_ev.h"
 #include "swim/swim.h"
+#include "swim/swim_ev.h"
 #include "uuid/tt_uuid.h"
 #include "trivia/util.h"
 #include "fiber.h"
@@ -111,7 +112,7 @@ swim1_contains_swim2(struct swim *s1, struct swim *s2)
 		}
 	}
 	swim_iterator_close(it);
-	return false;
+	return true;
 }
 
 bool
@@ -129,13 +130,15 @@ swim_cluster_is_fullmesh(struct swim_cluster *cluster)
 }
 
 int
-swim_cluster_wait_fullmesh(struct swim_cluster *cluster, int max_steps)
+swim_cluster_wait_fullmesh(struct swim_cluster *cluster, double timeout)
 {
-	while (! swim_cluster_is_fullmesh(cluster) && max_steps > 0) {
+	double deadline = swim_time() + timeout;
+	while (! swim_cluster_is_fullmesh(cluster)) {
+		if (swim_time() >= deadline)
+			return -1;
 		swim_do_loop_step(loop());
-		--max_steps;
 	}
-	return max_steps < 0 ? -1 : 0;
+	return 0;
 }
 
 bool
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 56036422d..90962b658 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -74,11 +74,9 @@ swim_cluster_add_link(struct swim_cluster *cluster, int to_id, int from_id);
 bool
 swim_cluster_is_fullmesh(struct swim_cluster *cluster);
 
-/**
- * Wait for fullmesh at most @a max_steps event loop iterations.
- */
+/** Wait for fullmesh at most @a timeout fake seconds. */
 int
-swim_cluster_wait_fullmesh(struct swim_cluster *cluster, int max_steps);
+swim_cluster_wait_fullmesh(struct swim_cluster *cluster, double timeout);
 
 #define swim_start_test(n) { \
 	header(); \
-- 
2.17.2 (Apple Git-113)





More information about the Tarantool-patches mailing list