[tarantool-patches] Re: [PATCH] replication: fix bug with read-only replica as a bootstrap leader

Vladimir Davydov vdavydov.dev at gmail.com
Fri Apr 13 15:38:46 MSK 2018


On Fri, Apr 13, 2018 at 02:13:34PM +0300, Konstantin Belyavskiy wrote:
> Please take a look at newer version.

When you fix something like this, please include the new version of the
patch in the email.

The patch itself looks OK to me. Posting it here for the record.

> From 983b98a91dcd70a3013de373b55294a6af468e17 Mon Sep 17 00:00:00 2001
> From: Konstantin Belyavskiy <k.belyavskiy at tarantool.org>
> Date: Mon, 9 Apr 2018 16:32:26 +0300
> Subject: [PATCH] replication: fix bug with read-only replica as a bootstrap
>  leader
> 
> When bootstrapping a new cluster, any replica from the replica set can
> be chosen as the leader, but if it is 'read-only', bootstrap will
> fail with an error.
> Fix it by excluding read-only replicas from voting: add
> access rights information to the IPROTO_REQUEST_VOTE reply.
> 
> Closes #3257
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 9aa951c3..127a8c90 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -223,7 +223,8 @@ applier_connect(struct applier *applier)
>  		if (row.type != IPROTO_OK)
>  			xrow_decode_error_xc(&row);
>  		vclock_create(&applier->vclock);
> -		xrow_decode_vclock_xc(&row, &applier->vclock);
> +		xrow_decode_request_vote_xc(&row, &applier->vclock,
> +					    &applier->remote_is_ro);
>  	}
>  
>  	applier_set_state(applier, APPLIER_CONNECTED);
> diff --git a/src/box/applier.h b/src/box/applier.h
> index f25d6cb2..392113e2 100644
> --- a/src/box/applier.h
> +++ b/src/box/applier.h
> @@ -95,6 +95,8 @@ struct applier {
>  	uint32_t version_id;
>  	/** Remote vclock at time of connect. */
>  	struct vclock vclock;
> +	/** Remote peer mode, true if read-only, default: false */
> +	bool remote_is_ro;
>  	/** Remote address */
>  	union {
>  		struct sockaddr addr;
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index db582080..81938ce0 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -60,6 +60,8 @@
>  #include "iproto_constants.h"
>  #include "rmean.h"
>  #include "errinj.h"
> +#include "applier.h"
> +#include "cfg.h"
>  
>  /* The number of iproto messages in flight */
>  enum { IPROTO_MSG_MAX = 768 };
> @@ -1363,9 +1365,10 @@ tx_process_misc(struct cmsg *m)
>  					   ::schema_version);
>  			break;
>  		case IPROTO_REQUEST_VOTE:
> -			iproto_reply_vclock_xc(out, msg->header.sync,
> -					       ::schema_version,
> -					       &replicaset.vclock);
> +			iproto_reply_request_vote_xc(out, msg->header.sync,
> +						     ::schema_version,
> +						     &replicaset.vclock,
> +						     cfg_geti("read_only"));
>  			break;
>  		default:
>  			unreachable();
> diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
> index cd7b1d03..3735a91a 100644
> --- a/src/box/iproto_constants.c
> +++ b/src/box/iproto_constants.c
> @@ -40,10 +40,10 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
>  		/* 0x04 */	MP_DOUBLE, /* IPROTO_TIMESTAMP */
>  		/* 0x05 */	MP_UINT,   /* IPROTO_SCHEMA_VERSION */
>  		/* 0x06 */	MP_UINT,   /* IPROTO_SERVER_VERSION */
> +		/* 0x07 */	MP_UINT,   /* IPROTO_SERVER_IS_RO */
>  	/* }}} */
>  
>  	/* {{{ unused */
> -		/* 0x07 */	MP_UINT,
>  		/* 0x08 */	MP_UINT,
>  		/* 0x09 */	MP_UINT,
>  		/* 0x0a */	MP_UINT,
> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> index 95184248..35812228 100644
> --- a/src/box/iproto_constants.h
> +++ b/src/box/iproto_constants.h
> @@ -58,6 +58,7 @@ enum iproto_key {
>  	IPROTO_TIMESTAMP = 0x04,
>  	IPROTO_SCHEMA_VERSION = 0x05,
>  	IPROTO_SERVER_VERSION = 0x06,
> +	IPROTO_SERVER_IS_RO = 0x07,
>  	/* Leave a gap for other keys in the header. */
>  	IPROTO_SPACE_ID = 0x10,
>  	IPROTO_INDEX_ID = 0x11,
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index b1c84d36..e29d4ee1 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -685,6 +685,14 @@ replicaset_leader(void)
>  	replicaset_foreach(replica) {
>  		if (replica->applier == NULL)
>  			continue;
> +		/**
> +		 * While bootstrapping a new cluster,
> +		 * read-only replicas shouldn't be considered
> +		 * as a leader.
> +		 */
> +		if (replica->applier->remote_is_ro &&
> +		    replica->applier->vclock.signature == 0)
> +			continue;
>  		if (leader == NULL) {
>  			leader = replica;
>  			continue;
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index f4852564..b3f81a86 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -261,14 +261,16 @@ iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version)
>  }
>  
>  int
> -iproto_reply_vclock(struct obuf *out, uint64_t sync, uint32_t schema_version,
> -		    const struct vclock *vclock)
> +iproto_reply_request_vote(struct obuf *out, uint64_t sync,
> +			  uint32_t schema_version, const struct vclock *vclock,
> +			  bool read_only)
>  {
>  	uint32_t replicaset_size = vclock_size(vclock);
> -	size_t max_size = IPROTO_HEADER_LEN + mp_sizeof_map(1) +
> +	size_t max_size = IPROTO_HEADER_LEN + mp_sizeof_map(2) +
>  		mp_sizeof_uint(UINT32_MAX) + mp_sizeof_map(replicaset_size) +
>  		replicaset_size * (mp_sizeof_uint(UINT32_MAX) +
> -				   mp_sizeof_uint(UINT64_MAX));
> +				   mp_sizeof_uint(UINT64_MAX)) +
> +		mp_sizeof_uint(UINT32_MAX) + mp_sizeof_bool(true);
>  
>  	char *buf = obuf_reserve(out, max_size);
>  	if (buf == NULL) {
> @@ -278,7 +280,9 @@ iproto_reply_vclock(struct obuf *out, uint64_t sync, uint32_t schema_version,
>  	}
>  
>  	char *data = buf + IPROTO_HEADER_LEN;
> -	data = mp_encode_map(data, 1);
> +	data = mp_encode_map(data, 2);
> +	data = mp_encode_uint(data, IPROTO_SERVER_IS_RO);
> +	data = mp_encode_bool(data, read_only);
>  	data = mp_encode_uint(data, IPROTO_VCLOCK);
>  	data = mp_encode_map(data, replicaset_size);
>  	struct vclock_iterator it;
> @@ -837,7 +841,7 @@ xrow_encode_subscribe(struct xrow_header *row,
>  int
>  xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
>  		      struct tt_uuid *instance_uuid, struct vclock *vclock,
> -		      uint32_t *version_id)
> +		      uint32_t *version_id, bool *read_only)
>  {
>  	if (row->bodycnt == 0) {
>  		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
> @@ -852,6 +856,9 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
>  		return -1;
>  	}
>  
> +	/* For backward compatibility initialize read-only with false. */
> +	if (read_only)
> +		*read_only = false;
>  	const char *lsnmap = NULL;
>  	d = data;
>  	uint32_t map_size = mp_decode_map(&d);
> @@ -896,6 +903,16 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
>  			}
>  			*version_id = mp_decode_uint(&d);
>  			break;
> +		case IPROTO_SERVER_IS_RO:
> +			if (read_only == NULL)
> +				goto skip;
> +			if (mp_typeof(*d) != MP_BOOL) {
> +				diag_set(ClientError, ER_INVALID_MSGPACK,
> +					 "invalid STATUS");
> +				return -1;
> +			}
> +			*read_only = mp_decode_bool(&d);
> +			break;
>  		default: skip:
>  			mp_next(&d); /* value */
>  		}
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index d407d151..b10bf26d 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -251,6 +251,8 @@ xrow_encode_subscribe(struct xrow_header *row,
>   * @param[out] replicaset_uuid.
>   * @param[out] instance_uuid.
>   * @param[out] vclock.
> + * @param[out] version_id.
> + * @param[out] read_only.
>   *
>   * @retval  0 Success.
>   * @retval -1 Memory or format error.
> @@ -258,7 +260,7 @@ xrow_encode_subscribe(struct xrow_header *row,
>  int
>  xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
>  		      struct tt_uuid *instance_uuid, struct vclock *vclock,
> -		      uint32_t *version_id);
> +		      uint32_t *version_id, bool *read_only);
>  
>  /**
>   * Encode JOIN command.
> @@ -282,7 +284,8 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid);
>  static inline int
>  xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid)
>  {
> -	return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL);
> +	return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL,
> +				     NULL);
>  }
>  
>  /**
> @@ -307,7 +310,23 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock);
>  static inline int
>  xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock)
>  {
> -	return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL);
> +	return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL);
> +}
> +
> +/**
> + * Decode peer vclock and access rights (a response to VOTE command).
> + * @param row Row to decode.
> + * @param[out] vclock.
> + * @param[out] read_only.
> + *
> + * @retval  0 Success.
> + * @retval -1 Memory or format error.
> + */
> +static inline int
> +xrow_decode_request_vote(struct xrow_header *row, struct vclock *vclock,
> +			 bool *read_only)
> +{
> +	return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, read_only);
>  }
>  
>  /**
> @@ -374,13 +393,15 @@ iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version);
>   * @param sync Request sync.
>   * @param schema_version.
>   * @param vclock.
> + * @param read_only.
>   *
>   * @retval  0 Success.
>   * @retval -1 Memory error.
>   */
>  int
> -iproto_reply_vclock(struct obuf *out, uint64_t sync, uint32_t schema_version,
> -		    const struct vclock *vclock);
> +iproto_reply_request_vote(struct obuf *out, uint64_t sync,
> +			 uint32_t schema_version, const struct vclock *vclock,
> +			 bool read_only);
>  
>  /**
>   * Write an error packet int output buffer. Doesn't throw if out
> @@ -571,7 +592,7 @@ xrow_decode_subscribe_xc(struct xrow_header *row,
>  			 uint32_t *replica_version_id)
>  {
>  	if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
> -				  vclock, replica_version_id) != 0)
> +				  vclock, replica_version_id, NULL) != 0)
>  		diag_raise();
>  }
>  
> @@ -608,6 +629,15 @@ xrow_decode_vclock_xc(struct xrow_header *row, struct vclock *vclock)
>  		diag_raise();
>  }
>  
> +/** @copydoc xrow_decode_request_vote. */
> +static inline void
> +xrow_decode_request_vote_xc(struct xrow_header *row, struct vclock *vclock,
> +			    bool *read_only)
> +{
> +	if (xrow_decode_request_vote(row, vclock, read_only) != 0)
> +		diag_raise();
> +}
> +
>  /** @copydoc iproto_reply_ok. */
>  static inline void
>  iproto_reply_ok_xc(struct obuf *out, uint64_t sync, uint32_t schema_version)
> @@ -616,12 +646,14 @@ iproto_reply_ok_xc(struct obuf *out, uint64_t sync, uint32_t schema_version)
>  		diag_raise();
>  }
>  
> -/** @copydoc iproto_reply_vclock. */
> +/** @copydoc iproto_reply_request_vote. */
>  static inline void
> -iproto_reply_vclock_xc(struct obuf *out, uint64_t sync, uint32_t schema_version,
> -		       const struct vclock *vclock)
> +iproto_reply_request_vote_xc(struct obuf *out, uint64_t sync,
> +			     uint32_t schema_version,
> +			     const struct vclock *vclock, bool read_only)
>  {
> -	if (iproto_reply_vclock(out, sync, schema_version, vclock) != 0)
> +	if (iproto_reply_request_vote(out, sync, schema_version,
> +				      vclock, read_only) != 0)
>  		diag_raise();
>  }
>  
> diff --git a/test/replication/replica_uuid_ro.lua b/test/replication/replica_uuid_ro.lua
> new file mode 100644
> index 00000000..dd5dae9e
> --- /dev/null
> +++ b/test/replication/replica_uuid_ro.lua
> @@ -0,0 +1,33 @@
> +#!/usr/bin/env tarantool
> +
> +-- get instance name from filename (replica_uuid_ro1.lua => replica_uuid_ro1)
> +local INSTANCE_ID = string.match(arg[0], "%d")
> +local USER = 'cluster'
> +local PASSWORD = 'somepassword'
> +local SOCKET_DIR = require('fio').cwd()
> +local function instance_uri(instance_id)
> +    --return 'localhost:'..(3310 + instance_id)
> +    return SOCKET_DIR..'/replica_uuid_ro'..instance_id..'.sock';
> +end
> +
> +-- start console first
> +require('console').listen(os.getenv('ADMIN'))
> +
> +box.cfg({
> +    instance_uuid = arg[1];
> +    listen = instance_uri(INSTANCE_ID);
> +--    log_level = 7;
> +    replication = {
> +        USER..':'..PASSWORD..'@'..instance_uri(1);
> +        USER..':'..PASSWORD..'@'..instance_uri(2);
> +    };
> +    read_only = (INSTANCE_ID ~= '1' and true or false);
> +})
> +
> +box.once("bootstrap", function()
> +    local test_run = require('test_run').new()
> +    box.schema.user.create(USER, { password = PASSWORD })
> +    box.schema.user.grant(USER, 'replication')
> +    box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
> +    box.space.test:create_index('primary')
> +end)
> diff --git a/test/replication/replica_uuid_ro1.lua b/test/replication/replica_uuid_ro1.lua
> new file mode 120000
> index 00000000..342d71c5
> --- /dev/null
> +++ b/test/replication/replica_uuid_ro1.lua
> @@ -0,0 +1 @@
> +replica_uuid_ro.lua
> \ No newline at end of file
> diff --git a/test/replication/replica_uuid_ro2.lua b/test/replication/replica_uuid_ro2.lua
> new file mode 120000
> index 00000000..342d71c5
> --- /dev/null
> +++ b/test/replication/replica_uuid_ro2.lua
> @@ -0,0 +1 @@
> +replica_uuid_ro.lua
> \ No newline at end of file
> diff --git a/test/replication/replicaset_ro_mostly.result b/test/replication/replicaset_ro_mostly.result
> new file mode 100644
> index 00000000..d753a182
> --- /dev/null
> +++ b/test/replication/replicaset_ro_mostly.result
> @@ -0,0 +1,59 @@
> +-- gh-3257 check bootstrap with read-only replica in cluster.
> +-- Old behaviour: failed, since read-only is chosen by uuid.
> +test_run = require('test_run').new()
> +---
> +...
> +SERVERS = {'replica_uuid_ro1', 'replica_uuid_ro2'}
> +---
> +...
> +uuid = require('uuid')
> +---
> +...
> +uuid1 = uuid.new()
> +---
> +...
> +uuid2 = uuid.new()
> +---
> +...
> +function sort_cmp(a, b) return a.time_low > b.time_low and true or false end
> +---
> +...
> +function sort(t) table.sort(t, sort_cmp) return t end
> +---
> +...
> +UUID = sort({uuid1, uuid2}, sort_cmp)
> +---
> +...
> +create_cluster_cmd1 = 'create server %s with script="replication/%s.lua"'
> +---
> +...
> +create_cluster_cmd2 = 'start server %s with args="%s", wait_load=False, wait=False'
> +---
> +...
> +test_run:cmd("setopt delimiter ';'")
> +---
> +- true
> +...
> +function create_cluster_uuid(servers, uuids)
> +    for i, name in ipairs(servers) do
> +        test_run:cmd(create_cluster_cmd1:format(name, name))
> +        test_run:cmd(create_cluster_cmd2:format(name, uuids[i]))
> +    end
> +end;
> +---
> +...
> +test_run:cmd("setopt delimiter ''");
> +---
> +- true
> +...
> +-- Deploy a cluster.
> +create_cluster_uuid(SERVERS, UUID)
> +---
> +...
> +test_run:wait_fullmesh(SERVERS)
> +---
> +...
> +-- Cleanup.
> +test_run:drop_cluster(SERVERS)
> +---
> +...
> diff --git a/test/replication/replicaset_ro_mostly.test.lua b/test/replication/replicaset_ro_mostly.test.lua
> new file mode 100644
> index 00000000..539ca5a1
> --- /dev/null
> +++ b/test/replication/replicaset_ro_mostly.test.lua
> @@ -0,0 +1,30 @@
> +-- gh-3257 check bootstrap with read-only replica in cluster.
> +-- Old behaviour: failed, since read-only is chosen by uuid.
> +test_run = require('test_run').new()
> +
> +SERVERS = {'replica_uuid_ro1', 'replica_uuid_ro2'}
> +
> +uuid = require('uuid')
> +uuid1 = uuid.new()
> +uuid2 = uuid.new()
> +function sort_cmp(a, b) return a.time_low > b.time_low and true or false end
> +function sort(t) table.sort(t, sort_cmp) return t end
> +UUID = sort({uuid1, uuid2}, sort_cmp)
> +
> +create_cluster_cmd1 = 'create server %s with script="replication/%s.lua"'
> +create_cluster_cmd2 = 'start server %s with args="%s", wait_load=False, wait=False'
> +
> +test_run:cmd("setopt delimiter ';'")
> +function create_cluster_uuid(servers, uuids)
> +    for i, name in ipairs(servers) do
> +        test_run:cmd(create_cluster_cmd1:format(name, name))
> +        test_run:cmd(create_cluster_cmd2:format(name, uuids[i]))
> +    end
> +end;
> +test_run:cmd("setopt delimiter ''");
> +
> +-- Deploy a cluster.
> +create_cluster_uuid(SERVERS, UUID)
> +test_run:wait_fullmesh(SERVERS)
> +-- Cleanup.
> +test_run:drop_cluster(SERVERS)



More information about the Tarantool-patches mailing list