Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: vdavydov.dev@gmail.com, kostja@tarantool.org
Subject: [PATCH v2 4/6] [RAW] swim: keep encoded round message cached
Date: Tue, 25 Dec 2018 22:19:27 +0300	[thread overview]
Message-ID: <3209be496909a01580ea482136ce4552c7de7999.1545765055.git.v.shpilevoy@tarantool.org> (raw)
In-Reply-To: <cover.1545765055.git.v.shpilevoy@tarantool.org>
In-Reply-To: <cover.1545765055.git.v.shpilevoy@tarantool.org>

During a SWIM round, a message consisting of at most 3 sections
is handed out. Parts of the message change rarely: only when
member attributes are updated or when members are removed. So it
is possible to cache the message and send it during several
consecutive round steps, or even avoid rebuilding it for the
whole round.

Also, it allows sending message parts on separate libev EV_WRITE
events, because the message is now stored in the SWIM instance
instead of on the stack and can be iterated from different events.

Part of #3234
---
 src/lib/swim/swim.c | 52 ++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 44 insertions(+), 8 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 15e079f11..f880066c5 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -345,6 +345,13 @@ struct swim_member {
 	 * Position in a queue of members in the current round.
 	 */
 	struct rlist in_queue_round;
+	/**
+	 * True if the member is being sent in the current SWIM
+	 * round, i.e. it is encoded into the cached round msg.
+	 * While this flag is true, a change to the member or its
+	 * removal invalidates the cached round msg.
+	 */
+	bool is_being_sent_in_this_round;
 	/**
 	 *
 	 *               Failure detection component
@@ -473,6 +480,17 @@ struct swim {
 	struct ev_periodic wait_ack_tick;
 	/** Queue of events sorted by occurrence time. */
 	struct rlist queue_events;
+	/**
+	 * When a new round starts, a new message consisting of up
+	 * to 3 components is built and then handed out to the
+	 * cluster members. Most of the time the message remains
+	 * unchanged, so it is cached to avoid rebuilding it on
+	 * each step. It is invalidated when the members are
+	 * reshuffled, when a member encoded here changes its
+	 * attributes (incarnation, status), or when a member
+	 * encoded here has been dead too long and is removed.
+	 */
+	struct swim_msg cached_round_msg;
 };
 
 static inline uint64_t
@@ -481,6 +499,13 @@ sockaddr_in_hash(const struct sockaddr_in *a)
 	return ((uint64_t) a->sin_addr.s_addr << 16) | a->sin_port;
 }
 
+static inline void
+cached_round_msg_invalidate(struct swim *swim)
+{
+	swim_msg_destroy(&swim->cached_round_msg);
+	swim_msg_create(&swim->cached_round_msg);
+}
+
 /**
  * Main round messages can carry merged failure detection
  * messages and anti-entropy. With these keys the components can
@@ -788,6 +813,8 @@ static void
 swim_member_is_updated(struct swim *swim, struct swim_member *member)
 {
 	swim_schedule_event(swim, member);
+	if (member->is_being_sent_in_this_round)
+		cached_round_msg_invalidate(swim);
 }
 
 /**
@@ -876,6 +903,8 @@ swim_find_member(struct swim *swim, const struct sockaddr_in *addr)
 static inline void
 swim_member_delete(struct swim *swim, struct swim_member *member)
 {
+	if (member->is_being_sent_in_this_round)
+		cached_round_msg_invalidate(swim);
 	uint64_t key = sockaddr_in_hash(&member->addr);
 	mh_int_t rc = mh_i64ptr_find(swim->members, key, NULL);
 	assert(rc != mh_end(swim->members));
@@ -924,6 +953,7 @@ swim_shuffle_members(struct swim *swim)
 		int j = rand() / (RAND_MAX / (i + 1) + 1);
 		SWAP(shuffled[i], shuffled[j]);
 	}
+	cached_round_msg_invalidate(swim);
 	return 0;
 }
 
@@ -1062,9 +1092,9 @@ swim_encode_dissemination(struct swim *swim, struct swim_msg *msg)
 
 /** Encode SWIM components into a sequence of UDP packets. */
 static int
-swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
+swim_encode_round_msg(struct swim *swim)
 {
-	swim_msg_create(msg);
+	struct swim_msg *msg = &swim->cached_round_msg;
 	struct swim_msg_part *part = swim_msg_reserve(msg, 1);
 	if (part == NULL)
 		return -1;
@@ -1089,9 +1119,13 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
 
 	assert(mp_sizeof_map(map_size) == 1);
 	mp_encode_map(header, map_size);
+	for (int i = 0; i < rc; ++i) {
+		struct swim_member *member = swim->shuffled_members[i];
+		member->is_being_sent_in_this_round = true;
+	}
 	return 0;
 error:
-	swim_msg_destroy(msg);
+	cached_round_msg_invalidate(swim);
 	return -1;
 }
 
@@ -1126,8 +1160,8 @@ swim_send_round_msg(struct swim_io_task *task)
 	if (rlist_empty(&swim->queue_round))
 		goto next_round_step;
 
-	struct swim_msg msg;
-	if (swim_encode_round_msg(swim, &msg) != 0) {
+	if (swim_msg_is_empty(&swim->cached_round_msg) &&
+	    swim_encode_round_msg(swim) != 0) {
 		diag_log();
 		goto next_round_step;
 	}
@@ -1137,15 +1171,15 @@ swim_send_round_msg(struct swim_io_task *task)
 	say_verbose("SWIM: send to %s",
 		    sio_strfaddr((struct sockaddr *) &m->addr,
 				 sizeof(m->addr)));
-	for (struct swim_msg_part *part = swim_msg_first_part(&msg);
-	     part != NULL; part = swim_msg_part_next(part)) {
+	for (struct swim_msg_part *part =
+	     swim_msg_first_part(&swim->cached_round_msg); part != NULL;
+	     part = swim_msg_part_next(part)) {
 		if (swim->transport.send_round_msg(swim->output.fd, part->body,
 						   part->size,
 						   (struct sockaddr *) &m->addr,
 						   sizeof(m->addr)) == -1)
 			diag_log();
 	}
-	swim_msg_destroy(&msg);
 	swim_member_schedule_ack_wait(swim, m);
 	swim_decrease_events_ttl(swim);
 	rlist_del_entry(m, in_queue_round);
@@ -1679,6 +1713,7 @@ swim_new(void)
 	ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL);
 	swim->wait_ack_tick.data = (void *) swim;
 	rlist_create(&swim->queue_events);
+	swim_msg_create(&swim->cached_round_msg);
 	return swim;
 }
 
@@ -1811,4 +1846,5 @@ swim_delete(struct swim *swim)
 	}
 	mh_i64ptr_delete(swim->members);
 	free(swim->shuffled_members);
+	cached_round_msg_invalidate(swim);
 }
-- 
2.17.2 (Apple Git-113)

  parent reply	other threads:[~2018-12-25 19:19 UTC|newest]

Thread overview: 7+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-12-25 19:19 [PATCH v2 0/6] SWIM Vladislav Shpilevoy
2018-12-25 19:19 ` [PATCH v2 1/6] [RAW] swim: introduce SWIM's anti-entropy component Vladislav Shpilevoy
2018-12-25 19:19 ` [PATCH v2 2/6] [RAW] swim: introduce failure detection component Vladislav Shpilevoy
2018-12-25 19:19 ` [PATCH v2 3/6] [RAW] swim: introduce a dissemination component Vladislav Shpilevoy
2018-12-25 19:19 ` Vladislav Shpilevoy [this message]
2018-12-25 19:19 ` [PATCH v2 5/6] [RAW] swim: send one UDP packet per EV_WRITE event Vladislav Shpilevoy
2018-12-26 21:01 ` [tarantool-patches] [PATCH v2 0/6] SWIM Vladislav Shpilevoy

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=3209be496909a01580ea482136ce4552c7de7999.1545765055.git.v.shpilevoy@tarantool.org \
    --to=v.shpilevoy@tarantool.org \
    --cc=kostja@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --cc=vdavydov.dev@gmail.com \
    --subject='Re: [PATCH v2 4/6] [RAW] swim: keep encoded round message cached' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox