[Tarantool-patches] [PATCH v4 04/11] coio: do not throw an error, minor refactoring
Georgy Kirichenko
georgy at tarantool.org
Wed Feb 12 12:39:13 MSK 2020
Relaying from a C-written WAL requires coio and xrow_io to be
C-compliant, so get rid of exceptions in the coio interface.
This patch also includes some minor refactoring (the code looks ugly
without it):
1. Get rid of unused size_hint from coio_writev_timeout.
2. Handle partial read/write before the yield loop.
3. Do not reset errno to 0 when a read hits EOF.
Part of #980
---
src/box/applier.cc | 49 ++--
src/box/box.cc | 9 +-
src/box/relay.cc | 11 +-
src/box/xrow_io.cc | 59 ++---
src/box/xrow_io.h | 11 +-
src/lib/core/coio.cc | 535 ++++++++++++++++++++++++----------------
src/lib/core/coio.h | 19 +-
src/lib/core/coio_buf.h | 8 +
8 files changed, 413 insertions(+), 288 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index ae3d281a5..ad427707a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -173,8 +173,9 @@ applier_writer_f(va_list ap)
continue;
try {
struct xrow_header xrow;
- xrow_encode_vclock(&xrow, &replicaset.vclock);
- coio_write_xrow(&io, &xrow);
+ if (xrow_encode_vclock(&xrow, &replicaset.vclock) != 0 ||
+ coio_write_xrow(&io, &xrow) < 0)
+ diag_raise();
} catch (SocketError *e) {
/*
* There is no point trying to send ACKs if
@@ -308,9 +309,11 @@ applier_connect(struct applier *applier)
*/
applier->addr_len = sizeof(applier->addrstorage);
applier_set_state(applier, APPLIER_CONNECT);
- coio_connect(coio, uri, &applier->addr, &applier->addr_len);
+ if (coio_connect(coio, uri, &applier->addr, &applier->addr_len) != 0)
+ diag_raise();
assert(coio->fd >= 0);
- coio_readn(coio, greetingbuf, IPROTO_GREETING_SIZE);
+ if (coio_readn(coio, greetingbuf, IPROTO_GREETING_SIZE) < 0)
+ diag_raise();
applier->last_row_time = ev_monotonic_now(loop());
/* Decode instance version and name from greeting */
@@ -345,8 +348,9 @@ applier_connect(struct applier *applier)
* election on bootstrap.
*/
xrow_encode_vote(&row);
- coio_write_xrow(coio, &row);
- coio_read_xrow(coio, ibuf, &row);
+ if (coio_write_xrow(coio, &row) < 0 ||
+ coio_read_xrow(coio, ibuf, &row) < 0)
+ diag_raise();
if (row.type == IPROTO_OK) {
xrow_decode_ballot_xc(&row, &applier->ballot);
} else try {
@@ -376,8 +380,9 @@ applier_connect(struct applier *applier)
uri->login_len,
uri->password != NULL ? uri->password : "",
uri->password_len);
- coio_write_xrow(coio, &row);
- coio_read_xrow(coio, ibuf, &row);
+ if (coio_write_xrow(coio, &row) < 0 ||
+ coio_read_xrow(coio, ibuf, &row) < 0)
+ diag_raise();
applier->last_row_time = ev_monotonic_now(loop());
if (row.type != IPROTO_OK)
xrow_decode_error_xc(&row); /* auth failed */
@@ -401,7 +406,8 @@ applier_wait_snapshot(struct applier *applier)
*/
if (applier->version_id >= version_id(1, 7, 0)) {
/* Decode JOIN/FETCH_SNAPSHOT response */
- coio_read_xrow(coio, ibuf, &row);
+ if (coio_read_xrow(coio, ibuf, &row) < 0)
+ diag_raise();
if (iproto_type_is_error(row.type)) {
xrow_decode_error_xc(&row); /* re-throw error */
} else if (row.type != IPROTO_OK) {
@@ -422,7 +428,8 @@ applier_wait_snapshot(struct applier *applier)
*/
uint64_t row_count = 0;
while (true) {
- coio_read_xrow(coio, ibuf, &row);
+ if (coio_read_xrow(coio, ibuf, &row) < 0)
+ diag_raise();
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
if (apply_snapshot_row(&row) != 0)
@@ -488,7 +495,8 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
* Receive final data.
*/
while (true) {
- coio_read_xrow(coio, ibuf, &row);
+ if (coio_read_xrow(coio, ibuf, &row) < 0)
+ diag_raise();
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
vclock_follow_xrow(&replicaset.vclock, &row);
@@ -605,10 +613,13 @@ applier_read_tx_row(struct applier *applier)
* from the master for quite a while the connection is
* broken - the master might just be idle.
*/
- if (applier->version_id < version_id(1, 7, 7))
- coio_read_xrow(coio, ibuf, row);
- else
- coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+ if (applier->version_id < version_id(1, 7, 7)) {
+ if (coio_read_xrow(coio, ibuf, row) < 0)
+ diag_raise();
+ } else {
+ if (coio_read_xrow_timeout(coio, ibuf, row, timeout) < 0)
+ diag_raise();
+ }
applier->lag = ev_now(loop()) - row->tm;
applier->last_row_time = ev_monotonic_now(loop());
@@ -868,11 +879,13 @@ applier_subscribe(struct applier *applier)
vclock_copy(&vclock, &replicaset.vclock);
xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
&vclock, replication_anon);
- coio_write_xrow(coio, &row);
+ if (coio_write_xrow(coio, &row) < 0)
+ diag_raise();
/* Read SUBSCRIBE response */
if (applier->version_id >= version_id(1, 6, 7)) {
- coio_read_xrow(coio, ibuf, &row);
+ if (coio_read_xrow(coio, ibuf, &row) < 0)
+ diag_raise();
if (iproto_type_is_error(row.type)) {
xrow_decode_error_xc(&row); /* error */
} else if (row.type != IPROTO_OK) {
@@ -1009,7 +1022,7 @@ applier_disconnect(struct applier *applier, enum applier_state state)
applier->writer = NULL;
}
- coio_close(loop(), &applier->io);
+ coio_destroy(loop(), &applier->io);
/* Clear all unparsed input. */
ibuf_reinit(&applier->ibuf);
fiber_gc();
diff --git a/src/box/box.cc b/src/box/box.cc
index 611100b8b..ca1696383 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1741,7 +1741,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
struct xrow_header row;
xrow_encode_vclock_xc(&row, &stop_vclock);
row.sync = header->sync;
- coio_write_xrow(io, &row);
+ if (coio_write_xrow(io, &row) < 0)
+ diag_raise();
/*
* Final stage: feed replica with WALs in range
@@ -1753,7 +1754,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
/* Send end of WAL stream marker */
xrow_encode_vclock_xc(&row, &replicaset.vclock);
row.sync = header->sync;
- coio_write_xrow(io, &row);
+ if (coio_write_xrow(io, &row) < 0)
+ diag_raise();
/*
* Advance the WAL consumer state to the position where
@@ -1845,7 +1847,8 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
assert(self != NULL); /* the local registration is read-only */
row.replica_id = self->id;
row.sync = header->sync;
- coio_write_xrow(io, &row);
+ if (coio_write_xrow(io, &row) < 0)
+ diag_raise();
say_info("subscribed replica %s at %s",
tt_uuid_str(&replica_uuid), sio_socketname(io->fd));
diff --git a/src/box/relay.cc b/src/box/relay.cc
index d5a1c9c68..bb7761b99 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -317,7 +317,8 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
struct xrow_header row;
xrow_encode_vclock_xc(&row, vclock);
row.sync = sync;
- coio_write_xrow(&relay->io, &row);
+ if (coio_write_xrow(&relay->io, &row) < 0)
+ diag_raise();
/* Send read view to the replica. */
engine_join_xc(&ctx, &relay->stream);
@@ -516,8 +517,9 @@ relay_reader_f(va_list ap)
try {
while (!fiber_is_cancelled()) {
struct xrow_header xrow;
- coio_read_xrow_timeout_xc(&io, &ibuf, &xrow,
- replication_disconnect_timeout());
+ if (coio_read_xrow_timeout(&io, &ibuf, &xrow,
+ replication_disconnect_timeout()) < 0)
+ diag_raise();
/* vclock is followed while decoding, zeroing it. */
vclock_create(&relay->recv_vclock);
xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
@@ -721,7 +723,8 @@ relay_send(struct relay *relay, struct xrow_header *packet)
packet->sync = relay->sync;
relay->last_row_time = ev_monotonic_now(loop());
- coio_write_xrow(&relay->io, packet);
+ if (coio_write_xrow(&relay->io, packet) < 0)
+ diag_raise();
fiber_gc();
struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
index 48707982b..4e79cd2f0 100644
--- a/src/box/xrow_io.cc
+++ b/src/box/xrow_io.cc
@@ -35,71 +35,74 @@
#include "error.h"
#include "msgpuck/msgpuck.h"
-void
+ssize_t
coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row)
{
/* Read fixed header */
- if (ibuf_used(in) < 1)
- coio_breadn(coio, in, 1);
+ if (ibuf_used(in) < 1 && coio_breadn(coio, in, 1) < 0)
+ return -1;
/* Read length */
if (mp_typeof(*in->rpos) != MP_UINT) {
- tnt_raise(ClientError, ER_INVALID_MSGPACK,
- "packet length");
+ diag_set(ClientError, ER_INVALID_MSGPACK,
+ "packet length");
+ return -1;
}
ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
- if (to_read > 0)
- coio_breadn(coio, in, to_read);
+ if (to_read > 0 && coio_breadn(coio, in, to_read) < 0)
+ return -1;
uint32_t len = mp_decode_uint((const char **) &in->rpos);
/* Read header and body */
to_read = len - ibuf_used(in);
- if (to_read > 0)
- coio_breadn(coio, in, to_read);
+ if (to_read > 0 && coio_breadn(coio, in, to_read) < 0)
+ return -1;
- xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
- true);
+ return xrow_header_decode(row, (const char **) &in->rpos, in->rpos + len,
+ true);
}
-void
-coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
- struct xrow_header *row, ev_tstamp timeout)
+ssize_t
+coio_read_xrow_timeout(struct ev_io *coio, struct ibuf *in,
+ struct xrow_header *row, ev_tstamp timeout)
{
ev_tstamp start, delay;
coio_timeout_init(&start, &delay, timeout);
/* Read fixed header */
- if (ibuf_used(in) < 1)
- coio_breadn_timeout(coio, in, 1, delay);
+ if (ibuf_used(in) < 1 && coio_breadn_timeout(coio, in, 1, delay) < 0)
+ return -1;
coio_timeout_update(&start, &delay);
/* Read length */
if (mp_typeof(*in->rpos) != MP_UINT) {
- tnt_raise(ClientError, ER_INVALID_MSGPACK,
- "packet length");
+ diag_set(ClientError, ER_INVALID_MSGPACK,
+ "packet length");
+ return -1;
}
ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
- if (to_read > 0)
- coio_breadn_timeout(coio, in, to_read, delay);
+ if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0)
+ return -1;
coio_timeout_update(&start, &delay);
uint32_t len = mp_decode_uint((const char **) &in->rpos);
/* Read header and body */
to_read = len - ibuf_used(in);
- if (to_read > 0)
- coio_breadn_timeout(coio, in, to_read, delay);
+ if (to_read > 0 && coio_breadn_timeout(coio, in, to_read, delay) < 0)
+ return -1;
- xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
- true);
+ return xrow_header_decode(row, (const char **) &in->rpos, in->rpos + len,
+ true);
}
-
-void
+ssize_t
coio_write_xrow(struct ev_io *coio, const struct xrow_header *row)
{
struct iovec iov[XROW_IOVMAX];
- int iovcnt = xrow_to_iovec_xc(row, iov);
- coio_writev(coio, iov, iovcnt, 0);
+ int iovcnt = xrow_to_iovec(row, iov);
+ if (iovcnt < 0)
+ return -1;
+ return coio_writev(coio, iov, iovcnt);
}
diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h
index 0eb7a8ace..96c5047b7 100644
--- a/src/box/xrow_io.h
+++ b/src/box/xrow_io.h
@@ -30,6 +30,7 @@
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
+#include "unistd.h"
#if defined(__cplusplus)
extern "C" {
#endif
@@ -38,14 +39,14 @@ struct ev_io;
struct ibuf;
struct xrow_header;
-void
+ssize_t
coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header *row);
-void
-coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
- struct xrow_header *row, double timeout);
+ssize_t
+coio_read_xrow_timeout(struct ev_io *coio, struct ibuf *in,
+ struct xrow_header *row, double timeout);
-void
+ssize_t
coio_write_xrow(struct ev_io *coio, const struct xrow_header *row);
diff --git a/src/lib/core/coio.cc b/src/lib/core/coio.cc
index faa7e5bd5..8ae6930de 100644
--- a/src/lib/core/coio.cc
+++ b/src/lib/core/coio.cc
@@ -41,12 +41,6 @@
#include "scoped_guard.h"
#include "coio_task.h" /* coio_resolve() */
-struct CoioGuard {
- struct ev_io *ev_io;
- CoioGuard(struct ev_io *arg) :ev_io(arg) {}
- ~CoioGuard() { ev_io_stop(loop(), ev_io); }
-};
-
typedef void (*ev_stat_cb)(ev_loop *, ev_stat *, int);
/** Note: this function does not throw */
@@ -65,12 +59,14 @@ coio_fiber_yield_timeout(struct ev_io *coio, ev_tstamp delay)
coio->data = fiber();
bool is_timedout = fiber_yield_timeout(delay);
coio->data = NULL;
+ if (is_timedout)
+ diag_set(TimedOut);
return is_timedout;
}
/**
* Connect to a host with a specified timeout.
- * @retval -1 timeout
+ * @retval -1 error or timeout
* @retval 0 connected
*/
static int
@@ -79,36 +75,46 @@ coio_connect_addr(struct ev_io *coio, struct sockaddr *addr,
{
ev_loop *loop = loop();
if (evio_socket(coio, addr->sa_family, SOCK_STREAM, 0) != 0)
- diag_raise();
- auto coio_guard = make_scoped_guard([=]{ evio_close(loop, coio); });
- if (sio_connect(coio->fd, addr, len) == 0) {
- coio_guard.is_active = false;
+ return -1;
+ if (sio_connect(coio->fd, addr, len) == 0)
return 0;
+ if (errno != EINPROGRESS) {
+ evio_close(loop, coio);
+ return -1;
}
- if (errno != EINPROGRESS)
- diag_raise();
/*
* Wait until socket is ready for writing or
* timed out.
*/
ev_io_set(coio, coio->fd, EV_WRITE);
ev_io_start(loop, coio);
- bool is_timedout = coio_fiber_yield_timeout(coio, timeout);
+ bool is_timedout;
+ is_timedout = coio_fiber_yield_timeout(coio, timeout);
ev_io_stop(loop, coio);
- fiber_testcancel();
- if (is_timedout)
- tnt_raise(TimedOut);
+ if (fiber_is_cancelled()) {
+ diag_set(FiberIsCancelled);
+ evio_close(loop, coio);
+ return -1;
+ }
+ if (is_timedout) {
+ evio_close(loop, coio);
+ return -1;
+ }
int error = EINPROGRESS;
socklen_t sz = sizeof(error);
if (sio_getsockopt(coio->fd, SOL_SOCKET, SO_ERROR,
- &error, &sz))
- diag_raise();
+ &error, &sz)) {
+ evio_close(loop, coio);
+ return -1;
+ }
if (error != 0) {
errno = error;
- tnt_raise(SocketError, sio_socketname(coio->fd), "connect");
+ diag_set(SocketError, sio_socketname(coio->fd), "connect");
+ evio_close(loop, coio);
+ return -1;
}
- coio_guard.is_active = false;
return 0;
+
}
void
@@ -152,7 +158,7 @@ coio_fill_addrinfo(struct addrinfo *ai_local, const char *host,
* This function also supports UNIX domain sockets if uri->path is not NULL and
* uri->service is NULL.
*
- * @retval -1 timeout
+ * @retval -1 error or timeout
* @retval 0 connected
*/
int
@@ -201,52 +207,55 @@ coio_connect_timeout(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
hints.ai_flags = AI_ADDRCONFIG|AI_NUMERICSERV|AI_PASSIVE;
hints.ai_protocol = 0;
int rc = coio_getaddrinfo(host, service, &hints, &ai, delay);
- if (rc != 0) {
- diag_raise();
- panic("unspecified getaddrinfo error");
- }
+ if (rc != 0)
+ return -1;
}
- auto addrinfo_guard = make_scoped_guard([=] {
- if (!uri->host_hint) freeaddrinfo(ai);
- else free(ai_local.ai_addr);
- });
+ struct addrinfo *first_ai = ai;
evio_timeout_update(loop(), &start, &delay);
coio_timeout_init(&start, &delay, timeout);
assert(! evio_has_fd(coio));
- while (ai) {
- try {
- if (coio_connect_addr(coio, ai->ai_addr,
- ai->ai_addrlen, delay))
- return -1;
+ while (ai && delay >= 0) {
+ if (coio_connect_addr(coio, ai->ai_addr,
+ ai->ai_addrlen, delay) == 0) {
if (addr != NULL) {
assert(addr_len != NULL);
*addr_len = MIN(ai->ai_addrlen, *addr_len);
memcpy(addr, ai->ai_addr, *addr_len);
}
+ if (!uri->host_hint)
+ freeaddrinfo(first_ai);
+ else
+ free(ai_local.ai_addr);
return 0; /* connected */
- } catch (SocketError *e) {
- if (ai->ai_next == NULL)
- throw;
- /* ignore exception and try the next address */
}
ai = ai->ai_next;
ev_now_update(loop);
coio_timeout_update(&start, &delay);
}
- tnt_raise(SocketError, sio_socketname(coio->fd), "connection failed");
+ /* Set an error if not timedout. */
+ if (delay >= 0)
+ diag_set(SocketError, sio_socketname(coio->fd),
+ "connection failed");
+ if (!uri->host_hint)
+ freeaddrinfo(first_ai);
+ else
+ free(ai_local.ai_addr);
+ return -1;
}
/* Do not allow to reuse coio by different fiber. */
-static inline void
+static inline int
check_coio_in_use(struct ev_io *coio)
{
if (ev_is_active(coio)) {
errno = EINPROGRESS;
- tnt_raise(SocketError, sio_socketname(coio->fd),
- "already in use");
+ diag_set(SocketError, sio_socketname(coio->fd),
+ "already in use");
+ return -1;
}
+ return 0;
}
/**
@@ -260,45 +269,61 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr,
ev_tstamp start, delay;
coio_timeout_init(&start, &delay, timeout);
- check_coio_in_use(coio);
- CoioGuard coio_guard(coio);
-
- while (true) {
- /* Assume that there are waiting clients
- * available */
- int fd = sio_accept(coio->fd, addr, &addrlen);
- if (fd >= 0) {
- if (evio_setsockopt_client(fd, addr->sa_family,
- SOCK_STREAM) != 0) {
- close(fd);
- diag_raise();
- }
- return fd;
- }
- if (! sio_wouldblock(errno))
- diag_raise();
- /* The socket is not ready, yield */
- if (! ev_is_active(coio)) {
- ev_io_set(coio, coio->fd, EV_READ);
- ev_io_start(loop(), coio);
+ if (check_coio_in_use(coio) != 0)
+ return -1;
+
+ /* Assume that there are waiting clients available */
+ int fd = sio_accept(coio->fd, addr, &addrlen);
+
+ if (fd >= 0) {
+ if (evio_setsockopt_client(fd, addr->sa_family,
+ SOCK_STREAM) != 0) {
+ close(fd);
+ return -1;
}
+ return fd;
+ }
+
+ if (!sio_wouldblock(errno))
+ return -1;
+
+ /* The socket is not ready, yield */
+ ev_io_set(coio, coio->fd, EV_READ);
+ ev_io_start(loop(), coio);
+
+ do {
/*
* Yield control to other fibers until the
* timeout is reached.
*/
bool is_timedout = coio_fiber_yield_timeout(coio, delay);
- fiber_testcancel();
+ if (fiber_is_cancelled()) {
+ diag_set(FiberIsCancelled);
+ break;
+ }
if (is_timedout)
- tnt_raise(TimedOut);
+ break;
coio_timeout_update(&start, &delay);
+ fd = sio_accept(coio->fd, addr, &addrlen);
+ } while (fd < 0 && sio_wouldblock(errno));
+
+ ev_io_stop(loop(), coio);
+
+ if (fd >= 0) {
+ if (evio_setsockopt_client(fd, addr->sa_family,
+ SOCK_STREAM) != 0) {
+ close(fd);
+ return -1;
+ }
+ return fd;
}
+ return -1;
}
/**
* Read at least sz bytes from socket with readahead.
*
- * In case of EOF returns the amount read until eof (possibly 0),
- * and sets errno to 0.
+ * In case of EOF returns the amount read until eof (possibly 0).
* Can read up to bufsiz bytes.
*
* @retval the number of bytes read.
@@ -313,46 +338,68 @@ coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz,
coio_timeout_init(&start, &delay, timeout);
ssize_t to_read = (ssize_t) sz;
+ if (to_read <= 0)
+ return 0;
- check_coio_in_use(coio);
- CoioGuard coio_guard(coio);
+ if (check_coio_in_use(coio) != 0)
+ return -1;
- while (true) {
- /*
- * Sic: assume the socket is ready: since
- * the user called read(), some data must
- * be expected.
- */
- ssize_t nrd = sio_read(coio->fd, buf, bufsiz);
- if (nrd > 0) {
- to_read -= nrd;
- if (to_read <= 0)
- return sz - to_read;
- buf = (char *) buf + nrd;
- bufsiz -= nrd;
- } else if (nrd == 0) {
- errno = 0;
- return sz - to_read;
- } else if (! sio_wouldblock(errno)) {
- diag_raise();
- }
+ ssize_t nrd;
+ /*
+ * Sic: assume the socket is ready: since
+ * the user called read(), some data must
+ * be expected.
+ */
+ do {
+ nrd = sio_read(coio->fd, buf, bufsiz);
+ if (nrd <= 0)
+ break;
+ to_read -= nrd;
+ buf = (char *) buf + nrd;
+ bufsiz -= nrd;
+ } while (to_read > 0);
+
+ if (nrd >= 0)
+ return sz - to_read;
+
+ if (!sio_wouldblock(errno))
+ return -1;
- /* The socket is not ready, yield */
- if (! ev_is_active(coio)) {
- ev_io_set(coio, coio->fd, EV_READ);
- ev_io_start(loop(), coio);
- }
+ /* The socket is not ready, yield */
+ ev_io_set(coio, coio->fd, EV_READ);
+ ev_io_start(loop(), coio);
+
+ do {
/*
* Yield control to other fibers until the
* timeout is being reached.
*/
- bool is_timedout = coio_fiber_yield_timeout(coio,
- delay);
- fiber_testcancel();
+ bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+ if (fiber_is_cancelled()) {
+ diag_set(FiberIsCancelled);
+ break;
+ }
if (is_timedout)
- tnt_raise(TimedOut);
+ break;
coio_timeout_update(&start, &delay);
- }
+ nrd = sio_read(coio->fd, buf, bufsiz);
+ if (nrd == 0)
+ break;
+ if (nrd < 0) {
+ if (sio_wouldblock(errno))
+ continue;
+ break;
+ }
+ to_read -= nrd;
+ buf = (char *) buf + nrd;
+ bufsiz -= nrd;
+ } while (to_read > 0);
+
+ ev_io_stop(loop(), coio);
+
+ if (nrd < 0)
+ return -1;
+ return sz - to_read;
}
/**
@@ -366,10 +413,13 @@ ssize_t
coio_readn_ahead(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz)
{
ssize_t nrd = coio_read_ahead(coio, buf, sz, bufsiz);
+ if (nrd < 0)
+ return -1;
if (nrd < (ssize_t)sz) {
errno = EPIPE;
- tnt_raise(SocketError, sio_socketname(coio->fd),
- "unexpected EOF when reading from socket");
+ diag_set(SocketError, sio_socketname(coio->fd),
+ "unexpected EOF when reading from socket");
+ return -1;
}
return nrd;
}
@@ -386,10 +436,13 @@ coio_readn_ahead_timeout(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz
ev_tstamp timeout)
{
ssize_t nrd = coio_read_ahead_timeout(coio, buf, sz, bufsiz, timeout);
- if (nrd < (ssize_t)sz && errno == 0) { /* EOF. */
+ if (nrd < 0)
+ return -1;
+ if (nrd < (ssize_t)sz) { /* EOF. */
errno = EPIPE;
- tnt_raise(SocketError, sio_socketname(coio->fd),
- "unexpected EOF when reading from socket");
+ diag_set(SocketError, sio_socketname(coio->fd),
+ "unexpected EOF when reading from socket");
+ return -1;
}
return nrd;
}
@@ -412,43 +465,62 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz,
ev_tstamp start, delay;
coio_timeout_init(&start, &delay, timeout);
- check_coio_in_use(coio);
- CoioGuard coio_guard(coio);
+ if (towrite <= 0)
+ return 0;
+ if (check_coio_in_use(coio) != 0)
+ return -1;
+
+ ssize_t nwr;
+ /*
+ * Sic: write as much data as possible,
+ * assuming the socket is ready.
+ */
+ do {
+ nwr = sio_write(coio->fd, buf, towrite);
+ if (nwr < 0)
+ break;
+ towrite -= nwr;
+ buf = (char *) buf + nwr;
+ } while (towrite > 0);
+
+ if (nwr > 0)
+ return sz;
+
+ if (!sio_wouldblock(errno))
+ return -1;
- while (true) {
- /*
- * Sic: write as much data as possible,
- * assuming the socket is ready.
- */
- ssize_t nwr = sio_write(coio->fd, buf, towrite);
- if (nwr > 0) {
- /* Go past the data just written. */
- if (nwr >= towrite)
- return sz;
- towrite -= nwr;
- buf = (char *) buf + nwr;
- } else if (nwr < 0 && !sio_wouldblock(errno)) {
- diag_raise();
- }
- if (! ev_is_active(coio)) {
- ev_io_set(coio, coio->fd, EV_WRITE);
- ev_io_start(loop(), coio);
- }
- /* Yield control to other fibers. */
- fiber_testcancel();
+ ev_io_set(coio, coio->fd, EV_WRITE);
+ ev_io_start(loop(), coio);
+
+ do {
/*
* Yield control to other fibers until the
* timeout is reached or the socket is
* ready.
*/
- bool is_timedout = coio_fiber_yield_timeout(coio,
- delay);
- fiber_testcancel();
-
+ bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+ if (fiber_is_cancelled()) {
+ diag_set(FiberIsCancelled);
+ break;
+ }
if (is_timedout)
- tnt_raise(TimedOut);
+ break;
coio_timeout_update(&start, &delay);
- }
+ nwr = sio_write(coio->fd, buf, towrite);
+ if (nwr < 0) {
+ if (sio_wouldblock(errno))
+ continue;
+ break;
+ }
+ towrite -= nwr;
+ buf = (char *) buf + nwr;
+ } while (towrite > 0);
+
+ ev_io_stop(loop(), coio);
+
+ if (nwr < 0)
+ return -1;
+ return sz;
}
/*
@@ -456,66 +528,83 @@ coio_write_timeout(struct ev_io *coio, const void *buf, size_t sz,
* Put in an own function to workaround gcc bug with @finally
*/
static inline ssize_t
-coio_flush(int fd, struct iovec *iov, ssize_t offset, int iovcnt)
+coio_flush(int fd, struct iovec **iov, size_t *offset, int iovcnt)
{
- sio_add_to_iov(iov, -offset);
- ssize_t nwr = sio_writev(fd, iov, iovcnt);
- sio_add_to_iov(iov, offset);
- if (nwr < 0 && ! sio_wouldblock(errno))
- diag_raise();
+ sio_add_to_iov(*iov, -*offset);
+ ssize_t nwr = sio_writev(fd, *iov, iovcnt);
+ sio_add_to_iov(*iov, *offset);
+ if (nwr < 0 && !sio_wouldblock(errno))
+ return -1;
+ if (nwr < 0)
+ return 0;
+ /* Successful write adjust iov and offset. */
+ *iov += sio_move_iov(*iov, nwr, offset);
return nwr;
}
ssize_t
coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt,
- size_t size_hint, ev_tstamp timeout)
+ ev_tstamp timeout)
{
size_t total = 0;
size_t iov_len = 0;
struct iovec *end = iov + iovcnt;
ev_tstamp start, delay;
coio_timeout_init(&start, &delay, timeout);
- check_coio_in_use(coio);
- CoioGuard coio_guard(coio);
+ if (iovcnt == 0)
+ return 0;
+
+ if (check_coio_in_use(coio) != 0)
+ return -1;
+
+ ssize_t nwr;
/* Avoid a syscall in case of 0 iovcnt. */
- while (iov < end) {
- /* Write as much data as possible. */
- ssize_t nwr = coio_flush(coio->fd, iov, iov_len,
- end - iov);
- if (nwr >= 0) {
- total += nwr;
- /*
- * If there was a hint for the total size
- * of the vector, use it.
- */
- if (size_hint > 0 && size_hint == total)
- break;
-
- iov += sio_move_iov(iov, nwr, &iov_len);
- if (iov == end) {
- assert(iov_len == 0);
- break;
- }
- }
- if (! ev_is_active(coio)) {
- ev_io_set(coio, coio->fd, EV_WRITE);
- ev_io_start(loop(), coio);
- }
- /* Yield control to other fibers. */
- fiber_testcancel();
+ do {
+ nwr = coio_flush(coio->fd, &iov, &iov_len, end - iov);
+ if (nwr < 0)
+ break;
+ total += nwr;
+
+ } while (iov < end);
+
+ assert(nwr < 0 || iov_len == 0);
+ if (nwr > 0)
+ return total;
+
+ if (!sio_wouldblock(errno))
+ return -1;
+
+ ev_io_set(coio, coio->fd, EV_WRITE);
+ ev_io_start(loop(), coio);
+ do {
/*
* Yield control to other fibers until the
* timeout is reached or the socket is
* ready.
*/
bool is_timedout = coio_fiber_yield_timeout(coio, delay);
- fiber_testcancel();
-
+ if (fiber_is_cancelled()) {
+ diag_set(FiberIsCancelled);
+ break;
+ }
if (is_timedout)
- tnt_raise(TimedOut);
+ break;
coio_timeout_update(&start, &delay);
- }
+ nwr = coio_flush(coio->fd, &iov, &iov_len, end - iov);
+ if (nwr < 0) {
+ if (sio_wouldblock(errno))
+ continue;
+ break;
+ }
+ total += nwr;
+ } while (iov < end);
+
+ ev_io_stop(loop(), coio);
+
+ if (nwr < 0)
+ return -1;
+ assert(iov_len == 0);
return total;
}
@@ -533,36 +622,40 @@ coio_sendto_timeout(struct ev_io *coio, const void *buf, size_t sz, int flags,
ev_tstamp start, delay;
coio_timeout_init(&start, &delay, timeout);
- check_coio_in_use(coio);
- CoioGuard coio_guard(coio);
+ if (check_coio_in_use(coio) != 0)
+ return -1;
- while (true) {
- /*
- * Sic: write as much data as possible,
- * assuming the socket is ready.
- */
- ssize_t nwr = sio_sendto(coio->fd, buf, sz,
- flags, dest_addr, addrlen);
- if (nwr > 0)
- return nwr;
- if (nwr < 0 && ! sio_wouldblock(errno))
- diag_raise();
- if (! ev_is_active(coio)) {
- ev_io_set(coio, coio->fd, EV_WRITE);
- ev_io_start(loop(), coio);
- }
+ /*
+ * Sic: write as much data as possible,
+ * assuming the socket is ready.
+ */
+ ssize_t nwr = sio_sendto(coio->fd, buf, sz, flags, dest_addr, addrlen);
+ if (nwr > 0 || !sio_wouldblock(errno))
+ return nwr;
+
+ ev_io_set(coio, coio->fd, EV_WRITE);
+ ev_io_start(loop(), coio);
+ do {
/*
* Yield control to other fibers until
* timeout is reached or the socket is
* ready.
*/
- bool is_timedout = coio_fiber_yield_timeout(coio,
- delay);
- fiber_testcancel();
+ bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+ if (fiber_is_cancelled()) {
+ diag_set(FiberIsCancelled);
+ break;
+ }
if (is_timedout)
- tnt_raise(TimedOut);
+ break;
coio_timeout_update(&start, &delay);
- }
+ nwr = sio_sendto(coio->fd, buf, sz, flags, dest_addr,
+ addrlen);
+ } while (nwr < 0 && sio_wouldblock(errno));
+
+ ev_io_stop(loop(), coio);
+
+ return nwr;
}
/**
@@ -579,36 +672,41 @@ coio_recvfrom_timeout(struct ev_io *coio, void *buf, size_t sz, int flags,
ev_tstamp start, delay;
coio_timeout_init(&start, &delay, timeout);
- check_coio_in_use(coio);
- CoioGuard coio_guard(coio);
+ if (check_coio_in_use(coio) != 0)
+ return -1;
- while (true) {
- /*
- * Read as much data as possible,
- * assuming the socket is ready.
- */
- ssize_t nrd = sio_recvfrom(coio->fd, buf, sz, flags,
- src_addr, &addrlen);
- if (nrd >= 0)
- return nrd;
- if (! sio_wouldblock(errno))
- diag_raise();
- if (! ev_is_active(coio)) {
- ev_io_set(coio, coio->fd, EV_READ);
- ev_io_start(loop(), coio);
- }
+ /*
+ * Read as much data as possible,
+ * assuming the socket is ready.
+ */
+ ssize_t nrd = sio_recvfrom(coio->fd, buf, sz, flags,
+ src_addr, &addrlen);
+ if (nrd >= 0 || !sio_wouldblock(errno))
+ return nrd;
+
+ ev_io_set(coio, coio->fd, EV_READ);
+ ev_io_start(loop(), coio);
+ do {
/*
* Yield control to other fibers until
* timeout is reached or the socket is
* ready.
*/
- bool is_timedout = coio_fiber_yield_timeout(coio,
- delay);
- fiber_testcancel();
+ bool is_timedout = coio_fiber_yield_timeout(coio, delay);
+ if (fiber_is_cancelled()) {
+ diag_set(FiberIsCancelled);
+ break;
+ }
if (is_timedout)
- tnt_raise(TimedOut);
+ break;
coio_timeout_update(&start, &delay);
- }
+ nrd = sio_recvfrom(coio->fd, buf, sz, flags,
+ src_addr, &addrlen);
+ } while (nrd < 0 && sio_wouldblock(errno));
+
+ ev_io_stop(loop(), coio);
+
+ return nrd;
}
static int
@@ -655,12 +753,13 @@ coio_service_init(struct coio_service *service, const char *name,
service->handler_param = handler_param;
}
-void
+int
coio_service_start(struct evio_service *service, const char *uri)
{
if (evio_service_bind(service, uri) != 0 ||
evio_service_listen(service) != 0)
- diag_raise();
+ return -1;
+ return 0;
}
void
@@ -678,7 +777,6 @@ coio_stat_stat_timeout(ev_stat *stat, ev_tstamp timeout)
coio_timeout_init(&start, &delay, timeout);
fiber_yield_timeout(delay);
ev_stat_stop(loop(), stat);
- fiber_testcancel();
}
typedef void (*ev_child_cb)(ev_loop *, ev_child *, int);
@@ -706,7 +804,6 @@ coio_waitpid(pid_t pid)
fiber_set_cancellable(allow_cancel);
ev_child_stop(loop(), &cw);
int status = cw.rstatus;
- fiber_testcancel();
return status;
}
diff --git a/src/lib/core/coio.h b/src/lib/core/coio.h
index 6a2337689..4267a0459 100644
--- a/src/lib/core/coio.h
+++ b/src/lib/core/coio.h
@@ -33,6 +33,9 @@
#include "fiber.h"
#include "trivia/util.h"
#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
#include "evio.h"
/**
@@ -59,10 +62,6 @@ coio_connect(struct ev_io *coio, struct uri *uri, struct sockaddr *addr,
return coio_connect_timeout(coio, uri, addr, addr_len, TIMEOUT_INFINITY);
}
-void
-coio_bind(struct ev_io *coio, struct sockaddr *addr,
- socklen_t addrlen);
-
int
coio_accept(struct ev_io *coio, struct sockaddr *addr, socklen_t addrlen,
ev_tstamp timeout);
@@ -71,7 +70,7 @@ void
coio_create(struct ev_io *coio, int fd);
static inline void
-coio_close(ev_loop *loop, struct ev_io *coio)
+coio_destroy(ev_loop *loop, struct ev_io *coio)
{
return evio_close(loop, coio);
}
@@ -141,12 +140,12 @@ coio_write(struct ev_io *coio, const void *buf, size_t sz)
ssize_t
coio_writev_timeout(struct ev_io *coio, struct iovec *iov, int iovcnt,
- size_t size, ev_tstamp timeout);
+ ev_tstamp timeout);
static inline ssize_t
-coio_writev(struct ev_io *coio, struct iovec *iov, int iovcnt, size_t size)
+coio_writev(struct ev_io *coio, struct iovec *iov, int iovcnt)
{
- return coio_writev_timeout(coio, iov, iovcnt, size, TIMEOUT_INFINITY);
+ return coio_writev_timeout(coio, iov, iovcnt, TIMEOUT_INFINITY);
}
ssize_t
@@ -164,7 +163,7 @@ coio_service_init(struct coio_service *service, const char *name,
fiber_func handler, void *handler_param);
/** Wait until the service binds to the port. */
-void
+int
coio_service_start(struct evio_service *service, const char *uri);
void
@@ -185,8 +184,6 @@ coio_stat_stat_timeout(ev_stat *stat, ev_tstamp delay);
int
coio_waitpid(pid_t pid);
-extern "C" {
-#endif /* defined(__cplusplus) */
/** \cond public */
diff --git a/src/lib/core/coio_buf.h b/src/lib/core/coio_buf.h
index 1ad104985..3a83f8fe1 100644
--- a/src/lib/core/coio_buf.h
+++ b/src/lib/core/coio_buf.h
@@ -45,6 +45,8 @@ coio_bread(struct ev_io *coio, struct ibuf *buf, size_t sz)
{
ibuf_reserve_xc(buf, sz);
ssize_t n = coio_read_ahead(coio, buf->wpos, sz, ibuf_unused(buf));
+ if (n < 0)
+ return -1;
buf->wpos += n;
return n;
}
@@ -61,6 +63,8 @@ coio_bread_timeout(struct ev_io *coio, struct ibuf *buf, size_t sz,
ibuf_reserve_xc(buf, sz);
ssize_t n = coio_read_ahead_timeout(coio, buf->wpos, sz, ibuf_unused(buf),
timeout);
+ if (n < 0)
+ return -1;
buf->wpos += n;
return n;
}
@@ -71,6 +75,8 @@ coio_breadn(struct ev_io *coio, struct ibuf *buf, size_t sz)
{
ibuf_reserve_xc(buf, sz);
ssize_t n = coio_readn_ahead(coio, buf->wpos, sz, ibuf_unused(buf));
+ if (n < 0)
+ return -1;
buf->wpos += n;
return n;
}
@@ -87,6 +93,8 @@ coio_breadn_timeout(struct ev_io *coio, struct ibuf *buf, size_t sz,
ibuf_reserve_xc(buf, sz);
ssize_t n = coio_readn_ahead_timeout(coio, buf->wpos, sz, ibuf_unused(buf),
timeout);
+ if (n < 0)
+ return -1;
buf->wpos += n;
return n;
}
--
2.25.0
More information about the Tarantool-patches
mailing list