* [PATCH 1/5] swim: introduce SWIM's anti-entropy component
From: Vladislav Shpilevoy @ 2018-12-17 12:53 UTC
To: tarantool-patches; +Cc: vdavydov.dev

SWIM - Scalable Weakly-consistent Infection-style Process Group
Membership Protocol. It consists of two components, events
dissemination and failure detection, and keeps in memory a
table of known remote hosts: members. Some SWIM implementations
also have an additional component, anti-entropy: a periodic
broadcast of a random subset of the members table.

Each SWIM component differs from the others in both message
structure and goals; their messages could even be sent
separately. But SWIM describes piggybacking: a ping message can
carry a dissemination message along with it. SWIM has a main
operating cycle during which it randomly chooses members from
the member table and sends them events + ping. Answers are
processed asynchronously, outside of the main cycle.

Random selection spreads the network load evenly: each member
gets about one message per protocol step, regardless of the
cluster size. Without randomness all other members would choose
the same member on each step, and that member would get about N
messages per step, where N is the cluster size.
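
To make the load argument concrete, here is a small simulation
sketch in C. It is not part of the patch; pick_random_target(),
pick_first_target() and one_step() are hypothetical names. In one
protocol step each of n members sends one ping. A deterministic
rule that makes everyone pick the lowest-numbered other member
piles n - 1 pings onto member 0, while random selection keeps the
total at n, about one ping per member on average.

```c
#include <assert.h>
#include <stdlib.h>

/* Random member other than self; member ids are 0..n-1. */
static int
pick_random_target(int n, int self)
{
	assert(n >= 2);
	/* Scale rand() into [0, n - 2], then step over self. */
	int t = rand() / (RAND_MAX / (n - 1) + 1);
	return t >= self ? t + 1 : t;
}

/* Deterministic rule: everybody pings the lowest other member. */
static int
pick_first_target(int n, int self)
{
	(void) n;
	return self == 0 ? 1 : 0;
}

/*
 * One protocol step: every member sends exactly one ping chosen
 * by 'pick'; incoming[i] counts the pings member i receives.
 */
static void
one_step(int n, int (*pick)(int, int), int *incoming)
{
	for (int i = 0; i < n; ++i)
		incoming[i] = 0;
	for (int self = 0; self < n; ++self)
		incoming[pick(n, self)]++;
}
```

With n = 10 and the deterministic rule, member 0 receives 9 pings
in a single step; with random selection the same 10 pings are
spread over the cluster.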

SWIM also describes a kind of fairness: when selecting the next
member to ping, the protocol prefers the least recently used
(LRU) members. That would be too complicated to implement, so
Tarantool's implementation is slightly different and simpler.

Tarantool splits protocol operation into rounds. At the
beginning of a round all members are randomly reordered and
linked into a list. At each round step a member is popped from
the list head, a message is sent to it, and it waits for the
next round. In this implementation all the random selection of
the original SWIM is done once per round: the round is
effectively 'planned'. A list is used instead of an array since
new members can be added to its tail without realloc, and dead
members can be removed just as easily.
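
A minimal sketch of such a 'planned' round, assuming members are
identified by integer ids 0..n-1 and kept in a fixed-size array
rather than the patch's rlist (MAX_MEMBERS, struct round,
round_plan() and round_step() are hypothetical names). It shuffles
with Fisher-Yates, scaling rand() the same way
swim_shuffle_members() does, skips self, and plans a new round
only when the current queue is exhausted.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical cap; the patch uses a dynamically sized list. */
enum { MAX_MEMBERS = 64 };

/* A planned round: targets in random order, next one to pop. */
struct round {
	int queue[MAX_MEMBERS];
	int len;
	int pos;
};

/* Fisher-Yates shuffle; j is scaled into [0, i]. */
static void
shuffle(int *a, int n)
{
	for (int i = n - 1; i > 0; --i) {
		int j = rand() / (RAND_MAX / (i + 1) + 1);
		int tmp = a[i];
		a[i] = a[j];
		a[j] = tmp;
	}
}

/* Start a new round: shuffle all ids, queue everyone but self. */
static void
round_plan(struct round *r, int n, int self)
{
	int ids[MAX_MEMBERS];
	assert(n >= 0 && n <= MAX_MEMBERS);
	for (int i = 0; i < n; ++i)
		ids[i] = i;
	shuffle(ids, n);
	r->len = 0;
	r->pos = 0;
	for (int i = 0; i < n; ++i) {
		if (ids[i] != self)
			r->queue[r->len++] = ids[i];
	}
}

/*
 * One round step: pop the next target, planning a new round when
 * the current one is exhausted. Returns -1 if there is nobody to
 * ping except self.
 */
static int
round_step(struct round *r, int n, int self)
{
	if (r->pos == r->len)
		round_plan(r, n, self);
	if (r->len == 0)
		return -1;
	return r->queue[r->pos++];
}
```

With 5 members and self = 2, four consecutive steps visit members
0, 1, 3 and 4 exactly once each, in random order, and only the
fifth step triggers a new shuffle.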

Tarantool also implements the third component, anti-entropy.
Why is it needed, and even vital? Consider an example: two SWIM
nodes, both alive. Nothing happens, so the events list is empty,
and only pings are sent periodically. Then a third node appears.
It knows about one of the existing nodes. How should it learn
about the other one? The cluster is stable and there are no new
events, so the only chance would be to wait until the other node
stops and an event about it is broadcast. Anti-entropy is an
extra simple component: it just piggybacks a random part of the
members table on each regular ping. In the example above, the
new node learns about the other existing node from the
anti-entropy messages of the node it already knows.
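
The patch encodes every anti-entropy record with a fixed MsgPack
layout (struct swim_member_bin), so the number of records that
fit into one UDP packet is simple arithmetic. The sketch below
hand-writes that 13-byte layout instead of using msgpuck, and
assumes packed structs as in the patch; encode_member(),
batch_size() and the SK_* constants are hypothetical stand-ins.
With the patch's 1472-byte payload, 1 byte of root map header and
a 6-byte anti-entropy header, (1471 - 6) / 13 = 112 records fit.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for enum swim_member_key values. */
enum { SK_STATUS = 0, SK_ADDR = 1, SK_PORT = 2 };

enum {
	/* Root map header: fixmap with one component. */
	ROOT_HEADER_SIZE = 1,
	/* Component key fixint + 0xdd + uint32 array length. */
	AE_HEADER_SIZE = 1 + 1 + 4,
	/* fixmap + status k/v + addr k/0xce/u32 + port k/0xcd/u16. */
	MEMBER_SIZE = 1 + 2 + 6 + 4,
};

/* Store the low 'bytes' bytes of v in big-endian order. */
static unsigned char *
store_be(unsigned char *p, uint32_t v, int bytes)
{
	for (int i = bytes - 1; i >= 0; --i) {
		p[i] = (unsigned char) (v & 0xff);
		v >>= 8;
	}
	return p + bytes;
}

/* One member record in the same layout as struct swim_member_bin. */
static unsigned char *
encode_member(unsigned char *p, uint8_t status, uint32_t ip,
	      uint16_t port)
{
	assert(status < 0x80);	/* must fit a positive fixint */
	*p++ = 0x83;		/* fixmap with 3 entries */
	*p++ = SK_STATUS;
	*p++ = status;
	*p++ = SK_ADDR;
	*p++ = 0xce;		/* uint 32 */
	p = store_be(p, ip, 4);
	*p++ = SK_PORT;
	*p++ = 0xcd;		/* uint 16 */
	return store_be(p, port, 2);
}

/* Same arithmetic as calculate_bin_batch_size() in the patch. */
static int
batch_size(int header_size, int member_size, int avail_size)
{
	if (avail_size <= header_size)
		return 0;
	return (avail_size - header_size) / member_size;
}
```

Because every record has the same size, the encoder can memcpy a
pre-filled template and patch only the value bytes, which is what
swim_member_bin_reset() does in the patch.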

This commit introduces the first component, anti-entropy. With
this component a member can discover other members, but cannot
detect which of them are dead. That is done in the next commit.

Part of #3234
---
src/CMakeLists.txt | 3 +-
src/lib/CMakeLists.txt | 1 +
src/lib/swim/CMakeLists.txt | 6 +
src/lib/swim/swim.c | 938 ++++++++++++++++++++++++++++++++++++
src/lib/swim/swim.h | 83 ++++
src/lua/init.c | 2 +
src/lua/swim.c | 151 ++++++
src/lua/swim.h | 47 ++
test/swim/app.lua | 15 +
test/swim/basic.result | 179 +++++++
test/swim/basic.test.lua | 61 +++
test/swim/suite.ini | 6 +
test/swim/swim_utils.lua | 9 +
13 files changed, 1500 insertions(+), 1 deletion(-)
create mode 100644 src/lib/swim/CMakeLists.txt
create mode 100644 src/lib/swim/swim.c
create mode 100644 src/lib/swim/swim.h
create mode 100644 src/lua/swim.c
create mode 100644 src/lua/swim.h
create mode 100644 test/swim/app.lua
create mode 100644 test/swim/basic.result
create mode 100644 test/swim/basic.test.lua
create mode 100644 test/swim/suite.ini
create mode 100644 test/swim/swim_utils.lua
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 04de5ad04..785051966 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -182,6 +182,7 @@ set (server_sources
lua/crypto.c
lua/httpc.c
lua/utf8.c
+ lua/swim.c
lua/info.c
${lua_sources}
${PROJECT_SOURCE_DIR}/third_party/lua-yaml/lyaml.cc
@@ -228,7 +229,7 @@ endif()
set_source_files_compile_flags(${server_sources})
add_library(server STATIC ${server_sources})
-target_link_libraries(server core bit uri uuid ${ICU_LIBRARIES})
+target_link_libraries(server core bit uri uuid swim ${ICU_LIBRARIES})
# Rule of thumb: if exporting a symbol from a static library, list the
# library here.
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 98ff19b60..4e21e7da8 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -5,6 +5,7 @@ add_subdirectory(small)
add_subdirectory(salad)
add_subdirectory(csv)
add_subdirectory(json)
+add_subdirectory(swim)
if(ENABLE_BUNDLED_MSGPUCK)
add_subdirectory(msgpuck EXCLUDE_FROM_ALL)
endif()
diff --git a/src/lib/swim/CMakeLists.txt b/src/lib/swim/CMakeLists.txt
new file mode 100644
index 000000000..fcdfcf9e1
--- /dev/null
+++ b/src/lib/swim/CMakeLists.txt
@@ -0,0 +1,6 @@
+set(lib_sources
+ swim.c
+)
+
+set_source_files_compile_flags(${lib_sources})
+add_library(swim STATIC ${lib_sources})
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
new file mode 100644
index 000000000..7e5d0eb9e
--- /dev/null
+++ b/src/lib/swim/swim.c
@@ -0,0 +1,938 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "swim.h"
+#include "evio.h"
+#include "uri.h"
+#include "assoc.h"
+#include "fiber.h"
+#include "small/rlist.h"
+#include "msgpuck.h"
+#include "info.h"
+#include <arpa/inet.h>
+
+/**
+ * Possible optimizations:
+ * - track hash table versions and do not resend when a receiver
+ * already knows your version.
+ * - on small updates send to another node only updates since a
+ * version. On rare updates it can dramatically reduce message
+ * size and its encoding time.
+ * - do not send self.
+ * - cache encoded batch.
+ * - refute immediately.
+ * - indirect ping.
+ * - increment own incarnation on each round.
+ * - attach dst incarnation to ping.
+ */
+
+/**
+ * SWIM - Scalable Weakly-consistent Infection-style Process Group
+ * Membership Protocol. It consists of 2 components: events
+ * dissemination and failure detection, and stores in memory a
+ * table of known remote hosts - members. Also some SWIM
+ * implementations have an additional component: anti-entropy -
+ * periodical broadcast of a random subset of members table.
+ *
+ * Each SWIM component is different from others in both message
+ * structures and goals, they even could be sent in different
+ * messages. But SWIM describes piggybacking of messages: a ping
+ * message can piggyback a dissemination's one. SWIM has a main
+ * operating cycle during which it randomly chooses members from a
+ * member table and sends them events + ping. Answers are
+ * processed out of the main cycle asynchronously.
+ *
+ * Random selection provides even network load about ~1 message to
+ * each member regardless of the cluster size. Without randomness
+ * a member would get a network load of N messages each protocol
+ * step, since all other members will choose the same member on
+ * each step where N is the cluster size.
+ *
+ * Also SWIM describes a kind of fairness: when selecting a next
+ * member to ping, the protocol prefers LRU members. In code it
+ * would be too complicated, so Tarantool's implementation is
+ * slightly different, easier.
+ *
+ * Tarantool splits protocol operation into rounds. At the
+ * beginning of a round all members are randomly reordered and
+ * linked into a list. At each round step a member is popped from
+ * the list head, a message is sent to it, and it waits for the
+ * next round. In such implementation all random selection of the
+ * original SWIM is executed once per round. The round is
+ * 'planned' actually. A list is used instead of an array since
+ * new members can be added to its tail without realloc, and dead
+ * members can be removed as easy as that.
+ *
+ * Also Tarantool implements third component - anti-entropy. Why
+ * is it needed and even vital? Consider the example: two SWIM
+ * nodes, both are alive. Nothing happens, so the events list is
+ * empty, only pings are being sent periodically. Then a third
+ * node appears. It knows about one of existing nodes. How should
+ * it learn about the other one? Sure, its known counterpart can try
+ * to notify the other one, but it is UDP, so this event can be lost.
+ * Anti-entropy is an extra simple component: it just piggybacks a
+ * random part of the members table on each regular ping. In the
+ * example above, the new node will sooner or later learn about
+ * the other node from the anti-entropy messages of its counterpart.
+ */
+
+/**
+ * Global hash of all known members of the cluster. Hash key is
+ * bitwise combination of ip and port, value is a struct member,
+ * describing a remote instance. The only purpose of such strange
+ * hash function is to be able to reuse mh_i64ptr_t instead of
+ * introducing one more implementation of mhash.
+ *
+ * Discovered members live here until they are unavailable - in
+ * such a case they are removed from the hash. But a subset of
+ * members are pinned - the ones added via SWIM configuration.
+ * When a member is pinned, it can not be removed from the hash,
+ * and the module will ping it constantly.
+ */
+static struct mh_i64ptr_t *members = NULL;
+
+static inline uint64_t
+sockaddr_in_hash(const struct sockaddr_in *a)
+{
+ return ((uint64_t) a->sin_addr.s_addr << 16) | a->sin_port;
+}
+
+/**
+ * Each SWIM component in a common case independently may want to
+ * push some data into the network. Dissemination sends events,
+ * failure detection sends pings, acks. Anti-entropy sends member
+ * tables. The intention to send data is called an IO task and is
+ * stored in a queue that is dispatched when output is possible.
+ */
+typedef void (*swim_io_task_f)(void);
+
+struct swim_io_task {
+ swim_io_task_f cb;
+ struct rlist in_queue_output;
+};
+
+enum swim_member_status {
+ /**
+ * The instance is ok, it responds to requests, sends its
+ * members table.
+ */
+ MEMBER_ALIVE = 0,
+ swim_member_status_MAX,
+};
+
+static const char *swim_member_status_strs[] = {
+ "alive",
+};
+
+/**
+ * A cluster member description. This structure describes the
+ * last known state of an instance, that is updated periodically
+ * via UDP according to SWIM protocol.
+ */
+struct swim_member {
+ /**
+ * Member status. Since the communication goes via UDP,
+ * actual status can be different, as well as different on
+ * other SWIM nodes. But SWIM guarantees that each member
+ * will learn a real status of an instance sometime.
+ */
+ enum swim_member_status status;
+ /**
+ * Address of the instance to which send UDP packets.
+ * Unique identifier of the member.
+ */
+ struct sockaddr_in addr;
+ /**
+ * Position in a queue of members in the current round.
+ */
+ struct rlist in_queue_round;
+};
+
+/**
+ * This node. Used to avoid sending messages to self, which
+ * is meaningless.
+ */
+static struct swim_member *self = NULL;
+
+/**
+ * Main round messages can carry merged failure detection
+ * messages and anti-entropy. With these keys the components can
+ * be distinguished from each other.
+ */
+enum swim_component_type {
+ SWIM_ANTI_ENTROPY = 0,
+};
+
+/** {{{ Anti-entropy component */
+
+/**
+ * Attributes of each record of a broadcasted member table. Just
+ * the same as some of struct swim_member attributes.
+ */
+enum swim_member_key {
+ SWIM_MEMBER_STATUS = 0,
+ /**
+ * Now can only be IP. But in future UNIX sockets can be
+ * added.
+ */
+ SWIM_MEMBER_ADDR,
+ SWIM_MEMBER_PORT,
+ swim_member_key_MAX,
+};
+
+/** SWIM anti-entropy MsgPack header template. */
+struct PACKED swim_anti_entropy_header_bin {
+ /** mp_encode_uint(SWIM_ANTI_ENTROPY) */
+ uint8_t k_anti_entropy;
+ /** mp_encode_array() */
+ uint8_t m_anti_entropy;
+ uint32_t v_anti_entropy;
+};
+
+static inline void
+swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
+ int batch_size)
+{
+ header->k_anti_entropy = SWIM_ANTI_ENTROPY;
+ header->m_anti_entropy = 0xdd;
+ header->v_anti_entropy = mp_bswap_u32(batch_size);
+}
+
+/** SWIM member MsgPack template. */
+struct PACKED swim_member_bin {
+ /** mp_encode_map(3) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_MEMBER_STATUS) */
+ uint8_t k_status;
+ /** mp_encode_uint(enum member_status) */
+ uint8_t v_status;
+
+ /** mp_encode_uint(SWIM_MEMBER_ADDR) */
+ uint8_t k_addr;
+ /** mp_encode_uint(addr.sin_addr.s_addr) */
+ uint8_t m_addr;
+ uint32_t v_addr;
+
+ /** mp_encode_uint(SWIM_MEMBER_PORT) */
+ uint8_t k_port;
+ /** mp_encode_uint(addr.sin_port) */
+ uint8_t m_port;
+ uint16_t v_port;
+};
+
+static inline void
+swim_member_bin_reset(struct swim_member_bin *header,
+ struct swim_member *member)
+{
+ header->v_status = member->status;
+ header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
+ header->v_port = mp_bswap_u16(member->addr.sin_port);
+}
+
+static inline void
+swim_member_bin_create(struct swim_member_bin *header)
+{
+ header->m_header = 0x83;
+ header->k_status = SWIM_MEMBER_STATUS;
+ header->k_addr = SWIM_MEMBER_ADDR;
+ header->m_addr = 0xce;
+ header->k_port = SWIM_MEMBER_PORT;
+ header->m_port = 0xcd;
+}
+
+/**
+ * Members to which a message should be sent next during this
+ * round.
+ */
+static RLIST_HEAD(queue_round);
+/** Generator of round step events. */
+static struct ev_periodic round_tick;
+
+/**
+ * Single round step task. It is impossible to have multiple
+ * round steps at the same time, so it is static and global.
+ * Other tasks are mainly pings and acks, attached to member
+ * objects and related to them only.
+ */
+static void
+swim_send_round_msg(void);
+
+static struct swim_io_task round_step_task = {
+ /* .cb = */ swim_send_round_msg,
+ /* .in_queue_output = */ RLIST_LINK_INITIALIZER,
+};
+
+/** }}} Anti-entropy component */
+
+/**
+ * SWIM message structure:
+ * {
+ * SWIM_ANTI_ENTROPY: [
+ * {
+ * SWIM_MEMBER_STATUS: uint, enum member_status,
+ * SWIM_MEMBER_ADDR: uint, ip,
+ * SWIM_MEMBER_PORT: uint, port
+ * },
+ * ...
+ * ],
+ * }
+ */
+
+enum {
+ /** How often to send membership messages and pings. */
+ HEARTBEAT_RATE_DEFAULT = 1,
+ /**
+ * Default MTU is 1500. MTU (when IPv4 is used) consists
+ * of IPv4 header, UDP header, Data. IPv4 has 20 bytes
+ * header, UDP - 8 bytes. So Data = 1500 - 20 - 8 = 1472.
+ * TODO: adapt to other MTUs which can be reduced in some
+ * networks by their admins.
+ */
+ UDP_PACKET_SIZE = 1472,
+};
+
+/**
+ * Event dispatcher of incoming messages. Takes them from
+ * network.
+ */
+static struct evio_service input;
+/**
+ * Event dispatcher of outgoing messages. Takes tasks from
+ * queue_output.
+ */
+static struct ev_io output;
+
+/**
+ * An array of members shuffled on each round. Its head is sent
+ * to each member during one round as an anti-entropy message.
+ */
+static struct swim_member **shuffled_members = NULL;
+static int shuffled_members_size = 0;
+
+/** Queue of io tasks ready to push now. */
+static RLIST_HEAD(queue_output);
+
+static inline void
+swim_io_task_push(struct swim_io_task *task)
+{
+ rlist_add_tail_entry(&queue_output, task, in_queue_output);
+ ev_io_start(loop(), &output);
+}
+
+/**
+ * Register a new member with a specified status. It is added
+ * to the members hash and to the round queue.
+ */
+static struct swim_member *
+swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status)
+{
+ struct swim_member *member =
+ (struct swim_member *) malloc(sizeof(*member));
+ if (member == NULL) {
+ diag_set(OutOfMemory, sizeof(*member), "malloc", "member");
+ return NULL;
+ }
+ member->status = status;
+ member->addr = *addr;
+ struct mh_i64ptr_node_t node;
+ node.key = sockaddr_in_hash(addr);
+ node.val = member;
+ mh_int_t rc = mh_i64ptr_put(members, &node, NULL, NULL);
+ if (rc == mh_end(members)) {
+ free(member);
+ diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node");
+ return NULL;
+ }
+ rlist_add_entry(&queue_round, member, in_queue_round);
+ return member;
+}
+
+static inline struct swim_member *
+swim_find_member(const struct sockaddr_in *addr)
+{
+ uint64_t hash = sockaddr_in_hash(addr);
+ mh_int_t node = mh_i64ptr_find(members, hash, NULL);
+ if (node == mh_end(members))
+ return NULL;
+ return (struct swim_member *) mh_i64ptr_node(members, node)->val;
+}
+
+/**
+ * Remove the member from all queues, hashes, destroy it and free
+ * the memory.
+ */
+static inline void
+swim_member_delete(struct swim_member *member)
+{
+ uint64_t key = sockaddr_in_hash(&member->addr);
+ mh_int_t rc = mh_i64ptr_find(members, key, NULL);
+ assert(rc != mh_end(members));
+ mh_i64ptr_del(members, rc, NULL);
+ rlist_del_entry(member, in_queue_round);
+ free(member);
+}
+
+/** At the end of each round members table is shuffled. */
+static int
+swim_shuffle_members(void)
+{
+ int new_size = mh_size(members);
+ /* Reallocate if the array is too small or too big. */
+ if (shuffled_members_size < new_size ||
+ shuffled_members_size >= new_size * 2) {
+ int size = sizeof(shuffled_members[0]) * new_size;
+ struct swim_member **new =
+ (struct swim_member **) realloc(shuffled_members, size);
+ if (new == NULL) {
+ diag_set(OutOfMemory, size, "realloc", "new");
+ return -1;
+ }
+ shuffled_members = new;
+ }
+ shuffled_members_size = new_size;
+ int i = 0;
+ for (mh_int_t node = mh_first(members), end = mh_end(members);
+ node != end; node = mh_next(members, node), ++i) {
+ shuffled_members[i] = (struct swim_member *)
+ mh_i64ptr_node(members, node)->val;
+ /*
+ * Scale rand() down to [0, i]: each result covers a
+ * contiguous range of RAND_MAX / (i + 1) + 1 values.
+ */
+ int j = rand() / (RAND_MAX / (i + 1) + 1);
+ SWAP(shuffled_members[i], shuffled_members[j]);
+ }
+ return 0;
+}
+
+/**
+ * Shuffle, filter members. Build randomly ordered queue of
+ * addressees. In other words, do all round preparation work.
+ */
+static int
+swim_new_round(void)
+{
+ say_verbose("SWIM: start a new round");
+ if (swim_shuffle_members() != 0)
+ return -1;
+ rlist_create(&queue_round);
+ for (int i = 0; i < shuffled_members_size; ++i) {
+ if (shuffled_members[i] != self) {
+ rlist_add_entry(&queue_round, shuffled_members[i],
+ in_queue_round);
+ }
+ }
+ return 0;
+}
+
+/**
+ * Helper to calculate how many records of one section can fit
+ * into a message.
+ */
+static inline int
+calculate_bin_batch_size(int header_size, int member_size, int avail_size)
+{
+ if (avail_size <= header_size)
+ return 0;
+ return (avail_size - header_size) / member_size;
+}
+
+static int
+swim_encode_round_msg(char *buffer, int size)
+{
+ char *start = buffer;
+ if ((shuffled_members == NULL || rlist_empty(&queue_round)) &&
+ swim_new_round() != 0)
+ return -1;
+ /* 1 - for the root map header. */
+ assert(size > 1);
+ --size;
+ int ae_batch_size = calculate_bin_batch_size(
+ sizeof(struct swim_anti_entropy_header_bin),
+ sizeof(struct swim_member_bin), size);
+ if (ae_batch_size > shuffled_members_size)
+ ae_batch_size = shuffled_members_size;
+
+ buffer = mp_encode_map(buffer, 1);
+
+ struct swim_anti_entropy_header_bin ae_header_bin;
+ swim_anti_entropy_header_bin_create(&ae_header_bin, ae_batch_size);
+ memcpy(buffer, &ae_header_bin, sizeof(ae_header_bin));
+ buffer += sizeof(ae_header_bin);
+
+ struct swim_member_bin member_bin;
+ swim_member_bin_create(&member_bin);
+ for (int i = 0; i < ae_batch_size; ++i) {
+ struct swim_member *member = shuffled_members[i];
+ swim_member_bin_reset(&member_bin, member);
+ memcpy(buffer, &member_bin, sizeof(member_bin));
+ buffer += sizeof(member_bin);
+ }
+ return buffer - start;
+}
+
+/**
+ * Do one round step. Send encoded components to a next member
+ * from the queue.
+ */
+static void
+swim_send_round_msg(void)
+{
+ char buffer[UDP_PACKET_SIZE];
+ int size = swim_encode_round_msg(buffer, UDP_PACKET_SIZE);
+ if (size < 0) {
+ diag_log();
+ goto end;
+ }
+ /* Possibly empty if no members except self are specified. */
+ if (rlist_empty(&queue_round))
+ goto end;
+ struct swim_member *m =
+ rlist_first_entry(&queue_round, struct swim_member,
+ in_queue_round);
+ say_verbose("SWIM: send to %s",
+ sio_strfaddr((struct sockaddr *) &m->addr,
+ sizeof(m->addr)));
+ if (sio_sendto(output.fd, buffer, size, 0, (struct sockaddr *) &m->addr,
+ sizeof(m->addr)) == -1 && ! sio_wouldblock(errno))
+ diag_log();
+ rlist_del_entry(m, in_queue_round);
+end:
+ ev_periodic_start(loop(), &round_tick);
+}
+
+static void
+swim_on_output(struct ev_loop *loop, struct ev_io *io, int events)
+{
+ assert((events & EV_WRITE) != 0);
+ (void) events;
+ if (rlist_empty(&queue_output)) {
+ ev_io_stop(loop, io);
+ return;
+ }
+ struct swim_io_task *task =
+ rlist_shift_entry(&queue_output, struct swim_io_task,
+ in_queue_output);
+ task->cb();
+}
+
+/** Once per specified timeout trigger a next broadcast step. */
+static void
+swim_trigger_round_step(struct ev_loop *loop, struct ev_periodic *p, int events)
+{
+ assert((events & EV_PERIODIC) != 0);
+ (void) events;
+ swim_io_task_push(&round_step_task);
+ ev_periodic_stop(loop, p);
+}
+
+/**
+ * SWIM member attributes from anti-entropy and dissemination
+ * messages.
+ */
+struct swim_member_def {
+ struct sockaddr_in addr;
+ enum swim_member_status status;
+};
+
+static inline void
+swim_member_def_create(struct swim_member_def *def)
+{
+ memset(&def->addr, 0, sizeof(def->addr));
+ def->addr.sin_family = AF_INET;
+ def->status = MEMBER_ALIVE;
+}
+
+static void
+swim_process_member_update(struct swim_member_def *def)
+{
+ struct swim_member *member = swim_find_member(&def->addr);
+ /*
+ * Trivial processing of a new member - just add it to the
+ * members table.
+ */
+ if (member == NULL) {
+ member = swim_member_new(&def->addr, def->status);
+ if (member == NULL)
+ diag_log();
+ }
+}
+
+static int
+swim_process_member_key(enum swim_member_key key, const char **pos,
+ const char *end, const char *msg_pref,
+ struct swim_member_def *def)
+{
+ switch(key) {
+ case SWIM_MEMBER_STATUS:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s member status should be uint", msg_pref);
+ return -1;
+ }
+ uint64_t status = mp_decode_uint(pos);
+ if (status >= swim_member_status_MAX) {
+ say_error("%s unknown member status", msg_pref);
+ return -1;
+ }
+ def->status = (enum swim_member_status) status;
+ break;
+ case SWIM_MEMBER_ADDR:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s member address should be uint", msg_pref);
+ return -1;
+ }
+ def->addr.sin_addr.s_addr = mp_decode_uint(pos);
+ break;
+ case SWIM_MEMBER_PORT:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s member port should be uint", msg_pref);
+ return -1;
+ }
+ uint64_t port = mp_decode_uint(pos);
+ if (port > UINT16_MAX) {
+ say_error("%s member port is invalid", msg_pref);
+ return -1;
+ }
+ def->addr.sin_port = port;
+ break;
+ default:
+ unreachable();
+ }
+ return 0;
+}
+
+/** Decode an anti-entropy message, update members table. */
+static int
+swim_process_anti_entropy(const char **pos, const char *end)
+{
+ const char *msg_pref = "Invalid SWIM anti-entropy message:";
+ if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) {
+ say_error("%s message should be an array", msg_pref);
+ return -1;
+ }
+ uint64_t size = mp_decode_array(pos);
+ for (uint64_t i = 0; i < size; ++i) {
+ if (mp_typeof(**pos) != MP_MAP ||
+ mp_check_map(*pos, end) > 0) {
+ say_error("%s member should be map", msg_pref);
+ return -1;
+ }
+ uint64_t map_size = mp_decode_map(pos);
+ struct swim_member_def def;
+ swim_member_def_create(&def);
+ for (uint64_t j = 0; j < map_size; ++j) {
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s member key should be uint",
+ msg_pref);
+ return -1;
+ }
+ uint64_t key = mp_decode_uint(pos);
+ if (key >= swim_member_key_MAX) {
+ say_error("%s unknown member key", msg_pref);
+ return -1;
+ }
+ if (swim_process_member_key(key, pos, end, msg_pref,
+ &def) != 0)
+ return -1;
+ }
+ if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
+ say_error("%s member address should be specified",
+ msg_pref);
+ return -1;
+ }
+ swim_process_member_update(&def);
+ }
+ return 0;
+}
+
+/** Receive and process a new message. */
+static void
+swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
+{
+ assert((events & EV_READ) != 0);
+ (void) events;
+ (void) loop;
+ const char *msg_pref = "Invalid SWIM message:";
+ struct sockaddr_in addr;
+ socklen_t len = sizeof(addr);
+ char buffer[UDP_PACKET_SIZE];
+ ssize_t size = sio_recvfrom(io->fd, buffer, sizeof(buffer), 0,
+ (struct sockaddr *) &addr, &len);
+ if (size == -1) {
+ if (! sio_wouldblock(errno))
+ diag_log();
+ return;
+ }
+ say_verbose("SWIM: received from %s",
+ sio_strfaddr((struct sockaddr *) &addr, len));
+ const char *pos = buffer;
+ const char *end = pos + size;
+ if (mp_typeof(*pos) != MP_MAP || mp_check_map(pos, end) > 0) {
+ say_error("%s expected map header", msg_pref);
+ return;
+ }
+ uint64_t map_size = mp_decode_map(&pos);
+ for (uint64_t i = 0; i < map_size; ++i) {
+ if (mp_typeof(*pos) != MP_UINT || mp_check_uint(pos, end) > 0) {
+ say_error("%s header should contain uint keys",
+ msg_pref);
+ return;
+ }
+ uint64_t key = mp_decode_uint(&pos);
+ switch(key) {
+ case SWIM_ANTI_ENTROPY:
+ say_verbose("SWIM: process anti-entropy");
+ if (swim_process_anti_entropy(&pos, end) != 0)
+ return;
+ break;
+ default:
+ say_error("%s unknown component type, only "
+ "anti-entropy is supported", msg_pref);
+ return;
+ }
+ }
+}
+
+/**
+ * Convert a string URI like "ip:port" to sockaddr_in structure.
+ */
+static int
+uri_to_addr(const char *str, struct sockaddr_in *addr)
+{
+ struct uri uri;
+ if (uri_parse(&uri, str) != 0 || uri.service == NULL)
+ goto invalid_uri;
+ in_addr_t iaddr;
+ if (uri.host_len == strlen(URI_HOST_UNIX) &&
+ memcmp(uri.host, URI_HOST_UNIX, uri.host_len) == 0) {
+ diag_set(IllegalParams, "Unix sockets are not supported");
+ return -1;
+ }
+ if (uri.host_len == 0) {
+ iaddr = htonl(INADDR_ANY);
+ } else if (uri.host_len == 9 && memcmp("localhost", uri.host, 9) == 0) {
+ iaddr = htonl(INADDR_LOOPBACK);
+ } else {
+ iaddr = inet_addr(tt_cstr(uri.host, uri.host_len));
+ if (iaddr == (in_addr_t) -1)
+ goto invalid_uri;
+ }
+ int port = htons(atoi(uri.service));
+ memset(addr, 0, sizeof(*addr));
+ addr->sin_family = AF_INET;
+ addr->sin_addr.s_addr = iaddr;
+ addr->sin_port = port;
+ return 0;
+
+invalid_uri:
+ diag_set(SocketError, sio_socketname(-1), "invalid uri \"%s\"", str);
+ return -1;
+}
+
+static inline void
+swim_init_input(struct evio_service *service)
+{
+ evio_service_init_udp(loop(), service, "swim", swim_on_input);
+}
+
+/**
+ * Initialize the module. By default, the module is turned off and
+ * does nothing. To start SWIM, call swim_cfg().
+ */
+static int
+swim_init(void)
+{
+ members = mh_i64ptr_new();
+ if (members == NULL) {
+ diag_set(OutOfMemory, sizeof(*members), "malloc",
+ "members");
+ return -1;
+ }
+ swim_init_input(&input);
+ ev_init(&output, swim_on_output);
+ ev_init(&round_tick, swim_trigger_round_step);
+ ev_periodic_set(&round_tick, 0, HEARTBEAT_RATE_DEFAULT, NULL);
+ return 0;
+}
+
+int
+swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri,
+ double heartbeat_rate)
+{
+ if (members == NULL && swim_init() != 0)
+ return -1;
+ struct sockaddr_in addr;
+ struct swim_member **new_cfg;
+ struct swim_member *new_self = self;
+ enum swim_member_status new_status = swim_member_status_MAX;
+ if (member_uri_count > 0) {
+ int size = sizeof(new_cfg[0]) * member_uri_count;
+ new_cfg = (struct swim_member **) malloc(size);
+ if (new_cfg == NULL) {
+ diag_set(OutOfMemory, size, "malloc", "new_cfg");
+ return -1;
+ }
+ }
+ int new_cfg_size = 0;
+ for (; new_cfg_size < member_uri_count; ++new_cfg_size) {
+ if (uri_to_addr(member_uris[new_cfg_size], &addr) != 0)
+ goto error;
+ struct swim_member *member = swim_find_member(&addr);
+ if (member == NULL) {
+ member = swim_member_new(&addr, new_status);
+ if (member == NULL)
+ goto error;
+ }
+ new_cfg[new_cfg_size] = member;
+ }
+
+ if (server_uri != NULL) {
+ if (uri_to_addr(server_uri, &addr) != 0)
+ goto error;
+ struct sockaddr_in cur_addr;
+ socklen_t addrlen = sizeof(cur_addr);
+
+ if (!evio_service_is_active(&input) ||
+ getsockname(input.ev.fd, (struct sockaddr *) &cur_addr,
+ &addrlen) != 0 ||
+ addr.sin_addr.s_addr != cur_addr.sin_addr.s_addr ||
+ addr.sin_port != cur_addr.sin_port) {
+
+ new_self = swim_find_member(&addr);
+ if (new_self == NULL) {
+ new_self = swim_member_new(&addr, new_status);
+ if (new_self == NULL)
+ goto error;
+ }
+ struct evio_service new_input;
+ swim_init_input(&new_input);
+ if (evio_service_bind(&new_input, server_uri) != 0) {
+ evio_service_stop(&new_input);
+ goto error;
+ }
+ evio_service_stop(&input);
+ input = new_input;
+ ev_io_set(&output, input.ev.fd, EV_WRITE);
+ evio_service_start(&input);
+ ev_periodic_start(loop(), &round_tick);
+ }
+ }
+
+ if (round_tick.interval != heartbeat_rate && heartbeat_rate > 0)
+ ev_periodic_set(&round_tick, 0, heartbeat_rate, NULL);
+
+ if (member_uri_count > 0) {
+ for (int i = 0; i < new_cfg_size; ++i)
+ new_cfg[i]->status = MEMBER_ALIVE;
+ free(new_cfg);
+ }
+ if (new_self != NULL && new_self->status == new_status)
+ new_self->status = MEMBER_ALIVE;
+ self = new_self;
+ return 0;
+
+error:
+ for (int i = 0; i < new_cfg_size; ++i) {
+ if (new_cfg[i]->status == new_status) {
+ swim_member_delete(new_cfg[i]);
+ if (new_self == new_cfg[i])
+ new_self = NULL;
+ }
+ }
+ if (member_uri_count > 0)
+ free(new_cfg);
+ if (new_self != NULL && new_self->status == new_status)
+ swim_member_delete(new_self);
+ return -1;
+}
+
+void
+swim_info(struct info_handler *info)
+{
+ info_begin(info);
+ if (members == NULL)
+ return;
+ for (mh_int_t node = mh_first(members), end = mh_end(members);
+ node != end; node = mh_next(members, node)) {
+ struct swim_member *member = (struct swim_member *)
+ mh_i64ptr_node(members, node)->val;
+ info_table_begin(info,
+ sio_strfaddr((struct sockaddr *) &member->addr,
+ sizeof(member->addr)));
+ info_append_str(info, "status",
+ swim_member_status_strs[member->status]);
+ info_table_end(info);
+ }
+ info_end(info);
+}
+
+void
+swim_stop(void)
+{
+ if (members == NULL)
+ return;
+ ev_io_stop(loop(), &output);
+ evio_service_stop(&input);
+ ev_periodic_stop(loop(), &round_tick);
+ mh_int_t node = mh_first(members);
+ while (node != mh_end(members)) {
+ struct swim_member *m = (struct swim_member *)
+ mh_i64ptr_node(members, node)->val;
+ swim_member_delete(m);
+ node = mh_first(members);
+ }
+ mh_i64ptr_delete(members);
+ free(shuffled_members);
+
+ members = NULL;
+ shuffled_members = NULL;
+ shuffled_members_size = 0;
+ rlist_create(&queue_output);
+ rlist_create(&queue_round);
+}
+
+#ifndef NDEBUG
+/**
+ * Test utilities to speed some things up, trigger events, and
+ * simulate errors.
+ */
+
+void
+swim_debug_round_step(void)
+{
+ ev_feed_event(loop(), &round_tick, EV_PERIODIC);
+}
+
+#endif
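The error path of swim_cfg() above undoes only what the current call created. Members made during this reconfiguration still carry the temporary `new_status` value (its definition is outside this excerpt), so they can be told apart from members that already existed. On success that marker is replaced with MEMBER_ALIVE. Below is a minimal, self-contained sketch of this "mark, then commit or roll back" pattern. All `example_*` names are hypothetical, and a plain array stands in for the patch's hash table.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical statuses: EXAMPLE_NEW marks not yet committed members. */
enum example_cfg_status { EXAMPLE_ALIVE = 0, EXAMPLE_NEW };

struct example_cfg_member {
	enum example_cfg_status status;
};

/* Success: every configured member becomes a regular alive one. */
static void
example_cfg_commit(struct example_cfg_member **cfg, int size)
{
	for (int i = 0; i < size; ++i)
		cfg[i]->status = EXAMPLE_ALIVE;
}

/*
 * Failure: free only the members created by this call and leave
 * pre-existing ones untouched. Returns the number of freed members.
 */
static int
example_cfg_rollback(struct example_cfg_member **cfg, int size)
{
	int freed = 0;
	for (int i = 0; i < size; ++i) {
		if (cfg[i] != NULL && cfg[i]->status == EXAMPLE_NEW) {
			free(cfg[i]);
			cfg[i] = NULL;
			++freed;
		}
	}
	return freed;
}
```

The marker removes the need for a separate "created by this call" flag. The marker value must never be a status that a live member can legitimately have. The patch also guards against freeing `new_self` twice when it appears in `new_cfg`, but this sketch does not model self.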
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
new file mode 100644
index 000000000..a802d0ac7
--- /dev/null
+++ b/src/lib/swim/swim.h
@@ -0,0 +1,83 @@
+#ifndef TARANTOOL_SWIM_H_INCLUDED
+#define TARANTOOL_SWIM_H_INCLUDED
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+struct info_handler;
+
+/**
+ * Configure or reconfigure the module.
+ *
+ * @param member_uris An array of member URIs in the format
+ * "ip:port".
+ * @param member_uri_count Length of @a member_uris.
+ * @param server_uri A URI in the format "ip:port".
+ * @param heartbeat_rate Rate of broadcasting messages. It does
+ * not mean that each member is checked every
+ * @a heartbeat_rate seconds; it is rather the protocol
+ * speed. The protocol period depends on the member count
+ * and the broadcast batch size.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+int
+swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri,
+ double heartbeat_rate);
+
+/**
+ * Stop listening and broadcasting messages, clean up all internal
+ * structures, and free memory. Note that the swim_cfg/swim_stop
+ * combination can be called many times.
+ */
+void
+swim_stop(void);
+
+void
+swim_info(struct info_handler *info);
+
+#ifndef NDEBUG
+
+/** Trigger next round step right now. */
+void
+swim_debug_round_step(void);
+
+#endif
+
+#if defined(__cplusplus)
+}
+#endif
+
+#endif /* TARANTOOL_SWIM_H_INCLUDED */
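The @heartbeat_rate documentation says the rate is the protocol speed, not a per-member check interval. The commit message of patch 1 describes one round step per tick, with one member popped from the round queue per step. Assuming that model, every member is pinged roughly once per N * heartbeat_rate seconds. The helpers below are hypothetical and not part of the patch. In particular, `example_batch_size` only approximates what the patch's `calculate_bin_batch_size()` might compute, and it takes the header and per-member sizes as assumed inputs.

```c
#include <assert.h>

/*
 * Hypothetical helpers, not part of the patch.
 * Model: one round step every heartbeat_rate seconds, and each
 * step pings exactly one member popped from the round queue.
 */

/** Seconds until each of member_count members is pinged once. */
static double
example_round_period(int member_count, double heartbeat_rate)
{
	assert(member_count >= 0 && heartbeat_rate > 0);
	return member_count * heartbeat_rate;
}

/**
 * How many fixed-size member records fit into one packet after a
 * header of header_size bytes. This is an approximation only: a
 * real encoder may also reserve room for MsgPack array headers.
 */
static int
example_batch_size(int packet_size, int header_size, int member_size)
{
	assert(member_size > 0);
	if (packet_size <= header_size)
		return 0;
	return (packet_size - header_size) / member_size;
}
```

Under this model, 100 members with a 1-second heartbeat are each pinged about once every 100 seconds. The anti-entropy batch is bounded by the 1472-byte UDP_PACKET_SIZE that the series uses.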
diff --git a/src/lua/init.c b/src/lua/init.c
index ca4b47f3a..5b47aa3e3 100644
--- a/src/lua/init.c
+++ b/src/lua/init.c
@@ -58,6 +58,7 @@
#include "lua/fio.h"
#include "lua/httpc.h"
#include "lua/utf8.h"
+#include "lua/swim.h"
#include "digest.h"
#include <small/ibuf.h>
@@ -450,6 +451,7 @@ tarantool_lua_init(const char *tarantool_bin, int argc, char **argv)
tarantool_lua_socket_init(L);
tarantool_lua_pickle_init(L);
tarantool_lua_digest_init(L);
+ tarantool_lua_swim_init(L);
luaopen_http_client_driver(L);
lua_pop(L, 1);
luaopen_msgpack(L);
diff --git a/src/lua/swim.c b/src/lua/swim.c
new file mode 100644
index 000000000..c92f2bc8d
--- /dev/null
+++ b/src/lua/swim.c
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "utils.h"
+#include "diag.h"
+#include "swim/swim.h"
+#include "small/ibuf.h"
+#include "lua/info.h"
+#include <info.h>
+
+static int
+lua_swim_cfg(struct lua_State *L)
+{
+ if (lua_gettop(L) != 1)
+ return luaL_error(L, "Usage: swim.cfg({<config>})");
+ lua_getfield(L, 1, "members");
+ const char **member_uris = NULL;
+ int member_count;
+ if (lua_istable(L, -1)) {
+ member_count = lua_objlen(L, -1);
+ if (member_count > 0) {
+ ibuf_reset(tarantool_lua_ibuf);
+ int size = sizeof(member_uris[0]) * member_count;
+ member_uris = ibuf_alloc(tarantool_lua_ibuf, size);
+ if (member_uris == NULL) {
+ diag_set(OutOfMemory, size, "ibuf_alloc",
+ "member_uris");
+ return luaT_error(L);
+ }
+ for (int i = 0; i < member_count; ++i) {
+ lua_rawgeti(L, -1, i + 1);
+ if (! lua_isstring(L, -1)) {
+ return luaL_error(L, "Member should "\
+ "be string URI");
+ }
+ member_uris[i] = lua_tostring(L, -1);
+ lua_pop(L, 1);
+ }
+ }
+ } else if (lua_isnil(L, -1)) {
+ member_uris = NULL;
+ member_count = 0;
+ } else {
+ return luaL_error(L, "Members should be array");
+ }
+ lua_pop(L, 1);
+
+ const char *server_uri;
+ lua_getfield(L, 1, "server");
+ if (lua_isstring(L, -1))
+ server_uri = lua_tostring(L, -1);
+ else if (lua_isnil(L, -1))
+ server_uri = NULL;
+ else
+ return luaL_error(L, "Server should be string URI");
+ lua_pop(L, 1);
+
+ double heartbeat_rate;
+ lua_getfield(L, 1, "heartbeat");
+ if (lua_isnumber(L, -1)) {
+ heartbeat_rate = lua_tonumber(L, -1);
+ if (heartbeat_rate <= 0) {
+ return luaL_error(L, "Heartbeat should be positive "\
+ "number");
+ }
+ } else if (! lua_isnil(L, -1)) {
+ return luaL_error(L, "Heartbeat should be positive number");
+ } else {
+ heartbeat_rate = -1;
+ }
+ lua_pop(L, 1);
+
+ if (swim_cfg(member_uris, member_count, server_uri, heartbeat_rate) != 0) {
+ lua_pushnil(L);
+ luaT_pusherror(L, diag_last_error(diag_get()));
+ return 2;
+ }
+ lua_pushboolean(L, true);
+ return 1;
+}
+
+static int
+lua_swim_stop(struct lua_State *L)
+{
+ (void) L;
+ swim_stop();
+ return 0;
+}
+
+static int
+lua_swim_info(struct lua_State *L)
+{
+ struct info_handler info;
+ luaT_info_handler_create(&info, L);
+ swim_info(&info);
+ return 1;
+}
+
+#ifndef NDEBUG
+static int
+lua_swim_debug_round_step(struct lua_State *L)
+{
+ (void) L;
+ swim_debug_round_step();
+ return 0;
+}
+#endif
+
+void
+tarantool_lua_swim_init(struct lua_State *L)
+{
+ static const struct luaL_Reg lua_swim_methods[] = {
+ {"cfg", lua_swim_cfg},
+ {"stop", lua_swim_stop},
+ {"info", lua_swim_info},
+#ifndef NDEBUG
+ {"debug_round_step", lua_swim_debug_round_step},
+#endif
+ {NULL, NULL}
+ };
+ luaL_register_module(L, "swim", lua_swim_methods);
+ lua_pop(L, 1);
+}
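When the `heartbeat` field is absent, lua_swim_cfg() passes -1 to swim_cfg(). swim_cfg() treats that as "keep the current rate" because it only reschedules round_tick when heartbeat_rate > 0. The sketch below restates that convention as plain C. `example_parse_heartbeat` and `example_should_reset_timer` are hypothetical, and a NULL pointer stands in for a Lua nil.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical restatement of the heartbeat handling in
 * lua_swim_cfg(). value == NULL models a nil field.
 * Returns 0 and stores the rate in *out on success, or -1 on a
 * non-positive value.
 */
static int
example_parse_heartbeat(const double *value, double *out)
{
	if (value == NULL) {
		/* Not specified: a negative rate means "keep current". */
		*out = -1;
		return 0;
	}
	if (*value <= 0)
		return -1; /* "Heartbeat should be positive number". */
	*out = *value;
	return 0;
}

/* Mirrors the check in swim_cfg(): only a new positive rate resets. */
static int
example_should_reset_timer(double current_interval, double heartbeat_rate)
{
	return heartbeat_rate > 0 && current_interval != heartbeat_rate;
}
```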
diff --git a/src/lua/swim.h b/src/lua/swim.h
new file mode 100644
index 000000000..989cf62b3
--- /dev/null
+++ b/src/lua/swim.h
@@ -0,0 +1,47 @@
+#ifndef INCLUDES_TARANTOOL_LUA_SWIM_H
+#define INCLUDES_TARANTOOL_LUA_SWIM_H
+/*
+ * Copyright 2010-2015, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+struct lua_State;
+
+void
+tarantool_lua_swim_init(struct lua_State *L);
+
+#if defined(__cplusplus)
+}
+#endif
+
+#endif /* INCLUDES_TARANTOOL_LUA_SWIM_H */
diff --git a/test/swim/app.lua b/test/swim/app.lua
new file mode 100644
index 000000000..d4ba66980
--- /dev/null
+++ b/test/swim/app.lua
@@ -0,0 +1,15 @@
+#!/usr/bin/env tarantool
+os = require('os')
+box.cfg{}
+swim = require('swim')
+require('swim_utils')
+listen_uri = os.getenv('LISTEN')
+parsed_uri = require('uri').parse(listen_uri)
+listen_port = parsed_uri.service
+listen_host = parsed_uri.host
+if listen_host == 'localhost' then
+ listen_host = '127.0.0.1'
+ listen_uri = listen_host..':'..listen_port
+end
+require('console').listen(os.getenv('ADMIN'))
+test_run = require('test_run').new()
diff --git a/test/swim/basic.result b/test/swim/basic.result
new file mode 100644
index 000000000..a7ae140d6
--- /dev/null
+++ b/test/swim/basic.result
@@ -0,0 +1,179 @@
+test_run:cmd("push filter ':"..listen_port.."' to ':listen_port'")
+---
+- true
+...
+--
+-- gh-3234: SWIM - Scalable Weakly-consistent Infection-style
+-- Process Group Membership Protocol. Here are some basic tests on
+-- cfg checking, binding to an existing address etc.
+--
+-- Info() and stop() on non-working server.
+swim_info_sorted()
+---
+- []
+...
+swim.stop()
+---
+...
+-- Empty and multiple cfg is ok.
+swim.cfg({})
+---
+- true
+...
+swim.cfg({})
+---
+- true
+...
+swim_info_sorted()
+---
+- []
+...
+swim.stop()
+---
+...
+-- Members without a server is ok.
+members = {'192.168.0.1:3333', '192.168.0.2:3333', '192.168.0.3:3333'}
+---
+...
+swim.cfg({members = members})
+---
+- true
+...
+swim_info_sorted()
+---
+- - - 192.168.0.1:3333
+ - status: alive
+ - - 192.168.0.2:3333
+ - status: alive
+ - - 192.168.0.3:3333
+ - status: alive
+...
+swim.stop()
+---
+...
+swim_info_sorted()
+---
+- []
+...
+swim.cfg({server = listen_uri, members = members})
+---
+- true
+...
+swim_info_sorted()
+---
+- - - 127.0.0.1:listen_port
+ - status: alive
+ - - 192.168.0.1:3333
+ - status: alive
+ - - 192.168.0.2:3333
+ - status: alive
+ - - 192.168.0.3:3333
+ - status: alive
+...
+swim.debug_round_step()
+---
+...
+swim_info_sorted()
+---
+- - - 127.0.0.1:listen_port
+ - status: alive
+ - - 192.168.0.1:3333
+ - status: alive
+ - - 192.168.0.2:3333
+ - status: alive
+ - - 192.168.0.3:3333
+ - status: alive
+...
+swim.stop()
+---
+...
+-- Unix is not supported.
+unix_uri = 'unix/:/tmp/anyfile'
+---
+...
+swim.cfg({server = unix_uri})
+---
+- null
+- Unix sockets are not supported
+...
+-- Invalid server and member URI.
+swim.cfg({server = 'bad uri'})
+---
+- null
+- invalid uri "bad uri", called on fd -1
+...
+swim.cfg({members = {'bad uri'}})
+---
+- null
+- invalid uri "bad uri", called on fd -1
+...
+-- Change server URI without stop.
+swim.cfg({server = listen_uri})
+---
+- true
+...
+swim.cfg({server = listen_uri})
+---
+- true
+...
+swim.debug_round_step()
+---
+...
+swim_info_sorted()
+---
+- - - 127.0.0.1:listen_port
+ - status: alive
+...
+swim.stop()
+---
+...
+-- It is ok to have server URI in members list.
+table.insert(members, listen_uri)
+---
+...
+-- Address in use.
+socket = require('socket')
+---
+...
+s = socket("AF_INET", "SOCK_DGRAM", "udp")
+---
+...
+s:bind(listen_host, listen_port)
+---
+- true
+...
+swim.cfg({server = listen_uri, members = members})
+---
+- null
+- 'swim: failed to bind, called on fd -1'
+...
+swim_info_sorted()
+---
+- []
+...
+s:close()
+---
+- true
+...
+swim.cfg({server = listen_uri, members = members})
+---
+- true
+...
+swim_info_sorted()
+---
+- - - 127.0.0.1:listen_port
+ - status: alive
+ - - 192.168.0.1:3333
+ - status: alive
+ - - 192.168.0.2:3333
+ - status: alive
+ - - 192.168.0.3:3333
+ - status: alive
+...
+swim.stop()
+---
+...
+test_run:cmd("clear filter")
+---
+- true
+...
diff --git a/test/swim/basic.test.lua b/test/swim/basic.test.lua
new file mode 100644
index 000000000..7dc6cb684
--- /dev/null
+++ b/test/swim/basic.test.lua
@@ -0,0 +1,61 @@
+test_run:cmd("push filter ':"..listen_port.."' to ':listen_port'")
+--
+-- gh-3234: SWIM - Scalable Weakly-consistent Infection-style
+-- Process Group Membership Protocol. Here are some basic tests on
+-- cfg checking, binding to an existing address etc.
+--
+
+-- Info() and stop() on non-working server.
+swim_info_sorted()
+swim.stop()
+
+-- Empty and multiple cfg is ok.
+swim.cfg({})
+swim.cfg({})
+swim_info_sorted()
+swim.stop()
+
+-- Members without a server is ok.
+members = {'192.168.0.1:3333', '192.168.0.2:3333', '192.168.0.3:3333'}
+swim.cfg({members = members})
+swim_info_sorted()
+swim.stop()
+swim_info_sorted()
+
+swim.cfg({server = listen_uri, members = members})
+swim_info_sorted()
+
+swim.debug_round_step()
+swim_info_sorted()
+swim.stop()
+
+-- Unix is not supported.
+unix_uri = 'unix/:/tmp/anyfile'
+swim.cfg({server = unix_uri})
+
+-- Invalid server and member URI.
+swim.cfg({server = 'bad uri'})
+swim.cfg({members = {'bad uri'}})
+
+-- Change server URI without stop.
+swim.cfg({server = listen_uri})
+swim.cfg({server = listen_uri})
+swim.debug_round_step()
+swim_info_sorted()
+swim.stop()
+
+-- It is ok to have server URI in members list.
+table.insert(members, listen_uri)
+
+-- Address in use.
+socket = require('socket')
+s = socket("AF_INET", "SOCK_DGRAM", "udp")
+s:bind(listen_host, listen_port)
+swim.cfg({server = listen_uri, members = members})
+swim_info_sorted()
+s:close()
+swim.cfg({server = listen_uri, members = members})
+swim_info_sorted()
+
+swim.stop()
+test_run:cmd("clear filter")
diff --git a/test/swim/suite.ini b/test/swim/suite.ini
new file mode 100644
index 000000000..c33405d64
--- /dev/null
+++ b/test/swim/suite.ini
@@ -0,0 +1,6 @@
+[default]
+core = tarantool
+description = swim tests
+script = app.lua
+is_parallel = True
+lua_libs = swim_utils.lua
diff --git a/test/swim/swim_utils.lua b/test/swim/swim_utils.lua
new file mode 100644
index 000000000..583206257
--- /dev/null
+++ b/test/swim/swim_utils.lua
@@ -0,0 +1,9 @@
+function swim_info_sorted()
+ local t = swim.info()
+ local keys = {}
+ for k, _ in pairs(t) do table.insert(keys, k) end
+ table.sort(keys)
+ local res = {}
+ for _, k in pairs(keys) do table.insert(res, {k, t[k]}) end
+ return setmetatable(res, {__index = t})
+end
--
2.17.2 (Apple Git-113)
* [PATCH 2/5] swim: introduce failure detection component
From: Vladislav Shpilevoy @ 2018-12-17 12:53 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
The failure detection component allows finding
members that are already dead.
Part of #3234
---
src/lib/swim/swim.c | 484 +++++++++++++++++++++++++++++++++++++++--
test/swim/basic.result | 16 ++
2 files changed, 478 insertions(+), 22 deletions(-)
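The failure detection code in this patch rests on two rules:

- swim_member_update_status() orders statuses by the compound key {incarnation, status}.
- swim_check_acks() marks a member dead after NO_ACKS_TO_DEAD unacked pings and drops it after NO_ACKS_TO_GC unless it is pinned.

The standalone sketch below models only these two rules. `struct example_member` and the `example_*` functions are hypothetical. The thresholds copy the patch's values (3, and 3 + 2). Timers, the wait-ack queue, ping resending and self-refutation are not modelled, and deletion is represented by a flag instead of free().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

enum example_status { EX_ALIVE = 0, EX_DEAD };

enum {
	EX_NO_ACKS_TO_DEAD = 3,
	EX_NO_ACKS_TO_GC = EX_NO_ACKS_TO_DEAD + 2,
};

struct example_member {
	enum example_status status;
	uint64_t incarnation;
	int failed_pings;
	bool is_pinned;
	bool is_deleted;
};

/*
 * Apply a received {incarnation, status} pair. A greater
 * incarnation always wins; with an equal incarnation only a
 * "bigger" status (alive < dead) wins.
 */
static void
example_update_status(struct example_member *m,
		      enum example_status status, uint64_t incarnation)
{
	if (m->incarnation == incarnation) {
		if (m->status < status)
			m->status = status;
	} else if (m->incarnation < incarnation) {
		m->status = status;
		m->incarnation = incarnation;
	}
}

/* Called once per ping that was not acked within the timeout. */
static void
example_on_ack_timeout(struct example_member *m)
{
	++m->failed_pings;
	if (m->failed_pings >= EX_NO_ACKS_TO_GC) {
		if (!m->is_pinned)
			m->is_deleted = true;
		return;
	}
	if (m->failed_pings >= EX_NO_ACKS_TO_DEAD)
		m->status = EX_DEAD;
}
```

The equal-incarnation rule stops a stale "alive" message from overriding a fresh "dead" verdict. The member itself refutes a "dead" rumour by bumping its own incarnation, as swim_process_member_update() does for self.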
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 7e5d0eb9e..6b2f1ca0c 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -40,17 +40,15 @@
/**
* Possible optimizations:
- * - track hash table versions and do not resend when a received
- * already knows your version.
- * - on small updates send to another node only updates since a
- * version. On rare updates it can dramatically reduce message
- * size and its encoding time.
+ *
+ * Optional:
* - do not send self.
* - cache encoded batch.
* - refute immediately.
* - indirect ping.
* - increment own incarnation on each round.
* - attach dst incarnation to ping.
+ * - fix swim_member_bin mp_encode_map(2) to 3 in the first patch.
*/
/**
@@ -131,24 +129,45 @@ sockaddr_in_hash(const struct sockaddr_in *a)
* tables. The intention to send a data is called IO task and is
* stored in a queue that is dispatched when output is possible.
*/
-typedef void (*swim_io_task_f)(void);
+struct swim_io_task;
+
+typedef void (*swim_io_task_f)(struct swim_io_task *);
struct swim_io_task {
swim_io_task_f cb;
struct rlist in_queue_output;
};
+static inline void
+swim_io_task_create(struct swim_io_task *task, swim_io_task_f cb)
+{
+ task->cb = cb;
+ rlist_create(&task->in_queue_output);
+}
+
+static inline void
+swim_io_task_destroy(struct swim_io_task *task)
+{
+ rlist_del_entry(task, in_queue_output);
+}
+
enum swim_member_status {
/**
* The instance is ok, it responds to requests, sends its
* members table.
*/
MEMBER_ALIVE = 0,
+ /**
+ * The member is considered to be dead. It will disappear
+ * from the membership, if it is not pinned.
+ */
+ MEMBER_DEAD,
swim_member_status_MAX,
};
static const char *swim_member_status_strs[] = {
"alive",
+ "dead",
};
/**
@@ -173,6 +192,38 @@ struct swim_member {
* Position in a queue of members in the current round.
*/
struct rlist in_queue_round;
+ /**
+ *
+ * Failure detection component
+ */
+ /**
+ * True if the member is configured explicitly and cannot
+ * disappear from the membership.
+ */
+ bool is_pinned;
+ /** Growing number to refute old messages. */
+ uint64_t incarnation;
+ /**
+ * Number of consecutive pings left without an ack. After
+ * a threshold the member is marked dead; after a higher
+ * one it is removed from the table (unless pinned).
+ */
+ int failed_pings;
+ /** When the last ping was sent. */
+ double ping_ts;
+ /**
+ * Preallocated ack task, sent when this member has sent
+ * a ping to us.
+ */
+ struct swim_io_task ack_task;
+ /**
+ * Preallocated ping task, sent when this member has not
+ * responded for too long to an initial ping (the one
+ * piggybacked with the members table).
+ */
+ struct swim_io_task ping_task;
+ /** Position in a queue of members waiting for an ack. */
+ struct rlist in_queue_wait_ack;
};
/**
@@ -188,8 +239,93 @@ static struct swim_member *self = NULL;
*/
enum swim_component_type {
SWIM_ANTI_ENTROPY = 0,
+ SWIM_FAILURE_DETECTION,
+};
+
+/** {{{ Failure detection component */
+
+/** Possible failure detection keys. */
+enum swim_fd_key {
+ /** Type of the failure detection message: ping or ack. */
+ SWIM_FD_MSG_TYPE,
+ /**
+ * Incarnation of the sender. Used to make a member alive
+ * again if it was considered dead, but a ping/ack with a
+ * greater incarnation is received from it.
+ */
+ SWIM_FD_INCARNATION,
};
+/**
+ * A failure detection message currently has only two types:
+ * ping and ack. Indirect ping/ack are TODO.
+ */
+enum swim_fd_msg_type {
+ SWIM_FD_MSG_PING,
+ SWIM_FD_MSG_ACK,
+ swim_fd_msg_type_MAX,
+};
+
+static const char *swim_fd_msg_type_strs[] = {
+ "ping",
+ "ack",
+};
+
+/** SWIM failure detection MsgPack header template. */
+struct PACKED swim_fd_header_bin {
+ /** mp_encode_uint(SWIM_FAILURE_DETECTION) */
+ uint8_t k_header;
+ /** mp_encode_map(2) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_FD_MSG_TYPE) */
+ uint8_t k_type;
+ /** mp_encode_uint(enum swim_fd_msg_type) */
+ uint8_t v_type;
+
+ /** mp_encode_uint(SWIM_FD_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
+};
+
+static inline void
+swim_fd_header_bin_create(struct swim_fd_header_bin *header,
+ enum swim_fd_msg_type type)
+{
+ header->k_header = SWIM_FAILURE_DETECTION;
+ header->m_header = 0x82;
+
+ header->k_type = SWIM_FD_MSG_TYPE;
+ header->v_type = type;
+
+ header->k_incarnation = SWIM_FD_INCARNATION;
+ header->m_incarnation = 0xcf;
+ header->v_incarnation = mp_bswap_u64(self->incarnation);
+}
+
+/**
+ * Members waiting for an ACK. If an ACK is absent for too long, a
+ * member is considered dead and is eventually removed. The list is
+ * sorted by ping time in ascending order (head is oldest, tail newest).
+ */
+static RLIST_HEAD(queue_wait_ack);
+/** Generator of ack checking events. */
+static struct ev_periodic wait_ack_tick;
+
+static void
+swim_member_schedule_ack_wait(struct swim_member *member)
+{
+ if (rlist_empty(&member->in_queue_wait_ack)) {
+ member->ping_ts = fiber_time();
+ rlist_add_tail_entry(&queue_wait_ack, member,
+ in_queue_wait_ack);
+ }
+}
+
+/** }}} Failure detection component */
+
/** {{{ Anti-entropy component */
/**
@@ -204,6 +340,7 @@ enum swim_member_key {
*/
SWIM_MEMBER_ADDR,
SWIM_MEMBER_PORT,
+ SWIM_MEMBER_INCARNATION,
swim_member_key_MAX,
};
@@ -227,7 +364,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
/** SWIM member MsgPack template. */
struct PACKED swim_member_bin {
- /** mp_encode_map(3) */
+ /** mp_encode_map(4) */
uint8_t m_header;
/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -246,6 +383,12 @@ struct PACKED swim_member_bin {
/** mp_encode_uint(addr.sin_port) */
uint8_t m_port;
uint16_t v_port;
+
+ /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
};
static inline void
@@ -255,17 +398,20 @@ swim_member_bin_reset(struct swim_member_bin *header,
header->v_status = member->status;
header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
header->v_port = mp_bswap_u16(member->addr.sin_port);
+ header->v_incarnation = mp_bswap_u64(member->incarnation);
}
static inline void
swim_member_bin_create(struct swim_member_bin *header)
{
- header->m_header = 0x83;
+ header->m_header = 0x84;
header->k_status = SWIM_MEMBER_STATUS;
header->k_addr = SWIM_MEMBER_ADDR;
header->m_addr = 0xce;
header->k_port = SWIM_MEMBER_PORT;
header->m_port = 0xcd;
+ header->k_incarnation = SWIM_MEMBER_INCARNATION;
+ header->m_incarnation = 0xcf;
}
/**
@@ -283,7 +429,7 @@ static struct ev_periodic round_tick;
* objects and related to them only.
*/
static void
-swim_send_round_msg(void);
+swim_send_round_msg(struct swim_io_task *task);
static struct swim_io_task round_step_task = {
/* .cb = */ swim_send_round_msg,
@@ -295,11 +441,19 @@ static struct swim_io_task round_step_task = {
/**
* SWIM message structure:
* {
+ * SWIM_FAILURE_DETECTION: {
+ * SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type,
+ * SWIM_FD_INCARNATION: uint
+ * },
+ *
+ * OR/AND
+ *
* SWIM_ANTI_ENTROPY: [
* {
* SWIM_MEMBER_STATUS: uint, enum member_status,
* SWIM_MEMBER_ADDR: uint, ip,
- * SWIM_MEMBER_PORT: uint, port
+ * SWIM_MEMBER_PORT: uint, port,
+ * SWIM_MEMBER_INCARNATION: uint
* },
* ...
* ],
@@ -317,6 +471,22 @@ enum {
* networks by their admins.
*/
UDP_PACKET_SIZE = 1472,
+ /**
+ * A sent ping is considered lost if no ack arrives
+ * within this number of seconds.
+ */
+ ACK_TIMEOUT = 1,
+ /**
+ * If a member has not responded to this number of
+ * consecutive pings, it is considered dead.
+ */
+ NO_ACKS_TO_DEAD = 3,
+ /**
+ * A dead member that is not pinned is removed from
+ * the membership after at least this number of failed
+ * pings.
+ */
+ NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2,
};
/**
@@ -330,6 +500,14 @@ static struct evio_service input;
*/
static struct ev_io output;
+/**
+ * An array of configured members. Used only to easily roll back
+ * a failed reconfiguration.
+ */
+static struct swim_member **cfg = NULL;
+/** Number of configured members. */
+static int cfg_size = 0;
+
/**
* An array of members shuffled on each round. Its head it sent
* to each member during one round as an anti-entropy message.
@@ -347,12 +525,44 @@ swim_io_task_push(struct swim_io_task *task)
ev_io_start(loop(), &output);
}
+static void
+swim_send_ack(struct swim_io_task *task);
+
+static void
+swim_send_ping(struct swim_io_task *task);
+
+/**
+ * Update the status of the member if needed. Statuses are
+ * compared as a compound key {incarnation, status}, so
+ * @a new_status overrides the old one only if its incarnation
+ * is greater, or is the same but its status is "bigger".
+ * Statuses are compared by their identifiers, so "alive" <
+ * "dead". This protects against the case when a member is
+ * detected as dead on one instance, but the verdict is then
+ * overridden by an "alive" message with the same incarnation.
+ */
+static inline void
+swim_member_update_status(struct swim_member *member,
+ enum swim_member_status new_status,
+ uint64_t incarnation)
+{
+ assert(member != self);
+ if (member->incarnation == incarnation) {
+ if (member->status < new_status)
+ member->status = new_status;
+ } else if (member->incarnation < incarnation) {
+ member->status = new_status;
+ member->incarnation = incarnation;
+ }
+}
+
/**
* Register a new member with a specified status. Here it is
* added to the hash, to the 'next' queue.
*/
static struct swim_member *
-swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status)
+swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status,
+ uint64_t incarnation)
{
struct swim_member *member =
(struct swim_member *) malloc(sizeof(*member));
@@ -362,6 +572,10 @@ swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status)
}
member->status = status;
member->addr = *addr;
+ member->incarnation = incarnation;
+ member->is_pinned = false;
+ member->failed_pings = 0;
+ member->ping_ts = 0;
struct mh_i64ptr_node_t node;
node.key = sockaddr_in_hash(addr);
node.val = member;
@@ -371,7 +585,10 @@ swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status)
diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node");
return NULL;
}
+ swim_io_task_create(&member->ack_task, swim_send_ack);
+ swim_io_task_create(&member->ping_task, swim_send_ping);
rlist_add_entry(&queue_round, member, in_queue_round);
+ rlist_create(&member->in_queue_wait_ack);
return member;
}
@@ -396,7 +613,10 @@ swim_member_delete(struct swim_member *member)
mh_int_t rc = mh_i64ptr_find(members, key, NULL);
assert(rc != mh_end(members));
mh_i64ptr_del(members, rc, NULL);
+ swim_io_task_destroy(&member->ack_task);
+ swim_io_task_destroy(&member->ping_task);
rlist_del_entry(member, in_queue_round);
+ rlist_del_entry(member, in_queue_wait_ack);
free(member);
}
@@ -472,16 +692,22 @@ swim_encode_round_msg(char *buffer, int size)
if ((shuffled_members == NULL || rlist_empty(&queue_round)) &&
swim_new_round() != 0)
return -1;
- /* 1 - for the root map header. */
- assert(size > 1);
- --size;
+ /* Reserve room for the root map header and the FD header. */
+ assert((size_t) size > sizeof(struct swim_fd_header_bin) + 1);
+ size -= sizeof(struct swim_fd_header_bin) + 1;
+
int ae_batch_size = calculate_bin_batch_size(
sizeof(struct swim_anti_entropy_header_bin),
sizeof(struct swim_member_bin), size);
if (ae_batch_size > shuffled_members_size)
ae_batch_size = shuffled_members_size;
- buffer = mp_encode_map(buffer, 1);
+ buffer = mp_encode_map(buffer, 2);
+
+ struct swim_fd_header_bin fd_header_bin;
+ swim_fd_header_bin_create(&fd_header_bin, SWIM_FD_MSG_PING);
+ memcpy(buffer, &fd_header_bin, sizeof(fd_header_bin));
+ buffer += sizeof(fd_header_bin);
struct swim_anti_entropy_header_bin ae_header_bin;
swim_anti_entropy_header_bin_create(&ae_header_bin, ae_batch_size);
@@ -504,8 +730,10 @@ swim_encode_round_msg(char *buffer, int size)
* from the queue.
*/
static void
-swim_send_round_msg(void)
+swim_send_round_msg(struct swim_io_task *task)
{
+ (void) task;
+ assert(task->cb == swim_send_round_msg);
char buffer[UDP_PACKET_SIZE];
int size = swim_encode_round_msg(buffer, UDP_PACKET_SIZE);
if (size < 0) {
@@ -524,11 +752,51 @@ swim_send_round_msg(void)
if (sio_sendto(output.fd, buffer, size, 0, (struct sockaddr *) &m->addr,
sizeof(m->addr)) == -1 && ! sio_wouldblock(errno))
diag_log();
+ swim_member_schedule_ack_wait(m);
rlist_del_entry(m, in_queue_round);
end:
ev_periodic_start(loop(), &round_tick);
}
+/** Send a failure detection message. */
+static void
+swim_send_fd_message(struct swim_member *m, enum swim_fd_msg_type type)
+{
+ char buffer[UDP_PACKET_SIZE];
+ char *pos = mp_encode_map(buffer, 1);
+ struct swim_fd_header_bin header_bin;
+ swim_fd_header_bin_create(&header_bin, type);
+ memcpy(pos, &header_bin, sizeof(header_bin));
+ pos += sizeof(header_bin);
+ assert(pos - buffer <= (int)sizeof(buffer));
+ say_verbose("SWIM: send %s to %s", swim_fd_msg_type_strs[type],
+ sio_strfaddr((struct sockaddr *) &m->addr,
+ sizeof(m->addr)));
+ if (sio_sendto(output.fd, buffer, pos - buffer, 0,
+ (struct sockaddr *) &m->addr, sizeof(m->addr)) == -1 &&
+ ! sio_wouldblock(errno))
+ diag_log();
+}
+
+static void
+swim_send_ack(struct swim_io_task *task)
+{
+ assert(task->cb == swim_send_ack);
+ struct swim_member *m = container_of(task, struct swim_member,
+ ack_task);
+ swim_send_fd_message(m, SWIM_FD_MSG_ACK);
+}
+
+static void
+swim_send_ping(struct swim_io_task *task)
+{
+ assert(task->cb == swim_send_ping);
+ struct swim_member *m = container_of(task, struct swim_member,
+ ping_task);
+ swim_send_fd_message(m, SWIM_FD_MSG_PING);
+ swim_member_schedule_ack_wait(m);
+}
+
static void
swim_on_output(struct ev_loop *loop, struct ev_io *io, int events)
{
@@ -541,7 +809,7 @@ swim_on_output(struct ev_loop *loop, struct ev_io *io, int events)
struct swim_io_task *task =
rlist_shift_entry(&queue_output, struct swim_io_task,
in_queue_output);
- task->cb();
+ task->cb(task);
}
/** Once per specified timeout trigger a next broadcast step. */
@@ -554,12 +822,42 @@ swim_trigger_round_step(struct ev_loop *loop, struct ev_periodic *p, int events)
ev_periodic_stop(loop, p);
}
+/**
+ * Check for failed pings. A ping is considered failed if no ack
+ * arrived within ACK_TIMEOUT seconds. A failed ping is resent here.
+ */
+static void
+swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
+{
+ assert((events & EV_PERIODIC) != 0);
+ (void) loop;
+ (void) p;
+ (void) events;
+ struct swim_member *m, *tmp;
+ double current_time = fiber_time();
+ rlist_foreach_entry_safe(m, &queue_wait_ack, in_queue_wait_ack, tmp) {
+ if (current_time - m->ping_ts < ACK_TIMEOUT)
+ break;
+ ++m->failed_pings;
+ if (m->failed_pings >= NO_ACKS_TO_GC) {
+ if (!m->is_pinned)
+ swim_member_delete(m);
+ continue;
+ }
+ if (m->failed_pings >= NO_ACKS_TO_DEAD)
+ m->status = MEMBER_DEAD;
+ swim_io_task_push(&m->ping_task);
+ rlist_del_entry(m, in_queue_wait_ack);
+ }
+}
+
/**
* SWIM member attributes from anti-entropy and dissemination
* messages.
*/
struct swim_member_def {
struct sockaddr_in addr;
+ uint64_t incarnation;
enum swim_member_status status;
};
@@ -569,6 +867,7 @@ swim_member_def_create(struct swim_member_def *def)
def->addr.sin_port = 0;
def->addr.sin_addr.s_addr = 0;
def->status = MEMBER_ALIVE;
+ def->incarnation = 0;
}
static void
@@ -580,9 +879,34 @@ swim_process_member_update(struct swim_member_def *def)
* members table.
*/
if (member == NULL) {
- member = swim_member_new(&def->addr, def->status);
+ member = swim_member_new(&def->addr, def->status,
+ def->incarnation);
if (member == NULL)
diag_log();
+ return;
+ }
+ if (member != self) {
+ swim_member_update_status(member, def->status,
+ def->incarnation);
+ return;
+ }
+ /*
+ * It is possible that other instances know a bigger
+ * incarnation of this instance: this happens when
+ * the instance restarts and loses its local incarnation
+ * number. It will be restored by receiving dissemination
+ * messages about self.
+ */
+ if (self->incarnation < def->incarnation)
+ self->incarnation = def->incarnation;
+ if (def->status != MEMBER_ALIVE &&
+ def->incarnation == self->incarnation) {
+ /*
+ * There is gossip in the cluster that this
+ * instance is not alive. Refute this information
+ * with a bigger incarnation.
+ */
+ self->incarnation++;
}
}
@@ -626,6 +950,15 @@ swim_process_member_key(enum swim_member_key key, const char **pos,
}
def->addr.sin_port = port;
break;
+ case SWIM_MEMBER_INCARNATION:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s member incarnation should be uint",
+ msg_pref);
+ return -1;
+ }
+ def->incarnation = mp_decode_uint(pos);
+ break;
default:
unreachable();
}
@@ -677,6 +1010,90 @@ swim_process_anti_entropy(const char **pos, const char *end)
return 0;
}
+/**
+ * Decode a failure detection message. Schedule pings, process
+ * acks.
+ */
+static int
+swim_process_failure_detection(const char **pos, const char *end,
+ const struct sockaddr_in *src)
+{
+ const char *msg_pref = "Invalid SWIM failure detection message:";
+ if (mp_typeof(**pos) != MP_MAP || mp_check_map(*pos, end) > 0) {
+ say_error("%s root should be a map", msg_pref);
+ return -1;
+ }
+ uint64_t size = mp_decode_map(pos);
+ if (size != 2) {
+ say_error("%s root map should have two keys - message type "\
+ "and incarnation", msg_pref);
+ return -1;
+ }
+ enum swim_fd_msg_type type = swim_fd_msg_type_MAX;
+ uint64_t incarnation = 0;
+ for (int i = 0; i < (int) size; ++i) {
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s a key should be uint", msg_pref);
+ return -1;
+ }
+ uint64_t key = mp_decode_uint(pos);
+ switch(key) {
+ case SWIM_FD_MSG_TYPE:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s message type should be uint",
+ msg_pref);
+ return -1;
+ }
+ key = mp_decode_uint(pos);
+ if (key >= swim_fd_msg_type_MAX) {
+ say_error("%s unknown message type", msg_pref);
+ return -1;
+ }
+ type = key;
+ break;
+ case SWIM_FD_INCARNATION:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s incarnation should be uint",
+ msg_pref);
+ return -1;
+ }
+ incarnation = mp_decode_uint(pos);
+ break;
+ default:
+ say_error("%s unknown key", msg_pref);
+ return -1;
+ }
+ }
+ if (type == swim_fd_msg_type_MAX) {
+ say_error("%s message type should be specified", msg_pref);
+ return -1;
+ }
+ struct swim_member *sender = swim_find_member(src);
+ if (sender == NULL) {
+ sender = swim_member_new(src, MEMBER_ALIVE, incarnation);
+ if (sender == NULL) {
+ diag_log();
+ return 0;
+ }
+ } else {
+ swim_member_update_status(sender, MEMBER_ALIVE, incarnation);
+ }
+ if (type == SWIM_FD_MSG_PING) {
+ swim_io_task_push(&sender->ack_task);
+ } else {
+ assert(type == SWIM_FD_MSG_ACK);
+ if (incarnation >= sender->incarnation) {
+ sender->failed_pings = 0;
+ rlist_del_entry(&sender->ping_task, in_queue_output);
+ rlist_del_entry(sender, in_queue_wait_ack);
+ }
+ }
+ return 0;
+}
+
/** Receive and process a new message. */
static void
swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
@@ -717,6 +1134,12 @@ swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
if (swim_process_anti_entropy(&pos, end) != 0)
return;
break;
+ case SWIM_FAILURE_DETECTION:
+ say_verbose("SWIM: process failure detection");
+ if (swim_process_failure_detection(&pos, end,
+ &addr) != 0)
+ return;
+ break;
default:
say_error("%s unknown component type component is "\
"supported", msg_pref);
@@ -783,7 +1206,9 @@ swim_init(void)
swim_init_input(&input);
ev_init(&output, swim_on_output);
ev_init(&round_tick, swim_trigger_round_step);
+ ev_init(&wait_ack_tick, swim_check_acks);
ev_periodic_set(&round_tick, 0, HEARTBEAT_RATE_DEFAULT, NULL);
+ ev_periodic_set(&wait_ack_tick, 0, ACK_TIMEOUT, NULL);
return 0;
}
@@ -811,7 +1236,7 @@ swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri,
goto error;
struct swim_member *member = swim_find_member(&addr);
if (member == NULL) {
- member = swim_member_new(&addr, new_status);
+ member = swim_member_new(&addr, new_status, 0);
if (member == NULL)
goto error;
}
@@ -832,7 +1257,8 @@ swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri,
new_self = swim_find_member(&addr);
if (new_self == NULL) {
- new_self = swim_member_new(&addr, new_status);
+ new_self = swim_member_new(&addr, new_status,
+ 0);
if (new_self == NULL)
goto error;
}
@@ -847,6 +1273,7 @@ swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri,
ev_io_set(&output, input.ev.fd, EV_WRITE);
evio_service_start(&input);
ev_periodic_start(loop(), &round_tick);
+ ev_periodic_start(loop(), &wait_ack_tick);
}
}
@@ -854,9 +1281,15 @@ swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri,
ev_periodic_set(&round_tick, 0, heartbeat_rate, NULL);
if (member_uri_count > 0) {
- for (int i = 0; i < new_cfg_size; ++i)
+ for (int i = 0; i < cfg_size; ++i)
+ cfg[i]->is_pinned = false;
+ free(cfg);
+ for (int i = 0; i < new_cfg_size; ++i) {
+ new_cfg[i]->is_pinned = true;
new_cfg[i]->status = MEMBER_ALIVE;
- free(new_cfg);
+ }
+ cfg = new_cfg;
+ cfg_size = new_cfg_size;
}
if (new_self != NULL && new_self->status == new_status)
new_self->status = MEMBER_ALIVE;
@@ -893,6 +1326,8 @@ swim_info(struct info_handler *info)
sizeof(member->addr)));
info_append_str(info, "status",
swim_member_status_strs[member->status]);
+ info_append_int(info, "incarnation",
+ (int64_t) member->incarnation);
info_table_end(info);
}
info_end(info);
@@ -906,6 +1341,7 @@ swim_stop(void)
ev_io_stop(loop(), &output);
evio_service_stop(&input);
ev_periodic_stop(loop(), &round_tick);
+ ev_periodic_stop(loop(), &wait_ack_tick);
mh_int_t node = mh_first(members);
while (node != mh_end(members)) {
struct swim_member *m = (struct swim_member *)
@@ -915,10 +1351,14 @@ swim_stop(void)
}
mh_i64ptr_delete(members);
free(shuffled_members);
+ free(cfg);
members = NULL;
+ cfg = NULL;
+ cfg_size = 0;
shuffled_members = NULL;
shuffled_members_size = 0;
+ rlist_create(&queue_wait_ack);
rlist_create(&queue_output);
rlist_create(&queue_round);
}
diff --git a/test/swim/basic.result b/test/swim/basic.result
index a7ae140d6..7d0131606 100644
--- a/test/swim/basic.result
+++ b/test/swim/basic.result
@@ -43,10 +43,13 @@ swim_info_sorted()
---
- - - 192.168.0.1:3333
- status: alive
+ incarnation: 0
- - 192.168.0.2:3333
- status: alive
+ incarnation: 0
- - 192.168.0.3:3333
- status: alive
+ incarnation: 0
...
swim.stop()
---
@@ -63,12 +66,16 @@ swim_info_sorted()
---
- - - 127.0.0.1:listen_port
- status: alive
+ incarnation: 0
- - 192.168.0.1:3333
- status: alive
+ incarnation: 0
- - 192.168.0.2:3333
- status: alive
+ incarnation: 0
- - 192.168.0.3:3333
- status: alive
+ incarnation: 0
...
swim.debug_round_step()
---
@@ -77,12 +84,16 @@ swim_info_sorted()
---
- - - 127.0.0.1:listen_port
- status: alive
+ incarnation: 0
- - 192.168.0.1:3333
- status: alive
+ incarnation: 0
- - 192.168.0.2:3333
- status: alive
+ incarnation: 0
- - 192.168.0.3:3333
- status: alive
+ incarnation: 0
...
swim.stop()
---
@@ -123,6 +134,7 @@ swim_info_sorted()
---
- - - 127.0.0.1:listen_port
- status: alive
+ incarnation: 0
...
swim.stop()
---
@@ -163,12 +175,16 @@ swim_info_sorted()
---
- - - 127.0.0.1:listen_port
- status: alive
+ incarnation: 0
- - 192.168.0.1:3333
- status: alive
+ incarnation: 0
- - 192.168.0.2:3333
- status: alive
+ incarnation: 0
- - 192.168.0.3:3333
- status: alive
+ incarnation: 0
...
swim.stop()
---
--
2.17.2 (Apple Git-113)
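The self-refutation rule that this patch adds to swim_process_member_update() can be exercised in isolation. The following is a minimal sketch, not code from the patch: self_on_gossip() and the two-value enum status are hypothetical names, and only the incarnation arithmetic mirrors the diff.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical statuses, ordered as in the patch: alive < dead. */
enum status { ST_ALIVE = 0, ST_DEAD };

/*
 * Hypothetical model of how an instance treats gossip about
 * itself. Returns the new self incarnation.
 */
static uint64_t
self_on_gossip(uint64_t self_inc, enum status gossip_status,
	       uint64_t gossip_inc)
{
	assert(gossip_status == ST_ALIVE || gossip_status == ST_DEAD);
	/* Restore an incarnation lost, for example, on restart. */
	if (self_inc < gossip_inc)
		self_inc = gossip_inc;
	/* Refute "not alive" gossip about the current incarnation. */
	if (gossip_status != ST_ALIVE && gossip_inc == self_inc)
		self_inc++;
	return self_inc;
}
```

For example, an instance restarted with incarnation 0 that hears "dead, incarnation 3" about itself ends up at incarnation 4, which outranks the stale gossip once it is disseminated.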
* [PATCH 3/5] swim: introduce a dissemination component
From: Vladislav Shpilevoy @ 2018-12-17 12:53 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
The dissemination component broadcasts events about member status
updates.
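Which updates count as events follows from swim_member_update_status(): statuses are compared as a compound key {incarnation, status}. Below is a minimal standalone sketch of that rule, assuming a hypothetical struct member and a boolean return value that means "schedule an event" (in the patch this is where swim_member_is_updated() is called).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical statuses, ordered from "best" to "worst". */
enum member_status { MS_ALIVE = 0, MS_DEAD };

struct member {
	uint64_t incarnation;
	enum member_status status;
};

/*
 * A bigger incarnation always wins; within the same incarnation
 * only a "worse" status wins. Returns true if the member changed.
 */
static bool
member_apply_update(struct member *m, enum member_status status,
		    uint64_t incarnation)
{
	if (m->incarnation == incarnation) {
		if (m->status < status) {
			m->status = status;
			return true;
		}
		return false;
	}
	if (m->incarnation < incarnation) {
		m->incarnation = incarnation;
		m->status = status;
		return true;
	}
	/* Older incarnation: stale news, ignore it. */
	return false;
}
```

With this rule a stale "dead" rumor about an old incarnation cannot override a member that has already refuted it with a bigger incarnation. A later patch in this series inserts a "suspected" status between alive and dead, which keeps the ordering monotonic.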
Public API:
swim.cfg({server = <uri>, members = <array of uris>,
heartbeat = <seconds>})
Configures the SWIM module.
@server - URI of UDP server to which other cluster
members will send SWIM messages. It should
have the format "ip:port".
@members - array of URIs explicitly defined by a
user. These members are never deleted from the
members table until they are removed from the
configuration explicitly. SWIM downloads their
members tables, merges them with its own, and
repeats.
@heartbeat - how often to send a part of the
members table to another member. Note that it is
neither how often the whole table is sent, nor
how often the table is sent to all members. It
is the period of only one protocol step.
swim.stop()
Stops the SWIM module: shuts down the server,
closes socket, destroys queues, frees memory.
Note that after it swim.cfg can be called again.
swim.info()
Shows info about each known member in the format:
{
["ip:port"] = {
status = <alive/dead>,
incarnation = <growing unsigned number>
}
}
Closes #3234
---
src/lib/swim/swim.c | 237 ++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 231 insertions(+), 6 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 6b2f1ca0c..bbf6b7fd5 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -48,7 +48,6 @@
* - indirect ping.
* - increment own incarnation on each round.
* - attach dst incarnation to ping.
- * - fix swim_member_bin mp_encode_map(2) to 3 in the first patch.
*/
/**
@@ -224,6 +223,26 @@ struct swim_member {
struct swim_io_task ping_task;
/** Position in a queue of members waiting for an ack. */
struct rlist in_queue_wait_ack;
+ /**
+ *
+ * Dissemination component
+ *
+ * The dissemination component sends events. An event is
+ * a notification about a member status update, so
+ * formally this structure already has all the needed
+ * attributes. But according to SWIM an event should also
+ * be sent to all members at least once, which requires
+ * something like a TTL decremented on each send. A member
+ * cannot be removed from the global table until it is
+ * dead and its dissemination TTL is 0, so that other
+ * members can learn about its death.
+ */
+ int dissemination_ttl;
+ /**
+ * Events are put into a queue sorted by event occurrence
+ * time.
+ */
+ struct rlist in_queue_events;
};
/**
@@ -240,6 +259,7 @@ static struct swim_member *self = NULL;
enum swim_component_type {
SWIM_ANTI_ENTROPY = 0,
SWIM_FAILURE_DETECTION,
+ SWIM_DISSEMINATION,
};
/** {{{ Failure detection component */
@@ -438,6 +458,92 @@ static struct swim_io_task round_step_task = {
/** }}} Anti-entropy component */
+/** {{{ Dissemination component */
+
+/** SWIM dissemination MsgPack template. */
+struct PACKED swim_diss_header_bin {
+ /** mp_encode_uint(SWIM_DISSEMINATION) */
+ uint8_t k_header;
+ /** mp_encode_array() */
+ uint8_t m_header;
+ uint32_t v_header;
+};
+
+static inline void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header, int batch_size)
+{
+ header->k_header = SWIM_DISSEMINATION;
+ header->m_header = 0xdd;
+ header->v_header = mp_bswap_u32(batch_size);
+}
+
+/** SWIM event MsgPack template. */
+struct PACKED swim_event_bin {
+ /** mp_encode_map(4) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_MEMBER_STATUS) */
+ uint8_t k_status;
+ /** mp_encode_uint(enum member_status) */
+ uint8_t v_status;
+
+ /** mp_encode_uint(SWIM_MEMBER_ADDR) */
+ uint8_t k_addr;
+ /** mp_encode_uint(addr.sin_addr.s_addr) */
+ uint8_t m_addr;
+ uint32_t v_addr;
+
+ /** mp_encode_uint(SWIM_MEMBER_PORT) */
+ uint8_t k_port;
+ /** mp_encode_uint(addr.sin_port) */
+ uint8_t m_port;
+ uint16_t v_port;
+
+ /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
+};
+
+static inline void
+swim_event_bin_create(struct swim_event_bin *header)
+{
+ header->m_header = 0x84;
+ header->k_status = SWIM_MEMBER_STATUS;
+ header->k_addr = SWIM_MEMBER_ADDR;
+ header->m_addr = 0xce;
+ header->k_port = SWIM_MEMBER_PORT;
+ header->m_port = 0xcd;
+ header->k_incarnation = SWIM_MEMBER_INCARNATION;
+ header->m_incarnation = 0xcf;
+}
+
+static inline void
+swim_event_bin_reset(struct swim_event_bin *header, struct swim_member *member)
+{
+ header->v_status = member->status;
+ header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
+ header->v_port = mp_bswap_u16(member->addr.sin_port);
+ header->v_incarnation = mp_bswap_u64(member->incarnation);
+}
+
+/** Queue of events sorted by occurrence time. */
+static RLIST_HEAD(queue_events);
+static int event_count = 0;
+
+static inline void
+swim_schedule_event(struct swim_member *member)
+{
+ if (rlist_empty(&member->in_queue_events)) {
+ rlist_add_tail_entry(&queue_events, member, in_queue_events);
+ event_count++;
+ }
+ member->dissemination_ttl = mh_size(members);
+}
+
+/** }}} Dissemination component */
+
/**
* SWIM message structure:
* {
@@ -448,6 +554,18 @@ static struct swim_io_task round_step_task = {
*
* OR/AND
*
+ * SWIM_DISSEMINATION: [
+ * {
+ * SWIM_MEMBER_STATUS: uint, enum member_status,
+ * SWIM_MEMBER_ADDR: uint, ip,
+ * SWIM_MEMBER_PORT: uint, port,
+ * SWIM_MEMBER_INCARNATION: uint
+ * },
+ * ...
+ * ],
+ *
+ * OR/AND
+ *
* SWIM_ANTI_ENTROPY: [
* {
* SWIM_MEMBER_STATUS: uint, enum member_status,
@@ -531,6 +649,16 @@ swim_send_ack(struct swim_io_task *task);
static void
swim_send_ping(struct swim_io_task *task);
+/**
+ * Make all needed actions to process a member's update like a
+ * change of its status, or incarnation, or both.
+ */
+static void
+swim_member_is_updated(struct swim_member *member)
+{
+ swim_schedule_event(member);
+}
+
/**
* Update status of the member if needed. Statuses are compared as
* a compound key: {incarnation, status}. So @a new_status can
@@ -548,11 +676,14 @@ swim_member_update_status(struct swim_member *member,
{
assert(member != self);
if (member->incarnation == incarnation) {
- if (member->status < new_status)
+ if (member->status < new_status) {
member->status = new_status;
+ swim_member_is_updated(member);
+ }
} else if (member->incarnation < incarnation) {
member->status = new_status;
member->incarnation = incarnation;
+ swim_member_is_updated(member);
}
}
@@ -589,6 +720,8 @@ swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status,
swim_io_task_create(&member->ping_task, swim_send_ping);
rlist_add_entry(&queue_round, member, in_queue_round);
rlist_create(&member->in_queue_wait_ack);
+ rlist_create(&member->in_queue_events);
+ swim_schedule_event(member);
return member;
}
@@ -617,6 +750,7 @@ swim_member_delete(struct swim_member *member)
swim_io_task_destroy(&member->ping_task);
rlist_del_entry(member, in_queue_round);
rlist_del_entry(member, in_queue_wait_ack);
+ assert(rlist_empty(&member->in_queue_events));
free(member);
}
@@ -696,19 +830,53 @@ swim_encode_round_msg(char *buffer, int size)
assert((uint)size > sizeof(struct swim_fd_header_bin) + 1);
size -= sizeof(struct swim_fd_header_bin) + 1;
+ int diss_batch_size = calculate_bin_batch_size(
+ sizeof(struct swim_diss_header_bin),
+ sizeof(struct swim_event_bin), size);
+ if (diss_batch_size > event_count)
+ diss_batch_size = event_count;
+ size -= sizeof(struct swim_diss_header_bin) +
+ diss_batch_size * sizeof(struct swim_event_bin);
+
int ae_batch_size = calculate_bin_batch_size(
sizeof(struct swim_anti_entropy_header_bin),
sizeof(struct swim_member_bin), size);
if (ae_batch_size > shuffled_members_size)
ae_batch_size = shuffled_members_size;
- buffer = mp_encode_map(buffer, 2);
+ buffer = mp_encode_map(buffer, 1 + (diss_batch_size > 0) +
+ (ae_batch_size > 0));
struct swim_fd_header_bin fd_header_bin;
swim_fd_header_bin_create(&fd_header_bin, SWIM_FD_MSG_PING);
memcpy(buffer, &fd_header_bin, sizeof(fd_header_bin));
buffer += sizeof(fd_header_bin);
+ if (diss_batch_size > 0) {
+ struct swim_diss_header_bin diss_header_bin;
+ swim_diss_header_bin_create(&diss_header_bin, diss_batch_size);
+ memcpy(buffer, &diss_header_bin, sizeof(diss_header_bin));
+ buffer += sizeof(diss_header_bin);
+
+ int i = 0;
+ struct swim_member *member, *tmp;
+ struct swim_event_bin event_bin;
+ swim_event_bin_create(&event_bin);
+ rlist_foreach_entry_safe(member, &queue_events, in_queue_events,
+ tmp) {
+ swim_event_bin_reset(&event_bin, member);
+ memcpy(buffer, &event_bin, sizeof(event_bin));
+ buffer += sizeof(event_bin);
+ rlist_del_entry(member, in_queue_events);
+ --member->dissemination_ttl;
+ if (++i >= diss_batch_size)
+ break;
+ }
+ event_count -= diss_batch_size;
+ }
+
+ if (ae_batch_size == 0)
+ return buffer - start;
struct swim_anti_entropy_header_bin ae_header_bin;
swim_anti_entropy_header_bin_create(&ae_header_bin, ae_batch_size);
memcpy(buffer, &ae_header_bin, sizeof(ae_header_bin));
@@ -840,12 +1008,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
break;
++m->failed_pings;
if (m->failed_pings >= NO_ACKS_TO_GC) {
- if (!m->is_pinned)
+ if (!m->is_pinned && m->dissemination_ttl == 0)
swim_member_delete(m);
continue;
}
- if (m->failed_pings >= NO_ACKS_TO_DEAD)
+ if (m->failed_pings >= NO_ACKS_TO_DEAD) {
m->status = MEMBER_DEAD;
+ swim_member_is_updated(m);
+ }
swim_io_task_push(&m->ping_task);
rlist_del_entry(m, in_queue_wait_ack);
}
@@ -1094,6 +1264,50 @@ swim_process_failure_detection(const char **pos, const char *end,
return 0;
}
+static int
+swim_process_dissemination(const char **pos, const char *end)
+{
+ const char *msg_pref = "Invalid SWIM dissemination message:";
+ if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) {
+ say_error("%s message should be an array", msg_pref);
+ return -1;
+ }
+ uint64_t size = mp_decode_array(pos);
+ for (uint64_t i = 0; i < size; ++i) {
+ if (mp_typeof(**pos) != MP_MAP ||
+ mp_check_map(*pos, end) > 0) {
+ say_error("%s event should be a map", msg_pref);
+ return -1;
+ }
+ uint64_t map_size = mp_decode_map(pos);
+ struct swim_member_def def;
+ swim_member_def_create(&def);
+ for (uint64_t j = 0; j < map_size; ++j) {
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s event key should be uint",
+ msg_pref);
+ return -1;
+ }
+ uint64_t key = mp_decode_uint(pos);
+ if (key >= swim_member_key_MAX) {
+ say_error("%s unknown event key", msg_pref);
+ return -1;
+ }
+ if (swim_process_member_key(key, pos, end, msg_pref,
+ &def) != 0)
+ return -1;
+ }
+ if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
+ say_error("%s member address should be specified",
+ msg_pref);
+ return -1;
+ }
+ swim_process_member_update(&def);
+ }
+ return 0;
+}
+
/** Receive and process a new message. */
static void
swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
@@ -1140,6 +1354,11 @@ swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
&addr) != 0)
return;
break;
+ case SWIM_DISSEMINATION:
+ say_verbose("SWIM: process dissemination");
+ if (swim_process_dissemination(&pos, end) != 0)
+ return;
+ break;
default:
say_error("%s unknown component type component is "\
"supported", msg_pref);
@@ -1299,6 +1518,7 @@ swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri,
error:
for (int i = 0; i < new_cfg_size; ++i) {
if (new_cfg[i]->status == new_status) {
+ rlist_del_entry(new_cfg[i], in_queue_events);
swim_member_delete(new_cfg[i]);
if (new_self == new_cfg[i])
new_self = NULL;
@@ -1306,8 +1526,10 @@ error:
}
if (member_uri_count > 0)
free(new_cfg);
- if (new_self != NULL && new_self->status == new_status)
+ if (new_self != NULL && new_self->status == new_status) {
+ rlist_del_entry(new_self, in_queue_events);
swim_member_delete(new_self);
+ }
return -1;
}
@@ -1346,6 +1568,7 @@ swim_stop(void)
while (node != mh_end(members)) {
struct swim_member *m = (struct swim_member *)
mh_i64ptr_node(members, node)->val;
+ rlist_del_entry(m, in_queue_events);
swim_member_delete(m);
node = mh_first(members);
}
@@ -1358,9 +1581,11 @@ swim_stop(void)
cfg_size = 0;
shuffled_members = NULL;
shuffled_members_size = 0;
+ event_count = 0;
rlist_create(&queue_wait_ack);
rlist_create(&queue_output);
rlist_create(&queue_round);
+ rlist_create(&queue_events);
}
#ifndef NDEBUG
--
2.17.2 (Apple Git-113)
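The swim_event_bin template encodes every event with fixed-width MsgPack formats (fixmap 0x84, uint 32 0xce, uint 16 0xcd, uint 64 0xcf), so each event occupies the same number of bytes and batch sizes can be computed by division. The sketch below reproduces that layout byte by byte. The key values are hypothetical placeholders for enum swim_member_key, and ip and port are assumed to be in host byte order.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical key values: the real SWIM_MEMBER_* constants are
 * not shown here. Keys and status must stay below 128 to fit a
 * MsgPack positive fixint.
 */
enum {
	KEY_STATUS = 0,
	KEY_ADDR = 1,
	KEY_PORT = 2,
	KEY_INCARNATION = 3,
};

/* Write v into n bytes at p in big-endian order, as MsgPack wants. */
static unsigned char *
store_be(unsigned char *p, uint64_t v, int n)
{
	for (int i = n - 1; i >= 0; --i) {
		p[i] = (unsigned char)(v & 0xff);
		v >>= 8;
	}
	return p + n;
}

/* Encode one event as a 4-pair map; returns the encoded size. */
static size_t
encode_event(unsigned char *buf, uint8_t status, uint32_t ip,
	     uint16_t port, uint64_t incarnation)
{
	assert(status < 0x80);
	unsigned char *p = buf;
	*p++ = 0x84;		/* fixmap, 4 pairs */
	*p++ = KEY_STATUS;
	*p++ = status;		/* positive fixint */
	*p++ = KEY_ADDR;
	*p++ = 0xce;		/* uint 32 */
	p = store_be(p, ip, 4);
	*p++ = KEY_PORT;
	*p++ = 0xcd;		/* uint 16 */
	p = store_be(p, port, 2);
	*p++ = KEY_INCARNATION;
	*p++ = 0xcf;		/* uint 64 */
	p = store_be(p, incarnation, 8);
	return (size_t)(p - buf);
}
```

The 23 bytes written here match the field widths of the PACKED struct swim_event_bin: 1 (map) + 2 (status) + 6 (address) + 4 (port) + 10 (incarnation).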
* [PATCH 4/5] swim: introduce "suspected" status
From: Vladislav Shpilevoy @ 2018-12-17 12:53 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
Now a member dies "gradually". After some failed pings
it is declared suspected. After more failed pings
it is finally declared dead. New members in a config are
declared suspected because the instance cannot
be sure whether they are alive or not.
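With NO_ACKS_TO_SUSPECT = 2 and NO_ACKS_TO_DEAD = NO_ACKS_TO_SUSPECT + 2, the status a member gets in swim_check_acks() depends only on its failed_pings counter. A minimal sketch of that mapping follows; status_after_failed_pings() is a hypothetical helper, and garbage collection via NO_ACKS_TO_GC is left out because its value is not shown in this thread.

```c
#include <assert.h>

enum member_status { MEMBER_ALIVE = 0, MEMBER_SUSPECTED, MEMBER_DEAD };

/* Thresholds as defined in this patch. */
enum {
	NO_ACKS_TO_SUSPECT = 2,
	NO_ACKS_TO_DEAD = NO_ACKS_TO_SUSPECT + 2,
};

/* Status implied by the number of consecutive failed pings. */
static enum member_status
status_after_failed_pings(int failed_pings, enum member_status current)
{
	assert(failed_pings >= 0);
	if (failed_pings >= NO_ACKS_TO_DEAD)
		return MEMBER_DEAD;
	if (failed_pings >= NO_ACKS_TO_SUSPECT)
		return MEMBER_SUSPECTED;
	/* Not enough evidence yet: keep what is known. */
	return current;
}
```

Assuming each resent ping restarts its ACK_TIMEOUT wait, a silent member is declared dead no sooner than roughly NO_ACKS_TO_DEAD ack timeouts after its first unanswered ping.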
Follow up #3234
---
src/lib/swim/swim.c | 26 +++++++++++++++++++++++---
test/swim/basic.result | 26 +++++++++++++-------------
2 files changed, 36 insertions(+), 16 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index bbf6b7fd5..df57ef470 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -156,6 +156,12 @@ enum swim_member_status {
* members table.
*/
MEMBER_ALIVE = 0,
+ /**
+ * If a member has not responded to a ping, it is declared
+ * as suspected to be dead. After more failed pings it
+ * is finally declared dead.
+ */
+ MEMBER_SUSPECTED,
/**
* The member is considered to be dead. It will disappear
* from the membership, if it is not pinned.
@@ -166,6 +172,7 @@ enum swim_member_status {
static const char *swim_member_status_strs[] = {
"alive",
+ "suspected",
"dead",
};
@@ -596,9 +603,15 @@ enum {
ACK_TIMEOUT = 1,
/**
* If a member has not been responding to pings this
- * number of times, it is considered to be dead.
+ * number of times, it is suspected to be dead. To confirm
+ * the death it should fail more pings.
*/
- NO_ACKS_TO_DEAD = 3,
+ NO_ACKS_TO_SUSPECT = 2,
+ /**
+ * If a member is suspected to be dead, after this number
+ * of failed pings its death is confirmed.
+ */
+ NO_ACKS_TO_DEAD = NO_ACKS_TO_SUSPECT + 2,
/**
* If a not pinned member confirmed to be dead, it is
* removed from the membership after at least this number
@@ -1015,6 +1028,9 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
if (m->failed_pings >= NO_ACKS_TO_DEAD) {
m->status = MEMBER_DEAD;
swim_member_is_updated(m);
+ } else if (m->failed_pings >= NO_ACKS_TO_SUSPECT) {
+ m->status = MEMBER_SUSPECTED;
+ swim_member_is_updated(m);
}
swim_io_task_push(&m->ping_task);
rlist_del_entry(m, in_queue_wait_ack);
@@ -1505,7 +1521,11 @@ swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri,
free(cfg);
for (int i = 0; i < new_cfg_size; ++i) {
new_cfg[i]->is_pinned = true;
- new_cfg[i]->status = MEMBER_ALIVE;
+ /*
+ * The real status is unknown, so a new member
+ * cannot be considered alive.
+ */
+ new_cfg[i]->status = MEMBER_SUSPECTED;
}
cfg = new_cfg;
cfg_size = new_cfg_size;
diff --git a/test/swim/basic.result b/test/swim/basic.result
index 7d0131606..f223950a6 100644
--- a/test/swim/basic.result
+++ b/test/swim/basic.result
@@ -42,13 +42,13 @@ swim.cfg({members = members})
swim_info_sorted()
---
- - - 192.168.0.1:3333
- - status: alive
+ - status: suspected
incarnation: 0
- - 192.168.0.2:3333
- - status: alive
+ - status: suspected
incarnation: 0
- - 192.168.0.3:3333
- - status: alive
+ - status: suspected
incarnation: 0
...
swim.stop()
@@ -68,13 +68,13 @@ swim_info_sorted()
- status: alive
incarnation: 0
- - 192.168.0.1:3333
- - status: alive
+ - status: suspected
incarnation: 0
- - 192.168.0.2:3333
- - status: alive
+ - status: suspected
incarnation: 0
- - 192.168.0.3:3333
- - status: alive
+ - status: suspected
incarnation: 0
...
swim.debug_round_step()
@@ -86,13 +86,13 @@ swim_info_sorted()
- status: alive
incarnation: 0
- - 192.168.0.1:3333
- - status: alive
+ - status: suspected
incarnation: 0
- - 192.168.0.2:3333
- - status: alive
+ - status: suspected
incarnation: 0
- - 192.168.0.3:3333
- - status: alive
+ - status: suspected
incarnation: 0
...
swim.stop()
@@ -174,16 +174,16 @@ swim.cfg({server = listen_uri, members = members})
swim_info_sorted()
---
- - - 127.0.0.1:listen_port
- - status: alive
+ - status: suspected
incarnation: 0
- - 192.168.0.1:3333
- - status: alive
+ - status: suspected
incarnation: 0
- - 192.168.0.2:3333
- - status: alive
+ - status: suspected
incarnation: 0
- - 192.168.0.3:3333
- - status: alive
+ - status: suspected
incarnation: 0
...
swim.stop()
--
2.17.2 (Apple Git-113)
* [PATCH 5/5] swim: keep encoded round message cached
From: Vladislav Shpilevoy @ 2018-12-17 12:53 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
During a SWIM round a message is handed out that
consists of at most 3 sections. Parts of the message
change rarely: on member attribute updates or on
removal of some members. So it is possible to cache
the message and send it during several round steps in
a row, or even not rebuild it during the whole round.
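The caching rule reduces to a dirty flag: the round message is rebuilt only when its size is 0, and only members actually encoded into it are allowed to invalidate it. Below is a minimal sketch of that pattern with hypothetical names (struct member, get_round_msg(), member_set_status()). It mirrors is_being_sent_in_this_round and cached_round_msg_invalidate(), but encodes just one byte per member.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for struct swim_member. */
struct member {
	int status;
	bool in_cached_msg;	/* like is_being_sent_in_this_round */
};

static char cached_msg[64];
static int cached_msg_size = 0;	/* 0 means "must rebuild" */
static int rebuild_count = 0;	/* for observation only */

static void
cache_invalidate(void)
{
	cached_msg_size = 0;
}

/* Return the cached message size, rebuilding it only if needed. */
static int
get_round_msg(struct member *members, int count, int batch)
{
	if (cached_msg_size > 0)
		return cached_msg_size;
	assert(batch <= (int) sizeof(cached_msg));
	int i = 0;
	for (; i < count && i < batch; ++i) {
		cached_msg[i] = (char) members[i].status;
		members[i].in_cached_msg = true;
	}
	cached_msg_size = i;
	/* Members left out cannot make the message stale. */
	for (; i < count; ++i)
		members[i].in_cached_msg = false;
	rebuild_count++;
	return cached_msg_size;
}

static void
member_set_status(struct member *m, int status)
{
	m->status = status;
	if (m->in_cached_msg)
		cache_invalidate();
}
```

Clearing the flag for members that did not fit into the message is what keeps unrelated status changes from forcing a rebuild.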
Follow up #3234
---
src/lib/swim/swim.c | 75 +++++++++++++++++++++++++++++++++++++--------
1 file changed, 63 insertions(+), 12 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index df57ef470..a04c1646c 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -43,11 +43,11 @@
*
* Optional:
* - do not send self.
- * - cache encoded batch.
* - refute immediately.
* - indirect ping.
* - increment own incarnation on each round.
* - attach dst incarnation to ping.
+ * - fix errors in sio_listen/bind.
*/
/**
@@ -198,6 +198,13 @@ struct swim_member {
* Position in a queue of members in the current round.
*/
struct rlist in_queue_round;
+ /**
+ * It is true if the member is being sent in the current
+ * SWIM round, i.e. it is encoded into the cached round msg.
+ * When this flag is true and the member is changed or
+ * removed, the cached round msg is invalidated.
+ */
+ bool is_being_sent_in_this_round;
/**
*
* Failure detection component
@@ -645,6 +652,27 @@ static int cfg_size = 0;
*/
static struct swim_member **shuffled_members = NULL;
static int shuffled_members_size = 0;
+/**
+ * When a new round starts, it builds a new message consisting of
+ * up to 3 components. This message is then handed out to
+ * the cluster members. Most of the time the message remains
+ * unchanged and can be cached to avoid rebuilding it on each step.
+ * It should be invalidated in 2 cases only: a member encoded
+ * here has changed its attributes (incarnation, status), or a
+ * member encoded here has been dead too long and is removed.
+ */
+static char *cached_round_msg = NULL;
+/**
+ * Payload size in the cached_round_msg buffer. Capacity is
+ * UDP packet size.
+ */
+static int cached_round_msg_size = 0;
+
+static inline void
+cached_round_msg_invalidate(void)
+{
+ cached_round_msg_size = 0;
+}
/** Queue of io tasks ready to push now. */
static RLIST_HEAD(queue_output);
@@ -670,6 +698,8 @@ static void
swim_member_is_updated(struct swim_member *member)
{
swim_schedule_event(member);
+ if (member->is_being_sent_in_this_round)
+ cached_round_msg_invalidate();
}
/**
@@ -716,6 +746,7 @@ swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status,
}
member->status = status;
member->addr = *addr;
+ member->is_being_sent_in_this_round = false;
member->incarnation = incarnation;
member->is_pinned = false;
member->failed_pings = 0;
@@ -755,6 +786,8 @@ swim_find_member(const struct sockaddr_in *addr)
static inline void
swim_member_delete(struct swim_member *member)
{
+ if (member->is_being_sent_in_this_round)
+ cached_round_msg_invalidate();
uint64_t key = sockaddr_in_hash(&member->addr);
mh_int_t rc = mh_i64ptr_find(members, key, NULL);
assert(rc != mh_end(members));
@@ -797,6 +830,7 @@ swim_shuffle_members(void)
int j = rand() / (RAND_MAX / (i + 1) + 1);
SWAP(shuffled_members[i], shuffled_members[j]);
}
+ cached_round_msg_invalidate();
return 0;
}
@@ -833,12 +867,24 @@ calculate_bin_batch_size(int header_size, int member_size, int avail_size)
}
static int
-swim_encode_round_msg(char *buffer, int size)
+swim_encode_round_msg(void)
{
- char *start = buffer;
if ((shuffled_members == NULL || rlist_empty(&queue_round)) &&
swim_new_round() != 0)
return -1;
+ if (cached_round_msg_size > 0)
+ return 0;
+ if (cached_round_msg == NULL) {
+ cached_round_msg = malloc(UDP_PACKET_SIZE);
+ if (cached_round_msg == NULL) {
+ diag_set(OutOfMemory, UDP_PACKET_SIZE, "malloc",
+ "cached_round_msg");
+ return -1;
+ }
+ }
+ char *buffer = cached_round_msg;
+ int i, size = UDP_PACKET_SIZE;
+
/* -1 - for the root map header. */
assert((uint)size > sizeof(struct swim_fd_header_bin) + 1);
size -= sizeof(struct swim_fd_header_bin) + 1;
@@ -871,7 +917,7 @@ swim_encode_round_msg(char *buffer, int size)
memcpy(buffer, &diss_header_bin, sizeof(diss_header_bin));
buffer += sizeof(diss_header_bin);
- int i = 0;
+ i = 0;
struct swim_member *member, *tmp;
struct swim_event_bin event_bin;
swim_event_bin_create(&event_bin);
@@ -888,8 +934,9 @@ swim_encode_round_msg(char *buffer, int size)
event_count -= diss_batch_size;
}
+ i = 0;
if (ae_batch_size == 0)
- return buffer - start;
+ goto end;
struct swim_anti_entropy_header_bin ae_header_bin;
swim_anti_entropy_header_bin_create(&ae_header_bin, ae_batch_size);
memcpy(buffer, &ae_header_bin, sizeof(ae_header_bin));
@@ -897,13 +944,18 @@ swim_encode_round_msg(char *buffer, int size)
struct swim_member_bin member_bin;
swim_member_bin_create(&member_bin);
- for (int i = 0; i < ae_batch_size; ++i) {
+ for (; i < ae_batch_size; ++i) {
struct swim_member *member = shuffled_members[i];
+ member->is_being_sent_in_this_round = true;
swim_member_bin_reset(&member_bin, member);
memcpy(buffer, &member_bin, sizeof(member_bin));
buffer += sizeof(member_bin);
}
- return buffer - start;
+end:
+ for (; i < shuffled_members_size; ++i)
+ shuffled_members[i]->is_being_sent_in_this_round = false;
+ cached_round_msg_size = buffer - cached_round_msg;
+ return 0;
}
/**
@@ -915,9 +967,7 @@ swim_send_round_msg(struct swim_io_task *task)
{
(void) task;
assert(task->cb == swim_send_round_msg);
- char buffer[UDP_PACKET_SIZE];
- int size = swim_encode_round_msg(buffer, UDP_PACKET_SIZE);
- if (size < 0) {
+ if (swim_encode_round_msg() != 0) {
diag_log();
goto end;
}
@@ -930,8 +980,9 @@ swim_send_round_msg(struct swim_io_task *task)
say_verbose("SWIM: send to %s",
sio_strfaddr((struct sockaddr *) &m->addr,
sizeof(m->addr)));
- if (sio_sendto(output.fd, buffer, size, 0, (struct sockaddr *) &m->addr,
- sizeof(m->addr)) == -1 && ! sio_wouldblock(errno))
+ if (sio_sendto(output.fd, cached_round_msg, cached_round_msg_size, 0,
+ (struct sockaddr *) &m->addr, sizeof(m->addr)) == -1 &&
+ ! sio_wouldblock(errno))
diag_log();
swim_member_schedule_ack_wait(m);
rlist_del_entry(m, in_queue_round);
--
2.17.2 (Apple Git-113)
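The round message packs up to three sections into one UDP packet, so the encoder has to split the remaining budget: the dissemination batch is computed first, its header plus batch size times the event size is charged against the budget, and anti-entropy members fill what is left. Here is a minimal sketch of that arithmetic. batch_size() is a hypothetical stand-in for calculate_bin_batch_size(), whose body is not shown in this thread, and all sizes are hypothetical parameters.

```c
#include <assert.h>

/* Hypothetical byte sizes of the pieces of a round message. */
struct round_sizes {
	int packet;		/* UDP payload budget */
	int fd_header;		/* failure detection section */
	int diss_header;	/* dissemination array header */
	int event;		/* one encoded event */
	int ae_header;		/* anti-entropy array header */
	int member;		/* one encoded member */
};

struct round_layout {
	int diss_batch;		/* events to encode */
	int ae_batch;		/* members to encode */
};

/* How many fixed-size records fit after a header in avail bytes. */
static int
batch_size(int header_size, int record_size, int avail)
{
	assert(record_size > 0);
	if (avail <= header_size)
		return 0;
	return (avail - header_size) / record_size;
}

static struct round_layout
plan_round_msg(const struct round_sizes *s, int event_count,
	       int member_count)
{
	struct round_layout l;
	/* -1 for the root map header, as in the patch. */
	int size = s->packet - s->fd_header - 1;
	l.diss_batch = batch_size(s->diss_header, s->event, size);
	if (l.diss_batch > event_count)
		l.diss_batch = event_count;
	/* Charge the header plus every encoded event. */
	if (l.diss_batch > 0)
		size -= s->diss_header + l.diss_batch * s->event;
	l.ae_batch = batch_size(s->ae_header, s->member, size);
	if (l.ae_batch > member_count)
		l.ae_batch = member_count;
	return l;
}
```

Unlike the patch, this sketch charges the dissemination header only when at least one event is encoded, which leaves slightly more room for anti-entropy members.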