[Tarantool-patches] [PATCH] coio: handle spurious wakeup correctly
Serge Petrenko
sergepetrenko at tarantool.org
Tue Nov 16 13:17:31 MSK 2021
coio_accept, coio_read, coio_write and coio_writev used to handle spurious
wakeups correctly: if the timeout hadn't passed yet, they would simply
retry reading (or writing) and fall asleep once again if no data was
ready.
This behaviour changed in the following patches:
577a640a7fdec986d19101ed04d2afa80e951c78 ("coio: pass fd to
coio_accept") and 4f84859dcdd6126b0bdcda810b7f5f58386bdac6 ("Introduce
iostream wrapper for socket I/O").
Now the functions time out on the very first spurious wakeup.
Fix this by timing out only once the delay has actually expired. Add the
appropriate unit tests and a test_iostream implementation for ease of testing.
---
No ticket
Branch: https://github.com/tarantool/tarantool/tree/sp/coio-rw-fixup
src/lib/core/coio.c | 43 ++++++-------
test/unit/CMakeLists.txt | 2 +-
test/unit/coio.cc | 80 +++++++++++++++++++++++-
test/unit/coio.result | 9 +++
test/unit/test_iostream.c | 125 ++++++++++++++++++++++++++++++++++++++
test/unit/test_iostream.h | 69 +++++++++++++++++++++
6 files changed, 303 insertions(+), 25 deletions(-)
create mode 100644 test/unit/test_iostream.c
create mode 100644 test/unit/test_iostream.h
diff --git a/src/lib/core/coio.c b/src/lib/core/coio.c
index cc1cfa776..4c61c13ca 100644
--- a/src/lib/core/coio.c
+++ b/src/lib/core/coio.c
@@ -234,19 +234,19 @@ coio_accept(int sfd, struct sockaddr *addr, socklen_t addrlen,
}
if (!sio_wouldblock(errno))
return -1;
+ if (delay <= 0) {
+ diag_set(TimedOut);
+ return -1;
+ }
/*
* Yield control to other fibers until the
* timeout is reached.
*/
- int revents = coio_wait(sfd, EV_READ, delay);
+ coio_wait(sfd, EV_READ, delay);
if (fiber_is_cancelled()) {
diag_set(FiberIsCancelled);
return -1;
}
- if (revents == 0) {
- diag_set(TimedOut);
- return -1;
- }
coio_timeout_update(&start, &delay);
}
}
@@ -287,20 +287,19 @@ coio_read_ahead_timeout(struct iostream *io, void *buf, size_t sz,
} else if (nrd == IOSTREAM_ERROR) {
return -1;
}
+ if (delay <= 0) {
+ diag_set(TimedOut);
+ return -1;
+ }
/*
* Yield control to other fibers until the
* timeout is being reached.
*/
- int revents = coio_wait(io->fd, iostream_status_to_events(nrd),
- delay);
+ coio_wait(io->fd, iostream_status_to_events(nrd), delay);
if (fiber_is_cancelled()) {
diag_set(FiberIsCancelled);
return -1;
}
- if (revents == 0) {
- diag_set(TimedOut);
- return -1;
- }
coio_timeout_update(&start, &delay);
}
}
@@ -383,21 +382,20 @@ coio_write_timeout(struct iostream *io, const void *buf, size_t sz,
} else if (nwr == IOSTREAM_ERROR) {
return -1;
}
+ if (delay <= 0) {
+ diag_set(TimedOut);
+ return -1;
+ }
/*
* Yield control to other fibers until the
* timeout is reached or the socket is
* ready.
*/
- int revents = coio_wait(io->fd, iostream_status_to_events(nwr),
- delay);
+ coio_wait(io->fd, iostream_status_to_events(nwr), delay);
if (fiber_is_cancelled()) {
diag_set(FiberIsCancelled);
return -1;
}
- if (revents == 0) {
- diag_set(TimedOut);
- return -1;
- }
coio_timeout_update(&start, &delay);
}
}
@@ -447,21 +445,20 @@ coio_writev_timeout(struct iostream *io, struct iovec *iov, int iovcnt,
} else if (nwr == IOSTREAM_ERROR) {
return -1;
}
+ if (delay <= 0) {
+ diag_set(TimedOut);
+ return -1;
+ }
/*
* Yield control to other fibers until the
* timeout is reached or the socket is
* ready.
*/
- int revents = coio_wait(io->fd, iostream_status_to_events(nwr),
- delay);
+ coio_wait(io->fd, iostream_status_to_events(nwr), delay);
if (fiber_is_cancelled()) {
diag_set(FiberIsCancelled);
return -1;
}
- if (revents == 0) {
- diag_set(TimedOut);
- return -1;
- }
coio_timeout_update(&start, &delay);
}
return total;
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 79b0402c7..9a3423b14 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -119,7 +119,7 @@ if (GLIBC_USED)
target_link_libraries(cbus_hang.test core unit stat)
endif ()
-add_executable(coio.test coio.cc core_test_utils.c)
+add_executable(coio.test coio.cc core_test_utils.c test_iostream.c)
target_link_libraries(coio.test core eio bit uri unit)
if (ENABLE_BUNDLED_MSGPUCK)
diff --git a/test/unit/coio.cc b/test/unit/coio.cc
index 9d107a22e..b3d8068b7 100644
--- a/test/unit/coio.cc
+++ b/test/unit/coio.cc
@@ -4,7 +4,7 @@
#include "coio_task.h"
#include "fio.h"
#include "unit.h"
-#include "unit.h"
+#include "test_iostream.h"
int
touch_f(va_list ap)
@@ -115,6 +115,82 @@ test_getaddrinfo(void)
footer();
}
+static int
+test_read_f(va_list ap)
+{
+ struct iostream *io = va_arg(ap, struct iostream *);
+ char buf[5];
+ int rc = coio_read(io, buf, sizeof(buf));
+ if (rc < (ssize_t)sizeof(buf))
+ return -1;
+ return 0;
+}
+
+static int
+test_write_f(va_list ap)
+{
+ struct iostream *io = va_arg(ap, struct iostream *);
+ const char str[] = "test";
+ int rc = coio_write_timeout(io, str, sizeof(str), TIMEOUT_INFINITY);
+ if (rc < (ssize_t)sizeof(str))
+ return -1;
+ return 0;
+}
+
+static int
+test_writev_f(va_list ap)
+{
+ struct iostream *io = va_arg(ap, struct iostream *);
+ const char str[] = "test";
+ struct iovec iov = {(void *)str, sizeof(str)};
+ int rc = coio_writev(io, &iov, 1, 0);
+ if (rc < (ssize_t)sizeof(str))
+ return -1;
+ return 0;
+}
+
+static void
+read_write_test(void)
+{
+ header();
+
+ fiber_func test_funcs[] = {
+ test_read_f,
+ test_write_f,
+ test_writev_f,
+ };
+ const char *descr[] = {
+ "read",
+ "write",
+ "writev",
+ };
+
+ int num_tests = sizeof(test_funcs) / sizeof(test_funcs[0]);
+ plan(2 * num_tests);
+
+ struct test_stream s;
+ test_stream_create(&s, 0, 0);
+ for (int i = 0; i < num_tests; i++) {
+ test_stream_reset(&s, 0, 0);
+ struct fiber *f = fiber_new_xc("rw_test", test_funcs[i]);
+ fiber_set_joinable(f, true);
+ fiber_start(f, &s.io);
+ fiber_wakeup(f);
+ fiber_sleep(0);
+ ok(!fiber_is_dead(f), "coio_%s handle spurious wakeup",
+ descr[i]);
+ test_stream_reset(&s, INT_MAX, INT_MAX);
+ fiber_wakeup(f);
+ int rc = fiber_join(f);
+ ok(rc == 0, "coio_%s success after a spurious wakeup",
+ descr[i]);
+ }
+ test_stream_destroy(&s);
+
+ check_plan();
+ footer();
+}
+
static int
main_f(va_list ap)
{
@@ -135,6 +211,8 @@ main_f(va_list ap)
test_getaddrinfo();
+ read_write_test();
+
ev_break(loop(), EVBREAK_ALL);
return 0;
}
diff --git a/test/unit/coio.result b/test/unit/coio.result
index 90b567140..368914bcd 100644
--- a/test/unit/coio.result
+++ b/test/unit/coio.result
@@ -12,3 +12,12 @@ ok 1 - getaddrinfo
ok 2 - getaddrinfo retval
ok 3 - getaddrinfo error message
*** test_getaddrinfo: done ***
+ *** read_write_test ***
+1..6
+ok 1 - coio_read handle spurious wakeup
+ok 2 - coio_read success after a spurious wakeup
+ok 3 - coio_write handle spurious wakeup
+ok 4 - coio_write success after a spurious wakeup
+ok 5 - coio_writev handle spurious wakeup
+ok 6 - coio_writev success after a spurious wakeup
+ *** read_write_test: done ***
diff --git a/test/unit/test_iostream.c b/test/unit/test_iostream.c
new file mode 100644
index 000000000..c50b63944
--- /dev/null
+++ b/test/unit/test_iostream.c
@@ -0,0 +1,125 @@
+/*
+ * SPDX-License-Identifier: BSD-2-Clause
+ *
+ * Copyright 2010-2021, Tarantool AUTHORS, please see AUTHORS file.
+ */
+
+#include <sys/uio.h>
+#include <fcntl.h>
+#include <sys/errno.h>
+
+#include "test_iostream.h"
+#include "trivia/util.h"
+
+static ssize_t
+test_stream_read(struct iostream *io, void *buf, size_t count)
+{
+ struct test_stream_ctx *ctx = io->ctx;
+ io->fd = ctx->rfd;
+ (void)buf;
+ ssize_t nrd = 0;
+ if (ctx->avail_rd == 0)
+ return IOSTREAM_WANT_READ;
+ nrd = MIN(count, ctx->avail_rd);
+ ctx->avail_rd -= nrd;
+ return nrd;
+}
+
+static ssize_t
+test_stream_write(struct iostream *io, const void *buf, size_t count)
+{
+ assert(io->fd >= 0);
+ struct test_stream_ctx *ctx = io->ctx;
+ io->fd = ctx->wfd;
+ (void)buf;
+ ssize_t nwr = 0;
+ if (ctx->avail_wr == 0)
+ return IOSTREAM_WANT_WRITE;
+ nwr = MIN(count, ctx->avail_wr);
+ ctx->avail_wr -= nwr;
+ return nwr;
+}
+
+static ssize_t
+test_stream_writev(struct iostream *io, const struct iovec *iov, int iovcnt)
+{
+ assert(io->fd >= 0);
+ struct test_stream_ctx *ctx = io->ctx;
+ io->fd = ctx->wfd;
+ if (ctx->avail_wr == 0)
+ return IOSTREAM_WANT_WRITE;
+ ssize_t start_wr = ctx->avail_wr;
+ for (int i = 0; i < iovcnt && ctx->avail_wr > 0; i++) {
+ size_t nwr = MIN(iov[i].iov_len, ctx->avail_wr);
+ ctx->avail_wr -= nwr;
+ }
+ return start_wr - ctx->avail_wr;
+}
+
+static void
+test_stream_delete_ctx(void *ptr)
+{
+ struct test_stream_ctx *ctx = ptr;
+ close(ctx->rfds[0]);
+ close(ctx->rfds[1]);
+ close(ctx->wfds[0]);
+ close(ctx->wfds[1]);
+}
+
+static const struct iostream_vtab test_stream_vtab = {
+ test_stream_delete_ctx,
+ test_stream_read,
+ test_stream_write,
+ test_stream_writev,
+};
+
+static void
+fill_pipe(int fd)
+{
+ char buf[4096];
+ int rc = 0;
+ rc = fcntl(fd, F_SETFL, O_NONBLOCK);
+ assert(rc >= 0);
+ while (rc >= 0 || errno == EINTR)
+ rc = write(fd, buf, sizeof(buf));
+ assert(errno == EAGAIN || errno == EWOULDBLOCK);
+}
+
+static void
+test_stream_ctx_create(struct test_stream_ctx *ctx, size_t maxrd, size_t maxwr)
+{
+ ctx->avail_rd = maxrd;
+ ctx->avail_wr = maxwr;
+ int rc = pipe(ctx->rfds);
+ assert(rc == 0);
+ rc = pipe(ctx->wfds);
+ assert(rc == 0);
+ /* rfd is never readable, since we didn't write anything to the pipe. */
+ ctx->rfd = ctx->rfds[0];
+ ctx->wfd = ctx->wfds[1];
+ /* Make wfd never writeable by filling the pipe. */
+ fill_pipe(ctx->wfd);
+}
+
+void
+test_stream_create(struct test_stream *s, size_t maxrd, size_t maxwr)
+{
+ test_stream_ctx_create(&s->ctx, maxrd, maxwr);
+ s->io.ctx = &s->ctx;
+ s->io.vtab = &test_stream_vtab;
+ s->io.fd = -1; /* Initialized on demand. */
+}
+
+void
+test_stream_reset(struct test_stream *s, size_t maxrd, size_t maxwr)
+{
+ struct test_stream_ctx *ctx = s->io.ctx;
+ ctx->avail_rd = maxrd;
+ ctx->avail_wr = maxwr;
+}
+
+void
+test_stream_destroy(struct test_stream *s)
+{
+ iostream_destroy(&s->io);
+}
diff --git a/test/unit/test_iostream.h b/test/unit/test_iostream.h
new file mode 100644
index 000000000..fd8b998aa
--- /dev/null
+++ b/test/unit/test_iostream.h
@@ -0,0 +1,69 @@
+/*
+ * SPDX-License-Identifier: BSD-2-Clause
+ *
+ * Copyright 2010-2021, Tarantool AUTHORS, please see AUTHORS file.
+ */
+
+#pragma once
+
+#include "core/iostream.h"
+/**
+ * A testing stream implementation. Doesn't write anything anywhere, but is
+ * useful for testing coio behaviour when waiting for stream to become readable
+ * or writeable.
+ */
+
+#if defined (__cplusplus)
+extern "C" {
+#endif
+
+struct test_stream_ctx {
+ /** How many bytes can be written before the stream would block. */
+ size_t avail_wr;
+ /** How many bytes can be read before the stream would block. */
+ size_t avail_rd;
+ /**
+ * A file descriptor which is never ready for reads. Points to rfds[0],
+ * the reading end of a pipe which is never written to.
+ */
+ int rfd;
+ /** A pipe used to simulate a fd never ready for reads. */
+ int rfds[2];
+ /**
+ * A file descriptor which is never ready for writes. Points to
+ * wfds[1], the writing end of a filled up pipe.
+ */
+ int wfd;
+ /** A pipe to simulate a fd never ready for writes. */
+ int wfds[2];
+};
+
+struct test_stream {
+ struct iostream io;
+ struct test_stream_ctx ctx;
+};
+
+/**
+ * Create an instance of a testing iostream.
+ * The stream will allow reading up to \a maxrd and writing up to \a maxwr
+ * bytes before blocking.
+ */
+void
+test_stream_create(struct test_stream *s, size_t maxrd, size_t maxwr);
+
+/**
+ * Reset the stream: from now on allow exactly \a maxrd and \a maxwr bytes of
+ * input / output respectively, replacing any previously remaining allowance.
+ */
+void
+test_stream_reset(struct test_stream *s, size_t maxrd, size_t maxwr);
+
+/**
+ * Destroy the stream.
+ */
+void
+test_stream_destroy(struct test_stream *s);
+
+#if defined (__cplusplus)
+} /* extern "C" */
+#endif
--
2.30.1 (Apple Git-130)
More information about the Tarantool-patches
mailing list