From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path:
From: Vladislav Shpilevoy
Subject: [PATCH v3 3/6] [RAW] swim: introduce a dissemination component
Date: Sat, 29 Dec 2018 13:14:12 +0300
Message-Id: <1696a639c946282a9c50cf72ea782963e12bb5df.1546077015.git.v.shpilevoy@tarantool.org>
In-Reply-To:
References:
In-Reply-To:
References:
To: tarantool-patches@freelists.org
Cc: vdavydov.dev@gmail.com, kostja@tarantool.org
List-ID:

The dissemination component broadcasts events about member status
updates.

Part of #3234
---
 src/lib/swim/swim.c | 317 ++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 311 insertions(+), 6 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index c7bc11bca..4e7ffbc54 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -184,6 +184,27 @@ struct swim_member {
 	struct swim_task ping_task;
 	/** Position in a queue of members waiting for an ack. */
 	struct rlist in_queue_wait_ack;
+	/**
+	 *
+	 *                 Dissemination component
+	 *
+	 * Dissemination component sends events. Event is a
+	 * notification about member status update. So formally,
+	 * this structure already has all the needed attributes.
+	 * But also an event somehow should be sent to all members
+	 * at least once according to SWIM, so it requires
+	 * something like TTL for each type of event, which
+	 * decrements on each send. And a member can not be
+	 * removed from the global table until it gets dead and
+	 * its status TTL is 0, so as to allow other members
+	 * to learn its dead status.
+	 */
+	int status_ttl;
+	/**
+	 * Events are put into a queue sorted by event occurrence
+	 * time.
+	 */
+	struct rlist in_queue_events;
 };
 
 /**
@@ -248,6 +269,12 @@ struct swim {
 	struct rlist queue_wait_ack;
 	/** Generator of ack checking events. */
 	struct ev_periodic wait_ack_tick;
+	/**
+	 *
+	 *                 Dissemination component
+	 */
+	/** Queue of events sorted by occurrence time. */
+	struct rlist queue_events;
 };
 
 static inline uint64_t
@@ -264,6 +291,7 @@ sockaddr_in_hash(const struct sockaddr_in *a)
 enum swim_component_type {
 	SWIM_ANTI_ENTROPY = 0,
 	SWIM_FAILURE_DETECTION,
+	SWIM_DISSEMINATION,
 };
 
 /** {{{                  Failure detection component */
@@ -431,6 +459,88 @@ swim_member_bin_create(struct swim_member_bin *header)
 
 /** }}}                  Anti-entropy component                 */
 
