[Tarantool-patches] [PATCH 2/6] coio: do not htrow an exception

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat Nov 23 16:45:52 MSK 2019


Thanks for the patch!

See 23 comments below.

On 19/11/2019 17:04, Georgy Kirichenko wrote:
> Relaying from C-written wal requires coio to be a C-compliant. So
> get rid of exception from coio interface.
> 
> Part of #980
> ---
>  src/box/applier.cc      |  52 ++++++----
>  src/box/box.cc          |   9 +-
>  src/box/relay.cc        |  11 ++-
>  src/box/xrow_io.cc      |  59 +++++------
>  src/box/xrow_io.h       |  11 ++-
>  src/lib/core/coio.cc    | 212 +++++++++++++++++++++-------------------
>  src/lib/core/coio.h     |  13 +--
>  src/lib/core/coio_buf.h |   8 ++
>  8 files changed, 207 insertions(+), 168 deletions(-)
> > @@ -345,8 +348,9 @@ applier_connect(struct applier *applier)
>  	 * election on bootstrap.
>  	 */
>  	xrow_encode_vote(&row);
> -	coio_write_xrow(coio, &row);
> -	coio_read_xrow(coio, ibuf, &row);
> +	if (coio_write_xrow(coio, &row) < 0 ||
> +	    coio_read_xrow(coio, ibuf, &row) < 0)

1. coio_read_xrow() uses coio_breadn(), which calls ibuf_reserve_xc().
It is not exception safe.

> +		diag_raise();
>  	if (row.type == IPROTO_OK) {
>  		xrow_decode_ballot_xc(&row, &applier->ballot);
>  	} else try {> diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
> index 48707982b..f432c6b49 100644
> --- a/src/box/xrow_io.cc
> +++ b/src/box/xrow_io.cc
> @@ -35,71 +35,74 @@
>  #include "error.h"
>  #include "msgpuck/msgpuck.h"
>  
> -void
> +ssize_t
>  coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row)
>  {
>  	/* Read fixed header */
> -	if (ibuf_used(in) < 1)
> -		coio_breadn(coio, in, 1);
> +	if (ibuf_used(in) < 1 && coio_breadn(coio, in, 1) < 0)
> +		return -1;
>  
>  	/* Read length */
>  	if (mp_typeof(*in->rpos) != MP_UINT) {
> -		tnt_raise(ClientError, ER_INVALID_MSGPACK,
> -			  "packet length");
> +		diag_set(ClientError, ER_INVALID_MSGPACK,
> +			 "packet length");
> +		return -1;
>  	}
>  	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
> -	if (to_read > 0)
> -		coio_breadn(coio, in, to_read);
> +	if (to_read > 0 && coio_breadn(coio, in, to_read) < 0)
> +		return -1;
>  
>  	uint32_t len = mp_decode_uint((const char **) &in->rpos);
>  
>  	/* Read header and body */
>  	to_read = len - ibuf_used(in);
> -	if (to_read > 0)
> -		coio_breadn(coio, in, to_read);
> +	if (to_read > 0 && coio_breadn(coio, in, to_read) < 0)
> +		return -1;
>  
> -	xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
> -			      true);

2. xrow_header_decode_xc() is now unused and can be dropped.

