[Tarantool-patches] [PATCH] Add a cancellation guard to cpipe flush callback

Leonid Vasiliev lvasiliev at tarantool.org
Mon Dec 23 15:44:52 MSK 2019

On 12/5/19 10:27 AM, Konstantin Osipov wrote:
> * Leonid Vasiliev <lvasiliev at tarantool.org> [19/12/05 10:24]:
>> On 12/3/19 9:02 PM, Konstantin Osipov wrote:
>>> * Leonid Vasiliev <lvasiliev at tarantool.org> [19/12/03 19:36]:
>>>> https://github.com/tarantool/tarantool/issues/4127
>>>> https://github.com/tarantool/tarantool/tree/lvasiliev/gh-4127-WAL-thread-stucks
>>> Looks like a great catch.
>>>> We need to set a thread cancellation guard, because
>>>> another thread may cancel the current thread
>>>> (write() is a cancellation point in ev_async_send)
>>>> and the activation of the ev_async watcher
>>>> through ev_async_send will fail.
>>> I still don't get from the explanation why it is relevant that
>>> ev_async_send mustn't fail?
>> ev_async_send mustn't fail because a failure leaves the tarantool instance
>> in a broken state. For example: the first thread flushes its cpipe input
>> to an endpoint's output and is then cancelled while trying to call
>> ev_async_send (write() is a cancellation point). Now
>> stailq_empty(&endpoint->output) is false. After that, another thread
>> flushes its cpipe input to the same endpoint, but it doesn't try to call
>> ev_async_send, because output_was_empty is false. As a result, the
>> endpoint->consumer thread is never woken up (it stays blocked in
>> epoll_wait). The same situation is described in
>> https://github.com/tarantool/tarantool/issues/4127:
> Looks like an explanation that deserves to be in a comment prior
> to pthread_setcancelstate().
> The issue is, however, if a thread is cancelled and disappears
> before deregistering from cbus, a lot of other bad things will
> happen - because all of its registration entries will sit there.
> The memory, luckily, will not go away, but I am not sure anyone
> but the creating thread can deregister the memory structures
> safely. Looks like this could be covered with a unit test, what do
> you think?
>> at main thread:
>> 	from void wal_free(void):
>> 		cbus_stop_loop(&writer->wal_pipe);
>> 		if (cord_join(&writer->cord)) {...}  // wait the "wal" thread
>> at "wal" thread:
>> 	cbus_stop_loop_f is never called (for the reasons described above)
>> 	the thread stays blocked in epoll_wait()

Add a unit test that reproduces the hang of a thread on join.

diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 4a57597e9..339521489 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -98,6 +98,9 @@ target_link_libraries(cbus_stress.test core stat)
  add_executable(cbus.test cbus.c)
  target_link_libraries(cbus.test core unit stat)

+add_executable(cbus_hang.test cbus_hang.c)
+target_link_libraries(cbus_hang.test core unit stat)
  add_executable(coio.test coio.cc)
  target_link_libraries(coio.test core eio bit uri unit)

diff --git a/test/unit/cbus_hang.c b/test/unit/cbus_hang.c
new file mode 100644
index 000000000..36487cd9b
--- /dev/null
+++ b/test/unit/cbus_hang.c
@@ -0,0 +1,150 @@
+#include "cbus.h"
+#include "fiber.h"
+#include "memory.h"
+#include "unit.h"
+struct cord hang_worker;
+struct cord canceled_worker;
+struct cbus_endpoint hang_endpoint;
+struct cpipe pipe_from_cl_to_hang;
+struct cpipe pipe_from_main_to_hang;
+/**
+ * Hack.
+ * We need to synchronize the main thread and
+ * the canceled worker thread so that the cancellation
+ * happens at a specific moment. So, do this using
+ * the endpoint's mutex.
+ */
+pthread_mutex_t endpoint_hack_mutex;
+pthread_cond_t endpoint_hack_cond;
+void join_fail(int signum) {
+	(void)signum;
+	printf("Can't join the hang worker\n");
+static void
+do_nothing(struct cmsg *m)
+	(void) m;
+static int
+hang_worker_f(va_list ap)
+	(void) ap;
+	cbus_endpoint_create(&hang_endpoint, "hang_worker",
+	                     fiber_schedule_cb, fiber());
+	tt_pthread_mutex_lock(&endpoint_hack_mutex);
+	tt_pthread_cond_signal(&endpoint_hack_cond);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex);
+	cbus_loop(&hang_endpoint);
+	cbus_endpoint_destroy(&hang_endpoint, cbus_process);
+	return 0;
+static void
+	cord_costart(&hang_worker, "hang_worker", hang_worker_f, NULL);
+static int
+canceled_worker_f(va_list ap)
+	(void) ap;
+	cpipe_create(&pipe_from_cl_to_hang, "hang_worker");
+	cpipe_set_max_input(&pipe_from_cl_to_hang, 1);
+	static struct cmsg_hop nothing_route = { do_nothing, NULL };
+	static struct cmsg nothing_msg;
+	cmsg_init(&nothing_msg, &nothing_route);
+	/*
+	 * We need to use cpipe_push_input because
+	 * ev_invoke must be called to reproduce the hang.
+	 */
+	cpipe_push_input(&pipe_from_cl_to_hang, &nothing_msg);
+	cpipe_destroy(&pipe_from_cl_to_hang);
+	return 0;
+static void
+	cord_costart(&canceled_worker, "canceled_worker",
+	             canceled_worker_f, NULL);
+static int
+main_f(va_list ap)
+	(void) ap;
+	hang_worker_start();
+	/* Start the endpoint's mutex hack */
+	tt_pthread_mutex_lock(&endpoint_hack_mutex);
+	tt_pthread_cond_wait(&endpoint_hack_cond, &endpoint_hack_mutex);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex);
+	tt_pthread_mutex_lock(&(hang_endpoint.mutex));
+	canceled_worker_start();
+	while(hang_endpoint.mutex.__data.__lock < 2) {
+		usleep(100);
+	}
+	tt_pthread_cancel(canceled_worker.id);
+	pthread_mutex_unlock(&(hang_endpoint.mutex));
+	/* Hack end */
+	tt_pthread_join(canceled_worker.id, NULL);
+	cpipe_create(&pipe_from_main_to_hang, "hang_worker");
+	cbus_stop_loop(&pipe_from_main_to_hang);
+	cpipe_destroy(&pipe_from_main_to_hang);
+	unsigned join_timeout = 5;
+	signal(SIGALRM, join_fail); // For exit in a hang case
+	alarm(join_timeout);
+	cord_join(&hang_worker);
+	ok(true, "The hang worker has been joined");
+	alarm(0);
+	ev_break(loop(), EVBREAK_ALL);
+	return 0;
+	header();
+	plan(1);
+	memory_init();
+	fiber_init(fiber_c_invoke);
+	cbus_init();
+	tt_pthread_cond_init(&endpoint_hack_cond, NULL);
+	tt_pthread_mutex_init(&endpoint_hack_mutex, NULL);
+	struct fiber *main_fiber = fiber_new("main", main_f);
+	assert(main_fiber != NULL);
+	fiber_wakeup(main_fiber);
+	ev_run(loop(), 0);
+	tt_pthread_cond_destroy(&endpoint_hack_cond);
+	cbus_free();
+	fiber_free();
+	memory_free();
+	int rc = check_plan();
+	footer();
+	return rc;
diff --git a/test/unit/cbus_hang.result b/test/unit/cbus_hang.result
new file mode 100644
index 000000000..d5810e8cf
--- /dev/null
+++ b/test/unit/cbus_hang.result
@@ -0,0 +1,4 @@
+	*** main ***
+ok 1 - The hang worker has been joined
+	*** main: done ***

More information about the Tarantool-patches mailing list