From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path:
Received: from smtpng3.m.smailru.net (smtpng3.m.smailru.net [94.100.177.149])
 (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
 (No client certificate requested)
 by dev.tarantool.org (Postfix) with ESMTPS id 9677B46970E
 for ; Thu, 26 Dec 2019 10:46:41 +0300 (MSK)
From: Leonid Vasiliev
Date: Thu, 26 Dec 2019 10:46:38 +0300
Message-Id: <0e7fddec984120e36c098cd546ecae4fddc726cd.1577346185.git.lvasiliev@tarantool.org>
Subject: [Tarantool-patches] [PATCH v2] Add some cancellation guard
List-Id: Tarantool development patches
List-Unsubscribe: ,
List-Archive:
List-Post:
List-Help:
List-Subscribe: ,
To: alexander.turenko@tarantool.org
Cc: tarantool-patches@dev.tarantool.org

We need to set a thread cancellation guard, because another
thread may cancel the current thread at a really bad time
(messages flush, mutex lock).

Fixes: #4127

https://github.com/tarantool/tarantool/issues/4127
https://github.com/tarantool/tarantool/tree/lvasiliev/gh-4127-WAL-thread-stucks
---
 src/lib/core/cbus.c        |  22 +++++
 src/tt_pthread.h           |   5 +
 test/unit/CMakeLists.txt   |   5 +
 test/unit/cbus_hang.c      | 187 +++++++++++++++++++++++++++++++++++++
 test/unit/cbus_hang.result |   4 +
 5 files changed, 223 insertions(+)
 create mode 100644 test/unit/cbus_hang.c
 create mode 100644 test/unit/cbus_hang.result

diff --git a/src/lib/core/cbus.c b/src/lib/core/cbus.c
index b3b1280e7..d3566bc3a 100644
--- a/src/lib/core/cbus.c
+++ b/src/lib/core/cbus.c
@@ -143,6 +143,13 @@ cpipe_destroy(struct cpipe *pipe)
 	struct cmsg_poison *poison = malloc(sizeof(struct cmsg_poison));
 	cmsg_init(&poison->msg, route);
 	poison->endpoint = pipe->endpoint;
+
+	/*
+	 * The thread must not be canceled while the mutex is locked.
+	 */
+	int old_cancel_state;
+	tt_pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);
+
 	/*
 	 * Avoid the general purpose cpipe_push_input() since
 	 * we want to control the way the poison message is
@@ -165,6 +172,8 @@ cpipe_destroy(struct cpipe *pipe)
 	ev_async_send(endpoint->consumer, &endpoint->async);
 	tt_pthread_mutex_unlock(&endpoint->mutex);
 
+	tt_pthread_setcancelstate(old_cancel_state, &old_cancel_state);
+
 	TRASH(pipe);
 }
 
@@ -284,6 +293,17 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
 	/* Trigger task processing when the queue becomes non-empty. */
 	bool output_was_empty;
 
+	/*
+	 * We need to set a thread cancellation guard, because
+	 * another thread may cancel the current thread
+	 * (write() is a cancellation point in ev_async_send)
+	 * and the activation of the ev_async watcher
+	 * through ev_async_send will fail.
+	 */
+
+	int old_cancel_state;
+	tt_pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);
+
 	tt_pthread_mutex_lock(&endpoint->mutex);
 	output_was_empty = stailq_empty(&endpoint->output);
 	/** Flush input */
@@ -297,6 +317,8 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
 
 		ev_async_send(endpoint->consumer, &endpoint->async);
 	}
+
+	tt_pthread_setcancelstate(old_cancel_state, &old_cancel_state);
 }
 
 void
diff --git a/src/tt_pthread.h b/src/tt_pthread.h
index 40b43b112..6bb19eb23 100644
--- a/src/tt_pthread.h
+++ b/src/tt_pthread.h
@@ -300,6 +300,11 @@
 
 #define tt_pthread_getspecific(key) pthread_getspecific(key)
 
+#define tt_pthread_setcancelstate(state, oldstate)		\
+({	int e__ = pthread_setcancelstate(state, oldstate);	\
+	tt_pthread_error(e__);					\
+})
+
 /** Set the current thread's name
  */
 static inline void
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 4a57597e9..fb8f74be0 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -98,6 +98,11 @@ target_link_libraries(cbus_stress.test core stat)
 add_executable(cbus.test cbus.c)
 target_link_libraries(cbus.test core unit stat)
 
+if (${CMAKE_HOST_SYSTEM_NAME} MATCHES "Linux")
+    add_executable(cbus_hang.test cbus_hang.c)
+    target_link_libraries(cbus_hang.test core unit stat)
+endif ()
+
 add_executable(coio.test coio.cc)
 target_link_libraries(coio.test core eio bit uri unit)
 
