Tarantool development patches archive
 help / color / mirror / Atom feed
* [Tarantool-patches] [PATCH] coio: handle spurious wakeup correctly
@ 2021-11-16 10:17 Serge Petrenko via Tarantool-patches
  2021-11-16 11:29 ` Vladimir Davydov via Tarantool-patches
  2021-11-16 22:49 ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 2 replies; 9+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-11-16 10:17 UTC (permalink / raw)
  To: vdavydov, v.shpilevoy; +Cc: tarantool-patches

coio_accept, coio_read, coio_write, coio_writev used to handle spurious
wakeups correctly: if the timeout hasn't passed yet, they would simply
retry reading (or writing) and fall asleep once again if no data is
ready.

This behaviour changed in the following patches:
577a640a7fdec986d19101ed04d2afa80e951c78 ("coio: pass fd to
coio_accept") and 4f84859dcdd6126b0bdcda810b7f5f58386bdac6 ("Introduce
iostream wrapper for socket I/O").

Now the functions timeout on the very first spurious wakeup.

Fix this, add the appropriate unit tests and a test_iostream
implementation for the ease of testing.
---
No ticket
Branch: https://github.com/tarantool/tarantool/tree/sp/coio-rw-fixup

 src/lib/core/coio.c       |  43 ++++++-------
 test/unit/CMakeLists.txt  |   2 +-
 test/unit/coio.cc         |  80 +++++++++++++++++++++++-
 test/unit/coio.result     |   9 +++
 test/unit/test_iostream.c | 125 ++++++++++++++++++++++++++++++++++++++
 test/unit/test_iostream.h |  69 +++++++++++++++++++++
 6 files changed, 303 insertions(+), 25 deletions(-)
 create mode 100644 test/unit/test_iostream.c
 create mode 100644 test/unit/test_iostream.h

diff --git a/src/lib/core/coio.c b/src/lib/core/coio.c
index cc1cfa776..4c61c13ca 100644
--- a/src/lib/core/coio.c
+++ b/src/lib/core/coio.c
@@ -234,19 +234,19 @@ coio_accept(int sfd, struct sockaddr *addr, socklen_t addrlen,
 		}
 		if (!sio_wouldblock(errno))
 			return -1;
+		if (delay <= 0) {
+			diag_set(TimedOut);
+			return -1;
+		}
 		/*
 		 * Yield control to other fibers until the
 		 * timeout is reached.
 		 */
-		int revents = coio_wait(sfd, EV_READ, delay);
+		coio_wait(sfd, EV_READ, delay);
 		if (fiber_is_cancelled()) {
 			diag_set(FiberIsCancelled);
 			return -1;
 		}
-		if (revents == 0) {
-			diag_set(TimedOut);
-			return -1;
-		}
 		coio_timeout_update(&start, &delay);
 	}
 }
@@ -287,20 +287,19 @@ coio_read_ahead_timeout(struct iostream *io, void *buf, size_t sz,
 		} else if (nrd == IOSTREAM_ERROR) {
 			return -1;
 		}
+		if (delay <= 0) {
+			diag_set(TimedOut);
+			return -1;
+		}
 		/*
 		 * Yield control to other fibers until the
 		 * timeout is being reached.
 		 */
-		int revents = coio_wait(io->fd, iostream_status_to_events(nrd),
-					delay);
+		coio_wait(io->fd, iostream_status_to_events(nrd), delay);
 		if (fiber_is_cancelled()) {
 			diag_set(FiberIsCancelled);
 			return -1;
 		}
-		if (revents == 0) {
-			diag_set(TimedOut);
-			return -1;
-		}
 		coio_timeout_update(&start, &delay);
 	}
 }
@@ -383,21 +382,20 @@ coio_write_timeout(struct iostream *io, const void *buf, size_t sz,
 		} else if (nwr == IOSTREAM_ERROR) {
 			return -1;
 		}
+		if (delay <= 0) {
+			diag_set(TimedOut);
+			return -1;
+		}
 		/*
 		 * Yield control to other fibers until the
 		 * timeout is reached or the socket is
 		 * ready.
 		 */
-		int revents = coio_wait(io->fd, iostream_status_to_events(nwr),
-					delay);
+		coio_wait(io->fd, iostream_status_to_events(nwr), delay);
 		if (fiber_is_cancelled()) {
 			diag_set(FiberIsCancelled);
 			return -1;
 		}
-		if (revents == 0) {
-			diag_set(TimedOut);
-			return -1;
-		}
 		coio_timeout_update(&start, &delay);
 	}
 }
@@ -447,21 +445,20 @@ coio_writev_timeout(struct iostream *io, struct iovec *iov, int iovcnt,
 		} else if (nwr == IOSTREAM_ERROR) {
 			return -1;
 		}
+		if (delay <= 0) {
+			diag_set(TimedOut);
+			return -1;
+		}
 		/*
 		 * Yield control to other fibers until the
 		 * timeout is reached or the socket is
 		 * ready.
 		 */
-		int revents = coio_wait(io->fd, iostream_status_to_events(nwr),
-					delay);
+		coio_wait(io->fd, iostream_status_to_events(nwr), delay);
 		if (fiber_is_cancelled()) {
 			diag_set(FiberIsCancelled);
 			return -1;
 		}
-		if (revents == 0) {
-			diag_set(TimedOut);
-			return -1;
-		}
 		coio_timeout_update(&start, &delay);
 	}
 	return total;
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 79b0402c7..9a3423b14 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -119,7 +119,7 @@ if (GLIBC_USED)
     target_link_libraries(cbus_hang.test core unit stat)
 endif ()
 
-add_executable(coio.test coio.cc core_test_utils.c)
+add_executable(coio.test coio.cc core_test_utils.c test_iostream.c)
 target_link_libraries(coio.test core eio bit uri unit)
 
 if (ENABLE_BUNDLED_MSGPUCK)
diff --git a/test/unit/coio.cc b/test/unit/coio.cc
index 9d107a22e..b3d8068b7 100644
--- a/test/unit/coio.cc
+++ b/test/unit/coio.cc
@@ -4,7 +4,7 @@
 #include "coio_task.h"
 #include "fio.h"
 #include "unit.h"
-#include "unit.h"
+#include "test_iostream.h"
 
 int
 touch_f(va_list ap)
@@ -115,6 +115,82 @@ test_getaddrinfo(void)
 	footer();
 }
 
+static int
+test_read_f(va_list ap)
+{
+	struct iostream *io = va_arg(ap, struct iostream *);
+	char buf[5];
+	int rc = coio_read(io, buf, sizeof(buf));
+	if (rc < (ssize_t)sizeof(buf))
+		return -1;
+	return 0;
+}
+
+static int
+test_write_f(va_list ap)
+{
+	struct iostream *io = va_arg(ap, struct iostream *);
+	const char str[] = "test";
+	int rc = coio_write_timeout(io, str, sizeof(str), TIMEOUT_INFINITY);
+	if (rc < (ssize_t)sizeof(str))
+		return -1;
+	return 0;
+}
+
+static int
+test_writev_f(va_list ap)
+{
+	struct iostream *io = va_arg(ap, struct iostream *);
+	const char str[] = "test";
+	struct iovec iov = {(void *)str, sizeof(str)};
+	int rc = coio_writev(io, &iov, 1, 0);
+	if (rc < (ssize_t)sizeof(str))
+		return -1;
+	return 0;
+}
+
+static void
+read_write_test(void)
+{
+	header();
+
+	fiber_func test_funcs[] = {
+		test_read_f,
+		test_write_f,
+		test_writev_f,
+	};
+	const char *descr[] = {
+		"read",
+		"write",
+		"writev",
+	};
+
+	int num_tests = sizeof(test_funcs) / sizeof(test_funcs[0]);
+	plan(2 * num_tests);
+
+	struct test_stream s;
+	test_stream_create(&s, 0, 0);
+	for (int i = 0; i < num_tests; i++) {
+		test_stream_reset(&s, 0, 0);
+		struct fiber *f = fiber_new_xc("rw_test", test_funcs[i]);
+		fiber_set_joinable(f, true);
+		fiber_start(f, &s.io);
+		fiber_wakeup(f);
+		fiber_sleep(0);
+		ok(!fiber_is_dead(f), "coio_%s handle spurious wakeup",
+		   descr[i]);
+		test_stream_reset(&s, INT_MAX, INT_MAX);
+		fiber_wakeup(f);
+		int rc = fiber_join(f);
+		ok(rc == 0, "coio_%s success after a spurious wakeup",
+		   descr[i]);
+	}
+	test_stream_destroy(&s);
+
+	check_plan();
+	footer();
+}
+
 static int
 main_f(va_list ap)
 {
@@ -135,6 +211,8 @@ main_f(va_list ap)
 
 	test_getaddrinfo();
 
+	read_write_test();
+
 	ev_break(loop(), EVBREAK_ALL);
 	return 0;
 }
diff --git a/test/unit/coio.result b/test/unit/coio.result
index 90b567140..368914bcd 100644
--- a/test/unit/coio.result
+++ b/test/unit/coio.result
@@ -12,3 +12,12 @@ ok 1 - getaddrinfo
 ok 2 - getaddrinfo retval
 ok 3 - getaddrinfo error message
 	*** test_getaddrinfo: done ***
+	*** read_write_test ***
+1..6
+ok 1 - coio_read handle spurious wakeup
+ok 2 - coio_read success after a spurious wakeup
+ok 3 - coio_write handle spurious wakeup
+ok 4 - coio_write success after a spurious wakeup
+ok 5 - coio_writev handle spurious wakeup
+ok 6 - coio_writev success after a spurious wakeup
+	*** read_write_test: done ***
diff --git a/test/unit/test_iostream.c b/test/unit/test_iostream.c
new file mode 100644
index 000000000..c50b63944
--- /dev/null
+++ b/test/unit/test_iostream.c
@@ -0,0 +1,125 @@
+/*
+ * SPDX-License-Identifier: BSD-2-Clause
+ *
+ * Copyright 2010-2021, Tarantool AUTHORS, please see AUTHORS file.
+ */
+
+#include <sys/uio.h>
+#include <fcntl.h>
+#include <sys/errno.h>
+
+#include "test_iostream.h"
+#include "trivia/util.h"
+
+static ssize_t
+test_stream_read(struct iostream *io, void *buf, size_t count)
+{
+	struct test_stream_ctx *ctx = io->ctx;
+	io->fd = ctx->rfd;
+	(void)buf;
+	ssize_t nrd = 0;
+	if (ctx->avail_rd == 0)
+		return IOSTREAM_WANT_READ;
+	nrd = MIN(count, ctx->avail_rd);
+	ctx->avail_rd -= nrd;
+	return nrd;
+}
+
+static ssize_t
+test_stream_write(struct iostream *io, const void *buf, size_t count)
+{
+	assert(io->fd >= 0);
+	struct test_stream_ctx *ctx = io->ctx;
+	io->fd = ctx->wfd;
+	(void)buf;
+	ssize_t nwr = 0;
+	if (ctx->avail_wr == 0)
+		return IOSTREAM_WANT_WRITE;
+	nwr = MIN(count, ctx->avail_wr);
+	ctx->avail_wr -= nwr;
+	return nwr;
+}
+
+static ssize_t
+test_stream_writev(struct iostream *io, const struct iovec *iov, int iovcnt)
+{
+	assert(io->fd >= 0);
+	struct test_stream_ctx *ctx = io->ctx;
+	io->fd = ctx->wfd;
+	if (ctx->avail_wr == 0)
+		return IOSTREAM_WANT_WRITE;
+	ssize_t start_wr = ctx->avail_wr;
+	for (int i = 0; i < iovcnt && ctx->avail_wr > 0; i++) {
+		size_t nwr = MIN(iov[i].iov_len, ctx->avail_wr);
+		ctx->avail_wr -= nwr;
+	}
+	return start_wr - ctx->avail_wr;
+}
+
+static void
+test_stream_delete_ctx(void *ptr)
+{
+	struct test_stream_ctx *ctx = ptr;
+	close(ctx->rfds[0]);
+	close(ctx->rfds[1]);
+	close(ctx->wfds[0]);
+	close(ctx->wfds[1]);
+}
+
+static const struct iostream_vtab test_stream_vtab = {
+	test_stream_delete_ctx,
+	test_stream_read,
+	test_stream_write,
+	test_stream_writev,
+};
+
+static void
+fill_pipe(int fd)
+{
+	char buf[4096];
+	int rc = 0;
+	rc = fcntl(fd, F_SETFL, O_NONBLOCK);
+	assert(rc >= 0);
+	while (rc >= 0 || errno == EINTR)
+		rc = write(fd, buf, sizeof(buf));
+	assert(errno == EAGAIN || errno == EWOULDBLOCK);
+}
+
+static void
+test_stream_ctx_create(struct test_stream_ctx *ctx, size_t maxrd, size_t maxwr)
+{
+	ctx->avail_rd = maxrd;
+	ctx->avail_wr = maxwr;
+	int rc = pipe(ctx->rfds);
+	assert(rc == 0);
+	rc = pipe(ctx->wfds);
+	assert(rc == 0);
+	/* rfd is never readable, since we didn't write anything to the pipe. */
+	ctx->rfd = ctx->rfds[0];
+	ctx->wfd = ctx->wfds[1];
+	/* Make wfd never writeable by filling the pipe. */
+	fill_pipe(ctx->wfd);
+}
+
+void
+test_stream_create(struct test_stream *s, size_t maxrd, size_t maxwr)
+{
+	test_stream_ctx_create(&s->ctx, maxrd, maxwr);
+	s->io.ctx = &s->ctx;
+	s->io.vtab = &test_stream_vtab;
+	s->io.fd = -1; /* Initialized on demand. */
+}
+
+void
+test_stream_reset(struct test_stream *s, size_t maxrd, size_t maxwr)
+{
+	struct test_stream_ctx *ctx = s->io.ctx;
+	ctx->avail_rd = maxrd;
+	ctx->avail_wr = maxwr;
+}
+
+void
+test_stream_destroy(struct test_stream *s)
+{
+	iostream_destroy(&s->io);
+}
diff --git a/test/unit/test_iostream.h b/test/unit/test_iostream.h
new file mode 100644
index 000000000..fd8b998aa
--- /dev/null
+++ b/test/unit/test_iostream.h
@@ -0,0 +1,69 @@
+/*
+ * SPDX-License-Identifier: BSD-2-Clause
+ *
+ * Copyright 2010-2021, Tarantool AUTHORS, please see AUTHORS file.
+ */
+
+#pragma once
+
+#include "core/iostream.h"
+/**
+ * A testing stream implementation. Doesn't write anything anywhere, but is
+ * useful for testing coio behaviour when waiting for stream to become readable
+ * or writeable.
+ */
+
+#if defined (__cplusplus)
+extern "C" {
+#endif
+
+struct test_stream_ctx {
+	/** How many bytes can be written before the stream would block. */
+	size_t avail_wr;
+	/** How many bytes can be read before the stream would block. */
+	size_t avail_rd;
+	/**
+	 * A file descriptor which is never ready for reads. Points to rfds[0],
+	 * the reading end of a pipe which is never written to.
+	 */
+	int rfd;
+	/** A pipe used to simulate a fd never ready for reads. */
+	int rfds[2];
+	/**
+	 * A file descriptor which is never ready for writes. Points to
+	 * wfds[1], the writing end of a filled up pipe.
+	 */
+	int wfd;
+	/** A pipe to simulate a fd never ready for writes. */
+	int wfds[2];
+};
+
+struct test_stream {
+	struct iostream io;
+	struct test_stream_ctx ctx;
+};
+
+/**
+ * Create an instance of a testing iostream.
+ * The stream will allow reading up to \a maxrd and writing up to \a maxwr
+ * bytes before blocking.
+ */
+void
+test_stream_create(struct test_stream *s, size_t maxrd, size_t maxwr);
+
+/**
+ * Reset the stream. Allow additional \a maxrd and \a maxwr bytes of input /
+ * output respectively.
+ */
+void
+test_stream_reset(struct test_stream *s, size_t maxrd, size_t maxwr);
+
+/**
+ * Destroy the stream.
+ */
+void
+test_stream_destroy(struct test_stream *s);
+
+#if defined (__cplusplus)
+} /* extern "C" */
+#endif
-- 
2.30.1 (Apple Git-130)


^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [Tarantool-patches] [PATCH] coio: handle spurious wakeup correctly
  2021-11-16 10:17 [Tarantool-patches] [PATCH] coio: handle spurious wakeup correctly Serge Petrenko via Tarantool-patches
@ 2021-11-16 11:29 ` Vladimir Davydov via Tarantool-patches
  2021-11-16 11:36   ` Serge Petrenko via Tarantool-patches
  2021-11-16 22:49 ` Vladislav Shpilevoy via Tarantool-patches
  1 sibling, 1 reply; 9+ messages in thread
