[Tarantool-patches] [PATCH 2/2] fiber: use fiber_touch and fiber_continue

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Apr 23 02:05:55 MSK 2021


fiber_wakeup() is not safe when called on the current fiber - it
leads to a skip of the next yield. That results in hard-to-catch
bugs which might show up much later than the wakeup call, making
it hard to trace their source.

The previous commit introduced safer versions of wakeup:
fiber_touch() and fiber_continue(). This patch uses them
everywhere except the tests.

This allows dropping the 'f != fiber()' checks in the places which
can legitimately run in the current fiber, and ensures that in all
the other places the fiber is never the current one.

Closes #5292
---
 src/box/applier.cc           |  4 +---
 src/box/box.cc               |  2 +-
 src/box/gc.c                 | 12 ++++++------
 src/box/journal.c            |  4 ++--
 src/box/memtx_engine.c       |  2 +-
 src/box/raft.c               |  9 ++++-----
 src/box/recovery.cc          |  2 +-
 src/box/txn.c                |  8 +++-----
 src/box/txn_limbo.c          |  9 ++++-----
 src/box/vy_log.c             |  2 +-
 src/box/vy_quota.c           |  2 +-
 src/box/vy_scheduler.c       |  2 +-
 src/lib/core/cbus.c          |  2 +-
 src/lib/core/coio.cc         |  2 +-
 src/lib/core/coio_file.c     |  2 +-
 src/lib/core/coio_task.c     |  2 +-
 src/lib/core/fiber_channel.c |  4 ++--
 src/lib/core/fiber_cond.c    |  4 ++--
 src/lib/core/fiber_cond.h    |  7 ++++---
 src/lib/core/fiber_pool.c    |  2 +-
 src/lib/core/latch.h         |  2 +-
 src/lib/swim/swim.c          |  3 +--
 src/lua/fiber.c              |  2 +-
 23 files changed, 42 insertions(+), 48 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index dc05c91d3..030c46ce5 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -792,9 +792,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
 		txn_limbo_process(&txn_limbo, synchro_entry->req);
 		trigger_run(&replicaset.applier.on_wal_write, NULL);
 	}
-	/* The fiber is the same on final join. */
-	if (synchro_entry->owner != fiber())
-		fiber_wakeup(synchro_entry->owner);
+	fiber_touch(synchro_entry->owner);
 }
 
 /** Process a synchro request. */
diff --git a/src/box/box.cc b/src/box/box.cc
index 59925962d..2060aed50 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1436,7 +1436,7 @@ box_quorum_on_ack_f(struct trigger *trigger, void *event)
 	vclock_follow(&t->vclock, ack->source, new_lsn);
 	++t->ack_count;
 	if (t->ack_count >= t->quorum) {
-		fiber_wakeup(t->waiter);
+		fiber_continue(t->waiter);
 		trigger_clear(trigger);
 	}
 	return 0;
diff --git a/src/box/gc.c b/src/box/gc.c
index 10f899923..2cc1b76de 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -314,7 +314,7 @@ gc_set_wal_cleanup_delay(double wal_cleanup_delay)
 	 * it is already in a regular cleanup stage.
 	 */
 	if (gc.is_paused)
-		fiber_wakeup(gc.cleanup_fiber);
+		fiber_continue(gc.cleanup_fiber);
 }
 
 void
@@ -334,7 +334,7 @@ gc_delay_unref(void)
 		gc.delay_ref--;
 		if (gc.delay_ref == 0) {
 			gc.is_paused = false;
-			fiber_wakeup(gc.cleanup_fiber);
+			fiber_continue(gc.cleanup_fiber);
 		}
 	}
 }
@@ -354,7 +354,7 @@ gc_schedule_cleanup(void)
 	 * the current round completes.
 	 */
 	if (gc.cleanup_scheduled++ == gc.cleanup_completed)
