Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: vdavydov.dev@gmail.com
Subject: [PATCH 5/5] swim: keep encoded round message cached
Date: Mon, 17 Dec 2018 15:53:23 +0300	[thread overview]
Message-ID: <7d4800682ff4ffabd4d8c887432ecde07a5dc75c.1545047950.git.v.shpilevoy@tarantool.org> (raw)
In-Reply-To: <cover.1545047950.git.v.shpilevoy@tarantool.org>
In-Reply-To: <cover.1545047950.git.v.shpilevoy@tarantool.org>

During a SWIM round a message is handed out that
consists of at most 3 sections. Parts of the message
change rarely: only when member attributes are updated
or when some members are removed. So it is possible to
cache the message and send it during several round steps
in a row, or even not rebuild it for the whole round.

Follow up #3234
---
 src/lib/swim/swim.c | 75 +++++++++++++++++++++++++++++++++++++--------
 1 file changed, 63 insertions(+), 12 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index df57ef470..a04c1646c 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -43,11 +43,11 @@
  *
  * Optional:
  * - do not send self.
- * - cache encoded batch.
  * - refute immediately.
  * - indirect ping.
  * - increment own incarnation on each round.
  * - attach dst incarnation to ping.
+ * - fix errors in sio_listen/bind.
  */
 
 /**
@@ -198,6 +198,13 @@ struct swim_member {
 	 * Position in a queue of members in the current round.
 	 */
 	struct rlist in_queue_round;
+	/**
+	 * True if the member is encoded into the anti-entropy
+	 * section of the cached round msg of the current round.
+	 * When this flag is true and the member is changed or
+	 * removed, the cached round msg is invalidated.
+	 */
+	bool is_being_sent_in_this_round;
 	/**
 	 *
 	 *               Failure detection component
@@ -645,6 +652,27 @@ static int cfg_size = 0;
  */
 static struct swim_member **shuffled_members = NULL;
 static int shuffled_members_size = 0;
+/**
+ * When a new round starts, a new message of up to 3 components
+ * is built and then handed out to the cluster members. Most of
+ * the time it remains unchanged, so it is cached instead of being
+ * rebuilt on each step. The cache is invalidated when members are
+ * reshuffled for a new round, when a member encoded here changes
+ * its attributes (incarnation, status), or when a member encoded
+ * here has been dead too long and is removed.
+ */
+static char *cached_round_msg = NULL;
+/**
+ * Payload size in the cached_round_msg buffer; 0 means the
+ * cache is invalid. Capacity is UDP_PACKET_SIZE bytes.
+ */
+static int cached_round_msg_size = 0;
+
+static inline void
+cached_round_msg_invalidate(void)
+{
+	cached_round_msg_size = 0;
+}
 
 /** Queue of io tasks ready to push now. */
 static RLIST_HEAD(queue_output);
@@ -670,6 +698,8 @@ static void
 swim_member_is_updated(struct swim_member *member)
 {
 	swim_schedule_event(member);
+	if (member->is_being_sent_in_this_round)
+		cached_round_msg_invalidate();
 }
 
 /**
@@ -716,6 +746,7 @@ swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status,
 	}
 	member->status = status;
 	member->addr = *addr;
+	member->is_being_sent_in_this_round = false;
 	member->incarnation = incarnation;
 	member->is_pinned = false;
 	member->failed_pings = 0;
@@ -755,6 +786,8 @@ swim_find_member(const struct sockaddr_in *addr)
 static inline void
 swim_member_delete(struct swim_member *member)
 {
+	if (member->is_being_sent_in_this_round)
+		cached_round_msg_invalidate();
 	uint64_t key = sockaddr_in_hash(&member->addr);
 	mh_int_t rc = mh_i64ptr_find(members, key, NULL);
 	assert(rc != mh_end(members));
@@ -797,6 +830,7 @@ swim_shuffle_members(void)
 		int j = rand() / (RAND_MAX / (i + 1) + 1);
 		SWAP(shuffled_members[i], shuffled_members[j]);
 	}
+	cached_round_msg_invalidate();
 	return 0;
 }
 
@@ -833,12 +867,24 @@ calculate_bin_batch_size(int header_size, int member_size, int avail_size)
 }
 
 static int
-swim_encode_round_msg(char *buffer, int size)
+swim_encode_round_msg(void)
 {
-	char *start = buffer;
 	if ((shuffled_members == NULL || rlist_empty(&queue_round)) &&
 	    swim_new_round() != 0)
 		return -1;
+	if (cached_round_msg_size > 0)
+		return 0;
+	if (cached_round_msg == NULL) {
+		cached_round_msg = malloc(UDP_PACKET_SIZE);
+		if (cached_round_msg == NULL) {
+			diag_set(OutOfMemory, UDP_PACKET_SIZE, "malloc",
+				 "cached_round_msg");
+			return -1;
+		}
+	}
+	char *buffer = cached_round_msg;
+	int i, size = UDP_PACKET_SIZE;
+
 	/* -1 - for the root map header. */
 	assert((uint)size > sizeof(struct swim_fd_header_bin) + 1);
 	size -= sizeof(struct swim_fd_header_bin) + 1;
