[Tarantool-patches] [PATCH v4 04/11] coio: do not throw an error, minor refactoring

Georgy Kirichenko georgy at tarantool.org
Wed Feb 12 12:39:13 MSK 2020


Relaying from the C-written WAL requires coio and xrow_io to be
C-compliant, so get rid of exceptions in the coio interface.
This patch also includes some minor refactoring (the code looks ugly
without it):
 1. Get rid of the unused size_hint argument of coio_writev_timeout.
 2. Handle partial reads/writes before entering the yield loop.
 3. Do not reset errno to 0 when EOF is read.

Part of #980
---
 src/box/applier.cc      |  49 ++--
 src/box/box.cc          |   9 +-
 src/box/relay.cc        |  11 +-
 src/box/xrow_io.cc      |  59 ++---
 src/box/xrow_io.h       |  11 +-
 src/lib/core/coio.cc    | 535 ++++++++++++++++++++++++----------------
 src/lib/core/coio.h     |  19 +-
 src/lib/core/coio_buf.h |   8 +
 8 files changed, 413 insertions(+), 288 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index ae3d281a5..ad427707a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -173,8 +173,9 @@ applier_writer_f(va_list ap)
 			continue;
 		try {
 			struct xrow_header xrow;
-			xrow_encode_vclock(&xrow, &replicaset.vclock);
-			coio_write_xrow(&io, &xrow);
+			if (xrow_encode_vclock(&xrow, &replicaset.vclock) != 0 ||
+			    coio_write_xrow(&io, &xrow) < 0)
+				diag_raise();
 		} catch (SocketError *e) {
 			/*
 			 * There is no point trying to send ACKs if
@@ -308,9 +309,11 @@ applier_connect(struct applier *applier)
 	 */
 	applier->addr_len = sizeof(applier->addrstorage);
 	applier_set_state(applier, APPLIER_CONNECT);
-	coio_connect(coio, uri, &applier->addr, &applier->addr_len);
+	if (coio_connect(coio, uri, &applier->addr, &applier->addr_len) != 0)
+		diag_raise();
 	assert(coio->fd >= 0);
-	coio_readn(coio, greetingbuf, IPROTO_GREETING_SIZE);
+	if (coio_readn(coio, greetingbuf, IPROTO_GREETING_SIZE) < 0)
+		diag_raise();
 	applier->last_row_time = ev_monotonic_now(loop());
 
 	/* Decode instance version and name from greeting */
@@ -345,8 +348,9 @@ applier_connect(struct applier *applier)
 	 * election on bootstrap.
 	 */
 	xrow_encode_vote(&row);
-	coio_write_xrow(coio, &row);
-	coio_read_xrow(coio, ibuf, &row);
+	if (coio_write_xrow(coio, &row) < 0 ||
+	    coio_read_xrow(coio, ibuf, &row) < 0)
+		diag_raise();
 	if (row.type == IPROTO_OK) {
 		xrow_decode_ballot_xc(&row, &applier->ballot);
 	} else try {
@@ -376,8 +380,9 @@ applier_connect(struct applier *applier)
 			    uri->login_len,
 			    uri->password != NULL ? uri->password : "",
 			    uri->password_len);
-	coio_write_xrow(coio, &row);
-	coio_read_xrow(coio, ibuf, &row);
+	if (coio_write_xrow(coio, &row) < 0 ||
+	    coio_read_xrow(coio, ibuf, &row) < 0)
+		diag_raise();
 	applier->last_row_time = ev_monotonic_now(loop());
 	if (row.type != IPROTO_OK)
 		xrow_decode_error_xc(&row); /* auth failed */
@@ -401,7 +406,8 @@ applier_wait_snapshot(struct applier *applier)
 	 */
 	if (applier->version_id >= version_id(1, 7, 0)) {
 		/* Decode JOIN/FETCH_SNAPSHOT response */
-		coio_read_xrow(coio, ibuf, &row);
+		if (coio_read_xrow(coio, ibuf, &row) < 0)
+			diag_raise();
 		if (iproto_type_is_error(row.type)) {
 			xrow_decode_error_xc(&row); /* re-throw error */
 		} else if (row.type != IPROTO_OK) {
@@ -422,7 +428,8 @@ applier_wait_snapshot(struct applier *applier)
 	 */
 	uint64_t row_count = 0;
 	while (true) {
-		coio_read_xrow(coio, ibuf, &row);
+		if (coio_read_xrow(coio, ibuf, &row) < 0)
+			diag_raise();
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
 			if (apply_snapshot_row(&row) != 0)
@@ -488,7 +495,8 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
 	 * Receive final data.
 	 */
 	while (true) {
-		coio_read_xrow(coio, ibuf, &row);
+		if (coio_read_xrow(coio, ibuf, &row) < 0)
+			diag_raise();
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
 			vclock_follow_xrow(&replicaset.vclock, &row);
@@ -605,10 +613,13 @@ applier_read_tx_row(struct applier *applier)
 	 * from the master for quite a while the connection is
 	 * broken - the master might just be idle.
 	 */
-	if (applier->version_id < version_id(1, 7, 7))
-		coio_read_xrow(coio, ibuf, row);
-	else
-		coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+	if (applier->version_id < version_id(1, 7, 7)) {
+		if (coio_read_xrow(coio, ibuf, row) < 0)
+			diag_raise();
+	} else {
+		if (coio_read_xrow_timeout(coio, ibuf, row, timeout) < 0)
+			diag_raise();
+	}
 
 	applier->lag = ev_now(loop()) - row->tm;
 	applier->last_row_time = ev_monotonic_now(loop());
@@ -868,11 +879,13 @@ applier_subscribe(struct applier *applier)
 	vclock_copy(&vclock, &replicaset.vclock);
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
 				 &vclock, replication_anon);
-	coio_write_xrow(coio, &row);
+	if (coio_write_xrow(coio, &row) < 0)
+		diag_raise();
 
 	/* Read SUBSCRIBE response */
 	if (applier->version_id >= version_id(1, 6, 7)) {
-		coio_read_xrow(coio, ibuf, &row);
+		if (coio_read_xrow(coio, ibuf, &row) < 0)
+			diag_raise();
 		if (iproto_type_is_error(row.type)) {
 			xrow_decode_error_xc(&row);  /* error */
 		} else if (row.type != IPROTO_OK) {
@@ -1009,7 +1022,7 @@ applier_disconnect(struct applier *applier, enum applier_state state)
 		applier->writer = NULL;
 	}
 
-	coio_close(loop(), &applier->io);
+	coio_destroy(loop(), &applier->io);
 	/* Clear all unparsed input. */
 	ibuf_reinit(&applier->ibuf);
 	fiber_gc();
diff --git a/src/box/box.cc b/src/box/box.cc
index 611100b8b..ca1696383 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1741,7 +1741,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	struct xrow_header row;
 	xrow_encode_vclock_xc(&row, &stop_vclock);
 	row.sync = header->sync;
-	coio_write_xrow(io, &row);
+	if (coio_write_xrow(io, &row) < 0)
+		diag_raise();
 
 	/*
 	 * Final stage: feed replica with WALs in range
@@ -1753,7 +1754,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	/* Send end of WAL stream marker */
 	xrow_encode_vclock_xc(&row, &replicaset.vclock);
 	row.sync = header->sync;
-	coio_write_xrow(io, &row);
+	if (coio_write_xrow(io, &row) < 0)
+		diag_raise();
 
 	/*
 	 * Advance the WAL consumer state to the position where
@@ -1845,7 +1847,8 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	assert(self != NULL); /* the local registration is read-only */
 	row.replica_id = self->id;
 	row.sync = header->sync;
-	coio_write_xrow(io, &row);
+	if (coio_write_xrow(io, &row) < 0)
+		diag_raise();
 
 	say_info("subscribed replica %s at %s",
 		 tt_uuid_str(&replica_uuid), sio_socketname(io->fd));
diff --git a/src/box/relay.cc b/src/box/relay.cc
index d5a1c9c68..bb7761b99 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -317,7 +317,8 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
 	struct xrow_header row;
 	xrow_encode_vclock_xc(&row, vclock);
 	row.sync = sync;
-	coio_write_xrow(&relay->io, &row);
+	if (coio_write_xrow(&relay->io, &row) < 0)
+		diag_raise();
 
 	/* Send read view to the replica. */
 	engine_join_xc(&ctx, &relay->stream);
@@ -516,8 +517,9 @@ relay_reader_f(va_list ap)
 	try {
 		while (!fiber_is_cancelled()) {
 			struct xrow_header xrow;
-			coio_read_xrow_timeout_xc(&io, &ibuf, &xrow,
-					replication_disconnect_timeout());
+			if (coio_read_xrow_timeout(&io, &ibuf, &xrow,
+					replication_disconnect_timeout()) < 0)
+				diag_raise();
 			/* vclock is followed while decoding, zeroing it. */
 			vclock_create(&relay->recv_vclock);
 			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
@@ -721,7 +723,8 @@ relay_send(struct relay *relay, struct xrow_header *packet)
 
 	packet->sync = relay->sync;
 	relay->last_row_time = ev_monotonic_now(loop());
-	coio_write_xrow(&relay->io, packet);
+	if (coio_write_xrow(&relay->io, packet) < 0)
+		diag_raise();
 	fiber_gc();
 
 	struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
index 48707982b..4e79cd2f0 100644
--- a/src/box/xrow_io.cc
+++ b/src/box/xrow_io.cc
@@ -35,71 +35,74 @@
 #include "error.h"
 #include "msgpuck/msgpuck.h"
 
-void
+ssize_t
 coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row)
 {
 	/* Read fixed header */
-	if (ibuf_used(in) < 1)
-		coio_breadn(coio, in, 1);
+	if (ibuf_used(in) < 1 && coio_breadn(coio, in, 1) < 0)
+		return -1;
 
 	/* Read length */
 	if (mp_typeof(*in->rpos) != MP_UINT) {
-		tnt_raise(ClientError, ER_INVALID_MSGPACK,
-			  "packet length");
+		diag_set(ClientError, ER_INVALID_MSGPACK,
+			 "packet length");
+		return -1;
 	}
 	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
-	if (to_read > 0)
-		coio_breadn(coio, in, to_read);
+	if (to_read > 0 && coio_breadn(coio, in, to_read) < 0)
+		return -1;
 
 	uint32_t len = mp_decode_uint((const char **) &in->rpos);
 
 	/* Read header and body */
 	to_read = len - ibuf_used(in);
-	if (to_read > 0)
-		coio_breadn(coio, in, to_read);
+	if (to_read > 0 && coio_breadn(coio, in, to_read) < 0)
+		return -1;
 
-	xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
-			      true);
+	return xrow_header_decode(row, (const char **) &in->rpos, in->rpos + len,
+				  true);
 }
 
-void
-coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
-			  struct xrow_header *row, ev_tstamp timeout)
+ssize_t
+coio_read_xrow_timeout(struct ev_io *coio, struct ibuf *in,
+		       struct xrow_header *row, ev_tstamp timeout)
 {
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
 	/* Read fixed header */
-	if (ibuf_used(in) < 1)
-		coio_breadn_timeout(coio, in, 1, delay);
+	if (ibuf_used(in) < 1 && coio_breadn_timeout(coio, in, 1, delay) < 0)
+		return -1;
 	coio_timeout_update(&start, &delay);
 
 	/* Read length */
 	if (mp_typeof(*in->rpos) != MP_UINT) {
-		tnt_raise(ClientError, ER_INVALID_MSGPACK,
-			  "packet length");
+		diag_set(ClientError, ER_INVALID_MSGPACK,
+			 "packet length");
+		return -1;
 	}
 	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
-	if (to_read > 0)
-		coio_breadn_timeout(coio, in, to_read, delay);
+	if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0)
+		return -1;
 	coio_timeout_update(&start, &delay);
 
 	uint32_t len = mp_decode_uint((const char **) &in->rpos);
 
 	/* Read header and body */
 	to_read = len - ibuf_used(in);
-	if (to_read > 0)
-		coio_breadn_timeout(coio, in, to_read, delay);
+	if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0)
+		return -1;
 
-	xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
-			      true);
+	return xrow_header_decode(row, (const char **) &in->rpos, in->rpos + len,
+				  true);
 }
 
-
-void
+ssize_t
 coio_write_xrow(struct ev_io *coio, const struct xrow_header *row)
 {
 	struct iovec iov[XROW_IOVMAX];
-	int iovcnt = xrow_to_iovec_xc(row, iov);
-	coio_writev(coio, iov, iovcnt, 0);
+	int iovcnt = xrow_to_iovec(row, iov);
+	if (iovcnt < 0)
+		return -1;
+	return coio_writev(coio, iov, iovcnt);
 }
 
diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h
index 0eb7a8ace..96c5047b7 100644
--- a/src/box/xrow_io.h
+++ b/src/box/xrow_io.h
@@ -30,6 +30,7 @@
  * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  */
+#include "unistd.h"
 #if defined(__cplusplus)
 extern "C" {
 #endif
@@ -38,14 +39,14 @@ struct ev_io;
 struct ibuf;
 struct xrow_header;
 
-void
+ssize_t
 coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row);
 
-void
-coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
-			  struct xrow_header *row, double timeout);
+ssize_t
+coio_read_xrow_timeout(struct ev_io *coio, struct ibuf *in,
+		       struct xrow_header *row, double timeout);
 
