[PATCH v4 06/12] [RAW] swim: introduce dissemination component

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Thu Jan 31 00:28:35 MSK 2019


The dissemination component broadcasts events about member
status updates.

Part of #3234
---
 src/lib/swim/swim.c       | 223 ++++++++++++++++++++++++++++++++++++--
 src/lib/swim/swim_proto.c |  58 ++++++++++
 src/lib/swim/swim_proto.h | 108 ++++++++++++++++++
 3 files changed, 377 insertions(+), 12 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index a862f52a4..353e55254 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -243,6 +243,42 @@ struct swim_member {
 	struct swim_task ping_task;
 	/** Position in a queue of members waiting for an ack. */
 	struct rlist in_queue_wait_ack;
+	/**
+	 *
+	 *                 Dissemination component
+	 *
+	 * Dissemination component sends events. An event is a
+	 * notification about a member status update. So formally,
+	 * this structure already has all the needed attributes.
+	 * But according to SWIM an event should also be sent to
+	 * all members at least once, so it requires something
+	 * like a TTL for each type of event, which is decremented
+	 * after each round step. And a member can not be removed
+	 * from the global table until it is dead and its status
+	 * TTL is 0, so as to allow other members to learn its
+	 * dead status.
+	 */
+	int status_ttl;
+	/**
+	 * Events are put into a queue sorted by event occurrence
+	 * time.
+	 */
+	struct rlist in_queue_events;
+	/**
+	 * Old UUID is sent for a while after its update so as to
+	 * allow other members to update this member's record in
+	 * their tables.
+	 */
+	struct tt_uuid old_uuid;
+	/**
+	 * UUID is quite heavy structure, so an old UUID is sent
+	 * only this number of times. A current UUID is sent
+	 * always. Moreover, if someone wanted to reuse UUID,
+	 * always sending old ones would make it much harder to
+	 * detect which instance has just updated UUID, and which
+	 * old UUID is handed over to another instance.
+	 */
+	int old_uuid_ttl;
 };
 
 #define mh_name _swim_table
@@ -313,6 +349,12 @@ struct swim {
 	struct rlist queue_wait_ack;
 	/** Generator of ack checking events. */
 	struct ev_periodic wait_ack_tick;
+	/**
+	 *
+	 *                 Dissemination component
+	 */
+	/** Queue of events sorted by occurrence time. */
+	struct rlist queue_events;
 };
 
 /** Put the member into a list of ACK waiters. */
@@ -327,14 +369,42 @@ swim_member_wait_ack(struct swim *swim, struct swim_member *member)
 	}
 }
 
+/**
+ * Put a member into the event queue on any update of it, so as
+ * to disseminate the update. The status TTL is reset on every
+ * update, even when only the UUID or another attribute changed,
+ * because 1) keeping the status TTL the biggest one simplifies
+ * the code, 2) a status takes only 2 bytes in a packet, so it is
+ * never worse to send it along with any update, and it reduces
+ * entropy. A member already in the queue keeps its position.
+ */
+static inline void
+swim_schedule_event(struct swim *swim, struct swim_member *member)
+{
+	struct rlist *events = &swim->queue_events;
+	member->status_ttl = mh_size(swim->members);
+	if (! rlist_empty(&member->in_queue_events))
+		return;
+	rlist_add_tail_entry(events, member, in_queue_events);
+}
+
 /**
  * Make all needed actions to process a member's update like a
  * change of its status, or incarnation, or both.
  */
 static void