@@ -871,7 +917,7 @@ swim_encode_round_msg(char *buffer, int size)
 		memcpy(buffer, &diss_header_bin, sizeof(diss_header_bin));
 		buffer += sizeof(diss_header_bin);
 
-		int i = 0;
+		i = 0;
 		struct swim_member *member, *tmp;
 		struct swim_event_bin event_bin;
 		swim_event_bin_create(&event_bin);
@@ -888,8 +934,9 @@ swim_encode_round_msg(char *buffer, int size)
 		event_count -= diss_batch_size;
 	}
 
+	i = 0;
 	if (ae_batch_size == 0)
-		return buffer - start;
+		goto end;
 	struct swim_anti_entropy_header_bin ae_header_bin;
 	swim_anti_entropy_header_bin_create(&ae_header_bin, ae_batch_size);
 	memcpy(buffer, &ae_header_bin, sizeof(ae_header_bin));
@@ -897,13 +944,18 @@ swim_encode_round_msg(char *buffer, int size)
 
 	struct swim_member_bin member_bin;
 	swim_member_bin_create(&member_bin);
-	for (int i = 0; i < ae_batch_size; ++i) {
+	for (; i < ae_batch_size; ++i) {
 		struct swim_member *member = shuffled_members[i];
+		member->is_being_sent_in_this_round = true;
 		swim_member_bin_reset(&member_bin, member);
 		memcpy(buffer, &member_bin, sizeof(member_bin));
 		buffer += sizeof(member_bin);
 	}
-	return buffer - start;
+end:
+	for (; i < shuffled_members_size; ++i)
+		shuffled_members[i]->is_being_sent_in_this_round = false;
+	cached_round_msg_size = buffer - cached_round_msg;
+	return 0;
 }
 
 /**
@@ -915,9 +967,7 @@ swim_send_round_msg(struct swim_io_task *task)
 {
 	(void) task;
 	assert(task->cb == swim_send_round_msg);
-	char buffer[UDP_PACKET_SIZE];
-	int size = swim_encode_round_msg(buffer, UDP_PACKET_SIZE);
-	if (size < 0) {
+	if (swim_encode_round_msg() != 0) {
 		diag_log();
 		goto end;
 	}
@@ -930,8 +980,9 @@ swim_send_round_msg(struct swim_io_task *task)
 	say_verbose("SWIM: send to %s",
 		    sio_strfaddr((struct sockaddr *) &m->addr,
 				 sizeof(m->addr)));
-	if (sio_sendto(output.fd, buffer, size, 0, (struct sockaddr *) &m->addr,
-		       sizeof(m->addr)) == -1 && ! sio_wouldblock(errno))
+	if (sio_sendto(output.fd, cached_round_msg, cached_round_msg_size, 0,
+		       (struct sockaddr *) &m->addr, sizeof(m->addr)) == -1 &&
+		       ! sio_wouldblock(errno))
 		diag_log();
 	swim_member_schedule_ack_wait(m);
 	rlist_del_entry(m, in_queue_round);
-- 
2.17.2 (Apple Git-113)

      parent reply	other threads:[~2018-12-17 12:53 UTC|newest]

Thread overview: 6+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-12-17 12:53 [PATCH 0/5] SWIM Vladislav Shpilevoy
2018-12-17 12:53 ` [PATCH 1/5] swim: introduce SWIM's anti-entropy component Vladislav Shpilevoy
2018-12-17 12:53 ` [PATCH 2/5] swim: introduce failure detection component Vladislav Shpilevoy
2018-12-17 12:53 ` [PATCH 3/5] swim: introduce a dissemination component Vladislav Shpilevoy
2018-12-17 12:53 ` [PATCH 4/5] swim: introduce "suspected" status Vladislav Shpilevoy
2018-12-17 12:53 ` Vladislav Shpilevoy [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=7d4800682ff4ffabd4d8c887432ecde07a5dc75c.1545047950.git.v.shpilevoy@tarantool.org \
    --to=v.shpilevoy@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --cc=vdavydov.dev@gmail.com \
    --subject='Re: [PATCH 5/5] swim: keep encoded round message cached' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox