[PATCH 5/5] swim: keep encoded round message cached

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Mon Dec 17 15:53:23 MSK 2018


During a SWIM round the same message, consisting of at
most 3 sections, is handed out to the members. Parts of
the message change rarely: only when member attributes
are updated or when some members are removed. So it is
possible to cache the message and send it during several
round steps in a row, or even not rebuild it during the
whole round.

Follow up #3234
---
 src/lib/swim/swim.c | 75 +++++++++++++++++++++++++++++++++++++--------
 1 file changed, 63 insertions(+), 12 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index df57ef470..a04c1646c 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -43,11 +43,11 @@
  *
  * Optional:
  * - do not send self.
- * - cache encoded batch.
  * - refute immediately.
  * - indirect ping.
  * - increment own incarnation on each round.
  * - attach dst incarnation to ping.
+ * - fix errors in sio_listen/bind.
  */
 
 /**
@@ -198,6 +198,13 @@ struct swim_member {
 	 * Position in a queue of members in the current round.
 	 */
 	struct rlist in_queue_round;
+	/**
+	 * True if the member is encoded into the anti-entropy
+	 * section of the cached round msg. When this flag is set
+	 * and the member is changed or removed, the cached round
+	 * msg is invalidated.
+	 */
+	bool is_being_sent_in_this_round;
 	/**
 	 *
 	 *               Failure detection component
@@ -645,6 +652,27 @@ static int cfg_size = 0;
  */
 static struct swim_member **shuffled_members = NULL;
 static int shuffled_members_size = 0;
+/**
+ * When a new round starts, it builds a new message consisting of
+ * up to 3 components. This message is then handed out to the
+ * cluster members. Most of the time the message remains unchanged
+ * and can be cached to avoid rebuilding it on each step. It is
+ * invalidated when the members are reshuffled, or when a member
+ * encoded into the anti-entropy section changes its attributes
+ * (incarnation, status) or is removed after being dead too long.
+ */
+static char *cached_round_msg = NULL;
+/**
+ * Payload size in the cached_round_msg buffer, 0 means the
+ * cache is invalid. Capacity is UDP_PACKET_SIZE bytes.
+ */
+static int cached_round_msg_size = 0;
+
+static inline void
+cached_round_msg_invalidate(void)
+{
+	cached_round_msg_size = 0;
+}
 
 /** Queue of io tasks ready to push now. */
 static RLIST_HEAD(queue_output);
@@ -670,6 +698,8 @@ static void
 swim_member_is_updated(struct swim_member *member)
 {
 	swim_schedule_event(member);
+	if (member->is_being_sent_in_this_round)
+		cached_round_msg_invalidate();
 }
 
 /**
@@ -716,6 +746,7 @@ swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status,
 	}
 	member->status = status;
 	member->addr = *addr;
+	member->is_being_sent_in_this_round = false;
 	member->incarnation = incarnation;
 	member->is_pinned = false;
 	member->failed_pings = 0;
@@ -755,6 +786,8 @@ swim_find_member(const struct sockaddr_in *addr)
 static inline void
 swim_member_delete(struct swim_member *member)
 {
+	if (member->is_being_sent_in_this_round)
+		cached_round_msg_invalidate();
 	uint64_t key = sockaddr_in_hash(&member->addr);
 	mh_int_t rc = mh_i64ptr_find(members, key, NULL);
 	assert(rc != mh_end(members));
@@ -797,6 +830,7 @@ swim_shuffle_members(void)
 		int j = rand() / (RAND_MAX / (i + 1) + 1);
 		SWAP(shuffled_members[i], shuffled_members[j]);
 	}
+	cached_round_msg_invalidate();
 	return 0;
 }
 
@@ -833,12 +867,24 @@ calculate_bin_batch_size(int header_size, int member_size, int avail_size)
 }
 
 static int
-swim_encode_round_msg(char *buffer, int size)
+swim_encode_round_msg(void)
 {
-	char *start = buffer;
 	if ((shuffled_members == NULL || rlist_empty(&queue_round)) &&
 	    swim_new_round() != 0)
 		return -1;
+	if (cached_round_msg_size > 0)
+		return 0;
+	if (cached_round_msg == NULL) {
+		cached_round_msg = malloc(UDP_PACKET_SIZE);
+		if (cached_round_msg == NULL) {
+			diag_set(OutOfMemory, UDP_PACKET_SIZE, "malloc",
+				 "cached_round_msg");
+			return -1;
+		}
+	}
+	char *buffer = cached_round_msg;
+	int i, size = UDP_PACKET_SIZE;
+
 	/* -1 - for the root map header. */
 	assert((uint)size > sizeof(struct swim_fd_header_bin) + 1);
 	size -= sizeof(struct swim_fd_header_bin) + 1;
