X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;ds=sidebyside;f=ghc%2Frts%2FSchedule.c;h=d35bac5407d0075bb16d71c2bd79c36967740fe3;hb=b488b59b527511e379174949efc409e8235c8dc7;hp=ed76cb00cbc2dd3cb0b7b66fed464885adf79c76;hpb=0f3205e6c40575910d50bc2cc42020ccf55e07ba;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index ed76cb0..d35bac5 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -174,6 +174,9 @@ static StgTSO *suspended_ccalling_threads; /* flag set by signal handler to precipitate a context switch */ int context_switch = 0; +/* flag that tracks whether we have done any execution in this time slice. */ +nat recent_activity = ACTIVITY_YES; + /* if this flag is set as well, give up execution */ rtsBool interrupted = rtsFalse; @@ -285,7 +288,7 @@ static void scheduleHandleThreadBlocked( StgTSO *t ); static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, Capability *cap, StgTSO *t ); static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc); -static void scheduleDoGC(Capability *cap); +static void scheduleDoGC(rtsBool force_major); static void unblockThread(StgTSO *tso); static rtsBool checkBlackHoles(void); @@ -302,6 +305,7 @@ static void raiseAsync_(StgTSO *tso, StgClosure *exception, static void printThreadBlockage(StgTSO *tso); static void printThreadStatus(StgTSO *tso); +void printThreadQueue(StgTSO *tso); #if defined(PARALLEL_HASKELL) StgTSO * createSparkThread(rtsSpark spark); @@ -455,13 +459,12 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, CurrentTSO = event->tso; #endif - IF_DEBUG(scheduler, printAllThreads()); - #if defined(RTS_SUPPORTS_THREADS) // Yield the capability to higher-priority tasks if necessary. // if (cap != NULL) { - yieldCapability(&cap); + yieldCapability(&cap, + mainThread ? &mainThread->bound_thread_cond : NULL ); } // If we do not currently hold a capability, we wait for one @@ -474,6 +477,15 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // We now have a capability... #endif +#if 0 /* extra sanity checking */ + { + StgMainThread *m; + for (m = main_threads; m != NULL; m = m->link) { + ASSERT(get_itbl(m->tso)->type == TSO); + } + } +#endif + // Check whether we have re-entered the RTS from Haskell without // going via suspendThread()/resumeThread (i.e. a 'safe' foreign // call). @@ -616,21 +628,19 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, sched_belch("### thread %d bound to another OS thread", t->id)); // no, bound to a different Haskell thread: pass to that thread PUSH_ON_RUN_QUEUE(t); - passCapability(&m->bound_thread_cond); continue; } } else { if(mainThread != NULL) - // The thread we want to run is bound. + // The thread we want to run is unbound. { IF_DEBUG(scheduler, sched_belch("### this OS thread cannot run thread %d", t->id)); // no, the current native thread is bound to a different // Haskell thread, so pass it to any worker thread PUSH_ON_RUN_QUEUE(t); - passCapabilityToWorker(); continue; } } @@ -668,6 +678,8 @@ run_thread: errno = t->saved_errno; cap->r.rInHaskell = rtsTrue; + recent_activity = ACTIVITY_YES; + switch (prev_what_next) { case ThreadKilled: @@ -688,6 +700,13 @@ run_thread: barf("schedule: invalid what_next field"); } +#if defined(SMP) + // in SMP mode, we might return with a different capability than + // we started with, if the Haskell thread made a foreign call. 
+ // So let's find out what our current Capability is:
+ cap = myCapability();
+#endif
+
 // We have run some Haskell code: there might be blackhole-blocked
 // threads to wake up now.
 if ( blackhole_queue != END_TSO_QUEUE ) {
@@ -741,7 +760,6 @@ run_thread:
 
 case ThreadBlocked:
 scheduleHandleThreadBlocked(t);
- threadPaused(t);
 break;
 
 case ThreadFinished:
@@ -753,7 +771,7 @@ run_thread:
 }
 
 if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
- if (ready_to_gc) { scheduleDoGC(cap); }
+ if (ready_to_gc) { scheduleDoGC(rtsFalse); }
 } /* end of while() */
 
 IF_PAR_DEBUG(verbose,
@@ -822,8 +840,9 @@ scheduleCheckBlockedThreads(void)
 #if defined(RTS_SUPPORTS_THREADS)
 // We shouldn't be here...
 barf("schedule: awaitEvent() in threaded RTS");
-#endif
+#else
 awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
+#endif
 }
 }
 
@@ -848,8 +867,14 @@ scheduleCheckBlackHoles( void )
 * ------------------------------------------------------------------------- */
 
 static void
-scheduleDetectDeadlock(void)
+scheduleDetectDeadlock()
 {
+
+#if defined(PARALLEL_HASKELL)
+ // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
+ return;
+#endif
+
 /*
 * Detect deadlock: when we have no threads to run, there are no
 * threads blocked, waiting for I/O, or sleeping, and all the
@@ -858,7 +883,16 @@ scheduleDetectDeadlock(void)
 */
 if ( EMPTY_THREAD_QUEUES() )
 {
-#if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS)
+#if defined(RTS_SUPPORTS_THREADS)
+ /*
+ * In the threaded RTS, we only check for deadlock if there
+ * has been no activity in a complete timeslice. This means
+ * we won't eagerly start a full GC just because we don't have
+ * any threads to run currently.
+ */
+ if (recent_activity != ACTIVITY_INACTIVE) return;
+#endif
+
 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
 
 // Garbage collection can release some new threads due to
@@ -866,10 +900,12 @@ scheduleDetectDeadlock(void)
 // they are unreachable and will therefore be sent an
 // exception. Any threads thus released will be immediately
 // runnable.
- GarbageCollect(GetRoots,rtsTrue);
+
+ scheduleDoGC( rtsTrue/*force major GC*/ );
+ recent_activity = ACTIVITY_DONE_GC;
 if ( !EMPTY_RUN_QUEUE() ) return;
 
-#if defined(RTS_USER_SIGNALS)
+#if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
 /* If we have user-installed signal handlers, then wait
 * for signals to arrive rather then bombing out with a
 * deadlock.
@@ -891,6 +927,7 @@ scheduleDetectDeadlock(void)
 }
 #endif
 
+#if !defined(RTS_SUPPORTS_THREADS)
 /* Probably a real deadlock. Send the current main thread the
 * Deadlock exception (or in the SMP build, send *all* main
 * threads the deadlock exception, since none of them can make
@@ -910,11 +947,6 @@ scheduleDetectDeadlock(void)
 barf("deadlock: main thread blocked in a strange way");
 }
 }
-
-#elif defined(RTS_SUPPORTS_THREADS)
- // ToDo: add deadlock detection in threaded RTS
-#elif defined(PARALLEL_HASKELL)
- // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
 #endif
 }
 }
@@ -1449,12 +1481,13 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
 debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
 (long)t->id, whatNext_strs[t->what_next], blocks));
 
- // don't do this if it would push us over the
- // alloc_blocks_lim limit; we'll GC first.
- if (alloc_blocks + blocks < alloc_blocks_lim) {
+ // don't do this if the nursery is (nearly) full, we'll GC first.
+ if (cap->r.rCurrentNursery->link != NULL || + cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop + // if the nursery has only one block. - alloc_blocks += blocks; bd = allocGroup( blocks ); + cap->r.rNursery->n_blocks += blocks; // link the new group into the list bd->link = cap->r.rCurrentNursery; @@ -1465,7 +1498,6 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) #if !defined(SMP) ASSERT(g0s0->blocks == cap->r.rCurrentNursery && g0s0 == cap->r.rNursery); - g0s0->blocks = bd; #endif cap->r.rNursery->blocks = bd; } @@ -1481,20 +1513,15 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) { bdescr *x; for (x = bd; x < bd + blocks; x++) { - x->step = g0s0; + x->step = cap->r.rNursery; x->gen_no = 0; x->flags = 0; } } -#if !defined(SMP) - // don't forget to update the block count in g0s0. - g0s0->n_blocks += blocks; - // This assert can be a killer if the app is doing lots // of large block allocations. - ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks); -#endif + IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery)); // now update the nursery to point to the new block cap->r.rCurrentNursery = bd; @@ -1508,14 +1535,9 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) } } - /* make all the running tasks block on a condition variable, - * maybe set context_switch and wait till they all pile in, - * then have them wait on a GC condition variable. - */ IF_DEBUG(scheduler, debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", (long)t->id, whatNext_strs[t->what_next])); - threadPaused(t); #if defined(GRAN) ASSERT(!is_on_queue(t,CurrentProc)); #elif defined(PARALLEL_HASKELL) @@ -1547,7 +1569,6 @@ scheduleHandleStackOverflow( StgTSO *t) /* just adjust the stack for this thread, then pop it back * on the run queue. */ - threadPaused(t); { /* enlarge the stack */ StgTSO *new_t = threadStackOverflow(t); @@ -1606,8 +1627,6 @@ scheduleHandleYield( StgTSO *t, nat prev_what_next ) return rtsTrue; } - threadPaused(t); - #if defined(GRAN) ASSERT(!is_on_queue(t,CurrentProc)); @@ -1680,12 +1699,19 @@ scheduleHandleThreadBlocked( StgTSO *t emitSchedule = rtsTrue; #else /* !GRAN */ - /* don't need to do anything. Either the thread is blocked on - * I/O, in which case we'll have called addToBlockedQueue - * previously, or it's blocked on an MVar or Blackhole, in which - * case it'll be on the relevant queue already. - */ + + // We don't need to do anything. The thread is blocked, and it + // has tidied up its stack and placed itself on whatever queue + // it needs to be on. + +#if !defined(SMP) ASSERT(t->why_blocked != NotBlocked); + // This might not be true under SMP: we don't have + // exclusive access to this TSO, so someone might have + // woken it up by now. This actually happens: try + // conc023 +RTS -N2. 
+#endif
+
 IF_DEBUG(scheduler,
 debugBelch("--<< thread %d (%s) stopped: ", 
 t->id, whatNext_strs[t->what_next]);
@@ -1802,6 +1828,7 @@ scheduleHandleThreadFinished( StgMainThread *mainThread
 removeThreadLabel((StgWord)mainThread->tso->id);
 #endif
 if (mainThread->prev == NULL) {
+ ASSERT(mainThread == main_threads);
 main_threads = mainThread->link;
 } else {
 mainThread->prev->link = mainThread->link;
@@ -1857,10 +1884,11 @@ scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
 * -------------------------------------------------------------------------- */
 
 static void
-scheduleDoGC( Capability *cap STG_UNUSED )
+scheduleDoGC( rtsBool force_major )
 {
 StgTSO *t;
 #ifdef SMP
+ Capability *cap;
 static rtsBool waiting_for_gc;
 int n_capabilities = RtsFlags.ParFlags.nNodes - 1;
 // subtract one because we're already holding one.
@@ -1878,12 +1906,16 @@ scheduleDoGC( Capability *cap STG_UNUSED )
 // actually did the GC. But it's quite hard to arrange for all
 // the other tasks to sleep and stay asleep.
 //
+ // This does mean that there will be multiple entries in the
+ // thread->capability hash table for the current thread, but
+ // they will be removed as normal when the capabilities are
+ // released again.
+ //
 
 // Someone else is already trying to GC
 if (waiting_for_gc) return;
 waiting_for_gc = rtsTrue;
 
- caps[n_capabilities] = cap;
 while (n_capabilities > 0) {
 IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
 waitForReturnCapability(&sched_mutex, &cap);
@@ -1898,19 +1930,28 @@ scheduleDoGC( Capability *cap STG_UNUSED )
 * atomically frames. When next scheduled they will try to
 * commit, this commit will fail and they will retry.
 */
- for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
- if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
- if (!stmValidateTransaction (t -> trec)) {
- IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
-
- // strip the stack back to the ATOMICALLY_FRAME, aborting
- // the (nested) transaction, and saving the stack of any
- // partially-evaluated thunks on the heap.
- raiseAsync_(t, NULL, rtsTrue);
-
+ {
+ StgTSO *next;
+
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+ if (!stmValidateNestOfTransactions (t -> trec)) {
+ IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+
+ // strip the stack back to the ATOMICALLY_FRAME, aborting
+ // the (nested) transaction, and saving the stack of any
+ // partially-evaluated thunks on the heap.
+ raiseAsync_(t, NULL, rtsTrue);
+
 #ifdef REG_R1
- ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+ ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
 #endif
+ }
+ }
 }
 }
 }
@@ -1918,6 +1959,8 @@ scheduleDoGC( Capability *cap STG_UNUSED )
 // so this happens periodically:
 scheduleCheckBlackHoles();
 
+ IF_DEBUG(scheduler, printAllThreads());
+
 /* everybody back, start the GC.
 * Could do it in this thread, or signal a condition var
 * to do it in another thread.  Either way, we need to
@@ -1926,7 +1969,7 @@ scheduleDoGC( Capability *cap STG_UNUSED )
 #if defined(RTS_SUPPORTS_THREADS)
 IF_DEBUG(scheduler,sched_belch("doing GC"));
 #endif
- GarbageCollect(GetRoots,rtsFalse);
+ GarbageCollect(GetRoots, force_major);
 
 #if defined(SMP)
 {
@@ -1958,7 +2001,7 @@ scheduleDoGC( Capability *cap STG_UNUSED )
 StgBool
 rtsSupportsBoundThreads(void)
 {
-#ifdef THREADED_RTS
+#if defined(RTS_SUPPORTS_THREADS)
 return rtsTrue;
 #else
 return rtsFalse;
@@ -1972,7 +2015,7 @@ rtsSupportsBoundThreads(void)
 StgBool
 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
 {
-#ifdef THREADED_RTS
+#if defined(RTS_SUPPORTS_THREADS)
 return (tso->main != NULL);
 #endif
 return rtsFalse;
@@ -2066,8 +2109,12 @@ deleteAllThreads ( void )
 StgTSO* t, *next;
 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
 for (t = all_threads; t != END_TSO_QUEUE; t = next) {
- next = t->global_link;
- deleteThread(t);
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ deleteThread(t);
+ }
 }
 
 // The run queue now contains a bunch of ThreadKilled threads. We
@@ -2952,7 +2999,7 @@ unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
 #endif
 
 #if defined(GRAN)
-static StgBlockingQueueElement *
+StgBlockingQueueElement *
 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
 {
 StgTSO *tso;
@@ -2992,7 +3039,7 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
 tso->id, tso));
 }
 #elif defined(PARALLEL_HASKELL)
-static StgBlockingQueueElement *
+StgBlockingQueueElement *
 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
 {
 StgBlockingQueueElement *next;
@@ -3038,7 +3085,7 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
 }
 
 #else /* !GRAN && !PARALLEL_HASKELL */
-static StgTSO *
+StgTSO *
 unblockOneLocked(StgTSO *tso)
 {
 StgTSO *next;
@@ -3194,6 +3241,8 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
 void
 awakenBlockedQueueNoLock(StgTSO *tso)
 {
+ if (tso == NULL) return; // hack; see bug #1235728, and comments in
+ // Exception.cmm
 while (tso != END_TSO_QUEUE) {
 tso = unblockOneLocked(tso);
 }
@@ -3202,6 +3251,8 @@ awakenBlockedQueueNoLock(StgTSO *tso)
 void
 awakenBlockedQueue(StgTSO *tso)
 {
+ if (tso == NULL) return; // hack; see bug #1235728, and comments in
+ // Exception.cmm
 ACQUIRE_LOCK(&sched_mutex);
 while (tso != END_TSO_QUEUE) {
 tso = unblockOneLocked(tso);
@@ -3220,6 +3271,11 @@ interruptStgRts(void)
 {
 interrupted = 1;
 context_switch = 1;
+ threadRunnable();
+ /* ToDo: if invoked from a signal handler, this threadRunnable
+ * only works if there's another thread (not this one) waiting to
+ * be woken up.
+ */
 }
 
 /* -----------------------------------------------------------------------------
@@ -3343,6 +3399,12 @@ unblockThread(StgTSO *tso)
 blocked_queue_tl = (StgTSO *)prev;
 }
 }
+#if defined(mingw32_HOST_OS)
+ /* (Cooperatively) signal that the worker thread should abort
+ * the request.
+ */
+ abandonWorkRequest(tso->block_info.async_result->reqID);
+#endif
 goto done;
 }
 }
@@ -3477,6 +3539,12 @@ unblockThread(StgTSO *tso)
 blocked_queue_tl = prev;
 }
 }
+#if defined(mingw32_HOST_OS)
+ /* (Cooperatively) signal that the worker thread should abort
+ * the request.
+ */ + abandonWorkRequest(tso->block_info.async_result->reqID); +#endif goto done; } } @@ -3739,12 +3807,12 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) #ifdef PROFILING StgCatchFrame *cf = (StgCatchFrame *)frame; #endif - StgClosure *raise; + StgThunk *raise; // we've got an exception to raise, so let's pass it to the // handler in this frame. // - raise = (StgClosure *)allocate(sizeofW(StgClosure)+1); + raise = (StgThunk *)allocate(sizeofW(StgThunk)+1); TICK_ALLOC_SE_THK(1,0); SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); raise->payload[0] = exception; @@ -3849,7 +3917,7 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) StgWord raiseExceptionHelper (StgTSO *tso, StgClosure *exception) { - StgClosure *raise_closure = NULL; + StgThunk *raise_closure = NULL; StgPtr p, next; StgRetInfoTable *info; // @@ -3886,11 +3954,11 @@ raiseExceptionHelper (StgTSO *tso, StgClosure *exception) // Only create raise_closure if we need to. if (raise_closure == NULL) { raise_closure = - (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE); + (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE); SET_HDR(raise_closure, &stg_raise_info, CCCS); raise_closure->payload[0] = exception; } - UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure); + UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure); p = next; continue; @@ -4025,10 +4093,10 @@ printThreadBlockage(StgTSO *tso) { switch (tso->why_blocked) { case BlockedOnRead: - debugBelch("is blocked on read from fd %ld", tso->block_info.fd); + debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd)); break; case BlockedOnWrite: - debugBelch("is blocked on write to fd %ld", tso->block_info.fd); + debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd)); break; #if defined(mingw32_HOST_OS) case BlockedOnDoProc: @@ -4036,10 +4104,10 @@ printThreadBlockage(StgTSO *tso) break; #endif case BlockedOnDelay: - debugBelch("is blocked until %ld", tso->block_info.target); + debugBelch("is blocked until %ld", (long)(tso->block_info.target)); break; case BlockedOnMVar: - debugBelch("is blocked on an MVar"); + debugBelch("is blocked on an MVar @ %p", tso->block_info.closure); break; case BlockedOnException: debugBelch("is blocked on delivering an exception to thread %d", @@ -4112,21 +4180,45 @@ printAllThreads(void) debugBelch("all threads:\n"); # endif - for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) { - debugBelch("\tthread %d @ %p ", t->id, (void *)t); + for (t = all_threads; t != END_TSO_QUEUE; ) { + debugBelch("\tthread %4d @ %p ", t->id, (void *)t); #if defined(DEBUG) { void *label = lookupThreadLabel(t->id); if (label) debugBelch("[\"%s\"] ",(char *)label); } #endif - printThreadStatus(t); - debugBelch("\n"); + if (t->what_next == ThreadRelocated) { + debugBelch("has been relocated...\n"); + t = t->link; + } else { + printThreadStatus(t); + debugBelch("\n"); + t = t->global_link; + } } } - + #ifdef DEBUG +// useful from gdb +void +printThreadQueue(StgTSO *t) +{ + nat i = 0; + for (; t != END_TSO_QUEUE; t = t->link) { + debugBelch("\tthread %d @ %p ", t->id, (void *)t); + if (t->what_next == ThreadRelocated) { + debugBelch("has been relocated...\n"); + } else { + printThreadStatus(t); + debugBelch("\n"); + } + i++; + } + debugBelch("%d threads on queue\n", i); +} + /* Print a whole blocking queue attached to node (debugging only). */
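
--
Editor's notes (not part of the patch). The C sketches below were written
for this page to illustrate what the patch does; any name, type, or helper
that does not appear in the diff above is invented for the sketch.

Note 1: the ThreadRelocated traversal. The same walking pattern appears
twice in this patch (in scheduleDoGC and deleteAllThreads): a TSO whose
what_next field is ThreadRelocated is only a forwarding pointer left
behind when a thread was moved, so the walker follows its link field
instead of global_link and does not treat it as a live thread. A minimal
stand-alone sketch, where the tso struct is a simplified stand-in for the
real StgTSO and NULL stands in for END_TSO_QUEUE:

    #include <stdio.h>

    enum what_next { ThreadRunGHC, ThreadRelocated };

    typedef struct tso_ {
        enum what_next what_next;
        struct tso_ *link;        /* forwarding pointer when relocated */
        struct tso_ *global_link; /* next thread on the all_threads list */
        int id;
    } tso;

    /* Visit every live thread, skipping relocation stubs. */
    static void walk_all_threads(tso *all_threads)
    {
        tso *t, *next;
        for (t = all_threads; t != NULL; t = next) {
            if (t->what_next == ThreadRelocated) {
                next = t->link;        /* stub: follow the forwarding pointer */
            } else {
                next = t->global_link; /* live thread: process, then move on */
                printf("thread %d\n", t->id);
            }
        }
    }

    int main(void)
    {
        tso c = { ThreadRunGHC,     NULL, NULL, 3 };
        tso b = { ThreadRelocated,  &c,   NULL, 2 };  /* stub forwarding to c */
        tso a = { ThreadRunGHC,     NULL, &b,   1 };
        walk_all_threads(&a);                         /* prints 1, then 3 */
        return 0;
    }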
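Note 2: deadlock detection and recent_activity. In the threaded RTS the
patch only declares a deadlock after a whole time slice with no
execution: running a thread sets recent_activity = ACTIVITY_YES, and
scheduleDetectDeadlock bails out unless the flag has decayed to
ACTIVITY_INACTIVE. The decay itself is not part of this diff (it happens
on the timer tick elsewhere in the RTS), so timer_tick below is an
assumption added only to make the flow concrete:

    enum {
        ACTIVITY_YES,      /* some Haskell code ran in this time slice */
        ACTIVITY_INACTIVE, /* a full slice passed with no activity */
        ACTIVITY_DONE_GC   /* we already forced a GC; don't re-trigger */
    };

    static int recent_activity = ACTIVITY_YES;

    /* scheduler, just before running a thread (as in the patch) */
    static void about_to_run_thread(void) { recent_activity = ACTIVITY_YES; }

    /* timer tick (assumed, not in this diff): decay to INACTIVE */
    static void timer_tick(void)
    {
        if (recent_activity == ACTIVITY_YES)
            recent_activity = ACTIVITY_INACTIVE;
    }

    /* scheduler, when all thread queues are empty (as in the patch) */
    static void detect_deadlock(void)
    {
        if (recent_activity != ACTIVITY_INACTIVE)
            return;                         /* ran recently: not deadlocked */
        /* force a major GC; blocked threads found unreachable are sent an
         * exception, which may make some of them runnable again */
        recent_activity = ACTIVITY_DONE_GC; /* don't immediately GC again */
    }

This is why the patch also moves the deadlock path from GarbageCollect
to scheduleDoGC(rtsTrue): an idle threaded RTS no longer starts an eager
full GC the moment its run queue happens to be empty.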
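Note 3: the SMP GC rendezvous. scheduleDoGC now works on a
first-one-wins basis: the first task that wants to collect sets the
static waiting_for_gc flag and then gathers the other nNodes - 1
capabilities before calling GarbageCollect; a task that finds the flag
already set simply returns, and its capability is collected from it when
it yields. A sketch of that shape using pthreads; the mutex/condvar
plumbing and the helper names are placeholders for the real
waitForReturnCapability machinery, and the wake-up path after the GC is
omitted:

    #include <pthread.h>
    #include <stdbool.h>

    #define N_CAPABILITIES 4  /* stands in for RtsFlags.ParFlags.nNodes */

    static pthread_mutex_t sched_mutex  = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  cap_returned = PTHREAD_COND_INITIALIZER;
    static bool waiting_for_gc = false;
    static int  returned_caps  = 0;   /* capabilities handed back so far */

    /* Called with sched_mutex held, holding one capability. */
    static void do_gc(void)
    {
        int needed = N_CAPABILITIES - 1;  /* we already hold one */

        if (waiting_for_gc) return;       /* someone else is collecting */
        waiting_for_gc = true;

        while (needed > 0) {              /* gather the rest, one by one */
            while (returned_caps == 0)
                pthread_cond_wait(&cap_returned, &sched_mutex);
            returned_caps--;
            needed--;
        }

        /* ... GarbageCollect(GetRoots, force_major) runs here, with
         * every capability held, so no Haskell code can be running ... */

        waiting_for_gc = false;
        returned_caps  = N_CAPABILITIES - 1;  /* make them available again */
        pthread_cond_broadcast(&cap_returned);
    }

    /* Called with sched_mutex held by a task yielding its capability. */
    static void return_capability(void)
    {
        returned_caps++;
        pthread_cond_signal(&cap_returned);
    }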