From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH v4 06/12] [RAW] swim: introduce dissemination component Date: Thu, 31 Jan 2019 00:28:35 +0300 Message-Id: <7f41ee1bcb773aa4f55ba1165b05a4c694a38ce6.1548883137.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: kostja@tarantool.org, vdavydov.dev@gmail.com List-ID: The dissemination component broadcasts events about member status updates. Part of #3234 --- src/lib/swim/swim.c | 223 ++++++++++++++++++++++++++++++++++++-- src/lib/swim/swim_proto.c | 58 ++++++++++ src/lib/swim/swim_proto.h | 108 ++++++++++++++++++ 3 files changed, 377 insertions(+), 12 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index a862f52a4..353e55254 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -243,6 +243,42 @@ struct swim_member { struct swim_task ping_task; /** Position in a queue of members waiting for an ack. */ struct rlist in_queue_wait_ack; + /** + * + * Dissemination component + * + * Dissemination component sends events. Event is a + * notification about member status update. So formally, + * this structure already has all the needed attributes. + * But also an event somehow should be sent to all members + * at least once according to SWIM, so it requires + * something like TTL for each type of event, which + * decrements on each send. And a member can not be + * removed from the global table until it gets dead and + * its status TTL is 0, so as to allow other members + * to learn its dead status. + */ + int status_ttl; + /** + * Events are put into a queue sorted by event occurrence + * time. + */ + struct rlist in_queue_events; + /** + * Old UUID is sent for a while after its update so as to + * allow other members to update this member's record + * in their tables. + */ + struct tt_uuid old_uuid; + /** + * UUID is quite heavy structure, so an old UUID is sent + * only this number of times. 
A current UUID is sent + * always. Moreover, if someone wanted to reuse UUID, + * always sending old ones would make it much harder to + * detect which instance has just updated UUID, and which + * old UUID is handed over to another instance. + */ + int old_uuid_ttl; }; #define mh_name _swim_table @@ -313,6 +349,12 @@ struct swim { struct rlist queue_wait_ack; /** Generator of ack checking events. */ struct ev_periodic wait_ack_tick; + /** + * + * Dissemination component + */ + /** Queue of events sorted by occurrence time. */ + struct rlist queue_events; }; /** Put the member into a list of ACK waiters. */ @@ -327,14 +369,42 @@ swim_member_wait_ack(struct swim *swim, struct swim_member *member) } } +/** + * On literally any update of a member it stands into a queue of + * events to disseminate the update. Note that status TTL is + * always set, even if UUID is updated, or any other attribute. It + * is because 1) it simplifies the code when status TTL is bigger + * than all other ones, 2) status occupies only 2 bytes in a + * packet, so it is never worse to send it on any update, but + * reduces entropy. + */ +static inline void +swim_schedule_event(struct swim *swim, struct swim_member *member) +{ + if (rlist_empty(&member->in_queue_events)) { + rlist_add_tail_entry(&swim->queue_events, member, + in_queue_events); + } + member->status_ttl = mh_size(swim->members); +} + /** * Make all needed actions to process a member's update like a * change of its status, or incarnation, or both. */ static void -swim_member_status_is_updated(struct swim_member *member) +swim_member_status_is_updated(struct swim_member *member, struct swim *swim) { member->unacknowledged_pings = 0; + swim_schedule_event(swim, member); +} + +/** Make all needed actions to process member's UUID update. 
*/ +static void +swim_member_uuid_is_updated(struct swim_member *member, struct swim *swim) +{ + member->old_uuid_ttl = mh_size(swim->members); + swim_schedule_event(swim, member); } /** @@ -352,7 +422,6 @@ swim_member_update_status(struct swim_member *member, enum swim_member_status new_status, uint64_t incarnation, struct swim *swim) { - (void) swim; /* * Source of truth about self is this instance and it is * never updated from remote. Refutation is handled @@ -362,12 +431,12 @@ swim_member_update_status(struct swim_member *member, if (member->incarnation == incarnation) { if (member->status < new_status) { member->status = new_status; - swim_member_status_is_updated(member); + swim_member_status_is_updated(member, swim); } } else if (member->incarnation < incarnation) { member->status = new_status; member->incarnation = incarnation; - swim_member_status_is_updated(member); + swim_member_status_is_updated(member, swim); } } @@ -400,6 +469,9 @@ swim_member_delete(struct swim *swim, struct swim_member *member) swim_task_destroy(&member->ack_task); swim_task_destroy(&member->ping_task); + /* Dissemination component. */ + rlist_del_entry(member, in_queue_events); + free(member); } @@ -469,6 +541,10 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr, swim_task_create(&member->ack_task, NULL, NULL); swim_task_create(&member->ping_task, swim_ping_task_complete, NULL); + /* Dissemination component. */ + rlist_create(&member->in_queue_events); + swim_member_status_is_updated(member, swim); + say_verbose("SWIM: member %s is added", swim_uuid_str(uuid)); return member; } @@ -610,6 +686,51 @@ swim_encode_failure_detection(struct swim *swim, struct swim_packet *packet, return 1; } +/** + * Encode dissemination component. + * @retval 0 Not error, but nothing is encoded. + * @retval 1 Something is encoded. 
+ */ +static int +swim_encode_dissemination(struct swim *swim, struct swim_packet *packet) +{ + struct swim_diss_header_bin diss_header_bin; + int size = sizeof(diss_header_bin); + char *header = swim_packet_reserve(packet, size); + if (header == NULL) + return 0; + int i = 0; + struct swim_member *m; + struct swim_event_bin event_bin; + struct swim_old_uuid_bin old_uuid_bin; + swim_event_bin_create(&event_bin); + swim_old_uuid_bin_create(&old_uuid_bin); + rlist_foreach_entry(m, &swim->queue_events, in_queue_events) { + int new_size = size + sizeof(event_bin); + if (m->old_uuid_ttl > 0) + new_size += sizeof(old_uuid_bin); + char *pos = swim_packet_reserve(packet, new_size); + if (pos == NULL) + break; + size = new_size; + swim_event_bin_fill(&event_bin, m->status, &m->addr, &m->uuid, + m->incarnation, m->old_uuid_ttl); + memcpy(pos, &event_bin, sizeof(event_bin)); + if (m->old_uuid_ttl > 0) { + pos += sizeof(event_bin); + swim_old_uuid_bin_fill(&old_uuid_bin, &m->old_uuid); + memcpy(pos, &old_uuid_bin, sizeof(old_uuid_bin)); + } + ++i; + } + if (i == 0) + return 0; + swim_diss_header_bin_create(&diss_header_bin, i); + memcpy(header, &diss_header_bin, sizeof(diss_header_bin)); + swim_packet_advance(packet, size); + return 1; +} + /** Encode SWIM components into a UDP packet. */ static void swim_encode_round_msg(struct swim *swim, struct swim_packet *packet) @@ -620,12 +741,36 @@ swim_encode_round_msg(struct swim *swim, struct swim_packet *packet) map_size += swim_encode_src_uuid(swim, packet); map_size += swim_encode_failure_detection(swim, packet, SWIM_FD_MSG_PING); + map_size += swim_encode_dissemination(swim, packet); map_size += swim_encode_anti_entropy(swim, packet); assert(mp_sizeof_map(map_size) == 1 && map_size >= 2); mp_encode_map(header, map_size); } +/** + * Decrement TTLs of all events. It is done after each round step. 
+ * Note that when there are too many events to fit into a packet, + * the tail of the event list starts rotting without being + * disseminated, and the farthest events can be deleted + * without ever being sent. But this situation is hardly reachable + * since even 1000 bytes can fit 37 events of ~27 bytes each, which + * in fact means a failure of 37 instances. In such a case rotting + * events are the mildest problem. + */ +static void +swim_decrease_events_ttl(struct swim *swim) +{ + struct swim_member *member, *tmp; + rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events, + tmp) { + if (member->old_uuid_ttl > 0) + --member->old_uuid_ttl; + if (--member->status_ttl == 0) + rlist_del_entry(member, in_queue_events); + } +} + /** * Once per specified timeout trigger a next round step. In round * step a next memeber is taken from the round queue and a round @@ -676,6 +821,8 @@ swim_round_step_complete(struct swim_task *task, * section with a ping. */ swim_member_wait_ack(swim, m); + /* As well as dissemination. 
*/ + swim_decrease_events_ttl(swim); } } @@ -737,11 +884,12 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events) case MEMBER_ALIVE: if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) { m->status = MEMBER_DEAD; - swim_member_status_is_updated(m); + swim_member_status_is_updated(m, swim); } break; case MEMBER_DEAD: - if (m->unacknowledged_pings >= NO_ACKS_TO_GC) + if (m->unacknowledged_pings >= NO_ACKS_TO_GC && + m->status_ttl == 0) swim_member_delete(swim, m); break; default: @@ -782,18 +930,20 @@ swim_member_update_uuid(struct swim_member *member, struct mh_swim_table_key key = {member->hash, &old_uuid}; mh_swim_table_del(t, mh_swim_table_find(t, key, NULL), NULL); member->hash = swim_uuid_hash(new_uuid); + member->old_uuid = old_uuid; + swim_member_uuid_is_updated(member, swim); return 0; } /** Update member's address.*/ static inline void swim_member_update_addr(struct swim_member *member, - const struct sockaddr_in *addr) + const struct sockaddr_in *addr, struct swim *swim) { if (addr->sin_port != member->addr.sin_port || addr->sin_addr.s_addr != member->addr.sin_addr.s_addr) { member->addr = *addr; - swim_member_status_is_updated(member); + swim_member_status_is_updated(member, swim); } } @@ -807,6 +957,9 @@ static struct swim_member * swim_update_member(struct swim *swim, const struct swim_member_def *def) { struct swim_member *member = swim_find_member(swim, &def->uuid); + struct swim_member *old_member = NULL; + if (! 
tt_uuid_is_nil(&def->old_uuid)) + old_member = swim_find_member(swim, &def->old_uuid); if (member == NULL) { if (def->status == MEMBER_DEAD) { /* @@ -821,19 +974,29 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def) */ return NULL; } - member = swim_member_new(swim, &def->addr, &def->uuid, - def->status, def->incarnation); + if (old_member == NULL) { + member = swim_member_new(swim, &def->addr, &def->uuid, + def->status, def->incarnation); + } else if (swim_member_update_uuid(old_member, &def->uuid, + swim) == 0) { + member = old_member; + } return member; } struct swim_member *self = swim->self; if (member != self) { if (def->incarnation >= member->incarnation) { - swim_member_update_addr(member, &def->addr); + swim_member_update_addr(member, &def->addr, swim); swim_member_update_status(member, def->status, def->incarnation, swim); + if (old_member != NULL) { + assert(member != old_member); + swim_member_delete(swim, old_member); + } } return member; } + uint64_t old_incarnation = self->incarnation; /* * It is possible that other instances know a bigger * incarnation of this instance - such thing happens when @@ -852,6 +1015,8 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def) */ self->incarnation++; } + if (old_incarnation != self->incarnation) + swim_member_status_is_updated(self, swim); return member; } @@ -920,6 +1085,31 @@ swim_process_failure_detection(struct swim *swim, const char **pos, } return 0; } +/** + * Decode a dissemination message. Schedule new events, update + * members. 
+ */ +static int +swim_process_dissemination(struct swim *swim, const char **pos, const char *end) +{ + const char *msg_pref = "invald dissemination message:"; + uint32_t size; + if (swim_decode_array(pos, end, &size, msg_pref, "root") != 0) + return -1; + for (uint32_t i = 0; i < size; ++i) { + struct swim_member_def def; + if (swim_member_def_decode(&def, pos, end, msg_pref) != 0) + return -1; + if (swim_update_member(swim, &def) == NULL) { + /* + * Not a critical error - other updates + * still can be applied. + */ + diag_log(); + } + } + return 0; +} /** Process a new message. */ static void @@ -962,6 +1152,11 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos, src, &uuid) != 0) goto error; break; + case SWIM_DISSEMINATION: + say_verbose("SWIM: process dissemination"); + if (swim_process_dissemination(swim, &pos, end) != 0) + goto error; + break; default: diag_set(SwimError, "%s unexpected key", msg_pref); goto error; @@ -1000,6 +1195,10 @@ swim_new(void) ev_init(&swim->wait_ack_tick, swim_check_acks); ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT_DEFAULT, NULL); swim->wait_ack_tick.data = (void *) swim; + + /* Dissemination component. */ + rlist_create(&swim->queue_events); + return swim; } @@ -1089,7 +1288,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, ev_periodic_start(loop(), &swim->wait_ack_tick); if (! is_first_cfg) { - swim_member_update_addr(swim->self, &addr); + swim_member_update_addr(swim->self, &addr, swim); int rc = swim_member_update_uuid(swim->self, uuid, swim); /* Reserved above. 
*/ assert(rc == 0); diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index 542b988c1..e31c67682 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -189,6 +189,11 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, "member incarnation") != 0) return -1; break; + case SWIM_MEMBER_OLD_UUID: + if (swim_decode_uuid(&def->old_uuid, pos, end, msg_pref, + "member old uuid") != 0) + return -1; + break; default: unreachable(); } @@ -337,6 +342,59 @@ swim_member_bin_create(struct swim_member_bin *header) header->m_incarnation = 0xcf; } +void +swim_diss_header_bin_create(struct swim_diss_header_bin *header, + uint16_t batch_size) +{ + header->k_header = SWIM_DISSEMINATION; + header->m_header = 0xcd; + header->v_header = mp_bswap_u16(batch_size); +} + +void +swim_event_bin_create(struct swim_event_bin *header) +{ + header->k_status = SWIM_MEMBER_STATUS; + header->k_addr = SWIM_MEMBER_ADDRESS; + header->m_addr = 0xce; + header->k_port = SWIM_MEMBER_PORT; + header->m_port = 0xcd; + header->k_uuid = SWIM_MEMBER_UUID; + header->m_uuid = 0xc4; + header->m_uuid_len = UUID_LEN; + header->k_incarnation = SWIM_MEMBER_INCARNATION; + header->m_incarnation = 0xcf; +} + +void +swim_event_bin_fill(struct swim_event_bin *header, + enum swim_member_status status, + const struct sockaddr_in *addr, const struct tt_uuid *uuid, + uint64_t incarnation, int old_uuid_ttl) +{ + header->m_header = 0x85 + (old_uuid_ttl > 0); + header->v_status = status; + header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr); + header->v_port = mp_bswap_u16(addr->sin_port); + memcpy(header->v_uuid, uuid, UUID_LEN); + header->v_incarnation = mp_bswap_u64(incarnation); +} + +void +swim_old_uuid_bin_create(struct swim_old_uuid_bin *header) +{ + header->k_uuid = SWIM_MEMBER_OLD_UUID; + header->m_uuid = 0xc4; + header->m_uuid_len = UUID_LEN; +} + +void +swim_old_uuid_bin_fill(struct swim_old_uuid_bin *header, + const struct tt_uuid *uuid) +{ + memcpy(header->v_uuid, 
uuid, UUID_LEN); +} + void swim_meta_header_bin_create(struct swim_meta_header_bin *header, const struct sockaddr_in *src) diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index 91a0bca9d..a3dc1164e 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -58,6 +58,19 @@ * | | * | OR/AND | * | | + * | SWIM_DISSEMINATION: [ | + * | { | + * | SWIM_MEMBER_STATUS: uint, enum member_status, | + * | SWIM_MEMBER_ADDRESS: uint, ip, | + * | SWIM_MEMBER_PORT: uint, port, | + * | SWIM_MEMBER_UUID: 16 byte UUID, | + * | SWIM_MEMBER_INCARNATION: uint | + * | }, | + * | ... | + * | ], | + * | | + * | OR/AND | + * | | * | SWIM_ANTI_ENTROPY: [ | * | { | * | SWIM_MEMBER_STATUS: uint, enum member_status, | @@ -91,6 +104,7 @@ extern const char *swim_member_status_strs[]; */ struct swim_member_def { struct tt_uuid uuid; + struct tt_uuid old_uuid; struct sockaddr_in addr; uint64_t incarnation; enum swim_member_status status; @@ -124,6 +138,7 @@ enum swim_body_key { SWIM_SRC_UUID = 0, SWIM_ANTI_ENTROPY, SWIM_FAILURE_DETECTION, + SWIM_DISSEMINATION, }; /** @@ -231,6 +246,7 @@ enum swim_member_key { SWIM_MEMBER_PORT, SWIM_MEMBER_UUID, SWIM_MEMBER_INCARNATION, + SWIM_MEMBER_OLD_UUID, swim_member_key_MAX, }; @@ -304,6 +320,98 @@ swim_member_bin_fill(struct swim_member_bin *header, /** }}} Anti-entropy component */ +/** {{{ Dissemination component */ + +/** SWIM dissemination MessagePack template. */ +struct PACKED swim_diss_header_bin { + /** mp_encode_uint(SWIM_DISSEMINATION) */ + uint8_t k_header; + /** mp_encode_array() */ + uint8_t m_header; + uint16_t v_header; +}; + +/** Initialize dissemination header. */ +void +swim_diss_header_bin_create(struct swim_diss_header_bin *header, + uint16_t batch_size); + +/** SWIM event MessagePack template. 
*/ +struct PACKED swim_event_bin { + /** mp_encode_map(5 or 6) */ + uint8_t m_header; + + /** mp_encode_uint(SWIM_MEMBER_STATUS) */ + uint8_t k_status; + /** mp_encode_uint(enum member_status) */ + uint8_t v_status; + + /** mp_encode_uint(SWIM_MEMBER_ADDRESS) */ + uint8_t k_addr; + /** mp_encode_uint(addr.sin_addr.s_addr) */ + uint8_t m_addr; + uint32_t v_addr; + + /** mp_encode_uint(SWIM_MEMBER_PORT) */ + uint8_t k_port; + /** mp_encode_uint(addr.sin_port) */ + uint8_t m_port; + uint16_t v_port; + + /** mp_encode_uint(SWIM_MEMBER_UUID) */ + uint8_t k_uuid; + /** mp_encode_bin(UUID_LEN) */ + uint8_t m_uuid; + uint8_t m_uuid_len; + uint8_t v_uuid[UUID_LEN]; + + /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */ + uint8_t k_incarnation; + /** mp_encode_uint(64bit incarnation) */ + uint8_t m_incarnation; + uint64_t v_incarnation; +}; + +/** Initialize dissemination record. */ +void +swim_event_bin_create(struct swim_event_bin *header); + +/** + * Since usually there are many events, it is faster to reset a + * few fields in an existing template than to create a + * new template each time. So the usage pattern is create(), fill(), + * fill() ... . + */ +void +swim_event_bin_fill(struct swim_event_bin *header, + enum swim_member_status status, + const struct sockaddr_in *addr, const struct tt_uuid *uuid, + uint64_t incarnation, int old_uuid_ttl); + +/** Optional attribute of an event - old UUID of a member. */ +struct swim_old_uuid_bin { + /** mp_encode_uint(SWIM_MEMBER_OLD_UUID) */ + uint8_t k_uuid; + /** mp_encode_bin(UUID_LEN) */ + uint8_t m_uuid; + uint8_t m_uuid_len; + uint8_t v_uuid[UUID_LEN]; +}; + +/** Initialize old UUID field. */ +void +swim_old_uuid_bin_create(struct swim_old_uuid_bin *header); + +/** + * Set mutable fields of the field, by the same principle as event + * filling. 
+ */ +void +swim_old_uuid_bin_fill(struct swim_old_uuid_bin *header, + const struct tt_uuid *uuid); + +/** }}} Dissemination component */ + /** {{{ Meta component */ /** -- 2.17.2 (Apple Git-113)