[PATCH v2 3/6] [RAW] swim: introduce a dissemination component
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Tue Dec 25 22:19:26 MSK 2018
The dissemination component broadcasts events about member status
updates.
Part of #3234
---
src/lib/swim/swim.c | 270 +++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 265 insertions(+), 5 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 22bc06a60..15e079f11 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -377,6 +377,26 @@ struct swim_member {
struct swim_io_task ping_task;
/** Position in a queue of members waiting for an ack. */
struct rlist in_queue_wait_ack;
+ /**
+ *
+ * Dissemination component
+ *
+ * Dissemination component sends events. Event is a
+ * notification about member status update. So formally,
+ * this structure already has all the needed attributes.
+ * But also an event somehow should be sent to all members
+ * at least once according to SWIM, so it requires
+ * something like TTL, which decrements on each send. And
+ * a member cannot be removed from the global table until
+ * it is dead and its dissemination TTL is 0, so as to
+ * allow other members to learn its dead status.
+ */
+ int dissemination_ttl;
+ /**
+ * Events are put into a queue sorted by event occurrence
+ * time.
+ */
+ struct rlist in_queue_events;
};
/**
@@ -451,6 +471,8 @@ struct swim {
struct rlist queue_wait_ack;
/** Generator of ack checking events. */
struct ev_periodic wait_ack_tick;
+ /** Queue of events sorted by occurrence time. */
+ struct rlist queue_events;
};
static inline uint64_t
@@ -467,6 +489,7 @@ sockaddr_in_hash(const struct sockaddr_in *a)
enum swim_component_type {
SWIM_ANTI_ENTROPY = 0,
SWIM_FAILURE_DETECTION,
+ SWIM_DISSEMINATION,
};
/** {{{ Failure detection component */
@@ -634,6 +657,88 @@ swim_member_bin_create(struct swim_member_bin *header)
/** }}} Anti-entropy component */
+/** {{{ Dissemination component */
+
+/**
+ * SWIM dissemination MsgPack template: the key of the
+ * dissemination section followed by the header of the array of
+ * events. The events themselves are written right after this
+ * header.
+ */
+struct PACKED swim_diss_header_bin {
+ /** mp_encode_uint(SWIM_DISSEMINATION) */
+ uint8_t k_header;
+ /**
+ * mp_encode_array(): m_header is the 0xdd marker, v_header
+ * is the number of events in the batch, stored after
+ * mp_bswap_u32().
+ */
+ uint8_t m_header;
+ uint32_t v_header;
+};
+
+/**
+ * Fill the header of the dissemination section for a batch of
+ * @a batch_size events.
+ */
+static inline void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header, int batch_size)
+{
+	/* Array header: the 0xdd marker and a 32-bit size. */
+	header->v_header = mp_bswap_u32((uint32_t) batch_size);
+	header->m_header = 0xdd;
+	/* Key of the section in the root map of the message. */
+	header->k_header = SWIM_DISSEMINATION;
+}
+
+/**
+ * SWIM event MsgPack template: a map of 4 pairs describing one
+ * member. The map header, the keys and the type markers are
+ * set by swim_event_bin_create(), the values are set by
+ * swim_event_bin_reset().
+ */
+struct PACKED swim_event_bin {
+ /** mp_encode_map(4) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_MEMBER_STATUS) */
+ uint8_t k_status;
+ /** mp_encode_uint(enum member_status) */
+ uint8_t v_status;
+
+ /** mp_encode_uint(SWIM_MEMBER_ADDR) */
+ uint8_t k_addr;
+ /**
+ * mp_encode_uint(addr.sin_addr.s_addr): the 0xce marker
+ * and a 32-bit value.
+ */
+ uint8_t m_addr;
+ uint32_t v_addr;
+
+ /** mp_encode_uint(SWIM_MEMBER_PORT) */
+ uint8_t k_port;
+ /**
+ * mp_encode_uint(addr.sin_port): the 0xcd marker and a
+ * 16-bit value.
+ */
+ uint8_t m_port;
+ uint16_t v_port;
+
+ /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+ uint8_t k_incarnation;
+ /**
+ * mp_encode_uint(64bit incarnation): the 0xcf marker and
+ * a 64-bit value.
+ */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
+};
+
+/**
+ * Initialize the constant part of an event template: the map
+ * header, the keys and the type markers of the values. The
+ * values themselves are not touched here.
+ */
+static inline void
+swim_event_bin_create(struct swim_event_bin *header)
+{
+	/* A map of 4 key-value pairs. */
+	header->m_header = 0x84;
+	/* Keys of the pairs. */
+	header->k_status = SWIM_MEMBER_STATUS;
+	header->k_addr = SWIM_MEMBER_ADDR;
+	header->k_port = SWIM_MEMBER_PORT;
+	header->k_incarnation = SWIM_MEMBER_INCARNATION;
+	/* Type markers of the 32, 16 and 64-bit values. */
+	header->m_addr = 0xce;
+	header->m_port = 0xcd;
+	header->m_incarnation = 0xcf;
+}
+
+/**
+ * Store the current status, address, port and incarnation of
+ * @a member into the value fields of the event template.
+ */
+static inline void
+swim_event_bin_reset(struct swim_event_bin *header, struct swim_member *member)
+{
+	header->v_incarnation = mp_bswap_u64(member->incarnation);
+	header->v_port = mp_bswap_u16(member->addr.sin_port);
+	header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
+	header->v_status = member->status;
+}
+
+/**
+ * Schedule an event about @a member: restart its dissemination
+ * TTL from the current size of the members table and put the
+ * member to the end of the events queue unless it is already
+ * queued.
+ */
+static inline void
+swim_schedule_event(struct swim *swim, struct swim_member *member)
+{
+	member->dissemination_ttl = mh_size(swim->members);
+	/* Already queued - keep its position in the queue. */
+	if (!rlist_empty(&member->in_queue_events))
+		return;
+	rlist_add_tail_entry(&swim->queue_events, member, in_queue_events);
+}
+
+/** }}} Dissemination component */
+
/**
* SWIM message structure:
* {
@@ -644,6 +749,18 @@ swim_member_bin_create(struct swim_member_bin *header)
*
* OR/AND
*
+ * SWIM_DISSEMINATION: [
+ * {
+ * SWIM_MEMBER_STATUS: uint, enum member_status,
+ * SWIM_MEMBER_ADDR: uint, ip,
+ * SWIM_MEMBER_PORT: uint, port,
+ * SWIM_MEMBER_INCARNATION: uint
+ * },
+ * ...
+ * ],
+ *
+ * OR/AND
+ *
* SWIM_ANTI_ENTROPY: [
* {
* SWIM_MEMBER_STATUS: uint, enum member_status,
@@ -663,6 +780,16 @@ swim_io_task_push(struct swim_io_task *task)
ev_io_start(loop(), &task->swim->output);
}
+/**
+ * Make all needed actions to process a member's update like a
+ * change of its status, or incarnation, or both. For now the
+ * only action is scheduling of an event about the member via
+ * swim_schedule_event().
+ */
+static void
+swim_member_is_updated(struct swim *swim, struct swim_member *member)
+{
+ swim_schedule_event(swim, member);
+}
+
/**
* Update status of the member if needed. Statuses are compared as
* a compound key: {incarnation, status}. So @a new_status can
@@ -678,14 +805,16 @@ swim_member_update_status(struct swim *swim, struct swim_member *member,
enum swim_member_status new_status,
uint64_t incarnation)
{
- (void) swim;
assert(member != swim->self);
if (member->incarnation == incarnation) {
- if (member->status < new_status)
+ if (member->status < new_status) {
member->status = new_status;
+ swim_member_is_updated(swim, member);
+ }
} else if (member->incarnation < incarnation) {
member->status = new_status;
member->incarnation = incarnation;
+ swim_member_is_updated(swim, member);
}
}
@@ -725,6 +854,8 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
swim_io_task_create(&member->ping_task, swim_send_ping, swim);
rlist_add_entry(&swim->queue_round, member, in_queue_round);
rlist_create(&member->in_queue_wait_ack);
+ rlist_create(&member->in_queue_events);
+ swim_schedule_event(swim, member);
return member;
}
@@ -753,6 +884,7 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
swim_io_task_destroy(&member->ping_task);
rlist_del_entry(member, in_queue_round);
rlist_del_entry(member, in_queue_wait_ack);
+ assert(rlist_empty(&member->in_queue_events));
free(member);
}
@@ -872,6 +1004,62 @@ swim_encode_failure_detection(struct swim *swim, struct swim_msg *msg,
return 1;
}
+/**
+ * Encode a batch of events into one part of the message.
+ * Events are taken from the queue starting from @a *queue_pos
+ * and are written while they fit into the part.
+ *
+ * @param msg Message to encode into.
+ * @param head Head of the events queue. It is the list
+ *        sentinel, not an event, and is never decoded as a
+ *        member.
+ * @param[in,out] queue_pos Link of the first not encoded
+ *        event. On return it points at the next event to
+ *        encode, or equals @a head when the queue is over.
+ *
+ * @retval >0 Number of encoded events.
+ * @retval  0 Nothing was encoded, @a *queue_pos is unchanged.
+ * @retval -1 Error, a new message part could not be reserved.
+ */
+static int
+swim_encode_dissemination_part(struct swim_msg *msg, struct rlist *head,
+			       struct rlist **queue_pos)
+{
+	struct swim_diss_header_bin diss_header_bin;
+	struct swim_event_bin event_bin;
+	/*
+	 * Ask for a room for the header and at least one event,
+	 * so that each call makes a progress. It is assumed that
+	 * swim_msg_reserve() returns a part with at least the
+	 * requested number of free bytes.
+	 */
+	struct swim_msg_part *part =
+		swim_msg_reserve(msg, sizeof(diss_header_bin) +
+				 sizeof(event_bin));
+	if (part == NULL)
+		return -1;
+	char *header = swim_msg_part_pos(part);
+	char *end = swim_msg_part_end(part);
+	char *pos = header + sizeof(diss_header_bin);
+
+	int i = 0;
+	struct rlist *link = *queue_pos;
+	swim_event_bin_create(&event_bin);
+	/*
+	 * Walk the links manually: the list iteration macros
+	 * treat their argument as the list head and would skip
+	 * the element *queue_pos points at.
+	 */
+	for (; link != head; link = rlist_next(link)) {
+		if (pos + sizeof(event_bin) > end)
+			break;
+		struct swim_member *member =
+			rlist_entry(link, struct swim_member, in_queue_events);
+		swim_event_bin_reset(&event_bin, member);
+		memcpy(pos, &event_bin, sizeof(event_bin));
+		pos += sizeof(event_bin);
+		++i;
+	}
+	if (i == 0)
+		return 0;
+	*queue_pos = link;
+	/* The batch size is known only now, so the header goes last. */
+	swim_diss_header_bin_create(&diss_header_bin, i);
+	memcpy(header, &diss_header_bin, sizeof(diss_header_bin));
+	swim_msg_part_advance(part, pos - header);
+	return i;
+}
+
+/**
+ * Encode the dissemination component: all the events from the
+ * events queue, split into as many message parts as needed.
+ *
+ * @retval >=0 Number of encoded events.
+ * @retval  -1 Error.
+ */
+static int
+swim_encode_dissemination(struct swim *swim, struct swim_msg *msg)
+{
+	int count = 0;
+	struct rlist *head = &swim->queue_events;
+	struct rlist *pos = rlist_first(head);
+	/*
+	 * The queue is over when the position returns to the
+	 * head. An empty queue has the head as its first link,
+	 * and a queue of one event is processed as well.
+	 */
+	while (pos != head) {
+		int rc = swim_encode_dissemination_part(msg, head, &pos);
+		if (rc < 0)
+			return -1;
+		/* No progress - stop instead of spinning forever. */
+		if (rc == 0)
+			break;
+		count += rc;
+	}
+	return count;
+}
+
/** Encode SWIM components into a sequence of UDP packets. */
static int
swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
@@ -889,6 +1077,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
goto error;
map_size += rc > 0;
+ rc = swim_encode_dissemination(swim, msg);
+ if (rc < 0)
+ goto error;
+ map_size += rc > 0;
+
rc = swim_encode_anti_entropy(swim, msg);
if (rc < 0)
goto error;
@@ -902,6 +1095,17 @@ error:
return -1;
}
+/**
+ * Account one more send of the queued events: decrement the
+ * dissemination TTL of each queued member and drop from the
+ * queue the members whose TTL is exhausted.
+ */
+static void
+swim_decrease_events_ttl(struct swim *swim)
+{
+	struct swim_member *member, *tmp;
+	rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events,
+				 tmp) {
+		if (--member->dissemination_ttl > 0)
+			continue;
+		/*
+		 * An event scheduled with a non-positive TTL would
+		 * otherwise skip the zero and stay in the queue
+		 * forever. Clamp the TTL to 0, because checks for
+		 * an exhausted TTL compare it with 0 exactly.
+		 */
+		member->dissemination_ttl = 0;
+		rlist_del_entry(member, in_queue_events);
+	}
+}
+
/**
* Do one round step. Send encoded components to a next member
* from the queue.
@@ -943,6 +1147,7 @@ swim_send_round_msg(struct swim_io_task *task)
}
swim_msg_destroy(&msg);
swim_member_schedule_ack_wait(swim, m);
+ swim_decrease_events_ttl(swim);
rlist_del_entry(m, in_queue_round);
next_round_step:
ev_periodic_start(loop(), &swim->round_tick);
@@ -1040,12 +1245,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
break;
++m->failed_pings;
if (m->failed_pings >= NO_ACKS_TO_GC) {
- if (!m->is_pinned)
+ if (!m->is_pinned && m->dissemination_ttl == 0)
swim_member_delete(swim, m);
continue;
}
- if (m->failed_pings >= NO_ACKS_TO_DEAD)
+ if (m->failed_pings >= NO_ACKS_TO_DEAD) {
m->status = MEMBER_DEAD;
+ swim_member_is_updated(swim, m);
+ }
swim_io_task_push(&m->ping_task);
rlist_del_entry(m, in_queue_wait_ack);
}
@@ -1296,6 +1503,50 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
return 0;
}
+/**
+ * Decode the dissemination section of a received message: an
+ * array of events, each being a map of member attributes.
+ * Each decoded event is passed to swim_process_member_update().
+ *
+ * @param swim SWIM instance.
+ * @param[in,out] pos Position in the received buffer, advanced
+ *        while decoding.
+ * @param end End of the received buffer.
+ *
+ * @retval  0 Success.
+ * @retval -1 The section is malformed, the reason is logged.
+ */
+static int
+swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
+{
+	const char *msg_pref = "Invalid SWIM dissemination message:";
+	/*
+	 * The data comes from the network and can be truncated
+	 * at any byte, so the position is checked against the
+	 * end before each read of a type marker.
+	 */
+	if (*pos >= end || mp_typeof(**pos) != MP_ARRAY ||
+	    mp_check_array(*pos, end) > 0) {
+		say_error("%s message should be an array", msg_pref);
+		return -1;
+	}
+	uint64_t size = mp_decode_array(pos);
+	for (uint64_t i = 0; i < size; ++i) {
+		if (*pos >= end || mp_typeof(**pos) != MP_MAP ||
+		    mp_check_map(*pos, end) > 0) {
+			say_error("%s event should be map", msg_pref);
+			return -1;
+		}
+		uint64_t map_size = mp_decode_map(pos);
+		struct swim_member_def def;
+		swim_member_def_create(&def);
+		for (uint64_t j = 0; j < map_size; ++j) {
+			if (*pos >= end || mp_typeof(**pos) != MP_UINT ||
+			    mp_check_uint(*pos, end) > 0) {
+				say_error("%s event key should be uint",
+					  msg_pref);
+				return -1;
+			}
+			uint64_t key = mp_decode_uint(pos);
+			if (key >= swim_member_key_MAX) {
+				say_error("%s unknown event key", msg_pref);
+				return -1;
+			}
+			if (swim_process_member_key(key, pos, end, msg_pref,
+						    &def) != 0)
+				return -1;
+		}
+		/* An event without a full address can not be applied. */
+		if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
+			say_error("%s member address should be specified",
+				  msg_pref);
+			return -1;
+		}
+		swim_process_member_update(swim, &def);
+	}
+	return 0;
+}
+
/** Receive and process a new message. */
static void
swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
@@ -1344,6 +1595,11 @@ swim_on_input(struct ev_loop *loop, struct ev_io *io, int events)
&addr) != 0)
return;
break;
+ case SWIM_DISSEMINATION:
+ say_verbose("SWIM: process dissemination");
+ if (swim_process_dissemination(swim, &pos, end) != 0)
+ return;
+ break;
default:
say_error("%s unknown component type component is "\
"supported", msg_pref);
@@ -1422,6 +1678,7 @@ swim_new(void)
ev_init(&swim->wait_ack_tick, swim_check_acks);
ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL);
swim->wait_ack_tick.data = (void *) swim;
+ rlist_create(&swim->queue_events);
return swim;
}
@@ -1508,8 +1765,10 @@ swim_remove_member(struct swim *swim, const char *uri)
if (uri_to_addr(uri, &addr) != 0)
return -1;
struct swim_member *member = swim_find_member(swim, &addr);
- if (member != NULL)
+ if (member != NULL) {
+ rlist_del_entry(member, in_queue_events);
swim_member_delete(swim, member);
+ }
return 0;
}
@@ -1546,6 +1805,7 @@ swim_delete(struct swim *swim)
while (node != mh_end(swim->members)) {
struct swim_member *m = (struct swim_member *)
mh_i64ptr_node(swim->members, node)->val;
+ rlist_del_entry(m, in_queue_events);
swim_member_delete(swim, m);
node = mh_first(swim->members);
}
--
2.17.2 (Apple Git-113)
More information about the Tarantool-patches
mailing list