* [PATCH v4 01/12] sio: introduce sio_uri_to_addr
2019-01-30 21:28 [PATCH v4 00/12] SWIM draft Vladislav Shpilevoy
@ 2019-01-30 21:28 ` Vladislav Shpilevoy
2019-02-15 13:21 ` [tarantool-patches] " Konstantin Osipov
2019-01-30 21:28 ` [PATCH v4 10/12] [RAW] swim: introduce 'quit' message Vladislav Shpilevoy
` (10 subsequent siblings)
11 siblings, 1 reply; 23+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-30 21:28 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja, vdavydov.dev
The function parses string URI consisting of either IP and port,
or UNIX socket address, and stores the result into struct
sockaddr.
---
src/CMakeLists.txt | 2 +-
src/sio.c | 41 +++++++++++++++++
src/sio.h | 7 +++
test/unit/CMakeLists.txt | 3 ++
test/unit/sio.c | 96 ++++++++++++++++++++++++++++++++++++++++
5 files changed, 148 insertions(+), 1 deletion(-)
create mode 100644 test/unit/sio.c
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 04de5ad04..38bd576e6 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -120,7 +120,7 @@ endif ()
add_library(core STATIC ${core_sources})
target_link_libraries(core
- salad small
+ salad small uri
${LIBEV_LIBRARIES}
${LIBEIO_LIBRARIES}
${LIBCORO_LIBRARIES}
diff --git a/src/sio.c b/src/sio.c
index 613cd9412..419f4e0f4 100644
--- a/src/sio.c
+++ b/src/sio.c
@@ -36,9 +36,11 @@
#include <limits.h>
#include <netinet/in.h> /* TCP_NODELAY */
#include <netinet/tcp.h> /* TCP_NODELAY */
+#include <arpa/inet.h>
#include "say.h"
#include "trivia/util.h"
#include "exception.h"
+#include "uri.h"
const char *
sio_socketname(int fd)
@@ -321,3 +323,42 @@ sio_strfaddr(struct sockaddr *addr, socklen_t addrlen)
}
return name;
}
+
+int
+sio_uri_to_addr(const char *uri, struct sockaddr *addr)
+{
+ struct uri u;
+ if (uri_parse(&u, uri) != 0 || u.service == NULL)
+ goto invalid_uri;
+ if (u.host_len == strlen(URI_HOST_UNIX) &&
+ memcmp(u.host, URI_HOST_UNIX, u.host_len) == 0) {
+ struct sockaddr_un *un = (struct sockaddr_un *) addr;
+ if (u.service_len + 1 > sizeof(un->sun_path))
+ goto invalid_uri;
+ memcpy(un->sun_path, u.service, u.service_len);
+ un->sun_path[u.service_len] = 0;
+ un->sun_family = AF_UNIX;
+ return 0;
+ }
+ in_addr_t iaddr;
+ if (u.host_len == 0) {
+ iaddr = htonl(INADDR_ANY);
+ } else if (u.host_len == 9 && memcmp("localhost", u.host, 9) == 0) {
+ iaddr = htonl(INADDR_LOOPBACK);
+ } else {
+ iaddr = inet_addr(tt_cstr(u.host, u.host_len));
+ if (iaddr == (in_addr_t) -1)
+ goto invalid_uri;
+ }
+ struct sockaddr_in *in = (struct sockaddr_in *) addr;
+ int port = htons(atoi(u.service));
+ memset(in, 0, sizeof(*in));
+ in->sin_family = AF_INET;
+ in->sin_addr.s_addr = iaddr;
+ in->sin_port = port;
+ return 0;
+
+invalid_uri:
+ diag_set(SocketError, sio_socketname(-1), "invalid uri \"%s\"", uri);
+ return -1;
+}
diff --git a/src/sio.h b/src/sio.h
index 3067dc111..27988a232 100644
--- a/src/sio.h
+++ b/src/sio.h
@@ -203,6 +203,13 @@ ssize_t sio_sendto(int fd, const void *buf, size_t len, int flags,
ssize_t sio_recvfrom(int fd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen);
+/**
+ * Convert a string URI like "ip:port" or "unix/:path" to
+ * sockaddr_in/un structure.
+ */
+int
+sio_uri_to_addr(const char *uri, struct sockaddr *addr);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 0025d3611..16739f75d 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -203,3 +203,6 @@ add_executable(checkpoint_schedule.test
${PROJECT_SOURCE_DIR}/src/box/checkpoint_schedule.c
)
target_link_libraries(checkpoint_schedule.test m unit)
+
+add_executable(sio.test sio.c)
+target_link_libraries(sio.test unit core)
diff --git a/test/unit/sio.c b/test/unit/sio.c
new file mode 100644
index 000000000..84a86aac3
--- /dev/null
+++ b/test/unit/sio.c
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "unit.h"
+#include "memory.h"
+#include "fiber.h"
+#include <sys/un.h>
+#include <arpa/inet.h>
+#include "sio.h"
+
+static void
+check_uri_to_addr(void)
+{
+ header();
+ plan(18);
+ struct sockaddr_storage storage;
+ struct sockaddr *addr = (struct sockaddr *) &storage;
+ struct sockaddr_un *un = (struct sockaddr_un *) addr;
+ struct sockaddr_in *in = (struct sockaddr_in *) addr;
+ isnt(0, sio_uri_to_addr("invalid uri", addr),
+ "invalid uri is detected");
+
+ char long_path[1000];
+ char *pos = long_path + sprintf(long_path, "unix/:/");
+ memset(pos, 'a', 900);
+ pos[900] = 0;
+ isnt(0, sio_uri_to_addr(long_path, addr), "too long UNIX path");
+
+ is(0, sio_uri_to_addr("unix/:/normal_path", addr), "UNIX");
+ is(0, strcmp(un->sun_path, "/normal_path"), "UNIX path");
+ is(AF_UNIX, un->sun_family, "UNIX family");
+
+ is(0, sio_uri_to_addr("localhost:1234", addr), "localhost");
+ is(AF_INET, in->sin_family, "localhost family");
+ is(htonl(INADDR_LOOPBACK), in->sin_addr.s_addr, "localhost address");
+ is(htons(1234), in->sin_port, "localhost port");
+
+ is(0, sio_uri_to_addr("5678", addr), "'any'");
+ is(AF_INET, in->sin_family, "'any' family");
+ is(htonl(INADDR_ANY), in->sin_addr.s_addr, "'any' address");
+ is(htons(5678), in->sin_port, "'any' port");
+
+ is(0, sio_uri_to_addr("192.168.0.1:9101", addr), "IP");
+ is(AF_INET, in->sin_family, "IP family");
+ is(inet_addr("192.168.0.1"), in->sin_addr.s_addr, "IP address");
+ is(htons(9101), in->sin_port, "IP port");
+
+ isnt(0, sio_uri_to_addr("192.168.0.300:1112", addr), "invalid IP");
+
+ check_plan();
+ footer();
+}
+
+int
+main(void)
+{
+ memory_init();
+ fiber_init(fiber_c_invoke);
+
+ header();
+ plan(1);
+ check_uri_to_addr();
+ int rc = check_plan();
+ footer();
+
+ fiber_free();
+ memory_free();
+ return rc;
+}
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 23+ messages in thread
* Re: [tarantool-patches] [PATCH v4 01/12] sio: introduce sio_uri_to_addr
2019-01-30 21:28 ` [PATCH v4 01/12] sio: introduce sio_uri_to_addr Vladislav Shpilevoy
@ 2019-02-15 13:21 ` Konstantin Osipov
2019-02-15 21:22 ` [tarantool-patches] " Vladislav Shpilevoy
0 siblings, 1 reply; 23+ messages in thread
From: Konstantin Osipov @ 2019-02-15 13:21 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/01/31 10:28]:
> The function parses string URI consisting of either IP and port,
> or UNIX socket address, and stores the result into struct
> sockaddr.
OK to push.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 23+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v4 01/12] sio: introduce sio_uri_to_addr
2019-02-15 13:21 ` [tarantool-patches] " Konstantin Osipov
@ 2019-02-15 21:22 ` Vladislav Shpilevoy
0 siblings, 0 replies; 23+ messages in thread
From: Vladislav Shpilevoy @ 2019-02-15 21:22 UTC (permalink / raw)
To: tarantool-patches, Konstantin Osipov; +Cc: vdavydov.dev
Pushed to 2.1.
On 15/02/2019 14:21, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/01/31 10:28]:
>> The function parses string URI consisting of either IP and port,
>> or UNIX socket address, and stores the result into struct
>> sockaddr.
>
> OK to push.
>
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>
^ permalink raw reply [flat|nested] 23+ messages in thread
* [PATCH v4 10/12] [RAW] swim: introduce 'quit' message
2019-01-30 21:28 [PATCH v4 00/12] SWIM draft Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 01/12] sio: introduce sio_uri_to_addr Vladislav Shpilevoy
@ 2019-01-30 21:28 ` Vladislav Shpilevoy
2019-02-21 12:13 ` [tarantool-patches] " Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 11/12] [RAW] swim: introduce broadcast tasks Vladislav Shpilevoy
` (9 subsequent siblings)
11 siblings, 1 reply; 23+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-30 21:28 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja, vdavydov.dev
'Quit' message helps gracefully leave the cluster, notifying all
members that this instance is not dead, but just decided to
leave.
Part of #3234
---
src/lib/swim/swim.c | 255 +++++++++++++++++++++++++++++++++++---
src/lib/swim/swim.h | 8 ++
src/lib/swim/swim_io.c | 8 +-
src/lib/swim/swim_io.h | 4 +
src/lib/swim/swim_proto.c | 12 ++
src/lib/swim/swim_proto.h | 38 ++++++
src/lua/swim.c | 33 ++++-
7 files changed, 333 insertions(+), 25 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 5cec3789a..b377f154f 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -138,8 +138,14 @@ enum {
*/
ACK_TIMEOUT_DEFAULT = 30,
/**
- * If a member has not been responding to pings this
- * number of times, it is considered to be dead.
+ * If an alive member has not been responding to pings
+ * this number of times, it is suspected to be dead. To
+ * confirm the death it should fail more pings.
+ */
+ NO_ACKS_TO_SUSPECT = 2,
+ /**
+ * If a suspected member has not been responding to pings
+ * this number of times, it is considered to be dead.
* According to the SWIM paper, for a member it is enough
* to do not respond on one direct ping, and on K
* simultanous indirect pings, to be considered as dead.
@@ -156,6 +162,11 @@ enum {
* anti-entropy components.
*/
NO_ACKS_TO_GC = 2,
+ /**
+ * Number of attempts to reach out a member who did not
+ * answered on a regular ping via another members.
+ */
+ INDIRECT_PING_COUNT = 2,
};
/**
@@ -367,6 +378,15 @@ struct swim {
struct rlist queue_events;
};
+/** Get a random member from a members table. */
+static inline struct swim_member *
+swim_random_member(struct swim *swim)
+{
+ int rnd = swim_scaled_rand(0, mh_size(swim->members) - 1);
+ mh_int_t node = mh_swim_table_random(swim->members, rnd);
+ return *mh_swim_table_node(swim->members, node);
+}
+
/** Reset cached round message on any change of any member. */
static inline void
cached_round_msg_invalidate(struct swim *swim)
@@ -374,15 +394,34 @@ cached_round_msg_invalidate(struct swim *swim)
swim_packet_create(&swim->round_step_task.packet);
}
+/** Comparator for a sorted list of ping deadlines. */
+static inline int
+swim_member_ping_deadline_cmp(struct swim_member *a, struct swim_member *b)
+{
+ double res = a->ping_deadline - b->ping_deadline;
+ if (res > 0)
+ return 1;
+ return res < 0 ? -1 : 0;
+}
+
/** Put the member into a list of ACK waiters. */
static void
-swim_member_wait_ack(struct swim *swim, struct swim_member *member)
+swim_member_wait_ack(struct swim *swim, struct swim_member *member,
+ int hop_count)
{
if (rlist_empty(&member->in_queue_wait_ack)) {
- member->ping_deadline = fiber_time() +
- swim->wait_ack_tick.interval;
- rlist_add_tail_entry(&swim->queue_wait_ack, member,
- in_queue_wait_ack);
+ double timeout = swim->wait_ack_tick.interval * hop_count;
+ member->ping_deadline = fiber_time() + timeout;
+ struct swim_member *pos;
+ /*
+ * Indirect ping deadline can be later than
+ * deadlines of some of newer direct pings, so it
+ * is not enough to just append a new member to
+ * the end of this list.
+ */
+ rlist_add_tail_entry_sorted(&swim->queue_wait_ack, pos, member,
+ in_queue_wait_ack,
+ swim_member_ping_deadline_cmp);
}
}
@@ -556,7 +595,7 @@ swim_ping_task_complete(struct swim_task *task,
struct swim *swim = swim_by_scheduler(scheduler);
struct swim_member *m = container_of(task, struct swim_member,
ping_task);
- swim_member_wait_ack(swim, m);
+ swim_member_wait_ack(swim, m, 1);
}
/**
@@ -845,6 +884,8 @@ swim_decrease_events_ttl(struct swim *swim)
if (--member->status_ttl == 0) {
rlist_del_entry(member, in_queue_events);
cached_round_msg_invalidate(swim);
+ if (member->status == MEMBER_LEFT)
+ swim_member_delete(swim, member);
}
}
}
@@ -897,7 +938,7 @@ swim_round_step_complete(struct swim_task *task,
* Each round message contains failure detection
* section with a ping.
*/
- swim_member_wait_ack(swim, m);
+ swim_member_wait_ack(swim, m, 1);
/* As well as dissemination. */
swim_decrease_events_ttl(swim);
}
@@ -906,12 +947,15 @@ swim_round_step_complete(struct swim_task *task,
/** Schedule send of a failure detection message. */
static void
swim_send_fd_request(struct swim *swim, struct swim_task *task,
- const struct sockaddr_in *dst, enum swim_fd_msg_type type)
+ const struct sockaddr_in *dst, enum swim_fd_msg_type type,
+ const struct sockaddr_in *proxy)
{
/*
* Reset packet allocator in case if task is being reused.
*/
swim_packet_create(&task->packet);
+ if (proxy != NULL)
+ swim_task_proxy(task, proxy);
char *header = swim_packet_alloc(&task->packet, 1);
int map_size = swim_encode_src_uuid(swim, &task->packet);
map_size += swim_encode_failure_detection(swim, &task->packet, type);
@@ -925,17 +969,82 @@ swim_send_fd_request(struct swim *swim, struct swim_task *task,
/** Schedule send of an ack. */
static inline void
swim_send_ack(struct swim *swim, struct swim_task *task,
- const struct sockaddr_in *dst)
+ const struct sockaddr_in *dst, const struct sockaddr_in *proxy)
{
- swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_ACK);
+ swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_ACK, proxy);
}
/** Schedule send of a ping. */
static inline void
swim_send_ping(struct swim *swim, struct swim_task *task,
- const struct sockaddr_in *dst)
+ const struct sockaddr_in *dst, const struct sockaddr_in *proxy)
{
- swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_PING);
+ swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_PING, proxy);
+}
+
+/**
+ * Indirect ping task. It is executed multiple times to send a
+ * ping to several random members. Main motivation of this task is
+ * to do not create many tasks for indirect pings swarm, but reuse
+ * one.
+ */
+struct swim_iping_task {
+ /** Base structure. */
+ struct swim_task base;
+ /**
+ * How many times to send. Decremented on each send and on
+ * 0 the task is deleted.
+ */
+ int ttl;
+};
+
+/** Reschedule the task with a different proxy, or delete. */
+static void
+swim_iping_task_complete(struct swim_task *base_task,
+ struct swim_scheduler *scheduler, int rc)
+{
+ (void) rc;
+ struct swim *swim = swim_by_scheduler(scheduler);
+ struct swim_iping_task *task = (struct swim_iping_task *) base_task;
+ if (--task->ttl == 0) {
+ swim_task_destroy(base_task);
+ free(task);
+ return;
+ }
+ swim_task_send(base_task, &swim_random_member(swim)->addr, scheduler);
+}
+
+/**
+ * Schedule a number of indirect pings of a member with the
+ * specified address and UUID.
+ */
+static inline int
+swim_send_indirect_pings(struct swim *swim, const struct sockaddr_in *dst)
+{
+ struct swim_iping_task *task =
+ (struct swim_iping_task *) malloc(sizeof(*task));
+ if (task == NULL) {
+ diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
+ return -1;
+ }
+ task->ttl = INDIRECT_PING_COUNT;
+ swim_task_create(&task->base, swim_iping_task_complete,
+ swim_task_delete_cb);
+ swim_send_ping(swim, &task->base, dst, &swim_random_member(swim)->addr);
+ return 0;
+}
+
+/** Schedule an indirect ACK. */
+static inline int
+swim_send_indirect_ack(struct swim *swim, const struct sockaddr_in *dst,
+ const struct sockaddr_in *proxy)
+{
+ struct swim_task *task = swim_task_new(swim_task_delete_cb,
+ swim_task_delete_cb);
+ if (task == NULL)
+ return -1;
+ swim_send_ack(swim, task, dst, proxy);
+ return 0;
}
/**
@@ -959,6 +1068,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
++m->unacknowledged_pings;
switch (m->status) {
case MEMBER_ALIVE:
+ if (m->unacknowledged_pings < NO_ACKS_TO_SUSPECT)
+ break;
+ m->status = MEMBER_SUSPECTED;
+ swim_member_status_is_updated(m, swim);
+ if (swim_send_indirect_pings(swim, &m->addr) != 0)
+ diag_log();
+ break;
+ case MEMBER_SUSPECTED:
if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) {
m->status = MEMBER_DEAD;
swim_member_status_is_updated(m, swim);
@@ -969,10 +1086,12 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
m->status_ttl == 0)
swim_member_delete(swim, m);
break;
+ case MEMBER_LEFT:
+ break;
default:
unreachable();
}
- swim_send_ping(swim, &m->ping_task, &m->addr);
+ swim_send_ping(swim, &m->ping_task, &m->addr, NULL);
rlist_del_entry(m, in_queue_wait_ack);
}
}
@@ -1136,7 +1255,8 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
static int
swim_process_failure_detection(struct swim *swim, const char **pos,
const char *end, const struct sockaddr_in *src,
- const struct tt_uuid *uuid)
+ const struct tt_uuid *uuid,
+ const struct sockaddr_in *proxy)
{
const char *msg_pref = "invalid failure detection message:";
struct swim_failure_detection_def def;
@@ -1154,7 +1274,13 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
switch (def.type) {
case SWIM_FD_MSG_PING:
- swim_send_ack(swim, &member->ack_task, &member->addr);
+ if (proxy == NULL) {
+ swim_send_ack(swim, &member->ack_task, &member->addr,
+ NULL);
+ } else if (swim_send_indirect_ack(swim, &member->addr,
+ proxy) != 0) {
+ diag_log();
+ }
break;
case SWIM_FD_MSG_ACK:
if (def.incarnation >= member->incarnation) {
@@ -1197,13 +1323,43 @@ swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
return 0;
}
+/**
+ * Decode a quit message. Schedule dissemination, change status.
+ */
+static int
+swim_process_quit(struct swim *swim, const char **pos, const char *end,
+ const struct sockaddr_in *src, const struct tt_uuid *uuid)
+{
+ (void) src;
+ const char *msg_pref = "invald quit message:";
+ uint32_t size;
+ if (swim_decode_map(pos, end, &size, msg_pref, "root") != 0)
+ return -1;
+ if (size != 1) {
+ diag_set(SwimError, "%s map of size 1 is expected", msg_pref);
+ return -1;
+ }
+ uint64_t tmp;
+ if (swim_decode_uint(pos, end, &tmp, msg_pref, "a key") != 0)
+ return -1;
+ if (tmp != SWIM_QUIT_INCARNATION) {
+ diag_set(SwimError, "%s a key should be incarnation", msg_pref);
+ return -1;
+ }
+ if (swim_decode_uint(pos, end, &tmp, msg_pref, "incarnation") != 0)
+ return -1;
+ struct swim_member *m = swim_find_member(swim, uuid);
+ if (m != NULL)
+ swim_member_update_status(m, MEMBER_LEFT, tmp, swim);
+ return 0;
+}
+
/** Process a new message. */
static void
swim_on_input(struct swim_scheduler *scheduler, const char *pos,
const char *end, const struct sockaddr_in *src,
const struct sockaddr_in *proxy)
{
- (void) proxy;
const char *msg_pref = "invalid message:";
struct swim *swim = swim_by_scheduler(scheduler);
struct tt_uuid uuid;
@@ -1237,7 +1393,8 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
case SWIM_FAILURE_DETECTION:
say_verbose("SWIM: process failure detection");
if (swim_process_failure_detection(swim, &pos, end,
- src, &uuid) != 0)
+ src, &uuid,
+ proxy) != 0)
goto error;
break;
case SWIM_DISSEMINATION:
@@ -1245,6 +1402,11 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
if (swim_process_dissemination(swim, &pos, end) != 0)
goto error;
break;
+ case SWIM_QUIT:
+ say_verbose("SWIM: process quit");
+ if (swim_process_quit(swim, &pos, end, src, &uuid) != 0)
+ goto error;
+ break;
default:
diag_set(SwimError, "%s unexpected key", msg_pref);
goto error;
@@ -1460,7 +1622,7 @@ swim_probe_member(struct swim *swim, const char *uri)
swim_task_delete_cb);
if (t == NULL)
return -1;
- swim_send_ping(swim, t, &addr);
+ swim_send_ping(swim, t, &addr, NULL);
return 0;
}
@@ -1501,3 +1663,56 @@ swim_delete(struct swim *swim)
}
mh_swim_table_delete(swim->members);
}
+
+/**
+ * Quit message is broadcasted in the same way as round messages,
+ * step by step, with the only difference that quit round steps
+ * follow each other without delays.
+ */
+static void
+swim_quit_step_complete(struct swim_task *task,
+ struct swim_scheduler *scheduler, int rc)
+{
+ (void) rc;
+ (void) task;
+ struct swim *swim = swim_by_scheduler(scheduler);
+ if (rlist_empty(&swim->queue_round)) {
+ swim_delete(swim);
+ return;
+ }
+ struct swim_member *m =
+ rlist_shift_entry(&swim->queue_round, struct swim_member,
+ in_queue_round);
+ swim_task_send(&swim->round_step_task, &m->addr, &swim->scheduler);
+}
+
+void
+swim_quit(struct swim *swim)
+{
+ if (swim->self == NULL) {
+ swim_delete(swim);
+ return;
+ }
+ ev_periodic_stop(loop(), &swim->round_tick);
+ ev_periodic_stop(loop(), &swim->wait_ack_tick);
+ swim_scheduler_stop_input(&swim->scheduler);
+ /* Start the last round - quiting. */
+ if (swim_new_round(swim) != 0 || rlist_empty(&swim->queue_round)) {
+ swim_delete(swim);
+ return;
+ }
+ swim_task_destroy(&swim->round_step_task);
+ swim_task_create(&swim->round_step_task, swim_quit_step_complete,
+ swim_task_delete_cb);
+ struct swim_quit_bin header;
+ swim_quit_bin_create(&header, swim->self->incarnation);
+ int size = mp_sizeof_map(1) + sizeof(header);
+ char *pos = swim_packet_alloc(&swim->round_step_task.packet, size);
+ assert(pos != NULL);
+ pos = mp_encode_map(pos, 1);
+ memcpy(pos, &header, sizeof(header));
+ struct swim_member *m =
+ rlist_shift_entry(&swim->queue_round, struct swim_member,
+ in_queue_round);
+ swim_task_send(&swim->round_step_task, &m->addr, &swim->scheduler);
+}
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index dced172c0..24f3a4b33 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -97,6 +97,14 @@ swim_probe_member(struct swim *swim, const char *uri);
void
swim_info(struct swim *swim, struct info_handler *info);
+/**
+ * Gracefully leave the cluster, broadcast a notification.
+ * Members, received it, will remove a record about this instance
+ * from their tables, and will not consider it to be dead.
+ */
+void
+swim_quit(struct swim *swim);
+
#if defined(__cplusplus)
}
#endif
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index a8fb1f588..e62b7126f 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -187,6 +187,12 @@ swim_scheduler_bind(struct swim_scheduler *scheduler,
return 0;
}
+void
+swim_scheduler_stop_input(struct swim_scheduler *scheduler)
+{
+ ev_io_stop(loop(), &scheduler->input);
+}
+
void
swim_scheduler_destroy(struct swim_scheduler *scheduler)
{
@@ -202,7 +208,7 @@ swim_scheduler_destroy(struct swim_scheduler *scheduler)
}
swim_transport_destroy(&scheduler->transport);
ev_io_stop(loop(), &scheduler->output);
- ev_io_stop(loop(), &scheduler->input);
+ swim_scheduler_stop_input(scheduler);
}
static void
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 0ba8972f0..c13b5d14f 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -178,6 +178,10 @@ int
swim_scheduler_bind(struct swim_scheduler *scheduler,
const struct sockaddr_in *addr);
+/** Stop accepting new packets from the network. */
+void
+swim_scheduler_stop_input(struct swim_scheduler *scheduler);
+
/** Destroy scheduler, its queues, close the socket. */
void
swim_scheduler_destroy(struct swim_scheduler *scheduler);
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 8b1ed76a7..aa9edfc9d 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -36,7 +36,9 @@
const char *swim_member_status_strs[] = {
"alive",
+ "suspected",
"dead",
+ "left",
};
const char *swim_fd_msg_type_strs[] = {
@@ -581,3 +583,13 @@ swim_route_bin_create(struct swim_route_bin *route,
route->m_dst_port = 0xcd;
route->v_dst_port = mp_bswap_u16(dst->sin_port);
}
+
+void
+swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation)
+{
+ header->k_quit = SWIM_QUIT;
+ header->m_quit = 0x81;
+ header->k_incarnation = SWIM_QUIT_INCARNATION;
+ header->m_incarnation = 0xcf;
+ header->v_incarnation = mp_bswap_u64(incarnation);
+}
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index fe9eb85c5..b743074e5 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -92,6 +92,12 @@ enum {
* | }, |
* | ... |
* | ], |
+ * | |
+ * | OR/AND |
+ * | |
+ * | SWIM_QUIT: { |
+ * | SWIM_QUIT_INCARNATION: uint |
+ * | } |
* | } |
* +-------------------------------------------------------------+
*/
@@ -99,11 +105,19 @@ enum {
enum swim_member_status {
/** The instance is ok, responds to requests. */
MEMBER_ALIVE = 0,
+ /**
+ * If a member has not responded to a ping, it is declared
+ * as suspected to be dead. After more failed pings it
+ * is finaly dead.
+ */
+ MEMBER_SUSPECTED,
/**
* The member is considered to be dead. It will disappear
* from the membership, if it is not pinned.
*/
MEMBER_DEAD,
+ /** The member has voluntary left the cluster. */
+ MEMBER_LEFT,
swim_member_status_MAX,
};
@@ -157,6 +171,7 @@ enum swim_body_key {
SWIM_ANTI_ENTROPY,
SWIM_FAILURE_DETECTION,
SWIM_DISSEMINATION,
+ SWIM_QUIT,
};
/**
@@ -589,6 +604,29 @@ swim_route_bin_create(struct swim_route_bin *route,
/** }}} Meta component */
+enum swim_quit_key {
+ /** Incarnation to ignore old quit messages. */
+ SWIM_QUIT_INCARNATION = 0,
+};
+
+/** Quit section. Describes voluntary quit from the cluster. */
+struct PACKED swim_quit_bin {
+ /** mp_encode_uint(SWIM_QUIT) */
+ uint8_t k_quit;
+ /** mp_encode_map(1) */
+ uint8_t m_quit;
+
+ /** mp_encode_uint(SWIM_QUIT_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
+};
+
+/** Initialize quit section. */
+void
+swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation);
+
/**
* Helpers to decode some values - map, array, etc with
* appropriate checks. All of them set diagnostics on an error
diff --git a/src/lua/swim.c b/src/lua/swim.c
index a20c2dc0d..7df4e5c85 100644
--- a/src/lua/swim.c
+++ b/src/lua/swim.c
@@ -312,6 +312,16 @@ lua_swim_remove_member(struct lua_State *L)
return 1;
}
+/** Remove a SWIM instance pointer from Lua space, nullify. */
+static void
+lua_swim_invalidate(struct lua_State *L)
+{
+ uint32_t ctypeid;
+ struct swim **cdata = (struct swim **) luaL_checkcdata(L, 1, &ctypeid);
+ assert(ctypeid == CTID_STRUCT_SWIM_PTR);
+ *cdata = NULL;
+}
+
/**
* Destroy and delete a SWIM instance. All its memory is freed, it
* stops participating in any rounds, the socket is closed. No
@@ -326,10 +336,7 @@ lua_swim_delete(struct lua_State *L)
if (swim == NULL)
return luaL_error(L, "Usage: swim:delete()");
swim_delete(swim);
- uint32_t ctypeid;
- struct swim **cdata = (struct swim **) luaL_checkcdata(L, 1, &ctypeid);
- assert(ctypeid == CTID_STRUCT_SWIM_PTR);
- *cdata = NULL;
+ lua_swim_invalidate(L);
return 0;
}
@@ -379,6 +386,23 @@ lua_swim_probe_member(struct lua_State *L)
return 1;
}
+/**
+ * Gracefully leave the cluster. The Lua stack should contain one
+ * value - a SWIM instance. After this method is called, the SWIM
+ * instance is deleted and can not be used.
+ * @param L Lua state.
+ */
+static int
+lua_swim_quit(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (swim == NULL)
+ return luaL_error(L, "Usage: swim:quit()");
+ swim_quit(swim);
+ lua_swim_invalidate(L);
+ return 0;
+}
+
void
tarantool_lua_swim_init(struct lua_State *L)
{
@@ -390,6 +414,7 @@ tarantool_lua_swim_init(struct lua_State *L)
{"delete", lua_swim_delete},
{"info", lua_swim_info},
{"probe_member", lua_swim_probe_member},
+ {"quit", lua_swim_quit},
{NULL, NULL}
};
luaL_register_module(L, "swim", lua_swim_methods);
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 23+ messages in thread
* Re: [tarantool-patches] [PATCH v4 10/12] [RAW] swim: introduce 'quit' message
2019-01-30 21:28 ` [PATCH v4 10/12] [RAW] swim: introduce 'quit' message Vladislav Shpilevoy
@ 2019-02-21 12:13 ` Vladislav Shpilevoy
0 siblings, 0 replies; 23+ messages in thread
From: Vladislav Shpilevoy @ 2019-02-21 12:13 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja, vdavydov.dev
Sorry, this commit was accidentally merged with indirect ping/acks.
I will fix that on the branch.
On 31/01/2019 00:28, Vladislav Shpilevoy wrote:
> 'Quit' message helps gracefully leave the cluster, notifying all
> members that this instance is not dead, but just decided to
> leave.
>
> Part of #3234
> ---
> src/lib/swim/swim.c | 255 +++++++++++++++++++++++++++++++++++---
> src/lib/swim/swim.h | 8 ++
> src/lib/swim/swim_io.c | 8 +-
> src/lib/swim/swim_io.h | 4 +
> src/lib/swim/swim_proto.c | 12 ++
> src/lib/swim/swim_proto.h | 38 ++++++
> src/lua/swim.c | 33 ++++-
> 7 files changed, 333 insertions(+), 25 deletions(-)
>
^ permalink raw reply [flat|nested] 23+ messages in thread
* [PATCH v4 11/12] [RAW] swim: introduce broadcast tasks
2019-01-30 21:28 [PATCH v4 00/12] SWIM draft Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 01/12] sio: introduce sio_uri_to_addr Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 10/12] [RAW] swim: introduce 'quit' message Vladislav Shpilevoy
@ 2019-01-30 21:28 ` Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 12/12] [RAW] swim: allow to use broadcast tasks to send pings Vladislav Shpilevoy
` (8 subsequent siblings)
11 siblings, 0 replies; 23+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-30 21:28 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja, vdavydov.dev
When a cluster is just created, no one knows anyone. Broadcast
helps to establish some initial relationships between members.
Part of #3234
---
src/lib/swim/swim_io.c | 97 ++++++++++++++++++++++++++++++++++++++++++
src/lib/swim/swim_io.h | 22 ++++++++++
2 files changed, 119 insertions(+)
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index e62b7126f..7bdb4bee0 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -32,6 +32,8 @@
#include "swim_proto.h"
#include "fiber.h"
#include "sio.h"
+#include <ifaddrs.h>
+#include <net/if.h>
/**
* Allocate memory for meta. The same as mere alloc, but moves
@@ -147,6 +149,101 @@ swim_task_send(struct swim_task *task, const struct sockaddr_in *dst,
swim_task_schedule(task, scheduler);
}
+/** Delete a broadcast task. */
+static void
+swim_bcast_task_delete(struct swim_bcast_task *task)
+{
+ freeifaddrs(task->addrs);
+ swim_task_destroy(&task->base);
+ free(task);
+}
+
+/** Delete broadcast task on its cancelation. */
+static void
+swim_bcast_task_delete_cb(struct swim_task *task,
+ struct swim_scheduler *scheduler, int rc)
+{
+ (void) scheduler;
+ (void) rc;
+ swim_bcast_task_delete((struct swim_bcast_task *) task);
+}
+
+/**
+ * Write down a next available broadcast address into the task
+ * destination field.
+ * @param task Broadcast task to update.
+ * @retval 0 Success. 'dst' field is updated.
+ * @retval -1 No more addresses.
+ */
+static int
+swim_bcast_task_next_addr(struct swim_bcast_task *task)
+{
+ for (struct ifaddrs *i = task->i; i != NULL; i = i->ifa_next) {
+ int flags = i->ifa_flags;
+ if ((flags & IFF_UP) == 0)
+ continue;
+
+ if ((flags & IFF_BROADCAST) != 0 &&
+ (i->ifa_broadaddr->sa_family == AF_INET))
+ task->base.dst =
+ *(struct sockaddr_in *) i->ifa_broadaddr;
+ else if (i->ifa_addr != NULL &&
+ i->ifa_addr->sa_family == AF_INET)
+ task->base.dst = *(struct sockaddr_in *) i->ifa_addr;
+ else
+ continue;
+ task->base.dst.sin_port = task->port;
+ task->i = task->i->ifa_next;
+ return 0;
+ }
+ return -1;
+}
+
+/**
+ * Callback on a send completion. If there are more broadcast
+ * addresses to use, then the task is rescheduled. Else deleted.
+ */
+static void
+swim_bcast_task_complete(struct swim_task *base_task,
+ struct swim_scheduler *scheduler, int rc)
+{
+ (void) rc;
+ struct swim_bcast_task *task = (struct swim_bcast_task *) base_task;
+ if (swim_bcast_task_next_addr(task) != 0)
+ swim_bcast_task_delete(task);
+ else
+ swim_task_schedule(base_task, scheduler);
+}
+
+struct swim_bcast_task *
+swim_bcast_task_new(int port)
+{
+ struct swim_bcast_task *task =
+ (struct swim_bcast_task *) malloc(sizeof(*task));
+ if (task == NULL) {
+ diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
+ return NULL;
+ }
+ struct ifaddrs *addrs;
+ if (getifaddrs(&addrs) != 0) {
+ diag_set(SystemError, "error in getifaddrs");
+ free(task);
+ return NULL;
+ }
+ task->port = port;
+ task->addrs = addrs;
+ task->i = addrs;
+ swim_task_create(&task->base, swim_bcast_task_complete,
+ swim_bcast_task_delete_cb);
+ if (swim_bcast_task_next_addr(task) != 0) {
+ diag_set(SwimError, "broadcast has failed - no available "\
+ "interfaces");
+ swim_bcast_task_delete(task);
+ return NULL;
+ }
+ return task;
+}
+
/**
* Dispatch a next output event. Build packet meta and send the
* packet.
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index c13b5d14f..0341d8e3c 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -43,6 +43,7 @@
*/
struct swim_task;
+struct ifaddrs;
struct swim_scheduler;
enum {
@@ -257,4 +258,25 @@ swim_task_destroy(struct swim_task *task)
rlist_del_entry(task, in_queue_output);
}
+/**
+ * Broadcast task. Besides usual task fields, stores a list of
+ * interfaces available for broadcast packets. The task works
+ * multiple times, each time sending a packet to one interface.
+ * After completion it is self-deleted.
+ */
+struct swim_bcast_task {
+ /** Base structure. */
+ struct swim_task base;
+ /** Port to use for broadcast. */
+ int port;
+ /** A list of interfaces. */
+ struct ifaddrs *addrs;
+ /** A next interface to send to. */
+ struct ifaddrs *i;
+};
+
+/** Create a new broadcast task with a specified port. */
+struct swim_bcast_task *
+swim_bcast_task_new(int port);
+
#endif /* TARANTOOL_SWIM_IO_H_INCLUDED */
\ No newline at end of file
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 23+ messages in thread
* [PATCH v4 12/12] [RAW] swim: allow to use broadcast tasks to send pings
2019-01-30 21:28 [PATCH v4 00/12] SWIM draft Vladislav Shpilevoy
` (2 preceding siblings ...)
2019-01-30 21:28 ` [PATCH v4 11/12] [RAW] swim: introduce broadcast tasks Vladislav Shpilevoy
@ 2019-01-30 21:28 ` Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 02/12] evio: expose evio_setsockopt_server function Vladislav Shpilevoy
` (7 subsequent siblings)
11 siblings, 0 replies; 23+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-30 21:28 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja, vdavydov.dev
Part of #3234
---
src/lib/swim/swim.c | 15 +++++++++++++++
src/lib/swim/swim.h | 7 +++++++
src/lua/swim.c | 39 +++++++++++++++++++++++++++++++++++++++
3 files changed, 61 insertions(+)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index b377f154f..f67acc5b3 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -1626,6 +1626,21 @@ swim_probe_member(struct swim *swim, const char *uri)
return 0;
}
+int
+swim_broadcast(struct swim *swim, int port)
+{
+ const char *msg_pref = "swim.broadcast:";
+ if (swim_check_is_configured(swim, msg_pref) != 0)
+ return -1;
+ if (port < 0)
+ port = ntohs(swim->self->addr.sin_port);
+ struct swim_bcast_task *t = swim_bcast_task_new(port);
+ if (t == NULL)
+ return -1;
+ swim_send_ping(swim, &t->base, &t->base.dst, NULL);
+ return 0;
+}
+
void
swim_info(struct swim *swim, struct info_handler *info)
{
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 24f3a4b33..8f30d58d0 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -93,6 +93,13 @@ swim_remove_member(struct swim *swim, const struct tt_uuid *uuid);
int
swim_probe_member(struct swim *swim, const char *uri);
+/**
+ * Broadcast a ping to all interfaces on a specified port. If a
+ * port is < 0, then a port of the SWIM instance is used.
+ */
+int
+swim_broadcast(struct swim *swim, int port);
+
/** Dump member statuses into @a info. */
void
swim_info(struct swim *swim, struct info_handler *info);
diff --git a/src/lua/swim.c b/src/lua/swim.c
index 7df4e5c85..1be8e7595 100644
--- a/src/lua/swim.c
+++ b/src/lua/swim.c
@@ -403,6 +403,44 @@ lua_swim_quit(struct lua_State *L)
return 0;
}
+/**
+ * Broadcast a ping over all network interfaces with a speicifed
+ * port. Port is optional and in a case of absence it is set to
+ * a port of the current instance. The Lua stack should contain a
+ * SWIM instance to broadcast from, and optionally a port.
+ * @param L Lua state.
+ * @retval 1 True.
+ * @retval 2 Nil and an error object. On invalid Lua parameters
+ * and OOM it throws.
+ */
+static int
+lua_swim_broadcast(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (swim == NULL)
+ return luaL_error(L, "Usage: swim:broadcast([port])");
+ int port = -1;
+ if (lua_gettop(L) > 1) {
+ if (! lua_isnumber(L, 2)) {
+ return luaL_error(L, "swim.broadcast: port should be "\
+ "a number");
+ }
+ double dport = lua_tonumber(L, 2);
+ port = dport;
+ if (dport != (double) port) {
+ return luaL_error(L, "swim.broadcast: port should be "\
+ "an integer");
+ }
+ }
+ if (swim_broadcast(swim, port) != 0) {
+ lua_pushnil(L);
+ luaT_pusherror(L, diag_last_error(diag_get()));
+ return 2;
+ }
+ lua_pushboolean(L, true);
+ return 1;
+}
+
void
tarantool_lua_swim_init(struct lua_State *L)
{
@@ -415,6 +453,7 @@ tarantool_lua_swim_init(struct lua_State *L)
{"info", lua_swim_info},
{"probe_member", lua_swim_probe_member},
{"quit", lua_swim_quit},
+ {"broadcast", lua_swim_broadcast},
{NULL, NULL}
};
luaL_register_module(L, "swim", lua_swim_methods);
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 23+ messages in thread
* [PATCH v4 02/12] evio: expose evio_setsockopt_server function
2019-01-30 21:28 [PATCH v4 00/12] SWIM draft Vladislav Shpilevoy
` (3 preceding siblings ...)
2019-01-30 21:28 ` [PATCH v4 12/12] [RAW] swim: allow to use broadcast tasks to send pings Vladislav Shpilevoy
@ 2019-01-30 21:28 ` Vladislav Shpilevoy
2019-02-15 13:21 ` [tarantool-patches] " Konstantin Osipov
2019-01-30 21:28 ` [PATCH v4 03/12] rlist: introduce rlist_add_tail_entry_sorted Vladislav Shpilevoy
` (6 subsequent siblings)
11 siblings, 1 reply; 23+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-30 21:28 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja, vdavydov.dev
It is going to be used in SWIM module to set UDP server socket
options. Apparently this function sets some usefull flags like
NONBLOCK, REUSEADDR.
---
src/evio.c | 3 +--
src/evio.h | 4 ++++
2 files changed, 5 insertions(+), 2 deletions(-)
diff --git a/src/evio.c b/src/evio.c
index 9ca14c45c..8610dbbe7 100644
--- a/src/evio.c
+++ b/src/evio.c
@@ -129,8 +129,7 @@ evio_setsockopt_client(int fd, int family, int type)
return 0;
}
-/** Set options for server sockets. */
-static int
+int
evio_setsockopt_server(int fd, int family, int type)
{
int on = 1;
diff --git a/src/evio.h b/src/evio.h
index 69d641a60..872a21ab6 100644
--- a/src/evio.h
+++ b/src/evio.h
@@ -157,6 +157,10 @@ evio_timeout_update(ev_loop *loop, ev_tstamp start, ev_tstamp *delay)
int
evio_setsockopt_client(int fd, int family, int type);
+/** Set options for server sockets. */
+int
+evio_setsockopt_server(int fd, int family, int type);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 23+ messages in thread
* Re: [tarantool-patches] [PATCH v4 02/12] evio: expose evio_setsockopt_server function
2019-01-30 21:28 ` [PATCH v4 02/12] evio: expose evio_setsockopt_server function Vladislav Shpilevoy
@ 2019-02-15 13:21 ` Konstantin Osipov
2019-02-15 21:22 ` [tarantool-patches] " Vladislav Shpilevoy
0 siblings, 1 reply; 23+ messages in thread
From: Konstantin Osipov @ 2019-02-15 13:21 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/01/31 10:28]:
> It is going to be used in SWIM module to set UDP server socket
> options. Apparently this function sets some usefull flags like
> NONBLOCK, REUSEADDR.
A bit more detailed comment wouldn't harm for a public function:
which options this function is setting and why.
Otherwise OK to push.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 23+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v4 02/12] evio: expose evio_setsockopt_server function
2019-02-15 13:21 ` [tarantool-patches] " Konstantin Osipov
@ 2019-02-15 21:22 ` Vladislav Shpilevoy
0 siblings, 0 replies; 23+ messages in thread
From: Vladislav Shpilevoy @ 2019-02-15 21:22 UTC (permalink / raw)
To: tarantool-patches, Konstantin Osipov; +Cc: vdavydov.dev
Pushed to 2.1.
On 15/02/2019 14:21, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/01/31 10:28]:
>> It is going to be used in SWIM module to set UDP server socket
>> options. Apparently this function sets some usefull flags like
>> NONBLOCK, REUSEADDR.
>
> A bit more detailed comment wouldn't harm for a public function:
> which options this function is setting and why.
>
> Otherwise OK to push.
>
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>
^ permalink raw reply [flat|nested] 23+ messages in thread
* [PATCH v4 03/12] rlist: introduce rlist_add_tail_entry_sorted
2019-01-30 21:28 [PATCH v4 00/12] SWIM draft Vladislav Shpilevoy
` (4 preceding siblings ...)
2019-01-30 21:28 ` [PATCH v4 02/12] evio: expose evio_setsockopt_server function Vladislav Shpilevoy
@ 2019-01-30 21:28 ` Vladislav Shpilevoy
2019-02-15 13:26 ` [tarantool-patches] " Konstantin Osipov
2019-01-30 21:28 ` [PATCH v4 04/12] [RAW] swim: introduce SWIM's anti-entropy component Vladislav Shpilevoy
` (5 subsequent siblings)
11 siblings, 1 reply; 23+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-30 21:28 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja, vdavydov.dev
Add an entry to an ascending sorted list. Scan is started from
the tail. Applicable when new elements are usually already bigger
than all other ones, since insertion into a sorted list is O(N).
Necessary for SWIM implementation, where a list is stored of
members, waiting for an ACK. Sometimes there can be inserted an
indirect ACK, which deadline could be bigger, than deadlines
of next direct ACKs. So new elements are inserted either into the
tail, or almost into the tail.
Order allows to stop checking next elements for an unacknowledged
ping when on a next one deadline is not exceeded yet.
Needed for #3234
---
src/lib/small | 2 +-
test/unit/rlist.c | 40 +++++++++++++++++++++++++++++++++++++++-
test/unit/rlist.result | 6 +++++-
3 files changed, 45 insertions(+), 3 deletions(-)
diff --git a/src/lib/small b/src/lib/small
index cdf7d4a8e..ff4a7279a 160000
--- a/src/lib/small
+++ b/src/lib/small
@@ -1 +1 @@
-Subproject commit cdf7d4a8e24ae465298d0ddacaf1aaed1085281e
+Subproject commit ff4a7279af79f7f49fedf7c32622ec2602de7644
diff --git a/test/unit/rlist.c b/test/unit/rlist.c
index c0c29a3f1..eb78adc7e 100644
--- a/test/unit/rlist.c
+++ b/test/unit/rlist.c
@@ -1,10 +1,12 @@
#include "small/rlist.h"
#include <stdio.h>
#include <stdarg.h>
+#include <limits.h>
+#include <time.h>
#include "unit.h"
-#define PLAN 87
+#define PLAN 91
#define ITEMS 7
@@ -19,9 +21,16 @@ static struct test items[ITEMS];
static RLIST_HEAD(head);
static RLIST_HEAD(head2);
+static inline int
+cmp(struct test *a, struct test *b)
+{
+ return a->no - b->no;
+}
+
int
main(void)
{
+ srand(time(NULL));
int i;
struct test *it;
struct rlist *rlist;
@@ -132,6 +141,35 @@ main(void)
rlist_add_entry(&head, &items[0], list);
ok(rlist_prev_entry_safe(&items[0], &head, list) == NULL,
"prev is null");
+
+ rlist_insert_after_entry(&items[0], &items[2], list);
+ it = rlist_first_entry(&head, struct test, list);
+ is(it, &items[0], "inserted after first, first is ok");
+ it = rlist_next_entry(it, list);
+ is(it, &items[2], "inserted after first, second is ok");
+
+ rlist_insert_after_entry(&items[0], &items[1], list);
+ int is_sorted = 1;
+ i = 0;
+ rlist_foreach_entry(it, &head, list)
+ is_sorted = is_sorted && it == &items[i++];
+ rlist_foreach_entry_reverse(it, &head, list)
+ is_sorted = is_sorted && it == &items[--i];
+ ok(is_sorted, "after insertion into the middle the list is ok");
+
+ rlist_create(&head);
+ for (int i = 0; i < ITEMS; ++i) {
+ items[i].no = rand() % ITEMS;
+ rlist_add_tail_entry_sorted(&head, it, &items[i], list, cmp);
+ }
+ int prev = INT_MIN;
+ is_sorted = 1;
+ rlist_foreach_entry(it, &head, list) {
+ is_sorted = is_sorted && prev <= it->no;
+ prev = it->no;
+ }
+ ok(is_sorted, "the list is sorted");
+
return check_plan();
}
diff --git a/test/unit/rlist.result b/test/unit/rlist.result
index fa99a87cf..b2239d8f9 100644
--- a/test/unit/rlist.result
+++ b/test/unit/rlist.result
@@ -1,4 +1,4 @@
-1..87
+1..91
ok 1 - list is empty
ok 2 - rlist_nil is empty
ok 3 - head2 is empty
@@ -86,3 +86,7 @@ ok 84 - element (foreach_entry) 2
ok 85 - element (foreach_entry) 1
ok 86 - element (foreach_entry) 0
ok 87 - prev is null
+ok 88 - inserted after first, first is ok
+ok 89 - inserted after first, second is ok
+ok 90 - after insertion into the middle the list is ok
+ok 91 - the list is sorted
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 23+ messages in thread
* Re: [tarantool-patches] [PATCH v4 03/12] rlist: introduce rlist_add_tail_entry_sorted
2019-01-30 21:28 ` [PATCH v4 03/12] rlist: introduce rlist_add_tail_entry_sorted Vladislav Shpilevoy
@ 2019-02-15 13:26 ` Konstantin Osipov
2019-02-15 13:34 ` [tarantool-patches] " Vladislav Shpilevoy
0 siblings, 1 reply; 23+ messages in thread
From: Konstantin Osipov @ 2019-02-15 13:26 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/01/31 10:28]:
> Add an entry to an ascending sorted list. Scan is started from
> the tail. Applicable when new elements are usually already bigger
> than all other ones, since insertion into a sorted list is O(N).
>
> Necessary for SWIM implementation, where a list is stored of
> members, waiting for an ACK. Sometimes there can be inserted an
> indirect ACK, which deadline could be bigger, than deadlines
> of next direct ACKs. So new elements are inserted either into the
> tail, or almost into the tail.
>
> Order allows to stop checking next elements for an unacknowledged
> ping when on a next one deadline is not exceeded yet.
>
> Needed for #3234
I don't mind having an insert into a sorted list, but
1) we're not stl, so this algorithm is not a generic one. Soon
we'll get slist_insert_sorted, array_insert_sorted, etc.
2) I don't understand if there is any reason to use rlist in this
case, why not use a red-black tree?
3) Why do we have the test in the server, while the library itself
is a subproject? I believe the library does have unit tests,
why not add tests there?
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 23+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v4 03/12] rlist: introduce rlist_add_tail_entry_sorted
2019-02-15 13:26 ` [tarantool-patches] " Konstantin Osipov
@ 2019-02-15 13:34 ` Vladislav Shpilevoy
2019-02-15 18:07 ` Konstantin Osipov
0 siblings, 1 reply; 23+ messages in thread
From: Vladislav Shpilevoy @ 2019-02-15 13:34 UTC (permalink / raw)
To: tarantool-patches, Konstantin Osipov; +Cc: vdavydov.dev
On 15/02/2019 14:26, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/01/31 10:28]:
>> Add an entry to an ascending sorted list. Scan is started from
>> the tail. Applicable when new elements are usually already bigger
>> than all other ones, since insertion into a sorted list is O(N).
>>
>> Necessary for SWIM implementation, where a list is stored of
>> members, waiting for an ACK. Sometimes there can be inserted an
>> indirect ACK, which deadline could be bigger, than deadlines
>> of next direct ACKs. So new elements are inserted either into the
>> tail, or almost into the tail.
>>
>> Order allows to stop checking next elements for an unacknowledged
>> ping when on a next one deadline is not exceeded yet.
>>
>> Needed for #3234
>
> I don't mind having an insert into a sorted list, but
> 1) we're not stl, so this algorithm is not a generic one. Soon
> we'll get slist_insert_sorted, array_insert_sorted, etc.
It is generic for rlist. It takes a comparator.
> 2) I don't understand if there is any reason to use rlist in this
> case, why not use a red-black tree?
RB-tree is an overkill here. As I described in the commit message,
almost always a new element is pushed at the end of the list. Also,
out generic RB tree does not have O(1) access to the smallest element.
Honestly, I understand your complaint, and recently I decided to use
a heap here in later versions of the patchset. Heap gives me O(1)
access to the smallest element, while still proving O(log)
removal/addition. Moreover, when a new element is the biggest, insertion
is O(1) also. And this is the most popular case here.
> 3) Why do we have the test in the server, while the library itself
> is a subproject? I believe the library does have unit tests,
> why not add tests there?
For an unknown for me reason, rlist tests are stored in the server
repo. I do not know why and I decided not to break it here.
>
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>
^ permalink raw reply [flat|nested] 23+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v4 03/12] rlist: introduce rlist_add_tail_entry_sorted
2019-02-15 13:34 ` [tarantool-patches] " Vladislav Shpilevoy
@ 2019-02-15 18:07 ` Konstantin Osipov
0 siblings, 0 replies; 23+ messages in thread
From: Konstantin Osipov @ 2019-02-15 18:07 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches, vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/02/15 16:38]:
I like the approach with heap better since it has a bounded worst
case. You never know what can go wrong with your code in the
future and I hate finding out production bugs caused by broken
assumptions about input data.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 23+ messages in thread
* [PATCH v4 04/12] [RAW] swim: introduce SWIM's anti-entropy component
2019-01-30 21:28 [PATCH v4 00/12] SWIM draft Vladislav Shpilevoy
` (5 preceding siblings ...)
2019-01-30 21:28 ` [PATCH v4 03/12] rlist: introduce rlist_add_tail_entry_sorted Vladislav Shpilevoy
@ 2019-01-30 21:28 ` Vladislav Shpilevoy
2019-02-21 18:35 ` [tarantool-patches] " Konstantin Osipov
2019-01-30 21:28 ` [PATCH v4 05/12] [RAW] swim: introduce failure detection component Vladislav Shpilevoy
` (4 subsequent siblings)
11 siblings, 1 reply; 23+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-30 21:28 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja, vdavydov.dev
SWIM - Scalable Weakly-consistent Infection-style Process Group
Membership Protocol. It consists of 2 components: events
dissemination and failure detection, and stores in memory a
table of known remote hosts - members. Also some SWIM
implementations have an additional component: anti-entropy -
periodical broadcast of a random subset of members table.
Dissemination component spreads over the cluster changes occurred
with members. Failure detection constantly searches for failed
dead members. Anti-entropy just sends all known information at
once about a member so as to synchronize it among all other
members in case some events were not disseminated (UDP problems).
Anti-entropy is the most vital component, since it can work
without dissemination and failure detection. But they can not
work properly with out the former. Consider the example: two SWIM
nodes, both are alive. Nothing happens, so the events list is
empty, only pings are being sent periodically. Then a third
node appears. It knows about one of existing nodes. How should
it learn about another one? Sure, its known counterpart can try
to notify another one, but it is UDP, so this event can lost.
Anti-entropy is an extra simple component, it just piggybacks
random part of members table with each regular round message.
In the example above the new node will learn about the third
one via anti-entropy messages of the second one soon or late.
Part of #3234
---
src/CMakeLists.txt | 3 +-
src/diag.h | 2 +
src/exception.cc | 23 +
src/exception.h | 7 +
src/lib/CMakeLists.txt | 1 +
src/lib/swim/CMakeLists.txt | 6 +
src/lib/swim/swim.c | 828 ++++++++++++++++++++++++++++++++
src/lib/swim/swim.h | 91 ++++
src/lib/swim/swim_io.c | 204 ++++++++
src/lib/swim/swim_io.h | 225 +++++++++
src/lib/swim/swim_proto.c | 327 +++++++++++++
src/lib/swim/swim_proto.h | 320 ++++++++++++
src/lib/swim/swim_transport.h | 73 +++
src/lua/init.c | 2 +
src/lua/swim.c | 370 ++++++++++++++
src/lua/swim.h | 47 ++
test/unit/CMakeLists.txt | 3 +
test/unit/swim.c | 34 ++
test/unit/swim_test_transport.c | 78 +++
19 files changed, 2643 insertions(+), 1 deletion(-)
create mode 100644 src/lib/swim/CMakeLists.txt
create mode 100644 src/lib/swim/swim.c
create mode 100644 src/lib/swim/swim.h
create mode 100644 src/lib/swim/swim_io.c
create mode 100644 src/lib/swim/swim_io.h
create mode 100644 src/lib/swim/swim_proto.c
create mode 100644 src/lib/swim/swim_proto.h
create mode 100644 src/lib/swim/swim_transport.h
create mode 100644 src/lua/swim.c
create mode 100644 src/lua/swim.h
create mode 100644 test/unit/swim.c
create mode 100644 test/unit/swim_test_transport.c
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 38bd576e6..d2d33043b 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -182,6 +182,7 @@ set (server_sources
lua/crypto.c
lua/httpc.c
lua/utf8.c
+ lua/swim.c
lua/info.c
${lua_sources}
${PROJECT_SOURCE_DIR}/third_party/lua-yaml/lyaml.cc
@@ -228,7 +229,7 @@ endif()
set_source_files_compile_flags(${server_sources})
add_library(server STATIC ${server_sources})
-target_link_libraries(server core bit uri uuid ${ICU_LIBRARIES})
+target_link_libraries(server core bit uri uuid swim swim_udp ${ICU_LIBRARIES})
# Rule of thumb: if exporting a symbol from a static library, list the
# library here.
diff --git a/src/diag.h b/src/diag.h
index a0b71f049..f4189cd67 100644
--- a/src/diag.h
+++ b/src/diag.h
@@ -249,6 +249,8 @@ struct error *
BuildSystemError(const char *file, unsigned line, const char *format, ...);
struct error *
BuildCollationError(const char *file, unsigned line, const char *format, ...);
+struct error *
+BuildSwimError(const char *file, unsigned line, const char *format, ...);
struct index_def;
diff --git a/src/exception.cc b/src/exception.cc
index 8fdb24ad9..6124c70d0 100644
--- a/src/exception.cc
+++ b/src/exception.cc
@@ -269,6 +269,17 @@ CollationError::CollationError(const char *file, unsigned line,
va_end(ap);
}
+const struct type_info type_SwimError = make_type("SwimError", &type_Exception);
+
+SwimError::SwimError(const char *file, unsigned line, const char *format, ...)
+ : Exception(&type_SwimError, file, line)
+{
+ va_list ap;
+ va_start(ap, format);
+ error_vformat_msg(this, format, ap);
+ va_end(ap);
+}
+
#define BuildAlloc(type) \
void *p = malloc(sizeof(type)); \
if (p == NULL) \
@@ -348,6 +359,18 @@ BuildCollationError(const char *file, unsigned line, const char *format, ...)
return e;
}
+struct error *
+BuildSwimError(const char *file, unsigned line, const char *format, ...)
+{
+ BuildAlloc(SwimError);
+ SwimError *e = new (p) SwimError(file, line, "");
+ va_list ap;
+ va_start(ap, format);
+ error_vformat_msg(e, format, ap);
+ va_end(ap);
+ return e;
+}
+
struct error *
BuildSocketError(const char *file, unsigned line, const char *socketname,
const char *format, ...)
diff --git a/src/exception.h b/src/exception.h
index f08d946b5..2ec8a74ee 100644
--- a/src/exception.h
+++ b/src/exception.h
@@ -50,6 +50,7 @@ extern const struct type_info type_LuajitError;
extern const struct type_info type_IllegalParams;
extern const struct type_info type_SystemError;
extern const struct type_info type_CollationError;
+extern const struct type_info type_SwimError;
const char *
exception_get_string(struct error *e, const struct method_info *method);
@@ -159,6 +160,12 @@ public:
virtual void raise() { throw this; }
};
+class SwimError: public Exception {
+public:
+ SwimError(const char *file, unsigned line, const char *format, ...);
+ virtual void raise() { throw this; }
+};
+
/**
* Initialize the exception subsystem.
*/
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 98ff19b60..4e21e7da8 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -5,6 +5,7 @@ add_subdirectory(small)
add_subdirectory(salad)
add_subdirectory(csv)
add_subdirectory(json)
+add_subdirectory(swim)
if(ENABLE_BUNDLED_MSGPUCK)
add_subdirectory(msgpuck EXCLUDE_FROM_ALL)
endif()
diff --git a/src/lib/swim/CMakeLists.txt b/src/lib/swim/CMakeLists.txt
new file mode 100644
index 000000000..7af8aeda5
--- /dev/null
+++ b/src/lib/swim/CMakeLists.txt
@@ -0,0 +1,6 @@
+set(lib_swim_sources swim.c swim_io.c swim_proto.c)
+set(lib_swim_udp_sources swim_udp_transport.c)
+
+set_source_files_compile_flags(${lib_swim_sources} ${lib_swim_udp_sources})
+add_library(swim STATIC ${lib_swim_sources})
+add_library(swim_udp STATIC ${lib_swim_udp_sources})
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
new file mode 100644
index 000000000..adf01cfad
--- /dev/null
+++ b/src/lib/swim/swim.c
@@ -0,0 +1,828 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "swim.h"
+#include "swim_io.h"
+#include "swim_proto.h"
+#include "uri.h"
+#include "fiber.h"
+#include "msgpuck.h"
+#include "info.h"
+#include "assoc.h"
+#include "sio.h"
+
+/**
+ * SWIM - Scalable Weakly-consistent Infection-style Process Group
+ * Membership Protocol. It consists of 2 components: events
+ * dissemination and failure detection, and stores in memory a
+ * table of known remote hosts - members. Also some SWIM
+ * implementations have an additional component: anti-entropy -
+ * periodical broadcast of a random subset of members table.
+ *
+ * Each SWIM component is different from others in both message
+ * structures and goals, they even could be sent in different
+ * messages. But SWIM describes piggybacking of messages: a ping
+ * message can piggyback a dissemination's one. SWIM has a main
+ * operating cycle during which it randomly chooses members from a
+ * member table and sends to them events + ping. Replies are
+ * processed out of the main cycle asynchronously.
+ *
+ * Random selection provides even network load about ~1 message to
+ * each member per protocol step regardless of the cluster size.
+ * Without randomness a member would get a network load of N
+ * messages each protocol step, since all other members will
+ * choose the same member on each step where N is the cluster size.
+ *
+ * Also SWIM describes a kind of fairness: when selecting a next
+ * member to ping, the protocol prefers LRU members. In code it
+ * would be too complicated, so Tarantool's implementation is
+ * slightly different, easier.
+ *
+ * Tarantool splits protocol operation into rounds. At the
+ * beginning of a round all members are randomly reordered and
+ * linked into a list. At each round step a member is popped from
+ * the list head, a message is sent to him, and he waits for the
+ * next round. In such implementation all random selection of the
+ * original SWIM is executed once per round. The round is
+ * 'planned' actually. A list is used instead of an array since
+ * new members can be added to its tail without realloc, and dead
+ * members can be removed as easy as that.
+ *
+ * Also Tarantool implements third component - anti-entropy. Why
+ * is it needed and even vital? Consider the example: two SWIM
+ * nodes, both are alive. Nothing happens, so the events list is
+ * empty, only pings are being sent periodically. Then a third
+ * node appears. It knows about one of existing nodes. How should
+ * it learn about another one? Sure, its known counterpart can try
+ * to notify another one, but it is UDP, so this event can lost.
+ * Anti-entropy is an extra simple component, it just piggybacks
+ * random part of members table with each regular round message.
+ * In the example above the new node will learn about the third
+ * one via anti-entropy messages of the second one soon or late.
+ *
+ * Surprisingly, original SWIM does not describe any addressing,
+ * how to uniquely identify a member. IP/port fallaciously could
+ * be considered as a good unique identifier, but some arguments
+ * below demolish this belief:
+ *
+ * - if instances work in separate containers, they can have
+ * the same IP/port inside a container NATed to a unique
+ * IP/port outside the container;
+ *
+ * - IP/port are likely to change during instance lifecycle.
+ * Once IP/port are changed, a ghost of the old member's
+ * configuration still lives for a while until it is
+ * suspected, dead and GC-ed. Taking into account that ACK
+ * timeout can be tens of seconds, 'Dead Souls' can exist
+ * unpleasantly long.
+ *
+ * Tarantool SWIM implementation uses UUIDs as unique identifiers.
+ * UUID is much more unlikely to change than IP/port. But even if
+ * that happens, dissemination component for a while gossips the
+ * new UUID together with the old one.
+ *
+ * SWIM implementation is split into 3 parts: protocol logic,
+ * transport level, protocol structure.
+ *
+ * - protocol logic consist of how to react on various events,
+ * failure detection pings/acks, how often to send messages,
+ * handles logic of three components (failure detection,
+ * anti-entropy, dissemination);
+ *
+ * - transport level handles routing, transport headers,
+ * packet forwarding;
+ *
+ * - protocol structure describes how packet looks in
+ * MessagePack, which section and header follows which
+ * another one.
+ */
+
+enum {
+ /**
+ * How often to send membership messages and pings in
+ * seconds. Nothing special in this concrete default
+ * value.
+ */
+ HEARTBEAT_RATE_DEFAULT = 1,
+};
+
+/**
+ * Take a random number not blindly calculating a modulo, but
+ * scaling random number down the given boundaries to preserve the
+ * original distribution. The result belongs the range
+ * [start, end].
+ */
+static inline int
+swim_scaled_rand(int start, int end)
+{
+ assert(end > start);
+ /*
+ * RAND_MAX is likely to be INT_MAX - hardly SWIM will
+ * ever be used in such a huge cluster.
+ */
+ assert(end - start < RAND_MAX);
+ return rand() / (RAND_MAX / (end - start + 1) + 1);
+}
+
+/** Calculate UUID hash to use as a members table key. */
+static inline uint32_t
+swim_uuid_hash(const struct tt_uuid *uuid)
+{
+ return mh_strn_hash((const char *) uuid, UUID_LEN);
+}
+
+/**
+ * Helper to do not call tt_static_buf() in all places where it is
+ * wanted to get string UUID.
+ */
+static inline const char *
+swim_uuid_str(const struct tt_uuid *uuid)
+{
+ char *buf = tt_static_buf();
+ tt_uuid_to_string(uuid, buf);
+ return buf;
+}
+
+/**
+ * A cluster member description. This structure describes the
+ * last known state of an instance, that is updated periodically
+ * via UDP according to SWIM protocol.
+ */
+struct swim_member {
+ /**
+ * Member status. Since the communication goes via UDP,
+ * actual status can be different, as well as different on
+ * other SWIM nodes. But SWIM guarantees that each member
+ * will learn a real status of an instance sometime.
+ */
+ enum swim_member_status status;
+ /** Address of the instance to which send UDP packets. */
+ struct sockaddr_in addr;
+ /** Unique identifier of the member. Members table key. */
+ struct tt_uuid uuid;
+ /** Cached hash of the uuid for members table lookups. */
+ uint32_t hash;
+ /**
+ * Position in a queue of members in the current round.
+ */
+ struct rlist in_queue_round;
+};
+
+#define mh_name _swim_table
+struct mh_swim_table_key {
+ uint32_t hash;
+ const struct tt_uuid *uuid;
+};
+#define mh_key_t struct mh_swim_table_key
+#define mh_node_t struct swim_member *
+#define mh_arg_t void *
+#define mh_hash(a, arg) ((*a)->hash)
+#define mh_hash_key(a, arg) (a.hash)
+#define mh_cmp(a, b, arg) (tt_uuid_compare(&(*a)->uuid, &(*b)->uuid))
+#define mh_cmp_key(a, b, arg) (tt_uuid_compare(a.uuid, &(*b)->uuid))
+#define MH_SOURCE 1
+#include "salad/mhash.h"
+
+/**
+ * SWIM instance. Stores configuration, manages periodical tasks,
+ * rounds. Each member has an object of this type on its host,
+ * while on others it is represented as a struct swim_member
+ * object.
+ */
+struct swim {
+ /**
+ * Global hash of all known members of the cluster. Hash
+ * key is UUID, value is a struct member, describing a
+ * remote instance. Discovered members live here until
+ * they are detected as dead - in such a case they are
+ * removed from the hash after a while.
+ */
+ struct mh_swim_table_t *members;
+ /**
+ * This node. Is used to do not send messages to self,
+ * it's meaningless. Also to refute false gossips about
+ * self status.
+ */
+ struct swim_member *self;
+ /**
+ * Members to which a message should be sent next during
+ * this round.
+ */
+ struct rlist queue_round;
+ /** Generator of round step events. */
+ struct ev_periodic round_tick;
+ /**
+ * Single round step task. It is impossible to have
+ * multiple round steps in the same SWIM instance at the
+ * same time, so it is single and preallocated per SWIM
+ * instance.
+ */
+ struct swim_task round_step_task;
+ /**
+ * Scheduler of output requests, receiver of incomming
+ * ones.
+ */
+ struct swim_scheduler scheduler;
+};
+
+/**
+ * A helper to get a pointer to a SWIM instance having only a
+ * pointer to it scheduler. It is used by task complete functions.
+ */
+static inline struct swim *
+swim_by_scheduler(struct swim_scheduler *scheduler)
+{
+ return container_of(scheduler, struct swim, scheduler);
+}
+
+/**
+ * Remove the member from all queues, hashes, destroy it and free
+ * the memory.
+ */
+static void
+swim_member_delete(struct swim *swim, struct swim_member *member)
+{
+ say_verbose("SWIM: member %s is deleted", swim_uuid_str(&member->uuid));
+ struct mh_swim_table_key key = {member->hash, &member->uuid};
+ mh_int_t rc = mh_swim_table_find(swim->members, key, NULL);
+ assert(rc != mh_end(swim->members));
+ mh_swim_table_del(swim->members, rc, NULL);
+ rlist_del_entry(member, in_queue_round);
+
+ free(member);
+}
+
+/** Find a member by UUID. */
+static inline struct swim_member *
+swim_find_member(struct swim *swim, const struct tt_uuid *uuid)
+{
+ struct mh_swim_table_key key = {swim_uuid_hash(uuid), uuid};
+ mh_int_t node = mh_swim_table_find(swim->members, key, NULL);
+ if (node == mh_end(swim->members))
+ return NULL;
+ return *mh_swim_table_node(swim->members, node);
+}
+
+/**
+ * Register a new member with a specified status. Here it is
+ * added to the hash, to the 'next' queue.
+ */
+static struct swim_member *
+swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
+ const struct tt_uuid *uuid, enum swim_member_status status)
+{
+ struct swim_member *member =
+ (struct swim_member *) calloc(1, sizeof(*member));
+ if (member == NULL) {
+ diag_set(OutOfMemory, sizeof(*member), "calloc", "member");
+ return NULL;
+ }
+ member->status = status;
+ member->addr = *addr;
+ member->uuid = *uuid;
+ member->hash = swim_uuid_hash(uuid);
+ mh_int_t rc = mh_swim_table_put(swim->members,
+ (const struct swim_member **) &member,
+ NULL, NULL);
+ if (rc == mh_end(swim->members)) {
+ free(member);
+ diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node");
+ return NULL;
+ }
+ rlist_add_entry(&swim->queue_round, member, in_queue_round);
+
+ say_verbose("SWIM: member %s is added", swim_uuid_str(uuid));
+ return member;
+}
+
+/**
+ * Take all the members from the table and shuffle them randomly.
+ * Is used for forthcoming round planning.
+ */
+static struct swim_member **
+swim_shuffle_members(struct swim *swim)
+{
+ struct mh_swim_table_t *members = swim->members;
+ struct swim_member **shuffled;
+ int bsize = sizeof(shuffled[0]) * mh_size(members);
+ shuffled = (struct swim_member **) malloc(bsize);
+ if (shuffled == NULL) {
+ diag_set(OutOfMemory, bsize, "malloc", "shuffled");
+ return NULL;
+ }
+ int i = 0;
+ /*
+ * This shuffling preserves even distribution of a random
+ * sequence, that is proved by testing.
+ */
+ for (mh_int_t node = mh_first(members), end = mh_end(members);
+ node != end; node = mh_next(members, node), ++i) {
+ shuffled[i] = *mh_swim_table_node(members, node);
+ int j = swim_scaled_rand(0, i);
+ SWAP(shuffled[i], shuffled[j]);
+ }
+ return shuffled;
+}
+
+/**
+ * Shuffle members, build randomly ordered queue of addressees. In
+ * other words, do all round preparation work.
+ */
+static int
+swim_new_round(struct swim *swim)
+{
+ int size = mh_size(swim->members);
+ say_verbose("SWIM: start a new round with %d members", size);
+ struct swim_member **shuffled = swim_shuffle_members(swim);
+ if (shuffled == NULL)
+ return -1;
+ rlist_create(&swim->queue_round);
+ for (int i = 0; i < size; ++i) {
+ if (shuffled[i] != swim->self) {
+ rlist_add_entry(&swim->queue_round, shuffled[i],
+ in_queue_round);
+ }
+ }
+ free(shuffled);
+ return 0;
+}
+
+/**
+ * Encode anti-entropy header and random members data as many as
+ * possible to the end of the packet.
+ * @retval 0 Not error, but nothing is encoded.
+ * @retval 1 Something is encoded.
+ */
+static int
+swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
+{
+ struct swim_anti_entropy_header_bin ae_header_bin;
+ struct swim_member_bin member_bin;
+ int size = sizeof(ae_header_bin);
+ char *header = swim_packet_reserve(packet, size);
+ if (header == NULL)
+ return 0;
+ swim_member_bin_create(&member_bin);
+ struct mh_swim_table_t *t = swim->members;
+ int i = 0, member_count = mh_size(t);
+ int rnd = swim_scaled_rand(0, member_count - 1);
+ for (mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t);
+ i < member_count; ++i) {
+ struct swim_member *m = *mh_swim_table_node(t, rc);
+ int new_size = size + sizeof(member_bin);
+ char *pos = swim_packet_reserve(packet, new_size);
+ if (pos == NULL)
+ break;
+ size = new_size;
+ swim_member_bin_fill(&member_bin, &m->addr, &m->uuid,
+ m->status);
+ memcpy(pos, &member_bin, sizeof(member_bin));
+ /*
+ * First random member could be choosen too close
+ * to the hash end. Here the cycle is wrapped, if
+ * a packet still has free memory, but the
+ * iterator has already reached the hash end.
+ */
+ rc = mh_next(t, rc);
+ if (rc == end)
+ rc = mh_first(t);
+ }
+ if (i == 0)
+ return 0;
+ swim_packet_advance(packet, size);
+ swim_anti_entropy_header_bin_create(&ae_header_bin, i);
+ memcpy(header, &ae_header_bin, sizeof(ae_header_bin));
+ return 1;
+}
+
+/**
+ * Encode source UUID.
+ * @retval 0 Not error, but nothing is encoded.
+ * @retval 1 Something is encoded.
+ */
+static inline int
+swim_encode_src_uuid(struct swim *swim, struct swim_packet *packet)
+{
+ struct swim_src_uuid_bin uuid_bin;
+ char *pos = swim_packet_alloc(packet, sizeof(uuid_bin));
+ if (pos == NULL)
+ return 0;
+ swim_src_uuid_bin_create(&uuid_bin, &swim->self->uuid);
+ memcpy(pos, &uuid_bin, sizeof(uuid_bin));
+ return 1;
+}
+
+/** Encode SWIM components into a UDP packet. */
+static void
+swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
+{
+ swim_packet_create(packet);
+ char *header = swim_packet_alloc(packet, 1);
+ int map_size = 0;
+ map_size += swim_encode_src_uuid(swim, packet);
+ map_size += swim_encode_anti_entropy(swim, packet);
+
+ assert(mp_sizeof_map(map_size) == 1 && map_size == 2);
+ mp_encode_map(header, map_size);
+}
+
+/**
+ * Once per specified timeout trigger a next round step. In round
+ * step a next memeber is taken from the round queue and a round
+ * message is sent to him. One member per step.
+ */
+static void
+swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events)
+{
+ assert((events & EV_PERIODIC) != 0);
+ (void) events;
+ struct swim *swim = (struct swim *) p->data;
+ if (rlist_empty(&swim->queue_round) && swim_new_round(swim) != 0) {
+ diag_log();
+ return;
+ }
+ /*
+ * Possibly empty, if no members but self are specified.
+ */
+ if (rlist_empty(&swim->queue_round))
+ return;
+
+ swim_encode_round_msg(swim, &swim->round_step_task.packet);
+ struct swim_member *m =
+ rlist_first_entry(&swim->queue_round, struct swim_member,
+ in_queue_round);
+ swim_task_send(&swim->round_step_task, &m->addr, &swim->scheduler);
+ ev_periodic_stop(loop, p);
+}
+
+/**
+ * After a round message is sent, the addressee can be popped from
+ * the queue, and the next step is scheduled.
+ */
+static void
+swim_round_step_complete(struct swim_task *task,
+ struct swim_scheduler *scheduler, int rc)
+{
+ (void) rc;
+ (void) task;
+ struct swim *swim = swim_by_scheduler(scheduler);
+ ev_periodic_start(loop(), &swim->round_tick);
+ rlist_shift_entry(&swim->queue_round, struct swim_member,
+ in_queue_round);
+}
+
+/**
+ * Update member's UUID if it is changed. On UUID change the
+ * member is reinserted into the members table with a new UUID.
+ * @retval 0 Success.
+ * @retval -1 Error. Out of memory or the new UUID is already in
+ * use.
+ */
+static int
+swim_member_update_uuid(struct swim_member *member,
+ const struct tt_uuid *new_uuid, struct swim *swim)
+{
+ if (tt_uuid_is_equal(new_uuid, &member->uuid))
+ return 0;
+ if (swim_find_member(swim, new_uuid) != NULL) {
+ diag_set(SwimError, "duplicate UUID '%s'",
+ swim_uuid_str(new_uuid));
+ return -1;
+ }
+ struct mh_swim_table_t *t = swim->members;
+ struct tt_uuid old_uuid = member->uuid;
+ member->uuid = *new_uuid;
+ if (mh_swim_table_put(t, (const struct swim_member **) &member, NULL,
+ NULL) == mh_end(t)) {
+ member->uuid = old_uuid;
+ diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node");
+ return -1;
+ }
+ struct mh_swim_table_key key = {member->hash, &old_uuid};
+ mh_swim_table_del(t, mh_swim_table_find(t, key, NULL), NULL);
+ member->hash = swim_uuid_hash(new_uuid);
+ return 0;
+}
+
+/** Update member's address.*/
+static inline void
+swim_member_update_addr(struct swim_member *member,
+ const struct sockaddr_in *addr)
+{
+ member->addr = *addr;
+}
+
+/**
+ * Update or create a member by its definition, received from a
+ * remote instance.
+ * @retval NULL Error.
+ * @retval New member, or updated old member.
+ */
+static struct swim_member *
+swim_update_member(struct swim *swim, const struct swim_member_def *def)
+{
+ struct swim_member *member = swim_find_member(swim, &def->uuid);
+ if (member == NULL) {
+ member = swim_member_new(swim, &def->addr, &def->uuid,
+ def->status);
+ return member;
+ }
+ swim_member_update_addr(member, &def->addr);
+ return member;
+}
+
+/** Decode an anti-entropy message, update members table. */
+static int
+swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
+{
+ const char *msg_pref = "invalid anti-entropy message:";
+ uint32_t size;
+ if (swim_decode_array(pos, end, &size, msg_pref, "root") != 0)
+ return -1;
+ for (uint64_t i = 0; i < size; ++i) {
+ struct swim_member_def def;
+ if (swim_member_def_decode(&def, pos, end, msg_pref) != 0)
+ return -1;
+ if (swim_update_member(swim, &def) == NULL) {
+ /*
+ * Not a critical error. Other members
+ * still can be updated.
+ */
+ diag_log();
+ }
+ }
+ return 0;
+}
+
+/** Process a new message. */
+static void
+swim_on_input(struct swim_scheduler *scheduler, const char *pos,
+ const char *end, const struct sockaddr_in *src)
+{
+ (void) src;
+ const char *msg_pref = "invalid message:";
+ struct swim *swim = swim_by_scheduler(scheduler);
+ struct tt_uuid uuid;
+ uint32_t size;
+ if (swim_decode_map(&pos, end, &size, msg_pref, "root") != 0)
+ goto error;
+ if (size == 0) {
+ diag_set(SwimError, "%s body can not be empty", msg_pref);
+ goto error;
+ }
+ uint64_t key;
+ if (swim_decode_uint(&pos, end, &key, msg_pref, "a key") != 0)
+ goto error;
+ if (key != SWIM_SRC_UUID) {
+ diag_set(SwimError, "%s first key should be source UUID",
+ msg_pref);
+ goto error;
+ }
+ if (swim_decode_uuid(&uuid, &pos, end, msg_pref, "source uuid") != 0)
+ goto error;
+ --size;
+ for (uint32_t i = 0; i < size; ++i) {
+ if (swim_decode_uint(&pos, end, &key, msg_pref, "a key") != 0)
+ goto error;
+ switch(key) {
+ case SWIM_ANTI_ENTROPY:
+ say_verbose("SWIM: process anti-entropy");
+ if (swim_process_anti_entropy(swim, &pos, end) != 0)
+ goto error;
+ break;
+ default:
+ diag_set(SwimError, "%s unexpected key", msg_pref);
+ goto error;
+ }
+ }
+ return;
+error:
+ diag_log();
+}
+
+struct swim *
+swim_new(void)
+{
+ struct swim *swim = (struct swim *) calloc(1, sizeof(*swim));
+ if (swim == NULL) {
+ diag_set(OutOfMemory, sizeof(*swim), "calloc", "swim");
+ return NULL;
+ }
+ swim->members = mh_swim_table_new();
+ if (swim->members == NULL) {
+ free(swim);
+ diag_set(OutOfMemory, sizeof(*swim->members),
+ "mh_swim_table_new", "members");
+ return NULL;
+ }
+ rlist_create(&swim->queue_round);
+ ev_init(&swim->round_tick, swim_round_step_begin);
+ ev_periodic_set(&swim->round_tick, 0, HEARTBEAT_RATE_DEFAULT, NULL);
+ swim->round_tick.data = (void *) swim;
+ swim_task_create(&swim->round_step_task, swim_round_step_complete,
+ NULL);
+ swim_scheduler_create(&swim->scheduler, swim_on_input);
+ return swim;
+}
+
+/**
+ * Parse URI, filter out everything but IP addresses and ports,
+ * and fill a struct sockaddr_in.
+ */
+static inline int
+swim_uri_to_addr(const char *uri, struct sockaddr_in *addr,
+ const char *msg_pref)
+{
+ struct sockaddr_storage storage;
+ if (sio_uri_to_addr(uri, (struct sockaddr *) &storage) != 0)
+ return -1;
+ if (storage.ss_family != AF_INET) {
+ diag_set(IllegalParams, "%s only IP sockets are supported",
+ msg_pref);
+ return -1;
+ }
+ *addr = *((struct sockaddr_in *) &storage);
+ return 0;
+}
+
+int
+swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
+ const struct tt_uuid *uuid)
+{
+ const char *msg_pref = "swim.cfg:";
+ if (heartbeat_rate < 0) {
+ diag_set(IllegalParams, "%s heartbeat_rate should be a "\
+ "positive number", msg_pref);
+ return -1;
+ }
+ struct sockaddr_in addr;
+ if (uri != NULL && swim_uri_to_addr(uri, &addr, msg_pref) != 0)
+ return -1;
+ bool is_first_cfg = swim->self == NULL;
+ if (is_first_cfg) {
+ if (uuid == NULL || tt_uuid_is_nil(uuid) || uri == NULL) {
+ diag_set(SwimError, "%s UUID and URI are mandatory in "\
+ "a first config", msg_pref);
+ return -1;
+ }
+ swim->self = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE);
+ if (swim->self == NULL)
+ return -1;
+ } else if (uuid == NULL || tt_uuid_is_nil(uuid)) {
+ uuid = &swim->self->uuid;
+ } else if (! tt_uuid_is_equal(uuid, &swim->self->uuid)) {
+ if (swim_find_member(swim, uuid) != NULL) {
+ diag_set(SwimError, "%s a member with such UUID "\
+ "already exists", msg_pref);
+ return -1;
+ }
+ /*
+ * Reserve one cell for reinsertion of self with a
+ * new UUID. Reserve is necessary for atomic
+ * reconfiguration. Without reservation it is
+ * possible that the instance is bound to a new
+ * URI, but failed to update UUID due to memory
+ * issues.
+ */
+ if (mh_swim_table_reserve(swim->members,
+ mh_size(swim->members) + 1,
+ NULL) != 0) {
+ diag_set(OutOfMemory, sizeof(mh_int_t), "malloc",
+ "node");
+ return -1;
+ }
+
+ }
+ if (uri != NULL && swim_scheduler_bind(&swim->scheduler, &addr) != 0) {
+ if (is_first_cfg) {
+ swim_member_delete(swim, swim->self);
+ swim->self = NULL;
+ }
+ return -1;
+ }
+ if (swim->round_tick.interval != heartbeat_rate && heartbeat_rate > 0)
+ ev_periodic_set(&swim->round_tick, 0, heartbeat_rate, NULL);
+
+ ev_periodic_start(loop(), &swim->round_tick);
+
+ if (! is_first_cfg) {
+ swim_member_update_addr(swim->self, &addr);
+ int rc = swim_member_update_uuid(swim->self, uuid, swim);
+ /* Reserved above. */
+ assert(rc == 0);
+ (void) rc;
+ }
+ return 0;
+}
+
+/**
+ * Check if a SWIM instance is not configured, and if so - set an
+ * error in a diagnostics area.
+ */
+static inline int
+swim_check_is_configured(const struct swim *swim, const char *msg_pref)
+{
+ if (swim->self != NULL)
+ return 0;
+ diag_set(SwimError, "%s the instance is not configured", msg_pref);
+ return -1;
+}
+
+int
+swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
+{
+ const char *msg_pref = "swim.add_member:";
+ if (swim_check_is_configured(swim, msg_pref) != 0)
+ return -1;
+ struct sockaddr_in addr;
+ if (swim_uri_to_addr(uri, &addr, msg_pref) != 0)
+ return -1;
+ struct swim_member *member = swim_find_member(swim, uuid);
+ if (member == NULL) {
+ member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE);
+ return member == NULL ? -1 : 0;
+ }
+ diag_set(SwimError, "%s a member with such UUID already exists",
+ msg_pref);
+ return 1;
+}
+
+int
+swim_remove_member(struct swim *swim, const struct tt_uuid *uuid)
+{
+ const char *msg_pref = "swim.remove_member:";
+ if (swim_check_is_configured(swim, msg_pref) != 0)
+ return -1;
+ struct swim_member *member = swim_find_member(swim, uuid);
+ if (member == NULL)
+ return 0;
+ if (member == swim->self) {
+ diag_set(SwimError, "%s can not remove self", msg_pref);
+ return -1;
+ }
+ swim_member_delete(swim, member);
+ return 0;
+}
+
+void
+swim_info(struct swim *swim, struct info_handler *info)
+{
+ info_begin(info);
+ for (mh_int_t node = mh_first(swim->members),
+ end = mh_end(swim->members); node != end;
+ node = mh_next(swim->members, node)) {
+ struct swim_member *m =
+ *mh_swim_table_node(swim->members, node);
+ info_table_begin(info,
+ sio_strfaddr((struct sockaddr *) &m->addr,
+ sizeof(m->addr)));
+ info_append_str(info, "status",
+ swim_member_status_strs[m->status]);
+ info_append_str(info, "uuid", swim_uuid_str(&m->uuid));
+ info_table_end(info);
+ }
+ info_end(info);
+}
+
+void
+swim_delete(struct swim *swim)
+{
+ swim_scheduler_destroy(&swim->scheduler);
+ ev_periodic_stop(loop(), &swim->round_tick);
+ swim_task_destroy(&swim->round_step_task);
+ mh_int_t node = mh_first(swim->members);
+ while (node != mh_end(swim->members)) {
+ struct swim_member *m =
+ *mh_swim_table_node(swim->members, node);
+ swim_member_delete(swim, m);
+ node = mh_first(swim->members);
+ }
+ mh_swim_table_delete(swim->members);
+}
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
new file mode 100644
index 000000000..a98decc86
--- /dev/null
+++ b/src/lib/swim/swim.h
@@ -0,0 +1,91 @@
+#ifndef TARANTOOL_SWIM_H_INCLUDED
+#define TARANTOOL_SWIM_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+struct info_handler;
+struct swim;
+struct tt_uuid;
+
+/**
+ * Create a new SWIM instance. Just creation without binding,
+ * setting any parameters. Allocation and initialization only.
+ */
+struct swim *
+swim_new(void);
+
+/**
+ * Configure or reconfigure a SWIM instance.
+ *
+ * @param swim SWIM instance to configure.
+ * @param uri URI in the format "ip:port".
+ * @param heartbeat_rate Rate of sending round messages. It does
+ * not mean that each member will be checked each
+ * @heartbeat_rate seconds. It is rather the protocol
+ * speed. Protocol period depends on member count and
+ * @heartbeat_rate.
+ * @param uuid UUID of this instance. Must be unique over the
+ * cluster.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+int
+swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
+ const struct tt_uuid *uuid);
+
+/**
+ * Stop listening and broadcasting messages, cleanup all internal
+ * structures, free memory.
+ */
+void
+swim_delete(struct swim *swim);
+
+/** Add a new member. */
+int
+swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid);
+
+/** Silently remove a member from members table. */
+int
+swim_remove_member(struct swim *swim, const struct tt_uuid *uuid);
+
+/** Dump member statuses into @a info. */
+void
+swim_info(struct swim *swim, struct info_handler *info);
+
+#if defined(__cplusplus)
+}
+#endif
+
+#endif /* TARANTOOL_SWIM_H_INCLUDED */
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
new file mode 100644
index 000000000..20973acaf
--- /dev/null
+++ b/src/lib/swim/swim_io.c
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "swim_io.h"
+#include "swim_proto.h"
+#include "fiber.h"
+#include "sio.h"
+
+/**
+ * Allocate memory for meta. The same as mere alloc, but moves
+ * body pointer.
+ */
+static inline void
+swim_packet_alloc_meta(struct swim_packet *packet, int size)
+{
+ char *tmp = swim_packet_alloc(packet, size);
+ assert(tmp != NULL);
+ (void) tmp;
+ packet->body = packet->pos;
+}
+
+void
+swim_packet_create(struct swim_packet *packet)
+{
+ packet->pos = packet->body;
+ packet->body = packet->buf;
+ swim_packet_alloc_meta(packet, sizeof(struct swim_meta_header_bin));
+}
+
+void
+swim_task_create(struct swim_task *task, swim_task_f complete,
+ swim_task_f cancel)
+{
+ memset(task, 0, sizeof(*task));
+ task->complete = complete;
+ task->cancel = cancel;
+ swim_packet_create(&task->packet);
+ rlist_create(&task->in_queue_output);
+}
+
+/** Put the task into the queue of output tasks. */
+static inline void
+swim_task_schedule(struct swim_task *task, struct swim_scheduler *scheduler)
+{
+ assert(rlist_empty(&task->in_queue_output));
+ rlist_add_tail_entry(&scheduler->queue_output, task, in_queue_output);
+ ev_io_start(loop(), &scheduler->output);
+}
+
+void
+swim_task_send(struct swim_task *task, const struct sockaddr_in *dst,
+ struct swim_scheduler *scheduler)
+{
+ task->dst = *dst;
+ swim_task_schedule(task, scheduler);
+}
+
+/**
+ * Dispatch a next output event. Build packet meta and send the
+ * packet.
+ */
+static void
+swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events);
+
+/**
+ * Dispatch a next input event. Unpack meta, forward a packet or
+ * propagate further to protocol logic.
+ */
+static void
+swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events);
+
+void
+swim_scheduler_create(struct swim_scheduler *scheduler,
+ swim_scheduler_on_input_f on_input)
+{
+ ev_init(&scheduler->output, swim_scheduler_on_output);
+ scheduler->output.data = (void *) scheduler;
+ ev_init(&scheduler->input, swim_scheduler_on_input);
+ scheduler->input.data = (void *) scheduler;
+ rlist_create(&scheduler->queue_output);
+ scheduler->on_input = on_input;
+ swim_transport_create(&scheduler->transport);
+}
+
+int
+swim_scheduler_bind(struct swim_scheduler *scheduler,
+ const struct sockaddr_in *addr)
+{
+ struct swim_transport *t = &scheduler->transport;
+ if (swim_transport_bind(t, (const struct sockaddr *) addr,
+ sizeof(*addr)) != 0)
+ return -1;
+ ev_io_set(&scheduler->input, t->fd, EV_READ);
+ ev_io_set(&scheduler->output, t->fd, EV_WRITE);
+ return 0;
+}
+
+void
+swim_scheduler_destroy(struct swim_scheduler *scheduler)
+{
+ struct swim_task *t, *tmp;
+ /*
+ * Use 'safe', because cancelation can delete the task
+ * from the queue, or even delete the task itself.
+ */
+ rlist_foreach_entry_safe(t, &scheduler->queue_output, in_queue_output,
+ tmp) {
+ if (t->cancel != NULL)
+ t->cancel(t, scheduler, -1);
+ }
+ swim_transport_destroy(&scheduler->transport);
+ ev_io_stop(loop(), &scheduler->output);
+ ev_io_stop(loop(), &scheduler->input);
+}
+
+static void
+swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
+{
+ assert((events & EV_WRITE) != 0);
+ (void) events;
+ struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
+ if (rlist_empty(&scheduler->queue_output)) {
+ /*
+ * Possible, if a member pushed a task and then
+ * was deleted together with it.
+ */
+ ev_io_stop(loop, io);
+ return;
+ }
+ struct swim_task *task =
+ rlist_shift_entry(&scheduler->queue_output, struct swim_task,
+ in_queue_output);
+ say_verbose("SWIM: send to %s",
+ sio_strfaddr((struct sockaddr *) &task->dst,
+ sizeof(task->dst)));
+ struct swim_meta_header_bin header;
+ swim_meta_header_bin_create(&header, &scheduler->transport.addr);
+ memcpy(task->packet.meta, &header, sizeof(header));
+ int rc = swim_transport_send(&scheduler->transport, task->packet.buf,
+ task->packet.pos - task->packet.buf,
+ (const struct sockaddr *) &task->dst,
+ sizeof(task->dst));
+ if (rc != 0)
+ diag_log();
+ if (task->complete != NULL)
+ task->complete(task, scheduler, rc);
+}
+
+static void
+swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
+{
+ assert((events & EV_READ) != 0);
+ (void) events;
+ (void) loop;
+ struct swim_scheduler *scheduler = (struct swim_scheduler *) io->data;
+ struct sockaddr_in src;
+ socklen_t len = sizeof(src);
+ char buf[UDP_PACKET_SIZE];
+ ssize_t size = swim_transport_recv(&scheduler->transport, buf,
+ sizeof(buf),
+ (struct sockaddr *) &src, &len);
+ if (size <= 0) {
+ if (size < 0)
+ goto error;
+ return;
+ }
+ say_verbose("SWIM: received from %s",
+ sio_strfaddr((struct sockaddr *) &src, len));
+ struct swim_meta_def meta;
+ const char *pos = buf, *end = pos + size;
+ if (swim_meta_def_decode(&meta, &pos, end) < 0)
+ goto error;
+ scheduler->on_input(scheduler, pos, end, &meta.src);
+ return;
+error:
+ diag_log();
+}
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
new file mode 100644
index 000000000..68fb89818
--- /dev/null
+++ b/src/lib/swim/swim_io.h
@@ -0,0 +1,225 @@
+#ifndef TARANTOOL_SWIM_IO_H_INCLUDED
+#define TARANTOOL_SWIM_IO_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "trivia/util.h"
+#include "small/rlist.h"
+#include "salad/stailq.h"
+#include "swim_transport.h"
+#include "tarantool_ev.h"
+#include <stdbool.h>
+#include <arpa/inet.h>
+
+/**
+ * SWIM protocol transport level.
+ */
+
+struct swim_task;
+struct swim_scheduler;
+
+enum {
+ /**
+ * Default MTU is 1500. MTU (when IPv4 is used) consists
+ * of IPv4 header, UDP header, Data. IPv4 has 20 bytes
+ * header, UDP - 8 bytes. So Data = 1500 - 20 - 8 = 1472.
+ * TODO: adapt to other MTUs which can be reduced in some
+ * networks by their admins. Or allow to specify MTU in
+ * configuration.
+ */
+ UDP_PACKET_SIZE = 1472,
+};
+
+/**
+ * UDP packet. Works as an allocator, allowing to fill its body
+ * gradually, while preserving prefix for metadata.
+ *
+ * < - - - -UDP_PACKET_SIZE- - - - ->
+ * +--------+-----------------------+
+ * | meta | body | *free* |
+ * +--------+-----------------------+
+ * ^ ^ ^ ^
+ * meta body pos end
+ * buf
+ */
+struct swim_packet {
+ /** End of the body. */
+ char *pos;
+ /**
+ * Starting position of body in the buffer. Not the same
+ * as buf, because the latter has metadata at the
+ * beginning.
+ */
+ char *body;
+ /**
+ * Alias for swim_packet.buf. Just sugar for code working
+ * with meta.
+ */
+ char meta[0];
+ /** Packet body buffer. */
+ char buf[UDP_PACKET_SIZE];
+ /**
+ * Pointer to the end of the buffer. Just sugar to do not
+ * write 'buf + sizeof(buf)' each time.
+ */
+ char end[0];
+};
+
+/**
+ * Ensure that the packet can fit @a size bytes more. Multiple
+ * reserves of the same size will return the same pointer until
+ * advance is called.
+ */
+static inline char *
+swim_packet_reserve(struct swim_packet *packet, int size)
+{
+ return packet->pos + size > packet->end ? NULL : packet->pos;
+}
+
+/**
+ * Propagate body end pointer. This declares next @a size bytes as
+ * occupied.
+ */
+static inline void
+swim_packet_advance(struct swim_packet *packet, int size)
+{
+ assert(packet->pos + size <= packet->end);
+ packet->pos += size;
+}
+
+/** Reserve + advance. */
+static inline char *
+swim_packet_alloc(struct swim_packet *packet, int size)
+{
+ char *res = swim_packet_reserve(packet, size);
+ if (res == NULL)
+ return NULL;
+ swim_packet_advance(packet, size);
+ return res;
+}
+
+/** Initialize @a packet, reserve some space for meta. */
+void
+swim_packet_create(struct swim_packet *packet);
+
+typedef void (*swim_scheduler_on_input_f)(struct swim_scheduler *scheduler,
+ const char *buf, const char *end,
+ const struct sockaddr_in *src);
+
+/** Planner and executor of input and output operations.*/
+struct swim_scheduler {
+ /** Transport to send/receive packets. */
+ struct swim_transport transport;
+ /**
+ * Function called when a packet is received. It takes
+ * packet body, while meta is handled by transport level
+ * completely.
+ */
+ swim_scheduler_on_input_f on_input;
+ /**
+ * Event dispatcher of incomming messages. Takes them from
+ * the network.
+ */
+ struct ev_io input;
+ /**
+ * Event dispatcher of outcomming messages. Takes tasks
+ * from queue_output.
+ */
+ struct ev_io output;
+ /** Queue of output tasks ready to write now. */
+ struct rlist queue_output;
+};
+
+/** Initialize scheduler. */
+void
+swim_scheduler_create(struct swim_scheduler *scheduler,
+ swim_scheduler_on_input_f on_input);
+
+/**
+ * Bind or rebind the scheduler to an address. In case of rebind
+ * the old socket is closed.
+ */
+int
+swim_scheduler_bind(struct swim_scheduler *scheduler,
+ const struct sockaddr_in *addr);
+
+/** Destroy scheduler, its queues, close the socket. */
+void
+swim_scheduler_destroy(struct swim_scheduler *scheduler);
+
+/**
+ * Each SWIM component in a common case independently may want to
+ * push some data into the network. Dissemination sends events,
+ * failure detection sends pings, acks. Anti-entropy sends member
+ * tables. The intention to send a data is called IO task and is
+ * stored in a queue that is dispatched when output is possible.
+ */
+typedef void (*swim_task_f)(struct swim_task *,
+ struct swim_scheduler *scheduler, int rc);
+
+struct swim_task {
+ /**
+ * Function called when the task has completed. Error code
+ * or 0 are passed as an argument.
+ */
+ swim_task_f complete;
+ /**
+ * Function, called when a scheduler is under destruction,
+ * and it cancels all its tasks.
+ */
+ swim_task_f cancel;
+ /** Packet to send. */
+ struct swim_packet packet;
+ /** Destination address. */
+ struct sockaddr_in dst;
+ /** Place in a queue of tasks. */
+ struct rlist in_queue_output;
+};
+
+/**
+ * Put the task into a queue of tasks. Eventually it will be sent.
+ */
+void
+swim_task_send(struct swim_task *task, const struct sockaddr_in *dst,
+ struct swim_scheduler *scheduler);
+
+/** Initialize the task, without scheduling. */
+void
+swim_task_create(struct swim_task *task, swim_task_f complete,
+ swim_task_f cancel);
+
+/** Destroy the task, pop from the queue. */
+static inline void
+swim_task_destroy(struct swim_task *task)
+{
+ rlist_del_entry(task, in_queue_output);
+}
+
+#endif /* TARANTOOL_SWIM_IO_H_INCLUDED */
\ No newline at end of file
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
new file mode 100644
index 000000000..a273cb815
--- /dev/null
+++ b/src/lib/swim/swim_proto.c
@@ -0,0 +1,327 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "swim_proto.h"
+#include "msgpuck.h"
+#include "say.h"
+#include "version.h"
+#include "diag.h"
+
+const char *swim_member_status_strs[] = {
+ "alive",
+};
+
+int
+swim_decode_map(const char **pos, const char *end, uint32_t *size,
+ const char *msg_pref, const char *param_name)
+{
+ if (mp_typeof(**pos) != MP_MAP || mp_check_map(*pos, end) > 0) {
+ diag_set(SwimError, "%s %s should be a map", msg_pref,
+ param_name);
+ return -1;
+ }
+ *size = mp_decode_map(pos);
+ return 0;
+}
+
+int
+swim_decode_array(const char **pos, const char *end, uint32_t *size,
+ const char *msg_pref, const char *param_name)
+{
+ if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) {
+ diag_set(SwimError, "%s %s should be an array", msg_pref,
+ param_name);
+ return -1;
+ }
+ *size = mp_decode_array(pos);
+ return 0;
+}
+
+int
+swim_decode_uint(const char **pos, const char *end, uint64_t *value,
+ const char *msg_pref, const char *param_name)
+{
+ if (mp_typeof(**pos) != MP_UINT || mp_check_uint(*pos, end) > 0) {
+ diag_set(SwimError, "%s %s should be a uint", msg_pref,
+ param_name);
+ return -1;
+ }
+ *value = mp_decode_uint(pos);
+ return 0;
+}
+
+static inline int
+swim_decode_ip(struct sockaddr_in *address, const char **pos, const char *end,
+ const char *msg_pref, const char *param_name)
+{
+ uint64_t ip;
+ if (swim_decode_uint(pos, end, &ip, msg_pref, param_name) != 0)
+ return -1;
+ if (ip > UINT32_MAX) {
+ diag_set(SwimError, "%s %s is an invalid IP address", msg_pref,
+ param_name);
+ return -1;
+ }
+ address->sin_addr.s_addr = ip;
+ return 0;
+}
+
+static inline int
+swim_decode_port(struct sockaddr_in *address, const char **pos, const char *end,
+ const char *msg_pref, const char *param_name)
+{
+ uint64_t port;
+ if (swim_decode_uint(pos, end, &port, msg_pref, param_name) != 0)
+ return -1;
+ if (port > UINT16_MAX) {
+ diag_set(SwimError, "%s %s is an invalid port", msg_pref,
+ param_name);
+ return -1;
+ }
+ address->sin_port = port;
+ return 0;
+}
+
+int
+swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end,
+ const char *msg_pref, const char *param_name)
+{
+ if (mp_typeof(**pos) != MP_BIN || mp_check_binl(*pos, end) > 0) {
+ diag_set(SwimError, "%s %s should be bin", msg_pref,
+ param_name);
+ return -1;
+ }
+ if (mp_decode_binl(pos) != UUID_LEN || *pos + UUID_LEN > end) {
+ diag_set(SwimError, "%s %s is invalid", msg_pref, param_name);
+ return -1;
+ }
+ memcpy(uuid, *pos, UUID_LEN);
+ *pos += UUID_LEN;
+ return 0;
+}
+
+void
+swim_member_def_create(struct swim_member_def *def)
+{
+ memset(def, 0, sizeof(*def));
+ def->status = MEMBER_ALIVE;
+}
+
+/**
+ * Decode a MessagePack value of @a key and store it in @a def.
+ * @param key Key to read value of.
+ * @param[in][out] pos Where a value is stored.
+ * @param end End of the buffer.
+ * @param msg_pref Error message prefix.
+ * @param[out] def Where to store the value.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+static int
+swim_decode_member_key(enum swim_member_key key, const char **pos,
+ const char *end, const char *msg_pref,
+ struct swim_member_def *def)
+{
+ uint64_t tmp;
+ switch (key) {
+ case SWIM_MEMBER_STATUS:
+ if (swim_decode_uint(pos, end, &tmp, msg_pref,
+ "member status") != 0)
+ return -1;
+ if (tmp >= swim_member_status_MAX) {
+ diag_set(SwimError, "%s unknown member status",
+ msg_pref);
+ return -1;
+ }
+ def->status = (enum swim_member_status) tmp;
+ break;
+ case SWIM_MEMBER_ADDRESS:
+ if (swim_decode_ip(&def->addr, pos, end, msg_pref,
+ "member address") != 0)
+ return -1;
+ break;
+ case SWIM_MEMBER_PORT:
+ if (swim_decode_port(&def->addr, pos, end, msg_pref,
+ "member port") != 0)
+ return -1;
+ break;
+ case SWIM_MEMBER_UUID:
+ if (swim_decode_uuid(&def->uuid, pos, end, msg_pref,
+ "member uuid") != 0)
+ return -1;
+ break;
+ default:
+ unreachable();
+ }
+ return 0;
+}
+
+int
+swim_member_def_decode(struct swim_member_def *def, const char **pos,
+ const char *end, const char *msg_pref)
+{
+ uint32_t size;
+ if (swim_decode_map(pos, end, &size, msg_pref, "member") != 0)
+ return -1;
+ swim_member_def_create(def);
+ for (uint32_t j = 0; j < size; ++j) {
+ uint64_t key;
+ if (swim_decode_uint(pos, end, &key, msg_pref,
+ "member key") != 0)
+ return -1;
+ if (key >= swim_member_key_MAX) {
+ diag_set(SwimError, "%s unknown member key", msg_pref);
+ return -1;
+ }
+ if (swim_decode_member_key(key, pos, end, msg_pref, def) != 0)
+ return -1;
+ }
+ if (def->addr.sin_port == 0 || def->addr.sin_addr.s_addr == 0) {
+ diag_set(SwimError, "%s member address is mandatory", msg_pref);
+ return -1;
+ }
+ if (tt_uuid_is_nil(&def->uuid)) {
+ diag_set(SwimError, "%s member uuid is mandatory", msg_pref);
+ return -1;
+ }
+ return 0;
+}
+
+void
+swim_src_uuid_bin_create(struct swim_src_uuid_bin *header,
+ const struct tt_uuid *uuid)
+{
+ header->k_uuid = SWIM_SRC_UUID;
+ header->m_uuid = 0xc4;
+ header->m_uuid_len = UUID_LEN;
+ memcpy(header->v_uuid, uuid, UUID_LEN);
+}
+
+void
+swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
+ uint16_t batch_size)
+{
+ header->k_anti_entropy = SWIM_ANTI_ENTROPY;
+ header->m_anti_entropy = 0xcd;
+ header->v_anti_entropy = mp_bswap_u16(batch_size);
+}
+
+void
+swim_member_bin_fill(struct swim_member_bin *header,
+ const struct sockaddr_in *addr, const struct tt_uuid *uuid,
+ enum swim_member_status status)
+{
+ header->v_status = status;
+ header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr);
+ header->v_port = mp_bswap_u16(addr->sin_port);
+ memcpy(header->v_uuid, uuid, UUID_LEN);
+}
+
+void
+swim_member_bin_create(struct swim_member_bin *header)
+{
+ header->m_header = 0x84;
+ header->k_status = SWIM_MEMBER_STATUS;
+ header->k_addr = SWIM_MEMBER_ADDRESS;
+ header->m_addr = 0xce;
+ header->k_port = SWIM_MEMBER_PORT;
+ header->m_port = 0xcd;
+ header->k_uuid = SWIM_MEMBER_UUID;
+ header->m_uuid = 0xc4;
+ header->m_uuid_len = UUID_LEN;
+}
+
+void
+swim_meta_header_bin_create(struct swim_meta_header_bin *header,
+ const struct sockaddr_in *src)
+{
+ header->m_header = 0x83;
+ header->k_version = SWIM_META_TARANTOOL_VERSION;
+ header->m_version = 0xce;
+ header->v_version = mp_bswap_u32(tarantool_version_id());
+ header->k_addr = SWIM_META_SRC_ADDRESS;
+ header->m_addr = 0xce;
+ header->v_addr = mp_bswap_u32(src->sin_addr.s_addr);
+ header->k_port = SWIM_META_SRC_PORT;
+ header->m_port = 0xcd;
+ header->v_port = mp_bswap_u16(src->sin_port);
+}
+
+int
+swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
+ const char *end)
+{
+ const char *msg_pref = "invalid meta section:";
+ uint32_t size;
+ if (swim_decode_map(pos, end, &size, msg_pref, "root") != 0)
+ return -1;
+ memset(def, 0, sizeof(*def));
+ for (uint32_t i = 0; i < size; ++i) {
+ uint64_t key;
+ if (swim_decode_uint(pos, end, &key, msg_pref, "a key") != 0)
+ return -1;
+ switch (key) {
+ case SWIM_META_TARANTOOL_VERSION:
+ if (swim_decode_uint(pos, end, &key, msg_pref,
+ "version") != 0)
+ return -1;
+ if (key > UINT32_MAX) {
+ diag_set(SwimError, "%s invalid version, too "\
+ "big", msg_pref);
+ return -1;
+ }
+ def->version = key;
+ break;
+ case SWIM_META_SRC_ADDRESS:
+ if (swim_decode_ip(&def->src, pos, end, msg_pref,
+ "source address") != 0)
+ return -1;
+ break;
+ case SWIM_META_SRC_PORT:
+ if (swim_decode_port(&def->src, pos, end, msg_pref,
+ "source port") != 0)
+ return -1;
+ break;
+ default:
+ diag_set(SwimError, "%s unknown key", msg_pref);
+ return -1;
+ }
+ }
+ if (def->version == 0) {
+ diag_set(SwimError, "%s version is mandatory", msg_pref);
+ return -1;
+ }
+ if (def->src.sin_port == 0 || def->src.sin_addr.s_addr == 0) {
+ diag_set(SwimError, "%s source address is mandatory", msg_pref);
+ return -1;
+ }
+ return 0;
+}
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
new file mode 100644
index 000000000..6e36f0b07
--- /dev/null
+++ b/src/lib/swim/swim_proto.h
@@ -0,0 +1,320 @@
+#ifndef TARANTOOL_SWIM_PROTO_H_INCLUDED
+#define TARANTOOL_SWIM_PROTO_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "trivia/util.h"
+#include "tt_uuid.h"
+#include <arpa/inet.h>
+#include <stdbool.h>
+
+/**
+ * SWIM binary protocol structures and helpers. Below is a picture
+ * of a SWIM message template:
+ *
+ * +----------Meta section, handled by transport level-----------+
+ * | { |
+ * | SWIM_META_TARANTOOL_VERSION: uint, Tarantool version ID,|
+ * | SWIM_META_SRC_ADDRESS: uint, ip, |
+ * | SWIM_META_SRC_PORT: uint, port |
+ * | } |
+ * +-------------------Protocol logic section--------------------+
+ * | { |
+ * | SWIM_SRC_UUID: 16 byte UUID, |
+ * | |
+ * | AND |
+ * | |
+ * | SWIM_ANTI_ENTROPY: [ |
+ * | { |
+ * | SWIM_MEMBER_STATUS: uint, enum member_status, |
+ * | SWIM_MEMBER_ADDRESS: uint, ip, |
+ * | SWIM_MEMBER_PORT: uint, port, |
+ * | SWIM_MEMBER_UUID: 16 byte UUID |
+ * | }, |
+ * | ... |
+ * | ], |
+ * | } |
+ * +-------------------------------------------------------------+
+ */
+
+enum swim_member_status {
+ /** The instance is ok, responds to requests. */
+ MEMBER_ALIVE = 0,
+ swim_member_status_MAX,
+};
+
+extern const char *swim_member_status_strs[];
+
+/**
+ * SWIM member attributes from anti-entropy and dissemination
+ * messages.
+ */
+struct swim_member_def {
+ struct tt_uuid uuid;
+ struct sockaddr_in addr;
+ enum swim_member_status status;
+};
+
+/** Initialize the definition with default values. */
+void
+swim_member_def_create(struct swim_member_def *def);
+
+/**
+ * Decode member definition from a MessagePack buffer.
+ * @param[out] def Definition to decode into.
+ * @param[in][out] pos Start of the MessagePack buffer.
+ * @param end End of the MessagePack buffer.
+ * @param msg_pref A prefix of an error message to use for
+ * diag_set, when something is wrong.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+int
+swim_member_def_decode(struct swim_member_def *def, const char **pos,
+ const char *end, const char *msg_pref);
+
+/**
+ * Main round messages can carry merged failure detection
+ * messages and anti-entropy. With these keys the components can
+ * be distinguished from each other.
+ */
+enum swim_body_key {
+ SWIM_SRC_UUID = 0,
+ SWIM_ANTI_ENTROPY,
+};
+
+/**
+ * One of SWIM packet body components - SWIM_SRC_UUID. It is not
+ * in the meta section, handled by the transport, because the
+ * transport has nothing to do with UUIDs - it operates by IP/port
+ * only. This component shall be first in message's body.
+ */
+struct PACKED swim_src_uuid_bin {
+ /** mp_encode_uint(SWIM_SRC_UUID) */
+ uint8_t k_uuid;
+ /** mp_encode_bin(UUID_LEN) */
+ uint8_t m_uuid;
+ uint8_t m_uuid_len;
+ uint8_t v_uuid[UUID_LEN];
+};
+
+/** Initialize source UUID section. */
+void
+swim_src_uuid_bin_create(struct swim_src_uuid_bin *header,
+ const struct tt_uuid *uuid);
+
+/** {{{ Anti-entropy component */
+
+/**
+ * Attributes of each record of a broadcasted members table. Just
+ * the same as some of struct swim_member attributes.
+ */
+enum swim_member_key {
+ SWIM_MEMBER_STATUS = 0,
+ SWIM_MEMBER_ADDRESS,
+ SWIM_MEMBER_PORT,
+ SWIM_MEMBER_UUID,
+ swim_member_key_MAX,
+};
+
+/** SWIM anti-entropy MessagePack header template. */
+struct PACKED swim_anti_entropy_header_bin {
+ /** mp_encode_uint(SWIM_ANTI_ENTROPY) */
+ uint8_t k_anti_entropy;
+ /** mp_encode_array(...) */
+ uint8_t m_anti_entropy;
+ uint16_t v_anti_entropy;
+};
+
+/** Initialize SWIM_ANTI_ENTROPY header. */
+void
+swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
+ uint16_t batch_size);
+
+/**
+ * SWIM member MessagePack template. Represents one record in
+ * anti-entropy section.
+ */
+struct PACKED swim_member_bin {
+ /** mp_encode_map(4) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_MEMBER_STATUS) */
+ uint8_t k_status;
+ /** mp_encode_uint(enum member_status) */
+ uint8_t v_status;
+
+ /** mp_encode_uint(SWIM_MEMBER_ADDRESS) */
+ uint8_t k_addr;
+ /** mp_encode_uint(addr.sin_addr.s_addr) */
+ uint8_t m_addr;
+ uint32_t v_addr;
+
+ /** mp_encode_uint(SWIM_MEMBER_PORT) */
+ uint8_t k_port;
+ /** mp_encode_uint(addr.sin_port) */
+ uint8_t m_port;
+ uint16_t v_port;
+
+ /** mp_encode_uint(SWIM_MEMBER_UUID) */
+ uint8_t k_uuid;
+ /** mp_encode_bin(UUID_LEN) */
+ uint8_t m_uuid;
+ uint8_t m_uuid_len;
+ uint8_t v_uuid[UUID_LEN];
+};
+
+/** Initialize antri-entropy record. */
+void
+swim_member_bin_create(struct swim_member_bin *header);
+
+/**
+ * Since usually there are many members, it is faster to reset a
+ * few fields in an existing template, then each time create a
+ * new template. So the usage pattern is create(), fill(),
+ * fill() ... .
+ */
+void
+swim_member_bin_fill(struct swim_member_bin *header,
+ const struct sockaddr_in *addr, const struct tt_uuid *uuid,
+ enum swim_member_status status);
+
+/** }}} Anti-entropy component */
+
+/** {{{ Meta component */
+
+/**
+ * Meta component keys, completely handled by the transport level.
+ */
+enum swim_meta_key {
+ /**
+ * Version is now unused, but in future can help in
+ * the protocol improvement, extension.
+ */
+ SWIM_META_TARANTOOL_VERSION = 0,
+ /**
+ * Source IP/port are stored in body of UDP packet despite
+ * the fact that UDP has them in its header. This is
+ * because
+ * - packet body is going to be encrypted, but header
+ * is still open and anybody can catch the packet,
+ * change source IP/port, and therefore execute
+ * man-in-the-middle attack;
+ *
+ * - some network filters can change the address to an
+ * address of a router or another device.
+ */
+ SWIM_META_SRC_ADDRESS,
+ SWIM_META_SRC_PORT,
+};
+
+/**
+ * Each SWIM packet carries meta info, which helps to determine
+ * SWIM protocol version, final packet destination and any other
+ * internal details, not linked with etalon SWIM protocol.
+ *
+ * The meta header is mandatory, preceeds main protocol data as a
+ * separate MessagePack map.
+ */
+struct PACKED swim_meta_header_bin {
+ /** mp_encode_map(3) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_META_TARANTOOL_VERSION) */
+ uint8_t k_version;
+ /** mp_encode_uint(tarantool_version_id()) */
+ uint8_t m_version;
+ uint32_t v_version;
+
+ /** mp_encode_uint(SWIM_META_SRC_ADDRESS) */
+ uint8_t k_addr;
+ /** mp_encode_uint(addr.sin_addr.s_addr) */
+ uint8_t m_addr;
+ uint32_t v_addr;
+
+ /** mp_encode_uint(SWIM_META_SRC_PORT) */
+ uint8_t k_port;
+ /** mp_encode_uint(addr.sin_port) */
+ uint8_t m_port;
+ uint16_t v_port;
+};
+
+/** Initialize meta section. */
+void
+swim_meta_header_bin_create(struct swim_meta_header_bin *header,
+ const struct sockaddr_in *src);
+
+/** Meta definition. */
+struct swim_meta_def {
+ /** Tarantool version. */
+ uint32_t version;
+ /** Source of the message. */
+ struct sockaddr_in src;
+};
+
+/**
+ * Decode meta section into its definition object.
+ * @param[out] def Definition to decode into.
+ * @param[in][out] pos MessagePack buffer to decode.
+ * @param end End of the MessagePack buffer.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+int
+swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
+ const char *end);
+
+/** }}} Meta component */
+
+/**
+ * Helpers to decode some values - map, array, etc with
+ * appropriate checks. All of them set diagnostics on an error
+ * with a specified message prefix and a parameter name.
+ */
+
+int
+swim_decode_map(const char **pos, const char *end, uint32_t *size,
+ const char *msg_pref, const char *param_name);
+
+int
+swim_decode_array(const char **pos, const char *end, uint32_t *size,
+ const char *msg_pref, const char *param_name);
+
+int
+swim_decode_uint(const char **pos, const char *end, uint64_t *value,
+ const char *msg_pref, const char *param_name);
+
+int
+swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end,
+ const char *msg_pref, const char *param_name);
+
+#endif /* TARANTOOL_SWIM_PROTO_H_INCLUDED */
diff --git a/src/lib/swim/swim_transport.h b/src/lib/swim/swim_transport.h
new file mode 100644
index 000000000..fa569caaa
--- /dev/null
+++ b/src/lib/swim/swim_transport.h
@@ -0,0 +1,73 @@
+#ifndef TARANTOOL_SWIM_TRANSPORT_H_INCLUDED
+#define TARANTOOL_SWIM_TRANSPORT_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "trivia/util.h"
+#include <arpa/inet.h>
+
+/** Transport implementation. */
+struct swim_transport {
+ /** Socket. */
+ int fd;
+ /** Socket address. */
+ struct sockaddr_in addr;
+};
+
+/**
+ * Despite there are no transport vtab, those are virtual methods.
+ * But virtualization is handled on compilation time. This header
+ * file has one implementation for server, and another for tests.
+ * Transport source is built as a separate library.
+ *
+ * Methods below for server are just wrappers of corresponding
+ * system calls, working with UDP sockets.
+ */
+
+ssize_t
+swim_transport_send(struct swim_transport *transport, const void *data,
+ size_t size, const struct sockaddr *addr,
+ socklen_t addr_size);
+
+ssize_t
+swim_transport_recv(struct swim_transport *transport, void *buffer, size_t size,
+ struct sockaddr *addr, socklen_t *addr_size);
+
+int
+swim_transport_bind(struct swim_transport *transport,
+ const struct sockaddr *addr, socklen_t addr_len);
+
+void
+swim_transport_destroy(struct swim_transport *transport);
+
+void
+swim_transport_create(struct swim_transport *transport);
+
+#endif /* TARANTOOL_SWIM_TRANSPORT_H_INCLUDED */
diff --git a/src/lua/init.c b/src/lua/init.c
index ca4b47f3a..5b47aa3e3 100644
--- a/src/lua/init.c
+++ b/src/lua/init.c
@@ -58,6 +58,7 @@
#include "lua/fio.h"
#include "lua/httpc.h"
#include "lua/utf8.h"
+#include "lua/swim.h"
#include "digest.h"
#include <small/ibuf.h>
@@ -450,6 +451,7 @@ tarantool_lua_init(const char *tarantool_bin, int argc, char **argv)
tarantool_lua_socket_init(L);
tarantool_lua_pickle_init(L);
tarantool_lua_digest_init(L);
+ tarantool_lua_swim_init(L);
luaopen_http_client_driver(L);
lua_pop(L, 1);
luaopen_msgpack(L);
diff --git a/src/lua/swim.c b/src/lua/swim.c
new file mode 100644
index 000000000..317f53e73
--- /dev/null
+++ b/src/lua/swim.c
@@ -0,0 +1,370 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "utils.h"
+#include "diag.h"
+#include "tt_uuid.h"
+#include "swim/swim.h"
+#include "swim/swim_transport.h"
+#include "small/ibuf.h"
+#include "lua/info.h"
+#include <info.h>
+
+/** SWIM instances are pushed as cdata with this id. */
+uint32_t CTID_STRUCT_SWIM_PTR;
+
+/**
+ * Get @a n-th value from a Lua stack as a struct swim pointer.
+ * @param L Lua state.
+ * @param n Where pointer is stored on Lua stack.
+ *
+ * @retval NULL The stack position does not exist or it is not a
+ * struct swim pointer.
+ * @retval not NULL Valid SWIM pointer.
+ */
+static inline struct swim *
+lua_swim_ptr(struct lua_State *L, int n)
+{
+ uint32_t ctypeid;
+ if (lua_type(L, n) != LUA_TCDATA)
+ return NULL;
+ void *swim = luaL_checkcdata(L, n, &ctypeid);
+ if (ctypeid != CTID_STRUCT_SWIM_PTR)
+ return NULL;
+ return *(struct swim **) swim;
+}
+
+/**
+ * Delete SWIM instance passed via first Lua stack position. Used
+ * by Lua GC.
+ */
+static int
+lua_swim_gc(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (swim == NULL)
+ return luaL_error(L, "SWIM gc expected struct swim *");
+ swim_delete(swim);
+ return 0;
+}
+
+/**
+ * Get a value from a table that is supposed to be a timeout.
+ * @param L Lua state.
+ * @param ncfg Where on the Lua stack a table with the timeout is
+ * stored.
+ * @param fieldname Name of the table field storing the timeout.
+ * @param funcname Caller function name, used to build a detailed
+ * error message.
+ *
+ * @retval 0 > A timeout value.
+ * @retval -1 The field is nil.
+ */
+static inline double
+lua_swim_get_timeout_field(struct lua_State *L, int ncfg, const char *fieldname,
+ const char *funcname)
+{
+ double timeout = -1;
+ lua_getfield(L, ncfg, fieldname);
+ if (lua_isnumber(L, -1)) {
+ timeout = lua_tonumber(L, -1);
+ if (timeout <= 0) {
+ return luaL_error(L, "swim.%s: %s should be positive "\
+ "number", funcname, fieldname);
+ }
+ } else if (! lua_isnil(L, -1)) {
+ return luaL_error(L, "swim.%s: %s should be positive number",
+ funcname, fieldname);
+ }
+ lua_pop(L, 1);
+ return timeout;
+}
+
+/**
+ * Get a value from a table that is supposed to be a UUID.
+ * @param L Lua state.
+ * @param ncfg Where on the Lua stack a table with the UUID is
+ * stored.
+ * @param fieldname Name of the table field storing the UUID.
+ * @param funcname Caller function name, used to build a detailed
+ * error message.
+ * @param[out] uuid Result UUID. Nil UUID is stored, if the field
+ * was nil.
+ */
+static inline void
+lua_swim_get_uuid_field(struct lua_State *L, int ncfg, const char *fieldname,
+ const char *funcname, struct tt_uuid *uuid)
+{
+ lua_getfield(L, ncfg, fieldname);
+ if (lua_isstring(L, -1)) {
+ if (tt_uuid_from_string(lua_tostring(L, -1), uuid) != 0) {
+ luaL_error(L, "swim.%s: %s is invalid", funcname,
+ fieldname);
+ }
+ } else if (lua_isnil(L, -1)) {
+ *uuid = uuid_nil;
+ } else {
+ luaL_error(L, "swim.%s: %s should be a string", funcname,
+ fieldname);
+ }
+ lua_pop(L, 1);
+}
+
+/**
+ * Get a value from a table that is supposed to be a URI.
+ * @param L Lua state.
+ * @param ncfg Where on the Lua stack a table with the URI is
+ * stored.
+ * @param fieldname Name of the table field storing the URI.
+ * @param funcname Caller function name, used to build a detailed
+ * error message.
+ *
+ * @retval not NULL A URI.
+ * @retval NULL The field is nil.
+ */
+static inline const char *
+lua_swim_get_uri_field(struct lua_State *L, int ncfg, const char *fieldname,
+ const char *funcname)
+{
+ const char *uri = NULL;
+ lua_getfield(L, ncfg, fieldname);
+ if (lua_isstring(L, -1)) {
+ uri = lua_tostring(L, -1);
+ } else if (! lua_isnil(L, -1)) {
+ luaL_error(L, "swim.%s: %s should be a string URI", funcname,
+ fieldname);
+ }
+ lua_pop(L, 1);
+ return uri;
+}
+
+/**
+ * Configure @a swim instance using a table stored in @a ncfg-th
+ * position on the Lua stack.
+ * @param L Lua state.
+ * @param ncfg Where configuration is stored on the Lua stack.
+ * @param swim SWIM instance to configure.
+ * @param funcname Caller function name to use in error messages.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error, stored in diagnostics area. Critical errors
+ * like OOM or incorrect usage are thrown.
+ */
+static int
+lua_swim_cfg_impl(struct lua_State *L, int ncfg, struct swim *swim,
+ const char *funcname)
+{
+ if (! lua_istable(L, ncfg)) {
+ return luaL_error(L, "swim.%s: expected table config",
+ funcname);
+ }
+ const char *server_uri =
+ lua_swim_get_uri_field(L, ncfg, "server", funcname);
+ struct tt_uuid uuid;
+ lua_swim_get_uuid_field(L, ncfg, "uuid", funcname, &uuid);
+ double heartbeat_rate =
+ lua_swim_get_timeout_field(L, ncfg, "heartbeat", funcname);
+
+ return swim_cfg(swim, server_uri, heartbeat_rate, &uuid);
+}
+
+/**
+ * Create a new SWIM instance. The Lua stack can contain either 0
+ * parameters to just create a new non-configured SWIM instance,
+ * or 1 parameter with a config to configure the new instance
+ * immediately.
+ * @param L Lua state.
+ * @retval 1 A SWIM instance.
+ * @retval 2 Nil and an error object. On invalid Lua parameters
+ * and OOM it throws.
+ */
+static int
+lua_swim_new(struct lua_State *L)
+{
+ int top = lua_gettop(L);
+ if (top > 1)
+ return luaL_error(L, "Usage: swim.new([{<config>}])");
+ struct swim *swim = swim_new();
+ if (swim != NULL) {
+ *(struct swim **)luaL_pushcdata(L, CTID_STRUCT_SWIM_PTR) = swim;
+ lua_pushcfunction(L, lua_swim_gc);
+ luaL_setcdatagc(L, -2);
+ if (top == 0 || lua_swim_cfg_impl(L, 1, swim, "new") == 0)
+ return 1;
+ lua_pop(L, 1);
+ }
+ lua_pushnil(L);
+ luaT_pusherror(L, diag_last_error(diag_get()));
+ return 2;
+}
+
+/**
+ * Configure an existing SWIM instance. The Lua stack should
+ * contain two values - a SWIM instance to configure, and a
+ * config.
+ * @param L Lua state.
+ * @retval 1 True.
+ * @retval 2 Nil and an error object. On invalid Lua parameters
+ * and OOM it throws.
+ */
+static int
+lua_swim_cfg(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (swim == NULL)
+ return luaL_error(L, "Usage: swim:cfg({<config>})");
+ if (lua_swim_cfg_impl(L, 2, swim, "cfg") != 0) {
+ lua_pushnil(L);
+ luaT_pusherror(L, diag_last_error(diag_get()));
+ return 2;
+ }
+ lua_pushboolean(L, true);
+ return 1;
+}
+
+/**
+ * Add a new member to a SWIM instance. The Lua stack should
+ * contain two values - a SWIM instance to add to, and a config of
+ * a new member. Config is a table, containing UUID and URI keys.
+ * @param L Lua state.
+ * @retval 1 True.
+ * @retval 2 Nil and an error object. On invalid Lua parameters
+ * and OOM it throws.
+ */
+static int
+lua_swim_add_member(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (lua_gettop(L) != 2 || swim == NULL || !lua_istable(L, 1))
+ return luaL_error(L, "Usage: swim:add_member({<config>})");
+ const char *uri = lua_swim_get_uri_field(L, 1, "uri", "add_member");
+ struct tt_uuid uuid;
+ lua_swim_get_uuid_field(L, 1, "uuid", "add_member", &uuid);
+
+ if (swim_add_member(swim, uri, &uuid) != 0) {
+ lua_pushnil(L);
+ luaT_pusherror(L, diag_last_error(diag_get()));
+ return 2;
+ }
+ lua_pushboolean(L, true);
+ return 1;
+}
+
+/**
+ * Silently remove a member from a SWIM instance's members table.
+ * The Lua stack should contain two values - a SWIM instance to
+ * remove from, and a UUID of a sentenced member.
+ * @param L Lua state.
+ * @retval 1 True.
+ * @retval 2 Nil and an error object. On invalid Lua parameters
+ * and OOM it throws.
+ */
+static int
+lua_swim_remove_member(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (lua_gettop(L) != 2 || swim == NULL)
+ return luaL_error(L, "Usage: swim:remove_member(uuid)");
+ if (! lua_isstring(L, -1)) {
+ return luaL_error(L, "swim.remove_member: member UUID should "\
+ "be a string");
+ }
+ struct tt_uuid uuid;
+ if (tt_uuid_from_string(lua_tostring(L, 1), &uuid) != 0)
+ return luaL_error(L, "swim.remove_member: invalid UUID");
+
+ if (swim_remove_member(swim, &uuid) != 0) {
+ lua_pushnil(L);
+ luaT_pusherror(L, diag_last_error(diag_get()));
+ return 2;
+ }
+ lua_pushboolean(L, true);
+ return 1;
+}
+
+/**
+ * Destroy and delete a SWIM instance. All its memory is freed, it
+ * stops participating in any rounds, the socket is closed. No
+ * special quit messages are broadcasted - the quit is silent. So
+ * other members will think that this one is dead. The Lua stack
+ * should contain one value - a SWIM instance to delete.
+ */
+static int
+lua_swim_delete(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (swim == NULL)
+ return luaL_error(L, "Usage: swim:delete()");
+ swim_delete(swim);
+ uint32_t ctypeid;
+ struct swim **cdata = (struct swim **) luaL_checkcdata(L, 1, &ctypeid);
+ assert(ctypeid == CTID_STRUCT_SWIM_PTR);
+ *cdata = NULL;
+ return 0;
+}
+
+/**
+ * Collect information about this instance's members table.
+ * @param L Lua state.
+ * @retval 1 Info table.
+ */
+static int
+lua_swim_info(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (swim == NULL)
+ return luaL_error(L, "Usage: swim:info()");
+ struct info_handler info;
+ luaT_info_handler_create(&info, L);
+ swim_info(swim, &info);
+ return 1;
+}
+
+void
+tarantool_lua_swim_init(struct lua_State *L)
+{
+ static const struct luaL_Reg lua_swim_methods [] = {
+ {"new", lua_swim_new},
+ {"cfg", lua_swim_cfg},
+ {"add_member", lua_swim_add_member},
+ {"remove_member", lua_swim_remove_member},
+ {"delete", lua_swim_delete},
+ {"info", lua_swim_info},
+ {NULL, NULL}
+ };
+ luaL_register_module(L, "swim", lua_swim_methods);
+ lua_pop(L, 1);
+ int rc = luaL_cdef(L, "struct swim;");
+ assert(rc == 0);
+ (void) rc;
+ CTID_STRUCT_SWIM_PTR = luaL_ctypeid(L, "struct swim *");
+ assert(CTID_STRUCT_SWIM_PTR != 0);
+};
diff --git a/src/lua/swim.h b/src/lua/swim.h
new file mode 100644
index 000000000..989cf62b3
--- /dev/null
+++ b/src/lua/swim.h
@@ -0,0 +1,47 @@
+#ifndef INCLUDES_TARANTOOL_LUA_SWIM_H
+#define INCLUDES_TARANTOOL_LUA_SWIM_H
+/*
+ * Copyright 2010-2015, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+struct lua_State;
+
+void
+tarantool_lua_swim_init(struct lua_State *L);
+
+#if defined(__cplusplus)
+}
+#endif
+
+#endif /* INCLUDES_TARANTOOL_LUA_SWIM_H */
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 16739f75d..cfaf93fb9 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -206,3 +206,6 @@ target_link_libraries(checkpoint_schedule.test m unit)
add_executable(sio.test sio.c)
target_link_libraries(sio.test unit core)
+
+add_executable(swim.test swim.c swim_test_transport.c)
+target_link_libraries(swim.test unit core swim)
diff --git a/test/unit/swim.c b/test/unit/swim.c
new file mode 100644
index 000000000..df5d4d0d9
--- /dev/null
+++ b/test/unit/swim.c
@@ -0,0 +1,34 @@
+#include "memory.h"
+#include "fiber.h"
+#include "unit.h"
+#include "swim/swim_transport.h"
+
+static int
+main_f(va_list ap)
+{
+ (void) ap;
+ return 0;
+}
+
+int
+main()
+{
+ header();
+ plan(1);
+ ok(true, "true is true");
+
+ memory_init();
+ fiber_init(fiber_c_invoke);
+
+ struct fiber *main_fiber = fiber_new("main", main_f);
+ assert(main_fiber != NULL);
+ fiber_wakeup(main_fiber);
+ ev_run(loop(), 0);
+
+ fiber_free();
+ memory_free();
+
+ int rc = check_plan();
+ footer();
+ return rc;
+}
\ No newline at end of file
diff --git a/test/unit/swim_test_transport.c b/test/unit/swim_test_transport.c
new file mode 100644
index 000000000..d1cfc9098
--- /dev/null
+++ b/test/unit/swim_test_transport.c
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "swim/swim_transport.h"
+
+ssize_t
+swim_transport_send(struct swim_transport *transport, const void *data,
+ size_t size, const struct sockaddr *addr,
+ socklen_t addr_size)
+{
+ (void) transport;
+ (void) data;
+ (void) size;
+ (void) addr;
+ (void) addr_size;
+ return 0;
+}
+
+ssize_t
+swim_transport_recv(struct swim_transport *transport, void *buffer, size_t size,
+ struct sockaddr *addr, socklen_t *addr_size)
+{
+ (void) transport;
+ (void) buffer;
+ (void) size;
+ (void) addr;
+ (void) addr_size;
+ return 0;
+}
+
+int
+swim_transport_bind(struct swim_transport *transport,
+ const struct sockaddr *addr, socklen_t addr_len)
+{
+ (void) transport;
+ (void) addr;
+ (void) addr_len;
+ return 0;
+}
+
+void
+swim_transport_destroy(struct swim_transport *transport)
+{
+ (void) transport;
+}
+
+void
+swim_transport_create(struct swim_transport *transport)
+{
+ (void) transport;
+}
\ No newline at end of file
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 23+ messages in thread
* Re: [tarantool-patches] [PATCH v4 04/12] [RAW] swim: introduce SWIM's anti-entropy component
2019-01-30 21:28 ` [PATCH v4 04/12] [RAW] swim: introduce SWIM's anti-entropy component Vladislav Shpilevoy
@ 2019-02-21 18:35 ` Konstantin Osipov
2019-02-26 18:28 ` [tarantool-patches] " Vladislav Shpilevoy
0 siblings, 1 reply; 23+ messages in thread
From: Konstantin Osipov @ 2019-02-21 18:35 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [19/01/31 10:28]:
> +/**
> + * Helper to do not call tt_static_buf() in all places where it is
> + * wanted to get string UUID.
> + */
> +static inline const char *
> +swim_uuid_str(const struct tt_uuid *uuid)
> +{
> + char *buf = tt_static_buf();
> + tt_uuid_to_string(uuid, buf);
> + return buf;
> +}
I thought swim should not depend on src/. How does this work?
By the same token, it should not depend on diag_* or otherwise
diag* should be moved to src/lib as a separate library.
> +#define mh_name _swim_table
> +struct mh_swim_table_key {
> + uint32_t hash;
> + const struct tt_uuid *uuid;
> +};
> +#define mh_key_t struct mh_swim_table_key
> +#define mh_node_t struct swim_member *
> +#define mh_arg_t void *
> +#define mh_hash(a, arg) ((*a)->hash)
> +#define mh_hash_key(a, arg) (a.hash)
> +#define mh_cmp(a, b, arg) (tt_uuid_compare(&(*a)->uuid, &(*b)->uuid))
> +#define mh_cmp_key(a, b, arg) (tt_uuid_compare(a.uuid, &(*b)->uuid))
> +#define MH_SOURCE 1
> +#include "salad/mhash.h"
It's OK to depend on salad.
Shall we move tt_static_buf() implementation to small/ or some
other lib/ library to make sure swim does not depend on src?
> +static inline struct swim *
> +swim_by_scheduler(struct swim_scheduler *scheduler)
By strange convention we call such functions simply after the name
of the class, this was here even before I joined :)
In other words:
static inline struct swim *
swim(struct swim_scheduler *scheduler);
> + */
> +static void
> +swim_member_delete(struct swim *swim, struct swim_member *member)
It should be subject-verb-object, so swim_delete_member().
> +static struct swim_member *
> +swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
> + const struct tt_uuid *uuid, enum swim_member_status status)
swim_create_member() or swim_add_member().
> +{
> + struct swim_member *member =
> + (struct swim_member *) calloc(1, sizeof(*member));
> + if (member == NULL) {
> + diag_set(OutOfMemory, sizeof(*member), "calloc", "member");
> + return NULL;
> + }
> + member->status = status;
> + member->addr = *addr;
> + member->uuid = *uuid;
> + member->hash = swim_uuid_hash(uuid);
> + mh_int_t rc = mh_swim_table_put(swim->members,
> + (const struct swim_member **) &member,
> + NULL, NULL);
This part is swim_member_new(), if you want to have
swim_member_new().
> + if (rc == mh_end(swim->members)) {
> + free(member);
free(member) is swim_member_delete().
> + diag_set(OutOfMemory, sizeof(mh_int_t), "malloc", "node");
> + return NULL;
> + }
> + rlist_add_entry(&swim->queue_round, member, in_queue_round);
> +
> + say_verbose("SWIM: member %s is added", swim_uuid_str(uuid));
> + return member;
This is swim_add_member().
> +static struct swim_member **
> +swim_shuffle_members(struct swim *swim)
> +{
> + struct mh_swim_table_t *members = swim->members;
> + struct swim_member **shuffled;
> + int bsize = sizeof(shuffled[0]) * mh_size(members);
> + shuffled = (struct swim_member **) malloc(bsize);
> + if (shuffled == NULL) {
> + diag_set(OutOfMemory, bsize, "malloc", "shuffled");
> + return NULL;
> + }
Shan't we ensure shuffle never fails? Why do you make a copy of
the shuffled members each time, isn't it easier to allocate a
shuffled array once?
> +/**
> + * Encode anti-entropy header and random members data as many as
> + * possible to the end of the packet.
> + * @retval 0 Not error, but nothing is encoded.
> + * @retval 1 Something is encoded.
> + */
> +static int
> +swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
Please try to avoid flipping the meaning of the return value. 0 on
success -1 on error.
If you return the number of encoded messages, and the function
never fails, please use unsigned, not int.
> + * Encode source UUID.
> + * @retval 0 Not error, but nothing is encoded.
> + * @retval 1 Something is encoded.
> + */
> +static inline int
> +swim_encode_src_uuid(struct swim *swim, struct swim_packet *packet)
> +{
> + struct swim_src_uuid_bin uuid_bin;
> + char *pos = swim_packet_alloc(packet, sizeof(uuid_bin));
> + if (pos == NULL)
> + return 0;
Looks like you ignore the error because swim algorithm does not
depend on error. I believe the decision to abort the current round on
error should be made in the algorithm body, not inside nested
functions. The functions should faithfully report problems up.
> +void
> +swim_info(struct swim *swim, struct info_handler *info)
> +{
> + info_begin(info);
> + for (mh_int_t node = mh_first(swim->members),
> + end = mh_end(swim->members); node != end;
> + node = mh_next(swim->members, node)) {
> + struct swim_member *m =
> + *mh_swim_table_node(swim->members, node);
> + info_table_begin(info,
> + sio_strfaddr((struct sockaddr *) &m->addr,
> + sizeof(m->addr)));
> + info_append_str(info, "status",
> + swim_member_status_strs[m->status]);
> + info_append_str(info, "uuid", swim_uuid_str(&m->uuid));
> + info_table_end(info);
> + }
> + info_end(info);
> +}
Ouch, swim_info() ties swim together with info.*.
All this together calls for leaving the core swim features in
src/lib/swim and having a wrapper in src/ which adds diag* and
info*
> +static void
> +swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events);
I once again encourage you to consider merging scheduler and IO
components. Let's look at your test harness to see how this is
feasible.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 23+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v4 04/12] [RAW] swim: introduce SWIM's anti-entropy component
2019-02-21 18:35 ` [tarantool-patches] " Konstantin Osipov
@ 2019-02-26 18:28 ` Vladislav Shpilevoy
0 siblings, 0 replies; 23+ messages in thread
From: Vladislav Shpilevoy @ 2019-02-26 18:28 UTC (permalink / raw)
To: tarantool-patches, Konstantin Osipov; +Cc: vdavydov.dev
Thanks for the review.
>> +static inline struct swim *
>> +swim_by_scheduler(struct swim_scheduler *scheduler)
>
> By strange convention we call such functions simply after the name
> of the class, this was here even before I joined :)
>
> In other words:
>
> static inline struct swim *
> swim(struct swim_scheduler *scheduler);
I failed to find any examples. On the contrary, I found
that usually container_of is used without a wrapper in
similar cases:
- memtx_engine.cc:1183
- memtx_hash.c:167, 187
- memtx_tree.c:386, 407
- relay.cc:440, 688, 701
- vinyl.c:2433, 2440, 2459, 2945
- ... many other vinyl files ...
- cbus.c:443, 490, 505, 557, 575, 588
- say.c:314
So I do not think that such policy works for that case.
>
>> + */
>> +static void
>> +swim_member_delete(struct swim *swim, struct swim_member *member)
>
> It should be subject-verb-object, so swim_delete_member().
It is. 'swim_member' is subject, 'delete' is verb, 'object' is
omitted. Now I refactored all the code, that swim_member has
only two methods:
swim_member_new, swim_member_delete
All other methods are owned by swim:
swim_new_member, swim_delete_member, swim_wait_ack,
swim_update_member_status, swim_update_member_addr,
swim_update_member, swim_on_member_status_update,
swim_on_member_payload_update etc ...
It really looks better now.
>
>> +static struct swim_member *
>> +swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
>> + const struct tt_uuid *uuid, enum swim_member_status status)
>
> swim_create_member() or swim_add_member().
>
swim_add_member is a public API, which you approved by the way.
I will use swim_new_member. 'Create' does not fit here, because
this function does not take a member to initialize as a parameter.
>
>> +{
>> + struct swim_member *member =
>> + (struct swim_member *) calloc(1, sizeof(*member));
>> + if (member == NULL) {
>> + diag_set(OutOfMemory, sizeof(*member), "calloc", "member");
>> + return NULL;
>> + }
>> + member->status = status;
>> + member->addr = *addr;
>> + member->uuid = *uuid;
>> + member->hash = swim_uuid_hash(uuid);
>> + mh_int_t rc = mh_swim_table_put(swim->members,
>> + (const struct swim_member **) &member,
>> + NULL, NULL);
>
> This part is swim_member_new(), if you want to have
> swim_member_new().>
>> + if (rc == mh_end(swim->members)) {
>> + free(member);
>
> free(member) is swim_member_delete().
Ok, done. I do not paste diff here because it is too big.
>> +static struct swim_member **
>> +swim_shuffle_members(struct swim *swim)
>> +{
>> + struct mh_swim_table_t *members = swim->members;
>> + struct swim_member **shuffled;
>> + int bsize = sizeof(shuffled[0]) * mh_size(members);
>> + shuffled = (struct swim_member **) malloc(bsize);
>> + if (shuffled == NULL) {
>> + diag_set(OutOfMemory, bsize, "malloc", "shuffled");
>> + return NULL;
>> + }
>
> Shan't we ensure shuffle never fails? Why do you make a copy of
> the shuffled members each time, isn't it easier to allocate a
> shuffled array once?
If you are talking about why I do not cache the memory, then
because 1) a fail does not break swim, 2) it complicates code.
If you mean why do I shuffle the array after each round again -
it is because what randomness is supposed to do here by the
protocol definition. If I shuffle that only once, then some
members can stand in the same positions on different cluster
members and have not even network load during their whole
lifetime. This is why the original SWIM describes eternal
random selection instead of a single shuffle per lifetime.
>
>> +/**
>> + * Encode anti-entropy header and random members data as many as
>> + * possible to the end of the packet.
>> + * @retval 0 Not error, but nothing is encoded.
>> + * @retval 1 Something is encoded.
>> + */
>> +static int
>> +swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
>
> Please try to avoid flipping the meaning of the return value. 0 on
> success -1 on error.
It is not an error. And that function can not fail.
>
> If you return the number of encoded messages,
I do not return the number of encoded messages. What I
return stated in the comment.
> and the function
> never fails, please use unsigned, not int.
We never use unsigned without strict necessity. We even have
this issue: https://github.com/tarantool/tarantool/issues/2161.
So I leave signed.
>
>> + * Encode source UUID.
>> + * @retval 0 Not error, but nothing is encoded.
>> + * @retval 1 Something is encoded.
>> + */
>> +static inline int
>> +swim_encode_src_uuid(struct swim *swim, struct swim_packet *packet)
>> +{
>> + struct swim_src_uuid_bin uuid_bin;
>> + char *pos = swim_packet_alloc(packet, sizeof(uuid_bin));
>> + if (pos == NULL)
>> + return 0;
>
> Looks like you ignore the error because swim algorithm does not
> depend on error. I believe the decision to abort the current round on
> error should be made in the algorithm body, not inside nested
> functions. The functions should faithfully report problems up.
It is not an error. Read the code more attentive. This code
fills the packet with data until it is full. If something
did not fit - it is not an error. The packet is sent anyway.
In all other places, where a real error occurs, it lifts up
to a user, or to a libev callback, where it is reported.
>
>> +void
>> +swim_info(struct swim *swim, struct info_handler *info)
>> +{
>> + info_begin(info);
>> + for (mh_int_t node = mh_first(swim->members),
>> + end = mh_end(swim->members); node != end;
>> + node = mh_next(swim->members, node)) {
>> + struct swim_member *m =
>> + *mh_swim_table_node(swim->members, node);
>> + info_table_begin(info,
>> + sio_strfaddr((struct sockaddr *) &m->addr,
>> + sizeof(m->addr)));
>> + info_append_str(info, "status",
>> + swim_member_status_strs[m->status]);
>> + info_append_str(info, "uuid", swim_uuid_str(&m->uuid));
>> + info_table_end(info);
>> + }
>> + info_end(info);
>> +}
>
> Ouch, swim_info() ties swim together with info.*.
'Info' is moved into src/lib in a separate patchset.
>
> All this together calls for leaving the core swim features in
> src/lib/swim and having a wrapper in src/ which adds diag* and
> info*
>
>> +static void
>> +swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events);
>
> I once again encourage you to consider merging scheduler and IO
> components. Let's look at your test harness to see how this is
> feasible.
As discussed verbally, it still makes no sense to merge them.
>
>
> --
> Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
> http://tarantool.io - www.twitter.com/kostja_osipov
>
Despite the fixes, the commit is not ready. I still work on
tests.
^ permalink raw reply [flat|nested] 23+ messages in thread
* [PATCH v4 05/12] [RAW] swim: introduce failure detection component
2019-01-30 21:28 [PATCH v4 00/12] SWIM draft Vladislav Shpilevoy
` (6 preceding siblings ...)
2019-01-30 21:28 ` [PATCH v4 04/12] [RAW] swim: introduce SWIM's anti-entropy component Vladislav Shpilevoy
@ 2019-01-30 21:28 ` Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 06/12] [RAW] swim: introduce dissemination component Vladislav Shpilevoy
` (3 subsequent siblings)
11 siblings, 0 replies; 23+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-30 21:28 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja, vdavydov.dev
Failure detection components allows to find which members are
already dead.
Part of #3234
---
src/lib/swim/swim.c | 405 ++++++++++++++++++++++++++++++++++++--
src/lib/swim/swim.h | 11 +-
src/lib/swim/swim_io.c | 21 ++
src/lib/swim/swim_io.h | 9 +
src/lib/swim/swim_proto.c | 82 +++++++-
src/lib/swim/swim_proto.h | 101 +++++++++-
src/lua/swim.c | 34 +++-
7 files changed, 641 insertions(+), 22 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index adf01cfad..a862f52a4 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -131,6 +131,31 @@ enum {
* value.
*/
HEARTBEAT_RATE_DEFAULT = 1,
+ /**
+ * If a ping was sent, it is considered to be lost after
+ * this time without an ack. Nothing special in this
+ * value.
+ */
+ ACK_TIMEOUT_DEFAULT = 30,
+ /**
+ * If a member has not been responding to pings this
+ * number of times, it is considered to be dead.
+ * According to the SWIM paper, for a member it is enough
+ * to do not respond on one direct ping, and on K
+ * simultanous indirect pings, to be considered as dead.
+ * Seems too little, so here it is bigger.
+ */
+ NO_ACKS_TO_DEAD = 3,
+ /**
+ * If a member confirmed to be dead, it is removed from
+ * the membership after at least this number of
+ * unacknowledged pings. According to the SWIM paper, a
+ * dead member is deleted immediately. But here it is held
+ * for a while to 1) maybe refute its dead status, 2)
+ * disseminate the status via dissemination and
+ * anti-entropy components.
+ */
+ NO_ACKS_TO_GC = 2,
};
/**
@@ -193,6 +218,31 @@ struct swim_member {
* Position in a queue of members in the current round.
*/
struct rlist in_queue_round;
+ /**
+ *
+ * Failure detection component
+ */
+ /** Growing number to refute old messages. */
+ uint64_t incarnation;
+ /**
+ * How many pings did not receive an ack in a row being in
+ * the current status. After a threshold the instance is
+ * marked as dead. After more it is removed from the
+ * table. On each status or incarnation change this
+ * counter is reset.
+ */
+ int unacknowledged_pings;
+ /**
+ * When the latest ping is considered to be
+ * unacknowledged.
+ */
+ double ping_deadline;
+ /** Ready at hand regular ACK task. */
+ struct swim_task ack_task;
+ /** Ready at hand regular PING task. */
+ struct swim_task ping_task;
+ /** Position in a queue of members waiting for an ack. */
+ struct rlist in_queue_wait_ack;
};
#define mh_name _swim_table
@@ -250,8 +300,77 @@ struct swim {
* ones.
*/
struct swim_scheduler scheduler;
+ /**
+ *
+ * Failure detection component
+ */
+ /**
+ * Members waiting for an ACK. On too long absence of an
+ * ACK a member is considered to be dead and is removed.
+ * The list is sorted by deadline in ascending order (tail
+ * is newer, head is older).
+ */
+ struct rlist queue_wait_ack;
+ /** Generator of ack checking events. */
+ struct ev_periodic wait_ack_tick;
};
+/** Put the member into a list of ACK waiters. */
+static void
+swim_member_wait_ack(struct swim *swim, struct swim_member *member)
+{
+ if (rlist_empty(&member->in_queue_wait_ack)) {
+ member->ping_deadline = fiber_time() +
+ swim->wait_ack_tick.interval;
+ rlist_add_tail_entry(&swim->queue_wait_ack, member,
+ in_queue_wait_ack);
+ }
+}
+
+/**
+ * Make all needed actions to process a member's update like a
+ * change of its status, or incarnation, or both.
+ */
+static void
+swim_member_status_is_updated(struct swim_member *member)
+{
+ member->unacknowledged_pings = 0;
+}
+
+/**
+ * Update status and incarnation of the member if needed. Statuses
+ * are compared as a compound key: {incarnation, status}. So @a
+ * new_status can override an old one only if its incarnation is
+ * greater, or the same, but its status is "bigger". Statuses are
+ * compared by their identifier, so "alive" < "dead". This
+ * protects from the case when a member is detected as dead on one
+ * instance, but overriden by another instance with the same
+ * incarnation "alive" message.
+ */
+static inline void
+swim_member_update_status(struct swim_member *member,
+ enum swim_member_status new_status,
+ uint64_t incarnation, struct swim *swim)
+{
+ (void) swim;
+ /*
+ * Source of truth about self is this instance and it is
+ * never updated from remote. Refutation is handled
+ * separately.
+ */
+ assert(member != swim->self);
+ if (member->incarnation == incarnation) {
+ if (member->status < new_status) {
+ member->status = new_status;
+ swim_member_status_is_updated(member);
+ }
+ } else if (member->incarnation < incarnation) {
+ member->status = new_status;
+ member->incarnation = incarnation;
+ swim_member_status_is_updated(member);
+ }
+}
+
/**
* A helper to get a pointer to a SWIM instance having only a
* pointer to it scheduler. It is used by task complete functions.
@@ -276,6 +395,11 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
mh_swim_table_del(swim->members, rc, NULL);
rlist_del_entry(member, in_queue_round);
+ /* Failure detection component. */
+ rlist_del_entry(member, in_queue_wait_ack);
+ swim_task_destroy(&member->ack_task);
+ swim_task_destroy(&member->ping_task);
+
free(member);
}
@@ -290,13 +414,34 @@ swim_find_member(struct swim *swim, const struct tt_uuid *uuid)
return *mh_swim_table_node(swim->members, node);
}
+/**
+ * Once a ping is sent, the member should start waiting for an
+ * ACK.
+ */
+static void
+swim_ping_task_complete(struct swim_task *task,
+ struct swim_scheduler *scheduler, int rc)
+{
+ /*
+ * If ping send has failed, it makes to sense to wait for
+ * an ACK.
+ */
+ if (rc != 0)
+ return;
+ struct swim *swim = swim_by_scheduler(scheduler);
+ struct swim_member *m = container_of(task, struct swim_member,
+ ping_task);
+ swim_member_wait_ack(swim, m);
+}
+
/**
* Register a new member with a specified status. Here it is
* added to the hash, to the 'next' queue.
*/
static struct swim_member *
swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
- const struct tt_uuid *uuid, enum swim_member_status status)
+ const struct tt_uuid *uuid, enum swim_member_status status,
+ uint64_t incarnation)
{
struct swim_member *member =
(struct swim_member *) calloc(1, sizeof(*member));
@@ -318,6 +463,12 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
}
rlist_add_entry(&swim->queue_round, member, in_queue_round);
+ /* Failure detection component. */
+ member->incarnation = incarnation;
+ rlist_create(&member->in_queue_wait_ack);
+ swim_task_create(&member->ack_task, NULL, NULL);
+ swim_task_create(&member->ping_task, swim_ping_task_complete, NULL);
+
say_verbose("SWIM: member %s is added", swim_uuid_str(uuid));
return member;
}
@@ -402,7 +553,7 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
break;
size = new_size;
swim_member_bin_fill(&member_bin, &m->addr, &m->uuid,
- m->status);
+ m->status, m->incarnation);
memcpy(pos, &member_bin, sizeof(member_bin));
/*
* First random member could be choosen too close
@@ -439,6 +590,26 @@ swim_encode_src_uuid(struct swim *swim, struct swim_packet *packet)
return 1;
}
+/**
+ * Encode failure detection component.
+ * @retval 0 Not error, but nothing is encoded.
+ * @retval 1 Something is encoded.
+ */
+static int
+swim_encode_failure_detection(struct swim *swim, struct swim_packet *packet,
+ enum swim_fd_msg_type type)
+{
+ struct swim_fd_header_bin fd_header_bin;
+ int size = sizeof(fd_header_bin);
+ char *pos = swim_packet_alloc(packet, size);
+ if (pos == NULL)
+ return 0;
+ swim_fd_header_bin_create(&fd_header_bin, type,
+ swim->self->incarnation);
+ memcpy(pos, &fd_header_bin, size);
+ return 1;
+}
+
/** Encode SWIM components into a UDP packet. */
static void
swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
@@ -447,9 +618,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
char *header = swim_packet_alloc(packet, 1);
int map_size = 0;
map_size += swim_encode_src_uuid(swim, packet);
+ map_size += swim_encode_failure_detection(swim, packet,
+ SWIM_FD_MSG_PING);
map_size += swim_encode_anti_entropy(swim, packet);
- assert(mp_sizeof_map(map_size) == 1 && map_size == 2);
+ assert(mp_sizeof_map(map_size) == 1 && map_size >= 2);
mp_encode_map(header, map_size);
}
@@ -494,8 +667,89 @@ swim_round_step_complete(struct swim_task *task,
(void) task;
struct swim *swim = swim_by_scheduler(scheduler);
ev_periodic_start(loop(), &swim->round_tick);
- rlist_shift_entry(&swim->queue_round, struct swim_member,
- in_queue_round);
+ struct swim_member *m =
+ rlist_shift_entry(&swim->queue_round, struct swim_member,
+ in_queue_round);
+ if (rc == 0) {
+ /*
+ * Each round message contains failure detection
+ * section with a ping.
+ */
+ swim_member_wait_ack(swim, m);
+ }
+}
+
+/** Schedule send of a failure detection message. */
+static void
+swim_send_fd_request(struct swim *swim, struct swim_task *task,
+ const struct sockaddr_in *dst, enum swim_fd_msg_type type)
+{
+ /*
+ * Reset packet allocator in case if task is being reused.
+ */
+ swim_packet_create(&task->packet);
+ char *header = swim_packet_alloc(&task->packet, 1);
+ int map_size = swim_encode_src_uuid(swim, &task->packet);
+ map_size += swim_encode_failure_detection(swim, &task->packet, type);
+ assert(map_size == 2);
+ mp_encode_map(header, map_size);
+ say_verbose("SWIM: send %s to %s", swim_fd_msg_type_strs[type],
+ sio_strfaddr((struct sockaddr *) dst, sizeof(*dst)));
+ swim_task_send(task, dst, &swim->scheduler);
+}
+
+/** Schedule send of an ack. */
+static inline void
+swim_send_ack(struct swim *swim, struct swim_task *task,
+ const struct sockaddr_in *dst)
+{
+ swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_ACK);
+}
+
+/** Schedule send of a ping. */
+static inline void
+swim_send_ping(struct swim *swim, struct swim_task *task,
+ const struct sockaddr_in *dst)
+{
+ swim_send_fd_request(swim, task, dst, SWIM_FD_MSG_PING);
+}
+
+/**
+ * Check for unacknowledged pings. A ping is unacknowledged if an
+ * ack was not received during ack timeout. An unacknowledged ping
+ * is resent here.
+ */
+static void
+swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
+{
+ assert((events & EV_PERIODIC) != 0);
+ (void) loop;
+ (void) events;
+ struct swim *swim = (struct swim *) p->data;
+ struct swim_member *m, *tmp;
+ double current_time = fiber_time();
+ rlist_foreach_entry_safe(m, &swim->queue_wait_ack, in_queue_wait_ack,
+ tmp) {
+ if (current_time < m->ping_deadline)
+ break;
+ ++m->unacknowledged_pings;
+ switch (m->status) {
+ case MEMBER_ALIVE:
+ if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) {
+ m->status = MEMBER_DEAD;
+ swim_member_status_is_updated(m);
+ }
+ break;
+ case MEMBER_DEAD:
+ if (m->unacknowledged_pings >= NO_ACKS_TO_GC)
+ swim_member_delete(swim, m);
+ break;
+ default:
+ unreachable();
+ }
+ swim_send_ping(swim, &m->ping_task, &m->addr);
+ rlist_del_entry(m, in_queue_wait_ack);
+ }
}
/**
@@ -536,7 +790,11 @@ static inline void
swim_member_update_addr(struct swim_member *member,
const struct sockaddr_in *addr)
{
- member->addr = *addr;
+ if (addr->sin_port != member->addr.sin_port ||
+ addr->sin_addr.s_addr != member->addr.sin_addr.s_addr) {
+ member->addr = *addr;
+ swim_member_status_is_updated(member);
+ }
}
/**
@@ -550,11 +808,50 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def)
{
struct swim_member *member = swim_find_member(swim, &def->uuid);
if (member == NULL) {
+ if (def->status == MEMBER_DEAD) {
+ /*
+ * Do not 'resurrect' dead members to
+ * prevent 'ghost' members. Ghost member
+ * is a one declared as dead, sent via
+ * anti-entropy, and removed from local
+ * members table, but then returned back
+ * from received anti-entropy, as again
+ * dead. Such dead members could 'live'
+ * forever.
+ */
+ return NULL;
+ }
member = swim_member_new(swim, &def->addr, &def->uuid,
- def->status);
+ def->status, def->incarnation);
return member;
}
- swim_member_update_addr(member, &def->addr);
+ struct swim_member *self = swim->self;
+ if (member != self) {
+ if (def->incarnation >= member->incarnation) {
+ swim_member_update_addr(member, &def->addr);
+ swim_member_update_status(member, def->status,
+ def->incarnation, swim);
+ }
+ return member;
+ }
+ /*
+ * It is possible that other instances know a bigger
+ * incarnation of this instance - such thing happens when
+ * the instance restarts and loses its local incarnation
+ * number. It will be restored by receiving dissemination
+ * and anti-entropy messages about self.
+ */
+ if (self->incarnation < def->incarnation)
+ self->incarnation = def->incarnation;
+ if (def->status != MEMBER_ALIVE &&
+ def->incarnation == self->incarnation) {
+ /*
+ * In the cluster a gossip exists that this
+ * instance is not alive. Refute this information
+ * with a bigger incarnation.
+ */
+ self->incarnation++;
+ }
return member;
}
@@ -581,12 +878,54 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
return 0;
}
+/**
+ * Decode a failure detection message. Schedule acks, process
+ * acks.
+ */
+static int
+swim_process_failure_detection(struct swim *swim, const char **pos,
+ const char *end, const struct sockaddr_in *src,
+ const struct tt_uuid *uuid)
+{
+ const char *msg_pref = "invalid failure detection message:";
+ struct swim_failure_detection_def def;
+ struct swim_member_def mdef;
+ if (swim_failure_detection_def_decode(&def, pos, end, msg_pref) != 0)
+ return -1;
+ swim_member_def_create(&mdef);
+ memset(&mdef, 0, sizeof(mdef));
+ mdef.addr = *src;
+ mdef.incarnation = def.incarnation;
+ mdef.uuid = *uuid;
+ struct swim_member *member = swim_update_member(swim, &mdef);
+ if (member == NULL)
+ return -1;
+
+ switch (def.type) {
+ case SWIM_FD_MSG_PING:
+ swim_send_ack(swim, &member->ack_task, &member->addr);
+ break;
+ case SWIM_FD_MSG_ACK:
+ if (def.incarnation >= member->incarnation) {
+ /*
+ * Pings are reset above, in
+ * swim_update_member().
+ */
+ assert(member->unacknowledged_pings == 0);
+ rlist_del_entry(member, in_queue_wait_ack);
+ }
+ break;
+ default:
+ unreachable();
+ }
+ return 0;
+}
+
/** Process a new message. */
static void
swim_on_input(struct swim_scheduler *scheduler, const char *pos,
const char *end, const struct sockaddr_in *src)
{
- (void) src;
const char *msg_pref = "invalid message:";
struct swim *swim = swim_by_scheduler(scheduler);
struct tt_uuid uuid;
@@ -617,6 +956,12 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
if (swim_process_anti_entropy(swim, &pos, end) != 0)
goto error;
break;
+ case SWIM_FAILURE_DETECTION:
+ say_verbose("SWIM: process failure detection");
+ if (swim_process_failure_detection(swim, &pos, end,
+ src, &uuid) != 0)
+ goto error;
+ break;
default:
diag_set(SwimError, "%s unexpected key", msg_pref);
goto error;
@@ -649,6 +994,12 @@ swim_new(void)
swim_task_create(&swim->round_step_task, swim_round_step_complete,
NULL);
swim_scheduler_create(&swim->scheduler, swim_on_input);
+
+ /* Failure detection component. */
+ rlist_create(&swim->queue_wait_ack);
+ ev_init(&swim->wait_ack_tick, swim_check_acks);
+ ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT_DEFAULT, NULL);
+ swim->wait_ack_tick.data = (void *) swim;
return swim;
}
@@ -674,12 +1025,12 @@ swim_uri_to_addr(const char *uri, struct sockaddr_in *addr,
int
swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
- const struct tt_uuid *uuid)
+ double ack_timeout, const struct tt_uuid *uuid)
{
const char *msg_pref = "swim.cfg:";
- if (heartbeat_rate < 0) {
- diag_set(IllegalParams, "%s heartbeat_rate should be a "\
- "positive number", msg_pref);
+ if (heartbeat_rate < 0 || ack_timeout < 0) {
+ diag_set(IllegalParams, "%s heartbeat_rate and ack_timeout "\
+ "should be positive numbers", msg_pref);
return -1;
}
struct sockaddr_in addr;
@@ -692,7 +1043,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
"a first config", msg_pref);
return -1;
}
- swim->self = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE);
+ swim->self = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE,
+ 0);
if (swim->self == NULL)
return -1;
} else if (uuid == NULL || tt_uuid_is_nil(uuid)) {
@@ -730,7 +1082,11 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
if (swim->round_tick.interval != heartbeat_rate && heartbeat_rate > 0)
ev_periodic_set(&swim->round_tick, 0, heartbeat_rate, NULL);
+ if (swim->wait_ack_tick.interval != ack_timeout && ack_timeout > 0)
+ ev_periodic_set(&swim->wait_ack_tick, 0, ack_timeout, NULL);
+
ev_periodic_start(loop(), &swim->round_tick);
+ ev_periodic_start(loop(), &swim->wait_ack_tick);
if (! is_first_cfg) {
swim_member_update_addr(swim->self, &addr);
@@ -766,7 +1122,7 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
return -1;
struct swim_member *member = swim_find_member(swim, uuid);
if (member == NULL) {
- member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE);
+ member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE, 0);
return member == NULL ? -1 : 0;
}
diag_set(SwimError, "%s a member with such UUID already exists",
@@ -791,6 +1147,23 @@ swim_remove_member(struct swim *swim, const struct tt_uuid *uuid)
return 0;
}
+int
+swim_probe_member(struct swim *swim, const char *uri)
+{
+ const char *msg_pref = "swim.probe_member:";
+ if (swim_check_is_configured(swim, msg_pref) != 0)
+ return -1;
+ struct sockaddr_in addr;
+ if (swim_uri_to_addr(uri, &addr, msg_pref) != 0)
+ return -1;
+ struct swim_task *t = swim_task_new(swim_task_delete_cb,
+ swim_task_delete_cb);
+ if (t == NULL)
+ return -1;
+ swim_send_ping(swim, t, &addr);
+ return 0;
+}
+
void
swim_info(struct swim *swim, struct info_handler *info)
{
@@ -806,6 +1179,7 @@ swim_info(struct swim *swim, struct info_handler *info)
info_append_str(info, "status",
swim_member_status_strs[m->status]);
info_append_str(info, "uuid", swim_uuid_str(&m->uuid));
+ info_append_int(info, "incarnation", (int64_t) m->incarnation);
info_table_end(info);
}
info_end(info);
@@ -816,6 +1190,7 @@ swim_delete(struct swim *swim)
{
swim_scheduler_destroy(&swim->scheduler);
ev_periodic_stop(loop(), &swim->round_tick);
+ ev_periodic_stop(loop(), &swim->wait_ack_tick);
swim_task_destroy(&swim->round_step_task);
mh_int_t node = mh_first(swim->members);
while (node != mh_end(swim->members)) {
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index a98decc86..9d21a739d 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -55,6 +55,8 @@ swim_new(void);
* @heartbeat_rate seconds. It is rather the protocol
* speed. Protocol period depends on member count and
* @heartbeat_rate.
+ * @param ack_timeout Time in seconds after which a ping is
+ * considered to be unacknowledged.
* @param uuid UUID of this instance. Must be unique over the
* cluster.
*
@@ -63,7 +65,7 @@ swim_new(void);
*/
int
swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
- const struct tt_uuid *uuid);
+ double ack_timeout, const struct tt_uuid *uuid);
/**
* Stop listening and broadcasting messages, cleanup all internal
@@ -80,6 +82,13 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid);
int
swim_remove_member(struct swim *swim, const struct tt_uuid *uuid);
+/**
+ * Send a ping to this address. If an ACK is received, the member
+ * will be added.
+ */
+int
+swim_probe_member(struct swim *swim, const char *uri);
+
/** Dump member statuses into @a info. */
void
swim_info(struct swim *swim, struct info_handler *info);
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 20973acaf..170d7af77 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -65,6 +65,27 @@ swim_task_create(struct swim_task *task, swim_task_f complete,
rlist_create(&task->in_queue_output);
}
+struct swim_task *
+swim_task_new(swim_task_f complete, swim_task_f cancel)
+{
+ struct swim_task *task = (struct swim_task *) malloc(sizeof(*task));
+ if (task == NULL) {
+ diag_set(OutOfMemory, sizeof(*task), "malloc", "task");
+ return NULL;
+ }
+ swim_task_create(task, complete, cancel);
+ return task;
+}
+
+void
+swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler,
+ int rc)
+{
+ (void) rc;
+ (void) scheduler;
+ free(task);
+}
+
/** Put the task into the queue of output tasks. */
static inline void
swim_task_schedule(struct swim_task *task, struct swim_scheduler *scheduler)
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 68fb89818..508d1ef6e 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -215,6 +215,15 @@ void
swim_task_create(struct swim_task *task, swim_task_f complete,
swim_task_f cancel);
+/** Allocate and create a new task. */
+struct swim_task *
+swim_task_new(swim_task_f complete, swim_task_f cancel);
+
+/** Callback to delete a task after its completion. */
+void
+swim_task_delete_cb(struct swim_task *task, struct swim_scheduler *scheduler,
+ int rc);
+
/** Destroy the task, pop from the queue. */
static inline void
swim_task_destroy(struct swim_task *task)
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index a273cb815..542b988c1 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -36,6 +36,12 @@
const char *swim_member_status_strs[] = {
"alive",
+ "dead",
+};
+
+const char *swim_fd_msg_type_strs[] = {
+ "ping",
+ "ack",
};
int
@@ -178,6 +184,11 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
"member uuid") != 0)
return -1;
break;
+ case SWIM_MEMBER_INCARNATION:
+ if (swim_decode_uint(pos, end, &def->incarnation, msg_pref,
+ "member incarnation") != 0)
+ return -1;
+ break;
default:
unreachable();
}
@@ -225,6 +236,70 @@ swim_src_uuid_bin_create(struct swim_src_uuid_bin *header,
memcpy(header->v_uuid, uuid, UUID_LEN);
}
+void
+swim_fd_header_bin_create(struct swim_fd_header_bin *header,
+ enum swim_fd_msg_type type, uint64_t incarnation)
+{
+ header->k_header = SWIM_FAILURE_DETECTION;
+ header->m_header = 0x82;
+
+ header->k_type = SWIM_FD_MSG_TYPE;
+ header->v_type = type;
+
+ header->k_incarnation = SWIM_FD_INCARNATION;
+ header->m_incarnation = 0xcf;
+ header->v_incarnation = mp_bswap_u64(incarnation);
+}
+
+int
+swim_failure_detection_def_decode(struct swim_failure_detection_def *def,
+ const char **pos, const char *end,
+ const char *msg_pref)
+{
+ uint32_t size;
+ if (swim_decode_map(pos, end, &size, msg_pref, "root") != 0)
+ return -1;
+ memset(def, 0, sizeof(*def));
+ def->type = swim_fd_msg_type_MAX;
+ if (size != 2) {
+ diag_set(SwimError, "%s root map should have two keys - "\
+ "message type and incarnation", msg_pref);
+ return -1;
+ }
+ for (int i = 0; i < (int) size; ++i) {
+ uint64_t key;
+ if (swim_decode_uint(pos, end, &key, msg_pref, "a key") != 0)
+ return -1;
+ switch(key) {
+ case SWIM_FD_MSG_TYPE:
+ if (swim_decode_uint(pos, end, &key, msg_pref,
+ "message type") != 0)
+ return -1;
+ if (key >= swim_fd_msg_type_MAX) {
+ diag_set(SwimError, "%s unknown message type",
+ msg_pref);
+ return -1;
+ }
+ def->type = key;
+ break;
+ case SWIM_FD_INCARNATION:
+ if (swim_decode_uint(pos, end, &def->incarnation,
+ msg_pref, "incarnation") != 0)
+ return -1;
+ break;
+ default:
+ diag_set(SwimError, "%s unexpected key", msg_pref);
+ return -1;
+ }
+ }
+ if (def->type == swim_fd_msg_type_MAX) {
+ diag_set(SwimError, "%s message type should be specified",
+ msg_pref);
+ return -1;
+ }
+ return 0;
+}
+
void
swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
uint16_t batch_size)
@@ -237,18 +312,19 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
void
swim_member_bin_fill(struct swim_member_bin *header,
const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- enum swim_member_status status)
+ enum swim_member_status status, uint64_t incarnation)
{
header->v_status = status;
header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr);
header->v_port = mp_bswap_u16(addr->sin_port);
memcpy(header->v_uuid, uuid, UUID_LEN);
+ header->v_incarnation = mp_bswap_u64(incarnation);
}
void
swim_member_bin_create(struct swim_member_bin *header)
{
- header->m_header = 0x84;
+ header->m_header = 0x85;
header->k_status = SWIM_MEMBER_STATUS;
header->k_addr = SWIM_MEMBER_ADDRESS;
header->m_addr = 0xce;
@@ -257,6 +333,8 @@ swim_member_bin_create(struct swim_member_bin *header)
header->k_uuid = SWIM_MEMBER_UUID;
header->m_uuid = 0xc4;
header->m_uuid_len = UUID_LEN;
+ header->k_incarnation = SWIM_MEMBER_INCARNATION;
+ header->m_incarnation = 0xcf;
}
void
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 6e36f0b07..91a0bca9d 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -51,12 +51,20 @@
* | |
* | AND |
* | |
+ * | SWIM_FAILURE_DETECTION: { |
+ * | SWIM_FD_MSG_TYPE: uint, enum swim_fd_msg_type, |
+ * | SWIM_FD_INCARNATION: uint |
+ * | }, |
+ * | |
+ * | OR/AND |
+ * | |
* | SWIM_ANTI_ENTROPY: [ |
* | { |
* | SWIM_MEMBER_STATUS: uint, enum member_status, |
* | SWIM_MEMBER_ADDRESS: uint, ip, |
* | SWIM_MEMBER_PORT: uint, port, |
- * | SWIM_MEMBER_UUID: 16 byte UUID |
+ * | SWIM_MEMBER_UUID: 16 byte UUID, |
+ * | SWIM_MEMBER_INCARNATION: uint |
* | }, |
* | ... |
* | ], |
@@ -67,6 +75,11 @@
enum swim_member_status {
/** The instance is ok, responds to requests. */
MEMBER_ALIVE = 0,
+ /**
+ * The member is considered to be dead. It will disappear
+ * from the membership, if it is not pinned.
+ */
+ MEMBER_DEAD,
swim_member_status_MAX,
};
@@ -79,6 +92,7 @@ extern const char *swim_member_status_strs[];
struct swim_member_def {
struct tt_uuid uuid;
struct sockaddr_in addr;
+ uint64_t incarnation;
enum swim_member_status status;
};
@@ -109,6 +123,7 @@ swim_member_def_decode(struct swim_member_def *def, const char **pos,
enum swim_body_key {
SWIM_SRC_UUID = 0,
SWIM_ANTI_ENTROPY,
+ SWIM_FAILURE_DETECTION,
};
/**
@@ -131,6 +146,79 @@ void
swim_src_uuid_bin_create(struct swim_src_uuid_bin *header,
const struct tt_uuid *uuid);
+/** {{{ Failure detection component */
+
+/** Failure detection component keys. */
+enum swim_fd_key {
+ /** Type of the failure detection message: ping or ack. */
+ SWIM_FD_MSG_TYPE,
+ /**
+ * Incarnation of the sender. To make the member alive if
+ * it was considered to be dead, but ping/ack with greater
+ * incarnation was received from it.
+ */
+ SWIM_FD_INCARNATION,
+};
+
+/** Failure detection message type. */
+enum swim_fd_msg_type {
+ SWIM_FD_MSG_PING,
+ SWIM_FD_MSG_ACK,
+ swim_fd_msg_type_MAX,
+};
+
+extern const char *swim_fd_msg_type_strs[];
+
+/** SWIM failure detection MessagePack header template. */
+struct PACKED swim_fd_header_bin {
+ /** mp_encode_uint(SWIM_FAILURE_DETECTION) */
+ uint8_t k_header;
+ /** mp_encode_map(2) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_FD_MSG_TYPE) */
+ uint8_t k_type;
+ /** mp_encode_uint(enum swim_fd_msg_type) */
+ uint8_t v_type;
+
+ /** mp_encode_uint(SWIM_FD_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
+};
+
+/** Initialize failure detection section. */
+void
+swim_fd_header_bin_create(struct swim_fd_header_bin *header,
+ enum swim_fd_msg_type type, uint64_t incarnation);
+
+/** A decoded failure detection message. */
+struct swim_failure_detection_def {
+ /** Type of the message. */
+ enum swim_fd_msg_type type;
+ /** Incarnation of the sender. */
+ uint64_t incarnation;
+};
+
+/**
+ * Decode failure detection from a MessagePack buffer.
+ * @param[out] def Definition to decode into.
+ * @param[in][out] pos Start of the MessagePack buffer.
+ * @param end End of the MessagePack buffer.
+ * @param msg_pref A prefix of an error message to use for
+ * diag_set, when something is wrong.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+int
+swim_failure_detection_def_decode(struct swim_failure_detection_def *def,
+ const char **pos, const char *end,
+ const char *msg_pref);
+
+/** }}} Failure detection component */
+
/** {{{ Anti-entropy component */
/**
@@ -142,6 +230,7 @@ enum swim_member_key {
SWIM_MEMBER_ADDRESS,
SWIM_MEMBER_PORT,
SWIM_MEMBER_UUID,
+ SWIM_MEMBER_INCARNATION,
swim_member_key_MAX,
};
@@ -164,7 +253,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
* anti-entropy section.
*/
struct PACKED swim_member_bin {
- /** mp_encode_map(4) */
+ /** mp_encode_map(5) */
uint8_t m_header;
/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -190,6 +279,12 @@ struct PACKED swim_member_bin {
uint8_t m_uuid;
uint8_t m_uuid_len;
uint8_t v_uuid[UUID_LEN];
+
+ /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
};
/** Initialize antri-entropy record. */
@@ -205,7 +300,7 @@ swim_member_bin_create(struct swim_member_bin *header);
void
swim_member_bin_fill(struct swim_member_bin *header,
const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- enum swim_member_status status);
+ enum swim_member_status status, uint64_t incarnation);
/** }}} Anti-entropy component */
diff --git a/src/lua/swim.c b/src/lua/swim.c
index 317f53e73..a20c2dc0d 100644
--- a/src/lua/swim.c
+++ b/src/lua/swim.c
@@ -192,8 +192,10 @@ lua_swim_cfg_impl(struct lua_State *L, int ncfg, struct swim *swim,
lua_swim_get_uuid_field(L, ncfg, "uuid", funcname, &uuid);
double heartbeat_rate =
lua_swim_get_timeout_field(L, ncfg, "heartbeat", funcname);
+ double ack_timeout =
+ lua_swim_get_timeout_field(L, ncfg, "ack_timeout", funcname);
- return swim_cfg(swim, server_uri, heartbeat_rate, &uuid);
+ return swim_cfg(swim, server_uri, heartbeat_rate, ack_timeout, &uuid);
}
/**
@@ -348,6 +350,35 @@ lua_swim_info(struct lua_State *L)
return 1;
}
+/**
+ * Send a ping to a URI assuming that there is a member, which
+ * will respond with an ack, and will be added to the local
+ * members table.The Lua stack should contain two values - a SWIM
+ * instance to probe by, and a URI of a member.
+ * @param L Lua state.
+ * @retval 1 True.
+ * @retval 2 Nil and an error object. On invalid Lua parameters
+ * and OOM it throws.
+ */
+static int
+lua_swim_probe_member(struct lua_State *L)
+{
+ struct swim *swim = lua_swim_ptr(L, 1);
+ if (lua_gettop(L) != 2 || swim == NULL)
+ return luaL_error(L, "Usage: swim:probe_member(uri)");
+ if (! lua_isstring(L, 2)) {
+ return luaL_error(L, "swim.probe_member: member URI should "\
+ "be a string");
+ }
+ if (swim_probe_member(swim, lua_tostring(L, 2)) != 0) {
+ lua_pushnil(L);
+ luaT_pusherror(L, diag_last_error(diag_get()));
+ return 2;
+ }
+ lua_pushboolean(L, true);
+ return 1;
+}
+
void
tarantool_lua_swim_init(struct lua_State *L)
{
@@ -358,6 +389,7 @@ tarantool_lua_swim_init(struct lua_State *L)
{"remove_member", lua_swim_remove_member},
{"delete", lua_swim_delete},
{"info", lua_swim_info},
+ {"probe_member", lua_swim_probe_member},
{NULL, NULL}
};
luaL_register_module(L, "swim", lua_swim_methods);
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 23+ messages in thread
* [PATCH v4 06/12] [RAW] swim: introduce dissemination component
2019-01-30 21:28 [PATCH v4 00/12] SWIM draft Vladislav Shpilevoy
` (7 preceding siblings ...)
2019-01-30 21:28 ` [PATCH v4 05/12] [RAW] swim: introduce failure detection component Vladislav Shpilevoy
@ 2019-01-30 21:28 ` Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 07/12] [RAW] swim: keep encoded round message cached Vladislav Shpilevoy
` (2 subsequent siblings)
11 siblings, 0 replies; 23+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-30 21:28 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja, vdavydov.dev
Dissemination components broadcasts events about member status
updates.
Part of #3234
---
src/lib/swim/swim.c | 223 ++++++++++++++++++++++++++++++++++++--
src/lib/swim/swim_proto.c | 58 ++++++++++
src/lib/swim/swim_proto.h | 108 ++++++++++++++++++
3 files changed, 377 insertions(+), 12 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index a862f52a4..353e55254 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -243,6 +243,42 @@ struct swim_member {
struct swim_task ping_task;
/** Position in a queue of members waiting for an ack. */
struct rlist in_queue_wait_ack;
+ /**
+ *
+ * Dissemination component
+ *
+ * Dissemination component sends events. Event is a
+ * notification about member status update. So formally,
+ * this structure already has all the needed attributes.
+ * But also an event somehow should be sent to all members
+ * at least once according to SWIM, so it requires
+ * something like TTL for each type of event, which
+ * decrements on each send. And a member can not be
+ * removed from the global table until it gets dead and
+ * its status TTLs is 0, so as to allow other members
+ * learn its dead status.
+ */
+ int status_ttl;
+ /**
+ * Events are put into a queue sorted by event occurrence
+ * time.
+ */
+ struct rlist in_queue_events;
+ /**
+ * Old UUID is sent for a while after its update so as to
+ * allow other members to update this members's record
+ * in their tables.
+ */
+ struct tt_uuid old_uuid;
+ /**
+ * UUID is quite heavy structure, so an old UUID is sent
+ * only this number of times. A current UUID is sent
+ * always. Moreover, if someone wanted to reuse UUID,
+ * always sending old ones would make it much harder to
+ * detect which instance has just updated UUID, and which
+ * old UUID is handed over to another instance.
+ */
+ int old_uuid_ttl;
};
#define mh_name _swim_table
@@ -313,6 +349,12 @@ struct swim {
struct rlist queue_wait_ack;
/** Generator of ack checking events. */
struct ev_periodic wait_ack_tick;
+ /**
+ *
+ * Dissemination component
+ */
+ /** Queue of events sorted by occurrence time. */
+ struct rlist queue_events;
};
/** Put the member into a list of ACK waiters. */
@@ -327,14 +369,42 @@ swim_member_wait_ack(struct swim *swim, struct swim_member *member)
}
}
+/**
+ * On literally any update of a member it stands into a queue of
+ * events to disseminate the update. Note that status TTL is
+ * always set, even if UUID is updated, or any other attribute. It
+ * is because 1) it simplifies the code when status TTL is bigger
+ * than all other ones, 2) status occupies only 2 bytes in a
+ * packet, so it is never worse to send it on any update, but
+ * reduces entropy.
+ */
+static inline void
+swim_schedule_event(struct swim *swim, struct swim_member *member)
+{
+ if (rlist_empty(&member->in_queue_events)) {
+ rlist_add_tail_entry(&swim->queue_events, member,
+ in_queue_events);
+ }
+ member->status_ttl = mh_size(swim->members);
+}
+
/**
* Make all needed actions to process a member's update like a
* change of its status, or incarnation, or both.
*/
static void
-swim_member_status_is_updated(struct swim_member *member)
+swim_member_status_is_updated(struct swim_member *member, struct swim *swim)
{
member->unacknowledged_pings = 0;
+ swim_schedule_event(swim, member);
+}
+
+/** Make all needed actions to process member's UUID update. */
+static void
+swim_member_uuid_is_updated(struct swim_member *member, struct swim *swim)
+{
+ member->old_uuid_ttl = mh_size(swim->members);
+ swim_schedule_event(swim, member);
}
/**
@@ -352,7 +422,6 @@ swim_member_update_status(struct swim_member *member,
enum swim_member_status new_status,
uint64_t incarnation, struct swim *swim)
{
- (void) swim;
/*
* Source of truth about self is this instance and it is
* never updated from remote. Refutation is handled
@@ -362,12 +431,12 @@ swim_member_update_status(struct swim_member *member,
if (member->incarnation == incarnation) {
if (member->status < new_status) {
member->status = new_status;
- swim_member_status_is_updated(member);
+ swim_member_status_is_updated(member, swim);
}
} else if (member->incarnation < incarnation) {
member->status = new_status;
member->incarnation = incarnation;
- swim_member_status_is_updated(member);
+ swim_member_status_is_updated(member, swim);
}
}
@@ -400,6 +469,9 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
swim_task_destroy(&member->ack_task);
swim_task_destroy(&member->ping_task);
+ /* Dissemination component. */
+ rlist_del_entry(member, in_queue_events);
+
free(member);
}
@@ -469,6 +541,10 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
swim_task_create(&member->ack_task, NULL, NULL);
swim_task_create(&member->ping_task, swim_ping_task_complete, NULL);
+ /* Dissemination component. */
+ rlist_create(&member->in_queue_events);
+ swim_member_status_is_updated(member, swim);
+
say_verbose("SWIM: member %s is added", swim_uuid_str(uuid));
return member;
}
@@ -610,6 +686,51 @@ swim_encode_failure_detection(struct swim *swim, struct swim_packet *packet,
return 1;
}
+/**
+ * Encode dissemination component.
+ * @retval 0 Not error, but nothing is encoded.
+ * @retval 1 Something is encoded.
+ */
+static int
+swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
+{
+ struct swim_diss_header_bin diss_header_bin;
+ int size = sizeof(diss_header_bin);
+ char *header = swim_packet_reserve(packet, size);
+ if (header == NULL)
+ return 0;
+ int i = 0;
+ struct swim_member *m;
+ struct swim_event_bin event_bin;
+ struct swim_old_uuid_bin old_uuid_bin;
+ swim_event_bin_create(&event_bin);
+ swim_old_uuid_bin_create(&old_uuid_bin);
+ rlist_foreach_entry(m, &swim->queue_events, in_queue_events) {
+ int new_size = size + sizeof(event_bin);
+ if (m->old_uuid_ttl > 0)
+ new_size += sizeof(old_uuid_bin);
+ char *pos = swim_packet_reserve(packet, new_size);
+ if (pos == NULL)
+ break;
+ size = new_size;
+ swim_event_bin_fill(&event_bin, m->status, &m->addr, &m->uuid,
+ m->incarnation, m->old_uuid_ttl);
+ memcpy(pos, &event_bin, sizeof(event_bin));
+ if (m->old_uuid_ttl > 0) {
+ pos += sizeof(event_bin);
+ swim_old_uuid_bin_fill(&old_uuid_bin, &m->old_uuid);
+ memcpy(pos, &old_uuid_bin, sizeof(old_uuid_bin));
+ }
+ ++i;
+ }
+ if (i == 0)
+ return 0;
+ swim_diss_header_bin_create(&diss_header_bin, i);
+ memcpy(header, &diss_header_bin, sizeof(diss_header_bin));
+ swim_packet_advance(packet, size);
+ return 1;
+}
+
/** Encode SWIM components into a UDP packet. */
static void
swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
@@ -620,12 +741,36 @@ swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
map_size += swim_encode_src_uuid(swim, packet);
map_size += swim_encode_failure_detection(swim, packet,
SWIM_FD_MSG_PING);
+ map_size += swim_encode_dissemination(swim, packet);
map_size += swim_encode_anti_entropy(swim, packet);
assert(mp_sizeof_map(map_size) == 1 && map_size >= 2);
mp_encode_map(header, map_size);
}
+/**
+ * Decrement TTLs of all events. It is done after each round step.
+ * Note, that when there are too many events to fit into a packet,
+ * the tail of events list without being disseminated start
+ * reeking and rotting, and the most far events can be deleted
+ * without ever being sent. But hardly this situation is reachable
+ * since even 1000 bytes can fit 37 events of ~27 bytes each, that
+ * means in fact a failure of 37 instances. In such a case rotting
+ * events are the most mild problem.
+ */
+static void
+swim_decrease_events_ttl(struct swim *swim)
+{
+ struct swim_member *member, *tmp;
+ rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events,
+ tmp) {
+ if (member->old_uuid_ttl > 0)
+ --member->old_uuid_ttl;
+ if (--member->status_ttl == 0)
+ rlist_del_entry(member, in_queue_events);
+ }
+}
+
/**
* Once per specified timeout trigger a next round step. In round
* step a next memeber is taken from the round queue and a round
@@ -676,6 +821,8 @@ swim_round_step_complete(struct swim_task *task,
* section with a ping.
*/
swim_member_wait_ack(swim, m);
+ /* As well as dissemination. */
+ swim_decrease_events_ttl(swim);
}
}
@@ -737,11 +884,12 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
case MEMBER_ALIVE:
if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) {
m->status = MEMBER_DEAD;
- swim_member_status_is_updated(m);
+ swim_member_status_is_updated(m, swim);
}
break;
case MEMBER_DEAD:
- if (m->unacknowledged_pings >= NO_ACKS_TO_GC)
+ if (m->unacknowledged_pings >= NO_ACKS_TO_GC &&
+ m->status_ttl == 0)
swim_member_delete(swim, m);
break;
default:
@@ -782,18 +930,20 @@ swim_member_update_uuid(struct swim_member *member,
struct mh_swim_table_key key = {member->hash, &old_uuid};
mh_swim_table_del(t, mh_swim_table_find(t, key, NULL), NULL);
member->hash = swim_uuid_hash(new_uuid);
+ member->old_uuid = old_uuid;
+ swim_member_uuid_is_updated(member, swim);
return 0;
}
/** Update member's address.*/
static inline void
swim_member_update_addr(struct swim_member *member,
- const struct sockaddr_in *addr)
+ const struct sockaddr_in *addr, struct swim *swim)
{
if (addr->sin_port != member->addr.sin_port ||
addr->sin_addr.s_addr != member->addr.sin_addr.s_addr) {
member->addr = *addr;
- swim_member_status_is_updated(member);
+ swim_member_status_is_updated(member, swim);
}
}
@@ -807,6 +957,9 @@ static struct swim_member *
swim_update_member(struct swim *swim, const struct swim_member_def *def)
{
struct swim_member *member = swim_find_member(swim, &def->uuid);
+ struct swim_member *old_member = NULL;
+ if (! tt_uuid_is_nil(&def->old_uuid))
+ old_member = swim_find_member(swim, &def->old_uuid);
if (member == NULL) {
if (def->status == MEMBER_DEAD) {
/*
@@ -821,19 +974,29 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def)
*/
return NULL;
}
- member = swim_member_new(swim, &def->addr, &def->uuid,
- def->status, def->incarnation);
+ if (old_member == NULL) {
+ member = swim_member_new(swim, &def->addr, &def->uuid,
+ def->status, def->incarnation);
+ } else if (swim_member_update_uuid(old_member, &def->uuid,
+ swim) == 0) {
+ member = old_member;
+ }
return member;
}
struct swim_member *self = swim->self;
if (member != self) {
if (def->incarnation >= member->incarnation) {
- swim_member_update_addr(member, &def->addr);
+ swim_member_update_addr(member, &def->addr, swim);
swim_member_update_status(member, def->status,
def->incarnation, swim);
+ if (old_member != NULL) {
+ assert(member != old_member);
+ swim_member_delete(swim, old_member);
+ }
}
return member;
}
+ uint64_t old_incarnation = self->incarnation;
/*
* It is possible that other instances know a bigger
* incarnation of this instance - such thing happens when
@@ -852,6 +1015,8 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def)
*/
self->incarnation++;
}
+ if (old_incarnation != self->incarnation)
+ swim_member_status_is_updated(self, swim);
return member;
}
@@ -920,6 +1085,31 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
}
return 0;
}
+/**
+ * Decode a dissemination message. Schedule new events, update
+ * members.
+ */
+static int
+swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
+{
+ const char *msg_pref = "invald dissemination message:";
+ uint32_t size;
+ if (swim_decode_array(pos, end, &size, msg_pref, "root") != 0)
+ return -1;
+ for (uint32_t i = 0; i < size; ++i) {
+ struct swim_member_def def;
+ if (swim_member_def_decode(&def, pos, end, msg_pref) != 0)
+ return -1;
+ if (swim_update_member(swim, &def) == NULL) {
+ /*
+ * Not a critical error - other updates
+ * still can be applied.
+ */
+ diag_log();
+ }
+ }
+ return 0;
+}
/** Process a new message. */
static void
@@ -962,6 +1152,11 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
src, &uuid) != 0)
goto error;
break;
+ case SWIM_DISSEMINATION:
+ say_verbose("SWIM: process dissemination");
+ if (swim_process_dissemination(swim, &pos, end) != 0)
+ goto error;
+ break;
default:
diag_set(SwimError, "%s unexpected key", msg_pref);
goto error;
@@ -1000,6 +1195,10 @@ swim_new(void)
ev_init(&swim->wait_ack_tick, swim_check_acks);
ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT_DEFAULT, NULL);
swim->wait_ack_tick.data = (void *) swim;
+
+ /* Dissemination component. */
+ rlist_create(&swim->queue_events);
+
return swim;
}
@@ -1089,7 +1288,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
ev_periodic_start(loop(), &swim->wait_ack_tick);
if (! is_first_cfg) {
- swim_member_update_addr(swim->self, &addr);
+ swim_member_update_addr(swim->self, &addr, swim);
int rc = swim_member_update_uuid(swim->self, uuid, swim);
/* Reserved above. */
assert(rc == 0);
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 542b988c1..e31c67682 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -189,6 +189,11 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
"member incarnation") != 0)
return -1;
break;
+ case SWIM_MEMBER_OLD_UUID:
+ if (swim_decode_uuid(&def->old_uuid, pos, end, msg_pref,
+ "member old uuid") != 0)
+ return -1;
+ break;
default:
unreachable();
}
@@ -337,6 +342,59 @@ swim_member_bin_create(struct swim_member_bin *header)
header->m_incarnation = 0xcf;
}
+void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header,
+ uint16_t batch_size)
+{
+ header->k_header = SWIM_DISSEMINATION;
+ header->m_header = 0xcd;
+ header->v_header = mp_bswap_u16(batch_size);
+}
+
+void
+swim_event_bin_create(struct swim_event_bin *header)
+{
+ header->k_status = SWIM_MEMBER_STATUS;
+ header->k_addr = SWIM_MEMBER_ADDRESS;
+ header->m_addr = 0xce;
+ header->k_port = SWIM_MEMBER_PORT;
+ header->m_port = 0xcd;
+ header->k_uuid = SWIM_MEMBER_UUID;
+ header->m_uuid = 0xc4;
+ header->m_uuid_len = UUID_LEN;
+ header->k_incarnation = SWIM_MEMBER_INCARNATION;
+ header->m_incarnation = 0xcf;
+}
+
+void
+swim_event_bin_fill(struct swim_event_bin *header,
+ enum swim_member_status status,
+ const struct sockaddr_in *addr, const struct tt_uuid *uuid,
+ uint64_t incarnation, int old_uuid_ttl)
+{
+ header->m_header = 0x85 + (old_uuid_ttl > 0);
+ header->v_status = status;
+ header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr);
+ header->v_port = mp_bswap_u16(addr->sin_port);
+ memcpy(header->v_uuid, uuid, UUID_LEN);
+ header->v_incarnation = mp_bswap_u64(incarnation);
+}
+
+void
+swim_old_uuid_bin_create(struct swim_old_uuid_bin *header)
+{
+ header->k_uuid = SWIM_MEMBER_OLD_UUID;
+ header->m_uuid = 0xc4;
+ header->m_uuid_len = UUID_LEN;
+}
+
+void
+swim_old_uuid_bin_fill(struct swim_old_uuid_bin *header,
+ const struct tt_uuid *uuid)
+{
+ memcpy(header->v_uuid, uuid, UUID_LEN);
+}
+
void
swim_meta_header_bin_create(struct swim_meta_header_bin *header,
const struct sockaddr_in *src)
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 91a0bca9d..a3dc1164e 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -58,6 +58,19 @@
* | |
* | OR/AND |
* | |
+ * | SWIM_DISSEMINATION: [ |
+ * | { |
+ * | SWIM_MEMBER_STATUS: uint, enum member_status, |
+ * | SWIM_MEMBER_ADDRESS: uint, ip, |
+ * | SWIM_MEMBER_PORT: uint, port, |
+ * | SWIM_MEMBER_UUID: 16 byte UUID, |
+ * | SWIM_MEMBER_INCARNATION: uint |
+ * | }, |
+ * | ... |
+ * | ], |
+ * | |
+ * | OR/AND |
+ * | |
* | SWIM_ANTI_ENTROPY: [ |
* | { |
* | SWIM_MEMBER_STATUS: uint, enum member_status, |
@@ -91,6 +104,7 @@ extern const char *swim_member_status_strs[];
*/
struct swim_member_def {
struct tt_uuid uuid;
+ struct tt_uuid old_uuid;
struct sockaddr_in addr;
uint64_t incarnation;
enum swim_member_status status;
@@ -124,6 +138,7 @@ enum swim_body_key {
SWIM_SRC_UUID = 0,
SWIM_ANTI_ENTROPY,
SWIM_FAILURE_DETECTION,
+ SWIM_DISSEMINATION,
};
/**
@@ -231,6 +246,7 @@ enum swim_member_key {
SWIM_MEMBER_PORT,
SWIM_MEMBER_UUID,
SWIM_MEMBER_INCARNATION,
+ SWIM_MEMBER_OLD_UUID,
swim_member_key_MAX,
};
@@ -304,6 +320,98 @@ swim_member_bin_fill(struct swim_member_bin *header,
/** }}} Anti-entropy component */
+/** {{{ Dissemination component */
+
+/** SWIM dissemination MessagePack template. */
+struct PACKED swim_diss_header_bin {
+ /** mp_encode_uint(SWIM_DISSEMINATION) */
+ uint8_t k_header;
+ /** mp_encode_array() */
+ uint8_t m_header;
+ uint16_t v_header;
+};
+
+/** Initialize dissemination header. */
+void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header,
+ uint16_t batch_size);
+
+/** SWIM event MessagePack template. */
+struct PACKED swim_event_bin {
+ /** mp_encode_map(5 or 6) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_MEMBER_STATUS) */
+ uint8_t k_status;
+ /** mp_encode_uint(enum member_status) */
+ uint8_t v_status;
+
+ /** mp_encode_uint(SWIM_MEMBER_ADDRESS) */
+ uint8_t k_addr;
+ /** mp_encode_uint(addr.sin_addr.s_addr) */
+ uint8_t m_addr;
+ uint32_t v_addr;
+
+ /** mp_encode_uint(SWIM_MEMBER_PORT) */
+ uint8_t k_port;
+ /** mp_encode_uint(addr.sin_port) */
+ uint8_t m_port;
+ uint16_t v_port;
+
+ /** mp_encode_uint(SWIM_MEMBER_UUID) */
+ uint8_t k_uuid;
+ /** mp_encode_bin(UUID_LEN) */
+ uint8_t m_uuid;
+ uint8_t m_uuid_len;
+ uint8_t v_uuid[UUID_LEN];
+
+ /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
+};
+
+/** Initialize dissemination record. */
+void
+swim_event_bin_create(struct swim_event_bin *header);
+
+/**
+ * Since usually there are many evnets, it is faster to reset a
+ * few fields in an existing template, then each time create a
+ * new template. So the usage pattern is create(), fill(),
+ * fill() ... .
+ */
+void
+swim_event_bin_fill(struct swim_event_bin *header,
+ enum swim_member_status status,
+ const struct sockaddr_in *addr, const struct tt_uuid *uuid,
+ uint64_t incarnation, int old_uuid_ttl);
+
+/** Optional attribute of an event - old UUID of a member. */
+struct swim_old_uuid_bin {
+ /** mp_encode_uint(SWIM_MEMBER_OLD_UUID) */
+ uint8_t k_uuid;
+ /** mp_encode_bin(UUID_LEN) */
+ uint8_t m_uuid;
+ uint8_t m_uuid_len;
+ uint8_t v_uuid[UUID_LEN];
+};
+
+/** Initialize old UUID field. */
+void
+swim_old_uuid_bin_create(struct swim_old_uuid_bin *header);
+
+/**
+ * Set mutable fields of the field, by the same principle as event
+ * filling.
+ */
+void
+swim_old_uuid_bin_fill(struct swim_old_uuid_bin *header,
+ const struct tt_uuid *uuid);
+
+/** }}} Dissemination component */
+
/** {{{ Meta component */
/**
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 23+ messages in thread
* [PATCH v4 07/12] [RAW] swim: keep encoded round message cached
2019-01-30 21:28 [PATCH v4 00/12] SWIM draft Vladislav Shpilevoy
` (8 preceding siblings ...)
2019-01-30 21:28 ` [PATCH v4 06/12] [RAW] swim: introduce dissemination component Vladislav Shpilevoy
@ 2019-01-30 21:28 ` Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 08/12] [RAW] swim: introduce payload Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 09/12] [RAW] swim: introduce routing Vladislav Shpilevoy
11 siblings, 0 replies; 23+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-30 21:28 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja, vdavydov.dev
During a SWIM round a message is being handed out consisting of
at most 4 sections. Parts of the message change rarely, by
member attributes update and by removal of some of them. So it is
possible to cache the message and send it during several round
steps in a row. Or even do not rebuild it the whole round.
Part of #3234
---
src/lib/swim/swim.c | 21 +++++++++++++++++----
src/lib/swim/swim_io.h | 7 +++++++
2 files changed, 24 insertions(+), 4 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 353e55254..40faa296e 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -357,6 +357,13 @@ struct swim {
struct rlist queue_events;
};
+/** Reset cached round message on any change of any member. */
+static inline void
+cached_round_msg_invalidate(struct swim *swim)
+{
+ swim_packet_create(&swim->round_step_task.packet);
+}
+
/** Put the member into a list of ACK waiters. */
static void
swim_member_wait_ack(struct swim *swim, struct swim_member *member)
@@ -386,6 +393,7 @@ swim_schedule_event(struct swim *swim, struct swim_member *member)
in_queue_events);
}
member->status_ttl = mh_size(swim->members);
+ cached_round_msg_invalidate(swim);
}
/**
@@ -458,6 +466,7 @@ static void
swim_member_delete(struct swim *swim, struct swim_member *member)
{
say_verbose("SWIM: member %s is deleted", swim_uuid_str(&member->uuid));
+ cached_round_msg_invalidate(swim);
struct mh_swim_table_key key = {member->hash, &member->uuid};
mh_int_t rc = mh_swim_table_find(swim->members, key, NULL);
assert(rc != mh_end(swim->members));
@@ -733,8 +742,11 @@ swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
/** Encode SWIM components into a UDP packet. */
static void
-swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
+swim_encode_round_msg(struct swim *swim)
{
+ if (swim_packet_body_size(&swim->round_step_task.packet) > 0)
+ return;
+ struct swim_packet *packet = &swim->round_step_task.packet;
swim_packet_create(packet);
char *header = swim_packet_alloc(packet, 1);
int map_size = 0;
@@ -766,8 +778,10 @@ swim_decrease_events_ttl(struct swim *swim)
tmp) {
if (member->old_uuid_ttl > 0)
--member->old_uuid_ttl;
- if (--member->status_ttl == 0)
+ if (--member->status_ttl == 0) {
rlist_del_entry(member, in_queue_events);
+ cached_round_msg_invalidate(swim);
+ }
}
}
@@ -791,8 +805,7 @@ swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events)
*/
if (rlist_empty(&swim->queue_round))
return;
-
- swim_encode_round_msg(swim, &swim->round_step_task.packet);
+ swim_encode_round_msg(swim);
struct swim_member *m =
rlist_first_entry(&swim->queue_round, struct swim_member,
in_queue_round);
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 508d1ef6e..4d694857d 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -125,6 +125,13 @@ swim_packet_alloc(struct swim_packet *packet, int size)
return res;
}
+/** Size of the packet body. Meta is not counted. */
+static inline int
+swim_packet_body_size(const struct swim_packet *packet)
+{
+ return packet->pos - packet->body;
+}
+
/** Initialize @a packet, reserve some space for meta. */
void
swim_packet_create(struct swim_packet *packet);
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 23+ messages in thread
* [PATCH v4 08/12] [RAW] swim: introduce payload
2019-01-30 21:28 [PATCH v4 00/12] SWIM draft Vladislav Shpilevoy
` (9 preceding siblings ...)
2019-01-30 21:28 ` [PATCH v4 07/12] [RAW] swim: keep encoded round message cached Vladislav Shpilevoy
@ 2019-01-30 21:28 ` Vladislav Shpilevoy
2019-01-30 21:28 ` [PATCH v4 09/12] [RAW] swim: introduce routing Vladislav Shpilevoy
11 siblings, 0 replies; 23+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-30 21:28 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja, vdavydov.dev
Payload is an arbitrary user data, disseminated just like other
member attributes.
Part of #3234
---
src/lib/swim/swim.c | 102 +++++++++++++++++++++++++++++++++++---
src/lib/swim/swim.h | 4 ++
src/lib/swim/swim_proto.c | 50 +++++++++++++++----
src/lib/swim/swim_proto.h | 27 ++++++++--
4 files changed, 163 insertions(+), 20 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 40faa296e..78dbc6092 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -259,6 +259,16 @@ struct swim_member {
* learn its dead status.
*/
int status_ttl;
+ /** Arbitrary user data, disseminated on each change. */
+ char *payload;
+ /** Payload size, in bytes. */
+ int payload_size;
+ /**
+ * TTL of payload. At most this number of times payload is
+ * sent as a part of dissemination component. Reset on
+ * each update.
+ */
+ int payload_ttl;
/**
* Events are put into a queue sorted by event occurrence
* time.
@@ -415,6 +425,14 @@ swim_member_uuid_is_updated(struct swim_member *member, struct swim *swim)
swim_schedule_event(swim, member);
}
+/** Make all needed actions to process member's payload update. */
+static void
+swim_member_payload_is_updated(struct swim_member *member, struct swim *swim)
+{
+ member->payload_ttl = mh_size(swim->members);
+ swim_schedule_event(swim, member);
+}
+
/**
* Update status and incarnation of the member if needed. Statuses
* are compared as a compound key: {incarnation, status}. So @a
@@ -458,6 +476,31 @@ swim_by_scheduler(struct swim_scheduler *scheduler)
return container_of(scheduler, struct swim, scheduler);
}
+/**
+ * Update members payload if necessary. If a payload is the same -
+ * nothing happens. Fortunately, memcmp here is not expensive,
+ * because 1) payload change is extra rare event usually,
+ * 2) max payload size is very limited.
+ */
+static inline int
+swim_member_update_payload(struct swim_member *member, const char *payload,
+ int payload_size, struct swim *swim)
+{
+ if (payload_size == member->payload_size &&
+ memcmp(payload, member->payload, payload_size) == 0)
+ return 0;
+ char *new_payload = (char *) realloc(member->payload, payload_size);
+ if (new_payload == NULL) {
+ diag_set(OutOfMemory, payload_size, "realloc", "new_payload");
+ return -1;
+ }
+ memcpy(new_payload, payload, payload_size);
+ member->payload = new_payload;
+ member->payload_size = payload_size;
+ swim_member_payload_is_updated(member, swim);
+ return 0;
+}
+
/**
* Remove the member from all queues, hashes, destroy it and free
* the memory.
@@ -480,6 +523,7 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
/* Dissemination component. */
rlist_del_entry(member, in_queue_events);
+ free(member->payload);
free(member);
}
@@ -522,7 +566,7 @@ swim_ping_task_complete(struct swim_task *task,
static struct swim_member *
swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
const struct tt_uuid *uuid, enum swim_member_status status,
- uint64_t incarnation)
+ uint64_t incarnation, const char *payload, int payload_size)
{
struct swim_member *member =
(struct swim_member *) calloc(1, sizeof(*member));
@@ -553,6 +597,11 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
/* Dissemination component. */
rlist_create(&member->in_queue_events);
swim_member_status_is_updated(member, swim);
+ if (swim_member_update_payload(member, payload, payload_size,
+ swim) != 0) {
+ swim_member_delete(swim, member);
+ return NULL;
+ }
say_verbose("SWIM: member %s is added", swim_uuid_str(uuid));
return member;
@@ -632,14 +681,17 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
for (mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t);
i < member_count; ++i) {
struct swim_member *m = *mh_swim_table_node(t, rc);
- int new_size = size + sizeof(member_bin);
+ int new_size = size + sizeof(member_bin) + m->payload_size;
char *pos = swim_packet_reserve(packet, new_size);
if (pos == NULL)
break;
size = new_size;
swim_member_bin_fill(&member_bin, &m->addr, &m->uuid,
- m->status, m->incarnation);
+ m->status, m->incarnation,
+ m->payload_size);
memcpy(pos, &member_bin, sizeof(member_bin));
+ pos += sizeof(member_bin);
+ memcpy(pos, m->payload, m->payload_size);
/*
* First random member could be choosen too close
* to the hash end. Here the cycle is wrapped, if
@@ -718,17 +770,27 @@ swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
int new_size = size + sizeof(event_bin);
if (m->old_uuid_ttl > 0)
new_size += sizeof(old_uuid_bin);
+ if (m->payload_ttl > 0) {
+ new_size += mp_sizeof_uint(SWIM_MEMBER_PAYLOAD) +
+ mp_sizeof_bin(m->payload_size);
+ }
char *pos = swim_packet_reserve(packet, new_size);
if (pos == NULL)
break;
size = new_size;
swim_event_bin_fill(&event_bin, m->status, &m->addr, &m->uuid,
- m->incarnation, m->old_uuid_ttl);
+ m->incarnation, m->old_uuid_ttl,
+ m->payload_ttl);
memcpy(pos, &event_bin, sizeof(event_bin));
+ pos += sizeof(event_bin);
if (m->old_uuid_ttl > 0) {
- pos += sizeof(event_bin);
swim_old_uuid_bin_fill(&old_uuid_bin, &m->old_uuid);
memcpy(pos, &old_uuid_bin, sizeof(old_uuid_bin));
+ pos += sizeof(old_uuid_bin);
+ }
+ if (m->payload_ttl > 0) {
+ pos = mp_encode_uint(pos, SWIM_MEMBER_PAYLOAD);
+ mp_encode_bin(pos, m->payload, m->payload_size);
}
++i;
}
@@ -778,6 +840,8 @@ swim_decrease_events_ttl(struct swim *swim)
tmp) {
if (member->old_uuid_ttl > 0)
--member->old_uuid_ttl;
+ if (member->payload_ttl > 0)
+ --member->payload_ttl;
if (--member->status_ttl == 0) {
rlist_del_entry(member, in_queue_events);
cached_round_msg_invalidate(swim);
@@ -989,7 +1053,9 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def)
}
if (old_member == NULL) {
member = swim_member_new(swim, &def->addr, &def->uuid,
- def->status, def->incarnation);
+ def->status, def->incarnation,
+ def->payload,
+ def->payload_size);
} else if (swim_member_update_uuid(old_member, &def->uuid,
swim) == 0) {
member = old_member;
@@ -1002,6 +1068,13 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def)
swim_member_update_addr(member, &def->addr, swim);
swim_member_update_status(member, def->status,
def->incarnation, swim);
+ if (def->is_payload_specified &&
+ swim_member_update_payload(member, def->payload,
+ def->payload_size,
+ swim) != 0) {
+ /* Not such a critical error. */
+ diag_log();
+ }
if (old_member != NULL) {
assert(member != old_member);
swim_member_delete(swim, old_member);
@@ -1256,7 +1329,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
return -1;
}
swim->self = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE,
- 0);
+ 0, NULL, 0);
if (swim->self == NULL)
return -1;
} else if (uuid == NULL || tt_uuid_is_nil(uuid)) {
@@ -1323,6 +1396,18 @@ swim_check_is_configured(const struct swim *swim, const char *msg_pref)
return -1;
}
+int
+swim_set_payload(struct swim *swim, const char *payload, int payload_size)
+{
+ if (payload_size > MAX_PAYLOAD_SIZE) {
+ diag_set(IllegalParams, "Payload should be <= %d",
+ MAX_PAYLOAD_SIZE);
+ return -1;
+ }
+ return swim_member_update_payload(swim->self, payload, payload_size,
+ swim);
+}
+
int
swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
{
@@ -1334,7 +1419,8 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
return -1;
struct swim_member *member = swim_find_member(swim, uuid);
if (member == NULL) {
- member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE, 0);
+ member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE, 0,
+ NULL, 0);
return member == NULL ? -1 : 0;
}
diag_set(SwimError, "%s a member with such UUID already exists",
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 9d21a739d..dced172c0 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -67,6 +67,10 @@ int
swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
double ack_timeout, const struct tt_uuid *uuid);
+/** Set payload to disseminate over the cluster. */
+int
+swim_set_payload(struct swim *swim, const char *payload, int payload_size);
+
/**
* Stop listening and broadcasting messages, cleanup all internal
* structures, free memory.
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index e31c67682..284d35695 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -115,21 +115,36 @@ swim_decode_port(struct sockaddr_in *address, const char **pos, const char *end,
return 0;
}
-int
-swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end,
- const char *msg_pref, const char *param_name)
+static inline int
+swim_decode_bin(const char **bin, uint32_t *size, const char **pos,
+ const char *end, const char *msg_pref, const char *param_name)
{
if (mp_typeof(**pos) != MP_BIN || mp_check_binl(*pos, end) > 0) {
diag_set(SwimError, "%s %s should be bin", msg_pref,
param_name);
return -1;
}
- if (mp_decode_binl(pos) != UUID_LEN || *pos + UUID_LEN > end) {
+ *bin = mp_decode_bin(pos, size);
+ if (*pos > end) {
+ diag_set(SwimError, "%s %s is invalid", msg_pref, param_name);
+ return -1;
+ }
+ return 0;
+}
+
+int
+swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end,
+ const char *msg_pref, const char *param_name)
+{
+ uint32_t size;
+ const char *bin;
+ if (swim_decode_bin(&bin, &size, pos, end, msg_pref, param_name) != 0)
+ return -1;
+ if (size != UUID_LEN) {
diag_set(SwimError, "%s %s is invalid", msg_pref, param_name);
return -1;
}
- memcpy(uuid, *pos, UUID_LEN);
- *pos += UUID_LEN;
+ memcpy(uuid, bin, UUID_LEN);
return 0;
}
@@ -157,6 +172,7 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
struct swim_member_def *def)
{
uint64_t tmp;
+ uint32_t len;
switch (key) {
case SWIM_MEMBER_STATUS:
if (swim_decode_uint(pos, end, &tmp, msg_pref,
@@ -194,6 +210,18 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
"member old uuid") != 0)
return -1;
break;
+ case SWIM_MEMBER_PAYLOAD:
+ if (swim_decode_bin(&def->payload, &len, pos, end, msg_pref,
+ "member payload") != 0)
+ return -1;
+ if (len > MAX_PAYLOAD_SIZE) {
+ diag_set(SwimError, "%s member payload size should be "\
+ "<= %d", msg_pref, MAX_PAYLOAD_SIZE);
+ return -1;
+ }
+ def->payload_size = (int) len;
+ def->is_payload_specified = true;
+ break;
default:
unreachable();
}
@@ -317,13 +345,15 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
void
swim_member_bin_fill(struct swim_member_bin *header,
const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- enum swim_member_status status, uint64_t incarnation)
+ enum swim_member_status status, uint64_t incarnation,
+ uint16_t payload_size)
{
header->v_status = status;
header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr);
header->v_port = mp_bswap_u16(addr->sin_port);
memcpy(header->v_uuid, uuid, UUID_LEN);
header->v_incarnation = mp_bswap_u64(incarnation);
+ header->v_payload_size = mp_bswap_u16(payload_size);
}
void
@@ -340,6 +370,8 @@ swim_member_bin_create(struct swim_member_bin *header)
header->m_uuid_len = UUID_LEN;
header->k_incarnation = SWIM_MEMBER_INCARNATION;
header->m_incarnation = 0xcf;
+ header->k_payload = SWIM_MEMBER_PAYLOAD;
+ header->m_payload_size = 0xc5;
}
void
@@ -370,9 +402,9 @@ void
swim_event_bin_fill(struct swim_event_bin *header,
enum swim_member_status status,
const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- uint64_t incarnation, int old_uuid_ttl)
+ uint64_t incarnation, int old_uuid_ttl, int payload_ttl)
{
- header->m_header = 0x85 + (old_uuid_ttl > 0);
+ header->m_header = 0x85 + (old_uuid_ttl > 0) + (payload_ttl > 0);
header->v_status = status;
header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr);
header->v_port = mp_bswap_u16(addr->sin_port);
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index a3dc1164e..353605c35 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -35,6 +35,11 @@
#include <arpa/inet.h>
#include <stdbool.h>
+enum {
+ /** Reserve 272 bytes for headers. */
+ MAX_PAYLOAD_SIZE = 1200,
+};
+
/**
* SWIM binary protocol structures and helpers. Below is a picture
* of a SWIM message template:
@@ -108,6 +113,13 @@ struct swim_member_def {
struct sockaddr_in addr;
uint64_t incarnation;
enum swim_member_status status;
+ const char *payload;
+ int payload_size;
+ /**
+ * Zero payload size does not mean that payload is not
+ * specified. It can be just empty.
+ */
+ bool is_payload_specified;
};
/** Initialize the definition with default values. */
@@ -247,6 +259,7 @@ enum swim_member_key {
SWIM_MEMBER_UUID,
SWIM_MEMBER_INCARNATION,
SWIM_MEMBER_OLD_UUID,
+ SWIM_MEMBER_PAYLOAD,
swim_member_key_MAX,
};
@@ -301,6 +314,13 @@ struct PACKED swim_member_bin {
/** mp_encode_uint(64bit incarnation) */
uint8_t m_incarnation;
uint64_t v_incarnation;
+
+ /** mp_encode_uint(SWIM_MEMBER_PAYLOAD) */
+ uint8_t k_payload;
+ /** mp_encode_bin(16bit bin header) */
+ uint8_t m_payload_size;
+ uint16_t v_payload_size;
+ /** Payload data ... */
};
/** Initialize antri-entropy record. */
@@ -316,7 +336,8 @@ swim_member_bin_create(struct swim_member_bin *header);
void
swim_member_bin_fill(struct swim_member_bin *header,
const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- enum swim_member_status status, uint64_t incarnation);
+ enum swim_member_status status, uint64_t incarnation,
+ uint16_t payload_size);
/** }}} Anti-entropy component */
@@ -338,7 +359,7 @@ swim_diss_header_bin_create(struct swim_diss_header_bin *header,
/** SWIM event MessagePack template. */
struct PACKED swim_event_bin {
- /** mp_encode_map(5 or 6) */
+ /** mp_encode_map(5, or 6, or 7) */
uint8_t m_header;
/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -386,7 +407,7 @@ void
swim_event_bin_fill(struct swim_event_bin *header,
enum swim_member_status status,
const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- uint64_t incarnation, int old_uuid_ttl);
+ uint64_t incarnation, int old_uuid_ttl, int payload_ttl);
/** Optional attribute of an event - old UUID of a member. */
struct swim_old_uuid_bin {
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 23+ messages in thread
* [PATCH v4 09/12] [RAW] swim: introduce routing
2019-01-30 21:28 [PATCH v4 00/12] SWIM draft Vladislav Shpilevoy
` (10 preceding siblings ...)
2019-01-30 21:28 ` [PATCH v4 08/12] [RAW] swim: introduce payload Vladislav Shpilevoy
@ 2019-01-30 21:28 ` Vladislav Shpilevoy
11 siblings, 0 replies; 23+ messages in thread
From: Vladislav Shpilevoy @ 2019-01-30 21:28 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja, vdavydov.dev
It is going to be used for indirect ping/acks.
Part of #3234
---
src/lib/swim/swim.c | 4 +-
src/lib/swim/swim_io.c | 100 +++++++++++++++++++++++++++++++++++---
src/lib/swim/swim_io.h | 17 ++++++-
src/lib/swim/swim_proto.c | 92 ++++++++++++++++++++++++++++++++++-
src/lib/swim/swim_proto.h | 76 +++++++++++++++++++++++++++--
5 files changed, 274 insertions(+), 15 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 78dbc6092..5cec3789a 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -1200,8 +1200,10 @@ swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
/** Process a new message. */
static void
swim_on_input(struct swim_scheduler *scheduler, const char *pos,
- const char *end, const struct sockaddr_in *src)
+ const char *end, const struct sockaddr_in *src,
+ const struct sockaddr_in *proxy)
{
+ (void) proxy;
const char *msg_pref = "invalid message:";
struct swim *swim = swim_by_scheduler(scheduler);
struct tt_uuid uuid;
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 170d7af77..a8fb1f588 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -54,6 +54,50 @@ swim_packet_create(struct swim_packet *packet)
swim_packet_alloc_meta(packet, sizeof(struct swim_meta_header_bin));
}
+/** Fill metadata prefix of a packet. */
+static inline void
+swim_packet_build_meta(struct swim_packet *packet,
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *route_src,
+ const struct sockaddr_in *route_dst)
+{
+ char *meta = packet->meta;
+ char *end = packet->body;
+ /*
+ * Meta is already built, do nothing. It is used for
+ * packet forwarding, when route source != this instance.
+ */
+ if (meta == end)
+ return;
+ struct swim_meta_header_bin header;
+ struct swim_route_bin route;
+ assert(meta + sizeof(header) <= end);
+ swim_meta_header_bin_create(&header, src, route_dst != NULL);
+ memcpy(meta, &header, sizeof(header));
+ if (route_dst != NULL) {
+ meta += sizeof(header);
+ assert(meta + sizeof(route) <= end);
+ swim_route_bin_create(&route, route_src, route_dst);
+ memcpy(meta, &route, sizeof(route));
+ }
+ /* Now the meta is build and the body consumes it. */
+ packet->body = packet->meta;
+}
+
+void
+swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy)
+{
+ /*
+ * Meta should be reserved before body encoding is
+ * started. Otherwise it would be necessary to move
+ * already encoded body, maybe losing its tail.
+ */
+ assert(swim_packet_body_size(&task->packet) == 0);
+ task->proxy = *proxy;
+ task->is_proxy_specified = true;
+ swim_packet_alloc_meta(&task->packet, sizeof(struct swim_route_bin));
+}
+
void
swim_task_create(struct swim_task *task, swim_task_f complete,
swim_task_f cancel)
@@ -178,16 +222,24 @@ swim_scheduler_on_output(struct ev_loop *loop, struct ev_io *io, int events)
struct swim_task *task =
rlist_shift_entry(&scheduler->queue_output, struct swim_task,
in_queue_output);
+ const struct sockaddr_in *src = &scheduler->transport.addr;
+ const struct sockaddr_in *dst, *proxy_dst, *proxy_src;
+ if (task->is_proxy_specified) {
+ dst = &task->proxy;
+ proxy_dst = dst;
+ proxy_src = src;
+ } else {
+ dst = &task->dst;
+ proxy_dst = NULL;
+ proxy_src = NULL;
+ }
+ swim_packet_build_meta(&task->packet, src, proxy_src, proxy_dst);
say_verbose("SWIM: send to %s",
- sio_strfaddr((struct sockaddr *) &task->dst,
- sizeof(task->dst)));
- struct swim_meta_header_bin header;
- swim_meta_header_bin_create(&header, &scheduler->transport.addr);
- memcpy(task->packet.meta, &header, sizeof(header));
+ sio_strfaddr((struct sockaddr *) dst, sizeof(*dst)));
int rc = swim_transport_send(&scheduler->transport, task->packet.buf,
task->packet.pos - task->packet.buf,
- (const struct sockaddr *) &task->dst,
- sizeof(task->dst));
+ (const struct sockaddr *) dst,
+ sizeof(*dst));
if (rc != 0)
diag_log();
if (task->complete != NULL)
@@ -216,9 +268,41 @@ swim_scheduler_on_input(struct ev_loop *loop, struct ev_io *io, int events)
sio_strfaddr((struct sockaddr *) &src, len));
struct swim_meta_def meta;
const char *pos = buf, *end = pos + size;
+ struct sockaddr_in *self = &scheduler->transport.addr;
if (swim_meta_def_decode(&meta, &pos, end) < 0)
goto error;
- scheduler->on_input(scheduler, pos, end, &meta.src);
+ /*
+ * Check if this instance is not a receiver and possibly
+ * forward the packet.
+ */
+ if (! meta.is_route_specified) {
+ scheduler->on_input(scheduler, pos, end, &meta.src, NULL);
+ } else if (meta.route.dst.sin_port == self->sin_port &&
+ meta.route.dst.sin_addr.s_addr == self->sin_addr.s_addr) {
+ scheduler->on_input(scheduler, pos, end, &meta.route.src,
+ &meta.src);
+ } else {
+ /* Forward the packet. */
+ struct swim_task *task = swim_task_new(swim_task_delete_cb,
+ swim_task_delete_cb);
+ if (task == NULL)
+ goto error;
+ swim_task_proxy(task, &meta.route.dst);
+ /*
+ * Meta should be rebuilt with the different
+ * source address - this instance. It is used by a
+ * receiver to send a reply through this instance
+ * again.
+ */
+ swim_packet_build_meta(&task->packet, self, &meta.route.src,
+ &meta.route.dst);
+ /* Copy the original body without a touch. */
+ size = end - pos;
+ char *body = swim_packet_alloc(&task->packet, size);
+ assert(body != NULL);
+ memcpy(body, pos, size);
+ swim_task_send(task, &meta.route.dst, scheduler);
+ }
return;
error:
diag_log();
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 4d694857d..0ba8972f0 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -138,7 +138,8 @@ swim_packet_create(struct swim_packet *packet);
typedef void (*swim_scheduler_on_input_f)(struct swim_scheduler *scheduler,
const char *buf, const char *end,
- const struct sockaddr_in *src);
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *proxy);
/** Planner and executor of input and output operations.*/
struct swim_scheduler {
@@ -206,10 +207,24 @@ struct swim_task {
struct swim_packet packet;
/** Destination address. */
struct sockaddr_in dst;
+ /**
+ * Optional proxy via which the destination should be
+ * reached.
+ */
+ struct sockaddr_in proxy;
+ /** True if a proxy is specified. */
+ bool is_proxy_specified;
/** Place in a queue of tasks. */
struct rlist in_queue_output;
};
+/**
+ * Set the proxy for the task. Before sending this proxy will be
+ * dumped into metadata.
+ */
+void
+swim_task_proxy(struct swim_task *task, const struct sockaddr_in *proxy);
+
/**
* Put the task into a queue of tasks. Eventually it will be sent.
*/
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 284d35695..8b1ed76a7 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -429,9 +429,9 @@ swim_old_uuid_bin_fill(struct swim_old_uuid_bin *header,
void
swim_meta_header_bin_create(struct swim_meta_header_bin *header,
- const struct sockaddr_in *src)
+ const struct sockaddr_in *src, bool has_routing)
{
- header->m_header = 0x83;
+ header->m_header = 0x83 + has_routing;
header->k_version = SWIM_META_TARANTOOL_VERSION;
header->m_version = 0xce;
header->v_version = mp_bswap_u32(tarantool_version_id());
@@ -443,6 +443,69 @@ swim_meta_header_bin_create(struct swim_meta_header_bin *header,
header->v_port = mp_bswap_u16(src->sin_port);
}
+/**
+ * Decode meta routing section into meta definition object.
+ * @param[out] def Definition to decode into.
+ * @param[in][out] pos MessagePack buffer to decode.
+ * @param end End of the MessagePack buffer.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+static int
+swim_meta_def_decode_route(struct swim_meta_def *def, const char **pos,
+ const char *end)
+{
+ const char *msg_pref = "invalid routing section:";
+ uint32_t size;
+ if (swim_decode_map(pos, end, &size, msg_pref, "route") != 0)
+ return -1;
+ for (uint32_t i = 0; i < size; ++i) {
+ uint64_t key;
+ if (swim_decode_uint(pos, end, &key, msg_pref, "a key") != 0)
+ return -1;
+ switch (key) {
+ case SWIM_ROUTE_SRC_ADDRESS:
+ if (swim_decode_ip(&def->route.src, pos, end, msg_pref,
+ "source address") != 0)
+ return -1;
+ break;
+ case SWIM_ROUTE_SRC_PORT:
+ if (swim_decode_port(&def->route.src, pos, end,
+ msg_pref, "source port") != 0)
+ return -1;
+ break;
+ case SWIM_ROUTE_DST_ADDRESS:
+ if (swim_decode_ip(&def->route.dst, pos, end, msg_pref,
+ "destination address") != 0)
+ return -1;
+ break;
+ case SWIM_ROUTE_DST_PORT:
+ if (swim_decode_port(&def->route.dst, pos, end,
+ msg_pref, "destination port") != 0)
+ return -1;
+ break;
+ default:
+ diag_set(SwimError, "%s unknown key", msg_pref);
+ return -1;
+ }
+ }
+ if (def->route.src.sin_port == 0 ||
+ def->route.src.sin_addr.s_addr == 0) {
+ diag_set(SwimError, "%s source address should be specified",
+ msg_pref);
+ return -1;
+ }
+ if (def->route.dst.sin_port == 0 ||
+ def->route.dst.sin_addr.s_addr == 0) {
+ diag_set(SwimError, "%s destination address should be "\
+ "specified", msg_pref);
+ return -1;
+ }
+ def->is_route_specified = true;
+ return 0;
+}
+
int
swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
const char *end)
@@ -457,6 +520,10 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
if (swim_decode_uint(pos, end, &key, msg_pref, "a key") != 0)
return -1;
switch (key) {
+ case SWIM_META_ROUTING:
+ if (swim_meta_def_decode_route(def, pos, end) != 0)
+ return -1;
+ break;
case SWIM_META_TARANTOOL_VERSION:
if (swim_decode_uint(pos, end, &key, msg_pref,
"version") != 0)
@@ -493,3 +560,24 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
}
return 0;
}
+
+void
+swim_route_bin_create(struct swim_route_bin *route,
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *dst)
+{
+ route->k_routing = SWIM_META_ROUTING;
+ route->m_routing = 0x84;
+ route->k_src_addr = SWIM_ROUTE_SRC_ADDRESS;
+ route->m_src_addr = 0xce;
+ route->v_src_addr = mp_bswap_u32(src->sin_addr.s_addr);
+ route->k_src_port = SWIM_ROUTE_SRC_PORT;
+ route->m_src_port = 0xcd;
+ route->v_src_port = mp_bswap_u16(src->sin_port);
+ route->k_dst_addr = SWIM_ROUTE_DST_ADDRESS;
+ route->m_dst_addr = 0xce;
+ route->v_dst_addr = mp_bswap_u32(dst->sin_addr.s_addr);
+ route->k_dst_port = SWIM_ROUTE_DST_PORT;
+ route->m_dst_port = 0xcd;
+ route->v_dst_port = mp_bswap_u16(dst->sin_port);
+}
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 353605c35..fe9eb85c5 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -48,7 +48,13 @@ enum {
* | { |
* | SWIM_META_TARANTOOL_VERSION: uint, Tarantool version ID,|
* | SWIM_META_SRC_ADDRESS: uint, ip, |
- * | SWIM_META_SRC_PORT: uint, port |
+ * | SWIM_META_SRC_PORT: uint, port, |
+ * | SWIM_META_ROUTING: { |
+ * | SWIM_ROUTE_SRC_ADDRESS: uint, ip, |
+ * | SWIM_ROUTE_SRC_PORT: uint, port, |
+ * | SWIM_ROUTE_DST_ADDRESS: uint, ip, |
+ * | SWIM_ROUTE_DST_PORT: uint, port |
+ * | } |
* | } |
* +-------------------Protocol logic section--------------------+
* | { |
@@ -458,6 +464,7 @@ enum swim_meta_key {
*/
SWIM_META_SRC_ADDRESS,
SWIM_META_SRC_PORT,
+ SWIM_META_ROUTING,
};
/**
@@ -469,7 +476,7 @@ enum swim_meta_key {
* separate MessagePack map.
*/
struct PACKED swim_meta_header_bin {
- /** mp_encode_map(3) */
+ /** mp_encode_map(3 or 4) */
uint8_t m_header;
/** mp_encode_uint(SWIM_META_TARANTOOL_VERSION) */
@@ -494,7 +501,7 @@ struct PACKED swim_meta_header_bin {
/** Initialize meta section. */
void
swim_meta_header_bin_create(struct swim_meta_header_bin *header,
- const struct sockaddr_in *src);
+ const struct sockaddr_in *src, bool has_routing);
/** Meta definition. */
struct swim_meta_def {
@@ -502,6 +509,12 @@ struct swim_meta_def {
uint32_t version;
/** Source of the message. */
struct sockaddr_in src;
+ /** Route source and destination. */
+ bool is_route_specified;
+ struct {
+ struct sockaddr_in src;
+ struct sockaddr_in dst;
+ } route;
};
/**
@@ -517,6 +530,63 @@ int
swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
const char *end);
+enum swim_route_key {
+ /**
+ * True source of the packet. Can be different from the
+ * packet sender. It is expected that the answer should
+ * be sent back to this address. Maybe indirectly through
+ * the same proxy.
+ */
+ SWIM_ROUTE_SRC_ADDRESS = 0,
+ SWIM_ROUTE_SRC_PORT,
+ /**
+ * True destination of the packet. Can be different from
+ * this instance, receiver. If it is for another instance,
+ * then this packet is forwarded to the latter.
+ */
+ SWIM_ROUTE_DST_ADDRESS,
+ SWIM_ROUTE_DST_PORT,
+ swim_route_key_MAX,
+};
+
+/** Route section template. Describes source, destination. */
+struct PACKED swim_route_bin {
+ /** mp_encode_uint(SWIM_ROUTING) */
+ uint8_t k_routing;
+ /** mp_encode_map(4) */
+ uint8_t m_routing;
+
+ /** mp_encode_uint(SWIM_ROUTE_SRC_ADDRESS) */
+ uint8_t k_src_addr;
+ /** mp_encode_uint(addr.sin_addr.s_addr) */
+ uint8_t m_src_addr;
+ uint32_t v_src_addr;
+
+ /** mp_encode_uint(SWIM_ROUTE_SRC_PORT) */
+ uint8_t k_src_port;
+ /** mp_encode_uint(addr.sin_port) */
+ uint8_t m_src_port;
+ uint16_t v_src_port;
+
+ /** mp_encode_uint(SWIM_ROUTE_DST_ADDRESS) */
+ uint8_t k_dst_addr;
+ /** mp_encode_uint(addr.sin_addr.s_addr) */
+ uint8_t m_dst_addr;
+ uint32_t v_dst_addr;
+
+ /** mp_encode_uint(SWIM_ROUTE_DST_PORT) */
+ uint8_t k_dst_port;
+ /** mp_encode_uint(addr.sin_port) */
+ uint8_t m_dst_port;
+ uint16_t v_dst_port;
+};
+
+/** Initialize routing section. */
+void
+swim_route_bin_create(struct swim_route_bin *route,
+ const struct sockaddr_in *src,
+ const struct sockaddr_in *dst);
+
/** }}} Meta component */
/**
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 23+ messages in thread