X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=2f94cf0beb407f896f5f344ed355cd43fd7c9ebb;hb=674ecfc49b2fc03f95ec85a18d266e9fd8ee3dc4;hp=036c5b0ebf09c7b78e5149f71d1bc3668ab69289;hpb=4ab216140652b1ebdc011bba06f77cd05c614b91;p=ghc-hetmet.git

diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c
index 036c5b0..2f94cf0 100644
--- a/ghc/rts/Schedule.c
+++ b/ghc/rts/Schedule.c
@@ -288,7 +288,7 @@ static void scheduleHandleThreadBlocked( StgTSO *t );
 static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread,
                                              Capability *cap, StgTSO *t );
 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
-static void scheduleDoGC(Capability *cap);
+static void scheduleDoGC(rtsBool force_major);
 
 static void unblockThread(StgTSO *tso);
 static rtsBool checkBlackHoles(void);
@@ -305,6 +305,7 @@ static void raiseAsync_(StgTSO *tso, StgClosure *exception,
 
 static void printThreadBlockage(StgTSO *tso);
 static void printThreadStatus(StgTSO *tso);
+void printThreadQueue(StgTSO *tso);
 
 #if defined(PARALLEL_HASKELL)
 StgTSO * createSparkThread(rtsSpark spark);
@@ -458,13 +459,12 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
       CurrentTSO = event->tso;
 #endif
 
-    IF_DEBUG(scheduler, printAllThreads());
-
 #if defined(RTS_SUPPORTS_THREADS)
       // Yield the capability to higher-priority tasks if necessary.
       //
       if (cap != NULL) {
-          yieldCapability(&cap);
+          yieldCapability(&cap,
+                          mainThread ? &mainThread->bound_thread_cond : NULL );
       }
 
       // If we do not currently hold a capability, we wait for one
@@ -476,7 +476,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
 
       // We now have a capability...
 #endif
-    
+
 #if 0 /* extra sanity checking */
     {
       StgMainThread *m;
@@ -628,21 +628,19 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
            sched_belch("### thread %d bound to another OS thread", t->id));
          // no, bound to a different Haskell thread: pass to that thread
          PUSH_ON_RUN_QUEUE(t);
-         passCapability(&m->bound_thread_cond);
          continue;
        }
      }
      else
      {
        if(mainThread != NULL)
-        // The thread we want to run is bound.
+        // The thread we want to run is unbound.
        {
          IF_DEBUG(scheduler,
            sched_belch("### this OS thread cannot run thread %d", t->id));
          // no, the current native thread is bound to a different
          // Haskell thread, so pass it to any worker thread
          PUSH_ON_RUN_QUEUE(t);
-         passCapabilityToWorker();
          continue;
        }
      }
@@ -702,11 +700,12 @@ run_thread:
       barf("schedule: invalid what_next field");
     }
 
-    // We have run some Haskell code: there might be blackhole-blocked
-    // threads to wake up now.
-    if ( blackhole_queue != END_TSO_QUEUE ) {
-        blackholes_need_checking = rtsTrue;
-    }
+#if defined(SMP)
+    // in SMP mode, we might return with a different capability than
+    // we started with, if the Haskell thread made a foreign call.  So
+    // let's find out what our current Capability is:
+    cap = myCapability();
+#endif
 
     cap->r.rInHaskell = rtsFalse;
 
@@ -726,6 +725,12 @@ run_thread:
 #endif
 
     ACQUIRE_LOCK(&sched_mutex);
+
+    // We have run some Haskell code: there might be blackhole-blocked
+    // threads to wake up now.
+    if ( blackhole_queue != END_TSO_QUEUE ) {
+        blackholes_need_checking = rtsTrue;
+    }
 
 #if defined(RTS_SUPPORTS_THREADS)
     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
@@ -755,7 +760,6 @@ run_thread:
 
     case ThreadBlocked:
         scheduleHandleThreadBlocked(t);
-        threadPaused(t);
         break;
 
     case ThreadFinished:
@@ -767,7 +771,7 @@ run_thread:
     }
 
     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
-    if (ready_to_gc) { scheduleDoGC(cap); }
+    if (ready_to_gc) { scheduleDoGC(rtsFalse); }
   } /* end of while() */
 
   IF_PAR_DEBUG(verbose,
@@ -836,8 +840,9 @@ scheduleCheckBlockedThreads(void)
 
 #if defined(RTS_SUPPORTS_THREADS)
         // We shouldn't be here...
        barf("schedule: awaitEvent() in threaded RTS");
-#endif
+#else
        awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
+#endif
     }
 }
@@ -862,7 +867,7 @@ scheduleCheckBlackHoles( void )
  * ------------------------------------------------------------------------- */
 
 static void
-scheduleDetectDeadlock(void)
+scheduleDetectDeadlock()
 {
 
 #if defined(PARALLEL_HASKELL)
@@ -895,7 +900,8 @@ scheduleDetectDeadlock(void)
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
-       GarbageCollect(GetRoots,rtsTrue);
+
+       scheduleDoGC( rtsTrue/*force major GC*/ );
        recent_activity = ACTIVITY_DONE_GC;
        if ( !EMPTY_RUN_QUEUE() ) return;
 
@@ -1475,12 +1481,13 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
                   debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
                              (long)t->id, whatNext_strs[t->what_next], blocks));
 
-       // don't do this if it would push us over the
-       // alloc_blocks_lim limit; we'll GC first.
-       if (alloc_blocks + blocks < alloc_blocks_lim) {
+       // don't do this if the nursery is (nearly) full, we'll GC first.
+       if (cap->r.rCurrentNursery->link != NULL ||
+           cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
+                                              // if the nursery has only one block.
 
-           alloc_blocks += blocks;
            bd = allocGroup( blocks );
+           cap->r.rNursery->n_blocks += blocks;
 
            // link the new group into the list
            bd->link = cap->r.rCurrentNursery;
@@ -1491,7 +1498,6 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
 
 #if !defined(SMP)
            ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
                   g0s0 == cap->r.rNursery);
-           g0s0->blocks = bd;
 #endif
            cap->r.rNursery->blocks = bd;
@@ -1507,20 +1513,15 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
           {
               bdescr *x;
               for (x = bd; x < bd + blocks; x++) {
-                  x->step = g0s0;
+                  x->step = cap->r.rNursery;
                   x->gen_no = 0;
                   x->flags = 0;
               }
           }
 
-#if !defined(SMP)
-          // don't forget to update the block count in g0s0.
-          g0s0->n_blocks += blocks;
-
           // This assert can be a killer if the app is doing lots
           // of large block allocations.
-          ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
-#endif
+          IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
 
           // now update the nursery to point to the new block
           cap->r.rCurrentNursery = bd;
@@ -1534,14 +1535,9 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
        }
     }
 
-    /* make all the running tasks block on a condition variable,
-     * maybe set context_switch and wait till they all pile in,
-     * then have them wait on a GC condition variable.
-     */
     IF_DEBUG(scheduler,
             debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
                        (long)t->id, whatNext_strs[t->what_next]));
-    threadPaused(t);
 #if defined(GRAN)
     ASSERT(!is_on_queue(t,CurrentProc));
 #elif defined(PARALLEL_HASKELL)
@@ -1573,7 +1569,6 @@ scheduleHandleStackOverflow( StgTSO *t)
     /* just adjust the stack for this thread, then pop it back
      * on the run queue.
      */
-    threadPaused(t);
     {
        /* enlarge the stack */
        StgTSO *new_t = threadStackOverflow(t);
@@ -1632,8 +1627,6 @@ scheduleHandleYield( StgTSO *t, nat prev_what_next )
        return rtsTrue;
     }
 
-    threadPaused(t);
-
 #if defined(GRAN)
     ASSERT(!is_on_queue(t,CurrentProc));
 
@@ -1706,12 +1699,19 @@ scheduleHandleThreadBlocked( StgTSO *t
       emitSchedule = rtsTrue;
 
 #else /* !GRAN */
-      /* don't need to do anything.  Either the thread is blocked on
-       * I/O, in which case we'll have called addToBlockedQueue
-       * previously, or it's blocked on an MVar or Blackhole, in which
-       * case it'll be on the relevant queue already.
-       */
+
+      // We don't need to do anything.  The thread is blocked, and it
+      // has tidied up its stack and placed itself on whatever queue
+      // it needs to be on.
+
+#if !defined(SMP)
       ASSERT(t->why_blocked != NotBlocked);
+      // This might not be true under SMP: we don't have
+      // exclusive access to this TSO, so someone might have
+      // woken it up by now.  This actually happens: try
+      // conc023 +RTS -N2.
+#endif
+
       IF_DEBUG(scheduler,
               debugBelch("--<< thread %d (%s) stopped: ",
                          t->id, whatNext_strs[t->what_next]);
@@ -1884,10 +1884,11 @@ scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
  * -------------------------------------------------------------------------- */
 
 static void
-scheduleDoGC( Capability *cap STG_UNUSED )
+scheduleDoGC( rtsBool force_major )
 {
     StgTSO *t;
 #ifdef SMP
+    Capability *cap;
     static rtsBool waiting_for_gc;
     int n_capabilities = RtsFlags.ParFlags.nNodes - 1;
            // subtract one because we're already holding one.
@@ -1905,12 +1906,16 @@ scheduleDoGC( Capability *cap STG_UNUSED )
     // actually did the GC.  But it's quite hard to arrange for all
     // the other tasks to sleep and stay asleep.
     //
+    // This does mean that there will be multiple entries in the
+    // thread->capability hash table for the current thread, but
+    // they will be removed as normal when the capabilities are
+    // released again.
+    //
     // Someone else is already trying to GC
     if (waiting_for_gc) return;
     waiting_for_gc = rtsTrue;
 
-    caps[n_capabilities] = cap;
     while (n_capabilities > 0) {
        IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
        waitForReturnCapability(&sched_mutex, &cap);
@@ -1925,19 +1930,28 @@ scheduleDoGC( Capability *cap STG_UNUSED )
      * atomically frames.  When next scheduled they will try to
      * commit, this commit will fail and they will retry.
      */
-    for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
-       if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
-           if (!stmValidateTransaction (t -> trec)) {
-               IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
-
-               // strip the stack back to the ATOMICALLY_FRAME, aborting
-               // the (nested) transaction, and saving the stack of any
-               // partially-evaluated thunks on the heap.
-               raiseAsync_(t, NULL, rtsTrue);
-
+    {
+       StgTSO *next;
+
+       for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+           if (t->what_next == ThreadRelocated) {
+               next = t->link;
+           } else {
+               next = t->global_link;
+               if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+                   if (!stmValidateNestOfTransactions (t -> trec)) {
+                       IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+
+                       // strip the stack back to the ATOMICALLY_FRAME, aborting
+                       // the (nested) transaction, and saving the stack of any
+                       // partially-evaluated thunks on the heap.
+                       raiseAsync_(t, NULL, rtsTrue);
+
 #ifdef REG_R1
-       ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+                       ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
 #endif
+                   }
+               }
           }
       }
     }
@@ -1945,6 +1959,8 @@ scheduleDoGC( Capability *cap STG_UNUSED )
     // so this happens periodically:
     scheduleCheckBlackHoles();
 
+    IF_DEBUG(scheduler, printAllThreads());
+
     /* everybody back, start the GC.
      * Could do it in this thread, or signal a condition var
      * to do it in another thread.  Either way, we need to
@@ -1953,7 +1969,7 @@ scheduleDoGC( Capability *cap STG_UNUSED )
 #if defined(RTS_SUPPORTS_THREADS)
     IF_DEBUG(scheduler,sched_belch("doing GC"));
 #endif
-    GarbageCollect(GetRoots,rtsFalse);
+    GarbageCollect(GetRoots, force_major);
 
 #if defined(SMP)
     {
@@ -2093,8 +2109,12 @@ deleteAllThreads ( void )
   StgTSO* t, *next;
   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
-      next = t->global_link;
-      deleteThread(t);
+      if (t->what_next == ThreadRelocated) {
+         next = t->link;
+      } else {
+         next = t->global_link;
+         deleteThread(t);
+      }
   }
 
   // The run queue now contains a bunch of ThreadKilled threads.  We
@@ -2295,6 +2315,8 @@ createThread(nat size)
     StgTSO *tso;
     nat stack_size;
 
+    ACQUIRE_LOCK(&sched_mutex);
+
     /* First check whether we should create a thread at all */
 #if defined(PARALLEL_HASKELL)
     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
@@ -2302,6 +2324,7 @@ createThread(nat size)
     threadsIgnored++;
     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
               RtsFlags.ParFlags.maxThreads, advisory_thread_count);
+    RELEASE_LOCK(&sched_mutex);
     return END_TSO_QUEUE;
   }
   threadsCreated++;
@@ -2452,6 +2475,7 @@ createThread(nat size)
   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words",
                                 (long)tso->id, (long)tso->stack_size));
 #endif
