[tarantool-patches] [PATCH v3 1/6] [RAW] swim: introduce SWIM's anti-entropy component

Konstantin Osipov kostja at tarantool.org
Wed Jan 9 12:12:11 MSK 2019


* Vladislav Shpilevoy <v.shpilevoy at tarantool.org> [18/12/29 15:07]:
> +enum {
> +	/** How often to send membership messages and pings. */

.. in seconds (please add these two words to the comment).

> +	HEARTBEAT_RATE_DEFAULT = 1,
> +};
> +

                       \ is /                        modulO
> + * Take a random number not blindly calculating a module, but

It took me some time to wrap my head around this
sentence before I realized that by "module" you mean modulo.
                                           boundaries to preserve the original
> + * scaling random number down the given borders to save
> + * distribution. A result belongs the range [start, end].
                    The result belongs to the range 
> + */
> +static inline int
> +swim_scaled_rand(int start, int end)
> +{
> +	assert(end > start);
> +	return rand() / (RAND_MAX / (end - start + 1) + 1);
> +}
> +
> +	 * Global hash of all known members of the cluster. Hash
> +	 * key is bitwise combination of ip and port

What's a bitwise combination? I see later that you shift the IP address
and concatenate it with the port value. That would be called a
concatenation, then. Why not simply run the entire sockaddr_in
through murmur? This would be portable across transports
and reliably random. The good old days of the trick you used in
sockaddr_in_hash are bygone IMHO.

You can easily remember the hash value in the swim_member struct,
so that hash table lookups never have to recompute it.
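A minimal sketch of the murmur suggestion. Real code would presumably reuse the Murmur implementation the project already bundles; here a standalone MurmurHash3 x86_32 is inlined so the example compiles on its own. `struct swim_member_sketch`, `sockaddr_in_murmur()` and `swim_member_sketch_create()` are hypothetical names. Hashing raw bytes is only deterministic if the struct is zeroed before it is filled, which uri_to_addr() below already does with memset.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>

static inline uint32_t
rotl32(uint32_t x, int r)
{
	return (x << r) | (x >> (32 - r));
}

/* Standalone MurmurHash3 x86_32, standing in for the bundled one. */
static uint32_t
murmur3_32(const void *key, size_t len, uint32_t seed)
{
	const uint8_t *data = key;
	const uint32_t c1 = 0xcc9e2d51, c2 = 0x1b873593;
	uint32_t h = seed, k;
	size_t nblocks = len / 4;
	for (size_t i = 0; i < nblocks; ++i) {
		memcpy(&k, data + i * 4, 4);
		k *= c1; k = rotl32(k, 15); k *= c2;
		h ^= k; h = rotl32(h, 13); h = h * 5 + 0xe6546b64;
	}
	const uint8_t *tail = data + nblocks * 4;
	k = 0;
	switch (len & 3) {
	case 3: k ^= (uint32_t)tail[2] << 16; /* fallthrough */
	case 2: k ^= (uint32_t)tail[1] << 8;  /* fallthrough */
	case 1: k ^= tail[0];
		k *= c1; k = rotl32(k, 15); k *= c2; h ^= k;
	}
	/* Finalization mix: spread every input bit over the result. */
	h ^= (uint32_t)len;
	h ^= h >> 16; h *= 0x85ebca6b;
	h ^= h >> 13; h *= 0xc2b2ae35;
	h ^= h >> 16;
	return h;
}

/* Hypothetical member: the hash is computed once and cached. */
struct swim_member_sketch {
	struct sockaddr_in addr;
	uint32_t hash;
};

/* Hash the whole sockaddr_in, sin_zero included. */
static uint32_t
sockaddr_in_murmur(const struct sockaddr_in *a)
{
	assert(a->sin_family == AF_INET);
	return murmur3_32(a, sizeof(*a), 0);
}

static void
swim_member_sketch_create(struct swim_member_sketch *m, const char *ip,
			  uint16_t port)
{
	/* Zero everything first so unused bytes hash deterministically. */
	memset(&m->addr, 0, sizeof(m->addr));
	m->addr.sin_family = AF_INET;
	m->addr.sin_addr.s_addr = inet_addr(ip);
	m->addr.sin_port = htons(port);
	m->hash = sockaddr_in_murmur(&m->addr);
}
```

A lookup can then compare the cached `hash` first and the full address only on a match.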

> +	 * struct member, describing a remote instance. The only
> +	 * purpose of such strange hash function is to be able to
> +	 * reuse mh_i64ptr_t instead of introducing one more
> +	 * implementation of mhash.
> +	 *
> +	 * Discovered members live here until they are
> +	 * unavailable - in such a case they are removed from the
> +	 * hash. But a subset of members are pinned - the ones
> +	 * added explicitly via API. When a member is pinned, it
> +	 * can not be removed from the hash, and the module will
> +	 * ping him constantly.

Great comments, by the way; they make the whole thing easy to
understand. I will stop by your desk to fix a couple of grammar
issues I found.

> +static inline uint64_t
> +sockaddr_in_hash(const struct sockaddr_in *a)
> +{
> +	return ((uint64_t) a->sin_addr.s_addr << 16) | a->sin_port;
> +}

See my comment re hash function.

> +struct PACKED swim_member_bin {
> +	/** mp_encode_map(3) */
> +	uint8_t m_header;
> +
> +	/** mp_encode_uint(SWIM_MEMBER_STATUS) */
> +	uint8_t k_status;
> +	/** mp_encode_uint(enum member_status) */
> +	uint8_t v_status;
> +
> +	/** mp_encode_uint(SWIM_MEMBER_ADDRESS) */
> +	uint8_t k_addr;
> +	/** mp_encode_uint(addr.sin_addr.s_addr) */
> +	uint8_t m_addr;
> +	uint32_t v_addr;
> +
> +	/** mp_encode_uint(SWIM_MEMBER_PORT) */
> +	uint8_t k_port;
> +	/** mp_encode_uint(addr.sin_port) */
> +	uint8_t m_port;
> +	uint16_t v_port;
> +};

Please create a patch for the docs extending the binary protocol
with these new messages. I would also think a bit more about
extending iproto_constants.h with SWIM command codes, in case we
ever want to transport them over iproto.
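For the docs patch, it may help to spell out the bytes that swim_member_bin stands for. A hedged sketch, assuming key values 0, 1 and 2 for SWIM_MEMBER_STATUS, SWIM_MEMBER_ADDRESS and SWIM_MEMBER_PORT (the real enum values are not in the quote) and a status small enough for a positive fixint; the function and enum names are hypothetical. Address and port always use the fixed-width uint32 (0xce) and uint16 (0xcd) forms, so every record is 13 bytes.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum {
	SK_MEMBER_STATUS = 0,   /* assumed key values */
	SK_MEMBER_ADDRESS = 1,
	SK_MEMBER_PORT = 2,
	SK_MEMBER_BIN_SIZE = 13,
};

/* Write one member record; returns the number of bytes written. */
static size_t
swim_member_bin_encode_sketch(uint8_t *buf, uint8_t status, uint32_t addr,
			      uint16_t port)
{
	assert(status < 0x80);          /* must fit a positive fixint */
	uint8_t *p = buf;
	*p++ = 0x83;                    /* fixmap with 3 pairs */
	*p++ = SK_MEMBER_STATUS;        /* positive fixint key */
	*p++ = status;                  /* positive fixint value */
	*p++ = SK_MEMBER_ADDRESS;
	*p++ = 0xce;                    /* uint32, big-endian payload */
	*p++ = (uint8_t)(addr >> 24);
	*p++ = (uint8_t)(addr >> 16);
	*p++ = (uint8_t)(addr >> 8);
	*p++ = (uint8_t)addr;
	*p++ = SK_MEMBER_PORT;
	*p++ = 0xcd;                    /* uint16, big-endian payload */
	*p++ = (uint8_t)(port >> 8);
	*p++ = (uint8_t)port;
	return (size_t)(p - buf);
}
```

Fixed widths are not the shortest MessagePack encoding for small values, but conforming decoders accept them, and they keep the layout constant (presumably the reason for the PACKED struct).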

> + *     SWIM_ANTI_ENTROPY: [
> + *         {
> + *             SWIM_MEMBER_STATUS: uint, enum member_status,
> + *             SWIM_MEMBER_ADDRESS: uint, ip,
> + *             SWIM_MEMBER_PORT: uint, port
> + *         },

Is this going to work only over IP networks? Why not use server
UUIDs as member identifiers?

> +	for (mh_int_t node = mh_first(members), end = mh_end(members);
> +	     node != end; node = mh_next(members, node), ++i) {
> +		shuffled[i] = (struct swim_member *)
> +			mh_i64ptr_node(members, node)->val;
> +		int j = swim_scaled_rand(0, i);
> +		SWAP(shuffled[i], shuffled[j]);
> +	}

Please add a comment that this method preserves even distribution
of a random sequence, ideally explaining why, or at least
mentioning that we tested it.
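The loop above is the "inside-out" Fisher-Yates shuffle; a sketch of the comment it could carry, with hypothetical names. If `i` starts from 0 (not visible in the quote), the first call is `swim_scaled_rand(0, 0)`, which the quoted `assert(end > start)` rejects, and with RAND_MAX == INT_MAX its divisor `RAND_MAX / 1 + 1` would overflow. The quoted version also never adds `start`. The sketch below relaxes the assert, computes the divisor in `long long`, and adds `start`.

```c
#include <assert.h>
#include <stdlib.h>

/*
 * Return a value in [start, end]. [0, RAND_MAX] is split into
 * n = end - start + 1 buckets of RAND_MAX / n + 1 values each; only the
 * last bucket may be shorter, so each result's probability differs
 * from 1/n by less than n / (RAND_MAX + 1).
 */
static int
scaled_rand_sketch(int start, int end)
{
	assert(end >= start);
	long long n = (long long)end - start + 1;
	return start + (int)(rand() / ((long long)RAND_MAX / n + 1));
}

/*
 * Inside-out Fisher-Yates: copy src into dst while shuffling.
 * Invariant: after step i, dst[0..i] is a uniformly random permutation
 * of src[0..i]. Step i appends src[i], then swaps it with a uniformly
 * chosen slot j in [0, i]. Each of the i! equally likely prefixes
 * combined with each of the i + 1 choices of j yields a distinct
 * arrangement, so all (i + 1)! outcomes stay equally likely (up to the
 * bias of scaled_rand_sketch()).
 */
static void
shuffle_sketch(const int *src, int *dst, int count)
{
	for (int i = 0; i < count; ++i) {
		dst[i] = src[i];
		int j = scaled_rand_sketch(0, i);
		int tmp = dst[i];
		dst[i] = dst[j];
		dst[j] = tmp;
	}
}
```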

> +/**
> + * Encode anti-entropy header and members data as many as
> + * possible to the end of a last packet.
> + * @retval -1 Error.
> + * @retval 0 Not error, but nothing is encoded.
> + * @retval 1 Something is encoded.
> + */

I would appreciate a somewhat longer comment about your chunking
strategy when pushing this data over UDP. Do you prepare all
chunks and then send them? Do you fill a chunk and flush it along
the way? What is the chunk size? (It's a pity it's not part of any
function signature: it's the defining constraint of every function
below.) Do you have static asserts for the case when a msgpack packet
doesn't fit into a UDP packet?
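One possible answer to these questions, as a hedged sketch rather than what the patch does: size a chunk to one UDP datagram, fill one packet at a time and flush it as soon as it is full, and let a `_Static_assert` guarantee that at least one member record fits. The sizes are assumptions: a 1500-byte Ethernet MTU minus 20 bytes of IPv4 header and 8 bytes of UDP header, the 13-byte swim_member_bin record, and a 5-byte worst-case MessagePack array header; any other per-packet headers are ignored. All names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

enum {
	SKETCH_UDP_PACKET_SIZE = 1500 - 20 - 8,
	SKETCH_ARRAY_HEADER_MAX = 5,    /* array32 header */
	SKETCH_MEMBER_BIN_SIZE = 13,
	SKETCH_MEMBERS_PER_PACKET =
		(SKETCH_UDP_PACKET_SIZE - SKETCH_ARRAY_HEADER_MAX) /
		SKETCH_MEMBER_BIN_SIZE,
};

_Static_assert(SKETCH_MEMBERS_PER_PACKET >= 1,
	       "a single member record must fit into one UDP packet");

/* Called with the range of members that makes up one packet. */
typedef void (*flush_f)(int first, int count, void *ctx);

/*
 * Fill one packet at a time and flush it as soon as it is full, so only
 * one packet buffer is alive at once. Returns the number of packets.
 */
static int
anti_entropy_chunk(int member_count, flush_f flush, void *ctx)
{
	assert(member_count >= 0);
	int packets = 0;
	for (int first = 0; first < member_count;
	     first += SKETCH_MEMBERS_PER_PACKET) {
		int count = member_count - first;
		if (count > SKETCH_MEMBERS_PER_PACKET)
			count = SKETCH_MEMBERS_PER_PACKET;
		if (flush != NULL)
			flush(first, count, ctx);
		++packets;
	}
	return packets;
}
```

With these numbers 112 members fit into a packet, so 250 members go out in three packets.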

> +
> +static int
> +swim_process_member_key(enum swim_member_key key, const char **pos,
> +			const char *end, const char *msg_pref,
> +			struct swim_member_def *def)

There's no comment on this function. What's going on here?

> +{
> +	switch(key) {
    Missing space: switch (key).

> +	case SWIM_MEMBER_STATUS:
> +		if (mp_typeof(**pos) != MP_UINT ||
> +		    mp_check_uint(*pos, end) > 0) {
> +			say_error("%s member status should be uint", msg_pref);
> +			return -1;
> +		}
> +		key = mp_decode_uint(pos);
> +		if (key >= swim_member_status_MAX) {
> +			say_error("%s unknown member status", msg_pref);
> +			return -1;
> +		}
> +		def->status = (enum swim_member_status) key;
> +		break;
> +	case SWIM_MEMBER_ADDRESS:
> +		if (mp_typeof(**pos) != MP_UINT ||
> +		    mp_check_uint(*pos, end) > 0) {
> +			say_error("%s member address should be uint", msg_pref);
> +			return -1;
> +		}
> +		def->addr.sin_addr.s_addr = mp_decode_uint(pos);
> +		break;
> +	case SWIM_MEMBER_PORT:
> +		if (mp_typeof(**pos) != MP_UINT ||
> +		    mp_check_uint(*pos, end) > 0) {
> +			say_error("%s member port should be uint", msg_pref);
> +			return -1;
> +		}
> +		uint64_t port = mp_decode_uint(pos);
> +		if (port > UINT16_MAX) {
> +			say_error("%s member port is invalid", msg_pref);
> +			return -1;
> +		}
> +		def->addr.sin_port = port;
> +		break;
> +	default:
> +		unreachable();
> +	}
> +	return 0;
> +}

OK, I understand now it's merely updating a member configuration.
But then what's the event flow of such an update? What happens after
the update? I know from the public API that we suspend all
activities while an update is in progress, but this is not obvious
when you are reading the code.

> +/** Decode an anti-entropy message, update members table. */
> +static int
> +swim_process_anti_entropy(struct swim *swim, const char **pos, const char *end)

Why not swim_decode_<whatever>?
"Process" is a very general word; it could mean any action.

> +{
> +	const char *msg_pref = "Invalid SWIM anti-entropy message:";
> +	if (mp_typeof(**pos) != MP_ARRAY || mp_check_array(*pos, end) > 0) {
> +		say_error("%s message should be an array", msg_pref);
> +		return -1;
> +	}
> +	uint64_t size = mp_decode_array(pos);
> +	for (uint64_t i = 0; i < size; ++i) {
> +		if (mp_typeof(**pos) != MP_MAP ||
> +		    mp_check_map(*pos, end) > 0) {
> +			say_error("%s member should be map", msg_pref);
> +			return -1;
> +		}
> +		uint64_t map_size = mp_decode_map(pos);
> +		struct swim_member_def def;
> +		swim_member_def_create(&def);
> +		for (uint64_t j = 0; j < map_size; ++j) {
> +			if (mp_typeof(**pos) != MP_UINT ||
> +			    mp_check_uint(*pos, end) > 0) {
> +				say_error("%s member key should be uint",
> +					  msg_pref);
> +				return -1;
> +			}
> +			uint64_t key = mp_decode_uint(pos);
> +			if (key >= swim_member_key_MAX) {
> +				say_error("%s unknown member key", msg_pref);
> +				return -1;
> +			}
> +			if (swim_process_member_key(key, pos, end, msg_pref,
> +						    &def) != 0)
> +				return -1;
> +		}
> +		if (def.addr.sin_port == 0 || def.addr.sin_addr.s_addr == 0) {
> +			say_error("%s member address should be specified",
> +				  msg_pref);
> +			return -1;
> +		}
> +		swim_process_member_update(swim, &def);

Why not swim_update_member?

Someone needs to check that there is no buffer overflow in this
code (I will check in the next review round).
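One spot worth checking: the quoted loops call `mp_typeof(**pos)` before any `mp_check_*()`, so if a map or array header declares more elements than the datagram contains, `**pos` may be read at `end`, unless the whole packet was validated earlier (not visible in the quote). A minimal sketch of the safer order, using a hand-written decoder for MessagePack unsigned integers so it stands alone; `decode_uint_checked()` is a hypothetical name, not a msgpuck function.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Decode a MessagePack unsigned integer from [*pos, end). On success
 * store it in *out, advance *pos and return 0; return -1 without moving
 * *pos if the type is wrong or the buffer is truncated.
 */
static int
decode_uint_checked(const char **pos, const char *end, uint64_t *out)
{
	assert(*pos <= end);
	if (*pos >= end)
		return -1;              /* no byte left even for the type */
	uint8_t c = (uint8_t)**pos;
	if (c <= 0x7f) {                /* positive fixint */
		*out = c;
		*pos += 1;
		return 0;
	}
	size_t len;
	switch (c) {
	case 0xcc: len = 1; break;      /* uint8 */
	case 0xcd: len = 2; break;      /* uint16 */
	case 0xce: len = 4; break;      /* uint32 */
	case 0xcf: len = 8; break;      /* uint64 */
	default: return -1;             /* not an unsigned integer */
	}
	if ((size_t)(end - *pos) < 1 + len)
		return -1;              /* payload is truncated */
	uint64_t v = 0;
	for (size_t i = 1; i <= len; ++i)
		v = (v << 8) | (uint8_t)(*pos)[i];
	*out = v;
	*pos += 1 + len;
	return 0;
}
```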

> +/**
> + * Convert a string URI like "ip:port" to sockaddr_in structure.
> + */
> +static int
> +uri_to_addr(const char *str, struct sockaddr_in *addr)
> +{
> +	struct uri uri;
> +	if (uri_parse(&uri, str) != 0 || uri.service == NULL)
> +		goto invalid_uri;
> +	in_addr_t iaddr;
> +	if (uri.host_len == strlen(URI_HOST_UNIX) &&
> +	    memcmp(uri.host, URI_HOST_UNIX, uri.host_len) == 0) {
> +		diag_set(IllegalParams, "Unix sockets are not supported");
> +		return -1;
> +	}
> +	if (uri.host_len == 0) {
> +		iaddr = htonl(INADDR_ANY);
> +	} else if (uri.host_len == 9 && memcmp("localhost", uri.host, 9) == 0) {
> +		iaddr = htonl(INADDR_LOOPBACK);
> +	} else {
> +		iaddr = inet_addr(tt_cstr(uri.host, uri.host_len));
> +		if (iaddr == (in_addr_t) -1)
> +			goto invalid_uri;
> +	}
> +	int port = htons(atoi(uri.service));
> +	memset(addr, 0, sizeof(*addr));
> +	addr->sin_family = AF_INET;
> +	addr->sin_addr.s_addr = iaddr;
> +	addr->sin_port = port;
> +	return 0;
> +
> +invalid_uri:
> +	diag_set(SocketError, sio_socketname(-1), "invalid uri \"%s\"", str);
> +	return -1;
> +}

Shouldn't this be part of sio? It's a utility function.

> +	}
> +	if (swim_scheduler_bind(&swim->scheduler, &addr) != 0) {
> +		swim_member_delete(swim, new_self);
> +		return -1;

Why is this a scheduler function, not member function?

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov


