From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org> To: tarantool-patches@freelists.org Cc: vdavydov.dev@gmail.com Subject: [PATCH 5/5] swim: keep encoded round message cached Date: Mon, 17 Dec 2018 15:53:23 +0300 [thread overview] Message-ID: <7d4800682ff4ffabd4d8c887432ecde07a5dc75c.1545047950.git.v.shpilevoy@tarantool.org> (raw) In-Reply-To: <cover.1545047950.git.v.shpilevoy@tarantool.org> In-Reply-To: <cover.1545047950.git.v.shpilevoy@tarantool.org> During a SWIM round a message is being handed out consisting of at most 3 sections. Parts of the message change rarely: when member attributes are updated, or when some members are removed. So it is possible to cache the message and send it during several round steps in a row, or even not rebuild it for the whole round. Follow up #3234 --- src/lib/swim/swim.c | 75 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 63 insertions(+), 12 deletions(-) diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c index df57ef470..a04c1646c 100644 --- a/src/lib/swim/swim.c +++ b/src/lib/swim/swim.c @@ -43,11 +43,11 @@ * * Optional: * - do not send self. - * - cache encoded batch. * - refute immediately. * - indirect ping. * - increment own incarnation on each round. * - attach dst incarnation to ping. + * - fix errors in sio_listen/bind. */ /** @@ -198,6 +198,13 @@ struct swim_member { * Position in a queue of members in the current round. */ struct rlist in_queue_round; + /** + * It is true, if the member is being sent in the current + * SWIM round, so it is encoded into cached round msg. + * When this flag is true, and the member has changed or + * been removed, the cached round msg is invalidated. + */ + bool is_being_sent_in_this_round; /** * * Failure detection component @@ -645,6 +652,27 @@ static int cfg_size = 0; */ static struct swim_member **shuffled_members = NULL; static int shuffled_members_size = 0; +/** + * When a new round starts, it builds a new message consisting of + * up to 3 components. 
This message is then being handed out to + * the cluster members. Most of the time the message remains unchanged + * and can be cached to avoid rebuilding it on each step. It should + * be invalidated in 2 cases only: a member, encoded here, has + * changed its attributes (incarnation, status); a member, encoded + * here, has been dead too long and is removed. + */ +static char *cached_round_msg = NULL; +/** + * Payload size in the cached_round_msg buffer. Capacity is + * UDP packet size. + */ +static int cached_round_msg_size = 0; + +static inline void +cached_round_msg_invalidate(void) +{ + cached_round_msg_size = 0; +} /** Queue of io tasks ready to push now. */ static RLIST_HEAD(queue_output); @@ -670,6 +698,8 @@ static void swim_member_is_updated(struct swim_member *member) { swim_schedule_event(member); + if (member->is_being_sent_in_this_round) + cached_round_msg_invalidate(); } /** @@ -716,6 +746,7 @@ swim_member_new(const struct sockaddr_in *addr, enum swim_member_status status, } member->status = status; member->addr = *addr; + member->is_being_sent_in_this_round = false; member->incarnation = incarnation; member->is_pinned = false; member->failed_pings = 0; @@ -755,6 +786,8 @@ swim_find_member(const struct sockaddr_in *addr) static inline void swim_member_delete(struct swim_member *member) { + if (member->is_being_sent_in_this_round) + cached_round_msg_invalidate(); uint64_t key = sockaddr_in_hash(&member->addr); mh_int_t rc = mh_i64ptr_find(members, key, NULL); assert(rc != mh_end(members)); @@ -797,6 +830,7 @@ swim_shuffle_members(void) int j = rand() / (RAND_MAX / (i + 1) + 1); SWAP(shuffled_members[i], shuffled_members[j]); } + cached_round_msg_invalidate(); return 0; } @@ -833,12 +867,24 @@ calculate_bin_batch_size(int header_size, int member_size, int avail_size) } static int -swim_encode_round_msg(char *buffer, int size) +swim_encode_round_msg(void) { - char *start = buffer; if ((shuffled_members == NULL || rlist_empty(&queue_round)) && swim_new_round() != 0) 
return -1; + if (cached_round_msg_size > 0) + return 0; + if (cached_round_msg == NULL) { + cached_round_msg = malloc(UDP_PACKET_SIZE); + if (cached_round_msg == NULL) { + diag_set(OutOfMemory, UDP_PACKET_SIZE, "malloc", + "cached_round_msg"); + return -1; + } + } + char *buffer = cached_round_msg; + int i, size = UDP_PACKET_SIZE; + /* -1 - for the root map header. */ assert((uint)size > sizeof(struct swim_fd_header_bin) + 1); size -= sizeof(struct swim_fd_header_bin) + 1; @@ -871,7 +917,7 @@ swim_encode_round_msg(char *buffer, int size) memcpy(buffer, &diss_header_bin, sizeof(diss_header_bin)); buffer += sizeof(diss_header_bin); - int i = 0; + i = 0; struct swim_member *member, *tmp; struct swim_event_bin event_bin; swim_event_bin_create(&event_bin); @@ -888,8 +934,9 @@ swim_encode_round_msg(char *buffer, int size) event_count -= diss_batch_size; } + i = 0; if (ae_batch_size == 0) - return buffer - start; + goto end; struct swim_anti_entropy_header_bin ae_header_bin; swim_anti_entropy_header_bin_create(&ae_header_bin, ae_batch_size); memcpy(buffer, &ae_header_bin, sizeof(ae_header_bin)); @@ -897,13 +944,18 @@ swim_encode_round_msg(char *buffer, int size) struct swim_member_bin member_bin; swim_member_bin_create(&member_bin); - for (int i = 0; i < ae_batch_size; ++i) { + for (; i < ae_batch_size; ++i) { struct swim_member *member = shuffled_members[i]; + member->is_being_sent_in_this_round = true; swim_member_bin_reset(&member_bin, member); memcpy(buffer, &member_bin, sizeof(member_bin)); buffer += sizeof(member_bin); } - return buffer - start; +end: + for (; i < shuffled_members_size; ++i) + shuffled_members[i]->is_being_sent_in_this_round = false; + cached_round_msg_size = buffer - cached_round_msg; + return 0; } /** @@ -915,9 +967,7 @@ swim_send_round_msg(struct swim_io_task *task) { (void) task; assert(task->cb == swim_send_round_msg); - char buffer[UDP_PACKET_SIZE]; - int size = swim_encode_round_msg(buffer, UDP_PACKET_SIZE); - if (size < 0) { + if 
(swim_encode_round_msg() != 0) { diag_log(); goto end; } @@ -930,8 +980,9 @@ swim_send_round_msg(struct swim_io_task *task) say_verbose("SWIM: send to %s", sio_strfaddr((struct sockaddr *) &m->addr, sizeof(m->addr))); - if (sio_sendto(output.fd, buffer, size, 0, (struct sockaddr *) &m->addr, - sizeof(m->addr)) == -1 && ! sio_wouldblock(errno)) + if (sio_sendto(output.fd, cached_round_msg, cached_round_msg_size, 0, + (struct sockaddr *) &m->addr, sizeof(m->addr)) == -1 && + ! sio_wouldblock(errno)) diag_log(); swim_member_schedule_ack_wait(m); rlist_del_entry(m, in_queue_round); -- 2.17.2 (Apple Git-113)
prev parent reply other threads:[~2018-12-17 12:53 UTC|newest] Thread overview: 6+ messages / expand[flat|nested] mbox.gz Atom feed top 2018-12-17 12:53 [PATCH 0/5] SWIM Vladislav Shpilevoy 2018-12-17 12:53 ` [PATCH 1/5] swim: introduce SWIM's anti-entropy component Vladislav Shpilevoy 2018-12-17 12:53 ` [PATCH 2/5] swim: introduce failure detection component Vladislav Shpilevoy 2018-12-17 12:53 ` [PATCH 3/5] swim: introduce a dissemination component Vladislav Shpilevoy 2018-12-17 12:53 ` [PATCH 4/5] swim: introduce "suspected" status Vladislav Shpilevoy 2018-12-17 12:53 ` Vladislav Shpilevoy [this message]
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=7d4800682ff4ffabd4d8c887432ecde07a5dc75c.1545047950.git.v.shpilevoy@tarantool.org \ --to=v.shpilevoy@tarantool.org \ --cc=tarantool-patches@freelists.org \ --cc=vdavydov.dev@gmail.com \ --subject='Re: [PATCH 5/5] swim: keep encoded round message cached' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox