X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=52fd4d5df655f76668314dc399cb0fca1111d88c;hb=c520a3a2752ffcec5710a88a8a2e219c20edfc8a;hp=a82b6a7dce0647be0386607491add1fc3b7a77a8;hpb=afd08a9c06ae4b15e33e26e5a2818801c7fee429;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index a82b6a7..52fd4d5 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -72,20 +72,6 @@ # define STATIC_INLINE static #endif -#ifdef THREADED_RTS -#define USED_WHEN_THREADED_RTS -#define USED_WHEN_NON_THREADED_RTS STG_UNUSED -#else -#define USED_WHEN_THREADED_RTS STG_UNUSED -#define USED_WHEN_NON_THREADED_RTS -#endif - -#ifdef SMP -#define USED_WHEN_SMP -#else -#define USED_WHEN_SMP STG_UNUSED -#endif - /* ----------------------------------------------------------------------------- * Global variables * -------------------------------------------------------------------------- */ @@ -151,7 +137,7 @@ nat recent_activity = ACTIVITY_YES; /* if this flag is set as well, give up execution * LOCK: none (changes once, from false->true) */ -rtsBool interrupted = rtsFalse; +rtsBool sched_state = SCHED_RUNNING; /* Next thread ID to allocate. * LOCK: sched_mutex @@ -189,7 +175,7 @@ rtsBool shutting_down_scheduler = rtsFalse; /* * This mutex protects most of the global scheduler data in - * the THREADED_RTS and (inc. SMP) runtime. + * the THREADED_RTS runtime. */ #if defined(THREADED_RTS) Mutex sched_mutex; @@ -213,11 +199,12 @@ static Capability *schedule (Capability *initialCapability, Task *task); // scheduler clearer. // static void schedulePreLoop (void); -#if defined(SMP) +#if defined(THREADED_RTS) static void schedulePushWork(Capability *cap, Task *task); #endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); +static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS); static void scheduleCheckBlackHoles (Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); #if defined(GRAN) @@ -241,7 +228,9 @@ static void scheduleHandleThreadBlocked( StgTSO *t ); static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task, StgTSO *t ); static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc); -static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major); +static Capability *scheduleDoGC(Capability *cap, Task *task, + rtsBool force_major, + void (*get_roots)(evac_fn)); static void unblockThread(Capability *cap, StgTSO *tso); static rtsBool checkBlackHoles(Capability *cap); @@ -253,7 +242,7 @@ static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically, StgPtr stop_here); static void deleteThread (Capability *cap, StgTSO *tso); -static void deleteRunQueue (Capability *cap); +static void deleteAllThreads (Capability *cap); #ifdef DEBUG static void printThreadBlockage(StgTSO *tso); @@ -387,14 +376,14 @@ schedule (Capability *initialCapability, Task *task) // thread for a bit, even if there are others banging at the // door. first = rtsFalse; - ASSERT_CAPABILITY_INVARIANTS(cap,task); + ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); } else { // Yield the capability to higher-priority tasks if necessary. yieldCapability(&cap, task); } #endif -#ifdef SMP +#if defined(THREADED_RTS) schedulePushWork(cap,task); #endif @@ -407,31 +396,70 @@ schedule (Capability *initialCapability, Task *task) stg_exit(EXIT_FAILURE); } + // The interruption / shutdown sequence. 
+ // + // In order to cleanly shut down the runtime, we want to: + // * make sure that all main threads return to their callers + // with the state 'Interrupted'. + // * clean up all OS threads assocated with the runtime + // * free all memory etc. + // + // So the sequence for ^C goes like this: // - // Test for interruption. If interrupted==rtsTrue, then either - // we received a keyboard interrupt (^C), or the scheduler is - // trying to shut down all the tasks (shutting_down_scheduler) in - // the threaded RTS. + // * ^C handler sets sched_state := SCHED_INTERRUPTING and + // arranges for some Capability to wake up // - if (interrupted) { - deleteRunQueue(cap); -#if defined(SMP) + // * all threads in the system are halted, and the zombies are + // placed on the run queue for cleaning up. We acquire all + // the capabilities in order to delete the threads, this is + // done by scheduleDoGC() for convenience (because GC already + // needs to acquire all the capabilities). We can't kill + // threads involved in foreign calls. + // + // * sched_state := SCHED_INTERRUPTED + // + // * somebody calls shutdownHaskell(), which calls exitScheduler() + // + // * sched_state := SCHED_SHUTTING_DOWN + // + // * all workers exit when the run queue on their capability + // drains. All main threads will also exit when their TSO + // reaches the head of the run queue and they can return. + // + // * eventually all Capabilities will shut down, and the RTS can + // exit. + // + // * We might be left with threads blocked in foreign calls, + // we should really attempt to kill these somehow (TODO); + + switch (sched_state) { + case SCHED_RUNNING: + break; + case SCHED_INTERRUPTING: + IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTING")); +#if defined(THREADED_RTS) discardSparksCap(cap); #endif - if (shutting_down_scheduler) { - IF_DEBUG(scheduler, sched_belch("shutting down")); - // If we are a worker, just exit. If we're a bound thread - // then we will exit below when we've removed our TSO from - // the run queue. - if (task->tso == NULL && emptyRunQueue(cap)) { - return cap; - } - } else { - IF_DEBUG(scheduler, sched_belch("interrupted")); + /* scheduleDoGC() deletes all the threads */ + cap = scheduleDoGC(cap,task,rtsFalse,GetRoots); + break; + case SCHED_INTERRUPTED: + IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTED")); + break; + case SCHED_SHUTTING_DOWN: + IF_DEBUG(scheduler, sched_belch("SCHED_SHUTTING_DOWN")); + // If we are a worker, just exit. If we're a bound thread + // then we will exit below when we've removed our TSO from + // the run queue. + if (task->tso == NULL && emptyRunQueue(cap)) { + return cap; } + break; + default: + barf("sched_state: %d", sched_state); } -#if defined(SMP) +#if defined(THREADED_RTS) // If the run queue is empty, take a spark and turn it into a thread. { if (emptyRunQueue(cap)) { @@ -445,7 +473,7 @@ schedule (Capability *initialCapability, Task *task) } } } -#endif // SMP +#endif // THREADED_RTS scheduleStartSignalHandlers(cap); @@ -455,9 +483,14 @@ schedule (Capability *initialCapability, Task *task) // list each time around the scheduler. 
if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); } + scheduleCheckWakeupThreads(cap); + scheduleCheckBlockedThreads(cap); scheduleDetectDeadlock(cap,task); +#if defined(THREADED_RTS) + cap = task->cap; // reload cap, it might have changed +#endif // Normally, the only way we can get here with no threads to // run is if a keyboard interrupt received during @@ -469,7 +502,7 @@ schedule (Capability *initialCapability, Task *task) // as a result of a console event having been delivered. if ( emptyRunQueue(cap) ) { #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS) - ASSERT(interrupted); + ASSERT(sched_state >= SCHED_INTERRUPTING); #endif continue; // nothing to do } @@ -573,11 +606,16 @@ run_thread: // ---------------------------------------------------------------------- // Run the current thread + ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); + ASSERT(t->cap == cap); + prev_what_next = t->what_next; errno = t->saved_errno; cap->in_haskell = rtsTrue; + dirtyTSO(t); + recent_activity = ACTIVITY_YES; switch (prev_what_next) { @@ -624,7 +662,7 @@ run_thread: // be running again, see code below. t->saved_errno = errno; -#ifdef SMP +#if defined(THREADED_RTS) // If ret is ThreadBlocked, and this Task is bound to the TSO that // blocked, we are in limbo - the TSO is now owned by whatever it // is blocked on, and may in fact already have been woken up, @@ -633,13 +671,14 @@ run_thread: // immediately and return to normaility. if (ret == ThreadBlocked) { IF_DEBUG(scheduler, - debugBelch("--<< thread %d (%s) stopped: blocked\n", - t->id, whatNext_strs[t->what_next])); + sched_belch("--<< thread %d (%s) stopped: blocked\n", + t->id, whatNext_strs[t->what_next])); continue; } #endif - ASSERT_CAPABILITY_INVARIANTS(cap,task); + ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); + ASSERT(t->cap == cap); // ---------------------------------------------------------------------- @@ -681,7 +720,7 @@ run_thread: case ThreadFinished: if (scheduleHandleThreadFinished(cap, task, t)) return cap; - ASSERT_CAPABILITY_INVARIANTS(cap,task); + ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); break; default: @@ -689,7 +728,9 @@ run_thread: } if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; } - if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); } + if (ready_to_gc) { + cap = scheduleDoGC(cap,task,rtsFalse,GetRoots); + } } /* end of while() */ IF_PAR_DEBUG(verbose, @@ -728,14 +769,17 @@ schedulePreLoop(void) * Push work to other Capabilities if we have some. * -------------------------------------------------------------------------- */ -#ifdef SMP +#if defined(THREADED_RTS) static void -schedulePushWork(Capability *cap USED_WHEN_SMP, - Task *task USED_WHEN_SMP) +schedulePushWork(Capability *cap USED_IF_THREADS, + Task *task USED_IF_THREADS) { Capability *free_caps[n_capabilities], *cap0; nat i, n_free_caps; + // migration can be turned off with +RTS -qg + if (!RtsFlags.ParFlags.migrate) return; + // Check whether we have more threads on our run queue, or sparks // in our pool, that we could hand to another Capability. 
if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) @@ -785,7 +829,8 @@ schedulePushWork(Capability *cap USED_WHEN_SMP, next = t->link; t->link = END_TSO_QUEUE; if (t->what_next == ThreadRelocated - || t->bound == task) { // don't move my bound thread + || t->bound == task // don't move my bound thread + || tsoLocked(t)) { // don't move a locked thread prev->link = t; prev = t; } else if (i == n_free_caps) { @@ -795,8 +840,10 @@ schedulePushWork(Capability *cap USED_WHEN_SMP, prev->link = t; prev = t; } else { + IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no)); appendToRunQueue(free_caps[i],t); if (t->bound) { t->bound->cap = free_caps[i]; } + t->cap = free_caps[i]; i++; } } @@ -853,7 +900,7 @@ scheduleStartSignalHandlers(Capability *cap STG_UNUSED) * ------------------------------------------------------------------------- */ static void -scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS) +scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS) { #if !defined(THREADED_RTS) // @@ -870,6 +917,31 @@ scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS) /* ---------------------------------------------------------------------------- + * Check for threads woken up by other Capabilities + * ------------------------------------------------------------------------- */ + +static void +scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS) +{ +#if defined(THREADED_RTS) + // Any threads that were woken up by other Capabilities get + // appended to our run queue. + if (!emptyWakeupQueue(cap)) { + ACQUIRE_LOCK(&cap->lock); + if (emptyRunQueue(cap)) { + cap->run_queue_hd = cap->wakeup_queue_hd; + cap->run_queue_tl = cap->wakeup_queue_tl; + } else { + cap->run_queue_tl->link = cap->wakeup_queue_hd; + cap->run_queue_tl = cap->wakeup_queue_tl; + } + cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE; + RELEASE_LOCK(&cap->lock); + } +#endif +} + +/* ---------------------------------------------------------------------------- * Check for threads blocked on BLACKHOLEs that can be woken up * ------------------------------------------------------------------------- */ static void @@ -895,7 +967,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task) { #if defined(PARALLEL_HASKELL) - // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL + // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL return; #endif @@ -924,7 +996,8 @@ scheduleDetectDeadlock (Capability *cap, Task *task) // they are unreachable and will therefore be sent an // exception. Any threads thus released will be immediately // runnable. 
- scheduleDoGC( cap, task, rtsTrue/*force major GC*/ ); + cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/, GetRoots); + recent_activity = ACTIVITY_DONE_GC; if ( !emptyRunQueue(cap) ) return; @@ -945,7 +1018,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task) } // either we have threads to run, or we were interrupted: - ASSERT(!emptyRunQueue(cap) || interrupted); + ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING); } #endif @@ -1514,7 +1587,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) if (cap->r.rCurrentNursery->u.back != NULL) { cap->r.rCurrentNursery->u.back->link = bd; } else { -#if !defined(SMP) +#if !defined(THREADED_RTS) ASSERT(g0s0->blocks == cap->r.rCurrentNursery && g0s0 == cap->r.rNursery); #endif @@ -1719,9 +1792,9 @@ scheduleHandleThreadBlocked( StgTSO *t // has tidied up its stack and placed itself on whatever queue // it needs to be on. -#if !defined(SMP) +#if !defined(THREADED_RTS) ASSERT(t->why_blocked != NotBlocked); - // This might not be true under SMP: we don't have + // This might not be true under THREADED_RTS: we don't have // exclusive access to this TSO, so someone might have // woken it up by now. This actually happens: try // conc023 +RTS -N2. @@ -1839,7 +1912,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) if (task->ret) { *(task->ret) = NULL; } - if (interrupted) { + if (sched_state >= SCHED_INTERRUPTING) { task->stat = Interrupted; } else { task->stat = Killed; @@ -1867,8 +1940,19 @@ scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED ) if (performHeapProfile || (RtsFlags.ProfFlags.profileInterval==0 && RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) { + + // checking black holes is necessary before GC, otherwise + // there may be threads that are unreachable except by the + // blackhole queue, which the GC will consider to be + // deadlocked. + scheduleCheckBlackHoles(&MainCapability); + + IF_DEBUG(scheduler, sched_belch("garbage collecting before heap census")); GarbageCollect(GetRoots, rtsTrue); + + IF_DEBUG(scheduler, sched_belch("performing heap census")); heapCensus(); + performHeapProfile = rtsFalse; return rtsTrue; // true <=> we already GC'd } @@ -1880,17 +1964,18 @@ scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED ) * Perform a garbage collection if necessary * -------------------------------------------------------------------------- */ -static void -scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major ) +static Capability * +scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, + rtsBool force_major, void (*get_roots)(evac_fn)) { StgTSO *t; -#ifdef SMP +#ifdef THREADED_RTS static volatile StgWord waiting_for_gc; rtsBool was_waiting; nat i; #endif -#ifdef SMP +#ifdef THREADED_RTS // In order to GC, there must be no threads running Haskell code. // Therefore, the GC thread needs to hold *all* the capabilities, // and release them after the GC has completed. 
@@ -1906,9 +1991,9 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major ) if (was_waiting) { do { IF_DEBUG(scheduler, sched_belch("someone else is trying to GC...")); - yieldCapability(&cap,task); + if (cap) yieldCapability(&cap,task); } while (waiting_for_gc); - return; + return cap; // NOTE: task->cap might have changed here } for (i=0; i < n_capabilities; i++) { @@ -1952,7 +2037,7 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major ) // ATOMICALLY_FRAME, aborting the (nested) // transaction, and saving the stack of any // partially-evaluated thunks on the heap. - raiseAsync_(cap, t, NULL, rtsTrue, NULL); + raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL); #ifdef REG_R1 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); @@ -1964,10 +2049,20 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major ) } // so this happens periodically: - scheduleCheckBlackHoles(cap); + if (cap) scheduleCheckBlackHoles(cap); IF_DEBUG(scheduler, printAllThreads()); + /* + * We now have all the capabilities; if we're in an interrupting + * state, then we should take the opportunity to delete all the + * threads in the system. + */ + if (sched_state >= SCHED_INTERRUPTING) { + deleteAllThreads(&capabilities[0]); + sched_state = SCHED_INTERRUPTED; + } + /* everybody back, start the GC. * Could do it in this thread, or signal a condition var * to do it in another thread. Either way, we need to @@ -1976,9 +2071,9 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major ) #if defined(THREADED_RTS) IF_DEBUG(scheduler,sched_belch("doing GC")); #endif - GarbageCollect(GetRoots, force_major); + GarbageCollect(get_roots, force_major); -#if defined(SMP) +#if defined(THREADED_RTS) // release our stash of capabilities. for (i = 0; i < n_capabilities; i++) { if (cap != &capabilities[i]) { @@ -1986,7 +2081,11 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major ) releaseCapability(&capabilities[i]); } } - task->cap = cap; + if (cap) { + task->cap = cap; + } else { + task->cap = NULL; + } #endif #if defined(GRAN) @@ -1999,6 +2098,8 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major ) G_EVENTQ(0); G_CURR_THREADQ(0)); #endif /* GRAN */ + + return cap; } /* --------------------------------------------------------------------------- @@ -2021,7 +2122,7 @@ rtsSupportsBoundThreads(void) * ------------------------------------------------------------------------- */ StgBool -isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS) +isThreadBound(StgTSO* tso USED_IF_THREADS) { #if defined(THREADED_RTS) return (tso->bound != NULL); @@ -2033,13 +2134,13 @@ isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS) * Singleton fork(). Do not copy any running threads. 
* ------------------------------------------------------------------------- */ -#if !defined(mingw32_HOST_OS) && !defined(SMP) +#if !defined(mingw32_HOST_OS) #define FORKPROCESS_PRIMOP_SUPPORTED #endif #ifdef FORKPROCESS_PRIMOP_SUPPORTED static void -deleteThreadImmediately(Capability *cap, StgTSO *tso); +deleteThread_(Capability *cap, StgTSO *tso); #endif StgInt forkProcess(HsStablePtr *entry @@ -2054,6 +2155,13 @@ forkProcess(HsStablePtr *entry StgTSO* t,*next; Capability *cap; +#if defined(THREADED_RTS) + if (RtsFlags.ParFlags.nNodes > 1) { + errorBelch("forking not supported with +RTS -N greater than 1"); + stg_exit(EXIT_FAILURE); + } +#endif + IF_DEBUG(scheduler,sched_belch("forking!")); // ToDo: for SMP, we should probably acquire *all* the capabilities @@ -2069,27 +2177,54 @@ forkProcess(HsStablePtr *entry } else { // child - // delete all threads - cap->run_queue_hd = END_TSO_QUEUE; - cap->run_queue_tl = END_TSO_QUEUE; - + // Now, all OS threads except the thread that forked are + // stopped. We need to stop all Haskell threads, including + // those involved in foreign calls. Also we need to delete + // all Tasks, because they correspond to OS threads that are + // now gone. + for (t = all_threads; t != END_TSO_QUEUE; t = next) { - next = t->link; - - // don't allow threads to catch the ThreadKilled exception - deleteThreadImmediately(cap,t); + if (t->what_next == ThreadRelocated) { + next = t->link; + } else { + next = t->global_link; + // don't allow threads to catch the ThreadKilled + // exception, but we do want to raiseAsync() because these + // threads may be evaluating thunks that we need later. + deleteThread_(cap,t); + } } - // wipe the task list + // Empty the run queue. It seems tempting to let all the + // killed threads stay on the run queue as zombies to be + // cleaned up later, but some of them correspond to bound + // threads for which the corresponding Task does not exist. + cap->run_queue_hd = END_TSO_QUEUE; + cap->run_queue_tl = END_TSO_QUEUE; + + // Any suspended C-calling Tasks are no more, their OS threads + // don't exist now: + cap->suspended_ccalling_tasks = NULL; + + // Empty the all_threads list. Otherwise, the garbage + // collector may attempt to resurrect some of these threads. + all_threads = END_TSO_QUEUE; + + // Wipe the task list, except the current Task. ACQUIRE_LOCK(&sched_mutex); for (task = all_tasks; task != NULL; task=task->all_link) { - if (task != cap->running_task) discardTask(task); + if (task != cap->running_task) { + discardTask(task); + } } RELEASE_LOCK(&sched_mutex); #if defined(THREADED_RTS) - // wipe our spare workers list. + // Wipe our spare workers list, they no longer exist. New + // workers will be created if necessary. cap->spare_workers = NULL; + cap->returning_tasks_hd = NULL; + cap->returning_tasks_tl = NULL; #endif cap = rts_evalStableIO(cap, entry, NULL); // run the action @@ -2106,22 +2241,34 @@ forkProcess(HsStablePtr *entry } /* --------------------------------------------------------------------------- - * Delete the threads on the run queue of the current capability. 
+ * Delete all the threads in the system * ------------------------------------------------------------------------- */ static void -deleteRunQueue (Capability *cap) +deleteAllThreads ( Capability *cap ) { - StgTSO *t, *next; - for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) { - ASSERT(t->what_next != ThreadRelocated); - next = t->link; - deleteThread(cap, t); - } -} + StgTSO* t, *next; + IF_DEBUG(scheduler,sched_belch("deleting all threads")); + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + if (t->what_next == ThreadRelocated) { + next = t->link; + } else { + next = t->global_link; + deleteThread(cap,t); + } + } -/* startThread and insertThread are now in GranSim.c -- HWL */ + // The run queue now contains a bunch of ThreadKilled threads. We + // must not throw these away: the main thread(s) will be in there + // somewhere, and the main scheduler loop has to deal with it. + // Also, the run queue is the only thing keeping these threads from + // being GC'd, and we don't want the "main thread has been GC'd" panic. +#if !defined(THREADED_RTS) + ASSERT(blocked_queue_hd == END_TSO_QUEUE); + ASSERT(sleeping_queue == END_TSO_QUEUE); +#endif +} /* ----------------------------------------------------------------------------- Managing the suspended_ccalling_tasks list. @@ -2257,6 +2404,11 @@ resumeThread (void *task_) cap->in_haskell = rtsTrue; errno = saved_errno; + /* We might have GC'd, mark the TSO dirty again */ + dirtyTSO(tso); + + IF_DEBUG(sanity, checkTSO(tso)); + return &cap->r; } @@ -2370,9 +2522,11 @@ createThread(Capability *cap, nat size) tso->why_blocked = NotBlocked; tso->blocked_exceptions = NULL; + tso->flags = TSO_DIRTY; tso->saved_errno = 0; tso->bound = NULL; + tso->cap = cap; tso->stack_size = stack_size; tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) @@ -2569,6 +2723,28 @@ scheduleThread(Capability *cap, StgTSO *tso) appendToRunQueue(cap,tso); } +void +scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) +{ +#if defined(THREADED_RTS) + tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't + // move this thread from now on. + cpu %= RtsFlags.ParFlags.nNodes; + if (cpu == cap->no) { + appendToRunQueue(cap,tso); + } else { + Capability *target_cap = &capabilities[cpu]; + if (tso->bound) { + tso->bound->cap = target_cap; + } + tso->cap = target_cap; + wakeupThreadOnCapability(target_cap,tso); + } +#else + appendToRunQueue(cap,tso); +#endif +} + Capability * scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) { @@ -2580,6 +2756,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) // This TSO is now a bound thread; make the Task and TSO // point to each other. tso->bound = task; + tso->cap = cap; task->tso = tso; task->ret = ret; @@ -2599,7 +2776,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) cap = schedule(cap,task); ASSERT(task->stat != NoStatus); - ASSERT_CAPABILITY_INVARIANTS(cap,task); + ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id)); return cap; @@ -2665,7 +2842,7 @@ initScheduler(void) all_threads = END_TSO_QUEUE; context_switch = 0; - interrupted = 0; + sched_state = SCHED_RUNNING; RtsFlags.ConcFlags.ctxtSwitchTicks = RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS; @@ -2680,17 +2857,17 @@ initScheduler(void) /* A capability holds the state a native thread needs in * order to execute STG code. 
At least one capability is - * floating around (only SMP builds have more than one). + * floating around (only THREADED_RTS builds have more than one). */ initCapabilities(); initTaskManager(); -#if defined(SMP) || defined(PARALLEL_HASKELL) +#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) initSparkPools(); #endif -#if defined(SMP) +#if defined(THREADED_RTS) /* * Eagerly start one worker to run each Capability, except for * Capability 0. The idea is that we're probably going to start a @@ -2715,18 +2892,25 @@ initScheduler(void) void exitScheduler( void ) { - interrupted = rtsTrue; - shutting_down_scheduler = rtsTrue; + Task *task = NULL; + +#if defined(THREADED_RTS) + ACQUIRE_LOCK(&sched_mutex); + task = newBoundTask(); + RELEASE_LOCK(&sched_mutex); +#endif + + // If we haven't killed all the threads yet, do it now. + if (sched_state < SCHED_INTERRUPTED) { + sched_state = SCHED_INTERRUPTING; + scheduleDoGC(NULL,task,rtsFalse,GetRoots); + } + sched_state = SCHED_SHUTTING_DOWN; #if defined(THREADED_RTS) { - Task *task; nat i; - ACQUIRE_LOCK(&sched_mutex); - task = newBoundTask(); - RELEASE_LOCK(&sched_mutex); - for (i = 0; i < n_capabilities; i++) { shutdownCapability(&capabilities[i], task); } @@ -2780,25 +2964,31 @@ GetRoots( evac_fn evac ) for (i = 0; i < n_capabilities; i++) { cap = &capabilities[i]; - evac((StgClosure **)&cap->run_queue_hd); - evac((StgClosure **)&cap->run_queue_tl); - + evac((StgClosure **)(void *)&cap->run_queue_hd); + evac((StgClosure **)(void *)&cap->run_queue_tl); +#if defined(THREADED_RTS) + evac((StgClosure **)(void *)&cap->wakeup_queue_hd); + evac((StgClosure **)(void *)&cap->wakeup_queue_tl); +#endif for (task = cap->suspended_ccalling_tasks; task != NULL; task=task->next) { - evac((StgClosure **)&task->suspended_tso); + IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id)); + evac((StgClosure **)(void *)&task->suspended_tso); } + } + #if !defined(THREADED_RTS) - evac((StgClosure **)&blocked_queue_hd); - evac((StgClosure **)&blocked_queue_tl); - evac((StgClosure **)&sleeping_queue); + evac((StgClosure **)(void *)&blocked_queue_hd); + evac((StgClosure **)(void *)&blocked_queue_tl); + evac((StgClosure **)(void *)&sleeping_queue); #endif #endif - evac((StgClosure **)&blackhole_queue); + // evac((StgClosure **)&blackhole_queue); -#if defined(SMP) || defined(PARALLEL_HASKELL) || defined(GRAN) +#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN) markSparkQueue(evac); #endif @@ -2823,26 +3013,32 @@ GetRoots( evac_fn evac ) static void (*extra_roots)(evac_fn); +static void +performGC_(rtsBool force_major, void (*get_roots)(evac_fn)) +{ + Task *task = myTask(); + + if (task == NULL) { + ACQUIRE_LOCK(&sched_mutex); + task = newBoundTask(); + RELEASE_LOCK(&sched_mutex); + scheduleDoGC(NULL,task,force_major, get_roots); + boundTaskExiting(task); + } else { + scheduleDoGC(NULL,task,force_major, get_roots); + } +} + void performGC(void) { -#ifdef THREADED_RTS - // ToDo: we have to grab all the capabilities here. 
- errorBelch("performGC not supported in threaded RTS (yet)"); - stg_exit(EXIT_FAILURE); -#endif - /* Obligated to hold this lock upon entry */ - GarbageCollect(GetRoots,rtsFalse); + performGC_(rtsFalse, GetRoots); } void performMajorGC(void) { -#ifdef THREADED_RTS - errorBelch("performMayjorGC not supported in threaded RTS (yet)"); - stg_exit(EXIT_FAILURE); -#endif - GarbageCollect(GetRoots,rtsTrue); + performGC_(rtsTrue, GetRoots); } static void @@ -2855,12 +3051,8 @@ AllRoots(evac_fn evac) void performGCWithRoots(void (*get_roots)(evac_fn)) { -#ifdef THREADED_RTS - errorBelch("performGCWithRoots not supported in threaded RTS (yet)"); - stg_exit(EXIT_FAILURE); -#endif extra_roots = get_roots; - GarbageCollect(AllRoots,rtsFalse); + performGC_(rtsFalse, AllRoots); } /* ----------------------------------------------------------------------------- @@ -3083,21 +3275,34 @@ unblockOne(Capability *cap, StgTSO *tso) ASSERT(get_itbl(tso)->type == TSO); ASSERT(tso->why_blocked != NotBlocked); + tso->why_blocked = NotBlocked; next = tso->link; tso->link = END_TSO_QUEUE; - // We might have just migrated this TSO to our Capability: - if (tso->bound) { - tso->bound->cap = cap; +#if defined(THREADED_RTS) + if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) { + // We are waking up this thread on the current Capability, which + // might involve migrating it from the Capability it was last on. + if (tso->bound) { + ASSERT(tso->bound->cap == tso->cap); + tso->bound->cap = cap; + } + tso->cap = cap; + appendToRunQueue(cap,tso); + // we're holding a newly woken thread, make sure we context switch + // quickly so we can migrate it if necessary. + context_switch = 1; + } else { + // we'll try to wake it up on the Capability it was last on. + wakeupThreadOnCapability(tso->cap, tso); } - +#else appendToRunQueue(cap,tso); - - // we're holding a newly woken thread, make sure we context switch - // quickly so we can migrate it if necessary. context_switch = 1; - IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id)); +#endif + + IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no)); return next; } @@ -3234,7 +3439,7 @@ awakenBlockedQueue(Capability *cap, StgTSO *tso) void interruptStgRts(void) { - interrupted = 1; + sched_state = SCHED_INTERRUPTING; context_switch = 1; #if defined(THREADED_RTS) prodAllCapabilities(); @@ -3542,6 +3747,12 @@ unblockThread(Capability *cap, StgTSO *tso) tso->why_blocked = NotBlocked; tso->block_info.closure = NULL; appendToRunQueue(cap,tso); + + // We might have just migrated this TSO to our Capability: + if (tso->bound) { + tso->bound->cap = cap; + } + tso->cap = cap; } #endif @@ -3624,9 +3835,10 @@ checkBlackHoles (Capability *cap) * CATCH_FRAME on the stack. In either case, we strip the entire * stack and replace the thread with a zombie. * - * ToDo: in SMP mode, this function is only safe if either (a) we hold - * all the Capabilities (eg. in GC), or (b) we own the Capability that - * the TSO is currently blocked on or on the run queue of. + * ToDo: in THREADED_RTS mode, this function is only safe if either + * (a) we hold all the Capabilities (eg. in GC, or if there is only + * one Capability), or (b) we own the Capability that the TSO is + * currently blocked on or on the run queue of. 
* * -------------------------------------------------------------------------- */ @@ -3661,6 +3873,9 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, // Remove it from any blocking queues unblockThread(cap,tso); + // mark it dirty; we're about to change its stack. + dirtyTSO(tso); + sp = tso->sp; // The stack freezing code assumes there's a closure pointer on @@ -3746,7 +3961,7 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, sp += sizeofW(StgUpdateFrame) - 1; sp[0] = (W_)ap; // push onto stack frame = sp + 1; - break; + continue; //no need to bump frame } case STOP_FRAME: @@ -3771,7 +3986,7 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, // we've got an exception to raise, so let's pass it to the // handler in this frame. // - raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE); + raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1); TICK_ALLOC_SE_THK(1,0); SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); raise->payload[0] = exception; @@ -3866,20 +4081,17 @@ deleteThread (Capability *cap, StgTSO *tso) #ifdef FORKPROCESS_PRIMOP_SUPPORTED static void -deleteThreadImmediately(Capability *cap, StgTSO *tso) +deleteThread_(Capability *cap, StgTSO *tso) { // for forkProcess only: - // delete thread without giving it a chance to catch the KillThread exception + // like deleteThread(), but we delete threads in foreign calls, too. - if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { - return; - } - - if (tso->why_blocked != BlockedOnCCall && - tso->why_blocked != BlockedOnCCall_NoUnblockExc) { - unblockThread(cap,tso); - } - - tso->what_next = ThreadKilled; + if (tso->why_blocked == BlockedOnCCall || + tso->why_blocked == BlockedOnCCall_NoUnblockExc) { + unblockOne(cap,tso); + tso->what_next = ThreadKilled; + } else { + deleteThread(cap,tso); + } } #endif @@ -3904,7 +4116,7 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception) // thunks which are currently under evaluataion. // - // + // OLD COMMENT (we don't have MIN_UPD_SIZE now): // LDV profiling: stg_raise_info has THUNK as its closure // type. Since a THUNK takes at least MIN_UPD_SIZE words in its // payload, MIN_UPD_SIZE is more approprate than 1. It seems that @@ -3932,7 +4144,7 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception) // Only create raise_closure if we need to. if (raise_closure == NULL) { raise_closure = - (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE); + (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1); SET_HDR(raise_closure, &stg_raise_info, CCCS); raise_closure->payload[0] = exception; } @@ -4037,13 +4249,8 @@ resurrectThreads (StgTSO *threads) all_threads = tso; IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id)); - // Wake up the thread on the Capability it was last on for a - // bound thread, or last_free_capability otherwise. - if (tso->bound) { - cap = tso->bound->cap; - } else { - cap = last_free_capability; - } + // Wake up the thread on the Capability it was last on + cap = tso->cap; switch (tso->why_blocked) { case BlockedOnMVar:
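
The patch above replaces the single boolean `interrupted` flag with a four-state `sched_state` machine (SCHED_RUNNING -> SCHED_INTERRUPTING -> SCHED_INTERRUPTED -> SCHED_SHUTTING_DOWN) and gives each Capability a wakeup queue that other Capabilities append woken threads to, which scheduleCheckWakeupThreads() then splices onto the local run queue. Below is a minimal standalone sketch of those two pieces, not RTS code: the Queue/Node types and the helper names are invented for illustration; only the SCHED_* constants and the splice order follow the patch.

/* Standalone sketch (assumed names: Queue, Node, push, splice_wakeup_queue). */
#include <stdio.h>
#include <stdlib.h>

/* The shutdown state machine that replaces the old 'interrupted' flag.
 * The ^C path is: RUNNING -> INTERRUPTING (signal handler) ->
 * INTERRUPTED (after scheduleDoGC has deleted all threads) ->
 * SHUTTING_DOWN (exitScheduler). */
enum sched_state {
    SCHED_RUNNING,
    SCHED_INTERRUPTING,
    SCHED_INTERRUPTED,
    SCHED_SHUTTING_DOWN
};

typedef struct Node { int id; struct Node *link; } Node;
typedef struct { Node *hd, *tl; } Queue;

static void push(Queue *q, int id)
{
    Node *n = malloc(sizeof *n);
    n->id = id; n->link = NULL;
    if (q->hd == NULL) q->hd = n; else q->tl->link = n;
    q->tl = n;
}

/* Mirrors scheduleCheckWakeupThreads(): threads woken up by other
 * Capabilities sit on a per-Capability wakeup queue and are appended,
 * in order, to the run queue before scheduling continues. */
static void splice_wakeup_queue(Queue *run, Queue *wakeup)
{
    if (wakeup->hd == NULL) return;      /* emptyWakeupQueue(cap) */
    if (run->hd == NULL) {               /* emptyRunQueue(cap)    */
        run->hd = wakeup->hd;
        run->tl = wakeup->tl;
    } else {
        run->tl->link = wakeup->hd;
        run->tl = wakeup->tl;
    }
    wakeup->hd = wakeup->tl = NULL;
}

int main(void)
{
    Queue run = { NULL, NULL }, wakeup = { NULL, NULL };
    push(&run, 1); push(&wakeup, 2); push(&wakeup, 3);
    splice_wakeup_queue(&run, &wakeup);
    for (Node *t = run.hd; t != NULL; ) {   /* prints threads 1, 2, 3 */
        Node *next = t->link;
        printf("runnable thread %d\n", t->id);
        free(t);
        t = next;
    }
    return 0;
}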
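The patch also introduces explicit thread affinity: scheduleThreadOn() pins a TSO to capability cpu % nNodes and sets TSO_LOCKED, and schedulePushWork() refuses to hand a locked thread (or the pushing Task's own bound thread) to an idle Capability; pushing can be disabled altogether with +RTS -qg. The sketch below models only that decision; the Thread struct and the helper names are hypothetical, and the real scheduleThreadOn() additionally wakes the thread on the target Capability when it differs from the current one.

/* Standalone sketch (assumed names: Thread, pin_thread, can_migrate). */
#include <stdbool.h>
#include <stdio.h>

#define TSO_LOCKED 1

typedef struct { int id; unsigned cap_no; int flags; } Thread;

/* scheduleThreadOn: choose the target capability for an explicitly
 * pinned thread and mark it so it is never migrated afterwards. */
static void pin_thread(Thread *t, unsigned cpu, unsigned n_capabilities)
{
    t->flags |= TSO_LOCKED;
    t->cap_no = cpu % n_capabilities;   /* wrap, as the patch does */
}

/* schedulePushWork: a thread may be handed to an idle capability only
 * if it is not the pushing Task's own bound thread and not locked. */
static bool can_migrate(const Thread *t, bool bound_to_this_task)
{
    return !bound_to_this_task && !(t->flags & TSO_LOCKED);
}

int main(void)
{
    Thread t = { 7, 0, 0 };
    pin_thread(&t, 5, 2);               /* 5 % 2 == 1: lands on cap 1 */
    printf("thread %d pinned to cap %u, migratable: %s\n",
           t.id, t.cap_no, can_migrate(&t, false) ? "yes" : "no");
    return 0;
}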