coio_accept, coio_read, coio_write, coio_writev used to handle spurious wakeups correctly: if the timeout hasn't passed yet, they would simply retry reading (or writing) and fall asleep once again if no data is ready. This behaviour changed in the following patches: 577a640a7fdec986d19101ed04d2afa80e951c78 ("coio: pass fd to coio_accept") and 4f84859dcdd6126b0bdcda810b7f5f58386bdac6 ("Introduce iostream wrapper for socket I/O"). Now the functions timeout on the very first spurious wakeup. Fix this, add the appropriate unit tests and a test_iostream implementation for the ease of testing. --- No ticket Branch: https://github.com/tarantool/tarantool/tree/sp/coio-rw-fixup src/lib/core/coio.c | 43 ++++++------- test/unit/CMakeLists.txt | 2 +- test/unit/coio.cc | 80 +++++++++++++++++++++++- test/unit/coio.result | 9 +++ test/unit/test_iostream.c | 125 ++++++++++++++++++++++++++++++++++++++ test/unit/test_iostream.h | 69 +++++++++++++++++++++ 6 files changed, 303 insertions(+), 25 deletions(-) create mode 100644 test/unit/test_iostream.c create mode 100644 test/unit/test_iostream.h diff --git a/src/lib/core/coio.c b/src/lib/core/coio.c index cc1cfa776..4c61c13ca 100644 --- a/src/lib/core/coio.c +++ b/src/lib/core/coio.c @@ -234,19 +234,19 @@ coio_accept(int sfd, struct sockaddr *addr, socklen_t addrlen, } if (!sio_wouldblock(errno)) return -1; + if (delay <= 0) { + diag_set(TimedOut); + return -1; + } /* * Yield control to other fibers until the * timeout is reached. */ - int revents = coio_wait(sfd, EV_READ, delay); + coio_wait(sfd, EV_READ, delay); if (fiber_is_cancelled()) { diag_set(FiberIsCancelled); return -1; } - if (revents == 0) { - diag_set(TimedOut); - return -1; - } coio_timeout_update(&start, &delay); } } @@ -287,20 +287,19 @@ coio_read_ahead_timeout(struct iostream *io, void *buf, size_t sz, } else if (nrd == IOSTREAM_ERROR) { return -1; } + if (delay <= 0) { + diag_set(TimedOut); + return -1; + } /* * Yield control to other fibers until the * timeout is being reached. */ - int revents = coio_wait(io->fd, iostream_status_to_events(nrd), - delay); + coio_wait(io->fd, iostream_status_to_events(nrd), delay); if (fiber_is_cancelled()) { diag_set(FiberIsCancelled); return -1; } - if (revents == 0) { - diag_set(TimedOut); - return -1; - } coio_timeout_update(&start, &delay); } } @@ -383,21 +382,20 @@ coio_write_timeout(struct iostream *io, const void *buf, size_t sz, } else if (nwr == IOSTREAM_ERROR) { return -1; } + if (delay <= 0) { + diag_set(TimedOut); + return -1; + } /* * Yield control to other fibers until the * timeout is reached or the socket is * ready. */ - int revents = coio_wait(io->fd, iostream_status_to_events(nwr), - delay); + coio_wait(io->fd, iostream_status_to_events(nwr), delay); if (fiber_is_cancelled()) { diag_set(FiberIsCancelled); return -1; } - if (revents == 0) { - diag_set(TimedOut); - return -1; - } coio_timeout_update(&start, &delay); } } @@ -447,21 +445,20 @@ coio_writev_timeout(struct iostream *io, struct iovec *iov, int iovcnt, } else if (nwr == IOSTREAM_ERROR) { return -1; } + if (delay <= 0) { + diag_set(TimedOut); + return -1; + } /* * Yield control to other fibers until the * timeout is reached or the socket is * ready. */ - int revents = coio_wait(io->fd, iostream_status_to_events(nwr), - delay); + coio_wait(io->fd, iostream_status_to_events(nwr), delay); if (fiber_is_cancelled()) { diag_set(FiberIsCancelled); return -1; } - if (revents == 0) { - diag_set(TimedOut); - return -1; - } coio_timeout_update(&start, &delay); } return total; diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 79b0402c7..9a3423b14 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -119,7 +119,7 @@ if (GLIBC_USED) target_link_libraries(cbus_hang.test core unit stat) endif () -add_executable(coio.test coio.cc core_test_utils.c) +add_executable(coio.test coio.cc core_test_utils.c test_iostream.c) target_link_libraries(coio.test core eio bit uri unit) if (ENABLE_BUNDLED_MSGPUCK) diff --git a/test/unit/coio.cc b/test/unit/coio.cc index 9d107a22e..b3d8068b7 100644 --- a/test/unit/coio.cc +++ b/test/unit/coio.cc @@ -4,7 +4,7 @@ #include "coio_task.h" #include "fio.h" #include "unit.h" -#include "unit.h" +#include "test_iostream.h" int touch_f(va_list ap) @@ -115,6 +115,82 @@ test_getaddrinfo(void) footer(); } +static int +test_read_f(va_list ap) +{ + struct iostream *io = va_arg(ap, struct iostream *); + char buf[5]; + int rc = coio_read(io, buf, sizeof(buf)); + if (rc < (ssize_t)sizeof(buf)) + return -1; + return 0; +} + +static int +test_write_f(va_list ap) +{ + struct iostream *io = va_arg(ap, struct iostream *); + const char str[] = "test"; + int rc = coio_write_timeout(io, str, sizeof(str), TIMEOUT_INFINITY); + if (rc < (ssize_t)sizeof(str)) + return -1; + return 0; +} + +static int +test_writev_f(va_list ap) +{ + struct iostream *io = va_arg(ap, struct iostream *); + const char str[] = "test"; + struct iovec iov = {(void *)str, sizeof(str)}; + int rc = coio_writev(io, &iov, 1, 0); + if (rc < (ssize_t)sizeof(str)) + return -1; + return 0; +} + +static void +read_write_test(void) +{ + header(); + + fiber_func test_funcs[] = { + test_read_f, + test_write_f, + test_writev_f, + }; + const char *descr[] = { + "read", + "write", + "writev", + }; + + int num_tests = sizeof(test_funcs) / sizeof(test_funcs[0]); + plan(2 * num_tests); + + struct test_stream s; + test_stream_create(&s, 0, 0); + for (int i = 0; i < num_tests; i++) { + test_stream_reset(&s, 0, 0); + struct fiber *f = fiber_new_xc("rw_test", test_funcs[i]); + fiber_set_joinable(f, true); + fiber_start(f, &s.io); + fiber_wakeup(f); + fiber_sleep(0); + ok(!fiber_is_dead(f), "coio_%s handle spurious wakeup", + descr[i]); + test_stream_reset(&s, INT_MAX, INT_MAX); + fiber_wakeup(f); + int rc = fiber_join(f); + ok(rc == 0, "coio_%s success after a spurious wakeup", + descr[i]); + } + test_stream_destroy(&s); + + check_plan(); + footer(); +} + static int main_f(va_list ap) { @@ -135,6 +211,8 @@ main_f(va_list ap) test_getaddrinfo(); + read_write_test(); + ev_break(loop(), EVBREAK_ALL); return 0; } diff --git a/test/unit/coio.result b/test/unit/coio.result index 90b567140..368914bcd 100644 --- a/test/unit/coio.result +++ b/test/unit/coio.result @@ -12,3 +12,12 @@ ok 1 - getaddrinfo ok 2 - getaddrinfo retval ok 3 - getaddrinfo error message *** test_getaddrinfo: done *** + *** read_write_test *** +1..6 +ok 1 - coio_read handle spurious wakeup +ok 2 - coio_read success after a spurious wakeup +ok 3 - coio_write handle spurious wakeup +ok 4 - coio_write success after a spurious wakeup +ok 5 - coio_writev handle spurious wakeup +ok 6 - coio_writev success after a spurious wakeup + *** read_write_test: done *** diff --git a/test/unit/test_iostream.c b/test/unit/test_iostream.c new file mode 100644 index 000000000..c50b63944 --- /dev/null +++ b/test/unit/test_iostream.c @@ -0,0 +1,125 @@ +/* + * SPDX-License-Identifier: BSD-2-Clause + * + * Copyright 2010-2021, Tarantool AUTHORS, please see AUTHORS file. + */ + +#include <sys/uio.h> +#include <fcntl.h> +#include <sys/errno.h> + +#include "test_iostream.h" +#include "trivia/util.h" + +static ssize_t +test_stream_read(struct iostream *io, void *buf, size_t count) +{ + struct test_stream_ctx *ctx = io->ctx; + io->fd = ctx->rfd; + (void)buf; + ssize_t nrd = 0; + if (ctx->avail_rd == 0) + return IOSTREAM_WANT_READ; + nrd = MIN(count, ctx->avail_rd); + ctx->avail_rd -= nrd; + return nrd; +} + +static ssize_t +test_stream_write(struct iostream *io, const void *buf, size_t count) +{ + assert(io->fd >= 0); + struct test_stream_ctx *ctx = io->ctx; + io->fd = ctx->wfd; + (void)buf; + ssize_t nwr = 0; + if (ctx->avail_wr == 0) + return IOSTREAM_WANT_WRITE; + nwr = MIN(count, ctx->avail_wr); + ctx->avail_wr -= nwr; + return nwr; +} + +static ssize_t +test_stream_writev(struct iostream *io, const struct iovec *iov, int iovcnt) +{ + assert(io->fd >= 0); + struct test_stream_ctx *ctx = io->ctx; + io->fd = ctx->wfd; + if (ctx->avail_wr == 0) + return IOSTREAM_WANT_WRITE; + ssize_t start_wr = ctx->avail_wr; + for (int i = 0; i < iovcnt && ctx->avail_wr > 0; i++) { + size_t nwr = MIN(iov[i].iov_len, ctx->avail_wr); + ctx->avail_wr -= nwr; + } + return start_wr - ctx->avail_wr; +} + +static void +test_stream_delete_ctx(void *ptr) +{ + struct test_stream_ctx *ctx = ptr; + close(ctx->rfds[0]); + close(ctx->rfds[1]); + close(ctx->wfds[0]); + close(ctx->wfds[1]); +} + +static const struct iostream_vtab test_stream_vtab = { + test_stream_delete_ctx, + test_stream_read, + test_stream_write, + test_stream_writev, +}; + +static void +fill_pipe(int fd) +{ + char buf[4096]; + int rc = 0; + rc = fcntl(fd, F_SETFL, O_NONBLOCK); + assert(rc >= 0); + while (rc >= 0 || errno == EINTR) + rc = write(fd, buf, sizeof(buf)); + assert(errno == EAGAIN || errno == EWOULDBLOCK); +} + +static void +test_stream_ctx_create(struct test_stream_ctx *ctx, size_t maxrd, size_t maxwr) +{ + ctx->avail_rd = maxrd; + ctx->avail_wr = maxwr; + int rc = pipe(ctx->rfds); + assert(rc == 0); + rc = pipe(ctx->wfds); + assert(rc == 0); + /* rfd is never readable, since we didn't write anything to the pipe. */ + ctx->rfd = ctx->rfds[0]; + ctx->wfd = ctx->wfds[1]; + /* Make wfd never writeable by filling the pipe. */ + fill_pipe(ctx->wfd); +} + +void +test_stream_create(struct test_stream *s, size_t maxrd, size_t maxwr) +{ + test_stream_ctx_create(&s->ctx, maxrd, maxwr); + s->io.ctx = &s->ctx; + s->io.vtab = &test_stream_vtab; + s->io.fd = -1; /* Initialized on demand. */ +} + +void +test_stream_reset(struct test_stream *s, size_t maxrd, size_t maxwr) +{ + struct test_stream_ctx *ctx = s->io.ctx; + ctx->avail_rd = maxrd; + ctx->avail_wr = maxwr; +} + +void +test_stream_destroy(struct test_stream *s) +{ + iostream_destroy(&s->io); +} diff --git a/test/unit/test_iostream.h b/test/unit/test_iostream.h new file mode 100644 index 000000000..fd8b998aa --- /dev/null +++ b/test/unit/test_iostream.h @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: BSD-2-Clause + * + * Copyright 2010-2021, Tarantool AUTHORS, please see AUTHORS file. + */ + +#pragma once + +#include "core/iostream.h" +/** + * A testing stream implementation. Doesn't write anything anywhere, but is + * useful for testing coio behaviour when waiting for stream to become readable + * or writeable. + */ + +#if defined (__cplusplus) +extern "C" { +#endif + +struct test_stream_ctx { + /** How many bytes can be written before the stream would block. */ + size_t avail_wr; + /** How many bytes can be read before the stream would block. */ + size_t avail_rd; + /** + * A file descriptor which is never ready for reads. Points to rfds[0], + * the reading end of a pipe which is never written to. + */ + int rfd; + /** A pipe used to simulate a fd never ready for reads. */ + int rfds[2]; + /** + * A file descriptor which is never ready for writes. Points to + * wfds[1], the writing end of a filled up pipe. + */ + int wfd; + /** A pipe to simulate a fd never ready for writes. */ + int wfds[2]; +}; + +struct test_stream { + struct iostream io; + struct test_stream_ctx ctx; +}; + +/** + * Create an instance of a testing iostream. + * The stream will allow reading up to \a maxrd and writing up to \a maxwr + * bytes before blocking. + */ +void +test_stream_create(struct test_stream *s, size_t maxrd, size_t maxwr); + +/** + * Reset the stream. Allow additional \a maxrd and \a maxwr bytes of input / + * output respectively. + */ +void +test_stream_reset(struct test_stream *s, size_t maxrd, size_t maxwr); + +/** + * Destroy the stream. + */ +void +test_stream_destroy(struct test_stream *s); + +#if defined (__cplusplus) +} /* extern "C" */ +#endif -- 2.30.1 (Apple Git-130)
Could you please create a pull request?
16.11.2021 14:29, Vladimir Davydov пишет: > Could you please create a pull request? Ok, sure. https://github.com/tarantool/tarantool/pull/6612 -- Serge Petrenko
Hi! Thanks for the patch!
On 16.11.2021 11:17, Serge Petrenko via Tarantool-patches wrote:
> coio_accept, coio_read, coio_write, coio_writev used to handle spurious
> wakeups correctly: if the timeout hasn't passed yet, they would simply
> retry reading (or writing) and fall asleep once again if no data is
> ready.
>
> This behaviour changed in the following patches:
> 577a640a7fdec986d19101ed04d2afa80e951c78 ("coio: pass fd to
> coio_accept") and 4f84859dcdd6126b0bdcda810b7f5f58386bdac6 ("Introduce
> iostream wrapper for socket I/O").
>
> Now the functions timeout on the very first spurious wakeup.
>
> Fix this, add the appropriate unit tests and a test_iostream
> implementation for the ease of testing.
Don't we have the same problem with coio_connect_addr() (used in
coio_connect_timeout())?
17.11.2021 01:49, Vladislav Shpilevoy пишет: > Hi! Thanks for the patch! Thanks for your answer! > > On 16.11.2021 11:17, Serge Petrenko via Tarantool-patches wrote: >> coio_accept, coio_read, coio_write, coio_writev used to handle spurious >> wakeups correctly: if the timeout hasn't passed yet, they would simply >> retry reading (or writing) and fall asleep once again if no data is >> ready. >> >> This behaviour changed in the following patches: >> 577a640a7fdec986d19101ed04d2afa80e951c78 ("coio: pass fd to >> coio_accept") and 4f84859dcdd6126b0bdcda810b7f5f58386bdac6 ("Introduce >> iostream wrapper for socket I/O"). >> >> Now the functions timeout on the very first spurious wakeup. >> >> Fix this, add the appropriate unit tests and a test_iostream >> implementation for the ease of testing. > Don't we have the same problem with coio_connect_addr() (used in > coio_connect_timeout())? Not really. Neither coio_connect_addr() nor coio_connect_timeout() retry the connection. So even the previous version would throw an error after a spurious wakeup. Just the error would be different. Before the change it would throw SocketError, not TimedOut, but I don't think this matters much. Does it? By the change I mean (2db0741b) "coio: return fd from coio_connect". -- Serge Petrenko
>> On 16.11.2021 11:17, Serge Petrenko via Tarantool-patches wrote:
>>> coio_accept, coio_read, coio_write, coio_writev used to handle spurious
>>> wakeups correctly: if the timeout hasn't passed yet, they would simply
>>> retry reading (or writing) and fall asleep once again if no data is
>>> ready.
>>>
>>> This behaviour changed in the following patches:
>>> 577a640a7fdec986d19101ed04d2afa80e951c78 ("coio: pass fd to
>>> coio_accept") and 4f84859dcdd6126b0bdcda810b7f5f58386bdac6 ("Introduce
>>> iostream wrapper for socket I/O").
>>>
>>> Now the functions timeout on the very first spurious wakeup.
>>>
>>> Fix this, add the appropriate unit tests and a test_iostream
>>> implementation for the ease of testing.
>> Don't we have the same problem with coio_connect_addr() (used in
>> coio_connect_timeout())?
> Not really. Neither coio_connect_addr() nor coio_connect_timeout() retry
> the connection. So even the previous version would throw an error after
> a spurious wakeup. Just the error would be different.
> Before the change it would throw SocketError, not TimedOut, but I don't think
> this matters much. Does it?
>
> By the change I mean (2db0741b) "coio: return fd from coio_connect".
Ok, then never mind. I will wait for fixes of Vova's comments regarding the
tests, but the bugfix itself LGTM.
18.11.2021 02:11, Vladislav Shpilevoy пишет: >>> On 16.11.2021 11:17, Serge Petrenko via Tarantool-patches wrote: >>>> coio_accept, coio_read, coio_write, coio_writev used to handle spurious >>>> wakeups correctly: if the timeout hasn't passed yet, they would simply >>>> retry reading (or writing) and fall asleep once again if no data is >>>> ready. >>>> >>>> This behaviour changed in the following patches: >>>> 577a640a7fdec986d19101ed04d2afa80e951c78 ("coio: pass fd to >>>> coio_accept") and 4f84859dcdd6126b0bdcda810b7f5f58386bdac6 ("Introduce >>>> iostream wrapper for socket I/O"). >>>> >>>> Now the functions timeout on the very first spurious wakeup. >>>> >>>> Fix this, add the appropriate unit tests and a test_iostream >>>> implementation for the ease of testing. >>> Don't we have the same problem with coio_connect_addr() (used in >>> coio_connect_timeout())? >> Not really. Neither coio_connect_addr() nor coio_connect_timeout() retry >> the connection. So even the previous version would throw an error after >> a spurious wakeup. Just the error would be different. >> Before the change it would throw SocketError, not TimedOut, but I don't think >> this matters much. Does it? >> >> By the change I mean (2db0741b) "coio: return fd from coio_connect". > Ok, then never mind. I will wait for fixes of Vova's comments regarding the > tests, but the bugfix itself LGTM. Thanks for your answer! Here's the incremental diff after the fixes. Long story short, I've deleted the test_iostream wrapper. There wasn't much use for it. I've force-pushed the new version on the branch. =============================== diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 71c8ba981..d8b1aac0d 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -121,7 +121,7 @@ if (GLIBC_USED) target_link_libraries(cbus_hang.test core unit stat) endif () -add_executable(coio.test coio.cc core_test_utils.c test_iostream.c) +add_executable(coio.test coio.cc core_test_utils.c) target_link_libraries(coio.test core eio bit uri unit) if (ENABLE_BUNDLED_MSGPUCK) diff --git a/test/unit/coio.cc b/test/unit/coio.cc index 194503fda..d55bc2347 100644 --- a/test/unit/coio.cc +++ b/test/unit/coio.cc @@ -4,7 +4,11 @@ #include "coio_task.h" #include "fio.h" #include "unit.h" -#include "test_iostream.h" +#include "iostream.h" + +#include <fcntl.h> +#include <sys/uio.h> +#include <sys/errno.h> int touch_f(va_list ap) @@ -137,7 +141,7 @@ static int test_read_f(va_list ap) { struct iostream *io = va_arg(ap, struct iostream *); - char buf[5]; + char buf[1024]; int rc = coio_read(io, buf, sizeof(buf)); if (rc < (ssize_t)sizeof(buf)) return -1; @@ -148,9 +152,9 @@ static int test_write_f(va_list ap) { struct iostream *io = va_arg(ap, struct iostream *); - const char str[] = "test"; - int rc = coio_write_timeout(io, str, sizeof(str), TIMEOUT_INFINITY); - if (rc < (ssize_t)sizeof(str)) + char buf[1024] = ""; + int rc = coio_write_timeout(io, buf, sizeof(buf), TIMEOUT_INFINITY); + if (rc < (ssize_t)sizeof(buf)) return -1; return 0; } @@ -159,14 +163,45 @@ static int test_writev_f(va_list ap) { struct iostream *io = va_arg(ap, struct iostream *); - const char str[] = "test"; - struct iovec iov = {(void *)str, sizeof(str)}; + char buf[1024] = ""; + struct iovec iov = {(void *)buf, sizeof(buf)}; int rc = coio_writev(io, &iov, 1, 0); - if (rc < (ssize_t)sizeof(str)) + if (rc < (ssize_t)sizeof(buf)) return -1; return 0; } +static void +fill_pipe(int fd) +{ + char buf[1024] = ""; + int rc = 0; + while (rc >= 0 || errno == EINTR) + rc = write(fd, buf, sizeof(buf)); + fail_unless(errno == EAGAIN || errno == EWOULDBLOCK); +} + +static void +empty_pipe(int fd) +{ + char buf[1024]; + int rc = 0; + while (rc >= 0 || errno == EINTR) + rc = read(fd, buf, sizeof(buf)); + fail_unless(errno == EAGAIN || errno == EWOULDBLOCK); +} + +static void +create_pipe(int fds[2]) +{ + int rc = pipe(fds); + fail_unless(rc >= 0); + rc = fcntl(fds[0], F_SETFL, O_NONBLOCK); + fail_unless(rc >= 0); + rc = fcntl(fds[1], F_SETFL, O_NONBLOCK); + fail_unless(rc >= 0); +} + static void read_write_test(void) { @@ -186,25 +221,36 @@ read_write_test(void) int num_tests = sizeof(test_funcs) / sizeof(test_funcs[0]); plan(2 * num_tests); - struct test_stream s; - test_stream_create(&s, 0, 0); + int fds[2]; + create_pipe(fds); for (int i = 0; i < num_tests; i++) { - test_stream_reset(&s, 0, 0); + struct iostream io; + if (i == 0) { + /* A non-readable fd, since the pipe is empty. */ + iostream_create(&io, fds[0]); + } else { + iostream_create(&io, fds[1]); + /* Make the fd non-writable. */ + fill_pipe(fds[1]); + } struct fiber *f = fiber_new_xc("rw_test", test_funcs[i]); fiber_set_joinable(f, true); - fiber_start(f, &s.io); + fiber_start(f, &io); fiber_wakeup(f); fiber_sleep(0); ok(!fiber_is_dead(f), "coio_%s handle spurious wakeup", descr[i]); - test_stream_reset(&s, INT_MAX, INT_MAX); - fiber_wakeup(f); + if (i == 0) + fill_pipe(fds[1]); + else + empty_pipe(fds[0]); int rc = fiber_join(f); ok(rc == 0, "coio_%s success after a spurious wakeup", descr[i]); + iostream_destroy(&io); } - test_stream_destroy(&s); - + close(fds[0]); + close(fds[1]); check_plan(); footer(); } diff --git a/test/unit/test_iostream.c b/test/unit/test_iostream.c deleted file mode 100644 index c50b63944..000000000 --- a/test/unit/test_iostream.c +++ /dev/null @@ -1,125 +0,0 @@ -/* - * SPDX-License-Identifier: BSD-2-Clause - * - * Copyright 2010-2021, Tarantool AUTHORS, please see AUTHORS file. - */ - -#include <sys/uio.h> -#include <fcntl.h> -#include <sys/errno.h> - -#include "test_iostream.h" -#include "trivia/util.h" - -static ssize_t -test_stream_read(struct iostream *io, void *buf, size_t count) -{ - struct test_stream_ctx *ctx = io->ctx; - io->fd = ctx->rfd; - (void)buf; - ssize_t nrd = 0; - if (ctx->avail_rd == 0) - return IOSTREAM_WANT_READ; - nrd = MIN(count, ctx->avail_rd); - ctx->avail_rd -= nrd; - return nrd; -} - -static ssize_t -test_stream_write(struct iostream *io, const void *buf, size_t count) -{ - assert(io->fd >= 0); - struct test_stream_ctx *ctx = io->ctx; - io->fd = ctx->wfd; - (void)buf; - ssize_t nwr = 0; - if (ctx->avail_wr == 0) - return IOSTREAM_WANT_WRITE; - nwr = MIN(count, ctx->avail_wr); - ctx->avail_wr -= nwr; - return nwr; -} - -static ssize_t -test_stream_writev(struct iostream *io, const struct iovec *iov, int iovcnt) -{ - assert(io->fd >= 0); - struct test_stream_ctx *ctx = io->ctx; - io->fd = ctx->wfd; - if (ctx->avail_wr == 0) - return IOSTREAM_WANT_WRITE; - ssize_t start_wr = ctx->avail_wr; - for (int i = 0; i < iovcnt && ctx->avail_wr > 0; i++) { - size_t nwr = MIN(iov[i].iov_len, ctx->avail_wr); - ctx->avail_wr -= nwr; - } - return start_wr - ctx->avail_wr; -} - -static void -test_stream_delete_ctx(void *ptr) -{ - struct test_stream_ctx *ctx = ptr; - close(ctx->rfds[0]); - close(ctx->rfds[1]); - close(ctx->wfds[0]); - close(ctx->wfds[1]); -} - -static const struct iostream_vtab test_stream_vtab = { - test_stream_delete_ctx, - test_stream_read, - test_stream_write, - test_stream_writev, -}; - -static void -fill_pipe(int fd) -{ - char buf[4096]; - int rc = 0; - rc = fcntl(fd, F_SETFL, O_NONBLOCK); - assert(rc >= 0); - while (rc >= 0 || errno == EINTR) - rc = write(fd, buf, sizeof(buf)); - assert(errno == EAGAIN || errno == EWOULDBLOCK); -} - -static void -test_stream_ctx_create(struct test_stream_ctx *ctx, size_t maxrd, size_t maxwr) -{ - ctx->avail_rd = maxrd; - ctx->avail_wr = maxwr; - int rc = pipe(ctx->rfds); - assert(rc == 0); - rc = pipe(ctx->wfds); - assert(rc == 0); - /* rfd is never readable, since we didn't write anything to the pipe. */ - ctx->rfd = ctx->rfds[0]; - ctx->wfd = ctx->wfds[1]; - /* Make wfd never writeable by filling the pipe. */ - fill_pipe(ctx->wfd); -} - -void -test_stream_create(struct test_stream *s, size_t maxrd, size_t maxwr) -{ - test_stream_ctx_create(&s->ctx, maxrd, maxwr); - s->io.ctx = &s->ctx; - s->io.vtab = &test_stream_vtab; - s->io.fd = -1; /* Initialized on demand. */ -} - -void -test_stream_reset(struct test_stream *s, size_t maxrd, size_t maxwr) -{ - struct test_stream_ctx *ctx = s->io.ctx; - ctx->avail_rd = maxrd; - ctx->avail_wr = maxwr; -} - -void -test_stream_destroy(struct test_stream *s) -{ - iostream_destroy(&s->io); -} diff --git a/test/unit/test_iostream.h b/test/unit/test_iostream.h deleted file mode 100644 index fd8b998aa..000000000 --- a/test/unit/test_iostream.h +++ /dev/null @@ -1,69 +0,0 @@ -/* - * SPDX-License-Identifier: BSD-2-Clause - * - * Copyright 2010-2021, Tarantool AUTHORS, please see AUTHORS file. - */ - -#pragma once - -#include "core/iostream.h" -/** - * A testing stream implementation. Doesn't write anything anywhere, but is - * useful for testing coio behaviour when waiting for stream to become readable - * or writeable. - */ - -#if defined (__cplusplus) -extern "C" { -#endif - -struct test_stream_ctx { - /** How many bytes can be written before the stream would block. */ - size_t avail_wr; - /** How many bytes can be read before the stream would block. */ - size_t avail_rd; - /** - * A file descriptor which is never ready for reads. Points to rfds[0], - * the reading end of a pipe which is never written to. - */ - int rfd; - /** A pipe used to simulate a fd never ready for reads. */ - int rfds[2]; - /** - * A file descriptor which is never ready for writes. Points to - * wfds[1], the writing end of a filled up pipe. - */ - int wfd; - /** A pipe to simulate a fd never ready for writes. */ - int wfds[2]; -}; - -struct test_stream { - struct iostream io; - struct test_stream_ctx ctx; -}; - -/** - * Create an instance of a testing iostream. - * The stream will allow reading up to \a maxrd and writing up to \a maxwr - * bytes before blocking. - */ -void -test_stream_create(struct test_stream *s, size_t maxrd, size_t maxwr); - -/** - * Reset the stream. Allow additional \a maxrd and \a maxwr bytes of input / - * output respectively. - */ -void -test_stream_reset(struct test_stream *s, size_t maxrd, size_t maxwr); - -/** - * Destroy the stream. - */ -void -test_stream_destroy(struct test_stream *s); - -#if defined (__cplusplus) -} /* extern "C" */ -#endif =============================== -- Serge Petrenko
Hi! Thanks for the fixes! In the commit message you still mention test_iostream. After you remove it - LGTM.
21.11.2021 17:24, Vladislav Shpilevoy пишет:
> Hi! Thanks for the fixes!
>
> In the commit message you still mention test_iostream.
>
> After you remove it - LGTM.
My bad. Fixed.
--
Serge Petrenko