-void
+ssize_t
 coio_write_xrow(struct ev_io *coio, const struct xrow_header *row);
 
 
diff --git a/src/lib/core/coio.cc b/src/lib/core/coio.cc
index faa7e5bd5..8ae6930de 100644
--- a/src/lib/core/coio.cc
+++ b/src/lib/core/coio.cc
@@ -41,12 +41,6 @@
 #include "scoped_guard.h"
 #include "coio_task.h" /* coio_resolve() */
 
-struct CoioGuard {
-	struct ev_io *ev_io;
-	CoioGuard(struct ev_io *arg) :ev_io(arg) {}
-	~CoioGuard() { ev_io_stop(loop(), ev_io); }
-};
-
 typedef void (*ev_stat_cb)(ev_loop *, ev_stat *, int);
 
 /** Note: this function does not throw */
@@ -65,12 +59,14 @@ coio_fiber_yield_timeout(struct ev_io *coio, ev_tstamp delay)
 	coio->data = fiber();
 	bool is_timedout = fiber_yield_timeout(delay);
 	coio->data = NULL;
+	if (is_timedout)
+		diag_set(TimedOut);
 	return is_timedout;
 }
 
 /**
  * Connect to a host with a specified timeout.
- * @retval -1 timeout
+ * @retval -1 error or timeout
  * @retval 0 connected
  */
 static int