-swim_member_status_is_updated(struct swim_member *member)
+swim_member_status_is_updated(struct swim_member *member, struct swim *swim)
 {
 	member->unacknowledged_pings = 0;
+	swim_schedule_event(swim, member);
+}
+
+/** Schedule dissemination of a member's new UUID and its old one. */
+static void
+swim_member_uuid_is_updated(struct swim_member *member, struct swim *swim)
+{
+	swim_schedule_event(swim, member);
+	member->old_uuid_ttl = mh_size(swim->members);
 }
 
 /**
@@ -352,7 +422,6 @@ swim_member_update_status(struct swim_member *member,
 			  enum swim_member_status new_status,
 			  uint64_t incarnation, struct swim *swim)
 {
-	(void) swim;
 	/*
 	 * Source of truth about self is this instance and it is
 	 * never updated from remote. Refutation is handled
@@ -362,12 +431,12 @@ swim_member_update_status(struct swim_member *member,
 	if (member->incarnation == incarnation) {
 		if (member->status < new_status) {
 			member->status = new_status;
-			swim_member_status_is_updated(member);
+			swim_member_status_is_updated(member, swim);
 		}
 	} else if (member->incarnation < incarnation) {
 		member->status = new_status;
 		member->incarnation = incarnation;
-		swim_member_status_is_updated(member);
+		swim_member_status_is_updated(member, swim);
 	}
 }
 
@@ -400,6 +469,9 @@ swim_member_delete(struct swim *swim, struct swim_member *member)
 	swim_task_destroy(&member->ack_task);
 	swim_task_destroy(&member->ping_task);
 
+	/* Dissemination component. */
+	rlist_del_entry(member, in_queue_events);
+
 	free(member);
 }
 
@@ -469,6 +541,10 @@ swim_member_new(struct swim *swim, const struct sockaddr_in *addr,
 	swim_task_create(&member->ack_task, NULL, NULL);
 	swim_task_create(&member->ping_task, swim_ping_task_complete, NULL);
 
+	/* Dissemination component. */
+	rlist_create(&member->in_queue_events);
+	swim_member_status_is_updated(member, swim);
+
 	say_verbose("SWIM: member %s is added", swim_uuid_str(uuid));
 	return member;
 }
@@ -610,6 +686,51 @@ swim_encode_failure_detection(struct swim *swim, struct swim_packet *packet,
 	return 1;
 }
 
+/**
+ * Encode dissemination header and as many queued events as fit.
+ * @retval 0 Not error, but nothing is encoded.
+ * @retval 1 Something is encoded.
+ */
+static int
+swim_encode_dissemination(struct swim *swim, struct swim_packet *packet)
+{
+	struct swim_diss_header_bin diss_header_bin;
+	int size = sizeof(diss_header_bin);
+	char *header = swim_packet_reserve(packet, size);
+	if (header == NULL)
+		return 0;
+	int i = 0;
+	struct swim_member *m;
+	struct swim_event_bin event_bin;
+	struct swim_old_uuid_bin old_uuid_bin;
+	swim_event_bin_create(&event_bin);
+	swim_old_uuid_bin_create(&old_uuid_bin);
+	rlist_foreach_entry(m, &swim->queue_events, in_queue_events) {
+		int new_size = size + sizeof(event_bin);
+		new_size += m->old_uuid_ttl > 0 ? sizeof(old_uuid_bin) : 0;
+		if (swim_packet_reserve(packet, new_size) == NULL)
+			break;
+		/* The reserved region starts at header, append after size. */
+		char *pos = header + size;
+		size = new_size;
+		swim_event_bin_fill(&event_bin, m->status, &m->addr, &m->uuid,
+				    m->incarnation, m->old_uuid_ttl);
+		memcpy(pos, &event_bin, sizeof(event_bin));
+		if (m->old_uuid_ttl > 0) {
+			pos += sizeof(event_bin);
+			swim_old_uuid_bin_fill(&old_uuid_bin, &m->old_uuid);
+			memcpy(pos, &old_uuid_bin, sizeof(old_uuid_bin));
+		}
+		++i;
+	}
+	if (i == 0)
+		return 0;
+	swim_diss_header_bin_create(&diss_header_bin, i);
+	memcpy(header, &diss_header_bin, sizeof(diss_header_bin));
+	swim_packet_advance(packet, size);
+	return 1;
+}
+
 /** Encode SWIM components into a UDP packet. */
 static void
 swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
@@ -620,12 +741,36 @@ swim_encode_round_msg(struct swim *swim, struct swim_packet *packet)
 	map_size += swim_encode_src_uuid(swim, packet);
 	map_size += swim_encode_failure_detection(swim, packet,
 						  SWIM_FD_MSG_PING);
+	map_size += swim_encode_dissemination(swim, packet);
 	map_size += swim_encode_anti_entropy(swim, packet);
 
 	assert(mp_sizeof_map(map_size) == 1 && map_size >= 2);
 	mp_encode_map(header, map_size);
 }
 
+/**
+ * Decrement TTLs of all events. It is done after each round step.
+ * When there are too many events to fit into a packet, the queue
+ * tail is never sent, but its TTLs are decremented anyway, so the
+ * events can expire without ever being disseminated. This is
+ * hardly reachable: an event takes at least 42 bytes, the size of
+ * struct swim_event_bin, so 1000 bytes fit ~23 events, meaning a
+ * failure of ~23 instances at once. In such a case expiring events
+ * are the mildest problem.
+ */
+static void
+swim_decrease_events_ttl(struct swim *swim)
+{
+	struct swim_member *member, *tmp;
+	rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events,
+				 tmp) {
+		if (member->old_uuid_ttl > 0)
+			--member->old_uuid_ttl;
+		if (--member->status_ttl == 0)
+			rlist_del_entry(member, in_queue_events);
+	}
+}
+
 /**
  * Once per specified timeout trigger a next round step. In round
  * step a next memeber is taken from the round queue and a round
@@ -676,6 +821,8 @@ swim_round_step_complete(struct swim_task *task,
 		 * section with a ping.
 		 */
 		swim_member_wait_ack(swim, m);
+		/* As well as dissemination. */
+		swim_decrease_events_ttl(swim);
 	}
 }
 
@@ -737,11 +884,12 @@ swim_check_acks(struct ev_loop *loop, struct ev_periodic *p, int events)
 		case MEMBER_ALIVE:
 			if (m->unacknowledged_pings >= NO_ACKS_TO_DEAD) {
 				m->status = MEMBER_DEAD;
-				swim_member_status_is_updated(m);
+				swim_member_status_is_updated(m, swim);
 			}
 			break;
 		case MEMBER_DEAD:
-			if (m->unacknowledged_pings >= NO_ACKS_TO_GC)
+			if (m->unacknowledged_pings >= NO_ACKS_TO_GC &&
+			    m->status_ttl == 0)
 				swim_member_delete(swim, m);
 			break;
 		default:
@@ -782,18 +930,20 @@ swim_member_update_uuid(struct swim_member *member,
 	struct mh_swim_table_key key = {member->hash, &old_uuid};
 	mh_swim_table_del(t, mh_swim_table_find(t, key, NULL), NULL);
 	member->hash = swim_uuid_hash(new_uuid);
+	member->old_uuid = old_uuid;
+	swim_member_uuid_is_updated(member, swim);
 	return 0;
 }
 
 /** Update member's address.*/
 static inline void
 swim_member_update_addr(struct swim_member *member,
-			const struct sockaddr_in *addr)
+			const struct sockaddr_in *addr, struct swim *swim)
 {
 	if (addr->sin_port != member->addr.sin_port ||
 	    addr->sin_addr.s_addr != member->addr.sin_addr.s_addr) {
 		member->addr = *addr;
-		swim_member_status_is_updated(member);
+		swim_member_status_is_updated(member, swim);
 	}
 }
 
