[PATCH v4 06/12] [RAW] swim: introduce dissemination component
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Thu Jan 31 00:28:35 MSK 2019
The dissemination component broadcasts events about member status
updates.
Part of #3234
---
src/lib/swim/swim.c | 223 ++++++++++++++++++++++++++++++++++++--
src/lib/swim/swim_proto.c | 58 ++++++++++
src/lib/swim/swim_proto.h | 108 ++++++++++++++++++
3 files changed, 377 insertions(+), 12 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index a862f52a4..353e55254 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -243,6 +243,42 @@ struct swim_member {
struct swim_task ping_task;
/** Position in a queue of members waiting for an ack. */
struct rlist in_queue_wait_ack;
+ /**
+ *
+ * Dissemination component
+ *
+ * Dissemination component sends events. An event is a
+ * notification about a member status update. So formally,
+ * this structure already has all the needed attributes.
+ * But an event also has to be sent to all members at
+ * least once according to SWIM, so it requires something
+ * like a TTL for each type of event, which is decremented
+ * on each send. And a member can not be removed from the
+ * global table until it is dead and its status TTL is 0,
+ * so as to allow other members to learn its dead status.
+ */
+ int status_ttl;
+ /**
+ * Events are put into a queue sorted by event occurrence
+ * time.
+ */
+ struct rlist in_queue_events;
+ /**
+ * Old UUID is sent for a while after its update so as to
+ * allow other members to update this member's record
+ * in their tables.
+ */
+ struct tt_uuid old_uuid;
+ /**
+ * UUID is quite heavy structure, so an old UUID is sent
+ * only this number of times. A current UUID is sent
+ * always. Moreover, if someone wanted to reuse UUID,
+ * always sending old ones would make it much harder to
+ * detect which instance has just updated UUID, and which
+ * old UUID is handed over to another instance.
+ */
+ int old_uuid_ttl;
};
#define mh_name _swim_table
@@ -313,6 +349,12 @@ struct swim {
struct rlist queue_wait_ack;
/** Generator of ack checking events. */
struct ev_periodic wait_ack_tick;
+ /**
+ *
+ * Dissemination component
+ */
+ /** Queue of events sorted by occurrence time. */
+ struct rlist queue_events;
};
/** Put the member into a list of ACK waiters. */
@@ -327,14 +369,42 @@ swim_member_wait_ack(struct swim *swim, struct swim_member *member)
}
}
+/**
+ * Put a member into the queue of events to disseminate, unless it
+ * is already there, and reset its status TTL to the current
+ * number of members in the table. It is done on literally any
+ * update of a member. Note that the status TTL is always set,
+ * even if only the UUID or any other attribute is updated. It is
+ * because 1) it simplifies the code when the status TTL is never
+ * smaller than the other ones, 2) status occupies only 2 bytes in
+ * a packet, so it is never worse to send it on any update, but it
+ * reduces entropy.
+ */
+static inline void
+swim_schedule_event(struct swim *swim, struct swim_member *member)
+{
+ /* A member already waiting in the queue keeps its position. */
+ if (rlist_empty(&member->in_queue_events)) {
+ rlist_add_tail_entry(&swim->queue_events, member,
+ in_queue_events);
+ }
+ member->status_ttl = mh_size(swim->members);
+}
+
/**
* Make all needed actions to process a member's update like a
* change of its status, or incarnation, or both.
*/
static void
-swim_member_status_is_updated(struct swim_member *member)
+swim_member_status_is_updated(struct swim_member *member, struct swim *swim)
{
member->unacknowledged_pings = 0;
+ swim_schedule_event(swim, member);
+}
+
+/** Make all needed actions to process member's UUID update. */
+static void
+swim_member_uuid_is_updated(struct swim_member *member, struct swim *swim)
+{
+ member->old_uuid_ttl = mh_size(swim->members);
+ swim_schedule_event(swim, member);
}
/**
@@ -352,7 +422,6 @@ swim_member_update_status(struct swim_member *member,
enum swim_member_status new_status,
uint64_t incarnation, struct swim *swim)
{
- (void) swim;
/*
* Source of truth about self is this instance and it is
* never updated from remote. Refutation is handled
@@ -362,12 +431,12 @@ swim_member_update_status(struct swim_member *member,
if (member->incarnation == incarnation) {
if (member->status < new_status) {
member->status = new_status;
- swim_member_status_is_updated(member);
+ swim_member_status_is_updated(member, swim);
}
} else if (member->incarnation < incarnation) {
member->status = new_status;
member->incarnation = incarnation;
- swim_member_status_is_updated(member);
+ swim_member_status_is_updated(member, swim);
}
}
@@ -400,6 +469,9 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
swim_task_destroy(&member->ack_task);
swim_task_destroy(&member->ping_task);
+ /* Dissemination component. */
+ rlist_del_entry(member, in_queue_events);
+
free(member);
}
@@ -469,6 +541,10 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
swim_task_create(&member->ack_task, NULL, NULL);
swim_task_create(&member->ping_task, swim_ping_task_complete, NULL);
+ /* Dissemination component. */
+ rlist_create(&member->in_queue_events);
+ swim_member_status_is_updated(member, swim);
+
say_verbose("SWIM: member %s is added", swim_uuid_str(uuid));
return member;
}
@@ -610,6 +686,51 @@ swim_encode_failure_detection(struct swim *swim, struct swim_packet *packet,
return 1;
}
+/**
+ * Encode dissemination component: a section header followed by
+ * as many queued events as fit into the packet. Events are taken
+ * in the queue order. An event of a member with a not expired old
+ * UUID TTL is followed by the old UUID attribute.
+ * @param swim SWIM instance whose event queue is encoded.
+ * @param packet Packet to append the section to.
+ * @retval 0 Not error, but nothing is encoded.
+ * @retval 1 Something is encoded.
+ */
+static int
+swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
+{
+	struct swim_diss_header_bin diss_header_bin;
+	int size = sizeof(diss_header_bin);
+	char *header = swim_packet_reserve(packet, size);
+	if (header == NULL)
+		return 0;
+	int i = 0;
+	struct swim_member *m;
+	struct swim_event_bin event_bin;
+	struct swim_old_uuid_bin old_uuid_bin;
+	swim_event_bin_create(&event_bin);
+	swim_old_uuid_bin_create(&old_uuid_bin);
+	rlist_foreach_entry(m, &swim->queue_events, in_queue_events) {
+		int new_size = size + sizeof(event_bin);
+		if (m->old_uuid_ttl > 0)
+			new_size += sizeof(old_uuid_bin);
+		if (swim_packet_reserve(packet, new_size) == NULL)
+			break;
+		/*
+		 * The packet is advanced only once, after the
+		 * loop, and the reserved size is cumulative. So
+		 * an event has to be written at an explicit
+		 * offset from the section start, right after
+		 * everything encoded so far. Otherwise each
+		 * event would overwrite the header and the
+		 * previous events.
+		 */
+		char *pos = header + size;
+		size = new_size;
+		swim_event_bin_fill(&event_bin, m->status, &m->addr, &m->uuid,
+				    m->incarnation, m->old_uuid_ttl);
+		memcpy(pos, &event_bin, sizeof(event_bin));
+		if (m->old_uuid_ttl > 0) {
+			pos += sizeof(event_bin);
+			swim_old_uuid_bin_fill(&old_uuid_bin, &m->old_uuid);
+			memcpy(pos, &old_uuid_bin, sizeof(old_uuid_bin));
+		}
+		++i;
+	}
+	/* A header without events is not sent. */
+	if (i == 0)
+		return 0;
+	swim_diss_header_bin_create(&diss_header_bin, i);
+	memcpy(header, &diss_header_bin, sizeof(diss_header_bin));
+	swim_packet_advance(packet, size);
+	return 1;
+}
+
/** Encode SWIM components into a UDP packet. */
static void
swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
@@ -620,12 +741,36 @@ swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
map_size += swim_encode_src_uuid(swim, packet);
map_size += swim_encode_failure_detection(swim, packet,
SWIM_FD_MSG_PING);
+ map_size += swim_encode_dissemination(swim, packet);
map_size += swim_encode_anti_entropy(swim, packet);
assert(mp_sizeof_map(map_size) == 1 && map_size >= 2);
mp_encode_map(header, map_size);
}
+/**
+ * Decrement TTLs of all events. It is done after each round step.
+ * A member whose status TTL reaches 0 is removed from the queue.
+ * Note, that when there are too many events to fit into a packet,
+ * the tail of the events list keeps aging without being
+ * disseminated, and the farthest events can be dropped without
+ * ever being sent. But this situation is hardly reachable: an
+ * event without an old UUID takes sizeof(struct swim_event_bin)
+ * bytes - 42 with a 16 byte UUID - so even 1000 bytes fit 23
+ * events, which in fact means a simultaneous update of 23
+ * instances. In such a case rotting events are the mildest
+ * problem.
+ */
+static void
+swim_decrease_events_ttl(struct swim *swim)
+{
+ struct swim_member *member, *tmp;
+ rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events,
+ tmp) {
+ if (member->old_uuid_ttl > 0)
+ --member->old_uuid_ttl;
+ if (--member->status_ttl == 0)
+ rlist_del_entry(member, in_queue_events);
+ }
+}
+
/**
* Once per specified timeout trigger a next round step. In round
* step a next memeber is taken from the round queue and a round
@@ -676,6 +821,8 @@ swim_round_step_complete(struct swim_task *task,
* section with a ping.
*/
swim_member_wait_ack(swim, m);
+ /* As well as dissemination. */
+ swim_decrease_events_ttl(swim);
}
}
@@ -737,11 +884,12 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
case MEMBER_ALIVE:
if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) {
m->status = MEMBER_DEAD;
- swim_member_status_is_updated(m);
+ swim_member_status_is_updated(m, swim);
}
break;
case MEMBER_DEAD:
- if (m->unacknowledged_pings >= NO_ACKS_TO_GC)
+ if (m->unacknowledged_pings >= NO_ACKS_TO_GC &&
+ m->status_ttl == 0)
swim_member_delete(swim, m);
break;
default:
@@ -782,18 +930,20 @@ swim_member_update_uuid(struct swim_member *member,
struct mh_swim_table_key key = {member->hash, &old_uuid};
mh_swim_table_del(t, mh_swim_table_find(t, key, NULL), NULL);
member->hash = swim_uuid_hash(new_uuid);
+ member->old_uuid = old_uuid;
+ swim_member_uuid_is_updated(member, swim);
return 0;
}
/** Update member's address.*/
static inline void
swim_member_update_addr(struct swim_member *member,
- const struct sockaddr_in *addr)
+ const struct sockaddr_in *addr, struct swim *swim)
{
if (addr->sin_port != member->addr.sin_port ||
addr->sin_addr.s_addr != member->addr.sin_addr.s_addr) {
member->addr = *addr;
- swim_member_status_is_updated(member);
+ swim_member_status_is_updated(member, swim);
}
}
@@ -807,6 +957,9 @@ static struct swim_member *
swim_update_member(struct swim *swim, const struct swim_member_def *def)
{
struct swim_member *member = swim_find_member(swim, &def->uuid);
+ struct swim_member *old_member = NULL;
+ if (! tt_uuid_is_nil(&def->old_uuid))
+ old_member = swim_find_member(swim, &def->old_uuid);
if (member == NULL) {
if (def->status == MEMBER_DEAD) {
/*
@@ -821,19 +974,29 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def)
*/
return NULL;
}
- member = swim_member_new(swim, &def->addr, &def->uuid,
- def->status, def->incarnation);
+ if (old_member == NULL) {
+ member = swim_member_new(swim, &def->addr, &def->uuid,
+ def->status, def->incarnation);
+ } else if (swim_member_update_uuid(old_member, &def->uuid,
+ swim) == 0) {
+ member = old_member;
+ }
return member;
}
struct swim_member *self = swim->self;
if (member != self) {
if (def->incarnation >= member->incarnation) {
- swim_member_update_addr(member, &def->addr);
+ swim_member_update_addr(member, &def->addr, swim);
swim_member_update_status(member, def->status,
def->incarnation, swim);
+ if (old_member != NULL) {
+ assert(member != old_member);
+ swim_member_delete(swim, old_member);
+ }
}
return member;
}
+ uint64_t old_incarnation = self->incarnation;
/*
* It is possible that other instances know a bigger
* incarnation of this instance - such thing happens when
@@ -852,6 +1015,8 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def)
*/
self->incarnation++;
}
+ if (old_incarnation != self->incarnation)
+ swim_member_status_is_updated(self, swim);
return member;
}
@@ -920,6 +1085,31 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
}
return 0;
}
+/**
+ * Decode a dissemination message. Schedule new events, update
+ * members.
+ * @param swim SWIM instance to apply the decoded updates to.
+ * @param pos Current decoding position, passed to the decoders
+ *        by reference.
+ * @param end End of the message.
+ * @retval 0 All the events are decoded.
+ * @retval -1 The section header or one of the events can not be
+ *         decoded.
+ */
+static int
+swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
+{
+	const char *msg_pref = "invalid dissemination message:";
+	uint32_t size;
+	if (swim_decode_array(pos, end, &size, msg_pref, "root") != 0)
+		return -1;
+	for (uint32_t i = 0; i < size; ++i) {
+		struct swim_member_def def;
+		if (swim_member_def_decode(&def, pos, end, msg_pref) != 0)
+			return -1;
+		if (swim_update_member(swim, &def) == NULL) {
+			/*
+			 * Not a critical error - other updates
+			 * still can be applied.
+			 */
+			diag_log();
+		}
+	}
+	return 0;
+}
/** Process a new message. */
static void
@@ -962,6 +1152,11 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
src, &uuid) != 0)
goto error;
break;
+ case SWIM_DISSEMINATION:
+ say_verbose("SWIM: process dissemination");
+ if (swim_process_dissemination(swim, &pos, end) != 0)
+ goto error;
+ break;
default:
diag_set(SwimError, "%s unexpected key", msg_pref);
goto error;
@@ -1000,6 +1195,10 @@ swim_new(void)
ev_init(&swim->wait_ack_tick, swim_check_acks);
ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT_DEFAULT, NULL);
swim->wait_ack_tick.data = (void *) swim;
+
+ /* Dissemination component. */
+ rlist_create(&swim->queue_events);
+
return swim;
}
@@ -1089,7 +1288,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
ev_periodic_start(loop(), &swim->wait_ack_tick);
if (! is_first_cfg) {
- swim_member_update_addr(swim->self, &addr);
+ swim_member_update_addr(swim->self, &addr, swim);
int rc = swim_member_update_uuid(swim->self, uuid, swim);
/* Reserved above. */
assert(rc == 0);
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 542b988c1..e31c67682 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -189,6 +189,11 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
"member incarnation") != 0)
return -1;
break;
+ case SWIM_MEMBER_OLD_UUID:
+ if (swim_decode_uuid(&def->old_uuid, pos, end, msg_pref,
+ "member old uuid") != 0)
+ return -1;
+ break;
default:
unreachable();
}
@@ -337,6 +342,59 @@ swim_member_bin_create(struct swim_member_bin *header)
header->m_incarnation = 0xcf;
}
+/**
+ * Fill a dissemination section header: the section key and a
+ * MessagePack array header with @a batch_size elements.
+ */
+void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header,
+			    uint16_t batch_size)
+{
+	header->k_header = SWIM_DISSEMINATION;
+	/*
+	 * MessagePack array16 marker. The section value is an
+	 * array of events, and 0xcd would encode a plain uint16
+	 * instead, which can not be decoded as an array.
+	 */
+	header->m_header = 0xdc;
+	header->v_header = mp_bswap_u16(batch_size);
+}
+
+/**
+ * Set the constant part of an event template: member attribute
+ * keys and MessagePack type markers. The map header and the
+ * attribute values are set by swim_event_bin_fill().
+ */
+void
+swim_event_bin_create(struct swim_event_bin *header)
+{
+ header->k_status = SWIM_MEMBER_STATUS;
+ header->k_addr = SWIM_MEMBER_ADDRESS;
+ /* MessagePack uint32 marker. */
+ header->m_addr = 0xce;
+ header->k_port = SWIM_MEMBER_PORT;
+ /* MessagePack uint16 marker. */
+ header->m_port = 0xcd;
+ header->k_uuid = SWIM_MEMBER_UUID;
+ /* MessagePack bin8 marker, followed by a 1 byte length. */
+ header->m_uuid = 0xc4;
+ header->m_uuid_len = UUID_LEN;
+ header->k_incarnation = SWIM_MEMBER_INCARNATION;
+ /* MessagePack uint64 marker. */
+ header->m_incarnation = 0xcf;
+}
+
+/**
+ * Set the mutable part of an event template: the map header and
+ * the attribute values of one member. Multi-byte integers are
+ * passed through mp_bswap_*() before being stored.
+ */
+void
+swim_event_bin_fill(struct swim_event_bin *header,
+ enum swim_member_status status,
+ const struct sockaddr_in *addr, const struct tt_uuid *uuid,
+ uint64_t incarnation, int old_uuid_ttl)
+{
+ /*
+ * 0x85 is a MessagePack fixmap of 5 pairs. One more pair is
+ * declared when the old UUID TTL is not expired.
+ */
+ header->m_header = 0x85 + (old_uuid_ttl > 0);
+ header->v_status = status;
+ header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr);
+ header->v_port = mp_bswap_u16(addr->sin_port);
+ memcpy(header->v_uuid, uuid, UUID_LEN);
+ header->v_incarnation = mp_bswap_u64(incarnation);
+}
+
+/**
+ * Set the constant part of an old UUID template: the attribute
+ * key and the MessagePack binary header.
+ */
+void
+swim_old_uuid_bin_create(struct swim_old_uuid_bin *header)
+{
+ header->k_uuid = SWIM_MEMBER_OLD_UUID;
+ /* MessagePack bin8 marker, followed by a 1 byte length. */
+ header->m_uuid = 0xc4;
+ header->m_uuid_len = UUID_LEN;
+}
+
+/** Copy UUID_LEN bytes of @a uuid into the template value. */
+void
+swim_old_uuid_bin_fill(struct swim_old_uuid_bin *header,
+ const struct tt_uuid *uuid)
+{
+ memcpy(header->v_uuid, uuid, UUID_LEN);
+}
+
void
swim_meta_header_bin_create(struct swim_meta_header_bin *header,
const struct sockaddr_in *src)
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 91a0bca9d..a3dc1164e 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -58,6 +58,19 @@
* | |
* | OR/AND |
* | |
+ * | SWIM_DISSEMINATION: [ |
+ * | { |
+ * | SWIM_MEMBER_STATUS: uint, enum member_status, |
+ * | SWIM_MEMBER_ADDRESS: uint, ip, |
+ * | SWIM_MEMBER_PORT: uint, port, |
+ * | SWIM_MEMBER_UUID: 16 byte UUID, |
+ * | SWIM_MEMBER_INCARNATION: uint |
+ * | }, |
+ * | ... |
+ * | ], |
+ * | |
+ * | OR/AND |
+ * | |
* | SWIM_ANTI_ENTROPY: [ |
* | { |
* | SWIM_MEMBER_STATUS: uint, enum member_status, |
@@ -91,6 +104,7 @@ extern const char *swim_member_status_strs[];
*/
struct swim_member_def {
struct tt_uuid uuid;
+ struct tt_uuid old_uuid;
struct sockaddr_in addr;
uint64_t incarnation;
enum swim_member_status status;
@@ -124,6 +138,7 @@ enum swim_body_key {
SWIM_SRC_UUID = 0,
SWIM_ANTI_ENTROPY,
SWIM_FAILURE_DETECTION,
+ SWIM_DISSEMINATION,
};
/**
@@ -231,6 +246,7 @@ enum swim_member_key {
SWIM_MEMBER_PORT,
SWIM_MEMBER_UUID,
SWIM_MEMBER_INCARNATION,
+ SWIM_MEMBER_OLD_UUID,
swim_member_key_MAX,
};
@@ -304,6 +320,98 @@ swim_member_bin_fill(struct swim_member_bin *header,
/** }}} Anti-entropy component */
+/** {{{ Dissemination component */
+
+/**
+ * SWIM dissemination MessagePack template: the section key and
+ * the header of the array of events.
+ */
+struct PACKED swim_diss_header_bin {
+ /** mp_encode_uint(SWIM_DISSEMINATION) */
+ uint8_t k_header;
+ /** mp_encode_array() */
+ uint8_t m_header;
+ /** Number of events, stored via mp_bswap_u16(). */
+ uint16_t v_header;
+};
+
+/** Initialize dissemination header. */
+void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header,
+ uint16_t batch_size);
+
+/**
+ * SWIM event MessagePack template. It describes one member:
+ * status, address, port, UUID and incarnation. It is copied into
+ * a packet as is, so the structure is PACKED.
+ */
+struct PACKED swim_event_bin {
+ /**
+ * mp_encode_map(5 or 6). It is 6 when the old UUID
+ * attribute is sent together with the event.
+ */
+ uint8_t m_header;
+
+ /** mp_encode_uint(SWIM_MEMBER_STATUS) */
+ uint8_t k_status;
+ /** mp_encode_uint(enum member_status) */
+ uint8_t v_status;
+
+ /** mp_encode_uint(SWIM_MEMBER_ADDRESS) */
+ uint8_t k_addr;
+ /** mp_encode_uint(addr.sin_addr.s_addr) */
+ uint8_t m_addr;
+ uint32_t v_addr;
+
+ /** mp_encode_uint(SWIM_MEMBER_PORT) */
+ uint8_t k_port;
+ /** mp_encode_uint(addr.sin_port) */
+ uint8_t m_port;
+ uint16_t v_port;
+
+ /** mp_encode_uint(SWIM_MEMBER_UUID) */
+ uint8_t k_uuid;
+ /** mp_encode_bin(UUID_LEN) */
+ uint8_t m_uuid;
+ uint8_t m_uuid_len;
+ uint8_t v_uuid[UUID_LEN];
+
+ /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+ uint8_t k_incarnation;
+ /** mp_encode_uint(64bit incarnation) */
+ uint8_t m_incarnation;
+ uint64_t v_incarnation;
+};
+
+/** Initialize dissemination record. */
+void
+swim_event_bin_create(struct swim_event_bin *header);
+
+/**
+ * Since usually there are many events, it is faster to reset a
+ * few fields in an existing template, than to create a new
+ * template each time. So the usage pattern is create(), fill(),
+ * fill() ... .
+ */
+void
+swim_event_bin_fill(struct swim_event_bin *header,
+ enum swim_member_status status,
+ const struct sockaddr_in *addr, const struct tt_uuid *uuid,
+ uint64_t incarnation, int old_uuid_ttl);
+
+/**
+ * Optional attribute of an event - old UUID of a member. The
+ * template is copied into a packet as is, byte by byte, so it is
+ * declared PACKED like the other MessagePack templates.
+ */
+struct PACKED swim_old_uuid_bin {
+	/** mp_encode_uint(SWIM_MEMBER_OLD_UUID) */
+	uint8_t k_uuid;
+	/** mp_encode_bin(UUID_LEN) */
+	uint8_t m_uuid;
+	uint8_t m_uuid_len;
+	uint8_t v_uuid[UUID_LEN];
+};
+
+/** Initialize old UUID field. */
+void
+swim_old_uuid_bin_create(struct swim_old_uuid_bin *header);
+
+/**
+ * Set mutable fields of the template, by the same principle as
+ * event filling.
+ */
+void
+swim_old_uuid_bin_fill(struct swim_old_uuid_bin *header,
+ const struct tt_uuid *uuid);
+
+/** }}} Dissemination component */
+
/** {{{ Meta component */
/**
--
2.17.2 (Apple Git-113)
More information about the Tarantool-patches
mailing list