Tarantool development patches archive
 help / color / mirror / Atom feed
From: Georgy Kirichenko <georgy@tarantool.org>
To: tarantool-patches@dev.tarantool.org
Subject: [Tarantool-patches] [PATCH 2/6] coio: do not htrow an exception
Date: Tue, 19 Nov 2019 19:04:53 +0300	[thread overview]
Message-ID: <9c1f887d3357f902948c1a557946e10006d31043.1574178520.git.georgy@tarantool.org> (raw)
In-Reply-To: <cover.1574178520.git.georgy@tarantool.org>

Relaying from C-written wal requires coio to be a C-compliant. So
get rid of exception from coio interface.

Part of #980
---
 src/box/applier.cc      |  52 ++++++----
 src/box/box.cc          |   9 +-
 src/box/relay.cc        |  11 ++-
 src/box/xrow_io.cc      |  59 +++++------
 src/box/xrow_io.h       |  11 ++-
 src/lib/core/coio.cc    | 212 +++++++++++++++++++++-------------------
 src/lib/core/coio.h     |  13 +--
 src/lib/core/coio_buf.h |   8 ++
 8 files changed, 207 insertions(+), 168 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index a04d13564..294765195 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -173,8 +173,9 @@ applier_writer_f(va_list ap)
 			continue;
 		try {
 			struct xrow_header xrow;
-			xrow_encode_vclock(&xrow, &replicaset.vclock);
-			coio_write_xrow(&io, &xrow);
+			if (xrow_encode_vclock(&xrow, &replicaset.vclock) != 0 ||
+			    coio_write_xrow(&io, &xrow) < 0)
+				diag_raise();
 		} catch (SocketError *e) {
 			/*
 			 * There is no point trying to send ACKs if
@@ -308,9 +309,11 @@ applier_connect(struct applier *applier)
 	 */
 	applier->addr_len = sizeof(applier->addrstorage);
 	applier_set_state(applier, APPLIER_CONNECT);
-	coio_connect(coio, uri, &applier->addr, &applier->addr_len);
+	if (coio_connect(coio, uri, &applier->addr, &applier->addr_len) != 0)
+		diag_raise();
 	assert(coio->fd >= 0);
-	coio_readn(coio, greetingbuf, IPROTO_GREETING_SIZE);
+	if (coio_readn(coio, greetingbuf, IPROTO_GREETING_SIZE) < 0)
+		diag_raise();
 	applier->last_row_time = ev_monotonic_now(loop());
 
 	/* Decode instance version and name from greeting */
@@ -345,8 +348,9 @@ applier_connect(struct applier *applier)
 	 * election on bootstrap.
 	 */
 	xrow_encode_vote(&row);
-	coio_write_xrow(coio, &row);
-	coio_read_xrow(coio, ibuf, &row);
+	if (coio_write_xrow(coio, &row) < 0 ||
+	    coio_read_xrow(coio, ibuf, &row) < 0)
+		diag_raise();
 	if (row.type == IPROTO_OK) {
 		xrow_decode_ballot_xc(&row, &applier->ballot);
 	} else try {
@@ -374,8 +378,9 @@ applier_connect(struct applier *applier)
 	applier_set_state(applier, APPLIER_AUTH);
 	xrow_encode_auth_xc(&row, greeting.salt, greeting.salt_len, uri->login,
 			    uri->login_len, uri->password, uri->password_len);
-	coio_write_xrow(coio, &row);
-	coio_read_xrow(coio, ibuf, &row);
+	if (coio_write_xrow(coio, &row) < 0 ||
+	    coio_read_xrow(coio, ibuf, &row) < 0)
+		diag_raise();
 	applier->last_row_time = ev_monotonic_now(loop());
 	if (row.type != IPROTO_OK)
 		xrow_decode_error_xc(&row); /* auth failed */
@@ -397,7 +402,8 @@ applier_join(struct applier *applier)
 	struct ibuf *ibuf = &applier->ibuf;
 	struct xrow_header row;
 	xrow_encode_join_xc(&row, &INSTANCE_UUID);
-	coio_write_xrow(coio, &row);
+	if (coio_write_xrow(coio, &row) < 0)
+		diag_raise();
 
 	/**
 	 * Tarantool < 1.7.0: if JOIN is successful, there is no "OK"
@@ -405,7 +411,8 @@ applier_join(struct applier *applier)
 	 */
 	if (applier->version_id >= version_id(1, 7, 0)) {
 		/* Decode JOIN response */
-		coio_read_xrow(coio, ibuf, &row);
+		if (coio_read_xrow(coio, ibuf, &row) < 0)
+			diag_raise();
 		if (iproto_type_is_error(row.type)) {
 			xrow_decode_error_xc(&row); /* re-throw error */
 		} else if (row.type != IPROTO_OK) {
@@ -428,7 +435,8 @@ applier_join(struct applier *applier)
 	 */
 	uint64_t row_count = 0;
 	while (true) {
-		coio_read_xrow(coio, ibuf, &row);
+		if (coio_read_xrow(coio, ibuf, &row) < 0)
+			diag_raise();
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
 			if (apply_initial_join_row(&row) != 0)
@@ -470,7 +478,8 @@ applier_join(struct applier *applier)
 	 * Receive final data.
 	 */
 	while (true) {
-		coio_read_xrow(coio, ibuf, &row);
+		if (coio_read_xrow(coio, ibuf, &row) < 0)
+			diag_raise();
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
 			vclock_follow_xrow(&replicaset.vclock, &row);
@@ -529,10 +538,13 @@ applier_read_tx_row(struct applier *applier)
 	 * from the master for quite a while the connection is
 	 * broken - the master might just be idle.
 	 */
-	if (applier->version_id < version_id(1, 7, 7))
-		coio_read_xrow(coio, ibuf, row);
-	else
-		coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+	if (applier->version_id < version_id(1, 7, 7)) {
+		if (coio_read_xrow(coio, ibuf, row) < 0)
+			diag_raise();
+	} else {
+		if (coio_read_xrow_timeout(coio, ibuf, row, timeout) < 0)
+			diag_raise();
+	}
 
 	applier->lag = ev_now(loop()) - row->tm;
 	applier->last_row_time = ev_monotonic_now(loop());
@@ -792,11 +804,13 @@ applier_subscribe(struct applier *applier)
 	vclock_copy(&vclock, &replicaset.vclock);
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
 				 &vclock);
-	coio_write_xrow(coio, &row);
+	if (coio_write_xrow(coio, &row) < 0)
+		diag_raise();
 
 	/* Read SUBSCRIBE response */
 	if (applier->version_id >= version_id(1, 6, 7)) {
-		coio_read_xrow(coio, ibuf, &row);
+		if (coio_read_xrow(coio, ibuf, &row) < 0)
+			diag_raise();
 		if (iproto_type_is_error(row.type)) {
 			xrow_decode_error_xc(&row);  /* error */
 		} else if (row.type != IPROTO_OK) {
@@ -933,7 +947,7 @@ applier_disconnect(struct applier *applier, enum applier_state state)
 		applier->writer = NULL;
 	}
 
-	coio_close(loop(), &applier->io);
+	coio_destroy(loop(), &applier->io);
 	/* Clear all unparsed input. */
 	ibuf_reinit(&applier->ibuf);
 	fiber_gc();
diff --git a/src/box/box.cc b/src/box/box.cc
index a53b6e912..6323e5e6e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1497,7 +1497,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	struct xrow_header row;
 	xrow_encode_vclock_xc(&row, &stop_vclock);
 	row.sync = header->sync;
-	coio_write_xrow(io, &row);
+	if (coio_write_xrow(io, &row) < 0)
+		diag_raise();
 
 	/*
 	 * Final stage: feed replica with WALs in range
@@ -1509,7 +1510,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
 	/* Send end of WAL stream marker */
 	xrow_encode_vclock_xc(&row, &replicaset.vclock);
 	row.sync = header->sync;
-	coio_write_xrow(io, &row);
+	if (coio_write_xrow(io, &row) < 0)
+		diag_raise();
 
 	/*
 	 * Advance the WAL consumer state to the position where
@@ -1591,7 +1593,8 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	assert(self != NULL); /* the local registration is read-only */
 	row.replica_id = self->id;
 	row.sync = header->sync;
-	coio_write_xrow(io, &row);
+	if (coio_write_xrow(io, &row) < 0)
+		diag_raise();
 
 	say_info("subscribed replica %s at %s",
 		 tt_uuid_str(&replica_uuid), sio_socketname(io->fd));
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 5c2b0067e..202620694 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -318,7 +318,8 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
 	struct xrow_header row;
 	xrow_encode_vclock_xc(&row, vclock);
 	row.sync = sync;
-	coio_write_xrow(&relay->io, &row);
+	if (coio_write_xrow(&relay->io, &row) < 0)
+		diag_raise();
 
 	/* Send read view to the replica. */
 	engine_join_xc(&ctx, &relay->stream);
@@ -517,8 +518,9 @@ relay_reader_f(va_list ap)
 	try {
 		while (!fiber_is_cancelled()) {
 			struct xrow_header xrow;
-			coio_read_xrow_timeout_xc(&io, &ibuf, &xrow,
-					replication_disconnect_timeout());
+			if (coio_read_xrow_timeout(&io, &ibuf, &xrow,
+					replication_disconnect_timeout()) < 0)
+				diag_raise();
 			/* vclock is followed while decoding, zeroing it. */
 			vclock_create(&relay->recv_vclock);
 			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
@@ -716,7 +718,8 @@ relay_send(struct relay *relay, struct xrow_header *packet)
 
 	packet->sync = relay->sync;
 	relay->last_row_time = ev_monotonic_now(loop());
-	coio_write_xrow(&relay->io, packet);
+	if (coio_write_xrow(&relay->io, packet) < 0)
+		diag_raise();
 	fiber_gc();
 
 	struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
index 48707982b..f432c6b49 100644
--- a/src/box/xrow_io.cc
+++ b/src/box/xrow_io.cc
@@ -35,71 +35,74 @@
 #include "error.h"
 #include "msgpuck/msgpuck.h"
 
-void
+ssize_t
 coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row)
 {
 	/* Read fixed header */
-	if (ibuf_used(in) < 1)
-		coio_breadn(coio, in, 1);
+	if (ibuf_used(in) < 1 && coio_breadn(coio, in, 1) < 0)
+		return -1;
 
 	/* Read length */
 	if (mp_typeof(*in->rpos) != MP_UINT) {
-		tnt_raise(ClientError, ER_INVALID_MSGPACK,
-			  "packet length");
+		diag_set(ClientError, ER_INVALID_MSGPACK,
+			 "packet length");
+		return -1;
 	}
 	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
-	if (to_read > 0)
-		coio_breadn(coio, in, to_read);
+	if (to_read > 0 && coio_breadn(coio, in, to_read) < 0)
+		return -1;
 
 	uint32_t len = mp_decode_uint((const char **) &in->rpos);
 
 	/* Read header and body */
 	to_read = len - ibuf_used(in);
-	if (to_read > 0)
-		coio_breadn(coio, in, to_read);
+	if (to_read > 0 && coio_breadn(coio, in, to_read) < 0)
+		return -1;
 
-	xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
-			      true);
+	return xrow_header_decode(row, (const char **) &in->rpos, in->rpos + len,
+				  true);
 }
 
-void
-coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
-			  struct xrow_header *row, ev_tstamp timeout)
+ssize_t
+coio_read_xrow_timeout(struct ev_io *coio, struct ibuf *in,
+		       struct xrow_header *row, ev_tstamp timeout)
 {
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
 	/* Read fixed header */
-	if (ibuf_used(in) < 1)
-		coio_breadn_timeout(coio, in, 1, delay);
+	if (ibuf_used(in) < 1 && coio_breadn_timeout(coio, in, 1, delay) < 0)
+		return -1;
 	coio_timeout_update(&start, &delay);
 
 	/* Read length */
 	if (mp_typeof(*in->rpos) != MP_UINT) {
-		tnt_raise(ClientError, ER_INVALID_MSGPACK,
-			  "packet length");
+		diag_set(ClientError, ER_INVALID_MSGPACK,
+			 "packet length");
+		return -1;
 	}
 	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
-	if (to_read > 0)
-		coio_breadn_timeout(coio, in, to_read, delay);
+	if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0)
+		return -1;
 	coio_timeout_update(&start, &delay);
 
 	uint32_t len = mp_decode_uint((const char **) &in->rpos);
 
 	/* Read header and body */
 	to_read = len - ibuf_used(in);
-	if (to_read > 0)
-		coio_breadn_timeout(coio, in, to_read, delay);
+	if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0)
+		return -1;
 
-	xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
-			      true);
+	return xrow_header_decode(row, (const char **) &in->rpos, in->rpos + len,
+				  true);
 }
 
-
-void
+ssize_t
 coio_write_xrow(struct ev_io *coio, const struct xrow_header *row)
 {
 	struct iovec iov[XROW_IOVMAX];
-	int iovcnt = xrow_to_iovec_xc(row, iov);
-	coio_writev(coio, iov, iovcnt, 0);
+	int iovcnt = xrow_to_iovec(row, iov);
+	if (iovcnt < 0)
+		return -1;
+	return coio_writev(coio, iov, iovcnt, 0);
 }
 
diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h
index 0eb7a8ace..365a70db7 100644
--- a/src/box/xrow_io.h
+++ b/src/box/xrow_io.h
@@ -30,6 +30,7 @@
  * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  */
+#include <unistd.h>
 #if defined(__cplusplus)
 extern "C" {
 #endif
@@ -38,14 +39,14 @@ struct ev_io;
 struct ibuf;
 struct xrow_header;
 
-void
+ssize_t
 coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row);
 
-void
-coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
-			  struct xrow_header *row, double timeout);
+ssize_t
+coio_read_xrow_timeout(struct ev_io *coio, struct ibuf *in,
+		       struct xrow_header *row, double timeout);
 
-void
+ssize_t
 coio_write_xrow(struct ev_io *coio, const struct xrow_header *row);
 
 
diff --git a/src/lib/core/coio.cc b/src/lib/core/coio.cc
index e88d724d5..96a529c2c 100644
--- a/src/lib/core/coio.cc
+++ b/src/lib/core/coio.cc
@@ -41,12 +41,6 @@
 #include "scoped_guard.h"
 #include "coio_task.h" /* coio_resolve() */
 
-struct CoioGuard {
-	struct ev_io *ev_io;
-	CoioGuard(struct ev_io *arg) :ev_io(arg) {}
-	~CoioGuard() { ev_io_stop(loop(), ev_io); }
-};
-
 typedef void (*ev_stat_cb)(ev_loop *, ev_stat *, int);
 
 /** Note: this function does not throw */
@@ -65,12 +59,14 @@ coio_fiber_yield_timeout(struct ev_io *coio, ev_tstamp delay)
 	coio->data = fiber();
 	bool is_timedout = fiber_yield_timeout(delay);
 	coio->data = NULL;
+	if (is_timedout)
+		diag_set(TimedOut);
 	return is_timedout;
 }
 
 /**
  * Connect to a host with a specified timeout.
- * @retval -1 timeout
+ * @retval -1 error or timeout
  * @retval 0 connected
  */
 static int
