From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH v4 08/12] [RAW] swim: introduce payload Date: Thu, 31 Jan 2019 00:28:37 +0300 Message-Id: <2978d7e5238a77fed54c4c61d3a892c586747c35.1548883137.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: kostja@tarantool.org, vdavydov.dev@gmail.com List-ID: Payload is arbitrary user data, disseminated just like other member attributes. Part of #3234 --- src/lib/swim/swim.c | 102 +++++++++++++++++++++++++++++++++++--- src/lib/swim/swim.h | 4 ++ src/lib/swim/swim_proto.c | 50 +++++++++++++++---- src/lib/swim/swim_proto.h | 27 ++++++++-- 4 files changed, 163 insertions(+), 20 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 40faa296e..78dbc6092 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -259,6 +259,16 @@ struct swim_member { * learn its dead status. */ int status_ttl; + /** Arbitrary user data, disseminated on each change. */ + char *payload; + /** Payload size, in bytes. */ + int payload_size; + /** + * TTL of payload. The payload is sent at most this many + * times as part of the dissemination component. Reset on + * each update. + */ + int payload_ttl; /** * Events are put into a queue sorted by event occurrence * time. @@ -415,6 +425,14 @@ swim_member_uuid_is_updated(struct swim_member *member, struct swim *swim) swim_schedule_event(swim, member); } +/** Perform all actions needed to process a member's payload update. */ +static void +swim_member_payload_is_updated(struct swim_member *member, struct swim *swim) +{ + member->payload_ttl = mh_size(swim->members); + swim_schedule_event(swim, member); +} + /** * Update status and incarnation of the member if needed. Statuses * are compared as a compound key: {incarnation, status}. 
So @a @@ -458,6 +476,31 @@ swim_by_scheduler(struct swim_scheduler *scheduler) return container_of(scheduler, struct swim, scheduler); } +/** + * Update the member's payload if necessary. If the payload is the + * same, nothing happens. Fortunately, memcmp here is not expensive, + * because 1) a payload change is usually an extremely rare event, + * 2) the max payload size is very limited. + */ +static inline int +swim_member_update_payload(struct swim_member *member, const char *payload, + int payload_size, struct swim *swim) +{ + if (payload_size == member->payload_size && + memcmp(payload, member->payload, payload_size) == 0) + return 0; + char *new_payload = (char *) realloc(member->payload, payload_size); + if (new_payload == NULL) { + diag_set(OutOfMemory, payload_size, "realloc", "new_payload"); + return -1; + } + memcpy(new_payload, payload, payload_size); + member->payload = new_payload; + member->payload_size = payload_size; + swim_member_payload_is_updated(member, swim); + return 0; +} + /** * Remove the member from all queues, hashes, destroy it and free * the memory. @@ -480,6 +523,7 @@ swim_member_delete(struct swim *swim, struct swim_member *member) /* Dissemination component. */ rlist_del_entry(member, in_queue_events); + free(member->payload); free(member); } @@ -522,7 +566,7 @@ swim_ping_task_complete(struct swim_task *task, static struct swim_member * swim_member_new(struct swim *swim, const struct sockaddr_in *addr, const struct tt_uuid *uuid, enum swim_member_status status, - uint64_t incarnation) + uint64_t incarnation, const char *payload, int payload_size) { struct swim_member *member = (struct swim_member *) calloc(1, sizeof(*member)); @@ -553,6 +597,11 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr, /* Dissemination component. 
*/ rlist_create(&member->in_queue_events); swim_member_status_is_updated(member, swim); + if (swim_member_update_payload(member, payload, payload_size, + swim) != 0) { + swim_member_delete(swim, member); + return NULL; + } say_verbose("SWIM: member %s is added", swim_uuid_str(uuid)); return member; @@ -632,14 +681,17 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet) for (mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t); i < member_count; ++i) { struct swim_member *m = *mh_swim_table_node(t, rc); - int new_size = size + sizeof(member_bin); + int new_size = size + sizeof(member_bin) + m->payload_size; char *pos = swim_packet_reserve(packet, new_size); if (pos == NULL) break; size = new_size; swim_member_bin_fill(&member_bin, &m->addr, &m->uuid, - m->status, m->incarnation); + m->status, m->incarnation, + m->payload_size); memcpy(pos, &member_bin, sizeof(member_bin)); + pos += sizeof(member_bin); + memcpy(pos, m->payload, m->payload_size); /* * First random member could be choosen too close * to the hash end. 
Here the cycle is wrapped, if @@ -718,17 +770,27 @@ swim_encode_dissemination(struct swim *swim, struct swim_packet *packet) int new_size = size + sizeof(event_bin); if (m->old_uuid_ttl > 0) new_size += sizeof(old_uuid_bin); + if (m->payload_ttl > 0) { + new_size += mp_sizeof_uint(SWIM_MEMBER_PAYLOAD) + + mp_sizeof_bin(m->payload_size); + } char *pos = swim_packet_reserve(packet, new_size); if (pos == NULL) break; size = new_size; swim_event_bin_fill(&event_bin, m->status, &m->addr, &m->uuid, - m->incarnation, m->old_uuid_ttl); + m->incarnation, m->old_uuid_ttl, + m->payload_ttl); memcpy(pos, &event_bin, sizeof(event_bin)); + pos += sizeof(event_bin); if (m->old_uuid_ttl > 0) { - pos += sizeof(event_bin); swim_old_uuid_bin_fill(&old_uuid_bin, &m->old_uuid); memcpy(pos, &old_uuid_bin, sizeof(old_uuid_bin)); + pos += sizeof(old_uuid_bin); + } + if (m->payload_ttl > 0) { + pos = mp_encode_uint(pos, SWIM_MEMBER_PAYLOAD); + mp_encode_bin(pos, m->payload, m->payload_size); } ++i; } @@ -778,6 +840,8 @@ swim_decrease_events_ttl(struct swim *swim) tmp) { if (member->old_uuid_ttl > 0) --member->old_uuid_ttl; + if (member->payload_ttl > 0) + --member->payload_ttl; if (--member->status_ttl == 0) { rlist_del_entry(member, in_queue_events); cached_round_msg_invalidate(swim); @@ -989,7 +1053,9 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def) } if (old_member == NULL) { member = swim_member_new(swim, &def->addr, &def->uuid, - def->status, def->incarnation); + def->status, def->incarnation, + def->payload, + def->payload_size); } else if (swim_member_update_uuid(old_member, &def->uuid, swim) == 0) { member = old_member; @@ -1002,6 +1068,13 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def) swim_member_update_addr(member, &def->addr, swim); swim_member_update_status(member, def->status, def->incarnation, swim); + if (def->is_payload_specified && + swim_member_update_payload(member, def->payload, + def->payload_size, + swim) != 0) { 
+ /* Not such a critical error. */ + diag_log(); + } if (old_member != NULL) { assert(member != old_member); swim_member_delete(swim, old_member); @@ -1256,7 +1329,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, return -1; } swim->self = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE, - 0); + 0, NULL, 0); if (swim->self == NULL) return -1; } else if (uuid == NULL || tt_uuid_is_nil(uuid)) { @@ -1323,6 +1396,18 @@ swim_check_is_configured(const struct swim *swim, const char *msg_pref) return -1; } +int +swim_set_payload(struct swim *swim, const char *payload, int payload_size) +{ + if (payload_size > MAX_PAYLOAD_SIZE) { + diag_set(IllegalParams, "Payload should be <= %d", + MAX_PAYLOAD_SIZE); + return -1; + } + return swim_member_update_payload(swim->self, payload, payload_size, + swim); +} + int swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid) { @@ -1334,7 +1419,8 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid) return -1; struct swim_member *member = swim_find_member(swim, uuid); if (member == NULL) { - member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE, 0); + member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE, 0, + NULL, 0); return member == NULL ? -1 : 0; } diag_set(SwimError, "%s a member with such UUID already exists", diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h index 9d21a739d..dced172c0 100644 --- a/src/lib/swim/swim.h +++ b/src/lib/swim/swim.h @@ -67,6 +67,10 @@ int swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, double ack_timeout, const struct tt_uuid *uuid); +/** Set payload to disseminate over the cluster. */ +int +swim_set_payload(struct swim *swim, const char *payload, int payload_size); + /** * Stop listening and broadcasting messages, cleanup all internal * structures, free memory. 
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c index e31c67682..284d35695 100644 --- a/src/lib/swim/swim_proto.c +++ b/src/lib/swim/swim_proto.c @@ -115,21 +115,36 @@ swim_decode_port(struct sockaddr_in *address, const char **pos, const char *end, return 0; } -int -swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end, - const char *msg_pref, const char *param_name) +static inline int +swim_decode_bin(const char **bin, uint32_t *size, const char **pos, + const char *end, const char *msg_pref, const char *param_name) { if (mp_typeof(**pos) != MP_BIN || mp_check_binl(*pos, end) > 0) { diag_set(SwimError, "%s %s should be bin", msg_pref, param_name); return -1; } - if (mp_decode_binl(pos) != UUID_LEN || *pos + UUID_LEN > end) { + *bin = mp_decode_bin(pos, size); + if (*pos > end) { + diag_set(SwimError, "%s %s is invalid", msg_pref, param_name); + return -1; + } + return 0; +} + +int +swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end, + const char *msg_pref, const char *param_name) +{ + uint32_t size; + const char *bin; + if (swim_decode_bin(&bin, &size, pos, end, msg_pref, param_name) != 0) + return -1; + if (size != UUID_LEN) { diag_set(SwimError, "%s %s is invalid", msg_pref, param_name); return -1; } - memcpy(uuid, *pos, UUID_LEN); - *pos += UUID_LEN; + memcpy(uuid, bin, UUID_LEN); return 0; } @@ -157,6 +172,7 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, struct swim_member_def *def) { uint64_t tmp; + uint32_t len; switch (key) { case SWIM_MEMBER_STATUS: if (swim_decode_uint(pos, end, &tmp, msg_pref, @@ -194,6 +210,18 @@ swim_decode_member_key(enum swim_member_key key, const char **pos, "member old uuid") != 0) return -1; break; + case SWIM_MEMBER_PAYLOAD: + if (swim_decode_bin(&def->payload, &len, pos, end, msg_pref, + "member payload") != 0) + return -1; + if (len > MAX_PAYLOAD_SIZE) { + diag_set(SwimError, "%s member payload size should be "\ + "<= %d", msg_pref, 
MAX_PAYLOAD_SIZE); + return -1; + } + def->payload_size = (int) len; + def->is_payload_specified = true; + break; default: unreachable(); } @@ -317,13 +345,15 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header, void swim_member_bin_fill(struct swim_member_bin *header, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status, uint64_t incarnation) + enum swim_member_status status, uint64_t incarnation, + uint16_t payload_size) { header->v_status = status; header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr); header->v_port = mp_bswap_u16(addr->sin_port); memcpy(header->v_uuid, uuid, UUID_LEN); header->v_incarnation = mp_bswap_u64(incarnation); + header->v_payload_size = mp_bswap_u16(payload_size); } void @@ -340,6 +370,8 @@ swim_member_bin_create(struct swim_member_bin *header) header->m_uuid_len = UUID_LEN; header->k_incarnation = SWIM_MEMBER_INCARNATION; header->m_incarnation = 0xcf; + header->k_payload = SWIM_MEMBER_PAYLOAD; + header->m_payload_size = 0xc5; } void @@ -370,9 +402,9 @@ void swim_event_bin_fill(struct swim_event_bin *header, enum swim_member_status status, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - uint64_t incarnation, int old_uuid_ttl) + uint64_t incarnation, int old_uuid_ttl, int payload_ttl) { - header->m_header = 0x85 + (old_uuid_ttl > 0); + header->m_header = 0x85 + (old_uuid_ttl > 0) + (payload_ttl > 0); header->v_status = status; header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr); header->v_port = mp_bswap_u16(addr->sin_port); diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h index a3dc1164e..353605c35 100644 --- a/src/lib/swim/swim_proto.h +++ b/src/lib/swim/swim_proto.h @@ -35,6 +35,11 @@ #include #include +enum { + /** Reserve 272 bytes for headers. */ + MAX_PAYLOAD_SIZE = 1200, +}; + /** * SWIM binary protocol structures and helpers. 
Below is a picture * of a SWIM message template: @@ -108,6 +113,13 @@ struct swim_member_def { struct sockaddr_in addr; uint64_t incarnation; enum swim_member_status status; + const char *payload; + int payload_size; + /** + * Zero payload size does not mean that payload is not + * specified. It can be just empty. + */ + bool is_payload_specified; }; /** Initialize the definition with default values. */ @@ -247,6 +259,7 @@ enum swim_member_key { SWIM_MEMBER_UUID, SWIM_MEMBER_INCARNATION, SWIM_MEMBER_OLD_UUID, + SWIM_MEMBER_PAYLOAD, swim_member_key_MAX, }; @@ -301,6 +314,13 @@ struct PACKED swim_member_bin { /** mp_encode_uint(64bit incarnation) */ uint8_t m_incarnation; uint64_t v_incarnation; + + /** mp_encode_uint(SWIM_MEMBER_PAYLOAD) */ + uint8_t k_payload; + /** mp_encode_bin(16bit bin header) */ + uint8_t m_payload_size; + uint16_t v_payload_size; + /** Payload data ... */ }; /** Initialize antri-entropy record. */ @@ -316,7 +336,8 @@ swim_member_bin_create(struct swim_member_bin *header); void swim_member_bin_fill(struct swim_member_bin *header, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - enum swim_member_status status, uint64_t incarnation); + enum swim_member_status status, uint64_t incarnation, + uint16_t payload_size); /** }}} Anti-entropy component */ @@ -338,7 +359,7 @@ swim_diss_header_bin_create(struct swim_diss_header_bin *header, /** SWIM event MessagePack template. */ struct PACKED swim_event_bin { - /** mp_encode_map(5 or 6) */ + /** mp_encode_map(5, or 6, or 7) */ uint8_t m_header; /** mp_encode_uint(SWIM_MEMBER_STATUS) */ @@ -386,7 +407,7 @@ void swim_event_bin_fill(struct swim_event_bin *header, enum swim_member_status status, const struct sockaddr_in *addr, const struct tt_uuid *uuid, - uint64_t incarnation, int old_uuid_ttl); + uint64_t incarnation, int old_uuid_ttl, int payload_ttl); /** Optional attribute of an event - old UUID of a member. */ struct swim_old_uuid_bin { -- 2.17.2 (Apple Git-113)