From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 1BFB429D16 for ; Wed, 10 Apr 2019 13:45:39 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id di_CpNIji2-p for ; Wed, 10 Apr 2019 13:45:38 -0400 (EDT) Received: from smtpng3.m.smailru.net (smtpng3.m.smailru.net [94.100.177.149]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 4FAB429BAD for ; Wed, 10 Apr 2019 13:45:38 -0400 (EDT) From: Vladislav Shpilevoy Subject: [tarantool-patches] [PATCH 1/2] swim: introduce broadcast tasks Date: Wed, 10 Apr 2019 20:45:33 +0300 Message-Id: In-Reply-To: References: In-Reply-To: References: Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: kostja@tarantool.org When a cluster is just created, no one knows anyone. Broadcast helps to establish some initial relationships between members. This commit introduces only an interface to create broadcast tasks from SWIM code. The next commit uses this interface to implement ping broadcast. 
Part of #3234
---
 src/lib/swim/swim_io.c            | 97 +++++++++++++++++++++++++++++++
 src/lib/swim/swim_io.h            | 24 ++++++++
 src/lib/swim/swim_transport.h     | 13 +++++
 src/lib/swim/swim_transport_udp.c | 17 ++++++
 test/unit/swim_test_transport.c   | 92 +++++++++++++++++++++++++++--
 5 files changed, 237 insertions(+), 6 deletions(-)

diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 56c2facc8..03d79ae3e 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -33,6 +33,8 @@
 #include "swim_ev.h"
 #include "fiber.h"
 #include "sio.h"
+#include <ifaddrs.h>
+#include <net/if.h>
 
 /**
  * Allocate memory for meta. The same as mere alloc, but moves
@@ -105,6 +107,101 @@ swim_task_send(struct swim_task *task, const struct sockaddr_in *dst,
 	swim_task_schedule(task, scheduler);
 }
 
+/** Delete a broadcast task together with its interface list. */
+static void
+swim_bcast_task_delete(struct swim_bcast_task *task)
+{
+	swim_freeifaddrs(task->addrs);
+	swim_task_destroy(&task->base);
+	free(task);
+}
+
+/** Delete a broadcast task on its cancellation. */
+static void
+swim_bcast_task_delete_cb(struct swim_task *task,
+			  struct swim_scheduler *scheduler, int rc)
+{
+	(void) scheduler;
+	(void) rc;
+	swim_bcast_task_delete((struct swim_bcast_task *) task);
+}
+
+/**
+ * Write down the next available broadcast address into the task
+ * destination field.
+ * @param task Broadcast task to update.
+ * @retval 0 Success. task->base.dst field is updated.
+ * @retval -1 No more addresses.
+ */
+static int
+swim_bcast_task_next_addr(struct swim_bcast_task *task)
+{
+	for (struct ifaddrs *i = task->i; i != NULL; i = i->ifa_next) {
+		int flags = i->ifa_flags;
+		if ((flags & IFF_UP) == 0)
+			continue;
+
+		if ((flags & IFF_BROADCAST) != 0 && i->ifa_broadaddr != NULL &&
+		    i->ifa_broadaddr->sa_family == AF_INET) {
+			task->base.dst =
+				*(struct sockaddr_in *) i->ifa_broadaddr;
+		} else if (i->ifa_addr != NULL &&
+			   i->ifa_addr->sa_family == AF_INET) {
+			task->base.dst = *(struct sockaddr_in *) i->ifa_addr;
+		} else {
+			continue;
+		}
+		task->base.dst.sin_port = task->port;
+		task->i = i->ifa_next; /* Continue after the used entry. */
+		return 0;
+	}
+	return -1;
+}
+
+/**
+ * Callback on send completion. Reschedules the task for the next
+ * address, or deletes it when none are left. @a rc is ignored.
+ */
+static void
+swim_bcast_task_complete(struct swim_task *base_task,
+			 struct swim_scheduler *scheduler, int rc)
+{
+	(void) rc;
+	struct swim_bcast_task *task = (struct swim_bcast_task *) base_task;
+	if (swim_bcast_task_next_addr(task) != 0)
+		swim_bcast_task_delete(task);
+	else
+		swim_task_schedule(base_task, scheduler);
+}
+
+struct swim_bcast_task *
+swim_bcast_task_new(int port, const char *desc)
+{
+	struct swim_bcast_task *task =
+		(struct swim_bcast_task *) malloc(sizeof(*task));
+	if (task == NULL) {
+		diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
+		return NULL;
+	}
+	struct ifaddrs *addrs;
+	if (swim_getifaddrs(&addrs) != 0) {
+		free(task);
+		return NULL;
+	}
+	task->port = htons(port);
+	task->addrs = addrs;
+	task->i = addrs;
+	swim_task_create(&task->base, swim_bcast_task_complete,
+			 swim_bcast_task_delete_cb, desc);
+	if (swim_bcast_task_next_addr(task) != 0) {
+		diag_set(SwimError, "broadcast has failed - no available "
+			 "interfaces");
+		swim_bcast_task_delete(task);
+		return NULL;
+	}
+	return task;
+}
+
 /**
  * Scheduler fd mainly is needed to be printed into the logs in
  * order to distinguish between different SWIM instances logs.
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index a6032127d..30acd491f 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -247,4 +247,28 @@ swim_task_destroy(struct swim_task *task)
 	rlist_del_entry(task, in_queue_output);
 }
 
+/**
+ * Broadcast task. Besides usual task fields, stores a list of
+ * interfaces available for broadcast packets. The task works
+ * multiple times, each time sending a packet to one interface.
+ * After completion it is self-deleted.
+ */
+struct swim_bcast_task {
+	/** Base structure. */
+	struct swim_task base;
+	/** Port to use for broadcast, in network byte order. */
+	int port;
+	/** Interface list, owned and freed by the task. */
+	struct ifaddrs *addrs;
+	/** Cursor in the list: the next interface to examine. */
+	struct ifaddrs *i;
+};
+
+/**
+ * Create a new broadcast task for @a port, given in host byte
+ * order. On error NULL is returned and diag is set.
+ */
+struct swim_bcast_task *
+swim_bcast_task_new(int port, const char *desc);
+
 #endif /* TARANTOOL_SWIM_IO_H_INCLUDED */
\ No newline at end of file
diff --git a/src/lib/swim/swim_transport.h b/src/lib/swim/swim_transport.h
index 18c92c981..6f4370087 100644
--- a/src/lib/swim/swim_transport.h
+++ b/src/lib/swim/swim_transport.h
@@ -34,6 +34,8 @@
 #include
 #include
 
+struct ifaddrs;
+
 /** Transport implementation. */
 struct swim_transport {
 	/** Socket. */
@@ -77,4 +79,15 @@ swim_transport_destroy(struct swim_transport *transport);
 void
 swim_transport_create(struct swim_transport *transport);
 
+/**
+ * Get a list of network interfaces. A wrapper around getifaddrs
+ * which sets diag on failure.
+ */
+int
+swim_getifaddrs(struct ifaddrs **ifaddrs);
+
+/** Delete an interface list created by swim_getifaddrs. */
+void
+swim_freeifaddrs(struct ifaddrs *ifaddrs);
+
 #endif /* TARANTOOL_SWIM_TRANSPORT_H_INCLUDED */
diff --git a/src/lib/swim/swim_transport_udp.c b/src/lib/swim/swim_transport_udp.c
index f8fbae102..f017eeaf0 100644
--- a/src/lib/swim/swim_transport_udp.c
+++ b/src/lib/swim/swim_transport_udp.c
@@ -31,6 +31,8 @@
 #include "swim_transport.h"
 #include "evio.h"
 #include "diag.h"
+#include <ifaddrs.h>
+#include <net/if.h>
 
 ssize_t
 swim_transport_send(struct swim_transport *transport, const void *data,
@@ -110,3 +112,18 @@ swim_transport_create(struct swim_transport *transport)
 	transport->fd = -1;
 	memset(&transport->addr, 0, sizeof(transport->addr));
 }
+
+int
+swim_getifaddrs(struct ifaddrs **ifaddrs)
+{
+	if (getifaddrs(ifaddrs) == 0)
+		return 0;
+	diag_set(SystemError, "failed to take an interface list by getifaddrs");
+	return -1;
+}
+
+void
+swim_freeifaddrs(struct ifaddrs *ifaddrs)
+{
+	freeifaddrs(ifaddrs);
+}
diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
index 78fda587a..83eeba2d4 100644
--- a/test/unit/swim_test_transport.c
+++ b/test/unit/swim_test_transport.c
@@ -34,6 +34,8 @@
 #include "fiber.h"
 #include
 #include
+#include <ifaddrs.h>
+#include <net/if.h>
 
 enum {
 	/**
@@ -86,6 +88,18 @@ swim_test_packet_delete(struct swim_test_packet *p)
 	free(p);
 }
 
+/** Duplicate a packet with its payload; the copy is in no queue. */
+static inline struct swim_test_packet *
+swim_test_packet_dup(struct swim_test_packet *p)
+{
+	int size = sizeof(struct swim_test_packet) + p->size;
+	struct swim_test_packet *res = (struct swim_test_packet *) malloc(size);
+	assert(res != NULL);
+	memcpy(res, p, size);
+	rlist_create(&res->in_queue);
+	return res;
+}
+
 /** Fake file descriptor. */
 struct swim_fd {
 	/** File descriptor number visible to libev. */
@@ -289,19 +303,42 @@ swim_test_is_drop(double rate)
 	return ((double) rand() / RAND_MAX) * 100 < rate;
 }
 
+/**
+ * Move @a p packet, originated from @a src descriptor's send
+ * queue, to @a dst descriptor's recv queue. The packet is deleted
+ * instead if @a dst is not opened, or if the drop rate of either
+ * @a dst or @a src decides to drop it.
+ */
+static inline void
+swim_move_packet(struct swim_fd *src, struct swim_fd *dst,
+		 struct swim_test_packet *p)
+{
+	if (dst->is_opened && !swim_test_is_drop(dst->drop_rate) &&
+	    !swim_test_is_drop(src->drop_rate))
+		rlist_add_tail_entry(&dst->recv_queue, p, in_queue);
+	else
+		swim_test_packet_delete(p);
+}
+
 static inline void
 swim_fd_send_packet(struct swim_fd *fd)
 {
 	assert(! rlist_empty(&fd->send_queue));
-	struct swim_test_packet *p =
+	struct swim_fd *dst;
+	struct swim_test_packet *dup, *p =
 		rlist_shift_entry(&fd->send_queue, struct swim_test_packet,
 				  in_queue);
-	struct swim_fd *dst = &swim_fd[ntohs(p->dst.sin_port)];
-	if (dst->is_opened && !swim_test_is_drop(dst->drop_rate) &&
-	    !swim_test_is_drop(fd->drop_rate))
-		rlist_add_tail_entry(&dst->recv_queue, p, in_queue);
-	else
+	if (p->dst.sin_addr.s_addr == INADDR_BROADCAST &&
+	    p->dst.sin_port == 0) {
+		rlist_foreach_entry(dst, &swim_fd_active, in_active) {
+			dup = swim_test_packet_dup(p);
+			swim_move_packet(fd, dst, dup);
+		}
 		swim_test_packet_delete(p);
+	} else {
+		dst = &swim_fd[ntohs(p->dst.sin_port)];
+		swim_move_packet(fd, dst, p);
+	}
 }
 
 /**
@@ -350,3 +387,46 @@ swim_test_transport_do_loop_step(struct ev_loop *loop)
 	 */
 	} while (ev_pending_count(loop) > 0);
 }
+
+int
+swim_getifaddrs(struct ifaddrs **ifaddrs)
+{
+	/*
+	 * This is a fake implementation of getifaddrs. It always
+	 * returns two interfaces. First is a normal broadcast,
+	 * which is later used to send a packet to all the opened
+	 * descriptors. Second is a dummy interface leading to
+	 * nowhere. The latter is used just for testing that the
+	 * real SWIM code correctly iterates through the
+	 * interface list.
+	 */
+	int size = (sizeof(struct ifaddrs) + sizeof(struct sockaddr_in)) * 2;
+	struct ifaddrs *iface = (struct ifaddrs *) calloc(1, size);
+	assert(iface != NULL);
+	struct ifaddrs *iface_next = &iface[1];
+	iface->ifa_next = iface_next;
+
+	struct sockaddr_in *broadaddr = (struct sockaddr_in *) &iface_next[1];
+	broadaddr->sin_family = AF_INET;
+	broadaddr->sin_addr.s_addr = INADDR_BROADCAST;
+	iface->ifa_flags = IFF_UP | IFF_BROADCAST;
+	iface->ifa_broadaddr = (struct sockaddr *) broadaddr;
+
+	struct sockaddr_in *dummy_addr = &broadaddr[1];
+	dummy_addr->sin_family = AF_INET;
+	iface_next->ifa_flags = IFF_UP;
+	iface_next->ifa_addr = (struct sockaddr *) dummy_addr;
+
+	*ifaddrs = iface;
+	return 0;
+}
+
+void
+swim_freeifaddrs(struct ifaddrs *ifaddrs)
+{
+	/*
+	 * The whole list is packed into a single allocation
+	 * above.
+	 */
+	free(ifaddrs);
+}
-- 
2.17.2 (Apple Git-113)