@@ -79,36 +75,43 @@ coio_connect_addr(struct ev_io *coio, struct sockaddr *addr,
 {
 	ev_loop *loop = loop();
 	if (evio_socket(coio, addr->sa_family, SOCK_STREAM, 0) != 0)
-		diag_raise();
-	auto coio_guard = make_scoped_guard([=]{ evio_close(loop, coio); });
-	if (sio_connect(coio->fd, addr, len) == 0) {
-		coio_guard.is_active = false;
+		return -1;
+	if (sio_connect(coio->fd, addr, len) == 0)
 		return 0;
-	}
 	if (errno != EINPROGRESS)
-		diag_raise();
+		goto close;
 	/*
 	 * Wait until socket is ready for writing or
 	 * timed out.
 	 */
 	ev_io_set(coio, coio->fd, EV_WRITE);
 	ev_io_start(loop, coio);
-	bool is_timedout = coio_fiber_yield_timeout(coio, timeout);
+	bool is_timedout;
+	is_timedout = coio_fiber_yield_timeout(coio, timeout);
 	ev_io_stop(loop, coio);
-	fiber_testcancel();
+	if (fiber_is_cancelled()) {
+		diag_set(FiberIsCancelled);
+		goto close;
+	}
 	if (is_timedout)
-		tnt_raise(TimedOut);
-	int error = EINPROGRESS;
-	socklen_t sz = sizeof(error);
+		goto close;
+	int error;
+	socklen_t sz;
+	error = EINPROGRESS;
+	sz = sizeof(error);
 	if (sio_getsockopt(coio->fd, SOL_SOCKET, SO_ERROR,
 		       &error, &sz))
-		diag_raise();
+		goto close;
 	if (error != 0) {
 		errno = error;
-		tnt_raise(SocketError, sio_socketname(coio->fd), "connect");
+		diag_set(SocketError, sio_socketname(coio->fd), "connect");
+		goto close;
 	}
-	coio_guard.is_active = false;
 	return 0;
+
+close:
+	evio_close(loop, coio);
+	return -1;
 }
 
 void
@@ -152,7 +155,7 @@ coio_fill_addrinfo(struct addrinfo *ai_local, const char *host,
  * This function also supports UNIX domain sockets if uri->path is not NULL and
  * uri->service is NULL.
  *
- * @retval -1 timeout
+ * @retval -1 error or timeout
  * @retval 0 connected
  */
 int
@@ -201,41 +204,37 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
 	    hints.ai_flags = AI_ADDRCONFIG|AI_NUMERICSERV|AI_PASSIVE;
 	    hints.ai_protocol = 0;
 	    int rc = coio_getaddrinfo(host, service, &hints, &ai, delay);
-	    if (rc != 0) {
-		    diag_raise();
-		    panic("unspecified getaddrinfo error");
-	    }
+	    if (rc != 0)
+			return -1;
 	}
-	auto addrinfo_guard = make_scoped_guard([=] {
-		if (!uri->host_hint) freeaddrinfo(ai);
-		else free(ai_local.ai_addr);
-	});
+	struct addrinfo *first_ai = ai;
 	evio_timeout_update(loop(), &start, &delay);
 
 	coio_timeout_init(&start, &delay, timeout);
 	assert(! evio_has_fd(coio));
-	while (ai) {
-		try {
-			if (coio_connect_addr(coio, ai->ai_addr,
-					      ai->ai_addrlen, delay))
-				return -1;
+	while (ai && delay >= 0) {
+		if (coio_connect_addr(coio, ai->ai_addr,
+				      ai->ai_addrlen, delay) == 0) {
 			if (addr != NULL) {
 				assert(addr_len != NULL);
 				*addr_len = MIN(ai->ai_addrlen, *addr_len);
 				memcpy(addr, ai->ai_addr, *addr_len);
 			}
 			return 0; /* connected */
-		} catch (SocketError *e) {
-			if (ai->ai_next == NULL)
-				throw;
-			/* ignore exception and try the next address */
 		}
-		ai = ai->ai_next;
 		ev_now_update(loop);
 		coio_timeout_update(&start, &delay);
+		ai = ai->ai_next;
 	}
 
-	tnt_raise(SocketError, sio_socketname(coio->fd), "connection failed");
+	/* Set an error if not timedout. */
+	if (delay >= 0)
+		diag_set(SocketError, sio_socketname(coio->fd), "connection failed");
+	if (!uri->host_hint)
+		freeaddrinfo(first_ai);
+	else
+		free(ai_local.ai_addr);
+	return -1;
 }
 
 /**
@@ -249,8 +248,6 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
 
-	CoioGuard coio_guard(coio);
-
 	while (true) {
 		/* Assume that there are waiting clients
 		 * available */
@@ -259,12 +256,12 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
 			if (evio_setsockopt_client(fd, addr->sa_family,
 						   SOCK_STREAM) != 0) {
 				close(fd);
-				diag_raise();
+				return -1;
 			}
 			return fd;
 		}
 		if (! sio_wouldblock(errno))
-			diag_raise();
+			return -1;
 		/* The socket is not ready, yield */
 		if (! ev_is_active(coio)) {
 			ev_io_set(coio, coio->fd, EV_READ);
@@ -275,11 +272,16 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
 		 * timeout is reached.
 		 */
 		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
-		fiber_testcancel();
+		ev_io_stop(loop(), coio);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			break;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			break;
 		coio_timeout_update(&start, &delay);
 	}
+	return -1;
 }
 
 /**
@@ -302,8 +304,6 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
 
 	ssize_t to_read = (ssize_t) sz;
 
-	CoioGuard coio_guard(coio);
-
 	while (true) {
 		/*
 		 * Sic: assume the socket is ready: since
@@ -320,9 +320,8 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
 		} else if (nrd == 0) {
 			errno = 0;
 			return sz - to_read;
-		} else if (! sio_wouldblock(errno)) {
-			diag_raise();
-		}
+		} else if (! sio_wouldblock(errno))
+			return -1;
 
 		/* The socket is not ready, yield */
 		if (! ev_is_active(coio)) {
@@ -333,19 +332,23 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
 		 * Yield control to other fibers until the
 		 * timeout is being reached.
 		 */
-		bool is_timedout = coio_fiber_yield_timeout(coio,
-							    delay);
-		fiber_testcancel();
+		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+		ev_io_stop(loop(), coio);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			break;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			break;
 		coio_timeout_update(&start, &delay);
 	}