@@ -79,36 +75,46 @@ coio_connect_addr(struct ev_io *coio, struct sockaddr *addr,
 {
 	ev_loop *loop = loop();
 	if (evio_socket(coio, addr->sa_family, SOCK_STREAM, 0) != 0)
-		diag_raise();
-	auto coio_guard = make_scoped_guard([=]{ evio_close(loop, coio); });
-	if (sio_connect(coio->fd, addr, len) == 0) {
-		coio_guard.is_active = false;
+		return -1;
+	if (sio_connect(coio->fd, addr, len) == 0)
 		return 0;
+	if (errno != EINPROGRESS) {
+		evio_close(loop, coio);
+		return -1;
 	}
-	if (errno != EINPROGRESS)
-		diag_raise();
 	/*
 	 * Wait until socket is ready for writing or
 	 * timed out.
 	 */
 	ev_io_set(coio, coio->fd, EV_WRITE);
 	ev_io_start(loop, coio);
-	bool is_timedout = coio_fiber_yield_timeout(coio, timeout);
+	bool is_timedout;
+	is_timedout = coio_fiber_yield_timeout(coio, timeout);
 	ev_io_stop(loop, coio);
-	fiber_testcancel();
-	if (is_timedout)
-		tnt_raise(TimedOut);
+	if (fiber_is_cancelled()) {
+		diag_set(FiberIsCancelled);
+		evio_close(loop, coio);
+		return -1;
+	}
+	if (is_timedout) {
+		evio_close(loop, coio);
+		return -1;
+	}
 	int error = EINPROGRESS;
 	socklen_t sz = sizeof(error);
 	if (sio_getsockopt(coio->fd, SOL_SOCKET, SO_ERROR,
-		       &error, &sz))
-		diag_raise();
+			   &error, &sz)) {
+		evio_close(loop, coio);
+		return -1;
+	}
 	if (error != 0) {
 		errno = error;
-		tnt_raise(SocketError, sio_socketname(coio->fd), "connect");
+		diag_set(SocketError, sio_socketname(coio->fd), "connect");
+		evio_close(loop, coio);
+		return -1;
 	}
-	coio_guard.is_active = false;
 	return 0;
+
 }
 
 void
