From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp45.i.mail.ru (smtp45.i.mail.ru [94.100.177.105]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 0CF36440F3E for ; Tue, 19 Nov 2019 19:05:02 +0300 (MSK) From: Georgy Kirichenko Date: Tue, 19 Nov 2019 19:04:53 +0300 Message-Id: <9c1f887d3357f902948c1a557946e10006d31043.1574178520.git.georgy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH 2/6] coio: do not htrow an exception List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org Relaying from C-written wal requires coio to be a C-compliant. So get rid of exception from coio interface. Part of #980 --- src/box/applier.cc | 52 ++++++---- src/box/box.cc | 9 +- src/box/relay.cc | 11 ++- src/box/xrow_io.cc | 59 +++++------ src/box/xrow_io.h | 11 ++- src/lib/core/coio.cc | 212 +++++++++++++++++++++------------------- src/lib/core/coio.h | 13 +-- src/lib/core/coio_buf.h | 8 ++ 8 files changed, 207 insertions(+), 168 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index a04d13564..294765195 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -173,8 +173,9 @@ applier_writer_f(va_list ap) continue; try { struct xrow_header xrow; - xrow_encode_vclock(&xrow, &replicaset.vclock); - coio_write_xrow(&io, &xrow); + if (xrow_encode_vclock(&xrow, &replicaset.vclock) != 0 || + coio_write_xrow(&io, &xrow) < 0) + diag_raise(); } catch (SocketError *e) { /* * There is no point trying to send ACKs if @@ -308,9 +309,11 @@ applier_connect(struct applier *applier) */ applier->addr_len = sizeof(applier->addrstorage); applier_set_state(applier, APPLIER_CONNECT); - coio_connect(coio, uri, &applier->addr, &applier->addr_len); + if (coio_connect(coio, uri, &applier->addr, &applier->addr_len) != 0) + diag_raise(); assert(coio->fd >= 0); - coio_readn(coio, greetingbuf, IPROTO_GREETING_SIZE); + if (coio_readn(coio, greetingbuf, IPROTO_GREETING_SIZE) < 0) + diag_raise(); applier->last_row_time = ev_monotonic_now(loop()); /* Decode instance version and name from greeting */ @@ -345,8 +348,9 @@ applier_connect(struct applier *applier) * election on bootstrap. */ xrow_encode_vote(&row); - coio_write_xrow(coio, &row); - coio_read_xrow(coio, ibuf, &row); + if (coio_write_xrow(coio, &row) < 0 || + coio_read_xrow(coio, ibuf, &row) < 0) + diag_raise(); if (row.type == IPROTO_OK) { xrow_decode_ballot_xc(&row, &applier->ballot); } else try { @@ -374,8 +378,9 @@ applier_connect(struct applier *applier) applier_set_state(applier, APPLIER_AUTH); xrow_encode_auth_xc(&row, greeting.salt, greeting.salt_len, uri->login, uri->login_len, uri->password, uri->password_len); - coio_write_xrow(coio, &row); - coio_read_xrow(coio, ibuf, &row); + if (coio_write_xrow(coio, &row) < 0 || + coio_read_xrow(coio, ibuf, &row) < 0) + diag_raise(); applier->last_row_time = ev_monotonic_now(loop()); if (row.type != IPROTO_OK) xrow_decode_error_xc(&row); /* auth failed */ @@ -397,7 +402,8 @@ applier_join(struct applier *applier) struct ibuf *ibuf = &applier->ibuf; struct xrow_header row; xrow_encode_join_xc(&row, &INSTANCE_UUID); - coio_write_xrow(coio, &row); + if (coio_write_xrow(coio, &row) < 0) + diag_raise(); /** * Tarantool < 1.7.0: if JOIN is successful, there is no "OK" @@ -405,7 +411,8 @@ applier_join(struct applier *applier) */ if (applier->version_id >= version_id(1, 7, 0)) { /* Decode JOIN response */ - coio_read_xrow(coio, ibuf, &row); + if (coio_read_xrow(coio, ibuf, &row) < 0) + diag_raise(); if (iproto_type_is_error(row.type)) { xrow_decode_error_xc(&row); /* re-throw error */ } else if (row.type != IPROTO_OK) { @@ -428,7 +435,8 @@ applier_join(struct applier *applier) */ uint64_t row_count = 0; while (true) { - coio_read_xrow(coio, ibuf, &row); + if (coio_read_xrow(coio, ibuf, &row) < 0) + diag_raise(); applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { if (apply_initial_join_row(&row) != 0) @@ -470,7 +478,8 @@ applier_join(struct applier *applier) * Receive final data. */ while (true) { - coio_read_xrow(coio, ibuf, &row); + if (coio_read_xrow(coio, ibuf, &row) < 0) + diag_raise(); applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { vclock_follow_xrow(&replicaset.vclock, &row); @@ -529,10 +538,13 @@ applier_read_tx_row(struct applier *applier) * from the master for quite a while the connection is * broken - the master might just be idle. */ - if (applier->version_id < version_id(1, 7, 7)) - coio_read_xrow(coio, ibuf, row); - else - coio_read_xrow_timeout_xc(coio, ibuf, row, timeout); + if (applier->version_id < version_id(1, 7, 7)) { + if (coio_read_xrow(coio, ibuf, row) < 0) + diag_raise(); + } else { + if (coio_read_xrow_timeout(coio, ibuf, row, timeout) < 0) + diag_raise(); + } applier->lag = ev_now(loop()) - row->tm; applier->last_row_time = ev_monotonic_now(loop()); @@ -792,11 +804,13 @@ applier_subscribe(struct applier *applier) vclock_copy(&vclock, &replicaset.vclock); xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID, &vclock); - coio_write_xrow(coio, &row); + if (coio_write_xrow(coio, &row) < 0) + diag_raise(); /* Read SUBSCRIBE response */ if (applier->version_id >= version_id(1, 6, 7)) { - coio_read_xrow(coio, ibuf, &row); + if (coio_read_xrow(coio, ibuf, &row) < 0) + diag_raise(); if (iproto_type_is_error(row.type)) { xrow_decode_error_xc(&row); /* error */ } else if (row.type != IPROTO_OK) { @@ -933,7 +947,7 @@ applier_disconnect(struct applier *applier, enum applier_state state) applier->writer = NULL; } - coio_close(loop(), &applier->io); + coio_destroy(loop(), &applier->io); /* Clear all unparsed input. */ ibuf_reinit(&applier->ibuf); fiber_gc(); diff --git a/src/box/box.cc b/src/box/box.cc index a53b6e912..6323e5e6e 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1497,7 +1497,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header) struct xrow_header row; xrow_encode_vclock_xc(&row, &stop_vclock); row.sync = header->sync; - coio_write_xrow(io, &row); + if (coio_write_xrow(io, &row) < 0) + diag_raise(); /* * Final stage: feed replica with WALs in range @@ -1509,7 +1510,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header) /* Send end of WAL stream marker */ xrow_encode_vclock_xc(&row, &replicaset.vclock); row.sync = header->sync; - coio_write_xrow(io, &row); + if (coio_write_xrow(io, &row) < 0) + diag_raise(); /* * Advance the WAL consumer state to the position where @@ -1591,7 +1593,8 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) assert(self != NULL); /* the local registration is read-only */ row.replica_id = self->id; row.sync = header->sync; - coio_write_xrow(io, &row); + if (coio_write_xrow(io, &row) < 0) + diag_raise(); say_info("subscribed replica %s at %s", tt_uuid_str(&replica_uuid), sio_socketname(io->fd)); diff --git a/src/box/relay.cc b/src/box/relay.cc index 5c2b0067e..202620694 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -318,7 +318,8 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock) struct xrow_header row; xrow_encode_vclock_xc(&row, vclock); row.sync = sync; - coio_write_xrow(&relay->io, &row); + if (coio_write_xrow(&relay->io, &row) < 0) + diag_raise(); /* Send read view to the replica. */ engine_join_xc(&ctx, &relay->stream); @@ -517,8 +518,9 @@ relay_reader_f(va_list ap) try { while (!fiber_is_cancelled()) { struct xrow_header xrow; - coio_read_xrow_timeout_xc(&io, &ibuf, &xrow, - replication_disconnect_timeout()); + if (coio_read_xrow_timeout(&io, &ibuf, &xrow, + replication_disconnect_timeout()) < 0) + diag_raise(); /* vclock is followed while decoding, zeroing it. */ vclock_create(&relay->recv_vclock); xrow_decode_vclock_xc(&xrow, &relay->recv_vclock); @@ -716,7 +718,8 @@ relay_send(struct relay *relay, struct xrow_header *packet) packet->sync = relay->sync; relay->last_row_time = ev_monotonic_now(loop()); - coio_write_xrow(&relay->io, packet); + if (coio_write_xrow(&relay->io, packet) < 0) + diag_raise(); fiber_gc(); struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE); diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc index 48707982b..f432c6b49 100644 --- a/src/box/xrow_io.cc +++ b/src/box/xrow_io.cc @@ -35,71 +35,74 @@ #include "error.h" #include "msgpuck/msgpuck.h" -void +ssize_t coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row) { /* Read fixed header */ - if (ibuf_used(in) < 1) - coio_breadn(coio, in, 1); + if (ibuf_used(in) < 1 && coio_breadn(coio, in, 1) < 0) + return -1; /* Read length */ if (mp_typeof(*in->rpos) != MP_UINT) { - tnt_raise(ClientError, ER_INVALID_MSGPACK, - "packet length"); + diag_set(ClientError, ER_INVALID_MSGPACK, + "packet length"); + return -1; } ssize_t to_read = mp_check_uint(in->rpos, in->wpos); - if (to_read > 0) - coio_breadn(coio, in, to_read); + if (to_read > 0 && coio_breadn(coio, in, to_read) < 0) + return -1; uint32_t len = mp_decode_uint((const char **) &in->rpos); /* Read header and body */ to_read = len - ibuf_used(in); - if (to_read > 0) - coio_breadn(coio, in, to_read); + if (to_read > 0 && coio_breadn(coio, in, to_read) < 0) + return -1; - xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len, - true); + return xrow_header_decode(row, (const char **) &in->rpos, in->rpos + len, + true); } -void -coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in, - struct xrow_header *row, ev_tstamp timeout) +ssize_t +coio_read_xrow_timeout(struct ev_io *coio, struct ibuf *in, + struct xrow_header *row, ev_tstamp timeout) { ev_tstamp start, delay; coio_timeout_init(&start, &delay, timeout); /* Read fixed header */ - if (ibuf_used(in) < 1) - coio_breadn_timeout(coio, in, 1, delay); + if (ibuf_used(in) < 1 && coio_breadn_timeout(coio, in, 1, delay) < 0) + return -1; coio_timeout_update(&start, &delay); /* Read length */ if (mp_typeof(*in->rpos) != MP_UINT) { - tnt_raise(ClientError, ER_INVALID_MSGPACK, - "packet length"); + diag_set(ClientError, ER_INVALID_MSGPACK, + "packet length"); + return -1; } ssize_t to_read = mp_check_uint(in->rpos, in->wpos); - if (to_read > 0) - coio_breadn_timeout(coio, in, to_read, delay); + if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0) + return -1; coio_timeout_update(&start, &delay); uint32_t len = mp_decode_uint((const char **) &in->rpos); /* Read header and body */ to_read = len - ibuf_used(in); - if (to_read > 0) - coio_breadn_timeout(coio, in, to_read, delay); + if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0) + return -1; - xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len, - true); + return xrow_header_decode(row, (const char **) &in->rpos, in->rpos + len, + true); } - -void +ssize_t coio_write_xrow(struct ev_io *coio, const struct xrow_header *row) { struct iovec iov[XROW_IOVMAX]; - int iovcnt = xrow_to_iovec_xc(row, iov); - coio_writev(coio, iov, iovcnt, 0); + int iovcnt = xrow_to_iovec(row, iov); + if (iovcnt < 0) + return -1; + return coio_writev(coio, iov, iovcnt, 0); } diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h index 0eb7a8ace..365a70db7 100644 --- a/src/box/xrow_io.h +++ b/src/box/xrow_io.h @@ -30,6 +30,7 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include #if defined(__cplusplus) extern "C" { #endif @@ -38,14 +39,14 @@ struct ev_io; struct ibuf; struct xrow_header; -void +ssize_t coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row); -void -coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in, - struct xrow_header *row, double timeout); +ssize_t +coio_read_xrow_timeout(struct ev_io *coio, struct ibuf *in, + struct xrow_header *row, double timeout); -void +ssize_t coio_write_xrow(struct ev_io *coio, const struct xrow_header *row); diff --git a/src/lib/core/coio.cc b/src/lib/core/coio.cc index e88d724d5..96a529c2c 100644 --- a/src/lib/core/coio.cc +++ b/src/lib/core/coio.cc @@ -41,12 +41,6 @@ #include "scoped_guard.h" #include "coio_task.h" /* coio_resolve() */ -struct CoioGuard { - struct ev_io *ev_io; - CoioGuard(struct ev_io *arg) :ev_io(arg) {} - ~CoioGuard() { ev_io_stop(loop(), ev_io); } -}; - typedef void (*ev_stat_cb)(ev_loop *, ev_stat *, int); /** Note: this function does not throw */ @@ -65,12 +59,14 @@ coio_fiber_yield_timeout(struct ev_io *coio, ev_tstamp delay) coio->data = fiber(); bool is_timedout = fiber_yield_timeout(delay); coio->data = NULL; + if (is_timedout) + diag_set(TimedOut); return is_timedout; } /** * Connect to a host with a specified timeout. - * @retval -1 timeout + * @retval -1 error or timeout * @retval 0 connected */ static int @@ -79,36 +75,43 @@ coio_connect_addr(struct ev_io *coio, struct sockaddr *addr, { ev_loop *loop = loop(); if (evio_socket(coio, addr->sa_family, SOCK_STREAM, 0) != 0) - diag_raise(); - auto coio_guard = make_scoped_guard([=]{ evio_close(loop, coio); }); - if (sio_connect(coio->fd, addr, len) == 0) { - coio_guard.is_active = false; + return -1; + if (sio_connect(coio->fd, addr, len) == 0) return 0; - } if (errno != EINPROGRESS) - diag_raise(); + goto close; /* * Wait until socket is ready for writing or * timed out. */ ev_io_set(coio, coio->fd, EV_WRITE); ev_io_start(loop, coio); - bool is_timedout = coio_fiber_yield_timeout(coio, timeout); + bool is_timedout; + is_timedout = coio_fiber_yield_timeout(coio, timeout); ev_io_stop(loop, coio); - fiber_testcancel(); + if (fiber_is_cancelled()) { + diag_set(FiberIsCancelled); + goto close; + } if (is_timedout) - tnt_raise(TimedOut); - int error = EINPROGRESS; - socklen_t sz = sizeof(error); + goto close; + int error; + socklen_t sz; + error = EINPROGRESS; + sz = sizeof(error); if (sio_getsockopt(coio->fd, SOL_SOCKET, SO_ERROR, &error, &sz)) - diag_raise(); + goto close; if (error != 0) { errno = error; - tnt_raise(SocketError, sio_socketname(coio->fd), "connect"); + diag_set(SocketError, sio_socketname(coio->fd), "connect"); + goto close; } - coio_guard.is_active = false; return 0; + +close: + evio_close(loop, coio); + return -1; } void @@ -152,7 +155,7 @@ coio_fill_addrinfo(struct addrinfo *ai_local, const char *host, * This function also supports UNIX domain sockets if uri->path is not NULL and * uri->service is NULL. * - * @retval -1 timeout + * @retval -1 error or timeout * @retval 0 connected */ int @@ -201,41 +204,37 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr, hints.ai_flags = AI_ADDRCONFIG|AI_NUMERICSERV|AI_PASSIVE; hints.ai_protocol = 0; int rc = coio_getaddrinfo(host, service, &hints, &ai, delay); - if (rc != 0) { - diag_raise(); - panic("unspecified getaddrinfo error"); - } + if (rc != 0) + return -1; } - auto addrinfo_guard = make_scoped_guard([=] { - if (!uri->host_hint) freeaddrinfo(ai); - else free(ai_local.ai_addr); - }); + struct addrinfo *first_ai = ai; evio_timeout_update(loop(), &start, &delay); coio_timeout_init(&start, &delay, timeout); assert(! evio_has_fd(coio)); - while (ai) { - try { - if (coio_connect_addr(coio, ai->ai_addr, - ai->ai_addrlen, delay)) - return -1; + while (ai && delay >= 0) { + if (coio_connect_addr(coio, ai->ai_addr, + ai->ai_addrlen, delay) == 0) { if (addr != NULL) { assert(addr_len != NULL); *addr_len = MIN(ai->ai_addrlen, *addr_len); memcpy(addr, ai->ai_addr, *addr_len); } return 0; /* connected */ - } catch (SocketError *e) { - if (ai->ai_next == NULL) - throw; - /* ignore exception and try the next address */ } - ai = ai->ai_next; ev_now_update(loop); coio_timeout_update(&start, &delay); + ai = ai->ai_next; } - tnt_raise(SocketError, sio_socketname(coio->fd), "connection failed"); + /* Set an error if not timedout. */ + if (delay >= 0) + diag_set(SocketError, sio_socketname(coio->fd), "connection failed"); + if (!uri->host_hint) + freeaddrinfo(first_ai); + else + free(ai_local.ai_addr); + return -1; } /** @@ -249,8 +248,6 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr, ev_tstamp start, delay; coio_timeout_init(&start, &delay, timeout); - CoioGuard coio_guard(coio); - while (true) { /* Assume that there are waiting clients * available */ @@ -259,12 +256,12 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr, if (evio_setsockopt_client(fd, addr->sa_family, SOCK_STREAM) != 0) { close(fd); - diag_raise(); + return -1; } return fd; } if (! sio_wouldblock(errno)) - diag_raise(); + return -1; /* The socket is not ready, yield */ if (! ev_is_active(coio)) { ev_io_set(coio, coio->fd, EV_READ); @@ -275,11 +272,16 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr, * timeout is reached. */ bool is_timedout = coio_fiber_yield_timeout(coio, delay); - fiber_testcancel(); + ev_io_stop(loop(), coio); + if (fiber_is_cancelled()) { + diag_set(FiberIsCancelled); + break; + } if (is_timedout) - tnt_raise(TimedOut); + break; coio_timeout_update(&start, &delay); } + return -1; } /** @@ -302,8 +304,6 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz, ssize_t to_read = (ssize_t) sz; - CoioGuard coio_guard(coio); - while (true) { /* * Sic: assume the socket is ready: since @@ -320,9 +320,8 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz, } else if (nrd == 0) { errno = 0; return sz - to_read; - } else if (! sio_wouldblock(errno)) { - diag_raise(); - } + } else if (! sio_wouldblock(errno)) + return -1; /* The socket is not ready, yield */ if (! ev_is_active(coio)) { @@ -333,19 +332,23 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz, * Yield control to other fibers until the * timeout is being reached. */ - bool is_timedout = coio_fiber_yield_timeout(coio, - delay); - fiber_testcancel(); + bool is_timedout = coio_fiber_yield_timeout(coio, delay); + ev_io_stop(loop(), coio); + if (fiber_is_cancelled()) { + diag_set(FiberIsCancelled); + break; + } if (is_timedout) - tnt_raise(TimedOut); + break; coio_timeout_update(&start, &delay); } + return -1; } /** * Read at least sz bytes, with readahead. * - * Treats EOF as an error, and throws an exception. + * Treats EOF as an error. * * @retval the number of bytes read, > 0. */ @@ -355,8 +358,9 @@ coio_readn_ahead(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz) ssize_t nrd = coio_read_ahead(coio, buf, sz, bufsiz); if (nrd < (ssize_t)sz) { errno = EPIPE; - tnt_raise(SocketError, sio_socketname(coio->fd), - "unexpected EOF when reading from socket"); + diag_set(SocketError, sio_socketname(coio->fd), + "unexpected EOF when reading from socket"); + return -1; } return nrd; } @@ -364,7 +368,7 @@ coio_readn_ahead(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz) /** * Read at least sz bytes, with readahead and timeout. * - * Treats EOF as an error, and throws an exception. + * Treats EOF as an error. * * @retval the number of bytes read, > 0. */ @@ -375,8 +379,9 @@ coio_readn_ahead_timeout(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz ssize_t nrd = coio_read_ahead_timeout(coio, buf, sz, bufsiz, timeout); if (nrd < (ssize_t)sz && errno == 0) { /* EOF. */ errno = EPIPE; - tnt_raise(SocketError, sio_socketname(coio->fd), - "unexpected EOF when reading from socket"); + diag_set(SocketError, sio_socketname(coio->fd), + "unexpected EOF when reading from socket"); + return -1; } return nrd; } @@ -399,8 +404,6 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz, ev_tstamp start, delay; coio_timeout_init(&start, &delay, timeout); - CoioGuard coio_guard(coio); - while (true) { /* * Sic: write as much data as possible, @@ -413,28 +416,28 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz, return sz; towrite -= nwr; buf = (char *) buf + nwr; - } else if (nwr < 0 && !sio_wouldblock(errno)) { - diag_raise(); - } + } else if (nwr < 0 && !sio_wouldblock(errno)) + return -1; if (! ev_is_active(coio)) { ev_io_set(coio, coio->fd, EV_WRITE); ev_io_start(loop(), coio); } - /* Yield control to other fibers. */ - fiber_testcancel(); /* * Yield control to other fibers until the * timeout is reached or the socket is * ready. */ - bool is_timedout = coio_fiber_yield_timeout(coio, - delay); - fiber_testcancel(); - + bool is_timedout = coio_fiber_yield_timeout(coio, delay); + ev_io_stop(loop(), coio); + if (fiber_is_cancelled()) { + diag_set(FiberIsCancelled); + break; + } if (is_timedout) - tnt_raise(TimedOut); + break; coio_timeout_update(&start, &delay); } + return -1; } /* @@ -446,9 +449,11 @@ coio_flush(int fd, struct iovec *iov, ssize_t offset, int iovcnt) { sio_add_to_iov(iov, -offset); ssize_t nwr = sio_writev(fd, iov, iovcnt); + if (nwr < 0 && !sio_wouldblock(errno)) + return -1; sio_add_to_iov(iov, offset); - if (nwr < 0 && ! sio_wouldblock(errno)) - diag_raise(); + if (nwr < 0) + return 0; return nwr; } @@ -461,14 +466,15 @@ coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt, struct iovec *end = iov + iovcnt; ev_tstamp start, delay; coio_timeout_init(&start, &delay, timeout); - CoioGuard coio_guard(coio); /* Avoid a syscall in case of 0 iovcnt. */ while (iov < end) { /* Write as much data as possible. */ ssize_t nwr = coio_flush(coio->fd, iov, iov_len, end - iov); - if (nwr >= 0) { + if (nwr < 0) + return -1; + if (nwr > 0) { total += nwr; /* * If there was a hint for the total size @@ -487,18 +493,19 @@ coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt, ev_io_set(coio, coio->fd, EV_WRITE); ev_io_start(loop(), coio); } - /* Yield control to other fibers. */ - fiber_testcancel(); /* * Yield control to other fibers until the * timeout is reached or the socket is * ready. */ bool is_timedout = coio_fiber_yield_timeout(coio, delay); - fiber_testcancel(); - + ev_io_stop(loop(), coio); + if (fiber_is_cancelled()) { + diag_set(FiberIsCancelled); + return -1; + } if (is_timedout) - tnt_raise(TimedOut); + return -1; coio_timeout_update(&start, &delay); } return total; @@ -518,8 +525,6 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags, ev_tstamp start, delay; coio_timeout_init(&start, &delay, timeout); - CoioGuard coio_guard(coio); - while (true) { /* * Sic: write as much data as possible, @@ -530,7 +535,7 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags, if (nwr > 0) return nwr; if (nwr < 0 && ! sio_wouldblock(errno)) - diag_raise(); + return -1; if (! ev_is_active(coio)) { ev_io_set(coio, coio->fd, EV_WRITE); ev_io_start(loop(), coio); @@ -540,13 +545,17 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags, * timeout is reached or the socket is * ready. */ - bool is_timedout = coio_fiber_yield_timeout(coio, - delay); - fiber_testcancel(); + bool is_timedout = coio_fiber_yield_timeout(coio, delay); + ev_io_stop(loop(), coio); + if (fiber_is_cancelled()) { + diag_set(FiberIsCancelled); + break; + } if (is_timedout) - tnt_raise(TimedOut); + break; coio_timeout_update(&start, &delay); } + return -1; } /** @@ -563,8 +572,6 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags, ev_tstamp start, delay; coio_timeout_init(&start, &delay, timeout); - CoioGuard coio_guard(coio); - while (true) { /* * Read as much data as possible, @@ -575,7 +582,7 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags, if (nrd >= 0) return nrd; if (! sio_wouldblock(errno)) - diag_raise(); + return -1; if (! ev_is_active(coio)) { ev_io_set(coio, coio->fd, EV_READ); ev_io_start(loop(), coio); @@ -585,13 +592,17 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags, * timeout is reached or the socket is * ready. */ - bool is_timedout = coio_fiber_yield_timeout(coio, - delay); - fiber_testcancel(); + bool is_timedout = coio_fiber_yield_timeout(coio, delay); + ev_io_stop(loop(), coio); + if (fiber_is_cancelled()) { + diag_set(FiberIsCancelled); + break; + } if (is_timedout) - tnt_raise(TimedOut); + break; coio_timeout_update(&start, &delay); } + return -1; } static int @@ -638,12 +649,13 @@ coio_service_init(struct coio_service *service, const char *name, service->handler_param = handler_param; } -void +int coio_service_start(struct evio_service *service, const char *uri) { if (evio_service_bind(service, uri) != 0 || evio_service_listen(service) != 0) - diag_raise(); + return -1; + return -0; } void @@ -661,7 +673,6 @@ coio_stat_stat_timeout(ev_stat *stat, ev_tstamp timeout) coio_timeout_init(&start, &delay, timeout); fiber_yield_timeout(delay); ev_stat_stop(loop(), stat); - fiber_testcancel(); } typedef void (*ev_child_cb)(ev_loop *, ev_child *, int); @@ -689,7 +700,6 @@ coio_waitpid(pid_t pid) fiber_set_cancellable(allow_cancel); ev_child_stop(loop(), &cw); int status = cw.rstatus; - fiber_testcancel(); return status; } diff --git a/src/lib/core/coio.h b/src/lib/core/coio.h index 6a2337689..d557f2869 100644 --- a/src/lib/core/coio.h +++ b/src/lib/core/coio.h @@ -33,6 +33,9 @@ #include "fiber.h" #include "trivia/util.h" #if defined(__cplusplus) +extern "C" { +#endif /* defined(__cplusplus) */ + #include "evio.h" /** @@ -59,10 +62,6 @@ coio_connect(struct ev_io *coio, struct uri *uri, struct sockaddr *addr, return coio_connect_timeout(coio, uri, addr, addr_len, TIMEOUT_INFINITY); } -void -coio_bind(struct ev_io *coio, struct sockaddr *addr, - socklen_t addrlen); - int coio_accept(struct ev_io *coio, struct sockaddr *addr, socklen_t addrlen, ev_tstamp timeout); @@ -71,7 +70,7 @@ void coio_create(struct ev_io *coio, int fd); static inline void -coio_close(ev_loop *loop, struct ev_io *coio) +coio_destroy(ev_loop *loop, struct ev_io *coio) { return evio_close(loop, coio); } @@ -164,7 +163,7 @@ coio_service_init(struct coio_service *service, const char *name, fiber_func handler, void *handler_param); /** Wait until the service binds to the port. */ -void +int coio_service_start(struct evio_service *service, const char *uri); void @@ -185,8 +184,6 @@ coio_stat_stat_timeout(ev_stat *stat, ev_tstamp delay); int coio_waitpid(pid_t pid); -extern "C" { -#endif /* defined(__cplusplus) */ /** \cond public */ diff --git a/src/lib/core/coio_buf.h b/src/lib/core/coio_buf.h index 1ad104985..3a83f8fe1 100644 --- a/src/lib/core/coio_buf.h +++ b/src/lib/core/coio_buf.h @@ -45,6 +45,8 @@ coio_bread(struct ev_io *coio, struct ibuf *buf, size_t sz) { ibuf_reserve_xc(buf, sz); ssize_t n = coio_read_ahead(coio, buf->wpos, sz, ibuf_unused(buf)); + if (n < 0) + return -1; buf->wpos += n; return n; } @@ -61,6 +63,8 @@ coio_bread_timeout(struct ev_io *coio, struct ibuf *buf, size_t sz, ibuf_reserve_xc(buf, sz); ssize_t n = coio_read_ahead_timeout(coio, buf->wpos, sz, ibuf_unused(buf), timeout); + if (n < 0) + return -1; buf->wpos += n; return n; } @@ -71,6 +75,8 @@ coio_breadn(struct ev_io *coio, struct ibuf *buf, size_t sz) { ibuf_reserve_xc(buf, sz); ssize_t n = coio_readn_ahead(coio, buf->wpos, sz, ibuf_unused(buf)); + if (n < 0) + return -1; buf->wpos += n; return n; } @@ -87,6 +93,8 @@ coio_breadn_timeout(struct ev_io *coio, struct ibuf *buf, size_t sz, ibuf_reserve_xc(buf, sz); ssize_t n = coio_readn_ahead_timeout(coio, buf->wpos, sz, ibuf_unused(buf), timeout); + if (n < 0) + return -1; buf->wpos += n; return n; } -- 2.24.0