+	return -1;
 }
 
 /**
  * Read at least sz bytes, with readahead.
  *
- * Treats EOF as an error, and throws an exception.
+ * Treats EOF as an error.
  *
  * @retval the number of bytes read, > 0.
  */
@@ -355,8 +358,9 @@ coio_readn_ahead(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz)
 	ssize_t nrd = coio_read_ahead(coio, buf, sz, bufsiz);
 	if (nrd < (ssize_t)sz) {
 		errno = EPIPE;
-		tnt_raise(SocketError, sio_socketname(coio->fd),
-			  "unexpected EOF when reading from socket");
+		diag_set(SocketError, sio_socketname(coio->fd),
+			 "unexpected EOF when reading from socket");
+		return -1;
 	}
 	return nrd;
 }
@@ -364,7 +368,7 @@ coio_readn_ahead(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz)
 /**
  * Read at least sz bytes, with readahead and timeout.
  *
- * Treats EOF as an error, and throws an exception.
+ * Treats EOF as an error.
  *
  * @retval the number of bytes read, > 0.
  */
@@ -375,8 +379,9 @@ coio_readn_ahead_timeout(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz
 	ssize_t nrd = coio_read_ahead_timeout(coio, buf, sz, bufsiz, timeout);
 	if (nrd < (ssize_t)sz && errno == 0) { /* EOF. */
 		errno = EPIPE;
-		tnt_raise(SocketError, sio_socketname(coio->fd),
-			  "unexpected EOF when reading from socket");
+		diag_set(SocketError, sio_socketname(coio->fd),
+			 "unexpected EOF when reading from socket");
+		return -1;
 	}
 	return nrd;
 }
@@ -399,8 +404,6 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz,
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
 
-	CoioGuard coio_guard(coio);
-
 	while (true) {
 		/*
 		 * Sic: write as much data as possible,
@@ -413,28 +416,28 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz,
 				return sz;
 			towrite -= nwr;
 			buf = (char *) buf + nwr;
-		} else if (nwr < 0 && !sio_wouldblock(errno)) {
-			diag_raise();
-		}
+		} else if (nwr < 0 && !sio_wouldblock(errno))
+			return -1;
 		if (! ev_is_active(coio)) {
 			ev_io_set(coio, coio->fd, EV_WRITE);
 			ev_io_start(loop(), coio);
 		}
-		/* Yield control to other fibers. */
-		fiber_testcancel();
 		/*
 		 * Yield control to other fibers until the
 		 * timeout is reached or the socket is
 		 * ready.
 		 */
-		bool is_timedout = coio_fiber_yield_timeout(coio,
-							    delay);
-		fiber_testcancel();
-
+		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+		ev_io_stop(loop(), coio);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			break;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			break;
 		coio_timeout_update(&start, &delay);
 	}
+	return -1;
 }
 
 /*
@@ -446,9 +449,11 @@ coio_flush(int fd, struct iovec *iov, ssize_t offset, int iovcnt)
 {
 	sio_add_to_iov(iov, -offset);
 	ssize_t nwr = sio_writev(fd, iov, iovcnt);
+	if (nwr < 0 && !sio_wouldblock(errno))
+		return -1;
 	sio_add_to_iov(iov, offset);
-	if (nwr < 0 && ! sio_wouldblock(errno))
-		diag_raise();
+	if (nwr < 0)
+		return 0;
 	return nwr;
 }
 
@@ -461,14 +466,15 @@ coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt,
 	struct iovec *end = iov + iovcnt;
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
-	CoioGuard coio_guard(coio);
 
 	/* Avoid a syscall in case of 0 iovcnt. */
 	while (iov < end) {
 		/* Write as much data as possible. */
 		ssize_t nwr = coio_flush(coio->fd, iov, iov_len,
 					 end - iov);
-		if (nwr >= 0) {
+		if (nwr < 0)
+			return -1;
+		if (nwr > 0) {
 			total += nwr;
 			/*
 			 * If there was a hint for the total size
@@ -487,18 +493,19 @@ coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt,
 			ev_io_set(coio, coio->fd, EV_WRITE);
 			ev_io_start(loop(), coio);
 		}
-		/* Yield control to other fibers. */
-		fiber_testcancel();
 		/*
 		 * Yield control to other fibers until the
 		 * timeout is reached or the socket is
 		 * ready.
 		 */
 		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
-		fiber_testcancel();
-
+		ev_io_stop(loop(), coio);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			return -1;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			return -1;
 		coio_timeout_update(&start, &delay);
 	}
 	return total;
@@ -518,8 +525,6 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags,
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
 
-	CoioGuard coio_guard(coio);
-
 	while (true) {
 		/*
 		 * Sic: write as much data as possible,
@@ -530,7 +535,7 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags,
 		if (nwr > 0)
 			return nwr;
 		if (nwr < 0 && ! sio_wouldblock(errno))
-			diag_raise();
+			return -1;
 		if (! ev_is_active(coio)) {
 			ev_io_set(coio, coio->fd, EV_WRITE);
 			ev_io_start(loop(), coio);
@@ -540,13 +545,17 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags,
 		 * timeout is reached or the socket is
 		 * ready.
 		 */
-		bool is_timedout = coio_fiber_yield_timeout(coio,
-							    delay);
-		fiber_testcancel();
+		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+		ev_io_stop(loop(), coio);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			break;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			break;
 		coio_timeout_update(&start, &delay);
 	}
+	return -1;
 }
 
 /**
@@ -563,8 +572,6 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags,
 	ev_tstamp start, delay;
 	coio_timeout_init(&start, &delay, timeout);
 
-	CoioGuard coio_guard(coio);
-
 	while (true) {
 		/*
 		 * Read as much data as possible,
@@ -575,7 +582,7 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags,
 		if (nrd >= 0)
 			return nrd;
 		if (! sio_wouldblock(errno))
-			diag_raise();
+			return -1;
 		if (! ev_is_active(coio)) {
 			ev_io_set(coio, coio->fd, EV_READ);
 			ev_io_start(loop(), coio);
@@ -585,13 +592,17 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags,
 		 * timeout is reached or the socket is
 		 * ready.
 		 */
-		bool is_timedout = coio_fiber_yield_timeout(coio,
-							    delay);
-		fiber_testcancel();
+		bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+		ev_io_stop(loop(), coio);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			break;
+		}
 		if (is_timedout)
-			tnt_raise(TimedOut);
+			break;
 		coio_timeout_update(&start, &delay);
 	}
+	return -1;
 }
 
 static int
@@ -638,12 +649,13 @@ coio_service_init(struct coio_service *service, const char *name,
 	service->handler_param = handler_param;
 }
 
-void
+int
 coio_service_start(struct evio_service *service, const char *uri)
 {
 	if (evio_service_bind(service, uri) != 0 ||
 	    evio_service_listen(service) != 0)
-		diag_raise();
+		return -1;
+	return -0;
 }
 
 void
@@ -661,7 +673,6 @@ coio_stat_stat_timeout(ev_stat *stat, ev_tstamp timeout)
 	coio_timeout_init(&start, &delay, timeout);
 	fiber_yield_timeout(delay);
 	ev_stat_stop(loop(), stat);
-	fiber_testcancel();
 }
 
 typedef void (*ev_child_cb)(ev_loop *, ev_child *, int);
@@ -689,7 +700,6 @@ coio_waitpid(pid_t pid)
 	fiber_set_cancellable(allow_cancel);
 	ev_child_stop(loop(), &cw);
 	int status = cw.rstatus;
-	fiber_testcancel();
 	return status;
 }
 
diff --git a/src/lib/core/coio.h b/src/lib/core/coio.h
index 6a2337689..d557f2869 100644
--- a/src/lib/core/coio.h
+++ b/src/lib/core/coio.h
@@ -33,6 +33,9 @@
 #include "fiber.h"
 #include "trivia/util.h"
 #if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
 #include "evio.h"
 
 /**
@@ -59,10 +62,6 @@ coio_connect(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
 	return coio_connect_timeout(coio, uri, addr, addr_len, TIMEOUT_INFINITY);
 }
 
-void
-coio_bind(struct ev_io *coio, struct sockaddr *addr,
-	  socklen_t addrlen);
-
 int
 coio_accept(struct ev_io *coio, struct sockaddr *addr, socklen_t addrlen,
 	    ev_tstamp timeout);
@@ -71,7 +70,7 @@ void
 coio_create(struct ev_io *coio, int fd);
 
 static inline void
-coio_close(ev_loop *loop, struct ev_io *coio)
+coio_destroy(ev_loop *loop, struct ev_io *coio)
 {
 	return evio_close(loop, coio);
 }
@@ -164,7 +163,7 @@ coio_service_init(struct coio_service *service, const char *name,
 		  fiber_func handler, void *handler_param);
 
 /** Wait until the service binds to the port. */
