[PATCH 3/3] vinyl: abort worker threads on shutdown

Vladimir Davydov vdavydov.dev at gmail.com
Thu Mar 1 13:57:32 MSK 2018


If the user terminates tarantool while a dump or compaction is in
progress, tarantool won't exit immediately - it will hang until all dump
and compaction tasks that are currently in progress are complete, which
may take quite a while on large data sets. What is worse, once such a
task has finished, vinyl will not commit the produced run file to the
vylog, because the event loop has already been stopped, so it will
delete the file after restart and schedule the task anew.

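This patch makes the scheduler forcefully abort all running tasks
as soon as possible: while writing a run, a worker thread now checks
after each appended statement whether the worker pool is still running
and, if it is not, fails the task with a FiberIsCancelled error.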

Closes #3166
---
 src/box/vy_scheduler.c     | 25 ++++++++++++++++++-------
 src/errinj.h               |  1 +
 test/box/errinj.result     |  4 +++-
 test/vinyl/errinj.result   | 40 ++++++++++++++++++++++++++++++++++++++++
 test/vinyl/errinj.test.lua | 16 ++++++++++++++++
 5 files changed, 78 insertions(+), 8 deletions(-)

diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index d833514c..382cf071 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -82,7 +82,7 @@ struct vy_task_ops {
 	 * which is too heavy for the tx thread (like IO or compression).
 	 * Returns 0 on success. On failure returns -1 and sets diag.
 	 */
-	int (*execute)(struct vy_task *task);
+	int (*execute)(struct vy_scheduler *scheduler, struct vy_task *task);
 	/**
 	 * This function is called by the scheduler upon task completion.
 	 * It may be used to finish the task from the tx thread context.
@@ -349,6 +349,7 @@ vy_scheduler_destroy(struct vy_scheduler *scheduler)
 	/* Stop scheduler fiber. */
 	scheduler->scheduler_fiber = NULL;
 	/* Sic: fiber_cancel() can't be used here. */
+	fiber_cond_signal(&scheduler->dump_cond);
 	fiber_cond_signal(&scheduler->scheduler_cond);
 
 	if (scheduler->is_worker_pool_running)
@@ -621,7 +622,7 @@ vy_run_discard(struct vy_run *run)
 }
 
 static int
-vy_task_write_run(struct vy_task *task)
+vy_task_write_run(struct vy_scheduler *scheduler, struct vy_task *task)
 {
 	struct vy_index *index = task->index;
 	struct vy_stmt_stream *wi = task->wi;
@@ -647,9 +648,19 @@ vy_task_write_run(struct vy_task *task)
 	int rc;
 	struct tuple *stmt = NULL;
 	while ((rc = wi->iface->next(wi, &stmt)) == 0 && stmt != NULL) {
+		inj = errinj(ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT, ERRINJ_DOUBLE);
+		if (inj != NULL && inj->dparam > 0)
+			usleep(inj->dparam * 1000000);
+
 		rc = vy_run_writer_append_stmt(&writer, stmt);
 		if (rc != 0)
 			break;
+
+		if (!scheduler->is_worker_pool_running) {
+			diag_set(FiberIsCancelled);
+			rc = -1;
+			break;
+		}
 	}
 	wi->iface->stop(wi);
 
@@ -667,9 +678,9 @@ fail:
 }
 
 static int
-vy_task_dump_execute(struct vy_task *task)
+vy_task_dump_execute(struct vy_scheduler *scheduler, struct vy_task *task)
 {
-	return vy_task_write_run(task);
+	return vy_task_write_run(scheduler, task);
 }
 
 static int
@@ -1030,9 +1041,9 @@ err:
 }
 
 static int
-vy_task_compact_execute(struct vy_task *task)
+vy_task_compact_execute(struct vy_scheduler *scheduler, struct vy_task *task)
 {
-	return vy_task_write_run(task);
+	return vy_task_write_run(scheduler, task);
 }
 
 static int
@@ -1617,7 +1628,7 @@ vy_worker_f(void *arg)
 		assert(task != NULL);
 
 		/* Execute task */
-		task->status = task->ops->execute(task);
+		task->status = task->ops->execute(scheduler, task);
 		if (task->status != 0) {
 			struct diag *diag = diag_get();
 			assert(!diag_is_empty(diag));
diff --git a/src/errinj.h b/src/errinj.h
index 20f3824c..352a3c3c 100644
--- a/src/errinj.h
+++ b/src/errinj.h
@@ -106,6 +106,7 @@ struct errinj {
 	_(ERRINJ_VY_POINT_ITER_WAIT, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_RELAY_EXIT_DELAY, ERRINJ_DOUBLE, {.dparam = 0}) \
 	_(ERRINJ_VY_DELAY_PK_LOOKUP, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT, ERRINJ_DOUBLE, {.dparam = 0}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 4ef7e887..054045cb 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -12,7 +12,9 @@ index = space:create_index('primary', { type = 'hash' })
 ...
 errinj.info()
 ---
-- ERRINJ_WAL_WRITE:
+- ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT:
+    state: 0
+  ERRINJ_WAL_WRITE:
     state: false
   ERRINJ_VYRUN_DATA_READ:
     state: false
diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
index 5e4037cf..9adffd96 100644
--- a/test/vinyl/errinj.result
+++ b/test/vinyl/errinj.result
@@ -894,14 +894,54 @@ box.snapshot()
 ---
 - ok
 ...
+box.error.injection.set('ERRINJ_VY_RUN_WRITE_TIMEOUT', 0)
+---
+- ok
+...
+--
+-- Check that all dump/compact tasks that are in progress at
+-- the time when the server stops are aborted immediately.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = s:create_index('i1', {parts = {1, 'unsigned'}})
+---
+...
+_ = s:create_index('i2', {parts = {2, 'unsigned'}})
+---
+...
+box.error.injection.set('ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT', 0.01)
+---
+- ok
+...
+for i = 1, 1000 do s:replace{i, i} end
+---
+...
+_ = fiber.create(function() box.snapshot() end)
+---
+...
+fiber.sleep(0.01)
+---
+...
 test_run:cmd('switch default')
 ---
 - true
 ...
+t1 = fiber.time()
+---
+...
 test_run:cmd("stop server test")
 ---
 - true
 ...
+t2 = fiber.time()
+---
+...
+t2 - t1 < 1
+---
+- true
+...
 test_run:cmd("cleanup server test")
 ---
 - true
diff --git a/test/vinyl/errinj.test.lua b/test/vinyl/errinj.test.lua
index bbfb44ab..b4dc3734 100644
--- a/test/vinyl/errinj.test.lua
+++ b/test/vinyl/errinj.test.lua
@@ -357,8 +357,24 @@ fiber.sleep(0)
 s:drop()
 -- Wait for the dump task to complete.
 box.snapshot()
+box.error.injection.set('ERRINJ_VY_RUN_WRITE_TIMEOUT', 0)
+
+--
+-- Check that all dump/compact tasks that are in progress at
+-- the time when the server stops are aborted immediately.
+--
+s = box.schema.space.create('test', {engine = 'vinyl'})
+_ = s:create_index('i1', {parts = {1, 'unsigned'}})
+_ = s:create_index('i2', {parts = {2, 'unsigned'}})
+box.error.injection.set('ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT', 0.01)
+for i = 1, 1000 do s:replace{i, i} end
+_ = fiber.create(function() box.snapshot() end)
+fiber.sleep(0.01)
 test_run:cmd('switch default')
+t1 = fiber.time()
 test_run:cmd("stop server test")
+t2 = fiber.time()
+t2 - t1 < 1
 test_run:cmd("cleanup server test")
 
 --
-- 
2.11.0




More information about the Tarantool-patches mailing list