From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH v3 4/6] [RAW] swim: keep encoded round message cached Date: Sat, 29 Dec 2018 13:14:13 +0300 Message-Id: In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: vdavydov.dev@gmail.com, kostja@tarantool.org List-ID: During a SWIM round, a message consisting of at most 3 sections is sent out. Parts of the message change rarely, namely when member attributes are updated and when some of the members are removed. So it is possible to cache the message and send it during several round steps in a row, or even not to rebuild it for the whole round. Also, caching allows sending message parts on separate libev EV_WRITE events, because now the message is stored globally and can be iterated from different events. Part of #3234 --- src/lib/swim/swim.c | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 4e7ffbc54..7dff22dd5 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -246,6 +246,8 @@ struct swim { * and preallocated per SWIM instance. */ struct swim_task round_step_task; + /** True, if msg in round_step_task is up to date. */ + bool is_round_msg_valid; /** Transport to send/receive data. */ const struct swim_transport *transport; /** Scheduler of output requests. */ @@ -283,6 +285,12 @@ sockaddr_in_hash(const struct sockaddr_in *a) return ((uint64_t) a->sin_addr.s_addr << 16) | a->sin_port; } +static inline void +cached_round_msg_invalidate(struct swim *swim) +{ + swim->is_round_msg_valid = false; +} + /** * Main round messages can carry merged failure detection * messages and anti-entropy.
With these keys the components can @@ -583,6 +591,7 @@ static void swim_member_status_is_updated(struct swim *swim, struct swim_member *member) { swim_schedule_event(swim, member); + cached_round_msg_invalidate(swim); } /** @@ -620,6 +629,7 @@ swim_member_update_status(struct swim *swim, struct swim_member *member, static void swim_member_delete(struct swim *swim, struct swim_member *member) { + cached_round_msg_invalidate(swim); uint64_t key = sockaddr_in_hash(&member->addr); mh_int_t rc = mh_i64ptr_find(swim->members, key, NULL); assert(rc != mh_end(swim->members)); @@ -711,6 +721,7 @@ swim_shuffle_members(struct swim *swim) int j = swim_scaled_rand(0, i); SWAP(shuffled[i], shuffled[j]); } + cached_round_msg_invalidate(swim); return 0; } @@ -851,9 +862,12 @@ swim_encode_dissemination(struct swim *swim, struct swim_msg *msg) /** Encode SWIM components into a sequence of UDP packets. */ static int -swim_encode_round_msg(struct swim *swim, struct swim_msg *msg) +swim_encode_round_msg(struct swim *swim) { - swim_msg_create(msg); + if (swim->is_round_msg_valid) + return 0; + struct swim_msg *msg = &swim->round_step_task.msg; + swim_msg_reset(msg); struct swim_packet *packet = swim_msg_reserve(msg, 1); if (packet == NULL) return -1; @@ -879,7 +893,7 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg) mp_encode_map(header, map_size); return 0; error: - swim_msg_destroy(msg); + cached_round_msg_invalidate(swim); return -1; } @@ -890,8 +904,10 @@ swim_decrease_events_ttl(struct swim *swim) struct swim_member *member, *tmp; rlist_foreach_entry_safe(member, &swim->queue_events, in_queue_events, tmp) { - if (--member->status_ttl == 0) + if (--member->status_ttl == 0) { rlist_del_entry(member, in_queue_events); + cached_round_msg_invalidate(swim); + } } } @@ -915,9 +931,7 @@ swim_round_step_begin(struct ev_loop *loop, struct ev_periodic *p, int events) */ if (rlist_empty(&swim->queue_round)) return; - - struct swim_msg *msg = &swim->round_step_task.msg; - if 
(swim_encode_round_msg(swim, msg) != 0) { + if (swim_encode_round_msg(swim) != 0) { diag_log(); return; } @@ -937,7 +951,6 @@ swim_round_step_complete(struct swim_task *task) struct swim *swim = container_of(task, struct swim, round_step_task); rlist_shift_entry(&swim->queue_round, struct swim_member, in_queue_round); - swim_msg_reset(&task->msg); ev_periodic_start(loop(), &swim->round_tick); } @@ -1442,7 +1455,10 @@ swim_cfg(struct swim *swim, const char *uri, double heartbeat_rate, if (swim->round_tick.interval != heartbeat_rate && heartbeat_rate > 0) ev_periodic_set(&swim->round_tick, 0, heartbeat_rate, NULL); - swim->self = new_self; + if (new_self != NULL) { + swim->self = new_self; + cached_round_msg_invalidate(swim); + } if (new_transport != NULL) { swim->transport = new_transport; swim_scheduler_set_transport(&swim->scheduler, new_transport); @@ -1518,4 +1534,5 @@ swim_delete(struct swim *swim) } mh_i64ptr_delete(swim->members); free(swim->shuffled_members); + cached_round_msg_invalidate(swim); } -- 2.17.2 (Apple Git-113)