[PATCH v4 11/12] [RAW] swim: introduce broadcast tasks
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Thu Jan 31 00:28:29 MSK 2019
When a cluster is just created, no one knows anyone. Broadcast
helps to establish some initial relationships between members.
Part of #3234
---
src/lib/swim/swim_io.c | 97 ++++++++++++++++++++++++++++++++++++++++++
src/lib/swim/swim_io.h | 22 ++++++++++
2 files changed, 119 insertions(+)
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index e62b7126f..7bdb4bee0 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -32,6 +32,8 @@
#include "swim_proto.h"
#include "fiber.h"
#include "sio.h"
+#include <ifaddrs.h>
+#include <net/if.h>
/**
* Allocate memory for meta. The same as mere alloc, but moves
@@ -147,6 +149,101 @@ swim_task_send(struct swim_task *task, const struct sockaddr_in *dst,
swim_task_schedule(task, scheduler);
}
+/**
+ * Destroy a broadcast task: free the interface list obtained
+ * from getifaddrs(), destroy the base task, and release the
+ * task memory itself. The pointer is invalid after the call.
+ */
+static void
+swim_bcast_task_delete(struct swim_bcast_task *task)
+{
+	struct ifaddrs *iface_list = task->addrs;
+	struct swim_task *base = &task->base;
+
+	freeifaddrs(iface_list);
+	swim_task_destroy(base);
+	free(task);
+}
+
+/**
+ * Cancelation callback of a broadcast task. The scheduler and
+ * the result code are not needed - the task is simply deleted.
+ */
+static void
+swim_bcast_task_delete_cb(struct swim_task *task,
+			  struct swim_scheduler *scheduler, int rc)
+{
+	(void) rc;
+	(void) scheduler;
+	/* The base task is embedded into the broadcast task. */
+	struct swim_bcast_task *bcast = (struct swim_bcast_task *) task;
+	swim_bcast_task_delete(bcast);
+}
+
+/**
+ * Write down a next available broadcast address into the task
+ * destination field. Interfaces which are down, or have no IPv4
+ * address, are skipped. For an interface with the broadcast flag
+ * and an IPv4 broadcast address that address is used, otherwise
+ * the interface's own IPv4 address is taken.
+ * @param task Broadcast task to update.
+ * @retval 0 Success. 'dst' field is updated, and the cursor is
+ *         moved to the interface following the used one.
+ * @retval -1 No more addresses.
+ */
+static int
+swim_bcast_task_next_addr(struct swim_bcast_task *task)
+{
+	for (struct ifaddrs *i = task->i; i != NULL; i = i->ifa_next) {
+		int flags = i->ifa_flags;
+		if ((flags & IFF_UP) == 0)
+			continue;
+
+		const struct sockaddr *addr;
+		/*
+		 * Any address pointer of an interface can be
+		 * NULL, so check it before looking at the
+		 * family.
+		 */
+		if ((flags & IFF_BROADCAST) != 0 &&
+		    i->ifa_broadaddr != NULL &&
+		    i->ifa_broadaddr->sa_family == AF_INET) {
+			addr = i->ifa_broadaddr;
+		} else if (i->ifa_addr != NULL &&
+			   i->ifa_addr->sa_family == AF_INET) {
+			addr = i->ifa_addr;
+		} else {
+			continue;
+		}
+		task->base.dst = *(const struct sockaddr_in *) addr;
+		/*
+		 * NOTE(review): sin_port is expected in network
+		 * byte order, and the port is stored here exactly
+		 * as it was passed to swim_bcast_task_new() -
+		 * confirm the caller does the conversion.
+		 */
+		task->base.dst.sin_port = task->port;
+		/*
+		 * Continue from the interface after the used one,
+		 * not after the old cursor. Otherwise, when some
+		 * interfaces were skipped, the next call would
+		 * find this same interface again and the packet
+		 * would be sent to it several times.
+		 */
+		task->i = i->ifa_next;
+		return 0;
+	}
+	/* The list is exhausted, do not rescan its tail again. */
+	task->i = NULL;
+	return -1;
+}
+
+/**
+ * Send completion callback. Pick a next broadcast address and
+ * put the task back into the scheduler. When no addresses are
+ * left, the task deletes itself. The send result is ignored.
+ */
+static void
+swim_bcast_task_complete(struct swim_task *base_task,
+			 struct swim_scheduler *scheduler, int rc)
+{
+	(void) rc;
+	struct swim_bcast_task *bcast = (struct swim_bcast_task *) base_task;
+	if (swim_bcast_task_next_addr(bcast) == 0) {
+		swim_task_schedule(base_task, scheduler);
+		return;
+	}
+	swim_bcast_task_delete(bcast);
+}
+
+/**
+ * Allocate a broadcast task, collect the list of network
+ * interfaces, and point the task at the first usable address.
+ * On any failure a diagnostics error is set, everything already
+ * acquired is released, and NULL is returned.
+ */
+struct swim_bcast_task *
+swim_bcast_task_new(int port)
+{
+	struct swim_bcast_task *bcast;
+	struct ifaddrs *iface_list;
+
+	bcast = (struct swim_bcast_task *) malloc(sizeof(*bcast));
+	if (bcast == NULL) {
+		diag_set(OutOfMemory, sizeof(*bcast), "malloc", "task");
+		return NULL;
+	}
+	if (getifaddrs(&iface_list) != 0) {
+		diag_set(SystemError, "error in getifaddrs");
+		free(bcast);
+		return NULL;
+	}
+	bcast->addrs = iface_list;
+	/* The cursor starts from the head of the list. */
+	bcast->i = iface_list;
+	bcast->port = port;
+	swim_task_create(&bcast->base, swim_bcast_task_complete,
+			 swim_bcast_task_delete_cb);
+	if (swim_bcast_task_next_addr(bcast) == 0)
+		return bcast;
+	diag_set(SwimError, "broadcast has failed - no available "\
+		 "interfaces");
+	swim_bcast_task_delete(bcast);
+	return NULL;
+}
+
/**
* Dispatch a next output event. Build packet meta and send the
* packet.
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index c13b5d14f..0341d8e3c 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -43,6 +43,7 @@
*/
struct swim_task;
+struct ifaddrs;
struct swim_scheduler;
enum {
@@ -257,4 +258,25 @@ swim_task_destroy(struct swim_task *task)
rlist_del_entry(task, in_queue_output);
}
+/**
+ * Broadcast task. Besides usual task fields, stores a list of
+ * interfaces available for broadcast packets. The task works
+ * multiple times, each time sending a packet to one interface.
+ * After completion it is self-deleted.
+ */
+struct swim_bcast_task {
+ /**
+  * Base structure. It must remain the first member, because
+  * task callbacks cast a pointer at the base task to a
+  * pointer at the broadcast task.
+  */
+ struct swim_task base;
+ /**
+  * Port to use for broadcast. It is copied into sin_port of
+  * the destination address as is.
+  * NOTE(review): sin_port is in network byte order, so
+  * presumably the value is stored already converted -
+  * confirm against the caller.
+  */
+ int port;
+ /**
+  * A list of interfaces, returned by getifaddrs(). Owned by
+  * the task, freed with freeifaddrs() on the task deletion.
+  */
+ struct ifaddrs *addrs;
+ /**
+  * A next interface to send to. Points into the 'addrs'
+  * list, NULL when the list end is reached.
+  */
+ struct ifaddrs *i;
+};
+
+/**
+ * Create a new broadcast task with a specified port. The task
+ * is created with its destination already set to the first
+ * suitable interface address.
+ * @param port Port to put into each destination address.
+ * @retval NULL Error: out of memory, the interface list could
+ *         not be obtained, or no suitable interface was found.
+ *         Diagnostics is set.
+ * @retval not NULL A new task. It deletes itself after the
+ *         last address is used, or on cancelation.
+ */
+struct swim_bcast_task *
+swim_bcast_task_new(int port);
+
#endif /* TARANTOOL_SWIM_IO_H_INCLUDED */
\ No newline at end of file
--
2.17.2 (Apple Git-113)
More information about the Tarantool-patches
mailing list