From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH v2 3/6] [RAW] swim: introduce a dissemination component Date: Tue, 25 Dec 2018 22:19:26 +0300 Message-Id: <3b5f96c93bdb29e60de4565ba4bcb93edcd6389a.1545765055.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: vdavydov.dev@gmail.com, kostja@tarantool.org List-ID: Dissemination components broadcasts events about member status updates. Part of #3234 --- src/lib/swim/swim.c | 270 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 265 insertions(+), 5 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 22bc06a60..15e079f11 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -377,6 +377,26 @@ struct swim_member { struct swim_io_task ping_task; /** Position in a queue of members waiting for an ack. */ struct rlist in_queue_wait_ack; + /** + * + * Dissemination component + * + * Dissemination component sends events. Event is a + * notification about member status update. So formally, + * this structure already has all the needed attributes. + * But also an event somehow should be sent to all members + * at least once according to SWIM, so it requires + * something like TTL, which decrements on each send. And + * a member can not be removed from the global table until + * it gets dead and its dissemination TTL is 0, so as to + * allow other members learn its dead status. + */ + int dissemination_ttl; + /** + * Events are put into a queue sorted by event occurrence + * time. + */ + struct rlist in_queue_events; }; /** @@ -451,6 +471,8 @@ struct swim { struct rlist queue_wait_ack; /** Generator of ack checking events. */ struct ev_periodic wait_ack_tick; + /** Queue of events sorted by occurrence time. */ + struct rlist queue_events; }; static inline uint64_t @@ -467,6 +489,7 @@ sockaddr_in_hash(const struct sockaddr_in *a) enum swim_component_type { SWIM_ANTI_ENTROPY = 0, SWIM_FAILURE_DETECTION, + SWIM_DISSEMINATION, }; /** {{{ Failure detection component */ @@ -634,6 +657,88 @@ swim_member_bin_create(struct swim_member_bin *header) /** }}} Anti-entropy component */ +/** {{{ Dissemination component */ + +/** SWIM dissemination MsgPack template. */ +struct PACKED swim_diss_header_bin { + /** mp_encode_uint(SWIM_DISSEMINATION) */ + uint8_t k_header; + /** mp_encode_array() */ + uint8_t m_header; + uint32_t v_header; +}; + +static inline void +swim_diss_header_bin_create(struct swim_diss_header_bin *header, int batch_size) +{ + header->k_header = SWIM_DISSEMINATION; + header->m_header = 0xdd; + header->v_header = mp_bswap_u32(batch_size); +} + +/** SWIM event MsgPack template. */ +struct PACKED swim_event_bin { + /** mp_encode_map(4) */ + uint8_t m_header; + + /** mp_encode_uint(SWIM_MEMBER_STATUS) */ + uint8_t k_status; + /** mp_encode_uint(enum member_status) */ + uint8_t v_status; + + /** mp_encode_uint(SWIM_MEMBER_ADDR) */ + uint8_t k_addr; + /** mp_encode_uint(addr.sin_addr.s_addr) */ + uint8_t m_addr; + uint32_t v_addr; + + /** mp_encode_uint(SWIM_MEMBER_PORT) */ + uint8_t k_port; + /** mp_encode_uint(addr.sin_port) */ + uint8_t m_port; + uint16_t v_port; + + /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */ + uint8_t k_incarnation; + /** mp_encode_uint(64bit incarnation) */ + uint8_t m_incarnation; + uint64_t v_incarnation; +}; + +static inline void +swim_event_bin_create(struct swim_event_bin *header) +{ + header->m_header = 0x84; + header->k_status = SWIM_MEMBER_STATUS; + header->k_addr = SWIM_MEMBER_ADDR; + header->m_addr = 0xce; + header->k_port = SWIM_MEMBER_PORT; + header->m_port = 0xcd; + header->k_incarnation = SWIM_MEMBER_INCARNATION; + header->m_incarnation = 0xcf; +} + +static inline void +swim_event_bin_reset(struct swim_event_bin *header, struct swim_member *member) +{ + header->v_status = member->status; + header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr); + header->v_port = mp_bswap_u16(member->addr.sin_port); + header->v_incarnation = mp_bswap_u64(member->incarnation); +} + +static inline void +swim_schedule_event(struct swim *swim, struct swim_member *member) +{ + if (rlist_empty(&member->in_queue_events)) { + rlist_add_tail_entry(&swim->queue_events, member, + in_queue_events); + } + member->dissemination_ttl = mh_size(swim->members); +} + +/** }}} Dissemination component */ + /** * SWIM message structure: * { @@ -644,6 +749,18 @@ swim_member_bin_create(struct swim_member_bin *header) * * OR/AND * + * SWIM_DISSEMINATION: [ + * { + * SWIM_MEMBER_STATUS: uint, enum member_status, + * SWIM_MEMBER_ADDR: uint, ip, + * SWIM_MEMBER_PORT: uint, port, + * SWIM_MEMBER_INCARNATION: uint + * }, + * ... + * ], + * + * OR/AND + * * SWIM_ANTI_ENTROPY: [ * { * SWIM_MEMBER_STATUS: uint, enum member_status, @@ -663,6 +780,16 @@ swim_io_task_push(struct swim_io_task *task) ev_io_start(loop(), &task->swim->output); } +/** + * Make all needed actions to process a member's update like a + * change of its status, or incarnation, or both. + */ +static void +swim_member_is_updated(struct swim *swim, struct swim_member *member) +{ + swim_schedule_event(swim, member); +} + /** * Update status of the member if needed. Statuses are compared as * a compound key: {incarnation, status}. So @a new_status can @@ -678,14 +805,16 @@ swim_member_update_status(struct swim *swim, struct swim_member *member, enum swim_member_status new_status, uint64_t incarnation) { - (void) swim; assert(member != swim->self); if (member->incarnation == incarnation) { - if (member->status < new_status) + if (member->status < new_status) { member->status = new_status; + swim_member_is_updated(swim, member); + } } else if (member->incarnation < incarnation) { member->status = new_status; member->incarnation = incarnation; + swim_member_is_updated(swim, member); } } @@ -725,6 +854,8 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr, swim_io_task_create(&member->ping_task, swim_send_ping, swim); rlist_add_entry(&swim->queue_round, member, in_queue_round); rlist_create(&member->in_queue_wait_ack); + rlist_create(&member->in_queue_events); + swim_schedule_event(swim, member); return member; } @@ -753,6 +884,7 @@ swim_member_delete(struct swim *swim, struct swim_member *member) swim_io_task_destroy(&member->ping_task); rlist_del_entry(member, in_queue_round); rlist_del_entry(member, in_queue_wait_ack); + assert(rlist_empty(&member->in_queue_events)); free(member); } @@ -872,6 +1004,62 @@ swim_encode_failure_detection(struct swim *swim, struct swim_msg *msg, return 1; } +static int +swim_encode_dissemination_part(struct swim_msg *msg, struct rlist **queue_pos) +{ + struct swim_diss_header_bin diss_header_bin; + struct swim_msg_part *part = + swim_msg_reserve(msg, sizeof(diss_header_bin)); + if (part == NULL) + return -1; + char *header = swim_msg_part_pos(part); + char *end = swim_msg_part_end(part); + char *pos = header + sizeof(diss_header_bin); + + int i = 0; + struct swim_member *member, *prev = NULL; + struct swim_event_bin event_bin; + swim_event_bin_create(&event_bin); + rlist_foreach_entry(member, *queue_pos, in_queue_events) { + if (pos + sizeof(event_bin) > end) + break; + swim_event_bin_reset(&event_bin, member); + memcpy(pos, &event_bin, sizeof(event_bin)); + pos += sizeof(event_bin); + ++i; + prev = member; + } + if (i == 0) + return 0; + swim_diss_header_bin_create(&diss_header_bin, i); + memcpy(header, &diss_header_bin, sizeof(diss_header_bin)); + swim_msg_part_advance(part, pos - header); + if (prev == rlist_last_entry(*queue_pos, struct swim_member, + in_queue_events)) + *queue_pos = NULL; + else + *queue_pos = rlist_next(&prev->in_queue_events); + return i; +} + +/** + * Encode failure dissemination component. + * @retval Number of encoded events. + */ +static int +swim_encode_dissemination(struct swim *swim, struct swim_msg *msg) +{ + int count = 0; + struct rlist *pos = rlist_first(&swim->queue_events); + while (pos != rlist_last(&swim->queue_events)) { + int rc = swim_encode_dissemination_part(msg, &pos); + if (rc < 0) + return -1; + count += rc; + } + return count; +} + /** Encode SWIM components into a sequence of UDP packets. */ static int swim_encode_round_msg(struct swim *swim, struct swim_msg *msg) @@ -889,6 +1077,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg) goto error; map_size += rc > 0; + rc = swim_encode_dissemination(swim, msg); + if (rc < 0) + goto error; + map_size += rc > 0; + rc = swim_encode_anti_entropy(swim, msg); if (rc < 0) goto error; @@ -902,6 +1095,17 @@ error: return -1; } +static void +swim_decrease_events_ttl(struct swim *swim) +{ + struct swim_member *member, *tmp; + rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events, + tmp) { + if (--member->dissemination_ttl == 0) + rlist_del_entry(member, in_queue_events); + } +} + /** * Do one round step. Send encoded components to a next member * from the queue. @@ -943,6 +1147,7 @@ swim_send_round_msg(struct swim_io_task *task) } swim_msg_destroy(&msg); swim_member_schedule_ack_wait(swim, m); + swim_decrease_events_ttl(swim); rlist_del_entry(m, in_queue_round); next_round_step: ev_periodic_start(loop(), &swim->round_tick); @@ -1040,12 +1245,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events) break; ++m->failed_pings; if (m->failed_pings >= NO_ACKS_TO_GC) { - if (!m->is_pinned) + if (!m->is_pinned && m->dissemination_ttl == 0) swim_member_delete(swim, m); continue; } - if (m->failed_pings >= NO_ACKS_TO_DEAD) + if (m->failed_pings >= NO_ACKS_TO_DEAD) { m->status = MEMBER_DEAD; + swim_member_is_updated(swim, m); + } swim_io_task_push(&m->ping_task); rlist_del_entry(m, in_queue_wait_ack); } @@ -1296,6 +1503,50 @@ swim_process_failure_detection(struct swim *swim, const char **pos, return 0; } +static int +swim_process_dissemination(struct swim *swim, const char **pos, const char *end) +{ + const char *msg_pref = "Invald SWIM dissemination message:"; + if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) { + say_error("%s message should be an array", msg_pref); + return -1; + } + uint64_t size = mp_decode_array(pos); + for (uint64_t i = 0; i < size; ++i) { + if (mp_typeof(**pos) != MP_MAP || + mp_check_map(*pos, end) > 0) { + say_error("%s event should be map", msg_pref); + return -1; + } + uint64_t map_size = mp_decode_map(pos); + struct swim_member_def def; + swim_member_def_create(&def); + for (uint64_t j = 0; j < map_size; ++j) { + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s event key should be uint", + msg_pref); + return -1; + } + uint64_t key = mp_decode_uint(pos); + if (key >= swim_member_key_MAX) { + say_error("%s unknown event key", msg_pref); + return -1; + } + if (swim_process_member_key(key, pos, end, msg_pref, + &def) != 0) + return -1; + } + if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) { + say_error("%s member address should be specified", + msg_pref); + return -1; + } + swim_process_member_update(swim, &def); + } + return 0; +} + /** Receive and process a new message. */ static void swim_on_input(struct ev_loop *loop, struct ev_io *io, int events) @@ -1344,6 +1595,11 @@ swim_on_input(struct ev_loop *loop, struct ev_io *io, int events) &addr) != 0) return; break; + case SWIM_DISSEMINATION: + say_verbose("SWIM: process dissemination"); + if (swim_process_dissemination(swim, &pos, end) != 0) + return; + break; default: say_error("%s unknown component type component is "\ "supported", msg_pref); @@ -1422,6 +1678,7 @@ swim_new(void) ev_init(&swim->wait_ack_tick, swim_check_acks); ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL); swim->wait_ack_tick.data = (void *) swim; + rlist_create(&swim->queue_events); return swim; } @@ -1508,8 +1765,10 @@ swim_remove_member(struct swim *swim, const char *uri) if (uri_to_addr(uri, &addr) != 0) return -1; struct swim_member *member = swim_find_member(swim, &addr); - if (member != NULL) + if (member != NULL) { + rlist_del_entry(member, in_queue_events); swim_member_delete(swim, member); + } return 0; } @@ -1546,6 +1805,7 @@ swim_delete(struct swim *swim) while (node != mh_end(swim->members)) { struct swim_member *m = (struct swim_member *) mh_i64ptr_node(swim->members, node)->val; + rlist_del_entry(m, in_queue_events); swim_member_delete(swim, m); node = mh_first(swim->members); } -- 2.17.2 (Apple Git-113)