-		fiber_wakeup(gc.cleanup_fiber);
+		fiber_continue(gc.cleanup_fiber);
 }
 
 /**
@@ -425,7 +425,7 @@ gc_set_checkpoint_interval(double interval)
 	checkpoint_schedule_cfg(&gc.checkpoint_schedule,
 				ev_monotonic_now(loop()), interval);
 	if (!gc.checkpoint_is_in_progress)
-		fiber_wakeup(gc.checkpoint_fiber);
+		fiber_continue(gc.checkpoint_fiber);
 }
 
 void
@@ -539,7 +539,7 @@ gc_checkpoint(void)
 	checkpoint_schedule_cfg(&gc.checkpoint_schedule,
 				ev_monotonic_now(loop()),
 				gc.checkpoint_schedule.interval);
-	fiber_wakeup(gc.checkpoint_fiber);
+	fiber_continue(gc.checkpoint_fiber);
 
 	if (gc_do_checkpoint(false) != 0)
 		return -1;
@@ -566,7 +566,7 @@ gc_trigger_checkpoint(void)
 	gc.checkpoint_is_pending = true;
 	checkpoint_schedule_reset(&gc.checkpoint_schedule,
 				  ev_monotonic_now(loop()));
-	fiber_wakeup(gc.checkpoint_fiber);
+	fiber_continue(gc.checkpoint_fiber);
 }
 
 static int
diff --git a/src/box/journal.c b/src/box/journal.c
index 886a15139..b1b334237 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -68,7 +68,7 @@ journal_queue_wakeup(void)
 {
 	struct rlist *list = &journal_queue.waiters;
 	if (!rlist_empty(list) && !journal_queue_is_full())
-		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
+		fiber_continue(rlist_first_entry(list, struct fiber, state));
 }
 
 void
@@ -93,6 +93,6 @@ journal_queue_flush(void)
 		return;
 	struct rlist *list = &journal_queue.waiters;
 	while (!rlist_empty(list))
-		fiber_wakeup(rlist_first_entry(list, struct fiber, state));
+		fiber_continue(rlist_first_entry(list, struct fiber, state));
 	journal_queue_wait();
 }
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index e076cd71d..66f2f28eb 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -1170,7 +1170,7 @@ memtx_engine_schedule_gc(struct memtx_engine *memtx,
 			 struct memtx_gc_task *task)
 {
 	stailq_add_tail_entry(&memtx->gc_queue, task, link);
-	fiber_wakeup(memtx->gc_fiber);
+	fiber_continue(memtx->gc_fiber);
 }
 
 void
diff --git a/src/box/raft.c b/src/box/raft.c
index 47c869712..12ddadd4a 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -155,9 +155,8 @@ box_raft_schedule_async(struct raft *raft)
 	 * adapted to this. Also don't wakeup the current fiber - it leads to
 	 * undefined behaviour.
 	 */
-	if ((box_raft_worker->flags & FIBER_IS_CANCELLABLE) != 0 &&
-	    fiber() != box_raft_worker)
-		fiber_wakeup(box_raft_worker);
+	if ((box_raft_worker->flags & FIBER_IS_CANCELLABLE) != 0)
+		fiber_touch(box_raft_worker);
 	box_raft_has_work = true;
 }
 
@@ -283,7 +282,7 @@ box_raft_broadcast(struct raft *raft, const struct raft_msg *msg)
 static void
 box_raft_write_cb(struct journal_entry *entry)
 {
-	fiber_wakeup(entry->complete_data);
+	fiber_continue(entry->complete_data);
 }
 
 static void
@@ -340,7 +339,7 @@ box_raft_wait_leader_found_f(struct trigger *trig, void *event)
 	assert(raft == box_raft());
 	struct fiber *waiter = trig->data;
 	if (raft->leader != REPLICA_ID_NIL || !raft->is_enabled)
