// list each time around the scheduler.
if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
+ // Any threads that were woken up by other Capabilities get
+ // appended to our run queue.
+ if (!emptyWakeupQueue(cap)) {
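+ // cap->lock protects the wakeup queue: other Capabilities append to
+ // it in wakeupThreadOnCapability(), so take the lock before splicing
+ // the queue onto our run queue.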
+ ACQUIRE_LOCK(&cap->lock);
+ if (emptyRunQueue(cap)) {
+ cap->run_queue_hd = cap->wakeup_queue_hd;
+ cap->run_queue_tl = cap->wakeup_queue_tl;
+ } else {
+ cap->run_queue_tl->link = cap->wakeup_queue_hd;
+ cap->run_queue_tl = cap->wakeup_queue_tl;
+ }
+ cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
+ RELEASE_LOCK(&cap->lock);
+ }
+
scheduleCheckBlockedThreads(cap);
scheduleDetectDeadlock(cap,task);
// Run the current thread
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+ ASSERT(t->cap == cap);
prev_what_next = t->what_next;
#endif
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+ ASSERT(t->cap == cap);
// ----------------------------------------------------------------------
Capability *free_caps[n_capabilities], *cap0;
nat i, n_free_caps;
+ // migration can be turned off with +RTS -qg
+ if (!RtsFlags.ParFlags.migrate) return;
+
// Check whether we have more threads on our run queue, or sparks
// in our pool, that we could hand to another Capability.
if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
appendToRunQueue(free_caps[i],t);
if (t->bound) { t->bound->cap = free_caps[i]; }
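+ // record the thread's new Capability in the TSO itself, so a later
+ // wakeup goes to the Capability the thread now lives on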
+ t->cap = free_caps[i];
i++;
}
}
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
-deleteThreadImmediately(Capability *cap, StgTSO *tso);
+deleteThread_(Capability *cap, StgTSO *tso);
#endif
StgInt
forkProcess(HsStablePtr *entry
} else { // child
- // delete all threads
- cap->run_queue_hd = END_TSO_QUEUE;
- cap->run_queue_tl = END_TSO_QUEUE;
-
+ // In the child, the only OS thread that exists is the one that
+ // called fork(); all the others are gone. We need to stop all
+ // Haskell threads, including those involved in foreign calls.
+ // Also we need to delete all Tasks, because they correspond to OS
+ // threads that no longer exist.
+
for (t = all_threads; t != END_TSO_QUEUE; t = next) {
- next = t->link;
-
- // don't allow threads to catch the ThreadKilled exception
- deleteThreadImmediately(cap,t);
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ // don't allow threads to catch the ThreadKilled
+ // exception, but we do want to raiseAsync() because these
+ // threads may be evaluating thunks that we need later.
+ deleteThread_(cap,t);
+ }
}
- // wipe the task list
+ // Empty the run queue. It seems tempting to let all the
+ // killed threads stay on the run queue as zombies to be
+ // cleaned up later, but some of them correspond to bound
+ // threads for which the corresponding Task does not exist.
+ cap->run_queue_hd = END_TSO_QUEUE;
+ cap->run_queue_tl = END_TSO_QUEUE;
+
+ // Any suspended C-calling Tasks are no more; their OS threads
+ // no longer exist:
+ cap->suspended_ccalling_tasks = NULL;
+
+ // Empty the all_threads list. Otherwise, the garbage
+ // collector may attempt to resurrect some of these threads.
+ all_threads = END_TSO_QUEUE;
+
+ // Wipe the task list, except the current Task.
ACQUIRE_LOCK(&sched_mutex);
for (task = all_tasks; task != NULL; task=task->all_link) {
- if (task != cap->running_task) discardTask(task);
+ if (task != cap->running_task) {
+ discardTask(task);
+ }
}
RELEASE_LOCK(&sched_mutex);
- cap->suspended_ccalling_tasks = NULL;
-
#if defined(THREADED_RTS)
- // wipe our spare workers list.
+ // Wipe our spare workers list; those workers no longer exist.
+ // New workers will be created if necessary.
cap->spare_workers = NULL;
cap->returning_tasks_hd = NULL;
cap->returning_tasks_tl = NULL;
tso->saved_errno = 0;
tso->bound = NULL;
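+ // tso->cap records the Capability this thread currently belongs to;
+ // it starts out on the Capability that created it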
+ tso->cap = cap;
tso->stack_size = stack_size;
tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
// This TSO is now a bound thread; make the Task and TSO
// point to each other.
tso->bound = task;
+ tso->cap = cap;
task->tso = tso;
task->ret = ret;
for (i = 0; i < n_capabilities; i++) {
cap = &capabilities[i];
- evac((StgClosure **)&cap->run_queue_hd);
- evac((StgClosure **)&cap->run_queue_tl);
-
+ evac((StgClosure **)(void *)&cap->run_queue_hd);
+ evac((StgClosure **)(void *)&cap->run_queue_tl);
+#if defined(THREADED_RTS)
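+ // the wakeup queue holds TSOs woken up by other Capabilities that
+ // this Capability hasn't yet spliced onto its run queue, so it is
+ // a GC root too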
+ evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
+ evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
+#endif
for (task = cap->suspended_ccalling_tasks; task != NULL;
task=task->next) {
- evac((StgClosure **)&task->suspended_tso);
+ IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
+ evac((StgClosure **)(void *)&task->suspended_tso);
}
+
}
+
#if !defined(THREADED_RTS)
evac((StgClosure **)(void *)&blocked_queue_hd);
evac((StgClosure **)(void *)&blocked_queue_tl);
ASSERT(get_itbl(tso)->type == TSO);
ASSERT(tso->why_blocked != NotBlocked);
+
tso->why_blocked = NotBlocked;
next = tso->link;
tso->link = END_TSO_QUEUE;
- // We might have just migrated this TSO to our Capability:
- if (tso->bound) {
- tso->bound->cap = cap;
+ if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
+ // We are waking up this thread on the current Capability, which
+ // might involve migrating it from the Capability it was last on.
+ if (tso->bound) {
+ ASSERT(tso->bound->cap == tso->cap);
+ tso->bound->cap = cap;
+ }
+ tso->cap = cap;
+ appendToRunQueue(cap,tso);
+ // we're holding a newly woken thread; make sure we context switch
+ // quickly so we can migrate it if necessary.
+ context_switch = 1;
+ } else {
+ // we'll try to wake it up on the Capability it was last on.
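+ // wakeupThreadOnCapability() puts it on that Capability's wakeup
+ // queue; the owning scheduler splices the wakeup queue onto its run
+ // queue on the next iteration (see the wakeup-queue handling above).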
+ wakeupThreadOnCapability(tso->cap, tso);
}
- appendToRunQueue(cap,tso);
-
- // we're holding a newly woken thread, make sure we context switch
- // quickly so we can migrate it if necessary.
- context_switch = 1;
- IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
+ IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
return next;
}
if (tso->bound) {
tso->bound->cap = cap;
}
+ tso->cap = cap;
}
#endif
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
-deleteThreadImmediately(Capability *cap, StgTSO *tso)
+deleteThread_(Capability *cap, StgTSO *tso)
{ // for forkProcess only:
- // delete thread without giving it a chance to catch the KillThread exception
-
- if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
- return;
- }
-
- if (tso->why_blocked != BlockedOnCCall &&
- tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
- unblockThread(cap,tso);
- }
+ // like deleteThread(), but we delete threads in foreign calls, too.
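+ // threads blocked in a foreign call are just unblocked and marked
+ // as killed; everything else is killed via the ordinary
+ // deleteThread() path.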
- tso->what_next = ThreadKilled;
+ if (tso->why_blocked == BlockedOnCCall ||
+ tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
+ unblockOne(cap,tso);
+ tso->what_next = ThreadKilled;
+ } else {
+ deleteThread(cap,tso);
+ }
}
#endif
all_threads = tso;
IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
- // Wake up the thread on the Capability it was last on for a
- // bound thread, or last_free_capability otherwise.
- if (tso->bound) {
- cap = tso->bound->cap;
- } else {
- cap = last_free_capability;
- }
+ // Wake up the thread on the Capability it was last on
+ cap = tso->cap;
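+ // (tso->cap is now maintained for unbound threads as well, so the
+ // old fallback to last_free_capability is no longer needed)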
switch (tso->why_blocked) {
case BlockedOnMVar: