[tarantool-patches] [PATCH 1/2] swim: introduce broadcast tasks

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Apr 10 20:45:33 MSK 2019


When a cluster has just been created, its members do not know
each other. Broadcast helps to establish initial relationships
between them.

This commit only introduces an interface for creating broadcast
tasks from SWIM code. The next commit uses this interface to
implement ping broadcast.

Part of #3234
---
 src/lib/swim/swim_io.c            | 97 +++++++++++++++++++++++++++++++
 src/lib/swim/swim_io.h            | 24 ++++++++
 src/lib/swim/swim_transport.h     | 13 +++++
 src/lib/swim/swim_transport_udp.c | 17 ++++++
 test/unit/swim_test_transport.c   | 92 +++++++++++++++++++++++++++--
 5 files changed, 237 insertions(+), 6 deletions(-)

diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 56c2facc8..03d79ae3e 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -33,6 +33,8 @@
 #include "swim_ev.h"
 #include "fiber.h"
 #include "sio.h"
+#include <ifaddrs.h>
+#include <net/if.h>
 
 /**
  * Allocate memory for meta. The same as mere alloc, but moves
@@ -105,6 +107,101 @@ swim_task_send(struct swim_task *task, const struct sockaddr_in *dst,
 	swim_task_schedule(task, scheduler);
 }
 
+/**
+ * Destroy a broadcast task: destroy its base task, release the
+ * interface list and free the task memory.
+ */
+static void
+swim_bcast_task_delete(struct swim_bcast_task *task)
+{
+	swim_task_destroy(&task->base);
+	swim_freeifaddrs(task->addrs);
+	free(task);
+}
+
+/**
+ * Cancelation callback of a broadcast task: there is nothing to
+ * finish, the task is just destroyed.
+ */
+static void
+swim_bcast_task_delete_cb(struct swim_task *task,
+			  struct swim_scheduler *scheduler, int rc)
+{
+	(void) rc;
+	(void) scheduler;
+	struct swim_bcast_task *bcast = (struct swim_bcast_task *) task;
+	swim_bcast_task_delete(bcast);
+}
+
+/**
+ * Write down a next usable IPv4 address into the task
+ * destination field. For a broadcast-capable interface its
+ * broadcast address is used, otherwise its own address.
+ * Interfaces which are down or have no IPv4 address are skipped.
+ * @param task Broadcast task to update. Its cursor @a task->i is
+ *        moved right past the interface that was used.
+ * @retval 0 Success. @a task->base.dst field is updated.
+ * @retval -1 No more addresses.
+ */
+static int
+swim_bcast_task_next_addr(struct swim_bcast_task *task)
+{
+	for (struct ifaddrs *i = task->i; i != NULL; i = i->ifa_next) {
+		int flags = i->ifa_flags;
+		if ((flags & IFF_UP) == 0)
+			continue;
+
+		/*
+		 * IFF_BROADCAST is an interface-wide flag, so it is
+		 * set for non-IPv4 entries too (for example, IPv6),
+		 * whose ifa_broadaddr may be NULL.
+		 */
+		if ((flags & IFF_BROADCAST) != 0 &&
+		    i->ifa_broadaddr != NULL &&
+		    i->ifa_broadaddr->sa_family == AF_INET) {
+			task->base.dst =
+				*(struct sockaddr_in *) i->ifa_broadaddr;
+		} else if (i->ifa_addr != NULL &&
+			   i->ifa_addr->sa_family == AF_INET) {
+			task->base.dst = *(struct sockaddr_in *) i->ifa_addr;
+		} else {
+			continue;
+		}
+		task->base.dst.sin_port = task->port;
+		/*
+		 * Resume after @a i, not after the old cursor:
+		 * skipped entries must not be visited again, or the
+		 * same address would be used more than once.
+		 */
+		task->i = i->ifa_next;
+		return 0;
+	}
+	return -1;
+}
+
+/**
+ * Send completion callback. While the interface list still has
+ * usable addresses, the task is scheduled again for the next
+ * one. When the list is exhausted, the task is destroyed.
+ * @param rc Send result, ignored - a failure on one interface
+ *        does not stop the broadcast.
+ */
+static void
+swim_bcast_task_complete(struct swim_task *base_task,
+			 struct swim_scheduler *scheduler, int rc)
+{
+	(void) rc;
+	struct swim_bcast_task *bcast = (struct swim_bcast_task *) base_task;
+	if (swim_bcast_task_next_addr(bcast) == 0)
+		swim_task_schedule(&bcast->base, scheduler);
+	else
+		swim_bcast_task_delete(bcast);
+}
+
+struct swim_bcast_task *
+swim_bcast_task_new(int port, const char *desc)
+{
+	struct swim_bcast_task *task =
+		(struct swim_bcast_task *) malloc(sizeof(*task));
+	if (task == NULL) {
+		diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
+		return NULL;
+	}
+	/* The list is owned by the task and freed on its deletion. */
+	if (swim_getifaddrs(&task->addrs) != 0) {
+		free(task);
+		return NULL;
+	}
+	task->i = task->addrs;
+	task->port = htons(port);
+	swim_task_create(&task->base, swim_bcast_task_complete,
+			 swim_bcast_task_delete_cb, desc);
+	/*
+	 * Pick the first destination right away. Without any
+	 * usable interface the task is destroyed, and the caller
+	 * gets NULL with diag set.
+	 */
+	if (swim_bcast_task_next_addr(task) == 0)
+		return task;
+	diag_set(SwimError, "broadcast has failed - no available "
+		 "interfaces");
+	swim_bcast_task_delete(task);
+	return NULL;
+}
+
 /**
  * Scheduler fd mainly is needed to be printed into the logs in
  * order to distinguish between different SWIM instances logs.
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index a6032127d..30acd491f 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -247,4 +247,28 @@ swim_task_destroy(struct swim_task *task)
 	rlist_del_entry(task, in_queue_output);
 }
 
+/**
+ * Broadcast task. Besides the usual task fields, it stores a
+ * list of network interfaces. Each time the task completes, it
+ * is rescheduled for the next suitable interface, so the packet
+ * is sent to each of them in turn. When the list is exhausted,
+ * the task deletes itself.
+ */
+struct swim_bcast_task {
+	/**
+	 * Base structure. Must stay the first member: swim_task
+	 * pointers are cast to struct swim_bcast_task.
+	 */
+	struct swim_task base;
+	/** Port to use for broadcast, in network byte order. */
+	int port;
+	/**
+	 * Interface list from swim_getifaddrs(), owned by the
+	 * task and released with swim_freeifaddrs().
+	 */
+	struct ifaddrs *addrs;
+	/** Position in @a addrs where the next address search starts. */
+	struct ifaddrs *i;
+};
+
+/**
+ * Create a new broadcast task with a specified port. The port is
+ * expected to have host byte order.
+ * @param port Destination port, in host byte order.
+ * @param desc Passed to swim_task_create() as is.
+ * @retval not NULL A task whose destination is already set to
+ *         the first usable interface address.
+ * @retval NULL Error, diag is set. This also happens when no
+ *         interface has a usable IPv4 address.
+ */
+struct swim_bcast_task *
+swim_bcast_task_new(int port, const char *desc);
+
 #endif /* TARANTOOL_SWIM_IO_H_INCLUDED */
