[Tarantool-patches] [PATCH] coio: handle spurious wakeup correctly
Serge Petrenko
sergepetrenko at tarantool.org
Fri Nov 19 13:30:14 MSK 2021
18.11.2021 02:11, Vladislav Shpilevoy пишет:
>>> On 16.11.2021 11:17, Serge Petrenko via Tarantool-patches wrote:
>>>> coio_accept, coio_read, coio_write, coio_writev used to handle spurious
>>>> wakeups correctly: if the timeout hasn't passed yet, they would simply
>>>> retry reading (or writing) and fall asleep once again if no data is
>>>> ready.
>>>>
>>>> This behaviour changed in the following patches:
>>>> 577a640a7fdec986d19101ed04d2afa80e951c78 ("coio: pass fd to
>>>> coio_accept") and 4f84859dcdd6126b0bdcda810b7f5f58386bdac6 ("Introduce
>>>> iostream wrapper for socket I/O").
>>>>
>>>> Now the functions timeout on the very first spurious wakeup.
>>>>
>>>> Fix this, add the appropriate unit tests and a test_iostream
>>>> implementation for the ease of testing.
>>> Don't we have the same problem with coio_connect_addr() (used in
>>> coio_connect_timeout())?
>> Not really. Neither coio_connect_addr() nor coio_connect_timeout() retry
>> the connection. So even the previous version would throw an error after
>> a spurious wakeup. Just the error would be different.
>> Before the change it would throw SocketError, not TimedOut, but I don't think
>> this matters much. Does it?
>>
>> By the change I mean (2db0741b) "coio: return fd from coio_connect".
> Ok, then never mind. I will wait for fixes of Vova's comments regarding the
> tests, but the bugfix itself LGTM.
Thanks for your answer!
Here's the incremental diff after the fixes.
Long story short, I've deleted the test_iostream wrapper.
There wasn't much use for it.
I've force-pushed the new version on the branch.
===============================
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 71c8ba981..d8b1aac0d 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -121,7 +121,7 @@ if (GLIBC_USED)
target_link_libraries(cbus_hang.test core unit stat)
endif ()
-add_executable(coio.test coio.cc core_test_utils.c test_iostream.c)
+add_executable(coio.test coio.cc core_test_utils.c)
target_link_libraries(coio.test core eio bit uri unit)
if (ENABLE_BUNDLED_MSGPUCK)
diff --git a/test/unit/coio.cc b/test/unit/coio.cc
index 194503fda..d55bc2347 100644
--- a/test/unit/coio.cc
+++ b/test/unit/coio.cc
@@ -4,7 +4,11 @@
#include "coio_task.h"
#include "fio.h"
#include "unit.h"
-#include "test_iostream.h"
+#include "iostream.h"
+
+#include <fcntl.h>
+#include <sys/uio.h>
+#include <sys/errno.h>
int
touch_f(va_list ap)
@@ -137,7 +141,7 @@ static int
test_read_f(va_list ap)
{
struct iostream *io = va_arg(ap, struct iostream *);
- char buf[5];
+ char buf[1024];
int rc = coio_read(io, buf, sizeof(buf));
if (rc < (ssize_t)sizeof(buf))
return -1;
@@ -148,9 +152,9 @@ static int
test_write_f(va_list ap)
{
struct iostream *io = va_arg(ap, struct iostream *);
- const char str[] = "test";
- int rc = coio_write_timeout(io, str, sizeof(str), TIMEOUT_INFINITY);
- if (rc < (ssize_t)sizeof(str))
+ char buf[1024] = "";
+ int rc = coio_write_timeout(io, buf, sizeof(buf), TIMEOUT_INFINITY);
+ if (rc < (ssize_t)sizeof(buf))
return -1;
return 0;
}
@@ -159,14 +163,45 @@ static int
test_writev_f(va_list ap)
{
struct iostream *io = va_arg(ap, struct iostream *);
- const char str[] = "test";
- struct iovec iov = {(void *)str, sizeof(str)};
+ char buf[1024] = "";
+ struct iovec iov = {(void *)buf, sizeof(buf)};
int rc = coio_writev(io, &iov, 1, 0);
- if (rc < (ssize_t)sizeof(str))
+ if (rc < (ssize_t)sizeof(buf))
return -1;
return 0;
}
+static void
+fill_pipe(int fd)
+{
+ char buf[1024] = "";
+ int rc = 0;
+ while (rc >= 0 || errno == EINTR)
+ rc = write(fd, buf, sizeof(buf));
+ fail_unless(errno == EAGAIN || errno == EWOULDBLOCK);
+}
+
+static void
+empty_pipe(int fd)
+{
+ char buf[1024];
+ int rc = 0;
+ while (rc >= 0 || errno == EINTR)
+ rc = read(fd, buf, sizeof(buf));
+ fail_unless(errno == EAGAIN || errno == EWOULDBLOCK);
+}
+
+static void
+create_pipe(int fds[2])
+{
+ int rc = pipe(fds);
+ fail_unless(rc >= 0);
+ rc = fcntl(fds[0], F_SETFL, O_NONBLOCK);
+ fail_unless(rc >= 0);
+ rc = fcntl(fds[1], F_SETFL, O_NONBLOCK);
+ fail_unless(rc >= 0);
+}
+
static void
read_write_test(void)
{
@@ -186,25 +221,36 @@ read_write_test(void)
int num_tests = sizeof(test_funcs) / sizeof(test_funcs[0]);
plan(2 * num_tests);
- struct test_stream s;
- test_stream_create(&s, 0, 0);
+ int fds[2];
+ create_pipe(fds);
for (int i = 0; i < num_tests; i++) {
- test_stream_reset(&s, 0, 0);
+ struct iostream io;
+ if (i == 0) {
+ /* A non-readable fd, since the pipe is empty. */
+ iostream_create(&io, fds[0]);
+ } else {
+ iostream_create(&io, fds[1]);
+ /* Make the fd non-writable. */
+ fill_pipe(fds[1]);
+ }
struct fiber *f = fiber_new_xc("rw_test", test_funcs[i]);
fiber_set_joinable(f, true);
- fiber_start(f, &s.io);
+ fiber_start(f, &io);
fiber_wakeup(f);
fiber_sleep(0);
ok(!fiber_is_dead(f), "coio_%s handle spurious wakeup",
descr[i]);
- test_stream_reset(&s, INT_MAX, INT_MAX);
- fiber_wakeup(f);
+ if (i == 0)
+ fill_pipe(fds[1]);
+ else
+ empty_pipe(fds[0]);
int rc = fiber_join(f);
ok(rc == 0, "coio_%s success after a spurious wakeup",
descr[i]);
+ iostream_destroy(&io);
}
- test_stream_destroy(&s);
-
+ close(fds[0]);
+ close(fds[1]);
check_plan();
footer();
}
diff --git a/test/unit/test_iostream.c b/test/unit/test_iostream.c
deleted file mode 100644
index c50b63944..000000000
--- a/test/unit/test_iostream.c
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * SPDX-License-Identifier: BSD-2-Clause
- *
- * Copyright 2010-2021, Tarantool AUTHORS, please see AUTHORS file.
- */
-
-#include <sys/uio.h>
-#include <fcntl.h>
-#include <sys/errno.h>
-
-#include "test_iostream.h"
-#include "trivia/util.h"
-
-static ssize_t
-test_stream_read(struct iostream *io, void *buf, size_t count)
-{
- struct test_stream_ctx *ctx = io->ctx;
- io->fd = ctx->rfd;
- (void)buf;
- ssize_t nrd = 0;
- if (ctx->avail_rd == 0)
- return IOSTREAM_WANT_READ;
- nrd = MIN(count, ctx->avail_rd);
- ctx->avail_rd -= nrd;
- return nrd;
-}
-
-static ssize_t
-test_stream_write(struct iostream *io, const void *buf, size_t count)
-{
- assert(io->fd >= 0);
- struct test_stream_ctx *ctx = io->ctx;
- io->fd = ctx->wfd;
- (void)buf;
- ssize_t nwr = 0;
- if (ctx->avail_wr == 0)
- return IOSTREAM_WANT_WRITE;
- nwr = MIN(count, ctx->avail_wr);
- ctx->avail_wr -= nwr;
- return nwr;
-}
-
-static ssize_t
-test_stream_writev(struct iostream *io, const struct iovec *iov, int
iovcnt)
-{
- assert(io->fd >= 0);
- struct test_stream_ctx *ctx = io->ctx;
- io->fd = ctx->wfd;
- if (ctx->avail_wr == 0)
- return IOSTREAM_WANT_WRITE;
- ssize_t start_wr = ctx->avail_wr;
- for (int i = 0; i < iovcnt && ctx->avail_wr > 0; i++) {
- size_t nwr = MIN(iov[i].iov_len, ctx->avail_wr);
- ctx->avail_wr -= nwr;
- }
- return start_wr - ctx->avail_wr;
-}
-
-static void
-test_stream_delete_ctx(void *ptr)
-{
- struct test_stream_ctx *ctx = ptr;
- close(ctx->rfds[0]);
- close(ctx->rfds[1]);
- close(ctx->wfds[0]);
- close(ctx->wfds[1]);
-}
-
-static const struct iostream_vtab test_stream_vtab = {
- test_stream_delete_ctx,
- test_stream_read,
- test_stream_write,
- test_stream_writev,
-};
-
-static void
-fill_pipe(int fd)
-{
- char buf[4096];
- int rc = 0;
- rc = fcntl(fd, F_SETFL, O_NONBLOCK);
- assert(rc >= 0);
- while (rc >= 0 || errno == EINTR)
- rc = write(fd, buf, sizeof(buf));
- assert(errno == EAGAIN || errno == EWOULDBLOCK);
-}
-
-static void
-test_stream_ctx_create(struct test_stream_ctx *ctx, size_t maxrd,
size_t maxwr)
-{
- ctx->avail_rd = maxrd;
- ctx->avail_wr = maxwr;
- int rc = pipe(ctx->rfds);
- assert(rc == 0);
- rc = pipe(ctx->wfds);
- assert(rc == 0);
- /* rfd is never readable, since we didn't write anything to the
pipe. */
- ctx->rfd = ctx->rfds[0];
- ctx->wfd = ctx->wfds[1];
- /* Make wfd never writeable by filling the pipe. */
- fill_pipe(ctx->wfd);
-}
-
-void
-test_stream_create(struct test_stream *s, size_t maxrd, size_t maxwr)
-{
- test_stream_ctx_create(&s->ctx, maxrd, maxwr);
- s->io.ctx = &s->ctx;
- s->io.vtab = &test_stream_vtab;
- s->io.fd = -1; /* Initialized on demand. */
-}
-
-void
-test_stream_reset(struct test_stream *s, size_t maxrd, size_t maxwr)
-{
- struct test_stream_ctx *ctx = s->io.ctx;
- ctx->avail_rd = maxrd;
- ctx->avail_wr = maxwr;
-}
-
-void
-test_stream_destroy(struct test_stream *s)
-{
- iostream_destroy(&s->io);
-}
diff --git a/test/unit/test_iostream.h b/test/unit/test_iostream.h
deleted file mode 100644
index fd8b998aa..000000000
--- a/test/unit/test_iostream.h
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * SPDX-License-Identifier: BSD-2-Clause
- *
- * Copyright 2010-2021, Tarantool AUTHORS, please see AUTHORS file.
- */
-
-#pragma once
-
-#include "core/iostream.h"
-/**
- * A testing stream implementation. Doesn't write anything anywhere, but is
- * useful for testing coio behaviour when waiting for stream to become
readable
- * or writeable.
- */
-
-#if defined (__cplusplus)
-extern "C" {
-#endif
-
-struct test_stream_ctx {
- /** How many bytes can be written before the stream would block. */
- size_t avail_wr;
- /** How many bytes can be read before the stream would block. */
- size_t avail_rd;
- /**
- * A file descriptor which is never ready for reads. Points to rfds[0],
- * the reading end of a pipe which is never written to.
- */
- int rfd;
- /** A pipe used to simulate a fd never ready for reads. */
- int rfds[2];
- /**
- * A file descriptor which is never ready for writes. Points to
- * wfds[1], the writing end of a filled up pipe.
- */
- int wfd;
- /** A pipe to simulate a fd never ready for writes. */
- int wfds[2];
-};
-
-struct test_stream {
- struct iostream io;
- struct test_stream_ctx ctx;
-};
-
-/**
- * Create an instance of a testing iostream.
- * The stream will allow reading up to \a maxrd and writing up to \a maxwr
- * bytes before blocking.
- */
-void
-test_stream_create(struct test_stream *s, size_t maxrd, size_t maxwr);
-
-/**
- * Reset the stream. Allow additional \a maxrd and \a maxwr bytes of
input /
- * output respectively.
- */
-void
-test_stream_reset(struct test_stream *s, size_t maxrd, size_t maxwr);
-
-/**
- * Destroy the stream.
- */
-void
-test_stream_destroy(struct test_stream *s);
-
-#if defined (__cplusplus)
-} /* extern "C" */
-#endif
===============================
--
Serge Petrenko
More information about the Tarantool-patches
mailing list