X-Git-Url: http://git.megacz.com/?p=ghc-hetmet.git;a=blobdiff_plain;f=rts%2FSchedule.c;h=9636223836296e3a457ed97358bd09724a59bf18;hp=141c973f3aa09693f4e94c4a6f3253163469727d;hb=febf1ced754a3996ac1a5877dcded87828560d1c;hpb=0506cb7ec75321eaacc6c279d01d82368d2ca125 diff --git a/rts/Schedule.c b/rts/Schedule.c index 141c973..9636223 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -9,34 +9,24 @@ #include "PosixSource.h" #define KEEP_LOCKCLOSURE #include "Rts.h" -#include "SchedAPI.h" + +#include "sm/Storage.h" #include "RtsUtils.h" -#include "RtsFlags.h" -#include "OSThreads.h" -#include "Storage.h" #include "StgRun.h" -#include "Hooks.h" #include "Schedule.h" -#include "StgMiscClosures.h" #include "Interpreter.h" #include "Printer.h" #include "RtsSignals.h" -#include "Sanity.h" +#include "sm/Sanity.h" #include "Stats.h" #include "STM.h" -#include "Timer.h" #include "Prelude.h" #include "ThreadLabels.h" -#include "LdvProfile.h" #include "Updates.h" #include "Proftimer.h" #include "ProfHeap.h" -#include "GC.h" #include "Weak.h" -#include "EventLog.h" - -/* PARALLEL_HASKELL includes go here */ - +#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N #include "Sparks.h" #include "Capability.h" #include "Task.h" @@ -47,7 +37,9 @@ #include "Trace.h" #include "RaiseAsync.h" #include "Threads.h" -#include "ThrIOManager.h" +#include "Timer.h" +#include "ThreadPaused.h" +#include "Messages.h" #ifdef HAVE_SYS_TYPES_H #include @@ -64,12 +56,9 @@ #include #endif -// Turn off inlining when debugging - it obfuscates things -#ifdef DEBUG -# undef STATIC_INLINE -# define STATIC_INLINE static +#ifdef TRACING +#include "eventlog/EventLog.h" #endif - /* ----------------------------------------------------------------------------- * Global variables * -------------------------------------------------------------------------- */ @@ -81,17 +70,6 @@ StgTSO *blocked_queue_tl = NULL; StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table? #endif -/* Threads blocked on blackholes. - * LOCK: sched_mutex+capability, or all capabilities - */ -StgTSO *blackhole_queue = NULL; - -/* The blackhole_queue should be checked for threads to wake up. See - * Schedule.h for more thorough comment. - * LOCK: none (doesn't matter if we miss an update) - */ -rtsBool blackholes_need_checking = rtsFalse; - /* Set to true when the latest garbage collection failed to reclaim * enough space, and the runtime should proceed to shut itself down in * an orderly fashion (emitting profiling info etc.) @@ -154,22 +132,15 @@ static void scheduleYield (Capability **pcap, Task *task); #endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); -static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS); -static void scheduleCheckBlackHoles (Capability *cap); +static void scheduleProcessInbox(Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); static void schedulePushWork(Capability *cap, Task *task); -#if defined(PARALLEL_HASKELL) -static rtsBool scheduleGetRemoteWork(Capability *cap); -static void scheduleSendPendingMessages(void); -#endif -#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) +#if defined(THREADED_RTS) static void scheduleActivateSpark(Capability *cap); #endif static void schedulePostRunThread(Capability *cap, StgTSO *t); static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ); -static void scheduleHandleStackOverflow( Capability *cap, Task *task, - StgTSO *t); -static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, +static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ); static void scheduleHandleThreadBlocked( StgTSO *t ); static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task, @@ -178,11 +149,6 @@ static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc); static Capability *scheduleDoGC(Capability *cap, Task *task, rtsBool force_major); -static rtsBool checkBlackHoles(Capability *cap); - -static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso); -static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso); - static void deleteThread (Capability *cap, StgTSO *tso); static void deleteAllThreads (Capability *cap); @@ -190,38 +156,6 @@ static void deleteAllThreads (Capability *cap); static void deleteThread_(Capability *cap, StgTSO *tso); #endif -#ifdef DEBUG -static char *whatNext_strs[] = { - "(unknown)", - "ThreadRunGHC", - "ThreadInterpret", - "ThreadKilled", - "ThreadRelocated", - "ThreadComplete" -}; -#endif - -/* ----------------------------------------------------------------------------- - * Putting a thread on the run queue: different scheduling policies - * -------------------------------------------------------------------------- */ - -STATIC_INLINE void -addToRunQueue( Capability *cap, StgTSO *t ) -{ -#if defined(PARALLEL_HASKELL) - if (RtsFlags.ParFlags.doFairScheduling) { - // this does round-robin scheduling; good for concurrency - appendToRunQueue(cap,t); - } else { - // this does unfair scheduling; good for parallelism - pushOnRunQueue(cap,t); - } -#else - // this does round-robin scheduling; good for concurrency - appendToRunQueue(cap,t); -#endif -} - /* --------------------------------------------------------------------------- Main scheduling loop. @@ -264,9 +198,6 @@ schedule (Capability *initialCapability, Task *task) StgTSO *t; Capability *cap; StgThreadReturnCode ret; -#if defined(PARALLEL_HASKELL) - rtsBool receivedFinish = rtsFalse; -#endif nat prev_what_next; rtsBool ready_to_gc; #if defined(THREADED_RTS) @@ -279,28 +210,14 @@ schedule (Capability *initialCapability, Task *task) // The sched_mutex is *NOT* held // NB. on return, we still hold a capability. - debugTrace (DEBUG_sched, - "### NEW SCHEDULER LOOP (task: %p, cap: %p)", - task, initialCapability); - - if (running_finalizers) { - errorBelch("error: a C finalizer called back into Haskell.\n" - " use Foreign.Concurrent.newForeignPtr for Haskell finalizers."); - stg_exit(EXIT_FAILURE); - } + debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no); schedulePreLoop(); // ----------------------------------------------------------- // Scheduler loop starts here: -#if defined(PARALLEL_HASKELL) -#define TERMINATION_CONDITION (!receivedFinish) -#else -#define TERMINATION_CONDITION rtsTrue -#endif - - while (TERMINATION_CONDITION) { + while (1) { // Check whether we have re-entered the RTS from Haskell without // going via suspendThread()/resumeThread (i.e. a 'safe' foreign @@ -368,7 +285,7 @@ schedule (Capability *initialCapability, Task *task) // If we are a worker, just exit. If we're a bound thread // then we will exit below when we've removed our TSO from // the run queue. - if (task->tso == NULL && emptyRunQueue(cap)) { + if (!isBoundTask(task) && emptyRunQueue(cap)) { return cap; } break; @@ -382,21 +299,6 @@ schedule (Capability *initialCapability, Task *task) (pushes threads, wakes up idle capabilities for stealing) */ schedulePushWork(cap,task); -#if defined(PARALLEL_HASKELL) - /* since we perform a blocking receive and continue otherwise, - either we never reach here or we definitely have work! */ - // from here: non-empty run queue - ASSERT(!emptyRunQueue(cap)); - - if (PacketsWaiting()) { /* now process incoming messages, if any - pending... - - CAUTION: scheduleGetRemoteWork called - above, waits for messages as well! */ - processMessages(cap, &receivedFinish); - } -#endif // PARALLEL_HASKELL: non-empty run queue! - scheduleDetectDeadlock(cap,task); #if defined(THREADED_RTS) @@ -423,8 +325,8 @@ schedule (Capability *initialCapability, Task *task) // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); } - yield: scheduleYield(&cap,task); + if (emptyRunQueue(cap)) continue; // look for work again #endif @@ -447,25 +349,25 @@ schedule (Capability *initialCapability, Task *task) // Check whether we can run this thread in the current task. // If not, we have to pass our capability to the right task. { - Task *bound = t->bound; + InCall *bound = t->bound; if (bound) { - if (bound == task) { - debugTrace(DEBUG_sched, - "### Running thread %lu in bound thread", (unsigned long)t->id); + if (bound->task == task) { // yes, the Haskell thread is bound to the current native thread } else { debugTrace(DEBUG_sched, - "### thread %lu bound to another OS thread", (unsigned long)t->id); + "thread %lu bound to another OS thread", + (unsigned long)t->id); // no, bound to a different Haskell thread: pass to that thread pushOnRunQueue(cap,t); continue; } } else { // The thread we want to run is unbound. - if (task->tso) { + if (task->incall->tso) { debugTrace(DEBUG_sched, - "### this OS thread cannot run thread %lu", (unsigned long)t->id); + "this OS thread cannot run thread %lu", + (unsigned long)t->id); // no, the current native thread is bound to a different // Haskell thread, so pass it to any worker thread pushOnRunQueue(cap,t); @@ -500,20 +402,14 @@ run_thread: // that. cap->r.rCurrentTSO = t; - debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", - (long)t->id, whatNext_strs[t->what_next]); - startHeapProfTimer(); - // Check for exceptions blocked on this thread - maybePerformBlockedException (cap, t); - // ---------------------------------------------------------------------- // Run the current thread ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); ASSERT(t->cap == cap); - ASSERT(t->bound ? t->bound->cap == cap : 1); + ASSERT(t->bound ? t->bound->task->cap == cap : 1); prev_what_next = t->what_next; @@ -525,6 +421,7 @@ run_thread: cap->in_haskell = rtsTrue; dirty_TSO(cap,t); + dirty_STACK(cap,t->stackobj); #if defined(THREADED_RTS) if (recent_activity == ACTIVITY_DONE_GC) { @@ -535,12 +432,16 @@ run_thread: if (prev == ACTIVITY_DONE_GC) { startTimer(); } - } else { + } else if (recent_activity != ACTIVITY_INACTIVE) { + // If we reached ACTIVITY_INACTIVE, then don't reset it until + // we've done the GC. The thread running here might just be + // the IO manager thread that handle_tick() woke up via + // wakeUpRts(). recent_activity = ACTIVITY_YES; } #endif - postEvent(cap, EVENT_RUN_THREAD, t->id, 0); + traceEventRunThread(cap, t); switch (prev_what_next) { @@ -574,13 +475,6 @@ run_thread: // happened. So find the new location: t = cap->r.rCurrentTSO; - // We have run some Haskell code: there might be blackhole-blocked - // threads to wake up now. - // Lock-free test here should be ok, we're just setting a flag. - if ( blackhole_queue != END_TSO_QUEUE ) { - blackholes_need_checking = rtsTrue; - } - // And save the current errno in this thread. // XXX: possibly bogus for SMP because this thread might already // be running again, see code below. @@ -590,22 +484,17 @@ run_thread: t->saved_winerror = GetLastError(); #endif - postEvent (cap, EVENT_STOP_THREAD, t->id, ret); - -#if defined(THREADED_RTS) - // If ret is ThreadBlocked, and this Task is bound to the TSO that - // blocked, we are in limbo - the TSO is now owned by whatever it - // is blocked on, and may in fact already have been woken up, - // perhaps even on a different Capability. It may be the case - // that task->cap != cap. We better yield this Capability - // immediately and return to normaility. if (ret == ThreadBlocked) { - debugTrace(DEBUG_sched, - "--<< thread %lu (%s) stopped: blocked", - (unsigned long)t->id, whatNext_strs[t->what_next]); - goto yield; + if (t->why_blocked == BlockedOnBlackHole) { + StgTSO *owner = blackHoleOwner(t->block_info.bh->bh); + traceEventStopThread(cap, t, t->why_blocked + 6, + owner != NULL ? owner->id : 0); + } else { + traceEventStopThread(cap, t, t->why_blocked + 6, 0); + } + } else { + traceEventStopThread(cap, t, ret, 0); } -#endif ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); ASSERT(t->cap == cap); @@ -620,8 +509,6 @@ run_thread: schedulePostRunThread(cap,t); - t = threadStackUnderflow(task,t); - ready_to_gc = rtsFalse; switch (ret) { @@ -630,8 +517,11 @@ run_thread: break; case StackOverflow: - scheduleHandleStackOverflow(cap,task,t); - break; + // just adjust the stack for this thread, then pop it back + // on the run queue. + threadStackOverflow(cap, t); + pushOnRunQueue(cap,t); + break; case ThreadYielding: if (scheduleHandleYield(cap, t, prev_what_next)) { @@ -659,6 +549,30 @@ run_thread: } /* end of while() */ } +/* ----------------------------------------------------------------------------- + * Run queue operations + * -------------------------------------------------------------------------- */ + +void +removeFromRunQueue (Capability *cap, StgTSO *tso) +{ + if (tso->block_info.prev == END_TSO_QUEUE) { + ASSERT(cap->run_queue_hd == tso); + cap->run_queue_hd = tso->_link; + } else { + setTSOLink(cap, tso->block_info.prev, tso->_link); + } + if (tso->_link == END_TSO_QUEUE) { + ASSERT(cap->run_queue_tl == tso); + cap->run_queue_tl = tso->block_info.prev; + } else { + setTSOPrev(cap, tso->_link, tso->block_info.prev); + } + tso->_link = tso->block_info.prev = END_TSO_QUEUE; + + IF_DEBUG(sanity, checkRunQueue(cap)); +} + /* ---------------------------------------------------------------------------- * Setting up the scheduler loop * ------------------------------------------------------------------------- */ @@ -680,38 +594,13 @@ scheduleFindWork (Capability *cap) { scheduleStartSignalHandlers(cap); - // Only check the black holes here if we've nothing else to do. - // During normal execution, the black hole list only gets checked - // at GC time, to avoid repeatedly traversing this possibly long - // list each time around the scheduler. - if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); } - - scheduleCheckWakeupThreads(cap); + scheduleProcessInbox(cap); scheduleCheckBlockedThreads(cap); -#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) +#if defined(THREADED_RTS) if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); } #endif - -#if defined(PARALLEL_HASKELL) - // if messages have been buffered... - scheduleSendPendingMessages(); -#endif - -#if defined(PARALLEL_HASKELL) - if (emptyRunQueue(cap)) { - receivedFinish = scheduleGetRemoteWork(cap); - continue; // a new round, (hopefully) with new work - /* - in GUM, this a) sends out a FISH and returns IF no fish is - out already - b) (blocking) awaits and receives messages - - in Eden, this is only the blocking receive, as b) in GUM. - */ - } -#endif } #if defined(THREADED_RTS) @@ -726,9 +615,9 @@ shouldYieldCapability (Capability *cap, Task *task) // and this task it bound). return (waiting_for_gc || cap->returning_tasks_hd != NULL || - (!emptyRunQueue(cap) && (task->tso == NULL + (!emptyRunQueue(cap) && (task->incall->tso == NULL ? cap->run_queue_hd->bound != NULL - : cap->run_queue_hd->bound != task))); + : cap->run_queue_hd->bound != task->incall))); } // This is the single place where a Task goes to sleep. There are @@ -747,10 +636,10 @@ scheduleYield (Capability **pcap, Task *task) Capability *cap = *pcap; // if we have work, and we don't need to give up the Capability, continue. + // if (!shouldYieldCapability(cap,task) && (!emptyRunQueue(cap) || - !emptyWakeupQueue(cap) || - blackholes_need_checking || + !emptyInbox(cap) || sched_state >= SCHED_INTERRUPTING)) return; @@ -785,7 +674,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, Capability *free_caps[n_capabilities], *cap0; nat i, n_free_caps; - // migration can be turned off with +RTS -qg + // migration can be turned off with +RTS -qm if (!RtsFlags.ParFlags.migrate) return; // Check whether we have more threads on our run queue, or sparks @@ -801,7 +690,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS, for (i=0, n_free_caps=0; i < n_capabilities; i++) { cap0 = &capabilities[i]; if (cap != cap0 && tryGrabCapability(cap0,task)) { - if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) { + if (!emptyRunQueue(cap0) + || cap->returning_tasks_hd != NULL + || cap->inbox != (Message*)END_TSO_QUEUE) { // it already has some work, we just grabbed it at // the wrong moment. Or maybe it's deadlocked! releaseCapability(cap0); @@ -843,29 +734,31 @@ schedulePushWork(Capability *cap USED_IF_THREADS, for (; t != END_TSO_QUEUE; t = next) { next = t->_link; t->_link = END_TSO_QUEUE; - if (t->what_next == ThreadRelocated - || t->bound == task // don't move my bound thread + if (t->bound == task->incall // don't move my bound thread || tsoLocked(t)) { // don't move a locked thread setTSOLink(cap, prev, t); + setTSOPrev(cap, t, prev); prev = t; } else if (i == n_free_caps) { pushed_to_all = rtsTrue; i = 0; // keep one for us setTSOLink(cap, prev, t); + setTSOPrev(cap, t, prev); prev = t; } else { - debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no); appendToRunQueue(free_caps[i],t); - postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no); + traceEventMigrateThread (cap, t, free_caps[i]->no); - if (t->bound) { t->bound->cap = free_caps[i]; } + if (t->bound) { t->bound->task->cap = free_caps[i]; } t->cap = free_caps[i]; i++; } } cap->run_queue_tl = prev; + + IF_DEBUG(sanity, checkRunQueue(cap)); } #ifdef SPARK_PUSHING @@ -881,6 +774,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS, spark = tryStealSpark(cap->sparks); if (spark != NULL) { debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no); + + traceEventStealSpark(free_caps[i], t, cap->no); + newSpark(&(free_caps[i]->r), spark); } } @@ -935,72 +831,18 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS) // if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) ) { - awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking ); - } -#endif -} - - -/* ---------------------------------------------------------------------------- - * Check for threads woken up by other Capabilities - * ------------------------------------------------------------------------- */ - -static void -scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS) -{ -#if defined(THREADED_RTS) - // Any threads that were woken up by other Capabilities get - // appended to our run queue. - if (!emptyWakeupQueue(cap)) { - ACQUIRE_LOCK(&cap->lock); - if (emptyRunQueue(cap)) { - cap->run_queue_hd = cap->wakeup_queue_hd; - cap->run_queue_tl = cap->wakeup_queue_tl; - } else { - setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd); - cap->run_queue_tl = cap->wakeup_queue_tl; - } - cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE; - RELEASE_LOCK(&cap->lock); + awaitEvent (emptyRunQueue(cap)); } #endif } /* ---------------------------------------------------------------------------- - * Check for threads blocked on BLACKHOLEs that can be woken up - * ------------------------------------------------------------------------- */ -static void -scheduleCheckBlackHoles (Capability *cap) -{ - if ( blackholes_need_checking ) // check without the lock first - { - ACQUIRE_LOCK(&sched_mutex); - if ( blackholes_need_checking ) { - blackholes_need_checking = rtsFalse; - // important that we reset the flag *before* checking the - // blackhole queue, otherwise we could get deadlock. This - // happens as follows: we wake up a thread that - // immediately runs on another Capability, blocks on a - // blackhole, and then we reset the blackholes_need_checking flag. - checkBlackHoles(cap); - } - RELEASE_LOCK(&sched_mutex); - } -} - -/* ---------------------------------------------------------------------------- * Detect deadlock conditions and attempt to resolve them. * ------------------------------------------------------------------------- */ static void scheduleDetectDeadlock (Capability *cap, Task *task) { - -#if defined(PARALLEL_HASKELL) - // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL - return; -#endif - /* * Detect deadlock: when we have no threads to run, there are no * threads blocked, waiting for I/O, or sleeping, and all the @@ -1059,13 +901,13 @@ scheduleDetectDeadlock (Capability *cap, Task *task) /* Probably a real deadlock. Send the current main thread the * Deadlock exception. */ - if (task->tso) { - switch (task->tso->why_blocked) { + if (task->incall->tso) { + switch (task->incall->tso->why_blocked) { case BlockedOnSTM: case BlockedOnBlackHole: - case BlockedOnException: + case BlockedOnMsgThrowTo: case BlockedOnMVar: - throwToSingleThreaded(cap, task->tso, + throwToSingleThreaded(cap, task->incall->tso, (StgClosure *)nonTermination_closure); return; default: @@ -1102,63 +944,62 @@ scheduleSendPendingMessages(void) #endif /* ---------------------------------------------------------------------------- - * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS) + * Process message in the current Capability's inbox * ------------------------------------------------------------------------- */ -#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) static void -scheduleActivateSpark(Capability *cap) -{ - if (anySparks()) - { - createSparkThread(cap); - debugTrace(DEBUG_sched, "creating a spark thread"); - } -} -#endif // PARALLEL_HASKELL || THREADED_RTS - -/* ---------------------------------------------------------------------------- - * Get work from a remote node (PARALLEL_HASKELL only) - * ------------------------------------------------------------------------- */ - -#if defined(PARALLEL_HASKELL) -static rtsBool /* return value used in PARALLEL_HASKELL only */ -scheduleGetRemoteWork (Capability *cap STG_UNUSED) +scheduleProcessInbox (Capability *cap USED_IF_THREADS) { -#if defined(PARALLEL_HASKELL) - rtsBool receivedFinish = rtsFalse; - - // idle() , i.e. send all buffers, wait for work - if (RtsFlags.ParFlags.BufferTime) { - IF_PAR_DEBUG(verbose, - debugBelch("...send all pending data,")); - { - nat i; - for (i=1; i<=nPEs; i++) - sendImmediately(i); // send all messages away immediately - } - } - - /* this would be the place for fishing in GUM... +#if defined(THREADED_RTS) + Message *m, *next; + int r; - if (no-earlier-fish-around) - sendFish(choosePe()); - */ + while (!emptyInbox(cap)) { + if (cap->r.rCurrentNursery->link == NULL || + g0->n_new_large_words >= large_alloc_lim) { + scheduleDoGC(cap, cap->running_task, rtsFalse); + } - // Eden:just look for incoming messages (blocking receive) - IF_PAR_DEBUG(verbose, - debugBelch("...wait for incoming messages...\n")); - processMessages(cap, &receivedFinish); // blocking receive... + // don't use a blocking acquire; if the lock is held by + // another thread then just carry on. This seems to avoid + // getting stuck in a message ping-pong situation with other + // processors. We'll check the inbox again later anyway. + // + // We should really use a more efficient queue data structure + // here. The trickiness is that we must ensure a Capability + // never goes idle if the inbox is non-empty, which is why we + // use cap->lock (cap->lock is released as the last thing + // before going idle; see Capability.c:releaseCapability()). + r = TRY_ACQUIRE_LOCK(&cap->lock); + if (r != 0) return; + m = cap->inbox; + cap->inbox = (Message*)END_TSO_QUEUE; - return receivedFinish; - // reenter scheduling look after having received something + RELEASE_LOCK(&cap->lock); -#else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */ + while (m != (Message*)END_TSO_QUEUE) { + next = m->link; + executeMessage(cap, m); + m = next; + } + } +#endif +} - return rtsFalse; /* return value unused in THREADED_RTS */ +/* ---------------------------------------------------------------------------- + * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS) + * ------------------------------------------------------------------------- */ -#endif /* PARALLEL_HASKELL */ +#if defined(THREADED_RTS) +static void +scheduleActivateSpark(Capability *cap) +{ + if (anySparks()) + { + createSparkThread(cap); + debugTrace(DEBUG_sched, "creating a spark thread"); + } } #endif // PARALLEL_HASKELL || THREADED_RTS @@ -1190,7 +1031,7 @@ schedulePostRunThread (Capability *cap, StgTSO *t) // partially-evaluated thunks on the heap. throwToSingleThreaded_(cap, t, NULL, rtsTrue); - ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); +// ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); } } @@ -1212,19 +1053,21 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE; + if (blocks > BLOCKS_PER_MBLOCK) { + barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc); + } + debugTrace(DEBUG_sched, "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", - (long)t->id, whatNext_strs[t->what_next], blocks); + (long)t->id, what_next_strs[t->what_next], blocks); // don't do this if the nursery is (nearly) full, we'll GC first. if (cap->r.rCurrentNursery->link != NULL || cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop // if the nursery has only one block. - ACQUIRE_SM_LOCK - bd = allocGroup( blocks ); - RELEASE_SM_LOCK - cap->r.rNursery->n_blocks += blocks; + bd = allocGroup_lock(blocks); + cap->r.rNursery->n_blocks += blocks; // link the new group into the list bd->link = cap->r.rCurrentNursery; @@ -1232,10 +1075,6 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) if (cap->r.rCurrentNursery->u.back != NULL) { cap->r.rCurrentNursery->u.back->link = bd; } else { -#if !defined(THREADED_RTS) - ASSERT(g0s0->blocks == cap->r.rCurrentNursery && - g0s0 == cap->r.rNursery); -#endif cap->r.rNursery->blocks = bd; } cap->r.rCurrentNursery->u.back = bd; @@ -1250,8 +1089,8 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) { bdescr *x; for (x = bd; x < bd + blocks; x++) { - x->step = cap->r.rNursery; - x->gen_no = 0; + initBdescr(x,g0,g0); + x->free = x->start; x->flags = 0; } } @@ -1272,17 +1111,13 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) } } - debugTrace(DEBUG_sched, - "--<< thread %ld (%s) stopped: HeapOverflow", - (long)t->id, whatNext_strs[t->what_next]); - if (cap->r.rHpLim == NULL || cap->context_switch) { // Sometimes we miss a context switch, e.g. when calling // primitives in a tight loop, MAYBE_GC() doesn't check the // context switch flag, and we end up waiting for a GC. // See #1984, and concurrent/should_run/1984 cap->context_switch = 0; - addToRunQueue(cap,t); + appendToRunQueue(cap,t); } else { pushOnRunQueue(cap,t); } @@ -1291,78 +1126,43 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) } /* ----------------------------------------------------------------------------- - * Handle a thread that returned to the scheduler with ThreadStackOverflow - * -------------------------------------------------------------------------- */ - -static void -scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t) -{ - debugTrace (DEBUG_sched, - "--<< thread %ld (%s) stopped, StackOverflow", - (long)t->id, whatNext_strs[t->what_next]); - - /* just adjust the stack for this thread, then pop it back - * on the run queue. - */ - { - /* enlarge the stack */ - StgTSO *new_t = threadStackOverflow(cap, t); - - /* The TSO attached to this Task may have moved, so update the - * pointer to it. - */ - if (task->tso == t) { - task->tso = new_t; - } - pushOnRunQueue(cap,new_t); - } -} - -/* ----------------------------------------------------------------------------- * Handle a thread that returned to the scheduler with ThreadYielding * -------------------------------------------------------------------------- */ static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ) { - // Reset the context switch flag. We don't do this just before - // running the thread, because that would mean we would lose ticks - // during GC, which can lead to unfair scheduling (a thread hogs - // the CPU because the tick always arrives during GC). This way - // penalises threads that do a lot of allocation, but that seems - // better than the alternative. - cap->context_switch = 0; - /* put the thread back on the run queue. Then, if we're ready to * GC, check whether this is the last task to stop. If so, wake * up the GC thread. getThread will block during a GC until the * GC is finished. */ -#ifdef DEBUG - if (t->what_next != prev_what_next) { - debugTrace(DEBUG_sched, - "--<< thread %ld (%s) stopped to switch evaluators", - (long)t->id, whatNext_strs[t->what_next]); - } else { - debugTrace(DEBUG_sched, - "--<< thread %ld (%s) stopped, yielding", - (long)t->id, whatNext_strs[t->what_next]); - } -#endif - - IF_DEBUG(sanity, - //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id); - checkTSO(t)); + ASSERT(t->_link == END_TSO_QUEUE); // Shortcut if we're just switching evaluators: don't bother // doing stack squeezing (which can be expensive), just run the // thread. - if (t->what_next != prev_what_next) { + if (cap->context_switch == 0 && t->what_next != prev_what_next) { + debugTrace(DEBUG_sched, + "--<< thread %ld (%s) stopped to switch evaluators", + (long)t->id, what_next_strs[t->what_next]); return rtsTrue; } - addToRunQueue(cap,t); + // Reset the context switch flag. We don't do this just before + // running the thread, because that would mean we would lose ticks + // during GC, which can lead to unfair scheduling (a thread hogs + // the CPU because the tick always arrives during GC). This way + // penalises threads that do a lot of allocation, but that seems + // better than the alternative. + cap->context_switch = 0; + + IF_DEBUG(sanity, + //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id); + checkTSO(t)); + + appendToRunQueue(cap,t); return rtsFalse; } @@ -1373,7 +1173,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ) static void scheduleHandleThreadBlocked( StgTSO *t -#if !defined(GRAN) && !defined(DEBUG) +#if !defined(DEBUG) STG_UNUSED #endif ) @@ -1385,20 +1185,12 @@ scheduleHandleThreadBlocked( StgTSO *t // ASSERT(t->why_blocked != NotBlocked); // Not true: for example, - // - in THREADED_RTS, the thread may already have been woken - // up by another Capability. This actually happens: try - // conc023 +RTS -N2. // - the thread may have woken itself up already, because // threadPaused() might have raised a blocked throwTo // exception, see maybePerformBlockedException(). #ifdef DEBUG - if (traceClass(DEBUG_sched)) { - debugTraceBegin("--<< thread %lu (%s) stopped: ", - (unsigned long)t->id, whatNext_strs[t->what_next]); - printThreadBlockage(t); - debugTraceEnd(); - } + traceThreadStatus(DEBUG_sched, t); #endif } @@ -1415,13 +1207,9 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) * We also end up here if the thread kills itself with an * uncaught exception, see Exception.cmm. */ - debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", - (unsigned long)t->id, whatNext_strs[t->what_next]); // blocked exceptions can now complete, even if the thread was in - // blocked mode (see #2910). This unconditionally calls - // lockTSO(), which ensures that we don't miss any threads that - // are engaged in throwTo() with this thread as a target. + // blocked mode (see #2910). awakenBlockedExceptionQueue (cap, t); // @@ -1436,7 +1224,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) if (t->bound) { - if (t->bound != task) { + if (t->bound != task->incall) { #if !defined(THREADED_RTS) // Must be a bound thread that is not the topmost one. Leave // it on the run queue until the stack has unwound to the @@ -1453,31 +1241,42 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) #endif } - ASSERT(task->tso == t); + ASSERT(task->incall->tso == t); if (t->what_next == ThreadComplete) { - if (task->ret) { - // NOTE: return val is tso->sp[1] (see StgStartup.hc) - *(task->ret) = (StgClosure *)task->tso->sp[1]; + if (task->incall->ret) { + // NOTE: return val is stack->sp[1] (see StgStartup.hc) + *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1]; } - task->stat = Success; + task->incall->stat = Success; } else { - if (task->ret) { - *(task->ret) = NULL; + if (task->incall->ret) { + *(task->incall->ret) = NULL; } if (sched_state >= SCHED_INTERRUPTING) { if (heap_overflow) { - task->stat = HeapExhausted; + task->incall->stat = HeapExhausted; } else { - task->stat = Interrupted; + task->incall->stat = Interrupted; } } else { - task->stat = Killed; + task->incall->stat = Killed; } } #ifdef DEBUG - removeThreadLabel((StgWord)task->tso->id); + removeThreadLabel((StgWord)task->incall->tso->id); #endif + + // We no longer consider this thread and task to be bound to + // each other. The TSO lives on until it is GC'd, but the + // task is about to be released by the caller, and we don't + // want anyone following the pointer from the TSO to the + // defunct task (which might have already been + // re-used). This was a real bug: the GC updated + // tso->bound->tso which lead to a deadlock. + t->bound = NULL; + task->incall->tso = NULL; + return rtsTrue; // tells schedule() to return } @@ -1528,7 +1327,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) if (sched_state < SCHED_INTERRUPTING && RtsFlags.ParFlags.parGcEnabled && N >= RtsFlags.ParFlags.parGcGen - && ! oldest_gen->steps[0].mark) + && ! oldest_gen->mark) { gc_type = PENDING_GC_PAR; } else { @@ -1568,7 +1367,16 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) if (gc_type == PENDING_GC_SEQ) { - postEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0); + traceEventRequestSeqGc(cap); + } + else + { + traceEventRequestParGc(cap); + debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads"); + } + + if (gc_type == PENDING_GC_SEQ) + { // single-threaded GC: grab all the capabilities for (i=0; i < n_capabilities; i++) { debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities); @@ -1591,16 +1399,11 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) { // multi-threaded GC: make sure all the Capabilities donate one // GC thread each. - postEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0); - debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads"); - waitForGcThreads(cap); } + #endif - // so this happens periodically: - if (cap) scheduleCheckBlackHoles(cap); - IF_DEBUG(scheduler, printAllThreads()); delete_threads_and_gc: @@ -1616,9 +1419,8 @@ delete_threads_and_gc: heap_census = scheduleNeedHeapProfile(rtsTrue); + traceEventGcStart(cap); #if defined(THREADED_RTS) - postEvent(cap, EVENT_GC_START, 0, 0); - debugTrace(DEBUG_sched, "doing GC"); // reset waiting_for_gc *before* GC, so that when the GC threads // emerge they don't immediately re-enter the GC. waiting_for_gc = 0; @@ -1626,7 +1428,7 @@ delete_threads_and_gc: #else GarbageCollect(force_major || heap_census, 0, cap); #endif - postEvent(cap, EVENT_GC_END, 0, 0); + traceEventGcEnd(cap); if (recent_activity == ACTIVITY_INACTIVE && force_major) { @@ -1645,6 +1447,12 @@ delete_threads_and_gc: recent_activity = ACTIVITY_YES; } + if (heap_census) { + debugTrace(DEBUG_sched, "performing heap census"); + heapCensus(); + performHeapProfile = rtsFalse; + } + #if defined(THREADED_RTS) if (gc_type == PENDING_GC_PAR) { @@ -1652,12 +1460,6 @@ delete_threads_and_gc: } #endif - if (heap_census) { - debugTrace(DEBUG_sched, "performing heap census"); - heapCensus(); - performHeapProfile = rtsFalse; - } - if (heap_overflow && sched_state < SCHED_INTERRUPTING) { // GC set the heap_overflow flag, so we should proceed with // an orderly shutdown now. Ultimately we want the main @@ -1715,11 +1517,10 @@ forkProcess(HsStablePtr *entry ) { #ifdef FORKPROCESS_PRIMOP_SUPPORTED - Task *task; pid_t pid; StgTSO* t,*next; Capability *cap; - nat s; + nat g; #if defined(THREADED_RTS) if (RtsFlags.ParFlags.nNodes > 1) { @@ -1741,10 +1542,18 @@ forkProcess(HsStablePtr *entry ACQUIRE_LOCK(&cap->lock); ACQUIRE_LOCK(&cap->running_task->lock); + stopTimer(); // See #4074 + +#if defined(TRACING) + flushEventLog(); // so that child won't inherit dirty file buffers +#endif + pid = fork(); if (pid) { // parent + startTimer(); // #4074 + RELEASE_LOCK(&sched_mutex); RELEASE_LOCK(&cap->lock); RELEASE_LOCK(&cap->running_task->lock); @@ -1761,23 +1570,30 @@ forkProcess(HsStablePtr *entry initMutex(&cap->running_task->lock); #endif - // Now, all OS threads except the thread that forked are +#ifdef TRACING + resetTracing(); +#endif + + // Now, all OS threads except the thread that forked are // stopped. We need to stop all Haskell threads, including // those involved in foreign calls. Also we need to delete // all Tasks, because they correspond to OS threads that are // now gone. - for (s = 0; s < total_steps; s++) { - for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) { - if (t->what_next == ThreadRelocated) { - next = t->_link; - } else { - next = t->global_link; + for (g = 0; g < RtsFlags.GcFlags.generations; g++) { + for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) { + next = t->global_link; // don't allow threads to catch the ThreadKilled // exception, but we do want to raiseAsync() because these // threads may be evaluating thunks that we need later. deleteThread_(cap,t); - } + + // stop the GC from updating the InCall to point to + // the TSO. This is only necessary because the + // OSThread bound to the TSO has been killed, and + // won't get a chance to exit in the usual way (see + // also scheduleHandleThreadFinished). + t->bound = NULL; } } @@ -1790,31 +1606,22 @@ forkProcess(HsStablePtr *entry // Any suspended C-calling Tasks are no more, their OS threads // don't exist now: - cap->suspended_ccalling_tasks = NULL; + cap->suspended_ccalls = NULL; // Empty the threads lists. Otherwise, the garbage // collector may attempt to resurrect some of these threads. - for (s = 0; s < total_steps; s++) { - all_steps[s].threads = END_TSO_QUEUE; + for (g = 0; g < RtsFlags.GcFlags.generations; g++) { + generations[g].threads = END_TSO_QUEUE; } - // Wipe the task list, except the current Task. - ACQUIRE_LOCK(&sched_mutex); - for (task = all_tasks; task != NULL; task=task->all_link) { - if (task != cap->running_task) { -#if defined(THREADED_RTS) - initMutex(&task->lock); // see #1391 -#endif - discardTask(task); - } - } - RELEASE_LOCK(&sched_mutex); + discardTasksExcept(cap->running_task); #if defined(THREADED_RTS) // Wipe our spare workers list, they no longer exist. New // workers will be created if necessary. cap->spare_workers = NULL; - cap->returning_tasks_hd = NULL; + cap->n_spare_workers = 0; + cap->returning_tasks_hd = NULL; cap->returning_tasks_tl = NULL; #endif @@ -1823,6 +1630,10 @@ forkProcess(HsStablePtr *entry initTimer(); startTimer(); +#if defined(THREADED_RTS) + cap = ioManagerStartCap(cap); +#endif + cap = rts_evalStableIO(cap, entry, NULL); // run the action rts_checkSchedStatus("forkProcess",cap); @@ -1832,7 +1643,6 @@ forkProcess(HsStablePtr *entry } #else /* !FORKPROCESS_PRIMOP_SUPPORTED */ barf("forkProcess#: primop not supported on this platform, sorry!\n"); - return -1; #endif } @@ -1846,19 +1656,15 @@ deleteAllThreads ( Capability *cap ) // NOTE: only safe to call if we own all capabilities. StgTSO* t, *next; - nat s; + nat g; debugTrace(DEBUG_sched,"deleting all threads"); - for (s = 0; s < total_steps; s++) { - for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) { - if (t->what_next == ThreadRelocated) { - next = t->_link; - } else { - next = t->global_link; - deleteThread(cap,t); - } - } - } + for (g = 0; g < RtsFlags.GcFlags.generations; g++) { + for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) { + next = t->global_link; + deleteThread(cap,t); + } + } // The run queue now contains a bunch of ThreadKilled threads. We // must not throw these away: the main thread(s) will be in there @@ -1873,35 +1679,41 @@ deleteAllThreads ( Capability *cap ) } /* ----------------------------------------------------------------------------- - Managing the suspended_ccalling_tasks list. + Managing the suspended_ccalls list. Locks required: sched_mutex -------------------------------------------------------------------------- */ STATIC_INLINE void suspendTask (Capability *cap, Task *task) { - ASSERT(task->next == NULL && task->prev == NULL); - task->next = cap->suspended_ccalling_tasks; - task->prev = NULL; - if (cap->suspended_ccalling_tasks) { - cap->suspended_ccalling_tasks->prev = task; - } - cap->suspended_ccalling_tasks = task; + InCall *incall; + + incall = task->incall; + ASSERT(incall->next == NULL && incall->prev == NULL); + incall->next = cap->suspended_ccalls; + incall->prev = NULL; + if (cap->suspended_ccalls) { + cap->suspended_ccalls->prev = incall; + } + cap->suspended_ccalls = incall; } STATIC_INLINE void recoverSuspendedTask (Capability *cap, Task *task) { - if (task->prev) { - task->prev->next = task->next; + InCall *incall; + + incall = task->incall; + if (incall->prev) { + incall->prev->next = incall->next; } else { - ASSERT(cap->suspended_ccalling_tasks == task); - cap->suspended_ccalling_tasks = task->next; + ASSERT(cap->suspended_ccalls == incall); + cap->suspended_ccalls = incall->next; } - if (task->next) { - task->next->prev = task->prev; + if (incall->next) { + incall->next->prev = incall->prev; } - task->next = task->prev = NULL; + incall->next = incall->prev = NULL; } /* --------------------------------------------------------------------------- @@ -1914,13 +1726,17 @@ recoverSuspendedTask (Capability *cap, Task *task) * the whole system. * * The Haskell thread making the C call is put to sleep for the - * duration of the call, on the susepended_ccalling_threads queue. We + * duration of the call, on the suspended_ccalling_threads queue. We * give out a token to the task, which it can use to resume the thread * on return from the C function. + * + * If this is an interruptible C call, this means that the FFI call may be + * unceremoniously terminated and should be scheduled on an + * unbound worker thread. * ------------------------------------------------------------------------- */ void * -suspendThread (StgRegTable *reg) +suspendThread (StgRegTable *reg, rtsBool interruptible) { Capability *cap; int saved_errno; @@ -1942,26 +1758,22 @@ suspendThread (StgRegTable *reg) task = cap->running_task; tso = cap->r.rCurrentTSO; - postEvent(cap, EVENT_STOP_THREAD, tso->id, THREAD_SUSPENDED_FOREIGN_CALL); - debugTrace(DEBUG_sched, - "thread %lu did a safe foreign call", - (unsigned long)cap->r.rCurrentTSO->id); + traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0); // XXX this might not be necessary --SDM tso->what_next = ThreadRunGHC; threadPaused(cap,tso); - if ((tso->flags & TSO_BLOCKEX) == 0) { - tso->why_blocked = BlockedOnCCall; - tso->flags |= TSO_BLOCKEX; - tso->flags &= ~TSO_INTERRUPTIBLE; + if (interruptible) { + tso->why_blocked = BlockedOnCCall_Interruptible; } else { - tso->why_blocked = BlockedOnCCall_NoUnblockExc; + tso->why_blocked = BlockedOnCCall; } // Hand back capability - task->suspended_tso = tso; + task->incall->suspended_tso = tso; + task->incall->suspended_cap = cap; ACQUIRE_LOCK(&cap->lock); @@ -1971,13 +1783,6 @@ suspendThread (StgRegTable *reg) RELEASE_LOCK(&cap->lock); -#if defined(THREADED_RTS) - /* Preparing to leave the RTS, so ensure there's a native thread/task - waiting to take over. - */ - debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id); -#endif - errno = saved_errno; #if mingw32_HOST_OS SetLastError(saved_winerror); @@ -1989,6 +1794,7 @@ StgRegTable * resumeThread (void *task_) { StgTSO *tso; + InCall *incall; Capability *cap; Task *task = task_; int saved_errno; @@ -2001,34 +1807,36 @@ resumeThread (void *task_) saved_winerror = GetLastError(); #endif - cap = task->cap; + incall = task->incall; + cap = incall->suspended_cap; + task->cap = cap; + // Wait for permission to re-enter the RTS with the result. waitForReturnCapability(&cap,task); // we might be on a different capability now... but if so, our - // entry on the suspended_ccalling_tasks list will also have been + // entry on the suspended_ccalls list will also have been // migrated. // Remove the thread from the suspended list recoverSuspendedTask(cap,task); - tso = task->suspended_tso; - task->suspended_tso = NULL; + tso = incall->suspended_tso; + incall->suspended_tso = NULL; + incall->suspended_cap = NULL; tso->_link = END_TSO_QUEUE; // no write barrier reqd - postEvent(cap, EVENT_RUN_THREAD, tso->id, 0); - debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id); + traceEventRunThread(cap, tso); - if (tso->why_blocked == BlockedOnCCall) { + /* Reset blocking status */ + tso->why_blocked = NotBlocked; + + if ((tso->flags & TSO_BLOCKEX) == 0) { // avoid locking the TSO if we don't have to - if (tso->blocked_exceptions != END_TSO_QUEUE) { - awakenBlockedExceptionQueue(cap,tso); + if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) { + maybePerformBlockedException(cap,tso); } - tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE); } - /* Reset blocking status */ - tso->why_blocked = NotBlocked; - cap->r.rCurrentTSO = tso; cap->in_haskell = rtsTrue; errno = saved_errno; @@ -2038,6 +1846,7 @@ resumeThread (void *task_) /* We might have GC'd, mark the TSO dirty again */ dirty_TSO(cap,tso); + dirty_STACK(cap,tso->stackobj); IF_DEBUG(sanity, checkTSO(tso)); @@ -2065,15 +1874,14 @@ scheduleThread(Capability *cap, StgTSO *tso) void scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) { -#if defined(THREADED_RTS) tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't // move this thread from now on. +#if defined(THREADED_RTS) cpu %= RtsFlags.ParFlags.nNodes; if (cpu == cap->no) { appendToRunQueue(cap,tso); } else { - postEvent (cap, EVENT_MIGRATE_THREAD, tso->id, capabilities[cpu].no); - wakeupThreadOnCapability(cap, &capabilities[cpu], tso); + migrateThread(cap, tso, &capabilities[cpu]); } #else appendToRunQueue(cap,tso); @@ -2084,29 +1892,31 @@ Capability * scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) { Task *task; + StgThreadID id; // We already created/initialised the Task task = cap->running_task; // This TSO is now a bound thread; make the Task and TSO // point to each other. - tso->bound = task; + tso->bound = task->incall; tso->cap = cap; - task->tso = tso; - task->ret = ret; - task->stat = NoStatus; + task->incall->tso = tso; + task->incall->ret = ret; + task->incall->stat = NoStatus; appendToRunQueue(cap,tso); - debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id); + id = tso->id; + debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id); cap = schedule(cap,task); - ASSERT(task->stat != NoStatus); + ASSERT(task->incall->stat != NoStatus); ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); - debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id); + debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id); return cap; } @@ -2115,23 +1925,8 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) * ------------------------------------------------------------------------- */ #if defined(THREADED_RTS) -void OSThreadProcAttr -workerStart(Task *task) +void scheduleWorker (Capability *cap, Task *task) { - Capability *cap; - - // See startWorkerTask(). - ACQUIRE_LOCK(&task->lock); - cap = task->cap; - RELEASE_LOCK(&task->lock); - - if (RtsFlags.ParFlags.setAffinity) { - setThreadAffinity(cap->no, n_capabilities); - } - - // set the thread-local pointer to the Task: - taskEnter(task); - // schedule() runs without a lock. cap = schedule(cap,task); @@ -2172,8 +1967,6 @@ initScheduler(void) sleeping_queue = END_TSO_QUEUE; #endif - blackhole_queue = END_TSO_QUEUE; - sched_state = SCHED_RUNNING; recent_activity = ACTIVITY_YES; @@ -2193,10 +1986,12 @@ initScheduler(void) initTaskManager(); -#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) +#if defined(THREADED_RTS) initSparkPools(); #endif + RELEASE_LOCK(&sched_mutex); + #if defined(THREADED_RTS) /* * Eagerly start one worker to run each Capability, except for @@ -2210,35 +2005,27 @@ initScheduler(void) for (i = 1; i < n_capabilities; i++) { cap = &capabilities[i]; ACQUIRE_LOCK(&cap->lock); - startWorkerTask(cap, workerStart); + startWorkerTask(cap); RELEASE_LOCK(&cap->lock); } } #endif - - RELEASE_LOCK(&sched_mutex); } void -exitScheduler( - rtsBool wait_foreign -#if !defined(THREADED_RTS) - __attribute__((unused)) -#endif -) +exitScheduler (rtsBool wait_foreign USED_IF_THREADS) /* see Capability.c, shutdownCapability() */ { Task *task = NULL; - ACQUIRE_LOCK(&sched_mutex); task = newBoundTask(); - RELEASE_LOCK(&sched_mutex); // If we haven't killed all the threads yet, do it now. if (sched_state < SCHED_SHUTTING_DOWN) { sched_state = SCHED_INTERRUPTING; waitForReturnCapability(&task->cap,task); - scheduleDoGC(task->cap,task,rtsFalse); + scheduleDoGC(task->cap,task,rtsFalse); + ASSERT(task->incall->tso == NULL); releaseCapability(task->cap); } sched_state = SCHED_SHUTTING_DOWN; @@ -2248,11 +2035,13 @@ exitScheduler( nat i; for (i = 0; i < n_capabilities; i++) { + ASSERT(task->incall->tso == NULL); shutdownCapability(&capabilities[i], task, wait_foreign); } - boundTaskExiting(task); } #endif + + boundTaskExiting(task); } void @@ -2280,6 +2069,16 @@ freeScheduler( void ) #endif } +void markScheduler (evac_fn evac USED_IF_NOT_THREADS, + void *user USED_IF_NOT_THREADS) +{ +#if !defined(THREADED_RTS) + evac(user, (StgClosure **)(void *)&blocked_queue_hd); + evac(user, (StgClosure **)(void *)&blocked_queue_tl); + evac(user, (StgClosure **)(void *)&sleeping_queue); +#endif +} + /* ----------------------------------------------------------------------------- performGC @@ -2295,10 +2094,8 @@ performGC_(rtsBool force_major) // We must grab a new Task here, because the existing Task may be // associated with a particular Capability, and chained onto the - // suspended_ccalling_tasks queue. - ACQUIRE_LOCK(&sched_mutex); + // suspended_ccalls queue. task = newBoundTask(); - RELEASE_LOCK(&sched_mutex); waitForReturnCapability(&task->cap,task); scheduleDoGC(task->cap,task,force_major); @@ -2318,164 +2115,6 @@ performMajorGC(void) performGC_(rtsTrue); } -/* ----------------------------------------------------------------------------- - Stack overflow - - If the thread has reached its maximum stack size, then raise the - StackOverflow exception in the offending thread. Otherwise - relocate the TSO into a larger chunk of memory and adjust its stack - size appropriately. - -------------------------------------------------------------------------- */ - -static StgTSO * -threadStackOverflow(Capability *cap, StgTSO *tso) -{ - nat new_stack_size, stack_words; - lnat new_tso_size; - StgPtr new_sp; - StgTSO *dest; - - IF_DEBUG(sanity,checkTSO(tso)); - - // don't allow throwTo() to modify the blocked_exceptions queue - // while we are moving the TSO: - lockClosure((StgClosure *)tso); - - if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) { - // NB. never raise a StackOverflow exception if the thread is - // inside Control.Exceptino.block. It is impractical to protect - // against stack overflow exceptions, since virtually anything - // can raise one (even 'catch'), so this is the only sensible - // thing to do here. See bug #767. - - debugTrace(DEBUG_gc, - "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)", - (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size); - IF_DEBUG(gc, - /* If we're debugging, just print out the top of the stack */ - printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, - tso->sp+64))); - - // Send this thread the StackOverflow exception - unlockTSO(tso); - throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure); - return tso; - } - - /* Try to double the current stack size. If that takes us over the - * maximum stack size for this thread, then use the maximum instead - * (that is, unless we're already at or over the max size and we - * can't raise the StackOverflow exception (see above), in which - * case just double the size). Finally round up so the TSO ends up as - * a whole number of blocks. - */ - if (tso->stack_size >= tso->max_stack_size) { - new_stack_size = tso->stack_size * 2; - } else { - new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size); - } - new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + - TSO_STRUCT_SIZE)/sizeof(W_); - new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */ - new_stack_size = new_tso_size - TSO_STRUCT_SIZEW; - - debugTrace(DEBUG_sched, - "increasing stack size from %ld words to %d.", - (long)tso->stack_size, new_stack_size); - - dest = (StgTSO *)allocateLocal(cap,new_tso_size); - TICK_ALLOC_TSO(new_stack_size,0); - - /* copy the TSO block and the old stack into the new area */ - memcpy(dest,tso,TSO_STRUCT_SIZE); - stack_words = tso->stack + tso->stack_size - tso->sp; - new_sp = (P_)dest + new_tso_size - stack_words; - memcpy(new_sp, tso->sp, stack_words * sizeof(W_)); - - /* relocate the stack pointers... */ - dest->sp = new_sp; - dest->stack_size = new_stack_size; - - /* Mark the old TSO as relocated. We have to check for relocated - * TSOs in the garbage collector and any primops that deal with TSOs. - * - * It's important to set the sp value to just beyond the end - * of the stack, so we don't attempt to scavenge any part of the - * dead TSO's stack. - */ - tso->what_next = ThreadRelocated; - setTSOLink(cap,tso,dest); - tso->sp = (P_)&(tso->stack[tso->stack_size]); - tso->why_blocked = NotBlocked; - - IF_PAR_DEBUG(verbose, - debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n", - tso->id, tso, tso->stack_size); - /* If we're debugging, just print out the top of the stack */ - printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, - tso->sp+64))); - - unlockTSO(dest); - unlockTSO(tso); - - IF_DEBUG(sanity,checkTSO(dest)); -#if 0 - IF_DEBUG(scheduler,printTSO(dest)); -#endif - - return dest; -} - -static StgTSO * -threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso) -{ - bdescr *bd, *new_bd; - lnat free_w, tso_size_w; - StgTSO *new_tso; - - tso_size_w = tso_sizeW(tso); - - if (tso_size_w < MBLOCK_SIZE_W || - (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) - { - return tso; - } - - // don't allow throwTo() to modify the blocked_exceptions queue - // while we are moving the TSO: - lockClosure((StgClosure *)tso); - - // this is the number of words we'll free - free_w = round_to_mblocks(tso_size_w/2); - - bd = Bdescr((StgPtr)tso); - new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W); - bd->free = bd->start + TSO_STRUCT_SIZEW; - - new_tso = (StgTSO *)new_bd->start; - memcpy(new_tso,tso,TSO_STRUCT_SIZE); - new_tso->stack_size = new_bd->free - new_tso->stack; - - debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu", - (long)tso->id, tso_size_w, tso_sizeW(new_tso)); - - tso->what_next = ThreadRelocated; - tso->_link = new_tso; // no write barrier reqd: same generation - - // The TSO attached to this Task may have moved, so update the - // pointer to it. - if (task->tso == tso) { - task->tso = new_tso; - } - - unlockTSO(new_tso); - unlockTSO(tso); - - IF_DEBUG(sanity,checkTSO(new_tso)); - - return new_tso; -} - /* --------------------------------------------------------------------------- Interrupt execution - usually called inside a signal handler so it mustn't do anything fancy. @@ -2486,7 +2125,9 @@ interruptStgRts(void) { sched_state = SCHED_INTERRUPTING; setContextSwitches(); +#if defined(THREADED_RTS) wakeUpRts(); +#endif } /* ----------------------------------------------------------------------------- @@ -2502,67 +2143,15 @@ interruptStgRts(void) will have interrupted any blocking system call in progress anyway. -------------------------------------------------------------------------- */ -void -wakeUpRts(void) -{ #if defined(THREADED_RTS) +void wakeUpRts(void) +{ // This forces the IO Manager thread to wakeup, which will // in turn ensure that some OS thread wakes up and runs the // scheduler loop, which will cause a GC and deadlock check. ioManagerWakeup(); -#endif -} - -/* ----------------------------------------------------------------------------- - * checkBlackHoles() - * - * Check the blackhole_queue for threads that can be woken up. We do - * this periodically: before every GC, and whenever the run queue is - * empty. - * - * An elegant solution might be to just wake up all the blocked - * threads with awakenBlockedQueue occasionally: they'll go back to - * sleep again if the object is still a BLACKHOLE. Unfortunately this - * doesn't give us a way to tell whether we've actually managed to - * wake up any threads, so we would be busy-waiting. - * - * -------------------------------------------------------------------------- */ - -static rtsBool -checkBlackHoles (Capability *cap) -{ - StgTSO **prev, *t; - rtsBool any_woke_up = rtsFalse; - StgHalfWord type; - - // blackhole_queue is global: - ASSERT_LOCK_HELD(&sched_mutex); - - debugTrace(DEBUG_sched, "checking threads blocked on black holes"); - - // ASSUMES: sched_mutex - prev = &blackhole_queue; - t = blackhole_queue; - while (t != END_TSO_QUEUE) { - if (t->what_next == ThreadRelocated) { - t = t->_link; - continue; - } - ASSERT(t->why_blocked == BlockedOnBlackHole); - type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type; - if (type != BLACKHOLE && type != CAF_BLACKHOLE) { - IF_DEBUG(sanity,checkTSO(t)); - t = unblockOne(cap, t); - *prev = t; - any_woke_up = rtsTrue; - } else { - prev = &t->_link; - t = t->_link; - } - } - - return any_woke_up; } +#endif /* ----------------------------------------------------------------------------- Deleting threads @@ -2572,8 +2161,8 @@ checkBlackHoles (Capability *cap) exception. -------------------------------------------------------------------------- */ -static void -deleteThread (Capability *cap, StgTSO *tso) +static void +deleteThread (Capability *cap STG_UNUSED, StgTSO *tso) { // NOTE: must only be called on a TSO that we have exclusive // access to, because we will call throwToSingleThreaded() below. @@ -2581,21 +2170,21 @@ deleteThread (Capability *cap, StgTSO *tso) // we must own all Capabilities. if (tso->why_blocked != BlockedOnCCall && - tso->why_blocked != BlockedOnCCall_NoUnblockExc) { - throwToSingleThreaded(cap,tso,NULL); + tso->why_blocked != BlockedOnCCall_Interruptible) { + throwToSingleThreaded(tso->cap,tso,NULL); } } #ifdef FORKPROCESS_PRIMOP_SUPPORTED -static void +static void deleteThread_(Capability *cap, StgTSO *tso) { // for forkProcess only: // like deleteThread(), but we delete threads in foreign calls, too. if (tso->why_blocked == BlockedOnCCall || - tso->why_blocked == BlockedOnCCall_NoUnblockExc) { - unblockOne(cap,tso); + tso->why_blocked == BlockedOnCCall_Interruptible) { tso->what_next = ThreadKilled; + appendToRunQueue(tso->cap, tso); } else { deleteThread(cap,tso); } @@ -2641,7 +2230,7 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception) // we update any closures pointed to from update frames with the // raise closure that we just built. // - p = tso->sp; + p = tso->stackobj->sp; while(1) { info = get_ret_itbl((StgClosure *)p); next = p + stack_frame_sizeW((StgClosure *)p); @@ -2651,30 +2240,37 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception) // Only create raise_closure if we need to. if (raise_closure == NULL) { raise_closure = - (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1); + (StgThunk *)allocate(cap,sizeofW(StgThunk)+1); SET_HDR(raise_closure, &stg_raise_info, CCCS); raise_closure->payload[0] = exception; } - UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure); + updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee, + (StgClosure *)raise_closure); p = next; continue; case ATOMICALLY_FRAME: debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p); - tso->sp = p; + tso->stackobj->sp = p; return ATOMICALLY_FRAME; case CATCH_FRAME: - tso->sp = p; + tso->stackobj->sp = p; return CATCH_FRAME; case CATCH_STM_FRAME: debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p); - tso->sp = p; + tso->stackobj->sp = p; return CATCH_STM_FRAME; - case STOP_FRAME: - tso->sp = p; + case UNDERFLOW_FRAME: + tso->stackobj->sp = p; + threadStackUnderflow(cap,tso); + p = tso->stackobj->sp; + continue; + + case STOP_FRAME: + tso->stackobj->sp = p; return STOP_FRAME; case CATCH_RETRY_FRAME: @@ -2704,12 +2300,12 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception) -------------------------------------------------------------------------- */ StgWord -findRetryFrameHelper (StgTSO *tso) +findRetryFrameHelper (Capability *cap, StgTSO *tso) { StgPtr p, next; StgRetInfoTable *info; - p = tso -> sp; + p = tso->stackobj->sp; while (1) { info = get_ret_itbl((StgClosure *)p); next = p + stack_frame_sizeW((StgClosure *)p); @@ -2718,28 +2314,32 @@ findRetryFrameHelper (StgTSO *tso) case ATOMICALLY_FRAME: debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p during retry", p); - tso->sp = p; + tso->stackobj->sp = p; return ATOMICALLY_FRAME; case CATCH_RETRY_FRAME: debugTrace(DEBUG_stm, "found CATCH_RETRY_FRAME at %p during retrry", p); - tso->sp = p; + tso->stackobj->sp = p; return CATCH_RETRY_FRAME; case CATCH_STM_FRAME: { StgTRecHeader *trec = tso -> trec; - StgTRecHeader *outer = stmGetEnclosingTRec(trec); + StgTRecHeader *outer = trec -> enclosing_trec; debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p during retry", p); debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer); - stmAbortTransaction(tso -> cap, trec); - stmFreeAbortedTRec(tso -> cap, trec); + stmAbortTransaction(cap, trec); + stmFreeAbortedTRec(cap, trec); tso -> trec = outer; p = next; continue; } + case UNDERFLOW_FRAME: + threadStackUnderflow(cap,tso); + p = tso->stackobj->sp; + continue; default: ASSERT(info->i.type != CATCH_FRAME); @@ -2765,14 +2365,14 @@ resurrectThreads (StgTSO *threads) { StgTSO *tso, *next; Capability *cap; - step *step; + generation *gen; for (tso = threads; tso != END_TSO_QUEUE; tso = next) { next = tso->global_link; - step = Bdescr((P_)tso)->step; - tso->global_link = step->threads; - step->threads = tso; + gen = Bdescr((P_)tso)->gen; + tso->global_link = gen->threads; + gen->threads = tso; debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id); @@ -2781,10 +2381,9 @@ resurrectThreads (StgTSO *threads) switch (tso->why_blocked) { case BlockedOnMVar: - case BlockedOnException: /* Called by GC - sched_mutex lock is currently held. */ throwToSingleThreaded(cap, tso, - (StgClosure *)blockedOnDeadMVar_closure); + (StgClosure *)blockedIndefinitelyOnMVar_closure); break; case BlockedOnBlackHole: throwToSingleThreaded(cap, tso, @@ -2792,7 +2391,7 @@ resurrectThreads (StgTSO *threads) break; case BlockedOnSTM: throwToSingleThreaded(cap, tso, - (StgClosure *)blockedIndefinitely_closure); + (StgClosure *)blockedIndefinitelyOnSTM_closure); break; case NotBlocked: /* This might happen if the thread was blocked on a black hole @@ -2800,42 +2399,15 @@ resurrectThreads (StgTSO *threads) * can wake up threads, remember...). */ continue; + case BlockedOnMsgThrowTo: + // This can happen if the target is masking, blocks on a + // black hole, and then is found to be unreachable. In + // this case, we want to let the target wake up and carry + // on, and do nothing to this thread. + continue; default: - barf("resurrectThreads: thread blocked in a strange way"); + barf("resurrectThreads: thread blocked in a strange way: %d", + tso->why_blocked); } } } - -/* ----------------------------------------------------------------------------- - performPendingThrowTos is called after garbage collection, and - passed a list of threads that were found to have pending throwTos - (tso->blocked_exceptions was not empty), and were blocked. - Normally this doesn't happen, because we would deliver the - exception directly if the target thread is blocked, but there are - small windows where it might occur on a multiprocessor (see - throwTo()). - - NB. we must be holding all the capabilities at this point, just - like resurrectThreads(). - -------------------------------------------------------------------------- */ - -void -performPendingThrowTos (StgTSO *threads) -{ - StgTSO *tso, *next; - Capability *cap; - step *step; - - for (tso = threads; tso != END_TSO_QUEUE; tso = next) { - next = tso->global_link; - - step = Bdescr((P_)tso)->step; - tso->global_link = step->threads; - step->threads = tso; - - debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id); - - cap = tso->cap; - maybePerformBlockedException(cap, tso); - } -}