X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=b78f9d206f19321808246191a9395d96632679ce;hb=4f0f4342c0268e239fd8bb6bd98ad2583b3485dd;hp=ea85c838d5bbd8a90450d6aaf25cd9cd9cbe590f;hpb=6d16c4760494abbe54b449f6a9b7d51336a1e3c3;p=ghc-hetmet.git

diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c
index ea85c83..b78f9d2 100644
--- a/ghc/rts/Schedule.c
+++ b/ghc/rts/Schedule.c
@@ -707,12 +707,6 @@ run_thread:
     cap = myCapability();
 #endif
 
-    // We have run some Haskell code: there might be blackhole-blocked
-    // threads to wake up now.
-    if ( blackhole_queue != END_TSO_QUEUE ) {
-        blackholes_need_checking = rtsTrue;
-    }
-
     cap->r.rInHaskell = rtsFalse;
 
     // The TSO might have moved, eg. if it re-entered the RTS and a GC
@@ -731,6 +725,12 @@ run_thread:
 #endif
 
     ACQUIRE_LOCK(&sched_mutex);
+
+    // We have run some Haskell code: there might be blackhole-blocked
+    // threads to wake up now.
+    if ( blackhole_queue != END_TSO_QUEUE ) {
+        blackholes_need_checking = rtsTrue;
+    }
 
 #if defined(RTS_SUPPORTS_THREADS)
     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
@@ -840,8 +840,9 @@ scheduleCheckBlockedThreads(void)
 #if defined(RTS_SUPPORTS_THREADS)
         // We shouldn't be here...
         barf("schedule: awaitEvent() in threaded RTS");
-#endif
+#else
         awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
+#endif
     }
 }
 
@@ -1485,7 +1486,9 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
            cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
                                               // if the nursery has only one block.
 
+           ACQUIRE_SM_LOCK
            bd = allocGroup( blocks );
+           RELEASE_SM_LOCK
            cap->r.rNursery->n_blocks += blocks;
 
            // link the new group into the list
@@ -1905,6 +1908,11 @@ scheduleDoGC( rtsBool force_major )
     // actually did the GC.  But it's quite hard to arrange for all
     // the other tasks to sleep and stay asleep.
     //
+    // This does mean that there will be multiple entries in the
+    // thread->capability hash table for the current thread, but
+    // they will be removed as normal when the capabilities are
+    // released again.
+    //
     // Someone else is already trying to GC
     if (waiting_for_gc) return;
 
@@ -1924,19 +1932,28 @@ scheduleDoGC( rtsBool force_major )
      * atomically frames.  When next scheduled they will try to
      * commit, this commit will fail and they will retry.
      */
-    for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
-       if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
-           if (!stmValidateTransaction (t -> trec)) {
-               IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
-
-               // strip the stack back to the ATOMICALLY_FRAME, aborting
-               // the (nested) transaction, and saving the stack of any
-               // partially-evaluated thunks on the heap.
-               raiseAsync_(t, NULL, rtsTrue);
-
+    {
+       StgTSO *next;
+
+       for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+           if (t->what_next == ThreadRelocated) {
+               next = t->link;
+           } else {
+               next = t->global_link;
+               if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+                   if (!stmValidateNestOfTransactions (t -> trec)) {
+                       IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+
+                       // strip the stack back to the ATOMICALLY_FRAME, aborting
+                       // the (nested) transaction, and saving the stack of any
+                       // partially-evaluated thunks on the heap.
+                       raiseAsync_(t, NULL, rtsTrue);
+
 #ifdef REG_R1
-           ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+                       ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
 #endif
+                   }
+               }
            }
        }
     }
