From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: kostja@tarantool.org
Subject: [tarantool-patches] [PATCH 1/2] swim: introduce broadcast tasks
Date: Wed, 10 Apr 2019 20:45:33 +0300	[thread overview]
Message-ID: <e74513fc0f8d01fc5298cfc7658d59bd657184d1.1554918303.git.v.shpilevoy@tarantool.org> (raw)
In-Reply-To: <cover.1554918303.git.v.shpilevoy@tarantool.org>
In-Reply-To: <cover.1554918303.git.v.shpilevoy@tarantool.org>

When a cluster is just created, no one knows anyone. Broadcast
helps to establish some initial relationships between members.

This commit introduces only an interface to create broadcast
tasks from SWIM code. The next commit uses this interface to
implement ping broadcast.

Part of #3234
---
 src/lib/swim/swim_io.c            | 97 +++++++++++++++++++++++++++++++
 src/lib/swim/swim_io.h            | 24 ++++++++
 src/lib/swim/swim_transport.h     | 13 +++++
 src/lib/swim/swim_transport_udp.c | 17 ++++++
 test/unit/swim_test_transport.c   | 92 +++++++++++++++++++++++++++--
 5 files changed, 237 insertions(+), 6 deletions(-)

diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 56c2facc8..03d79ae3e 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -33,6 +33,8 @@
 #include "swim_ev.h"
 #include "fiber.h"
 #include "sio.h"
+#include <ifaddrs.h>
+#include <net/if.h>
 
 /**
  * Allocate memory for meta. The same as mere alloc, but moves
@@ -105,6 +107,101 @@ swim_task_send(struct swim_task *task, const struct sockaddr_in *dst,
 	swim_task_schedule(task, scheduler);
 }
 
+/** Delete a broadcast task. */
+static void
+swim_bcast_task_delete(struct swim_bcast_task *task)
+{
+	swim_freeifaddrs(task->addrs);
+	swim_task_destroy(&task->base);
+	free(task);
+}
+
+/** Delete broadcast task on its cancelation. */
+static void
+swim_bcast_task_delete_cb(struct swim_task *task,
+			  struct swim_scheduler *scheduler, int rc)
+{
+	(void) scheduler;
+	(void) rc;
+	swim_bcast_task_delete((struct swim_bcast_task *) task);
+}
+
+/**
+ * Write the next usable interface address into the task
+ * destination field and move the cursor past that interface.
+ * @param task Broadcast task to update.
+ * @retval 0 Success. @a dst field is updated.
+ * @retval -1 No more addresses.
+ */
+static int
+swim_bcast_task_next_addr(struct swim_bcast_task *task)
+{
+	for (struct ifaddrs *i = task->i; i != NULL; i = i->ifa_next) {
+		int flags = i->ifa_flags;
+		if ((flags & IFF_UP) == 0)
+			continue;
+
+		if ((flags & IFF_BROADCAST) != 0 && i->ifa_broadaddr != NULL &&
+		    i->ifa_broadaddr->sa_family == AF_INET) {
+			task->base.dst =
+				*(struct sockaddr_in *) i->ifa_broadaddr;
+		} else if (i->ifa_addr != NULL &&
+			   i->ifa_addr->sa_family == AF_INET) {
+			task->base.dst = *(struct sockaddr_in *) i->ifa_addr;
+		} else {
+			continue;
+		}
+		task->base.dst.sin_port = task->port;
+		task->i = i->ifa_next;
+		return 0;
+	}
+	return -1;
+}
+
+/**
+ * Callback on send completion. If there are more broadcast
+ * addresses to use, then the task is rescheduled.
+ */
+static void
+swim_bcast_task_complete(struct swim_task *base_task,
+			 struct swim_scheduler *scheduler, int rc)
+{
+	(void) rc;
+	struct swim_bcast_task *task = (struct swim_bcast_task *) base_task;
+	if (swim_bcast_task_next_addr(task) != 0)
+		swim_bcast_task_delete(task);
+	else
+		swim_task_schedule(base_task, scheduler);
+}
+
+struct swim_bcast_task *
+swim_bcast_task_new(int port, const char *desc)
+{
+	struct swim_bcast_task *task =
+		(struct swim_bcast_task *) malloc(sizeof(*task));
+	if (task == NULL) {
+		diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
+		return NULL;
+	}
+	struct ifaddrs *addrs;
+	if (swim_getifaddrs(&addrs) != 0) {
+		free(task);
+		return NULL;
+	}
+	task->port = htons(port);
+	task->addrs = addrs;
+	task->i = addrs;
+	swim_task_create(&task->base, swim_bcast_task_complete,
+			 swim_bcast_task_delete_cb, desc);
+	if (swim_bcast_task_next_addr(task) != 0) {
+		diag_set(SwimError, "broadcast has failed - no available "\
+			 "interfaces");
+		swim_bcast_task_delete(task);
+		return NULL;
+	}
+	return task;
+}
+
 /**
  * Scheduler fd mainly is needed to be printed into the logs in
  * order to distinguish between different SWIM instances logs.
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index a6032127d..30acd491f 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -247,4 +247,28 @@ swim_task_destroy(struct swim_task *task)
 	rlist_del_entry(task, in_queue_output);
 }
 
+/**
+ * Broadcast task. Besides usual task fields, stores a list of
+ * interfaces available for broadcast packets. The task works
+ * multiple times, each time sending a packet to one interface.
+ * After completion it is self-deleted.
+ */
+struct swim_bcast_task {
+	/** Base structure. */
+	struct swim_task base;
+	/** Port to use for broadcast, in network byte order. */
+	int port;
+	/** A list of interfaces. */
+	struct ifaddrs *addrs;
+	/** A next interface to send to. */
+	struct ifaddrs *i;
+};
+
+/**
+ * Create a new broadcast task with a specified port. The port is
+ * expected to have host byte order.
+ */
+struct swim_bcast_task *
+swim_bcast_task_new(int port, const char *desc);
+
 #endif /* TARANTOOL_SWIM_IO_H_INCLUDED */