From: Vladimir Davydov via Tarantool-patches @ 2021-11-16 11:29 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: v.shpilevoy, tarantool-patches

Could you please create a pull request?

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [Tarantool-patches] [PATCH] coio: handle spurious wakeup correctly
  2021-11-16 11:29 ` Vladimir Davydov via Tarantool-patches
@ 2021-11-16 11:36   ` Serge Petrenko via Tarantool-patches
  0 siblings, 0 replies; 9+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-11-16 11:36 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: v.shpilevoy, tarantool-patches



16.11.2021 14:29, Vladimir Davydov пишет:
> Could you please create a pull request?
Ok, sure.
https://github.com/tarantool/tarantool/pull/6612

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [Tarantool-patches] [PATCH] coio: handle spurious wakeup correctly
  2021-11-16 10:17 [Tarantool-patches] [PATCH] coio: handle spurious wakeup correctly Serge Petrenko via Tarantool-patches
  2021-11-16 11:29 ` Vladimir Davydov via Tarantool-patches
@ 2021-11-16 22:49 ` Vladislav Shpilevoy via Tarantool-patches
  2021-11-17  7:33   ` Serge Petrenko via Tarantool-patches
  1 sibling, 1 reply; 9+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-11-16 22:49 UTC (permalink / raw)
  To: Serge Petrenko, vdavydov; +Cc: tarantool-patches

Hi! Thanks for the patch!

On 16.11.2021 11:17, Serge Petrenko via Tarantool-patches wrote:
> coio_accept, coio_read, coio_write, coio_writev used to handle spurious
> wakeups correctly: if the timeout hasn't passed yet, they would simply
> retry reading (or writing) and fall asleep once again if no data is
> ready.
> 
> This behaviour changed in the following patches:
> 577a640a7fdec986d19101ed04d2afa80e951c78 ("coio: pass fd to
> coio_accept") and 4f84859dcdd6126b0bdcda810b7f5f58386bdac6 ("Introduce
> iostream wrapper for socket I/O").
> 
> Now the functions timeout on the very first spurious wakeup.
> 
> Fix this, add the appropriate unit tests and a test_iostream
> implementation for the ease of testing.

Don't we have the same problem with coio_connect_addr() (used in
coio_connect_timeout())?

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [Tarantool-patches] [PATCH] coio: handle spurious wakeup correctly
  2021-11-16 22:49 ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-11-17  7:33   ` Serge Petrenko via Tarantool-patches
  2021-11-17 23:11     ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 1 reply; 9+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-11-17  7:33 UTC (permalink / raw)
  To: Vladislav Shpilevoy, vdavydov; +Cc: tarantool-patches



