[Tarantool-patches] [PATCH v2] Add some cancellation guard

Leonid Vasiliev lvasiliev at tarantool.org
Thu Dec 26 10:46:38 MSK 2019


We need to set a thread cancellation guard because another
thread may cancel the current thread at a really bad time
(while it is flushing messages or holding a locked mutex).

Fixes: #4127

https://github.com/tarantool/tarantool/issues/4127
https://github.com/tarantool/tarantool/tree/lvasiliev/gh-4127-WAL-thread-stucks

---
 src/lib/core/cbus.c        |  22 +++++
 src/tt_pthread.h           |   5 +
 test/unit/CMakeLists.txt   |   5 +
 test/unit/cbus_hang.c      | 187 +++++++++++++++++++++++++++++++++++++
 test/unit/cbus_hang.result |   4 +
 5 files changed, 223 insertions(+)
 create mode 100644 test/unit/cbus_hang.c
 create mode 100644 test/unit/cbus_hang.result

diff --git a/src/lib/core/cbus.c b/src/lib/core/cbus.c
index b3b1280e7..d3566bc3a 100644
--- a/src/lib/core/cbus.c
+++ b/src/lib/core/cbus.c
@@ -143,6 +143,13 @@ cpipe_destroy(struct cpipe *pipe)
 	struct cmsg_poison *poison = malloc(sizeof(struct cmsg_poison));
 	cmsg_init(&poison->msg, route);
 	poison->endpoint = pipe->endpoint;
+
+	/*
+	 * The thread should not be canceled while the mutex is locked.
+	 */
+	int old_cancel_state;
+	tt_pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);
+
 	/*
 	 * Avoid the general purpose cpipe_push_input() since
 	 * we want to control the way the poison message is
@@ -165,6 +172,8 @@ cpipe_destroy(struct cpipe *pipe)
 	ev_async_send(endpoint->consumer, &endpoint->async);
 	tt_pthread_mutex_unlock(&endpoint->mutex);
 
+	tt_pthread_setcancelstate(old_cancel_state, NULL);
+
 	TRASH(pipe);
 }
 
@@ -284,6 +293,17 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
 	/* Trigger task processing when the queue becomes non-empty. */
 	bool output_was_empty;
 
+	/*
+	 * We need to set a thread cancellation guard, because
+	 * another thread may cancel the current thread
+	 * (write() is a cancellation point in ev_async_send)
+	 * and the activation of the ev_async watcher
+	 * through ev_async_send will fail.
+	 */
+
+	int old_cancel_state;
+	tt_pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);
+
 	tt_pthread_mutex_lock(&endpoint->mutex);
 	output_was_empty = stailq_empty(&endpoint->output);
 	/** Flush input */
@@ -297,6 +317,8 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
 
 		ev_async_send(endpoint->consumer, &endpoint->async);
 	}
+
+	tt_pthread_setcancelstate(old_cancel_state, NULL);
 }
 
 void
diff --git a/src/tt_pthread.h b/src/tt_pthread.h
index 40b43b112..6bb19eb23 100644
--- a/src/tt_pthread.h
+++ b/src/tt_pthread.h
@@ -300,6 +300,11 @@
 
 #define tt_pthread_getspecific(key) pthread_getspecific(key)
 
+#define tt_pthread_setcancelstate(state, oldstate)	\
+({	int e__ = pthread_setcancelstate(state, oldstate);\
+	tt_pthread_error(e__);				\
+})
+
 /** Set the current thread's name
  */
 static inline void
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 4a57597e9..fb8f74be0 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -98,6 +98,11 @@ target_link_libraries(cbus_stress.test core stat)
 add_executable(cbus.test cbus.c)
 target_link_libraries(cbus.test core unit stat)
 
+if (${CMAKE_HOST_SYSTEM_NAME} MATCHES "Linux")
+    add_executable(cbus_hang.test cbus_hang.c)
+    target_link_libraries(cbus_hang.test core unit stat)
+endif ()
+
 add_executable(coio.test coio.cc)
 target_link_libraries(coio.test core eio bit uri unit)
 
diff --git a/test/unit/cbus_hang.c b/test/unit/cbus_hang.c
new file mode 100644
index 000000000..8b3671001
--- /dev/null
+++ b/test/unit/cbus_hang.c
@@ -0,0 +1,187 @@
+#include "cbus.h"
+#include "fiber.h"
+#include "memory.h"
+#include "unit.h"
+
+
+struct cord hang_worker;
+struct cord canceled_worker;
+
+struct cbus_endpoint hang_endpoint;
+struct cpipe pipe_from_cl_to_hang;
+struct cpipe pipe_from_main_to_hang;
+
+/*
+ * We want to cancel the canceled worker thread at the moment when
+ * cpipe_flush_cb is being processed.
+ * A Linux-specific dirty hack is used to reproduce the bug.
+ * We need to synchronize the main thread and the canceled worker thread,
+ * so we do it using the endpoint mutex's internal field (__data.__lock):
+ * __lock == 0 - unlocked
+ * __lock == 1 - locked
+ * __lock == 2 - possible waiters exist
+ * After pthread create, __lock changes state from 1 to 2.
+ */
+
+pthread_mutex_t endpoint_hack_mutex_1;
+pthread_cond_t endpoint_hack_cond_1;
+
+pthread_mutex_t endpoint_hack_mutex_2;
+pthread_cond_t endpoint_hack_cond_2;
+
+
+static
+void join_fail(int signum) {
+	(void)signum;
+	printf("Can't join the hang worker\n");
+	exit(EXIT_FAILURE);
+}
+
+static void
+do_nothing(struct cmsg *m)
+{
+	(void) m;
+}
+
+static int
+hang_worker_f(va_list ap)
+{
+	(void) ap;
+	cbus_endpoint_create(&hang_endpoint, "hang_worker",
+	                     fiber_schedule_cb, fiber());
+
+	tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
+	tt_pthread_cond_signal(&endpoint_hack_cond_1);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
+
+	cbus_loop(&hang_endpoint);
+	cbus_endpoint_destroy(&hang_endpoint, cbus_process);
+	return 0;
+}
+
+static void
+hang_worker_start()
+{
+	cord_costart(&hang_worker, "hang_worker", hang_worker_f, NULL);
+}
+
+static int
+canceled_worker_f(va_list ap)
+{
+	(void) ap;
+
+	tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
+	tt_pthread_cond_signal(&endpoint_hack_cond_1);
+
+	/* Wait for a start command from the main thread */
+	tt_pthread_mutex_lock(&endpoint_hack_mutex_2);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
+
+	tt_pthread_cond_wait(&endpoint_hack_cond_2, &endpoint_hack_mutex_2);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex_2);
+
+	cpipe_create(&pipe_from_cl_to_hang, "hang_worker");
+	cpipe_set_max_input(&pipe_from_cl_to_hang, 1);
+	static struct cmsg_hop nothing_route = { do_nothing, NULL };
+	static struct cmsg nothing_msg;
+	cmsg_init(&nothing_msg, &nothing_route);
+	/*
+	 * We need to use cpipe_push_input() because
+	 * ev_invoke must be called to reproduce the hang.
+	 */
+	cpipe_push_input(&pipe_from_cl_to_hang, &nothing_msg);
+	cpipe_destroy(&pipe_from_cl_to_hang);
+	return 0;
+}
+
+static void
+canceled_worker_start()
+{
+	cord_costart(&canceled_worker, "canceled_worker",
+	             canceled_worker_f, NULL);
+}
+
+static int
+main_f(va_list ap)
+{
+	(void) ap;
+
+	/* Start the endpoint's mutex hack */
+
+	/* Initialize the endpoint mutex */
+	tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
+	hang_worker_start();
+	tt_pthread_cond_wait(&endpoint_hack_cond_1, &endpoint_hack_mutex_1);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
+
+	/*
+	 * Create (only create) the canceled worker before the endpoint
+	 * mutex is locked, so that the hack works correctly.
+	 */
+	tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
+	canceled_worker_start();
+	tt_pthread_cond_wait(&endpoint_hack_cond_1, &endpoint_hack_mutex_1);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
+
+	tt_pthread_mutex_lock(&(hang_endpoint.mutex));
+
+	/* Start canceled worker */
+	tt_pthread_mutex_lock(&endpoint_hack_mutex_2);
+	tt_pthread_cond_signal(&endpoint_hack_cond_2);
+	tt_pthread_mutex_unlock(&endpoint_hack_mutex_2);
+
+	while(hang_endpoint.mutex.__data.__lock < 2) {
+		usleep(200);
+	}
+
+	tt_pthread_cancel(canceled_worker.id);
+	tt_pthread_mutex_unlock(&(hang_endpoint.mutex));
+	/* Hack end */
+
+	tt_pthread_join(canceled_worker.id, NULL);
+
+	unsigned join_timeout = 5;
+	signal(SIGALRM, join_fail); // For exit in a hang case
+	alarm(join_timeout);
+
+	cpipe_create(&pipe_from_main_to_hang, "hang_worker");
+	cbus_stop_loop(&pipe_from_main_to_hang);
+	cpipe_destroy(&pipe_from_main_to_hang);
+
+	cord_join(&hang_worker);
+	ok(true, "The hang worker has been joined");
+	alarm(0);
+
+	ev_break(loop(), EVBREAK_ALL);
+	return 0;
+}
+
+/* Set up the runtime, run main_f() in a fiber, destroy all that was set up. */
+int
+main(void)
+{
+	header();
+	plan(1);
+	memory_init();
+	fiber_init(fiber_c_invoke);
+	cbus_init();
+	tt_pthread_cond_init(&endpoint_hack_cond_1, NULL);
+	tt_pthread_mutex_init(&endpoint_hack_mutex_1, NULL);
+	tt_pthread_cond_init(&endpoint_hack_cond_2, NULL);
+	tt_pthread_mutex_init(&endpoint_hack_mutex_2, NULL);
+	/* Run the test scenario in a fiber driven by the event loop. */
+	struct fiber *main_fiber = fiber_new("main", main_f);
+	assert(main_fiber != NULL);
+	fiber_wakeup(main_fiber);
+	ev_run(loop(), 0);
+	tt_pthread_cond_destroy(&endpoint_hack_cond_1);
+	tt_pthread_mutex_destroy(&endpoint_hack_mutex_1);
+	tt_pthread_cond_destroy(&endpoint_hack_cond_2);
+	tt_pthread_mutex_destroy(&endpoint_hack_mutex_2);
+	cbus_free();
+	fiber_free();
+	memory_free();
+	int rc = check_plan();
+	footer();
+	return rc;
+}
diff --git a/test/unit/cbus_hang.result b/test/unit/cbus_hang.result
new file mode 100644
index 000000000..d5810e8cf
--- /dev/null
+++ b/test/unit/cbus_hang.result
@@ -0,0 +1,4 @@
+	*** main ***
+1..1
+ok 1 - The hang worker has been joined
+	*** main: done ***
-- 
2.17.1



More information about the Tarantool-patches mailing list