[PATCH v3 4/6] [RAW] swim: keep encoded round message cached

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat Dec 29 13:14:13 MSK 2018


During a SWIM round, a message consisting of at most 3 sections
is handed out. Parts of the message change rarely: when member
attributes are updated or when some members are removed. So it
is possible to cache the message and send it during several round
steps in a row, or even not rebuild it for the whole round.

Also, it allows sending message parts on separate libev EV_WRITE
events, because now the message is stored globally and can be
iterated from different events.

Part of #3234
---
 src/lib/swim/swim.c | 35 ++++++++++++++++++++++++++---------
 1 file changed, 26 insertions(+), 9 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 4e7ffbc54..7dff22dd5 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -246,6 +246,8 @@ struct swim {
 	 * and preallocated per SWIM instance.
 	 */
 	struct swim_task round_step_task;
+	/** True, if msg in round_step_task is up to date. */
+	bool is_round_msg_valid;
 	/** Transport to send/receive data. */
 	const struct swim_transport *transport;
 	/** Scheduler of output requests. */
@@ -283,6 +285,12 @@ sockaddr_in_hash(const struct sockaddr_in *a)
 	return ((uint64_t) a->sin_addr.s_addr << 16) | a->sin_port;
 }
 
+static inline void
+cached_round_msg_invalidate(struct swim *swim)
+{
+	swim->is_round_msg_valid = false;
+}
+
 /**
  * Main round messages can carry merged failure detection
  * messages and anti-entropy. With these keys the components can
@@ -583,6 +591,7 @@ static void
 swim_member_status_is_updated(struct swim *swim, struct swim_member *member)
 {
 	swim_schedule_event(swim, member);
+	cached_round_msg_invalidate(swim);
 }
 
 /**
@@ -620,6 +629,7 @@ swim_member_update_status(struct swim *swim, struct swim_member *member,
 static void
 swim_member_delete(struct swim *swim, struct swim_member *member)
 {
+	cached_round_msg_invalidate(swim);
 	uint64_t key = sockaddr_in_hash(&member->addr);
 	mh_int_t rc = mh_i64ptr_find(swim->members, key, NULL);
 	assert(rc != mh_end(swim->members));
@@ -711,6 +721,7 @@ swim_shuffle_members(struct swim *swim)
 		int j = swim_scaled_rand(0, i);
 		SWAP(shuffled[i], shuffled[j]);
 	}
+	cached_round_msg_invalidate(swim);
 	return 0;
 }
 
@@ -851,9 +862,12 @@ swim_encode_dissemination(struct swim *swim, struct swim_msg *msg)
 
 /** Encode SWIM components into a sequence of UDP packets. */
 static int
-swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
+swim_encode_round_msg(struct swim *swim)
 {
-	swim_msg_create(msg);
+	if (swim->is_round_msg_valid)
+		return 0;
+	struct swim_msg *msg = &swim->round_step_task.msg;
+	swim_msg_reset(msg);
 	struct swim_packet *packet = swim_msg_reserve(msg, 1);
 	if (packet == NULL)
 		return -1;
@@ -879,7 +893,7 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg)
 	mp_encode_map(header, map_size);
 	return 0;
 error:
-	swim_msg_destroy(msg);
+	cached_round_msg_invalidate(swim);
 	return -1;
 }
 
@@ -890,8 +904,10 @@ swim_decrease_events_ttl(struct swim *swim)
 	struct swim_member *member, *tmp;
 	rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events,
 				 tmp) {
-		if (--member->status_ttl == 0)
+		if (--member->status_ttl == 0) {
 			rlist_del_entry(member, in_queue_events);
+			cached_round_msg_invalidate(swim);
+		}
 	}
 }
 
@@ -915,9 +931,7 @@ swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events)
 	 */
 	if (rlist_empty(&swim->queue_round))
 		return;
-
-	struct swim_msg *msg = &swim->round_step_task.msg;
-	if (swim_encode_round_msg(swim, msg) != 0) {
+	if (swim_encode_round_msg(swim) != 0) {
 		diag_log();
 		return;
 	}
@@ -937,7 +951,6 @@ swim_round_step_complete(struct swim_task *task)
 	struct swim *swim = container_of(task, struct swim, round_step_task);
 	rlist_shift_entry(&swim->queue_round, struct swim_member,
 			  in_queue_round);
-	swim_msg_reset(&task->msg);
 	ev_periodic_start(loop(), &swim->round_tick);
 }
 
@@ -1442,7 +1455,10 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate,
 	if (swim->round_tick.interval != heartbeat_rate && heartbeat_rate > 0)
 		ev_periodic_set(&swim->round_tick, 0, heartbeat_rate, NULL);
 
-	swim->self = new_self;
+	if (new_self != NULL) {
+		swim->self = new_self;
+		cached_round_msg_invalidate(swim);
+	}
 	if (new_transport != NULL) {
 		swim->transport = new_transport;
 		swim_scheduler_set_transport(&swim->scheduler, new_transport);
@@ -1518,4 +1534,5 @@ swim_delete(struct swim *swim)
 	}
 	mh_i64ptr_delete(swim->members);
 	free(swim->shuffled_members);
+	cached_round_msg_invalidate(swim);
 }
-- 
2.17.2 (Apple Git-113)




More information about the Tarantool-patches mailing list