-void
+int
 coio_service_start(struct evio_service *service, const char *uri);
 
 void
@@ -185,8 +184,6 @@ coio_stat_stat_timeout(ev_stat *stat, ev_tstamp delay);
 int
 coio_waitpid(pid_t pid);
 
-extern "C" {
-#endif /* defined(__cplusplus) */
 
 /** \cond public */
 
diff --git a/src/lib/core/coio_buf.h b/src/lib/core/coio_buf.h
index 1ad104985..3a83f8fe1 100644
--- a/src/lib/core/coio_buf.h
+++ b/src/lib/core/coio_buf.h
@@ -45,6 +45,8 @@ coio_bread(struct ev_io *coio, struct ibuf *buf, size_t sz)
 {
 	ibuf_reserve_xc(buf, sz);
 	ssize_t n = coio_read_ahead(coio, buf->wpos, sz, ibuf_unused(buf));
+	if (n < 0)
+		return -1;
 	buf->wpos += n;
 	return n;
 }
@@ -61,6 +63,8 @@ coio_bread_timeout(struct ev_io *coio, struct ibuf *buf, size_t sz,
 	ibuf_reserve_xc(buf, sz);
 	ssize_t n = coio_read_ahead_timeout(coio, buf->wpos, sz, ibuf_unused(buf),
 			                    timeout);
+	if (n < 0)
+		return -1;
 	buf->wpos += n;
 	return n;
 }
@@ -71,6 +75,8 @@ coio_breadn(struct ev_io *coio, struct ibuf *buf, size_t sz)
 {
 	ibuf_reserve_xc(buf, sz);
 	ssize_t n = coio_readn_ahead(coio, buf->wpos, sz, ibuf_unused(buf));
+	if (n < 0)
+		return -1;
 	buf->wpos += n;
 	return n;
 }
@@ -87,6 +93,8 @@ coio_breadn_timeout(struct ev_io *coio, struct ibuf *buf, size_t sz,
 	ibuf_reserve_xc(buf, sz);
 	ssize_t n = coio_readn_ahead_timeout(coio, buf->wpos, sz, ibuf_unused(buf),
 			                     timeout);
+	if (n < 0)
+		return -1;
 	buf->wpos += n;
 	return n;
 }
-- 
2.24.0

  parent reply	other threads:[~2019-11-19 16:05 UTC|newest]

Thread overview: 14+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-11-19 16:04 [Tarantool-patches] [PATCH 0/6] Synchronous replication preparation Georgy Kirichenko
2019-11-19 16:04 ` [Tarantool-patches] [PATCH 1/6] recovery: do not throw an error Georgy Kirichenko
2019-11-23 13:45   ` Vladislav Shpilevoy
2019-11-19 16:04 ` Georgy Kirichenko [this message]
2019-11-23 13:45   ` [Tarantool-patches] [PATCH 2/6] coio: do not htrow an exception Vladislav Shpilevoy
2019-11-19 16:04 ` [Tarantool-patches] [PATCH 3/6] xstream: get rid of " Georgy Kirichenko
2019-11-23 13:45   ` Vladislav Shpilevoy
2019-11-19 16:04 ` [Tarantool-patches] [PATCH 4/6] vinyl: do not insert vy_tx twice into writers list Georgy Kirichenko
2019-11-19 16:04 ` [Tarantool-patches] [PATCH 5/6] box: improve recovery journal Georgy Kirichenko
2019-11-23 13:46   ` Vladislav Shpilevoy
2019-11-19 16:04 ` [Tarantool-patches] [PATCH 6/6] recovery: follow transaction boundaries while recovery or join Georgy Kirichenko
2019-11-23 13:46   ` Vladislav Shpilevoy
2019-11-20 17:15 ` [Tarantool-patches] [PATCH 0/6] Synchronous replication preparation Konstantin Osipov
2019-11-23 13:45 ` Vladislav Shpilevoy

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=9c1f887d3357f902948c1a557946e10006d31043.1574178520.git.georgy@tarantool.org \
    --to=georgy@tarantool.org \
    --cc=tarantool-patches@dev.tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH 2/6] coio: do not htrow an exception' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox