X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=7ba2fc06b1c2af01019ebb618a9f60848338b68b;hb=3595da95b2ca0d60c9100f77541b6ce36e49363c;hp=0503a1a8b580120d8875b15bd6d55a4bc25a93cc;hpb=211f35e061e70a2b101cf1062549c943981d67bf;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 0503a1a..7ba2fc0 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -174,6 +174,9 @@ static StgTSO *suspended_ccalling_threads; /* flag set by signal handler to precipitate a context switch */ int context_switch = 0; +/* flag that tracks whether we have done any execution in this time slice. */ +nat recent_activity = ACTIVITY_YES; + /* if this flag is set as well, give up execution */ rtsBool interrupted = rtsFalse; @@ -455,8 +458,6 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, CurrentTSO = event->tso; #endif - IF_DEBUG(scheduler, printAllThreads()); - #if defined(RTS_SUPPORTS_THREADS) // Yield the capability to higher-priority tasks if necessary. // @@ -473,6 +474,15 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // We now have a capability... #endif + +#if 0 /* extra sanity checking */ + { + StgMainThread *m; + for (m = main_threads; m != NULL; m = m->link) { + ASSERT(get_itbl(m->tso)->type == TSO); + } + } +#endif // Check whether we have re-entered the RTS from Haskell without // going via suspendThread()/resumeThread (i.e. a 'safe' foreign @@ -668,6 +678,8 @@ run_thread: errno = t->saved_errno; cap->r.rInHaskell = rtsTrue; + recent_activity = ACTIVITY_YES; + switch (prev_what_next) { case ThreadKilled: @@ -688,6 +700,13 @@ run_thread: barf("schedule: invalid what_next field"); } +#if defined(SMP) + // in SMP mode, we might return with a different capability than + // we started with, if the Haskell thread made a foreign call. So + // let's find out what our current Capability is: + cap = myCapability(); +#endif + // We have run some Haskell code: there might be blackhole-blocked // threads to wake up now. if ( blackhole_queue != END_TSO_QUEUE ) { @@ -741,7 +760,6 @@ run_thread: case ThreadBlocked: scheduleHandleThreadBlocked(t); - threadPaused(t); break; case ThreadFinished: @@ -850,6 +868,12 @@ scheduleCheckBlackHoles( void ) static void scheduleDetectDeadlock(void) { + +#if defined(PARALLEL_HASKELL) + // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL + return; +#endif + /* * Detect deadlock: when we have no threads to run, there are no * threads blocked, waiting for I/O, or sleeping, and all the @@ -858,7 +882,16 @@ scheduleDetectDeadlock(void) */ if ( EMPTY_THREAD_QUEUES() ) { -#if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS) +#if defined(RTS_SUPPORTS_THREADS) + /* + * In the threaded RTS, we only check for deadlock if there + * has been no activity in a complete timeslice. This means + * we won't eagerly start a full GC just because we don't have + * any threads to run currently. + */ + if (recent_activity != ACTIVITY_INACTIVE) return; +#endif + IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC...")); // Garbage collection can release some new threads due to @@ -866,10 +899,12 @@ scheduleDetectDeadlock(void) // they are unreachable and will therefore be sent an // exception. Any threads thus released will be immediately // runnable. + GarbageCollect(GetRoots,rtsTrue); + recent_activity = ACTIVITY_DONE_GC; if ( !EMPTY_RUN_QUEUE() ) return; -#if defined(RTS_USER_SIGNALS) +#if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS) /* If we have user-installed signal handlers, then wait * for signals to arrive rather then bombing out with a * deadlock. @@ -891,6 +926,7 @@ scheduleDetectDeadlock(void) } #endif +#if !defined(RTS_SUPPORTS_THREADS) /* Probably a real deadlock. Send the current main thread the * Deadlock exception (or in the SMP build, send *all* main * threads the deadlock exception, since none of them can make @@ -900,6 +936,7 @@ scheduleDetectDeadlock(void) StgMainThread *m; m = main_threads; switch (m->tso->why_blocked) { + case BlockedOnSTM: case BlockedOnBlackHole: case BlockedOnException: case BlockedOnMVar: @@ -909,11 +946,6 @@ scheduleDetectDeadlock(void) barf("deadlock: main thread blocked in a strange way"); } } - -#elif defined(RTS_SUPPORTS_THREADS) - // ToDo: add deadlock detection in threaded RTS -#elif defined(PARALLEL_HASKELL) - // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL #endif } } @@ -1448,12 +1480,13 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", (long)t->id, whatNext_strs[t->what_next], blocks)); - // don't do this if it would push us over the - // alloc_blocks_lim limit; we'll GC first. - if (alloc_blocks + blocks < alloc_blocks_lim) { + // don't do this if the nursery is (nearly) full, we'll GC first. + if (cap->r.rCurrentNursery->link != NULL || + cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop + // if the nursery has only one block. - alloc_blocks += blocks; bd = allocGroup( blocks ); + cap->r.rNursery->n_blocks += blocks; // link the new group into the list bd->link = cap->r.rCurrentNursery; @@ -1463,8 +1496,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) } else { #if !defined(SMP) ASSERT(g0s0->blocks == cap->r.rCurrentNursery && - g0s0->blocks == cap->r.rNursery); - g0s0->blocks = bd; + g0s0 == cap->r.rNursery); #endif cap->r.rNursery->blocks = bd; } @@ -1480,20 +1512,15 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) { bdescr *x; for (x = bd; x < bd + blocks; x++) { - x->step = g0s0; + x->step = cap->r.rNursery; x->gen_no = 0; x->flags = 0; } } -#if !defined(SMP) - // don't forget to update the block count in g0s0. - g0s0->n_blocks += blocks; - // This assert can be a killer if the app is doing lots // of large block allocations. - ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks); -#endif + IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery)); // now update the nursery to point to the new block cap->r.rCurrentNursery = bd; @@ -1507,14 +1534,9 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) } } - /* make all the running tasks block on a condition variable, - * maybe set context_switch and wait till they all pile in, - * then have them wait on a GC condition variable. - */ IF_DEBUG(scheduler, debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", (long)t->id, whatNext_strs[t->what_next])); - threadPaused(t); #if defined(GRAN) ASSERT(!is_on_queue(t,CurrentProc)); #elif defined(PARALLEL_HASKELL) @@ -1546,7 +1568,6 @@ scheduleHandleStackOverflow( StgTSO *t) /* just adjust the stack for this thread, then pop it back * on the run queue. */ - threadPaused(t); { /* enlarge the stack */ StgTSO *new_t = threadStackOverflow(t); @@ -1605,8 +1626,6 @@ scheduleHandleYield( StgTSO *t, nat prev_what_next ) return rtsTrue; } - threadPaused(t); - #if defined(GRAN) ASSERT(!is_on_queue(t,CurrentProc)); @@ -1679,12 +1698,19 @@ scheduleHandleThreadBlocked( StgTSO *t emitSchedule = rtsTrue; #else /* !GRAN */ - /* don't need to do anything. Either the thread is blocked on - * I/O, in which case we'll have called addToBlockedQueue - * previously, or it's blocked on an MVar or Blackhole, in which - * case it'll be on the relevant queue already. - */ + + // We don't need to do anything. The thread is blocked, and it + // has tidied up its stack and placed itself on whatever queue + // it needs to be on. + +#if !defined(SMP) ASSERT(t->why_blocked != NotBlocked); + // This might not be true under SMP: we don't have + // exclusive access to this TSO, so someone might have + // woken it up by now. This actually happens: try + // conc023 +RTS -N2. +#endif + IF_DEBUG(scheduler, debugBelch("--<< thread %d (%s) stopped: ", t->id, whatNext_strs[t->what_next]); @@ -1801,12 +1827,13 @@ scheduleHandleThreadFinished( StgMainThread *mainThread removeThreadLabel((StgWord)mainThread->tso->id); #endif if (mainThread->prev == NULL) { + ASSERT(mainThread == main_threads); main_threads = mainThread->link; } else { mainThread->prev->link = mainThread->link; } if (mainThread->link != NULL) { - mainThread->link->prev = NULL; + mainThread->link->prev = mainThread->prev; } releaseCapability(cap); return rtsTrue; // tells schedule() to return @@ -1917,6 +1944,8 @@ scheduleDoGC( Capability *cap STG_UNUSED ) // so this happens periodically: scheduleCheckBlackHoles(); + IF_DEBUG(scheduler, printAllThreads()); + /* everybody back, start the GC. * Could do it in this thread, or signal a condition var * to do it in another thread. Either way, we need to @@ -1957,7 +1986,7 @@ scheduleDoGC( Capability *cap STG_UNUSED ) StgBool rtsSupportsBoundThreads(void) { -#ifdef THREADED_RTS +#if defined(RTS_SUPPORTS_THREADS) return rtsTrue; #else return rtsFalse; @@ -1971,7 +2000,7 @@ rtsSupportsBoundThreads(void) StgBool isThreadBound(StgTSO* tso USED_IN_THREADED_RTS) { -#ifdef THREADED_RTS +#if defined(RTS_SUPPORTS_THREADS) return (tso->main != NULL); #endif return rtsFalse; @@ -2065,8 +2094,12 @@ deleteAllThreads ( void ) StgTSO* t, *next; IF_DEBUG(scheduler,sched_belch("deleting all threads")); for (t = all_threads; t != END_TSO_QUEUE; t = next) { - next = t->global_link; - deleteThread(t); + if (t->what_next == ThreadRelocated) { + next = t->link; + } else { + next = t->global_link; + deleteThread(t); + } } // The run queue now contains a bunch of ThreadKilled threads. We @@ -2951,7 +2984,7 @@ unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node ) #endif #if defined(GRAN) -static StgBlockingQueueElement * +StgBlockingQueueElement * unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node) { StgTSO *tso; @@ -2991,7 +3024,7 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node) tso->id, tso)); } #elif defined(PARALLEL_HASKELL) -static StgBlockingQueueElement * +StgBlockingQueueElement * unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node) { StgBlockingQueueElement *next; @@ -3037,7 +3070,7 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node) } #else /* !GRAN && !PARALLEL_HASKELL */ -static StgTSO * +StgTSO * unblockOneLocked(StgTSO *tso) { StgTSO *next; @@ -3219,6 +3252,11 @@ interruptStgRts(void) { interrupted = 1; context_switch = 1; + threadRunnable(); + /* ToDo: if invoked from a signal handler, this threadRunnable + * only works if there's another thread (not this one) waiting to + * be woken up. + */ } /* ----------------------------------------------------------------------------- @@ -3342,6 +3380,12 @@ unblockThread(StgTSO *tso) blocked_queue_tl = (StgTSO *)prev; } } +#if defined(mingw32_HOST_OS) + /* (Cooperatively) signal that the worker thread should abort + * the request. + */ + abandonWorkRequest(tso->block_info.async_result->reqID); +#endif goto done; } } @@ -3476,6 +3520,12 @@ unblockThread(StgTSO *tso) blocked_queue_tl = prev; } } +#if defined(mingw32_HOST_OS) + /* (Cooperatively) signal that the worker thread should abort + * the request. + */ + abandonWorkRequest(tso->block_info.async_result->reqID); +#endif goto done; } } @@ -3738,12 +3788,12 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) #ifdef PROFILING StgCatchFrame *cf = (StgCatchFrame *)frame; #endif - StgClosure *raise; + StgThunk *raise; // we've got an exception to raise, so let's pass it to the // handler in this frame. // - raise = (StgClosure *)allocate(sizeofW(StgClosure)+1); + raise = (StgThunk *)allocate(sizeofW(StgThunk)+1); TICK_ALLOC_SE_THK(1,0); SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); raise->payload[0] = exception; @@ -3781,7 +3831,7 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) // fun field. // words = frame - sp - 1; - ap = (StgAP_STACK *)allocate(PAP_sizeW(words)); + ap = (StgAP_STACK *)allocate(AP_STACK_sizeW(words)); ap->size = words; ap->fun = (StgClosure *)sp[0]; @@ -3848,7 +3898,7 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) StgWord raiseExceptionHelper (StgTSO *tso, StgClosure *exception) { - StgClosure *raise_closure = NULL; + StgThunk *raise_closure = NULL; StgPtr p, next; StgRetInfoTable *info; // @@ -3885,11 +3935,11 @@ raiseExceptionHelper (StgTSO *tso, StgClosure *exception) // Only create raise_closure if we need to. if (raise_closure == NULL) { raise_closure = - (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE); + (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE); SET_HDR(raise_closure, &stg_raise_info, CCCS); raise_closure->payload[0] = exception; } - UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure); + UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure); p = next; continue; @@ -4111,7 +4161,7 @@ printAllThreads(void) debugBelch("all threads:\n"); # endif - for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) { + for (t = all_threads; t != END_TSO_QUEUE; ) { debugBelch("\tthread %d @ %p ", t->id, (void *)t); #if defined(DEBUG) { @@ -4119,8 +4169,14 @@ printAllThreads(void) if (label) debugBelch("[\"%s\"] ",(char *)label); } #endif - printThreadStatus(t); - debugBelch("\n"); + if (t->what_next == ThreadRelocated) { + debugBelch("has been relocated...\n"); + t = t->link; + } else { + printThreadStatus(t); + debugBelch("\n"); + t = t->global_link; + } } }