@@ -807,6 +957,9 @@ static struct swim_member *
 swim_update_member(struct swim *swim, const struct swim_member_def *def)
 {
 	struct swim_member *member = swim_find_member(swim, &def->uuid);
+	struct swim_member *old_member = NULL;
+	if (! tt_uuid_is_nil(&def->old_uuid))
+		old_member = swim_find_member(swim, &def->old_uuid);
 	if (member == NULL) {
 		if (def->status == MEMBER_DEAD) {
 			/*
@@ -821,19 +974,29 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def)
 			 */
 			return NULL;
 		}
-		member = swim_member_new(swim, &def->addr, &def->uuid,
-					 def->status, def->incarnation);
+		if (old_member == NULL) {
+			member = swim_member_new(swim, &def->addr, &def->uuid,
+						 def->status, def->incarnation);
+		} else if (swim_member_update_uuid(old_member, &def->uuid,
+						   swim) == 0) {
+			member = old_member;
+		}
 		return member;
 	}
 	struct swim_member *self = swim->self;
 	if (member != self) {
 		if (def->incarnation >= member->incarnation) {
-			swim_member_update_addr(member, &def->addr);
+			swim_member_update_addr(member, &def->addr, swim);
 			swim_member_update_status(member, def->status,
 						  def->incarnation, swim);
+			if (old_member != NULL) {
+				assert(member != old_member);
+				swim_member_delete(swim, old_member);
+			}
 		}
 		return member;
 	}
+	uint64_t old_incarnation = self->incarnation;
 	/*
 	 * It is possible that other instances know a bigger
 	 * incarnation of this instance - such thing happens when
@@ -852,6 +1015,8 @@ swim_update_member(struct swim *swim, const struct swim_member_def *def)
 		 */
 		self->incarnation++;
 	}
+	if (old_incarnation != self->incarnation)
+		swim_member_status_is_updated(self, swim);
 	return member;
 }
 
@@ -920,6 +1085,31 @@ swim_process_failure_detection(struct swim *swim, const char **pos,
 	}
 	return 0;
 }
+
+/**
+ * Decode a dissemination message and apply its events. A failed
+ * member update is only logged. @retval -1 Malformed message.
+ */
+static int
+swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
+{
+	const char *msg_pref = "invalid dissemination message:";
+	uint32_t size;
+	if (swim_decode_array(pos, end, &size, msg_pref, "root") != 0)
+		return -1;
+	for (uint32_t i = 0; i < size; ++i) {
+		struct swim_member_def def;
+		if (swim_member_def_decode(&def, pos, end, msg_pref) != 0)
+			return -1;
+		if (swim_update_member(swim, &def) == NULL) {
+			/*
+			 * Not critical - other updates still can be applied.
+			 */
+			diag_log();
+		}
+	}
+	return 0;
+}
 
 /** Process a new message. */
 static void
@@ -962,6 +1152,11 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
 							   src, &uuid) != 0)
 				goto error;
 			break;
+		case SWIM_DISSEMINATION:
+			say_verbose("SWIM: process dissemination");
+			if (swim_process_dissemination(swim, &pos, end) != 0)
+				goto error;
+			break;
 		default:
 			diag_set(SwimError, "%s unexpected key", msg_pref);
 			goto error;
@@ -1000,6 +1195,10 @@ swim_new(void)
 	ev_init(&swim->wait_ack_tick, swim_check_acks);
 	ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT_DEFAULT, NULL);
 	swim->wait_ack_tick.data = (void *) swim;
+
+	/* Dissemination component. */
+	rlist_create(&swim->queue_events);
+
 	return swim;
 }
 
@@ -1089,7 +1288,7 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 	ev_periodic_start(loop(), &swim->wait_ack_tick);
 
 	if (! is_first_cfg) {
-		swim_member_update_addr(swim->self, &addr);
+		swim_member_update_addr(swim->self, &addr, swim);
 		int rc = swim_member_update_uuid(swim->self, uuid, swim);
 		/* Reserved above. */
 		assert(rc == 0);
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 542b988c1..e31c67682 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -189,6 +189,11 @@ swim_decode_member_key(enum swim_member_key key, const char **pos,
 				     "member incarnation") != 0)
 			return -1;
 		break;
+	case SWIM_MEMBER_OLD_UUID:
+		if (swim_decode_uuid(&def->old_uuid, pos, end, msg_pref,
+				     "member old uuid") != 0)
+			return -1;
+		break;
 	default:
 		unreachable();
 	}
@@ -337,6 +342,59 @@ swim_member_bin_create(struct swim_member_bin *header)
 	header->m_incarnation = 0xcf;
 }
 
