X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FSchedule.c;h=51a8d2a842466069fbed9adbb82bd426cb71198a;hb=dd56e9ab4544e83d27532a8d9058140bfe81825c;hp=636b5179ad9668d857ea22e24ea2ffdad095cf26;hpb=31caec794c3978d55d79f715f21fb72948c9f300;p=ghc-hetmet.git diff --git a/rts/Schedule.c b/rts/Schedule.c index 636b517..51a8d2a 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -158,11 +158,7 @@ static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS); static void scheduleCheckBlackHoles (Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); static void schedulePushWork(Capability *cap, Task *task); -#if defined(PARALLEL_HASKELL) -static rtsBool scheduleGetRemoteWork(Capability *cap); -static void scheduleSendPendingMessages(void); -#endif -#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) +#if defined(THREADED_RTS) static void scheduleActivateSpark(Capability *cap); #endif static void schedulePostRunThread(Capability *cap, StgTSO *t); @@ -208,18 +204,8 @@ static char *whatNext_strs[] = { STATIC_INLINE void addToRunQueue( Capability *cap, StgTSO *t ) { -#if defined(PARALLEL_HASKELL) - if (RtsFlags.ParFlags.doFairScheduling) { - // this does round-robin scheduling; good for concurrency - appendToRunQueue(cap,t); - } else { - // this does unfair scheduling; good for parallelism - pushOnRunQueue(cap,t); - } -#else // this does round-robin scheduling; good for concurrency appendToRunQueue(cap,t); -#endif } /* --------------------------------------------------------------------------- @@ -264,9 +250,6 @@ schedule (Capability *initialCapability, Task *task) StgTSO *t; Capability *cap; StgThreadReturnCode ret; -#if defined(PARALLEL_HASKELL) - rtsBool receivedFinish = rtsFalse; -#endif nat prev_what_next; rtsBool ready_to_gc; #if defined(THREADED_RTS) @@ -285,7 +268,9 @@ schedule (Capability *initialCapability, Task *task) if (running_finalizers) { errorBelch("error: a C finalizer called back into Haskell.\n" - " use Foreign.Concurrent.newForeignPtr for Haskell finalizers."); + " This was previously allowed, but is disallowed in GHC 6.10.2 and later.\n" + " To create finalizers that may call back into Haskll, use\n" + " Foreign.Concurrent.newForeignPtr instead of Foreign.newForeignPtr."); stg_exit(EXIT_FAILURE); } @@ -294,13 +279,7 @@ schedule (Capability *initialCapability, Task *task) // ----------------------------------------------------------- // Scheduler loop starts here: -#if defined(PARALLEL_HASKELL) -#define TERMINATION_CONDITION (!receivedFinish) -#else -#define TERMINATION_CONDITION rtsTrue -#endif - - while (TERMINATION_CONDITION) { + while (1) { // Check whether we have re-entered the RTS from Haskell without // going via suspendThread()/resumeThread (i.e. a 'safe' foreign @@ -382,21 +361,6 @@ schedule (Capability *initialCapability, Task *task) (pushes threads, wakes up idle capabilities for stealing) */ schedulePushWork(cap,task); -#if defined(PARALLEL_HASKELL) - /* since we perform a blocking receive and continue otherwise, - either we never reach here or we definitely have work! */ - // from here: non-empty run queue - ASSERT(!emptyRunQueue(cap)); - - if (PacketsWaiting()) { /* now process incoming messages, if any - pending... - - CAUTION: scheduleGetRemoteWork called - above, waits for messages as well! */ - processMessages(cap, &receivedFinish); - } -#endif // PARALLEL_HASKELL: non-empty run queue! - scheduleDetectDeadlock(cap,task); #if defined(THREADED_RTS) @@ -690,28 +654,9 @@ scheduleFindWork (Capability *cap) scheduleCheckBlockedThreads(cap); -#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) +#if defined(THREADED_RTS) if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); } #endif - -#if defined(PARALLEL_HASKELL) - // if messages have been buffered... - scheduleSendPendingMessages(); -#endif - -#if defined(PARALLEL_HASKELL) - if (emptyRunQueue(cap)) { - receivedFinish = scheduleGetRemoteWork(cap); - continue; // a new round, (hopefully) with new work - /* - in GUM, this a) sends out a FISH and returns IF no fish is - out already - b) (blocking) awaits and receives messages - - in Eden, this is only the blocking receive, as b) in GUM. - */ - } -#endif } #if defined(THREADED_RTS) @@ -858,7 +803,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no); appendToRunQueue(free_caps[i],t); - postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no); + postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no); if (t->bound) { t->bound->cap = free_caps[i]; } t->cap = free_caps[i]; @@ -881,6 +826,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS, spark = tryStealSpark(cap->sparks); if (spark != NULL) { debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no); + + postEvent(free_caps[i], EVENT_STEAL_SPARK, t->id, cap->no); + newSpark(&(free_caps[i]->r), spark); } } @@ -995,12 +943,6 @@ scheduleCheckBlackHoles (Capability *cap) static void scheduleDetectDeadlock (Capability *cap, Task *task) { - -#if defined(PARALLEL_HASKELL) - // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL - return; -#endif - /* * Detect deadlock: when we have no threads to run, there are no * threads blocked, waiting for I/O, or sleeping, and all the @@ -1105,7 +1047,7 @@ scheduleSendPendingMessages(void) * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS) * ------------------------------------------------------------------------- */ -#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) +#if defined(THREADED_RTS) static void scheduleActivateSpark(Capability *cap) { @@ -1118,51 +1060,6 @@ scheduleActivateSpark(Capability *cap) #endif // PARALLEL_HASKELL || THREADED_RTS /* ---------------------------------------------------------------------------- - * Get work from a remote node (PARALLEL_HASKELL only) - * ------------------------------------------------------------------------- */ - -#if defined(PARALLEL_HASKELL) -static rtsBool /* return value used in PARALLEL_HASKELL only */ -scheduleGetRemoteWork (Capability *cap STG_UNUSED) -{ -#if defined(PARALLEL_HASKELL) - rtsBool receivedFinish = rtsFalse; - - // idle() , i.e. send all buffers, wait for work - if (RtsFlags.ParFlags.BufferTime) { - IF_PAR_DEBUG(verbose, - debugBelch("...send all pending data,")); - { - nat i; - for (i=1; i<=nPEs; i++) - sendImmediately(i); // send all messages away immediately - } - } - - /* this would be the place for fishing in GUM... - - if (no-earlier-fish-around) - sendFish(choosePe()); - */ - - // Eden:just look for incoming messages (blocking receive) - IF_PAR_DEBUG(verbose, - debugBelch("...wait for incoming messages...\n")); - processMessages(cap, &receivedFinish); // blocking receive... - - - return receivedFinish; - // reenter scheduling look after having received something - -#else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */ - - return rtsFalse; /* return value unused in THREADED_RTS */ - -#endif /* PARALLEL_HASKELL */ -} -#endif // PARALLEL_HASKELL || THREADED_RTS - -/* ---------------------------------------------------------------------------- * After running a thread... * ------------------------------------------------------------------------- */ @@ -1373,7 +1270,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ) static void scheduleHandleThreadBlocked( StgTSO *t -#if !defined(GRAN) && !defined(DEBUG) +#if !defined(DEBUG) STG_UNUSED #endif ) @@ -2193,7 +2090,7 @@ initScheduler(void) initTaskManager(); -#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) +#if defined(THREADED_RTS) initSparkPools(); #endif @@ -2230,9 +2127,7 @@ exitScheduler( { Task *task = NULL; - ACQUIRE_LOCK(&sched_mutex); task = newBoundTask(); - RELEASE_LOCK(&sched_mutex); // If we haven't killed all the threads yet, do it now. if (sched_state < SCHED_SHUTTING_DOWN) { @@ -2296,9 +2191,7 @@ performGC_(rtsBool force_major) // We must grab a new Task here, because the existing Task may be // associated with a particular Capability, and chained onto the // suspended_ccalling_tasks queue. - ACQUIRE_LOCK(&sched_mutex); task = newBoundTask(); - RELEASE_LOCK(&sched_mutex); waitForReturnCapability(&task->cap,task); scheduleDoGC(task->cap,task,force_major); @@ -2408,13 +2301,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso) tso->sp = (P_)&(tso->stack[tso->stack_size]); tso->why_blocked = NotBlocked; - IF_PAR_DEBUG(verbose, - debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n", - tso->id, tso, tso->stack_size); - /* If we're debugging, just print out the top of the stack */ - printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, - tso->sp+64))); - unlockTSO(dest); unlockTSO(tso); @@ -2435,9 +2321,18 @@ threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso) tso_size_w = tso_sizeW(tso); - if (tso_size_w < MBLOCK_SIZE_W || + if (tso_size_w < MBLOCK_SIZE_W || + // TSO is less than 2 mblocks (since the first mblock is + // shorter than MBLOCK_SIZE_W) + (tso_size_w - BLOCKS_PER_MBLOCK*BLOCK_SIZE_W) % MBLOCK_SIZE_W != 0 || + // or TSO is not a whole number of megablocks (ensuring + // precondition of splitLargeBlock() below) + (tso_size_w <= round_up_to_mblocks(RtsFlags.GcFlags.initialStkSize)) || + // or TSO is smaller than the minimum stack size (rounded up) (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) + // or stack is using more than 1/4 of the available space { + // then do nothing return tso; } @@ -2544,6 +2439,10 @@ checkBlackHoles (Capability *cap) prev = &blackhole_queue; t = blackhole_queue; while (t != END_TSO_QUEUE) { + if (t->what_next == ThreadRelocated) { + t = t->_link; + continue; + } ASSERT(t->why_blocked == BlockedOnBlackHole); type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type; if (type != BLACKHOLE && type != CAF_BLACKHOLE) {