[Tarantool-patches] [PATCH] Add a cancellation guard to cpipe flush callback
Leonid Vasiliev
lvasiliev at tarantool.org
Mon Dec 23 15:44:52 MSK 2019
On 12/5/19 10:27 AM, Konstantin Osipov wrote:
> * Leonid Vasiliev <lvasiliev at tarantool.org> [19/12/05 10:24]:
>> On 12/3/19 9:02 PM, Konstantin Osipov wrote:
>>> * Leonid Vasiliev <lvasiliev at tarantool.org> [19/12/03 19:36]:
>>>> https://github.com/tarantool/tarantool/issues/4127
>>>> https://github.com/tarantool/tarantool/tree/lvasiliev/gh-4127-WAL-thread-stucks
>>>
>>> Looks like a great catch.
>>>
>>>> We need to set a thread cancellation guard, because
>>>> another thread may cancel the current thread
>>>> (write() is a cancellation point in ev_async_send)
>>>> and the activation of the ev_async watcher
>>>> through ev_async_send will fail.
>>>
>>> I still don't get from the explanation why it is relevant that
>>> ev_async_send mustn't fail?
>>
>> ev_async_send mustn't fail because a failure leads to unwanted behavior of
>> the tarantool instance. For example: the first thread flushes its cpipe
>> input to an endpoint output and is cancelled while trying to call
>> ev_async_send (write() is a cancellation point). Now
>> stailq_empty(&endpoint->output) is false. After that, another thread
>> flushes its cpipe input to the same endpoint, but it doesn't try to call
>> ev_async_send, because output_was_empty is false. As a result, the
>> endpoint->consumer thread is never woken up (it stays blocked in
>> epoll_wait). The same situation is described in
>> https://github.com/tarantool/tarantool/issues/4127:
>>
>
> Looks like an explanation that deserves to be in a comment prior
> to pthread_setcancelstate().
>
> The issue is, however, if a thread is cancelled and disappears
> before deregistering from cbus, a lot of other bad things will
> happen - because all of its registration entries will sit there.
> The memory, luckily, will not go away, but I am not sure anyone
> but the creating thread can deregister the memory structures
> safely. Looks like this could be covered with a unit test, what do
> you think?
>
>> at main thread:
>> from void wal_free(void):
>> cbus_stop_loop(&writer->wal_pipe);
>> if (cord_join(&writer->cord)) {...} // wait the "wal" thread
>>
>> at "wal" thread:
>> don't try to call cbus_stop_loop_f (for the reasons described above)
>> blocked at epoll_wait()
>>
>>>
>>>
>
Add a unit test for a thread hanging on join.
---
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 4a57597e9..339521489 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -98,6 +98,9 @@ target_link_libraries(cbus_stress.test core stat)
add_executable(cbus.test cbus.c)
target_link_libraries(cbus.test core unit stat)
+add_executable(cbus_hang.test cbus_hang.c)
+target_link_libraries(cbus_hang.test core unit stat)
+
add_executable(coio.test coio.cc)
target_link_libraries(coio.test core eio bit uri unit)
diff --git a/test/unit/cbus_hang.c b/test/unit/cbus_hang.c
new file mode 100644
index 000000000..36487cd9b
--- /dev/null
+++ b/test/unit/cbus_hang.c
@@ -0,0 +1,150 @@
+#include "cbus.h"
+#include "fiber.h"
+#include "memory.h"
+#include "unit.h"
+
+
+/* Cord that runs hang_worker_f: it creates hang_endpoint and runs cbus_loop() on it. */
+struct cord hang_worker;
+/* Cord that runs canceled_worker_f; main_f cancels and joins it. */
+struct cord canceled_worker;
+
+/* The "hang_worker" endpoint, created and destroyed in hang_worker_f. */
+struct cbus_endpoint hang_endpoint;
+/* Pipe through which the canceled worker pushes one message to "hang_worker". */
+struct cpipe pipe_from_cl_to_hang;
+/* Pipe through which main_f calls cbus_stop_loop() for "hang_worker". */
+struct cpipe pipe_from_main_to_hang;
+
+/*
+ * Hack.
+ * The main thread has to cancel the canceled worker thread
+ * at a specific moment, so the two are synchronized through
+ * the endpoint's own mutex (hang_endpoint.mutex, see main_f).
+ * The mutex/condition pair below is signalled by the hang
+ * worker right after it has created the endpoint; main_f
+ * waits on it before it touches hang_endpoint.mutex.
+ */
+pthread_mutex_t endpoint_hack_mutex;
+pthread_cond_t endpoint_hack_cond;
+
+
+/**
+ * Signal handler installed by main_f for SIGALRM before it
+ * joins the hang worker: prints a diagnostic and terminates
+ * the test with a failure status.
+ *
+ * NOTE(review): printf() and exit() are not async-signal-safe;
+ * kept as is to preserve the behavior of the test.
+ */
+static void
+join_fail(int signum)
+{
+	(void) signum;
+	printf("Can't join the hang worker\n");
+	exit(EXIT_FAILURE);
+}
+
+/** No-op message handler used as the only hop of the test route. */
+static void
+do_nothing(struct cmsg *m)
+{
+	/* The message carries no payload to process. */
+	(void) m;
+}
+
+/**
+ * Body of the hang worker cord. Creates the "hang_worker"
+ * endpoint, signals endpoint_hack_cond, then runs cbus_loop()
+ * on the endpoint and destroys it once the loop returns.
+ *
+ * @param ap unused.
+ * @return always 0.
+ */
+static int
+hang_worker_f(va_list ap)
+{
+	(void) ap;
+	cbus_endpoint_create(&hang_endpoint, "hang_worker", fiber_schedule_cb,
+			     fiber());
+
+	/* Tell main_f that hang_endpoint has been created. */
+	tt_pthread_mutex_lock(&endpoint_hack_mutex);
+	tt_pthread_cond_signal(&endpoint_hack_cond);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex);
+
+	cbus_loop(&hang_endpoint);
+	cbus_endpoint_destroy(&hang_endpoint, cbus_process);
+	return 0;
+}
+
+/** Start the hang worker cord, which runs hang_worker_f. */
+static void
+hang_worker_start(void)
+{
+	cord_costart(&hang_worker, "hang_worker", hang_worker_f, NULL);
+}
+
+/**
+ * Body of the canceled worker cord. Creates a pipe to the
+ * "hang_worker" endpoint, pushes a single no-op message into
+ * it and destroys the pipe. main_f cancels this thread while
+ * it is running (see the hack in main_f).
+ *
+ * @param ap unused.
+ * @return always 0.
+ */
+static int
+canceled_worker_f(va_list ap)
+{
+	(void) ap;
+	cpipe_create(&pipe_from_cl_to_hang, "hang_worker");
+	/* presumably forces a flush after one message - TODO confirm */
+	cpipe_set_max_input(&pipe_from_cl_to_hang, 1);
+	static struct cmsg_hop nothing_route = { do_nothing, NULL };
+	static struct cmsg nothing_msg;
+	cmsg_init(&nothing_msg, &nothing_route);
+	/*
+	 * We need to use cpipe_push_input() because ev_invoke()
+	 * must be called to reproduce the hang.
+	 */
+	cpipe_push_input(&pipe_from_cl_to_hang, &nothing_msg);
+	cpipe_destroy(&pipe_from_cl_to_hang);
+	return 0;
+}
+
+/** Start the canceled worker cord, which runs canceled_worker_f. */
+static void
+canceled_worker_start(void)
+{
+	cord_costart(&canceled_worker, "canceled_worker",
+		     canceled_worker_f, NULL);
+}
+
+/**
+ * Body of the main fiber; drives the scenario from gh-4127.
+ *
+ * Starts the hang worker, cancels the canceled worker while
+ * the endpoint's mutex is held, then stops the hang worker's
+ * cbus loop and joins it. The join is guarded by a SIGALRM
+ * timeout: if it hangs, join_fail() terminates the test.
+ *
+ * @param ap unused.
+ * @return always 0.
+ */
+static int
+main_f(va_list ap)
+{
+	(void) ap;
+
+	/*
+	 * Start the endpoint's mutex hack.
+	 * The mutex is taken before the hang worker is started,
+	 * so the worker cannot signal the condition until we are
+	 * already waiting on it. Otherwise the wakeup could be
+	 * lost and the test would block here forever.
+	 * NOTE(review): a spurious wakeup is still not handled,
+	 * that would need a flag shared with hang_worker_f.
+	 */
+	tt_pthread_mutex_lock(&endpoint_hack_mutex);
+	hang_worker_start();
+	tt_pthread_cond_wait(&endpoint_hack_cond, &endpoint_hack_mutex);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex);
+
+	/* Hold the endpoint's mutex while the canceled worker runs. */
+	tt_pthread_mutex_lock(&hang_endpoint.mutex);
+
+	canceled_worker_start();
+
+	/*
+	 * NOTE(review): __data.__lock is an internal glibc field,
+	 * so this is not portable. A value >= 2 presumably means
+	 * that another thread is blocked on the mutex - confirm.
+	 */
+	while (hang_endpoint.mutex.__data.__lock < 2) {
+		usleep(100);
+	}
+
+	tt_pthread_cancel(canceled_worker.id);
+	tt_pthread_mutex_unlock(&hang_endpoint.mutex);
+	/* Hack end */
+
+	tt_pthread_join(canceled_worker.id, NULL);
+
+	/* Ask the hang worker to leave its cbus loop. */
+	cpipe_create(&pipe_from_main_to_hang, "hang_worker");
+	cbus_stop_loop(&pipe_from_main_to_hang);
+	cpipe_destroy(&pipe_from_main_to_hang);
+
+	/* Exit through join_fail() if the join hangs. */
+	unsigned join_timeout = 5;
+	signal(SIGALRM, join_fail);
+	alarm(join_timeout);
+	cord_join(&hang_worker);
+	ok(true, "The hang worker has been joined");
+	alarm(0);
+
+	ev_break(loop(), EVBREAK_ALL);
+	return 0;
+}
+
+/**
+ * Test entry point. Initializes the runtime and the
+ * synchronization primitives, runs main_f in a fiber inside
+ * the event loop, then releases everything and reports the
+ * plan result.
+ *
+ * @return the value of check_plan().
+ */
+int
+main(void)
+{
+	header();
+	plan(1);
+
+	memory_init();
+	fiber_init(fiber_c_invoke);
+	cbus_init();
+	tt_pthread_cond_init(&endpoint_hack_cond, NULL);
+	tt_pthread_mutex_init(&endpoint_hack_mutex, NULL);
+
+	struct fiber *main_fiber = fiber_new("main", main_f);
+	assert(main_fiber != NULL);
+	fiber_wakeup(main_fiber);
+	/* Runs until main_f breaks the loop with ev_break(). */
+	ev_run(loop(), 0);
+
+	/* Destroy both primitives initialized above. */
+	tt_pthread_cond_destroy(&endpoint_hack_cond);
+	tt_pthread_mutex_destroy(&endpoint_hack_mutex);
+	cbus_free();
+	fiber_free();
+	memory_free();
+
+	int rc = check_plan();
+	footer();
+	return rc;
+}
diff --git a/test/unit/cbus_hang.result b/test/unit/cbus_hang.result
new file mode 100644
index 000000000..d5810e8cf
--- /dev/null
+++ b/test/unit/cbus_hang.result
@@ -0,0 +1,4 @@
+ *** main ***
+1..1
+ok 1 - The hang worker has been joined
+ *** main: done ***
More information about the Tarantool-patches
mailing list