\ No newline at end of file
diff --git a/src/lib/swim/swim_transport.h b/src/lib/swim/swim_transport.h
index 18c92c981..6f4370087 100644
--- a/src/lib/swim/swim_transport.h
+++ b/src/lib/swim/swim_transport.h
@@ -34,6 +34,8 @@
 #include <netinet/in.h>
 #include <arpa/inet.h>
 
+struct ifaddrs;
+
 /** Transport implementation. */
 struct swim_transport {
 	/** Socket. */
@@ -77,4 +79,15 @@ swim_transport_destroy(struct swim_transport *transport);
 void
 swim_transport_create(struct swim_transport *transport);
 
+/**
+ * Get a list of network interfaces. A wrapper around
+ * getifaddrs(3) which sets diag on failure. The unit test
+ * transport (test/unit/swim_test_transport.c) provides its own
+ * fake implementation.
+ * @param[out] ifaddrs Interface list, to be released with
+ *        swim_freeifaddrs().
+ * @retval 0 Success.
+ * @retval -1 Error, diag is set.
+ */
+int
+swim_getifaddrs(struct ifaddrs **ifaddrs);
+
+/** Delete an interface list created by swim_getifaddrs(). */
+void
+swim_freeifaddrs(struct ifaddrs *ifaddrs);
+
 #endif /* TARANTOOL_SWIM_TRANSPORT_H_INCLUDED */
diff --git a/src/lib/swim/swim_transport_udp.c b/src/lib/swim/swim_transport_udp.c
index f8fbae102..f017eeaf0 100644
--- a/src/lib/swim/swim_transport_udp.c
+++ b/src/lib/swim/swim_transport_udp.c
@@ -31,6 +31,8 @@
 #include "swim_transport.h"
 #include "evio.h"
 #include "diag.h"
+#include <ifaddrs.h>
+#include <net/if.h>
 
 ssize_t
 swim_transport_send(struct swim_transport *transport, const void *data,
@@ -110,3 +112,18 @@ swim_transport_create(struct swim_transport *transport)
 	transport->fd = -1;
 	memset(&transport->addr, 0, sizeof(transport->addr));
 }
+
+/**
+ * Thin getifaddrs(3) wrapper which sets diag on failure.
+ * @retval 0 Success, @a ifaddrs holds the list.
+ * @retval -1 Error, diag is set.
+ */
+int
+swim_getifaddrs(struct ifaddrs **ifaddrs)
+{
+	if (getifaddrs(ifaddrs) != 0) {
+		diag_set(SystemError, "failed to take an interface list "
+			 "by getifaddrs");
+		return -1;
+	}
+	return 0;
+}
+
+/** Release a list obtained from swim_getifaddrs(). */
+void
+swim_freeifaddrs(struct ifaddrs *ifaddrs)
+{
+	freeifaddrs(ifaddrs);
+}
diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
index 78fda587a..83eeba2d4 100644
--- a/test/unit/swim_test_transport.c
+++ b/test/unit/swim_test_transport.c
@@ -34,6 +34,8 @@
 #include "fiber.h"
 #include <errno.h>
 #include <sys/socket.h>
+#include <ifaddrs.h>
+#include <net/if.h>
 
 enum {
 	/**
@@ -86,6 +88,18 @@ swim_test_packet_delete(struct swim_test_packet *p)
 	free(p);
 }
 
+/**
+ * Duplicate a packet together with its @a p->size trailing
+ * bytes. The copy is not linked into any queue.
+ */
+static inline struct swim_test_packet *
+swim_test_packet_dup(struct swim_test_packet *p)
+{
+	size_t total = sizeof(*p) + p->size;
+	struct swim_test_packet *copy =
+		(struct swim_test_packet *) malloc(total);
+	assert(copy != NULL);
+	memcpy(copy, p, total);
+	/* The queue links were copied from @a p - reset them. */
+	rlist_create(&copy->in_queue);
+	return copy;
+}
+
 /** Fake file descriptor. */
 struct swim_fd {
 	/** File descriptor number visible to libev. */
@@ -289,19 +303,42 @@ swim_test_is_drop(double rate)
 	return ((double) rand() / RAND_MAX) * 100 < rate;
 }
 
+/**
+ * Hand packet @a p, taken from @a src's send queue, over to
+ * @a dst's receive queue. If @a dst is closed, or a random drop
+ * is triggered by the drop rate of @a dst or of @a src, the
+ * packet is deleted instead.
+ */
+static inline void
+swim_move_packet(struct swim_fd *src, struct swim_fd *dst,
+		 struct swim_test_packet *p)
+{
+	if (!dst->is_opened || swim_test_is_drop(dst->drop_rate) ||
+	    swim_test_is_drop(src->drop_rate)) {
+		swim_test_packet_delete(p);
+		return;
+	}
+	rlist_add_tail_entry(&dst->recv_queue, p, in_queue);
+}
+
+/**
+ * Pop the first packet from @a fd's send queue and deliver it.
+ * A packet addressed to INADDR_BROADCAST with zero port is copied
+ * to every descriptor on the swim_fd_active list. Any other
+ * packet goes to swim_fd[] indexed by its destination port.
+ */
 static inline void
 swim_fd_send_packet(struct swim_fd *fd)
 {
 	assert(! rlist_empty(&fd->send_queue));
-	struct swim_test_packet *p =
+	struct swim_fd *dst;
+	struct swim_test_packet *dup, *p =
 		rlist_shift_entry(&fd->send_queue, struct swim_test_packet,
 				  in_queue);
-	struct swim_fd *dst = &swim_fd[ntohs(p->dst.sin_port)];
-	if (dst->is_opened && !swim_test_is_drop(dst->drop_rate) &&
-	    !swim_test_is_drop(fd->drop_rate))
-		rlist_add_tail_entry(&dst->recv_queue, p, in_queue);
-	else
+	if (p->dst.sin_addr.s_addr == INADDR_BROADCAST &&
+	    p->dst.sin_port == 0) {
+		rlist_foreach_entry(dst, &swim_fd_active, in_active) {
+			/* Each receiver gets its own copy. */
+			dup = swim_test_packet_dup(p);
+			swim_move_packet(fd, dst, dup);
+		}
+		/* Only copies were delivered, the original is freed. */
 		swim_test_packet_delete(p);
+	} else {
+		/* Unicast: the destination port is the fd index. */
+		dst = &swim_fd[ntohs(p->dst.sin_port)];
+		swim_move_packet(fd, dst, p);
+	}
 }
 
 /**
@@ -350,3 +387,46 @@ swim_test_transport_do_loop_step(struct ev_loop *loop)
 		 */
 	} while (ev_pending_count(loop) > 0);
 }