@@ -152,7 +158,7 @@ coio_fill_addrinfo(struct addrinfo *ai_local, const char *host,
  * This function also supports UNIX domain sockets if uri->path is not NULL and
  * uri->service is NULL.
  *
- * @retval -1 timeout
+ * @retval -1 error or timeout
  * @retval 0 connected
  */
 int
@@ -201,52 +207,55 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
 	    hints.ai_flags = AI_ADDRCONFIG|AI_NUMERICSERV|AI_PASSIVE;
 	    hints.ai_protocol = 0;
 	    int rc = coio_getaddrinfo(host, service, &hints, &ai, delay);
-	    if (rc != 0) {
-		    diag_raise();
-		    panic("unspecified getaddrinfo error");
-	    }
+	    if (rc != 0)
+			return -1;
 	}
-	auto addrinfo_guard = make_scoped_guard([=] {
-		if (!uri->host_hint) freeaddrinfo(ai);
-		else free(ai_local.ai_addr);
-	});
+	struct addrinfo *first_ai = ai;
 	evio_timeout_update(loop(), &start, &delay);
 
 	coio_timeout_init(&start, &delay, timeout);
 	assert(! evio_has_fd(coio));
-	while (ai) {
-		try {
-			if (coio_connect_addr(coio, ai->ai_addr,
-					      ai->ai_addrlen, delay))
-				return -1;
+	while (ai && delay >= 0) {
+		if (coio_connect_addr(coio, ai->ai_addr,
+				      ai->ai_addrlen, delay) == 0) {
 			if (addr != NULL) {
 				assert(addr_len != NULL);
 				*addr_len = MIN(ai->ai_addrlen, *addr_len);
 				memcpy(addr, ai->ai_addr, *addr_len);
 			}
+			if (!uri->host_hint)
+				freeaddrinfo(first_ai);
+			else
+				free(ai_local.ai_addr);
 			return 0; /* connected */
-		} catch (SocketError *e) {
-			if (ai->ai_next == NULL)
-				throw;
-			/* ignore exception and try the next address */
 		}
 		ai = ai->ai_next;
 		ev_now_update(loop);
 		coio_timeout_update(&start, &delay);
 	}
 
-	tnt_raise(SocketError, sio_socketname(coio->fd), "connection failed");
+	/* Set an error if not timedout. */
+	if (delay >= 0)
+		diag_set(SocketError, sio_socketname(coio->fd),
+			 "connection failed");
+	if (!uri->host_hint)
+		freeaddrinfo(first_ai);
+	else
+		free(ai_local.ai_addr);
+	return -1;
 }
 
 /* Do not allow to reuse coio by different fiber. */
-static inline void
+static inline int
 check_coio_in_use(struct ev_io *coio)
 {
 	if (ev_is_active(coio)) {
 		errno = EINPROGRESS;
-		tnt_raise(SocketError, sio_socketname(coio->fd),
-			  "already in use");
+		diag_set(SocketError, sio_socketname(coio->fd),
+			 "already in use");
+		return -1;
 	}
+	return 0;
 }
 
 /**
@@ -260,45 +269,61 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
 
-	check_coio_in_use(coio);
-	CoioGuard coio_guard(coio);
-
-	while (true) {
-		/* Assume that there are waiting clients
-		 * available */
-		int fd = sio_accept(coio->fd, addr, &addrlen);
-		if (fd >= 0) {
-			if (evio_setsockopt_client(fd, addr->sa_family,
-						   SOCK_STREAM) != 0) {
-				close(fd);
-				diag_raise();
-			}
-			return fd;
-		}
-		if (! sio_wouldblock(errno))
-			diag_raise();
-		/* The socket is not ready, yield */
-		if (! ev_is_active(coio)) {
-			ev_io_set(coio, coio->fd, EV_READ);
-			ev_io_start(loop(), coio);
+	if (check_coio_in_use(coio) != 0)
+		return -1;
+
+	/* Assume that there are waiting clients available */
+	int fd = sio_accept(coio->fd, addr, &addrlen);
+
+	if (fd >= 0) {
+		if (evio_setsockopt_client(fd, addr->sa_family,
+					   SOCK_STREAM) != 0) {
+			close(fd);
+			return -1;
 		}
+		return fd;
+	}
+
+	if (!sio_wouldblock(errno))
+		return -1;
+
+	/* The socket is not ready, yield */
+	ev_io_set(coio, coio->fd, EV_READ);
+	ev_io_start(loop(), coio);
+
+	do {
 		/*
 		 * Yield control to other fibers until the
 		 * timeout is reached.
 		 */
 		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
-		fiber_testcancel();
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			break;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			break;
 		coio_timeout_update(&start, &delay);
+		fd = sio_accept(coio->fd, addr, &addrlen);
+	} while (fd < 0 && sio_wouldblock(errno));
+
+	ev_io_stop(loop(), coio);
+
+	if (fd >= 0) {
+		if (evio_setsockopt_client(fd, addr->sa_family,
+					   SOCK_STREAM) != 0) {
+			close(fd);
+			return -1;
+		}
+		return fd;
 	}
+	return -1;
 }
 
 /**
  * Read at least sz bytes from socket with readahead.
  *
- * In case of EOF returns the amount read until eof (possibly 0),
- * and sets errno to 0.
+ * In case of EOF returns the amount read until eof (possibly 0).
  * Can read up to bufsiz bytes.
  *
  * @retval the number of bytes read.
@@ -313,46 +338,68 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
 	coio_timeout_init(&start, &delay, timeout);
 
 	ssize_t to_read = (ssize_t) sz;
+	if (to_read <= 0)
+		return 0;
 
-	check_coio_in_use(coio);
-	CoioGuard coio_guard(coio);
+	if (check_coio_in_use(coio) != 0)
+		return -1;
 
-	while (true) {
-		/*
-		 * Sic: assume the socket is ready: since
-		 * the user called read(), some data must
-		 * be expected.
-		 */
-		ssize_t nrd = sio_read(coio->fd, buf, bufsiz);
-		if (nrd > 0) {
-			to_read -= nrd;
-			if (to_read <= 0)
-				return sz - to_read;
-			buf = (char *) buf + nrd;
-			bufsiz -= nrd;
-		} else if (nrd == 0) {
-			errno = 0;
-			return sz - to_read;
-		} else if (! sio_wouldblock(errno)) {
-			diag_raise();
-		}
+	ssize_t nrd;
+	/*
+	 * Sic: assume the socket is ready: since
+	 * the user called read(), some data must
+	 * be expected.
+	 */
+	do {
+		nrd = sio_read(coio->fd, buf, bufsiz);
+		if (nrd <= 0)
+			break;
+		to_read -= nrd;
+		buf = (char *) buf + nrd;
+		bufsiz -= nrd;
+	} while (to_read > 0);
+
+	if (nrd >= 0)
+		return sz - to_read;
+
+	if (!sio_wouldblock(errno))
+		return -1;
 
-		/* The socket is not ready, yield */
-		if (! ev_is_active(coio)) {
-			ev_io_set(coio, coio->fd, EV_READ);
-			ev_io_start(loop(), coio);
-		}
+	/* The socket is not ready, yield */
+	ev_io_set(coio, coio->fd, EV_READ);
+	ev_io_start(loop(), coio);
+
+	do {
 		/*
 		 * Yield control to other fibers until the
 		 * timeout is being reached.
 		 */
-		bool is_timedout = coio_fiber_yield_timeout(coio,
-							    delay);
-		fiber_testcancel();
+		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			break;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			break;
 		coio_timeout_update(&start, &delay);
-	}
+		nrd = sio_read(coio->fd, buf, bufsiz);
+		if (nrd == 0)
+			break;
+		if (nrd < 0) {
+			if (sio_wouldblock(errno))
+				continue;
+			break;
+		}
+		to_read -= nrd;
+		buf = (char *) buf + nrd;
+		bufsiz -= nrd;
+	} while (to_read > 0);
+
+	ev_io_stop(loop(), coio);
+
+	if (nrd < 0)
+		return -1;
+	return sz - to_read;
 }
 
 /**
@@ -366,10 +413,13 @@ ssize_t
 coio_readn_ahead(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz)
 {
 	ssize_t nrd = coio_read_ahead(coio, buf, sz, bufsiz);
+	if (nrd < 0)
+		return -1;
 	if (nrd < (ssize_t)sz) {
 		errno = EPIPE;
-		tnt_raise(SocketError, sio_socketname(coio->fd),
-			  "unexpected EOF when reading from socket");
+		diag_set(SocketError, sio_socketname(coio->fd),
+			 "unexpected EOF when reading from socket");
+		return -1;
 	}
 	return nrd;
 }
@@ -386,10 +436,13 @@ coio_readn_ahead_timeout(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz
 		         ev_tstamp timeout)
 {
 	ssize_t nrd = coio_read_ahead_timeout(coio, buf, sz, bufsiz, timeout);
-	if (nrd < (ssize_t)sz && errno == 0) { /* EOF. */
+	if (nrd < 0)
+		return -1;
+	if (nrd < (ssize_t)sz) { /* EOF. */
 		errno = EPIPE;
-		tnt_raise(SocketError, sio_socketname(coio->fd),
-			  "unexpected EOF when reading from socket");
+		diag_set(SocketError, sio_socketname(coio->fd),
+			 "unexpected EOF when reading from socket");
+		return -1;
 	}
 	return nrd;
 }
