* [PATCH v2 1/6] [RAW] swim: introduce SWIM's anti-entropy component
From: Vladislav Shpilevoy @ 2018-12-25 19:19 UTC
To: tarantool-patches; +Cc: vdavydov.dev, kostja
SWIM - Scalable Weakly-consistent Infection-style Process Group
Membership Protocol. It consists of two components, event
dissemination and failure detection, and keeps in memory a
table of known remote hosts (members). Some SWIM
implementations also have an additional component,
anti-entropy: a periodic broadcast of a random subset of the
member table.
Each SWIM component differs from the others in both message
structure and goals; they could even be sent in separate
messages. But SWIM describes piggybacking of messages: a ping
message can carry a dissemination message along with it. SWIM
has a main operating cycle during which it randomly chooses
members from the member table and sends them events + ping.
Answers are processed asynchronously, outside of the main cycle.
Random selection provides an even network load of about one
message per member per protocol step, regardless of the cluster
size. Without randomness a member would get a load of N messages
per step, where N is the cluster size, since all other members
would choose the same member on each step.
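The load argument can be checked with a small simulation. This is a hypothetical sketch, not code from this patch: `simulate_step()` and `average_load()` are invented names, every one of `n` members sends exactly one message per step, and the random target is drawn with `rand()`, ignoring its small modulo bias.

```c
#include <assert.h>
#include <stdlib.h>

/*
 * Hypothetical simulation, not part of the patch. During one
 * protocol step each of n members sends exactly one message.
 * recv[k] is set to the number of messages member k received.
 */
static void
simulate_step(int n, int use_random, int *recv)
{
	assert(n >= 2);
	for (int k = 0; k < n; ++k)
		recv[k] = 0;
	for (int sender = 0; sender < n; ++sender) {
		int target;
		if (use_random) {
			/* Uniform pick among the n - 1 other members. */
			target = rand() % (n - 1);
			if (target >= sender)
				++target;
		} else {
			/*
			 * A fixed, non-random rule: everybody pings
			 * member 0, and member 0 pings member 1.
			 */
			target = sender == 0 ? 1 : 0;
		}
		++recv[target];
	}
}

/*
 * Average number of messages received by 'member' per step over
 * 'steps' randomized steps. Returns -1 on allocation failure.
 */
static double
average_load(int n, int steps, int member)
{
	int *recv = malloc(sizeof(*recv) * n);
	if (recv == NULL)
		return -1;
	long total = 0;
	for (int s = 0; s < steps; ++s) {
		simulate_step(n, 1, recv);
		total += recv[member];
	}
	free(recv);
	return (double) total / steps;
}
```

With the fixed rule member 0 receives n - 1 messages every step, while with random selection each member receives about one message per step on average.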
SWIM also describes a kind of fairness: when selecting the next
member to ping, the protocol prefers the least recently used
members. Implementing this literally would be too complicated,
so Tarantool's implementation is slightly different and simpler.
Tarantool splits protocol operation into rounds. At the
beginning of a round all members are randomly reordered and
linked into a list. At each round step a member is popped from
the list head, a message is sent to it, and it waits for the
next round. In such an implementation all the random selection
of the original SWIM is done once per round: the round is
effectively planned in advance. A list is used instead of an
array, since new members can be added to its tail without a
realloc, and dead members can be removed just as easily.
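A minimal sketch of such round planning, under assumptions: members are plain array entries identified by an integer `id`, the round queue is a hand-rolled singly linked FIFO instead of `rlist`, and `plan_round()`, `round_pop()`, `round_push_tail()` and `plan_and_drain()` are hypothetical names. The shuffle uses the same inside-out Fisher-Yates step and `rand()` scaling as `swim_shuffle_members()` in this patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical member record; only what the sketch needs. */
struct member {
	int id;
	/* Link in the round queue. */
	struct member *next;
};

/* FIFO of members still to be visited during this round. */
struct round_queue {
	struct member *head;
	struct member *tail;
};

static void
round_push_tail(struct round_queue *q, struct member *m)
{
	m->next = NULL;
	if (q->tail == NULL)
		q->head = m;
	else
		q->tail->next = m;
	q->tail = m;
}

static struct member *
round_pop(struct round_queue *q)
{
	struct member *m = q->head;
	if (m != NULL) {
		q->head = m->next;
		if (q->head == NULL)
			q->tail = NULL;
	}
	return m;
}

/*
 * Shuffle all members into 'order' (inside-out Fisher-Yates) and
 * link every member except 'self' into the round queue.
 */
static void
plan_round(struct round_queue *q, struct member *members, int n,
	   struct member **order, const struct member *self)
{
	for (int i = 0; i < n; ++i) {
		/* Same scaling as the patch: j ends up in [0, i]. */
		int j = rand() / (RAND_MAX / (i + 1) + 1);
		assert(j <= i);
		order[i] = &members[i];
		struct member *tmp = order[i];
		order[i] = order[j];
		order[j] = tmp;
	}
	q->head = NULL;
	q->tail = NULL;
	for (int i = 0; i < n; ++i) {
		if (order[i] != self)
			round_push_tail(q, order[i]);
	}
}

/*
 * Demo: plan one round over n <= 31 members with member 0 as
 * self, pop one member per step until the queue is empty and
 * return a bitmask of the visited ids.
 */
static int
plan_and_drain(int n, int *steps)
{
	struct member members[32];
	struct member *order[32];
	struct round_queue q;
	assert(n > 0 && n <= 31);
	for (int i = 0; i < n; ++i)
		members[i].id = i;
	plan_round(&q, members, n, order, &members[0]);
	int mask = 0;
	*steps = 0;
	for (struct member *m = round_pop(&q); m != NULL;
	     m = round_pop(&q)) {
		mask |= 1 << m->id;
		(*steps)++;
	}
	return mask;
}
```

Every member except self is visited exactly once per round, in random order. A member discovered mid-round could simply be passed to `round_push_tail()`, which is the property the text gives for preferring a list over an array.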
Tarantool also implements a third component, anti-entropy. Why
is it needed, and even vital? Consider an example: two SWIM
nodes, both alive. Nothing happens, so the events list is
empty, and only pings are sent periodically. Then a third node
appears. It knows about one of the existing nodes. How should
it learn about the other one? The cluster is stable and there
are no new events, so the only chance is to wait until some
server stops and an event about it is broadcast. Anti-entropy
is a very simple component: it just piggybacks a random part of
the member table onto each regular ping. In the example above,
the new node will learn about the other existing node via
anti-entropy messages from the node it already knows.
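How much of the member table fits into one ping follows from the fixed-size MsgPack layout that `struct swim_member_bin` uses in this patch: a 3-entry fixmap with a status byte, a uint32 address and a uint16 port, 13 bytes per member. The sketch below is hypothetical (`struct ae_member` and `encode_anti_entropy()` are invented names, and values are taken in host byte order). It writes a whole packet `{0: [member, ...]}` and stops when the next record would not fit into the 1472-byte UDP payload the patch uses.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical member entry, host byte order. */
struct ae_member {
	uint8_t status;
	uint32_t ip;
	uint16_t port;
};

enum {
	/* fixmap(1), key 0, array32 marker, 4-byte count. */
	AE_HEADER_SIZE = 1 + 1 + 1 + 4,
	/* fixmap(3), status pair, uint32 addr pair, uint16 port pair. */
	AE_RECORD_SIZE = 1 + 2 + 6 + 4,
};

static uint8_t *
put_be32(uint8_t *p, uint32_t v)
{
	p[0] = (uint8_t) (v >> 24);
	p[1] = (uint8_t) (v >> 16);
	p[2] = (uint8_t) (v >> 8);
	p[3] = (uint8_t) v;
	return p + 4;
}

static uint8_t *
put_be16(uint8_t *p, uint16_t v)
{
	p[0] = (uint8_t) (v >> 8);
	p[1] = (uint8_t) v;
	return p + 2;
}

/*
 * Encode as many of the (already shuffled) members as fit into
 * buf. Returns the number of encoded members; *used receives the
 * number of bytes written (0 if nothing fits).
 */
static int
encode_anti_entropy(uint8_t *buf, size_t size,
		    const struct ae_member *members, int count,
		    size_t *used)
{
	*used = 0;
	if (size < AE_HEADER_SIZE + AE_RECORD_SIZE)
		return 0;
	uint8_t *pos = buf + AE_HEADER_SIZE;
	const uint8_t *end = buf + size;
	int i = 0;
	for (; i < count && end - pos >= AE_RECORD_SIZE; ++i) {
		const struct ae_member *m = &members[i];
		assert(m->status < 128);
		*pos++ = 0x83;		/* fixmap with 3 pairs */
		*pos++ = 0x00;		/* key: status */
		*pos++ = m->status;	/* positive fixint */
		*pos++ = 0x01;		/* key: address */
		*pos++ = 0xce;		/* uint32 follows */
		pos = put_be32(pos, m->ip);
		*pos++ = 0x02;		/* key: port */
		*pos++ = 0xcd;		/* uint16 follows */
		pos = put_be16(pos, m->port);
	}
	if (i == 0)
		return 0;
	buf[0] = 0x81;			/* fixmap with 1 pair */
	buf[1] = 0x00;			/* key: anti-entropy */
	buf[2] = 0xdd;			/* array32 follows */
	put_be32(buf + 3, (uint32_t) i);
	*used = (size_t) (pos - buf);
	return i;
}
```

With a 1472-byte payload this gives (1472 - 7) / 13 = 112 members per packet, so a larger table is covered gradually, as its order is reshuffled every round. In the patch itself the outer map header is written separately by `swim_encode_round_msg()`; the sketch writes it inline for brevity.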
This commit introduces the first component, anti-entropy. With
this component a member can discover other members, but cannot
detect which of them are already dead. Failure detection is a
part of the next commit.
Part of #3234
---
src/CMakeLists.txt | 3 +-
src/evio.c | 3 +-
src/evio.h | 4 +
src/lib/CMakeLists.txt | 1 +
src/lib/swim/CMakeLists.txt | 6 +
src/lib/swim/swim.c | 1111 +++++++++++++++++++++++++++++++++++
src/lib/swim/swim.h | 121 ++++
src/lua/init.c | 2 +
src/lua/swim.c | 243 ++++++++
src/lua/swim.h | 47 ++
10 files changed, 1538 insertions(+), 3 deletions(-)
create mode 100644 src/lib/swim/CMakeLists.txt
create mode 100644 src/lib/swim/swim.c
create mode 100644 src/lib/swim/swim.h
create mode 100644 src/lua/swim.c
create mode 100644 src/lua/swim.h
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 04de5ad04..785051966 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -182,6 +182,7 @@ set (server_sources
lua/crypto.c
lua/httpc.c
lua/utf8.c
+ lua/swim.c
lua/info.c
${lua_sources}
${PROJECT_SOURCE_DIR}/third_party/lua-yaml/lyaml.cc
@@ -228,7 +229,7 @@ endif()
set_source_files_compile_flags(${server_sources})
add_library(server STATIC ${server_sources})
-target_link_libraries(server core bit uri uuid ${ICU_LIBRARIES})
+target_link_libraries(server core bit uri uuid swim ${ICU_LIBRARIES})
# Rule of thumb: if exporting a symbol from a static library, list the
# library here.
diff --git a/src/evio.c b/src/evio.c
index 9ca14c45c..8610dbbe7 100644
--- a/src/evio.c
+++ b/src/evio.c
@@ -129,8 +129,7 @@ evio_setsockopt_client(int fd, int family, int type)
return 0;
}
-/** Set options for server sockets. */
-static int
+int
evio_setsockopt_server(int fd, int family, int type)
{
int on = 1;
diff --git a/src/evio.h b/src/evio.h
index 69d641a60..872a21ab6 100644
--- a/src/evio.h
+++ b/src/evio.h
@@ -157,6 +157,10 @@ evio_timeout_update(ev_loop *loop, ev_tstamp start, ev_tstamp *delay)
int
evio_setsockopt_client(int fd, int family, int type);
+/** Set options for server sockets. */
+int
+evio_setsockopt_server(int fd, int family, int type);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 98ff19b60..4e21e7da8 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -5,6 +5,7 @@ add_subdirectory(small)
add_subdirectory(salad)
add_subdirectory(csv)
add_subdirectory(json)
+add_subdirectory(swim)
if(ENABLE_BUNDLED_MSGPUCK)
add_subdirectory(msgpuck EXCLUDE_FROM_ALL)
endif()
diff --git a/src/lib/swim/CMakeLists.txt b/src/lib/swim/CMakeLists.txt
new file mode 100644
index 000000000..fcdfcf9e1
--- /dev/null
+++ b/src/lib/swim/CMakeLists.txt
@@ -0,0 +1,6 @@
+set(lib_sources
+ swim.c
+)
+
+set_source_files_compile_flags(${lib_sources})
+add_library(swim STATIC ${lib_sources})
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
new file mode 100644
index 000000000..989d83a22
--- /dev/null
+++ b/src/lib/swim/swim.c
@@ -0,0 +1,1111 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "swim.h"
+#include "evio.h"
+#include "uri.h"
+#include "assoc.h"
+#include "fiber.h"
+#include "small/rlist.h"
+#include "msgpuck.h"
+#include "info.h"
+#include <arpa/inet.h>
+
+/**
+ * SWIM - Scalable Weakly-consistent Infection-style Process Group
+ * Membership Protocol. It consists of two components, event
+ * dissemination and failure detection, and keeps in memory a
+ * table of known remote hosts (members). Some SWIM
+ * implementations also have an additional component,
+ * anti-entropy: a periodic broadcast of a random member subset.
+ *
+ * Each SWIM component differs from the others in both message
+ * structure and goals; they could even be sent in separate
+ * messages. But SWIM describes piggybacking of messages: a ping
+ * message can carry a dissemination message. SWIM has a main
+ * operating cycle during which it randomly chooses members from a
+ * member table and sends them events + ping. Answers are
+ * processed asynchronously, outside of the main cycle.
+ *
+ * Random selection provides an even network load of about one
+ * message per member per step, regardless of the cluster size.
+ * Without randomness a member would get a load of N messages per
+ * protocol step, where N is the cluster size, since all other
+ * members would choose the same member on each step.
+ *
+ * SWIM also describes a kind of fairness: when selecting the next
+ * member to ping, the protocol prefers LRU members. Implementing
+ * it would be too complicated, so Tarantool's implementation is
+ * slightly different and simpler.
+ *
+ * Tarantool splits protocol operation into rounds. At the
+ * beginning of a round all members are randomly reordered and
+ * linked into a list. At each round step a member is popped from
+ * the list head, a message is sent to it, and it waits for the
+ * next round. In such an implementation all random selection of
+ * the original SWIM is done once per round: the round is
+ * effectively planned in advance. A list is used instead of an
+ * array, since new members can be added to its tail without
+ * realloc, and dead members can be removed just as easily.
+ *
+ * Tarantool also implements a third component, anti-entropy. Why
+ * is it needed, and even vital? Consider an example: two SWIM
+ * nodes, both alive. Nothing happens, so the events list is
+ * empty, and only pings are sent periodically. Then a third node
+ * appears. It knows about one of the existing nodes. How should
+ * it learn about the other one? Sure, its known counterpart can
+ * try to notify the other one, but it is UDP, so this event can
+ * be lost. Anti-entropy is a very simple component: it piggybacks
+ * a random part of the member table onto each regular ping. In
+ * the example above the new node will sooner or later learn about
+ * the other one via anti-entropy messages from its counterpart.
+ */
+
+enum {
+ /** How often to send membership messages and pings. */
+ HEARTBEAT_RATE_DEFAULT = 1,
+ /**
+ * Default MTU is 1500. MTU (when IPv4 is used) consists
+ * of IPv4 header, UDP header, Data. IPv4 has 20 bytes
+ * header, UDP - 8 bytes. So Data = 1500 - 20 - 8 = 1472.
+ * TODO: adapt to other MTUs which can be reduced in some
+ * networks by their admins.
+ */
+ UDP_PACKET_SIZE = 1472,
+};
+
+static ssize_t
+swim_udp_send_msg(int fd, const void *data, size_t size,
+ const struct sockaddr *addr, socklen_t addr_size)
+{
+ ssize_t ret = sio_sendto(fd, data, size, 0, addr, addr_size);
+ if (ret == -1 && sio_wouldblock(errno))
+ return 0;
+ return ret;
+}
+
+static ssize_t
+swim_udp_recv_msg(int fd, void *buffer, size_t size, struct sockaddr *addr,
+ socklen_t *addr_size)
+{
+ ssize_t ret = sio_recvfrom(fd, buffer, size, 0, addr, addr_size);
+ if (ret == -1 && sio_wouldblock(errno))
+ return 0;
+ return ret;
+}
+
+/** UDP sendto/recvfrom implementation of swim_transport. */
+static struct swim_transport swim_udp_transport = {
+ /* .send_round_msg = */ swim_udp_send_msg,
+ /* .recv_msg = */ swim_udp_recv_msg,
+};
+
+/**
+ * In the general case each SWIM component may independently want
+ * to push some data into the network. Dissemination sends events,
+ * failure detection sends pings and acks, anti-entropy sends
+ * member tables. An intention to send data is called an IO task;
+ * tasks are queued and dispatched when output is possible.
+ */
+struct swim_io_task;
+
+typedef void (*swim_io_task_f)(struct swim_io_task *);
+
+struct swim_io_task {
+ /** Function to execute the task. */
+ swim_io_task_f cb;
+ /** SWIM instance, owning the task. */
+ struct swim *swim;
+ /** Place in a queue of tasks. */
+ struct rlist in_queue_output;
+};
+
+static inline void
+swim_io_task_create(struct swim_io_task *task, swim_io_task_f cb,
+ struct swim *swim)
+{
+ task->cb = cb;
+ task->swim = swim;
+ rlist_create(&task->in_queue_output);
+}
+
+/**
+ * UDP body size is limited by definition. To be able to send a
+ * big message it should be split into multiple packets. Each
+ * packet is a self-sufficient piece of data, which can withstand
+ * loss of other packets and be processed independently. It is
+ * something like a TCP stream with holes.
+ */
+struct swim_msg_part {
+ /** Place in the whole big message, struct swim_msg. */
+ struct stailq_entry in_msg;
+ /** Real size. */
+ int size;
+ /** Packet body. */
+ char body[UDP_PACKET_SIZE];
+};
+
+struct swim_msg {
+ struct stailq parts;
+};
+
+static inline bool
+swim_msg_part_is_last(struct swim_msg_part *part)
+{
+ return stailq_next(&part->in_msg) == NULL;
+}
+
+static inline char *
+swim_msg_part_end(struct swim_msg_part *part)
+{
+ return part->body + sizeof(part->body);
+}
+
+static inline char *
+swim_msg_part_pos(struct swim_msg_part *part)
+{
+ return part->body + part->size;
+}
+
+static inline int
+swim_msg_part_available(struct swim_msg_part *part)
+{
+ return swim_msg_part_end(part) - swim_msg_part_pos(part);
+}
+
+static inline void
+swim_msg_part_advance(struct swim_msg_part *part, int size)
+{
+ assert(size <= swim_msg_part_available(part));
+ part->size += size;
+}
+
+static inline struct swim_msg_part *
+swim_msg_part_next(struct swim_msg_part *part)
+{
+ if (swim_msg_part_is_last(part))
+ return NULL;
+ return stailq_next_entry(part, in_msg);
+}
+
+static inline void
+swim_msg_part_delete(struct swim_msg_part *part)
+{
+ free(part);
+}
+
+static inline struct swim_msg_part *
+swim_msg_part_new(struct swim_msg *msg)
+{
+ struct swim_msg_part *res =
+ (struct swim_msg_part *) malloc(sizeof(*res));
+ if (res == NULL) {
+ diag_set(OutOfMemory, sizeof(*res), "malloc", "res");
+ return NULL;
+ }
+ stailq_add_tail_entry(&msg->parts, res, in_msg);
+ res->size = 0;
+ return res;
+}
+
+static inline bool
+swim_msg_is_empty(struct swim_msg *msg)
+{
+ return stailq_empty(&msg->parts);
+}
+
+static inline struct swim_msg_part *
+swim_msg_first_part(struct swim_msg *msg)
+{
+ if (swim_msg_is_empty(msg))
+ return NULL;
+ return stailq_first_entry(&msg->parts, struct swim_msg_part, in_msg);
+}
+
+static inline struct swim_msg_part *
+swim_msg_last_part(struct swim_msg *msg)
+{
+ if (swim_msg_is_empty(msg))
+ return NULL;
+ return stailq_last_entry(&msg->parts, struct swim_msg_part, in_msg);
+}
+
+static inline void
+swim_msg_create(struct swim_msg *msg)
+{
+ stailq_create(&msg->parts);
+}
+
+static inline void
+swim_msg_destroy(struct swim_msg *msg)
+{
+ struct swim_msg_part *part, *tmp;
+ stailq_foreach_entry_safe(part, tmp, &msg->parts, in_msg)
+ swim_msg_part_delete(part);
+}
+
+static inline struct swim_msg_part *
+swim_msg_reserve(struct swim_msg *msg, int min_necessary_size)
+{
+ struct swim_msg_part *part = swim_msg_last_part(msg);
+ assert(min_necessary_size <= (int) sizeof(part->body));
+ if (part == NULL || swim_msg_part_available(part) < min_necessary_size)
+ return swim_msg_part_new(msg);
+ return part;
+}
+
+enum swim_member_status {
+ /**
+ * The instance is ok, it responds to requests, sends its
+ * members table.
+ */
+ MEMBER_ALIVE = 0,
+ swim_member_status_MAX,
+};
+
+static const char *swim_member_status_strs[] = {
+ "alive",
+};
+
+/**
+ * A cluster member description. This structure describes the
+ * last known state of an instance, that is updated periodically
+ * via UDP according to SWIM protocol.
+ */
+struct swim_member {
+ /**
+ * Member status. Since the communication goes via UDP, the
+ * actual status can differ from this one, as well as from the
+ * status known to other SWIM nodes. But SWIM guarantees that each
+ * member will eventually learn the real status of an instance.
+ */
+ enum swim_member_status status;
+ /**
+ * Address of the instance to which send UDP packets.
+ * Unique identifier of the member.
+ */
+ struct sockaddr_in addr;
+ /**
+ * Position in a queue of members in the current round.
+ */
+ struct rlist in_queue_round;
+};
+
+/**
+ * SWIM instance. Each instance uses its own UDP port. Tarantool
+ * can have multiple SWIMs.
+ */
+struct swim {
+ /**
+ * Global hash of all known members of the cluster. Hash
+ * key is a bitwise combination of IP and port, value is a
+ * struct swim_member, describing a remote instance. The only
+ * purpose of such a strange hash function is to be able to
+ * reuse mh_i64ptr_t instead of introducing one more
+ * implementation of mhash.
+ *
+ * Discovered members live here until they become
+ * unavailable, in which case they are removed from the
+ * hash. But a subset of members is pinned: the ones
+ * added explicitly via the API. A pinned member cannot
+ * be removed from the hash, and the module will ping it
+ * constantly.
+ */
+ struct mh_i64ptr_t *members;
+ /**
+ * This node. Used to avoid sending messages to self, which
+ * is meaningless.
+ */
+ struct swim_member *self;
+ /**
+ * Members to which a message should be sent next during
+ * this round.
+ */
+ struct rlist queue_round;
+ /** Generator of round step events. */
+ struct ev_periodic round_tick;
+ /**
+ * Single round step task. It is impossible to have
+ * multiple round steps at the same time, so it is single
+ * and preallocated per SWIM instance. Other tasks are
+ * mainly pings and acks, attached to member objects and
+ * related to them only.
+ */
+ struct swim_io_task round_step_task;
+ /**
+ * Event dispatcher of incoming messages. Takes them from
+ * the network.
+ */
+ struct ev_io input;
+ /**
+ * Event dispatcher of outgoing messages. Takes tasks
+ * from queue_output.
+ */
+ struct ev_io output;
+ /** Virtual transport methods. Just sendto/recvfrom. */
+ struct swim_transport transport;
+ /**
+ * An array of members shuffled on each round. Its head is
+ * sent to each member during one round as an
+ * anti-entropy message.
+ */
+ struct swim_member **shuffled_members;
+ /** Length of shuffled_members. */
+ int shuffled_members_size;
+ /** Queue of output tasks ready to write now. */
+ struct rlist queue_output;
+};
+
+static inline uint64_t
+sockaddr_in_hash(const struct sockaddr_in *a)
+{
+ return ((uint64_t) a->sin_addr.s_addr << 16) | a->sin_port;
+}
+
+/**
+ * Main round messages can carry merged failure detection
+ * messages and anti-entropy. With these keys the components can
+ * be distinguished from each other.
+ */
+enum swim_component_type {
+ SWIM_ANTI_ENTROPY = 0,
+};
+
+/** {{{ Anti-entropy component */
+
+/**
+ * Attributes of each record of a broadcasted member table. Just
+ * the same as some of struct swim_member attributes.
+ */
+enum swim_member_key {
+ SWIM_MEMBER_STATUS = 0,
+ /**
+ * Now can only be IP. But in future UNIX sockets can be
+ * added.
+ */
+ SWIM_MEMBER_ADDR,
+ SWIM_MEMBER_PORT,
+ swim_member_key_MAX,
+};
+
+/** SWIM anti-entropy MsgPack header template. */
+struct PACKED swim_anti_entropy_header_bin {
+ /** mp_encode_uint(SWIM_ANTI_ENTROPY) */
+ uint8_t k_anti_entropy;
+ /** mp_encode_array() */
+ uint8_t m_anti_entropy;
+ uint32_t v_anti_entropy;
+};
+
+static inline void
+swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
+ int batch_size)
+{
+ header->k_anti_entropy = SWIM_ANTI_ENTROPY;
+ header->m_anti_entropy = 0xdd;
+ header->v_anti_entropy = mp_bswap_u32(batch_size);
+}
+
+/** SWIM member MsgPack template. */
+struct PACKED swim_member_bin {
+ /** mp_encode_map(3) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_MEMBER_STATUS) */
+ uint8_t k_status;
+ /** mp_encode_uint(enum member_status) */
+ uint8_t v_status;
+
+ /** mp_encode_uint(SWIM_MEMBER_ADDR) */
+ uint8_t k_addr;
+ /** mp_encode_uint(addr.sin_addr.s_addr) */
+ uint8_t m_addr;
+ uint32_t v_addr;
+
+ /** mp_encode_uint(SWIM_MEMBER_PORT) */
+ uint8_t k_port;
+ /** mp_encode_uint(addr.sin_port) */
+ uint8_t m_port;
+ uint16_t v_port;
+};
+
+static inline void
+swim_member_bin_reset(struct swim_member_bin *header,
+ struct swim_member *member)
+{
+ header->v_status = member->status;
+ header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
+ header->v_port = mp_bswap_u16(member->addr.sin_port);
+}
+
+static inline void
+swim_member_bin_create(struct swim_member_bin *header)
+{
+ header->m_header = 0x83;
+ header->k_status = SWIM_MEMBER_STATUS;
+ header->k_addr = SWIM_MEMBER_ADDR;
+ header->m_addr = 0xce;
+ header->k_port = SWIM_MEMBER_PORT;
+ header->m_port = 0xcd;
+}
+
+/** }}} Anti-entropy component */
+
+/**
+ * SWIM message structure:
+ * {
+ * SWIM_ANTI_ENTROPY: [
+ * {
+ * SWIM_MEMBER_STATUS: uint, enum member_status,
+ * SWIM_MEMBER_ADDR: uint, ip,
+ * SWIM_MEMBER_PORT: uint, port
+ * },
+ * ...
+ * ],
+ * }
+ */
+
+static inline void
+swim_io_task_push(struct swim_io_task *task)
+{
+ rlist_add_tail_entry(&task->swim->queue_output, task, in_queue_output);
+ ev_io_start(loop(), &task->swim->output);
+}
+
+/**
+ * Register a new member with a specified status. Here it is
+ * added to the hash and to the round queue.
+ */
+static struct swim_member *
+swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
+ enum swim_member_status status)
+{
+ struct swim_member *member =
+ (struct swim_member *) calloc(1, sizeof(*member));
+ if (member == NULL) {
+ diag_set(OutOfMemory, sizeof(*member), "calloc", "member");
+ return NULL;
+ }
+ member->status = status;
+ member->addr = *addr;
+ struct mh_i64ptr_node_t node;
+ node.key = sockaddr_in_hash(addr);
+ node.val = member;
+ mh_int_t rc = mh_i64ptr_put(swim->members, &node, NULL, NULL);
+ if (rc == mh_end(swim->members)) {
+ free(member);
+ diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node");
+ return NULL;
+ }
+ rlist_add_entry(&swim->queue_round, member, in_queue_round);
+ return member;
+}
+
+static inline struct swim_member *
+swim_find_member(struct swim *swim, const struct sockaddr_in *addr)
+{
+ uint64_t hash = sockaddr_in_hash(addr);
+ mh_int_t node = mh_i64ptr_find(swim->members, hash, NULL);
+ if (node == mh_end(swim->members))
+ return NULL;
+ return (struct swim_member *) mh_i64ptr_node(swim->members, node)->val;
+}
+
+/**
+ * Remove the member from all queues, hashes, destroy it and free
+ * the memory.
+ */
+static inline void
+swim_member_delete(struct swim *swim, struct swim_member *member)
+{
+ uint64_t key = sockaddr_in_hash(&member->addr);
+ mh_int_t rc = mh_i64ptr_find(swim->members, key, NULL);
+ assert(rc != mh_end(swim->members));
+ mh_i64ptr_del(swim->members, rc, NULL);
+ rlist_del_entry(member, in_queue_round);
+ free(member);
+}
+
+/** At the end of each round members table is shuffled. */
+static int
+swim_shuffle_members(struct swim *swim)
+{
+ struct mh_i64ptr_t *members = swim->members;
+ struct swim_member **shuffled = swim->shuffled_members;
+ int old_size = swim->shuffled_members_size;
+ int new_size = mh_size(members);
+ /* Reallocate if the array is too small or too big. */
+ if (old_size < new_size || old_size >= new_size * 2) {
+ int size = sizeof(shuffled[0]) * new_size;
+ struct swim_member **new =
+ (struct swim_member **) realloc(shuffled, size);
+ if (new == NULL) {
+ diag_set(OutOfMemory, size, "realloc", "new");
+ return -1;
+ }
+ shuffled = new;
+ swim->shuffled_members = new;
+ }
+ swim->shuffled_members_size = new_size;
+ int i = 0;
+ for (mh_int_t node = mh_first(members),
+ end = mh_end(members); node != end;
+ node = mh_next(members, node), ++i) {
+ shuffled[i] = (struct swim_member *)
+ mh_i64ptr_node(members, node)->val;
+ /*
+ * RAND_MAX / (i + 1) + 1 scales the range of rand()
+ * down to [0, i] while keeping the distribution
+ * close to uniform. This way of shuffling proved
+ * its uniformity via testing.
+ */
+ int j = rand() / (RAND_MAX / (i + 1) + 1);
+ SWAP(shuffled[i], shuffled[j]);
+ }
+ return 0;
+}
+
+/**
+ * Shuffle, filter members. Build randomly ordered queue of
+ * addressees. In other words, do all round preparation work.
+ */
+static int
+swim_new_round(struct swim *swim)
+{
+ say_verbose("SWIM: start a new round");
+ if (swim_shuffle_members(swim) != 0)
+ return -1;
+ rlist_create(&swim->queue_round);
+ for (int i = 0; i < swim->shuffled_members_size; ++i) {
+ if (swim->shuffled_members[i] != swim->self) {
+ rlist_add_entry(&swim->queue_round,
+ swim->shuffled_members[i],
+ in_queue_round);
+ }
+ }
+ return 0;
+}
+
+/**
+ * Encode the anti-entropy header and as many members as
+ * possible at the end of the last message part.
+ * @retval Number of encoded members, or -1 on error.
+ */
+static int
+swim_encode_anti_entropy(struct swim *swim, struct swim_msg *msg)
+{
+ struct swim_anti_entropy_header_bin ae_header_bin;
+ struct swim_member_bin member_bin;
+ struct swim_msg_part *part = swim_msg_last_part(msg);
+ if (part == NULL)
+ return -1;
+ char *pos = swim_msg_part_pos(part);
+ char *end = swim_msg_part_end(part);
+ char *header = pos;
+ int i = 0;
+ pos += sizeof(ae_header_bin);
+
+ swim_member_bin_create(&member_bin);
+ for (; i < swim->shuffled_members_size &&
+ pos + sizeof(member_bin) <= end; ++i) {
+ struct swim_member *member = swim->shuffled_members[i];
+ swim_member_bin_reset(&member_bin, member);
+ memcpy(pos, &member_bin, sizeof(member_bin));
+ pos += sizeof(member_bin);
+ }
+ if (i == 0)
+ return 0;
+ swim_anti_entropy_header_bin_create(&ae_header_bin, i);
+ memcpy(header, &ae_header_bin, sizeof(ae_header_bin));
+ swim_msg_part_advance(part, pos - header);
+ return i;
+}
+
+/** Encode SWIM components into a sequence of UDP packets. */
+static int
+swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
+{
+ swim_msg_create(msg);
+ struct swim_msg_part *part = swim_msg_reserve(msg, 1);
+ if (part == NULL)
+ return -1;
+ char *header = swim_msg_part_pos(part);
+ int rc, map_size = 0;
+ swim_msg_part_advance(part, 1);
+
+ rc = swim_encode_anti_entropy(swim, msg);
+ if (rc < 0)
+ goto error;
+ map_size += rc > 0;
+
+ assert(mp_sizeof_map(map_size) == 1);
+ mp_encode_map(header, map_size);
+ return 0;
+error:
+ swim_msg_destroy(msg);
+ return -1;
+}
+
+/**
+ * Do one round step. Send encoded components to a next member
+ * from the queue.
+ */
+static void
+swim_send_round_msg(struct swim_io_task *task)
+{
+ assert(task->cb == swim_send_round_msg);
+ struct swim *swim = task->swim;
+ if ((swim->shuffled_members == NULL ||
+ rlist_empty(&swim->queue_round)) && swim_new_round(swim) != 0) {
+ diag_log();
+ goto next_round_step;
+ }
+ /*
+ * Possibly empty, if no members but self are specified.
+ */
+ if (rlist_empty(&swim->queue_round))
+ goto next_round_step;
+
+ struct swim_msg msg;
+ if (swim_encode_round_msg(swim, &msg) != 0) {
+ diag_log();
+ goto next_round_step;
+ }
+ struct swim_member *m =
+ rlist_first_entry(&swim->queue_round, struct swim_member,
+ in_queue_round);
+ say_verbose("SWIM: send to %s",
+ sio_strfaddr((struct sockaddr *) &m->addr,
+ sizeof(m->addr)));
+ for (struct swim_msg_part *part = swim_msg_first_part(&msg);
+ part != NULL; part = swim_msg_part_next(part)) {
+ if (swim->transport.send_round_msg(swim->output.fd, part->body,
+ part->size,
+ (struct sockaddr *) &m->addr,
+ sizeof(m->addr)) == -1)
+ diag_log();
+ }
+ swim_msg_destroy(&msg);
+ rlist_del_entry(m, in_queue_round);
+next_round_step:
+ ev_periodic_start(loop(), &swim->round_tick);
+}
+
+static void
+swim_on_output(struct ev_loop *loop, struct ev_io *io, int events)
+{
+ assert((events & EV_WRITE) != 0);
+ (void) events;
+ struct swim *swim = (struct swim *) io->data;
+ if (rlist_empty(&swim->queue_output)) {
+ ev_io_stop(loop, io);
+ return;
+ }
+ struct swim_io_task *task =
+ rlist_shift_entry(&swim->queue_output, struct swim_io_task,
+ in_queue_output);
+ task->cb(task);
+}
+
+/** Trigger the next round step once per specified timeout. */
+static void
+swim_trigger_round_step(struct ev_loop *loop, struct ev_periodic *p, int events)
+{
+ assert((events & EV_PERIODIC) != 0);
+ (void) events;
+ struct swim *swim = (struct swim *) p->data;
+ swim_io_task_push(&swim->round_step_task);
+ ev_periodic_stop(loop, p);
+}
+
+/**
+ * SWIM member attributes from anti-entropy and dissemination
+ * messages.
+ */
+struct swim_member_def {
+ struct sockaddr_in addr;
+ enum swim_member_status status;
+};
+
+static inline void
+swim_member_def_create(struct swim_member_def *def)
+{
+ def->addr.sin_port = 0;
+ def->addr.sin_addr.s_addr = 0;
+ def->status = MEMBER_ALIVE;
+}
+
+static void
+swim_process_member_update(struct swim *swim, struct swim_member_def *def)
+{
+ struct swim_member *member = swim_find_member(swim, &def->addr);
+ /*
+ * Trivial processing of a new member - just add it to the
+ * members table.
+ */
+ if (member == NULL) {
+ member = swim_member_new(swim, &def->addr, def->status);
+ if (member == NULL)
+ diag_log();
+ }
+}
+
+static int
+swim_process_member_key(enum swim_member_key key, const char **pos,
+ const char *end, const char *msg_pref,
+ struct swim_member_def *def)
+{
+ switch(key) {
+ case SWIM_MEMBER_STATUS:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s member status should be uint", msg_pref);
+ return -1;
+ }
+ uint64_t status = mp_decode_uint(pos);
+ if (status >= swim_member_status_MAX) {
+ say_error("%s unknown member status", msg_pref);
+ return -1;
+ }
+ def->status = (enum swim_member_status) status;
+ break;
+ case SWIM_MEMBER_ADDR:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s member address should be uint", msg_pref);
+ return -1;
+ }
+ def->addr.sin_addr.s_addr = mp_decode_uint(pos);
+ break;
+ case SWIM_MEMBER_PORT:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s member port should be uint", msg_pref);
+ return -1;
+ }
+ uint64_t port = mp_decode_uint(pos);
+ if (port > UINT16_MAX) {
+ say_error("%s member port is invalid", msg_pref);
+ return -1;
+ }
+ def->addr.sin_port = port;
+ break;
+ default:
+ unreachable();
+ }
+ return 0;
+}
+
+/** Decode an anti-entropy message, update members table. */
+static int
+swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
+{
+ const char *msg_pref = "Invalid SWIM anti-entropy message:";
+ if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) {
+ say_error("%s message should be an array", msg_pref);
+ return -1;
+ }
+ uint64_t size = mp_decode_array(pos);
+ for (uint64_t i = 0; i < size; ++i) {
+ if (mp_typeof(**pos) != MP_MAP ||
+ mp_check_map(*pos, end) > 0) {
+ say_error("%s member should be map", msg_pref);
+ return -1;
+ }
+ uint64_t map_size = mp_decode_map(pos);
+ struct swim_member_def def;
+ swim_member_def_create(&def);
+ for (uint64_t j = 0; j < map_size; ++j) {
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s member key should be uint",
+ msg_pref);
+ return -1;
+ }
+ uint64_t key = mp_decode_uint(pos);
+ if (key >= swim_member_key_MAX) {
+ say_error("%s unknown member key", msg_pref);
+ return -1;
+ }
+ if (swim_process_member_key(key, pos, end, msg_pref,
+ &def) != 0)
+ return -1;
+ }
+ if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
+ say_error("%s member address should be specified",
+ msg_pref);
+ return -1;
+ }
+ swim_process_member_update(swim, &def);
+ }
+ return 0;
+}
+
+/** Receive and process a new message. */
+static void
+swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
+{
+ assert((events & EV_READ) != 0);
+ (void) events;
+ (void) loop;
+ const char *msg_pref = "Invalid SWIM message:";
+ struct swim *swim = (struct swim *) io->data;
+ struct sockaddr_in addr;
+ socklen_t len = sizeof(addr);
+ char buffer[UDP_PACKET_SIZE];
+ ssize_t size = swim->transport.recv_msg(io->fd, buffer, sizeof(buffer),
+ (struct sockaddr *) &addr,
+ &len);
+ if (size <= 0) {
+ if (size < 0)
+ diag_log();
+ return;
+ }
+ say_verbose("SWIM: received from %s",
+ sio_strfaddr((struct sockaddr *) &addr, len));
+ const char *pos = buffer;
+ const char *end = pos + size;
+ if (mp_typeof(*pos) != MP_MAP || mp_check_map(pos, end) > 0) {
+ say_error("%s expected map header", msg_pref);
+ return;
+ }
+ uint64_t map_size = mp_decode_map(&pos);
+ for (uint64_t i = 0; i < map_size; ++i) {
+ if (mp_typeof(*pos) != MP_UINT || mp_check_uint(pos, end) > 0) {
+ say_error("%s header should contain uint keys",
+ msg_pref);
+ return;
+ }
+ uint64_t key = mp_decode_uint(&pos);
+ switch(key) {
+ case SWIM_ANTI_ENTROPY:
+ say_verbose("SWIM: process anti-entropy");
+ if (swim_process_anti_entropy(swim, &pos, end) != 0)
+ return;
+ break;
+ default:
+ say_error("%s unknown component type", msg_pref);
+ return;
+ }
+ }
+}
+
+/**
+ * Convert a string URI like "ip:port" to sockaddr_in structure.
+ */
+static int
+uri_to_addr(const char *str, struct sockaddr_in *addr)
+{
+ struct uri uri;
+ if (uri_parse(&uri, str) != 0 || uri.service == NULL)
+ goto invalid_uri;
+ in_addr_t iaddr;
+ if (uri.host_len == strlen(URI_HOST_UNIX) &&
+ memcmp(uri.host, URI_HOST_UNIX, uri.host_len) == 0) {
+ diag_set(IllegalParams, "Unix sockets are not supported");
+ return -1;
+ }
+ if (uri.host_len == 0) {
+ iaddr = htonl(INADDR_ANY);
+ } else if (uri.host_len == 9 && memcmp("localhost", uri.host, 9) == 0) {
+ iaddr = htonl(INADDR_LOOPBACK);
+ } else {
+ iaddr = inet_addr(tt_cstr(uri.host, uri.host_len));
+ if (iaddr == (in_addr_t) -1)
+ goto invalid_uri;
+ }
+ in_port_t port = htons(atoi(uri.service));
+ memset(addr, 0, sizeof(*addr));
+ addr->sin_family = AF_INET;
+ addr->sin_addr.s_addr = iaddr;
+ addr->sin_port = port;
+ return 0;
+
+invalid_uri:
+ diag_set(SocketError, sio_socketname(-1), "invalid uri \"%s\"", str);
+ return -1;
+}
+
+/**
+ * Create a new SWIM instance. By default, the instance is turned
+ * off and does nothing. Use swim_cfg() to start it.
+ */
+struct swim *
+swim_new(void)
+{
+ struct swim *swim = (struct swim *) calloc(1, sizeof(*swim));
+ if (swim == NULL) {
+ diag_set(OutOfMemory, sizeof(*swim), "calloc", "swim");
+ return NULL;
+ }
+ swim->members = mh_i64ptr_new();
+ if (swim->members == NULL) {
+ free(swim);
+ diag_set(OutOfMemory, sizeof(*swim->members), "malloc",
+ "members");
+ return NULL;
+ }
+ rlist_create(&swim->queue_round);
+ ev_init(&swim->round_tick, swim_trigger_round_step);
+ ev_periodic_set(&swim->round_tick, 0, HEARTBEAT_RATE_DEFAULT, NULL);
+ swim->round_tick.data = (void *) swim;
+ swim_io_task_create(&swim->round_step_task, swim_send_round_msg, swim);
+ ev_io_init(&swim->input, swim_on_input, -1, EV_READ);
+ swim->input.data = (void *) swim;
+ ev_io_init(&swim->output, swim_on_output, -1, EV_WRITE);
+ swim->output.data = (void *) swim;
+ swim->transport = swim_udp_transport;
+ rlist_create(&swim->queue_output);
+ return swim;
+}
+
+int
+swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
+ const struct swim_transport *new_transport)
+{
+ struct sockaddr_in addr;
+ struct swim_member *new_self = swim->self;
+
+ if (uri != NULL) {
+ if (uri_to_addr(uri, &addr) != 0)
+ return -1;
+ struct sockaddr_in cur_addr;
+ socklen_t addrlen = sizeof(cur_addr);
+ int old_fd = swim->input.fd;
+
+ if (old_fd == -1 ||
+ getsockname(old_fd, (struct sockaddr *) &cur_addr,
+ &addrlen) != 0 ||
+ addr.sin_addr.s_addr != cur_addr.sin_addr.s_addr ||
+ addr.sin_port != cur_addr.sin_port) {
+
+ int fd = sio_socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+ if (fd < 0)
+ return -1;
+ if (sio_bind(fd, (struct sockaddr *) &addr,
+ sizeof(addr)) != 0 ||
+ evio_setsockopt_server(fd, AF_INET,
+ SOCK_DGRAM) != 0) {
+ if (errno == EADDRINUSE) {
+ diag_set(SocketError,
+ sio_socketname(fd), "bind");
+ }
+ close(fd);
+ return -1;
+ }
+ new_self = swim_find_member(swim, &addr);
+ if (new_self == NULL) {
+ new_self = swim_member_new(swim, &addr,
+ MEMBER_ALIVE);
+ if (new_self == NULL) {
+ close(fd);
+ return -1;
+ }
+ }
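+ ev_io_stop(loop(), &swim->input);
+ ev_io_stop(loop(), &swim->output);
+ if (old_fd != -1)
+ close(old_fd);
+ ev_io_set(&swim->input, fd, EV_READ);
+ ev_io_set(&swim->output, fd, EV_WRITE);
+ ev_io_start(loop(), &swim->input);
+ if (! rlist_empty(&swim->queue_output))
+ ev_io_start(loop(), &swim->output);
+ ev_periodic_start(loop(), &swim->round_tick);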
+ }
+ }
+
+ if (swim->round_tick.interval != heartbeat_rate && heartbeat_rate > 0)
+ ev_periodic_set(&swim->round_tick, 0, heartbeat_rate, NULL);
+
+ swim->self = new_self;
+ if (new_transport != NULL)
+ swim->transport = *new_transport;
+ return 0;
+}
+
+int
+swim_add_member(struct swim *swim, const char *uri)
+{
+ struct sockaddr_in addr;
+ if (uri_to_addr(uri, &addr) != 0)
+ return -1;
+ struct swim_member *member = swim_find_member(swim, &addr);
+ if (member == NULL) {
+ member = swim_member_new(swim, &addr, MEMBER_ALIVE);
+ if (member == NULL)
+ return -1;
+ }
+ return 0;
+}
+
+int
+swim_remove_member(struct swim *swim, const char *uri)
+{
+ struct sockaddr_in addr;
+ if (uri_to_addr(uri, &addr) != 0)
+ return -1;
+ struct swim_member *member = swim_find_member(swim, &addr);
+ if (member != NULL)
+ swim_member_delete(swim, member);
+ return 0;
+}
+
+void
+swim_info(struct swim *swim, struct info_handler *info)
+{
+ info_begin(info);
+ for (mh_int_t node = mh_first(swim->members),
+ end = mh_end(swim->members); node != end;
+ node = mh_next(swim->members, node)) {
+ struct swim_member *member = (struct swim_member *)
+ mh_i64ptr_node(swim->members, node)->val;
+ info_table_begin(info,
+ sio_strfaddr((struct sockaddr *) &member->addr,
+ sizeof(member->addr)));
+ info_append_str(info, "status",
+ swim_member_status_strs[member->status]);
+ info_table_end(info);
+ }
+ info_end(info);
+}
+
+void
+swim_delete(struct swim *swim)
+{
+ ev_io_stop(loop(), &swim->output);
+ ev_io_stop(loop(), &swim->input);
+ ev_periodic_stop(loop(), &swim->round_tick);
+ close(swim->input.fd);
+ mh_int_t node = mh_first(swim->members);
+ while (node != mh_end(swim->members)) {
+ struct swim_member *m = (struct swim_member *)
+ mh_i64ptr_node(swim->members, node)->val;
+ swim_member_delete(swim, m);
+ node = mh_first(swim->members);
+ }
+ mh_i64ptr_delete(swim->members);
+ free(swim->shuffled_members);
+ free(swim);
+}
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
new file mode 100644
index 000000000..77e16ed53
--- /dev/null
+++ b/src/lib/swim/swim.h
@@ -0,0 +1,121 @@
+#ifndef TARANTOOL_SWIM_H_INCLUDED
+#define TARANTOOL_SWIM_H_INCLUDED
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <sys/socket.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+struct info_handler;
+struct swim;
+
+/**
+ * Virtual methods of SWIM protocol steps. The usual
+ * implementation is plain sendto()/recvfrom() for all methods,
+ * but tests can use this interface to simulate errors.
+ */
+struct swim_transport {
+ /**
+ * Send regular round message containing dissemination,
+ * failure detection and anti-entropy sections. Parameters
+ * are like sendto().
+ */
+ ssize_t
+ (*send_round_msg)(int fd, const void *data, size_t size,
+ const struct sockaddr *addr, socklen_t addr_size);
+
+ /**
+ * Receive a message. It is not necessarily a round or a
+ * failure detection message: its type is unknown until it
+ * is received. Parameters are like recvfrom().
+ */
+ ssize_t
+ (*recv_msg)(int fd, void *buffer, size_t size, struct sockaddr *addr,
+ socklen_t *addr_size);
+};
+
+/**
+ * Create a new SWIM instance. It is only allocated and
+ * initialized: no socket is bound and no parameters are
+ * set.
+ */
+struct swim *
+swim_new(void);
+
+/**
+ * Configure or reconfigure a SWIM instance.
+ *
+ * @param swim SWIM instance to configure.
+ * @param uri URI in the format "ip:port".
+ * @param heartbeat_rate Rate of sending round messages. It does
+ * not mean that each member will be checked every
+ * @heartbeat_rate seconds. It is rather the protocol
+ * speed. The protocol period depends on the member count
+ * and @heartbeat_rate.
+ * @param new_transport Transport API to send/receive messages.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+int
+swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
+ const struct swim_transport *new_transport);
+
+/**
+ * Stop listening and broadcasting messages, cleanup all internal
+ * structures, free memory.
+ */
+void
+swim_delete(struct swim *swim);
+
+/**
+ * Add a new member. It is added to the members table and pinned.
+ * SWIM will ping the member but will never delete it, even if
+ * pings fail.
+ */
+int
+swim_add_member(struct swim *swim, const char *uri);
+
+/** Silently remove a member from the members table. */
+int
+swim_remove_member(struct swim *swim, const char *uri);
+
+/** Dump member statuses into @a info. */
+void
+swim_info(struct swim *swim, struct info_handler *info);
+
+#if defined(__cplusplus)
+}
+#endif
+
+#endif /* TARANTOOL_SWIM_H_INCLUDED */
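The swim_transport comment says tests can simulate errors through this interface. Below is a minimal sketch of such a test transport that silently drops every Nth outgoing packet, the way a lossy UDP network would. The struct is a local copy of the patch 1/6 declaration above so the snippet compiles on its own; drop_every, send_count, lossy_send, plain_recv and lossy_transport are hypothetical names, not part of this patch.

```c
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Local copy of struct swim_transport as declared in swim.h (1/6). */
struct swim_transport {
	ssize_t (*send_round_msg)(int fd, const void *data, size_t size,
				  const struct sockaddr *addr,
				  socklen_t addr_size);
	ssize_t (*recv_msg)(int fd, void *buffer, size_t size,
			    struct sockaddr *addr, socklen_t *addr_size);
};

/* Hypothetical test knob: drop every Nth outgoing packet (0 = never). */
static int drop_every = 0;
/* Number of send attempts seen so far. */
static int send_count = 0;

static ssize_t
lossy_send(int fd, const void *data, size_t size,
	   const struct sockaddr *addr, socklen_t addr_size)
{
	++send_count;
	if (drop_every > 0 && send_count % drop_every == 0) {
		/* Report success without sending: UDP gives no feedback. */
		return (ssize_t) size;
	}
	return sendto(fd, data, size, 0, addr, addr_size);
}

static ssize_t
plain_recv(int fd, void *buffer, size_t size, struct sockaddr *addr,
	   socklen_t *addr_size)
{
	return recvfrom(fd, buffer, size, 0, addr, addr_size);
}

static const struct swim_transport lossy_transport = {
	/* .send_round_msg = */ lossy_send,
	/* .recv_msg = */ plain_recv,
};
```

Such an object could be passed as the last argument of swim_cfg(); since swim_cfg() copies the structure by value, its lifetime does not matter. Patch 2/6 adds a send_failure_detection_msg member, which a test would likely want to wrap the same way.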
diff --git a/src/lua/init.c b/src/lua/init.c
index ca4b47f3a..5b47aa3e3 100644
--- a/src/lua/init.c
+++ b/src/lua/init.c
@@ -58,6 +58,7 @@
#include "lua/fio.h"
#include "lua/httpc.h"
#include "lua/utf8.h"
+#include "lua/swim.h"
#include "digest.h"
#include <small/ibuf.h>
@@ -450,6 +451,7 @@ tarantool_lua_init(const char *tarantool_bin, int argc, char **argv)
tarantool_lua_socket_init(L);
tarantool_lua_pickle_init(L);
tarantool_lua_digest_init(L);
+ tarantool_lua_swim_init(L);
luaopen_http_client_driver(L);
lua_pop(L, 1);
luaopen_msgpack(L);
diff --git a/src/lua/swim.c b/src/lua/swim.c
new file mode 100644
index 000000000..5a20783bf
--- /dev/null
+++ b/src/lua/swim.c
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "utils.h"
+#include "diag.h"
+#include "swim/swim.h"
+#include "small/ibuf.h"
+#include "lua/info.h"
+#include <info.h>
+
+/** SWIM instances are pushed as cdata with this id. */
+uint32_t CTID_STRUCT_SWIM_PTR;
+
+/**
+ * Get @a n-th value from a Lua stack as a struct swim pointer.
+ * @param L Lua state.
+ * @param n Where pointer is stored on Lua stack.
+ *
+ * @retval NULL The stack position does not exist or it is not a
+ * struct swim pointer.
+ * @retval not NULL Valid SWIM pointer.
+ */
+static inline struct swim *
+lua_swim_ptr(struct lua_State *L, int n)
+{
+ uint32_t ctypeid;
+ if (lua_type(L, n) != LUA_TCDATA)
+ return NULL;
+ void *swim = luaL_checkcdata(L, n, &ctypeid);
+ if (ctypeid != CTID_STRUCT_SWIM_PTR)
+ return NULL;
+ return *(struct swim **) swim;
+}
+
+/**
+ * Delete SWIM instance passed via first Lua stack position. Used
+ * by Lua GC.
+ */
+static int
+lua_swim_gc(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ /* NULL means swim:delete() has already destroyed the instance. */
+ if (swim != NULL)
+ swim_delete(swim);
+ return 0;
+}
+
+/**
+ * Configure @a swim instance using a table stored in @a ncfg-th
+ * position on Lua stack.
+ * @param L Lua state.
+ * @param ncfg Where configuration is stored on Lua stack.
+ * @param swim SWIM instance to configure.
+ * @param funcname Caller function name to use in error messages.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error, stored in diagnostics area. Critical errors
+ * like OOM or incorrect usage can throw.
+ */
+static int
+lua_swim_cfg_impl(struct lua_State *L, int ncfg, struct swim *swim,
+ const char *funcname)
+{
+ if (! lua_istable(L, ncfg)) {
+ return luaL_error(L, "swim.%s: expected table config",
+ funcname);
+ }
+
+ const char *server_uri;
+ lua_getfield(L, ncfg, "server");
+ if (lua_isstring(L, -1)) {
+ server_uri = lua_tostring(L, -1);
+ } else if (lua_isnil(L, -1)) {
+ server_uri = NULL;
+ } else {
+ return luaL_error(L, "swim.%s: server should be string URI",
+ funcname);
+ }
+ lua_pop(L, 1);
+
+ double heartbeat_rate;
+ lua_getfield(L, ncfg, "heartbeat");
+ if (lua_isnumber(L, -1)) {
+ heartbeat_rate = lua_tonumber(L, -1);
+ if (heartbeat_rate <= 0) {
+ return luaL_error(L, "swim.%s: heartbeat should be "\
+ "positive number", funcname);
+ }
+ } else if (! lua_isnil(L, -1)) {
+ return luaL_error(L, "swim.%s: heartbeat should be positive "\
+ "number", funcname);
+ } else {
+ heartbeat_rate = -1;
+ }
+ lua_pop(L, 1);
+
+ return swim_cfg(swim, server_uri, heartbeat_rate, NULL);
+}
+
+static int
+lua_swim_new(struct lua_State *L)
+{
+ int top = lua_gettop(L);
+ if (top > 1)
+ return luaL_error(L, "Usage: swim.new([{<config>}])");
+ struct swim *swim = swim_new();
+ if (swim != NULL) {
+ *(struct swim **)luaL_pushcdata(L, CTID_STRUCT_SWIM_PTR) = swim;
+ lua_pushcfunction(L, lua_swim_gc);
+ luaL_setcdatagc(L, -2);
+ if (top == 0 || lua_swim_cfg_impl(L, 1, swim, "new") == 0)
+ return 1;
+ lua_pop(L, 1);
+ }
+ lua_pushnil(L);
+ luaT_pusherror(L, diag_last_error(diag_get()));
+ return 2;
+}
+
+static int
+lua_swim_cfg(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (swim == NULL)
+ return luaL_error(L, "Usage: swim:cfg({<config>})");
+ if (lua_swim_cfg_impl(L, 2, swim, "cfg") != 0) {
+ lua_pushnil(L);
+ luaT_pusherror(L, diag_last_error(diag_get()));
+ return 2;
+ }
+ lua_pushboolean(L, true);
+ return 1;
+}
+
+static inline int
+lua_swim_add_remove_member(struct lua_State *L, const char *funcname,
+ int (*action)(struct swim *, const char *))
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (lua_gettop(L) != 2 || swim == NULL)
+ return luaL_error(L, "Usage: swim:%s(uri)", funcname);
+ const char *member_uri;
+ if (lua_isstring(L, 2)) {
+ member_uri = lua_tostring(L, 2);
+ } else {
+ return luaL_error(L, "swim.%s: member URI should be string",
+ funcname);
+ }
+
+ if (action(swim, member_uri) != 0) {
+ lua_pushnil(L);
+ luaT_pusherror(L, diag_last_error(diag_get()));
+ return 2;
+ }
+ lua_pushboolean(L, true);
+ return 1;
+}
+
+static int
+lua_swim_add_member(struct lua_State *L)
+{
+ return lua_swim_add_remove_member(L, "add_member", swim_add_member);
+}
+
+static int
+lua_swim_remove_member(struct lua_State *L)
+{
+ return lua_swim_add_remove_member(L, "remove_member",
+ swim_remove_member);
+}
+
+static int
+lua_swim_delete(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (swim == NULL)
+ return luaL_error(L, "Usage: swim:delete()");
+ swim_delete(swim);
+ uint32_t ctypeid;
+ struct swim **cdata = (struct swim **) luaL_checkcdata(L, 1, &ctypeid);
+ assert(ctypeid == CTID_STRUCT_SWIM_PTR);
+ *cdata = NULL;
+ return 0;
+}
+
+static int
+lua_swim_info(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (swim == NULL)
+ return luaL_error(L, "Usage: swim:info()");
+ struct info_handler info;
+ luaT_info_handler_create(&info, L);
+ swim_info(swim, &info);
+ return 1;
+}
+
+void
+tarantool_lua_swim_init(struct lua_State *L)
+{
+ static const struct luaL_Reg lua_swim_methods [] = {
+ {"new", lua_swim_new},
+ {"cfg", lua_swim_cfg},
+ {"add_member", lua_swim_add_member},
+ {"remove_member", lua_swim_remove_member},
+ {"delete", lua_swim_delete},
+ {"info", lua_swim_info},
+ {NULL, NULL}
+ };
+ luaL_register_module(L, "swim", lua_swim_methods);
+ lua_pop(L, 1);
+ CTID_STRUCT_SWIM_PTR = luaL_ctypeid(L, "struct swim *");
+ assert(CTID_STRUCT_SWIM_PTR != 0);
+}
diff --git a/src/lua/swim.h b/src/lua/swim.h
new file mode 100644
index 000000000..989cf62b3
--- /dev/null
+++ b/src/lua/swim.h
@@ -0,0 +1,47 @@
+#ifndef INCLUDES_TARANTOOL_LUA_SWIM_H
+#define INCLUDES_TARANTOOL_LUA_SWIM_H
+/*
+ * Copyright 2010-2015, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+struct lua_State;
+
+void
+tarantool_lua_swim_init(struct lua_State *L);
+
+#if defined(__cplusplus)
+}
+#endif
+
+#endif /* INCLUDES_TARANTOOL_LUA_SWIM_H */
--
2.17.2 (Apple Git-113)
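uri_to_addr() in this patch maps "ip:port", ":port" and "localhost:port" to a struct sockaddr_in. Below is a minimal standalone sketch of the same mapping, assuming plain POSIX calls instead of Tarantool's uri_parse(); parse_ip_port is a hypothetical name. It deliberately swaps inet_addr() for inet_pton(), because inet_addr() returns (in_addr_t) -1 both on error and for the valid address 255.255.255.255, and it parses the port with strtol(), so a non-numeric or out-of-range port is rejected instead of silently becoming 0 as with atoi().

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical helper: parse "ip:port", ":port" (INADDR_ANY) or
 * "localhost:port" into @a addr. Returns 0 on success, -1 on error.
 */
static int
parse_ip_port(const char *str, struct sockaddr_in *addr)
{
	const char *colon = strrchr(str, ':');
	if (colon == NULL || colon[1] == '\0')
		return -1;
	/* Strict port parsing: digits only, 1..65535. */
	char *endptr;
	long port = strtol(colon + 1, &endptr, 10);
	if (*endptr != '\0' || port <= 0 || port > 65535)
		return -1;
	/* Copy the host part into a NUL-terminated buffer. */
	size_t host_len = (size_t) (colon - str);
	char host[INET_ADDRSTRLEN];
	if (host_len >= sizeof(host))
		return -1;
	memcpy(host, str, host_len);
	host[host_len] = '\0';

	memset(addr, 0, sizeof(*addr));
	addr->sin_family = AF_INET;
	addr->sin_port = htons((uint16_t) port);
	if (host_len == 0)
		addr->sin_addr.s_addr = htonl(INADDR_ANY);
	else if (strcmp(host, "localhost") == 0)
		addr->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	else if (inet_pton(AF_INET, host, &addr->sin_addr) != 1)
		return -1;
	return 0;
}
```

inet_pton() also only accepts the full four-part dotted form, so shorthand such as "1.2.3" is rejected rather than reinterpreted.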
* [PATCH v2 2/6] [RAW] swim: introduce failure detection component
From: Vladislav Shpilevoy @ 2018-12-25 19:19 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, kostja
The failure detection component allows finding out which members
are already dead.
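The thresholds introduced below (ACK_TIMEOUT = 1, NO_ACKS_TO_DEAD = 3, NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2) turn a run of unanswered pings into a member status. A simplified single-member model of one swim_check_acks() step follows; fd_check, fd_verdict and fd_member are hypothetical names, and the sketch assumes the documented intent that a pinned member is never deleted and keeps being pinged.

```c
#include <stdbool.h>

/* Constants as defined by this patch. */
enum {
	ACK_TIMEOUT = 1,
	NO_ACKS_TO_DEAD = 3,
	NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2,
};

/* Hypothetical outcome of checking one member for a missing ack. */
enum fd_verdict { FD_WAIT, FD_REPING, FD_REPING_DEAD, FD_DELETE };

struct fd_member {
	bool is_pinned;
	int failed_pings;
	/* When the last ping was sent. */
	double ping_ts;
};

/* Decide what to do with @a m at time @a now if no ack has arrived. */
static enum fd_verdict
fd_check(struct fd_member *m, double now)
{
	if (now - m->ping_ts < ACK_TIMEOUT)
		return FD_WAIT;
	++m->failed_pings;
	if (m->failed_pings >= NO_ACKS_TO_GC && !m->is_pinned)
		return FD_DELETE;
	if (m->failed_pings >= NO_ACKS_TO_DEAD)
		return FD_REPING_DEAD;
	return FD_REPING;
}
```

Fed five consecutive expired pings, an unpinned member goes through re-ping, re-ping, dead, dead and finally deletion; a pinned one stays dead and keeps being re-pinged.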
Part of #3234
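swim_member_update_status() in this patch orders {incarnation, status} pairs lexicographically. Below is a standalone sketch of that rule with hypothetical type and function names, returning whether the local view changed. A same-incarnation "dead" overrides "alive" but not the other way round, and only a ping/ack or member record carrying a greater incarnation can bring a dead member back.

```c
#include <stdint.h>

/* Same ordering as in the patch: "alive" < "dead". */
enum member_status { MEMBER_ALIVE = 0, MEMBER_DEAD };

struct member_state {
	uint64_t incarnation;
	enum member_status status;
};

/*
 * Merge a received {incarnation, status} pair into the local view.
 * Returns 1 if the local state changed, 0 otherwise.
 */
static int
member_state_merge(struct member_state *local, uint64_t incarnation,
		   enum member_status status)
{
	if (incarnation > local->incarnation ||
	    (incarnation == local->incarnation && status > local->status)) {
		local->incarnation = incarnation;
		local->status = status;
		return 1;
	}
	return 0;
}
```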
---
src/lib/swim/swim.c | 457 +++++++++++++++++++++++++++++++++++++++++++-
src/lib/swim/swim.h | 9 +
2 files changed, 459 insertions(+), 7 deletions(-)
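struct swim_fd_header_bin below is a fixed-size MsgPack template: every key and the message type fit into one positive fixint byte, 0x82 is a fixmap with two entries, and 0xcf marks a big-endian 64-bit unsigned integer. The sketch writes the same 14 bytes by hand, assuming the enum values from this patch (SWIM_FAILURE_DETECTION = 1, SWIM_FD_MSG_TYPE = 0, SWIM_FD_INCARNATION = 1); encode_fd_header is a hypothetical name.

```c
#include <stddef.h>
#include <stdint.h>

/* Enum values as assumed from this patch. */
enum { SWIM_FAILURE_DETECTION = 1 };
enum { SWIM_FD_MSG_TYPE = 0, SWIM_FD_INCARNATION = 1 };

/*
 * Write {SWIM_FAILURE_DETECTION: {SWIM_FD_MSG_TYPE: type,
 * SWIM_FD_INCARNATION: incarnation}} without the outer map header,
 * byte for byte like swim_fd_header_bin. Returns bytes written.
 */
static size_t
encode_fd_header(uint8_t *buf, uint8_t type, uint64_t incarnation)
{
	uint8_t *p = buf;
	*p++ = SWIM_FAILURE_DETECTION;	/* positive fixint key */
	*p++ = 0x82;			/* fixmap, 2 entries */
	*p++ = SWIM_FD_MSG_TYPE;	/* positive fixint key */
	*p++ = type;			/* positive fixint value */
	*p++ = SWIM_FD_INCARNATION;	/* positive fixint key */
	*p++ = 0xcf;			/* uint 64 marker */
	for (int shift = 56; shift >= 0; shift -= 8)
		*p++ = (uint8_t) (incarnation >> shift); /* big-endian */
	return (size_t) (p - buf);
}
```

The single-byte layout only holds while these enum values and the message type stay below 128; larger values would need the longer encodings that mp_encode_uint() produces.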
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 989d83a22..22bc06a60 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -99,6 +99,22 @@ enum {
* networks by their admins.
*/
UDP_PACKET_SIZE = 1472,
+ /**
+ * If a ping was sent, it is considered to be lost after
+ * this time without an ack.
+ */
+ ACK_TIMEOUT = 1,
+ /**
+ * If a member has not been responding to pings this
+ * number of times, it is considered to be dead.
+ */
+ NO_ACKS_TO_DEAD = 3,
+ /**
+ * If an unpinned member is confirmed to be dead, it is
+ * removed from the membership after at least this number
+ * of failed pings.
+ */
+ NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2,
};
static ssize_t
@@ -124,6 +140,7 @@ swim_udp_recv_msg(int fd, void *buffer, size_t size, struct sockaddr *addr,
/** UDP sendto/recvfrom implementation of swim_transport. */
static struct swim_transport swim_udp_transport = {
/* .send_round_msg = */ swim_udp_send_msg,
+ /* .send_failure_detection_msg = */ swim_udp_send_msg,
/* .recv_msg = */ swim_udp_recv_msg,
};
@@ -156,6 +173,12 @@ swim_io_task_create(struct swim_io_task *task, swim_io_task_f cb,
rlist_create(&task->in_queue_output);
}
+static inline void
+swim_io_task_destroy(struct swim_io_task *task)
+{
+ rlist_del_entry(task, in_queue_output);
+}
+
/**
* UDP body size is limited by definition. To be able to send a
* big message it should be split into multiple packets. Each
@@ -287,11 +310,17 @@ enum swim_member_status {
* members table.
*/
MEMBER_ALIVE = 0,
+ /**
+ * The member is considered to be dead. It will disappear
+ * from the membership unless it is pinned.
+ */
+ MEMBER_DEAD,
swim_member_status_MAX,
};
static const char *swim_member_status_strs[] = {
"alive",
+ "dead",
};
/**
@@ -316,6 +345,38 @@ struct swim_member {
* Position in a queue of members in the current round.
*/
struct rlist in_queue_round;
+ /**
+ *
+ * Failure detection component
+ */
+ /**
+ * True if the member is configured explicitly and cannot
+ * disappear from the membership.
+ */
+ bool is_pinned;
+ /** Growing number to refute old messages. */
+ uint64_t incarnation;
+ /**
+ * How many pings did not receive an ack in a row. After
+ * a threshold the instance is marked as dead. After more
+ * it is removed from the table (if not pinned).
+ */
+ int failed_pings;
+ /** When the last ping was sent. */
+ double ping_ts;
+ /**
+ * Preallocated ack task, sent when this member has sent
+ * a ping to us.
+ */
+ struct swim_io_task ack_task;
+ /**
+ * Preallocated ping task, sent when this member has not
+ * responded for too long to an initial ping piggybacked
+ * with the members table.
+ */
+ struct swim_io_task ping_task;
+ /** Position in a queue of members waiting for an ack. */
+ struct rlist in_queue_wait_ack;
};
/**
@@ -381,6 +442,15 @@ struct swim {
int shuffled_members_size;
/** Queue of output tasks ready to write now. */
struct rlist queue_output;
+ /**
+ * Members waiting for an ACK. If an ACK does not arrive in
+ * time, the ping is counted as failed and resent; after
+ * enough failures the member is considered dead and, unless
+ * pinned, removed. The list is sorted by ping time in
+ * ascending order (head is older, tail is newer).
+ */
+ struct rlist queue_wait_ack;
+ /** Generator of ack checking events. */
+ struct ev_periodic wait_ack_tick;
};
static inline uint64_t
@@ -396,8 +466,84 @@ sockaddr_in_hash(const struct sockaddr_in *a)
*/
enum swim_component_type {
SWIM_ANTI_ENTROPY = 0,
+ SWIM_FAILURE_DETECTION,
};
+/** {{{ Failure detection component */
+
+/** Possible failure detection keys. */
+enum swim_fd_key {
+ /** Type of the failure detection message: ping or ack. */
+ SWIM_FD_MSG_TYPE,
+ /**
+ * Incarnation of the sender. To make the member alive if
+ * it was considered to be dead, but ping/ack with greater
+ * incarnation was received from it.
+ */
+ SWIM_FD_INCARNATION,
+};
+
+/**
+ * Currently a failure detection message has only two types:
+ * ping and ack. Indirect ping/ack are TODO.
+ */
+enum swim_fd_msg_type {
+ SWIM_FD_MSG_PING,
+ SWIM_FD_MSG_ACK,
+ swim_fd_msg_type_MAX,
+};
+
+static const char *swim_fd_msg_type_strs[] = {
+ "ping",
+ "ack",
+};
+
+/** SWIM failure detection MsgPack header template. */
+struct PACKED swim_fd_header_bin {
+ /** mp_encode_uint(SWIM_FAILURE_DETECTION) */
+ uint8_t k_header;
+ /** mp_encode_map(2) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_FD_MSG_TYPE) */
+ uint8_t k_type;
+ /** mp_encode_uint(enum swim_fd_msg_type) */
+ uint8_t v_type;
+
+ /** mp_encode_uint(SWIM_FD_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
+};
+
+static inline void
+swim_fd_header_bin_create(struct swim_fd_header_bin *header,
+ enum swim_fd_msg_type type, uint64_t incarnation)
+{
+ header->k_header = SWIM_FAILURE_DETECTION;
+ header->m_header = 0x82;
+
+ header->k_type = SWIM_FD_MSG_TYPE;
+ header->v_type = type;
+
+ header->k_incarnation = SWIM_FD_INCARNATION;
+ header->m_incarnation = 0xcf;
+ header->v_incarnation = mp_bswap_u64(incarnation);
+}
+
+static void
+swim_member_schedule_ack_wait(struct swim *swim, struct swim_member *member)
+{
+ if (rlist_empty(&member->in_queue_wait_ack)) {
+ member->ping_ts = fiber_time();
+ rlist_add_tail_entry(&swim->queue_wait_ack, member,
+ in_queue_wait_ack);
+ }
+}
+
+/** }}} Failure detection component */
+
/** {{{ Anti-entropy component */
/**
@@ -412,6 +558,7 @@ enum swim_member_key {
*/
SWIM_MEMBER_ADDR,
SWIM_MEMBER_PORT,
+ SWIM_MEMBER_INCARNATION,
swim_member_key_MAX,
};
@@ -435,7 +582,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
/** SWIM member MsgPack template. */
struct PACKED swim_member_bin {
- /** mp_encode_map(3) */
+ /** mp_encode_map(4) */
uint8_t m_header;
/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -454,6 +601,12 @@ struct PACKED swim_member_bin {
/** mp_encode_uint(addr.sin_port) */
uint8_t m_port;
uint16_t v_port;
+
+ /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
};
static inline void
@@ -463,17 +616,20 @@ swim_member_bin_reset(struct swim_member_bin *header,
header->v_status = member->status;
header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
header->v_port = mp_bswap_u16(member->addr.sin_port);
+ header->v_incarnation = mp_bswap_u64(member->incarnation);
}
static inline void
swim_member_bin_create(struct swim_member_bin *header)
{
- header->m_header = 0x83;
+ header->m_header = 0x84;
header->k_status = SWIM_MEMBER_STATUS;
header->k_addr = SWIM_MEMBER_ADDR;
header->m_addr = 0xce;
header->k_port = SWIM_MEMBER_PORT;
header->m_port = 0xcd;
+ header->k_incarnation = SWIM_MEMBER_INCARNATION;
+ header->m_incarnation = 0xcf;
}
/** }}} Anti-entropy component */
@@ -481,11 +637,19 @@ swim_member_bin_create(struct swim_member_bin *header)
/**
* SWIM message structure:
* {
+ * SWIM_FAILURE_DETECTION: {
+ * SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type,
+ * SWIM_FD_INCARNATION: uint
+ * },
+ *
+ * OR/AND
+ *
* SWIM_ANTI_ENTROPY: [
* {
* SWIM_MEMBER_STATUS: uint, enum member_status,
* SWIM_MEMBER_ADDR: uint, ip,
- * SWIM_MEMBER_PORT: uint, port
+ * SWIM_MEMBER_PORT: uint, port,
+ * SWIM_MEMBER_INCARNATION: uint
* },
* ...
* ],
@@ -499,13 +663,45 @@ swim_io_task_push(struct swim_io_task *task)
ev_io_start(loop(), &task->swim->output);
}
+/**
+ * Update status of the member if needed. Statuses are compared as
+ * a compound key: {incarnation, status}. So @a new_status can
+ * override an old one only if its incarnation is greater, or the
+ * same, but its status is "bigger". Statuses are compared by
+ * their identifier, so "alive" < "dead". This protects from the
+ * case when a member is detected as dead on one instance, but
+ * overridden by an "alive" message with the same incarnation
+ * from another instance.
+ */
+static inline void
+swim_member_update_status(struct swim *swim, struct swim_member *member,
+ enum swim_member_status new_status,
+ uint64_t incarnation)
+{
+ (void) swim;
+ assert(member != swim->self);
+ if (member->incarnation == incarnation) {
+ if (member->status < new_status)
+ member->status = new_status;
+ } else if (member->incarnation < incarnation) {
+ member->status = new_status;
+ member->incarnation = incarnation;
+ }
+}
+
+static void
+swim_send_ack(struct swim_io_task *task);
+
+static void
+swim_send_ping(struct swim_io_task *task);
+
/**
* Register a new member with a specified status. Here it is
* added to the hash, to the 'next' queue.
*/
static struct swim_member *
swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
- enum swim_member_status status)
+ enum swim_member_status status, uint64_t incarnation)
{
struct swim_member *member =
(struct swim_member *) calloc(1, sizeof(*member));
@@ -515,6 +711,7 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
}
member->status = status;
member->addr = *addr;
+ member->incarnation = incarnation;
struct mh_i64ptr_node_t node;
node.key = sockaddr_in_hash(addr);
node.val = member;
@@ -524,7 +721,10 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node");
return NULL;
}
+ swim_io_task_create(&member->ack_task, swim_send_ack, swim);
+ swim_io_task_create(&member->ping_task, swim_send_ping, swim);
rlist_add_entry(&swim->queue_round, member, in_queue_round);
+ rlist_create(&member->in_queue_wait_ack);
return member;
}
@@ -549,7 +749,10 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
mh_int_t rc = mh_i64ptr_find(swim->members, key, NULL);
assert(rc != mh_end(swim->members));
mh_i64ptr_del(swim->members, rc, NULL);
+ swim_io_task_destroy(&member->ack_task);
+ swim_io_task_destroy(&member->ping_task);
rlist_del_entry(member, in_queue_round);
+ rlist_del_entry(member, in_queue_wait_ack);
free(member);
}
@@ -648,6 +851,27 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_msg *msg)
return i;
}
+/**
+ * Encode failure detection component.
+ * @retval Number of encoded messages.
+ */
+static int
+swim_encode_failure_detection(struct swim *swim, struct swim_msg *msg,
+ enum swim_fd_msg_type type)
+{
+ struct swim_fd_header_bin fd_header_bin;
+ int size = sizeof(fd_header_bin);
+ struct swim_msg_part *part = swim_msg_reserve(msg, size);
+ if (part == NULL)
+ return -1;
+ char *pos = swim_msg_part_pos(part);
+ swim_fd_header_bin_create(&fd_header_bin, type,
+ swim->self->incarnation);
+ memcpy(pos, &fd_header_bin, size);
+ swim_msg_part_advance(part, size);
+ return 1;
+}
+
/** Encode SWIM components into a sequence of UDP packets. */
static int
swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
@@ -660,6 +884,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
int rc, map_size = 0;
swim_msg_part_advance(part, 1);
+ rc = swim_encode_failure_detection(swim, msg, SWIM_FD_MSG_PING);
+ if (rc < 0)
+ goto error;
+ map_size += rc > 0;
+
rc = swim_encode_anti_entropy(swim, msg);
if (rc < 0)
goto error;
@@ -713,11 +942,58 @@ swim_send_round_msg(struct swim_io_task *task)
diag_log();
}
swim_msg_destroy(&msg);
+ swim_member_schedule_ack_wait(swim, m);
rlist_del_entry(m, in_queue_round);
next_round_step:
ev_periodic_start(loop(), &swim->round_tick);
}
+/** Send a failure detection message. */
+static void
+swim_send_fd_message(struct swim *swim, struct swim_member *m,
+ enum swim_fd_msg_type type)
+{
+ struct swim_msg msg;
+ swim_msg_create(&msg);
+ int rc = swim_encode_failure_detection(swim, &msg, type);
+ if (rc < 0) {
+ diag_log();
+ swim_msg_destroy(&msg);
+ return;
+ }
+ assert(rc > 0);
+ struct swim_msg_part *part = swim_msg_first_part(&msg);
+ struct sockaddr *addr = (struct sockaddr *) &m->addr;
+ assert(swim_msg_part_is_last(part));
+ say_verbose("SWIM: send %s to %s", swim_fd_msg_type_strs[type],
+ sio_strfaddr(addr, sizeof(m->addr)));
+ if (swim->transport.send_failure_detection_msg(swim->output.fd,
+ part->body, part->size,
+ addr,
+ sizeof(m->addr)) == -1)
+ diag_log();
+ swim_msg_destroy(&msg);
+}
+
+static void
+swim_send_ack(struct swim_io_task *task)
+{
+ assert(task->cb == swim_send_ack);
+ struct swim_member *m = container_of(task, struct swim_member,
+ ack_task);
+ swim_send_fd_message(task->swim, m, SWIM_FD_MSG_ACK);
+}
+
+static void
+swim_send_ping(struct swim_io_task *task)
+{
+ assert(task->cb == swim_send_ping);
+ struct swim_member *m = container_of(task, struct swim_member,
+ ping_task);
+ swim_send_fd_message(task->swim, m, SWIM_FD_MSG_PING);
+ swim_member_schedule_ack_wait(task->swim, m);
+}
+
static void
swim_on_output(struct ev_loop *loop, struct ev_io *io, int events)
{
@@ -745,12 +1021,43 @@ swim_trigger_round_step(struct ev_loop *loop, struct ev_periodic *p, int events)
ev_periodic_stop(loop, p);
}
+/**
+ * Check for failed pings. A ping has failed if no ack was
+ * received within ACK_TIMEOUT. A failed ping is resent here.
+ */
+static void
+swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
+{
+ assert((events & EV_PERIODIC) != 0);
+ (void) loop;
+ (void) events;
+ struct swim *swim = (struct swim *) p->data;
+ struct swim_member *m, *tmp;
+ double current_time = fiber_time();
+ rlist_foreach_entry_safe(m, &swim->queue_wait_ack, in_queue_wait_ack,
+ tmp) {
+ if (current_time - m->ping_ts < ACK_TIMEOUT)
+ break;
+ ++m->failed_pings;
+ if (m->failed_pings >= NO_ACKS_TO_GC) {
+ if (!m->is_pinned)
+ swim_member_delete(swim, m);
+ continue;
+ }
+ if (m->failed_pings >= NO_ACKS_TO_DEAD)
+ m->status = MEMBER_DEAD;
+ swim_io_task_push(&m->ping_task);
+ rlist_del_entry(m, in_queue_wait_ack);
+ }
+}
+
/**
* SWIM member attributes from anti-entropy and dissemination
* messages.
*/
struct swim_member_def {
struct sockaddr_in addr;
+ uint64_t incarnation;
enum swim_member_status status;
};
@@ -760,6 +1067,7 @@ swim_member_def_create(struct swim_member_def *def)
def->addr.sin_port = 0;
def->addr.sin_addr.s_addr = 0;
def->status = MEMBER_ALIVE;
+ def->incarnation = 0;
}
static void
@@ -771,9 +1079,35 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
* members table.
*/
if (member == NULL) {
- member = swim_member_new(swim, &def->addr, def->status);
+ member = swim_member_new(swim, &def->addr, def->status,
+ def->incarnation);
if (member == NULL)
diag_log();
+ return;
+ }
+ struct swim_member *self = swim->self;
+ if (member != self) {
+ swim_member_update_status(swim, member, def->status,
+ def->incarnation);
+ return;
+ }
+ /*
+ * Other instances may know a bigger incarnation of
+ * this instance. This happens when the instance
+ * restarts and loses its local incarnation number. The
+ * number is restored from dissemination messages
+ * about self.
+ */
+ if (self->incarnation < def->incarnation)
+ self->incarnation = def->incarnation;
+ if (def->status != MEMBER_ALIVE &&
+ def->incarnation == self->incarnation) {
+ /*
+ * A gossip circulates in the cluster that this
+ * instance is not alive. Refute it with a bigger
+ * incarnation.
+ */
+ self->incarnation++;
}
}
@@ -817,6 +1151,15 @@ swim_process_member_key(enum swim_member_key key, const char **pos,
}
def->addr.sin_port = port;
break;
+ case SWIM_MEMBER_INCARNATION:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s member incarnation should be uint",
+ msg_pref);
+ return -1;
+ }
+ def->incarnation = mp_decode_uint(pos);
+ break;
default:
unreachable();
}
@@ -868,6 +1211,91 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
return 0;
}
+/**
+ * Decode a failure detection message. Schedule pings, process
+ * acks.
+ */
+static int
+swim_process_failure_detection(struct swim *swim, const char **pos,
+ const char *end, const struct sockaddr_in *src)
+{
+ const char *msg_pref = "Invalid SWIM failure detection message:";
+ if (mp_typeof(**pos) != MP_MAP || mp_check_map(*pos, end) > 0) {
+ say_error("%s root should be a map", msg_pref);
+ return -1;
+ }
+ uint64_t size = mp_decode_map(pos);
+ if (size != 2) {
+ say_error("%s root map should have two keys - message type "\
+ "and incarnation", msg_pref);
+ return -1;
+ }
+ enum swim_fd_msg_type type = swim_fd_msg_type_MAX;
+ uint64_t incarnation = 0;
+ for (int i = 0; i < (int) size; ++i) {
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s a key should be uint", msg_pref);
+ return -1;
+ }
+ uint64_t key = mp_decode_uint(pos);
+ switch(key) {
+ case SWIM_FD_MSG_TYPE:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s message type should be uint",
+ msg_pref);
+ return -1;
+ }
+ key = mp_decode_uint(pos);
+ if (key >= swim_fd_msg_type_MAX) {
+ say_error("%s unknown message type", msg_pref);
+ return -1;
+ }
+ type = key;
+ break;
+ case SWIM_FD_INCARNATION:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s incarnation should be uint",
+ msg_pref);
+ return -1;
+ }
+ incarnation = mp_decode_uint(pos);
+ break;
+ default:
+ say_error("%s unknown key", msg_pref);
+ return -1;
+ }
+ }
+ if (type == swim_fd_msg_type_MAX) {
+ say_error("%s message type should be specified", msg_pref);
+ return -1;
+ }
+ struct swim_member *sender = swim_find_member(swim, src);
+ if (sender == NULL) {
+ sender = swim_member_new(swim, src, MEMBER_ALIVE, incarnation);
+ if (sender == NULL) {
+ diag_log();
+ return 0;
+ }
+ } else {
+ swim_member_update_status(swim, sender, MEMBER_ALIVE,
+ incarnation);
+ }
+ if (type == SWIM_FD_MSG_PING) {
+ swim_io_task_push(&sender->ack_task);
+ } else {
+ assert(type == SWIM_FD_MSG_ACK);
+ if (incarnation >= sender->incarnation) {
+ sender->failed_pings = 0;
+ rlist_del_entry(&sender->ping_task, in_queue_output);
+ rlist_del_entry(sender, in_queue_wait_ack);
+ }
+ }
+ return 0;
+}
+
/** Receive and process a new message. */
static void
swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
@@ -910,6 +1338,12 @@ swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
if (swim_process_anti_entropy(swim, &pos, end) != 0)
return;
break;
+ case SWIM_FAILURE_DETECTION:
+ say_verbose("SWIM: process failure detection");
+ if (swim_process_failure_detection(swim, &pos, end,
+ &addr) != 0)
+ return;
+ break;
default:
say_error("%s unknown component type component is "\
"supported", msg_pref);
@@ -984,6 +1418,10 @@ swim_new(void)
swim->output.data = (void *) swim;
swim->transport = swim_udp_transport;
rlist_create(&swim->queue_output);
+ rlist_create(&swim->queue_wait_ack);
+ ev_init(&swim->wait_ack_tick, swim_check_acks);
+ ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL);
+ swim->wait_ack_tick.data = (void *) swim;
return swim;
}
@@ -1024,7 +1462,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
new_self = swim_find_member(swim, &addr);
if (new_self == NULL) {
new_self = swim_member_new(swim, &addr,
- MEMBER_ALIVE);
+ MEMBER_ALIVE, 0);
if (new_self == NULL) {
close(fd);
return -1;
@@ -1034,6 +1472,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
ev_io_set(&swim->input, fd, EV_READ);
ev_io_set(&swim->output, fd, EV_WRITE);
ev_periodic_start(loop(), &swim->round_tick);
+ ev_periodic_start(loop(), &swim->wait_ack_tick);
}
}
@@ -1054,9 +1493,10 @@ swim_add_member(struct swim *swim, const char *uri)
return -1;
struct swim_member *member = swim_find_member(swim, &addr);
if (member == NULL) {
- member = swim_member_new(swim, &addr, MEMBER_ALIVE);
+ member = swim_member_new(swim, &addr, MEMBER_ALIVE, 0);
if (member == NULL)
return -1;
+ member->is_pinned = true;
}
return 0;
}
@@ -1087,6 +1527,8 @@ swim_info(struct swim *swim, struct info_handler *info)
sizeof(member->addr)));
info_append_str(info, "status",
swim_member_status_strs[member->status]);
+ info_append_int(info, "incarnation",
+ (int64_t) member->incarnation);
info_table_end(info);
}
info_end(info);
@@ -1099,6 +1541,7 @@ swim_delete(struct swim *swim)
ev_io_stop(loop(), &swim->output);
ev_io_stop(loop(), &swim->input);
ev_periodic_stop(loop(), &swim->round_tick);
+ ev_periodic_stop(loop(), &swim->wait_ack_tick);
mh_int_t node = mh_first(swim->members);
while (node != mh_end(swim->members)) {
struct swim_member *m = (struct swim_member *)
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 77e16ed53..51f0c144d 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -54,6 +54,15 @@ struct swim_transport {
(*send_round_msg)(int fd, const void *data, size_t size,
const struct sockaddr *addr, socklen_t addr_size);
+ /**
+ * Send a failure detection message: a ping or an ack.
+ * Parameters are like sendto().
+ */
+ ssize_t
+ (*send_failure_detection_msg)(int fd, const void *data, size_t size,
+ const struct sockaddr *addr,
+ socklen_t addr_size);
+
/**
* Receive a message. Not necessary round or failure
* detection. Before message is received, its type is
--
2.17.2 (Apple Git-113)
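The ack-timeout handling in `swim_check_acks()` can be illustrated with a standalone sketch. It assumes, as the early `break` implies, that the wait-ack queue is ordered by ping time; it replaces the rlist with a plain array and uses hypothetical constants (`SKETCH_ACK_TIMEOUT`, `SKETCH_NO_ACKS_TO_DEAD`, `SKETCH_NO_ACKS_TO_GC`) because the real values are not part of this excerpt. Resending a ping is modelled by moving the member to the tail of the queue with a fresh timestamp.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical values; the patch defines its own constants. */
#define SKETCH_ACK_TIMEOUT 1.0
#define SKETCH_NO_ACKS_TO_DEAD 3
#define SKETCH_NO_ACKS_TO_GC 4
#define SKETCH_MAX_QUEUE 16

enum sketch_status { SKETCH_ALIVE, SKETCH_DEAD, SKETCH_DELETED };

struct sketch_member {
	double ping_ts;   /* Time the last unanswered ping was sent. */
	int failed_pings; /* Pings in a row without an ack. */
	bool is_pinned;   /* Added explicitly, never garbage collected. */
	enum sketch_status status;
};

/*
 * Process the wait-ack queue (ordered by ping_ts ascending) at
 * time @a now. Returns the new queue length; @a queue is
 * rewritten in place.
 */
static int
sketch_check_acks(struct sketch_member **queue, int count, double now)
{
	struct sketch_member *kept[SKETCH_MAX_QUEUE];
	struct sketch_member *resent[SKETCH_MAX_QUEUE];
	int n_kept = 0, n_resent = 0, i;
	assert(count <= SKETCH_MAX_QUEUE);
	for (i = 0; i < count; ++i) {
		struct sketch_member *m = queue[i];
		/* Sorted queue: the first fresh ping ends the scan. */
		if (now - m->ping_ts < SKETCH_ACK_TIMEOUT)
			break;
		++m->failed_pings;
		if (m->failed_pings >= SKETCH_NO_ACKS_TO_GC) {
			/* As in the patch, a pinned member stays queued. */
			if (m->is_pinned)
				kept[n_kept++] = m;
			else
				m->status = SKETCH_DELETED;
			continue;
		}
		if (m->failed_pings >= SKETCH_NO_ACKS_TO_DEAD)
			m->status = SKETCH_DEAD;
		/* Resend the ping: new timestamp, back to the tail. */
		m->ping_ts = now;
		resent[n_resent++] = m;
	}
	/* Pings that have not timed out keep their order. */
	for (; i < count; ++i)
		kept[n_kept++] = queue[i];
	for (int j = 0; j < n_resent; ++j)
		kept[n_kept++] = resent[j];
	for (int j = 0; j < n_kept; ++j)
		queue[j] = kept[j];
	return n_kept;
}
```

Keeping the queue ordered is what makes the early exit cheap: once one ping is still fresh, every ping queued after it is fresher, so a tick touches only the timed-out prefix.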
* [PATCH v2 3/6] [RAW] swim: introduce a dissemination component
From: Vladislav Shpilevoy @ 2018-12-25 19:19 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, kostja
The dissemination component broadcasts events about member
status updates.
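The TTL bookkeeping that `swim_schedule_event()` and `swim_decrease_events_ttl()` implement in the diff can be sketched with a plain array queue. As in the patch, a repeated update keeps the member's queue position and restarts its TTL at the size of the members table, and every round step decrements all TTLs and drops events that reach zero. The array queue and all `sketch_*` names are hypothetical stand-ins for the rlist-based code.

```c
#include <assert.h>
#include <stdbool.h>

#define SKETCH_MAX_EVENTS 8

struct sketch_member {
	int id;
	/* Remaining round steps that piggyback this member's event. */
	int dissemination_ttl;
};

/* Event queue ordered by event occurrence time (FIFO). */
struct sketch_event_queue {
	struct sketch_member *items[SKETCH_MAX_EVENTS];
	int count;
};

static bool
sketch_is_queued(const struct sketch_event_queue *q,
		 const struct sketch_member *m)
{
	for (int i = 0; i < q->count; ++i) {
		if (q->items[i] == m)
			return true;
	}
	return false;
}

/* Like swim_schedule_event(): enqueue once, (re)start the TTL. */
static void
sketch_schedule_event(struct sketch_event_queue *q, struct sketch_member *m,
		      int table_size)
{
	if (!sketch_is_queued(q, m)) {
		assert(q->count < SKETCH_MAX_EVENTS);
		q->items[q->count++] = m;
	}
	m->dissemination_ttl = table_size;
}

/* Like swim_decrease_events_ttl(): called after each round step. */
static void
sketch_decrease_events_ttl(struct sketch_event_queue *q)
{
	int kept = 0;
	for (int i = 0; i < q->count; ++i) {
		struct sketch_member *m = q->items[i];
		if (--m->dissemination_ttl > 0)
			q->items[kept++] = m;
	}
	q->count = kept;
}

/* A dead member may be removed only once its event is spread. */
static bool
sketch_can_gc(const struct sketch_member *m)
{
	return m->dissemination_ttl == 0;
}
```

Restarting the TTL instead of enqueueing a duplicate keeps at most one pending event per member, so a flapping member cannot flood the dissemination section.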
Part of #3234
---
src/lib/swim/swim.c | 270 +++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 265 insertions(+), 5 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 22bc06a60..15e079f11 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -377,6 +377,26 @@ struct swim_member {
struct swim_io_task ping_task;
/** Position in a queue of members waiting for an ack. */
struct rlist in_queue_wait_ack;
+ /**
+ *
+ * Dissemination component
+ *
+ * The dissemination component sends events. An event is
+ * a notification about a member status update, so
+ * formally this structure already has all the needed
+ * attributes. But according to SWIM an event should also
+ * be sent to all members at least once, so it requires
+ * something like a TTL, which decrements on each send.
+ * A member cannot be removed from the global table until
+ * it is dead and its dissemination TTL is 0, so as to
+ * allow other members to learn its dead status.
+ */
+ int dissemination_ttl;
+ /**
+ * Events are put into a queue sorted by event occurrence
+ * time.
+ */
+ struct rlist in_queue_events;
};
/**
@@ -451,6 +471,8 @@ struct swim {
struct rlist queue_wait_ack;
/** Generator of ack checking events. */
struct ev_periodic wait_ack_tick;
+ /** Queue of events sorted by occurrence time. */
+ struct rlist queue_events;
};
static inline uint64_t
@@ -467,6 +489,7 @@ sockaddr_in_hash(const struct sockaddr_in *a)
enum swim_component_type {
SWIM_ANTI_ENTROPY = 0,
SWIM_FAILURE_DETECTION,
+ SWIM_DISSEMINATION,
};
/** {{{ Failure detection component */
@@ -634,6 +657,88 @@ swim_member_bin_create(struct swim_member_bin *header)
/** }}} Anti-entropy component */
+/** {{{ Dissemination component */
+
+/** SWIM dissemination MsgPack template. */
+struct PACKED swim_diss_header_bin {
+ /** mp_encode_uint(SWIM_DISSEMINATION) */
+ uint8_t k_header;
+ /** mp_encode_array() */
+ uint8_t m_header;
+ uint32_t v_header;
+};
+
+static inline void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header, int batch_size)
+{
+ header->k_header = SWIM_DISSEMINATION;
+ header->m_header = 0xdd;
+ header->v_header = mp_bswap_u32(batch_size);
+}
+
+/** SWIM event MsgPack template. */
+struct PACKED swim_event_bin {
+ /** mp_encode_map(4) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_MEMBER_STATUS) */
+ uint8_t k_status;
+ /** mp_encode_uint(enum member_status) */
+ uint8_t v_status;
+
+ /** mp_encode_uint(SWIM_MEMBER_ADDR) */
+ uint8_t k_addr;
+ /** mp_encode_uint(addr.sin_addr.s_addr) */
+ uint8_t m_addr;
+ uint32_t v_addr;
+
+ /** mp_encode_uint(SWIM_MEMBER_PORT) */
+ uint8_t k_port;
+ /** mp_encode_uint(addr.sin_port) */
+ uint8_t m_port;
+ uint16_t v_port;
+
+ /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
+};
+
+static inline void
+swim_event_bin_create(struct swim_event_bin *header)
+{
+ header->m_header = 0x84;
+ header->k_status = SWIM_MEMBER_STATUS;
+ header->k_addr = SWIM_MEMBER_ADDR;
+ header->m_addr = 0xce;
+ header->k_port = SWIM_MEMBER_PORT;
+ header->m_port = 0xcd;
+ header->k_incarnation = SWIM_MEMBER_INCARNATION;
+ header->m_incarnation = 0xcf;
+}
+
+static inline void
+swim_event_bin_reset(struct swim_event_bin *header, struct swim_member *member)
+{
+ header->v_status = member->status;
+ header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
+ header->v_port = mp_bswap_u16(member->addr.sin_port);
+ header->v_incarnation = mp_bswap_u64(member->incarnation);
+}
+
+static inline void
+swim_schedule_event(struct swim *swim, struct swim_member *member)
+{
+ if (rlist_empty(&member->in_queue_events)) {
+ rlist_add_tail_entry(&swim->queue_events, member,
+ in_queue_events);
+ }
+ member->dissemination_ttl = mh_size(swim->members);
+}
+
+/** }}} Dissemination component */
+
/**
* SWIM message structure:
* {
@@ -644,6 +749,18 @@ swim_member_bin_create(struct swim_member_bin *header)
*
* OR/AND
*
+ * SWIM_DISSEMINATION: [
+ * {
+ * SWIM_MEMBER_STATUS: uint, enum member_status,
+ * SWIM_MEMBER_ADDR: uint, ip,
+ * SWIM_MEMBER_PORT: uint, port,
+ * SWIM_MEMBER_INCARNATION: uint
+ * },
+ * ...
+ * ],
+ *
+ * OR/AND
+ *
* SWIM_ANTI_ENTROPY: [
* {
* SWIM_MEMBER_STATUS: uint, enum member_status,
@@ -663,6 +780,16 @@ swim_io_task_push(struct swim_io_task *task)
ev_io_start(loop(), &task->swim->output);
}
+/**
+ * Perform all actions needed to process a member's update,
+ * such as a change of its status, its incarnation, or both.
+ */
+static void
+swim_member_is_updated(struct swim *swim, struct swim_member *member)
+{
+ swim_schedule_event(swim, member);
+}
+
/**
* Update status of the member if needed. Statuses are compared as
* a compound key: {incarnation, status}. So @a new_status can
@@ -678,14 +805,16 @@ swim_member_update_status(struct swim *swim, struct swim_member *member,
enum swim_member_status new_status,
uint64_t incarnation)
{
- (void) swim;
assert(member != swim->self);
if (member->incarnation == incarnation) {
- if (member->status < new_status)
+ if (member->status < new_status) {
member->status = new_status;
+ swim_member_is_updated(swim, member);
+ }
} else if (member->incarnation < incarnation) {
member->status = new_status;
member->incarnation = incarnation;
+ swim_member_is_updated(swim, member);
}
}
@@ -725,6 +854,8 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
swim_io_task_create(&member->ping_task, swim_send_ping, swim);
rlist_add_entry(&swim->queue_round, member, in_queue_round);
rlist_create(&member->in_queue_wait_ack);
+ rlist_create(&member->in_queue_events);
+ swim_schedule_event(swim, member);
return member;
}
@@ -753,6 +884,7 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
swim_io_task_destroy(&member->ping_task);
rlist_del_entry(member, in_queue_round);
rlist_del_entry(member, in_queue_wait_ack);
+ assert(rlist_empty(&member->in_queue_events));
free(member);
}
@@ -872,6 +1004,62 @@ swim_encode_failure_detection(struct swim *swim, struct swim_msg *msg,
return 1;
}
+static int
+swim_encode_dissemination_part(struct swim_msg *msg, struct rlist **queue_pos)
+{
+ struct swim_diss_header_bin diss_header_bin;
+ struct swim_msg_part *part =
+ swim_msg_reserve(msg, sizeof(diss_header_bin));
+ if (part == NULL)
+ return -1;
+ char *header = swim_msg_part_pos(part);
+ char *end = swim_msg_part_end(part);
+ char *pos = header + sizeof(diss_header_bin);
+
+ int i = 0;
+ struct swim_member *member, *prev = NULL;
+ struct swim_event_bin event_bin;
+ swim_event_bin_create(&event_bin);
+ rlist_foreach_entry(member, *queue_pos, in_queue_events) {
+ if (pos + sizeof(event_bin) > end)
+ break;
+ swim_event_bin_reset(&event_bin, member);
+ memcpy(pos, &event_bin, sizeof(event_bin));
+ pos += sizeof(event_bin);
+ ++i;
+ prev = member;
+ }
+ if (i == 0)
+ return 0;
+ swim_diss_header_bin_create(&diss_header_bin, i);
+ memcpy(header, &diss_header_bin, sizeof(diss_header_bin));
+ swim_msg_part_advance(part, pos - header);
+ if (prev == rlist_last_entry(*queue_pos, struct swim_member,
+ in_queue_events))
+ *queue_pos = NULL;
+ else
+ *queue_pos = rlist_next(&prev->in_queue_events);
+ return i;
+}
+
+/**
+ * Encode the dissemination component.
+ * @retval Number of encoded events.
+ */
+static int
+swim_encode_dissemination(struct swim *swim, struct swim_msg *msg)
+{
+ int count = 0;
+ struct rlist *pos = rlist_first(&swim->queue_events);
+ while (pos != rlist_last(&swim->queue_events)) {
+ int rc = swim_encode_dissemination_part(msg, &pos);
+ if (rc < 0)
+ return -1;
+ count += rc;
+ }
+ return count;
+}
+
/** Encode SWIM components into a sequence of UDP packets. */
static int
swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
@@ -889,6 +1077,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
goto error;
map_size += rc > 0;
+ rc = swim_encode_dissemination(swim, msg);
+ if (rc < 0)
+ goto error;
+ map_size += rc > 0;
+
rc = swim_encode_anti_entropy(swim, msg);
if (rc < 0)
goto error;
@@ -902,6 +1095,17 @@ error:
return -1;
}
+static void
+swim_decrease_events_ttl(struct swim *swim)
+{
+ struct swim_member *member, *tmp;
+ rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events,
+ tmp) {
+ if (--member->dissemination_ttl == 0)
+ rlist_del_entry(member, in_queue_events);
+ }
+}
+
/**
* Do one round step. Send encoded components to a next member
* from the queue.
@@ -943,6 +1147,7 @@ swim_send_round_msg(struct swim_io_task *task)
}
swim_msg_destroy(&msg);
swim_member_schedule_ack_wait(swim, m);
+ swim_decrease_events_ttl(swim);
rlist_del_entry(m, in_queue_round);
next_round_step:
ev_periodic_start(loop(), &swim->round_tick);
@@ -1040,12 +1245,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
break;
++m->failed_pings;
if (m->failed_pings >= NO_ACKS_TO_GC) {
- if (!m->is_pinned)
+ if (!m->is_pinned && m->dissemination_ttl == 0)
swim_member_delete(swim, m);
continue;
}
- if (m->failed_pings >= NO_ACKS_TO_DEAD)
+ if (m->failed_pings >= NO_ACKS_TO_DEAD) {
m->status = MEMBER_DEAD;
+ swim_member_is_updated(swim, m);
+ }
swim_io_task_push(&m->ping_task);
rlist_del_entry(m, in_queue_wait_ack);
}
@@ -1296,6 +1503,50 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
return 0;
}
+static int
+swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
+{
+ const char *msg_pref = "Invalid SWIM dissemination message:";
+ if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) {
+ say_error("%s message should be an array", msg_pref);
+ return -1;
+ }
+ uint64_t size = mp_decode_array(pos);
+ for (uint64_t i = 0; i < size; ++i) {
+ if (mp_typeof(**pos) != MP_MAP ||
+ mp_check_map(*pos, end) > 0) {
+ say_error("%s event should be map", msg_pref);
+ return -1;
+ }
+ uint64_t map_size = mp_decode_map(pos);
+ struct swim_member_def def;
+ swim_member_def_create(&def);
+ for (uint64_t j = 0; j < map_size; ++j) {
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s event key should be uint",
+ msg_pref);
+ return -1;
+ }
+ uint64_t key = mp_decode_uint(pos);
+ if (key >= swim_member_key_MAX) {
+ say_error("%s unknown event key", msg_pref);
+ return -1;
+ }
+ if (swim_process_member_key(key, pos, end, msg_pref,
+ &def) != 0)
+ return -1;
+ }
+ if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
+ say_error("%s member address should be specified",
+ msg_pref);
+ return -1;
+ }
+ swim_process_member_update(swim, &def);
+ }
+ return 0;
+}
+
/** Receive and process a new message. */
static void
swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
@@ -1344,6 +1595,11 @@ swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
&addr) != 0)
return;
break;
+ case SWIM_DISSEMINATION:
+ say_verbose("SWIM: process dissemination");
+ if (swim_process_dissemination(swim, &pos, end) != 0)
+ return;
+ break;
default:
say_error("%s unknown component type component is "\
"supported", msg_pref);
@@ -1422,6 +1678,7 @@ swim_new(void)
ev_init(&swim->wait_ack_tick, swim_check_acks);
ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL);
swim->wait_ack_tick.data = (void *) swim;
+ rlist_create(&swim->queue_events);
return swim;
}
@@ -1508,8 +1765,10 @@ swim_remove_member(struct swim *swim, const char *uri)
if (uri_to_addr(uri, &addr) != 0)
return -1;
struct swim_member *member = swim_find_member(swim, &addr);
- if (member != NULL)
+ if (member != NULL) {
+ rlist_del_entry(member, in_queue_events);
swim_member_delete(swim, member);
+ }
return 0;
}
@@ -1546,6 +1805,7 @@ swim_delete(struct swim *swim)
while (node != mh_end(swim->members)) {
struct swim_member *m = (struct swim_member *)
mh_i64ptr_node(swim->members, node)->val;
+ rlist_del_entry(m, in_queue_events);
swim_member_delete(swim, m);
node = mh_first(swim->members);
}
--
2.17.2 (Apple Git-113)
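The compound-key comparison in `swim_member_update_status()`, together with the self-refutation rule added to `swim_process_member_update()` in the previous patch, can be sketched as follows. The sketch assumes that `MEMBER_ALIVE` orders before `MEMBER_DEAD` in `enum swim_member_status`, as the `member->status < new_status` check suggests; the `sketch_*` names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumed order: a bigger value is a "worse" status. */
enum sketch_status { SKETCH_ALIVE = 0, SKETCH_DEAD = 1 };

struct sketch_member {
	uint64_t incarnation;
	enum sketch_status status;
};

/*
 * Compare {incarnation, status} as a compound key. Returns true
 * if the member changed, i.e. an event has to be disseminated.
 */
static bool
sketch_update_status(struct sketch_member *m, enum sketch_status new_status,
		     uint64_t incarnation)
{
	if (m->incarnation == incarnation) {
		/* Same incarnation: the status can only get worse. */
		if (m->status < new_status) {
			m->status = new_status;
			return true;
		}
		return false;
	}
	if (m->incarnation < incarnation) {
		/* Newer incarnation overrides whatever was known. */
		m->status = new_status;
		m->incarnation = incarnation;
		return true;
	}
	/* Older incarnation: stale gossip, ignore it. */
	return false;
}

/* What an instance does with gossip about itself. */
static void
sketch_refute_self_gossip(struct sketch_member *self,
			  enum sketch_status status, uint64_t incarnation)
{
	/* Restore an incarnation lost on restart. */
	if (self->incarnation < incarnation)
		self->incarnation = incarnation;
	/* "You are dead" at my current incarnation: outbid it. */
	if (status != SKETCH_ALIVE && incarnation == self->incarnation)
		self->incarnation++;
}
```

Only the member itself ever increments its incarnation, so a bigger incarnation is always a fresher statement from the source, while within one incarnation the worse status wins.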
* [PATCH v2 4/6] [RAW] swim: keep encoded round message cached
From: Vladislav Shpilevoy @ 2018-12-25 19:19 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, kostja
During a SWIM round, a message consisting of at most 3 sections
is handed out to the members. Parts of the message change rarely:
only when member attributes are updated or some members are
removed. So it is possible to cache the message and send it during
several consecutive round steps, or even not rebuild it for the
whole round.
Caching also allows sending message parts on separate libev
EV_WRITE events, because now the message is stored globally and
can be iterated across different events.
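A minimal sketch of the caching rule, under stated assumptions: the cached message is modelled by a validity flag and a rebuild counter instead of a `struct swim_msg`, and `capacity` is a hypothetical stand-in for how many shuffled members fit into the anti-entropy section. One deliberate difference: the sketch clears `is_being_sent_in_this_round` for members that no longer fit on every rebuild, whereas in the hunks shown here the flag is only ever set to true.

```c
#include <assert.h>
#include <stdbool.h>

#define SKETCH_MAX_MEMBERS 8

struct sketch_member {
	int status;
	/* Encoded into the cached round message. */
	bool is_being_sent_in_this_round;
};

struct sketch_swim {
	/* Members in the current (shuffled) round order. */
	struct sketch_member *members[SKETCH_MAX_MEMBERS];
	int count;
	/* Hypothetical: how many members fit into the message. */
	int capacity;
	bool is_cache_valid;
	/* For illustration: how many times the message was encoded. */
	int rebuilds;
};

static void
sketch_invalidate(struct sketch_swim *s)
{
	s->is_cache_valid = false;
}

static void
sketch_encode_round_msg(struct sketch_swim *s)
{
	for (int i = 0; i < s->count; ++i)
		s->members[i]->is_being_sent_in_this_round = i < s->capacity;
	s->is_cache_valid = true;
	++s->rebuilds;
}

/* One round step: re-encode only if the cache was invalidated. */
static void
sketch_round_step(struct sketch_swim *s)
{
	if (!s->is_cache_valid)
		sketch_encode_round_msg(s);
	/* ... send the cached message to the next member ... */
}

/* Only a change of an encoded member makes the cache stale. */
static void
sketch_member_update(struct sketch_swim *s, struct sketch_member *m,
		     int status)
{
	m->status = status;
	if (m->is_being_sent_in_this_round)
		sketch_invalidate(s);
}

/* A new round reshuffles members, so the old encoding is stale. */
static void
sketch_new_round(struct sketch_swim *s)
{
	sketch_invalidate(s);
}
```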
Part of #3234
---
src/lib/swim/swim.c | 52 ++++++++++++++++++++++++++++++++++++++-------
1 file changed, 44 insertions(+), 8 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 15e079f11..f880066c5 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -345,6 +345,13 @@ struct swim_member {
* Position in a queue of members in the current round.
*/
struct rlist in_queue_round;
+ /**
+ * True if the member is being sent in the current SWIM
+ * round, i.e. it is encoded into the cached round msg.
+ * When this flag is true and the member is changed or
+ * removed, the cached round msg is invalidated.
+ */
+ bool is_being_sent_in_this_round;
/**
*
* Failure detection component
@@ -473,6 +480,17 @@ struct swim {
struct ev_periodic wait_ack_tick;
/** Queue of events sorted by occurrence time. */
struct rlist queue_events;
+ /**
+ * When a new round starts, a new message consisting of
+ * up to 3 components is built. This message is then
+ * handed out to the cluster members. Most of the time
+ * the message remains unchanged and can be cached to
+ * avoid rebuilding it on each step. It should be
+ * invalidated in 2 cases only: a member encoded here has
+ * changed its attributes (incarnation, status); a member
+ * encoded here has been dead for too long and is removed.
+ */
+ struct swim_msg cached_round_msg;
};
static inline uint64_t
@@ -481,6 +499,13 @@ sockaddr_in_hash(const struct sockaddr_in *a)
return ((uint64_t) a->sin_addr.s_addr << 16) | a->sin_port;
}
+static inline void
+cached_round_msg_invalidate(struct swim *swim)
+{
+ swim_msg_destroy(&swim->cached_round_msg);
+ swim_msg_create(&swim->cached_round_msg);
+}
+
/**
* Main round messages can carry merged failure detection
* messages and anti-entropy. With these keys the components can
@@ -788,6 +813,8 @@ static void
swim_member_is_updated(struct swim *swim, struct swim_member *member)
{
swim_schedule_event(swim, member);
+ if (member->is_being_sent_in_this_round)
+ cached_round_msg_invalidate(swim);
}
/**
@@ -876,6 +903,8 @@ swim_find_member(struct swim *swim, const struct sockaddr_in *addr)
static inline void
swim_member_delete(struct swim *swim, struct swim_member *member)
{
+ if (member->is_being_sent_in_this_round)
+ cached_round_msg_invalidate(swim);
uint64_t key = sockaddr_in_hash(&member->addr);
mh_int_t rc = mh_i64ptr_find(swim->members, key, NULL);
assert(rc != mh_end(swim->members));
@@ -924,6 +953,7 @@ swim_shuffle_members(struct swim *swim)
int j = rand() / (RAND_MAX / (i + 1) + 1);
SWAP(shuffled[i], shuffled[j]);
}
+ cached_round_msg_invalidate(swim);
return 0;
}
@@ -1062,9 +1092,9 @@ swim_encode_dissemination(struct swim *swim, struct swim_msg *msg)
/** Encode SWIM components into a sequence of UDP packets. */
static int
-swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
+swim_encode_round_msg(struct swim *swim)
{
- swim_msg_create(msg);
+ struct swim_msg *msg = &swim->cached_round_msg;
struct swim_msg_part *part = swim_msg_reserve(msg, 1);
if (part == NULL)
return -1;
@@ -1089,9 +1119,13 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
assert(mp_sizeof_map(map_size) == 1);
mp_encode_map(header, map_size);
+ for (int i = 0; i < rc; ++i) {
+ struct swim_member *member = swim->shuffled_members[i];
+ member->is_being_sent_in_this_round = true;
+ }
return 0;
error:
- swim_msg_destroy(msg);
+ cached_round_msg_invalidate(swim);
return -1;
}
@@ -1126,8 +1160,8 @@ swim_send_round_msg(struct swim_io_task *task)
if (rlist_empty(&swim->queue_round))
goto next_round_step;
- struct swim_msg msg;
- if (swim_encode_round_msg(swim, &msg) != 0) {
+ if (swim_msg_is_empty(&swim->cached_round_msg) &&
+ swim_encode_round_msg(swim) != 0) {
diag_log();
goto next_round_step;
}
@@ -1137,15 +1171,15 @@ swim_send_round_msg(struct swim_io_task *task)
say_verbose("SWIM: send to %s",
sio_strfaddr((struct sockaddr *) &m->addr,
sizeof(m->addr)));
- for (struct swim_msg_part *part = swim_msg_first_part(&msg);
- part != NULL; part = swim_msg_part_next(part)) {
+ for (struct swim_msg_part *part =
+ swim_msg_first_part(&swim->cached_round_msg); part != NULL;
+ part = swim_msg_part_next(part)) {
if (swim->transport.send_round_msg(swim->output.fd, part->body,
part->size,
(struct sockaddr *) &m->addr,
sizeof(m->addr)) == -1)
diag_log();
}
- swim_msg_destroy(&msg);
swim_member_schedule_ack_wait(swim, m);
swim_decrease_events_ttl(swim);
rlist_del_entry(m, in_queue_round);
@@ -1679,6 +1713,7 @@ swim_new(void)
ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL);
swim->wait_ack_tick.data = (void *) swim;
rlist_create(&swim->queue_events);
+ swim_msg_create(&swim->cached_round_msg);
return swim;
}
@@ -1811,4 +1846,5 @@ swim_delete(struct swim *swim)
}
mh_i64ptr_delete(swim->members);
free(swim->shuffled_members);
+ cached_round_msg_invalidate(swim);
}
--
2.17.2 (Apple Git-113)
* [PATCH v2 5/6] [RAW] swim: send one UDP packet per EV_WRITE event
From: Vladislav Shpilevoy @ 2018-12-25 19:19 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, kostja
Since the first commit of #3234, where the anti-entropy component
was introduced, a single SWIM message could be split into multiple
UDP packets. But so far these packets were sent in a plain
'for' loop on a single EV_WRITE event. That is not the proper way
to use an event loop, but it is the simplest one, because it does
not require any externally stored positions in packet lists.
The previous commit introduced such a global list of UDP packets
to send, and now it is much simpler to send each packet on a
separate EV_WRITE event. This commit does it.
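A minimal sketch of the cursor idea, assuming an array of parts in place of the `stailq` list and counters in place of `sendto()` and the ack-wait queue. `sketch_on_writable()` plays the role of `swim_send_round_msg()` being invoked once per EV_WRITE; returning `true` corresponds to `swim_io_task_push(task)` re-arming the output watcher for the same member. All `sketch_*` names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

#define SKETCH_MAX_PARTS 4

struct sketch_part {
	int size;             /* Bytes in this UDP packet. */
	bool is_ack_required; /* Carries the failure detection section. */
};

struct sketch_msg {
	struct sketch_part parts[SKETCH_MAX_PARTS];
	int count;
	int cursor; /* Index of the next part to send. */
};

/* Counters standing in for sendto() and the ack-wait queue. */
struct sketch_stats {
	int bytes_sent;
	int ack_waits;
};

/*
 * Handle one EV_WRITE event: send exactly one part. Returns true
 * if more parts remain and the caller should wait for another
 * EV_WRITE for the same member.
 */
static bool
sketch_on_writable(struct sketch_msg *msg, struct sketch_stats *st)
{
	assert(msg->cursor < msg->count);
	const struct sketch_part *p = &msg->parts[msg->cursor];
	st->bytes_sent += p->size;
	if (p->is_ack_required)
		++st->ack_waits;
	if (++msg->cursor < msg->count)
		return true;
	/* Whole message sent: rewind for the next member. */
	msg->cursor = 0;
	return false;
}
```

Sending one packet per readiness notification keeps each callback short, so a large membership table does not stall other watchers on the same loop.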
Part of #3234
---
src/lib/swim/swim.c | 36 +++++++++++++++++++++++++++---------
1 file changed, 27 insertions(+), 9 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index f880066c5..ed0b323e5 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -191,6 +191,12 @@ struct swim_msg_part {
struct stailq_entry in_msg;
/** Real size. */
int size;
+ /**
+ * True if this message part carries a failure detection
+ * component. Used to decide whether SWIM should wait for
+ * an ACK.
+ */
+ bool is_ack_required;
/** Packet body. */
char body[UDP_PACKET_SIZE];
};
@@ -255,6 +261,7 @@ swim_msg_part_new(struct swim_msg *msg)
}
stailq_add_tail_entry(&msg->parts, res, in_msg);
res->size = 0;
+ res->is_ack_required = false;
return res;
}
@@ -491,6 +498,8 @@ struct swim {
* here, is dead too long and removed.
*/
struct swim_msg cached_round_msg;
+ /** Position of the next part to send in cached_round_msg. */
+ struct swim_msg_part *cached_msg_pos;
};
static inline uint64_t
@@ -504,6 +513,7 @@ cached_round_msg_invalidate(struct swim *swim)
{
swim_msg_destroy(&swim->cached_round_msg);
swim_msg_create(&swim->cached_round_msg);
+ swim->cached_msg_pos = NULL;
}
/**
@@ -1031,6 +1041,7 @@ swim_encode_failure_detection(struct swim *swim, struct swim_msg *msg,
swim->self->incarnation);
memcpy(pos, &fd_header_bin, size);
swim_msg_part_advance(part, size);
+ part->is_ack_required = true;
return 1;
}
@@ -1123,6 +1134,7 @@ swim_encode_round_msg(struct swim *swim)
struct swim_member *member = swim->shuffled_members[i];
member->is_being_sent_in_this_round = true;
}
+ swim->cached_msg_pos = swim_msg_first_part(msg);
return 0;
error:
cached_round_msg_invalidate(swim);
@@ -1171,16 +1183,22 @@ swim_send_round_msg(struct swim_io_task *task)
say_verbose("SWIM: send to %s",
sio_strfaddr((struct sockaddr *) &m->addr,
sizeof(m->addr)));
- for (struct swim_msg_part *part =
- swim_msg_first_part(&swim->cached_round_msg); part != NULL;
- part = swim_msg_part_next(part)) {
- if (swim->transport.send_round_msg(swim->output.fd, part->body,
- part->size,
- (struct sockaddr *) &m->addr,
- sizeof(m->addr)) == -1)
- diag_log();
+ struct swim_msg_part *part = swim->cached_msg_pos;
+ if (swim->transport.send_round_msg(swim->output.fd, part->body,
+ part->size,
+ (struct sockaddr *) &m->addr,
+ sizeof(m->addr)) == -1)
+ diag_log();
+ if (part->is_ack_required)
+ swim_member_schedule_ack_wait(swim, m);
+ part = swim_msg_part_next(part);
+ if (part != NULL) {
+ swim->cached_msg_pos = part;
+ /* Push again until all parts are sent. */
+ swim_io_task_push(task);
+ return;
}
- swim_member_schedule_ack_wait(swim, m);
+ swim->cached_msg_pos = swim_msg_first_part(&swim->cached_round_msg);
swim_decrease_events_ttl(swim);
rlist_del_entry(m, in_queue_round);
next_round_step:
--
2.17.2 (Apple Git-113)
* Re: [tarantool-patches] [PATCH v2 0/6] SWIM
From: Vladislav Shpilevoy @ 2018-12-26 21:01 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, kostja
Hi! Sorry, if you have not started a review yet,
do not pay much attention to the internals: I've
already reworked them, mainly because of struggles
with adding payload and indirect pings.
But please stay as sharp as possible when reviewing
the public API, especially the C API in swim.h.
Just FYI.
On 25/12/2018 22:19, Vladislav Shpilevoy wrote:
> The first commit message contains comprehensive information about SWIM, which I
> will not duplicate here. This is only a description of the patchset.
>
> SWIM consists of two main components - dissemination and failure detection, and
> one additional component - anti-entropy. The patchset introduces them one by one
> in the first three commits.
>
> The last two commits are technical improvements.
>
> Note that these commits contain bugs and typos, and have no tests. The goal of
> this review is a high-level approval of the API, so as to start writing tests.
>
> Branch: http://github.com/tarantool/tarantool/tree/gerold103/gh-3234-swim
> Issue: https://github.com/tarantool/tarantool/issues/3234
>
> Changes in v2:
> - new API with explicit members addition, removal;
> - ability to create multiple SWIM instances per one Tarantool process;
> - multi-packet sending of one SWIM message.
>
> V1: https://www.freelists.org/post/tarantool-patches/PATCH-05-SWIM
>
> Vladislav Shpilevoy (6):
> [RAW] swim: introduce SWIM's anti-entropy component
> [RAW] swim: introduce failure detection component
> [RAW] swim: introduce a dissemination component
> [RAW] swim: keep encoded round message cached
> [RAW] swim: send one UDP packet per EV_WRITE event
>
> src/CMakeLists.txt | 3 +-
> src/evio.c | 3 +-
> src/evio.h | 4 +
> src/lib/CMakeLists.txt | 1 +
> src/lib/swim/CMakeLists.txt | 6 +
> src/lib/swim/swim.c | 1868 ++++++++++++++++++++++++++++++
> src/lib/swim/swim.h | 130 ++
> src/lua/init.c | 2 +
> src/lua/swim.c | 243 ++++
> src/lua/swim.h | 47 +
> 10 files changed, 2304 insertions(+), 3 deletions(-)
> create mode 100644 src/lib/swim/CMakeLists.txt
> create mode 100644 src/lib/swim/swim.c
> create mode 100644 src/lib/swim/swim.h
> create mode 100644 src/lua/swim.c
> create mode 100644 src/lua/swim.h
>