\ No newline at end of file
diff --git a/src/lib/swim/swim_transport.h b/src/lib/swim/swim_transport.h
index 18c92c981..6f4370087 100644
--- a/src/lib/swim/swim_transport.h
+++ b/src/lib/swim/swim_transport.h
@@ -34,6 +34,8 @@
 #include <netinet/in.h>
 #include <arpa/inet.h>
 
+struct ifaddrs;
+
 /** Transport implementation. */
 struct swim_transport {
 	/** Socket. */
@@ -77,4 +79,15 @@ swim_transport_destroy(struct swim_transport *transport);
 void
 swim_transport_create(struct swim_transport *transport);
 
+/**
+ * Get a list of network interfaces. Just a wrapper around
+ * getifaddrs, but setting diag.
+ */
+int
+swim_getifaddrs(struct ifaddrs **ifaddrs);
+
+/** Delete an interface list created earlier with getifaddrs. */
+void
+swim_freeifaddrs(struct ifaddrs *ifaddrs);
+
 #endif /* TARANTOOL_SWIM_TRANSPORT_H_INCLUDED */
diff --git a/src/lib/swim/swim_transport_udp.c b/src/lib/swim/swim_transport_udp.c
index f8fbae102..f017eeaf0 100644
--- a/src/lib/swim/swim_transport_udp.c
+++ b/src/lib/swim/swim_transport_udp.c
@@ -31,6 +31,8 @@
 #include "swim_transport.h"
 #include "evio.h"
 #include "diag.h"
+#include <ifaddrs.h>
+#include <net/if.h>
 
 ssize_t
 swim_transport_send(struct swim_transport *transport, const void *data,
@@ -110,3 +112,18 @@ swim_transport_create(struct swim_transport *transport)
 	transport->fd = -1;
 	memset(&transport->addr, 0, sizeof(transport->addr));
 }
+
+int
+swim_getifaddrs(struct ifaddrs **ifaddrs)
+{
+	if (getifaddrs(ifaddrs) == 0)
+		return 0;
+	diag_set(SystemError, "failed to take an interface list by getifaddrs");
+	return -1;
+}
+
+void
+swim_freeifaddrs(struct ifaddrs *ifaddrs)
+{
+	freeifaddrs(ifaddrs);
+}
diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
index 78fda587a..83eeba2d4 100644
--- a/test/unit/swim_test_transport.c
+++ b/test/unit/swim_test_transport.c
@@ -34,6 +34,8 @@
 #include "fiber.h"
 #include <errno.h>
 #include <sys/socket.h>
+#include <ifaddrs.h>
+#include <net/if.h>
 
 enum {
 	/**
@@ -86,6 +88,18 @@ swim_test_packet_delete(struct swim_test_packet *p)
 	free(p);
 }
 
+/** Fully duplicate a packet on new memory. */
+static inline struct swim_test_packet *
+swim_test_packet_dup(struct swim_test_packet *p)
+{
+	int size = sizeof(struct swim_test_packet) + p->size;
+	struct swim_test_packet *res = (struct swim_test_packet *) malloc(size);
+	assert(res != NULL);
+	memcpy(res, p, size);
+	rlist_create(&res->in_queue);
+	return res;
+}
+
 /** Fake file descriptor. */
 struct swim_fd {
 	/** File descriptor number visible to libev. */
@@ -289,19 +303,42 @@ swim_test_is_drop(double rate)
 	return ((double) rand() / RAND_MAX) * 100 < rate;
 }
 
+/**
+ * Move @a p packet, originated from @a src descriptor's send
+ * queue, to @a dst descriptor's recv queue. The function checks
+ * if @a dst is opened, and tries a chance to drop the packet, if
+ * drop rate is not 0.
+ */
+static inline void
+swim_move_packet(struct swim_fd *src, struct swim_fd *dst,
+		 struct swim_test_packet *p)
+{
+	if (dst->is_opened && !swim_test_is_drop(dst->drop_rate) &&
+	    !swim_test_is_drop(src->drop_rate))
+		rlist_add_tail_entry(&dst->recv_queue, p, in_queue);
+	else
+		swim_test_packet_delete(p);
+}
+
 static inline void
 swim_fd_send_packet(struct swim_fd *fd)
 {
 	assert(!rlist_empty(&fd->send_queue));
-	struct swim_test_packet *p =
+	struct swim_fd *dst;
+	struct swim_test_packet *dup, *p =
 		rlist_shift_entry(&fd->send_queue, struct swim_test_packet,
 				  in_queue);
-	struct swim_fd *dst = &swim_fd[ntohs(p->dst.sin_port)];
-	if (dst->is_opened && !swim_test_is_drop(dst->drop_rate) &&
-	    !swim_test_is_drop(fd->drop_rate))
-		rlist_add_tail_entry(&dst->recv_queue, p, in_queue);
-	else
+	if (p->dst.sin_addr.s_addr == INADDR_BROADCAST &&
+	    p->dst.sin_port == 0) {
+		rlist_foreach_entry(dst, &swim_fd_active, in_active) {
+			dup = swim_test_packet_dup(p);
+			swim_move_packet(fd, dst, dup);
+		}
 		swim_test_packet_delete(p);
+	} else {
+		dst = &swim_fd[ntohs(p->dst.sin_port)];
+		swim_move_packet(fd, dst, p);
+	}
 }
 
 /**
@@ -350,3 +387,46 @@ swim_test_transport_do_loop_step(struct ev_loop *loop)
 		 */
 	} while (ev_pending_count(loop) > 0);
 }