-		fiber_wakeup(waiter);
+		fiber_continue(waiter);
 	return 0;
 }
 
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index cd33e7635..ceda1e5c9 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -398,7 +398,7 @@ public:
 	{
 		this->events |= events;
 		if (f->flags & FIBER_IS_CANCELLABLE)
-			fiber_wakeup(f);
+			fiber_continue(f);
 	}
 
 	WalSubscription(const char *wal_dir)
diff --git a/src/box/txn.c b/src/box/txn.c
index 03b39e0de..1737fd2d5 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -504,9 +504,7 @@ txn_free_or_wakeup(struct txn *txn)
 		txn_free(txn);
 	else {
 		txn_set_flags(txn, TXN_IS_DONE);
-		if (txn->fiber != fiber())
-			/* Wake a waiting fiber up. */
-			fiber_wakeup(txn->fiber);
+		fiber_touch(txn->fiber);
 	}
 }
 
@@ -578,8 +576,8 @@ txn_on_journal_write(struct journal_entry *entry)
 		txn_run_wal_write_triggers(txn);
 	if (!txn_has_flag(txn, TXN_WAIT_SYNC))
 		txn_complete_success(txn);
-	else if (txn->fiber != NULL && txn->fiber != fiber())
-		fiber_wakeup(txn->fiber);
+	else if (txn->fiber != NULL)
+		fiber_touch(txn->fiber);
 finish:
 	fiber_set_txn(fiber(), NULL);
 }
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index c22bd6665..f3115d66d 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -275,7 +275,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 		txn_complete_fail(e->txn);
 		if (e == entry)
 			break;
-		fiber_wakeup(e->txn->fiber);
+		fiber_continue(e->txn->fiber);
 	}
 	fiber_set_cancellable(cancellable);
 	diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
@@ -314,8 +314,7 @@ static void
 txn_limbo_write_cb(struct journal_entry *entry)
 {
 	assert(entry->complete_data != NULL);
-	if (fiber() != entry->complete_data)
-		fiber_wakeup(entry->complete_data);
+	fiber_touch(entry->complete_data);
 }
 
 static void
@@ -579,7 +578,7 @@ txn_commit_cb(struct trigger *trigger, void *event)
 	struct confirm_waitpoint *cwp =
 		(struct confirm_waitpoint *)trigger->data;
 	cwp->is_confirm = true;
-	fiber_wakeup(cwp->caller);
+	fiber_continue(cwp->caller);
 	return 0;
 }
 
@@ -590,7 +589,7 @@ txn_rollback_cb(struct trigger *trigger, void *event)
 	struct confirm_waitpoint *cwp =
 		(struct confirm_waitpoint *)trigger->data;
 	cwp->is_rollback = true;
-	fiber_wakeup(cwp->caller);
+	fiber_continue(cwp->caller);
 	return 0;
 }
 
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 8214b054e..2bccc0011 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -771,7 +771,7 @@ vy_log_init(const char *dir)
 				   vy_log_flusher_f);
 	if (vy_log.flusher == NULL)
 		panic("failed to allocate vylog flusher fiber");
-	fiber_wakeup(vy_log.flusher);
+	fiber_continue(vy_log.flusher);
 }
 
 static struct vy_log_tx *
diff --git a/src/box/vy_quota.c b/src/box/vy_quota.c
index f1ac8dd9d..9f57a69a7 100644
--- a/src/box/vy_quota.c
+++ b/src/box/vy_quota.c
@@ -203,7 +203,7 @@ vy_quota_signal(struct vy_quota *q)
 			oldest = n;
 	}
 	if (oldest != NULL)
-		fiber_wakeup(oldest->fiber);
+		fiber_continue(oldest->fiber);
 }
 
 static void
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 7d8324a52..f223aad7c 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -959,7 +959,7 @@ vy_deferred_delete_batch_free_f(struct cmsg *cmsg)
 	/* Notify the caller if this is the last batch. */
 	assert(task->deferred_delete_in_progress > 0);
 	if (--task->deferred_delete_in_progress == 0)
