[tarantool-patches] Re: [PATCH v3 1/6] [RAW] swim: introduce SWIM's anti-entropy component

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Jan 15 17:42:08 MSK 2019


Hi! Thanks for the review! See my answers inlined.

On 09/01/2019 12:12, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy at tarantool.org> [18/12/29 15:07]:
>> +enum {
>> +	/** How often to send membership messages and pings. */
> 
> .. in seconds (please add these two words to the comment).
> 
>> +	HEARTBEAT_RATE_DEFAULT = 1,
>> +};
>> +

@@ -87,7 +87,10 @@
   */
  
  enum {
-	/** How often to send membership messages and pings. */
+	/**
+	 * How often to send membership messages and pings in
+	 * seconds.
+	 */
  	HEARTBEAT_RATE_DEFAULT = 1,
  };

> 
>                         \ is /                        modulO
>> + * Take a random number not blindly calculating a module, but

I did not add 'is' on purpose: here 'take' is not a noun but a verb,
and 'calculating' is an adjective.

@@ -95,7 +95,7 @@ enum {
  };
  
  /**
- * Take a random number not blindly calculating a module, but
+ * Take a random number not blindly calculating a modulo, but
   * scaling random number down the given borders to save
   * distribution. A result belongs the range [start, end].
   */

> 
> It took me some time to wrap my head around this
> sentence before I cracked what's a module.
>                                             boundaries to preserve the original
>> + * scaling random number down the given borders to save
>> + * distribution. A result belongs the range [start, end].
>                      The result belongs to the range

@@ -96,8 +96,9 @@ enum {
  
  /**
   * Take a random number not blindly calculating a modulo, but
- * scaling random number down the given borders to save
- * distribution. A result belongs the range [start, end].
+ * scaling random number down the given boundaries to preserve the
+ * original distribution. The result belongs to the range
+ * [start, end].
   */
  static inline int
  swim_scaled_rand(int start, int end)

>> + */
>> +static inline int
>> +swim_scaled_rand(int start, int end)
>> +{
>> +	assert(end > start);
>> +	return rand() / (RAND_MAX / (end - start + 1) + 1);
>> +}
>> +
>> +	 * Global hash of all known members of the cluster. Hash
>> +	 * key is bitwise combination of ip and port
> 
> What's a bitwise combination? I see later that you shift the IP address
> and concatenate it with the port value. It would be a
> concatenation then.

I do not want to commit to a specific way of combining the bits here:
it is encapsulated in sockaddr_in_hash().

> Why not simply run the entire sockaddr_in
> through murmur, this would be portable across transports
> and reliably random? The good old days of the trick you used in
> sockaddr_in_hash are bygone IMHO.

I do not want a complex hash like murmur here. Also, struct
sockaddr_in is allowed to have more fields than sin_port and
sin_addr.s_addr, so hashing the entire struct is not portable. For
example, I could create the struct on the stack and fill only its
sin_addr and sin_port, and get another struct from recvfrom() with
the same sin_addr and sin_port but with other fields filled as well:
the hashes would differ.
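A small sketch of the portability argument above: two `sockaddr_in` structs with the same address and port but different remaining bytes (for example `sin_zero` on Linux) get the same key from the shift-and-or construction used by the quoted `sockaddr_in_hash()`, while their raw bytes differ, so any hash over the whole struct could disagree. `sketch_addr_key` and `sketch_fill` are hypothetical names.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>

/*
 * Same construction as sockaddr_in_hash(): the 32-bit address goes
 * to bits 16..47 and the 16-bit port to bits 0..15, so the key is
 * unique per (address, port) pair.
 */
static inline uint64_t
sketch_addr_key(const struct sockaddr_in *a)
{
	return ((uint64_t) a->sin_addr.s_addr << 16) | a->sin_port;
}

/*
 * Fill only family, address and port; @a garbage simulates whatever
 * the other bytes of the struct happen to contain.
 */
static void
sketch_fill(struct sockaddr_in *a, unsigned char garbage)
{
	memset(a, garbage, sizeof(*a));
	a->sin_family = AF_INET;
	a->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	a->sin_port = htons(3301);
}
```

With `sketch_fill(&a, 0x00)` and `sketch_fill(&b, 0xab)` the keys are equal, but `memcmp(&a, &b, sizeof(a))` is not zero.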

> 
> You can easily remember the hash value in swim_member struct,
> so that all hash table lookups are easy.

I do not want to store the hash in struct swim_member, because
mh_i64ptr already stores it in mh_i64ptr_node.

> 
>> +	 * struct member, describing a remote instance. The only
>> +	 * purpose of such strange hash function is to be able to
>> +	 * reuse mh_i64ptr_t instead of introducing one more
>> +	 * implementation of mhash.
>> +	 *
>> +	 * Discovered members live here until they are
>> +	 * unavailable - in such a case they are removed from the
>> +	 * hash. But a subset of members are pinned - the ones
>> +	 * added explicitly via API. When a member is pinned, it
>> +	 * can not be removed from the hash, and the module will
>> +	 * ping him constantly.
> 
> Great comments btw, makes the whole thing easy to understand. I
> will stop by your desk to fix a couple of grammar issues I found.

Ok, looking forward.

> 
>> +static inline uint64_t
>> +sockaddr_in_hash(const struct sockaddr_in *a)
>> +{
>> +	return ((uint64_t) a->sin_addr.s_addr << 16) | a->sin_port;
>> +}
> 
>> +struct PACKED swim_member_bin {
>> +	/** mp_encode_map(3) */
>> +	uint8_t m_header;
>> +
>> +	/** mp_encode_uint(SWIM_MEMBER_STATUS) */
>> +	uint8_t k_status;
>> +	/** mp_encode_uint(enum member_status) */
>> +	uint8_t v_status;
>> +
>> +	/** mp_encode_uint(SWIM_MEMBER_ADDRESS) */
>> +	uint8_t k_addr;
>> +	/** mp_encode_uint(addr.sin_addr.s_addr) */
>> +	uint8_t m_addr;
>> +	uint32_t v_addr;
>> +
>> +	/** mp_encode_uint(SWIM_MEMBER_PORT) */
>> +	uint8_t k_port;
>> +	/** mp_encode_uint(addr.sin_port) */
>> +	uint8_t m_port;
>> +	uint16_t v_port;
>> +};
> 
> Please create a patch for the docs extending the binary protocol
> with these new messages. I would also ponder a bit more about
> extending iproto_constants.h with swim command codes in case we
> ever want to transport them over iproto.

I think it is better to file a doc request for the whole swim module
once we agree on the protocol details and the API. Concerning iproto:
I think we can move the constants when we decide to transport swim
over iproto, *if* we ever decide to. Moreover, iproto lives in box/
while swim lives in lib/, so moving the constants to iproto would
require making swim part of box. I do not like that. As a future
alternative, some swim constants could be moved into a separate header
such as swim_bin.h and included into iproto_constants.h. But for now
that makes no sense.
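A minimal sketch of how a fixed-layout record like the quoted `swim_member_bin` can be filled. It always uses the fixed-width MessagePack uint32 (0xce) and uint16 (0xcd) formats, so every member occupies the same 13 bytes and fields can be patched in place; MessagePack decoders accept such non-minimal integer encodings. The key values and all `sketch_*` names are hypothetical; the patch defines the real enum.

```c
#include <assert.h>
#include <stdint.h>
#include <arpa/inet.h>

/* Hypothetical key values, each encoded as a positive fixint. */
enum {
	SKETCH_MEMBER_STATUS = 0,
	SKETCH_MEMBER_ADDRESS = 1,
	SKETCH_MEMBER_PORT = 2,
};

struct __attribute__((packed)) sketch_member_bin {
	uint8_t m_header;  /* fixmap with 3 entries: 0x83 */
	uint8_t k_status;
	uint8_t v_status;  /* positive fixint, must be < 128 */
	uint8_t k_addr;
	uint8_t m_addr;    /* uint32 marker: 0xce */
	uint32_t v_addr;   /* big-endian value */
	uint8_t k_port;
	uint8_t m_port;    /* uint16 marker: 0xcd */
	uint16_t v_port;   /* big-endian value */
};

static void
sketch_member_bin_fill(struct sketch_member_bin *b, uint8_t status,
		       uint32_t addr, uint16_t port)
{
	assert(status < 0x80);
	b->m_header = 0x83;
	b->k_status = SKETCH_MEMBER_STATUS;
	b->v_status = status;
	b->k_addr = SKETCH_MEMBER_ADDRESS;
	b->m_addr = 0xce;
	/* htonl/htons lay the value out big-endian, as msgpack wants. */
	b->v_addr = htonl(addr);
	b->k_port = SKETCH_MEMBER_PORT;
	b->m_port = 0xcd;
	b->v_port = htons(port);
}
```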

> 
>> + *     SWIM_ANTI_ENTROPY: [
>> + *         {
>> + *             SWIM_MEMBER_STATUS: uint, enum member_status,
>> + *             SWIM_MEMBER_ADDRESS: uint, ip,
>> + *             SWIM_MEMBER_PORT: uint, port
>> + *         },
> 
> Is this going to work only over ip network? Why not use server
> uuids as member identifiers?

Yes, IP, and only over UDP, which you approved. UUIDs are useless as
a transport address: you cannot send a packet to a UUID, only to an
IP and a port. But, as discussed verbally, UUIDs can be used as
identifiers in the local membership table. We have decided to
implement them, and I will do it later.

> 
>> +	for (mh_int_t node = mh_first(members), end = mh_end(members);
>> +	     node != end; node = mh_next(members, node), ++i) {
>> +		shuffled[i] = (struct swim_member *)
>> +			mh_i64ptr_node(members, node)->val;
>> +		int j = swim_scaled_rand(0, i);
>> +		SWAP(shuffled[i], shuffled[j]);
>> +	}
> 
> Please add a comment that this method preserves even distribution
> of a random sequence, ideally explaining why, or at least
> mentioning that we tested it.

@@ -377,6 +377,10 @@ swim_shuffle_members(struct swim *swim)
  	shuffled = new_shuffled;
  	swim->shuffled_members = new_shuffled;
  	int i = 0;
+	/*
+	 * This shuffling preserves the even distribution of a
+	 * random sequence, which has been confirmed by testing.
+	 */
  	for (mh_int_t node = mh_first(members), end = mh_end(members);
  	     node != end; node = mh_next(members, node), ++i) {
  		shuffled[i] = (struct swim_member *)

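The shuffle loop above is the "inside-out" variant of the Fisher-Yates shuffle: element i is placed at position i and then swapped with a position j chosen from [0, i]. By induction, if the first i elements form a uniformly random permutation and j is uniform, so do the first i + 1. A minimal sketch on an int array, assuming a `swim_scaled_rand()`-style index helper; `sketch_shuffle` and `sketch_rand_upto` are hypothetical names.

```c
#include <assert.h>
#include <stdlib.h>

/* Index in [0, i], scaling rand() as swim_scaled_rand() does. */
static int
sketch_rand_upto(int i)
{
	assert(i >= 0);
	return rand() / (RAND_MAX / (i + 1) + 1);
}

/* Copy src into dst in random order ("inside-out" Fisher-Yates). */
static void
sketch_shuffle(const int *src, int *dst, int count)
{
	for (int i = 0; i < count; ++i) {
		dst[i] = src[i];
		int j = sketch_rand_upto(i);
		int tmp = dst[i];
		dst[i] = dst[j];
		dst[j] = tmp;
	}
}
```

A quick check: shuffling {0, 1, 2} many times should yield each of the 6 permutations roughly equally often.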
> 
>> +/**
>> + * Encode anti-entropy header and members data as many as
>> + * possible to the end of a last packet.
>> + * @retval -1 Error.
>> + * @retval 0 Not error, but nothing is encoded.
>> + * @retval 1 Something is encoded.
>> + */
> 
> I would appreciate a bit more lengthy comment about your chunking
> strategy when pushing this data over UDP. Do you prepare all
> chunks and then send them? Do you fill a chunk and flush it along
> the way? What is chunk size (it's a pity it's not part of any
> function signature, it's the defining constraint of every function
> below. Do you have static asserts for the case when msgpack packet
> doesn't fit the udp packet?

This function's comment is not the place for transport details, but
I will answer your questions below anyway.

> Do I prepare all chunks and then send them?

I guess by chunks you mean packets. Yes, I prepare all the packets
and then send them.

> Do I fill a chunk and flush it along the way?

No, that is not possible until I receive an EV_WRITE event.

> What is chunk size?

See swim_io.h UDP_PACKET_SIZE.

> "it's a pity it's not part of any function signature".

And it should not be part of any function signature. The UDP packet
size is known (unless the MTU is modified by an admin, but even in
that case the new MTU would be stored in struct swim).

> Do you have static asserts for the case when msgpack packet
> doesn't fit the udp packet?

No. During packing, if a packet size is exceeded, I just fill its
header and start a new packet. During unpacking (the process_...()
functions) I care about neither packet size nor the packet concept
at all: I just parse msgpack in a buffer of a given size.
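A minimal sketch of the packing strategy described above: all packets are prepared before sending, and when the next member does not fit, the current packet's header is filled and a new packet is started. The sizes, the array32 header reserved at the start of each packet, and all `sketch_*` names are hypothetical; the real limit is `UDP_PACKET_SIZE` in swim_io.h.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical sizes, chosen small so three members fill a packet. */
enum {
	SKETCH_PACKET_SIZE = 44,
	SKETCH_HEADER_SIZE = 5,  /* array32 marker + 4-byte count */
	SKETCH_MEMBER_SIZE = 13, /* one fixed-width member record */
};

struct sketch_packet {
	unsigned char body[SKETCH_PACKET_SIZE];
	size_t len;      /* used bytes, header included */
	uint32_t count;  /* members in this packet */
};

/* Fill the reserved header once the member count is known. */
static void
sketch_packet_finish(struct sketch_packet *p)
{
	p->body[0] = 0xdd;
	p->body[1] = (unsigned char) (p->count >> 24);
	p->body[2] = (unsigned char) (p->count >> 16);
	p->body[3] = (unsigned char) (p->count >> 8);
	p->body[4] = (unsigned char) p->count;
}

/*
 * Copy encoded members into packets. Returns the number of packets
 * used, or -1 if @a max_packets is too small.
 */
static int
sketch_pack_members(const unsigned char *members, int member_count,
		    struct sketch_packet *packets, int max_packets)
{
	assert(SKETCH_HEADER_SIZE + SKETCH_MEMBER_SIZE <= SKETCH_PACKET_SIZE);
	int used = 0;
	struct sketch_packet *cur = NULL;
	for (int i = 0; i < member_count; ++i) {
		if (cur == NULL ||
		    cur->len + SKETCH_MEMBER_SIZE > SKETCH_PACKET_SIZE) {
			if (cur != NULL)
				sketch_packet_finish(cur);
			if (used == max_packets)
				return -1;
			cur = &packets[used++];
			cur->len = SKETCH_HEADER_SIZE;
			cur->count = 0;
		}
		memcpy(cur->body + cur->len,
		       members + (size_t) i * SKETCH_MEMBER_SIZE,
		       SKETCH_MEMBER_SIZE);
		cur->len += SKETCH_MEMBER_SIZE;
		cur->count++;
	}
	if (cur != NULL)
		sketch_packet_finish(cur);
	return used;
}
```

With these sizes, 7 members end up in 3 packets holding 3, 3 and 1 members.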

> 
>> +
>> +static int
>> +swim_process_member_key(enum swim_member_key key, const char **pos,
>> +			const char *end, const char *msg_pref,
>> +			struct swim_member_def *def)
> 
> No comment. What's going on here?

It parses a single swim_member_key value and fills swim_member_def.
I thought this was obvious from the function name, but as you wish, I
will write a comment.

@@ -544,12 +544,23 @@ swim_process_member_update(struct swim *swim, struct swim_member_def *def)
  	}
  }
  
+/**
+ * Decode a MessagePack value of @a key and store it in @a def.
+ * @param key Key to read value of.
+ * @param[in,out] pos Where a value is stored.
+ * @param end End of the buffer.
+ * @param msg_pref Error message prefix.
+ * @param[out] def Where to store the value.
+ *
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
  static int

> 
>> +{
>> +	switch(key) {
>      Missing space.

  swim_process_member_key(enum swim_member_key key, const char **pos,
  			const char *end, const char *msg_pref,
  			struct swim_member_def *def)
  {
-	switch(key) {
+	switch (key) {
  	case SWIM_MEMBER_STATUS:

> 
>> +	case SWIM_MEMBER_STATUS:
>> +		if (mp_typeof(**pos) != MP_UINT ||
>> +		    mp_check_uint(*pos, end) > 0) {
>> +			say_error("%s member status should be uint", msg_pref);
>> +			return -1;
>> +		}
>> +		key = mp_decode_uint(pos);
>> +		if (key >= swim_member_status_MAX) {
>> +			say_error("%s unknown member status", msg_pref);
>> +			return -1;
>> +		}
>> +		def->status = (enum swim_member_status) key;
>> +		break;
>> +	case SWIM_MEMBER_ADDRESS:
>> +		if (mp_typeof(**pos) != MP_UINT ||
>> +		    mp_check_uint(*pos, end) > 0) {
>> +			say_error("%s member address should be uint", msg_pref);
>> +			return -1;
>> +		}
>> +		def->addr.sin_addr.s_addr = mp_decode_uint(pos);
>> +		break;
>> +	case SWIM_MEMBER_PORT:
>> +		if (mp_typeof(**pos) != MP_UINT ||
>> +		    mp_check_uint(*pos, end) > 0) {
>> +			say_error("%s member port should be uint", msg_pref);
>> +			return -1;
>> +		}
>> +		uint64_t port = mp_decode_uint(pos);
>> +		if (port > UINT16_MAX) {
>> +			say_error("%s member port is invalid", msg_pref);
>> +			return -1;
>> +		}
>> +		def->addr.sin_port = port;
>> +		break;
>> +	default:
>> +		unreachable();
>> +	}
>> +	return 0;
>> +}
> 
> OK, I understand now it's merely updating a member configuration.

It does not update a member configuration; it fills a swim_member_def
structure, like opts_parse_key() does.

> But then what's the event flow of such update? What happens after
> the update? I know from the public API that we suspend all
> activities while update is in progress, but this is not so obvious
> when you are reading the code.

Nothing happens after the update in this commit. A struct swim_member
is just added to the members table and never changes or disappears.

Also, I do not understand what you mean by "we suspend all activities
while update is in progress". Of course we do, since this is the TX
thread: any action in the TX thread suspends all other actions.

> 
>> +/** Decode an anti-entropy message, update members table. */
>> +static int
>> +swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
> 
> Why not swim_decode_whatever?

Because it is not just decoding. After each member is decoded, actions
may be performed to update, add, or delete a member.

> process is a very general word, it could mean any action.

Exactly. Any action may be performed during decoding.

> 
>> +{
>> +	const char *msg_pref = "Invalid SWIM anti-entropy message:";
>> +	if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) {
>> +		say_error("%s message should be an array", msg_pref);
>> +		return -1;
>> +	}
>> +	uint64_t size = mp_decode_array(pos);
>> +	for (uint64_t i = 0; i < size; ++i) {
>> +		if (mp_typeof(**pos) != MP_MAP ||
>> +		    mp_check_map(*pos, end) > 0) {
>> +			say_error("%s member should be map", msg_pref);
>> +			return -1;
>> +		}
>> +		uint64_t map_size = mp_decode_map(pos);
>> +		struct swim_member_def def;
>> +		swim_member_def_create(&def);
>> +		for (uint64_t j = 0; j < map_size; ++j) {
>> +			if (mp_typeof(**pos) != MP_UINT ||
>> +			    mp_check_uint(*pos, end) > 0) {
>> +				say_error("%s member key should be uint",
>> +					  msg_pref);
>> +				return -1;
>> +			}
>> +			uint64_t key = mp_decode_uint(pos);
>> +			if (key >= swim_member_key_MAX) {
>> +				say_error("%s unknown member key", msg_pref);
>> +				return -1;
>> +			}
>> +			if (swim_process_member_key(key, pos, end, msg_pref,
>> +						    &def) != 0)
>> +				return -1;
>> +		}
>> +		if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
>> +			say_error("%s member address should be specified",
>> +				  msg_pref);
>> +			return -1;
>> +		}
>> +		swim_process_member_update(swim, &def);
> 
> Why not swim_update_member?

For consistency of names: process_<component>, process_update, etc.
But as you wish.

@@ -530,7 +530,7 @@ swim_member_def_create(struct swim_member_def *def)
  }
  
  static void
-swim_process_member_update(struct swim *swim, struct swim_member_def *def)
+swim_update_member(struct swim *swim, struct swim_member_def *def)
  {
         struct swim_member *member = swim_find_member(swim, &def->addr);
         /*
@@ -641,7 +641,7 @@ swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)
                                   msg_pref);
                         return -1;
                 }
-               swim_process_member_update(swim, &def);
+               swim_update_member(swim, &def);
         }
         return 0;

> 
> Someone needs to check that there is no buffer overflow in this
> code (I will check in the next review round).

Which buffer could overflow? If you mean checking the buffer bounds,
I do it via mp_check_...() before decoding any value.
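A minimal sketch of the check-before-decode pattern, written without msgpuck so it stands alone. The hypothetical `sketch_decode_uint` plays the role of the `mp_typeof()`/`mp_check_uint()`/`mp_decode_uint()` sequence: it refuses to read a value whose encoded length would extend past `end` and leaves the cursor untouched on failure.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Decode a MessagePack unsigned integer (positive fixint or
 * uint8/16/32/64) at *pos. Returns 0 and advances *pos on success;
 * returns -1 without moving *pos on a wrong type or if the value
 * does not fit entirely in [*pos, end).
 */
static int
sketch_decode_uint(const unsigned char **pos, const unsigned char *end,
		   uint64_t *out)
{
	if (*pos >= end)
		return -1;
	unsigned char c = **pos;
	if (c <= 0x7f) {
		*out = c;
		*pos += 1;
		return 0;
	}
	size_t len;
	switch (c) {
	case 0xcc: len = 1; break;
	case 0xcd: len = 2; break;
	case 0xce: len = 4; break;
	case 0xcf: len = 8; break;
	default: return -1;  /* not a MessagePack uint */
	}
	if ((size_t) (end - *pos) < 1 + len)
		return -1;   /* value would run past the buffer end */
	uint64_t v = 0;
	for (size_t k = 1; k <= len; ++k)
		v = (v << 8) | (*pos)[k];
	*out = v;
	*pos += 1 + len;
	return 0;
}
```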

> 
>> +/**
>> + * Convert a string URI like "ip:port" to sockaddr_in structure.
>> + */
>> +static int
>> +uri_to_addr(const char *str, struct sockaddr_in *addr)
>> +{
>> +	struct uri uri;
>> +	if (uri_parse(&uri, str) != 0 || uri.service == NULL)
>> +		goto invalid_uri;
>> +	in_addr_t iaddr;
>> +	if (uri.host_len == strlen(URI_HOST_UNIX) &&
>> +	    memcmp(uri.host, URI_HOST_UNIX, uri.host_len) == 0) {
>> +		diag_set(IllegalParams, "Unix sockets are not supported");
>> +		return -1;
>> +	}
>> +	if (uri.host_len == 0) {
>> +		iaddr = htonl(INADDR_ANY);
>> +	} else if (uri.host_len == 9 && memcmp("localhost", uri.host, 9) == 0) {
>> +		iaddr = htonl(INADDR_LOOPBACK);
>> +	} else {
>> +		iaddr = inet_addr(tt_cstr(uri.host, uri.host_len));
>> +		if (iaddr == (in_addr_t) -1)
>> +			goto invalid_uri;
>> +	}
>> +	int port = htons(atoi(uri.service));
>> +	memset(addr, 0, sizeof(*addr));
>> +	addr->sin_family = AF_INET;
>> +	addr->sin_addr.s_addr = iaddr;
>> +	addr->sin_port = port;
>> +	return 0;
>> +
>> +invalid_uri:
>> +	diag_set(SocketError, sio_socketname(-1), "invalid uri \"%s\"", str);
>> +	return -1;
>> +}
> 
> Shouldn't this be part of sio (it's a utility function).

Done in a separate commit.
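For comparison, a standalone sketch of the same conversion that uses only POSIX calls (`inet_pton()`, `strtol()`) instead of Tarantool's `uri_parse()`; it also range-checks the port, which `atoi()` in the quoted version does not. The name `sketch_str_to_addr` and the plain "host:port" input format are assumptions.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>

/*
 * Hypothetical helper: parse "ip:port", "localhost:port" or ":port"
 * (no URI schemes, no Unix sockets) into sockaddr_in.
 * Returns 0 on success, -1 on malformed input.
 */
static int
sketch_str_to_addr(const char *str, struct sockaddr_in *addr)
{
	const char *colon = strrchr(str, ':');
	if (colon == NULL || colon - str >= 64)
		return -1;
	char host[64];
	memcpy(host, str, (size_t) (colon - str));
	host[colon - str] = '\0';

	char *tail;
	errno = 0;
	long port = strtol(colon + 1, &tail, 10);
	if (colon[1] == '\0' || *tail != '\0' || errno != 0 ||
	    port < 0 || port > 65535)
		return -1;

	memset(addr, 0, sizeof(*addr));
	addr->sin_family = AF_INET;
	addr->sin_port = htons((uint16_t) port);
	if (host[0] == '\0')
		addr->sin_addr.s_addr = htonl(INADDR_ANY);
	else if (strcmp(host, "localhost") == 0)
		addr->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	else if (inet_pton(AF_INET, host, &addr->sin_addr) != 1)
		return -1;
	return 0;
}
```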

> 
>> +	}
>> +	if (swim_scheduler_bind(&swim->scheduler, &addr) != 0) {
>> +		swim_member_delete(swim, new_self);
>> +		return -1;
> 
> Why is this a scheduler function, not member function?
> 

Because I do not bind a member. I bind the whole struct swim to an
address. All communication with every member then goes through this
address.
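A minimal sketch of what binding the whole instance means at the socket level, assuming one UDP socket per SWIM instance: the socket is bound once to the local address, and datagrams to any member are later sent with `sendto()` through the same descriptor. `sketch_instance_bind` is a hypothetical name, not the scheduler's API.

```c
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

/*
 * Create a UDP socket and bind it to @a addr. Returns the
 * descriptor, or -1 on error.
 */
static int
sketch_instance_bind(const struct sockaddr_in *addr)
{
	int fd = socket(AF_INET, SOCK_DGRAM, 0);
	if (fd < 0)
		return -1;
	if (bind(fd, (const struct sockaddr *) addr, sizeof(*addr)) != 0) {
		close(fd);
		return -1;
	}
	return fd;
}
```

Binding to port 0 lets the kernel pick a free port, which `getsockname()` then reports.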


