[Tarantool-patches] [PATCH 2/6] coio: do not throw an exception
Georgy Kirichenko
georgy at tarantool.org
Tue Nov 19 19:04:53 MSK 2019
Relaying from a WAL written in C requires coio to be C-compliant, so
get rid of exceptions in the coio interface.
Part of #980
---
src/box/applier.cc | 52 ++++++----
src/box/box.cc | 9 +-
src/box/relay.cc | 11 ++-
src/box/xrow_io.cc | 59 +++++------
src/box/xrow_io.h | 11 ++-
src/lib/core/coio.cc | 212 +++++++++++++++++++++-------------------
src/lib/core/coio.h | 13 +--
src/lib/core/coio_buf.h | 8 ++
8 files changed, 207 insertions(+), 168 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index a04d13564..294765195 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -173,8 +173,9 @@ applier_writer_f(va_list ap)
continue;
try {
struct xrow_header xrow;
- xrow_encode_vclock(&xrow, &replicaset.vclock);
- coio_write_xrow(&io, &xrow);
+ if (xrow_encode_vclock(&xrow, &replicaset.vclock) != 0 ||
+ coio_write_xrow(&io, &xrow) < 0)
+ diag_raise();
} catch (SocketError *e) {
/*
* There is no point trying to send ACKs if
@@ -308,9 +309,11 @@ applier_connect(struct applier *applier)
*/
applier->addr_len = sizeof(applier->addrstorage);
applier_set_state(applier, APPLIER_CONNECT);
- coio_connect(coio, uri, &applier->addr, &applier->addr_len);
+ if (coio_connect(coio, uri, &applier->addr, &applier->addr_len) != 0)
+ diag_raise();
assert(coio->fd >= 0);
- coio_readn(coio, greetingbuf, IPROTO_GREETING_SIZE);
+ if (coio_readn(coio, greetingbuf, IPROTO_GREETING_SIZE) < 0)
+ diag_raise();
applier->last_row_time = ev_monotonic_now(loop());
/* Decode instance version and name from greeting */
@@ -345,8 +348,9 @@ applier_connect(struct applier *applier)
* election on bootstrap.
*/
xrow_encode_vote(&row);
- coio_write_xrow(coio, &row);
- coio_read_xrow(coio, ibuf, &row);
+ if (coio_write_xrow(coio, &row) < 0 ||
+ coio_read_xrow(coio, ibuf, &row) < 0)
+ diag_raise();
if (row.type == IPROTO_OK) {
xrow_decode_ballot_xc(&row, &applier->ballot);
} else try {
@@ -374,8 +378,9 @@ applier_connect(struct applier *applier)
applier_set_state(applier, APPLIER_AUTH);
xrow_encode_auth_xc(&row, greeting.salt, greeting.salt_len, uri->login,
uri->login_len, uri->password, uri->password_len);
- coio_write_xrow(coio, &row);
- coio_read_xrow(coio, ibuf, &row);
+ if (coio_write_xrow(coio, &row) < 0 ||
+ coio_read_xrow(coio, ibuf, &row) < 0)
+ diag_raise();
applier->last_row_time = ev_monotonic_now(loop());
if (row.type != IPROTO_OK)
xrow_decode_error_xc(&row); /* auth failed */
@@ -397,7 +402,8 @@ applier_join(struct applier *applier)
struct ibuf *ibuf = &applier->ibuf;
struct xrow_header row;
xrow_encode_join_xc(&row, &INSTANCE_UUID);
- coio_write_xrow(coio, &row);
+ if (coio_write_xrow(coio, &row) < 0)
+ diag_raise();
/**
* Tarantool < 1.7.0: if JOIN is successful, there is no "OK"
@@ -405,7 +411,8 @@ applier_join(struct applier *applier)
*/
if (applier->version_id >= version_id(1, 7, 0)) {
/* Decode JOIN response */
- coio_read_xrow(coio, ibuf, &row);
+ if (coio_read_xrow(coio, ibuf, &row) < 0)
+ diag_raise();
if (iproto_type_is_error(row.type)) {
xrow_decode_error_xc(&row); /* re-throw error */
} else if (row.type != IPROTO_OK) {
@@ -428,7 +435,8 @@ applier_join(struct applier *applier)
*/
uint64_t row_count = 0;
while (true) {
- coio_read_xrow(coio, ibuf, &row);
+ if (coio_read_xrow(coio, ibuf, &row) < 0)
+ diag_raise();
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
if (apply_initial_join_row(&row) != 0)
@@ -470,7 +478,8 @@ applier_join(struct applier *applier)
* Receive final data.
*/
while (true) {
- coio_read_xrow(coio, ibuf, &row);
+ if (coio_read_xrow(coio, ibuf, &row) < 0)
+ diag_raise();
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
vclock_follow_xrow(&replicaset.vclock, &row);
@@ -529,10 +538,13 @@ applier_read_tx_row(struct applier *applier)
* from the master for quite a while the connection is
* broken - the master might just be idle.
*/
- if (applier->version_id < version_id(1, 7, 7))
- coio_read_xrow(coio, ibuf, row);
- else
- coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+ if (applier->version_id < version_id(1, 7, 7)) {
+ if (coio_read_xrow(coio, ibuf, row) < 0)
+ diag_raise();
+ } else {
+ if (coio_read_xrow_timeout(coio, ibuf, row, timeout) < 0)
+ diag_raise();
+ }
applier->lag = ev_now(loop()) - row->tm;
applier->last_row_time = ev_monotonic_now(loop());
@@ -792,11 +804,13 @@ applier_subscribe(struct applier *applier)
vclock_copy(&vclock, &replicaset.vclock);
xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
&vclock);
- coio_write_xrow(coio, &row);
+ if (coio_write_xrow(coio, &row) < 0)
+ diag_raise();
/* Read SUBSCRIBE response */
if (applier->version_id >= version_id(1, 6, 7)) {
- coio_read_xrow(coio, ibuf, &row);
+ if (coio_read_xrow(coio, ibuf, &row) < 0)
+ diag_raise();
if (iproto_type_is_error(row.type)) {
xrow_decode_error_xc(&row); /* error */
} else if (row.type != IPROTO_OK) {
@@ -933,7 +947,7 @@ applier_disconnect(struct applier *applier, enum applier_state state)
applier->writer = NULL;
}
- coio_close(loop(), &applier->io);
+ coio_destroy(loop(), &applier->io);
/* Clear all unparsed input. */
ibuf_reinit(&applier->ibuf);
fiber_gc();
diff --git a/src/box/box.cc b/src/box/box.cc
index a53b6e912..6323e5e6e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1497,7 +1497,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
struct xrow_header row;
xrow_encode_vclock_xc(&row, &stop_vclock);
row.sync = header->sync;
- coio_write_xrow(io, &row);
+ if (coio_write_xrow(io, &row) < 0)
+ diag_raise();
/*
* Final stage: feed replica with WALs in range
@@ -1509,7 +1510,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
/* Send end of WAL stream marker */
xrow_encode_vclock_xc(&row, &replicaset.vclock);
row.sync = header->sync;
- coio_write_xrow(io, &row);
+ if (coio_write_xrow(io, &row) < 0)
+ diag_raise();
/*
* Advance the WAL consumer state to the position where
@@ -1591,7 +1593,8 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
assert(self != NULL); /* the local registration is read-only */
row.replica_id = self->id;
row.sync = header->sync;
- coio_write_xrow(io, &row);
+ if (coio_write_xrow(io, &row) < 0)
+ diag_raise();
say_info("subscribed replica %s at %s",
tt_uuid_str(&replica_uuid), sio_socketname(io->fd));
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 5c2b0067e..202620694 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -318,7 +318,8 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
struct xrow_header row;
xrow_encode_vclock_xc(&row, vclock);
row.sync = sync;
- coio_write_xrow(&relay->io, &row);
+ if (coio_write_xrow(&relay->io, &row) < 0)
+ diag_raise();
/* Send read view to the replica. */
engine_join_xc(&ctx, &relay->stream);
@@ -517,8 +518,9 @@ relay_reader_f(va_list ap)
try {
while (!fiber_is_cancelled()) {
struct xrow_header xrow;
- coio_read_xrow_timeout_xc(&io, &ibuf, &xrow,
- replication_disconnect_timeout());
+ if (coio_read_xrow_timeout(&io, &ibuf, &xrow,
+ replication_disconnect_timeout()) < 0)
+ diag_raise();
/* vclock is followed while decoding, zeroing it. */
vclock_create(&relay->recv_vclock);
xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
@@ -716,7 +718,8 @@ relay_send(struct relay *relay, struct xrow_header *packet)
packet->sync = relay->sync;
relay->last_row_time = ev_monotonic_now(loop());
- coio_write_xrow(&relay->io, packet);
+ if (coio_write_xrow(&relay->io, packet) < 0)
+ diag_raise();
fiber_gc();
struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
index 48707982b..f432c6b49 100644
--- a/src/box/xrow_io.cc
+++ b/src/box/xrow_io.cc
@@ -35,71 +35,74 @@
#include "error.h"
#include "msgpuck/msgpuck.h"
-void
+ssize_t
coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row)
{
/* Read fixed header */
- if (ibuf_used(in) < 1)
- coio_breadn(coio, in, 1);
+ if (ibuf_used(in) < 1 && coio_breadn(coio, in, 1) < 0)
+ return -1;
/* Read length */
if (mp_typeof(*in->rpos) != MP_UINT) {
- tnt_raise(ClientError, ER_INVALID_MSGPACK,
- "packet length");
+ diag_set(ClientError, ER_INVALID_MSGPACK,
+ "packet length");
+ return -1;
}
ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
- if (to_read > 0)
- coio_breadn(coio, in, to_read);
+ if (to_read > 0 && coio_breadn(coio, in, to_read) < 0)
+ return -1;
uint32_t len = mp_decode_uint((const char **) &in->rpos);
/* Read header and body */
to_read = len - ibuf_used(in);
- if (to_read > 0)
- coio_breadn(coio, in, to_read);
+ if (to_read > 0 && coio_breadn(coio, in, to_read) < 0)
+ return -1;
- xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
- true);
+ return xrow_header_decode(row, (const char **) &in->rpos, in->rpos + len,
+ true);
}
-void
-coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
- struct xrow_header *row, ev_tstamp timeout)
+ssize_t
+coio_read_xrow_timeout(struct ev_io *coio, struct ibuf *in,
+ struct xrow_header *row, ev_tstamp timeout)
{
ev_tstamp start, delay;
coio_timeout_init(&start, &delay, timeout);
/* Read fixed header */
- if (ibuf_used(in) < 1)
- coio_breadn_timeout(coio, in, 1, delay);
+ if (ibuf_used(in) < 1 && coio_breadn_timeout(coio, in, 1, delay) < 0)
+ return -1;
coio_timeout_update(&start, &delay);
/* Read length */
if (mp_typeof(*in->rpos) != MP_UINT) {
- tnt_raise(ClientError, ER_INVALID_MSGPACK,
- "packet length");
+ diag_set(ClientError, ER_INVALID_MSGPACK,
+ "packet length");
+ return -1;
}
ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
- if (to_read > 0)
- coio_breadn_timeout(coio, in, to_read, delay);
+ if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0)
+ return -1;
coio_timeout_update(&start, &delay);
uint32_t len = mp_decode_uint((const char **) &in->rpos);
/* Read header and body */
to_read = len - ibuf_used(in);
- if (to_read > 0)
- coio_breadn_timeout(coio, in, to_read, delay);
+ if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0)
+ return -1;
- xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
- true);
+ return xrow_header_decode(row, (const char **) &in->rpos, in->rpos + len,
+ true);
}
-
-void
+ssize_t
coio_write_xrow(struct ev_io *coio, const struct xrow_header *row)
{
struct iovec iov[XROW_IOVMAX];
- int iovcnt = xrow_to_iovec_xc(row, iov);
- coio_writev(coio, iov, iovcnt, 0);
+ int iovcnt = xrow_to_iovec(row, iov);
+ if (iovcnt < 0)
+ return -1;
+ return coio_writev(coio, iov, iovcnt, 0);
}
diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h
index 0eb7a8ace..365a70db7 100644
--- a/src/box/xrow_io.h
+++ b/src/box/xrow_io.h
@@ -30,6 +30,7 @@
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
+#include <unistd.h>
#if defined(__cplusplus)
extern "C" {
#endif
@@ -38,14 +39,14 @@ struct ev_io;
struct ibuf;
struct xrow_header;
-void
+ssize_t
coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row);
-void
-coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
- struct xrow_header *row, double timeout);
+ssize_t
+coio_read_xrow_timeout(struct ev_io *coio, struct ibuf *in,
+ struct xrow_header *row, double timeout);
-void
+ssize_t
coio_write_xrow(struct ev_io *coio, const struct xrow_header *row);
diff --git a/src/lib/core/coio.cc b/src/lib/core/coio.cc
index e88d724d5..96a529c2c 100644
--- a/src/lib/core/coio.cc
+++ b/src/lib/core/coio.cc
@@ -41,12 +41,6 @@
#include "scoped_guard.h"
#include "coio_task.h" /* coio_resolve() */
-struct CoioGuard {
- struct ev_io *ev_io;
- CoioGuard(struct ev_io *arg) :ev_io(arg) {}
- ~CoioGuard() { ev_io_stop(loop(), ev_io); }
-};
-
typedef void (*ev_stat_cb)(ev_loop *, ev_stat *, int);
/** Note: this function does not throw */
@@ -65,12 +59,14 @@ coio_fiber_yield_timeout(struct ev_io *coio, ev_tstamp delay)
coio->data = fiber();
bool is_timedout = fiber_yield_timeout(delay);
coio->data = NULL;
+ if (is_timedout)
+ diag_set(TimedOut);
return is_timedout;
}
/**
* Connect to a host with a specified timeout.
- * @retval -1 timeout
+ * @retval -1 error or timeout
* @retval 0 connected
*/
static int
@@ -79,36 +75,43 @@ coio_connect_addr(struct ev_io *coio, struct sockaddr *addr,
{
ev_loop *loop = loop();
if (evio_socket(coio, addr->sa_family, SOCK_STREAM, 0) != 0)
- diag_raise();
- auto coio_guard = make_scoped_guard([=]{ evio_close(loop, coio); });
- if (sio_connect(coio->fd, addr, len) == 0) {
- coio_guard.is_active = false;
+ return -1;
+ if (sio_connect(coio->fd, addr, len) == 0)
return 0;
- }
if (errno != EINPROGRESS)
- diag_raise();
+ goto close;
/*
* Wait until socket is ready for writing or
* timed out.
*/
ev_io_set(coio, coio->fd, EV_WRITE);
ev_io_start(loop, coio);
- bool is_timedout = coio_fiber_yield_timeout(coio, timeout);
+ bool is_timedout;
+ is_timedout = coio_fiber_yield_timeout(coio, timeout);
ev_io_stop(loop, coio);
- fiber_testcancel();
+ if (fiber_is_cancelled()) {
+ diag_set(FiberIsCancelled);
+ goto close;
+ }
if (is_timedout)
- tnt_raise(TimedOut);
- int error = EINPROGRESS;
- socklen_t sz = sizeof(error);
+ goto close;
+ int error;
+ socklen_t sz;
+ error = EINPROGRESS;
+ sz = sizeof(error);
if (sio_getsockopt(coio->fd, SOL_SOCKET, SO_ERROR,
&error, &sz))
- diag_raise();
+ goto close;
if (error != 0) {
errno = error;
- tnt_raise(SocketError, sio_socketname(coio->fd), "connect");
+ diag_set(SocketError, sio_socketname(coio->fd), "connect");
+ goto close;
}
- coio_guard.is_active = false;
return 0;
+
+close:
+ evio_close(loop, coio);
+ return -1;
}
void
@@ -152,7 +155,7 @@ coio_fill_addrinfo(struct addrinfo *ai_local, const char *host,
* This function also supports UNIX domain sockets if uri->path is not NULL and
* uri->service is NULL.
*
- * @retval -1 timeout
+ * @retval -1 error or timeout
* @retval 0 connected
*/
int
@@ -201,41 +204,37 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
hints.ai_flags = AI_ADDRCONFIG|AI_NUMERICSERV|AI_PASSIVE;
hints.ai_protocol = 0;
int rc = coio_getaddrinfo(host, service, &hints, &ai, delay);
- if (rc != 0) {
- diag_raise();
- panic("unspecified getaddrinfo error");
- }
+ if (rc != 0)
+ return -1;
}
- auto addrinfo_guard = make_scoped_guard([=] {
- if (!uri->host_hint) freeaddrinfo(ai);
- else free(ai_local.ai_addr);
- });
+ struct addrinfo *first_ai = ai;
evio_timeout_update(loop(), &start, &delay);
coio_timeout_init(&start, &delay, timeout);
assert(! evio_has_fd(coio));
- while (ai) {
- try {
- if (coio_connect_addr(coio, ai->ai_addr,
- ai->ai_addrlen, delay))
- return -1;
+ while (ai && delay >= 0) {
+ if (coio_connect_addr(coio, ai->ai_addr,
+ ai->ai_addrlen, delay) == 0) {
if (addr != NULL) {
assert(addr_len != NULL);
*addr_len = MIN(ai->ai_addrlen, *addr_len);
memcpy(addr, ai->ai_addr, *addr_len);
}
return 0; /* connected */
- } catch (SocketError *e) {
- if (ai->ai_next == NULL)
- throw;
- /* ignore exception and try the next address */
}
- ai = ai->ai_next;
ev_now_update(loop);
coio_timeout_update(&start, &delay);
+ ai = ai->ai_next;
}
- tnt_raise(SocketError, sio_socketname(coio->fd), "connection failed");
+ /* Set an error if not timedout. */
+ if (delay >= 0)
+ diag_set(SocketError, sio_socketname(coio->fd), "connection failed");
+ if (!uri->host_hint)
+ freeaddrinfo(first_ai);
+ else
+ free(ai_local.ai_addr);
+ return -1;
}
/**
@@ -249,8 +248,6 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
ev_tstamp start, delay;
coio_timeout_init(&start, &delay, timeout);
- CoioGuard coio_guard(coio);
-
while (true) {
/* Assume that there are waiting clients
* available */
@@ -259,12 +256,12 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
if (evio_setsockopt_client(fd, addr->sa_family,
SOCK_STREAM) != 0) {
close(fd);
- diag_raise();
+ return -1;
}
return fd;
}
if (! sio_wouldblock(errno))
- diag_raise();
+ return -1;
/* The socket is not ready, yield */
if (! ev_is_active(coio)) {
ev_io_set(coio, coio->fd, EV_READ);
@@ -275,11 +272,16 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
* timeout is reached.
*/
bool is_timedout = coio_fiber_yield_timeout(coio, delay);
- fiber_testcancel();
+ ev_io_stop(loop(), coio);
+ if (fiber_is_cancelled()) {
+ diag_set(FiberIsCancelled);
+ break;
+ }
if (is_timedout)
- tnt_raise(TimedOut);
+ break;
coio_timeout_update(&start, &delay);
}
+ return -1;
}
/**
@@ -302,8 +304,6 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
ssize_t to_read = (ssize_t) sz;
- CoioGuard coio_guard(coio);
-
while (true) {
/*
* Sic: assume the socket is ready: since
@@ -320,9 +320,8 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
} else if (nrd == 0) {
errno = 0;
return sz - to_read;
- } else if (! sio_wouldblock(errno)) {
- diag_raise();
- }
+ } else if (! sio_wouldblock(errno))
+ return -1;
/* The socket is not ready, yield */
if (! ev_is_active(coio)) {
@@ -333,19 +332,23 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
* Yield control to other fibers until the
* timeout is being reached.
*/
- bool is_timedout = coio_fiber_yield_timeout(coio,
- delay);
- fiber_testcancel();
+ bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+ ev_io_stop(loop(), coio);
+ if (fiber_is_cancelled()) {
+ diag_set(FiberIsCancelled);
+ break;
+ }
if (is_timedout)
- tnt_raise(TimedOut);
+ break;
coio_timeout_update(&start, &delay);
}
+ return -1;
}
/**
* Read at least sz bytes, with readahead.
*
- * Treats EOF as an error, and throws an exception.
+ * Treats EOF as an error.
*
* @retval the number of bytes read, > 0.
*/
@@ -355,8 +358,9 @@ coio_readn_ahead(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz)
ssize_t nrd = coio_read_ahead(coio, buf, sz, bufsiz);
if (nrd < (ssize_t)sz) {
errno = EPIPE;
- tnt_raise(SocketError, sio_socketname(coio->fd),
- "unexpected EOF when reading from socket");
+ diag_set(SocketError, sio_socketname(coio->fd),
+ "unexpected EOF when reading from socket");
+ return -1;
}
return nrd;
}
@@ -364,7 +368,7 @@ coio_readn_ahead(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz)
/**
* Read at least sz bytes, with readahead and timeout.
*
- * Treats EOF as an error, and throws an exception.
+ * Treats EOF as an error.
*
* @retval the number of bytes read, > 0.
*/
@@ -375,8 +379,9 @@ coio_readn_ahead_timeout(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz
ssize_t nrd = coio_read_ahead_timeout(coio, buf, sz, bufsiz, timeout);
if (nrd < (ssize_t)sz && errno == 0) { /* EOF. */
errno = EPIPE;
- tnt_raise(SocketError, sio_socketname(coio->fd),
- "unexpected EOF when reading from socket");
+ diag_set(SocketError, sio_socketname(coio->fd),
+ "unexpected EOF when reading from socket");
+ return -1;
}
return nrd;
}
@@ -399,8 +404,6 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz,
ev_tstamp start, delay;
coio_timeout_init(&start, &delay, timeout);
- CoioGuard coio_guard(coio);
-
while (true) {
/*
* Sic: write as much data as possible,
@@ -413,28 +416,28 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz,
return sz;
towrite -= nwr;
buf = (char *) buf + nwr;
- } else if (nwr < 0 && !sio_wouldblock(errno)) {
- diag_raise();
- }
+ } else if (nwr < 0 && !sio_wouldblock(errno))
+ return -1;
if (! ev_is_active(coio)) {
ev_io_set(coio, coio->fd, EV_WRITE);
ev_io_start(loop(), coio);
}
- /* Yield control to other fibers. */
- fiber_testcancel();
/*
* Yield control to other fibers until the
* timeout is reached or the socket is
* ready.
*/
- bool is_timedout = coio_fiber_yield_timeout(coio,
- delay);
- fiber_testcancel();
-
+ bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+ ev_io_stop(loop(), coio);
+ if (fiber_is_cancelled()) {
+ diag_set(FiberIsCancelled);
+ break;
+ }
if (is_timedout)
- tnt_raise(TimedOut);
+ break;
coio_timeout_update(&start, &delay);
}
+ return -1;
}
/*
@@ -446,9 +449,11 @@ coio_flush(int fd, struct iovec *iov, ssize_t offset, int iovcnt)
{
sio_add_to_iov(iov, -offset);
ssize_t nwr = sio_writev(fd, iov, iovcnt);
+ if (nwr < 0 && !sio_wouldblock(errno))
+ return -1;
sio_add_to_iov(iov, offset);
- if (nwr < 0 && ! sio_wouldblock(errno))
- diag_raise();
+ if (nwr < 0)
+ return 0;
return nwr;
}
@@ -461,14 +466,15 @@ coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt,
struct iovec *end = iov + iovcnt;
ev_tstamp start, delay;
coio_timeout_init(&start, &delay, timeout);
- CoioGuard coio_guard(coio);
/* Avoid a syscall in case of 0 iovcnt. */
while (iov < end) {
/* Write as much data as possible. */
ssize_t nwr = coio_flush(coio->fd, iov, iov_len,
end - iov);
- if (nwr >= 0) {
+ if (nwr < 0)
+ return -1;
+ if (nwr > 0) {
total += nwr;
/*
* If there was a hint for the total size
@@ -487,18 +493,19 @@ coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt,
ev_io_set(coio, coio->fd, EV_WRITE);
ev_io_start(loop(), coio);
}
- /* Yield control to other fibers. */
- fiber_testcancel();
/*
* Yield control to other fibers until the
* timeout is reached or the socket is
* ready.
*/
bool is_timedout = coio_fiber_yield_timeout(coio, delay);
- fiber_testcancel();
-
+ ev_io_stop(loop(), coio);
+ if (fiber_is_cancelled()) {
+ diag_set(FiberIsCancelled);
+ return -1;
+ }
if (is_timedout)
- tnt_raise(TimedOut);
+ return -1;
coio_timeout_update(&start, &delay);
}
return total;
@@ -518,8 +525,6 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags,
ev_tstamp start, delay;
coio_timeout_init(&start, &delay, timeout);
- CoioGuard coio_guard(coio);
-
while (true) {
/*
* Sic: write as much data as possible,
@@ -530,7 +535,7 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags,
if (nwr > 0)
return nwr;
if (nwr < 0 && ! sio_wouldblock(errno))
- diag_raise();
+ return -1;
if (! ev_is_active(coio)) {
ev_io_set(coio, coio->fd, EV_WRITE);
ev_io_start(loop(), coio);
@@ -540,13 +545,17 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags,
* timeout is reached or the socket is
* ready.
*/
- bool is_timedout = coio_fiber_yield_timeout(coio,
- delay);
- fiber_testcancel();
+ bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+ ev_io_stop(loop(), coio);
+ if (fiber_is_cancelled()) {
+ diag_set(FiberIsCancelled);
+ break;
+ }
if (is_timedout)
- tnt_raise(TimedOut);
+ break;
coio_timeout_update(&start, &delay);
}
+ return -1;
}
/**
@@ -563,8 +572,6 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags,
ev_tstamp start, delay;
coio_timeout_init(&start, &delay, timeout);
- CoioGuard coio_guard(coio);
-
while (true) {
/*
* Read as much data as possible,
@@ -575,7 +582,7 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags,
if (nrd >= 0)
return nrd;
if (! sio_wouldblock(errno))
- diag_raise();
+ return -1;
if (! ev_is_active(coio)) {
ev_io_set(coio, coio->fd, EV_READ);
ev_io_start(loop(), coio);
@@ -585,13 +592,17 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags,
* timeout is reached or the socket is
* ready.
*/
- bool is_timedout = coio_fiber_yield_timeout(coio,
- delay);
- fiber_testcancel();
+ bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+ ev_io_stop(loop(), coio);
+ if (fiber_is_cancelled()) {
+ diag_set(FiberIsCancelled);
+ break;
+ }
if (is_timedout)
- tnt_raise(TimedOut);
+ break;
coio_timeout_update(&start, &delay);
}
+ return -1;
}
static int
@@ -638,12 +649,13 @@ coio_service_init(struct coio_service *service, const char *name,
service->handler_param = handler_param;
}
-void
+int
coio_service_start(struct evio_service *service, const char *uri)
{
if (evio_service_bind(service, uri) != 0 ||
evio_service_listen(service) != 0)
- diag_raise();
+ return -1;
+ return -0;
}
void
@@ -661,7 +673,6 @@ coio_stat_stat_timeout(ev_stat *stat, ev_tstamp timeout)
coio_timeout_init(&start, &delay, timeout);
fiber_yield_timeout(delay);
ev_stat_stop(loop(), stat);
- fiber_testcancel();
}
typedef void (*ev_child_cb)(ev_loop *, ev_child *, int);
@@ -689,7 +700,6 @@ coio_waitpid(pid_t pid)
fiber_set_cancellable(allow_cancel);
ev_child_stop(loop(), &cw);
int status = cw.rstatus;
- fiber_testcancel();
return status;
}
diff --git a/src/lib/core/coio.h b/src/lib/core/coio.h
index 6a2337689..d557f2869 100644
--- a/src/lib/core/coio.h
+++ b/src/lib/core/coio.h
@@ -33,6 +33,9 @@
#include "fiber.h"
#include "trivia/util.h"
#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
#include "evio.h"
/**
@@ -59,10 +62,6 @@ coio_connect(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
return coio_connect_timeout(coio, uri, addr, addr_len, TIMEOUT_INFINITY);
}
-void
-coio_bind(struct ev_io *coio, struct sockaddr *addr,
- socklen_t addrlen);
-
int
coio_accept(struct ev_io *coio, struct sockaddr *addr, socklen_t addrlen,
ev_tstamp timeout);
@@ -71,7 +70,7 @@ void
coio_create(struct ev_io *coio, int fd);
static inline void
-coio_close(ev_loop *loop, struct ev_io *coio)
+coio_destroy(ev_loop *loop, struct ev_io *coio)
{
return evio_close(loop, coio);
}
@@ -164,7 +163,7 @@ coio_service_init(struct coio_service *service, const char *name,
fiber_func handler, void *handler_param);
/** Wait until the service binds to the port. */
-void
+int
coio_service_start(struct evio_service *service, const char *uri);
void
@@ -185,8 +184,6 @@ coio_stat_stat_timeout(ev_stat *stat, ev_tstamp delay);
int
coio_waitpid(pid_t pid);
-extern "C" {
-#endif /* defined(__cplusplus) */
/** \cond public */
diff --git a/src/lib/core/coio_buf.h b/src/lib/core/coio_buf.h
index 1ad104985..3a83f8fe1 100644
--- a/src/lib/core/coio_buf.h
+++ b/src/lib/core/coio_buf.h
@@ -45,6 +45,8 @@ coio_bread(struct ev_io *coio, struct ibuf *buf, size_t sz)
{
ibuf_reserve_xc(buf, sz);
ssize_t n = coio_read_ahead(coio, buf->wpos, sz, ibuf_unused(buf));
+ if (n < 0)
+ return -1;
buf->wpos += n;
return n;
}
@@ -61,6 +63,8 @@ coio_bread_timeout(struct ev_io *coio, struct ibuf *buf, size_t sz,
ibuf_reserve_xc(buf, sz);
ssize_t n = coio_read_ahead_timeout(coio, buf->wpos, sz, ibuf_unused(buf),
timeout);
+ if (n < 0)
+ return -1;
buf->wpos += n;
return n;
}
@@ -71,6 +75,8 @@ coio_breadn(struct ev_io *coio, struct ibuf *buf, size_t sz)
{
ibuf_reserve_xc(buf, sz);
ssize_t n = coio_readn_ahead(coio, buf->wpos, sz, ibuf_unused(buf));
+ if (n < 0)
+ return -1;
buf->wpos += n;
return n;
}
@@ -87,6 +93,8 @@ coio_breadn_timeout(struct ev_io *coio, struct ibuf *buf, size_t sz,
ibuf_reserve_xc(buf, sz);
ssize_t n = coio_readn_ahead_timeout(coio, buf->wpos, sz, ibuf_unused(buf),
timeout);
+ if (n < 0)
+ return -1;
buf->wpos += n;
return n;
}
--
2.24.0
More information about the Tarantool-patches
mailing list