From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH 3/5] swim: introduce a dissemination component Date: Mon, 17 Dec 2018 15:53:21 +0300 Message-Id: <8c392a1f3796487465b29936483cee575ea339b0.1545047950.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: vdavydov.dev@gmail.com List-ID: The dissemination component broadcasts events about member status updates. Public API: swim.cfg({server = , members = , heartbeat = }) Configures the SWIM module. @server - URI of UDP server to which other cluster members will send SWIM messages. It should have the format "ip:port". @members - array of URIs explicitly defined by a user. These members are never deleted from the members table until they are removed from the configuration explicitly. SWIM downloads their members tables from them, merges them with its own, and repeats. @heartbeat - how often to send a part of the members table to another member. Note that it is neither how often to send the whole table, nor how often to send the table to all members. It is only one step of the protocol. swim.stop() Stops the SWIM module: shuts down the server, closes socket, destroys queues, frees memory. Note that swim.cfg can be called again afterwards. swim.info() Shows info about each known member in the format: { ["ip:port"] = { status = , incarnation = } } Closes #3234 --- src/lib/swim/swim.c | 237 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 231 insertions(+), 6 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 6b2f1ca0c..bbf6b7fd5 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -48,7 +48,6 @@ * - indirect ping. * - increment own incarnation on each round. * - attach dst incarnation to ping. - * - fix swim_member_bin mp_encode_map(2) to 3 in the first patch. */ /** @@ -224,6 +223,26 @@ struct swim_member { struct swim_io_task ping_task; /** Position in a queue of members waiting for an ack. 
*/ struct rlist in_queue_wait_ack; + /** + * + * Dissemination component + * + * Dissemination component sends events. Event is a + * notification about member status update. So formally, + * this structure already has all the needed attributes. + * But also an event somehow should be sent to all members + * at least once according to SWIM, so it requires + * something like TTL, which decrements on each send. And + * a member can not be removed from the global table until + * it gets dead and its dissemination TTL is 0, so as to + * allow other members learn its dead status. + */ + int dissemination_ttl; + /** + * Events are put into a queue sorted by event occurrence + * time. + */ + struct rlist in_queue_events; }; /** @@ -240,6 +259,7 @@ static struct swim_member *self = NULL; enum swim_component_type { SWIM_ANTI_ENTROPY = 0, SWIM_FAILURE_DETECTION, + SWIM_DISSEMINATION, }; /** {{{ Failure detection component */ @@ -438,6 +458,92 @@ static struct swim_io_task round_step_task = { /** }}} Anti-entropy component */ +/** {{{ Dissemination component */ + +/** SWIM dissemination MsgPack template. */ +struct PACKED swim_diss_header_bin { + /** mp_encode_uint(SWIM_DISSEMINATION) */ + uint8_t k_header; + /** mp_encode_array() */ + uint8_t m_header; + uint32_t v_header; +}; + +static inline void +swim_diss_header_bin_create(struct swim_diss_header_bin *header, int batch_size) +{ + header->k_header = SWIM_DISSEMINATION; + header->m_header = 0xdd; + header->v_header = mp_bswap_u32(batch_size); +} + +/** SWIM event MsgPack template. 
*/ +struct PACKED swim_event_bin { + /** mp_encode_map(4) */ + uint8_t m_header; + + /** mp_encode_uint(SWIM_MEMBER_STATUS) */ + uint8_t k_status; + /** mp_encode_uint(enum member_status) */ + uint8_t v_status; + + /** mp_encode_uint(SWIM_MEMBER_ADDR) */ + uint8_t k_addr; + /** mp_encode_uint(addr.sin_addr.s_addr) */ + uint8_t m_addr; + uint32_t v_addr; + + /** mp_encode_uint(SWIM_MEMBER_PORT) */ + uint8_t k_port; + /** mp_encode_uint(addr.sin_port) */ + uint8_t m_port; + uint16_t v_port; + + /** mp_encode_uint(SWIM_MEMBER_INCARNATION) */ + uint8_t k_incarnation; + /** mp_encode_uint(64bit incarnation) */ + uint8_t m_incarnation; + uint64_t v_incarnation; +}; + +static inline void +swim_event_bin_create(struct swim_event_bin *header) +{ + header->m_header = 0x84; + header->k_status = SWIM_MEMBER_STATUS; + header->k_addr = SWIM_MEMBER_ADDR; + header->m_addr = 0xce; + header->k_port = SWIM_MEMBER_PORT; + header->m_port = 0xcd; + header->k_incarnation = SWIM_MEMBER_INCARNATION; + header->m_incarnation = 0xcf; +} + +static inline void +swim_event_bin_reset(struct swim_event_bin *header, struct swim_member *member) +{ + header->v_status = member->status; + header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr); + header->v_port = mp_bswap_u16(member->addr.sin_port); + header->v_incarnation = mp_bswap_u64(member->incarnation); +} + +/** Queue of events sorted by occurrence time. 
*/ +static RLIST_HEAD(queue_events); +static int event_count = 0; + +static inline void +swim_schedule_event(struct swim_member *member) +{ + if (rlist_empty(&member->in_queue_events)) { + rlist_add_tail_entry(&queue_events, member, in_queue_events); + event_count++; + } + member->dissemination_ttl = mh_size(members); +} + +/** }}} Dissemination component */ + /** * SWIM message structure: * { @@ -448,6 +554,18 @@ static struct swim_io_task round_step_task = { * * OR/AND * + * SWIM_DISSEMINATION: [ + * { + * SWIM_MEMBER_STATUS: uint, enum member_status, + * SWIM_MEMBER_ADDR: uint, ip, + * SWIM_MEMBER_PORT: uint, port, + * SWIM_MEMBER_INCARNATION: uint + * }, + * ... + * ], + * + * OR/AND + * * SWIM_ANTI_ENTROPY: [ * { * SWIM_MEMBER_STATUS: uint, enum member_status, @@ -531,6 +649,16 @@ swim_send_ack(struct swim_io_task *task); static void swim_send_ping(struct swim_io_task *task); +/** + * Make all needed actions to process a member's update like a + * change of its status, or incarnation, or both. + */ +static void +swim_member_is_updated(struct swim_member *member) +{ + swim_schedule_event(member); +} + /** * Update status of the member if needed. Statuses are compared as * a compound key: {incarnation, status}. 
So @a new_status can @@ -548,11 +676,14 @@ swim_member_update_status(struct swim_member *member, { assert(member != self); if (member->incarnation == incarnation) { - if (member->status < new_status) + if (member->status < new_status) { member->status = new_status; + swim_member_is_updated(member); + } } else if (member->incarnation < incarnation) { member->status = new_status; member->incarnation = incarnation; + swim_member_is_updated(member); } } @@ -589,6 +720,8 @@ swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status, swim_io_task_create(&member->ping_task, swim_send_ping); rlist_add_entry(&queue_round, member, in_queue_round); rlist_create(&member->in_queue_wait_ack); + rlist_create(&member->in_queue_events); + swim_schedule_event(member); return member; } @@ -617,6 +750,7 @@ swim_member_delete(struct swim_member *member) swim_io_task_destroy(&member->ping_task); rlist_del_entry(member, in_queue_round); rlist_del_entry(member, in_queue_wait_ack); + assert(rlist_empty(&member->in_queue_events)); free(member); } @@ -696,19 +830,53 @@ swim_encode_round_msg(char *buffer, int size) assert((uint)size > sizeof(struct swim_fd_header_bin) + 1); size -= sizeof(struct swim_fd_header_bin) + 1; + int diss_batch_size = calculate_bin_batch_size( + sizeof(struct swim_diss_header_bin), + sizeof(struct swim_event_bin), size); + if (diss_batch_size > event_count) + diss_batch_size = event_count; + size -= sizeof(struct swim_diss_header_bin) + + diss_batch_size * sizeof(struct swim_event_bin); + int ae_batch_size = calculate_bin_batch_size( sizeof(struct swim_anti_entropy_header_bin), sizeof(struct swim_member_bin), size); if (ae_batch_size > shuffled_members_size) ae_batch_size = shuffled_members_size; - buffer = mp_encode_map(buffer, 2); + buffer = mp_encode_map(buffer, 1 + (diss_batch_size > 0) + + (ae_batch_size > 0)); struct swim_fd_header_bin fd_header_bin; swim_fd_header_bin_create(&fd_header_bin, SWIM_FD_MSG_PING); memcpy(buffer, &fd_header_bin, 
sizeof(fd_header_bin)); buffer += sizeof(fd_header_bin); + if (diss_batch_size > 0) { + struct swim_diss_header_bin diss_header_bin; + swim_diss_header_bin_create(&diss_header_bin, diss_batch_size); + memcpy(buffer, &diss_header_bin, sizeof(diss_header_bin)); + buffer += sizeof(diss_header_bin); + + int i = 0; + struct swim_member *member, *tmp; + struct swim_event_bin event_bin; + swim_event_bin_create(&event_bin); + rlist_foreach_entry_safe(member, &queue_events, in_queue_events, + tmp) { + swim_event_bin_reset(&event_bin, member); + memcpy(buffer, &event_bin, sizeof(event_bin)); + buffer += sizeof(event_bin); + rlist_del_entry(member, in_queue_events); + --member->dissemination_ttl; + if (++i >= diss_batch_size) + break; + } + event_count -= diss_batch_size; + } + + if (ae_batch_size == 0) + return buffer - start; struct swim_anti_entropy_header_bin ae_header_bin; swim_anti_entropy_header_bin_create(&ae_header_bin, ae_batch_size); memcpy(buffer, &ae_header_bin, sizeof(ae_header_bin)); @@ -840,12 +1008,14 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events) break; ++m->failed_pings; if (m->failed_pings >= NO_ACKS_TO_GC) { - if (!m->is_pinned) + if (!m->is_pinned && m->dissemination_ttl == 0) swim_member_delete(m); continue; } - if (m->failed_pings >= NO_ACKS_TO_DEAD) + if (m->failed_pings >= NO_ACKS_TO_DEAD) { m->status = MEMBER_DEAD; + swim_member_is_updated(m); + } swim_io_task_push(&m->ping_task); rlist_del_entry(m, in_queue_wait_ack); } @@ -1094,6 +1264,50 @@ swim_process_failure_detection(const char **pos, const char *end, return 0; } +static int +swim_process_dissemination(const char **pos, const char *end) +{ + const char *msg_pref = "Invalid SWIM dissemination message:"; + if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) { + say_error("%s message should be an array", msg_pref); + return -1; + } + uint64_t size = mp_decode_array(pos); + for (uint64_t i = 0; i < size; ++i) { + if (mp_typeof(**pos) != MP_MAP || + 
mp_check_map(*pos, end) > 0) { + say_error("%s event should be map", msg_pref); + return -1; + } + uint64_t map_size = mp_decode_map(pos); + struct swim_member_def def; + swim_member_def_create(&def); + for (uint64_t j = 0; j < map_size; ++j) { + if (mp_typeof(**pos) != MP_UINT || + mp_check_uint(*pos, end) > 0) { + say_error("%s event key should be uint", + msg_pref); + return -1; + } + uint64_t key = mp_decode_uint(pos); + if (key >= swim_member_key_MAX) { + say_error("%s unknown event key", msg_pref); + return -1; + } + if (swim_process_member_key(key, pos, end, msg_pref, + &def) != 0) + return -1; + } + if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) { + say_error("%s member address should be specified", + msg_pref); + return -1; + } + swim_process_member_update(&def); + } + return 0; +} + /** Receive and process a new message. */ static void swim_on_input(struct ev_loop *loop, struct ev_io *io, int events) @@ -1140,6 +1354,11 @@ swim_on_input(struct ev_loop *loop, struct ev_io *io, int events) &addr) != 0) return; break; + case SWIM_DISSEMINATION: + say_verbose("SWIM: process dissemination"); + if (swim_process_dissemination(&pos, end) != 0) + return; + break; default: say_error("%s unknown component type component is "\ "supported", msg_pref); @@ -1299,6 +1518,7 @@ swim_cfg(const char **member_uris, int member_uri_count, const char *server_uri, error: for (int i = 0; i < new_cfg_size; ++i) { if (new_cfg[i]->status == new_status) { + rlist_del_entry(new_cfg[i], in_queue_events); swim_member_delete(new_cfg[i]); if (new_self == new_cfg[i]) new_self = NULL; @@ -1306,8 +1526,10 @@ error: } if (member_uri_count > 0) free(new_cfg); - if (new_self != NULL && new_self->status == new_status) + if (new_self != NULL && new_self->status == new_status) { + rlist_del_entry(new_self, in_queue_events); swim_member_delete(new_self); + } return -1; } @@ -1346,6 +1568,7 @@ swim_stop(void) while (node != mh_end(members)) { struct swim_member *m = (struct swim_member 
*) mh_i64ptr_node(members, node)->val; + rlist_del_entry(m, in_queue_events); swim_member_delete(m); node = mh_first(members); } @@ -1358,9 +1581,11 @@ swim_stop(void) cfg_size = 0; shuffled_members = NULL; shuffled_members_size = 0; + event_count = 0; rlist_create(&queue_wait_ack); rlist_create(&queue_output); rlist_create(&queue_round); + rlist_create(&queue_events); } #ifndef NDEBUG -- 2.17.2 (Apple Git-113)