[PATCH v3 3/6] [RAW] swim: introduce a dissemination component
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Sat Dec 29 13:14:12 MSK 2018
The dissemination component broadcasts events about member status
updates.
Part of #3234
---
src/lib/swim/swim.c | 287 +++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 282 insertions(+), 5 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index c7bc11bca..4e7ffbc54 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -184,6 +184,27 @@ struct swim_member {
struct swim_task ping_task;
/** Position in a queue of members waiting for an ack. */
struct rlist in_queue_wait_ack;
+ /**
+ *
+ * Dissemination component
+ *
+ * Dissemination component sends events. An event is a
+ * notification about a member status update. So formally,
+ * this structure already has all the needed attributes.
+ * But according to SWIM an event should also be sent to
+ * all members at least once, so it requires something
+ * like a TTL for each type of event, which is
+ * decremented on each send. And a member can not be
+ * removed from the global table until it is dead and
+ * its status TTL is 0, so as to allow other members to
+ * learn its dead status.
+ */
+ int status_ttl;
+ /**
+ * Events are put into a queue sorted by event occurrence
+ * time.
+ */
+ struct rlist in_queue_events;
};
/**
@@ -248,6 +269,12 @@ struct swim {
struct rlist queue_wait_ack;
/** Generator of ack checking events. */
struct ev_periodic wait_ack_tick;
+ /**
+ *
+ * Dissemination component
+ */
+ /** Queue of events sorted by occurrence time. */
+ struct rlist queue_events;
};
static inline uint64_t
@@ -264,6 +291,7 @@ sockaddr_in_hash(const struct sockaddr_in *a)
enum swim_component_type {
SWIM_ANTI_ENTROPY = 0,
SWIM_FAILURE_DETECTION,
+ SWIM_DISSEMINATION,
};
/** {{{ Failure detection component */
@@ -431,6 +459,88 @@ swim_member_bin_create(struct swim_member_bin *header)
/** }}} Anti-entropy component */
+/** {{{ Dissemination component */
+
+/**
+ * SWIM dissemination MsgPack template: the component key followed
+ * by the header of the array of events.
+ */
+struct PACKED swim_diss_header_bin {
+ /** mp_encode_uint(SWIM_DISSEMINATION) */
+ uint8_t k_header;
+ /**
+  * mp_encode_array(): a one-byte array marker (0xdd) followed
+  * by the byte-swapped 32-bit number of events in the array.
+  */
+ uint8_t m_header;
+ uint32_t v_header;
+};
+
+/**
+ * Fill the dissemination section header: the component key and
+ * the header of an array holding @a batch_size events.
+ */
+static inline void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header, int batch_size)
+{
+	*header = (struct swim_diss_header_bin) {
+		.k_header = SWIM_DISSEMINATION,
+		.m_header = 0xdd,
+		.v_header = mp_bswap_u32(batch_size),
+	};
+}
+
+/**
+ * SWIM event MsgPack template: a map of four key-value pairs
+ * describing one member - its status, address, port and
+ * incarnation.
+ */
+struct PACKED swim_event_bin {
+ /** mp_encode_map(4) */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_MEMBER_STATUS) */
+ uint8_t k_status;
+ /** mp_encode_uint(enum member_status) */
+ uint8_t v_status;
+
+ /** mp_encode_uint(SWIM_MEMBER_ADDRESS) */
+ uint8_t k_addr;
+ /** mp_encode_uint(addr.sin_addr.s_addr): marker + 32-bit value. */
+ uint8_t m_addr;
+ uint32_t v_addr;
+
+ /** mp_encode_uint(SWIM_MEMBER_PORT) */
+ uint8_t k_port;
+ /** mp_encode_uint(addr.sin_port): marker + 16-bit value. */
+ uint8_t m_port;
+ uint16_t v_port;
+
+ /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation): marker + 64-bit value. */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
+};
+
+/**
+ * Fill the constant part of an event template: the map header,
+ * the member attribute keys and the MsgPack type markers of the
+ * values. The values themselves are left untouched.
+ */
+static inline void
+swim_event_bin_create(struct swim_event_bin *header)
+{
+	/* Map of 4 key-value pairs. */
+	header->m_header = 0x84;
+	/* Keys of the member attributes. */
+	header->k_status = SWIM_MEMBER_STATUS;
+	header->k_addr = SWIM_MEMBER_ADDRESS;
+	header->k_port = SWIM_MEMBER_PORT;
+	header->k_incarnation = SWIM_MEMBER_INCARNATION;
+	/* Type markers of the 32, 16 and 64 bit unsigned values. */
+	header->m_addr = 0xce;
+	header->m_port = 0xcd;
+	header->m_incarnation = 0xcf;
+}
+
+/**
+ * Fill the variable part of an event template with the current
+ * status, address, port and incarnation of @a member. Multi-byte
+ * values are stored byte-swapped.
+ */
+static inline void
+swim_event_bin_reset(struct swim_event_bin *header, struct swim_member *member)
+{
+	header->v_incarnation = mp_bswap_u64(member->incarnation);
+	header->v_port = mp_bswap_u16(member->addr.sin_port);
+	header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
+	header->v_status = member->status;
+}
+
+/**
+ * Schedule dissemination of the current state of @a member:
+ * reset its status TTL to the size of the members table and put
+ * the member to the tail of the events queue unless it is queued
+ * already.
+ */
+static inline void
+swim_schedule_event(struct swim *swim, struct swim_member *member)
+{
+	member->status_ttl = mh_size(swim->members);
+	if (!rlist_empty(&member->in_queue_events))
+		return;
+	rlist_add_tail_entry(&swim->queue_events, member, in_queue_events);
+}
+
+/** }}} Dissemination component */
+
/**
* SWIM message structure:
* {
@@ -441,6 +551,18 @@ swim_member_bin_create(struct swim_member_bin *header)
*
* OR/AND
*
+ * SWIM_DISSEMINATION: [
+ * {
+ * SWIM_MEMBER_STATUS: uint, enum member_status,
+ * SWIM_MEMBER_ADDRESS: uint, ip,
+ * SWIM_MEMBER_PORT: uint, port,
+ * SWIM_MEMBER_INCARNATION: uint
+ * },
+ * ...
+ * ],
+ *
+ * OR/AND
+ *
* SWIM_ANTI_ENTROPY: [
* {
* SWIM_MEMBER_STATUS: uint, enum member_status,
@@ -453,6 +575,16 @@ swim_member_bin_create(struct swim_member_bin *header)
* }
*/
+/**
+ * Make all needed actions to process a member's update like a
+ * change of its status, or incarnation, or both. For now the
+ * only action is scheduling of a dissemination event for the
+ * member.
+ */
+static void
+swim_member_status_is_updated(struct swim *swim, struct swim_member *member)
+{
+ swim_schedule_event(swim, member);
+}
+
/**
* Update status and incarnation of the member if needed. Statuses
* are compared as a compound key: {incarnation, status}. So @a
@@ -468,14 +600,16 @@ swim_member_update_status(struct swim *swim, struct swim_member *member,
enum swim_member_status new_status,
uint64_t incarnation)
{
- (void) swim;
assert(member != swim->self);
if (member->incarnation == incarnation) {
- if (member->status < new_status)
+ if (member->status < new_status) {
member->status = new_status;
+ swim_member_status_is_updated(swim, member);
+ }
} else if (member->incarnation < incarnation) {
member->status = new_status;
member->incarnation = incarnation;
+ swim_member_status_is_updated(swim, member);
}
}
@@ -497,6 +631,9 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
swim_task_destroy(&member->ack_task);
swim_task_destroy(&member->ping_task);
+ /* Dissemination component. */
+ assert(rlist_empty(&member->in_queue_events));
+
free(member);
}
@@ -533,6 +670,10 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
swim_task_create(&member->ack_task, &swim->scheduler, swim_task_reset);
swim_task_create(&member->ping_task, &swim->scheduler, swim_task_reset);
+ /* Dissemination component. */
+ rlist_create(&member->in_queue_events);
+ swim_member_status_is_updated(swim, member);
+
return member;
}
@@ -654,6 +795,60 @@ swim_encode_failure_detection(struct swim *swim, struct swim_msg *msg,
return 1;
}
+/**
+ * Encode a part of the dissemination component into a single SWIM
+ * packet: a dissemination header followed by as many events as
+ * the packet can allocate room for.
+ * @param msg Message to reserve the packet in.
+ * @param queue_pos Position in the events queue to iterate from.
+ *
+ * @retval -1 Error, the packet could not be reserved.
+ * @retval 0 Not error, but nothing is encoded.
+ * @retval 1 Something is encoded.
+ *
+ * NOTE(review): *queue_pos is only read here and 'prev' is
+ * assigned but never used, so the caller's position is not
+ * advanced past the encoded events - looks unfinished, confirm.
+ * NOTE(review): *queue_pos is given to rlist_foreach_entry as a
+ * list head while the caller passes a link of a queue element -
+ * verify that the iteration visits exactly the intended members.
+ * NOTE(review): the header returned by swim_packet_alloc() is
+ * not checked for NULL, unlike the per-event allocations, and it
+ * stays allocated when 0 is returned - confirm this is intended.
+ */
+static int
+swim_encode_dissemination_packet(struct swim_msg *msg, struct rlist **queue_pos)
+{
+ struct swim_diss_header_bin diss_header_bin;
+ int size = sizeof(diss_header_bin);
+ struct swim_packet *packet = swim_msg_reserve(msg, size);
+ if (packet == NULL)
+ return -1;
+ char *header = swim_packet_alloc(packet, size);
+
+ int i = 0;
+ struct swim_member *member, *prev = NULL;
+ struct swim_event_bin event_bin;
+ swim_event_bin_create(&event_bin);
+ rlist_foreach_entry(member, *queue_pos, in_queue_events) {
+ char *pos = swim_packet_alloc(packet, sizeof(event_bin));
+ if (pos == NULL)
+ break;
+ swim_event_bin_reset(&event_bin, member);
+ memcpy(pos, &event_bin, sizeof(event_bin));
+ ++i;
+ prev = member;
+ }
+ if (i == 0)
+ return 0;
+ swim_diss_header_bin_create(&diss_header_bin, i);
+ memcpy(header, &diss_header_bin, sizeof(diss_header_bin));
+ swim_packet_flush(packet);
+ return 1;
+}
+
+/**
+ * Encode the dissemination component: walk the events queue and
+ * encode its events into packets of @a msg.
+ * @retval -1 Error.
+ * @retval 0 Not error, the events queue is empty.
+ * @retval 1 Success, the events queue is not empty.
+ *
+ * NOTE(review): the packet encoder is invoked for every position
+ * of the queue, not once per packet - confirm that it is expected
+ * to advance 'pos' past the events it has encoded.
+ */
+static int
+swim_encode_dissemination(struct swim *swim, struct swim_msg *msg)
+{
+ struct rlist *pos;
+ rlist_foreach(pos, &swim->queue_events) {
+ if (swim_encode_dissemination_packet(msg, &pos) < 0)
+ return -1;
+ }
+ return ! rlist_empty(&swim->queue_events);
+}
+
/** Encode SWIM components into a sequence of UDP packets. */
static int
swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
@@ -670,6 +865,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
goto error;
map_size += rc;
+ rc = swim_encode_dissemination(swim, msg);
+ if (rc < 0)
+ goto error;
+ map_size += rc;
+
rc = swim_encode_anti_entropy(swim, msg);
if (rc < 0)
goto error;
@@ -685,6 +885,21 @@ error:
/**
 * Decrement the status TTL of every member in the events queue
 * and remove from the queue the members whose TTL has become 0.
 * NOTE(review): a member queued with a TTL of 0 would be
 * decremented below zero and never removed - confirm that the TTL
 * is always positive when a member is put into the queue.
 */
static void
+swim_decrease_events_ttl(struct swim *swim)
+{
+ struct swim_member *member, *tmp;
+ rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events,
+ tmp) {
+ if (--member->status_ttl == 0)
+ rlist_del_entry(member, in_queue_events);
+ }
+}
+
+/**
+ * Do one round step. Send encoded components to a next member
+ * from the queue.
+ */
+static void
swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events)
{
assert((events & EV_PERIODIC) != 0);
@@ -712,6 +927,7 @@ swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events)
swim_task_schedule(&swim->round_step_task,
swim->transport->send_round_msg, &m->addr);
swim_member_schedule_ack_wait(swim, m);
+ swim_decrease_events_ttl(swim);
ev_periodic_stop(loop, p);
}
@@ -779,12 +995,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
break;
++m->failed_pings;
if (m->failed_pings >= NO_ACKS_TO_GC) {
- if (!m->is_pinned)
+ if (!m->is_pinned && m->status_ttl == 0)
swim_member_delete(swim, m);
continue;
}
- if (m->failed_pings >= NO_ACKS_TO_DEAD)
+ if (m->failed_pings >= NO_ACKS_TO_DEAD) {
m->status = MEMBER_DEAD;
+ swim_member_status_is_updated(swim, m);
+ }
swim_schedule_ping(swim, m);
rlist_del_entry(m, in_queue_wait_ack);
}
@@ -828,6 +1046,7 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
def->incarnation);
return;
}
+ uint64_t old_incarnation = self->incarnation;
/*
* It is possible that other instances know a bigger
* incarnation of this instance - such thing happens when
@@ -846,6 +1065,8 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
*/
self->incarnation++;
}
+ if (old_incarnation != self->incarnation)
+ swim_member_status_is_updated(swim, self);
}
static int
@@ -1032,6 +1253,50 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
return 0;
}
+/**
+ * Decode the dissemination section of an incoming message and
+ * apply its events. The section is an array of events, each
+ * event is a map of member attributes with uint keys.
+ * @param swim SWIM instance to apply the events to.
+ * @param[in,out] pos Current decoding position, moved forward
+ *        past everything that has been decoded.
+ * @param end End of the message buffer.
+ *
+ * @retval 0 Success, all the events are processed.
+ * @retval -1 The section is malformed, an error is logged.
+ *         Events decoded before the malformed one are already
+ *         processed.
+ */
+static int
+swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
+{
+	const char *msg_pref = "Invalid SWIM dissemination message:";
+	/*
+	 * The position is compared with the end before each
+	 * dereference: a truncated message must be reported as
+	 * an error instead of reading beyond the buffer.
+	 */
+	if (*pos >= end || mp_typeof(**pos) != MP_ARRAY ||
+	    mp_check_array(*pos, end) > 0) {
+		say_error("%s message should be an array", msg_pref);
+		return -1;
+	}
+	uint64_t size = mp_decode_array(pos);
+	for (uint64_t i = 0; i < size; ++i) {
+		if (*pos >= end || mp_typeof(**pos) != MP_MAP ||
+		    mp_check_map(*pos, end) > 0) {
+			say_error("%s event should be map", msg_pref);
+			return -1;
+		}
+		uint64_t map_size = mp_decode_map(pos);
+		struct swim_member_def def;
+		swim_member_def_create(&def);
+		for (uint64_t j = 0; j < map_size; ++j) {
+			if (*pos >= end || mp_typeof(**pos) != MP_UINT ||
+			    mp_check_uint(*pos, end) > 0) {
+				say_error("%s event key should be uint",
+					  msg_pref);
+				return -1;
+			}
+			uint64_t key = mp_decode_uint(pos);
+			if (key >= swim_member_key_MAX) {
+				say_error("%s unknown event key", msg_pref);
+				return -1;
+			}
+			if (swim_process_member_key(key, pos, end, msg_pref,
+						    &def) != 0)
+				return -1;
+		}
+		/* An event without a full address can't be applied. */
+		if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
+			say_error("%s member address should be specified",
+				  msg_pref);
+			return -1;
+		}
+		swim_process_member_update(swim, &def);
+	}
+	return 0;
+}
+
/** Receive and process a new message. */
static void
swim_on_input(struct swim_scheduler *scheduler,
@@ -1065,6 +1330,11 @@ swim_on_input(struct swim_scheduler *scheduler,
src) != 0)
return;
break;
+ case SWIM_DISSEMINATION:
+ say_verbose("SWIM: process dissemination");
+ if (swim_process_dissemination(swim, &pos, end) != 0)
+ return;
+ break;
default:
say_error("%s unknown component type component is "\
"supported", msg_pref);
@@ -1142,6 +1412,10 @@ swim_new(void)
ev_init(&swim->wait_ack_tick, swim_check_acks);
ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL);
swim->wait_ack_tick.data = (void *) swim;
+
+ /* Dissemination events. */
+ rlist_create(&swim->queue_events);
+
return swim;
}
@@ -1199,8 +1473,10 @@ swim_remove_member(struct swim *swim, const char *uri)
if (uri_to_addr(uri, &addr) != 0)
return -1;
struct swim_member *member = swim_find_member(swim, &addr);
- if (member != NULL)
+ if (member != NULL) {
+ rlist_del_entry(member, in_queue_events);
swim_member_delete(swim, member);
+ }
return 0;
}
@@ -1236,6 +1512,7 @@ swim_delete(struct swim *swim)
while (node != mh_end(swim->members)) {
struct swim_member *m = (struct swim_member *)
mh_i64ptr_node(swim->members, node)->val;
+ rlist_del_entry(m, in_queue_events);
swim_member_delete(swim, m);
node = mh_first(swim->members);
}
--
2.17.2 (Apple Git-113)
More information about the Tarantool-patches
mailing list