> +	return xrow_header_decode(row, (const char **) &in->rpos, in->rpos + len,
> +				  true);
>  }
>  
> -void
> -coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
> -			  struct xrow_header *row, ev_tstamp timeout)
> +ssize_t
> +coio_read_xrow_timeout(struct ev_io *coio, struct ibuf *in,
> +		       struct xrow_header *row, ev_tstamp timeout)
>  {
>  	ev_tstamp start, delay;
>  	coio_timeout_init(&start, &delay, timeout);
>  	/* Read fixed header */
> -	if (ibuf_used(in) < 1)
> -		coio_breadn_timeout(coio, in, 1, delay);
> +	if (ibuf_used(in) < 1 && coio_breadn_timeout(coio, in, 1, delay) < 0)
> +		return -1;
>  	coio_timeout_update(&start, &delay);
>  
>  	/* Read length */
>  	if (mp_typeof(*in->rpos) != MP_UINT) {
> -		tnt_raise(ClientError, ER_INVALID_MSGPACK,
> -			  "packet length");
> +		diag_set(ClientError, ER_INVALID_MSGPACK,
> +			 "packet length");
> +		return -1;
>  	}
>  	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
> -	if (to_read > 0)
> -		coio_breadn_timeout(coio, in, to_read, delay);
> +	if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0)
> +		return -1;
>  	coio_timeout_update(&start, &delay);
>  
>  	uint32_t len = mp_decode_uint((const char **) &in->rpos);
>  
>  	/* Read header and body */
>  	to_read = len - ibuf_used(in);
> -	if (to_read > 0)
> -		coio_breadn_timeout(coio, in, to_read, delay);
> +	if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0)
> +		return -1;
>  
> -	xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
> -			      true);
> +	return xrow_header_decode(row, (const char **) &in->rpos, in->rpos + len,
> +				  true);
>  }
>  
> -
> -void
> +ssize_t
>  coio_write_xrow(struct ev_io *coio, const struct xrow_header *row)
>  {
>  	struct iovec iov[XROW_IOVMAX];
> -	int iovcnt = xrow_to_iovec_xc(row, iov);

3. xrow_to_iovec_xc() is now unused and can be dropped.

> -	coio_writev(coio, iov, iovcnt, 0);
> +	int iovcnt = xrow_to_iovec(row, iov);
> +	if (iovcnt < 0)
> +		return -1;
> +	return coio_writev(coio, iov, iovcnt, 0);
>  }
>  

4. The whole xrow_io.cc is now exception safe. I propose you to
rename it to .c .Perhaps it would be good to rename all safe
.cc files to .c in one commit.

> diff --git a/src/lib/core/coio.cc b/src/lib/core/coio.cc
> index e88d724d5..96a529c2c 100644
> --- a/src/lib/core/coio.cc
> +++ b/src/lib/core/coio.cc
> @@ -41,12 +41,6 @@
>  #include "scoped_guard.h"

5. scoped guard header is not needed anymore here.

>  #include "coio_task.h" /* coio_resolve() */
>  
> @@ -79,36 +75,43 @@ coio_connect_addr(struct ev_io *coio, struct sockaddr *addr,
>  {
>  	ev_loop *loop = loop();
>  	if (evio_socket(coio, addr->sa_family, SOCK_STREAM, 0) != 0)
> -		diag_raise();
> -	auto coio_guard = make_scoped_guard([=]{ evio_close(loop, coio); });
> -	if (sio_connect(coio->fd, addr, len) == 0) {
> -		coio_guard.is_active = false;
> +		return -1;
> +	if (sio_connect(coio->fd, addr, len) == 0)
>  		return 0;
> -	}
>  	if (errno != EINPROGRESS)
> -		diag_raise();
> +		goto close;
>  	/*
>  	 * Wait until socket is ready for writing or
>  	 * timed out.
>  	 */
>  	ev_io_set(coio, coio->fd, EV_WRITE);
>  	ev_io_start(loop, coio);
> -	bool is_timedout = coio_fiber_yield_timeout(coio, timeout);
> +	bool is_timedout;
> +	is_timedout = coio_fiber_yield_timeout(coio, timeout);

6. Why did you break the line?

> -	int error = EINPROGRESS;
> -	socklen_t sz = sizeof(error);
> +	int error;
> +	socklen_t sz;
> +	error = EINPROGRESS;
> +	sz = sizeof(error);

7. Why did you break these lines? They were perfectly fine.

>  	if (sio_getsockopt(coio->fd, SOL_SOCKET, SO_ERROR,
>  		       &error, &sz))
> -		diag_raise();
> +		goto close;
>  	if (error != 0) {
>  		errno = error;
> -		tnt_raise(SocketError, sio_socketname(coio->fd), "connect");
> +		diag_set(SocketError, sio_socketname(coio->fd), "connect");
> +		goto close;
>  	}
> -	coio_guard.is_active = false;
>  	return 0;
> +
> +close:

8. I propose you to rename this to 'fail:' or something else meaning
an error, since this is not just a normal close. It happens at an
error only.

> +	evio_close(loop, coio);
> +	return -1;
>  }
>  
>  void
> @@ -201,41 +204,37 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
>  	    hints.ai_flags = AI_ADDRCONFIG|AI_NUMERICSERV|AI_PASSIVE;
>  	    hints.ai_protocol = 0;
>  	    int rc = coio_getaddrinfo(host, service, &hints, &ai, delay);
> -	    if (rc != 0) {
> -		    diag_raise();
> -		    panic("unspecified getaddrinfo error");
> -	    }
> +	    if (rc != 0)
> +			return -1;

9. Double indentation.

10. You can drop 'rc' and check coio_getaddrinfo() result
directly:

    if (coio_getaddrinfo(...) != 0)
            return -1;

>  	}
> -	auto addrinfo_guard = make_scoped_guard([=] {
> -		if (!uri->host_hint) freeaddrinfo(ai);
> -		else free(ai_local.ai_addr);
> -	});
> +	struct addrinfo *first_ai = ai;
>  	evio_timeout_update(loop(), &start, &delay);
>  
>  	coio_timeout_init(&start, &delay, timeout);
>  	assert(! evio_has_fd(coio));
> -	while (ai) {
> -		try {
> -			if (coio_connect_addr(coio, ai->ai_addr,
> -					      ai->ai_addrlen, delay))
> -				return -1;
> +	while (ai && delay >= 0) {
> +		if (coio_connect_addr(coio, ai->ai_addr,
> +				      ai->ai_addrlen, delay) == 0) {
>  			if (addr != NULL) {
>  				assert(addr_len != NULL);
>  				*addr_len = MIN(ai->ai_addrlen, *addr_len);
>  				memcpy(addr, ai->ai_addr, *addr_len);
>  			}
>  			return 0; /* connected */

11. struct addrinfo *ai leaks here.

> -		} catch (SocketError *e) {
> -			if (ai->ai_next == NULL)
> -				throw;
> -			/* ignore exception and try the next address */
>  		}
> -		ai = ai->ai_next;
>  		ev_now_update(loop);
>  		coio_timeout_update(&start, &delay);
> +		ai = ai->ai_next;

12. Why did you move this line?

13. Before your patch 'SocketError' was allowed in case there are
more addresses to try. Other errors were not tolerated. Why
do you tolerate them now?

>  	}
>  
> -	tnt_raise(SocketError, sio_socketname(coio->fd), "connection failed");
> +	/* Set an error if not timedout. */
> +	if (delay >= 0)
> +		diag_set(SocketError, sio_socketname(coio->fd), "connection failed");

14. Please, don't make functional changes.

> +	if (!uri->host_hint)
> +		freeaddrinfo(first_ai);
> +	else
> +		free(ai_local.ai_addr);
> +	return -1;
>  }
>  
>  /**
> @@ -259,12 +256,12 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
>  			if (evio_setsockopt_client(fd, addr->sa_family,
>  						   SOCK_STREAM) != 0) {
>  				close(fd);
> -				diag_raise();
> +				return -1;

15. CoioGuard did ev_io_stop() here. Why did you remove it?

>  			}
>  			return fd;
>  		}
>  		if (! sio_wouldblock(errno))
> -			diag_raise();
> +			return -1;

16. The same.

>  		/* The socket is not ready, yield */
>  		if (! ev_is_active(coio)) {
>  			ev_io_set(coio, coio->fd, EV_READ);
> @@ -275,11 +272,16 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
>  		 * timeout is reached.
>  		 */
>  		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
> -		fiber_testcancel();
> +		ev_io_stop(loop(), coio);

17. Why did you stop it unconditionally? It was stopped only
in case of timeout or fiber cancelation before your patch. Otherwise
the cycle was working further without stop of the coio.

> +		if (fiber_is_cancelled()) {
> +			diag_set(FiberIsCancelled);
> +			break;
> +		}
>  		if (is_timedout)
> -			tnt_raise(TimedOut);
> +			break;
>  		coio_timeout_update(&start, &delay);
>  	}
> +	return -1;
>  }
>  
>  /**
> @@ -302,8 +304,6 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
>  
>  	ssize_t to_read = (ssize_t) sz;
>  
> -	CoioGuard coio_guard(coio);

18. All the same as about the previous guard. In all other 'guarded'
places too.

> -
>  	while (true) {
>  		/*
>  		 * Sic: assume the socket is ready: since
> @@ -446,9 +449,11 @@ coio_flush(int fd, struct iovec *iov, ssize_t offset, int iovcnt)
>  {
>  	sio_add_to_iov(iov, -offset);
>  	ssize_t nwr = sio_writev(fd, iov, iovcnt);
> +	if (nwr < 0 && !sio_wouldblock(errno))
> +		return -1;
>  	sio_add_to_iov(iov, offset);
> -	if (nwr < 0 && ! sio_wouldblock(errno))
> -		diag_raise();
> +	if (nwr < 0)
> +		return 0;

19. Why? Please, don't make functional changes in a refactoring
patch.

>  	return nwr;
>  }
>  
> @@ -461,14 +466,15 @@ coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt,
>  	struct iovec *end = iov + iovcnt;
>  	ev_tstamp start, delay;
>  	coio_timeout_init(&start, &delay, timeout);
> -	CoioGuard coio_guard(coio);
>  
>  	/* Avoid a syscall in case of 0 iovcnt. */
>  	while (iov < end) {
>  		/* Write as much data as possible. */
>  		ssize_t nwr = coio_flush(coio->fd, iov, iov_len,
>  					 end - iov);
> -		if (nwr >= 0) {
> +		if (nwr < 0)
> +			return -1;

20. Why do you return -1? Before your patch it went to
testcancel() below.

> +		if (nwr > 0) {
>  			total += nwr;
>  			/*
>  			 * If there was a hint for the total size
> @@ -638,12 +649,13 @@ coio_service_init(struct coio_service *service, const char *name,
>  	service->handler_param = handler_param;
>  }
>  
> -void
> +int
>  coio_service_start(struct evio_service *service, const char *uri)
>  {
>  	if (evio_service_bind(service, uri) != 0 ||
>  	    evio_service_listen(service) != 0)
> -		diag_raise();
> +		return -1;
> +	return -0;

21. -0 ?

>  }
>  
>  void
> @@ -661,7 +673,6 @@ coio_stat_stat_timeout(ev_stat *stat, ev_tstamp timeout)
>  	coio_timeout_init(&start, &delay, timeout);
>  	fiber_yield_timeout(delay);
>  	ev_stat_stop(loop(), stat);
> -	fiber_testcancel();

22. Testcancel was supposed to throw an error if the fiber
is cancelled. Now you ignore cancellation. Please, don't.
In other places too.

>  }
>  
>  typedef void (*ev_child_cb)(ev_loop *, ev_child *, int);
> @@ -689,7 +700,6 @@ coio_waitpid(pid_t pid)
>  	fiber_set_cancellable(allow_cancel);
>  	ev_child_stop(loop(), &cw);
>  	int status = cw.rstatus;
> -	fiber_testcancel();
>  	return status;
>  }
>  
> diff --git a/src/lib/core/coio.h b/src/lib/core/coio.h
> index 6a2337689..d557f2869 100644
> --- a/src/lib/core/coio.h
> +++ b/src/lib/core/coio.h
> @@ -71,7 +70,7 @@ void
>  coio_create(struct ev_io *coio, int fd);
>  
>  static inline void
> -coio_close(ev_loop *loop, struct ev_io *coio)
> +coio_destroy(ev_loop *loop, struct ev_io *coio)

23. Please, avoid changes not related to the patch
purpose.


More information about the Tarantool-patches mailing list