@@ -412,43 +465,62 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz,
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
 
-	check_coio_in_use(coio);
-	CoioGuard coio_guard(coio);
+	if (towrite <= 0)
+		return 0;
+	if (check_coio_in_use(coio) != 0)
+		return -1;
+
+	ssize_t nwr;
+	/*
+	 * Sic: write as much data as possible,
+	 * assuming the socket is ready.
+	 */
+	do {
+		nwr = sio_write(coio->fd, buf, towrite);
+		if (nwr < 0)
+			break;
+		towrite -= nwr;
+		buf = (char *) buf + nwr;
+	} while (towrite > 0);
+
+	if (nwr > 0)
+		return sz;
+
+	if (!sio_wouldblock(errno))
+		return -1;
 
-	while (true) {
-		/*
-		 * Sic: write as much data as possible,
-		 * assuming the socket is ready.
-		 */
-		ssize_t nwr = sio_write(coio->fd, buf, towrite);
-		if (nwr > 0) {
-			/* Go past the data just written. */
-			if (nwr >= towrite)
-				return sz;
-			towrite -= nwr;
-			buf = (char *) buf + nwr;
-		} else if (nwr < 0 && !sio_wouldblock(errno)) {
-			diag_raise();
-		}
-		if (! ev_is_active(coio)) {
-			ev_io_set(coio, coio->fd, EV_WRITE);
-			ev_io_start(loop(), coio);
-		}
-		/* Yield control to other fibers. */
-		fiber_testcancel();
+	ev_io_set(coio, coio->fd, EV_WRITE);
+	ev_io_start(loop(), coio);
+
+	do {
 		/*
 		 * Yield control to other fibers until the
 		 * timeout is reached or the socket is
 		 * ready.
 		 */
-		bool is_timedout = coio_fiber_yield_timeout(coio,
-							    delay);
-		fiber_testcancel();
-
+		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			break;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			break;
 		coio_timeout_update(&start, &delay);
-	}
+		nwr = sio_write(coio->fd, buf, towrite);
+		if (nwr < 0) {
+			if (sio_wouldblock(errno))
+				continue;
+			break;
+		}
+		towrite -= nwr;
+		buf = (char *) buf + nwr;
+	} while (towrite > 0);
+
+	ev_io_stop(loop(), coio);
+
+	if (nwr < 0)
+		return -1;
+	return sz;
 }
 
 /*
@@ -456,66 +528,83 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz,
  * Put in an own function to workaround gcc bug with @finally
  */
 static inline ssize_t
