X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FSchedule.c;h=fb63796a9a619d22312218d7119f627e149e61ad;hb=b0758d03f1a00ab0adf57f8157602ef22e8cdd13;hp=e82115bc2997a2380369c4e1ca790bfb58532e6a;hpb=e8d7985d56595f6b8004546bedc41627ca70c528;p=ghc-hetmet.git diff --git a/rts/Schedule.c b/rts/Schedule.c index e82115b..fb63796 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -9,34 +9,25 @@ #include "PosixSource.h" #define KEEP_LOCKCLOSURE #include "Rts.h" -#include "SchedAPI.h" + +#include "sm/Storage.h" #include "RtsUtils.h" -#include "RtsFlags.h" -#include "OSThreads.h" -#include "Storage.h" #include "StgRun.h" -#include "Hooks.h" #include "Schedule.h" -#include "StgMiscClosures.h" #include "Interpreter.h" #include "Printer.h" #include "RtsSignals.h" #include "Sanity.h" #include "Stats.h" #include "STM.h" -#include "Timer.h" #include "Prelude.h" #include "ThreadLabels.h" -#include "LdvProfile.h" #include "Updates.h" #include "Proftimer.h" #include "ProfHeap.h" -#include "GC.h" #include "Weak.h" -#include "EventLog.h" - -/* PARALLEL_HASKELL includes go here */ - +#include "eventlog/EventLog.h" +#include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N #include "Sparks.h" #include "Capability.h" #include "Task.h" @@ -47,7 +38,8 @@ #include "Trace.h" #include "RaiseAsync.h" #include "Threads.h" -#include "ThrIOManager.h" +#include "Timer.h" +#include "ThreadPaused.h" #ifdef HAVE_SYS_TYPES_H #include @@ -64,12 +56,6 @@ #include #endif -// Turn off inlining when debugging - it obfuscates things -#ifdef DEBUG -# undef STATIC_INLINE -# define STATIC_INLINE static -#endif - /* ----------------------------------------------------------------------------- * Global variables * -------------------------------------------------------------------------- */ @@ -150,7 +136,7 @@ static Capability *schedule (Capability *initialCapability, Task *task); static void schedulePreLoop (void); static void scheduleFindWork (Capability *cap); #if defined(THREADED_RTS) -static void scheduleYield (Capability **pcap, Task *task); +static void scheduleYield (Capability **pcap, Task *task, rtsBool); #endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); @@ -158,11 +144,7 @@ static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS); static void scheduleCheckBlackHoles (Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); static void schedulePushWork(Capability *cap, Task *task); -#if defined(PARALLEL_HASKELL) -static rtsBool scheduleGetRemoteWork(Capability *cap); -static void scheduleSendPendingMessages(void); -#endif -#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) +#if defined(THREADED_RTS) static void scheduleActivateSpark(Capability *cap); #endif static void schedulePostRunThread(Capability *cap, StgTSO *t); @@ -192,12 +174,12 @@ static void deleteThread_(Capability *cap, StgTSO *tso); #ifdef DEBUG static char *whatNext_strs[] = { - "(unknown)", - "ThreadRunGHC", - "ThreadInterpret", - "ThreadKilled", - "ThreadRelocated", - "ThreadComplete" + [0] = "(unknown)", + [ThreadRunGHC] = "ThreadRunGHC", + [ThreadInterpret] = "ThreadInterpret", + [ThreadKilled] = "ThreadKilled", + [ThreadRelocated] = "ThreadRelocated", + [ThreadComplete] = "ThreadComplete" }; #endif @@ -208,18 +190,8 @@ static char *whatNext_strs[] = { STATIC_INLINE void addToRunQueue( Capability *cap, StgTSO *t ) { -#if defined(PARALLEL_HASKELL) - if (RtsFlags.ParFlags.doFairScheduling) { - // this does round-robin scheduling; good for concurrency - appendToRunQueue(cap,t); - } else { - // this does unfair scheduling; good for parallelism - pushOnRunQueue(cap,t); - } -#else // this does round-robin scheduling; good for concurrency appendToRunQueue(cap,t); -#endif } /* --------------------------------------------------------------------------- @@ -264,13 +236,11 @@ schedule (Capability *initialCapability, Task *task) StgTSO *t; Capability *cap; StgThreadReturnCode ret; -#if defined(PARALLEL_HASKELL) - rtsBool receivedFinish = rtsFalse; -#endif nat prev_what_next; rtsBool ready_to_gc; #if defined(THREADED_RTS) rtsBool first = rtsTrue; + rtsBool force_yield = rtsFalse; #endif cap = initialCapability; @@ -283,24 +253,12 @@ schedule (Capability *initialCapability, Task *task) "### NEW SCHEDULER LOOP (task: %p, cap: %p)", task, initialCapability); - if (running_finalizers) { - errorBelch("error: a C finalizer called back into Haskell.\n" - " use Foreign.Concurrent.newForeignPtr for Haskell finalizers."); - stg_exit(EXIT_FAILURE); - } - schedulePreLoop(); // ----------------------------------------------------------- // Scheduler loop starts here: -#if defined(PARALLEL_HASKELL) -#define TERMINATION_CONDITION (!receivedFinish) -#else -#define TERMINATION_CONDITION rtsTrue -#endif - - while (TERMINATION_CONDITION) { + while (1) { // Check whether we have re-entered the RTS from Haskell without // going via suspendThread()/resumeThread (i.e. a 'safe' foreign @@ -382,21 +340,6 @@ schedule (Capability *initialCapability, Task *task) (pushes threads, wakes up idle capabilities for stealing) */ schedulePushWork(cap,task); -#if defined(PARALLEL_HASKELL) - /* since we perform a blocking receive and continue otherwise, - either we never reach here or we definitely have work! */ - // from here: non-empty run queue - ASSERT(!emptyRunQueue(cap)); - - if (PacketsWaiting()) { /* now process incoming messages, if any - pending... - - CAUTION: scheduleGetRemoteWork called - above, waits for messages as well! */ - processMessages(cap, &receivedFinish); - } -#endif // PARALLEL_HASKELL: non-empty run queue! - scheduleDetectDeadlock(cap,task); #if defined(THREADED_RTS) @@ -424,7 +367,9 @@ schedule (Capability *initialCapability, Task *task) } yield: - scheduleYield(&cap,task); + scheduleYield(&cap,task,force_yield); + force_yield = rtsFalse; + if (emptyRunQueue(cap)) continue; // look for work again #endif @@ -603,6 +548,7 @@ run_thread: debugTrace(DEBUG_sched, "--<< thread %lu (%s) stopped: blocked", (unsigned long)t->id, whatNext_strs[t->what_next]); + force_yield = rtsTrue; goto yield; } #endif @@ -620,7 +566,9 @@ run_thread: schedulePostRunThread(cap,t); - t = threadStackUnderflow(task,t); + if (ret != StackOverflow) { + t = threadStackUnderflow(task,t); + } ready_to_gc = rtsFalse; @@ -690,28 +638,9 @@ scheduleFindWork (Capability *cap) scheduleCheckBlockedThreads(cap); -#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) +#if defined(THREADED_RTS) if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); } #endif - -#if defined(PARALLEL_HASKELL) - // if messages have been buffered... - scheduleSendPendingMessages(); -#endif - -#if defined(PARALLEL_HASKELL) - if (emptyRunQueue(cap)) { - receivedFinish = scheduleGetRemoteWork(cap); - continue; // a new round, (hopefully) with new work - /* - in GUM, this a) sends out a FISH and returns IF no fish is - out already - b) (blocking) awaits and receives messages - - in Eden, this is only the blocking receive, as b) in GUM. - */ - } -#endif } #if defined(THREADED_RTS) @@ -742,12 +671,23 @@ shouldYieldCapability (Capability *cap, Task *task) // and also check the benchmarks in nofib/parallel for regressions. static void -scheduleYield (Capability **pcap, Task *task) +scheduleYield (Capability **pcap, Task *task, rtsBool force_yield) { Capability *cap = *pcap; // if we have work, and we don't need to give up the Capability, continue. - if (!shouldYieldCapability(cap,task) && + // + // The force_yield flag is used when a bound thread blocks. This + // is a particularly tricky situation: the current Task does not + // own the TSO any more, since it is on some queue somewhere, and + // might be woken up or manipulated by another thread at any time. + // The TSO and Task might be migrated to another Capability. + // Certain invariants might be in doubt, such as task->bound->cap + // == cap. We have to yield the current Capability immediately, + // no messing around. + // + if (!force_yield && + !shouldYieldCapability(cap,task) && (!emptyRunQueue(cap) || !emptyWakeupQueue(cap) || blackholes_need_checking || @@ -998,12 +938,6 @@ scheduleCheckBlackHoles (Capability *cap) static void scheduleDetectDeadlock (Capability *cap, Task *task) { - -#if defined(PARALLEL_HASKELL) - // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL - return; -#endif - /* * Detect deadlock: when we have no threads to run, there are no * threads blocked, waiting for I/O, or sleeping, and all the @@ -1108,7 +1042,7 @@ scheduleSendPendingMessages(void) * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS) * ------------------------------------------------------------------------- */ -#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) +#if defined(THREADED_RTS) static void scheduleActivateSpark(Capability *cap) { @@ -1121,51 +1055,6 @@ scheduleActivateSpark(Capability *cap) #endif // PARALLEL_HASKELL || THREADED_RTS /* ---------------------------------------------------------------------------- - * Get work from a remote node (PARALLEL_HASKELL only) - * ------------------------------------------------------------------------- */ - -#if defined(PARALLEL_HASKELL) -static rtsBool /* return value used in PARALLEL_HASKELL only */ -scheduleGetRemoteWork (Capability *cap STG_UNUSED) -{ -#if defined(PARALLEL_HASKELL) - rtsBool receivedFinish = rtsFalse; - - // idle() , i.e. send all buffers, wait for work - if (RtsFlags.ParFlags.BufferTime) { - IF_PAR_DEBUG(verbose, - debugBelch("...send all pending data,")); - { - nat i; - for (i=1; i<=nPEs; i++) - sendImmediately(i); // send all messages away immediately - } - } - - /* this would be the place for fishing in GUM... - - if (no-earlier-fish-around) - sendFish(choosePe()); - */ - - // Eden:just look for incoming messages (blocking receive) - IF_PAR_DEBUG(verbose, - debugBelch("...wait for incoming messages...\n")); - processMessages(cap, &receivedFinish); // blocking receive... - - - return receivedFinish; - // reenter scheduling look after having received something - -#else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */ - - return rtsFalse; /* return value unused in THREADED_RTS */ - -#endif /* PARALLEL_HASKELL */ -} -#endif // PARALLEL_HASKELL || THREADED_RTS - -/* ---------------------------------------------------------------------------- * After running a thread... * ------------------------------------------------------------------------- */ @@ -1376,7 +1265,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ) static void scheduleHandleThreadBlocked( StgTSO *t -#if !defined(GRAN) && !defined(DEBUG) +#if !defined(DEBUG) STG_UNUSED #endif ) @@ -1835,7 +1724,6 @@ forkProcess(HsStablePtr *entry } #else /* !FORKPROCESS_PRIMOP_SUPPORTED */ barf("forkProcess#: primop not supported on this platform, sorry!\n"); - return -1; #endif } @@ -2196,7 +2084,7 @@ initScheduler(void) initTaskManager(); -#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) +#if defined(THREADED_RTS) initSparkPools(); #endif @@ -2233,9 +2121,7 @@ exitScheduler( { Task *task = NULL; - ACQUIRE_LOCK(&sched_mutex); task = newBoundTask(); - RELEASE_LOCK(&sched_mutex); // If we haven't killed all the threads yet, do it now. if (sched_state < SCHED_SHUTTING_DOWN) { @@ -2299,9 +2185,7 @@ performGC_(rtsBool force_major) // We must grab a new Task here, because the existing Task may be // associated with a particular Capability, and chained onto the // suspended_ccalling_tasks queue. - ACQUIRE_LOCK(&sched_mutex); task = newBoundTask(); - RELEASE_LOCK(&sched_mutex); waitForReturnCapability(&task->cap,task); scheduleDoGC(task->cap,task,force_major); @@ -2411,13 +2295,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) tso->sp = (P_)&(tso->stack[tso->stack_size]); tso->why_blocked = NotBlocked; - IF_PAR_DEBUG(verbose, - debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n", - tso->id, tso, tso->stack_size); - /* If we're debugging, just print out the top of the stack */ - printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, - tso->sp+64))); - unlockTSO(dest); unlockTSO(tso); @@ -2438,9 +2315,18 @@ threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso) tso_size_w = tso_sizeW(tso); - if (tso_size_w < MBLOCK_SIZE_W || + if (tso_size_w < MBLOCK_SIZE_W || + // TSO is less than 2 mblocks (since the first mblock is + // shorter than MBLOCK_SIZE_W) + (tso_size_w - BLOCKS_PER_MBLOCK*BLOCK_SIZE_W) % MBLOCK_SIZE_W != 0 || + // or TSO is not a whole number of megablocks (ensuring + // precondition of splitLargeBlock() below) + (tso_size_w <= round_up_to_mblocks(RtsFlags.GcFlags.initialStkSize)) || + // or TSO is smaller than the minimum stack size (rounded up) (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) + // or stack is using more than 1/4 of the available space { + // then do nothing return tso; } @@ -2489,7 +2375,9 @@ interruptStgRts(void) { sched_state = SCHED_INTERRUPTING; setContextSwitches(); +#if defined(THREADED_RTS) wakeUpRts(); +#endif } /* ----------------------------------------------------------------------------- @@ -2505,16 +2393,15 @@ interruptStgRts(void) will have interrupted any blocking system call in progress anyway. -------------------------------------------------------------------------- */ -void -wakeUpRts(void) -{ #if defined(THREADED_RTS) +void wakeUpRts(void) +{ // This forces the IO Manager thread to wakeup, which will // in turn ensure that some OS thread wakes up and runs the // scheduler loop, which will cause a GC and deadlock check. ioManagerWakeup(); -#endif } +#endif /* ----------------------------------------------------------------------------- * checkBlackHoles()