@@ -2296,7 +2313,6 @@ StgTSO *
 createThread(nat size)
 #endif
 {
-
     StgTSO *tso;
     nat stack_size;
 
@@ -2527,8 +2543,8 @@ activateSpark (rtsSpark spark)
  * on this thread's stack before the scheduler is invoked.
  * ------------------------------------------------------------------------ */
 
-static void
-scheduleThread_(StgTSO *tso)
+void
+scheduleThreadLocked(StgTSO *tso)
 {
   // The thread goes at the *end* of the run-queue, to avoid possible
   // starvation of any threads already on the queue.
@@ -2540,7 +2556,7 @@ void
 scheduleThread(StgTSO* tso)
 {
   ACQUIRE_LOCK(&sched_mutex);
-  scheduleThread_(tso);
+  scheduleThreadLocked(tso);
   RELEASE_LOCK(&sched_mutex);
 }
 
@@ -2681,9 +2697,62 @@ exitScheduler( void )
 {
     interrupted             = rtsTrue;
     shutting_down_scheduler = rtsTrue;
+
 #if defined(RTS_SUPPORTS_THREADS)
     if (threadIsTask(osThreadId())) { taskStop(); }
     stopTaskManager();
+    //
+    // What can we do here?  There are a bunch of worker threads, it
+    // might be nice to let them exit cleanly.  There may be some main
+    // threads in the run queue; we should let them return to their
+    // callers with an Interrupted state.  We can't in general wait
+    // for all the running Tasks to stop, because some might be off in
+    // a C call that is blocked.
+    //
+    // Letting the run queue drain is the safest thing.  That lets any
+    // main threads return that can return, and cleans up all the
+    // runnable threads.  Then we grab all the Capabilities to stop
+    // anything unexpected happening while we shut down.
+    //
+    // ToDo: this doesn't let us get the time stats from the worker
+    // tasks, because they haven't called taskStop().
+    //
+    ACQUIRE_LOCK(&sched_mutex);
+    {
+       nat i;
+       for (i = 1000; i > 0; i--) {
+           if (EMPTY_RUN_QUEUE()) {
+               IF_DEBUG(scheduler, sched_belch("run queue is empty"));
+               break;
+           }
+           IF_DEBUG(scheduler, sched_belch("yielding"));
+           RELEASE_LOCK(&sched_mutex);
+           prodWorker();
+           yieldThread();
+           ACQUIRE_LOCK(&sched_mutex);
+       }
+    }
+
+#ifdef SMP
+    {
+       Capability *cap;
+       int n_capabilities = RtsFlags.ParFlags.nNodes;
+       Capability *caps[n_capabilities];
+       nat i;
+
+       while (n_capabilities > 0) {
+           IF_DEBUG(scheduler, sched_belch("exitScheduler: grabbing all the capabilies (%d left)", n_capabilities));
+           waitForReturnCapability(&sched_mutex, &cap);
+           n_capabilities--;
+           caps[n_capabilities] = cap;
+       }
+    }
+#else
+    {
+       Capability *cap;
+       waitForReturnCapability(&sched_mutex, &cap);
+    }
+#endif
 #endif
 }
 
@@ -3226,6 +3295,8 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
 void
 awakenBlockedQueueNoLock(StgTSO *tso)
 {
+    if (tso == NULL) return; // hack; see bug #1235728, and comments in
+                             // Exception.cmm
     while (tso != END_TSO_QUEUE) {
        tso = unblockOneLocked(tso);
     }
@@ -3234,6 +3305,8 @@ awakenBlockedQueueNoLock(StgTSO *tso)
 void
 awakenBlockedQueue(StgTSO *tso)
 {
+    if (tso == NULL) return; // hack; see bug #1235728, and comments in
+                             // Exception.cmm
     ACQUIRE_LOCK(&sched_mutex);
     while (tso != END_TSO_QUEUE) {
        tso = unblockOneLocked(tso);
     }
@@ -3592,6 +3665,7 @@ checkBlackHoles( void )
        ASSERT(t->why_blocked == BlockedOnBlackHole);
        type = get_itbl(t->block_info.closure)->type;
        if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
+           IF_DEBUG(sanity,checkTSO(t));
            t = unblockOneLocked(t);
            *prev = t;
            any_woke_up = rtsTrue;
@@ -3793,7 +3867,7 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
            // we've got an exception to raise, so let's pass it to the
            // handler in this frame.
            //
-           raise = (StgThunk *)allocate(sizeofW(StgThunk)+1);
+           raise = (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE);
            TICK_ALLOC_SE_THK(1,0);
            SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
            raise->payload[0] = exception;