+
+int
+swim_getifaddrs(struct ifaddrs **ifaddrs)
+{
+	/*
+	 * Fake getifaddrs(). The result always has two interfaces:
+	 * 1) an up broadcast one with INADDR_BROADCAST broadcast
+	 *    address - this transport delivers packets sent there
+	 *    with zero port to all active descriptors;
+	 * 2) an up non-broadcast one with a zero address, leading
+	 *    nowhere. It only checks that the real SWIM code walks
+	 *    the interface list correctly.
+	 * Interfaces and their addresses share one allocation, so
+	 * the list is freed with a single free().
+	 */
+	struct fake_ifaddrs {
+		struct ifaddrs ifaces[2];
+		struct sockaddr_in addrs[2];
+	};
+	struct fake_ifaddrs *block =
+		(struct fake_ifaddrs *) calloc(1, sizeof(*block));
+	assert(block != NULL);
+	struct ifaddrs *bcast = &block->ifaces[0];
+	struct ifaddrs *dummy = &block->ifaces[1];
+	struct sockaddr_in *bcast_addr = &block->addrs[0];
+	struct sockaddr_in *dummy_addr = &block->addrs[1];
+
+	bcast_addr->sin_family = AF_INET;
+	bcast_addr->sin_addr.s_addr = INADDR_BROADCAST;
+	bcast->ifa_next = dummy;
+	bcast->ifa_flags = IFF_UP | IFF_BROADCAST;
+	bcast->ifa_broadaddr = (struct sockaddr *) bcast_addr;
+
+	dummy_addr->sin_family = AF_INET;
+	dummy->ifa_flags = IFF_UP;
+	dummy->ifa_addr = (struct sockaddr *) dummy_addr;
+
+	/* The first interface is the start of the allocation. */
+	*ifaddrs = bcast;
+	return 0;
+}
+
+/** Free a list made by the fake swim_getifaddrs() of this file. */
+void
+swim_freeifaddrs(struct ifaddrs *ifaddrs)
+{
+	/*
+	 * The whole list is packed into a single allocation by
+	 * the fake swim_getifaddrs(), so one free() releases it.
+	 */
+	free(ifaddrs);
+}
-- 
2.17.2 (Apple Git-113)





More information about the Tarantool-patches mailing list