-coio_flush(int fd, struct iovec *iov, ssize_t offset, int iovcnt)
+coio_flush(int fd, struct iovec **iov, size_t *offset, int iovcnt)
 {
-	sio_add_to_iov(iov, -offset);
-	ssize_t nwr = sio_writev(fd, iov, iovcnt);
-	sio_add_to_iov(iov, offset);
-	if (nwr < 0 && ! sio_wouldblock(errno))
-		diag_raise();
+	sio_add_to_iov(*iov, -*offset);
+	ssize_t nwr = sio_writev(fd, *iov, iovcnt);
+	sio_add_to_iov(*iov, *offset);
+	if (nwr < 0 && !sio_wouldblock(errno))
+		return -1;
+	if (nwr < 0)
+		return 0;
+	/* Successful write adjust iov and offset. */
+	*iov += sio_move_iov(*iov, nwr, offset);
 	return nwr;
 }
 
 ssize_t
 coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt,
-		    size_t size_hint, ev_tstamp timeout)
+		    ev_tstamp timeout)
 {
 	size_t total = 0;
 	size_t iov_len = 0;
 	struct iovec *end = iov + iovcnt;
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
-	check_coio_in_use(coio);
-	CoioGuard coio_guard(coio);
 
+	if (iovcnt == 0)
+		return 0;
+
+	if (check_coio_in_use(coio) != 0)
+		return -1;
+
+	ssize_t nwr;
 	/* Avoid a syscall in case of 0 iovcnt. */
-	while (iov < end) {
-		/* Write as much data as possible. */
-		ssize_t nwr = coio_flush(coio->fd, iov, iov_len,
-					 end - iov);
-		if (nwr >= 0) {
-			total += nwr;
-			/*
-			 * If there was a hint for the total size
-			 * of the vector, use it.
-			 */
-			if (size_hint > 0 && size_hint == total)
-				break;
-
-			iov += sio_move_iov(iov, nwr, &iov_len);
-			if (iov == end) {
-				assert(iov_len == 0);
-				break;
-			}
-		}
-		if (! ev_is_active(coio)) {
-			ev_io_set(coio, coio->fd, EV_WRITE);
-			ev_io_start(loop(), coio);
-		}
-		/* Yield control to other fibers. */
-		fiber_testcancel();
+	do {
+		nwr = coio_flush(coio->fd, &iov, &iov_len, end - iov);
+		if (nwr < 0)
+			break;
+		total += nwr;
+
+	} while (iov < end);
+
+	assert(nwr < 0 || iov_len == 0);
+	if (nwr > 0)
+		return total;
+
+	if (!sio_wouldblock(errno))
+		return -1;
+
+	ev_io_set(coio, coio->fd, EV_WRITE);
+	ev_io_start(loop(), coio);
+	do {
 		/*
 		 * Yield control to other fibers until the
 		 * timeout is reached or the socket is
 		 * ready.
 		 */
 		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
-		fiber_testcancel();
-
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			break;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			break;
 		coio_timeout_update(&start, &delay);
-	}
+		nwr = coio_flush(coio->fd, &iov, &iov_len, end - iov);
+		if (nwr < 0) {
+			if (sio_wouldblock(errno))
+				continue;
+			break;
+		}
+		total += nwr;
+	} while (iov < end);
+
+	ev_io_stop(loop(), coio);
+
+	if (nwr < 0)
+		return -1;
+	assert(iov_len == 0);
 	return total;
 }
 
