[tarantool-patches] [PATCH 2/6] swim: replace event_bin and member_bin with the passport
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Fri Apr 12 01:22:26 MSK 2019
Event_bin and member_bin binary packet structures were designed
separately for different purposes. Initially the event_bin was
thought having the same fields as passport + optional old UUID +
optional payload. On the other hand, member_bin was supposed to
store the passport + mandatory payload.
But old UUID was cut off in favour of another way of UUID update.
And payload appeared to be optional in both anti-entropy and
dissemination. It means, that member_bin and event_bin are not
needed anymore as separate structures. This commit replaces them
with the passport completely.
Part of #3234
---
src/lib/swim/swim.c | 60 +++++++++++++++++++--------------------
src/lib/swim/swim_proto.c | 38 ++-----------------------
src/lib/swim/swim_proto.h | 56 +++++++++---------------------------
3 files changed, 46 insertions(+), 108 deletions(-)
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 4582e3205..0e7f51adf 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -732,6 +732,27 @@ swim_new_round(struct swim *swim)
return 0;
}
+/**
+ * Encode one member into @a packet using @a passport structure.
+ * @retval 0 Success, encoded.
+ * @retval -1 Not enough memory in the packet.
+ */
+static int
+swim_encode_member(struct swim_packet *packet, struct swim_member *m,
+ struct swim_passport_bin *passport)
+{
+ /* The headers should be initialized. */
+ assert(passport->k_status == SWIM_MEMBER_STATUS);
+ int size = sizeof(*passport);
+ char *pos = swim_packet_alloc(packet, size);
+ if (pos == NULL)
+ return -1;
+ swim_passport_bin_fill(passport, &m->addr, &m->uuid, m->status,
+ m->incarnation);
+ memcpy(pos, passport, sizeof(*passport));
+ return 0;
+}
+
/**
* Encode anti-entropy header and random members data as many as
* possible to the end of the packet.
@@ -741,28 +762,21 @@ static int
swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
{
struct swim_anti_entropy_header_bin ae_header_bin;
- struct swim_member_bin member_bin;
- int size = sizeof(ae_header_bin);
- char *header = swim_packet_reserve(packet, size);
+ struct swim_passport_bin passport_bin;
+ char *header = swim_packet_alloc(packet, sizeof(ae_header_bin));
if (header == NULL)
return 0;
- char *pos = header;
- swim_member_bin_create(&member_bin);
+ swim_passport_bin_create(&passport_bin);
struct mh_swim_table_t *t = swim->members;
int i = 0, member_count = mh_size(t);
int rnd = swim_scaled_rand(0, member_count - 1);
for (mh_int_t rc = mh_swim_table_random(t, rnd), end = mh_end(t);
i < member_count; ++i) {
struct swim_member *m = *mh_swim_table_node(t, rc);
- int new_size = size + sizeof(member_bin);
- if (swim_packet_reserve(packet, new_size) == NULL)
+ if (swim_encode_member(packet, m, &passport_bin) != 0)
break;
- swim_member_bin_fill(&member_bin, &m->addr, &m->uuid,
- m->status, m->incarnation);
- memcpy(pos + size, &member_bin, sizeof(member_bin));
- size = new_size;
/*
- * First random member could be choosen too close
+ * First random member could be chosen too close
* to the hash end. Here the cycle is wrapped, if
* a packet still has free memory, but the
* iterator has already reached the hash end.
@@ -771,9 +785,6 @@ swim_encode_anti_entropy(struct swim *swim, struct swim_packet *packet)
if (rc == end)
rc = mh_first(t);
}
- if (i == 0)
- return 0;
- swim_packet_advance(packet, size);
swim_anti_entropy_header_bin_create(&ae_header_bin, i);
memcpy(header, &ae_header_bin, sizeof(ae_header_bin));
return 1;
@@ -822,32 +833,21 @@ static int
swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
{
struct swim_diss_header_bin diss_header_bin;
- int size = sizeof(diss_header_bin);
- char *header = swim_packet_reserve(packet, size);
+ struct swim_passport_bin passport_bin;
+ char *header = swim_packet_alloc(packet, sizeof(diss_header_bin));
if (header == NULL)
return 0;
+ swim_passport_bin_create(&passport_bin);
int i = 0;
- char *pos = header + size;
struct swim_member *m;
- struct swim_event_bin event_bin;
- swim_event_bin_create(&event_bin);
rlist_foreach_entry(m, &swim->dissemination_queue,
in_dissemination_queue) {
- int new_size = size + sizeof(event_bin);
- if (swim_packet_reserve(packet, new_size) == NULL)
+ if (swim_encode_member(packet, m, &passport_bin) != 0)
break;
- swim_event_bin_fill(&event_bin, m->status, &m->addr, &m->uuid,
- m->incarnation);
- memcpy(pos, &event_bin, sizeof(event_bin));
- pos += sizeof(event_bin);
- size = new_size;
++i;
}
- if (i == 0)
- return 0;
swim_diss_header_bin_create(&diss_header_bin, i);
memcpy(header, &diss_header_bin, sizeof(diss_header_bin));
- swim_packet_advance(packet, size);
return 1;
}
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 700eff431..d84550663 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -330,9 +330,10 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
header->v_anti_entropy = mp_bswap_u16(batch_size);
}
-static inline void
+void
swim_passport_bin_create(struct swim_passport_bin *passport)
{
+ passport->m_header = 0x85;
passport->k_status = SWIM_MEMBER_STATUS;
passport->k_addr = SWIM_MEMBER_ADDRESS;
passport->m_addr = 0xce;
@@ -345,7 +346,7 @@ swim_passport_bin_create(struct swim_passport_bin *passport)
passport->m_incarnation = 0xcf;
}
-static inline void
+void
swim_passport_bin_fill(struct swim_passport_bin *passport,
const struct sockaddr_in *addr,
const struct tt_uuid *uuid,
@@ -358,22 +359,6 @@ swim_passport_bin_fill(struct swim_passport_bin *passport,
passport->v_incarnation = mp_bswap_u64(incarnation);
}
-void
-swim_member_bin_fill(struct swim_member_bin *header,
- const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- enum swim_member_status status, uint64_t incarnation)
-{
- swim_passport_bin_fill(&header->passport, addr, uuid, status,
- incarnation);
-}
-
-void
-swim_member_bin_create(struct swim_member_bin *header)
-{
- header->m_header = 0x85;
- swim_passport_bin_create(&header->passport);
-}
-
void
swim_diss_header_bin_create(struct swim_diss_header_bin *header,
uint16_t batch_size)
@@ -383,23 +368,6 @@ swim_diss_header_bin_create(struct swim_diss_header_bin *header,
header->v_header = mp_bswap_u16(batch_size);
}
-void
-swim_event_bin_create(struct swim_event_bin *header)
-{
- swim_passport_bin_create(&header->passport);
-}
-
-void
-swim_event_bin_fill(struct swim_event_bin *header,
- enum swim_member_status status,
- const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- uint64_t incarnation)
-{
- header->m_header = 0x85;
- swim_passport_bin_fill(&header->passport, addr, uuid, status,
- incarnation);
-}
-
void
swim_meta_header_bin_create(struct swim_meta_header_bin *header,
const struct sockaddr_in *src)
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 6ae4475c0..ab4057185 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -265,11 +265,15 @@ swim_anti_entropy_header_bin_create(struct swim_anti_entropy_header_bin *header,
* state, exact address. The whole passport is necessary for each
* info related to a member: for anti-entropy records, for
* dissemination events. The components can inherit that structure
- * and add more attributes. For example, anti-entropy can add a
- * mandatory payload; dissemination adds optional old UUID and
- * payload.
+ * and add more attributes. Or just encode new attributes after
+ * the passport. For example, anti-entropy can add a payload when
+ * it is up to date; dissemination adds a payload when it is up to
+ * date and TTL is > 0.
*/
struct PACKED swim_passport_bin {
+ /** mp_encode_map(5) */
+ uint8_t m_header;
+
/** mp_encode_uint(SWIM_MEMBER_STATUS) */
uint8_t k_status;
/** mp_encode_uint(enum member_status) */
@@ -301,20 +305,9 @@ struct PACKED swim_passport_bin {
uint64_t v_incarnation;
};
-/**
- * SWIM member MessagePack template. Represents one record in
- * anti-entropy section.
- */
-struct PACKED swim_member_bin {
- /** mp_encode_map(5) */
- uint8_t m_header;
- /** Basic member info like status, address. */
- struct swim_passport_bin passport;
-};
-
-/** Initialize antri-entropy record. */
+/** Initialize a member's binary passport. */
void
-swim_member_bin_create(struct swim_member_bin *header);
+swim_passport_bin_create(struct swim_passport_bin *passport);
/**
* Since usually there are many members, it is faster to reset a
@@ -323,9 +316,10 @@ swim_member_bin_create(struct swim_member_bin *header);
* fill() ... .
*/
void
-swim_member_bin_fill(struct swim_member_bin *header,
- const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- enum swim_member_status status, uint64_t incarnation);
+swim_passport_bin_fill(struct swim_passport_bin *passport,
+ const struct sockaddr_in *addr,
+ const struct tt_uuid *uuid,
+ enum swim_member_status status, uint64_t incarnation);
/** }}} Anti-entropy component */
@@ -345,30 +339,6 @@ void
swim_diss_header_bin_create(struct swim_diss_header_bin *header,
uint16_t batch_size);
-/** SWIM event MessagePack template. */
-struct PACKED swim_event_bin {
- /** mp_encode_map(5 or 6) */
- uint8_t m_header;
- /** Basic member info like status, address. */
- struct swim_passport_bin passport;
-};
-
-/** Initialize dissemination record. */
-void
-swim_event_bin_create(struct swim_event_bin *header);
-
-/**
- * Since usually there are many evnets, it is faster to reset a
- * few fields in an existing template, then each time create a
- * new template. So the usage pattern is create(), fill(),
- * fill() ... .
- */
-void
-swim_event_bin_fill(struct swim_event_bin *header,
- enum swim_member_status status,
- const struct sockaddr_in *addr, const struct tt_uuid *uuid,
- uint64_t incarnation);
-
/** }}} Dissemination component */
/** {{{ Meta component */
--
2.17.2 (Apple Git-113)
More information about the Tarantool-patches
mailing list