From: Leonid Vasiliev
Date: Mon, 23 Dec 2019 15:44:52 +0300
To: Konstantin Osipov, alexander.turenko@tarantool.org, tarantool-patches@dev.tarantool.org
Subject: Re: [Tarantool-patches] [PATCH] Add a cancellation guard to cpipe flush callback
Message-ID: <442799bc-16a2-d5db-dc8c-d4663adfbe81@tarantool.org>
In-Reply-To: <20191205072706.GA16690@atlas>
References: <73ebdf94c8f03fca216de9141c6541870b1ed938.1575390549.git.lvasiliev@tarantool.org> <20191203180257.GA4364@atlas> <65b9d108-fe36-1c9b-82c0-60e98efce658@tarantool.org> <20191205072706.GA16690@atlas>

On 12/5/19 10:27 AM, Konstantin Osipov wrote:
> * Leonid Vasiliev [19/12/05 10:24]:
>> On 12/3/19 9:02 PM, Konstantin Osipov wrote:
>>> * Leonid Vasiliev [19/12/03 19:36]:
>>>> https://github.com/tarantool/tarantool/issues/4127
>>>> https://github.com/tarantool/tarantool/tree/lvasiliev/gh-4127-WAL-thread-stucks
>>>
>>> Looks like a great catch.
>>>
>>>> We need to set a thread cancellation guard, because
>>>> another thread may cancel the current thread
>>>> (write() is a cancellation point in ev_async_send)
>>>> and the activation of the ev_async watcher
>>>> through ev_async_send will fail.
>>>
>>> I still don't get from the explanation why it is relevant that
>>> ev_async_send mustn't fail?
>>
>> ev_async_send mustn't fail because otherwise the tarantool instance
>> misbehaves. For example: the first thread flushes its cpipe input to
>> the endpoint output and is cancelled while calling ev_async_send
>> (write() is a cancellation point). Now stailq_empty(&endpoint->output)
>> is false. After that, another thread flushes its cpipe input to the
>> same endpoint, but does not call ev_async_send, because
>> output_was_empty is false. As a result, the endpoint->consumer thread
>> is never woken up (it stays blocked in epoll_wait). The same situation
>> is described in https://github.com/tarantool/tarantool/issues/4127:
>>
>
> Looks like an explanation that deserves to be in a comment prior
> to pthread_setcancelstate().
>
> The issue is, however, if a thread is cancelled and disappears
> before deregistering from cbus, a lot of other bad things will
> happen - because all of its registration entries will sit there.
> The memory, luckily, will not go away, but I am not sure anyone
> but the creating thread can deregister the memory structures
> safely. Looks like this could be covered with a unit test, what do
> you think?
>
>> In the main thread, in wal_free(void):
>>     cbus_stop_loop(&writer->wal_pipe);
>>     if (cord_join(&writer->cord)) {...} // waits for the "wal" thread
>>
>> In the "wal" thread:
>>     cbus_stop_loop_f is never called (for the reasons described above);
>>     the thread stays blocked in epoll_wait()

Added a unit test that reproduces the thread hang on join.
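The lost wakeup described in the thread can be reduced to a small single-threaded model. This is a sketch, not tarantool code: `struct model_endpoint`, `model_flush()` and `model_wakeups()` are hypothetical names. The real flush callback also takes the endpoint mutex and calls ev_async_send(); here a producer cancelled inside write() is modeled as one that returns right after appending to the output and before the notification step.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified stand-in for a cbus endpoint. */
struct model_endpoint {
	int output_len; /* messages queued in the endpoint output */
	int wakeups;    /* how many times the consumer was woken up */
};

/*
 * Models the flush: move n messages to the output and wake the
 * consumer only if the output was empty before. When
 * dies_before_notify is true, the producer disappears between
 * the two steps, as an unguarded thread cancelled at write()
 * inside ev_async_send() would.
 */
static void
model_flush(struct model_endpoint *ep, int n, bool dies_before_notify)
{
	bool output_was_empty = ep->output_len == 0;
	ep->output_len += n;
	if (dies_before_notify)
		return;
	if (output_was_empty)
		ep->wakeups++;
}

/*
 * Two producers flush one message each; the first one is
 * cancelled. With a cancellation guard, the first producer
 * completes the notification before the cancellation fires.
 */
static int
model_wakeups(bool guarded)
{
	struct model_endpoint ep = { 0, 0 };
	model_flush(&ep, 1, !guarded); /* first, cancelled producer */
	model_flush(&ep, 1, false);    /* e.g. the stop-loop message */
	return ep.wakeups;
}
```

`model_wakeups(false)` leaves two messages queued with zero wakeups, which corresponds to the hang; `model_wakeups(true)` wakes the consumer once, which is enough as long as a woken consumer processes everything queued in the output.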
---
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 4a57597e9..339521489 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -98,6 +98,9 @@ target_link_libraries(cbus_stress.test core stat)
 add_executable(cbus.test cbus.c)
 target_link_libraries(cbus.test core unit stat)
 
+add_executable(cbus_hang.test cbus_hang.c)
+target_link_libraries(cbus_hang.test core unit stat)
+
 add_executable(coio.test coio.cc)
 target_link_libraries(coio.test core eio bit uri unit)
 
diff --git a/test/unit/cbus_hang.c b/test/unit/cbus_hang.c
new file mode 100644
index 000000000..36487cd9b
--- /dev/null
+++ b/test/unit/cbus_hang.c
@@ -0,0 +1,177 @@
+#include <assert.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "cbus.h"
+#include "fiber.h"
+#include "memory.h"
+#include "unit.h"
+
+struct cord hang_worker;
+struct cord canceled_worker;
+
+struct cbus_endpoint hang_endpoint;
+struct cpipe pipe_from_cl_to_hang;
+struct cpipe pipe_from_main_to_hang;
+
+/*
+ * Hack.
+ * We need to synchronize the main thread and
+ * the canceled worker thread to perform the cancellation
+ * at a specific moment. This is done using
+ * the endpoint's mutex.
+ */
+pthread_mutex_t endpoint_hack_mutex;
+pthread_cond_t endpoint_hack_cond;
+/* Set by the hang worker once its endpoint is created. */
+bool hang_endpoint_is_ready = false;
+
+static void
+join_fail(int signum)
+{
+	(void) signum;
+	printf("Can't join the hang worker\n");
+	exit(EXIT_FAILURE);
+}
+
+static void
+do_nothing(struct cmsg *m)
+{
+	(void) m;
+}
+
+static int
+hang_worker_f(va_list ap)
+{
+	(void) ap;
+	cbus_endpoint_create(&hang_endpoint, "hang_worker",
+			     fiber_schedule_cb, fiber());
+
+	/* Tell the main thread that the endpoint exists. */
+	tt_pthread_mutex_lock(&endpoint_hack_mutex);
+	hang_endpoint_is_ready = true;
+	tt_pthread_cond_signal(&endpoint_hack_cond);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex);
+
+	cbus_loop(&hang_endpoint);
+	cbus_endpoint_destroy(&hang_endpoint, cbus_process);
+	return 0;
+}
+
+static void
+hang_worker_start()
+{
+	cord_costart(&hang_worker, "hang_worker", hang_worker_f, NULL);
+}
+
+static int
+canceled_worker_f(va_list ap)
+{
+	(void) ap;
+	cpipe_create(&pipe_from_cl_to_hang, "hang_worker");
+	cpipe_set_max_input(&pipe_from_cl_to_hang, 1);
+	static struct cmsg_hop nothing_route = { do_nothing, NULL };
+	static struct cmsg nothing_msg;
+	cmsg_init(&nothing_msg, &nothing_route);
+	/*
+	 * Use cpipe_push_input(): with max_input set to 1 it
+	 * runs the flush callback right away via ev_invoke(),
+	 * which is required to reproduce the hang.
+	 */
+	cpipe_push_input(&pipe_from_cl_to_hang, &nothing_msg);
+	cpipe_destroy(&pipe_from_cl_to_hang);
+	return 0;
+}
+
+static void
+canceled_worker_start()
+{
+	cord_costart(&canceled_worker, "canceled_worker",
+		     canceled_worker_f, NULL);
+}
+
+static int
+main_f(va_list ap)
+{
+	(void) ap;
+	hang_worker_start();
+
+	/* Start of the endpoint's mutex hack. */
+	tt_pthread_mutex_lock(&endpoint_hack_mutex);
+	/* The flag protects against lost and spurious wakeups. */
+	while (!hang_endpoint_is_ready)
+		tt_pthread_cond_wait(&endpoint_hack_cond,
+				     &endpoint_hack_mutex);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex);
+
+	tt_pthread_mutex_lock(&(hang_endpoint.mutex));
+
+	canceled_worker_start();
+
+	/*
+	 * Relies on glibc internals: __lock becomes 2 once
+	 * another thread waits for the mutex, i.e. the canceled
+	 * worker is blocked inside the cpipe flush callback.
+	 */
+	while (hang_endpoint.mutex.__data.__lock < 2) {
+		usleep(100);
+	}
+
+	/*
+	 * The worker is blocked on the mutex, which is not
+	 * a cancellation point, so the request stays pending.
+	 * Without a cancellation guard it is acted upon at
+	 * write() inside ev_async_send().
+	 */
+	tt_pthread_cancel(canceled_worker.id);
+	tt_pthread_mutex_unlock(&(hang_endpoint.mutex));
+	/* End of the hack. */
+
+	tt_pthread_join(canceled_worker.id, NULL);
+
+	cpipe_create(&pipe_from_main_to_hang, "hang_worker");
+	cbus_stop_loop(&pipe_from_main_to_hang);
+	cpipe_destroy(&pipe_from_main_to_hang);
+
+	unsigned join_timeout = 5;
+	signal(SIGALRM, join_fail); /* Exit if the join hangs. */
+	alarm(join_timeout);
+	cord_join(&hang_worker);
+	ok(true, "The hang worker has been joined");
+	alarm(0);
+
+	ev_break(loop(), EVBREAK_ALL);
+	return 0;
+}
+
+int
+main()
+{
+	header();
+	plan(1);
+
+	memory_init();
+	fiber_init(fiber_c_invoke);
+	cbus_init();
+	tt_pthread_cond_init(&endpoint_hack_cond, NULL);
+	tt_pthread_mutex_init(&endpoint_hack_mutex, NULL);
+
+	struct fiber *main_fiber = fiber_new("main", main_f);
+	assert(main_fiber != NULL);
+	fiber_wakeup(main_fiber);
+	ev_run(loop(), 0);
+
+	tt_pthread_cond_destroy(&endpoint_hack_cond);
+	tt_pthread_mutex_destroy(&endpoint_hack_mutex);
+	cbus_free();
+	fiber_free();
+	memory_free();
+
+	int rc = check_plan();
+	footer();
+	return rc;
+}
diff --git a/test/unit/cbus_hang.result b/test/unit/cbus_hang.result
new file mode 100644
index 000000000..d5810e8cf
--- /dev/null
+++ b/test/unit/cbus_hang.result
@@ -0,0 +1,4 @@
+	*** main ***
+1..1
+ok 1 - The hang worker has been joined
+	*** main: done ***
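The cancellation guard discussed in the thread can be illustrated with plain POSIX threads, outside of tarantool. In this sketch, all names (`guarded_worker()`, `run_guard_demo()`, the three pipes) are hypothetical, and a pipe write stands in for the ev_async_send() notification. A worker disables cancellation with pthread_setcancelstate(), is cancelled while inside that section, still completes its write(), and is only cancelled at pthread_testcancel() after restoring the previous state.

```c
#include <assert.h>
#include <pthread.h>
#include <unistd.h>

/* Illustrative pipes; none of these names come from tarantool. */
static int ready_pipe[2];  /* worker -> main: guarded section entered */
static int go_pipe[2];     /* main -> worker: cancel request was sent */
static int notify_pipe[2]; /* stands in for the ev_async_send() wakeup */

static void *
guarded_worker(void *arg)
{
	(void) arg;
	char c = 'x';
	int old_state, ignored;
	/* Guard: keep any cancellation request pending. */
	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
	if (write(ready_pipe[1], &c, 1) != 1)
		return NULL;
	/* read() and write() are cancellation points, but the guard is on. */
	if (read(go_pipe[0], &c, 1) != 1)
		return NULL;
	if (write(notify_pipe[1], &c, 1) != 1)
		return NULL;
	/* Drop the guard; the pending request fires at the next point. */
	pthread_setcancelstate(old_state, &ignored);
	pthread_testcancel();
	return NULL; /* Not reached when a cancellation is pending. */
}

/*
 * Cancels the worker while it is inside the guarded section.
 * Returns the number of notification bytes delivered (1 when the
 * guard works) and stores 1 in *cancelled if the worker was cancelled.
 */
static int
run_guard_demo(int *cancelled)
{
	pthread_t tid;
	void *res = NULL;
	char c = 'x';
	if (pipe(ready_pipe) != 0 || pipe(go_pipe) != 0 ||
	    pipe(notify_pipe) != 0)
		return -1;
	if (pthread_create(&tid, NULL, guarded_worker, NULL) != 0)
		return -1;
	/* Wait until the worker holds the guard. */
	if (read(ready_pipe[0], &c, 1) != 1)
		return -1;
	pthread_cancel(tid); /* stays pending inside the guard */
	if (write(go_pipe[1], &c, 1) != 1)
		return -1;
	pthread_join(tid, &res);
	*cancelled = res == PTHREAD_CANCELED;
	/* The byte is already in the pipe, so this read does not block. */
	int delivered = (int) read(notify_pipe[0], &c, 1);
	for (int i = 0; i < 2; i++) {
		close(ready_pipe[i]);
		close(go_pipe[i]);
		close(notify_pipe[i]);
	}
	return delivered;
}
```

`run_guard_demo()` is expected to report one delivered byte and a cancelled worker: the cancellation request is not lost, only deferred past the notification. On older glibc versions the program needs to be linked with `-pthread`.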