+/** {{{                 Dissemination component                */
+
+/** SWIM dissemination MsgPack template. */
+struct PACKED swim_diss_header_bin {
+	/** mp_encode_uint(SWIM_DISSEMINATION) */
+	uint8_t k_header;
+	/** mp_encode_array() */
+	uint8_t m_header;
+	uint32_t v_header;
+};
+
+static inline void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header, int batch_size)
+{
+	header->k_header = SWIM_DISSEMINATION;
+	header->m_header = 0xdd;
+	header->v_header = mp_bswap_u32(batch_size);
+}
+
+/** SWIM event MsgPack template. */
+struct PACKED swim_event_bin {
+	/** mp_encode_map(4) */
+	uint8_t m_header;
+
+	/** mp_encode_uint(SWIM_MEMBER_STATUS) */
+	uint8_t k_status;
+	/** mp_encode_uint(enum member_status) */
+	uint8_t v_status;
+
+	/** mp_encode_uint(SWIM_MEMBER_ADDRESS) */
+	uint8_t k_addr;
+	/** mp_encode_uint(addr.sin_addr.s_addr) */
+	uint8_t m_addr;
+	uint32_t v_addr;
+
+	/** mp_encode_uint(SWIM_MEMBER_PORT) */
+	uint8_t k_port;
+	/** mp_encode_uint(addr.sin_port) */
+	uint8_t m_port;
+	uint16_t v_port;
+
+	/** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+	uint8_t k_incarnation;
+	/** mp_encode_uint(64bit incarnation) */
+	uint8_t m_incarnation;
+	uint64_t v_incarnation;
+};
+
+static inline void
+swim_event_bin_create(struct swim_event_bin *header)
+{
+	header->m_header = 0x84;
+	header->k_status = SWIM_MEMBER_STATUS;
+	header->k_addr = SWIM_MEMBER_ADDRESS;
+	header->m_addr = 0xce;
+	header->k_port = SWIM_MEMBER_PORT;
+	header->m_port = 0xcd;
+	header->k_incarnation = SWIM_MEMBER_INCARNATION;
+	header->m_incarnation = 0xcf;
+}
+
+static inline void
+swim_event_bin_reset(struct swim_event_bin *header, struct swim_member *member)
+{
+	header->v_status = member->status;
+	header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
+	header->v_port = mp_bswap_u16(member->addr.sin_port);
+	header->v_incarnation = mp_bswap_u64(member->incarnation);
+}
+
+static inline void
+swim_schedule_event(struct swim *swim, struct swim_member *member)
+{
+	if (rlist_empty(&member->in_queue_events)) {
+		rlist_add_tail_entry(&swim->queue_events, member,
+				     in_queue_events);
+	}
+	member->status_ttl = mh_size(swim->members);
+}
+
+/** }}}                  Dissemination component                 */
+
 /**
  * SWIM message structure:
  * {
@@ -441,6 +551,18 @@ swim_member_bin_create(struct swim_member_bin *header)
  *
  * OR/AND
  *
+ * SWIM_DISSEMINATION: [
+ *     {
+ *         SWIM_MEMBER_STATUS: uint, enum member_status,
+ *         SWIM_MEMBER_ADDRESS: uint, ip,
+ *         SWIM_MEMBER_PORT: uint, port,
+ *         SWIM_MEMBER_INCARNATION: uint
+ *     },
+ *     ...
+ * ],
+ *
+ * OR/AND
+ *
  * SWIM_ANTI_ENTROPY: [
  *     {
  *         SWIM_MEMBER_STATUS: uint, enum member_status,
@@ -453,6 +575,16 @@ swim_member_bin_create(struct swim_member_bin *header)
  * }
  */
 
+/**
+ * Make all needed actions to process a member's update like a
+ * change of its status, or incarnation, or both.
+ */
+static void
+swim_member_status_is_updated(struct swim *swim, struct swim_member *member)
+{
+	swim_schedule_event(swim, member);
+}
+
 /**
  * Update status and incarnation of the member if needed. Statuses
  * are compared as a compound key: {incarnation, status}. So @a
@@ -468,14 +600,16 @@ swim_member_update_status(struct swim *swim, struct swim_member *member,
 			  enum swim_member_status new_status,
 			  uint64_t incarnation)
 {
-	(void) swim;
 	assert(member != swim->self);
 	if (member->incarnation == incarnation) {
-		if (member->status < new_status)
+		if (member->status < new_status) {
 			member->status = new_status;
+			swim_member_status_is_updated(swim, member);
+		}
 	} else if (member->incarnation < incarnation) {
 		member->status = new_status;
 		member->incarnation = incarnation;
+		swim_member_status_is_updated(swim, member);
 	}
 }
 
@@ -497,6 +631,9 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
 	swim_task_destroy(&member->ack_task);
 	swim_task_destroy(&member->ping_task);
 
+	/* Dissemination component. */
+	assert(rlist_empty(&member->in_queue_events));
+
 	free(member);
 }
 
@@ -533,6 +670,10 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
 	swim_task_create(&member->ack_task, &swim->scheduler, swim_task_reset);
 	swim_task_create(&member->ping_task, &swim->scheduler, swim_task_reset);
 
+	/* Dissemination component. */
+	rlist_create(&member->in_queue_events);
+	swim_member_status_is_updated(swim, member);
+
 	return member;
 }
 
@@ -654,6 +795,84 @@ swim_encode_failure_detection(struct swim *swim, struct swim_msg *msg,
 	return 1;
 }
 
+/**
+ * Encode a part of the dissemination component into a single SWIM
+ * packet. Events are taken from @a queue starting right after
+ * @a *queue_pos. On success @a *queue_pos is moved to the last
+ * encoded event, so the next call continues from where this one
+ * stopped.
+ * @param msg Message to append the packet to.
+ * @param queue Head of the event queue.
+ * @param[in, out] queue_pos Queue position to continue from.
+ *
+ * @retval -1 Error.
+ * @retval 0 Not error, but nothing is encoded.
+ * @retval 1 Something is encoded.
+ */
+static int
+swim_encode_dissemination_packet(struct swim_msg *msg, struct rlist *queue,
+				 struct rlist **queue_pos)
+{
+	struct swim_diss_header_bin diss_header_bin;
+	struct swim_event_bin event_bin;
+	int size = sizeof(diss_header_bin);
+	/*
+	 * Reserve space for at least one event so as each call
+	 * makes progress through the queue.
+	 */
+	struct swim_packet *packet =
+		swim_msg_reserve(msg, size + sizeof(event_bin));
+	if (packet == NULL)
+		return -1;
+	char *header = swim_packet_alloc(packet, size);
+
+	int i = 0;
+	swim_event_bin_create(&event_bin);
+	struct rlist *link;
+	for (link = rlist_next(*queue_pos); link != queue;
+	     link = rlist_next(link)) {
+		char *pos = swim_packet_alloc(packet, sizeof(event_bin));
+		if (pos == NULL)
+			break;
+		struct swim_member *member =
+			rlist_entry(link, struct swim_member, in_queue_events);
+		swim_event_bin_reset(&event_bin, member);
+		memcpy(pos, &event_bin, sizeof(event_bin));
+		*queue_pos = link;
+		++i;
+	}
+	if (i == 0)
+		return 0;
+	swim_diss_header_bin_create(&diss_header_bin, i);
+	memcpy(header, &diss_header_bin, sizeof(diss_header_bin));
+	swim_packet_flush(packet);
+	return 1;
+}
+
+/**
+ * Encode the dissemination component. Events which do not fit
+ * into one packet are continued in the next ones.
+ * @retval -1 Error.
+ * @retval 0 Nothing to encode, the event queue is empty.
+ * @retval 1 Success, something is encoded.
+ */
+static int
+swim_encode_dissemination(struct swim *swim, struct swim_msg *msg)
+{
+	struct rlist *queue = &swim->queue_events;
+	struct rlist *pos = queue;
+	int is_encoded = 0;
+	while (rlist_next(pos) != queue) {
+		int rc = swim_encode_dissemination_packet(msg, queue, &pos);
+		if (rc < 0)
+			return -1;
+		if (rc == 0)
+			break;
+		is_encoded = 1;
+	}
+	return is_encoded;
+}
+
 /** Encode SWIM components into a sequence of UDP packets. */
 static int
 swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
@@ -670,6 +889,11 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
 		goto error;
 	map_size += rc;
 
+	rc = swim_encode_dissemination(swim, msg);
+	if (rc < 0)
+		goto error;
+	map_size += rc;
+
 	rc = swim_encode_anti_entropy(swim, msg);
 	if (rc < 0)
 		goto error;
@@ -685,6 +909,24 @@ error:
 
-/** Once per specified timeout trigger a next broadcast step. */
+/**
+ * Decrement TTL of each queued event and remove from the queue
+ * the events whose TTL has dropped to 0.
+ */
 static void
+swim_decrease_events_ttl(struct swim *swim)
+{
+	struct swim_member *member, *tmp;
+	rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events,
+				 tmp) {
+		if (--member->status_ttl == 0)
+			rlist_del_entry(member, in_queue_events);
+	}
+}
+
+/**
+ * Do one round step. Send encoded components to a next member
+ * from the queue.
+ */
+static void
 swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events)
 {
 	assert((events & EV_PERIODIC) != 0);
@@ -712,6 +954,7 @@ swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events)
 	swim_task_schedule(&swim->round_step_task,
 			   swim->transport->send_round_msg, &m->addr);
 	swim_member_schedule_ack_wait(swim, m);
+	swim_decrease_events_ttl(swim);
 	ev_periodic_stop(loop, p);
 }
 
@@ -779,12 +1022,15 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
 			break;
 		++m->failed_pings;
 		if (m->failed_pings >= NO_ACKS_TO_GC) {
-			if (!m->is_pinned)
+			if (!m->is_pinned && m->status_ttl == 0)
 				swim_member_delete(swim, m);
 			continue;
 		}
-		if (m->failed_pings >= NO_ACKS_TO_DEAD)
+		if (m->failed_pings >= NO_ACKS_TO_DEAD &&
+		    m->status != MEMBER_DEAD) {
 			m->status = MEMBER_DEAD;
+			swim_member_status_is_updated(swim, m);
+		}
 		swim_schedule_ping(swim, m);
 		rlist_del_entry(m, in_queue_wait_ack);
 	}
@@ -828,6 +1074,7 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
 					  def->incarnation);
 		return;
 	}
+	uint64_t old_incarnation = self->incarnation;
 	/*
 	 * It is possible that other instances know a bigger
 	 * incarnation of this instance - such thing happens when
@@ -846,6 +1093,8 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
 		 */
 		self->incarnation++;
 	}
+	if (old_incarnation != self->incarnation)
+		swim_member_status_is_updated(swim, self);
 }
 
 static int
@@ -1032,6 +1281,50 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
 	return 0;
 }
 
+static int
+swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
+{
+	const char *msg_pref = "Invalid SWIM dissemination message:";
+	if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) {
+		say_error("%s message should be an array", msg_pref);
+		return -1;
+	}
+	uint64_t size = mp_decode_array(pos);
+	for (uint64_t i = 0; i < size; ++i) {
+		if (mp_typeof(**pos) != MP_MAP ||
+		    mp_check_map(*pos, end) > 0) {
+			say_error("%s event should be map", msg_pref);
+			return -1;
+		}
+		uint64_t map_size = mp_decode_map(pos);
+		struct swim_member_def def;
+		swim_member_def_create(&def);
+		for (uint64_t j = 0; j < map_size; ++j) {
+			if (mp_typeof(**pos) != MP_UINT ||
+			    mp_check_uint(*pos, end) > 0) {
+				say_error("%s event key should be uint",
+					  msg_pref);
+				return -1;
+			}
+			uint64_t key = mp_decode_uint(pos);
+			if (key >= swim_member_key_MAX) {
+				say_error("%s unknown event key", msg_pref);
+				return -1;
+			}
+			if (swim_process_member_key(key, pos, end, msg_pref,
+						    &def) != 0)
+				return -1;
+		}
+		if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
+			say_error("%s member address should be specified",
+				  msg_pref);
+			return -1;
+		}
+		swim_process_member_update(swim, &def);
+	}
+	return 0;
+}
+
 /** Receive and process a new message. */
 static void
 swim_on_input(struct swim_scheduler *scheduler,
@@ -1065,6 +1358,11 @@ swim_on_input(struct swim_scheduler *scheduler,
 							   src) != 0)
 				return;
 			break;
+		case SWIM_DISSEMINATION:
+			say_verbose("SWIM: process dissemination");
+			if (swim_process_dissemination(swim, &pos, end) != 0)
+				return;
+			break;
 		default:
 			say_error("%s unknown component type component is "\
 				  "supported", msg_pref);
@@ -1142,6 +1440,10 @@ swim_new(void)
 	ev_init(&swim->wait_ack_tick, swim_check_acks);
 	ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL);
 	swim->wait_ack_tick.data = (void *) swim;
+
+	/* Dissemination events. */
+	rlist_create(&swim->queue_events);
+
 	return swim;
 }
 
@@ -1199,8 +1501,10 @@ swim_remove_member(struct swim *swim, const char *uri)
 	if (uri_to_addr(uri, &addr) != 0)
 		return -1;
 	struct swim_member *member = swim_find_member(swim, &addr);
-	if (member != NULL)
+	if (member != NULL) {
+		rlist_del_entry(member, in_queue_events);
 		swim_member_delete(swim, member);
+	}
 	return 0;
 }
 
@@ -1236,6 +1540,7 @@ swim_delete(struct swim *swim)
 	while (node != mh_end(swim->members)) {
 		struct swim_member *m = (struct swim_member *)
 			mh_i64ptr_node(swim->members, node)->val;
+		rlist_del_entry(m, in_queue_events);
 		swim_member_delete(swim, m);
 		node = mh_first(swim->members);
 	}
-- 
2.17.2 (Apple Git-113)