@@ -871,7 +917,7 @@ swim_encode_round_msg(char *buffer, int size)
 		memcpy(buffer, &diss_header_bin, sizeof(diss_header_bin));
 		buffer += sizeof(diss_header_bin);
 
-		int i = 0;
+		i = 0;
 		struct swim_member *member, *tmp;
 		struct swim_event_bin event_bin;
 		swim_event_bin_create(&event_bin);
@@ -888,8 +934,9 @@ swim_encode_round_msg(char *buffer, int size)
 		event_count -= diss_batch_size;
 	}
 
+	i = 0;
 	if (ae_batch_size == 0)
-		return buffer - start;
+		goto end;
 	struct swim_anti_entropy_header_bin ae_header_bin;
 	swim_anti_entropy_header_bin_create(&ae_header_bin, ae_batch_size);
 	memcpy(buffer, &ae_header_bin, sizeof(ae_header_bin));
@@ -897,13 +944,18 @@ swim_encode_round_msg(char *buffer, int size)
 
 	struct swim_member_bin member_bin;
 	swim_member_bin_create(&member_bin);
-	for (int i = 0; i < ae_batch_size; ++i) {
+	for (; i < ae_batch_size; ++i) {
 		struct swim_member *member = shuffled_members[i];
+		member->is_being_sent_in_this_round = true;
 		swim_member_bin_reset(&member_bin, member);
 		memcpy(buffer, &member_bin, sizeof(member_bin));
 		buffer += sizeof(member_bin);
 	}
-	return buffer - start;
+end:
+	for (; i < shuffled_members_size; ++i)
+		shuffled_members[i]->is_being_sent_in_this_round = false;
+	cached_round_msg_size = buffer - cached_round_msg;
+	return 0;
 }
 
 /**
@@ -915,9 +967,7 @@ swim_send_round_msg(struct swim_io_task *task)
 {
 	(void) task;
 	assert(task->cb == swim_send_round_msg);
-	char buffer[UDP_PACKET_SIZE];
-	int size = swim_encode_round_msg(buffer, UDP_PACKET_SIZE);
-	if (size < 0) {
+	if (swim_encode_round_msg() != 0) {
 		diag_log();
 		goto end;
 	}
@@ -930,8 +980,9 @@ swim_send_round_msg(struct swim_io_task *task)
 	say_verbose("SWIM: send to %s",
 		    sio_strfaddr((struct sockaddr *) &m->addr,
 				 sizeof(m->addr)));
-	if (sio_sendto(output.fd, buffer, size, 0, (struct sockaddr *) &m->addr,
-		       sizeof(m->addr)) == -1 && ! sio_wouldblock(errno))
+	if (sio_sendto(output.fd, cached_round_msg, cached_round_msg_size, 0,
+		       (struct sockaddr *) &m->addr, sizeof(m->addr)) == -1 &&
+		       ! sio_wouldblock(errno))
 		diag_log();
 	swim_member_schedule_ack_wait(m);
 	rlist_del_entry(m, in_queue_round);
-- 
2.17.2 (Apple Git-113)




More information about the Tarantool-patches mailing list