+void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header,
+			    uint16_t batch_size)
+{
+	header->k_header = SWIM_DISSEMINATION;
+	header->m_header = 0xdc; /* MessagePack array 16, not uint16. */
+	header->v_header = mp_bswap_u16(batch_size);
+}
+
+void
+swim_event_bin_create(struct swim_event_bin *header)
+{
+	header->k_status = SWIM_MEMBER_STATUS;
+	header->k_addr = SWIM_MEMBER_ADDRESS;
+	header->k_port = SWIM_MEMBER_PORT;
+	header->k_uuid = SWIM_MEMBER_UUID;
+	header->k_incarnation = SWIM_MEMBER_INCARNATION;
+	header->m_addr = 0xce; /* uint32 */
+	header->m_port = 0xcd; /* uint16 */
+	header->m_uuid = 0xc4; /* bin8 */
+	header->m_uuid_len = UUID_LEN;
+	header->m_incarnation = 0xcf; /* uint64 */
+}
+
+void
+swim_event_bin_fill(struct swim_event_bin *header,
+		    enum swim_member_status status,
+		    const struct sockaddr_in *addr, const struct tt_uuid *uuid,
+		    uint64_t incarnation, int old_uuid_ttl)
+{
+	header->v_incarnation = mp_bswap_u64(incarnation);
+	memcpy(header->v_uuid, uuid, UUID_LEN);
+	header->v_port = mp_bswap_u16(addr->sin_port);
+	header->v_addr = mp_bswap_u32(addr->sin_addr.s_addr);
+	header->v_status = status;
+	header->m_header = old_uuid_ttl > 0 ? 0x86 : 0x85; /* fixmap */
+}
+
+void
+swim_old_uuid_bin_create(struct swim_old_uuid_bin *header)
+{
+	header->m_uuid_len = UUID_LEN;
+	header->m_uuid = 0xc4; /* bin8 */
+	header->k_uuid = SWIM_MEMBER_OLD_UUID;
+}
+
+void
+swim_old_uuid_bin_fill(struct swim_old_uuid_bin *header,
+		       const struct tt_uuid *uuid)
+{
+	memcpy(&header->v_uuid[0], uuid, sizeof(header->v_uuid));
+}
+
 void
 swim_meta_header_bin_create(struct swim_meta_header_bin *header,
 			    const struct sockaddr_in *src)
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 91a0bca9d..a3dc1164e 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -58,6 +58,19 @@
  * |                                                             |
  * |               OR/AND                                        |
  * |                                                             |
