[Tarantool-patches] [PATCH v2] Add thread cancellation guards
Leonid Vasiliev
lvasiliev at tarantool.org
Thu Dec 26 10:46:38 MSK 2019
We need to set thread cancellation guards, because
another thread may cancel the current thread at a
really bad time (during a message flush or while a mutex is locked).
Fixes: #4127
https://github.com/tarantool/tarantool/issues/4127
https://github.com/tarantool/tarantool/tree/lvasiliev/gh-4127-WAL-thread-stucks
---
src/lib/core/cbus.c | 22 +++++
src/tt_pthread.h | 5 +
test/unit/CMakeLists.txt | 5 +
test/unit/cbus_hang.c | 187 +++++++++++++++++++++++++++++++++++++
test/unit/cbus_hang.result | 4 +
5 files changed, 223 insertions(+)
create mode 100644 test/unit/cbus_hang.c
create mode 100644 test/unit/cbus_hang.result
diff --git a/src/lib/core/cbus.c b/src/lib/core/cbus.c
index b3b1280e7..d3566bc3a 100644
--- a/src/lib/core/cbus.c
+++ b/src/lib/core/cbus.c
@@ -143,6 +143,13 @@ cpipe_destroy(struct cpipe *pipe)
struct cmsg_poison *poison = malloc(sizeof(struct cmsg_poison));
cmsg_init(&poison->msg, route);
poison->endpoint = pipe->endpoint;
+
+ /*
+ * The thread should not be canceled while mutex is locked
+ */
+ int old_cancel_state;
+ tt_pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);
+
/*
* Avoid the general purpose cpipe_push_input() since
* we want to control the way the poison message is
@@ -165,6 +172,8 @@ cpipe_destroy(struct cpipe *pipe)
ev_async_send(endpoint->consumer, &endpoint->async);
tt_pthread_mutex_unlock(&endpoint->mutex);
+ tt_pthread_setcancelstate(old_cancel_state, NULL);
+
TRASH(pipe);
}
@@ -284,6 +293,17 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
/* Trigger task processing when the queue becomes non-empty. */
bool output_was_empty;
+ /*
+ * We need to set a thread cancellation guard, because
+ * another thread may cancel the current thread
+ * (write() is a cancellation point in ev_async_send)
+ * and the activation of the ev_async watcher
+ * through ev_async_send will fail.
+ */
+
+ int old_cancel_state;
+ tt_pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);
+
tt_pthread_mutex_lock(&endpoint->mutex);
output_was_empty = stailq_empty(&endpoint->output);
/** Flush input */
@@ -297,6 +317,8 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
ev_async_send(endpoint->consumer, &endpoint->async);
}
+
+ tt_pthread_setcancelstate(old_cancel_state, NULL);
}
void
diff --git a/src/tt_pthread.h b/src/tt_pthread.h
index 40b43b112..6bb19eb23 100644
--- a/src/tt_pthread.h
+++ b/src/tt_pthread.h
@@ -300,6 +300,11 @@
#define tt_pthread_getspecific(key) pthread_getspecific(key)
+#define tt_pthread_setcancelstate(state, oldstate)			\
+({	int o__, *p__ = (oldstate); /* NULL oldstate isn't portable */	\
+	int e__ = pthread_setcancelstate(state, p__ != NULL ? p__ : &o__);\
+	tt_pthread_error(e__); })
+
/** Set the current thread's name
*/
static inline void
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 4a57597e9..fb8f74be0 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -98,6 +98,11 @@ target_link_libraries(cbus_stress.test core stat)
add_executable(cbus.test cbus.c)
target_link_libraries(cbus.test core unit stat)
+if (${CMAKE_HOST_SYSTEM_NAME} MATCHES "Linux")
+ add_executable(cbus_hang.test cbus_hang.c)
+ target_link_libraries(cbus_hang.test core unit stat)
+endif ()
+
add_executable(coio.test coio.cc)
target_link_libraries(coio.test core eio bit uri unit)
diff --git a/test/unit/cbus_hang.c b/test/unit/cbus_hang.c
new file mode 100644
index 000000000..8b3671001
--- /dev/null
+++ b/test/unit/cbus_hang.c
@@ -0,0 +1,187 @@
+#include "cbus.h"
+#include "fiber.h"
+#include "memory.h"
+#include "unit.h"
+
+
+static struct cord hang_worker;
+static struct cord canceled_worker;
+
+static struct cbus_endpoint hang_endpoint;
+static struct cpipe pipe_from_cl_to_hang;
+static struct cpipe pipe_from_main_to_hang;
+
+/*
+ * The canceled worker has to be canceled while cpipe_flush_cb() is
+ * running. A Linux (glibc) specific dirty hack reproduces the bug: the
+ * main thread and the canceled worker are synchronized through the
+ * internal field of the endpoint's mutex (__data.__lock):
+ * __lock == 0 - unlocked
+ * __lock == 1 - locked, no waiters
+ * __lock == 2 - locked, waiters may exist
+ * main_f() holds the mutex and waits for __lock to change from 1 to 2,
+ * i.e. until the canceled worker blocks on it.
+ */
+/* Startup handshake: each worker signals main_f() once it is ready. */
+static pthread_mutex_t endpoint_hack_mutex_1;
+static pthread_cond_t endpoint_hack_cond_1;
+/* Start command: main_f() tells the canceled worker to proceed. */
+static pthread_mutex_t endpoint_hack_mutex_2;
+static pthread_cond_t endpoint_hack_cond_2;
+
+/* SIGALRM handler armed by main_f(); uses async-signal-safe calls only. */
+static void join_fail(int signum) {
+	static const char msg[] = "Can't join the hang worker\n";
+	(void)signum;
+	(void)!write(STDOUT_FILENO, msg, sizeof(msg) - 1);
+	_exit(EXIT_FAILURE);
+}
+
+static void
+do_nothing(struct cmsg *msg)
+{
+	(void)msg; /* cmsg_hop callback that intentionally does nothing */
+}
+
+static int
+hang_worker_f(va_list ap) /* Body of the hang_worker cord. */
+{
+ (void) ap;
+ cbus_endpoint_create(&hang_endpoint, "hang_worker",
+ fiber_schedule_cb, fiber());
+ /* Tell main_f() that hang_endpoint (and its mutex) is initialized. */
+ tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
+ tt_pthread_cond_signal(&endpoint_hack_cond_1);
+ tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
+ /* Serve hang_endpoint until main_f() calls cbus_stop_loop(). */
+ cbus_loop(&hang_endpoint);
+ cbus_endpoint_destroy(&hang_endpoint, cbus_process);
+ return 0;
+}
+
+static void
+hang_worker_start(void) /* run hang_worker_f() in the hang_worker cord */
+{
+	cord_costart(&hang_worker, "hang_worker", hang_worker_f, NULL);
+}
+
+static int
+canceled_worker_f(va_list ap)
+{
+	(void) ap;
+	/* Tell main_f() that this worker is running. */
+	tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
+	tt_pthread_cond_signal(&endpoint_hack_cond_1);
+	/* Wait for the start command from the main thread. */
+	tt_pthread_mutex_lock(&endpoint_hack_mutex_2);
+	/* mutex_2 is taken before mutex_1 is released: no lost signal. */
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
+	/* NOTE(review): no predicate loop, a spurious wakeup starts early. */
+	tt_pthread_cond_wait(&endpoint_hack_cond_2, &endpoint_hack_mutex_2);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex_2);
+
+	cpipe_create(&pipe_from_cl_to_hang, "hang_worker");
+	cpipe_set_max_input(&pipe_from_cl_to_hang, 1);
+	static struct cmsg_hop nothing_route = { do_nothing, NULL };
+	static struct cmsg nothing_msg;
+	cmsg_init(&nothing_msg, &nothing_route);
+	/*
+	 * cpipe_push_input() is used on purpose: the hang reproduces
+	 * only when ev_invoke() is called (max_input is 1 here).
+	 */
+	cpipe_push_input(&pipe_from_cl_to_hang, &nothing_msg);
+	cpipe_destroy(&pipe_from_cl_to_hang);
+	return 0;
+}
+
+static void
+canceled_worker_start(void) /* run canceled_worker_f() in its own cord */
+{
+	cord_costart(&canceled_worker, "canceled_worker",
+		     canceled_worker_f, NULL);
+}
+
+static int
+main_f(va_list ap) /* Test body; runs in the "main" fiber created by main(). */
+{
+ (void) ap;
+
+ /* Start of the endpoint mutex hack (see the comment above the globals). */
+ /* Start the hang worker and wait until it has created hang_endpoint
+ * (which also initializes the endpoint mutex). */
+ tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
+ hang_worker_start();
+ tt_pthread_cond_wait(&endpoint_hack_cond_1, &endpoint_hack_mutex_1);
+ tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
+
+ /*
+ * Create (only create) the canceled worker before the endpoint mutex is
+ * locked: the hack only works correctly in this order.
+ */
+ tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
+ canceled_worker_start();
+ tt_pthread_cond_wait(&endpoint_hack_cond_1, &endpoint_hack_mutex_1);
+ tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
+ /* Hold the endpoint mutex so that the canceled worker blocks on it. */
+ tt_pthread_mutex_lock(&(hang_endpoint.mutex));
+
+ /* Send the start command to the canceled worker. */
+ tt_pthread_mutex_lock(&endpoint_hack_mutex_2);
+ tt_pthread_cond_signal(&endpoint_hack_cond_2);
+ tt_pthread_mutex_unlock(&endpoint_hack_mutex_2);
+ /* Spin until __lock reaches 2, i.e. a worker waits for the mutex. */
+ while(hang_endpoint.mutex.__data.__lock < 2) {
+ usleep(200);
+ }
+ /* Presumably deferred: acts at the worker's next cancellation point. */
+ tt_pthread_cancel(canceled_worker.id);
+ tt_pthread_mutex_unlock(&(hang_endpoint.mutex));
+ /* End of the endpoint mutex hack */
+
+ tt_pthread_join(canceled_worker.id, NULL);
+ /* From here on, a hang makes join_fail() end the test after 5 s. */
+ unsigned join_timeout = 5;
+ signal(SIGALRM, join_fail); /* For exit in a hang case */
+ alarm(join_timeout);
+ /* Stop the hang worker's cbus loop; presumably hangs without the fix. */
+ cpipe_create(&pipe_from_main_to_hang, "hang_worker");
+ cbus_stop_loop(&pipe_from_main_to_hang);
+ cpipe_destroy(&pipe_from_main_to_hang);
+
+ cord_join(&hang_worker);
+ ok(true, "The hang worker has been joined");
+ alarm(0);
+
+ ev_break(loop(), EVBREAK_ALL);
+ return 0;
+}
+
+int
+main(void) /* cbus_hang unit test entry point */
+{
+	header();
+	plan(1);
+
+	memory_init();
+	fiber_init(fiber_c_invoke);
+	cbus_init();
+	tt_pthread_cond_init(&endpoint_hack_cond_1, NULL);
+	tt_pthread_mutex_init(&endpoint_hack_mutex_1, NULL);
+	tt_pthread_cond_init(&endpoint_hack_cond_2, NULL);
+	tt_pthread_mutex_init(&endpoint_hack_mutex_2, NULL);
+	struct fiber *main_fiber = fiber_new("main", main_f);
+	assert(main_fiber != NULL);
+	fiber_wakeup(main_fiber);
+	ev_run(loop(), 0); /* until main_f() calls ev_break() */
+	/* Both workers were joined by main_f(); release all primitives. */
+	tt_pthread_cond_destroy(&endpoint_hack_cond_1);
+	tt_pthread_mutex_destroy(&endpoint_hack_mutex_1);
+	tt_pthread_cond_destroy(&endpoint_hack_cond_2);
+	tt_pthread_mutex_destroy(&endpoint_hack_mutex_2);
+	cbus_free();
+	fiber_free();
+	memory_free();
+	int rc = check_plan();
+	footer();
+	return rc;
+}
diff --git a/test/unit/cbus_hang.result b/test/unit/cbus_hang.result
new file mode 100644
index 000000000..d5810e8cf
--- /dev/null
+++ b/test/unit/cbus_hang.result
@@ -0,0 +1,4 @@
+ *** main ***
+1..1
+ok 1 - The hang worker has been joined
+ *** main: done ***
--
2.17.1
More information about the Tarantool-patches
mailing list