@@ -533,36 +622,40 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags,
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
 
-	check_coio_in_use(coio);
-	CoioGuard coio_guard(coio);
+	if (check_coio_in_use(coio) != 0)
+		return -1;
 
-	while (true) {
-		/*
-		 * Sic: write as much data as possible,
-		 * assuming the socket is ready.
-		 */
-		ssize_t nwr = sio_sendto(coio->fd, buf, sz,
-					 flags, dest_addr, addrlen);
-		if (nwr > 0)
-			return nwr;
-		if (nwr < 0 && ! sio_wouldblock(errno))
-			diag_raise();
-		if (! ev_is_active(coio)) {
-			ev_io_set(coio, coio->fd, EV_WRITE);
-			ev_io_start(loop(), coio);
-		}
+	/*
+	 * Sic: write as much data as possible,
+	 * assuming the socket is ready.
+	 */
+	ssize_t nwr = sio_sendto(coio->fd, buf, sz, flags, dest_addr, addrlen);
+	if (nwr > 0 || !sio_wouldblock(errno))
+		return nwr;
+
+	ev_io_set(coio, coio->fd, EV_WRITE);
+	ev_io_start(loop(), coio);
+	do {
 		/*
 		 * Yield control to other fibers until
 		 * timeout is reached or the socket is
 		 * ready.
 		 */
-		bool is_timedout = coio_fiber_yield_timeout(coio,
-							    delay);
-		fiber_testcancel();
+		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			break;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			break;
 		coio_timeout_update(&start, &delay);
-	}
+		nwr = sio_sendto(coio->fd, buf, sz, flags, dest_addr,
+				 addrlen);
+	} while (nwr < 0 && sio_wouldblock(errno));
+
+	ev_io_stop(loop(), coio);
+
+	return nwr;
 }
 
 /**
@@ -579,36 +672,41 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags,
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
 
-	check_coio_in_use(coio);
-	CoioGuard coio_guard(coio);
+	if (check_coio_in_use(coio) != 0)
+		return -1;
 
-	while (true) {
-		/*
-		 * Read as much data as possible,
-		 * assuming the socket is ready.
-		 */
-		ssize_t nrd = sio_recvfrom(coio->fd, buf, sz, flags,
-					   src_addr, &addrlen);
-		if (nrd >= 0)
-			return nrd;
-		if (! sio_wouldblock(errno))
-			diag_raise();
-		if (! ev_is_active(coio)) {
-			ev_io_set(coio, coio->fd, EV_READ);
-			ev_io_start(loop(), coio);
-		}
+	/*
+	 * Read as much data as possible,
+	 * assuming the socket is ready.
+	 */
+	ssize_t nrd = sio_recvfrom(coio->fd, buf, sz, flags,
+				   src_addr, &addrlen);
+	if (nrd >= 0 || !sio_wouldblock(errno))
+		return nrd;
+
+	ev_io_set(coio, coio->fd, EV_READ);
+	ev_io_start(loop(), coio);
+	do {
 		/*
 		 * Yield control to other fibers until
 		 * timeout is reached or the socket is
 		 * ready.
 		 */
-		bool is_timedout = coio_fiber_yield_timeout(coio,
-							    delay);
-		fiber_testcancel();
+		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			break;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			break;
 		coio_timeout_update(&start, &delay);
-	}
+		nrd = sio_recvfrom(coio->fd, buf, sz, flags,
+				   src_addr, &addrlen);
+	} while (nrd < 0 && sio_wouldblock(errno));
+
+	ev_io_stop(loop(), coio);
+
+	return nrd;
 }
 
 static int
@@ -655,12 +753,13 @@ coio_service_init(struct coio_service *service, const char *name,
 	service->handler_param = handler_param;
 }
 