+  RELEASE_LOCK(&sched_mutex);
   return tso;
 }
 
@@ -2979,7 +3003,7 @@ unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
 #endif
 
 #if defined(GRAN)
-static StgBlockingQueueElement *
+StgBlockingQueueElement *
 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
 {
     StgTSO *tso;
@@ -3019,7 +3043,7 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
                      tso->id, tso));
 }
 #elif defined(PARALLEL_HASKELL)
-static StgBlockingQueueElement *
+StgBlockingQueueElement *
 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
 {
     StgBlockingQueueElement *next;
@@ -3065,7 +3089,7 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
 }
 
 #else /* !GRAN && !PARALLEL_HASKELL */
-static StgTSO *
+StgTSO *
 unblockOneLocked(StgTSO *tso)
 {
   StgTSO *next;
@@ -3221,6 +3245,8 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
 void awakenBlockedQueueNoLock(StgTSO *tso)
 {
+  if (tso == NULL) return; // hack; see bug #1235728, and comments in
+                           // Exception.cmm
   while (tso != END_TSO_QUEUE) {
     tso = unblockOneLocked(tso);
   }
 }
@@ -3229,6 +3255,8 @@ awakenBlockedQueueNoLock(StgTSO *tso)
 
 void awakenBlockedQueue(StgTSO *tso)
 {
+  if (tso == NULL) return; // hack; see bug #1235728, and comments in
+                           // Exception.cmm
   ACQUIRE_LOCK(&sched_mutex);
   while (tso != END_TSO_QUEUE) {
     tso = unblockOneLocked(tso);
@@ -4069,10 +4097,10 @@ printThreadBlockage(StgTSO *tso)
 {
   switch (tso->why_blocked) {
   case BlockedOnRead:
-    debugBelch("is blocked on read from fd %ld", tso->block_info.fd);
+    debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
     break;
   case BlockedOnWrite:
-    debugBelch("is blocked on write to fd %ld", tso->block_info.fd);
+    debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
     break;
 #if defined(mingw32_HOST_OS)
   case BlockedOnDoProc:
@@ -4080,10 +4108,10 @@ printThreadBlockage(StgTSO *tso)
     break;
 #endif
   case BlockedOnDelay:
-    debugBelch("is blocked until %ld", tso->block_info.target);
+    debugBelch("is blocked until %ld", (long)(tso->block_info.target));
     break;
   case BlockedOnMVar:
-    debugBelch("is blocked on an MVar");
+    debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
     break;
   case BlockedOnException:
     debugBelch("is blocked on delivering an exception to thread %d",
@@ -4156,21 +4184,45 @@ printAllThreads(void)
   debugBelch("all threads:\n");
 # endif
 
-  for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
-    debugBelch("\tthread %d @ %p ", t->id, (void *)t);
+  for (t = all_threads; t != END_TSO_QUEUE; ) {
+    debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
 #if defined(DEBUG)
     {
       void *label = lookupThreadLabel(t->id);
      if (label) debugBelch("[\"%s\"] ",(char *)label);
     }
 #endif
-    printThreadStatus(t);
-    debugBelch("\n");
+    if (t->what_next == ThreadRelocated) {
+       debugBelch("has been relocated...\n");
+       t = t->link;
+    } else {
+       printThreadStatus(t);
+       debugBelch("\n");
+       t = t->global_link;
+    }
   }
 }
-    
+
 #ifdef DEBUG
+// useful from gdb
+void
+printThreadQueue(StgTSO *t)
+{
+    nat i = 0;
+    for (; t != END_TSO_QUEUE; t = t->link) {
+       debugBelch("\tthread %d @ %p ", t->id, (void *)t);
+       if (t->what_next == ThreadRelocated) {
+           debugBelch("has been relocated...\n");
+       } else {
+           printThreadStatus(t);
+           debugBelch("\n");
+       }
+       i++;
+    }
+    debugBelch("%d threads on queue\n", i);
+}
+
 /* Print a whole blocking queue attached to node (debugging only). */