+
+int
+swim_getifaddrs(struct ifaddrs **ifaddrs)
+{
+	/*
+	 * This is a fake implementation of getifaddrs. It always
+	 * returns two interfaces. First is a normal broadcast,
+	 * which is later used to send a packet to all the opened
+	 * descriptors. Second is a dummy interface leading to
+	 * nowhere. The latter is used just for testing that the
+	 * real SWIM code correctly iterates through the
+	 * interface list.
+	 */
+	int size = (sizeof(struct ifaddrs) + sizeof(struct sockaddr_in)) * 2;
+	struct ifaddrs *iface = (struct ifaddrs *) calloc(1, size);
+	assert(iface != NULL);
+	struct ifaddrs *iface_next = &iface[1];
+	iface->ifa_next = iface_next;
+
+	struct sockaddr_in *broadaddr = (struct sockaddr_in *) &iface_next[1];
+	broadaddr->sin_family = AF_INET;
+	broadaddr->sin_addr.s_addr = INADDR_BROADCAST;
+	iface->ifa_flags = IFF_UP | IFF_BROADCAST;
+	iface->ifa_broadaddr = (struct sockaddr *) broadaddr;
+
+	struct sockaddr_in *dummy_addr = &broadaddr[1];
+	dummy_addr->sin_family = AF_INET;
+	iface_next->ifa_flags = IFF_UP;
+	iface_next->ifa_addr = (struct sockaddr *) dummy_addr;
+
+	*ifaddrs = iface;
+	return 0;
+}
+
+void
+swim_freeifaddrs(struct ifaddrs *ifaddrs)
+{
+	/*
+	 * The whole list is packed into a single allocation
+	 * above.
+	 */
+	free(ifaddrs);
+}
-- 
2.17.2 (Apple Git-113)
next prev parent reply other threads:[~2019-04-10 17:45 UTC|newest] Thread overview: 11+ messages / expand[flat|nested] mbox.gz Atom feed top 2019-04-10 17:45 [tarantool-patches] [PATCH 0/2] swim broadcast Vladislav Shpilevoy 2019-04-10 17:45 ` Vladislav Shpilevoy [this message] 2019-04-11 12:57 ` [tarantool-patches] Re: [PATCH 1/2] swim: introduce broadcast tasks Konstantin Osipov 2019-04-11 13:46 ` Vladislav Shpilevoy 2019-04-10 17:45 ` [tarantool-patches] [PATCH 2/2] swim: expose ping broadcast API Vladislav Shpilevoy 2019-04-11 13:00 ` [tarantool-patches] " Konstantin Osipov 2019-04-11 13:00 ` Konstantin Osipov 2019-04-11 13:47 ` Vladislav Shpilevoy 2019-04-10 18:11 ` [tarantool-patches] Re: [PATCH 0/2] swim broadcast Konstantin Osipov 2019-04-10 18:13 ` Vladislav Shpilevoy 2019-04-10 22:13 ` Vladislav Shpilevoy
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=e74513fc0f8d01fc5298cfc7658d59bd657184d1.1554918303.git.v.shpilevoy@tarantool.org \ --to=v.shpilevoy@tarantool.org \ --cc=kostja@tarantool.org \ --cc=tarantool-patches@freelists.org \ --subject='Re: [tarantool-patches] [PATCH 1/2] swim: introduce broadcast tasks' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox