* [Tarantool-patches] [PATCH] Add a cancellation guard to cpipe flush callback
@ 2019-12-03 16:33 Leonid Vasiliev
  2019-12-03 18:02 ` Konstantin Osipov
  0 siblings, 1 reply; 8+ messages in thread
From: Leonid Vasiliev @ 2019-12-03 16:33 UTC (permalink / raw)
  To: alexander.turenko; +Cc: tarantool-patches

https://github.com/tarantool/tarantool/issues/4127
https://github.com/tarantool/tarantool/tree/lvasiliev/gh-4127-WAL-thread-stucks

We need to set a thread cancellation guard, because
another thread may cancel the current thread
(write() is a cancellation point in ev_async_send)
and the activation of the ev_async watcher
through ev_async_send will fail.
---
 src/lib/core/cbus.c | 13 +++++++++++++
 src/tt_pthread.h    |  5 +++++
 2 files changed, 18 insertions(+)

diff --git a/src/lib/core/cbus.c b/src/lib/core/cbus.c
index b3b1280e7..7e9ef91e3 100644
--- a/src/lib/core/cbus.c
+++ b/src/lib/core/cbus.c
@@ -284,6 +284,17 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
 	/* Trigger task processing when the queue becomes non-empty. */
 	bool output_was_empty;

+	/*
+	 * We need to set a thread cancellation guard, because
+	 * another thread may cancel the current thread
+	 * (write() is a cancellation point in ev_async_send)
+	 * and the activation of the ev_async watcher
+	 * through ev_async_send will fail.
+	 */
+
+	int old_cancel_state;
+	tt_pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);
+
 	tt_pthread_mutex_lock(&endpoint->mutex);
 	output_was_empty = stailq_empty(&endpoint->output);
 	/** Flush input */
@@ -297,6 +308,8 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)

 		ev_async_send(endpoint->consumer, &endpoint->async);
 	}
+
+	tt_pthread_setcancelstate(old_cancel_state, NULL);
 }

 void
diff --git a/src/tt_pthread.h b/src/tt_pthread.h
index 40b43b112..6bb19eb23 100644
--- a/src/tt_pthread.h
+++ b/src/tt_pthread.h
@@ -300,6 +300,11 @@
 #define tt_pthread_getspecific(key) pthread_getspecific(key)

+#define tt_pthread_setcancelstate(state, oldstate) \
+({	int e__ = pthread_setcancelstate(state, oldstate);\
+	tt_pthread_error(e__); \
+})
+
 /** Set the current thread's name */
 static inline void
-- 
2.17.1
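A minimal, self-contained sketch of the save/disable/restore pattern that the patch applies, assuming only POSIX threads. All helper names here (run_with_cancel_disabled, record_cancel_state, cancel_state_inside_guard, cancel_state_after_guard) are hypothetical illustrations and are not part of cbus.c or tt_pthread.h. Passing NULL as the second argument of pthread_setcancelstate() follows what the patch itself does.

```c
#include <assert.h>
#include <pthread.h>

/*
 * Run fn(arg) with thread cancellation disabled, then restore the
 * previous cancel state. Returns 0 on success or a pthread error code.
 */
int
run_with_cancel_disabled(void (*fn)(void *), void *arg)
{
	int old_state;
	int rc = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
	if (rc != 0)
		return rc;
	/* A cancel request arriving here stays pending; fn() always completes. */
	fn(arg);
	return pthread_setcancelstate(old_state, NULL);
}

/* Callback for the demo: stores the cancel state in effect into *arg. */
static void
record_cancel_state(void *arg)
{
	/* Re-disabling is harmless here and reports the current state. */
	int rc = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, (int *) arg);
	assert(rc == 0);
	(void) rc;
}

/* Returns the cancel state observed inside the guarded section. */
int
cancel_state_inside_guard(void)
{
	int seen = -1;
	if (run_with_cancel_disabled(record_cancel_state, &seen) != 0)
		return -1;
	return seen;
}

/* Returns the cancel state of the calling thread after the guard is left. */
int
cancel_state_after_guard(void)
{
	int seen = -1;
	if (run_with_cancel_disabled(record_cancel_state, &seen) != 0)
		return -1;
	int now;
	/* Read the current state, then put it back unchanged. */
	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &now);
	pthread_setcancelstate(now, NULL);
	return now;
}
```

Called from a thread that starts with the default cancel state, the first helper should report PTHREAD_CANCEL_DISABLE and the second PTHREAD_CANCEL_ENABLE. The sketch restores the saved state rather than forcing PTHREAD_CANCEL_ENABLE, so a caller that had already disabled cancellation keeps it disabled.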
* Re: [Tarantool-patches] [PATCH] Add a cancellation guard to cpipe flush callback
  2019-12-03 16:33 [Tarantool-patches] [PATCH] Add a cancellation guard to cpipe flush callback Leonid Vasiliev
@ 2019-12-03 18:02 ` Konstantin Osipov
  2019-12-05  7:22   ` Leonid Vasiliev
  0 siblings, 1 reply; 8+ messages in thread
From: Konstantin Osipov @ 2019-12-03 18:02 UTC (permalink / raw)
  To: Leonid Vasiliev; +Cc: tarantool-patches

* Leonid Vasiliev <lvasiliev@tarantool.org> [19/12/03 19:36]:
> https://github.com/tarantool/tarantool/issues/4127
> https://github.com/tarantool/tarantool/tree/lvasiliev/gh-4127-WAL-thread-stucks

Looks like a great catch.

> We need to set a thread cancellation guard, because
> another thread may cancel the current thread
> (write() is a cancellation point in ev_async_send)
> and the activation of the ev_async watcher
> through ev_async_send will fail.

I still don't get from the explanation why it is relevant that
ev_async_send mustn't fail?

-- 
Konstantin Osipov, Moscow, Russia
* Re: [Tarantool-patches] [PATCH] Add a cancellation guard to cpipe flush callback
  2019-12-03 18:02 ` Konstantin Osipov
@ 2019-12-05  7:22   ` Leonid Vasiliev
  2019-12-05  7:27     ` Konstantin Osipov
  0 siblings, 1 reply; 8+ messages in thread
From: Leonid Vasiliev @ 2019-12-05 7:22 UTC (permalink / raw)
  To: Konstantin Osipov, alexander.turenko, tarantool-patches

On 12/3/19 9:02 PM, Konstantin Osipov wrote:
> * Leonid Vasiliev <lvasiliev@tarantool.org> [19/12/03 19:36]:
>> https://github.com/tarantool/tarantool/issues/4127
>> https://github.com/tarantool/tarantool/tree/lvasiliev/gh-4127-WAL-thread-stucks
>
> Looks like a great catch.
>
>> We need to set a thread cancellation guard, because
>> another thread may cancel the current thread
>> (write() is a cancellation point in ev_async_send)
>> and the activation of the ev_async watcher
>> through ev_async_send will fail.
>
> I still don't get from the explanation why it is relevant that
> ev_async_send mustn't fail?

ev_async_send mustn't fail because a failure leads to unwanted behavior
of the tarantool instance. For example: the first thread flushes its
cpipe input to an endpoint output and is cancelled while calling
ev_async_send (write() is a cancellation point). Now
stailq_empty(&endpoint->output) is false. After that, another thread
flushes its cpipe input to the same endpoint, but it doesn't call
ev_async_send, because output_was_empty is false. As a result, the
endpoint->consumer thread is never woken up (it stays blocked in
epoll_wait). The same situation is described in
https://github.com/tarantool/tarantool/issues/4127:

In the main thread:
    from void wal_free(void):
        cbus_stop_loop(&writer->wal_pipe);
        if (cord_join(&writer->cord)) {...} // wait for the "wal" thread

In the "wal" thread:
    cbus_stop_loop_f is never called (for the reasons described above)
    the thread is blocked in epoll_wait()
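The lost wake-up described above can be modelled without any threads. The following is a simplified, single-threaded sketch: struct toy_endpoint, toy_flush() and toy_wakeups_after_two_flushes() are hypothetical names, and the flag cancelled_before_send merely simulates a thread dying at the cancellation point inside ev_async_send(). It is not the cbus implementation.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, reduced model of an endpoint: only what the race needs. */
struct toy_endpoint {
	int output_len;	/* messages waiting in the output queue */
	int wakeups;	/* how many times the consumer was notified */
};

/*
 * Model of one flush. cancelled_before_send simulates the flushing
 * thread being cancelled inside the notification call.
 */
static void
toy_flush(struct toy_endpoint *ep, int nmsgs, bool cancelled_before_send)
{
	assert(nmsgs > 0);
	bool output_was_empty = (ep->output_len == 0);
	ep->output_len += nmsgs;
	if (!output_was_empty)
		return;	/* an earlier flush is expected to have notified */
	if (cancelled_before_send)
		return;	/* the thread is gone: the notification is lost */
	ep->wakeups++;
}

/* Returns the number of wake-ups the consumer got after two flushes. */
int
toy_wakeups_after_two_flushes(bool first_is_cancelled)
{
	struct toy_endpoint ep = { 0, 0 };
	toy_flush(&ep, 1, first_is_cancelled);
	toy_flush(&ep, 1, false);
	return ep.wakeups;
}
```

With first_is_cancelled set, the model ends with two queued messages and zero wake-ups, which corresponds to the consumer staying blocked in epoll_wait(); without the cancellation, the consumer is notified exactly once.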
* Re: [Tarantool-patches] [PATCH] Add a cancellation guard to cpipe flush callback
  2019-12-05  7:22   ` Leonid Vasiliev
@ 2019-12-05  7:27     ` Konstantin Osipov
  2019-12-23 12:44       ` Leonid Vasiliev
  0 siblings, 1 reply; 8+ messages in thread
From: Konstantin Osipov @ 2019-12-05 7:27 UTC (permalink / raw)
  To: Leonid Vasiliev; +Cc: tarantool-patches

* Leonid Vasiliev <lvasiliev@tarantool.org> [19/12/05 10:24]:
> On 12/3/19 9:02 PM, Konstantin Osipov wrote:
> > * Leonid Vasiliev <lvasiliev@tarantool.org> [19/12/03 19:36]:
> > > https://github.com/tarantool/tarantool/issues/4127
> > > https://github.com/tarantool/tarantool/tree/lvasiliev/gh-4127-WAL-thread-stucks
> >
> > Looks like a great catch.
> >
> > > We need to set a thread cancellation guard, because
> > > another thread may cancel the current thread
> > > (write() is a cancellation point in ev_async_send)
> > > and the activation of the ev_async watcher
> > > through ev_async_send will fail.
> >
> > I still don't get from the explanation why it is relevant that
> > ev_async_send mustn't fail?
>
> ev_async_send mustn't fail because a failure leads to unwanted behavior
> of the tarantool instance. For example: the first thread flushes its
> cpipe input to an endpoint output and is cancelled while calling
> ev_async_send (write() is a cancellation point). Now
> stailq_empty(&endpoint->output) is false. After that, another thread
> flushes its cpipe input to the same endpoint, but it doesn't call
> ev_async_send, because output_was_empty is false. As a result, the
> endpoint->consumer thread is never woken up (it stays blocked in
> epoll_wait). The same situation is described in
> https://github.com/tarantool/tarantool/issues/4127:
>

Looks like an explanation that deserves to be in a comment prior
to pthread_setcancelstate().

The issue is, however, if a thread is cancelled and disappears
before deregistering from cbus, a lot of other bad things will
happen - because all of its registration entries will sit there.
The memory, luckily, will not go away, but I am not sure anyone
but the creating thread can deregister the memory structures
safely. Looks like this could be covered with a unit test, what do
you think?

> In the main thread:
>     from void wal_free(void):
>         cbus_stop_loop(&writer->wal_pipe);
>         if (cord_join(&writer->cord)) {...} // wait for the "wal" thread
>
> In the "wal" thread:
>     cbus_stop_loop_f is never called (for the reasons described above)
>     the thread is blocked in epoll_wait()

-- 
Konstantin Osipov, Moscow, Russia
* Re: [Tarantool-patches] [PATCH] Add a cancellation guard to cpipe flush callback
  2019-12-05  7:27     ` Konstantin Osipov
@ 2019-12-23 12:44       ` Leonid Vasiliev
  2019-12-23 12:55         ` Konstantin Osipov
  0 siblings, 1 reply; 8+ messages in thread
From: Leonid Vasiliev @ 2019-12-23 12:44 UTC (permalink / raw)
  To: Konstantin Osipov, alexander.turenko, tarantool-patches

On 12/5/19 10:27 AM, Konstantin Osipov wrote:
> * Leonid Vasiliev <lvasiliev@tarantool.org> [19/12/05 10:24]:
>> On 12/3/19 9:02 PM, Konstantin Osipov wrote:
>>> * Leonid Vasiliev <lvasiliev@tarantool.org> [19/12/03 19:36]:
>>>> https://github.com/tarantool/tarantool/issues/4127
>>>> https://github.com/tarantool/tarantool/tree/lvasiliev/gh-4127-WAL-thread-stucks
>>>
>>> Looks like a great catch.
>>>
>>>> We need to set a thread cancellation guard, because
>>>> another thread may cancel the current thread
>>>> (write() is a cancellation point in ev_async_send)
>>>> and the activation of the ev_async watcher
>>>> through ev_async_send will fail.
>>>
>>> I still don't get from the explanation why it is relevant that
>>> ev_async_send mustn't fail?
>>
>> ev_async_send mustn't fail because a failure leads to unwanted behavior
>> of the tarantool instance. For example: the first thread flushes its
>> cpipe input to an endpoint output and is cancelled while calling
>> ev_async_send (write() is a cancellation point). Now
>> stailq_empty(&endpoint->output) is false. After that, another thread
>> flushes its cpipe input to the same endpoint, but it doesn't call
>> ev_async_send, because output_was_empty is false. As a result, the
>> endpoint->consumer thread is never woken up (it stays blocked in
>> epoll_wait). The same situation is described in
>> https://github.com/tarantool/tarantool/issues/4127:
>>
>
> Looks like an explanation that deserves to be in a comment prior
> to pthread_setcancelstate().
>
> The issue is, however, if a thread is cancelled and disappears
> before deregistering from cbus, a lot of other bad things will
> happen - because all of its registration entries will sit there.
> The memory, luckily, will not go away, but I am not sure anyone
> but the creating thread can deregister the memory structures
> safely. Looks like this could be covered with a unit test, what do
> you think?
>
>> In the main thread:
>>     from void wal_free(void):
>>         cbus_stop_loop(&writer->wal_pipe);
>>         if (cord_join(&writer->cord)) {...} // wait for the "wal" thread
>>
>> In the "wal" thread:
>>     cbus_stop_loop_f is never called (for the reasons described above)
>>     the thread is blocked in epoll_wait()
>

Add a unit test for a thread hanging on join.
---
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 4a57597e9..339521489 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -98,6 +98,9 @@ target_link_libraries(cbus_stress.test core stat)
 add_executable(cbus.test cbus.c)
 target_link_libraries(cbus.test core unit stat)

+add_executable(cbus_hang.test cbus_hang.c)
+target_link_libraries(cbus_hang.test core unit stat)
+
 add_executable(coio.test coio.cc)
 target_link_libraries(coio.test core eio bit uri unit)

diff --git a/test/unit/cbus_hang.c b/test/unit/cbus_hang.c
new file mode 100644
index 000000000..36487cd9b
--- /dev/null
+++ b/test/unit/cbus_hang.c
@@ -0,0 +1,150 @@
+#include "cbus.h"
+#include "fiber.h"
+#include "memory.h"
+#include "unit.h"
+
+
+struct cord hang_worker;
+struct cord canceled_worker;
+
+struct cbus_endpoint hang_endpoint;
+struct cpipe pipe_from_cl_to_hang;
+struct cpipe pipe_from_main_to_hang;
+
+/*
+ * Hack.
+ * We need to synchronize the main thread and
+ * the canceled worker thread to perform the cancellation
+ * at a specific moment. So, do this using
+ * the endpoint's mutex.
+*/
+pthread_mutex_t endpoint_hack_mutex;
+pthread_cond_t endpoint_hack_cond;
+
+
+static
+void join_fail(int signum) {
+	(void)signum;
+	printf("Can't join the hang worker\n");
+	exit(EXIT_FAILURE);
+}
+
+static void
+do_nothing(struct cmsg *m)
+{
+	(void) m;
+}
+
+static int
+hang_worker_f(va_list ap)
+{
+	(void) ap;
+	cbus_endpoint_create(&hang_endpoint, "hang_worker",
+			     fiber_schedule_cb, fiber());
+
+	tt_pthread_mutex_lock(&endpoint_hack_mutex);
+	tt_pthread_cond_signal(&endpoint_hack_cond);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex);
+
+	cbus_loop(&hang_endpoint);
+	cbus_endpoint_destroy(&hang_endpoint, cbus_process);
+	return 0;
+}
+
+static void
+hang_worker_start()
+{
+	cord_costart(&hang_worker, "hang_worker", hang_worker_f, NULL);
+}
+
+static int
+canceled_worker_f(va_list ap)
+{
+	(void) ap;
+	cpipe_create(&pipe_from_cl_to_hang, "hang_worker");
+	cpipe_set_max_input(&pipe_from_cl_to_hang, 1);
+	static struct cmsg_hop nothing_route = { do_nothing, NULL };
+	static struct cmsg nothing_msg;
+	cmsg_init(&nothing_msg, &nothing_route);
+	/*
+	 * We need to use cpipe_push_input because
+	 * ev_invoke must be called to reproduce the hang
+	 */
+	cpipe_push_input(&pipe_from_cl_to_hang, &nothing_msg);
+	cpipe_destroy(&pipe_from_cl_to_hang);
+	return 0;
+}
+
+static void
+canceled_worker_start()
+{
+	cord_costart(&canceled_worker, "canceled_worker",
+		     canceled_worker_f, NULL);
+}
+
+static int
+main_f(va_list ap)
+{
+	(void) ap;
+	hang_worker_start();
+
+	/* Start the endpoint's mutex hack */
+	tt_pthread_mutex_lock(&endpoint_hack_mutex);
+	tt_pthread_cond_wait(&endpoint_hack_cond, &endpoint_hack_mutex);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex);
+
+	tt_pthread_mutex_lock(&(hang_endpoint.mutex));
+
+	canceled_worker_start();
+
+	while(hang_endpoint.mutex.__data.__lock < 2) {
+		usleep(100);
+	}
+
+	tt_pthread_cancel(canceled_worker.id);
+	pthread_mutex_unlock(&(hang_endpoint.mutex));
+	/* Hack end */
+
+	tt_pthread_join(canceled_worker.id, NULL);
+
+	cpipe_create(&pipe_from_main_to_hang, "hang_worker");
+	cbus_stop_loop(&pipe_from_main_to_hang);
+	cpipe_destroy(&pipe_from_main_to_hang);
+
+	unsigned join_timeout = 5;
+	signal(SIGALRM, join_fail); // Exit if the join hangs
+	alarm(join_timeout);
+	cord_join(&hang_worker);
+	ok(true, "The hang worker has been joined");
+	alarm(0);
+
+	ev_break(loop(), EVBREAK_ALL);
+	return 0;
+}
+
+int
+main()
+{
+	header();
+	plan(1);
+
+	memory_init();
+	fiber_init(fiber_c_invoke);
+	cbus_init();
+	tt_pthread_cond_init(&endpoint_hack_cond, NULL);
+	tt_pthread_mutex_init(&endpoint_hack_mutex, NULL);
+
+	struct fiber *main_fiber = fiber_new("main", main_f);
+	assert(main_fiber != NULL);
+	fiber_wakeup(main_fiber);
+	ev_run(loop(), 0);
+
+	tt_pthread_cond_destroy(&endpoint_hack_cond);
+	cbus_free();
+	fiber_free();
+	memory_free();
+
+	int rc = check_plan();
+	footer();
+	return rc;
+}
diff --git a/test/unit/cbus_hang.result b/test/unit/cbus_hang.result
new file mode 100644
index 000000000..d5810e8cf
--- /dev/null
+++ b/test/unit/cbus_hang.result
@@ -0,0 +1,4 @@
+	*** main ***
+1..1
+ok 1 - The hang worker has been joined
+	*** main: done ***
* Re: [Tarantool-patches] [PATCH] Add a cancellation guard to cpipe flush callback
  2019-12-23 12:44       ` Leonid Vasiliev
@ 2019-12-23 12:55         ` Konstantin Osipov
  2019-12-24 15:27           ` Leonid Vasiliev
  0 siblings, 1 reply; 8+ messages in thread
From: Konstantin Osipov @ 2019-12-23 12:55 UTC (permalink / raw)
  To: Leonid Vasiliev; +Cc: tarantool-patches

* Leonid Vasiliev <lvasiliev@tarantool.org> [19/12/23 15:48]:
> +	while(hang_endpoint.mutex.__data.__lock < 2) {
> +		usleep(100);
> +	}

How does this work? Is there a way to not look inside mutex?

-- 
Konstantin Osipov, Moscow, Russia
* Re: [Tarantool-patches] [PATCH] Add a cancellation guard to cpipe flush callback
  2019-12-23 12:55         ` Konstantin Osipov
@ 2019-12-24 15:27           ` Leonid Vasiliev
  2019-12-24 15:33             ` Konstantin Osipov
  0 siblings, 1 reply; 8+ messages in thread
From: Leonid Vasiliev @ 2019-12-24 15:27 UTC (permalink / raw)
  To: Konstantin Osipov, alexander.turenko, tarantool-patches

On 12/23/19 3:55 PM, Konstantin Osipov wrote:
> * Leonid Vasiliev <lvasiliev@tarantool.org> [19/12/23 15:48]:
>> +	while(hang_endpoint.mutex.__data.__lock < 2) {
>> +		usleep(100);
>> +	}
>
> How does this work? Is there a way to not look inside mutex?
>

The other alternatives I can come up with add possible races or
cancellation points.
We must cancel the thread before the cancellation point in ev_async_send
(ev_async_send(endpoint->consumer, &endpoint->async);) and as close to it
as possible in order to reproduce the behavior from
https://github.com/tarantool/tarantool/issues/4127 .

How it works (it's a hack):
mutex.__data.__lock == 0 - the mutex is unlocked
mutex.__data.__lock == 1 - the mutex is locked
mutex.__data.__lock == 2 - there are waiters

We just wait until the canceled worker tries to lock the endpoint mutex
and cancel the thread at that moment.
If you have any ideas how this can be done better and without possible
races, tell me.

And yes, I must disable this dirty test on OSes other than Linux.

---

diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 339521489..fb8f74be0 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -98,8 +98,10 @@ target_link_libraries(cbus_stress.test core stat)
 add_executable(cbus.test cbus.c)
 target_link_libraries(cbus.test core unit stat)

-add_executable(cbus_hang.test cbus_hang.c)
-target_link_libraries(cbus_hang.test core unit stat)
+if (${CMAKE_HOST_SYSTEM_NAME} MATCHES "Linux")
+    add_executable(cbus_hang.test cbus_hang.c)
+    target_link_libraries(cbus_hang.test core unit stat)
+endif ()
* Re: [Tarantool-patches] [PATCH] Add a cancellation guard to cpipe flush callback
  2019-12-24 15:27           ` Leonid Vasiliev
@ 2019-12-24 15:33             ` Konstantin Osipov
  0 siblings, 0 replies; 8+ messages in thread
From: Konstantin Osipov @ 2019-12-24 15:33 UTC (permalink / raw)
  To: Leonid Vasiliev; +Cc: tarantool-patches

* Leonid Vasiliev <lvasiliev@tarantool.org> [19/12/24 18:30]:
>
> The other alternatives I can come up with add possible races or
> cancellation points.
> We must cancel the thread before the cancellation point in ev_async_send
> (ev_async_send(endpoint->consumer, &endpoint->async);) and as close to it
> as possible in order to reproduce the behavior from
> https://github.com/tarantool/tarantool/issues/4127 .

I think if you add some stress component to the test then it'll hit
the right timing once in a while without violating encapsulation.

But it's great you do have a way to reproduce at least...

-- 
Konstantin Osipov, Moscow, Russia
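A rough sketch of what a "stress component" could look like, under loud assumptions: it uses a plain pipe and write() as a stand-in for the ev_async notification, it does not touch cbus at all, and every name in it (struct attempt, flusher_f, count_lost_notifications) is hypothetical. Each iteration starts a thread, cancels it after a small random delay, and then checks whether the thread "queued" something without "notifying". Without the guard the count depends on timing and may well be zero on a given run; with the guard it is expected to be zero always.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

/* State of one attempt; all fields are illustrative only. */
struct attempt {
	int fd;			/* write end of a pipe (the wake-up channel) */
	bool use_guard;		/* disable cancellation around the flush */
	volatile bool queued;	/* "message was put into the output queue" */
	volatile bool notified;	/* "consumer was woken up" */
};

static void *
flusher_f(void *arg)
{
	struct attempt *a = arg;
	int old_state = PTHREAD_CANCEL_ENABLE;
	if (a->use_guard)
		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
	a->queued = true;
	char byte = 1;
	/* Cancellation point: an unguarded thread may be cancelled here. */
	if (write(a->fd, &byte, 1) == 1)
		a->notified = true;
	if (a->use_guard)
		pthread_setcancelstate(old_state, NULL);
	return NULL;
}

/*
 * Runs the race `iterations` times and returns how often a message was
 * queued without a notification, or -1 on a setup error.
 */
int
count_lost_notifications(int iterations, bool use_guard)
{
	assert(iterations >= 0);
	int lost = 0;
	for (int i = 0; i < iterations; i++) {
		int fds[2];
		if (pipe(fds) != 0)
			return -1;
		struct attempt a = { fds[1], use_guard, false, false };
		pthread_t tid;
		if (pthread_create(&tid, NULL, flusher_f, &a) != 0) {
			close(fds[0]);
			close(fds[1]);
			return -1;
		}
		/* Random delay of 0..49 microseconds to vary the timing. */
		struct timespec delay = { 0, (rand() % 50) * 1000L };
		nanosleep(&delay, NULL);
		pthread_cancel(tid);
		pthread_join(tid, NULL);
		if (a.queued && !a.notified)
			lost++;
		close(fds[0]);
		close(fds[1]);
	}
	return lost;
}
```

Because the unguarded result is timing-dependent, a test built this way can only assert the guarded case strictly; the unguarded run serves as an occasional reproducer rather than a deterministic check.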