From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH v2 4/6] [RAW] swim: keep encoded round message cached Date: Tue, 25 Dec 2018 22:19:27 +0300 Message-Id: <3209be496909a01580ea482136ce4552c7de7999.1545765055.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: vdavydov.dev@gmail.com, kostja@tarantool.org List-ID: During a SWIM round a message is being handed out consisting of at most 3 sections. Parts of the message change rarely, by member attributes update and by removal of some of them. So it is possible to cache the message and send it during several round steps in a row. Or even do not rebuild it the whole round. Also, it allows to send message parts on separate libev EV_WRITE events, because now the message is stored globally and can be iterated from different events. Part of #3234 --- src/lib/swim/swim.c | 52 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index 15e079f11..f880066c5 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -345,6 +345,13 @@ struct swim_member { * Position in a queue of members in the current round. */ struct rlist in_queue_round; + /** + * It is true, if the member is being sent in the current + * SWIM round, so it is encoded into cached round msg. + * When this flag is true, and the member has changed or + * removed, the cached round msg is invalidated. + */ + bool is_being_sent_in_this_round; /** * * Failure detection component @@ -473,6 +480,17 @@ struct swim { struct ev_periodic wait_ack_tick; /** Queue of events sorted by occurrence time. */ struct rlist queue_events; + /** + * When a new round starts, it builds a new message + * consisting of up to 3 components. This message is then + * being handed out to the cluster members. Most of time + * the message remains unchanged and can be cached to do + * not rebuild it on each step. It should be invalidated + * in 2 cases only: a member, encoded here, has changed + * its attributes (incarnation, status); a member, encoded + * here, is dead too long and removed. + */ + struct swim_msg cached_round_msg; }; static inline uint64_t @@ -481,6 +499,13 @@ sockaddr_in_hash(const struct sockaddr_in *a) return ((uint64_t) a->sin_addr.s_addr << 16) | a->sin_port; } +static inline void +cached_round_msg_invalidate(struct swim *swim) +{ + swim_msg_destroy(&swim->cached_round_msg); + swim_msg_create(&swim->cached_round_msg); +} + /** * Main round messages can carry merged failure detection * messages and anti-entropy. With these keys the components can @@ -788,6 +813,8 @@ static void swim_member_is_updated(struct swim *swim, struct swim_member *member) { swim_schedule_event(swim, member); + if (member->is_being_sent_in_this_round) + cached_round_msg_invalidate(swim); } /** @@ -876,6 +903,8 @@ swim_find_member(struct swim *swim, const struct sockaddr_in *addr) static inline void swim_member_delete(struct swim *swim, struct swim_member *member) { + if (member->is_being_sent_in_this_round) + cached_round_msg_invalidate(swim); uint64_t key = sockaddr_in_hash(&member->addr); mh_int_t rc = mh_i64ptr_find(swim->members, key, NULL); assert(rc != mh_end(swim->members)); @@ -924,6 +953,7 @@ swim_shuffle_members(struct swim *swim) int j = rand() / (RAND_MAX / (i + 1) + 1); SWAP(shuffled[i], shuffled[j]); } + cached_round_msg_invalidate(swim); return 0; } @@ -1062,9 +1092,9 @@ swim_encode_dissemination(struct swim *swim, struct swim_msg *msg) /** Encode SWIM components into a sequence of UDP packets. */ static int -swim_encode_round_msg(struct swim *swim, struct swim_msg *msg) +swim_encode_round_msg(struct swim *swim) { - swim_msg_create(msg); + struct swim_msg *msg = &swim->cached_round_msg; struct swim_msg_part *part = swim_msg_reserve(msg, 1); if (part == NULL) return -1; @@ -1089,9 +1119,13 @@ swim_encode_round_msg(struct swim *swim, struct swim_msg *msg) assert(mp_sizeof_map(map_size) == 1); mp_encode_map(header, map_size); + for (int i = 0; i < rc; ++i) { + struct swim_member *member = swim->shuffled_members[i]; + member->is_being_sent_in_this_round = true; + } return 0; error: - swim_msg_destroy(msg); + cached_round_msg_invalidate(swim); return -1; } @@ -1126,8 +1160,8 @@ swim_send_round_msg(struct swim_io_task *task) if (rlist_empty(&swim->queue_round)) goto next_round_step; - struct swim_msg msg; - if (swim_encode_round_msg(swim, &msg) != 0) { + if (swim_msg_is_empty(&swim->cached_round_msg) && + swim_encode_round_msg(swim) != 0) { diag_log(); goto next_round_step; } @@ -1137,15 +1171,15 @@ swim_send_round_msg(struct swim_io_task *task) say_verbose("SWIM: send to %s", sio_strfaddr((struct sockaddr *) &m->addr, sizeof(m->addr))); - for (struct swim_msg_part *part = swim_msg_first_part(&msg); - part != NULL; part = swim_msg_part_next(part)) { + for (struct swim_msg_part *part = + swim_msg_first_part(&swim->cached_round_msg); part != NULL; + part = swim_msg_part_next(part)) { if (swim->transport.send_round_msg(swim->output.fd, part->body, part->size, (struct sockaddr *) &m->addr, sizeof(m->addr)) == -1) diag_log(); } - swim_msg_destroy(&msg); swim_member_schedule_ack_wait(swim, m); swim_decrease_events_ttl(swim); rlist_del_entry(m, in_queue_round); @@ -1679,6 +1713,7 @@ swim_new(void) ev_periodic_set(&swim->wait_ack_tick, 0, ACK_TIMEOUT, NULL); swim->wait_ack_tick.data = (void *) swim; rlist_create(&swim->queue_events); + swim_msg_create(&swim->cached_round_msg); return swim; } @@ -1811,4 +1846,5 @@ swim_delete(struct swim *swim) } mh_i64ptr_delete(swim->members); free(swim->shuffled_members); + cached_round_msg_invalidate(swim); } -- 2.17.2 (Apple Git-113)