* [tarantool-patches] [PATCH 1/2] swim: introduce broadcast tasks
2019-04-10 17:45 [tarantool-patches] [PATCH 0/2] swim broadcast Vladislav Shpilevoy
@ 2019-04-10 17:45 ` Vladislav Shpilevoy
2019-04-11 12:57 ` [tarantool-patches] " Konstantin Osipov
2019-04-10 17:45 ` [tarantool-patches] [PATCH 2/2] swim: expose ping broadcast API Vladislav Shpilevoy
2019-04-10 18:11 ` [tarantool-patches] Re: [PATCH 0/2] swim broadcast Konstantin Osipov
2 siblings, 1 reply; 11+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-10 17:45 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja
When a cluster is just created, no one knows anyone. Broadcast
helps to establish some initial relationships between members.
This commit introduces only an interface to create broadcast
tasks from SWIM code. The next commit uses this interface to
implement ping broadcast.
Part of #3234
---
src/lib/swim/swim_io.c | 97 +++++++++++++++++++++++++++++++
src/lib/swim/swim_io.h | 24 ++++++++
src/lib/swim/swim_transport.h | 13 +++++
src/lib/swim/swim_transport_udp.c | 17 ++++++
test/unit/swim_test_transport.c | 92 +++++++++++++++++++++++++++--
5 files changed, 237 insertions(+), 6 deletions(-)
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 56c2facc8..03d79ae3e 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -33,6 +33,8 @@
#include "swim_ev.h"
#include "fiber.h"
#include "sio.h"
+#include <ifaddrs.h>
+#include <net/if.h>
/**
* Allocate memory for meta. The same as mere alloc, but moves
@@ -105,6 +107,101 @@ swim_task_send(struct swim_task *task, const struct sockaddr_in *dst,
swim_task_schedule(task, scheduler);
}
+/** Free the interface list, destroy the base task, free memory. */
+static void
+swim_bcast_task_delete(struct swim_bcast_task *task)
+{
+ swim_freeifaddrs(task->addrs);
+ swim_task_destroy(&task->base);
+ free(task);
+}
+
+/** Cancellation callback: unconditionally deletes the task. */
+static void
+swim_bcast_task_delete_cb(struct swim_task *task,
+ struct swim_scheduler *scheduler, int rc)
+{
+ (void) scheduler;
+ (void) rc;
+ swim_bcast_task_delete((struct swim_bcast_task *) task);
+}
+
+/**
+ * Write down a next available broadcast address into the task
+ * destination field and move the iterator past the used one.
+ * @param task Broadcast task to update.
+ * @retval 0 Success. @a dst field is updated.
+ * @retval -1 No more addresses.
+ */
+static int
+swim_bcast_task_next_addr(struct swim_bcast_task *task)
+{
+ for (struct ifaddrs *i = task->i; i != NULL; i = i->ifa_next) {
+ int flags = i->ifa_flags;
+ if ((flags & IFF_UP) == 0)
+ continue;
+
+ if ((flags & IFF_BROADCAST) != 0 && i->ifa_broadaddr != NULL &&
+ i->ifa_broadaddr->sa_family == AF_INET) {
+ task->base.dst =
+ *(struct sockaddr_in *) i->ifa_broadaddr;
+ } else if (i->ifa_addr != NULL &&
+ i->ifa_addr->sa_family == AF_INET) {
+ task->base.dst = *(struct sockaddr_in *) i->ifa_addr;
+ } else {
+ continue;
+ }
+ task->base.dst.sin_port = task->port;
+ task->i = i->ifa_next;
+ return 0;
+ }
+ return -1;
+}
+
+/**
+ * Send completion callback: reschedule the task for the next
+ * broadcast address, or delete it when none are left.
+ */
+static void
+swim_bcast_task_complete(struct swim_task *base_task,
+ struct swim_scheduler *scheduler, int rc)
+{
+ (void) rc;
+ struct swim_bcast_task *bcast = (struct swim_bcast_task *) base_task;
+ if (swim_bcast_task_next_addr(bcast) == 0)
+ swim_task_schedule(base_task, scheduler);
+ else
+ swim_bcast_task_delete(bcast);
+}
+
+struct swim_bcast_task *
+swim_bcast_task_new(int port, const char *desc)
+{
+ struct swim_bcast_task *task =
+ (struct swim_bcast_task *) malloc(sizeof(*task));
+ if (task == NULL) {
+ diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
+ return NULL;
+ }
+ struct ifaddrs *iflist;
+ if (swim_getifaddrs(&iflist) != 0) {
+ free(task);
+ return NULL;
+ }
+ task->port = htons(port);
+ task->addrs = iflist;
+ task->i = iflist;
+ swim_task_create(&task->base, swim_bcast_task_complete,
+ swim_bcast_task_delete_cb, desc);
+ if (swim_bcast_task_next_addr(task) == 0)
+ return task;
+ /* Not a single usable interface - nothing to broadcast to. */
+ diag_set(SwimError, "broadcast has failed - no available "
+ "interfaces");
+ swim_bcast_task_delete(task);
+ return NULL;
+}
+
/**
* Scheduler fd mainly is needed to be printed into the logs in
* order to distinguish between different SWIM instances logs.
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index a6032127d..30acd491f 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -247,4 +247,28 @@ swim_task_destroy(struct swim_task *task)
rlist_del_entry(task, in_queue_output);
}
+/**
+ * Broadcast task. Besides the base task it owns a list of network
+ * interfaces obtained via swim_getifaddrs(). The packet is sent
+ * to one usable interface at a time, and the task deletes itself
+ * once the list is exhausted.
+ */
+struct swim_bcast_task {
+ /** Base structure. Must be first to allow casts. */
+ struct swim_task base;
+ /** Destination port, already in network byte order. */
+ int port;
+ /** Head of the interface list, freed on task deletion. */
+ struct ifaddrs *addrs;
+ /** Iterator: the next interface to examine. */
+ struct ifaddrs *i;
+};
+
+/**
+ * Create a broadcast task for @a port (host byte order). Returns
+ * NULL and sets diag on failure or when no interface is usable.
+ */
+struct swim_bcast_task *
+swim_bcast_task_new(int port, const char *desc);
+
#endif /* TARANTOOL_SWIM_IO_H_INCLUDED */
\ No newline at end of file
diff --git a/src/lib/swim/swim_transport.h b/src/lib/swim/swim_transport.h
index 18c92c981..6f4370087 100644
--- a/src/lib/swim/swim_transport.h
+++ b/src/lib/swim/swim_transport.h
@@ -34,6 +34,8 @@
#include <netinet/in.h>
#include <arpa/inet.h>
+struct ifaddrs;
+
/** Transport implementation. */
struct swim_transport {
/** Socket. */
@@ -77,4 +79,15 @@ swim_transport_destroy(struct swim_transport *transport);
void
swim_transport_create(struct swim_transport *transport);
+/**
+ * Get a list of network interfaces. A wrapper around getifaddrs
+ * which sets diag on failure. Returns 0 on success, -1 on error.
+ */
+int
+swim_getifaddrs(struct ifaddrs **ifaddrs);
+
+/** Delete an interface list obtained from swim_getifaddrs. */
+void
+swim_freeifaddrs(struct ifaddrs *ifaddrs);
+
#endif /* TARANTOOL_SWIM_TRANSPORT_H_INCLUDED */
diff --git a/src/lib/swim/swim_transport_udp.c b/src/lib/swim/swim_transport_udp.c
index f8fbae102..f017eeaf0 100644
--- a/src/lib/swim/swim_transport_udp.c
+++ b/src/lib/swim/swim_transport_udp.c
@@ -31,6 +31,8 @@
#include "swim_transport.h"
#include "evio.h"
#include "diag.h"
+#include <ifaddrs.h>
+#include <net/if.h>
ssize_t
swim_transport_send(struct swim_transport *transport, const void *data,
@@ -110,3 +112,18 @@ swim_transport_create(struct swim_transport *transport)
transport->fd = -1;
memset(&transport->addr, 0, sizeof(transport->addr));
}
+
+int
+swim_getifaddrs(struct ifaddrs **ifaddrs)
+{
+ int rc = getifaddrs(ifaddrs);
+ if (rc != 0)
+ diag_set(SystemError, "failed to take an interface list by getifaddrs");
+ return rc != 0 ? -1 : 0;
+}
+
+void
+swim_freeifaddrs(struct ifaddrs *ifaddrs)
+{
+ freeifaddrs(ifaddrs);
+}
diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
index 78fda587a..83eeba2d4 100644
--- a/test/unit/swim_test_transport.c
+++ b/test/unit/swim_test_transport.c
@@ -34,6 +34,8 @@
#include "fiber.h"
#include <errno.h>
#include <sys/socket.h>
+#include <ifaddrs.h>
+#include <net/if.h>
enum {
/**
@@ -86,6 +88,18 @@ swim_test_packet_delete(struct swim_test_packet *p)
free(p);
}
+/** Deep-copy a packet with its payload; the copy's queue link is reset. */
+static inline struct swim_test_packet *
+swim_test_packet_dup(struct swim_test_packet *p)
+{
+ int total = sizeof(struct swim_test_packet) + p->size;
+ struct swim_test_packet *copy = (struct swim_test_packet *) malloc(total);
+ assert(copy != NULL);
+ memcpy(copy, p, total);
+ rlist_create(&copy->in_queue);
+ return copy;
+}
+
/** Fake file descriptor. */
struct swim_fd {
/** File descriptor number visible to libev. */
@@ -289,19 +303,42 @@ swim_test_is_drop(double rate)
return ((double) rand() / RAND_MAX) * 100 < rate;
}
+/**
+ * Deliver packet @a p, taken from the send queue of @a src, into
+ * the recv queue of @a dst. The packet is freed instead if @a dst
+ * is closed, or if a random drop is triggered by the drop rate of
+ * either side.
+ */
+static inline void
+swim_move_packet(struct swim_fd *src, struct swim_fd *dst,
+ struct swim_test_packet *p)
+{
+ if (! dst->is_opened || swim_test_is_drop(dst->drop_rate) ||
+ swim_test_is_drop(src->drop_rate))
+ swim_test_packet_delete(p);
+ else
+ rlist_add_tail_entry(&dst->recv_queue, p, in_queue);
+}
+
static inline void
swim_fd_send_packet(struct swim_fd *fd)
{
assert(! rlist_empty(&fd->send_queue));
- struct swim_test_packet *p =
+ struct swim_fd *dst;
+ struct swim_test_packet *dup, *p =
rlist_shift_entry(&fd->send_queue, struct swim_test_packet,
in_queue);
- struct swim_fd *dst = &swim_fd[ntohs(p->dst.sin_port)];
- if (dst->is_opened && !swim_test_is_drop(dst->drop_rate) &&
- !swim_test_is_drop(fd->drop_rate))
- rlist_add_tail_entry(&dst->recv_queue, p, in_queue);
- else
+ if (p->dst.sin_addr.s_addr == INADDR_BROADCAST &&
+ p->dst.sin_port == 0) { /* Copy to every active fd. */
+ rlist_foreach_entry(dst, &swim_fd_active, in_active) {
+ dup = swim_test_packet_dup(p);
+ swim_move_packet(fd, dst, dup);
+ }
swim_test_packet_delete(p);
+ } else { /* Unicast: the port is the fake fd number. */
+ dst = &swim_fd[ntohs(p->dst.sin_port)];
+ swim_move_packet(fd, dst, p);
+ }
}
/**
@@ -350,3 +387,46 @@ swim_test_transport_do_loop_step(struct ev_loop *loop)
*/
} while (ev_pending_count(loop) > 0);
}
+
+int
+swim_getifaddrs(struct ifaddrs **ifaddrs)
+{
+ /*
+ * Fake getifaddrs returning two interfaces. The first one has
+ * the INADDR_BROADCAST broadcast address; with port 0 the test
+ * transport delivers it to all active descriptors. The second
+ * is a dummy interface leading nowhere, used to check that
+ * SWIM iterates the whole list. Everything lives in one zeroed
+ * chunk: [ifaddrs][ifaddrs][sockaddr_in][sockaddr_in], so a
+ * single free() releases it.
+ */
+ int size = (sizeof(struct ifaddrs) + sizeof(struct sockaddr_in)) * 2;
+ struct ifaddrs *iface = (struct ifaddrs *) calloc(1, size);
+ assert(iface != NULL);
+ struct ifaddrs *iface_next = &iface[1];
+ iface->ifa_next = iface_next;
+
+ struct sockaddr_in *broadaddr = (struct sockaddr_in *) &iface_next[1];
+ broadaddr->sin_family = AF_INET;
+ broadaddr->sin_addr.s_addr = INADDR_BROADCAST;
+ iface->ifa_flags = IFF_UP | IFF_BROADCAST;
+ iface->ifa_broadaddr = (struct sockaddr *) broadaddr;
+
+ struct sockaddr_in *dummy_addr = &broadaddr[1];
+ dummy_addr->sin_family = AF_INET;
+ iface_next->ifa_flags = IFF_UP;
+ iface_next->ifa_addr = (struct sockaddr *) dummy_addr;
+
+ *ifaddrs = iface;
+ return 0;
+}
+
+void
+swim_freeifaddrs(struct ifaddrs *ifaddrs)
+{
+ /*
+ * The fake swim_getifaddrs() packs the whole list, including
+ * the addresses, into a single allocation.
+ */
+ free(ifaddrs);
+}
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 11+ messages in thread
* [tarantool-patches] [PATCH 2/2] swim: expose ping broadcast API
2019-04-10 17:45 [tarantool-patches] [PATCH 0/2] swim broadcast Vladislav Shpilevoy
2019-04-10 17:45 ` [tarantool-patches] [PATCH 1/2] swim: introduce broadcast tasks Vladislav Shpilevoy
@ 2019-04-10 17:45 ` Vladislav Shpilevoy
2019-04-11 13:00 ` [tarantool-patches] " Konstantin Osipov
2019-04-10 18:11 ` [tarantool-patches] Re: [PATCH 0/2] swim broadcast Konstantin Osipov
2 siblings, 1 reply; 11+ messages in thread
From: Vladislav Shpilevoy @ 2019-04-10 17:45 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja
The previous commit has introduced an API to broadcast SWIM
packets. This commit harnesses it in order to allow a user to do
initial discovery in a cluster, when member tables are empty, and
UUIDs aren't ready at hand.
Part of #3234
---
src/lib/swim/swim.c | 13 +++++++++++++
src/lib/swim/swim.h | 7 +++++++
test/unit/swim.c | 34 +++++++++++++++++++++++++++++++++-
test/unit/swim.result | 12 +++++++++++-
4 files changed, 64 insertions(+), 2 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index c64b8df3a..db21f54c5 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -1526,6 +1526,19 @@ swim_probe_member(struct swim *swim, const char *uri)
return 0;
}
+int
+swim_broadcast(struct swim *swim, int port)
+{
+ assert(swim_is_configured(swim));
+ int bcast_port = port < 0 ? ntohs(swim->self->addr.sin_port) : port;
+ struct swim_bcast_task *task =
+ swim_bcast_task_new(bcast_port, "broadcast ping");
+ if (task == NULL)
+ return -1;
+ swim_send_ping(swim, &task->base, &task->base.dst);
+ return 0;
+}
+
void
swim_info(struct swim *swim, struct info_handler *info)
{
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index f8dfdde87..60f5a8a1d 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -136,6 +136,13 @@ swim_remove_member(struct swim *swim, const struct tt_uuid *uuid);
int
swim_probe_member(struct swim *swim, const char *uri);
+/**
+ * Broadcast a ping on @a port (< 0 - use the SWIM instance's own
+ * port) to all interfaces. Returns 0, or -1 with diag set.
+ */
+int
+swim_broadcast(struct swim *swim, int port);
+
/** Dump member statuses into @a info. */
void
swim_info(struct swim *swim, struct info_handler *info);
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 499b2ef52..736db4da1 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -33,6 +33,7 @@
#include "fiber.h"
#include "uuid/tt_uuid.h"
#include "unit.h"
+#include "uri/uri.h"
#include "swim/swim.h"
#include "swim/swim_ev.h"
#include "swim_test_transport.h"
@@ -552,10 +553,40 @@ swim_test_quit(void)
swim_finish_test();
}
+static void
+swim_test_broadcast(void)
+{
+ swim_start_test(6);
+ int cluster_size = 4;
+ struct swim_cluster *cluster = swim_cluster_new(cluster_size);
+ struct swim *s0 = swim_cluster_node(cluster, 0);
+ struct swim *s1 = swim_cluster_node(cluster, 1);
+ const char *s1_uri = swim_member_uri(swim_self(s1));
+ struct uri parsed;
+ fail_if(uri_parse(&parsed, s1_uri) != 0 || parsed.service == NULL);
+ int s1_port = atoi(parsed.service);
+ is(swim_broadcast(s0, s1_port), 0,
+ "S1 chooses to broadcast with port %d", s1_port);
+ is(swim_cluster_wait_status(cluster, 1, 0, MEMBER_ALIVE, 1), 0,
+ "S2 receives the broadcast from S1");
+ swim_run_for(1);
+ is(swim_cluster_member_status(cluster, 2, 0), swim_member_status_MAX,
+ "others don't");
+ /* Port 0 makes the fake transport deliver to every active fd. */
+ is(swim_broadcast(s0, 0), 0, "S1 broadcasts ping without port");
+ is(swim_cluster_wait_status_everywhere(cluster, 0, MEMBER_ALIVE, 0), 0,
+ "now everyone sees S1");
+ is(swim_cluster_wait_fullmesh(cluster, cluster_size), 0,
+ "fullmesh is reached, and no one link was added explicitly");
+
+ swim_cluster_delete(cluster);
+ swim_finish_test();
+}
+
static int
main_f(va_list ap)
{
- swim_start_test(13);
+ swim_start_test(14);
(void) ap;
swim_test_ev_init();
@@ -574,6 +605,7 @@ main_f(va_list ap)
swim_test_undead();
swim_test_packet_loss();
swim_test_quit();
+ swim_test_broadcast();
swim_test_transport_free();
swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index 7277f2ee6..73f2706b1 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
*** main_f ***
-1..13
+1..14
*** swim_test_one_link ***
1..6
ok 1 - no rounds - no fullmesh
@@ -131,4 +131,14 @@ ok 12 - subtests
ok 9 - and still is not added to S2 - left members can not be added
ok 13 - subtests
*** swim_test_quit: done ***
+ *** swim_test_broadcast ***
+ 1..6
+ ok 1 - S1 chooses to broadcast with port 2
+ ok 2 - S2 receives the broadcast from S1
+ ok 3 - others don't
+ ok 4 - S1 broadcasts ping without port
+ ok 5 - now everyone sees S1
+ ok 6 - fullmesh is reached, and no one link was added explicitly
+ok 14 - subtests
+ *** swim_test_broadcast: done ***
*** main_f: done ***
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 11+ messages in thread