[PATCH v4 08/12] [RAW] swim: introduce payload
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Thu Jan 31 00:28:37 MSK 2019
Payload is arbitrary user data, disseminated just like other
member attributes.
Part of #3234
---
src/lib/swim/swim.c | 102 +++++++++++++++++++++++++++++++++++---
src/lib/swim/swim.h | 4 ++
src/lib/swim/swim_proto.c | 50 +++++++++++++++----
src/lib/swim/swim_proto.h | 27 ++++++++--
4 files changed, 163 insertions(+), 20 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 40faa296e..78dbc6092 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -259,6 +259,16 @@ struct swim_member {
* learn its dead status.
*/
int status_ttl;
+ /** Arbitrary user data, disseminated on each change. */
+ char *payload;
+ /** Payload size, in bytes. */
+ int payload_size;
+ /**
+ * TTL of the payload. The payload is sent as a part of the
+ * dissemination component at most this number of times.
+ * Reset on each update.
+ */
+ int payload_ttl;
/**
* Events are put into a queue sorted by event occurrence
* time.
@@ -415,6 +425,14 @@ swim_member_uuid_is_updated(struct swim_member *member, struct swim *swim)
swim_schedule_event(swim, member);
}
+/** Perform all the actions needed to process a member's payload update. */
+static void
+swim_member_payload_is_updated(struct swim_member *member, struct swim *swim)
+{
+ member->payload_ttl = mh_size(swim->members);
+ swim_schedule_event(swim, member);
+}
+
/**
* Update status and incarnation of the member if needed. Statuses
* are compared as a compound key: {incarnation, status}. So @a
@@ -458,6 +476,31 @@ swim_by_scheduler(struct swim_scheduler *scheduler)
return container_of(scheduler, struct swim, scheduler);
}
+/**
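+ * Update the member's payload if necessary. If the payload is the
+ * same, nothing happens. Fortunately, memcmp here is not expensive,
+ * because 1) a payload change is usually an extremely rare event,
+ * 2) the max payload size is very limited.
+ *
+ * NOTE(review): when payload_size is 0 and the member already has
+ * a non-empty payload, realloc(ptr, 0) may return NULL
+ * (implementation-defined), which the code below would report as
+ * OutOfMemory - confirm the empty-payload case is handled.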
+ */
+static inline int
+swim_member_update_payload(struct swim_member *member, const char *payload,
+ int payload_size, struct swim *swim)
+{
+ if (payload_size == member->payload_size &&
+ memcmp(payload, member->payload, payload_size) == 0)
+ return 0;
+ char *new_payload = (char *) realloc(member->payload, payload_size);
+ if (new_payload == NULL) {
+ diag_set(OutOfMemory, payload_size, "realloc", "new_payload");
+ return -1;
+ }
+ memcpy(new_payload, payload, payload_size);
+ member->payload = new_payload;
+ member->payload_size = payload_size;
+ swim_member_payload_is_updated(member, swim);
+ return 0;
+}
+
/**
* Remove the member from all queues, hashes, destroy it and free
* the memory.
@@ -480,6 +523,7 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
/* Dissemination component. */
rlist_del_entry(member, in_queue_events);
+ free(member->payload);
free(member);
}
@@ -522,7 +566,7 @@ swim_ping_task_complete(struct swim_task *task,
static struct swim_member *
swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
const struct tt_uuid *uuid, enum swim_member_status status,
- uint64_t incarnation)
+ uint64_t incarnation, const char *payload, int payload_size)
{
struct swim_member *member =
(struct swim_member *) calloc(1, sizeof(*member));
@@ -553,6 +597,11 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
/* Dissemination component. */
rlist_create(&member->in_queue_events);
swim_member_status_is_updated(member, swim);
+ if (swim_member_update_payload(member, payload, payload_size,
+ swim) != 0) {
+ swim_member_delete(swim, member);
+ return NULL;
+ }
say_verbose("SWIM: member %s is added", swim_uuid_str(uuid));
return member;
@@ -632,14 +681,17 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
for (mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t);
i < member_count; ++i) {
struct swim_member *m = *mh_swim_table_node(t, rc);
- int new_size = size + sizeof(member_bin);
+ int new_size = size + sizeof(member_bin) + m->payload_size;
char *pos = swim_packet_reserve(packet, new_size);
if (pos == NULL)
break;
size = new_size;
swim_member_bin_fill(&member_bin, &m->addr, &m->uuid,
- m->status, m->incarnation);
+ m->status, m->incarnation,
+ m->payload_size);
memcpy(pos, &member_bin, sizeof(member_bin));
+ pos += sizeof(member_bin);
+ memcpy(pos, m->payload, m->payload_size);
/*
* First random member could be choosen too close
* to the hash end. Here the cycle is wrapped, if
@@ -718,17 +770,27 @@ swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
int new_size = size + sizeof(event_bin);
if (m->old_uuid_ttl > 0)
new_size += sizeof(old_uuid_bin);
+ if (m->payload_ttl > 0) {
+ new_size += mp_sizeof_uint(SWIM_MEMBER_PAYLOAD) +
+ mp_sizeof_bin(m->payload_size);
+ }
char *pos = swim_packet_reserve(packet, new_size);
if (pos == NULL)
break;
size = new_size;
swim_event_bin_fill(&event_bin, m->status, &m->addr, &m->uuid,
- m->incarnation, m->old_uuid_ttl);
+ m->incarnation, m->old_uuid_ttl,
+ m->payload_ttl);
memcpy(pos, &event_bin, sizeof(event_bin));
+ pos += sizeof(event_bin);
if (m->old_uuid_ttl > 0) {
- pos += sizeof(event_bin);
swim_old_uuid_bin_fill(&old_uuid_bin, &m->old_uuid);
memcpy(pos, &old_uuid_bin, sizeof(old_uuid_bin));
+ pos += sizeof(old_uuid_bin);
+ }
+ if (m->payload_ttl > 0) {
+ pos = mp_encode_uint(pos, SWIM_MEMBER_PAYLOAD);
+ mp_encode_bin(pos, m->payload, m->payload_size);
}
++i;
}
@@ -778,6 +840,8 @@ swim_decrease_events_ttl(struct swim *swim)
tmp) {
if (member->old_uuid_ttl > 0)
--member->old_uuid_ttl;
+ if (member->payload_ttl > 0)
+ --member->payload_ttl;
if (--member->status_ttl == 0) {
rlist_del_entry(member, in_queue_events);
cached_round_msg_invalidate(swim);
@@ -989,7 +1053,9 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def)
}
if (old_member == NULL) {
member = swim_member_new(swim, &def->addr, &def->uuid,
- def->status, def->incarnation);
+ def->status, def->incarnation,
+ def->payload,
+ def->payload_size);
} else if (swim_member_update_uuid(old_member, &def->uuid,
swim) == 0) {
member = old_member;
@@ -1002,6 +1068,13 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def)
swim_member_update_addr(member, &def->addr, swim);
swim_member_update_status(member, def->status,
def->incarnation, swim);
+ if (def->is_payload_specified &&
+ swim_member_update_payload(member, def->payload,
+ def->payload_size,
+ swim) != 0) {
+ /* Not such a critical error. */
+ diag_log();
+ }
if (old_member != NULL) {
assert(member != old_member);
swim_member_delete(swim, old_member);
@@ -1256,7 +1329,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
return -1;
}
swim->self = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE,
- 0);
+ 0, NULL, 0);
if (swim->self == NULL)
return -1;
} else if (uuid == NULL || tt_uuid_is_nil(uuid)) {
@@ -1323,6 +1396,18 @@ swim_check_is_configured(const struct swim *swim, const char *msg_pref)
return -1;
}
+int
+swim_set_payload(struct swim *swim, const char *payload, int payload_size)
+{
+ if (payload_size > MAX_PAYLOAD_SIZE) {
+ diag_set(IllegalParams, "Payload should be <= %d",
+ MAX_PAYLOAD_SIZE);
+ return -1;
+ }
+ return swim_member_update_payload(swim->self, payload, payload_size,
+ swim);
+}
+
int
swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
{
@@ -1334,7 +1419,8 @@ swim_add_member(struct swim *swim, const char *uri, const struct tt_uuid *uuid)
return -1;
struct swim_member *member = swim_find_member(swim, uuid);
if (member == NULL) {
- member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE, 0);
+ member = swim_member_new(swim, &addr, uuid, MEMBER_ALIVE, 0,
+ NULL, 0);
return member == NULL ? -1 : 0;
}
diag_set(SwimError, "%s a member with such UUID already exists",
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index 9d21a739d..dced172c0 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -67,6 +67,10 @@ int
swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
double ack_timeout, const struct tt_uuid *uuid);
+/** Set payload to disseminate over the cluster. */
+int
+swim_set_payload(struct swim *swim, const char *payload, int payload_size);
+
/**
* Stop listening and broadcasting messages, cleanup all internal
* structures, free memory.
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index e31c67682..284d35695 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -115,21 +115,36 @@ swim_decode_port(struct sockaddr_in *address, const char **pos, const char *end,
return 0;
}
-int
-swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end,
- const char *msg_pref, const char *param_name)
+static inline int
+swim_decode_bin(const char **bin, uint32_t *size, const char **pos,
+ const char *end, const char *msg_pref, const char *param_name)
{
if (mp_typeof(**pos) != MP_BIN || mp_check_binl(*pos, end) > 0) {
diag_set(SwimError, "%s %s should be bin", msg_pref,
param_name);
return -1;
}
- if (mp_decode_binl(pos) != UUID_LEN || *pos + UUID_LEN > end) {
+ *bin = mp_decode_bin(pos, size);
+ if (*pos > end) {
+ diag_set(SwimError, "%s %s is invalid", msg_pref, param_name);
+ return -1;
+ }
+ return 0;
+}
+
+int
+swim_decode_uuid(struct tt_uuid *uuid, const char **pos, const char *end,
+ const char *msg_pref, const char *param_name)
+{
+ uint32_t size;
+ const char *bin;
+ if (swim_decode_bin(&bin, &size, pos, end, msg_pref, param_name) != 0)
+ return -1;
+ if (size != UUID_LEN) {
diag_set(SwimError, "%s %s is invalid", msg_pref, param_name);
return -1;
}
- memcpy(uuid, *pos, UUID_LEN);
- *pos += UUID_LEN;
+ memcpy(uuid, bin, UUID_LEN);
return 0;
}
@@ -157,6 +172,7 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
struct swim_member_def *def)
{
uint64_t tmp;
+ uint32_t len;
switch (key) {
case SWIM_MEMBER_STATUS:
if (swim_decode_uint(pos, end, &tmp, msg_pref,
@@ -194,6 +210,18 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
"member old uuid") != 0)
return -1;
break;
+ case SWIM_MEMBER_PAYLOAD:
+ if (swim_decode_bin(&def->payload, &len, pos, end, msg_pref,
+ "member payload") != 0)
+ return -1;
+ if (len > MAX_PAYLOAD_SIZE) {
+ diag_set(SwimError, "%s member payload size should be "\
+ "<= %d", msg_pref, MAX_PAYLOAD_SIZE);
+ return -1;
+ }
+ def->payload_size = (int) len;
+ def->is_payload_specified = true;
+ break;
default:
unreachable();
}
@@ -317,13 +345,15 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
void
swim_member_bin_fill(struct swim_member_bin *header,
const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- enum swim_member_status status, uint64_t incarnation)
+ enum swim_member_status status, uint64_t incarnation,
+ uint16_t payload_size)
{
header->v_status = status;
header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr);
header->v_port = mp_bswap_u16(addr->sin_port);
memcpy(header->v_uuid, uuid, UUID_LEN);
header->v_incarnation = mp_bswap_u64(incarnation);
+ header->v_payload_size = mp_bswap_u16(payload_size);
}
void
@@ -340,6 +370,8 @@ swim_member_bin_create(struct swim_member_bin *header)
header->m_uuid_len = UUID_LEN;
header->k_incarnation = SWIM_MEMBER_INCARNATION;
header->m_incarnation = 0xcf;
+ header->k_payload = SWIM_MEMBER_PAYLOAD;
+ header->m_payload_size = 0xc5;
}
void
@@ -370,9 +402,9 @@ void
swim_event_bin_fill(struct swim_event_bin *header,
enum swim_member_status status,
const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- uint64_t incarnation, int old_uuid_ttl)
+ uint64_t incarnation, int old_uuid_ttl, int payload_ttl)
{
- header->m_header = 0x85 + (old_uuid_ttl > 0);
+ header->m_header = 0x85 + (old_uuid_ttl > 0) + (payload_ttl > 0);
header->v_status = status;
header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr);
header->v_port = mp_bswap_u16(addr->sin_port);
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index a3dc1164e..353605c35 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -35,6 +35,11 @@
#include <arpa/inet.h>
#include <stdbool.h>
+enum {
+ /** Reserve 272 bytes for headers. */
+ MAX_PAYLOAD_SIZE = 1200,
+};
+
/**
* SWIM binary protocol structures and helpers. Below is a picture
* of a SWIM message template:
@@ -108,6 +113,13 @@ struct swim_member_def {
struct sockaddr_in addr;
uint64_t incarnation;
enum swim_member_status status;
+ const char *payload;
+ int payload_size;
+ /**
+ * A zero payload size does not mean that the payload is
+ * not specified: the payload can simply be empty.
+ */
+ bool is_payload_specified;
};
/** Initialize the definition with default values. */
@@ -247,6 +259,7 @@ enum swim_member_key {
SWIM_MEMBER_UUID,
SWIM_MEMBER_INCARNATION,
SWIM_MEMBER_OLD_UUID,
+ SWIM_MEMBER_PAYLOAD,
swim_member_key_MAX,
};
@@ -301,6 +314,13 @@ struct PACKED swim_member_bin {
/** mp_encode_uint(64bit incarnation) */
uint8_t m_incarnation;
uint64_t v_incarnation;
+
+ /** mp_encode_uint(SWIM_MEMBER_PAYLOAD) */
+ uint8_t k_payload;
+ /** mp_encode_bin(16bit bin header) */
+ uint8_t m_payload_size;
+ uint16_t v_payload_size;
+ /** Payload data ... */
};
/** Initialize antri-entropy record. */
@@ -316,7 +336,8 @@ swim_member_bin_create(struct swim_member_bin *header);
void
swim_member_bin_fill(struct swim_member_bin *header,
const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- enum swim_member_status status, uint64_t incarnation);
+ enum swim_member_status status, uint64_t incarnation,
+ uint16_t payload_size);
/** }}} Anti-entropy component */
@@ -338,7 +359,7 @@ swim_diss_header_bin_create(struct swim_diss_header_bin *header,
/** SWIM event MessagePack template. */
struct PACKED swim_event_bin {
- /** mp_encode_map(5 or 6) */
+ /** mp_encode_map(5, or 6, or 7) */
uint8_t m_header;
/** mp_encode_uint(SWIM_MEMBER_STATUS) */
@@ -386,7 +407,7 @@ void
swim_event_bin_fill(struct swim_event_bin *header,
enum swim_member_status status,
const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- uint64_t incarnation, int old_uuid_ttl);
+ uint64_t incarnation, int old_uuid_ttl, int payload_ttl);
/** Optional attribute of an event - old UUID of a member. */
struct swim_old_uuid_bin {
--
2.17.2 (Apple Git-113)
More information about the Tarantool-patches
mailing list