17.11.2021 01:49, Vladislav Shpilevoy пишет:
> Hi! Thanks for the patch!

Thanks for your answer!

>
> On 16.11.2021 11:17, Serge Petrenko via Tarantool-patches wrote:
>> coio_accept, coio_read, coio_write, coio_writev used to handle spurious
>> wakeups correctly: if the timeout hasn't passed yet, they would simply
>> retry reading (or writing) and fall asleep once again if no data is
>> ready.
>>
>> This behaviour changed in the following patches:
>> 577a640a7fdec986d19101ed04d2afa80e951c78 ("coio: pass fd to
>> coio_accept") and 4f84859dcdd6126b0bdcda810b7f5f58386bdac6 ("Introduce
>> iostream wrapper for socket I/O").
>>
>> Now the functions timeout on the very first spurious wakeup.
>>
>> Fix this, add the appropriate unit tests and a test_iostream
>> implementation for the ease of testing.
> Don't we have the same problem with coio_connect_addr() (used in
> coio_connect_timeout())?
Not really. Neither coio_connect_addr() nor coio_connect_timeout() retry
the connection. So even the previous version would throw an error after
a spurious wakeup. Just the error would be different.
Before the change it would throw SocketError, not TimedOut, but I don't 
think
this matters much. Does it?

By the change I mean (2db0741b) "coio: return fd from coio_connect".

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [Tarantool-patches] [PATCH] coio: handle spurious wakeup correctly
  2021-11-17  7:33   ` Serge Petrenko via Tarantool-patches
@ 2021-11-17 23:11     ` Vladislav Shpilevoy via Tarantool-patches
  2021-11-19 10:30       ` Serge Petrenko via Tarantool-patches
  0 siblings, 1 reply; 9+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-11-17 23:11 UTC (permalink / raw)
  To: Serge Petrenko, vdavydov; +Cc: tarantool-patches

>> On 16.11.2021 11:17, Serge Petrenko via Tarantool-patches wrote:
>>> coio_accept, coio_read, coio_write, coio_writev used to handle spurious
>>> wakeups correctly: if the timeout hasn't passed yet, they would simply
>>> retry reading (or writing) and fall asleep once again if no data is
>>> ready.
>>>
>>> This behaviour changed in the following patches:
>>> 577a640a7fdec986d19101ed04d2afa80e951c78 ("coio: pass fd to
>>> coio_accept") and 4f84859dcdd6126b0bdcda810b7f5f58386bdac6 ("Introduce
>>> iostream wrapper for socket I/O").
>>>
>>> Now the functions timeout on the very first spurious wakeup.
>>>
>>> Fix this, add the appropriate unit tests and a test_iostream
>>> implementation for the ease of testing.
>> Don't we have the same problem with coio_connect_addr() (used in
>> coio_connect_timeout())?
> Not really. Neither coio_connect_addr() nor coio_connect_timeout() retry
> the connection. So even the previous version would throw an error after
> a spurious wakeup. Just the error would be different.
> Before the change it would throw SocketError, not TimedOut, but I don't think
> this matters much. Does it?
> 
> By the change I mean (2db0741b) "coio: return fd from coio_connect".

Ok, then never mind. I will wait for fixes of Vova's comments regarding the
tests, but the bugfix itself LGTM.

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [Tarantool-patches] [PATCH] coio: handle spurious wakeup correctly
  2021-11-17 23:11     ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-11-19 10:30       ` Serge Petrenko via Tarantool-patches
  2021-11-21 14:24         ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 1 reply; 9+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-11-19 10:30 UTC (permalink / raw)
  To: Vladislav Shpilevoy, vdavydov; +Cc: tarantool-patches



18.11.2021 02:11, Vladislav Shpilevoy пишет:
>>> On 16.11.2021 11:17, Serge Petrenko via Tarantool-patches wrote:
>>>> coio_accept, coio_read, coio_write, coio_writev used to handle spurious
>>>> wakeups correctly: if the timeout hasn't passed yet, they would simply
>>>> retry reading (or writing) and fall asleep once again if no data is
>>>> ready.
>>>>
>>>> This behaviour changed in the following patches:
>>>> 577a640a7fdec986d19101ed04d2afa80e951c78 ("coio: pass fd to
>>>> coio_accept") and 4f84859dcdd6126b0bdcda810b7f5f58386bdac6 ("Introduce
>>>> iostream wrapper for socket I/O").
>>>>
>>>> Now the functions timeout on the very first spurious wakeup.
>>>>
>>>> Fix this, add the appropriate unit tests and a test_iostream
>>>> implementation for the ease of testing.
>>> Don't we have the same problem with coio_connect_addr() (used in
>>> coio_connect_timeout())?
>> Not really. Neither coio_connect_addr() nor coio_connect_timeout() retry
>> the connection. So even the previous version would throw an error after
>> a spurious wakeup. Just the error would be different.
>> Before the change it would throw SocketError, not TimedOut, but I don't think
>> this matters much. Does it?
>>
>> By the change I mean (2db0741b) "coio: return fd from coio_connect".
> Ok, then never mind. I will wait for fixes of Vova's comments regarding the
> tests, but the bugfix itself LGTM.

Thanks for your answer!

Here's the incremental diff after the fixes.
Long story short, I've deleted the test_iostream wrapper.
There wasn't much use for it.
I've force-pushed the new version on the branch.


===============================
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 71c8ba981..d8b1aac0d 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -121,7 +121,7 @@ if (GLIBC_USED)
      target_link_libraries(cbus_hang.test core unit stat)
  endif ()

-add_executable(coio.test coio.cc core_test_utils.c test_iostream.c)
+add_executable(coio.test coio.cc core_test_utils.c)
  target_link_libraries(coio.test core eio bit uri unit)

  if (ENABLE_BUNDLED_MSGPUCK)
diff --git a/test/unit/coio.cc b/test/unit/coio.cc
index 194503fda..d55bc2347 100644
--- a/test/unit/coio.cc
+++ b/test/unit/coio.cc
@@ -4,7 +4,11 @@
  #include "coio_task.h"
  #include "fio.h"
  #include "unit.h"
-#include "test_iostream.h"
+#include "iostream.h"
+
+#include <fcntl.h>
+#include <sys/uio.h>
+#include <sys/errno.h>

  int
  touch_f(va_list ap)
@@ -137,7 +141,7 @@ static int
  test_read_f(va_list ap)
  {
      struct iostream *io = va_arg(ap, struct iostream *);
-    char buf[5];
+    char buf[1024];
      int rc = coio_read(io, buf, sizeof(buf));
      if (rc < (ssize_t)sizeof(buf))
          return -1;
@@ -148,9 +152,9 @@ static int
  test_write_f(va_list ap)
  {
      struct iostream *io = va_arg(ap, struct iostream *);
-    const char str[] = "test";
-    int rc = coio_write_timeout(io, str, sizeof(str), TIMEOUT_INFINITY);
-    if (rc < (ssize_t)sizeof(str))
+    char buf[1024] = "";
+    int rc = coio_write_timeout(io, buf, sizeof(buf), TIMEOUT_INFINITY);
+    if (rc < (ssize_t)sizeof(buf))
          return -1;
      return 0;
  }
@@ -159,14 +163,45 @@ static int
  test_writev_f(va_list ap)
  {
      struct iostream *io = va_arg(ap, struct iostream *);
-    const char str[] = "test";
-    struct iovec iov = {(void *)str, sizeof(str)};
+    char buf[1024] = "";
+    struct iovec iov = {(void *)buf, sizeof(buf)};
      int rc = coio_writev(io, &iov, 1, 0);
-    if (rc < (ssize_t)sizeof(str))
+    if (rc < (ssize_t)sizeof(buf))
          return -1;
      return 0;
  }

+static void
+fill_pipe(int fd)
+{
+    char buf[1024] = "";
+    int rc = 0;
+    while (rc >= 0 || errno == EINTR)
+        rc = write(fd, buf, sizeof(buf));
+    fail_unless(errno == EAGAIN || errno == EWOULDBLOCK);
+}
+
+static void
+empty_pipe(int fd)
+{
+    char buf[1024];
+    int rc = 0;
+    while (rc >= 0 || errno == EINTR)
+        rc = read(fd, buf, sizeof(buf));
+    fail_unless(errno == EAGAIN || errno == EWOULDBLOCK);
+}
+
+static void
+create_pipe(int fds[2])
+{
+    int rc = pipe(fds);
+    fail_unless(rc >= 0);
+    rc = fcntl(fds[0], F_SETFL, O_NONBLOCK);
+    fail_unless(rc >= 0);
+    rc = fcntl(fds[1], F_SETFL, O_NONBLOCK);
+    fail_unless(rc >= 0);
+}
+
  static void
  read_write_test(void)
  {
@@ -186,25 +221,36 @@ read_write_test(void)
      int num_tests = sizeof(test_funcs) / sizeof(test_funcs[0]);
      plan(2 * num_tests);

-    struct test_stream s;
-    test_stream_create(&s, 0, 0);
+    int fds[2];
+    create_pipe(fds);
      for (int i = 0; i < num_tests; i++) {
-        test_stream_reset(&s, 0, 0);
+        struct iostream io;
+        if (i == 0) {
+            /* A non-readable fd, since the pipe is empty. */
+            iostream_create(&io, fds[0]);
+        } else {
+            iostream_create(&io, fds[1]);
+            /* Make the fd non-writable. */
+            fill_pipe(fds[1]);
+        }
          struct fiber *f = fiber_new_xc("rw_test", test_funcs[i]);
          fiber_set_joinable(f, true);
-        fiber_start(f, &s.io);
+        fiber_start(f, &io);
          fiber_wakeup(f);
          fiber_sleep(0);
          ok(!fiber_is_dead(f), "coio_%s handle spurious wakeup",
             descr[i]);
-        test_stream_reset(&s, INT_MAX, INT_MAX);
-        fiber_wakeup(f);
+        if (i == 0)
+            fill_pipe(fds[1]);
+        else
+            empty_pipe(fds[0]);
          int rc = fiber_join(f);
          ok(rc == 0, "coio_%s success after a spurious wakeup",
             descr[i]);
+        iostream_destroy(&io);
      }
-    test_stream_destroy(&s);
-
+    close(fds[0]);
+    close(fds[1]);
      check_plan();
      footer();
  }
diff --git a/test/unit/test_iostream.c b/test/unit/test_iostream.c
deleted file mode 100644
index c50b63944..000000000
--- a/test/unit/test_iostream.c
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * SPDX-License-Identifier: BSD-2-Clause
- *
- * Copyright 2010-2021, Tarantool AUTHORS, please see AUTHORS file.
- */
-
-#include <sys/uio.h>
-#include <fcntl.h>
-#include <sys/errno.h>
-
-#include "test_iostream.h"
-#include "trivia/util.h"
-
-static ssize_t
-test_stream_read(struct iostream *io, void *buf, size_t count)
-{
-    struct test_stream_ctx *ctx = io->ctx;
-    io->fd = ctx->rfd;
-    (void)buf;
-    ssize_t nrd = 0;
-    if (ctx->avail_rd == 0)
-        return IOSTREAM_WANT_READ;
-    nrd = MIN(count, ctx->avail_rd);
-    ctx->avail_rd -= nrd;
-    return nrd;
-}
-
-static ssize_t
-test_stream_write(struct iostream *io, const void *buf, size_t count)
-{
-    assert(io->fd >= 0);
-    struct test_stream_ctx *ctx = io->ctx;
-    io->fd = ctx->wfd;
-    (void)buf;
-    ssize_t nwr = 0;
-    if (ctx->avail_wr == 0)
-        return IOSTREAM_WANT_WRITE;
-    nwr = MIN(count, ctx->avail_wr);
-    ctx->avail_wr -= nwr;
-    return nwr;
-}
-
-static ssize_t
-test_stream_writev(struct iostream *io, const struct iovec *iov, int 
iovcnt)
-{
-    assert(io->fd >= 0);
-    struct test_stream_ctx *ctx = io->ctx;
-    io->fd = ctx->wfd;
-    if (ctx->avail_wr == 0)
-        return IOSTREAM_WANT_WRITE;
-    ssize_t start_wr = ctx->avail_wr;
-    for (int i = 0; i < iovcnt && ctx->avail_wr > 0; i++) {
-        size_t nwr = MIN(iov[i].iov_len, ctx->avail_wr);
-        ctx->avail_wr -= nwr;
-    }
-    return start_wr - ctx->avail_wr;
-}
-
-static void
-test_stream_delete_ctx(void *ptr)
-{
-    struct test_stream_ctx *ctx = ptr;
-    close(ctx->rfds[0]);
-    close(ctx->rfds[1]);
-    close(ctx->wfds[0]);
-    close(ctx->wfds[1]);
-}
-
-static const struct iostream_vtab test_stream_vtab = {
-    test_stream_delete_ctx,
-    test_stream_read,
-    test_stream_write,
-    test_stream_writev,
-};
-
-static void
-fill_pipe(int fd)
-{
-    char buf[4096];
-    int rc = 0;
-    rc = fcntl(fd, F_SETFL, O_NONBLOCK);
-    assert(rc >= 0);
-    while (rc >= 0 || errno == EINTR)
-        rc = write(fd, buf, sizeof(buf));
-    assert(errno == EAGAIN || errno == EWOULDBLOCK);
-}
-
-static void
-test_stream_ctx_create(struct test_stream_ctx *ctx, size_t maxrd, 
size_t maxwr)
-{
-    ctx->avail_rd = maxrd;
-    ctx->avail_wr = maxwr;
-    int rc = pipe(ctx->rfds);
-    assert(rc == 0);
-    rc = pipe(ctx->wfds);
-    assert(rc == 0);
-    /* rfd is never readable, since we didn't write anything to the 
pipe. */
-    ctx->rfd = ctx->rfds[0];
-    ctx->wfd = ctx->wfds[1];
-    /* Make wfd never writeable by filling the pipe. */
-    fill_pipe(ctx->wfd);
-}
-
-void
-test_stream_create(struct test_stream *s, size_t maxrd, size_t maxwr)
-{
-    test_stream_ctx_create(&s->ctx, maxrd, maxwr);
-    s->io.ctx = &s->ctx;
-    s->io.vtab = &test_stream_vtab;
-    s->io.fd = -1; /* Initialized on demand. */
-}
-
-void
-test_stream_reset(struct test_stream *s, size_t maxrd, size_t maxwr)
-{
-    struct test_stream_ctx *ctx = s->io.ctx;
-    ctx->avail_rd = maxrd;
-    ctx->avail_wr = maxwr;
-}
-
-void
-test_stream_destroy(struct test_stream *s)
-{
-    iostream_destroy(&s->io);
-}
diff --git a/test/unit/test_iostream.h b/test/unit/test_iostream.h
deleted file mode 100644
index fd8b998aa..000000000
--- a/test/unit/test_iostream.h
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * SPDX-License-Identifier: BSD-2-Clause
- *
- * Copyright 2010-2021, Tarantool AUTHORS, please see AUTHORS file.
- */
-
-#pragma once
-
-#include "core/iostream.h"
-/**
- * A testing stream implementation. Doesn't write anything anywhere, but is
- * useful for testing coio behaviour when waiting for stream to become 
readable
- * or writeable.
- */
-
-#if defined (__cplusplus)
-extern "C" {
-#endif
-
-struct test_stream_ctx {
-    /** How many bytes can be written before the stream would block. */
-    size_t avail_wr;
-    /** How many bytes can be read before the stream would block. */
-    size_t avail_rd;
-    /**
-     * A file descriptor which is never ready for reads. Points to rfds[0],
-     * the reading end of a pipe which is never written to.
-     */
-    int rfd;
-    /** A pipe used to simulate a fd never ready for reads. */
-    int rfds[2];
-    /**
-     * A file descriptor which is never ready for writes. Points to
-     * wfds[1], the writing end of a filled up pipe.
-     */
-    int wfd;
-    /** A pipe to simulate a fd never ready for writes. */
-    int wfds[2];
-};
-
-struct test_stream {
-    struct iostream io;
-    struct test_stream_ctx ctx;
-};
-
-/**
- * Create an instance of a testing iostream.
- * The stream will allow reading up to \a maxrd and writing up to \a maxwr
- * bytes before blocking.
- */
-void
-test_stream_create(struct test_stream *s, size_t maxrd, size_t maxwr);
-
-/**
- * Reset the stream. Allow additional \a maxrd and \a maxwr bytes of 
input /
- * output respectively.
- */
-void
-test_stream_reset(struct test_stream *s, size_t maxrd, size_t maxwr);
-
-/**
- * Destroy the stream.
- */
-void
-test_stream_destroy(struct test_stream *s);
-
-#if defined (__cplusplus)
-} /* extern "C" */
-#endif


===============================

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [Tarantool-patches] [PATCH] coio: handle spurious wakeup correctly
  2021-11-19 10:30       ` Serge Petrenko via Tarantool-patches
@ 2021-11-21 14:24         ` Vladislav Shpilevoy via Tarantool-patches
  2021-11-22  6:47           ` Serge Petrenko via Tarantool-patches
  0 siblings, 1 reply; 9+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2021-11-21 14:24 UTC (permalink / raw)
  To: Serge Petrenko, vdavydov; +Cc: tarantool-patches

Hi! Thanks for the fixes!

In the commit message you still mention test_iostream.

After you remove it - LGTM.

^ permalink raw reply	[flat|nested] 9+ messages in thread

* Re: [Tarantool-patches] [PATCH] coio: handle spurious wakeup correctly
  2021-11-21 14:24         ` Vladislav Shpilevoy via Tarantool-patches
@ 2021-11-22  6:47           ` Serge Petrenko via Tarantool-patches
  0 siblings, 0 replies; 9+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-11-22  6:47 UTC (permalink / raw)
  To: Vladislav Shpilevoy, vdavydov; +Cc: tarantool-patches



21.11.2021 17:24, Vladislav Shpilevoy пишет:
> Hi! Thanks for the fixes!
>
> In the commit message you still mention test_iostream.
>
> After you remove it - LGTM.
My bad. Fixed.

-- 
Serge Petrenko


^ permalink raw reply	[flat|nested] 9+ messages in thread

end of thread, other threads:[~2021-11-22  6:47 UTC | newest]

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-11-16 10:17 [Tarantool-patches] [PATCH] coio: handle spurious wakeup correctly Serge Petrenko via Tarantool-patches
2021-11-16 11:29 ` Vladimir Davydov via Tarantool-patches
2021-11-16 11:36   ` Serge Petrenko via Tarantool-patches
2021-11-16 22:49 ` Vladislav Shpilevoy via Tarantool-patches
2021-11-17  7:33   ` Serge Petrenko via Tarantool-patches
2021-11-17 23:11     ` Vladislav Shpilevoy via Tarantool-patches
2021-11-19 10:30       ` Serge Petrenko via Tarantool-patches
2021-11-21 14:24         ` Vladislav Shpilevoy via Tarantool-patches
2021-11-22  6:47           ` Serge Petrenko via Tarantool-patches

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox