From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Subject: Re: [tarantool-patches] Re: [PATCH v3 1/6] [RAW] swim: introduce SWIM's anti-entropy component
References: <68930a7f6647aaa3f161223470e33a52012a3569.1546077015.git.v.shpilevoy@tarantool.org> <20190109091211.GC20509@chai>
From: Vladislav Shpilevoy
Message-ID: <9be1d1d0-2e8c-a982-b9eb-af3b413089bd@tarantool.org>
Date: Tue, 15 Jan 2019 17:42:08 +0300
MIME-Version: 1.0
In-Reply-To: <20190109091211.GC20509@chai>
Content-Type: text/plain; charset=utf-8; format=flowed
Content-Language: en-US
Content-Transfer-Encoding: 7bit
To: tarantool-patches@freelists.org, Konstantin Osipov
Cc: vdavydov.dev@gmail.com
List-ID:

Hi! Thanks for the review! See my answers inlined.

On 09/01/2019 12:12, Konstantin Osipov wrote:
> * Vladislav Shpilevoy [18/12/29 15:07]:
>> +enum {
>> +	/** How often to send membership messages and pings. */
>
> .. in seconds (please add these two words to the comment).
>
>> +	HEARTBEAT_RATE_DEFAULT = 1,
>> +};
>> +

@@ -87,7 +87,10 @@
  */
 enum {
-	/** How often to send membership messages and pings. */
+	/**
+	 * How often to send membership messages and pings in
+	 * seconds.
+	 */
 	HEARTBEAT_RATE_DEFAULT = 1,
 };

>
> \ is / modulO
>> + * Take a random number not blindly calculating a module, but

I did not add 'is' on purpose - here 'take' is not a noun, but verb,
and calculating is an adjective.

@@ -95,7 +95,7 @@ enum {
 };

 /**
- * Take a random number not blindly calculating a module, but
+ * Take a random number not blindly calculating a modulo, but
  * scaling random number down the given borders to save
  * distribution. A result belongs the range [start, end].
  */

>
> It took me some time to wrap my head around this
> sentence before I cracked what's a module.
> boundaries to preserve the original
>> + * scaling random number down the given borders to save
>> + * distribution. A result belongs the range [start, end].
> The result belongs to the range

@@ -96,8 +96,9 @@ enum {
 /**
  * Take a random number not blindly calculating a modulo, but
- * scaling random number down the given borders to save
- * distribution. A result belongs the range [start, end].
+ * scaling random number down the given boundaries to preserve the
+ * original distribution. The result belongs the range
+ * [start, end].
  */
 static inline int
 swim_scaled_rand(int start, int end)

>> + */
>> +static inline int
>> +swim_scaled_rand(int start, int end)
>> +{
>> +	assert(end > start);
>> +	return rand() / (RAND_MAX / (end - start + 1) + 1);
>> +}
>> +
>> +	 * Global hash of all known members of the cluster. Hash
>> +	 * key is bitwise combination of ip and port
>
> What's a bitwise combination? I see later that you shift the IP address
> and concatenate it with the port value. It would be a
> concatenation then.

I do not want to adhere to a concrete way of bit combination here - it
is encapsulated in sockaddr_in_hash().

> Why not simply run the entire sockaddr_in
> through murmur, this would be portable across transports
> and reliably random? The good old days of the trick you used in
> sockaddr_in_hash are bygone IMHO.

I do not like complex hashes here like murmur. Also, struct sockaddr_in
is allowed to have more fields than sin_port and sin_addr.s_addr, so it
is not portable to take a hash of the entire struct. For example, I
could create the struct on the stack and fill its sin_addr and sin_port,
and get a struct from recvfrom with the same sin_addr and sin_port, but
with other fields filled - the hashes would be different.

>
> You can easily remember the hash value in swim_member struct,
> so that all hash table lookups are easy.

I do not want to store the hash here because mh_i64ptr already stores
hashes in mh_i64ptr_node.

>
>> +	 * struct member, describing a remote instance.
>> +	 * The only purpose of such strange hash function is to be able to
>> +	 * reuse mh_i64ptr_t instead of introducing one more
>> +	 * implementation of mhash.
>> +	 *
>> +	 * Discovered members live here until they are
>> +	 * unavailable - in such a case they are removed from the
>> +	 * hash. But a subset of members are pinned - the ones
>> +	 * added explicitly via API. When a member is pinned, it
>> +	 * can not be removed from the hash, and the module will
>> +	 * ping him constantly.
>
> Great comments btw, makes the whole thing easy to understand. I
> will stop by your desk to fix a couple of grammar issues I found.

Ok, looking forward.

>
>> +static inline uint64_t
>> +sockaddr_in_hash(const struct sockaddr_in *a)
>> +{
>> +	return ((uint64_t) a->sin_addr.s_addr << 16) | a->sin_port;
>> +}
>
>> +struct PACKED swim_member_bin {
>> +	/** mp_encode_map(3) */
>> +	uint8_t m_header;
>> +
>> +	/** mp_encode_uint(SWIM_MEMBER_STATUS) */
>> +	uint8_t k_status;
>> +	/** mp_encode_uint(enum member_status) */
>> +	uint8_t v_status;
>> +
>> +	/** mp_encode_uint(SWIM_MEMBER_ADDRESS) */
>> +	uint8_t k_addr;
>> +	/** mp_encode_uint(addr.sin_addr.s_addr) */
>> +	uint8_t m_addr;
>> +	uint32_t v_addr;
>> +
>> +	/** mp_encode_uint(SWIM_MEMBER_PORT) */
>> +	uint8_t k_port;
>> +	/** mp_encode_uint(addr.sin_port) */
>> +	uint8_t m_port;
>> +	uint16_t v_port;
>> +};
>
> Please create a patch for the docs extending the binary protocol
> with these new messages. I would also ponder a bit more about
> extending iproto_constants.h with swim command codes in case we
> ever want to transport them over iproto.

I guess, it is better to write a doc request on the whole swim module
once we agree on the protocol details and API.

Concerning iproto - I think we can move constants when we decide to
transport swim over iproto, *if* we decide it ever. What is more, iproto
is box/, swim is lib/ and moving constants to iproto requires making
swim part of box.
I do not like it, however as an alternative in the future it is possible
to move some swim constants into a separate header like swim_bin.h and
include it in iproto_constants.h. But now it makes no sense.

>
>> + * SWIM_ANTI_ENTROPY: [
>> + *     {
>> + *         SWIM_MEMBER_STATUS: uint, enum member_status,
>> + *         SWIM_MEMBER_ADDRESS: uint, ip,
>> + *         SWIM_MEMBER_PORT: uint, port
>> + *     },
>
> Is this going to work only over ip network? Why not use server
> uuids as member identifiers?

Yes, IP, and only over UDP, which was approved by you. UUIDs are useless
in terms of transport - you can not send a packet to a UUID. Only to an
ip and possibly a port. But as verbally discussed, UUIDs can be used as
an identifier in the local membership table. We have decided to
implement them, and I will do it later.

>
>> +	for (mh_int_t node = mh_first(members), end = mh_end(members);
>> +	     node != end; node = mh_next(members, node), ++i) {
>> +		shuffled[i] = (struct swim_member *)
>> +			mh_i64ptr_node(members, node)->val;
>> +		int j = swim_scaled_rand(0, i);
>> +		SWAP(shuffled[i], shuffled[j]);
>> +	}
>
> Please add a comment that this method preserves even distribution
> of a random sequence, ideally explaining why, or at least
> mentioning that we tested it.

@@ -377,6 +377,10 @@ swim_shuffle_members(struct swim *swim)
 	shuffled = new_shuffled;
 	swim->shuffled_members = new_shuffled;
 	int i = 0;
+	/*
+	 * This shuffling preserves even distribution of a random
+	 * sequence, that is proved by testing.
+	 */
 	for (mh_int_t node = mh_first(members), end = mh_end(members);
 	     node != end; node = mh_next(members, node), ++i) {
 		shuffled[i] = (struct swim_member *)

>
>> +/**
>> + * Encode anti-entropy header and members data as many as
>> + * possible to the end of a last packet.
>> + * @retval -1 Error.
>> + * @retval 0 Not error, but nothing is encoded.
>> + * @retval 1 Something is encoded.
>> + */
>
> I would appreciate a bit more lengthy comment about your chunking
> strategy when pushing this data over UDP.
> Do you prepare all
> chunks and then send them? Do you fill a chunk and flush it along
> the way? What is chunk size (it's a pity it's not part of any
> function signature, it's the defining constraint of every function
> below. Do you have static asserts for the case when msgpack packet
> doesn't fit the udp packet?

This function is not a place for transport details, but I will answer
your questions below nonetheless.

> Do I prepare all chunks and then send them?

I guess, by chunks you mean packets. Yes, I prepare all packets and
then send them.

> Do I fill a chunk and flush it along the way?

No, it is not possible until I receive an EV_WRITE event.

> What is chunk size?

See swim_io.h UDP_PACKET_SIZE.

> "it's a pity it's not part of any function signature".

And it should not be part of any function signature. UDP packet size is
known (unless MTU is modified by admin, but even in such a case a new
MTU would be stored in struct swim).

> Do you have static asserts for the case when msgpack packet doesn't fit the udp packet?

No. During packing, if a packet size is exceeded, I just fill its header
and start a new packet. During unpacking (process_...() functions) I do
not care about packet size or the concept of packets at all - I just
parse msgpack in a buffer of a given size.

>
>> +
>> +static int
>> +swim_process_member_key(enum swim_member_key key, const char **pos,
>> +			const char *end, const char *msg_pref,
>> +			struct swim_member_def *def)
>
> No comment. What's going on here?

Parsing of a single swim_member_key value and filling swim_member_def. I
thought it was obvious from the function name, but as you wish, I will
write a comment.

@@ -544,12 +544,23 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
 	}
 }

+/**
+ * Decode a MessagePack value of @a key and store it in @a def.
+ * @param key Key to read value of.
+ * @param[in][out] pos Where a value is stored.
+ * @param end End of the buffer.
+ * @param msg_pref Error message prefix.
+ * @param[out] def Where to store the value.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
 static int

>
>> +{
>> +	switch(key) {
> Missing space.

 swim_process_member_key(enum swim_member_key key, const char **pos,
 			const char *end, const char *msg_pref,
 			struct swim_member_def *def)
 {
-	switch(key) {
+	switch (key) {
 	case SWIM_MEMBER_STATUS:

>
>> +	case SWIM_MEMBER_STATUS:
>> +		if (mp_typeof(**pos) != MP_UINT ||
>> +		    mp_check_uint(*pos, end) > 0) {
>> +			say_error("%s member status should be uint", msg_pref);
>> +			return -1;
>> +		}
>> +		key = mp_decode_uint(pos);
>> +		if (key >= swim_member_status_MAX) {
>> +			say_error("%s unknown member status", msg_pref);
>> +			return -1;
>> +		}
>> +		def->status = (enum swim_member_status) key;
>> +		break;
>> +	case SWIM_MEMBER_ADDRESS:
>> +		if (mp_typeof(**pos) != MP_UINT ||
>> +		    mp_check_uint(*pos, end) > 0) {
>> +			say_error("%s member address should be uint", msg_pref);
>> +			return -1;
>> +		}
>> +		def->addr.sin_addr.s_addr = mp_decode_uint(pos);
>> +		break;
>> +	case SWIM_MEMBER_PORT:
>> +		if (mp_typeof(**pos) != MP_UINT ||
>> +		    mp_check_uint(*pos, end) > 0) {
>> +			say_error("%s member port should be uint", msg_pref);
>> +			return -1;
>> +		}
>> +		uint64_t port = mp_decode_uint(pos);
>> +		if (port > UINT16_MAX) {
>> +			say_error("%s member port is invalid", msg_pref);
>> +			return -1;
>> +		}
>> +		def->addr.sin_port = port;
>> +		break;
>> +	default:
>> +		unreachable();
>> +	}
>> +	return 0;
>> +}
>
> OK, I understand now it's merely updating a member configuration.

It is not updating a member configuration, but filling a
swim_member_def structure. Like opts_parse_key.

> But then what's the event flow of such update? What happens after
> the update? I know from the public API that we suspend all
> activities while update is in progress, but this is not so obvious
> when you are reading the code.

Nothing happens after the update in this commit.
A struct swim_member is just added to a members table and never changes
nor disappears.

Also, I do not understand what you mean by saying "we suspend all
activities while update is in progress". Of course we suspend, since it
is the TX thread. Any action in the TX thread suspends all other
actions.

>
>> +/** Decode an anti-entropy message, update members table. */
>> +static int
>> +swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
>
> Why not swim_decode_whatever?

Because it is not just decoding. After decoding of each member, actions
may be performed to update/add/delete a member.

> process is a very general word, it could mean any action.

Exactly. Any action may be performed during decoding.

>
>> +{
>> +	const char *msg_pref = "Invalid SWIM anti-entropy message:";
>> +	if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) {
>> +		say_error("%s message should be an array", msg_pref);
>> +		return -1;
>> +	}
>> +	uint64_t size = mp_decode_array(pos);
>> +	for (uint64_t i = 0; i < size; ++i) {
>> +		if (mp_typeof(**pos) != MP_MAP ||
>> +		    mp_check_map(*pos, end) > 0) {
>> +			say_error("%s member should be map", msg_pref);
>> +			return -1;
>> +		}
>> +		uint64_t map_size = mp_decode_map(pos);
>> +		struct swim_member_def def;
>> +		swim_member_def_create(&def);
>> +		for (uint64_t j = 0; j < map_size; ++j) {
>> +			if (mp_typeof(**pos) != MP_UINT ||
>> +			    mp_check_uint(*pos, end) > 0) {
>> +				say_error("%s member key should be uint",
>> +					  msg_pref);
>> +				return -1;
>> +			}
>> +			uint64_t key = mp_decode_uint(pos);
>> +			if (key >= swim_member_key_MAX) {
>> +				say_error("%s unknown member key", msg_pref);
>> +				return -1;
>> +			}
>> +			if (swim_process_member_key(key, pos, end, msg_pref,
>> +						    &def) != 0)
>> +				return -1;
>> +		}
>> +		if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
>> +			say_error("%s member address should be specified",
>> +				  msg_pref);
>> +			return -1;
>> +		}
>> +		swim_process_member_update(swim, &def);
>
> Why not
> swim_update_member?

For naming consistency - process_, process_update, ... . But as you
wish.

@@ -530,7 +530,7 @@ swim_member_def_create(struct swim_member_def *def)
 }

 static void
-swim_process_member_update(struct swim *swim, struct swim_member_def *def)
+swim_update_member(struct swim *swim, struct swim_member_def *def)
 {
 	struct swim_member *member = swim_find_member(swim, &def->addr);
 	/*
@@ -641,7 +641,7 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
 				  msg_pref);
 			return -1;
 		}
-		swim_process_member_update(swim, &def);
+		swim_update_member(swim, &def);
 	}
 	return 0;

>
> Someone needs to check that there is no buffer overflow in this
> code (I will check in the next review round).

Where is a buffer to overflow? If you mean checking of the buffer
borders, I do it via mp_check_...() before decoding of any value.

>
>> +/**
>> + * Convert a string URI like "ip:port" to sockaddr_in structure.
>> + */
>> +static int
>> +uri_to_addr(const char *str, struct sockaddr_in *addr)
>> +{
>> +	struct uri uri;
>> +	if (uri_parse(&uri, str) != 0 || uri.service == NULL)
>> +		goto invalid_uri;
>> +	in_addr_t iaddr;
>> +	if (uri.host_len == strlen(URI_HOST_UNIX) &&
>> +	    memcmp(uri.host, URI_HOST_UNIX, uri.host_len) == 0) {
>> +		diag_set(IllegalParams, "Unix sockets are not supported");
>> +		return -1;
>> +	}
>> +	if (uri.host_len == 0) {
>> +		iaddr = htonl(INADDR_ANY);
>> +	} else if (uri.host_len == 9 && memcmp("localhost", uri.host, 9) == 0) {
>> +		iaddr = htonl(INADDR_LOOPBACK);
>> +	} else {
>> +		iaddr = inet_addr(tt_cstr(uri.host, uri.host_len));
>> +		if (iaddr == (in_addr_t) -1)
>> +			goto invalid_uri;
>> +	}
>> +	int port = htons(atoi(uri.service));
>> +	memset(addr, 0, sizeof(*addr));
>> +	addr->sin_family = AF_INET;
>> +	addr->sin_addr.s_addr = iaddr;
>> +	addr->sin_port = port;
>> +	return 0;
>> +
>> +invalid_uri:
>> +	diag_set(SocketError, sio_socketname(-1), "invalid uri \"%s\"", str);
>> +	return -1;
>> +}
>
> Shouldn't this be part of sio
> (it's a utility function).

Done in a separate commit.

>
>> +	}
>> +	if (swim_scheduler_bind(&swim->scheduler, &addr) != 0) {
>> +		swim_member_delete(swim, new_self);
>> +		return -1;
>
> Why is this a scheduler function, not member function?
>

Because I do not bind a member. I bind the whole struct swim to an
address. All communication with each member then goes through this
address.
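
For reference, the scaled random and the shuffle discussed above can be
sketched standalone roughly as follows. This is only an illustration
under my assumptions, not the code from the patch: scaled_rand() and
shuffle_copy() are hypothetical names, and the arrays hold plain ints
instead of struct swim_member pointers. Unlike the quoted
swim_scaled_rand(), the sketch adds start to the result and accepts
start == end, because the shuffle loop asks for the range [0, 0] on its
first iteration.

```c
#include <assert.h>
#include <stdlib.h>

/*
 * Hypothetical variant of swim_scaled_rand(): scale rand() down to
 * [start, end] rather than taking a modulo. Assumes that
 * end - start + 1 does not overflow int.
 */
static inline int
scaled_rand(int start, int end)
{
	assert(end >= start);
	/* One possible value; also avoids RAND_MAX / 1 + 1 overflow. */
	if (start == end)
		return start;
	return start + rand() / (RAND_MAX / (end - start + 1) + 1);
}

/*
 * Inside-out Fisher-Yates shuffle: append src[i] to dst and swap it
 * with a random element of dst[0..i], like the quoted loop does.
 */
static void
shuffle_copy(int *dst, const int *src, int count)
{
	for (int i = 0; i < count; ++i) {
		dst[i] = src[i];
		int j = scaled_rand(0, i);
		int tmp = dst[i];
		dst[i] = dst[j];
		dst[j] = tmp;
	}
}
```

After srand(), a call like shuffle_copy(dst, src, n) leaves a
permutation of src in dst. How even the resulting distribution is still
depends on the quality of rand(), so this does not replace testing.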