diff --git a/test/unit/cbus_hang.c b/test/unit/cbus_hang.c
new file mode 100644
index 000000000..8b3671001
--- /dev/null
+++ b/test/unit/cbus_hang.c
@@ -0,0 +1,187 @@
+#include "cbus.h"
+#include "fiber.h"
+#include "memory.h"
+#include "unit.h"
+
+
+struct cord hang_worker;
+struct cord canceled_worker;
+
+struct cbus_endpoint hang_endpoint;
+struct cpipe pipe_from_cl_to_hang;
+struct cpipe pipe_from_main_to_hang;
+
+/*
+ * We want to cancel the canceled thread at the moment
+ * cpipe_flush_cb is being processed.
+ * A Linux-specific dirty hack is used to reproduce the bug.
+ * We need to synchronize the main thread and the canceled worker thread.
+ * This is done via the endpoint mutex's internal field (__data.__lock).
+ * __lock == 0 - unlocked
+ * __lock == 1 - locked
+ * __lock == 2 - possible waiters exist
+ * After pthread create - __lock changes state from 1 to 2
+ */
+
+pthread_mutex_t endpoint_hack_mutex_1;
+pthread_cond_t endpoint_hack_cond_1;
+
+pthread_mutex_t endpoint_hack_mutex_2;
+pthread_cond_t endpoint_hack_cond_2;
+
+
+static
+void join_fail(int signum) {
+	(void)signum;
+	printf("Can't join the hang worker\n");
+	exit(EXIT_FAILURE);
+}
+
+static void
+do_nothing(struct cmsg *m)
+{
+	(void) m;
+}
+
+static int
+hang_worker_f(va_list ap)
+{
+	(void) ap;
+	cbus_endpoint_create(&hang_endpoint, "hang_worker",
+			     fiber_schedule_cb, fiber());
+
+	tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
+	tt_pthread_cond_signal(&endpoint_hack_cond_1);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
+
+	cbus_loop(&hang_endpoint);
+	cbus_endpoint_destroy(&hang_endpoint, cbus_process);
+	return 0;
+}
+
+static void
+hang_worker_start(void)
+{
+	cord_costart(&hang_worker, "hang_worker", hang_worker_f, NULL);
+}
+
+static int
+canceled_worker_f(va_list ap)
+{
+	(void) ap;
+
+	tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
+	tt_pthread_cond_signal(&endpoint_hack_cond_1);
+
+	/* Wait for a start command from the main thread */
+	tt_pthread_mutex_lock(&endpoint_hack_mutex_2);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
+
+	tt_pthread_cond_wait(&endpoint_hack_cond_2, &endpoint_hack_mutex_2);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex_2);
+
+	cpipe_create(&pipe_from_cl_to_hang, "hang_worker");
+	cpipe_set_max_input(&pipe_from_cl_to_hang, 1);
+	static struct cmsg_hop nothing_route = { do_nothing, NULL };
+	static struct cmsg nothing_msg;
+	cmsg_init(&nothing_msg, &nothing_route);
+	/*
+	 * We need to use cpipe_push_input because
+	 * ev_invoke must be called to reproduce the hang.
+	 */
+	cpipe_push_input(&pipe_from_cl_to_hang, &nothing_msg);
+	cpipe_destroy(&pipe_from_cl_to_hang);
+	return 0;
+}
+
+static void
+canceled_worker_start(void)
+{
+	cord_costart(&canceled_worker, "canceled_worker",
+		     canceled_worker_f, NULL);
+}
+
+static int
+main_f(va_list ap)
+{
+	(void) ap;
+
+	/* Start the endpoint's mutex hack */
+
+	/* Initialize the endpoint mutex */
+	tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
+	hang_worker_start();
+	tt_pthread_cond_wait(&endpoint_hack_cond_1, &endpoint_hack_mutex_1);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
+
+	/*
+	 * Create (only create) the canceled worker before the endpoint
+	 * mutex is locked, so that the hack works correctly.
+	 */
+	tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
+	canceled_worker_start();
+	tt_pthread_cond_wait(&endpoint_hack_cond_1, &endpoint_hack_mutex_1);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
+
+	tt_pthread_mutex_lock(&(hang_endpoint.mutex));
+
+	/* Start canceled worker */
+	tt_pthread_mutex_lock(&endpoint_hack_mutex_2);
+	tt_pthread_cond_signal(&endpoint_hack_cond_2);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex_2);
+
+	while (hang_endpoint.mutex.__data.__lock < 2) {
+		usleep(200);
+	}
+
+	tt_pthread_cancel(canceled_worker.id);
+	tt_pthread_mutex_unlock(&(hang_endpoint.mutex));
+	/* Hack end */
+
+	tt_pthread_join(canceled_worker.id, NULL);
+
+	unsigned join_timeout = 5;
+	signal(SIGALRM, join_fail); /* Exit in case of a hang */
+	alarm(join_timeout);
+
+	cpipe_create(&pipe_from_main_to_hang, "hang_worker");
+	cbus_stop_loop(&pipe_from_main_to_hang);
+	cpipe_destroy(&pipe_from_main_to_hang);
+
+	cord_join(&hang_worker);
+	ok(true, "The hang worker has been joined");
+	alarm(0);
+
+	ev_break(loop(), EVBREAK_ALL);
+	return 0;
+}
+
+int
+main(void)
+{
+	header();
+	plan(1);
+
+	memory_init();
+	fiber_init(fiber_c_invoke);
+	cbus_init();
+	tt_pthread_cond_init(&endpoint_hack_cond_1, NULL);
+	tt_pthread_mutex_init(&endpoint_hack_mutex_1, NULL);
+	tt_pthread_cond_init(&endpoint_hack_cond_2, NULL);
+	tt_pthread_mutex_init(&endpoint_hack_mutex_2, NULL);
+
+	struct fiber *main_fiber = fiber_new("main", main_f);
+	assert(main_fiber != NULL);
+	fiber_wakeup(main_fiber);
+	ev_run(loop(), 0);
+
+	tt_pthread_cond_destroy(&endpoint_hack_cond_1);
+	tt_pthread_cond_destroy(&endpoint_hack_cond_2);
+	cbus_free();
+	fiber_free();
+	memory_free();
+
+	int rc = check_plan();
+	footer();
+	return rc;
+}
diff --git a/test/unit/cbus_hang.result b/test/unit/cbus_hang.result
new file mode 100644
index 000000000..d5810e8cf
--- /dev/null
+++ b/test/unit/cbus_hang.result
@@ -0,0 +1,4 @@
+	*** main ***
+1..1
+ok 1 - The hang worker has been joined
+	*** main: done ***
-- 
2.17.1