* [PATCH v3 1/6] [RAW] swim: introduce SWIM's anti-entropy component
2018-12-29 10:14 [PATCH v3 0/6] SWIM draft Vladislav Shpilevoy
@ 2018-12-29 10:14 ` Vladislav Shpilevoy
2019-01-09 9:12 ` [tarantool-patches] " Konstantin Osipov
2019-01-09 11:45 ` [tarantool-patches] " Konstantin Osipov
2018-12-29 10:14 ` [PATCH v3 2/6] [RAW] swim: introduce failure detection component Vladislav Shpilevoy
` (4 subsequent siblings)
5 siblings, 2 replies; 17+ messages in thread
From: Vladislav Shpilevoy @ 2018-12-29 10:14 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, kostja
SWIM - Scalable Weakly-consistent Infection-style Process Group
Membership Protocol. It consists of 2 components: events
dissemination and failure detection, and stores in memory a
table of known remote hosts - members. Also some SWIM
implementations have additional component: anti-entropy -
periodical broadcast of a random subset of members table.
Each SWIM component is different from others in both message
structures and goals, they even could be sent in different
messages. But SWIM describes piggybacking of messages: a ping
message can piggyback a dissemination's one. SWIM has a main
operating cycle during which it randomly chooses members from a
member table and sends them events + ping. Answers are
processed out of the main cycle asynchronously.
Random selection provides even network load about ~1 message to
each member regardless of the cluster size. Without randomness
a member would get a network load of N messages each protocol
step, since all other members will choose the same member on
each step where N is the cluster size.
Also SWIM describes a kind of fairness: when selecting a next
member to ping, the protocol prefers LRU members. In code it
would be too complicated, so Tarantool's implementation is
slightly different, easier.
Tarantool splits protocol operation into rounds. At the
beginning of a round all members are randomly reordered and
linked into a list. At each round step a member is popped from
the list head, a message is sent to him, and he waits for a next
round. In such implementation all random selection of the
original SWIM is executed once per round. The round is
'planned' actually. A list is used instead of an array since
new members can be added to its tail without realloc, and dead
members can be removed as easy as that.
Also Tarantool implements third component - anti-entropy. Why
is it needed and even vital? Consider the example: two SWIM
nodes, both are alive. Nothing happens, so the events list is
empty, only pings are being sent periodically. Then a third
node appears. It knows about one of existing nodes. How should
it learn about another one? The cluster is stable, no new
events, so the only chance is to wait until another server
stops and event about it will be broadcasted. Anti-entropy is
an extra simple component, it just piggybacks random part of
members table with each regular ping. In the example above the
new node will learn about the third one via anti-entropy
messages of the second one.
This commit introduces the first component - anti-entropy. With
this component a member can discover other members, but can not
detect who is already dead. It is a part of next commit.
Part of #3234
---
src/CMakeLists.txt | 3 +-
src/evio.c | 3 +-
src/evio.h | 4 +
src/lib/CMakeLists.txt | 1 +
src/lib/swim/CMakeLists.txt | 7 +
src/lib/swim/swim.c | 823 ++++++++++++++++++++++++++++++++++
src/lib/swim/swim.h | 95 ++++
src/lib/swim/swim_io.c | 163 +++++++
src/lib/swim/swim_io.h | 299 ++++++++++++
src/lib/swim/swim_transport.h | 64 +++
src/lua/init.c | 2 +
src/lua/swim.c | 244 ++++++++++
src/lua/swim.h | 47 ++
13 files changed, 1752 insertions(+), 3 deletions(-)
create mode 100644 src/lib/swim/CMakeLists.txt
create mode 100644 src/lib/swim/swim.c
create mode 100644 src/lib/swim/swim.h
create mode 100644 src/lib/swim/swim_io.c
create mode 100644 src/lib/swim/swim_io.h
create mode 100644 src/lib/swim/swim_transport.h
create mode 100644 src/lua/swim.c
create mode 100644 src/lua/swim.h
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 04de5ad04..785051966 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -182,6 +182,7 @@ set (server_sources
lua/crypto.c
lua/httpc.c
lua/utf8.c
+ lua/swim.c
lua/info.c
${lua_sources}
${PROJECT_SOURCE_DIR}/third_party/lua-yaml/lyaml.cc
@@ -228,7 +229,7 @@ endif()
set_source_files_compile_flags(${server_sources})
add_library(server STATIC ${server_sources})
-target_link_libraries(server core bit uri uuid ${ICU_LIBRARIES})
+target_link_libraries(server core bit uri uuid swim ${ICU_LIBRARIES})
# Rule of thumb: if exporting a symbol from a static library, list the
# library here.
diff --git a/src/evio.c b/src/evio.c
index 9ca14c45c..8610dbbe7 100644
--- a/src/evio.c
+++ b/src/evio.c
@@ -129,8 +129,7 @@ evio_setsockopt_client(int fd, int family, int type)
return 0;
}
-/** Set options for server sockets. */
-static int
+int
evio_setsockopt_server(int fd, int family, int type)
{
int on = 1;
diff --git a/src/evio.h b/src/evio.h
index 69d641a60..872a21ab6 100644
--- a/src/evio.h
+++ b/src/evio.h
@@ -157,6 +157,10 @@ evio_timeout_update(ev_loop *loop, ev_tstamp start, ev_tstamp *delay)
int
evio_setsockopt_client(int fd, int family, int type);
+/** Set options for server sockets. */
+int
+evio_setsockopt_server(int fd, int family, int type);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 98ff19b60..4e21e7da8 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -5,6 +5,7 @@ add_subdirectory(small)
add_subdirectory(salad)
add_subdirectory(csv)
add_subdirectory(json)
+add_subdirectory(swim)
if(ENABLE_BUNDLED_MSGPUCK)
add_subdirectory(msgpuck EXCLUDE_FROM_ALL)
endif()
diff --git a/src/lib/swim/CMakeLists.txt b/src/lib/swim/CMakeLists.txt
new file mode 100644
index 000000000..0fe8574ce
--- /dev/null
+++ b/src/lib/swim/CMakeLists.txt
@@ -0,0 +1,7 @@
+set(lib_sources
+ swim.c
+ swim_io.c
+)
+
+set_source_files_compile_flags(${lib_sources})
+add_library(swim STATIC ${lib_sources})
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
new file mode 100644
index 000000000..08c377374
--- /dev/null
+++ b/src/lib/swim/swim.c
@@ -0,0 +1,823 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "swim.h"
+#include "swim_io.h"
+#include "uri.h"
+#include "assoc.h"
+#include "fiber.h"
+#include "msgpuck.h"
+#include "info.h"
+
+/**
+ * SWIM - Scalable Weakly-consistent Infection-style Process Group
+ * Membership Protocol. It consists of 2 components: events
+ * dissemination and failure detection, and stores in memory a
+ * table of known remote hosts - members. Also some SWIM
+ * implementations have an additional component: anti-entropy -
+ * periodical broadcast of a random subset of members table.
+ *
+ * Each SWIM component is different from others in both message
+ * structures and goals, they even could be sent in different
+ * messages. But SWIM describes piggybacking of messages: a ping
+ * message can piggyback a dissemination's one. SWIM has a main
+ * operating cycle during which it randomly chooses members from a
+ * member table and sends them events + ping. Answers are
+ * processed out of the main cycle asynchronously.
+ *
+ * Random selection provides even network load about ~1 message to
+ * each member regardless of the cluster size. Without randomness
+ * a member would get a network load of N messages each protocol
+ * step, since all other members will choose the same member on
+ * each step where N is the cluster size.
+ *
+ * Also SWIM describes a kind of fairness: when selecting a next
+ * member to ping, the protocol prefers LRU members. In code it
+ * would too complicated, so Tarantool's implementation is
+ * slightly different, easier.
+ *
+ * Tarantool splits protocol operation into rounds. At the
+ * beginning of a round all members are randomly reordered and
+ * linked into a list. At each round step a member is popped from
+ * the list head, a message is sent to him, and he waits for the
+ * next round. In such implementation all random selection of the
+ * original SWIM is executed once per round. The round is
+ * 'planned' actually. A list is used instead of an array since
+ * new members can be added to its tail without realloc, and dead
+ * members can be removed as easy as that.
+ *
+ * Also Tarantool implements third component - anti-entropy. Why
+ * is it needed and even vital? Consider the example: two SWIM
+ * nodes, both are alive. Nothing happens, so the events list is
+ * empty, only pings are being sent periodically. Then a third
+ * node appears. It knows about one of existing nodes. How should
+ * it learn about another one? Sure, its known counterpart can try
+ * to notify another one, but it is UDP, so this event can lost.
+ * Anti-entropy is an extra simple component, it just piggybacks
+ * random part of members table with each regular ping. In the
+ * example above the new node will learn about the third one via
+ * anti-entropy messages of the second one soon or late.
+ */
+
+enum {
+ /** How often to send membership messages and pings. */
+ HEARTBEAT_RATE_DEFAULT = 1,
+};
+
+/**
+ * Take a random number not blindly calculating a module, but
+ * scaling random number down the given borders to save
+ * distribution. A result belongs the range [start, end].
+ */
+static inline int
+swim_scaled_rand(int start, int end)
+{
+ assert(end > start);
+ return rand() / (RAND_MAX / (end - start + 1) + 1);
+}
+
+enum swim_member_status {
+ /**
+ * The instance is ok, it responds to requests, sends its
+ * members table.
+ */
+ MEMBER_ALIVE = 0,
+ swim_member_status_MAX,
+};
+
+static const char *swim_member_status_strs[] = {
+ "alive",
+};
+
+/**
+ * A cluster member description. This structure describes the
+ * last known state of an instance, that is updated periodically
+ * via UDP according to SWIM protocol.
+ */
+struct swim_member {
+ /**
+ * Member status. Since the communication goes via UDP,
+ * actual status can be different, as well as different on
+ * other SWIM nodes. But SWIM guarantees that each member
+ * will learn a real status of an instance sometime.
+ */
+ enum swim_member_status status;
+ /**
+ * Address of the instance to which send UDP packets.
+ * Unique identifier of the member.
+ */
+ struct sockaddr_in addr;
+ /**
+ * Position in a queue of members in the current round.
+ */
+ struct rlist in_queue_round;
+};
+
+/**
+ * SWIM instance. Each instance uses its own UDP port. Tarantool
+ * can have multiple SWIMs.
+ */
+struct swim {
+ /**
+ * Global hash of all known members of the cluster. Hash
+ * key is bitwise combination of ip and port, value is a
+ * struct member, describing a remote instance. The only
+ * purpose of such strange hash function is to be able to
+ * reuse mh_i64ptr_t instead of introducing one more
+ * implementation of mhash.
+ *
+ * Discovered members live here until they are
+ * unavailable - in such a case they are removed from the
+ * hash. But a subset of members are pinned - the ones
+ * added explicitly via API. When a member is pinned, it
+ * can not be removed from the hash, and the module will
+ * ping him constantly.
+ */
+ struct mh_i64ptr_t *members;
+ /**
+ * This node. Used to do not send messages to self, it's
+ * meaningless.
+ */
+ struct swim_member *self;
+ /**
+ * Members to which a message should be sent next during
+ * this round.
+ */
+ struct rlist queue_round;
+ /** Generator of round step events. */
+ struct ev_periodic round_tick;
+ /**
+ * Single round step task. It is impossible to have
+ * multiple round steps at the same time, so it is single
+ * and preallocated per SWIM instance.
+ */
+ struct swim_task round_step_task;
+ /** Transport to send/receive data. */
+ const struct swim_transport *transport;
+ /** Scheduler of output requests. */
+ struct swim_scheduler scheduler;
+ /**
+ * An array of members shuffled on each round. Its head it
+ * sent to each member during one round as an
+ * anti-entropy message.
+ */
+ struct swim_member **shuffled_members;
+};
+
+static inline uint64_t
+sockaddr_in_hash(const struct sockaddr_in *a)
+{
+ return ((uint64_t) a->sin_addr.s_addr << 16) | a->sin_port;
+}
+
+/**
+ * Main round messages can carry merged failure detection
+ * messages and anti-entropy. With these keys the components can
+ * be distinguished from each other.
+ */
+enum swim_component_type {
+ SWIM_ANTI_ENTROPY = 0,
+};
+
+/** {{{ Anti-entropy component */
+
+/**
+ * Attributes of each record of a broadcasted member table. Just
+ * the same as some of struct swim_member attributes.
+ */
+enum swim_member_key {
+ SWIM_MEMBER_STATUS = 0,
+ /**
+ * Now can only be IP. But in future UNIX sockets can be
+ * added.
+ */
+ SWIM_MEMBER_ADDRESS,
+ SWIM_MEMBER_PORT,
+ swim_member_key_MAX,
+};
+
+/** SWIM anti-entropy MsgPack header template. */
+struct PACKED swim_anti_entropy_header_bin {
+ /** mp_encode_uint(SWIM_ANTI_ENTROPY) */
+ uint8_t k_anti_entropy;
+ /** mp_encode_array() */
+ uint8_t m_anti_entropy;
+ uint32_t v_anti_entropy;
+};
+
+static inline void
+swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
+ int batch_size)
+{
+ header->k_anti_entropy = SWIM_ANTI_ENTROPY;
+ header->m_anti_entropy = 0xdd;
+ header->v_anti_entropy = mp_bswap_u32(batch_size);
+}
+
+/** SWIM member MsgPack template. */
+struct PACKED swim_member_bin {
+ /** mp_encode_map(3) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_MEMBER_STATUS) */
+ uint8_t k_status;
+ /** mp_encode_uint(enum member_status) */
+ uint8_t v_status;
+
+ /** mp_encode_uint(SWIM_MEMBER_ADDRESS) */
+ uint8_t k_addr;
+ /** mp_encode_uint(addr.sin_addr.s_addr) */
+ uint8_t m_addr;
+ uint32_t v_addr;
+
+ /** mp_encode_uint(SWIM_MEMBER_PORT) */
+ uint8_t k_port;
+ /** mp_encode_uint(addr.sin_port) */
+ uint8_t m_port;
+ uint16_t v_port;
+};
+
+static inline void
+swim_member_bin_reset(struct swim_member_bin *header,
+ struct swim_member *member)
+{
+ header->v_status = member->status;
+ header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
+ header->v_port = mp_bswap_u16(member->addr.sin_port);
+}
+
+static inline void
+swim_member_bin_create(struct swim_member_bin *header)
+{
+ header->m_header = 0x83;
+ header->k_status = SWIM_MEMBER_STATUS;
+ header->k_addr = SWIM_MEMBER_ADDRESS;
+ header->m_addr = 0xce;
+ header->k_port = SWIM_MEMBER_PORT;
+ header->m_port = 0xcd;
+}
+
+/** }}} Anti-entropy component */
+
+/**
+ * SWIM message structure:
+ * {
+ * SWIM_ANTI_ENTROPY: [
+ * {
+ * SWIM_MEMBER_STATUS: uint, enum member_status,
+ * SWIM_MEMBER_ADDRESS: uint, ip,
+ * SWIM_MEMBER_PORT: uint, port
+ * },
+ * ...
+ * ],
+ * }
+ */
+
+/**
+ * Remove the member from all queues, hashes, destroy it and free
+ * the memory.
+ */
+static void
+swim_member_delete(struct swim *swim, struct swim_member *member)
+{
+ uint64_t key = sockaddr_in_hash(&member->addr);
+ mh_int_t rc = mh_i64ptr_find(swim->members, key, NULL);
+ assert(rc != mh_end(swim->members));
+ mh_i64ptr_del(swim->members, rc, NULL);
+ rlist_del_entry(member, in_queue_round);
+
+ free(member);
+}
+
+/**
+ * Register a new member with a specified status. Here it is
+ * added to the hash, to the 'next' queue.
+ */
+static struct swim_member *
+swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
+ enum swim_member_status status)
+{
+ struct swim_member *member =
+ (struct swim_member *) calloc(1, sizeof(*member));
+ if (member == NULL) {
+ diag_set(OutOfMemory, sizeof(*member), "calloc", "member");
+ return NULL;
+ }
+ member->status = status;
+ member->addr = *addr;
+ struct mh_i64ptr_node_t node;
+ node.key = sockaddr_in_hash(addr);
+ node.val = member;
+ mh_int_t rc = mh_i64ptr_put(swim->members, &node, NULL, NULL);
+ if (rc == mh_end(swim->members)) {
+ free(member);
+ diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node");
+ return NULL;
+ }
+ rlist_add_entry(&swim->queue_round, member, in_queue_round);
+
+ return member;
+}
+
+static inline struct swim_member *
+swim_find_member(struct swim *swim, const struct sockaddr_in *addr)
+{
+ uint64_t hash = sockaddr_in_hash(addr);
+ mh_int_t node = mh_i64ptr_find(swim->members, hash, NULL);
+ if (node == mh_end(swim->members))
+ return NULL;
+ return (struct swim_member *) mh_i64ptr_node(swim->members, node)->val;
+}
+
+/** At the end of each round members table is shuffled. */
+static int
+swim_shuffle_members(struct swim *swim)
+{
+ struct mh_i64ptr_t *members = swim->members;
+ struct swim_member **shuffled = swim->shuffled_members;
+ int new_size = mh_size(members);
+ int bsize = sizeof(shuffled[0]) * new_size;
+ struct swim_member **new_shuffled =
+ (struct swim_member **) realloc(shuffled, bsize);
+ if (new_shuffled == NULL) {
+ diag_set(OutOfMemory, bsize, "realloc", "new_shuffled");
+ return -1;
+ }
+ shuffled = new_shuffled;
+ swim->shuffled_members = new_shuffled;
+ int i = 0;
+ for (mh_int_t node = mh_first(members), end = mh_end(members);
+ node != end; node = mh_next(members, node), ++i) {
+ shuffled[i] = (struct swim_member *)
+ mh_i64ptr_node(members, node)->val;
+ int j = swim_scaled_rand(0, i);
+ SWAP(shuffled[i], shuffled[j]);
+ }
+ return 0;
+}
+
+/**
+ * Shuffle, filter members. Build randomly ordered queue of
+ * addressees. In other words, do all round preparation work.
+ */
+static int
+swim_new_round(struct swim *swim)
+{
+ say_verbose("SWIM: start a new round");
+ if (swim_shuffle_members(swim) != 0)
+ return -1;
+ rlist_create(&swim->queue_round);
+ int size = mh_size(swim->members);
+ for (int i = 0; i < size; ++i) {
+ if (swim->shuffled_members[i] != swim->self) {
+ rlist_add_entry(&swim->queue_round,
+ swim->shuffled_members[i],
+ in_queue_round);
+ }
+ }
+ return 0;
+}
+
+/**
+ * Encode anti-entropy header and members data as many as
+ * possible to the end of a last packet.
+ * @retval -1 Error.
+ * @retval 0 Not error, but nothing is encoded.
+ * @retval 1 Something is encoded.
+ */
+static int
+swim_encode_anti_entropy(struct swim *swim, struct swim_msg *msg)
+{
+ struct swim_anti_entropy_header_bin ae_header_bin;
+ struct swim_member_bin member_bin;
+ struct swim_packet *packet = swim_msg_last_packet(msg);
+ if (packet == NULL)
+ return -1;
+ char *header = swim_packet_alloc(packet, sizeof(ae_header_bin));
+ if (header == NULL)
+ return 0;
+ int i = 0;
+
+ swim_member_bin_create(&member_bin);
+ for (; i < (int) mh_size(swim->members); ++i) {
+ char *pos = swim_packet_alloc(packet, sizeof(member_bin));
+ if (pos == NULL)
+ break;
+ struct swim_member *member = swim->shuffled_members[i];
+ swim_member_bin_reset(&member_bin, member);
+ memcpy(pos, &member_bin, sizeof(member_bin));
+ }
+ if (i == 0)
+ return 0;
+ swim_anti_entropy_header_bin_create(&ae_header_bin, i);
+ memcpy(header, &ae_header_bin, sizeof(ae_header_bin));
+ swim_packet_flush(packet);
+ return 1;
+}
+
+/** Encode SWIM components into a sequence of UDP packets. */
+static int
+swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
+{
+ swim_msg_create(msg);
+ struct swim_packet *packet = swim_msg_reserve(msg, 1);
+ if (packet == NULL)
+ return -1;
+ char *header = swim_packet_alloc(packet, 1);
+ int rc, map_size = 0;
+
+ rc = swim_encode_anti_entropy(swim, msg);
+ if (rc < 0)
+ goto error;
+ map_size += rc;
+
+ assert(mp_sizeof_map(map_size) == 1);
+ mp_encode_map(header, map_size);
+ return 0;
+error:
+ swim_msg_destroy(msg);
+ return -1;
+}
+
+/** Once per specified timeout trigger a next broadcast step. */
+static void
+swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events)
+{
+ assert((events & EV_PERIODIC) != 0);
+ (void) events;
+ struct swim *swim = (struct swim *) p->data;
+ if ((swim->shuffled_members == NULL ||
+ rlist_empty(&swim->queue_round)) && swim_new_round(swim) != 0) {
+ diag_log();
+ return;
+ }
+ /*
+ * Possibly empty, if no members but self are specified.
+ */
+ if (rlist_empty(&swim->queue_round))
+ return;
+
+ struct swim_msg *msg = &swim->round_step_task.msg;
+ if (swim_encode_round_msg(swim, msg) != 0) {
+ diag_log();
+ return;
+ }
+ struct swim_member *m =
+ rlist_shift_entry(&swim->queue_round, struct swim_member,
+ in_queue_round);
+ swim_task_schedule(&swim->round_step_task,
+ swim->transport->send_round_msg, &m->addr);
+ ev_periodic_stop(loop, p);
+}
+
+static void
+swim_round_step_complete(struct swim_task *task)
+{
+ struct swim *swim = container_of(task, struct swim, round_step_task);
+ swim_msg_reset(&task->msg);
+ ev_periodic_start(loop(), &swim->round_tick);
+}
+
+/**
+ * SWIM member attributes from anti-entropy and dissemination
+ * messages.
+ */
+struct swim_member_def {
+ struct sockaddr_in addr;
+ enum swim_member_status status;
+};
+
+static inline void
+swim_member_def_create(struct swim_member_def *def)
+{
+ memset(def, 0, sizeof(*def));
+ def->status = MEMBER_ALIVE;
+}
+
+static void
+swim_process_member_update(struct swim *swim, struct swim_member_def *def)
+{
+ struct swim_member *member = swim_find_member(swim, &def->addr);
+ /*
+ * Trivial processing of a new member - just add it to the
+ * members table.
+ */
+ if (member == NULL) {
+ member = swim_member_new(swim, &def->addr, def->status);
+ if (member == NULL)
+ diag_log();
+ }
+}
+
+static int
+swim_process_member_key(enum swim_member_key key, const char **pos,
+ const char *end, const char *msg_pref,
+ struct swim_member_def *def)
+{
+ switch(key) {
+ case SWIM_MEMBER_STATUS:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s member status should be uint", msg_pref);
+ return -1;
+ }
+ key = mp_decode_uint(pos);
+ if (key >= swim_member_status_MAX) {
+ say_error("%s unknown member status", msg_pref);
+ return -1;
+ }
+ def->status = (enum swim_member_status) key;
+ break;
+ case SWIM_MEMBER_ADDRESS:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s member address should be uint", msg_pref);
+ return -1;
+ }
+ def->addr.sin_addr.s_addr = mp_decode_uint(pos);
+ break;
+ case SWIM_MEMBER_PORT:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s member port should be uint", msg_pref);
+ return -1;
+ }
+ uint64_t port = mp_decode_uint(pos);
+ if (port > UINT16_MAX) {
+ say_error("%s member port is invalid", msg_pref);
+ return -1;
+ }
+ def->addr.sin_port = port;
+ break;
+ default:
+ unreachable();
+ }
+ return 0;
+}
+
+/** Decode an anti-entropy message, update members table. */
+static int
+swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
+{
+ const char *msg_pref = "Invalid SWIM anti-entropy message:";
+ if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) {
+ say_error("%s message should be an array", msg_pref);
+ return -1;
+ }
+ uint64_t size = mp_decode_array(pos);
+ for (uint64_t i = 0; i < size; ++i) {
+ if (mp_typeof(**pos) != MP_MAP ||
+ mp_check_map(*pos, end) > 0) {
+ say_error("%s member should be map", msg_pref);
+ return -1;
+ }
+ uint64_t map_size = mp_decode_map(pos);
+ struct swim_member_def def;
+ swim_member_def_create(&def);
+ for (uint64_t j = 0; j < map_size; ++j) {
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s member key should be uint",
+ msg_pref);
+ return -1;
+ }
+ uint64_t key = mp_decode_uint(pos);
+ if (key >= swim_member_key_MAX) {
+ say_error("%s unknown member key", msg_pref);
+ return -1;
+ }
+ if (swim_process_member_key(key, pos, end, msg_pref,
+ &def) != 0)
+ return -1;
+ }
+ if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
+ say_error("%s member address should be specified",
+ msg_pref);
+ return -1;
+ }
+ swim_process_member_update(swim, &def);
+ }
+ return 0;
+}
+
+/** Receive and process a new message. */
+static void
+swim_on_input(struct swim_scheduler *scheduler,
+ const struct swim_packet *packet, const struct sockaddr_in *src)
+{
+ (void) src;
+ const char *msg_pref = "Invalid SWIM message:";
+ struct swim *swim = container_of(scheduler, struct swim, scheduler);
+ const char *pos = packet->body;
+ const char *end = packet->pos;
+ if (mp_typeof(*pos) != MP_MAP || mp_check_map(pos, end) > 0) {
+ say_error("%s expected map header", msg_pref);
+ return;
+ }
+ uint64_t map_size = mp_decode_map(&pos);
+ for (uint64_t i = 0; i < map_size; ++i) {
+ if (mp_typeof(*pos) != MP_UINT || mp_check_uint(pos, end) > 0) {
+ say_error("%s header should contain uint keys",
+ msg_pref);
+ return;
+ }
+ uint64_t key = mp_decode_uint(&pos);
+ switch(key) {
+ case SWIM_ANTI_ENTROPY:
+ say_verbose("SWIM: process anti-entropy");
+ if (swim_process_anti_entropy(swim, &pos, end) != 0)
+ return;
+ break;
+ default:
+ say_error("%s unknown component type component is "\
+ "supported", msg_pref);
+ return;
+ }
+ }
+}
+
+/**
+ * Convert a string URI like "ip:port" to sockaddr_in structure.
+ */
+static int
+uri_to_addr(const char *str, struct sockaddr_in *addr)
+{
+ struct uri uri;
+ if (uri_parse(&uri, str) != 0 || uri.service == NULL)
+ goto invalid_uri;
+ in_addr_t iaddr;
+ if (uri.host_len == strlen(URI_HOST_UNIX) &&
+ memcmp(uri.host, URI_HOST_UNIX, uri.host_len) == 0) {
+ diag_set(IllegalParams, "Unix sockets are not supported");
+ return -1;
+ }
+ if (uri.host_len == 0) {
+ iaddr = htonl(INADDR_ANY);
+ } else if (uri.host_len == 9 && memcmp("localhost", uri.host, 9) == 0) {
+ iaddr = htonl(INADDR_LOOPBACK);
+ } else {
+ iaddr = inet_addr(tt_cstr(uri.host, uri.host_len));
+ if (iaddr == (in_addr_t) -1)
+ goto invalid_uri;
+ }
+ int port = htons(atoi(uri.service));
+ memset(addr, 0, sizeof(*addr));
+ addr->sin_family = AF_INET;
+ addr->sin_addr.s_addr = iaddr;
+ addr->sin_port = port;
+ return 0;
+
+invalid_uri:
+ diag_set(SocketError, sio_socketname(-1), "invalid uri \"%s\"", str);
+ return -1;
+}
+
+/**
+ * Initialize the module. By default, the module is turned off and
+ * does nothing. To start SWIM swim_cfg is used.
+ */
+struct swim *
+swim_new(void)
+{
+ struct swim *swim = (struct swim *) calloc(1, sizeof(*swim));
+ if (swim == NULL) {
+ diag_set(OutOfMemory, sizeof(*swim), "calloc", "swim");
+ return NULL;
+ }
+ swim->members = mh_i64ptr_new();
+ if (swim->members == NULL) {
+ free(swim);
+ diag_set(OutOfMemory, sizeof(*swim->members), "malloc",
+ "members");
+ return NULL;
+ }
+ rlist_create(&swim->queue_round);
+ ev_init(&swim->round_tick, swim_round_step_begin);
+ ev_periodic_set(&swim->round_tick, 0, HEARTBEAT_RATE_DEFAULT, NULL);
+ swim->round_tick.data = (void *) swim;
+ swim_task_create(&swim->round_step_task, &swim->scheduler,
+ swim_round_step_complete);
+ swim->transport = &swim_udp_transport;
+ swim_scheduler_create(&swim->scheduler, swim_on_input, swim->transport);
+ return swim;
+}
+
+int
+swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
+ const struct swim_transport *new_transport)
+{
+ struct sockaddr_in addr;
+ if (uri_to_addr(uri, &addr) != 0)
+ return -1;
+ struct swim_member *new_self = NULL;
+ if (swim_find_member(swim, &addr) == NULL) {
+ new_self = swim_member_new(swim, &addr, MEMBER_ALIVE);
+ if (new_self == NULL)
+ return -1;
+ }
+ if (swim_scheduler_bind(&swim->scheduler, &addr) != 0) {
+ swim_member_delete(swim, new_self);
+ return -1;
+ }
+ ev_periodic_start(loop(), &swim->round_tick);
+
+ if (swim->round_tick.interval != heartbeat_rate && heartbeat_rate > 0)
+ ev_periodic_set(&swim->round_tick, 0, heartbeat_rate, NULL);
+
+ swim->self = new_self;
+ if (new_transport != NULL) {
+ swim->transport = new_transport;
+ swim_scheduler_set_transport(&swim->scheduler, new_transport);
+ }
+ return 0;
+}
+
+int
+swim_add_member(struct swim *swim, const char *uri)
+{
+ struct sockaddr_in addr;
+ if (uri_to_addr(uri, &addr) != 0)
+ return -1;
+ struct swim_member *member = swim_find_member(swim, &addr);
+ if (member == NULL) {
+ member = swim_member_new(swim, &addr, MEMBER_ALIVE);
+ if (member == NULL)
+ return -1;
+ }
+ return 0;
+}
+
+int
+swim_remove_member(struct swim *swim, const char *uri)
+{
+ struct sockaddr_in addr;
+ if (uri_to_addr(uri, &addr) != 0)
+ return -1;
+ struct swim_member *member = swim_find_member(swim, &addr);
+ if (member != NULL)
+ swim_member_delete(swim, member);
+ return 0;
+}
+
+void
+swim_info(struct swim *swim, struct info_handler *info)
+{
+ info_begin(info);
+ for (mh_int_t node = mh_first(swim->members),
+ end = mh_end(swim->members); node != end;
+ node = mh_next(swim->members, node)) {
+ struct swim_member *member = (struct swim_member *)
+ mh_i64ptr_node(swim->members, node)->val;
+ info_table_begin(info,
+ sio_strfaddr((struct sockaddr *) &member->addr,
+ sizeof(member->addr)));
+ info_append_str(info, "status",
+ swim_member_status_strs[member->status]);
+ info_table_end(info);
+ }
+ info_end(info);
+}
+
+void
+swim_delete(struct swim *swim)
+{
+ swim_scheduler_destroy(&swim->scheduler);
+ ev_periodic_stop(loop(), &swim->round_tick);
+ swim_task_destroy(&swim->round_step_task);
+ mh_int_t node = mh_first(swim->members);
+ while (node != mh_end(swim->members)) {
+ struct swim_member *m = (struct swim_member *)
+ mh_i64ptr_node(swim->members, node)->val;
+ swim_member_delete(swim, m);
+ node = mh_first(swim->members);
+ }
+ mh_i64ptr_delete(swim->members);
+ free(swim->shuffled_members);
+}
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
new file mode 100644
index 000000000..350fa0cee
--- /dev/null
+++ b/src/lib/swim/swim.h
@@ -0,0 +1,95 @@
+#ifndef TARANTOOL_SWIM_H_INCLUDED
+#define TARANTOOL_SWIM_H_INCLUDED
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+struct info_handler;
+struct swim;
+struct swim_transport;
+
+/**
+ * Create new SWIM instance. Just creation without binding,
+ * setting any parameters or something. Allocation and
+ * initialization only.
+ */
+struct swim *
+swim_new(void);
+
+/**
+ * Configure or reconfigure a SWIM instance.
+ *
+ * @param swim SWIM instance to configure.
+ * @param uri URI in the format "ip:port".
+ * @param heartbeat_rate Rate of sending round messages. It does
+ * mean that each member will be checked each
+ * @heartbeat_rate seconds. It is rather the protocol
+ * speed. Protocol period depends on member count and
+ * @heartbeat_rate.
+ * @param new_transport Transport API to send/receive messages.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+int
+swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
+ const struct swim_transport *new_transport);
+
+/**
+ * Stop listening and broadcasting messages, cleanup all internal
+ * structures, free memory.
+ */
+void
+swim_delete(struct swim *swim);
+
+/**
+ * Add a new member. It is added to the members table and pinned.
+ * SWIM will ping the member, but never will delete him, even if
+ * pings fail.
+ */
+int
+swim_add_member(struct swim *swim, const char *uri);
+
+/** Silently remove a member from members table. */
+int
+swim_remove_member(struct swim *swim, const char *uri);
+
+/** Dump member statuses into @a info. */
+void
+swim_info(struct swim *swim, struct info_handler *info);
+
+#if defined(__cplusplus)
+}
+#endif
+
+#endif /* TARANTOOL_SWIM_H_INCLUDED */
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
new file mode 100644
index 000000000..8a1eca819
--- /dev/null
+++ b/src/lib/swim/swim_io.c
@@ -0,0 +1,163 @@
+#include "swim_io.h"
+#include "fiber.h"
+
+static ssize_t
+swim_udp_send_msg(int fd, const void *data, size_t size,
+ const struct sockaddr *addr, socklen_t addr_size)
+{
+ ssize_t ret = sio_sendto(fd, data, size, 0, addr, addr_size);
+ if (ret == -1 && sio_wouldblock(errno))
+ return 0;
+ return ret;
+}
+
+static ssize_t
+swim_udp_recv_msg(int fd, void *buffer, size_t size, struct sockaddr *addr,
+ socklen_t *addr_size)
+{
+ ssize_t ret = sio_recvfrom(fd, buffer, size, 0, addr, addr_size);
+ if (ret == -1 && sio_wouldblock(errno))
+ return 0;
+ return ret;
+}
+
+struct swim_transport swim_udp_transport = {
+ /* .send_round_msg = */ swim_udp_send_msg,
+ /* .recv_msg = */ swim_udp_recv_msg,
+};
+
+struct swim_packet *
+swim_packet_new(struct swim_msg *msg)
+{
+ struct swim_packet *res =
+ (struct swim_packet *) malloc(sizeof(*res));
+ if (res == NULL) {
+ diag_set(OutOfMemory, sizeof(*res), "malloc", "res");
+ return NULL;
+ }
+ swim_packet_create(res, msg);
+ return res;
+}
+
+void
+swim_task_schedule(struct swim_task *task, swim_transport_send_f send,
+ const struct sockaddr_in *dst)
+{
+ assert(! swim_task_is_active(task));
+ task->send = send;
+ task->dst = *dst;
+ rlist_add_tail_entry(&task->scheduler->queue_output, task,
+ in_queue_output);
+ ev_io_start(loop(), &task->scheduler->output);
+}
+
+static void
+swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events);
+
+static void
+swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events);
+
+void
+swim_scheduler_create(struct swim_scheduler *scheduler,
+ swim_scheduler_on_input_f on_input,
+ const struct swim_transport *transport)
+{
+ ev_init(&scheduler->output, swim_scheduler_on_output);
+ scheduler->output.data = (void *) scheduler;
+ ev_init(&scheduler->input, swim_scheduler_on_input);
+ scheduler->input.data = (void *) scheduler;
+ rlist_create(&scheduler->queue_output);
+ scheduler->on_input = on_input;
+ swim_scheduler_set_transport(scheduler, transport);
+}
+
+int
+swim_scheduler_bind(struct swim_scheduler *scheduler, struct sockaddr_in *addr)
+{
+ struct sockaddr_in cur_addr;
+ socklen_t addrlen = sizeof(cur_addr);
+ int old_fd = scheduler->input.fd;
+
+ if (old_fd != -1 &&
+ getsockname(old_fd, (struct sockaddr *) &cur_addr, &addrlen) == 0 &&
+ addr->sin_addr.s_addr == cur_addr.sin_addr.s_addr &&
+ addr->sin_port == cur_addr.sin_port)
+ return 0;
+
+ int fd = sio_socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+ if (fd < 0)
+ return -1;
+ if (sio_bind(fd, (struct sockaddr *) addr, sizeof(*addr)) != 0 ||
+ evio_setsockopt_server(fd, AF_INET, SOCK_DGRAM) != 0) {
+ if (errno == EADDRINUSE)
+ diag_set(SocketError, sio_socketname(fd), "bind");
+ close(fd);
+ return -1;
+ }
+ close(old_fd);
+ ev_io_set(&scheduler->input, fd, EV_READ);
+ ev_io_set(&scheduler->output, fd, EV_WRITE);
+ return 0;
+}
+
+void
+swim_scheduler_destroy(struct swim_scheduler *scheduler)
+{
+ close(scheduler->input.fd);
+ ev_io_stop(loop(), &scheduler->output);
+ ev_io_stop(loop(), &scheduler->input);
+}
+
+static void
+swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
+{
+ assert((events & EV_WRITE) != 0);
+ (void) events;
+ struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
+ if (rlist_empty(&scheduler->queue_output)) {
+ ev_io_stop(loop, io);
+ return;
+ }
+ struct swim_task *task =
+ rlist_shift_entry(&scheduler->queue_output, struct swim_task,
+ in_queue_output);
+ say_verbose("SWIM: send to %s",
+ sio_strfaddr((struct sockaddr *) &task->dst,
+ sizeof(task->dst)));
+ for (struct swim_packet *packet = swim_msg_first_packet(&task->msg);
+ packet != NULL; packet = swim_packet_next(packet)) {
+ if (task->send(io->fd, packet->body, packet->pos - packet->body,
+ (struct sockaddr *) &task->dst,
+ sizeof(task->dst)) == -1)
+ diag_log();
+ }
+ task->complete(task);
+}
+
+static void
+swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
+{
+ assert((events & EV_READ) != 0);
+ (void) events;
+ (void) loop;
+ struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
+ struct sockaddr_in addr;
+ socklen_t len = sizeof(addr);
+ struct swim_packet packet;
+ struct swim_msg msg;
+ swim_msg_create(&msg);
+ swim_packet_create(&packet, &msg);
+ swim_transport_recv_f recv = scheduler->transport->recv_msg;
+ ssize_t size = recv(io->fd, packet.body, packet.end - packet.body,
+ (struct sockaddr *) &addr, &len);
+ if (size <= 0) {
+ if (size < 0)
+ diag_log();
+ return;
+ }
+ swim_packet_advance(&packet, size);
+ swim_packet_flush(&packet);
+ say_verbose("SWIM: received from %s",
+ sio_strfaddr((struct sockaddr *) &addr, len));
+ scheduler->on_input(scheduler, &packet, &addr);
+}
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
new file mode 100644
index 000000000..fc44fd0a7
--- /dev/null
+++ b/src/lib/swim/swim_io.h
@@ -0,0 +1,299 @@
+#ifndef TARANTOOL_SWIM_IO_H_INCLUDED
+#define TARANTOOL_SWIM_IO_H_INCLUDED
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "trivia/util.h"
+#include "small/rlist.h"
+#include "salad/stailq.h"
+#include "swim_transport.h"
+#include "evio.h"
+#include <stdbool.h>
+#include <arpa/inet.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+struct swim_task;
+struct swim_scheduler;
+
+/** UDP sendto/recvfrom implementation of swim_transport. */
+extern struct swim_transport swim_udp_transport;
+
+enum {
+ /**
+ * Default MTU is 1500. MTU (when IPv4 is used) consists
+ * of IPv4 header, UDP header, Data. IPv4 has 20 bytes
+ * header, UDP - 8 bytes. So Data = 1500 - 20 - 8 = 1472.
+ * TODO: adapt to other MTUs which can be reduced in some
+ * networks by their admins.
+ */
+ UDP_PACKET_SIZE = 1472,
+};
+
+/**
+ * UDP body size is limited by definition. To be able to send a
+ * big message it should be split into multiple packets. Each
+ * packet is self-sufficient piece of data, which can withstand
+ * loss of other packets and be processed independently.
+ */
+struct swim_packet {
+ /** Place in the whole big message, struct swim_msg. */
+ struct stailq_entry in_msg;
+ /** Last valid position in the body. */
+ char *pos;
+ /** Position beyond pos, contains unfinished data. */
+ char *next_pos;
+ /** Packet body. */
+ char body[UDP_PACKET_SIZE];
+ /**
+ * Pointer to the end of the body. Just syntax sugar to do
+ * not write 'body + sizeof(body)' each time.
+ */
+ char end[0];
+};
+
+struct swim_msg {
+ struct stailq packets;
+};
+
+static inline bool
+swim_packet_is_last(struct swim_packet *packet)
+{
+ return stailq_next(&packet->in_msg) == NULL;
+}
+
+static inline char *
+swim_packet_reserve(struct swim_packet *packet, int size)
+{
+ return packet->next_pos + size > packet->end ? NULL : packet->next_pos;
+}
+
+static inline void
+swim_packet_advance(struct swim_packet *packet, int size)
+{
+ assert(packet->next_pos + size <= packet->end);
+ packet->next_pos += size;
+}
+
+static inline char *
+swim_packet_alloc(struct swim_packet *packet, int size)
+{
+ char *res = swim_packet_reserve(packet, size);
+ if (res == NULL)
+ return NULL;
+ swim_packet_advance(packet, size);
+ return res;
+}
+
+static inline void
+swim_packet_flush(struct swim_packet *packet)
+{
+ assert(packet->next_pos >= packet->pos);
+ packet->pos = packet->next_pos;
+}
+
+static inline struct swim_packet *
+swim_packet_next(struct swim_packet *packet)
+{
+ if (swim_packet_is_last(packet))
+ return NULL;
+ return stailq_next_entry(packet, in_msg);
+}
+
+static inline void
+swim_packet_delete(struct swim_packet *packet)
+{
+ free(packet);
+}
+
+static inline void
+swim_packet_create(struct swim_packet *packet, struct swim_msg *msg)
+{
+ stailq_add_tail_entry(&msg->packets, packet, in_msg);
+ packet->pos = packet->body;
+ packet->next_pos = packet->body;
+}
+
+struct swim_packet *
+swim_packet_new(struct swim_msg *msg);
+
+static inline bool
+swim_msg_is_empty(struct swim_msg *msg)
+{
+ return stailq_empty(&msg->packets);
+}
+
+static inline struct swim_packet *
+swim_msg_first_packet(struct swim_msg *msg)
+{
+ if (swim_msg_is_empty(msg))
+ return NULL;
+ return stailq_first_entry(&msg->packets, struct swim_packet, in_msg);
+}
+
+static inline struct swim_packet *
+swim_msg_last_packet(struct swim_msg *msg)
+{
+ if (swim_msg_is_empty(msg))
+ return NULL;
+ return stailq_last_entry(&msg->packets, struct swim_packet, in_msg);
+}
+
+static inline void
+swim_msg_create(struct swim_msg *msg)
+{
+ stailq_create(&msg->packets);
+}
+
+static inline void
+swim_msg_destroy(struct swim_msg *msg)
+{
+ struct swim_packet *packet, *tmp;
+ stailq_foreach_entry_safe(packet, tmp, &msg->packets, in_msg)
+ swim_packet_delete(packet);
+}
+
+static inline void
+swim_msg_reset(struct swim_msg *msg)
+{
+ swim_msg_destroy(msg);
+ swim_msg_create(msg);
+}
+
+static inline struct swim_packet *
+swim_msg_reserve(struct swim_msg *msg, int size)
+{
+ struct swim_packet *packet = swim_msg_last_packet(msg);
+ assert(size <= (int) sizeof(packet->body));
+ if (packet == NULL || swim_packet_reserve(packet, size) == NULL)
+ return swim_packet_new(msg);
+ return packet;
+}
+
+typedef void (*swim_scheduler_on_input_f)(struct swim_scheduler *scheduler,
+ const struct swim_packet *packet,
+ const struct sockaddr_in *src);
+
+struct swim_scheduler {
+ /** Transport used to receive packets. */
+ const struct swim_transport *transport;
+ /** Function called when a packet is received. */
+ swim_scheduler_on_input_f on_input;
+ /**
+ * Event dispatcher of incomming messages. Takes them from
+ * network.
+ */
+ struct ev_io input;
+ /**
+ * Event dispatcher of outcomming messages. Takes tasks
+ * from queue_output.
+ */
+ struct ev_io output;
+ /** Queue of output tasks ready to write now. */
+ struct rlist queue_output;
+};
+
+void
+swim_scheduler_create(struct swim_scheduler *scheduler,
+ swim_scheduler_on_input_f on_input,
+ const struct swim_transport *transport);
+
+int
+swim_scheduler_bind(struct swim_scheduler *scheduler, struct sockaddr_in *addr);
+
+static inline void
+swim_scheduler_set_transport(struct swim_scheduler *scheduler,
+ const struct swim_transport *transport)
+{
+ scheduler->transport = transport;
+}
+
+void
+swim_scheduler_destroy(struct swim_scheduler *scheduler);
+
+/**
+ * Each SWIM component in a common case independently may want to
+ * push some data into the network. Dissemination sends events,
+ * failure detection sends pings, acks. Anti-entropy sends member
+ * tables. The intention to send a data is called IO task and is
+ * stored in a queue that is dispatched when output is possible.
+ */
+typedef void (*swim_task_f)(struct swim_task *);
+
+struct swim_task {
+ /** Function to send each packet. */
+ swim_transport_send_f send;
+ /** Function called when the task has completed. */
+ swim_task_f complete;
+ /** Message to send. */
+ struct swim_msg msg;
+ /** Destination address. */
+ struct sockaddr_in dst;
+ /** Place in a queue of tasks. */
+ struct rlist in_queue_output;
+ /** SWIM scheduler managing the task. */
+ struct swim_scheduler *scheduler;
+};
+
+void
+swim_task_schedule(struct swim_task *task, swim_transport_send_f send,
+ const struct sockaddr_in *dst);
+
+static inline void
+swim_task_create(struct swim_task *task, struct swim_scheduler *scheduler,
+ swim_task_f complete)
+{
+ memset(task, 0, sizeof(*task));
+ task->complete = complete;
+ swim_msg_create(&task->msg);
+ rlist_create(&task->in_queue_output);
+ task->scheduler = scheduler;
+}
+
+static inline bool
+swim_task_is_active(struct swim_task *task)
+{
+ return ! rlist_empty(&task->in_queue_output);
+}
+
+static inline void
+swim_task_destroy(struct swim_task *task)
+{
+ rlist_del_entry(task, in_queue_output);
+ swim_msg_destroy(&task->msg);
+}
+
+#if defined(__cplusplus)
+}
+#endif
+
+#endif /* TARANTOOL_SWIM_IO_H_INCLUDED */
\ No newline at end of file
diff --git a/src/lib/swim/swim_transport.h b/src/lib/swim/swim_transport.h
new file mode 100644
index 000000000..d629526ac
--- /dev/null
+++ b/src/lib/swim/swim_transport.h
@@ -0,0 +1,64 @@
+#ifndef TARANTOOL_SWIM_TRANSPORT_H_INCLUDED
+#define TARANTOOL_SWIM_TRANSPORT_H_INCLUDED
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <sys/socket.h>
+
+typedef ssize_t (*swim_transport_send_f)(int fd, const void *data, size_t size,
+ const struct sockaddr *addr,
+ socklen_t addr_size);
+
+typedef ssize_t (*swim_transport_recv_f)(int fd, void *buffer, size_t size,
+ struct sockaddr *addr,
+ socklen_t *addr_size);
+
+/**
+ * Virtual methods of SWIM protocol steps. Usual implementation -
+ * just sendto/recvfrom for all methods. But for testing via this
+ * interface errors could be simulated.
+ */
+struct swim_transport {
+ /**
+ * Send regular round message containing dissemination,
+ * failure detection and anti-entropy sections. Parameters
+ * are like sendto().
+ */
+ swim_transport_send_f send_round_msg;
+
+ /**
+ * Receive a message. Not necessary round or failure
+ * detection. Before message is received, its type is
+ * unknown. Parameters are like recvfrom().
+ */
+ swim_transport_recv_f recv_msg;
+};
+
+#endif /* TARANTOOL_SWIM_TRANSPORT_H_INCLUDED */
diff --git a/src/lua/init.c b/src/lua/init.c
index ca4b47f3a..5b47aa3e3 100644
--- a/src/lua/init.c
+++ b/src/lua/init.c
@@ -58,6 +58,7 @@
#include "lua/fio.h"
#include "lua/httpc.h"
#include "lua/utf8.h"
+#include "lua/swim.h"
#include "digest.h"
#include <small/ibuf.h>
@@ -450,6 +451,7 @@ tarantool_lua_init(const char *tarantool_bin, int argc, char **argv)
tarantool_lua_socket_init(L);
tarantool_lua_pickle_init(L);
tarantool_lua_digest_init(L);
+ tarantool_lua_swim_init(L);
luaopen_http_client_driver(L);
lua_pop(L, 1);
luaopen_msgpack(L);
diff --git a/src/lua/swim.c b/src/lua/swim.c
new file mode 100644
index 000000000..4748ef139
--- /dev/null
+++ b/src/lua/swim.c
@@ -0,0 +1,244 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "utils.h"
+#include "diag.h"
+#include "swim/swim.h"
+#include "small/ibuf.h"
+#include "lua/info.h"
+#include <info.h>
+
+/** SWIM instances are pushed as cdata with this id. */
+uint32_t CTID_STRUCT_SWIM_PTR;
+
+/**
+ * Get @a n-th value from a Lua stack as a struct swim pointer.
+ * @param L Lua state.
+ * @param n Where pointer is stored on Lua stack.
+ *
+ * @retval NULL The stack position does not exist or it is not a
+ * struct swim pointer.
+ * @retval not NULL Valid SWIM pointer.
+ */
+static inline struct swim *
+lua_swim_ptr(struct lua_State *L, int n)
+{
+ uint32_t ctypeid;
+ if (lua_type(L, n) != LUA_TCDATA)
+ return NULL;
+ void *swim = luaL_checkcdata(L, n, &ctypeid);
+ if (ctypeid != CTID_STRUCT_SWIM_PTR)
+ return NULL;
+ return *(struct swim **) swim;
+}
+
+/**
+ * Delete SWIM instance passed via first Lua stack position. Used
+ * by Lua GC.
+ */
+static int
+lua_swim_gc(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (swim == NULL)
+ return luaL_error(L, "SWIM gc expected struct swim *");
+ swim_delete(swim);
+ return 0;
+}
+
+/**
+ * Configure @a swim instance using a table stored in @a ncfg-th
+ * position on Lua stack.
+ * @param L Lua state.
+ * @param ncfg Where configuration is stored on Lua stack.
+ * @param swim SWIM instance to configure.
+ * @param funcname Caller function name to use in error messages.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error, stored in diagnostics area. Critical errors
+ * like OOM or incorrect usage can throw.
+ */
+static int
+lua_swim_cfg_impl(struct lua_State *L, int ncfg, struct swim *swim,
+ const char *funcname)
+{
+ if (! lua_istable(L, ncfg)) {
+ return luaL_error(L, "swim.%s: expected table config",
+ funcname);
+ }
+
+ const char *server_uri;
+ lua_getfield(L, ncfg, "server");
+ if (lua_isstring(L, -1)) {
+ server_uri = lua_tostring(L, -1);
+ } else {
+ return luaL_error(L, "swim.%s: server should be string URI",
+ funcname);
+ }
+ lua_pop(L, 1);
+
+ double heartbeat_rate;
+ lua_getfield(L, ncfg, "heartbeat");
+ if (lua_isnumber(L, -1)) {
+ heartbeat_rate = lua_tonumber(L, -1);
+ if (heartbeat_rate <= 0) {
+ return luaL_error(L, "swim.%s: heartbeat should be "\
+ "positive number", funcname);
+ }
+ } else if (! lua_isnil(L, -1)) {
+ return luaL_error(L, "swim.%s: heartbeat should be positive "\
+ "number", funcname);
+ } else {
+ heartbeat_rate = -1;
+ }
+ lua_pop(L, 1);
+
+ return swim_cfg(swim, server_uri, heartbeat_rate, NULL);
+}
+
+static int
+lua_swim_new(struct lua_State *L)
+{
+ int top = lua_gettop(L);
+ if (top > 1)
+ return luaL_error(L, "Usage: swim.new([{<config>}]");
+ struct swim *swim = swim_new();
+ if (swim != NULL) {
+ *(struct swim **)luaL_pushcdata(L, CTID_STRUCT_SWIM_PTR) = swim;
+ lua_pushcfunction(L, lua_swim_gc);
+ luaL_setcdatagc(L, -2);
+ if (top == 0 || lua_swim_cfg_impl(L, 1, swim, "new") == 0)
+ return 1;
+ lua_pop(L, 1);
+ }
+ lua_pushnil(L);
+ luaT_pusherror(L, diag_last_error(diag_get()));
+ return 2;
+}
+
+static int
+lua_swim_cfg(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (swim == NULL)
+ return luaL_error(L, "Usage: swim:cfg({<config>})");
+ if (lua_swim_cfg_impl(L, 2, swim, "cfg") != 0) {
+ lua_pushnil(L);
+ luaT_pusherror(L, diag_last_error(diag_get()));
+ return 2;
+ }
+ lua_pushboolean(L, true);
+ return 1;
+}
+
+static inline int
+lua_swim_add_remove_member(struct lua_State *L, const char *funcname,
+ int (*action)(struct swim *, const char *))
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (lua_gettop(L) != 2 || swim == NULL)
+ return luaL_error(L, "Usage: swim:%s(uri)", funcname);
+ const char *member_uri;
+ if (lua_isstring(L, -1)) {
+ member_uri = lua_tostring(L, 1);
+ } else {
+ return luaL_error(L, "swim.%s: member URI should be array",
+ funcname);
+ }
+
+ if (action(swim, member_uri) != 0) {
+ lua_pushnil(L);
+ luaT_pusherror(L, diag_last_error(diag_get()));
+ return 2;
+ }
+ lua_pushboolean(L, true);
+ return 1;
+}
+
+static int
+lua_swim_add_member(struct lua_State *L)
+{
+ return lua_swim_add_remove_member(L, "add_member", swim_add_member);
+}
+
+static int
+lua_swim_remove_member(struct lua_State *L)
+{
+ return lua_swim_add_remove_member(L, "remove_member",
+ swim_remove_member);
+}
+
+static int
+lua_swim_delete(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (swim == NULL)
+ return luaL_error(L, "Usage: swim:delete()");
+ swim_delete(swim);
+ uint32_t ctypeid;
+ struct swim **cdata = (struct swim **) luaL_checkcdata(L, 1, &ctypeid);
+ assert(ctypeid == CTID_STRUCT_SWIM_PTR);
+ *cdata = NULL;
+ return 0;
+}
+
+static int
+lua_swim_info(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (swim == NULL)
+ return luaL_error(L, "Usage: swim:info()");
+ struct info_handler info;
+ luaT_info_handler_create(&info, L);
+ swim_info(swim, &info);
+ return 1;
+}
+
+void
+tarantool_lua_swim_init(struct lua_State *L)
+{
+ static const struct luaL_Reg lua_swim_methods [] = {
+ {"new", lua_swim_new},
+ {"cfg", lua_swim_cfg},
+ {"add_member", lua_swim_add_member},
+ {"remove_member", lua_swim_remove_member},
+ {"delete", lua_swim_delete},
+ {"info", lua_swim_info},
+ {NULL, NULL}
+ };
+ luaL_register_module(L, "swim", lua_swim_methods);
+ lua_pop(L, 1);
+ int rc = luaL_cdef(L, "struct swim;");
+ assert(rc == 0);
+ (void) rc;
+ CTID_STRUCT_SWIM_PTR = luaL_ctypeid(L, "struct swim *");
+ assert(CTID_STRUCT_SWIM_PTR != 0);
+};
diff --git a/src/lua/swim.h b/src/lua/swim.h
new file mode 100644
index 000000000..989cf62b3
--- /dev/null
+++ b/src/lua/swim.h
@@ -0,0 +1,47 @@
+#ifndef INCLUDES_TARANTOOL_LUA_SWIM_H
+#define INCLUDES_TARANTOOL_LUA_SWIM_H
+/*
+ * Copyright 2010-2015, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+struct lua_State;
+
+void
+tarantool_lua_swim_init(struct lua_State *L);
+
+#if defined(__cplusplus)
+}
+#endif
+
+#endif /* INCLUDES_TARANTOOL_LUA_SWIM_H */
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 17+ messages in thread
* Re: [tarantool-patches] [PATCH v3 1/6] [RAW] swim: introduce SWIM's anti-entropy component
2018-12-29 10:14 ` [PATCH v3 1/6] [RAW] swim: introduce SWIM's anti-entropy component Vladislav Shpilevoy
@ 2019-01-09 9:12 ` Konstantin Osipov
2019-01-15 14:42 ` [tarantool-patches] " Vladislav Shpilevoy
2019-01-09 11:45 ` [tarantool-patches] " Konstantin Osipov
1 sibling, 1 reply; 17+ messages in thread
From: Konstantin Osipov @ 2019-01-09 9:12 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/12/29 15:07]:
> +enum {
> + /** How often to send membership messages and pings. */
.. in seconds (please add these two words to the comment).
> + HEARTBEAT_RATE_DEFAULT = 1,
> +};
> +
\ is / modulO
> + * Take a random number not blindly calculating a module, but
It took me some time to wrap my head around this
sentence before I cracked what's a module.
boundaries to preserve the original
> + * scaling random number down the given borders to save
> + * distribution. A result belongs the range [start, end].
The result belongs to the range
> + */
> +static inline int
> +swim_scaled_rand(int start, int end)
> +{
> + assert(end > start);
> + return rand() / (RAND_MAX / (end - start + 1) + 1);
> +}
> +
> + * Global hash of all known members of the cluster. Hash
> + * key is bitwise combination of ip and port
What's a bitwise combination? I see later that you shift the IP address
and concatenate it with the port value. It would be a
concatenation then. Why not simply run the entire sockarddr_in
through murmur, this would be portable across transports
and reliably random? The good old days of the trick you used in
sockaddr_in_hash are bygone IMHO.
You can easily remember the hash value in swim_member struct,
so that all hash table lookups are easy.
> + * struct member, describing a remote instance. The only
> + * purpose of such strange hash function is to be able to
> + * reuse mh_i64ptr_t instead of introducing one more
> + * implementation of mhash.
> + *
> + * Discovered members live here until they are
> + * unavailable - in such a case they are removed from the
> + * hash. But a subset of members are pinned - the ones
> + * added explicitly via API. When a member is pinned, it
> + * can not be removed from the hash, and the module will
> + * ping him constantly.
Great comments btw, makes the whole thing easy to understand. I
will stop by your desk to fix a couple of grammar issues I found.
> +static inline uint64_t
> +sockaddr_in_hash(const struct sockaddr_in *a)
> +{
> + return ((uint64_t) a->sin_addr.s_addr << 16) | a->sin_port;
> +}
See my comment re hash function.
> +struct PACKED swim_member_bin {
> + /** mp_encode_map(3) */
> + uint8_t m_header;
> +
> + /** mp_encode_uint(SWIM_MEMBER_STATUS) */
> + uint8_t k_status;
> + /** mp_encode_uint(enum member_status) */
> + uint8_t v_status;
> +
> + /** mp_encode_uint(SWIM_MEMBER_ADDRESS) */
> + uint8_t k_addr;
> + /** mp_encode_uint(addr.sin_addr.s_addr) */
> + uint8_t m_addr;
> + uint32_t v_addr;
> +
> + /** mp_encode_uint(SWIM_MEMBER_PORT) */
> + uint8_t k_port;
> + /** mp_encode_uint(addr.sin_port) */
> + uint8_t m_port;
> + uint16_t v_port;
> +};
Please create a patch for the docs extending the binary protocol
with these new messages. I would also ponder a bit more about
extending iproto_constants.h with swim command codes in case we
ever want to transport them over iproto.
> + * SWIM_ANTI_ENTROPY: [
> + * {
> + * SWIM_MEMBER_STATUS: uint, enum member_status,
> + * SWIM_MEMBER_ADDRESS: uint, ip,
> + * SWIM_MEMBER_PORT: uint, port
> + * },
Is this going to work only over ip network? Why not use server
uuids as member identifiers?
> + for (mh_int_t node = mh_first(members), end = mh_end(members);
> + node != end; node = mh_next(members, node), ++i) {
> + shuffled[i] = (struct swim_member *)
> + mh_i64ptr_node(members, node)->val;
> + int j = swim_scaled_rand(0, i);
> + SWAP(shuffled[i], shuffled[j]);
> + }
Please add a comment that this method preserves even distribution
of a random sequence, ideally explaining why, or at least
mentioning that we tested it.
> +/**
> + * Encode anti-entropy header and members data as many as
> + * possible to the end of a last packet.
> + * @retval -1 Error.
> + * @retval 0 Not error, but nothing is encoded.
> + * @retval 1 Something is encoded.
> + */
I would appreciate a bit more lengthy comment about your chunking
strategy when pushing this data over UDP. Do you prepare all
chunks and then send them? Do you fill a chunk and flush it along
the way? What is chunk size (it's a pity it's not part of any
function signature, it's the defining constraint of every function
below. Do you have static asserts for the case when msgpack packet
doesn't fit the udp packet?
> +
> +static int
> +swim_process_member_key(enum swim_member_key key, const char **pos,
> + const char *end, const char *msg_pref,
> + struct swim_member_def *def)
No comment. What's going on here?
> +{
> + switch(key) {
Missing space.
> + case SWIM_MEMBER_STATUS:
> + if (mp_typeof(**pos) != MP_UINT ||
> + mp_check_uint(*pos, end) > 0) {
> + say_error("%s member status should be uint", msg_pref);
> + return -1;
> + }
> + key = mp_decode_uint(pos);
> + if (key >= swim_member_status_MAX) {
> + say_error("%s unknown member status", msg_pref);
> + return -1;
> + }
> + def->status = (enum swim_member_status) key;
> + break;
> + case SWIM_MEMBER_ADDRESS:
> + if (mp_typeof(**pos) != MP_UINT ||
> + mp_check_uint(*pos, end) > 0) {
> + say_error("%s member address should be uint", msg_pref);
> + return -1;
> + }
> + def->addr.sin_addr.s_addr = mp_decode_uint(pos);
> + break;
> + case SWIM_MEMBER_PORT:
> + if (mp_typeof(**pos) != MP_UINT ||
> + mp_check_uint(*pos, end) > 0) {
> + say_error("%s member port should be uint", msg_pref);
> + return -1;
> + }
> + uint64_t port = mp_decode_uint(pos);
> + if (port > UINT16_MAX) {
> + say_error("%s member port is invalid", msg_pref);
> + return -1;
> + }
> + def->addr.sin_port = port;
> + break;
> + default:
> + unreachable();
> + }
> + return 0;
> +}
OK, I understand now it's merely updating a member configuration.
But then what's the event flow of such update? What happens after
the update? I know from the public API that we suspend all
activities while update is in progress, but this is not so obvious
when you reading the code.
> +/** Decode an anti-entropy message, update members table. */
> +static int
> +swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
Why not swim_decode_whatever?
process is a very general world, it could mean any action.
> +{
> + const char *msg_pref = "Invalid SWIM anti-entropy message:";
> + if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) {
> + say_error("%s message should be an array", msg_pref);
> + return -1;
> + }
> + uint64_t size = mp_decode_array(pos);
> + for (uint64_t i = 0; i < size; ++i) {
> + if (mp_typeof(**pos) != MP_MAP ||
> + mp_check_map(*pos, end) > 0) {
> + say_error("%s member should be map", msg_pref);
> + return -1;
> + }
> + uint64_t map_size = mp_decode_map(pos);
> + struct swim_member_def def;
> + swim_member_def_create(&def);
> + for (uint64_t j = 0; j < map_size; ++j) {
> + if (mp_typeof(**pos) != MP_UINT ||
> + mp_check_uint(*pos, end) > 0) {
> + say_error("%s member key should be uint",
> + msg_pref);
> + return -1;
> + }
> + uint64_t key = mp_decode_uint(pos);
> + if (key >= swim_member_key_MAX) {
> + say_error("%s unknown member key", msg_pref);
> + return -1;
> + }
> + if (swim_process_member_key(key, pos, end, msg_pref,
> + &def) != 0)
> + return -1;
> + }
> + if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
> + say_error("%s member address should be specified",
> + msg_pref);
> + return -1;
> + }
> + swim_process_member_update(swim, &def);
Why not swim_udpate_member?
Someone needs to check that there is no buffer overflow in this
code (I will check in the next review round).
> +/**
> + * Convert a string URI like "ip:port" to sockaddr_in structure.
> + */
> +static int
> +uri_to_addr(const char *str, struct sockaddr_in *addr)
> +{
> + struct uri uri;
> + if (uri_parse(&uri, str) != 0 || uri.service == NULL)
> + goto invalid_uri;
> + in_addr_t iaddr;
> + if (uri.host_len == strlen(URI_HOST_UNIX) &&
> + memcmp(uri.host, URI_HOST_UNIX, uri.host_len) == 0) {
> + diag_set(IllegalParams, "Unix sockets are not supported");
> + return -1;
> + }
> + if (uri.host_len == 0) {
> + iaddr = htonl(INADDR_ANY);
> + } else if (uri.host_len == 9 && memcmp("localhost", uri.host, 9) == 0) {
> + iaddr = htonl(INADDR_LOOPBACK);
> + } else {
> + iaddr = inet_addr(tt_cstr(uri.host, uri.host_len));
> + if (iaddr == (in_addr_t) -1)
> + goto invalid_uri;
> + }
> + int port = htons(atoi(uri.service));
> + memset(addr, 0, sizeof(*addr));
> + addr->sin_family = AF_INET;
> + addr->sin_addr.s_addr = iaddr;
> + addr->sin_port = port;
> + return 0;
> +
> +invalid_uri:
> + diag_set(SocketError, sio_socketname(-1), "invalid uri \"%s\"", str);
> + return -1;
> +}
Shouldn't this be part of sio (it's a utility function).
> + }
> + if (swim_scheduler_bind(&swim->scheduler, &addr) != 0) {
> + swim_member_delete(swim, new_self);
> + return -1;
Why is this a scheduler function, not member function?
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 17+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v3 1/6] [RAW] swim: introduce SWIM's anti-entropy component
2019-01-09 9:12 ` [tarantool-patches] " Konstantin Osipov
@ 2019-01-15 14:42 ` Vladislav Shpilevoy
0 siblings, 0 replies; 17+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-15 14:42 UTC (permalink / raw)
To: tarantool-patches, Konstantin Osipov; +Cc: vdavydov.dev
Hi! Thanks for the review! See my answers inlined.
On 09/01/2019 12:12, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/12/29 15:07]:
>> +enum {
>> + /** How often to send membership messages and pings. */
>
> .. in seconds (please add these two words to the comment).
>
>> + HEARTBEAT_RATE_DEFAULT = 1,
>> +};
>> +
@@ -87,7 +87,10 @@
*/
enum {
- /** How often to send membership messages and pings. */
+ /**
+ * How often to send membership messages and pings in
+ * seconds.
+ */
HEARTBEAT_RATE_DEFAULT = 1,
};
>
> \ is / modulO
>> + * Take a random number not blindly calculating a module, but
I did not add 'is' on purpose - here 'take' is not a noun, but verb,
and calculating is an adjective.
@@ -95,7 +95,7 @@ enum {
};
/**
- * Take a random number not blindly calculating a module, but
+ * Take a random number not blindly calculating a modulo, but
* scaling random number down the given borders to save
* distribution. A result belongs the range [start, end].
*/
>
> It took me some time to wrap my head around this
> sentence before I cracked what's a module.
> boundaries to preserve the original
>> + * scaling random number down the given borders to save
>> + * distribution. A result belongs the range [start, end].
> The result belongs to the range
@@ -96,8 +96,9 @@ enum {
/**
* Take a random number not blindly calculating a modulo, but
- * scaling random number down the given borders to save
- * distribution. A result belongs the range [start, end].
+ * scaling random number down the given boundaries to preserve the
+ * original distribution. The result belongs the range
+ * [start, end].
*/
static inline int
swim_scaled_rand(int start, int end)
>> + */
>> +static inline int
>> +swim_scaled_rand(int start, int end)
>> +{
>> + assert(end > start);
>> + return rand() / (RAND_MAX / (end - start + 1) + 1);
>> +}
>> +
>> + * Global hash of all known members of the cluster. Hash
>> + * key is bitwise combination of ip and port
>
> What's a bitwise combination? I see later that you shift the IP address
> and concatenate it with the port value. It would be a
> concatenation then.
I do not want to adhere to a concrete way of bit combination here - it
is encapsulated in sockaddr_in_hash().
> Why not simply run the entire sockarddr_in
> through murmur, this would be portable across transports
> and reliably random? The good old days of the trick you used in
> sockaddr_in_hash are bygone IMHO.
I do not like complex hashes here like murmur. Also, struct
sockaddr_in is allowed to have move fields than sin_port and
sin_addr.s_addr, so it is not portable to take a hash of the entire
struct. For example, I could create the struct on the stack and fill
its sin_addr and sin_port, and get a struct from recvfrom with the
same sin_addr and sin_port, but with filled other fields - hashes
would be different.
>
> You can easily remember the hash value in swim_member struct,
> so that all hash table lookups are easy.
I do not want to store hash here because mh_i64ptr already stores hashes
in mh_i64ptr_node.
>
>> + * struct member, describing a remote instance. The only
>> + * purpose of such strange hash function is to be able to
>> + * reuse mh_i64ptr_t instead of introducing one more
>> + * implementation of mhash.
>> + *
>> + * Discovered members live here until they are
>> + * unavailable - in such a case they are removed from the
>> + * hash. But a subset of members are pinned - the ones
>> + * added explicitly via API. When a member is pinned, it
>> + * can not be removed from the hash, and the module will
>> + * ping him constantly.
>
> Great comments btw, makes the whole thing easy to understand. I
> will stop by your desk to fix a couple of grammar issues I found.
Ok, looking forward.
>
>> +static inline uint64_t
>> +sockaddr_in_hash(const struct sockaddr_in *a)
>> +{
>> + return ((uint64_t) a->sin_addr.s_addr << 16) | a->sin_port;
>> +}
>
>> +struct PACKED swim_member_bin {
>> + /** mp_encode_map(3) */
>> + uint8_t m_header;
>> +
>> + /** mp_encode_uint(SWIM_MEMBER_STATUS) */
>> + uint8_t k_status;
>> + /** mp_encode_uint(enum member_status) */
>> + uint8_t v_status;
>> +
>> + /** mp_encode_uint(SWIM_MEMBER_ADDRESS) */
>> + uint8_t k_addr;
>> + /** mp_encode_uint(addr.sin_addr.s_addr) */
>> + uint8_t m_addr;
>> + uint32_t v_addr;
>> +
>> + /** mp_encode_uint(SWIM_MEMBER_PORT) */
>> + uint8_t k_port;
>> + /** mp_encode_uint(addr.sin_port) */
>> + uint8_t m_port;
>> + uint16_t v_port;
>> +};
>
> Please create a patch for the docs extending the binary protocol
> with these new messages. I would also ponder a bit more about
> extending iproto_constants.h with swim command codes in case we
> ever want to transport them over iproto.
I guess, it is better to write a doc request on the whole swim module
once we agree on the protocol details and API. Concerning iproto - I
think we can move constants when we decide to transport swim over
iproto, *if* we decide it ever. What is more, iproto is box/, swim is
lib/ and moving constants to iproto requires making swim part of box.
I do not like it, however as an alternative in future it is possible
to move some swim constants into a separate header like swim_bin.h and
include it to iproto_constants.h. But now it makes no sense.
>
>> + * SWIM_ANTI_ENTROPY: [
>> + * {
>> + * SWIM_MEMBER_STATUS: uint, enum member_status,
>> + * SWIM_MEMBER_ADDRESS: uint, ip,
>> + * SWIM_MEMBER_PORT: uint, port
>> + * },
>
> Is this going to work only over ip network? Why not use server
> uuids as member identifiers?
Yes, IP, and only in UDP which was approved by you. UUIDs are useless
in terms of transport - you can not send a packet to UUID. Only to ip
and possibly port. But as verbally discussed, UUIDs can be used as an
identifier in local membership table. We have decided to implement
them, and I will do it later.
>
>> + for (mh_int_t node = mh_first(members), end = mh_end(members);
>> + node != end; node = mh_next(members, node), ++i) {
>> + shuffled[i] = (struct swim_member *)
>> + mh_i64ptr_node(members, node)->val;
>> + int j = swim_scaled_rand(0, i);
>> + SWAP(shuffled[i], shuffled[j]);
>> + }
>
> Please add a comment that this method preserves even distribution
> of a random sequence, ideally explaining why, or at least
> mentioning that we tested it.
@@ -377,6 +377,10 @@ swim_shuffle_members(struct swim *swim)
shuffled = new_shuffled;
swim->shuffled_members = new_shuffled;
int i = 0;
+ /*
+ * This shuffling preserves even distribution of a random
+ * sequence, that is proved by testing.
+ */
for (mh_int_t node = mh_first(members), end = mh_end(members);
node != end; node = mh_next(members, node), ++i) {
shuffled[i] = (struct swim_member *)
>
>> +/**
>> + * Encode anti-entropy header and members data as many as
>> + * possible to the end of a last packet.
>> + * @retval -1 Error.
>> + * @retval 0 Not error, but nothing is encoded.
>> + * @retval 1 Something is encoded.
>> + */
>
> I would appreciate a bit more lengthy comment about your chunking
> strategy when pushing this data over UDP. Do you prepare all
> chunks and then send them? Do you fill a chunk and flush it along
> the way? What is chunk size (it's a pity it's not part of any
> function signature, it's the defining constraint of every function
> below. Do you have static asserts for the case when msgpack packet
> doesn't fit the udp packet?
This function is not a place for transport details, but I will answer
your questions below nonetheless.
> Do I prepare all chunks and then send them?
I guess, under chunks you mean packets. Yes, I prepare all packets
and then send them.
> Do I fill a chunk and flush it along the way?
No, it is not possible, until I did not receive EV_WRITE event.
> What is chunk size?
See swim_io.h UDP_PACKET_SIZE.
> "it's a pity it's not part of any function signature".
And it should not be part of any function signature. UDP
packet size is known (unless MTU is modified by admin, but
even in such a case a new MTU would be stored in struct swim).
> Do you have static asserts for the case when msgpack packet
doesn't fit the udp packet?
No. During packing if a packet size is exceeded I just fill its
header and start a new packet. During unpacking (process_...()
functions) I do not care about packet size nor packets concept
at all - I just parse msgpack in a buffer of a given size.
>
>> +
>> +static int
>> +swim_process_member_key(enum swim_member_key key, const char **pos,
>> + const char *end, const char *msg_pref,
>> + struct swim_member_def *def)
>
> No comment. What's going on here?
Parsing of a single swim_member_key value and filling swim_member_def.
I thought it is obvious from the function name, but as you wish, I will
write a comment.
@@ -544,12 +544,23 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
}
}
+/**
+ * Decode a MessagePack value of @a key and store it in @a def.
+ * @param key Key to read value of.
+ * @param[in][out] pos Where a value is stored.
+ * @param end End of the buffer.
+ * @param msg_pref Error message prefix.
+ * @param[out] def Where to store the value.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
static int
>
>> +{
>> + switch(key) {
> Missing space.
swim_process_member_key(enum swim_member_key key, const char **pos,
const char *end, const char *msg_pref,
struct swim_member_def *def)
{
- switch(key) {
+ switch (key) {
case SWIM_MEMBER_STATUS:
>
>> + case SWIM_MEMBER_STATUS:
>> + if (mp_typeof(**pos) != MP_UINT ||
>> + mp_check_uint(*pos, end) > 0) {
>> + say_error("%s member status should be uint", msg_pref);
>> + return -1;
>> + }
>> + key = mp_decode_uint(pos);
>> + if (key >= swim_member_status_MAX) {
>> + say_error("%s unknown member status", msg_pref);
>> + return -1;
>> + }
>> + def->status = (enum swim_member_status) key;
>> + break;
>> + case SWIM_MEMBER_ADDRESS:
>> + if (mp_typeof(**pos) != MP_UINT ||
>> + mp_check_uint(*pos, end) > 0) {
>> + say_error("%s member address should be uint", msg_pref);
>> + return -1;
>> + }
>> + def->addr.sin_addr.s_addr = mp_decode_uint(pos);
>> + break;
>> + case SWIM_MEMBER_PORT:
>> + if (mp_typeof(**pos) != MP_UINT ||
>> + mp_check_uint(*pos, end) > 0) {
>> + say_error("%s member port should be uint", msg_pref);
>> + return -1;
>> + }
>> + uint64_t port = mp_decode_uint(pos);
>> + if (port > UINT16_MAX) {
>> + say_error("%s member port is invalid", msg_pref);
>> + return -1;
>> + }
>> + def->addr.sin_port = port;
>> + break;
>> + default:
>> + unreachable();
>> + }
>> + return 0;
>> +}
>
> OK, I understand now it's merely updating a member configuration.
It is not updating of a member configuration, but filling of swim_member_def
structure. Like opts_parse_key.
> But then what's the event flow of such update? What happens after
> the update? I know from the public API that we suspend all
> activities while update is in progress, but this is not so obvious
> when you reading the code.
Nothing happens after the update in this commit. A struct swim_member is
just added to a members table and never changes nor disappears.
Also, I do not understand what do you mean saying "we suspend all
activities while update is in progress". Of course we suspend, since it is
TX thread. Any action in TX thread suspend all other actions.
>
>> +/** Decode an anti-entropy message, update members table. */
>> +static int
>> +swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
>
> Why not swim_decode_whatever?
Because it is not just decoding. After decoding of each member actions
may be performed to update/add/delete a member.
> process is a very general world, it could mean any action.
Exactly. Any action may be performed during decoding.
>
>> +{
>> + const char *msg_pref = "Invalid SWIM anti-entropy message:";
>> + if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) {
>> + say_error("%s message should be an array", msg_pref);
>> + return -1;
>> + }
>> + uint64_t size = mp_decode_array(pos);
>> + for (uint64_t i = 0; i < size; ++i) {
>> + if (mp_typeof(**pos) != MP_MAP ||
>> + mp_check_map(*pos, end) > 0) {
>> + say_error("%s member should be map", msg_pref);
>> + return -1;
>> + }
>> + uint64_t map_size = mp_decode_map(pos);
>> + struct swim_member_def def;
>> + swim_member_def_create(&def);
>> + for (uint64_t j = 0; j < map_size; ++j) {
>> + if (mp_typeof(**pos) != MP_UINT ||
>> + mp_check_uint(*pos, end) > 0) {
>> + say_error("%s member key should be uint",
>> + msg_pref);
>> + return -1;
>> + }
>> + uint64_t key = mp_decode_uint(pos);
>> + if (key >= swim_member_key_MAX) {
>> + say_error("%s unknown member key", msg_pref);
>> + return -1;
>> + }
>> + if (swim_process_member_key(key, pos, end, msg_pref,
>> + &def) != 0)
>> + return -1;
>> + }
>> + if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
>> + say_error("%s member address should be specified",
>> + msg_pref);
>> + return -1;
>> + }
>> + swim_process_member_update(swim, &def);
>
> Why not swim_udpate_member?
For names consistency - process_<component>, process_update, ... .
But as you wish.
@@ -530,7 +530,7 @@ swim_member_def_create(struct swim_member_def *def)
}
static void
-swim_process_member_update(struct swim *swim, struct swim_member_def *def)
+swim_update_member(struct swim *swim, struct swim_member_def *def)
{
struct swim_member *member = swim_find_member(swim, &def->addr);
/*
@@ -641,7 +641,7 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
msg_pref);
return -1;
}
- swim_process_member_update(swim, &def);
+ swim_update_member(swim, &def);
}
return 0;
>
> Someone needs to check that there is no buffer overflow in this
> code (I will check in the next review round).
Where is a buffer to overflow? If you mean checking of the buffer
borders, I do it via mp_check_...() before decoding of any value.
>
>> +/**
>> + * Convert a string URI like "ip:port" to sockaddr_in structure.
>> + */
>> +static int
>> +uri_to_addr(const char *str, struct sockaddr_in *addr)
>> +{
>> + struct uri uri;
>> + if (uri_parse(&uri, str) != 0 || uri.service == NULL)
>> + goto invalid_uri;
>> + in_addr_t iaddr;
>> + if (uri.host_len == strlen(URI_HOST_UNIX) &&
>> + memcmp(uri.host, URI_HOST_UNIX, uri.host_len) == 0) {
>> + diag_set(IllegalParams, "Unix sockets are not supported");
>> + return -1;
>> + }
>> + if (uri.host_len == 0) {
>> + iaddr = htonl(INADDR_ANY);
>> + } else if (uri.host_len == 9 && memcmp("localhost", uri.host, 9) == 0) {
>> + iaddr = htonl(INADDR_LOOPBACK);
>> + } else {
>> + iaddr = inet_addr(tt_cstr(uri.host, uri.host_len));
>> + if (iaddr == (in_addr_t) -1)
>> + goto invalid_uri;
>> + }
>> + int port = htons(atoi(uri.service));
>> + memset(addr, 0, sizeof(*addr));
>> + addr->sin_family = AF_INET;
>> + addr->sin_addr.s_addr = iaddr;
>> + addr->sin_port = port;
>> + return 0;
>> +
>> +invalid_uri:
>> + diag_set(SocketError, sio_socketname(-1), "invalid uri \"%s\"", str);
>> + return -1;
>> +}
>
> Shouldn't this be part of sio (it's a utility function).
Done in a separate commit.
>
>> + }
>> + if (swim_scheduler_bind(&swim->scheduler, &addr) != 0) {
>> + swim_member_delete(swim, new_self);
>> + return -1;
>
> Why is this a scheduler function, not member function?
>
Because I do not bind a member. I bind the whole struct swim to an
address. All communication with each member goes then through this
address.
^ permalink raw reply [flat|nested] 17+ messages in thread
* Re: [tarantool-patches] [PATCH v3 1/6] [RAW] swim: introduce SWIM's anti-entropy component
2018-12-29 10:14 ` [PATCH v3 1/6] [RAW] swim: introduce SWIM's anti-entropy component Vladislav Shpilevoy
2019-01-09 9:12 ` [tarantool-patches] " Konstantin Osipov
@ 2019-01-09 11:45 ` Konstantin Osipov
2019-01-15 14:42 ` [tarantool-patches] " Vladislav Shpilevoy
1 sibling, 1 reply; 17+ messages in thread
From: Konstantin Osipov @ 2019-01-09 11:45 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/12/29 15:07]:
> + swim_member_bin_create(&member_bin);
> + for (; i < (int) mh_size(swim->members); ++i) {
> + char *pos = swim_packet_alloc(packet, sizeof(member_bin));
> + if (pos == NULL)
> + break;
> + struct swim_member *member = swim->shuffled_members[i];
> + swim_member_bin_reset(&member_bin, member);
Why do you need to create() the member if you then reset it?
Perhaps encode() or fill() is a more suitable verb than reset?
> + memcpy(pos, &member_bin, sizeof(member_bin));
> + swim_anti_entropy_header_bin_create(&ae_header_bin, i);
> + memcpy(header, &ae_header_bin, sizeof(ae_header_bin));
> + swim_packet_flush(packet);
Why flush() and not simply send()?
> +swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
Why not simply swim_encode_round()?
> +/** Once per specified timeout trigger a next broadcast step. */
> +static void
> +swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events)
Once again I have a difficulty understanding the name. Is it swim
step begin or swim round begin? What is swim round step? Sounds
like each round has many steps and each step has a beginning and an end?
Then I'm missing swim_round_step_end(), swim_round_step_first(),
or something like that.
Looking at the code, swim_round_step_begin() is simply
swim_round().
> +static void
> +swim_process_member_update(struct swim *swim, struct swim_member_def *def)
> +{
> + struct swim_member *member = swim_find_member(swim, &def->addr);
> + /*
> + * Trivial processing of a new member - just add it to the
> + * members table.
> + */
> + if (member == NULL) {
> + member = swim_member_new(swim, &def->addr, def->status);
> + if (member == NULL)
> + diag_log();
> + }
> +}
Why nothing is done for an existing member? This needs a comment, no?
> +
> +struct swim_transport swim_udp_transport = {
> + /* .send_round_msg = */ swim_udp_send_msg,
> + /* .recv_msg = */ swim_udp_recv_msg,
> +};
Initializing/destroying an endpoint (like calling bind()) should also be
part of transport api.
> +int
> +swim_scheduler_bind(struct swim_scheduler *scheduler, struct sockaddr_in *addr)
And not part of the scheduler api.
> + evio_setsockopt_server(fd, AF_INET, SOCK_DGRAM) != 0) {
The file descriptor itself should also be part of the transport.
> +static void
> +swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
> +{
> + assert((events & EV_READ) != 0);
> + (void) events;
> + (void) loop;
> + struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
> + struct sockaddr_in addr;
> + socklen_t len = sizeof(addr);
> + struct swim_packet packet;
> + struct swim_msg msg;
> + swim_msg_create(&msg);
> + swim_packet_create(&packet, &msg);
> + swim_transport_recv_f recv = scheduler->transport->recv_msg;
> + ssize_t size = recv(io->fd, packet.body, packet.end - packet.body,
> + (struct sockaddr *) &addr, &len);
I don't understand why you do it here, if it's part of the
transport api.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 17+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v3 1/6] [RAW] swim: introduce SWIM's anti-entropy component
2019-01-09 11:45 ` [tarantool-patches] " Konstantin Osipov
@ 2019-01-15 14:42 ` Vladislav Shpilevoy
0 siblings, 0 replies; 17+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-15 14:42 UTC (permalink / raw)
To: tarantool-patches, Konstantin Osipov; +Cc: vdavydov.dev
On 09/01/2019 14:45, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/12/29 15:07]:
>> + swim_member_bin_create(&member_bin);
>
>> + for (; i < (int) mh_size(swim->members); ++i) {
>> + char *pos = swim_packet_alloc(packet, sizeof(member_bin));
>> + if (pos == NULL)
>> + break;
>> + struct swim_member *member = swim->shuffled_members[i];
>> + swim_member_bin_reset(&member_bin, member);
>
> Why do you need to create() the member if you then reset it?
> Perhaps encode() or fill() is a more suitable verb than reset?
Create initializes constant fields. Reset fills others. But as you
wish. Done here and with similar functions in the next commits.
@@ -269,8 +269,7 @@ struct PACKED swim_member_bin {
};
static inline void
-swim_member_bin_reset(struct swim_member_bin *header,
- struct swim_member *member)
+swim_member_bin_fill(struct swim_member_bin *header, struct swim_member *member)
{
header->v_status = member->status;
header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
@@ -439,7 +438,7 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_msg *msg)
if (pos == NULL)
break;
struct swim_member *member = swim->shuffled_members[i];
- swim_member_bin_reset(&member_bin, member);
+ swim_member_bin_fill(&member_bin, member);
memcpy(pos, &member_bin, sizeof(member_bin));
}
if (i == 0)
>
>> + memcpy(pos, &member_bin, sizeof(member_bin));
>> + swim_anti_entropy_header_bin_create(&ae_header_bin, i);
>> + memcpy(header, &ae_header_bin, sizeof(ae_header_bin));
>> + swim_packet_flush(packet);
>
> Why flush() and not simply send()?
Because it has nothing to do with network. swim_packet is an
allocator, where backend buffer is a char[UDP_PACKET_SIZE].
Flush() moves current position in that buffer. Please, read
carefully swim_packet and swim_msg API.
As a leading light I've used mpstream API:
mpstream_flush(), reserve(), advance().
>
>> +swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
>
> Why not simply swim_encode_round()?
Because it takes a message as an argument and encodes it.
>
>> +/** Once per specified timeout trigger a next broadcast step. */
>> +static void
>> +swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events)
>
> Once again I have a difficulty understanding the name. Is it swim
> step begin or swim round begin? What is swim round step? Sounds
> like each round has many steps and each step has a beginning and an end?
It is begin of a step. What is round step is described at the beginning
of the file. Cite:
"
* Tarantool splits protocol operation into rounds. At the
* beginning of a round all members are randomly reordered and
* linked into a list. At each round step a member is popped from
* the list head, a message is sent to him, and he waits for the
* next round.
"
>
> Then I'm missing swim_round_step_end(), swim_round_step_first(),
> or something like that.
There are no first nor second nor end. swim_round_step_begin schedules
sending of a round message to the next member. It does not send
immediately, so technically it is not round_step(), but round_step_begin().
Instead of round_step_end() I have round_step_complete(), just like
for vy_task.
>
> Looking at the code, swim_round_step_begin() is simply
> swim_round().
It is not round. What is round is described at the beginning of
the file, see cite above.
>
>
>> +static void
>> +swim_process_member_update(struct swim *swim, struct swim_member_def *def)
>> +{
>> + struct swim_member *member = swim_find_member(swim, &def->addr);
>> + /*
>> + * Trivial processing of a new member - just add it to the
>> + * members table.
>> + */
>> + if (member == NULL) {
>> + member = swim_member_new(swim, &def->addr, def->status);
>> + if (member == NULL)
>> + diag_log();
>> + }
>> +}
>
> Why nothing is done for an existing member? This needs a comment, no?
In this commit a member can only be added. Not updated nor deleted. These
actions are introduced in the next commits.
>
>> +
>> +struct swim_transport swim_udp_transport = {
>> + /* .send_round_msg = */ swim_udp_send_msg,
>> + /* .recv_msg = */ swim_udp_recv_msg,
>> +};
>
> Initializing/destroying an endpoint (like calling bind()) should also be
> part of transport api.>
>> +int
>> +swim_scheduler_bind(struct swim_scheduler *scheduler, struct sockaddr_in *addr)
>
> And not part of the scheduler api.
As you wish. Done on the branch. I do not pase the diff here
because it is too big.
>
>> + evio_setsockopt_server(fd, AF_INET, SOCK_DGRAM) != 0) {
>
> The file descriptor itself should also be part of the transport.
In such a case the transport will be not a vtab, but an object, with
its own attributes. As I remember, you asked to make the transport just
a simple vtab. But as you wish. Done on the branch.
>
>> +static void
>> +swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
>> +{
>> + assert((events & EV_READ) != 0);
>> + (void) events;
>> + (void) loop;
>> + struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
>> + struct sockaddr_in addr;
>> + socklen_t len = sizeof(addr);
>> + struct swim_packet packet;
>> + struct swim_msg msg;
>> + swim_msg_create(&msg);
>> + swim_packet_create(&packet, &msg);
>> + swim_transport_recv_f recv = scheduler->transport->recv_msg;
>> + ssize_t size = recv(io->fd, packet.body, packet.end - packet.body,
>> + (struct sockaddr *) &addr, &len);
>
> I don't understand why you do it here, if it's part of the
> transport api.
It is. recv here is a variable taken from scheduler->transport->recv_msg.
But never mind, this code is reworked already due to other comments.
^ permalink raw reply [flat|nested] 17+ messages in thread
* [PATCH v3 2/6] [RAW] swim: introduce failure detection component
2018-12-29 10:14 [PATCH v3 0/6] SWIM draft Vladislav Shpilevoy
2018-12-29 10:14 ` [PATCH v3 1/6] [RAW] swim: introduce SWIM's anti-entropy component Vladislav Shpilevoy
@ 2018-12-29 10:14 ` Vladislav Shpilevoy
2019-01-09 13:48 ` [tarantool-patches] " Konstantin Osipov
2018-12-29 10:14 ` [PATCH v3 3/6] [RAW] swim: introduce a dissemination component Vladislav Shpilevoy
` (3 subsequent siblings)
5 siblings, 1 reply; 17+ messages in thread
From: Vladislav Shpilevoy @ 2018-12-29 10:14 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, kostja
Failure detection components allows to find which members are
already dead.
Part of #3234
---
src/lib/swim/swim.c | 439 +++++++++++++++++++++++++++++++++-
src/lib/swim/swim_io.c | 14 ++
src/lib/swim/swim_io.h | 16 ++
src/lib/swim/swim_transport.h | 4 +-
4 files changed, 463 insertions(+), 10 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 08c377374..c7bc11bca 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -89,6 +89,22 @@
enum {
/** How often to send membership messages and pings. */
HEARTBEAT_RATE_DEFAULT = 1,
+ /**
+ * If a ping was sent, it is considered to be lost after
+ * this time without an ack.
+ */
+ ACK_TIMEOUT = 1,
+ /**
+ * If a member has not been responding to pings this
+ * number of times, it is considered to be dead.
+ */
+ NO_ACKS_TO_DEAD = 3,
+ /**
+ * If a not pinned member confirmed to be dead, it is
+ * removed from the membership after at least this number
+ * of failed pings.
+ */
+ NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2,
};
/**
@@ -109,11 +125,17 @@ enum swim_member_status {
* members table.
*/
MEMBER_ALIVE = 0,
+ /**
+ * The member is considered to be dead. It will disappear
+ * from the membership, if it is not pinned.
+ */
+ MEMBER_DEAD,
swim_member_status_MAX,
};
static const char *swim_member_status_strs[] = {
"alive",
+ "dead",
};
/**
@@ -138,6 +160,30 @@ struct swim_member {
* Position in a queue of members in the current round.
*/
struct rlist in_queue_round;
+ /**
+ *
+ * Failure detection component
+ */
+ /**
+ * True, if the member is configured explicitly and can
+ * not disappear from the membership.
+ */
+ bool is_pinned;
+ /** Growing number to refute old messages. */
+ uint64_t incarnation;
+ /**
+ * How many pings did not receive an ack in a row. After
+ * a threshold the instance is marked as dead. After more
+ * it is removed from the table (if not pinned).
+ */
+ int failed_pings;
+ /** When the latest ping was sent to this member. */
+ double ping_ts;
+ /** Ready at hand regular ACK task. */
+ struct swim_task ack_task;
+ struct swim_task ping_task;
+ /** Position in a queue of members waiting for an ack. */
+ struct rlist in_queue_wait_ack;
};
/**
@@ -189,6 +235,19 @@ struct swim {
* anti-entropy message.
*/
struct swim_member **shuffled_members;
+ /**
+ *
+ * Failure detection component
+ */
+ /**
+ * Members waiting for an ACK. On too long absence of ACK
+ * a member is considered to be dead and is removed. The
+ * list is sorted by time in ascending order (tail is
+ * newer, head is older).
+ */
+ struct rlist queue_wait_ack;
+ /** Generator of ack checking events. */
+ struct ev_periodic wait_ack_tick;
};
static inline uint64_t
@@ -204,8 +263,84 @@ sockaddr_in_hash(const struct sockaddr_in *a)
*/
enum swim_component_type {
SWIM_ANTI_ENTROPY = 0,
+ SWIM_FAILURE_DETECTION,
};
+/** {{{ Failure detection component */
+
+/** Possible failure detection keys. */
+enum swim_fd_key {
+ /** Type of the failure detection message: ping or ack. */
+ SWIM_FD_MSG_TYPE,
+ /**
+ * Incarnation of the sender. To make the member alive if
+ * it was considered to be dead, but ping/ack with greater
+ * incarnation was received from it.
+ */
+ SWIM_FD_INCARNATION,
+};
+
+/**
+ * Failure detection message now has only two types: ping or ack.
+ * Indirect ping/ack are todo.
+ */
+enum swim_fd_msg_type {
+ SWIM_FD_MSG_PING,
+ SWIM_FD_MSG_ACK,
+ swim_fd_msg_type_MAX,
+};
+
+static const char *swim_fd_msg_type_strs[] = {
+ "ping",
+ "ack",
+};
+
+/** SWIM failure detection MsgPack header template. */
+struct PACKED swim_fd_header_bin {
+ /** mp_encode_uint(SWIM_FAILURE_DETECTION) */
+ uint8_t k_header;
+ /** mp_encode_map(2) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_FD_MSG_TYPE) */
+ uint8_t k_type;
+ /** mp_encode_uint(enum swim_fd_msg_type) */
+ uint8_t v_type;
+
+ /** mp_encode_uint(SWIM_FD_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
+};
+
+static inline void
+swim_fd_header_bin_create(struct swim_fd_header_bin *header,
+ enum swim_fd_msg_type type, uint64_t incarnation)
+{
+ header->k_header = SWIM_FAILURE_DETECTION;
+ header->m_header = 0x82;
+
+ header->k_type = SWIM_FD_MSG_TYPE;
+ header->v_type = type;
+
+ header->k_incarnation = SWIM_FD_INCARNATION;
+ header->m_incarnation = 0xcf;
+ header->v_incarnation = mp_bswap_u64(incarnation);
+}
+
+static void
+swim_member_schedule_ack_wait(struct swim *swim, struct swim_member *member)
+{
+ if (rlist_empty(&member->in_queue_wait_ack)) {
+ member->ping_ts = fiber_time();
+ rlist_add_tail_entry(&swim->queue_wait_ack, member,
+ in_queue_wait_ack);
+ }
+}
+
+/** }}} Failure detection component */
+
/** {{{ Anti-entropy component */
/**
@@ -220,6 +355,7 @@ enum swim_member_key {
*/
SWIM_MEMBER_ADDRESS,
SWIM_MEMBER_PORT,
+ SWIM_MEMBER_INCARNATION,
swim_member_key_MAX,
};
@@ -243,7 +379,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
/** SWIM member MsgPack template. */
struct PACKED swim_member_bin {
- /** mp_encode_map(3) */
+ /** mp_encode_map(4) */
uint8_t m_header;
/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -262,6 +398,12 @@ struct PACKED swim_member_bin {
/** mp_encode_uint(addr.sin_port) */
uint8_t m_port;
uint16_t v_port;
+
+ /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
};
static inline void
@@ -271,17 +413,20 @@ swim_member_bin_reset(struct swim_member_bin *header,
header->v_status = member->status;
header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
header->v_port = mp_bswap_u16(member->addr.sin_port);
+ header->v_incarnation = mp_bswap_u64(member->incarnation);
}
static inline void
swim_member_bin_create(struct swim_member_bin *header)
{
- header->m_header = 0x83;
+ header->m_header = 0x84;
header->k_status = SWIM_MEMBER_STATUS;
header->k_addr = SWIM_MEMBER_ADDRESS;
header->m_addr = 0xce;
header->k_port = SWIM_MEMBER_PORT;
header->m_port = 0xcd;
+ header->k_incarnation = SWIM_MEMBER_INCARNATION;
+ header->m_incarnation = 0xcf;
}
/** }}} Anti-entropy component */
@@ -289,17 +434,51 @@ swim_member_bin_create(struct swim_member_bin *header)
/**
* SWIM message structure:
* {
+ * SWIM_FAILURE_DETECTION: {
+ * SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type,
+ * SWIM_FD_INCARNATION: uint
+ * },
+ *
+ * OR/AND
+ *
* SWIM_ANTI_ENTROPY: [
* {
* SWIM_MEMBER_STATUS: uint, enum member_status,
* SWIM_MEMBER_ADDRESS: uint, ip,
- * SWIM_MEMBER_PORT: uint, port
+ * SWIM_MEMBER_PORT: uint, port,
+ * SWIM_MEMBER_INCARNATION: uint
* },
* ...
* ],
* }
*/
+/**
+ * Update status and incarnation of the member if needed. Statuses
+ * are compared as a compound key: {incarnation, status}. So @a
+ * new_status can override an old one only if its incarnation is
+ * greater, or the same, but its status is "bigger". Statuses are
+ * compared by their identifier, so "alive" < "dead". This
+ * protects from the case when a member is detected as dead on one
+ * instance, but overriden by another instance with the same
+ * incarnation "alive" message.
+ */
+static inline void
+swim_member_update_status(struct swim *swim, struct swim_member *member,
+ enum swim_member_status new_status,
+ uint64_t incarnation)
+{
+ (void) swim;
+ assert(member != swim->self);
+ if (member->incarnation == incarnation) {
+ if (member->status < new_status)
+ member->status = new_status;
+ } else if (member->incarnation < incarnation) {
+ member->status = new_status;
+ member->incarnation = incarnation;
+ }
+}
+
/**
* Remove the member from all queues, hashes, destroy it and free
* the memory.
@@ -313,6 +492,11 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
mh_i64ptr_del(swim->members, rc, NULL);
rlist_del_entry(member, in_queue_round);
+ /* Failure detection component. */
+ rlist_del_entry(member, in_queue_wait_ack);
+ swim_task_destroy(&member->ack_task);
+ swim_task_destroy(&member->ping_task);
+
free(member);
}
@@ -322,7 +506,7 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
*/
static struct swim_member *
swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
- enum swim_member_status status)
+ enum swim_member_status status, uint64_t incarnation)
{
struct swim_member *member =
(struct swim_member *) calloc(1, sizeof(*member));
@@ -343,6 +527,12 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
}
rlist_add_entry(&swim->queue_round, member, in_queue_round);
+ /* Failure detection component. */
+ member->incarnation = incarnation;
+ rlist_create(&member->in_queue_wait_ack);
+ swim_task_create(&member->ack_task, &swim->scheduler, swim_task_reset);
+ swim_task_create(&member->ping_task, &swim->scheduler, swim_task_reset);
+
return member;
}
@@ -442,6 +632,28 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_msg *msg)
return 1;
}
+/**
+ * Encode failure detection component.
+ * @retval -1 Error.
+ * @retval 1 Success, something is encoded.
+ */
+static int
+swim_encode_failure_detection(struct swim *swim, struct swim_msg *msg,
+ enum swim_fd_msg_type type)
+{
+ struct swim_fd_header_bin fd_header_bin;
+ int size = sizeof(fd_header_bin);
+ struct swim_packet *packet = swim_msg_reserve(msg, size);
+ if (packet == NULL)
+ return -1;
+ char *pos = swim_packet_alloc(packet, size);
+ swim_fd_header_bin_create(&fd_header_bin, type,
+ swim->self->incarnation);
+ memcpy(pos, &fd_header_bin, size);
+ swim_packet_flush(packet);
+ return 1;
+}
+
/** Encode SWIM components into a sequence of UDP packets. */
static int
swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
@@ -453,6 +665,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
char *header = swim_packet_alloc(packet, 1);
int rc, map_size = 0;
+ rc = swim_encode_failure_detection(swim, msg, SWIM_FD_MSG_PING);
+ if (rc < 0)
+ goto error;
+ map_size += rc;
+
rc = swim_encode_anti_entropy(swim, msg);
if (rc < 0)
goto error;
@@ -490,10 +707,11 @@ swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events)
return;
}
struct swim_member *m =
- rlist_shift_entry(&swim->queue_round, struct swim_member,
+ rlist_first_entry(&swim->queue_round, struct swim_member,
in_queue_round);
swim_task_schedule(&swim->round_step_task,
swim->transport->send_round_msg, &m->addr);
+ swim_member_schedule_ack_wait(swim, m);
ev_periodic_stop(loop, p);
}
@@ -501,16 +719,84 @@ static void
swim_round_step_complete(struct swim_task *task)
{
struct swim *swim = container_of(task, struct swim, round_step_task);
+ rlist_shift_entry(&swim->queue_round, struct swim_member,
+ in_queue_round);
swim_msg_reset(&task->msg);
ev_periodic_start(loop(), &swim->round_tick);
}
+/** Send a failure detection message. */
+static void
+swim_schedule_fd_request(struct swim *swim, struct swim_task *task,
+ struct swim_member *m, enum swim_fd_msg_type type,
+ swim_transport_send_f send)
+{
+ struct swim_msg *msg = &task->msg;
+ int rc = swim_encode_failure_detection(swim, msg, type);
+ if (rc < 0) {
+ diag_log();
+ swim_task_delete(task);
+ return;
+ }
+ assert(rc > 0);
+ say_verbose("SWIM: send %s to %s", swim_fd_msg_type_strs[type],
+ sio_strfaddr((struct sockaddr *) &m->addr,
+ sizeof(m->addr)));
+ swim_task_schedule(task, send, &m->addr);
+}
+
+static inline void
+swim_schedule_ack(struct swim *swim, struct swim_member *member)
+{
+ swim_schedule_fd_request(swim, &member->ack_task, member,
+ SWIM_FD_MSG_ACK, swim->transport->send_ack);
+}
+
+static inline void
+swim_schedule_ping(struct swim *swim, struct swim_member *member)
+{
+ swim_schedule_fd_request(swim, &member->ping_task, member,
+ SWIM_FD_MSG_PING, swim->transport->send_ping);
+ swim_member_schedule_ack_wait(swim, member);
+}
+
+/**
+ * Check for failed pings. A ping is failed if an ack was not
+ * received during ACK_TIMEOUT. A failed ping is resent here.
+ */
+static void
+swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
+{
+ assert((events & EV_PERIODIC) != 0);
+ (void) loop;
+ (void) events;
+ struct swim *swim = (struct swim *) p->data;
+ struct swim_member *m, *tmp;
+ double current_time = fiber_time();
+ rlist_foreach_entry_safe(m, &swim->queue_wait_ack, in_queue_wait_ack,
+ tmp) {
+ if (current_time - m->ping_ts < ACK_TIMEOUT)
+ break;
+ ++m->failed_pings;
+ if (m->failed_pings >= NO_ACKS_TO_GC) {
+ if (!m->is_pinned)
+ swim_member_delete(swim, m);
+ continue;
+ }
+ if (m->failed_pings >= NO_ACKS_TO_DEAD)
+ m->status = MEMBER_DEAD;
+ swim_schedule_ping(swim, m);
+ rlist_del_entry(m, in_queue_wait_ack);
+ }
+}
+
/**
* SWIM member attributes from anti-entropy and dissemination
* messages.
*/
struct swim_member_def {
struct sockaddr_in addr;
+ uint64_t incarnation;
enum swim_member_status status;
};
@@ -530,9 +816,35 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
* members table.
*/
if (member == NULL) {
- member = swim_member_new(swim, &def->addr, def->status);
+ member = swim_member_new(swim, &def->addr, def->status,
+ def->incarnation);
if (member == NULL)
diag_log();
+ return;
+ }
+ struct swim_member *self = swim->self;
+ if (member != self) {
+ swim_member_update_status(swim, member, def->status,
+ def->incarnation);
+ return;
+ }
+ /*
+ * It is possible that other instances know a bigger
+ * incarnation of this instance - such thing happens when
+ * the instance restarts and loses its local incarnation
+ * number. It will be restored by receiving dissemination
+ * messages about self.
+ */
+ if (self->incarnation < def->incarnation)
+ self->incarnation = def->incarnation;
+ if (def->status != MEMBER_ALIVE &&
+ def->incarnation == self->incarnation) {
+ /*
+ * In the cluster a gossip exists that this
+ * instance is not alive. Refute this information
+ * with a bigger incarnation.
+ */
+ self->incarnation++;
}
}
@@ -576,6 +888,15 @@ swim_process_member_key(enum swim_member_key key, const char **pos,
}
def->addr.sin_port = port;
break;
+ case SWIM_MEMBER_INCARNATION:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s member incarnation should be uint",
+ msg_pref);
+ return -1;
+ }
+ def->incarnation = mp_decode_uint(pos);
+ break;
default:
unreachable();
}
@@ -627,12 +948,95 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
return 0;
}
+/**
+ * Decode a failure detection message. Schedule pings, process
+ * acks.
+ */
+static int
+swim_process_failure_detection(struct swim *swim, const char **pos,
+ const char *end, const struct sockaddr_in *src)
+{
+ const char *msg_pref = "Invalid SWIM failure detection message:";
+ if (mp_typeof(**pos) != MP_MAP || mp_check_map(*pos, end) > 0) {
+ say_error("%s root should be a map", msg_pref);
+ return -1;
+ }
+ uint64_t size = mp_decode_map(pos);
+ if (size != 2) {
+ say_error("%s root map should have two keys - message type "\
+ "and incarnation", msg_pref);
+ return -1;
+ }
+ enum swim_fd_msg_type type = swim_fd_msg_type_MAX;
+ uint64_t incarnation = 0;
+ for (int i = 0; i < (int) size; ++i) {
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s a key should be uint", msg_pref);
+ return -1;
+ }
+ uint64_t key = mp_decode_uint(pos);
+ switch(key) {
+ case SWIM_FD_MSG_TYPE:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s message type should be uint",
+ msg_pref);
+ return -1;
+ }
+ key = mp_decode_uint(pos);
+ if (key >= swim_fd_msg_type_MAX) {
+ say_error("%s unknown message type", msg_pref);
+ return -1;
+ }
+ type = key;
+ break;
+ case SWIM_FD_INCARNATION:
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s incarnation should be uint",
+ msg_pref);
+ return -1;
+ }
+ incarnation = mp_decode_uint(pos);
+ break;
+ default:
+ say_error("%s unknown key", msg_pref);
+ return -1;
+ }
+ }
+ if (type == swim_fd_msg_type_MAX) {
+ say_error("%s message type should be specified", msg_pref);
+ return -1;
+ }
+ struct swim_member *sender = swim_find_member(swim, src);
+ if (sender == NULL) {
+ sender = swim_member_new(swim, src, MEMBER_ALIVE, incarnation);
+ if (sender == NULL) {
+ diag_log();
+ return 0;
+ }
+ } else {
+ swim_member_update_status(swim, sender, MEMBER_ALIVE,
+ incarnation);
+ }
+ if (type == SWIM_FD_MSG_PING) {
+ swim_schedule_ack(swim, sender);
+ } else {
+ assert(type == SWIM_FD_MSG_ACK);
+ if (incarnation >= sender->incarnation) {
+ sender->failed_pings = 0;
+ rlist_del_entry(sender, in_queue_wait_ack);
+ }
+ }
+ return 0;
+}
+
/** Receive and process a new message. */
static void
swim_on_input(struct swim_scheduler *scheduler,
const struct swim_packet *packet, const struct sockaddr_in *src)
{
- (void) src;
const char *msg_pref = "Invalid SWIM message:";
struct swim *swim = container_of(scheduler, struct swim, scheduler);
const char *pos = packet->body;
@@ -655,6 +1059,12 @@ swim_on_input(struct swim_scheduler *scheduler,
if (swim_process_anti_entropy(swim, &pos, end) != 0)
return;
break;
+ case SWIM_FAILURE_DETECTION:
+ say_verbose("SWIM: process failure detection");
+ if (swim_process_failure_detection(swim, &pos, end,
+ src) != 0)
+ return;
+ break;
default:
say_error("%s unknown component type component is "\
"supported", msg_pref);
@@ -726,6 +1136,12 @@ swim_new(void)
swim_round_step_complete);
swim->transport = &swim_udp_transport;
swim_scheduler_create(&swim->scheduler, swim_on_input, swim->transport);
+
+ /* Failure detection component. */
+ rlist_create(&swim->queue_wait_ack);
+ ev_init(&swim->wait_ack_tick, swim_check_acks);
+ ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL);
+ swim->wait_ack_tick.data = (void *) swim;
return swim;
}
@@ -738,7 +1154,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
return -1;
struct swim_member *new_self = NULL;
if (swim_find_member(swim, &addr) == NULL) {
- new_self = swim_member_new(swim, &addr, MEMBER_ALIVE);
+ new_self = swim_member_new(swim, &addr, MEMBER_ALIVE, 0);
if (new_self == NULL)
return -1;
}
@@ -747,6 +1163,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
return -1;
}
ev_periodic_start(loop(), &swim->round_tick);
+ ev_periodic_start(loop(), &swim->wait_ack_tick);
if (swim->round_tick.interval != heartbeat_rate && heartbeat_rate > 0)
ev_periodic_set(&swim->round_tick, 0, heartbeat_rate, NULL);
@@ -767,9 +1184,10 @@ swim_add_member(struct swim *swim, const char *uri)
return -1;
struct swim_member *member = swim_find_member(swim, &addr);
if (member == NULL) {
- member = swim_member_new(swim, &addr, MEMBER_ALIVE);
+ member = swim_member_new(swim, &addr, MEMBER_ALIVE, 0);
if (member == NULL)
return -1;
+ member->is_pinned = true;
}
return 0;
}
@@ -800,6 +1218,8 @@ swim_info(struct swim *swim, struct info_handler *info)
sizeof(member->addr)));
info_append_str(info, "status",
swim_member_status_strs[member->status]);
+ info_append_int(info, "incarnation",
+ (int64_t) member->incarnation);
info_table_end(info);
}
info_end(info);
@@ -810,6 +1230,7 @@ swim_delete(struct swim *swim)
{
swim_scheduler_destroy(&swim->scheduler);
ev_periodic_stop(loop(), &swim->round_tick);
+ ev_periodic_stop(loop(), &swim->wait_ack_tick);
swim_task_destroy(&swim->round_step_task);
mh_int_t node = mh_first(swim->members);
while (node != mh_end(swim->members)) {
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 8a1eca819..00a16a2bb 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -23,6 +23,8 @@ swim_udp_recv_msg(int fd, void *buffer, size_t size, struct sockaddr *addr,
struct swim_transport swim_udp_transport = {
/* .send_round_msg = */ swim_udp_send_msg,
+ /* .send_ping = */ swim_udp_send_msg,
+ /* .send_ack = */ swim_udp_send_msg,
/* .recv_msg = */ swim_udp_recv_msg,
};
@@ -39,6 +41,18 @@ swim_packet_new(struct swim_msg *msg)
return res;
}
+struct swim_task *
+swim_task_new(struct swim_scheduler *scheduler, swim_task_f complete)
+{
+ struct swim_task *task = (struct swim_task *) malloc(sizeof(*task));
+ if (task == NULL) {
+ diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
+ return NULL;
+ }
+ swim_task_create(task, scheduler, complete);
+ return task;
+}
+
void
swim_task_schedule(struct swim_task *task, swim_transport_send_f send,
const struct sockaddr_in *dst)
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index fc44fd0a7..f08bd1ef3 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -279,12 +279,21 @@ swim_task_create(struct swim_task *task, struct swim_scheduler *scheduler,
task->scheduler = scheduler;
}
+struct swim_task *
+swim_task_new(struct swim_scheduler *scheduler, swim_task_f complete);
+
static inline bool
swim_task_is_active(struct swim_task *task)
{
return ! rlist_empty(&task->in_queue_output);
}
+static inline void
+swim_task_reset(struct swim_task *task)
+{
+ swim_msg_reset(&task->msg);
+}
+
static inline void
swim_task_destroy(struct swim_task *task)
{
@@ -292,6 +301,13 @@ swim_task_destroy(struct swim_task *task)
swim_msg_destroy(&task->msg);
}
+static inline void
+swim_task_delete(struct swim_task *task)
+{
+ swim_task_destroy(task);
+ free(task);
+}
+
#if defined(__cplusplus)
}
#endif
diff --git a/src/lib/swim/swim_transport.h b/src/lib/swim/swim_transport.h
index d629526ac..0bf4aa186 100644
--- a/src/lib/swim/swim_transport.h
+++ b/src/lib/swim/swim_transport.h
@@ -52,7 +52,9 @@ struct swim_transport {
* are like sendto().
*/
swim_transport_send_f send_round_msg;
-
+ /** Send failure detection message. */
+ swim_transport_send_f send_ping;
+ swim_transport_send_f send_ack;
/**
* Receive a message. Not necessary round or failure
* detection. Before message is received, its type is
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 17+ messages in thread
* Re: [tarantool-patches] [PATCH v3 2/6] [RAW] swim: introduce failure detection component
2018-12-29 10:14 ` [PATCH v3 2/6] [RAW] swim: introduce failure detection component Vladislav Shpilevoy
@ 2019-01-09 13:48 ` Konstantin Osipov
2019-01-15 14:42 ` [tarantool-patches] " Vladislav Shpilevoy
0 siblings, 1 reply; 17+ messages in thread
From: Konstantin Osipov @ 2019-01-09 13:48 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/12/29 15:07]:
> enum {
> /** How often to send membership messages and pings. */
> HEARTBEAT_RATE_DEFAULT = 1,
> + /**
> + * If a ping was sent, it is considered to be lost after
> + * this time without an ack.
> + */
> + ACK_TIMEOUT = 1,
The timeout should be configurable. A reasonable default looks
more close 30 seconds at least - we have many cases in
(malfunctioning) production when long requests stall the event
loop for 10-15 seconds, such requests should not lead to
membership storms.
> + /**
> + * If a member has not been responding to pings this
> + * number of times, it is considered to be dead.
> + */
> + NO_ACKS_TO_DEAD = 3,
> + /**
> + * If a not pinned member confirmed to be dead, it is
> + * removed from the membership after at least this number
> + * of failed pings.
> + */
> + NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2,
> };
>
> + bool is_pinned;
> + /** Growing number to refute old messages. */
reject, or perhaps ignore?
refute is usually used for arguments in a heated conversation.
> + /**
> + * How many pings did not receive an ack in a row. After
> + * a threshold the instance is marked as dead. After more
> + * it is removed from the table (if not pinned).
> + */
> + int failed_pings;
These are more like unacknowledged pings. Have they failed? Maybe.
> + /** When the latest ping was sent to this member. */
> + double ping_ts;
last_ping_time? Why use double and not ev_time_t?
> + /** Type of the failure detection message: ping or ack. */
> + SWIM_FD_MSG_TYPE,
> + /**
> + * Incarnation of the sender. To make the member alive if
> + * it was considered to be dead, but ping/ack with greater
> + * incarnation was received from it.
> + */
> + SWIM_FD_INCARNATION,
Uhm, FD is too commonly used for a file descriptor. Please use a
different name. Why not simply SWIM_PING and SWIM_ACK?
> struct swim_transport swim_udp_transport = {
> /* .send_round_msg = */ swim_udp_send_msg,
> + /* .send_ping = */ swim_udp_send_msg,
> + /* .send_ack = */ swim_udp_send_msg,
Why do you need a separate transport api for ack/ping sends?
Shouldn't send/recv be enough? This is a transport layer, it
should be unaware of protocol details.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 17+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v3 2/6] [RAW] swim: introduce failure detection component
2019-01-09 13:48 ` [tarantool-patches] " Konstantin Osipov
@ 2019-01-15 14:42 ` Vladislav Shpilevoy
0 siblings, 0 replies; 17+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-15 14:42 UTC (permalink / raw)
To: tarantool-patches, Konstantin Osipov; +Cc: vdavydov.dev
On 09/01/2019 16:48, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/12/29 15:07]:
>> enum {
>> /** How often to send membership messages and pings. */
>> HEARTBEAT_RATE_DEFAULT = 1,
>> + /**
>> + * If a ping was sent, it is considered to be lost after
>> + * this time without an ack.
>> + */
>> + ACK_TIMEOUT = 1,
>
> The timeout should be configurable. A reasonable default looks
> more close 30 seconds at least - we have many cases in
> (malfunctioning) production when long requests stall the event
> loop for 10-15 seconds, such requests should not lead to
> membership storms.
Done.
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 97395a3a9..1c0cc2cd4 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -96,7 +96,7 @@ enum {
* If a ping was sent, it is considered to be lost after
* this time without an ack.
*/
- ACK_TIMEOUT = 1,
+ ACK_TIMEOUT_DEFAULT = 30,
/**
* If a member has not been responding to pings this
* number of times, it is considered to be dead.
@@ -765,7 +765,7 @@ swim_schedule_ping(struct swim *swim, struct swim_member *member)
/**
* Check for failed pings. A ping is failed if an ack was not
- * received during ACK_TIMEOUT. A failed ping is resent here.
+ * received during ACK timeout. A failed ping is resent here.
*/
static void
swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
@@ -775,10 +775,11 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
(void) events;
struct swim *swim = (struct swim *) p->data;
struct swim_member *m, *tmp;
+ double ack_timeout = swim->wait_ack_tick.interval;
double current_time = fiber_time();
rlist_foreach_entry_safe(m, &swim->queue_wait_ack, in_queue_wait_ack,
tmp) {
- if (current_time - m->ping_ts < ACK_TIMEOUT)
+ if (current_time - m->ping_ts < ack_timeout)
break;
++m->failed_pings;
if (m->failed_pings >= NO_ACKS_TO_GC) {
@@ -1113,7 +1114,7 @@ swim_new(const struct swim_transport_vtab *transport_vtab)
/* Failure detection component. */
rlist_create(&swim->queue_wait_ack);
ev_init(&swim->wait_ack_tick, swim_check_acks);
- ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL);
+ ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT_DEFAULT, NULL);
swim->wait_ack_tick.data = (void *) swim;
return swim;
}
@@ -1133,7 +1134,8 @@ swim_uri_to_addr(const char *uri, struct sockaddr_in *addr)
}
int
-swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate)
+swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
+ double ack_timeout)
{
struct sockaddr_in addr;
if (swim_uri_to_addr(uri, &addr) != 0)
@@ -1154,6 +1156,9 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate)
if (swim->round_tick.interval != heartbeat_rate && heartbeat_rate > 0)
ev_periodic_set(&swim->round_tick, 0, heartbeat_rate, NULL);
+ if (swim->wait_ack_tick.interval != ack_timeout && ack_timeout > 0)
+ ev_periodic_set(&swim->wait_ack_tick, 0, ack_timeout, NULL);
+
swim->self = new_self;
return 0;
}
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index b54fc47b2..001900311 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -56,12 +56,15 @@ swim_new(const struct swim_transport_vtab *transport_vtab);
* @heartbeat_rate seconds. It is rather the protocol
* speed. Protocol period depends on member count and
* @heartbeat_rate.
+ * @param ack_timeout Time in seconds after which a ping is
+ * considered to be unacknowledged.
*
* @retval 0 Success.
* @retval -1 Error.
*/
int
-swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate);
+swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
+ double ack_timeout);
/**
* Stop listening and broadcasting messages, cleanup all internal
diff --git a/src/lua/swim.c b/src/lua/swim.c
index 6a78e5dd5..8b6f0b0b0 100644
--- a/src/lua/swim.c
+++ b/src/lua/swim.c
@@ -75,6 +75,28 @@ lua_swim_gc(struct lua_State *L)
return 0;
}
+static inline double
+lua_swim_get_timeout_field(struct lua_State *L, int ncfg,
+ const char *fieldname, const char *funcname)
+{
+ double timeout;
+ lua_getfield(L, ncfg, fieldname);
+ if (lua_isnumber(L, -1)) {
+ timeout = lua_tonumber(L, -1);
+ if (timeout <= 0) {
+ return luaL_error(L, "swim.%s: %s should be positive "\
+ "number", funcname, fieldname);
+ }
+ } else if (! lua_isnil(L, -1)) {
+ return luaL_error(L, "swim.%s: %s should be positive number",
+ funcname, fieldname);
+ } else {
+ timeout = -1;
+ }
+ lua_pop(L, 1);
+ return timeout;
+}
+
/**
* Configure @a swim instance using a table stored in @a ncfg-th
* position on Lua stack.
@@ -106,23 +128,12 @@ lua_swim_cfg_impl(struct lua_State *L, int ncfg, struct swim *swim,
}
lua_pop(L, 1);
- double heartbeat_rate;
- lua_getfield(L, ncfg, "heartbeat");
- if (lua_isnumber(L, -1)) {
- heartbeat_rate = lua_tonumber(L, -1);
- if (heartbeat_rate <= 0) {
- return luaL_error(L, "swim.%s: heartbeat should be "\
- "positive number", funcname);
- }
- } else if (! lua_isnil(L, -1)) {
- return luaL_error(L, "swim.%s: heartbeat should be positive "\
- "number", funcname);
- } else {
- heartbeat_rate = -1;
- }
- lua_pop(L, 1);
+ double heartbeat_rate =
+ lua_swim_get_timeout_field(L, ncfg, "heartbeat", funcname);
+ double ack_timeout =
+ lua_swim_get_timeout_field(L, ncfg, "ack_timeout", funcname);
- return swim_cfg(swim, server_uri, heartbeat_rate);
+ return swim_cfg(swim, server_uri, heartbeat_rate, ack_timeout);
}
static int
>
>> + /**
>> + * If a member has not been responding to pings this
>> + * number of times, it is considered to be dead.
>> + */
>> + NO_ACKS_TO_DEAD = 3,
>> + /**
>> + * If a not pinned member confirmed to be dead, it is
>> + * removed from the membership after at least this number
>> + * of failed pings.
>> + */
>> + NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2,
>> };
>>
>> + bool is_pinned;
>> + /** Growing number to refute old messages. */
>
> reject, or perhaps ignore?
>
> refute is usually used for arguments in a heated conversation.
No, exactly refute. Reject/ignore are almost the same in this
context and mean 'do nothing'. Refute means, according to Oxford
Dictionary:
"Deny or contradict (a statement or accusation). Prove
(a statement or theory) to be wrong or false; disprove."
Here the swim instance refutes incorrect assumptions about its
own incarnation emitted by other cluster members.
>
>> + /**
>> + * How many pings did not receive an ack in a row. After
>> + * a threshold the instance is marked as dead. After more
>> + * it is removed from the table (if not pinned).
>> + */
>> + int failed_pings;
>
> These are more like unacknowledged pings. Have they failed? Maybe.
Done.
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 1c0cc2cd4..75f57f69f 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -105,7 +105,7 @@ enum {
/**
* If a not pinned member confirmed to be dead, it is
* removed from the membership after at least this number
- * of failed pings.
+ * of unacknowledged pings.
*/
NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2,
};
@@ -180,7 +180,7 @@ struct swim_member {
* a threshold the instance is marked as dead. After more
* it is removed from the table (if not pinned).
*/
- int failed_pings;
+ int unacknowledged_pings;
/** When the latest ping was sent to this member. */
double ping_ts;
/** Ready at hand regular ACK task. */
@@ -764,8 +764,9 @@ swim_schedule_ping(struct swim *swim, struct swim_member *member)
}
/**
- * Check for failed pings. A ping is failed if an ack was not
- * received during ACK timeout. A failed ping is resent here.
+ * Check for unacknowledged pings. A ping is unacknowledged if an
+ * ack was not received during ACK timeout. An unacknowledged ping
+ * is resent here.
*/
static void
swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
@@ -781,13 +782,13 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
tmp) {
if (current_time - m->ping_ts < ack_timeout)
break;
- ++m->failed_pings;
- if (m->failed_pings >= NO_ACKS_TO_GC) {
+ ++m->unacknowledged_pings;
+ if (m->unacknowledged_pings >= NO_ACKS_TO_GC) {
if (!m->is_pinned)
swim_member_delete(swim, m);
continue;
}
- if (m->failed_pings >= NO_ACKS_TO_DEAD)
+ if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD)
m->status = MEMBER_DEAD;
swim_schedule_ping(swim, m);
rlist_del_entry(m, in_queue_wait_ack);
@@ -1040,7 +1041,7 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
} else {
assert(type == SWIM_FD_MSG_ACK);
if (incarnation >= sender->incarnation) {
- sender->failed_pings = 0;
+ sender->unacknowledged_pings = 0;
rlist_del_entry(sender, in_queue_wait_ack);
}
}
>
>> + /** When the latest ping was sent to this member. */
>> + double ping_ts;
>
> last_ping_time? Why use double and not ev_time_t?
I did not use ev_time_t since fiber_time() returns double and
ping_ts is initialized from fiber_time(), so I'll keep double.
The name is changed.
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 75f57f69f..3e1b26356 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -182,7 +182,7 @@ struct swim_member {
*/
int unacknowledged_pings;
/** When the latest ping was sent to this member. */
- double ping_ts;
+ double last_ping_time;
/** Ready at hand regular ACK task. */
struct swim_task ack_task;
struct swim_task ping_task;
@@ -335,7 +335,7 @@ static void
swim_member_schedule_ack_wait(struct swim *swim, struct swim_member *member)
{
if (rlist_empty(&member->in_queue_wait_ack)) {
- member->ping_ts = fiber_time();
+ member->last_ping_time = fiber_time();
rlist_add_tail_entry(&swim->queue_wait_ack, member,
in_queue_wait_ack);
}
@@ -780,7 +780,7 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
double current_time = fiber_time();
rlist_foreach_entry_safe(m, &swim->queue_wait_ack, in_queue_wait_ack,
tmp) {
- if (current_time - m->ping_ts < ack_timeout)
+ if (current_time - m->last_ping_time < ack_timeout)
break;
++m->unacknowledged_pings;
if (m->unacknowledged_pings >= NO_ACKS_TO_GC) {
>
>> + /** Type of the failure detection message: ping or ack. */
>> + SWIM_FD_MSG_TYPE,
>> + /**
>> + * Incarnation of the sender. To make the member alive if
>> + * it was considered to be dead, but ping/ack with greater
>> + * incarnation was received from it.
>> + */
>> + SWIM_FD_INCARNATION,
>
> Uhm, FD is too commonly used for a file descriptor. Please use a
> different name. Why not simply SWIM_PING and SWIM_ACK?
Because I do not want to put all protocol constants into a single
namespace, starting with SWIM_, so I used SWIM_FD_, SWIM_MEMBER_ etc.
I tried to use SWIM_FAILURE_DETECTION_, but it is too long.
Currently I have these names with 'fd': enum swim_fd_key,
enum swim_fd_msg_type, const char *swim_fd_msg_type_strs[],
struct swim_fd_header_bin.
How about expanding 'fd' to 'fail_det', 'fail_detec'?
Also, I could remove 'fd' from enum constants, but then it
would be harder to determine to which component a constant
belongs.
Please, choose one of new suffixes, or approve removal of
them at all.
>
>> struct swim_transport swim_udp_transport = {
>> /* .send_round_msg = */ swim_udp_send_msg,
>> + /* .send_ping = */ swim_udp_send_msg,
>> + /* .send_ack = */ swim_udp_send_msg,
>
> Why do you need a separate transport api for ack/ping sends?
> Shouldn't send/recv be enough? This is a transport layer, it
> should be unaware of protocol details.
>
>
Because this is what you asked me explicitly for. But honestly I
prefer send/recv, without explicit send_ping/ack etc. So I fixed it
in the first commit.
^ permalink raw reply [flat|nested] 17+ messages in thread
* [PATCH v3 3/6] [RAW] swim: introduce a dissemination component
2018-12-29 10:14 [PATCH v3 0/6] SWIM draft Vladislav Shpilevoy
2018-12-29 10:14 ` [PATCH v3 1/6] [RAW] swim: introduce SWIM's anti-entropy component Vladislav Shpilevoy
2018-12-29 10:14 ` [PATCH v3 2/6] [RAW] swim: introduce failure detection component Vladislav Shpilevoy
@ 2018-12-29 10:14 ` Vladislav Shpilevoy
2018-12-29 10:14 ` [PATCH v3 4/6] [RAW] swim: keep encoded round message cached Vladislav Shpilevoy
` (2 subsequent siblings)
5 siblings, 0 replies; 17+ messages in thread
From: Vladislav Shpilevoy @ 2018-12-29 10:14 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, kostja
Dissemination components broadcasts events about member status
updates.
Part of #3234
---
src/lib/swim/swim.c | 287 +++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 282 insertions(+), 5 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index c7bc11bca..4e7ffbc54 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -184,6 +184,27 @@ struct swim_member {
struct swim_task ping_task;
/** Position in a queue of members waiting for an ack. */
struct rlist in_queue_wait_ack;
+ /**
+ *
+ * Dissemination component
+ *
+ * Dissemination component sends events. Event is a
+ * notification about member status update. So formally,
+ * this structure already has all the needed attributes.
+ * But also an event somehow should be sent to all members
+ * at least once according to SWIM, so it requires
+ * something like TTL for each type of event, which
+ * decrements on each send. And a member can not be
+ * removed from the global table until it gets dead and
+ * its status TTLs is 0, so as to allow other members
+ * learn its dead status.
+ */
+ int status_ttl;
+ /**
+ * Events are put into a queue sorted by event occurrence
+ * time.
+ */
+ struct rlist in_queue_events;
};
/**
@@ -248,6 +269,12 @@ struct swim {
struct rlist queue_wait_ack;
/** Generator of ack checking events. */
struct ev_periodic wait_ack_tick;
+ /**
+ *
+ * Dissemination component
+ */
+ /** Queue of events sorted by occurrence time. */
+ struct rlist queue_events;
};
static inline uint64_t
@@ -264,6 +291,7 @@ sockaddr_in_hash(const struct sockaddr_in *a)
enum swim_component_type {
SWIM_ANTI_ENTROPY = 0,
SWIM_FAILURE_DETECTION,
+ SWIM_DISSEMINATION,
};
/** {{{ Failure detection component */
@@ -431,6 +459,88 @@ swim_member_bin_create(struct swim_member_bin *header)
/** }}} Anti-entropy component */
+/** {{{ Dissemination component */
+
+/** SWIM dissemination MsgPack template. */
+struct PACKED swim_diss_header_bin {
+ /** mp_encode_uint(SWIM_DISSEMINATION) */
+ uint8_t k_header;
+ /** mp_encode_array() */
+ uint8_t m_header;
+ uint32_t v_header;
+};
+
+static inline void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header, int batch_size)
+{
+ header->k_header = SWIM_DISSEMINATION;
+ header->m_header = 0xdd;
+ header->v_header = mp_bswap_u32(batch_size);
+}
+
+/** SWIM event MsgPack template. */
+struct PACKED swim_event_bin {
+ /** mp_encode_map(4) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_MEMBER_STATUS) */
+ uint8_t k_status;
+ /** mp_encode_uint(enum member_status) */
+ uint8_t v_status;
+
+ /** mp_encode_uint(SWIM_MEMBER_ADDRESS) */
+ uint8_t k_addr;
+ /** mp_encode_uint(addr.sin_addr.s_addr) */
+ uint8_t m_addr;
+ uint32_t v_addr;
+
+ /** mp_encode_uint(SWIM_MEMBER_PORT) */
+ uint8_t k_port;
+ /** mp_encode_uint(addr.sin_port) */
+ uint8_t m_port;
+ uint16_t v_port;
+
+ /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
+};
+
+static inline void
+swim_event_bin_create(struct swim_event_bin *header)
+{
+ header->m_header = 0x84;
+ header->k_status = SWIM_MEMBER_STATUS;
+ header->k_addr = SWIM_MEMBER_ADDRESS;
+ header->m_addr = 0xce;
+ header->k_port = SWIM_MEMBER_PORT;
+ header->m_port = 0xcd;
+ header->k_incarnation = SWIM_MEMBER_INCARNATION;
+ header->m_incarnation = 0xcf;
+}
+
+static inline void
+swim_event_bin_reset(struct swim_event_bin *header, struct swim_member *member)
+{
+ header->v_status = member->status;
+ header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
+ header->v_port = mp_bswap_u16(member->addr.sin_port);
+ header->v_incarnation = mp_bswap_u64(member->incarnation);
+}
+
+static inline void
+swim_schedule_event(struct swim *swim, struct swim_member *member)
+{
+ if (rlist_empty(&member->in_queue_events)) {
+ rlist_add_tail_entry(&swim->queue_events, member,
+ in_queue_events);
+ }
+ member->status_ttl = mh_size(swim->members);
+}
+
+/** }}} Dissemination component */
+
/**
* SWIM message structure:
* {
@@ -441,6 +551,18 @@ swim_member_bin_create(struct swim_member_bin *header)
*
* OR/AND
*
+ * SWIM_DISSEMINATION: [
+ * {
+ * SWIM_MEMBER_STATUS: uint, enum member_status,
+ * SWIM_MEMBER_ADDRESS: uint, ip,
+ * SWIM_MEMBER_PORT: uint, port,
+ * SWIM_MEMBER_INCARNATION: uint
+ * },
+ * ...
+ * ],
+ *
+ * OR/AND
+ *
* SWIM_ANTI_ENTROPY: [
* {
* SWIM_MEMBER_STATUS: uint, enum member_status,
@@ -453,6 +575,16 @@ swim_member_bin_create(struct swim_member_bin *header)
* }
*/
+/**
+ * Make all needed actions to process a member's update like a
+ * change of its status, or incarnation, or both.
+ */
+static void
+swim_member_status_is_updated(struct swim *swim, struct swim_member *member)
+{
+ swim_schedule_event(swim, member);
+}
+
/**
* Update status and incarnation of the member if needed. Statuses
* are compared as a compound key: {incarnation, status}. So @a
@@ -468,14 +600,16 @@ swim_member_update_status(struct swim *swim, struct swim_member *member,
enum swim_member_status new_status,
uint64_t incarnation)
{
- (void) swim;
assert(member != swim->self);
if (member->incarnation == incarnation) {
- if (member->status < new_status)
+ if (member->status < new_status) {
member->status = new_status;
+ swim_member_status_is_updated(swim, member);
+ }
} else if (member->incarnation < incarnation) {
member->status = new_status;
member->incarnation = incarnation;
+ swim_member_status_is_updated(swim, member);
}
}
@@ -497,6 +631,9 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
swim_task_destroy(&member->ack_task);
swim_task_destroy(&member->ping_task);
+ /* Dissemination component. */
+ assert(rlist_empty(&member->in_queue_events));
+
free(member);
}
@@ -533,6 +670,10 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
swim_task_create(&member->ack_task, &swim->scheduler, swim_task_reset);
swim_task_create(&member->ping_task, &swim->scheduler, swim_task_reset);
+ /* Dissemination component. */
+ rlist_create(&member->in_queue_events);
+ swim_member_status_is_updated(swim, member);
+
return member;
}
@@ -654,6 +795,60 @@ swim_encode_failure_detection(struct swim *swim, struct swim_msg *msg,
return 1;
}
+/**
+ * Encode a part of the dissemination component into a single SWIM
+ * packet.
+ * @retval -1 Error.
+ * @retval 0 Not error, but nothing is encoded.
+ * @retval 1 Something is encoded.
+ */
+static int
+swim_encode_dissemination_packet(struct swim_msg *msg, struct rlist **queue_pos)
+{
+ struct swim_diss_header_bin diss_header_bin;
+ int size = sizeof(diss_header_bin);
+ struct swim_packet *packet = swim_msg_reserve(msg, size);
+ if (packet == NULL)
+ return -1;
+ char *header = swim_packet_alloc(packet, size);
+
+ int i = 0;
+ struct swim_member *member, *prev = NULL;
+ struct swim_event_bin event_bin;
+ swim_event_bin_create(&event_bin);
+ rlist_foreach_entry(member, *queue_pos, in_queue_events) {
+ char *pos = swim_packet_alloc(packet, sizeof(event_bin));
+ if (pos == NULL)
+ break;
+ swim_event_bin_reset(&event_bin, member);
+ memcpy(pos, &event_bin, sizeof(event_bin));
+ ++i;
+ prev = member;
+ }
+ if (i == 0)
+ return 0;
+ swim_diss_header_bin_create(&diss_header_bin, i);
+ memcpy(header, &diss_header_bin, sizeof(diss_header_bin));
+ swim_packet_flush(packet);
+ return 1;
+}
+
+/**
+ * Encode failure dissemination component.
+ * @retval -1 Error.
+ * @retval 1 Success, something is encoded.
+ */
+static int
+swim_encode_dissemination(struct swim *swim, struct swim_msg *msg)
+{
+ struct rlist *pos;
+ rlist_foreach(pos, &swim->queue_events) {
+ if (swim_encode_dissemination_packet(msg, &pos) < 0)
+ return -1;
+ }
+ return ! rlist_empty(&swim->queue_events);
+}
+
/** Encode SWIM components into a sequence of UDP packets. */
static int
swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
@@ -670,6 +865,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
goto error;
map_size += rc;
+ rc = swim_encode_dissemination(swim, msg);
+ if (rc < 0)
+ goto error;
+ map_size += rc;
+
rc = swim_encode_anti_entropy(swim, msg);
if (rc < 0)
goto error;
@@ -685,6 +885,21 @@ error:
/** Once per specified timeout trigger a next broadcast step. */
static void
+swim_decrease_events_ttl(struct swim *swim)
+{
+ struct swim_member *member, *tmp;
+ rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events,
+ tmp) {
+ if (--member->status_ttl == 0)
+ rlist_del_entry(member, in_queue_events);
+ }
+}
+
+/**
+ * Do one round step. Send encoded components to a next member
+ * from the queue.
+ */
+static void
swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events)
{
assert((events & EV_PERIODIC) != 0);
@@ -712,6 +927,7 @@ swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events)
swim_task_schedule(&swim->round_step_task,
swim->transport->send_round_msg, &m->addr);
swim_member_schedule_ack_wait(swim, m);
+ swim_decrease_events_ttl(swim);
ev_periodic_stop(loop, p);
}
@@ -779,12 +995,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
break;
++m->failed_pings;
if (m->failed_pings >= NO_ACKS_TO_GC) {
- if (!m->is_pinned)
+ if (!m->is_pinned && m->status_ttl == 0)
swim_member_delete(swim, m);
continue;
}
- if (m->failed_pings >= NO_ACKS_TO_DEAD)
+ if (m->failed_pings >= NO_ACKS_TO_DEAD) {
m->status = MEMBER_DEAD;
+ swim_member_status_is_updated(swim, m);
+ }
swim_schedule_ping(swim, m);
rlist_del_entry(m, in_queue_wait_ack);
}
@@ -828,6 +1046,7 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
def->incarnation);
return;
}
+ uint64_t old_incarnation = self->incarnation;
/*
* It is possible that other instances know a bigger
* incarnation of this instance - such thing happens when
@@ -846,6 +1065,8 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
*/
self->incarnation++;
}
+ if (old_incarnation != self->incarnation)
+ swim_member_status_is_updated(swim, self);
}
static int
@@ -1032,6 +1253,50 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
return 0;
}
+static int
+swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
+{
+ const char *msg_pref = "Invald SWIM dissemination message:";
+ if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) {
+ say_error("%s message should be an array", msg_pref);
+ return -1;
+ }
+ uint64_t size = mp_decode_array(pos);
+ for (uint64_t i = 0; i < size; ++i) {
+ if (mp_typeof(**pos) != MP_MAP ||
+ mp_check_map(*pos, end) > 0) {
+ say_error("%s event should be map", msg_pref);
+ return -1;
+ }
+ uint64_t map_size = mp_decode_map(pos);
+ struct swim_member_def def;
+ swim_member_def_create(&def);
+ for (uint64_t j = 0; j < map_size; ++j) {
+ if (mp_typeof(**pos) != MP_UINT ||
+ mp_check_uint(*pos, end) > 0) {
+ say_error("%s event key should be uint",
+ msg_pref);
+ return -1;
+ }
+ uint64_t key = mp_decode_uint(pos);
+ if (key >= swim_member_key_MAX) {
+ say_error("%s unknown event key", msg_pref);
+ return -1;
+ }
+ if (swim_process_member_key(key, pos, end, msg_pref,
+ &def) != 0)
+ return -1;
+ }
+ if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
+ say_error("%s member address should be specified",
+ msg_pref);
+ return -1;
+ }
+ swim_process_member_update(swim, &def);
+ }
+ return 0;
+}
+
/** Receive and process a new message. */
static void
swim_on_input(struct swim_scheduler *scheduler,
@@ -1065,6 +1330,11 @@ swim_on_input(struct swim_scheduler *scheduler,
src) != 0)
return;
break;
+ case SWIM_DISSEMINATION:
+ say_verbose("SWIM: process dissemination");
+ if (swim_process_dissemination(swim, &pos, end) != 0)
+ return;
+ break;
default:
say_error("%s unknown component type component is "\
"supported", msg_pref);
@@ -1142,6 +1412,10 @@ swim_new(void)
ev_init(&swim->wait_ack_tick, swim_check_acks);
ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL);
swim->wait_ack_tick.data = (void *) swim;
+
+ /* Dissemination events. */
+ rlist_create(&swim->queue_events);
+
return swim;
}
@@ -1199,8 +1473,10 @@ swim_remove_member(struct swim *swim, const char *uri)
if (uri_to_addr(uri, &addr) != 0)
return -1;
struct swim_member *member = swim_find_member(swim, &addr);
- if (member != NULL)
+ if (member != NULL) {
+ rlist_del_entry(member, in_queue_events);
swim_member_delete(swim, member);
+ }
return 0;
}
@@ -1236,6 +1512,7 @@ swim_delete(struct swim *swim)
while (node != mh_end(swim->members)) {
struct swim_member *m = (struct swim_member *)
mh_i64ptr_node(swim->members, node)->val;
+ rlist_del_entry(m, in_queue_events);
swim_member_delete(swim, m);
node = mh_first(swim->members);
}
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 17+ messages in thread
* [PATCH v3 4/6] [RAW] swim: keep encoded round message cached
2018-12-29 10:14 [PATCH v3 0/6] SWIM draft Vladislav Shpilevoy
` (2 preceding siblings ...)
2018-12-29 10:14 ` [PATCH v3 3/6] [RAW] swim: introduce a dissemination component Vladislav Shpilevoy
@ 2018-12-29 10:14 ` Vladislav Shpilevoy
2018-12-29 10:14 ` [PATCH v3 5/6] [RAW] swim: send one UDP packet per EV_WRITE event Vladislav Shpilevoy
2018-12-29 10:14 ` [PATCH v3 6/6] [RAW] swim: introduce payload Vladislav Shpilevoy
5 siblings, 0 replies; 17+ messages in thread
From: Vladislav Shpilevoy @ 2018-12-29 10:14 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, kostja
During a SWIM round a message is being handed out consisting of
at most 3 sections. Parts of the message change rarely, by
member attributes update and by removal of some of them. So it is
possible to cache the message and send it during several round
steps in a row. Or even do not rebuild it the whole round.
Also, it allows to send message parts on separate libev EV_WRITE
events, because now the message is stored globally and can be
iterated from different events.
Part of #3234
---
src/lib/swim/swim.c | 35 ++++++++++++++++++++++++++---------
1 file changed, 26 insertions(+), 9 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 4e7ffbc54..7dff22dd5 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -246,6 +246,8 @@ struct swim {
* and preallocated per SWIM instance.
*/
struct swim_task round_step_task;
+ /** True, if msg in round_step_task is up to date. */
+ bool is_round_msg_valid;
/** Transport to send/receive data. */
const struct swim_transport *transport;
/** Scheduler of output requests. */
@@ -283,6 +285,12 @@ sockaddr_in_hash(const struct sockaddr_in *a)
return ((uint64_t) a->sin_addr.s_addr << 16) | a->sin_port;
}
+static inline void
+cached_round_msg_invalidate(struct swim *swim)
+{
+ swim->is_round_msg_valid = false;
+}
+
/**
* Main round messages can carry merged failure detection
* messages and anti-entropy. With these keys the components can
@@ -583,6 +591,7 @@ static void
swim_member_status_is_updated(struct swim *swim, struct swim_member *member)
{
swim_schedule_event(swim, member);
+ cached_round_msg_invalidate(swim);
}
/**
@@ -620,6 +629,7 @@ swim_member_update_status(struct swim *swim, struct swim_member *member,
static void
swim_member_delete(struct swim *swim, struct swim_member *member)
{
+ cached_round_msg_invalidate(swim);
uint64_t key = sockaddr_in_hash(&member->addr);
mh_int_t rc = mh_i64ptr_find(swim->members, key, NULL);
assert(rc != mh_end(swim->members));
@@ -711,6 +721,7 @@ swim_shuffle_members(struct swim *swim)
int j = swim_scaled_rand(0, i);
SWAP(shuffled[i], shuffled[j]);
}
+ cached_round_msg_invalidate(swim);
return 0;
}
@@ -851,9 +862,12 @@ swim_encode_dissemination(struct swim *swim, struct swim_msg *msg)
/** Encode SWIM components into a sequence of UDP packets. */
static int
-swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
+swim_encode_round_msg(struct swim *swim)
{
- swim_msg_create(msg);
+ if (swim->is_round_msg_valid)
+ return 0;
+ struct swim_msg *msg = &swim->round_step_task.msg;
+ swim_msg_reset(msg);
struct swim_packet *packet = swim_msg_reserve(msg, 1);
if (packet == NULL)
return -1;
@@ -879,7 +893,7 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
mp_encode_map(header, map_size);
return 0;
error:
- swim_msg_destroy(msg);
+ cached_round_msg_invalidate(swim);
return -1;
}
@@ -890,8 +904,10 @@ swim_decrease_events_ttl(struct swim *swim)
struct swim_member *member, *tmp;
rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events,
tmp) {
- if (--member->status_ttl == 0)
+ if (--member->status_ttl == 0) {
rlist_del_entry(member, in_queue_events);
+ cached_round_msg_invalidate(swim);
+ }
}
}
@@ -915,9 +931,7 @@ swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events)
*/
if (rlist_empty(&swim->queue_round))
return;
-
- struct swim_msg *msg = &swim->round_step_task.msg;
- if (swim_encode_round_msg(swim, msg) != 0) {
+ if (swim_encode_round_msg(swim) != 0) {
diag_log();
return;
}
@@ -937,7 +951,6 @@ swim_round_step_complete(struct swim_task *task)
struct swim *swim = container_of(task, struct swim, round_step_task);
rlist_shift_entry(&swim->queue_round, struct swim_member,
in_queue_round);
- swim_msg_reset(&task->msg);
ev_periodic_start(loop(), &swim->round_tick);
}
@@ -1442,7 +1455,10 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
if (swim->round_tick.interval != heartbeat_rate && heartbeat_rate > 0)
ev_periodic_set(&swim->round_tick, 0, heartbeat_rate, NULL);
- swim->self = new_self;
+ if (new_self != NULL) {
+ swim->self = new_self;
+ cached_round_msg_invalidate(swim);
+ }
if (new_transport != NULL) {
swim->transport = new_transport;
swim_scheduler_set_transport(&swim->scheduler, new_transport);
@@ -1518,4 +1534,5 @@ swim_delete(struct swim *swim)
}
mh_i64ptr_delete(swim->members);
free(swim->shuffled_members);
+ cached_round_msg_invalidate(swim);
}
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 17+ messages in thread
* [PATCH v3 5/6] [RAW] swim: send one UDP packet per EV_WRITE event
2018-12-29 10:14 [PATCH v3 0/6] SWIM draft Vladislav Shpilevoy
` (3 preceding siblings ...)
2018-12-29 10:14 ` [PATCH v3 4/6] [RAW] swim: keep encoded round message cached Vladislav Shpilevoy
@ 2018-12-29 10:14 ` Vladislav Shpilevoy
2019-01-09 13:53 ` [tarantool-patches] " Konstantin Osipov
2018-12-29 10:14 ` [PATCH v3 6/6] [RAW] swim: introduce payload Vladislav Shpilevoy
5 siblings, 1 reply; 17+ messages in thread
From: Vladislav Shpilevoy @ 2018-12-29 10:14 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, kostja
Since the first commit of #3234, where anti-entropy component was
introduced, a single SWIM message could be split into multiple
UDP packets. But so far these packets were being sent in mere
'for' loop on a single EV_WRITE event. It is not proper way of
using event loop, but the simplest, because does not require any
externally stored positions in packet lists.
The previous commit introduced such global list of UDP packets to
send, and now it is much simpler to send each packet on separate
EV_WRITE event. This commit does it.
Part of #3234
---
src/lib/swim/swim_io.c | 19 +++++++++++--------
src/lib/swim/swim_io.h | 1 +
2 files changed, 12 insertions(+), 8 deletions(-)
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 00a16a2bb..dadf14db2 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -60,6 +60,7 @@ swim_task_schedule(struct swim_task *task, swim_transport_send_f send,
assert(! swim_task_is_active(task));
task->send = send;
task->dst = *dst;
+ task->pos = swim_msg_first_packet(&task->msg);
rlist_add_tail_entry(&task->scheduler->queue_output, task,
in_queue_output);
ev_io_start(loop(), &task->scheduler->output);
@@ -133,19 +134,21 @@ swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
return;
}
struct swim_task *task =
- rlist_shift_entry(&scheduler->queue_output, struct swim_task,
+ rlist_first_entry(&scheduler->queue_output, struct swim_task,
in_queue_output);
say_verbose("SWIM: send to %s",
sio_strfaddr((struct sockaddr *) &task->dst,
sizeof(task->dst)));
- for (struct swim_packet *packet = swim_msg_first_packet(&task->msg);
- packet != NULL; packet = swim_packet_next(packet)) {
- if (task->send(io->fd, packet->body, packet->pos - packet->body,
- (struct sockaddr *) &task->dst,
- sizeof(task->dst)) == -1)
- diag_log();
+ struct swim_packet *packet = task->pos;
+ if (task->send(io->fd, packet->body, packet->pos - packet->body,
+ (struct sockaddr *) &task->dst,
+ sizeof(task->dst)) == -1)
+ diag_log();
+ task->pos = swim_packet_next(packet);
+ if (task->pos == NULL) {
+ task->complete(task);
+ rlist_del_entry(task, in_queue_output);
}
- task->complete(task);
}
static void
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index f08bd1ef3..605542c4e 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -256,6 +256,7 @@ struct swim_task {
swim_task_f complete;
/** Message to send. */
struct swim_msg msg;
+ struct swim_packet *pos;
/** Destination address. */
struct sockaddr_in dst;
/** Place in a queue of tasks. */
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 17+ messages in thread
* Re: [tarantool-patches] [PATCH v3 5/6] [RAW] swim: send one UDP packet per EV_WRITE event
2018-12-29 10:14 ` [PATCH v3 5/6] [RAW] swim: send one UDP packet per EV_WRITE event Vladislav Shpilevoy
@ 2019-01-09 13:53 ` Konstantin Osipov
2019-01-15 14:42 ` [tarantool-patches] " Vladislav Shpilevoy
0 siblings, 1 reply; 17+ messages in thread
From: Konstantin Osipov @ 2019-01-09 13:53 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/12/29 15:07]:
> Since the first commit of #3234, where anti-entropy component was
> introduced, a single SWIM message could be split into multiple
> UDP packets. But so far these packets were being sent in mere
> 'for' loop on a single EV_WRITE event. It is not proper way of
> using event loop, but the simplest, because does not require any
> externally stored positions in packet lists.
>
> The previous commit introduced such global list of UDP packets to
> send, and now it is much simpler to send each packet on separate
> EV_WRITE event. This commit does it.
>
Looks like this commit breaks encapsulation of the transport
layer.
One thing, is that we should not have tasks which require sending
multiple packets.
Another is that the packet concept is part of the transport and
should not leak into the protocol itself.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 17+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v3 5/6] [RAW] swim: send one UDP packet per EV_WRITE event
2019-01-09 13:53 ` [tarantool-patches] " Konstantin Osipov
@ 2019-01-15 14:42 ` Vladislav Shpilevoy
0 siblings, 0 replies; 17+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-15 14:42 UTC (permalink / raw)
To: tarantool-patches, Konstantin Osipov; +Cc: vdavydov.dev
On 09/01/2019 16:53, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/12/29 15:07]:
>> Since the first commit of #3234, where anti-entropy component was
>> introduced, a single SWIM message could be split into multiple
>> UDP packets. But so far these packets were being sent in mere
>> 'for' loop on a single EV_WRITE event. It is not proper way of
>> using event loop, but the simplest, because does not require any
>> externally stored positions in packet lists.
>>
>> The previous commit introduced such global list of UDP packets to
>> send, and now it is much simpler to send each packet on separate
>> EV_WRITE event. This commit does it.
>>
>
> Looks like this commit breaks encapsulation of the transport
> layer.
It does not. In fact, this commit does not change protocol code
at all and affects only swim_io.c/.h. I think, this is a prove that
it does not break encapsulation, doesn't it?
>
> One thing, is that we should not have tasks which require sending
> multiple packets.
Multipacket is required since it is hard to fit many events,
failure detection and anti-entropy components, especially with payloads,
into a single 1.5Kb packet.
>
> Another is that the packet concept is part of the transport and
> should not leak into the protocol itself.
>
>
As I understood from our verbal conversation, you mistakenly think
that packets are dependent on each other, and multi-packet here works
like a weird TCP, with splitting message into packets and concatenating
them back on receiver side, but it is wrong. Each packet is
self-sufficient piece of data, which is processed independently from
other packets. Each packet carries all necessary headers and a chunk of
events, anti-entropy records etc. Multi-packet here is in fact
multi-message already. Loss of some packets does not affect processing
of other ones.
struct swim_msg encapsulates multi-packet UDP encoding, while struct
swim_task encapsulates multi-packet sending. This is why this commit
so easily changes multi-packet sending from one EV_WRITE to multiple
EV_WRITE - one per each packet.
For protocol, swim.c, struct swim_msg looks like a one big packet,
while swim_io.c is able to send swim_msg in multiple independent
packets which should not be concatenated on receiver side.
Receiver reads packets, struct swim_packet. Not the entire struct
swim_msg. And processes the packets one by one.
^ permalink raw reply [flat|nested] 17+ messages in thread
* [PATCH v3 6/6] [RAW] swim: introduce payload
2018-12-29 10:14 [PATCH v3 0/6] SWIM draft Vladislav Shpilevoy
` (4 preceding siblings ...)
2018-12-29 10:14 ` [PATCH v3 5/6] [RAW] swim: send one UDP packet per EV_WRITE event Vladislav Shpilevoy
@ 2018-12-29 10:14 ` Vladislav Shpilevoy
2019-01-09 13:58 ` [tarantool-patches] " Konstantin Osipov
5 siblings, 1 reply; 17+ messages in thread
From: Vladislav Shpilevoy @ 2018-12-29 10:14 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, kostja
Part of #3234
---
src/lib/swim/swim.c | 139 ++++++++++++++++++++++++++++++++++++++++----
src/lib/swim/swim.h | 4 ++
2 files changed, 131 insertions(+), 12 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 7dff22dd5..fa2ae0273 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -105,6 +105,8 @@ enum {
* of failed pings.
*/
NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2,
+ /** Reserve 272 bytes for headers. */
+ MAX_PAYLOAD_SIZE = 1200,
};
/**
@@ -200,6 +202,16 @@ struct swim_member {
* learn its dead status.
*/
int status_ttl;
+ /** Arbitrary user data, disseminated on each change. */
+ char *payload;
+ /** Useless formal comment: payload size. */
+ int payload_size;
+ /**
+ * TTL of payload. At most this number of times payload is
+ * sent as a part of dissemination component. Reset on
+ * each update.
+ */
+ int payload_ttl;
/**
* Events are put into a queue sorted by event occurrence
* time.
@@ -392,6 +404,7 @@ enum swim_member_key {
SWIM_MEMBER_ADDRESS,
SWIM_MEMBER_PORT,
SWIM_MEMBER_INCARNATION,
+ SWIM_MEMBER_PAYLOAD,
swim_member_key_MAX,
};
@@ -415,7 +428,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
/** SWIM member MsgPack template. */
struct PACKED swim_member_bin {
- /** mp_encode_map(4) */
+ /** mp_encode_map(5) */
uint8_t m_header;
/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -440,6 +453,13 @@ struct PACKED swim_member_bin {
/** mp_encode_uint(64bit incarnation) */
uint8_t m_incarnation;
uint64_t v_incarnation;
+
+ /** mp_encode_uint(SWIM_MEMBER_PAYLOAD) */
+ uint8_t k_payload;
+ /** mp_encode_bin(16bit bin header) */
+ uint8_t m_payload_size;
+ uint16_t v_payload_size;
+ /** Payload data ... */
};
static inline void
@@ -450,12 +470,13 @@ swim_member_bin_reset(struct swim_member_bin *header,
header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
header->v_port = mp_bswap_u16(member->addr.sin_port);
header->v_incarnation = mp_bswap_u64(member->incarnation);
+ header->v_payload_size = mp_bswap_u16(member->payload_size);
}
static inline void
swim_member_bin_create(struct swim_member_bin *header)
{
- header->m_header = 0x84;
+ header->m_header = 0x85;
header->k_status = SWIM_MEMBER_STATUS;
header->k_addr = SWIM_MEMBER_ADDRESS;
header->m_addr = 0xce;
@@ -463,6 +484,8 @@ swim_member_bin_create(struct swim_member_bin *header)
header->m_port = 0xcd;
header->k_incarnation = SWIM_MEMBER_INCARNATION;
header->m_incarnation = 0xcf;
+ header->k_payload = SWIM_MEMBER_PAYLOAD;
+ header->m_payload_size = 0xc5;
}
/** }}} Anti-entropy component */
@@ -488,7 +511,7 @@ swim_diss_header_bin_create(struct swim_diss_header_bin *header, int batch_size)
/** SWIM event MsgPack template. */
struct PACKED swim_event_bin {
- /** mp_encode_map(4) */
+ /** mp_encode_map(4 or 5) */
uint8_t m_header;
/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -518,7 +541,6 @@ struct PACKED swim_event_bin {
static inline void
swim_event_bin_create(struct swim_event_bin *header)
{
- header->m_header = 0x84;
header->k_status = SWIM_MEMBER_STATUS;
header->k_addr = SWIM_MEMBER_ADDRESS;
header->m_addr = 0xce;
@@ -531,6 +553,7 @@ swim_event_bin_create(struct swim_event_bin *header)
static inline void
swim_event_bin_reset(struct swim_event_bin *header, struct swim_member *member)
{
+ header->m_header = 0x84 + member->payload_ttl > 0;
header->v_status = member->status;
header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
header->v_port = mp_bswap_u16(member->addr.sin_port);
@@ -594,6 +617,14 @@ swim_member_status_is_updated(struct swim *swim, struct swim_member *member)
cached_round_msg_invalidate(swim);
}
+static void
+swim_member_payload_is_updated(struct swim *swim, struct swim_member *member)
+{
+ swim_schedule_event(swim, member);
+ member->payload_ttl = mh_size(swim->members);
+ cached_round_msg_invalidate(swim);
+}
+
/**
* Update status and incarnation of the member if needed. Statuses
* are compared as a compound key: {incarnation, status}. So @a
@@ -622,6 +653,28 @@ swim_member_update_status(struct swim *swim, struct swim_member *member,
}
}
+static inline int
+swim_member_update_payload(struct swim *swim, struct swim_member *member,
+ uint64_t incarnation, const char *payload,
+ int payload_size)
+{
+ if (incarnation < member->incarnation)
+ return 0;
+ if (payload_size == member->payload_size &&
+ memcmp(payload, member->payload, payload_size) == 0)
+ return 0;
+ char *new_payload = (char *) realloc(member->payload, payload_size);
+ if (new_payload == NULL) {
+ diag_set(OutOfMemory, payload_size, "malloc", "new_payload");
+ return -1;
+ }
+ memcpy(new_payload, payload, payload_size);
+ member->payload = new_payload;
+ member->payload_size = payload_size;
+ swim_member_payload_is_updated(swim, member);
+ return 0;
+}
+
/**
* Remove the member from all queues, hashes, destroy it and free
* the memory.
@@ -653,7 +706,8 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
*/
static struct swim_member *
swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
- enum swim_member_status status, uint64_t incarnation)
+ enum swim_member_status status, uint64_t incarnation,
+ const char *payload, int payload_size)
{
struct swim_member *member =
(struct swim_member *) calloc(1, sizeof(*member));
@@ -683,6 +737,12 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
/* Dissemination component. */
rlist_create(&member->in_queue_events);
swim_member_status_is_updated(swim, member);
+ if (swim_member_update_payload(swim, member, incarnation, payload,
+ payload_size) != 0) {
+ rlist_del_entry(member, in_queue_events);
+ swim_member_delete(swim, member);
+ return NULL;
+ }
return member;
}
@@ -769,12 +829,15 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_msg *msg)
swim_member_bin_create(&member_bin);
for (; i < (int) mh_size(swim->members); ++i) {
- char *pos = swim_packet_alloc(packet, sizeof(member_bin));
+ struct swim_member *member = swim->shuffled_members[i];
+ char *pos = swim_packet_alloc(packet, sizeof(member_bin) +
+ member->payload_size);
if (pos == NULL)
break;
- struct swim_member *member = swim->shuffled_members[i];
swim_member_bin_reset(&member_bin, member);
memcpy(pos, &member_bin, sizeof(member_bin));
+ pos += sizeof(member_bin);
+ memcpy(pos, member->payload, member->payload_size);
}
if (i == 0)
return 0;
@@ -828,11 +891,22 @@ swim_encode_dissemination_packet(struct swim_msg *msg, struct rlist **queue_pos)
struct swim_event_bin event_bin;
swim_event_bin_create(&event_bin);
rlist_foreach_entry(member, *queue_pos, in_queue_events) {
- char *pos = swim_packet_alloc(packet, sizeof(event_bin));
+ int size = sizeof(event_bin);
+ if (member->payload_ttl > 0) {
+ size += mp_sizeof_uint(SWIM_MEMBER_PAYLOAD) +
+ mp_sizeof_bin(member->payload_size);
+ }
+ char *pos = swim_packet_alloc(packet, size);
if (pos == NULL)
break;
swim_event_bin_reset(&event_bin, member);
memcpy(pos, &event_bin, sizeof(event_bin));
+ pos += sizeof(event_bin);
+ if (member->payload_ttl > 0) {
+ pos = mp_encode_uint(pos, SWIM_MEMBER_PAYLOAD);
+ mp_encode_bin(pos, member->payload,
+ member->payload_size);
+ }
++i;
prev = member;
}
@@ -904,6 +978,10 @@ swim_decrease_events_ttl(struct swim *swim)
struct swim_member *member, *tmp;
rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events,
tmp) {
+ assert(member->status_ttl > 0);
+ assert(member->status_ttl >= member->payload_ttl);
+ if (member->payload_ttl > 0)
+ --member->payload_ttl;
if (--member->status_ttl == 0) {
rlist_del_entry(member, in_queue_events);
cached_round_msg_invalidate(swim);
@@ -1029,6 +1107,8 @@ struct swim_member_def {
struct sockaddr_in addr;
uint64_t incarnation;
enum swim_member_status status;
+ const char *payload;
+ int payload_size;
};
static inline void
@@ -1048,7 +1128,8 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
*/
if (member == NULL) {
member = swim_member_new(swim, &def->addr, def->status,
- def->incarnation);
+ def->incarnation, def->payload,
+ def->payload_size);
if (member == NULL)
diag_log();
return;
@@ -1057,6 +1138,10 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
if (member != self) {
swim_member_update_status(swim, member, def->status,
def->incarnation);
+ if (swim_member_update_payload(swim, member, def->incarnation,
+ def->payload,
+ def->payload_size) != 0)
+ diag_log();
return;
}
uint64_t old_incarnation = self->incarnation;
@@ -1131,6 +1216,21 @@ swim_process_member_key(enum swim_member_key key, const char **pos,
}
def->incarnation = mp_decode_uint(pos);
break;
+ case SWIM_MEMBER_PAYLOAD:
+ if (mp_typeof(**pos) != MP_BIN ||\
+ mp_check_binl(*pos, end) > 0) {
+ say_error("%s member payload should be bin", msg_pref);
+ return -1;
+ }
+ uint32_t len;
+ def->payload = mp_decode_bin(pos, &len);
+ if (len > MAX_PAYLOAD_SIZE) {
+ say_error("%s member payload size should be <= %d",
+ msg_pref, MAX_PAYLOAD_SIZE);
+ return -1;
+ }
+ def->payload_size = (int) len;
+ break;
default:
unreachable();
}
@@ -1245,7 +1345,8 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
}
struct swim_member *sender = swim_find_member(swim, src);
if (sender == NULL) {
- sender = swim_member_new(swim, src, MEMBER_ALIVE, incarnation);
+ sender = swim_member_new(swim, src, MEMBER_ALIVE, incarnation,
+ NULL, 0);
if (sender == NULL) {
diag_log();
return 0;
@@ -1441,7 +1542,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
return -1;
struct swim_member *new_self = NULL;
if (swim_find_member(swim, &addr) == NULL) {
- new_self = swim_member_new(swim, &addr, MEMBER_ALIVE, 0);
+ new_self = swim_member_new(swim, &addr, MEMBER_ALIVE, 0, NULL,
+ 0);
if (new_self == NULL)
return -1;
}
@@ -1466,6 +1568,19 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
return 0;
}
+int
+swim_set_payload(struct swim *swim, const char *payload, int payload_size)
+{
+ if (payload_size > MAX_PAYLOAD_SIZE) {
+ diag_set(IllegalParams, "Payload should be <= %d",
+ MAX_PAYLOAD_SIZE);
+ return -1;
+ }
+ return swim_member_update_payload(swim, swim->self,
+ swim->self->incarnation, payload,
+ payload_size);
+}
+
int
swim_add_member(struct swim *swim, const char *uri)
{
@@ -1474,7 +1589,7 @@ swim_add_member(struct swim *swim, const char *uri)
return -1;
struct swim_member *member = swim_find_member(swim, &addr);
if (member == NULL) {
- member = swim_member_new(swim, &addr, MEMBER_ALIVE, 0);
+ member = swim_member_new(swim, &addr, MEMBER_ALIVE, 0, NULL, 0);
if (member == NULL)
return -1;
member->is_pinned = true;
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 350fa0cee..a44fcb977 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -65,6 +65,10 @@ int
swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
const struct swim_transport *new_transport);
+/** Set payload to disseminate over the cluster. */
+int
+swim_set_payload(struct swim *swim, const char *payload, int payload_size);
+
/**
* Stop listening and broadcasting messages, cleanup all internal
* structures, free memory.
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 17+ messages in thread
* Re: [tarantool-patches] [PATCH v3 6/6] [RAW] swim: introduce payload
2018-12-29 10:14 ` [PATCH v3 6/6] [RAW] swim: introduce payload Vladislav Shpilevoy
@ 2019-01-09 13:58 ` Konstantin Osipov
2019-01-15 14:42 ` [tarantool-patches] " Vladislav Shpilevoy
0 siblings, 1 reply; 17+ messages in thread
From: Konstantin Osipov @ 2019-01-09 13:58 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/12/29 15:07]:
If the payload is the same for all members, why keep a copy in
each member?
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 17+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v3 6/6] [RAW] swim: introduce payload
2019-01-09 13:58 ` [tarantool-patches] " Konstantin Osipov
@ 2019-01-15 14:42 ` Vladislav Shpilevoy
0 siblings, 0 replies; 17+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-15 14:42 UTC (permalink / raw)
To: tarantool-patches, Konstantin Osipov; +Cc: vdavydov.dev
On 09/01/2019 16:58, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/12/29 15:07]:
>
> If the payload is the same for all members, why keep a copy in
> each member?
>
It is not the same. Payload is a mere attribute of member, just
like status, incarnation, ip, port. Each member can disseminate its
own payload, and once it is changed, other members learn it via
dissemination component and forward this event to other members.
It allows
1) do not block other members payload change, when another member
tries to change the global payload;
2) do not implement complex logic of payload changes merge,
when multiple members are modifying the payload;
3) atomic and granulated update of payloads. When one member
changes its payload, it is not necessary to disseminate payloads
of all other known members;
4) each member can have a payload of size up to packet size, while
in your proposal the global payload for *all* members will be limited
by this limit.
Your proposal is a global payload, but it turns the cluster into
a giant bus line, where while one member occupies the line, other
members can not neither put out their own arbitrary payload, nor
send an answer on the active payload without its modification or
removal.
^ permalink raw reply [flat|nested] 17+ messages in thread