From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp33.i.mail.ru (smtp33.i.mail.ru [94.100.177.93]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id A0B8E4696C3 for ; Wed, 12 Feb 2020 12:39:23 +0300 (MSK) From: Georgy Kirichenko Date: Wed, 12 Feb 2020 12:39:13 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH v4 04/11] coio: do not throw an error, minor refactoring List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org Relaying from a WAL written in C requires coio and xrow_io to be C-compliant, so get rid of exceptions in the coio interface. This patch also includes some minor refactoring (the code looks ugly without it): 1. Get rid of the unused size_hint argument of coio_writev_timeout. 2. Handle partial reads/writes before the yield loop. 3. Do not reset errno to 0 when reading EOF. 
Part of #980 --- src/box/applier.cc | 49 ++-- src/box/box.cc | 9 +- src/box/relay.cc | 11 +- src/box/xrow_io.cc | 59 ++--- src/box/xrow_io.h | 11 +- src/lib/core/coio.cc | 535 ++++++++++++++++++++++++---------------- src/lib/core/coio.h | 19 +- src/lib/core/coio_buf.h | 8 + 8 files changed, 413 insertions(+), 288 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index ae3d281a5..ad427707a 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -173,8 +173,9 @@ applier_writer_f(va_list ap) continue; try { struct xrow_header xrow; - xrow_encode_vclock(&xrow, &replicaset.vclock); - coio_write_xrow(&io, &xrow); + if (xrow_encode_vclock(&xrow, &replicaset.vclock) != 0 || + coio_write_xrow(&io, &xrow) < 0) + diag_raise(); } catch (SocketError *e) { /* * There is no point trying to send ACKs if @@ -308,9 +309,11 @@ applier_connect(struct applier *applier) */ applier->addr_len = sizeof(applier->addrstorage); applier_set_state(applier, APPLIER_CONNECT); - coio_connect(coio, uri, &applier->addr, &applier->addr_len); + if (coio_connect(coio, uri, &applier->addr, &applier->addr_len) != 0) + diag_raise(); assert(coio->fd >= 0); - coio_readn(coio, greetingbuf, IPROTO_GREETING_SIZE); + if (coio_readn(coio, greetingbuf, IPROTO_GREETING_SIZE) < 0) + diag_raise(); applier->last_row_time = ev_monotonic_now(loop()); /* Decode instance version and name from greeting */ @@ -345,8 +348,9 @@ applier_connect(struct applier *applier) * election on bootstrap. */ xrow_encode_vote(&row); - coio_write_xrow(coio, &row); - coio_read_xrow(coio, ibuf, &row); + if (coio_write_xrow(coio, &row) < 0 || + coio_read_xrow(coio, ibuf, &row) < 0) + diag_raise(); if (row.type == IPROTO_OK) { xrow_decode_ballot_xc(&row, &applier->ballot); } else try { @@ -376,8 +380,9 @@ applier_connect(struct applier *applier) uri->login_len, uri->password != NULL ? 
uri->password : "", uri->password_len); - coio_write_xrow(coio, &row); - coio_read_xrow(coio, ibuf, &row); + if (coio_write_xrow(coio, &row) < 0 || + coio_read_xrow(coio, ibuf, &row) < 0) + diag_raise(); applier->last_row_time = ev_monotonic_now(loop()); if (row.type != IPROTO_OK) xrow_decode_error_xc(&row); /* auth failed */ @@ -401,7 +406,8 @@ applier_wait_snapshot(struct applier *applier) */ if (applier->version_id >= version_id(1, 7, 0)) { /* Decode JOIN/FETCH_SNAPSHOT response */ - coio_read_xrow(coio, ibuf, &row); + if (coio_read_xrow(coio, ibuf, &row) < 0) + diag_raise(); if (iproto_type_is_error(row.type)) { xrow_decode_error_xc(&row); /* re-throw error */ } else if (row.type != IPROTO_OK) { @@ -422,7 +428,8 @@ applier_wait_snapshot(struct applier *applier) */ uint64_t row_count = 0; while (true) { - coio_read_xrow(coio, ibuf, &row); + if (coio_read_xrow(coio, ibuf, &row) < 0) + diag_raise(); applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { if (apply_snapshot_row(&row) != 0) @@ -488,7 +495,8 @@ applier_wait_register(struct applier *applier, uint64_t row_count) * Receive final data. */ while (true) { - coio_read_xrow(coio, ibuf, &row); + if (coio_read_xrow(coio, ibuf, &row) < 0) + diag_raise(); applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { vclock_follow_xrow(&replicaset.vclock, &row); @@ -605,10 +613,13 @@ applier_read_tx_row(struct applier *applier) * from the master for quite a while the connection is * broken - the master might just be idle. 
*/ - if (applier->version_id < version_id(1, 7, 7)) - coio_read_xrow(coio, ibuf, row); - else - coio_read_xrow_timeout_xc(coio, ibuf, row, timeout); + if (applier->version_id < version_id(1, 7, 7)) { + if (coio_read_xrow(coio, ibuf, row) < 0) + diag_raise(); + } else { + if (coio_read_xrow_timeout(coio, ibuf, row, timeout) < 0) + diag_raise(); + } applier->lag = ev_now(loop()) - row->tm; applier->last_row_time = ev_monotonic_now(loop()); @@ -868,11 +879,13 @@ applier_subscribe(struct applier *applier) vclock_copy(&vclock, &replicaset.vclock); xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID, &vclock, replication_anon); - coio_write_xrow(coio, &row); + if (coio_write_xrow(coio, &row) < 0) + diag_raise(); /* Read SUBSCRIBE response */ if (applier->version_id >= version_id(1, 6, 7)) { - coio_read_xrow(coio, ibuf, &row); + if (coio_read_xrow(coio, ibuf, &row) < 0) + diag_raise(); if (iproto_type_is_error(row.type)) { xrow_decode_error_xc(&row); /* error */ } else if (row.type != IPROTO_OK) { @@ -1009,7 +1022,7 @@ applier_disconnect(struct applier *applier, enum applier_state state) applier->writer = NULL; } - coio_close(loop(), &applier->io); + coio_destroy(loop(), &applier->io); /* Clear all unparsed input. 
*/ ibuf_reinit(&applier->ibuf); fiber_gc(); diff --git a/src/box/box.cc b/src/box/box.cc index 611100b8b..ca1696383 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1741,7 +1741,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header) struct xrow_header row; xrow_encode_vclock_xc(&row, &stop_vclock); row.sync = header->sync; - coio_write_xrow(io, &row); + if (coio_write_xrow(io, &row) < 0) + diag_raise(); /* * Final stage: feed replica with WALs in range @@ -1753,7 +1754,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header) /* Send end of WAL stream marker */ xrow_encode_vclock_xc(&row, &replicaset.vclock); row.sync = header->sync; - coio_write_xrow(io, &row); + if (coio_write_xrow(io, &row) < 0) + diag_raise(); /* * Advance the WAL consumer state to the position where @@ -1845,7 +1847,8 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) assert(self != NULL); /* the local registration is read-only */ row.replica_id = self->id; row.sync = header->sync; - coio_write_xrow(io, &row); + if (coio_write_xrow(io, &row) < 0) + diag_raise(); say_info("subscribed replica %s at %s", tt_uuid_str(&replica_uuid), sio_socketname(io->fd)); diff --git a/src/box/relay.cc b/src/box/relay.cc index d5a1c9c68..bb7761b99 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -317,7 +317,8 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock) struct xrow_header row; xrow_encode_vclock_xc(&row, vclock); row.sync = sync; - coio_write_xrow(&relay->io, &row); + if (coio_write_xrow(&relay->io, &row) < 0) + diag_raise(); /* Send read view to the replica. 
*/ engine_join_xc(&ctx, &relay->stream); @@ -516,8 +517,9 @@ relay_reader_f(va_list ap) try { while (!fiber_is_cancelled()) { struct xrow_header xrow; - coio_read_xrow_timeout_xc(&io, &ibuf, &xrow, - replication_disconnect_timeout()); + if (coio_read_xrow_timeout(&io, &ibuf, &xrow, + replication_disconnect_timeout()) < 0) + diag_raise(); /* vclock is followed while decoding, zeroing it. */ vclock_create(&relay->recv_vclock); xrow_decode_vclock_xc(&xrow, &relay->recv_vclock); @@ -721,7 +723,8 @@ relay_send(struct relay *relay, struct xrow_header *packet) packet->sync = relay->sync; relay->last_row_time = ev_monotonic_now(loop()); - coio_write_xrow(&relay->io, packet); + if (coio_write_xrow(&relay->io, packet) < 0) + diag_raise(); fiber_gc(); struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE); diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc index 48707982b..4e79cd2f0 100644 --- a/src/box/xrow_io.cc +++ b/src/box/xrow_io.cc @@ -35,71 +35,74 @@ #include "error.h" #include "msgpuck/msgpuck.h" -void +ssize_t coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row) { /* Read fixed header */ - if (ibuf_used(in) < 1) - coio_breadn(coio, in, 1); + if (ibuf_used(in) < 1 && coio_breadn(coio, in, 1) < 0) + return -1; /* Read length */ if (mp_typeof(*in->rpos) != MP_UINT) { - tnt_raise(ClientError, ER_INVALID_MSGPACK, - "packet length"); + diag_set(ClientError, ER_INVALID_MSGPACK, + "packet length"); + return -1; } ssize_t to_read = mp_check_uint(in->rpos, in->wpos); - if (to_read > 0) - coio_breadn(coio, in, to_read); + if (to_read > 0 && coio_breadn(coio, in, to_read) < 0) + return -1; uint32_t len = mp_decode_uint((const char **) &in->rpos); /* Read header and body */ to_read = len - ibuf_used(in); - if (to_read > 0) - coio_breadn(coio, in, to_read); + if (to_read > 0 && coio_breadn(coio, in, to_read) < 0) + return -1; - xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len, - true); + return xrow_header_decode(row, (const 
char **) &in->rpos, in->rpos + len, + true); } -void -coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in, - struct xrow_header *row, ev_tstamp timeout) +ssize_t +coio_read_xrow_timeout(struct ev_io *coio, struct ibuf *in, + struct xrow_header *row, ev_tstamp timeout) { ev_tstamp start, delay; coio_timeout_init(&start, &delay, timeout); /* Read fixed header */ - if (ibuf_used(in) < 1) - coio_breadn_timeout(coio, in, 1, delay); + if (ibuf_used(in) < 1 && coio_breadn_timeout(coio, in, 1, delay) < 0) + return -1; coio_timeout_update(&start, &delay); /* Read length */ if (mp_typeof(*in->rpos) != MP_UINT) { - tnt_raise(ClientError, ER_INVALID_MSGPACK, - "packet length"); + diag_set(ClientError, ER_INVALID_MSGPACK, + "packet length"); + return -1; } ssize_t to_read = mp_check_uint(in->rpos, in->wpos); - if (to_read > 0) - coio_breadn_timeout(coio, in, to_read, delay); + if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0) + return -1; coio_timeout_update(&start, &delay); uint32_t len = mp_decode_uint((const char **) &in->rpos); /* Read header and body */ to_read = len - ibuf_used(in); - if (to_read > 0) - coio_breadn_timeout(coio, in, to_read, delay); + if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0) + return -1; - xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len, - true); + return xrow_header_decode(row, (const char **) &in->rpos, in->rpos + len, + true); } - -void +ssize_t coio_write_xrow(struct ev_io *coio, const struct xrow_header *row) { struct iovec iov[XROW_IOVMAX]; - int iovcnt = xrow_to_iovec_xc(row, iov); - coio_writev(coio, iov, iovcnt, 0); + int iovcnt = xrow_to_iovec(row, iov); + if (iovcnt < 0) + return -1; + return coio_writev(coio, iov, iovcnt); } diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h index 0eb7a8ace..96c5047b7 100644 --- a/src/box/xrow_io.h +++ b/src/box/xrow_io.h @@ -30,6 +30,7 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. 
*/ +#include "unistd.h" #if defined(__cplusplus) extern "C" { #endif @@ -38,14 +39,14 @@ struct ev_io; struct ibuf; struct xrow_header; -void +ssize_t coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row); -void -coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in, - struct xrow_header *row, double timeout); +ssize_t +coio_read_xrow_timeout(struct ev_io *coio, struct ibuf *in, + struct xrow_header *row, double timeout); -void +ssize_t coio_write_xrow(struct ev_io *coio, const struct xrow_header *row); diff --git a/src/lib/core/coio.cc b/src/lib/core/coio.cc index faa7e5bd5..8ae6930de 100644 --- a/src/lib/core/coio.cc +++ b/src/lib/core/coio.cc @@ -41,12 +41,6 @@ #include "scoped_guard.h" #include "coio_task.h" /* coio_resolve() */ -struct CoioGuard { - struct ev_io *ev_io; - CoioGuard(struct ev_io *arg) :ev_io(arg) {} - ~CoioGuard() { ev_io_stop(loop(), ev_io); } -}; - typedef void (*ev_stat_cb)(ev_loop *, ev_stat *, int); /** Note: this function does not throw */ @@ -65,12 +59,14 @@ coio_fiber_yield_timeout(struct ev_io *coio, ev_tstamp delay) coio->data = fiber(); bool is_timedout = fiber_yield_timeout(delay); coio->data = NULL; + if (is_timedout) + diag_set(TimedOut); return is_timedout; } /** * Connect to a host with a specified timeout. - * @retval -1 timeout + * @retval -1 error or timeout * @retval 0 connected */ static int @@ -79,36 +75,46 @@ coio_connect_addr(struct ev_io *coio, struct sockaddr *addr, { ev_loop *loop = loop(); if (evio_socket(coio, addr->sa_family, SOCK_STREAM, 0) != 0) - diag_raise(); - auto coio_guard = make_scoped_guard([=]{ evio_close(loop, coio); }); - if (sio_connect(coio->fd, addr, len) == 0) { - coio_guard.is_active = false; + return -1; + if (sio_connect(coio->fd, addr, len) == 0) return 0; + if (errno != EINPROGRESS) { + evio_close(loop, coio); + return -1; } - if (errno != EINPROGRESS) - diag_raise(); /* * Wait until socket is ready for writing or * timed out. 
*/ ev_io_set(coio, coio->fd, EV_WRITE); ev_io_start(loop, coio); - bool is_timedout = coio_fiber_yield_timeout(coio, timeout); + bool is_timedout; + is_timedout = coio_fiber_yield_timeout(coio, timeout); ev_io_stop(loop, coio); - fiber_testcancel(); - if (is_timedout) - tnt_raise(TimedOut); + if (fiber_is_cancelled()) { + diag_set(FiberIsCancelled); + evio_close(loop, coio); + return -1; + } + if (is_timedout) { + evio_close(loop, coio); + return -1; + } int error = EINPROGRESS; socklen_t sz = sizeof(error); if (sio_getsockopt(coio->fd, SOL_SOCKET, SO_ERROR, - &error, &sz)) - diag_raise(); + &error, &sz)) { + evio_close(loop, coio); + return -1; + } if (error != 0) { errno = error; - tnt_raise(SocketError, sio_socketname(coio->fd), "connect"); + diag_set(SocketError, sio_socketname(coio->fd), "connect"); + evio_close(loop, coio); + return -1; } - coio_guard.is_active = false; return 0; + } void @@ -152,7 +158,7 @@ coio_fill_addrinfo(struct addrinfo *ai_local, const char *host, * This function also supports UNIX domain sockets if uri->path is not NULL and * uri->service is NULL. * - * @retval -1 timeout + * @retval -1 error or timeout * @retval 0 connected */ int @@ -201,52 +207,55 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr, hints.ai_flags = AI_ADDRCONFIG|AI_NUMERICSERV|AI_PASSIVE; hints.ai_protocol = 0; int rc = coio_getaddrinfo(host, service, &hints, &ai, delay); - if (rc != 0) { - diag_raise(); - panic("unspecified getaddrinfo error"); - } + if (rc != 0) + return -1; } - auto addrinfo_guard = make_scoped_guard([=] { - if (!uri->host_hint) freeaddrinfo(ai); - else free(ai_local.ai_addr); - }); + struct addrinfo *first_ai = ai; evio_timeout_update(loop(), &start, &delay); coio_timeout_init(&start, &delay, timeout); assert(! 
evio_has_fd(coio)); - while (ai) { - try { - if (coio_connect_addr(coio, ai->ai_addr, - ai->ai_addrlen, delay)) - return -1; + while (ai && delay >= 0) { + if (coio_connect_addr(coio, ai->ai_addr, + ai->ai_addrlen, delay) == 0) { if (addr != NULL) { assert(addr_len != NULL); *addr_len = MIN(ai->ai_addrlen, *addr_len); memcpy(addr, ai->ai_addr, *addr_len); } + if (!uri->host_hint) + freeaddrinfo(first_ai); + else + free(ai_local.ai_addr); return 0; /* connected */ - } catch (SocketError *e) { - if (ai->ai_next == NULL) - throw; - /* ignore exception and try the next address */ } ai = ai->ai_next; ev_now_update(loop); coio_timeout_update(&start, &delay); } - tnt_raise(SocketError, sio_socketname(coio->fd), "connection failed"); + /* Set an error if not timedout. */ + if (delay >= 0) + diag_set(SocketError, sio_socketname(coio->fd), + "connection failed"); + if (!uri->host_hint) + freeaddrinfo(first_ai); + else + free(ai_local.ai_addr); + return -1; } /* Do not allow to reuse coio by different fiber. */ -static inline void +static inline int check_coio_in_use(struct ev_io *coio) { if (ev_is_active(coio)) { errno = EINPROGRESS; - tnt_raise(SocketError, sio_socketname(coio->fd), - "already in use"); + diag_set(SocketError, sio_socketname(coio->fd), + "already in use"); + return -1; } + return 0; } /** @@ -260,45 +269,61 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr, ev_tstamp start, delay; coio_timeout_init(&start, &delay, timeout); - check_coio_in_use(coio); - CoioGuard coio_guard(coio); - - while (true) { - /* Assume that there are waiting clients - * available */ - int fd = sio_accept(coio->fd, addr, &addrlen); - if (fd >= 0) { - if (evio_setsockopt_client(fd, addr->sa_family, - SOCK_STREAM) != 0) { - close(fd); - diag_raise(); - } - return fd; - } - if (! sio_wouldblock(errno)) - diag_raise(); - /* The socket is not ready, yield */ - if (! 
ev_is_active(coio)) { - ev_io_set(coio, coio->fd, EV_READ); - ev_io_start(loop(), coio); + if (check_coio_in_use(coio) != 0) + return -1; + + /* Assume that there are waiting clients available */ + int fd = sio_accept(coio->fd, addr, &addrlen); + + if (fd >= 0) { + if (evio_setsockopt_client(fd, addr->sa_family, + SOCK_STREAM) != 0) { + close(fd); + return -1; } + return fd; + } + + if (!sio_wouldblock(errno)) + return -1; + + /* The socket is not ready, yield */ + ev_io_set(coio, coio->fd, EV_READ); + ev_io_start(loop(), coio); + + do { /* * Yield control to other fibers until the * timeout is reached. */ bool is_timedout = coio_fiber_yield_timeout(coio, delay); - fiber_testcancel(); + if (fiber_is_cancelled()) { + diag_set(FiberIsCancelled); + break; + } if (is_timedout) - tnt_raise(TimedOut); + break; coio_timeout_update(&start, &delay); + fd = sio_accept(coio->fd, addr, &addrlen); + } while (fd < 0 && sio_wouldblock(errno)); + + ev_io_stop(loop(), coio); + + if (fd >= 0) { + if (evio_setsockopt_client(fd, addr->sa_family, + SOCK_STREAM) != 0) { + close(fd); + return -1; + } + return fd; } + return -1; } /** * Read at least sz bytes from socket with readahead. * - * In case of EOF returns the amount read until eof (possibly 0), - * and sets errno to 0. + * In case of EOF returns the amount read until eof (possibly 0). * Can read up to bufsiz bytes. * * @retval the number of bytes read. @@ -313,46 +338,68 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz, coio_timeout_init(&start, &delay, timeout); ssize_t to_read = (ssize_t) sz; + if (to_read <= 0) + return 0; - check_coio_in_use(coio); - CoioGuard coio_guard(coio); + if (check_coio_in_use(coio) != 0) + return -1; - while (true) { - /* - * Sic: assume the socket is ready: since - * the user called read(), some data must - * be expected. 
- */ - ssize_t nrd = sio_read(coio->fd, buf, bufsiz); - if (nrd > 0) { - to_read -= nrd; - if (to_read <= 0) - return sz - to_read; - buf = (char *) buf + nrd; - bufsiz -= nrd; - } else if (nrd == 0) { - errno = 0; - return sz - to_read; - } else if (! sio_wouldblock(errno)) { - diag_raise(); - } + ssize_t nrd; + /* + * Sic: assume the socket is ready: since + * the user called read(), some data must + * be expected. + */ + do { + nrd = sio_read(coio->fd, buf, bufsiz); + if (nrd <= 0) + break; + to_read -= nrd; + buf = (char *) buf + nrd; + bufsiz -= nrd; + } while (to_read > 0); + + if (nrd >= 0) + return sz - to_read; + + if (!sio_wouldblock(errno)) + return -1; - /* The socket is not ready, yield */ - if (! ev_is_active(coio)) { - ev_io_set(coio, coio->fd, EV_READ); - ev_io_start(loop(), coio); - } + /* The socket is not ready, yield */ + ev_io_set(coio, coio->fd, EV_READ); + ev_io_start(loop(), coio); + + do { /* * Yield control to other fibers until the * timeout is being reached. */ - bool is_timedout = coio_fiber_yield_timeout(coio, - delay); - fiber_testcancel(); + bool is_timedout = coio_fiber_yield_timeout(coio, delay); + if (fiber_is_cancelled()) { + diag_set(FiberIsCancelled); + break; + } if (is_timedout) - tnt_raise(TimedOut); + break; coio_timeout_update(&start, &delay); - } + nrd = sio_read(coio->fd, buf, bufsiz); + if (nrd == 0) + break; + if (nrd < 0) { + if (sio_wouldblock(errno)) + continue; + break; + } + to_read -= nrd; + buf = (char *) buf + nrd; + bufsiz -= nrd; + } while (to_read > 0); + + ev_io_stop(loop(), coio); + + if (nrd < 0) + return -1; + return sz - to_read; } /** @@ -366,10 +413,13 @@ ssize_t coio_readn_ahead(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz) { ssize_t nrd = coio_read_ahead(coio, buf, sz, bufsiz); + if (nrd < 0) + return -1; if (nrd < (ssize_t)sz) { errno = EPIPE; - tnt_raise(SocketError, sio_socketname(coio->fd), - "unexpected EOF when reading from socket"); + diag_set(SocketError, sio_socketname(coio->fd), 
+ "unexpected EOF when reading from socket"); + return -1; } return nrd; } @@ -386,10 +436,13 @@ coio_readn_ahead_timeout(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz ev_tstamp timeout) { ssize_t nrd = coio_read_ahead_timeout(coio, buf, sz, bufsiz, timeout); - if (nrd < (ssize_t)sz && errno == 0) { /* EOF. */ + if (nrd < 0) + return -1; + if (nrd < (ssize_t)sz) { /* EOF. */ errno = EPIPE; - tnt_raise(SocketError, sio_socketname(coio->fd), - "unexpected EOF when reading from socket"); + diag_set(SocketError, sio_socketname(coio->fd), + "unexpected EOF when reading from socket"); + return -1; } return nrd; } @@ -412,43 +465,62 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz, ev_tstamp start, delay; coio_timeout_init(&start, &delay, timeout); - check_coio_in_use(coio); - CoioGuard coio_guard(coio); + if (towrite <= 0) + return 0; + if (check_coio_in_use(coio) != 0) + return -1; + + ssize_t nwr; + /* + * Sic: write as much data as possible, + * assuming the socket is ready. + */ + do { + nwr = sio_write(coio->fd, buf, towrite); + if (nwr < 0) + break; + towrite -= nwr; + buf = (char *) buf + nwr; + } while (towrite > 0); + + if (nwr > 0) + return sz; + + if (!sio_wouldblock(errno)) + return -1; - while (true) { - /* - * Sic: write as much data as possible, - * assuming the socket is ready. - */ - ssize_t nwr = sio_write(coio->fd, buf, towrite); - if (nwr > 0) { - /* Go past the data just written. */ - if (nwr >= towrite) - return sz; - towrite -= nwr; - buf = (char *) buf + nwr; - } else if (nwr < 0 && !sio_wouldblock(errno)) { - diag_raise(); - } - if (! ev_is_active(coio)) { - ev_io_set(coio, coio->fd, EV_WRITE); - ev_io_start(loop(), coio); - } - /* Yield control to other fibers. */ - fiber_testcancel(); + ev_io_set(coio, coio->fd, EV_WRITE); + ev_io_start(loop(), coio); + + do { /* * Yield control to other fibers until the * timeout is reached or the socket is * ready. 
*/ - bool is_timedout = coio_fiber_yield_timeout(coio, - delay); - fiber_testcancel(); - + bool is_timedout = coio_fiber_yield_timeout(coio, delay); + if (fiber_is_cancelled()) { + diag_set(FiberIsCancelled); + break; + } if (is_timedout) - tnt_raise(TimedOut); + break; coio_timeout_update(&start, &delay); - } + nwr = sio_write(coio->fd, buf, towrite); + if (nwr < 0) { + if (sio_wouldblock(errno)) + continue; + break; + } + towrite -= nwr; + buf = (char *) buf + nwr; + } while (towrite > 0); + + ev_io_stop(loop(), coio); + + if (nwr < 0) + return -1; + return sz; } /* @@ -456,66 +528,83 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz, * Put in an own function to workaround gcc bug with @finally */ static inline ssize_t -coio_flush(int fd, struct iovec *iov, ssize_t offset, int iovcnt) +coio_flush(int fd, struct iovec **iov, size_t *offset, int iovcnt) { - sio_add_to_iov(iov, -offset); - ssize_t nwr = sio_writev(fd, iov, iovcnt); - sio_add_to_iov(iov, offset); - if (nwr < 0 && ! sio_wouldblock(errno)) - diag_raise(); + sio_add_to_iov(*iov, -*offset); + ssize_t nwr = sio_writev(fd, *iov, iovcnt); + sio_add_to_iov(*iov, *offset); + if (nwr < 0 && !sio_wouldblock(errno)) + return -1; + if (nwr < 0) + return 0; + /* Successful write adjust iov and offset. */ + *iov += sio_move_iov(*iov, nwr, offset); return nwr; } ssize_t coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt, - size_t size_hint, ev_tstamp timeout) + ev_tstamp timeout) { size_t total = 0; size_t iov_len = 0; struct iovec *end = iov + iovcnt; ev_tstamp start, delay; coio_timeout_init(&start, &delay, timeout); - check_coio_in_use(coio); - CoioGuard coio_guard(coio); + if (iovcnt == 0) + return 0; + + if (check_coio_in_use(coio) != 0) + return -1; + + ssize_t nwr; /* Avoid a syscall in case of 0 iovcnt. */ - while (iov < end) { - /* Write as much data as possible. 
*/ - ssize_t nwr = coio_flush(coio->fd, iov, iov_len, - end - iov); - if (nwr >= 0) { - total += nwr; - /* - * If there was a hint for the total size - * of the vector, use it. - */ - if (size_hint > 0 && size_hint == total) - break; - - iov += sio_move_iov(iov, nwr, &iov_len); - if (iov == end) { - assert(iov_len == 0); - break; - } - } - if (! ev_is_active(coio)) { - ev_io_set(coio, coio->fd, EV_WRITE); - ev_io_start(loop(), coio); - } - /* Yield control to other fibers. */ - fiber_testcancel(); + do { + nwr = coio_flush(coio->fd, &iov, &iov_len, end - iov); + if (nwr < 0) + break; + total += nwr; + + } while (iov < end); + + assert(nwr < 0 || iov_len == 0); + if (nwr > 0) + return total; + + if (!sio_wouldblock(errno)) + return -1; + + ev_io_set(coio, coio->fd, EV_WRITE); + ev_io_start(loop(), coio); + do { /* * Yield control to other fibers until the * timeout is reached or the socket is * ready. */ bool is_timedout = coio_fiber_yield_timeout(coio, delay); - fiber_testcancel(); - + if (fiber_is_cancelled()) { + diag_set(FiberIsCancelled); + break; + } if (is_timedout) - tnt_raise(TimedOut); + break; coio_timeout_update(&start, &delay); - } + nwr = coio_flush(coio->fd, &iov, &iov_len, end - iov); + if (nwr < 0) { + if (sio_wouldblock(errno)) + continue; + break; + } + total += nwr; + } while (iov < end); + + ev_io_stop(loop(), coio); + + if (nwr < 0) + return -1; + assert(iov_len == 0); return total; } @@ -533,36 +622,40 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags, ev_tstamp start, delay; coio_timeout_init(&start, &delay, timeout); - check_coio_in_use(coio); - CoioGuard coio_guard(coio); + if (check_coio_in_use(coio) != 0) + return -1; - while (true) { - /* - * Sic: write as much data as possible, - * assuming the socket is ready. - */ - ssize_t nwr = sio_sendto(coio->fd, buf, sz, - flags, dest_addr, addrlen); - if (nwr > 0) - return nwr; - if (nwr < 0 && ! sio_wouldblock(errno)) - diag_raise(); - if (! 
ev_is_active(coio)) { - ev_io_set(coio, coio->fd, EV_WRITE); - ev_io_start(loop(), coio); - } + /* + * Sic: write as much data as possible, + * assuming the socket is ready. + */ + ssize_t nwr = sio_sendto(coio->fd, buf, sz, flags, dest_addr, addrlen); + if (nwr > 0 || !sio_wouldblock(errno)) + return nwr; + + ev_io_set(coio, coio->fd, EV_WRITE); + ev_io_start(loop(), coio); + do { /* * Yield control to other fibers until * timeout is reached or the socket is * ready. */ - bool is_timedout = coio_fiber_yield_timeout(coio, - delay); - fiber_testcancel(); + bool is_timedout = coio_fiber_yield_timeout(coio, delay); + if (fiber_is_cancelled()) { + diag_set(FiberIsCancelled); + break; + } if (is_timedout) - tnt_raise(TimedOut); + break; coio_timeout_update(&start, &delay); - } + nwr = sio_sendto(coio->fd, buf, sz, flags, dest_addr, + addrlen); + } while (nwr < 0 && sio_wouldblock(errno)); + + ev_io_stop(loop(), coio); + + return nwr; } /** @@ -579,36 +672,41 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags, ev_tstamp start, delay; coio_timeout_init(&start, &delay, timeout); - check_coio_in_use(coio); - CoioGuard coio_guard(coio); + if (check_coio_in_use(coio) != 0) + return -1; - while (true) { - /* - * Read as much data as possible, - * assuming the socket is ready. - */ - ssize_t nrd = sio_recvfrom(coio->fd, buf, sz, flags, - src_addr, &addrlen); - if (nrd >= 0) - return nrd; - if (! sio_wouldblock(errno)) - diag_raise(); - if (! ev_is_active(coio)) { - ev_io_set(coio, coio->fd, EV_READ); - ev_io_start(loop(), coio); - } + /* + * Read as much data as possible, + * assuming the socket is ready. + */ + ssize_t nrd = sio_recvfrom(coio->fd, buf, sz, flags, + src_addr, &addrlen); + if (nrd >= 0 || !sio_wouldblock(errno)) + return nrd; + + ev_io_set(coio, coio->fd, EV_READ); + ev_io_start(loop(), coio); + do { /* * Yield control to other fibers until * timeout is reached or the socket is * ready. 
*/ - bool is_timedout = coio_fiber_yield_timeout(coio, - delay); - fiber_testcancel(); + bool is_timedout = coio_fiber_yield_timeout(coio, delay); + if (fiber_is_cancelled()) { + diag_set(FiberIsCancelled); + break; + } if (is_timedout) - tnt_raise(TimedOut); + break; coio_timeout_update(&start, &delay); - } + nrd = sio_recvfrom(coio->fd, buf, sz, flags, + src_addr, &addrlen); + } while (nrd < 0 && sio_wouldblock(errno)); + + ev_io_stop(loop(), coio); + + return nrd; } static int @@ -655,12 +753,13 @@ coio_service_init(struct coio_service *service, const char *name, service->handler_param = handler_param; } -void +int coio_service_start(struct evio_service *service, const char *uri) { if (evio_service_bind(service, uri) != 0 || evio_service_listen(service) != 0) - diag_raise(); + return -1; + return 0; } void @@ -678,7 +777,6 @@ coio_stat_stat_timeout(ev_stat *stat, ev_tstamp timeout) coio_timeout_init(&start, &delay, timeout); fiber_yield_timeout(delay); ev_stat_stop(loop(), stat); - fiber_testcancel(); } typedef void (*ev_child_cb)(ev_loop *, ev_child *, int); @@ -706,7 +804,6 @@ coio_waitpid(pid_t pid) fiber_set_cancellable(allow_cancel); ev_child_stop(loop(), &cw); int status = cw.rstatus; - fiber_testcancel(); return status; } diff --git a/src/lib/core/coio.h b/src/lib/core/coio.h index 6a2337689..4267a0459 100644 --- a/src/lib/core/coio.h +++ b/src/lib/core/coio.h @@ -33,6 +33,9 @@ #include "fiber.h" #include "trivia/util.h" #if defined(__cplusplus) +extern "C" { +#endif /* defined(__cplusplus) */ + #include "evio.h" /** @@ -59,10 +62,6 @@ coio_connect(struct ev_io *coio, struct uri *uri, struct sockaddr *addr, return coio_connect_timeout(coio, uri, addr, addr_len, TIMEOUT_INFINITY); } -void -coio_bind(struct ev_io *coio, struct sockaddr *addr, - socklen_t addrlen); - int coio_accept(struct ev_io *coio, struct sockaddr *addr, socklen_t addrlen, ev_tstamp timeout); @@ -71,7 +70,7 @@ void coio_create(struct ev_io *coio, int fd); static inline void 
-coio_close(ev_loop *loop, struct ev_io *coio) +coio_destroy(ev_loop *loop, struct ev_io *coio) { return evio_close(loop, coio); } @@ -141,12 +140,12 @@ coio_write(struct ev_io *coio, const void *buf, size_t sz) ssize_t coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt, - size_t size, ev_tstamp timeout); + ev_tstamp timeout); static inline ssize_t -coio_writev(struct ev_io *coio, struct iovec *iov, int iovcnt, size_t size) +coio_writev(struct ev_io *coio, struct iovec *iov, int iovcnt) { - return coio_writev_timeout(coio, iov, iovcnt, size, TIMEOUT_INFINITY); + return coio_writev_timeout(coio, iov, iovcnt, TIMEOUT_INFINITY); } ssize_t @@ -164,7 +163,7 @@ coio_service_init(struct coio_service *service, const char *name, fiber_func handler, void *handler_param); /** Wait until the service binds to the port. */ -void +int coio_service_start(struct evio_service *service, const char *uri); void @@ -185,8 +184,6 @@ coio_stat_stat_timeout(ev_stat *stat, ev_tstamp delay); int coio_waitpid(pid_t pid); -extern "C" { -#endif /* defined(__cplusplus) */ /** \cond public */ diff --git a/src/lib/core/coio_buf.h b/src/lib/core/coio_buf.h index 1ad104985..3a83f8fe1 100644 --- a/src/lib/core/coio_buf.h +++ b/src/lib/core/coio_buf.h @@ -45,6 +45,8 @@ coio_bread(struct ev_io *coio, struct ibuf *buf, size_t sz) { ibuf_reserve_xc(buf, sz); ssize_t n = coio_read_ahead(coio, buf->wpos, sz, ibuf_unused(buf)); + if (n < 0) + return -1; buf->wpos += n; return n; } @@ -61,6 +63,8 @@ coio_bread_timeout(struct ev_io *coio, struct ibuf *buf, size_t sz, ibuf_reserve_xc(buf, sz); ssize_t n = coio_read_ahead_timeout(coio, buf->wpos, sz, ibuf_unused(buf), timeout); + if (n < 0) + return -1; buf->wpos += n; return n; } @@ -71,6 +75,8 @@ coio_breadn(struct ev_io *coio, struct ibuf *buf, size_t sz) { ibuf_reserve_xc(buf, sz); ssize_t n = coio_readn_ahead(coio, buf->wpos, sz, ibuf_unused(buf)); + if (n < 0) + return -1; buf->wpos += n; return n; } @@ -87,6 +93,8 @@ 
coio_breadn_timeout(struct ev_io *coio, struct ibuf *buf, size_t sz, ibuf_reserve_xc(buf, sz); ssize_t n = coio_readn_ahead_timeout(coio, buf->wpos, sz, ibuf_unused(buf), timeout); + if (n < 0) + return -1; buf->wpos += n; return n; } -- 2.25.0