From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH v3 6/6] [RAW] swim: introduce payload Date: Sat, 29 Dec 2018 13:14:15 +0300 Message-Id: <5dca8948367f2fd35e9bcd5eefcdf26254e7f49d.1546077015.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: vdavydov.dev@gmail.com, kostja@tarantool.org List-ID: Part of #3234 --- src/lib/swim/swim.c | 139 ++++++++++++++++++++++++++++++++++++++++---- src/lib/swim/swim.h | 4 ++ 2 files changed, 131 insertions(+), 12 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 7dff22dd5..fa2ae0273 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -105,6 +105,8 @@ enum { * of failed pings. */ NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2, + /** Maximal payload size. Reserve 272 bytes for headers. */ + MAX_PAYLOAD_SIZE = 1200, }; /** @@ -200,6 +202,16 @@ struct swim_member { * learn its dead status. */ int status_ttl; + /** Arbitrary user data, disseminated on each change. */ + char *payload; + /** Size of @a payload in bytes. */ + int payload_size; + /** + * TTL of payload. At most this number of times payload is + * sent as a part of dissemination component. Reset on + * each update. + */ + int payload_ttl; /** * Events are put into a queue sorted by event occurrence * time. @@ -392,6 +404,7 @@ enum swim_member_key { SWIM_MEMBER_ADDRESS, SWIM_MEMBER_PORT, SWIM_MEMBER_INCARNATION, + SWIM_MEMBER_PAYLOAD, swim_member_key_MAX, }; @@ -415,7 +428,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, /** SWIM member MsgPack template. 
*/ struct PACKED swim_member_bin { - /** mp_encode_map(4) */ + /** mp_encode_map(5) */ uint8_t m_header; /** mp_encode_uint(SWIM_MEMBER_STATUS) */ @@ -440,6 +453,13 @@ struct PACKED swim_member_bin { /** mp_encode_uint(64bit incarnation) */ uint8_t m_incarnation; uint64_t v_incarnation; + + /** mp_encode_uint(SWIM_MEMBER_PAYLOAD) */ + uint8_t k_payload; + /** mp_encode_bin(16bit bin header) */ + uint8_t m_payload_size; + uint16_t v_payload_size; + /** Payload data ... */ }; static inline void @@ -450,12 +470,13 @@ swim_member_bin_reset(struct swim_member_bin *header, header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr); header->v_port = mp_bswap_u16(member->addr.sin_port); header->v_incarnation = mp_bswap_u64(member->incarnation); + header->v_payload_size = mp_bswap_u16(member->payload_size); } static inline void swim_member_bin_create(struct swim_member_bin *header) { - header->m_header = 0x84; + header->m_header = 0x85; header->k_status = SWIM_MEMBER_STATUS; header->k_addr = SWIM_MEMBER_ADDRESS; header->m_addr = 0xce; @@ -463,6 +484,8 @@ swim_member_bin_create(struct swim_member_bin *header) header->m_port = 0xcd; header->k_incarnation = SWIM_MEMBER_INCARNATION; header->m_incarnation = 0xcf; + header->k_payload = SWIM_MEMBER_PAYLOAD; + header->m_payload_size = 0xc5; } /** }}} Anti-entropy component */ @@ -488,7 +511,7 @@ swim_diss_header_bin_create(struct swim_diss_header_bin *header, int batch_size) /** SWIM event MsgPack template. 
*/ struct PACKED swim_event_bin { - /** mp_encode_map(4) */ + /** mp_encode_map(4 or 5) */ uint8_t m_header; /** mp_encode_uint(SWIM_MEMBER_STATUS) */ @@ -518,7 +541,6 @@ struct PACKED swim_event_bin { static inline void swim_event_bin_create(struct swim_event_bin *header) { - header->m_header = 0x84; header->k_status = SWIM_MEMBER_STATUS; header->k_addr = SWIM_MEMBER_ADDRESS; header->m_addr = 0xce; @@ -531,6 +553,8 @@ swim_event_bin_create(struct swim_event_bin *header) static inline void swim_event_bin_reset(struct swim_event_bin *header, struct swim_member *member) { + /* Map of 4 keys, plus one more when the payload TTL is positive. */ + header->m_header = 0x84 + (member->payload_ttl > 0); header->v_status = member->status; header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr); header->v_port = mp_bswap_u16(member->addr.sin_port); @@ -594,6 +617,19 @@ swim_member_status_is_updated(struct swim *swim, struct swim_member *member) cached_round_msg_invalidate(swim); } +/** + * Schedule an event about the member, reset its payload TTL to + * the size of the members table, and invalidate the cached + * round message. + */ +static void +swim_member_payload_is_updated(struct swim *swim, struct swim_member *member) +{ + swim_schedule_event(swim, member); + member->payload_ttl = mh_size(swim->members); + cached_round_msg_invalidate(swim); +} + /** * Update status and incarnation of the member if needed. Statuses * are compared as a compound key: {incarnation, status}. 
So @a @@ -622,6 +653,37 @@ swim_member_update_status(struct swim *swim, struct swim_member *member, } } +/** + * Replace the member's payload with a copy of @a payload, unless + * @a incarnation is older than the member's one or the payload + * is the same. A changed payload is scheduled for dissemination. + * @retval 0 Success, or nothing to update. + * @retval -1 Memory error, diag is set. + */ +static inline int +swim_member_update_payload(struct swim *swim, struct swim_member *member, + uint64_t incarnation, const char *payload, + int payload_size) +{ + if (incarnation < member->incarnation) + return 0; + if (payload_size == member->payload_size && + memcmp(payload, member->payload, payload_size) == 0) + return 0; + /* realloc(ptr, 0) may free ptr and return NULL - avoid it. */ + size_t alloc_size = payload_size != 0 ? payload_size : 1; + char *new_payload = (char *) realloc(member->payload, alloc_size); + if (new_payload == NULL) { + diag_set(OutOfMemory, alloc_size, "realloc", "new_payload"); + return -1; + } + memcpy(new_payload, payload, payload_size); + member->payload = new_payload; + member->payload_size = payload_size; + swim_member_payload_is_updated(swim, member); + return 0; +} + /** * Remove the member from all queues, hashes, destroy it and free * the memory. @@ -653,7 +706,8 @@ swim_member_delete(struct swim *swim, struct swim_member *member) */ static struct swim_member * swim_member_new(struct swim *swim, const struct sockaddr_in *addr, - enum swim_member_status status, uint64_t incarnation) + enum swim_member_status status, uint64_t incarnation, + const char *payload, int payload_size) { struct swim_member *member = (struct swim_member *) calloc(1, sizeof(*member)); @@ -683,6 +737,12 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr, /* Dissemination component. 
*/ rlist_create(&member->in_queue_events); swim_member_status_is_updated(swim, member); + if (swim_member_update_payload(swim, member, incarnation, payload, + payload_size) != 0) { + rlist_del_entry(member, in_queue_events); + swim_member_delete(swim, member); + return NULL; + } return member; } @@ -769,12 +829,15 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_msg *msg) swim_member_bin_create(&member_bin); for (; i < (int) mh_size(swim->members); ++i) { - char *pos = swim_packet_alloc(packet, sizeof(member_bin)); + struct swim_member *member = swim->shuffled_members[i]; + char *pos = swim_packet_alloc(packet, sizeof(member_bin) + + member->payload_size); if (pos == NULL) break; - struct swim_member *member = swim->shuffled_members[i]; swim_member_bin_reset(&member_bin, member); memcpy(pos, &member_bin, sizeof(member_bin)); + pos += sizeof(member_bin); + memcpy(pos, member->payload, member->payload_size); } if (i == 0) return 0; @@ -828,11 +891,22 @@ swim_encode_dissemination_packet(struct swim_msg *msg, struct rlist **queue_pos) struct swim_event_bin event_bin; swim_event_bin_create(&event_bin); rlist_foreach_entry(member, *queue_pos, in_queue_events) { - char *pos = swim_packet_alloc(packet, sizeof(event_bin)); + int size = sizeof(event_bin); + if (member->payload_ttl > 0) { + size += mp_sizeof_uint(SWIM_MEMBER_PAYLOAD) + + mp_sizeof_bin(member->payload_size); + } + char *pos = swim_packet_alloc(packet, size); if (pos == NULL) break; swim_event_bin_reset(&event_bin, member); memcpy(pos, &event_bin, sizeof(event_bin)); + pos += sizeof(event_bin); + if (member->payload_ttl > 0) { + pos = mp_encode_uint(pos, SWIM_MEMBER_PAYLOAD); + mp_encode_bin(pos, member->payload, + member->payload_size); + } ++i; prev = member; } @@ -904,6 +978,10 @@ swim_decrease_events_ttl(struct swim *swim) struct swim_member *member, *tmp; rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events, tmp) { + assert(member->status_ttl > 0); + assert(member->status_ttl >= 
member->payload_ttl); + if (member->payload_ttl > 0) + --member->payload_ttl; if (--member->status_ttl == 0) { rlist_del_entry(member, in_queue_events); cached_round_msg_invalidate(swim); @@ -1029,6 +1107,8 @@ struct swim_member_def { struct sockaddr_in addr; uint64_t incarnation; enum swim_member_status status; + const char *payload; + int payload_size; }; static inline void @@ -1048,7 +1128,8 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def) */ if (member == NULL) { member = swim_member_new(swim, &def->addr, def->status, - def->incarnation); + def->incarnation, def->payload, + def->payload_size); if (member == NULL) diag_log(); return; @@ -1057,6 +1138,10 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def) if (member != self) { swim_member_update_status(swim, member, def->status, def->incarnation); + if (swim_member_update_payload(swim, member, def->incarnation, + def->payload, + def->payload_size) != 0) + diag_log(); return; } uint64_t old_incarnation = self->incarnation; @@ -1131,6 +1216,26 @@ swim_process_member_key(enum swim_member_key key, const char **pos, } def->incarnation = mp_decode_uint(pos); break; + case SWIM_MEMBER_PAYLOAD: + if (mp_typeof(**pos) != MP_BIN || + mp_check_binl(*pos, end) > 0) { + say_error("%s member payload should be bin", msg_pref); + return -1; + } + uint32_t len; + def->payload = mp_decode_bin(pos, &len); + /* mp_check_binl() covers only the length prefix. */ + if (*pos > end) { + say_error("%s member payload is truncated", msg_pref); + return -1; + } + if (len > MAX_PAYLOAD_SIZE) { + say_error("%s member payload size should be <= %d", + msg_pref, MAX_PAYLOAD_SIZE); + return -1; + } + def->payload_size = (int) len; + break; default: unreachable(); } @@ -1245,7 +1345,8 @@ swim_process_failure_detection(struct swim *swim, const char **pos, } struct swim_member *sender = swim_find_member(swim, src); if (sender == NULL) { - sender = swim_member_new(swim, src, MEMBER_ALIVE, incarnation); + sender = swim_member_new(swim, src, MEMBER_ALIVE, incarnation, + NULL, 0); if (sender == NULL) { diag_log(); return 0; @@ -1441,7 +1542,8 @@ 
swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, return -1; struct swim_member *new_self = NULL; if (swim_find_member(swim, &addr) == NULL) { - new_self = swim_member_new(swim, &addr, MEMBER_ALIVE, 0); + new_self = swim_member_new(swim, &addr, MEMBER_ALIVE, 0, NULL, + 0); if (new_self == NULL) return -1; } @@ -1466,6 +1568,19 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, return 0; } +int +swim_set_payload(struct swim *swim, const char *payload, int payload_size) +{ + if (payload_size < 0 || payload_size > MAX_PAYLOAD_SIZE) { + diag_set(IllegalParams, "Payload size should be in [0, %d]", + MAX_PAYLOAD_SIZE); + return -1; + } + return swim_member_update_payload(swim, swim->self, + swim->self->incarnation, payload, + payload_size); +} + int swim_add_member(struct swim *swim, const char *uri) { @@ -1474,7 +1589,7 @@ swim_add_member(struct swim *swim, const char *uri) return -1; struct swim_member *member = swim_find_member(swim, &addr); if (member == NULL) { - member = swim_member_new(swim, &addr, MEMBER_ALIVE, 0); + member = swim_member_new(swim, &addr, MEMBER_ALIVE, 0, NULL, 0); if (member == NULL) return -1; member->is_pinned = true; diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index 350fa0cee..a44fcb977 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -65,6 +65,15 @@ int swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, const struct swim_transport *new_transport); +/** + * Set payload of this instance to disseminate over the cluster. + * The payload is copied. Its size is limited by 1200 bytes. + * @retval 0 Success. + * @retval -1 Error, diag is set. + */ +int +swim_set_payload(struct swim *swim, const char *payload, int payload_size); + /** * Stop listening and broadcasting messages, cleanup all internal * structures, free memory. -- 2.17.2 (Apple Git-113)