-		fiber_wakeup(task->fiber);
+		fiber_continue(task->fiber);
 }
 
 /**
diff --git a/src/lib/core/cbus.c b/src/lib/core/cbus.c
index 5d91fb948..9fdbf7400 100644
--- a/src/lib/core/cbus.c
+++ b/src/lib/core/cbus.c
@@ -406,7 +406,7 @@ cbus_call_done(struct cmsg *m)
 		return;
 	}
 	msg->complete = true;
-	fiber_wakeup(msg->caller);
+	fiber_continue(msg->caller);
 }
 
 /**
diff --git a/src/lib/core/coio.cc b/src/lib/core/coio.cc
index 5dfe851d4..b191f4cee 100644
--- a/src/lib/core/coio.cc
+++ b/src/lib/core/coio.cc
@@ -747,7 +747,7 @@ coio_wait_cb(struct ev_loop *loop, ev_io *watcher, int revents)
 	(void) loop;
 	struct coio_wdata *wdata = (struct coio_wdata *) watcher->data;
 	wdata->revents = revents;
-	fiber_wakeup(wdata->fiber);
+	fiber_continue(wdata->fiber);
 }
 
 API_EXPORT int
diff --git a/src/lib/core/coio_file.c b/src/lib/core/coio_file.c
index 3ec74a549..7792d7e19 100644
--- a/src/lib/core/coio_file.c
+++ b/src/lib/core/coio_file.c
@@ -121,7 +121,7 @@ coio_complete(eio_req *req)
 	eio->done = true;
 	eio->result = req->result;
 
-	fiber_wakeup(eio->fiber);
+	fiber_continue(eio->fiber);
 	return 0;
 }
 
diff --git a/src/lib/core/coio_task.c b/src/lib/core/coio_task.c
index c8be2dea1..8f58c6dc6 100644
--- a/src/lib/core/coio_task.c
+++ b/src/lib/core/coio_task.c
@@ -185,7 +185,7 @@ coio_on_finish(eio_req *req)
 	task->complete = 1;
 	/* Reset on_timeout hook - resources will be freed by coio_task user */
 	task->base.destroy = NULL;
-	fiber_wakeup(task->fiber);
+	fiber_continue(task->fiber);
 	return 0;
 }
 
diff --git a/src/lib/core/fiber_channel.c b/src/lib/core/fiber_channel.c
index 2ac585648..2676e9b01 100644
--- a/src/lib/core/fiber_channel.c
+++ b/src/lib/core/fiber_channel.c
@@ -141,7 +141,7 @@ fiber_channel_waiter_wakeup(struct fiber *f,
 	/*
 	 * fiber_channel allows an asynchronous cancel. If a fiber
 	 * is cancelled while waiting on a timeout, it is done via
-	 * fiber_wakeup(), which modifies fiber->state link.
+	 * fiber_touch(), which modifies fiber->state link.
 	 * This ensures that a fiber is never on two "state"
 	 * lists: it's either waiting on a channel, or is
 	 * cancelled, ready for execution. This is why
@@ -154,7 +154,7 @@ fiber_channel_waiter_wakeup(struct fiber *f,
 	 * delivered to it. Since 'fiber->state' is used, this
 	 * works correctly with fiber_cancel().
 	 */
-	fiber_wakeup(f);
+	fiber_continue(f);
 }
 
 
diff --git a/src/lib/core/fiber_cond.c b/src/lib/core/fiber_cond.c
index 3624dffd4..b4d34409b 100644
--- a/src/lib/core/fiber_cond.c
+++ b/src/lib/core/fiber_cond.c
@@ -85,7 +85,7 @@ fiber_cond_signal(struct fiber_cond *e)
 	if (! rlist_empty(&e->waiters)) {
 		struct fiber *f;
 		f = rlist_first_entry(&e->waiters, struct fiber, state);
-		fiber_wakeup(f);
+		fiber_continue(f);
 	}
 }
 
@@ -95,7 +95,7 @@ fiber_cond_broadcast(struct fiber_cond *e)
 	while (! rlist_empty(&e->waiters)) {
 		struct fiber *f;
 		f = rlist_first_entry(&e->waiters, struct fiber, state);
-		fiber_wakeup(f);
+		fiber_continue(f);
 	}
 }
 
diff --git a/src/lib/core/fiber_cond.h b/src/lib/core/fiber_cond.h
index 2662e0654..4b338a156 100644
--- a/src/lib/core/fiber_cond.h
+++ b/src/lib/core/fiber_cond.h
@@ -107,9 +107,10 @@ fiber_cond_broadcast(struct fiber_cond *cond);
 /**
  * Suspend the execution of the current fiber (i.e. yield) until
  * fiber_cond_signal() is called. Like pthread_cond, fiber_cond can issue
- * spurious wake ups caused by explicit fiber_wakeup() or fiber_cancel()
- * calls. It is highly recommended to wrap calls to this function into a loop
- * and check an actual predicate and fiber_testcancel() on every iteration.
+ * spurious wake ups caused by explicit fiber_wakeup()/fiber_touch()/
+ * fiber_continue() or fiber_cancel() calls. It is highly recommended to wrap
+ * calls to this function into a loop and check an actual predicate and
+ * fiber_testcancel() on every iteration.
  *
  * @param cond condition
  * @param timeout timeout in seconds
diff --git a/src/lib/core/fiber_pool.c b/src/lib/core/fiber_pool.c
index b980028e4..8d0345a2d 100644
--- a/src/lib/core/fiber_pool.c
+++ b/src/lib/core/fiber_pool.c
@@ -186,7 +186,7 @@ fiber_pool_destroy(struct fiber_pool *pool)
 	pool->idle_timeout = 0;
 	struct fiber *idle_fiber;
 	rlist_foreach_entry(idle_fiber, &pool->idle, state)
-		fiber_wakeup(idle_fiber);
+		fiber_continue(idle_fiber);
 	/**
 	 * Just wait on fiber exit condition until all fibers are done
 	 */
diff --git a/src/lib/core/latch.h b/src/lib/core/latch.h
index 49c59cf63..ccf91742c 100644
--- a/src/lib/core/latch.h
+++ b/src/lib/core/latch.h
@@ -172,7 +172,7 @@ latch_unlock(struct latch *l)
 		 * fiber can intercept this latch.
 		 */
 		l->owner = f;
-		fiber_wakeup(f);
+		fiber_continue(f);
 	}
 }
 
diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index 1ecc90414..9c211c45d 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -628,7 +628,7 @@ swim_on_member_update(struct swim *swim, struct swim_member *member,
 			swim_member_ref(member);
 			stailq_add_tail_entry(&swim->event_queue, member,
 					      in_event_queue);
-			fiber_wakeup(swim->event_handler);
+			fiber_continue(swim->event_handler);
 		}
 		member->events |= events;
 	}
@@ -2219,7 +2219,6 @@ swim_kill_event_handler(struct swim *swim)
 	 * reused.
 	 */
 	swim->event_handler = NULL;
-	fiber_wakeup(f);
 	fiber_cancel(f);
 	fiber_join(f);
 }
diff --git a/src/lua/fiber.c b/src/lua/fiber.c
index 02ec3d158..59997dc36 100644
--- a/src/lua/fiber.c
+++ b/src/lua/fiber.c
@@ -520,7 +520,7 @@ lbox_fiber_new(struct lua_State *L)
 		luaL_error(L, "fiber.new(): out of fiber stack");
 
 	struct fiber *f = fiber_create(L);
-	fiber_wakeup(f);
+	fiber_continue(f);
 	return 1;
 }
 
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list