+ * |     SWIM_DISSEMINATION: [                                   |
+ * |         {                                                   |
+ * |             SWIM_MEMBER_STATUS: uint, enum member_status,   |
+ * |             SWIM_MEMBER_ADDRESS: uint, ip,                  |
+ * |             SWIM_MEMBER_PORT: uint, port,                   |
+ * |             SWIM_MEMBER_UUID: 16 byte UUID,                 |
+ * |             SWIM_MEMBER_INCARNATION: uint                   |
+ * |         },                                                  |
+ * |         ...                                                 |
+ * |     ],                                                      |
+ * |                                                             |
+ * |               OR/AND                                        |
+ * |                                                             |
  * |     SWIM_ANTI_ENTROPY: [                                    |
  * |         {                                                   |
  * |             SWIM_MEMBER_STATUS: uint, enum member_status,   |
@@ -91,6 +104,7 @@ extern const char *swim_member_status_strs[];
  */
 struct swim_member_def {
 	struct tt_uuid uuid;
+	struct tt_uuid old_uuid;
 	struct sockaddr_in addr;
 	uint64_t incarnation;
 	enum swim_member_status status;
@@ -124,6 +138,7 @@ enum swim_body_key {
 	SWIM_SRC_UUID = 0,
 	SWIM_ANTI_ENTROPY,
 	SWIM_FAILURE_DETECTION,
+	SWIM_DISSEMINATION,
 };
 
 /**
@@ -231,6 +246,7 @@ enum swim_member_key {
 	SWIM_MEMBER_PORT,
 	SWIM_MEMBER_UUID,
 	SWIM_MEMBER_INCARNATION,
+	SWIM_MEMBER_OLD_UUID,
 	swim_member_key_MAX,
 };
 
@@ -304,6 +320,98 @@ swim_member_bin_fill(struct swim_member_bin *header,
 
 /** }}}                  Anti-entropy component                 */
 
+/** {{{                 Dissemination component                 */
+
+/** SWIM dissemination header: body key and array of events. */
+struct PACKED swim_diss_header_bin {
+	/** mp_encode_uint(SWIM_DISSEMINATION) */
+	uint8_t k_header;
+	/** mp_encode_array() with a 16-bit big-endian size. */
+	uint8_t m_header;
+	uint16_t v_header;
+};
+
+/** Initialize dissemination header for @a batch_size events. */
+void
+swim_diss_header_bin_create(struct swim_diss_header_bin *header,
+			    uint16_t batch_size);
+
+/** SWIM event MessagePack template, copied into a packet as is. */
+struct PACKED swim_event_bin {
+	/** mp_encode_map(5 or 6) - 6 when an old UUID follows. */
+	uint8_t m_header;
+
+	/** mp_encode_uint(SWIM_MEMBER_STATUS) */
+	uint8_t k_status;
+	/** mp_encode_uint(enum member_status) */
+	uint8_t v_status;
+
+	/** mp_encode_uint(SWIM_MEMBER_ADDRESS) */
+	uint8_t k_addr;
+	/** mp_encode_uint(addr.sin_addr.s_addr) */
+	uint8_t m_addr;
+	uint32_t v_addr;
+
+	/** mp_encode_uint(SWIM_MEMBER_PORT) */
+	uint8_t k_port;
+	/** mp_encode_uint(addr.sin_port) */
+	uint8_t m_port;
+	uint16_t v_port;
+
+	/** mp_encode_uint(SWIM_MEMBER_UUID) */
+	uint8_t k_uuid;
+	/** mp_encode_bin(UUID_LEN) */
+	uint8_t m_uuid;
+	uint8_t m_uuid_len;
+	uint8_t v_uuid[UUID_LEN];
+
+	/** mp_encode_uint(SWIM_MEMBER_INCARNATION) */
+	uint8_t k_incarnation;
+	/** mp_encode_uint(64bit incarnation) */
+	uint8_t m_incarnation;
+	uint64_t v_incarnation;
+};
+
+/** Initialize constant fields of an event template. */
+void
+swim_event_bin_create(struct swim_event_bin *header);
+
+/**
+ * Since usually there are many events, it is faster to reset a
+ * few fields in an existing template than to create a new one
+ * each time: create(), fill(), fill() ... . A positive
+ * @a old_uuid_ttl makes the map size 6 to fit an old UUID.
+ */
+void
+swim_event_bin_fill(struct swim_event_bin *header,
+		    enum swim_member_status status,
+		    const struct sockaddr_in *addr, const struct tt_uuid *uuid,
+		    uint64_t incarnation, int old_uuid_ttl);
+
+/** Optional event attribute - old UUID. Copied to a packet as is. */
+struct PACKED swim_old_uuid_bin {
+	/** mp_encode_uint(SWIM_MEMBER_OLD_UUID) */
+	uint8_t k_uuid;
+	/** mp_encode_bin(UUID_LEN) */
+	uint8_t m_uuid;
+	uint8_t m_uuid_len;
+	uint8_t v_uuid[UUID_LEN];
+};
+
+/** Initialize constant fields of an old UUID template. */
+void
+swim_old_uuid_bin_create(struct swim_old_uuid_bin *header);
+
+/**
+ * Set the mutable field (the UUID itself) of the template, by
+ * the same create() once, fill() many pattern as for events.
+ */
+void
+swim_old_uuid_bin_fill(struct swim_old_uuid_bin *header,
+		       const struct tt_uuid *uuid);
+
+/** }}}                 Dissemination component                 */
+
 /** {{{                     Meta component                      */
 
 /**
-- 
2.17.2 (Apple Git-113)




More information about the Tarantool-patches mailing list