-void
+int
 coio_service_start(struct evio_service *service, const char *uri)
 {
 	if (evio_service_bind(service, uri) != 0 ||
 	    evio_service_listen(service) != 0)
-		diag_raise();
+		return -1;
+	return 0;
 }
 
 void
@@ -678,7 +777,6 @@ coio_stat_stat_timeout(ev_stat *stat, ev_tstamp timeout)
 	coio_timeout_init(&start, &delay, timeout);
 	fiber_yield_timeout(delay);
 	ev_stat_stop(loop(), stat);
-	fiber_testcancel();
 }
 
 typedef void (*ev_child_cb)(ev_loop *, ev_child *, int);
@@ -706,7 +804,6 @@ coio_waitpid(pid_t pid)
 	fiber_set_cancellable(allow_cancel);
 	ev_child_stop(loop(), &cw);
 	int status = cw.rstatus;
-	fiber_testcancel();
 	return status;
 }
 
diff --git a/src/lib/core/coio.h b/src/lib/core/coio.h
index 6a2337689..4267a0459 100644
--- a/src/lib/core/coio.h
+++ b/src/lib/core/coio.h
@@ -33,6 +33,9 @@
 #include "fiber.h"
 #include "trivia/util.h"
 #if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
 #include "evio.h"
 
 /**
@@ -59,10 +62,6 @@ coio_connect(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
 	return coio_connect_timeout(coio, uri, addr, addr_len, TIMEOUT_INFINITY);
 }
 
-void
-coio_bind(struct ev_io *coio, struct sockaddr *addr,
-	  socklen_t addrlen);
-
 int
 coio_accept(struct ev_io *coio, struct sockaddr *addr, socklen_t addrlen,
 	    ev_tstamp timeout);
@@ -71,7 +70,7 @@ void
 coio_create(struct ev_io *coio, int fd);
 
 static inline void
-coio_close(ev_loop *loop, struct ev_io *coio)
+coio_destroy(ev_loop *loop, struct ev_io *coio)
 {
 	return evio_close(loop, coio);
 }
@@ -141,12 +140,12 @@ coio_write(struct ev_io *coio, const void *buf, size_t sz)
 
 ssize_t
 coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt,
-		    size_t size, ev_tstamp timeout);
+		    ev_tstamp timeout);
 
 static inline ssize_t
-coio_writev(struct ev_io *coio, struct iovec *iov, int iovcnt, size_t size)
+coio_writev(struct ev_io *coio, struct iovec *iov, int iovcnt)
 {
-	return coio_writev_timeout(coio, iov, iovcnt, size, TIMEOUT_INFINITY);
+	return coio_writev_timeout(coio, iov, iovcnt, TIMEOUT_INFINITY);
 }
 
 ssize_t
@@ -164,7 +163,7 @@ coio_service_init(struct coio_service *service, const char *name,
 		  fiber_func handler, void *handler_param);
 
 /** Wait until the service binds to the port. */
-void
+int
 coio_service_start(struct evio_service *service, const char *uri);
 
 void
@@ -185,8 +184,6 @@ coio_stat_stat_timeout(ev_stat *stat, ev_tstamp delay);
 int
 coio_waitpid(pid_t pid);
 
-extern "C" {
-#endif /* defined(__cplusplus) */
 
 /** \cond public */
 
diff --git a/src/lib/core/coio_buf.h b/src/lib/core/coio_buf.h
index 1ad104985..3a83f8fe1 100644
--- a/src/lib/core/coio_buf.h
+++ b/src/lib/core/coio_buf.h
@@ -45,6 +45,8 @@ coio_bread(struct ev_io *coio, struct ibuf *buf, size_t sz)
 {
 	ibuf_reserve_xc(buf, sz);
 	ssize_t n = coio_read_ahead(coio, buf->wpos, sz, ibuf_unused(buf));
+	if (n < 0)
+		return -1;
 	buf->wpos += n;
 	return n;
 }
@@ -61,6 +63,8 @@ coio_bread_timeout(struct ev_io *coio, struct ibuf *buf, size_t sz,
 	ibuf_reserve_xc(buf, sz);
 	ssize_t n = coio_read_ahead_timeout(coio, buf->wpos, sz, ibuf_unused(buf),
 			                    timeout);
+	if (n < 0)
+		return -1;
 	buf->wpos += n;
 	return n;
 }
@@ -71,6 +75,8 @@ coio_breadn(struct ev_io *coio, struct ibuf *buf, size_t sz)
 {
 	ibuf_reserve_xc(buf, sz);
 	ssize_t n = coio_readn_ahead(coio, buf->wpos, sz, ibuf_unused(buf));
+	if (n < 0)
+		return -1;
 	buf->wpos += n;
 	return n;
 }
@@ -87,6 +93,8 @@ coio_breadn_timeout(struct ev_io *coio, struct ibuf *buf, size_t sz,
 	ibuf_reserve_xc(buf, sz);
 	ssize_t n = coio_readn_ahead_timeout(coio, buf->wpos, sz, ibuf_unused(buf),
 			                     timeout);
+	if (n < 0)
+		return -1;
 	buf->wpos += n;
 	return n;
 }
-- 
2.25.0



More information about the Tarantool-patches mailing list