[PATCH v3 6/6] [RAW] swim: introduce payload
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Sat Dec 29 13:14:15 MSK 2018
Part of #3234
---
src/lib/swim/swim.c | 139 ++++++++++++++++++++++++++++++++++++++++----
src/lib/swim/swim.h | 4 ++
2 files changed, 131 insertions(+), 12 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 7dff22dd5..fa2ae0273 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -105,6 +105,8 @@ enum {
* of failed pings.
*/
NO_ACKS_TO_GC = NO_ACKS_TO_DEAD + 2,
+ /**
+  * Max size of a member payload in bytes. 272 more bytes of a
+  * packet are reserved for headers.
+  */
+ MAX_PAYLOAD_SIZE = 1200,
};
/**
@@ -200,6 +202,16 @@ struct swim_member {
* learn its dead status.
*/
int status_ttl;
+ /** Arbitrary user data, disseminated on each change. */
+ char *payload;
+ /** Size of @a payload in bytes. */
+ int payload_size;
+ /**
+ * TTL of payload. At most this number of times payload is
+ * sent as a part of dissemination component. Reset on
+ * each update.
+ */
+ int payload_ttl;
/**
* Events are put into a queue sorted by event occurrence
* time.
@@ -392,6 +404,7 @@ enum swim_member_key {
SWIM_MEMBER_ADDRESS,
SWIM_MEMBER_PORT,
SWIM_MEMBER_INCARNATION,
+ SWIM_MEMBER_PAYLOAD,
swim_member_key_MAX,
};
@@ -415,7 +428,7 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
/** SWIM member MsgPack template. */
struct PACKED swim_member_bin {
- /** mp_encode_map(4) */
+ /** mp_encode_map(5) */
uint8_t m_header;
/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -440,6 +453,13 @@ struct PACKED swim_member_bin {
/** mp_encode_uint(64bit incarnation) */
uint8_t m_incarnation;
uint64_t v_incarnation;
+
+ /** mp_encode_uint(SWIM_MEMBER_PAYLOAD) */
+ uint8_t k_payload;
+ /** mp_encode_bin(16bit bin header) */
+ uint8_t m_payload_size;
+ uint16_t v_payload_size;
+ /** Payload data ... */
};
static inline void
@@ -450,12 +470,13 @@ swim_member_bin_reset(struct swim_member_bin *header,
header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
header->v_port = mp_bswap_u16(member->addr.sin_port);
header->v_incarnation = mp_bswap_u64(member->incarnation);
+ header->v_payload_size = mp_bswap_u16(member->payload_size);
}
static inline void
swim_member_bin_create(struct swim_member_bin *header)
{
- header->m_header = 0x84;
+ header->m_header = 0x85;
header->k_status = SWIM_MEMBER_STATUS;
header->k_addr = SWIM_MEMBER_ADDRESS;
header->m_addr = 0xce;
@@ -463,6 +484,8 @@ swim_member_bin_create(struct swim_member_bin *header)
header->m_port = 0xcd;
header->k_incarnation = SWIM_MEMBER_INCARNATION;
header->m_incarnation = 0xcf;
+ header->k_payload = SWIM_MEMBER_PAYLOAD;
+ header->m_payload_size = 0xc5;
}
/** }}} Anti-entropy component */
@@ -488,7 +511,7 @@ swim_diss_header_bin_create(struct swim_diss_header_bin *header, int batch_size)
/** SWIM event MsgPack template. */
struct PACKED swim_event_bin {
- /** mp_encode_map(4) */
+ /** mp_encode_map(4 or 5) */
uint8_t m_header;
/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -518,7 +541,6 @@ struct PACKED swim_event_bin {
static inline void
swim_event_bin_create(struct swim_event_bin *header)
{
- header->m_header = 0x84;
header->k_status = SWIM_MEMBER_STATUS;
header->k_addr = SWIM_MEMBER_ADDRESS;
header->m_addr = 0xce;
@@ -531,6 +553,7 @@ swim_event_bin_create(struct swim_event_bin *header)
static inline void
swim_event_bin_reset(struct swim_event_bin *header, struct swim_member *member)
{
+ /*
+  * MsgPack fixmap header: 4 mandatory keys plus one more for
+  * the payload while its TTL is not expired. The parentheses
+  * are required: '+' binds tighter than '>', so without them
+  * the whole expression is a comparison and yields 0 or 1.
+  */
+ header->m_header = 0x84 + (member->payload_ttl > 0);
header->v_status = member->status;
header->v_addr = mp_bswap_u32(member->addr.sin_addr.s_addr);
header->v_port = mp_bswap_u16(member->addr.sin_port);
@@ -594,6 +617,14 @@ swim_member_status_is_updated(struct swim *swim, struct swim_member *member)
cached_round_msg_invalidate(swim);
}
+/**
+ * Bookkeeping to do after a member's payload has been changed:
+ * call swim_schedule_event() for the member, set the payload TTL
+ * to the current size of the members hash, and call
+ * cached_round_msg_invalidate() on the instance.
+ */
+static void
+swim_member_payload_is_updated(struct swim *swim, struct swim_member *member)
+{
+ swim_schedule_event(swim, member);
+ member->payload_ttl = mh_size(swim->members);
+ cached_round_msg_invalidate(swim);
+}
+
/**
* Update status and incarnation of the member if needed. Statuses
* are compared as a compound key: {incarnation, status}. So @a
@@ -622,6 +653,28 @@ swim_member_update_status(struct swim *swim, struct swim_member *member,
}
}
+/**
+ * Replace the payload of @a member with a copy of @a payload if
+ * it differs from the stored one. An update carrying an
+ * incarnation older than the member's one is ignored.
+ *
+ * @param swim SWIM instance owning the member.
+ * @param member Member to update.
+ * @param incarnation Incarnation the payload came with.
+ * @param payload New payload data, copied on success. Not read
+ *        when @a payload_size is 0.
+ * @param payload_size Size of @a payload in bytes.
+ *
+ * @retval 0 Payload is updated, or nothing had to be done.
+ * @retval -1 Memory error, diag is set. The old payload is kept.
+ */
+static inline int
+swim_member_update_payload(struct swim *swim, struct swim_member *member,
+			   uint64_t incarnation, const char *payload,
+			   int payload_size)
+{
+	if (incarnation < member->incarnation)
+		return 0;
+	/*
+	 * Zero-sized payloads are equal without looking at the
+	 * pointers: either of them can be NULL, and passing NULL to
+	 * memcmp() is undefined even for zero length.
+	 */
+	if (payload_size == member->payload_size &&
+	    (payload_size == 0 ||
+	     memcmp(payload, member->payload, payload_size) == 0))
+		return 0;
+	if (payload_size == 0) {
+		/*
+		 * realloc(ptr, 0) may free the buffer and return
+		 * NULL, which is indistinguishable from a failure
+		 * and would leave a dangling pointer in the member.
+		 * Drop the old payload explicitly instead.
+		 */
+		free(member->payload);
+		member->payload = NULL;
+	} else {
+		char *new_payload = (char *) realloc(member->payload,
+						     payload_size);
+		if (new_payload == NULL) {
+			diag_set(OutOfMemory, payload_size, "realloc",
+				 "new_payload");
+			return -1;
+		}
+		memcpy(new_payload, payload, payload_size);
+		member->payload = new_payload;
+	}
+	member->payload_size = payload_size;
+	swim_member_payload_is_updated(swim, member);
+	return 0;
+}
+
/**
* Remove the member from all queues, hashes, destroy it and free
* the memory.
@@ -653,7 +706,8 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
*/
static struct swim_member *
swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
- enum swim_member_status status, uint64_t incarnation)
+ enum swim_member_status status, uint64_t incarnation,
+ const char *payload, int payload_size)
{
struct swim_member *member =
(struct swim_member *) calloc(1, sizeof(*member));
@@ -683,6 +737,12 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
/* Dissemination component. */
rlist_create(&member->in_queue_events);
swim_member_status_is_updated(swim, member);
+ if (swim_member_update_payload(swim, member, incarnation, payload,
+ payload_size) != 0) {
+ rlist_del_entry(member, in_queue_events);
+ swim_member_delete(swim, member);
+ return NULL;
+ }
return member;
}
@@ -769,12 +829,15 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_msg *msg)
swim_member_bin_create(&member_bin);
for (; i < (int) mh_size(swim->members); ++i) {
- char *pos = swim_packet_alloc(packet, sizeof(member_bin));
+ struct swim_member *member = swim->shuffled_members[i];
+ char *pos = swim_packet_alloc(packet, sizeof(member_bin) +
+ member->payload_size);
if (pos == NULL)
break;
- struct swim_member *member = swim->shuffled_members[i];
swim_member_bin_reset(&member_bin, member);
memcpy(pos, &member_bin, sizeof(member_bin));
+ pos += sizeof(member_bin);
+ memcpy(pos, member->payload, member->payload_size);
}
if (i == 0)
return 0;
@@ -828,11 +891,22 @@ swim_encode_dissemination_packet(struct swim_msg *msg, struct rlist **queue_pos)
struct swim_event_bin event_bin;
swim_event_bin_create(&event_bin);
rlist_foreach_entry(member, *queue_pos, in_queue_events) {
- char *pos = swim_packet_alloc(packet, sizeof(event_bin));
+ int size = sizeof(event_bin);
+ if (member->payload_ttl > 0) {
+ size += mp_sizeof_uint(SWIM_MEMBER_PAYLOAD) +
+ mp_sizeof_bin(member->payload_size);
+ }
+ char *pos = swim_packet_alloc(packet, size);
if (pos == NULL)
break;
swim_event_bin_reset(&event_bin, member);
memcpy(pos, &event_bin, sizeof(event_bin));
+ pos += sizeof(event_bin);
+ if (member->payload_ttl > 0) {
+ pos = mp_encode_uint(pos, SWIM_MEMBER_PAYLOAD);
+ mp_encode_bin(pos, member->payload,
+ member->payload_size);
+ }
++i;
prev = member;
}
@@ -904,6 +978,10 @@ swim_decrease_events_ttl(struct swim *swim)
struct swim_member *member, *tmp;
rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events,
tmp) {
+ assert(member->status_ttl > 0);
+ assert(member->status_ttl >= member->payload_ttl);
+ if (member->payload_ttl > 0)
+ --member->payload_ttl;
if (--member->status_ttl == 0) {
rlist_del_entry(member, in_queue_events);
cached_round_msg_invalidate(swim);
@@ -1029,6 +1107,8 @@ struct swim_member_def {
struct sockaddr_in addr;
uint64_t incarnation;
enum swim_member_status status;
+ const char *payload;
+ int payload_size;
};
static inline void
@@ -1048,7 +1128,8 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
*/
if (member == NULL) {
member = swim_member_new(swim, &def->addr, def->status,
- def->incarnation);
+ def->incarnation, def->payload,
+ def->payload_size);
if (member == NULL)
diag_log();
return;
@@ -1057,6 +1138,10 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
if (member != self) {
swim_member_update_status(swim, member, def->status,
def->incarnation);
+ if (swim_member_update_payload(swim, member, def->incarnation,
+ def->payload,
+ def->payload_size) != 0)
+ diag_log();
return;
}
uint64_t old_incarnation = self->incarnation;
@@ -1131,6 +1216,21 @@ swim_process_member_key(enum swim_member_key key, const char **pos,
}
def->incarnation = mp_decode_uint(pos);
break;
+ case SWIM_MEMBER_PAYLOAD:
+		/*
+		 * The packet comes from the network, so nothing about
+		 * it is trusted: make sure there is at least one byte
+		 * to look at and that the bin header is complete.
+		 */
+		if (*pos == end || mp_typeof(**pos) != MP_BIN ||
+		    mp_check_binl(*pos, end) > 0) {
+			say_error("%s member payload should be bin", msg_pref);
+			return -1;
+		}
+		uint32_t len;
+		len = mp_decode_binl(pos);
+		if (len > MAX_PAYLOAD_SIZE) {
+			say_error("%s member payload size should be <= %d",
+				  msg_pref, MAX_PAYLOAD_SIZE);
+			return -1;
+		}
+		/*
+		 * mp_check_binl() validates only the header. The body
+		 * has to be checked against the packet end separately,
+		 * before the position is moved over it.
+		 */
+		if ((size_t) (end - *pos) < len) {
+			say_error("%s member payload is truncated", msg_pref);
+			return -1;
+		}
+		def->payload = *pos;
+		def->payload_size = (int) len;
+		*pos += len;
+		break;
default:
unreachable();
}
@@ -1245,7 +1345,8 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
}
struct swim_member *sender = swim_find_member(swim, src);
if (sender == NULL) {
- sender = swim_member_new(swim, src, MEMBER_ALIVE, incarnation);
+ sender = swim_member_new(swim, src, MEMBER_ALIVE, incarnation,
+ NULL, 0);
if (sender == NULL) {
diag_log();
return 0;
@@ -1441,7 +1542,8 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
return -1;
struct swim_member *new_self = NULL;
if (swim_find_member(swim, &addr) == NULL) {
- new_self = swim_member_new(swim, &addr, MEMBER_ALIVE, 0);
+ new_self = swim_member_new(swim, &addr, MEMBER_ALIVE, 0, NULL,
+ 0);
if (new_self == NULL)
return -1;
}
@@ -1466,6 +1568,19 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
return 0;
}
+/**
+ * Set the payload of the local member (swim->self). The data is
+ * copied, so the caller keeps ownership of @a payload.
+ *
+ * @param swim SWIM instance.
+ * @param payload Payload data. May be NULL only when
+ *        @a payload_size is 0.
+ * @param payload_size Size of @a payload in bytes, from 0 to
+ *        MAX_PAYLOAD_SIZE inclusive.
+ *
+ * @retval 0 Success.
+ * @retval -1 Illegal parameters or memory error, diag is set.
+ */
+int
+swim_set_payload(struct swim *swim, const char *payload, int payload_size)
+{
+	/*
+	 * The size is a signed int: a negative value would pass the
+	 * upper bound check below and turn into a huge size_t in
+	 * realloc()/memcpy().
+	 */
+	if (payload_size < 0 || (payload == NULL && payload_size > 0)) {
+		diag_set(IllegalParams, "Payload should be a valid buffer "
+			 "of a non-negative size");
+		return -1;
+	}
+	if (payload_size > MAX_PAYLOAD_SIZE) {
+		diag_set(IllegalParams, "Payload should be <= %d",
+			 MAX_PAYLOAD_SIZE);
+		return -1;
+	}
+	return swim_member_update_payload(swim, swim->self,
+					  swim->self->incarnation, payload,
+					  payload_size);
+}
+
int
swim_add_member(struct swim *swim, const char *uri)
{
@@ -1474,7 +1589,7 @@ swim_add_member(struct swim *swim, const char *uri)
return -1;
struct swim_member *member = swim_find_member(swim, &addr);
if (member == NULL) {
- member = swim_member_new(swim, &addr, MEMBER_ALIVE, 0);
+ member = swim_member_new(swim, &addr, MEMBER_ALIVE, 0, NULL, 0);
if (member == NULL)
return -1;
member->is_pinned = true;
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 350fa0cee..a44fcb977 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -65,6 +65,10 @@ int
swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
const struct swim_transport *new_transport);
+/**
+ * Set payload to disseminate over the cluster. The payload is
+ * stored in the instance's own member; the data is copied, so
+ * the buffer stays owned by the caller.
+ *
+ * @param swim SWIM instance.
+ * @param payload Buffer with arbitrary user data.
+ * @param payload_size Size of @a payload in bytes. Must not
+ *        exceed the internal limit MAX_PAYLOAD_SIZE (1200).
+ *
+ * @retval 0 Success.
+ * @retval -1 Error (too big payload or out of memory), diag is
+ *         set.
+ */
+int
+swim_set_payload(struct swim *swim, const char *payload, int payload_size);
+
/**
* Stop listening and broadcasting messages, cleanup all internal
* structures, free memory.
--
2.17.2 (Apple Git-113)
More information about the Tarantool-patches
mailing list