X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=a5f133e994f9a85846cf628e026db15155346038;hb=14c4df60baa8c86fe7725b821536b3a507511a66;hp=206531d8dcb42741f4b219d02880548d196e9214;hpb=1347b6d55044c7d13c3d0135a29869b797789838;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 206531d..a5f133e 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -145,10 +145,16 @@ StgTSO *run_queue_hd = NULL; StgTSO *run_queue_tl = NULL; StgTSO *blocked_queue_hd = NULL; StgTSO *blocked_queue_tl = NULL; +StgTSO *blackhole_queue = NULL; StgTSO *sleeping_queue = NULL; /* perhaps replace with a hash table? */ #endif +/* The blackhole_queue should be checked for threads to wake up. See + * Schedule.h for more thorough comment. + */ +rtsBool blackholes_need_checking = rtsFalse; + /* Linked list of all threads. * Used for detecting garbage collected threads. */ @@ -171,11 +177,6 @@ int context_switch = 0; /* if this flag is set as well, give up execution */ rtsBool interrupted = rtsFalse; -/* If this flag is set, we are running Haskell code. Used to detect - * uses of 'foreign import unsafe' that should be 'safe'. - */ -static rtsBool in_haskell = rtsFalse; - /* Next thread ID to allocate. * Locks required: thread_id_mutex */ @@ -211,12 +212,6 @@ StgTSO *CurrentTSO; */ StgTSO dummy_tso; -# if defined(SMP) -static Condition gc_pending_cond = INIT_COND_VAR; -# endif - -static rtsBool ready_to_gc; - /* * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) -- * in an MT setting, needed to signal that a worker thread shouldn't hang around @@ -267,9 +262,9 @@ static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // scheduler clearer. // static void schedulePreLoop(void); -static void scheduleHandleInterrupt(void); static void scheduleStartSignalHandlers(void); static void scheduleCheckBlockedThreads(void); +static void scheduleCheckBlackHoles(void); static void scheduleDetectDeadlock(void); #if defined(GRAN) static StgTSO *scheduleProcessEvent(rtsEvent *event); @@ -289,10 +284,11 @@ static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next ); static void scheduleHandleThreadBlocked( StgTSO *t ); static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, Capability *cap, StgTSO *t ); -static void scheduleDoHeapProfile(void); -static void scheduleDoGC(void); +static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc); +static void scheduleDoGC(Capability *cap); static void unblockThread(StgTSO *tso); +static rtsBool checkBlackHoles(void); static SchedulerStatus waitThread_(/*out*/StgMainThread* m, Capability *initialCapability ); @@ -317,34 +313,33 @@ StgTSO * activateSpark (rtsSpark spark); * ------------------------------------------------------------------------- */ #if defined(RTS_SUPPORTS_THREADS) -static rtsBool startingWorkerThread = rtsFalse; +static nat startingWorkerThread = 0; static void taskStart(void) { ACQUIRE_LOCK(&sched_mutex); - startingWorkerThread = rtsFalse; + startingWorkerThread--; schedule(NULL,NULL); + taskStop(); RELEASE_LOCK(&sched_mutex); } void startSchedulerTaskIfNecessary(void) { - if(run_queue_hd != END_TSO_QUEUE - || blocked_queue_hd != END_TSO_QUEUE - || sleeping_queue != END_TSO_QUEUE) - { - if(!startingWorkerThread) - { // we don't want to start another worker thread - // just because the last one hasn't yet reached the - // "waiting for capability" state - startingWorkerThread = rtsTrue; - if (!startTask(taskStart)) { - startingWorkerThread = rtsFalse; - } + if ( !EMPTY_RUN_QUEUE() + && !shutting_down_scheduler // not if we're shutting down + && startingWorkerThread==0) + { + // we don't want to start another worker thread + // just because the last one hasn't yet reached the + // "waiting for capability" state + startingWorkerThread++; + if (!maybeStartNewWorker(taskStart)) { + startingWorkerThread--; + } } - } } #endif @@ -423,6 +418,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, # endif #endif nat prev_what_next; + rtsBool ready_to_gc; // Pre-condition: sched_mutex is held. // We might have a capability, passed in as initialCapability. @@ -461,19 +457,6 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, IF_DEBUG(scheduler, printAllThreads()); -#if defined(SMP) - // - // Wait until GC has completed, if necessary. - // - if (ready_to_gc) { - if (cap != NULL) { - releaseCapability(cap); - IF_DEBUG(scheduler,sched_belch("waiting for GC")); - waitCondition( &gc_pending_cond, &sched_mutex ); - } - } -#endif - #if defined(RTS_SUPPORTS_THREADS) // Yield the capability to higher-priority tasks if necessary. // @@ -494,13 +477,32 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // Check whether we have re-entered the RTS from Haskell without // going via suspendThread()/resumeThread (i.e. a 'safe' foreign // call). - if (in_haskell) { + if (cap->r.rInHaskell) { errorBelch("schedule: re-entered unsafely.\n" " Perhaps a 'foreign import unsafe' should be 'safe'?"); stg_exit(1); } - scheduleHandleInterrupt(); + // + // Test for interruption. If interrupted==rtsTrue, then either + // we received a keyboard interrupt (^C), or the scheduler is + // trying to shut down all the tasks (shutting_down_scheduler) in + // the threaded RTS. + // + if (interrupted) { + if (shutting_down_scheduler) { + IF_DEBUG(scheduler, sched_belch("shutting down")); + releaseCapability(cap); + if (mainThread) { + mainThread->stat = Interrupted; + mainThread->ret = NULL; + } + return; + } else { + IF_DEBUG(scheduler, sched_belch("interrupted")); + deleteAllThreads(); + } + } #if defined(not_yet) && defined(SMP) // @@ -526,6 +528,12 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, scheduleStartSignalHandlers(); + // Only check the black holes here if we've nothing else to do. + // During normal execution, the black hole list only gets checked + // at GC time, to avoid repeatedly traversing this possibly long + // list each time around the scheduler. + if (EMPTY_RUN_QUEUE()) { scheduleCheckBlackHoles(); } + scheduleCheckBlockedThreads(); scheduleDetectDeadlock(); @@ -652,13 +660,13 @@ run_thread: startHeapProfTimer(); #endif - /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */ - /* Run the current thread - */ + // ---------------------------------------------------------------------- + // Run the current thread + prev_what_next = t->what_next; errno = t->saved_errno; - in_haskell = rtsTrue; + cap->r.rInHaskell = rtsTrue; switch (prev_what_next) { @@ -680,7 +688,13 @@ run_thread: barf("schedule: invalid what_next field"); } - in_haskell = rtsFalse; + // We have run some Haskell code: there might be blackhole-blocked + // threads to wake up now. + if ( blackhole_queue != END_TSO_QUEUE ) { + blackholes_need_checking = rtsTrue; + } + + cap->r.rInHaskell = rtsFalse; // The TSO might have moved, eg. if it re-entered the RTS and a GC // happened. So find the new location: @@ -689,7 +703,7 @@ run_thread: // And save the current errno in this thread. t->saved_errno = errno; - /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */ + // ---------------------------------------------------------------------- /* Costs for the scheduler are assigned to CCS_SYSTEM */ #if defined(PROFILING) @@ -707,6 +721,8 @@ run_thread: schedulePostRunThread(); + ready_to_gc = rtsFalse; + switch (ret) { case HeapOverflow: ready_to_gc = scheduleHandleHeapOverflow(cap,t); @@ -736,8 +752,8 @@ run_thread: barf("schedule: invalid thread return code %d", (int)ret); } - scheduleDoHeapProfile(); - scheduleDoGC(); + if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; } + if (ready_to_gc) { scheduleDoGC(cap); } } /* end of while() */ IF_PAR_DEBUG(verbose, @@ -772,33 +788,6 @@ schedulePreLoop(void) } /* ---------------------------------------------------------------------------- - * Deal with the interrupt flag - * ASSUMES: sched_mutex - * ------------------------------------------------------------------------- */ - -static -void scheduleHandleInterrupt(void) -{ - // - // Test for interruption. If interrupted==rtsTrue, then either - // we received a keyboard interrupt (^C), or the scheduler is - // trying to shut down all the tasks (shutting_down_scheduler) in - // the threaded RTS. - // - if (interrupted) { - if (shutting_down_scheduler) { - IF_DEBUG(scheduler, sched_belch("shutting down")); -#if defined(RTS_SUPPORTS_THREADS) - shutdownThread(); -#endif - } else { - IF_DEBUG(scheduler, sched_belch("interrupted")); - deleteAllThreads(); - } - } -} - -/* ---------------------------------------------------------------------------- * Start any pending signal handlers * ASSUMES: sched_mutex * ------------------------------------------------------------------------- */ @@ -806,7 +795,7 @@ void scheduleHandleInterrupt(void) static void scheduleStartSignalHandlers(void) { -#if defined(RTS_USER_SIGNALS) +#if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS) if (signals_pending()) { RELEASE_LOCK(&sched_mutex); /* ToDo: kill */ startSignalHandlers(); @@ -834,7 +823,22 @@ scheduleCheckBlockedThreads(void) // We shouldn't be here... barf("schedule: awaitEvent() in threaded RTS"); #endif - awaitEvent( EMPTY_RUN_QUEUE() ); + awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking ); + } +} + + +/* ---------------------------------------------------------------------------- + * Check for threads blocked on BLACKHOLEs that can be woken up + * ASSUMES: sched_mutex + * ------------------------------------------------------------------------- */ +static void +scheduleCheckBlackHoles( void ) +{ + if ( blackholes_need_checking ) + { + checkBlackHoles(); + blackholes_need_checking = rtsFalse; } } @@ -848,18 +852,13 @@ scheduleDetectDeadlock(void) { /* * Detect deadlock: when we have no threads to run, there are no - * threads waiting on I/O or sleeping, and all the other tasks are - * waiting for work, we must have a deadlock of some description. - * - * We first try to find threads blocked on themselves (ie. black - * holes), and generate NonTermination exceptions where necessary. - * - * If no threads are black holed, we have a deadlock situation, so - * inform all the main threads. + * threads blocked, waiting for I/O, or sleeping, and all the + * other tasks are waiting for work, we must have a deadlock of + * some description. */ -#if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS) if ( EMPTY_THREAD_QUEUES() ) { +#if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS) IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC...")); // Garbage collection can release some new threads due to @@ -901,6 +900,7 @@ scheduleDetectDeadlock(void) StgMainThread *m; m = main_threads; switch (m->tso->why_blocked) { + case BlockedOnSTM: case BlockedOnBlackHole: case BlockedOnException: case BlockedOnMVar: @@ -910,13 +910,13 @@ scheduleDetectDeadlock(void) barf("deadlock: main thread blocked in a strange way"); } } - } #elif defined(RTS_SUPPORTS_THREADS) // ToDo: add deadlock detection in threaded RTS #elif defined(PARALLEL_HASKELL) // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL #endif + } } /* ---------------------------------------------------------------------------- @@ -1441,12 +1441,12 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) if (cap->r.rHpAlloc > BLOCK_SIZE) { // if so, get one and push it on the front of the nursery. bdescr *bd; - nat blocks; + lnat blocks; - blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE; + blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE; IF_DEBUG(scheduler, - debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n", + debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", (long)t->id, whatNext_strs[t->what_next], blocks)); // don't do this if it would push us over the @@ -1462,9 +1462,12 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) if (cap->r.rCurrentNursery->u.back != NULL) { cap->r.rCurrentNursery->u.back->link = bd; } else { +#if !defined(SMP) ASSERT(g0s0->blocks == cap->r.rCurrentNursery && - g0s0->blocks == cap->r.rNursery); - cap->r.rNursery = g0s0->blocks = bd; + g0s0 == cap->r.rNursery); + g0s0->blocks = bd; +#endif + cap->r.rNursery->blocks = bd; } cap->r.rCurrentNursery->u.back = bd; @@ -1484,11 +1487,14 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) } } +#if !defined(SMP) // don't forget to update the block count in g0s0. g0s0->n_blocks += blocks; + // This assert can be a killer if the app is doing lots // of large block allocations. ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks); +#endif // now update the nursery to point to the new block cap->r.rCurrentNursery = bd; @@ -1827,10 +1833,10 @@ scheduleHandleThreadFinished( StgMainThread *mainThread * Perform a heap census, if PROFILING * -------------------------------------------------------------------------- */ -static void -scheduleDoHeapProfile(void) +static rtsBool +scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED ) { -#ifdef PROFILING +#if defined(PROFILING) // When we have +RTS -i0 and we're heap profiling, do a census at // every GC. This lets us get repeatable runs for debugging. if (performHeapProfile || @@ -1839,9 +1845,10 @@ scheduleDoHeapProfile(void) GarbageCollect(GetRoots, rtsTrue); heapCensus(); performHeapProfile = rtsFalse; - ready_to_gc = rtsFalse; // we already GC'd + return rtsTrue; // true <=> we already GC'd } #endif + return rtsFalse; } /* ----------------------------------------------------------------------------- @@ -1850,63 +1857,97 @@ scheduleDoHeapProfile(void) * -------------------------------------------------------------------------- */ static void -scheduleDoGC(void) +scheduleDoGC( Capability *cap STG_UNUSED ) { StgTSO *t; +#ifdef SMP + static rtsBool waiting_for_gc; + int n_capabilities = RtsFlags.ParFlags.nNodes - 1; + // subtract one because we're already holding one. + Capability *caps[n_capabilities]; +#endif #ifdef SMP - // The last task to stop actually gets to do the GC. The rest - // of the tasks release their capabilities and wait gc_pending_cond. - if (ready_to_gc && allFreeCapabilities()) -#else - if (ready_to_gc) + // In order to GC, there must be no threads running Haskell code. + // Therefore, the GC thread needs to hold *all* the capabilities, + // and release them after the GC has completed. + // + // This seems to be the simplest way: previous attempts involved + // making all the threads with capabilities give up their + // capabilities and sleep except for the *last* one, which + // actually did the GC. But it's quite hard to arrange for all + // the other tasks to sleep and stay asleep. + // + + // Someone else is already trying to GC + if (waiting_for_gc) return; + waiting_for_gc = rtsTrue; + + caps[n_capabilities] = cap; + while (n_capabilities > 0) { + IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities)); + waitForReturnCapability(&sched_mutex, &cap); + n_capabilities--; + caps[n_capabilities] = cap; + } + + waiting_for_gc = rtsFalse; #endif - { - /* Kick any transactions which are invalid back to their - * atomically frames. When next scheduled they will try to - * commit, this commit will fail and they will retry. - */ - for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) { - if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) { - if (!stmValidateTransaction (t -> trec)) { - IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t)); - - // strip the stack back to the ATOMICALLY_FRAME, aborting - // the (nested) transaction, and saving the stack of any - // partially-evaluated thunks on the heap. - raiseAsync_(t, NULL, rtsTrue); - + + /* Kick any transactions which are invalid back to their + * atomically frames. When next scheduled they will try to + * commit, this commit will fail and they will retry. + */ + for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) { + if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) { + if (!stmValidateTransaction (t -> trec)) { + IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t)); + + // strip the stack back to the ATOMICALLY_FRAME, aborting + // the (nested) transaction, and saving the stack of any + // partially-evaluated thunks on the heap. + raiseAsync_(t, NULL, rtsTrue); + #ifdef REG_R1 - ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); + ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); #endif - } } } - - /* everybody back, start the GC. - * Could do it in this thread, or signal a condition var - * to do it in another thread. Either way, we need to - * broadcast on gc_pending_cond afterward. - */ + } + + // so this happens periodically: + scheduleCheckBlackHoles(); + + /* everybody back, start the GC. + * Could do it in this thread, or signal a condition var + * to do it in another thread. Either way, we need to + * broadcast on gc_pending_cond afterward. + */ #if defined(RTS_SUPPORTS_THREADS) - IF_DEBUG(scheduler,sched_belch("doing GC")); + IF_DEBUG(scheduler,sched_belch("doing GC")); #endif - GarbageCollect(GetRoots,rtsFalse); - ready_to_gc = rtsFalse; + GarbageCollect(GetRoots,rtsFalse); + #if defined(SMP) - broadcastCondition(&gc_pending_cond); + { + // release our stash of capabilities. + nat i; + for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) { + releaseCapability(caps[i]); + } + } #endif + #if defined(GRAN) - /* add a ContinueThread event to continue execution of current thread */ - new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc], - ContinueThread, - t, (StgClosure*)NULL, (rtsSpark*)NULL); - IF_GRAN_DEBUG(bq, - debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n"); - G_EVENTQ(0); - G_CURR_THREADQ(0)); + /* add a ContinueThread event to continue execution of current thread */ + new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc], + ContinueThread, + t, (StgClosure*)NULL, (rtsSpark*)NULL); + IF_GRAN_DEBUG(bq, + debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n"); + G_EVENTQ(0); + G_CURR_THREADQ(0)); #endif /* GRAN */ - } } /* --------------------------------------------------------------------------- @@ -2036,6 +2077,7 @@ deleteAllThreads ( void ) // being GC'd, and we don't want the "main thread has been GC'd" panic. ASSERT(blocked_queue_hd == END_TSO_QUEUE); + ASSERT(blackhole_queue == END_TSO_QUEUE); ASSERT(sleeping_queue == END_TSO_QUEUE); } @@ -2092,6 +2134,7 @@ suspendThread( StgRegTable *reg ) tok = cap->r.rCurrentTSO->id; /* Hand back capability */ + cap->r.rInHaskell = rtsFalse; releaseCapability(cap); #if defined(RTS_SUPPORTS_THREADS) @@ -2101,7 +2144,6 @@ suspendThread( StgRegTable *reg ) IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok)); #endif - in_haskell = rtsFalse; RELEASE_LOCK(&sched_mutex); errno = saved_errno; @@ -2149,7 +2191,7 @@ resumeThread( StgInt tok ) tso->why_blocked = NotBlocked; cap->r.rCurrentTSO = tso; - in_haskell = rtsTrue; + cap->r.rInHaskell = rtsTrue; RELEASE_LOCK(&sched_mutex); errno = saved_errno; return &cap->r; @@ -2547,6 +2589,7 @@ initScheduler(void) blocked_queue_hds[i] = END_TSO_QUEUE; blocked_queue_tls[i] = END_TSO_QUEUE; ccalling_threadss[i] = END_TSO_QUEUE; + blackhole_queue[i] = END_TSO_QUEUE; sleeping_queue = END_TSO_QUEUE; } #else @@ -2554,6 +2597,7 @@ initScheduler(void) run_queue_tl = END_TSO_QUEUE; blocked_queue_hd = END_TSO_QUEUE; blocked_queue_tl = END_TSO_QUEUE; + blackhole_queue = END_TSO_QUEUE; sleeping_queue = END_TSO_QUEUE; #endif @@ -2584,8 +2628,13 @@ initScheduler(void) initCapabilities(); #if defined(RTS_SUPPORTS_THREADS) - /* start our haskell execution tasks */ - startTaskManager(0,taskStart); + initTaskManager(); +#endif + +#if defined(SMP) + /* eagerly start some extra workers */ + startingWorkerThread = RtsFlags.ParFlags.nNodes; + startTasks(RtsFlags.ParFlags.nNodes, taskStart); #endif #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL) @@ -2598,11 +2647,12 @@ initScheduler(void) void exitScheduler( void ) { + interrupted = rtsTrue; + shutting_down_scheduler = rtsTrue; #if defined(RTS_SUPPORTS_THREADS) - stopTaskManager(); + if (threadIsTask(osThreadId())) { taskStop(); } + stopTaskManager(); #endif - interrupted = rtsTrue; - shutting_down_scheduler = rtsTrue; } /* ---------------------------------------------------------------------------- @@ -2709,6 +2759,10 @@ GetRoots( evac_fn evac ) } #endif + if (blackhole_queue != END_TSO_QUEUE) { + evac((StgClosure **)&blackhole_queue); + } + if (suspended_ccalling_threads != END_TSO_QUEUE) { evac((StgClosure **)&suspended_ccalling_threads); } @@ -2783,7 +2837,8 @@ performGCWithRoots(void (*get_roots)(evac_fn)) static StgTSO * threadStackOverflow(StgTSO *tso) { - nat new_stack_size, new_tso_size, stack_words; + nat new_stack_size, stack_words; + lnat new_tso_size; StgPtr new_sp; StgTSO *dest; @@ -2807,7 +2862,7 @@ threadStackOverflow(StgTSO *tso) * Finally round up so the TSO ends up as a whole number of blocks. */ new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size); - new_tso_size = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + + new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + TSO_STRUCT_SIZE)/sizeof(W_); new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */ new_stack_size = new_tso_size - TSO_STRUCT_SIZEW; @@ -3364,12 +3419,9 @@ unblockThread(StgTSO *tso) } case BlockedOnBlackHole: - ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ); { - StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure); - - last = &bq->blocking_queue; - for (t = bq->blocking_queue; t != END_TSO_QUEUE; + last = &blackhole_queue; + for (t = blackhole_queue; t != END_TSO_QUEUE; last = &t->link, t = t->link) { if (t == tso) { *last = tso->link; @@ -3461,6 +3513,49 @@ unblockThread(StgTSO *tso) #endif /* ----------------------------------------------------------------------------- + * checkBlackHoles() + * + * Check the blackhole_queue for threads that can be woken up. We do + * this periodically: before every GC, and whenever the run queue is + * empty. + * + * An elegant solution might be to just wake up all the blocked + * threads with awakenBlockedQueue occasionally: they'll go back to + * sleep again if the object is still a BLACKHOLE. Unfortunately this + * doesn't give us a way to tell whether we've actually managed to + * wake up any threads, so we would be busy-waiting. + * + * -------------------------------------------------------------------------- */ + +static rtsBool +checkBlackHoles( void ) +{ + StgTSO **prev, *t; + rtsBool any_woke_up = rtsFalse; + StgHalfWord type; + + IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes")); + + // ASSUMES: sched_mutex + prev = &blackhole_queue; + t = blackhole_queue; + while (t != END_TSO_QUEUE) { + ASSERT(t->why_blocked == BlockedOnBlackHole); + type = get_itbl(t->block_info.closure)->type; + if (type != BLACKHOLE && type != CAF_BLACKHOLE) { + t = unblockOneLocked(t); + *prev = t; + any_woke_up = rtsTrue; + } else { + prev = &t->link; + t = t->link; + } + } + + return any_woke_up; +} + +/* ----------------------------------------------------------------------------- * raiseAsync() * * The following function implements the magic for raising an @@ -3930,18 +4025,18 @@ printThreadBlockage(StgTSO *tso) { switch (tso->why_blocked) { case BlockedOnRead: - debugBelch("is blocked on read from fd %d", tso->block_info.fd); + debugBelch("is blocked on read from fd %ld", tso->block_info.fd); break; case BlockedOnWrite: - debugBelch("is blocked on write to fd %d", tso->block_info.fd); + debugBelch("is blocked on write to fd %ld", tso->block_info.fd); break; #if defined(mingw32_HOST_OS) case BlockedOnDoProc: - debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID); + debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID); break; #endif case BlockedOnDelay: - debugBelch("is blocked until %d", tso->block_info.target); + debugBelch("is blocked until %ld", tso->block_info.target); break; case BlockedOnMVar: debugBelch("is blocked on an MVar"); @@ -4162,25 +4257,6 @@ print_bq (StgClosure *node) } /* for */ debugBelch("\n"); } -#else -/* - Nice and easy: only TSOs on the blocking queue -*/ -void -print_bq (StgClosure *node) -{ - StgTSO *tso; - - ASSERT(node!=(StgClosure*)NULL); // sanity check - for (tso = ((StgBlockingQueue*)node)->blocking_queue; - tso != END_TSO_QUEUE; - tso=tso->link) { - ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check - ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check - debugBelch(" TSO %d (%p),", tso->id, tso); - } - debugBelch("\n"); -} # endif #if defined(PARALLEL_HASKELL)