X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=b78f9d206f19321808246191a9395d96632679ce;hb=4f0f4342c0268e239fd8bb6bd98ad2583b3485dd;hp=6e4d95535dab33264501402f10e11ab0a38f4048;hpb=ce1a3417380dd75e5b93bd5d13c58658780e12e5;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 6e4d955..b78f9d2 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -174,14 +174,12 @@ static StgTSO *suspended_ccalling_threads; /* flag set by signal handler to precipitate a context switch */ int context_switch = 0; +/* flag that tracks whether we have done any execution in this time slice. */ +nat recent_activity = ACTIVITY_YES; + /* if this flag is set as well, give up execution */ rtsBool interrupted = rtsFalse; -/* If this flag is set, we are running Haskell code. Used to detect - * uses of 'foreign import unsafe' that should be 'safe'. - */ -static rtsBool in_haskell = rtsFalse; - /* Next thread ID to allocate. * Locks required: thread_id_mutex */ @@ -290,7 +288,7 @@ static void scheduleHandleThreadBlocked( StgTSO *t ); static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, Capability *cap, StgTSO *t ); static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc); -static void scheduleDoGC(Capability *cap); +static void scheduleDoGC(rtsBool force_major); static void unblockThread(StgTSO *tso); static rtsBool checkBlackHoles(void); @@ -307,6 +305,7 @@ static void raiseAsync_(StgTSO *tso, StgClosure *exception, static void printThreadBlockage(StgTSO *tso); static void printThreadStatus(StgTSO *tso); +void printThreadQueue(StgTSO *tso); #if defined(PARALLEL_HASKELL) StgTSO * createSparkThread(rtsSpark spark); @@ -318,13 +317,13 @@ StgTSO * activateSpark (rtsSpark spark); * ------------------------------------------------------------------------- */ #if defined(RTS_SUPPORTS_THREADS) -static rtsBool startingWorkerThread = rtsFalse; +static nat startingWorkerThread = 0; static void taskStart(void) { ACQUIRE_LOCK(&sched_mutex); - startingWorkerThread = rtsFalse; + startingWorkerThread--; schedule(NULL,NULL); taskStop(); RELEASE_LOCK(&sched_mutex); @@ -335,14 +334,14 @@ startSchedulerTaskIfNecessary(void) { if ( !EMPTY_RUN_QUEUE() && !shutting_down_scheduler // not if we're shutting down - && !startingWorkerThread) + && startingWorkerThread==0) { // we don't want to start another worker thread // just because the last one hasn't yet reached the // "waiting for capability" state - startingWorkerThread = rtsTrue; + startingWorkerThread++; if (!maybeStartNewWorker(taskStart)) { - startingWorkerThread = rtsFalse; + startingWorkerThread--; } } } @@ -460,13 +459,12 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, CurrentTSO = event->tso; #endif - IF_DEBUG(scheduler, printAllThreads()); - #if defined(RTS_SUPPORTS_THREADS) // Yield the capability to higher-priority tasks if necessary. // if (cap != NULL) { - yieldCapability(&cap); + yieldCapability(&cap, + mainThread ? &mainThread->bound_thread_cond : NULL ); } // If we do not currently hold a capability, we wait for one @@ -479,10 +477,19 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // We now have a capability... #endif +#if 0 /* extra sanity checking */ + { + StgMainThread *m; + for (m = main_threads; m != NULL; m = m->link) { + ASSERT(get_itbl(m->tso)->type == TSO); + } + } +#endif + // Check whether we have re-entered the RTS from Haskell without // going via suspendThread()/resumeThread (i.e. a 'safe' foreign // call). - if (in_haskell) { + if (cap->r.rInHaskell) { errorBelch("schedule: re-entered unsafely.\n" " Perhaps a 'foreign import unsafe' should be 'safe'?"); stg_exit(1); @@ -621,21 +628,19 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, sched_belch("### thread %d bound to another OS thread", t->id)); // no, bound to a different Haskell thread: pass to that thread PUSH_ON_RUN_QUEUE(t); - passCapability(&m->bound_thread_cond); continue; } } else { if(mainThread != NULL) - // The thread we want to run is bound. + // The thread we want to run is unbound. { IF_DEBUG(scheduler, sched_belch("### this OS thread cannot run thread %d", t->id)); // no, the current native thread is bound to a different // Haskell thread, so pass it to any worker thread PUSH_ON_RUN_QUEUE(t); - passCapabilityToWorker(); continue; } } @@ -671,7 +676,9 @@ run_thread: prev_what_next = t->what_next; errno = t->saved_errno; - in_haskell = rtsTrue; + cap->r.rInHaskell = rtsTrue; + + recent_activity = ACTIVITY_YES; switch (prev_what_next) { @@ -693,13 +700,14 @@ run_thread: barf("schedule: invalid what_next field"); } - // We have run some Haskell code: there might be blackhole-blocked - // threads to wake up now. - if ( blackhole_queue != END_TSO_QUEUE ) { - blackholes_need_checking = rtsTrue; - } +#if defined(SMP) + // in SMP mode, we might return with a different capability than + // we started with, if the Haskell thread made a foreign call. So + // let's find out what our current Capability is: + cap = myCapability(); +#endif - in_haskell = rtsFalse; + cap->r.rInHaskell = rtsFalse; // The TSO might have moved, eg. if it re-entered the RTS and a GC // happened. So find the new location: @@ -717,6 +725,12 @@ run_thread: #endif ACQUIRE_LOCK(&sched_mutex); + + // We have run some Haskell code: there might be blackhole-blocked + // threads to wake up now. + if ( blackhole_queue != END_TSO_QUEUE ) { + blackholes_need_checking = rtsTrue; + } #if defined(RTS_SUPPORTS_THREADS) IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId());); @@ -746,7 +760,6 @@ run_thread: case ThreadBlocked: scheduleHandleThreadBlocked(t); - threadPaused(t); break; case ThreadFinished: @@ -758,7 +771,7 @@ run_thread: } if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; } - if (ready_to_gc) { scheduleDoGC(cap); } + if (ready_to_gc) { scheduleDoGC(rtsFalse); } } /* end of while() */ IF_PAR_DEBUG(verbose, @@ -827,8 +840,9 @@ scheduleCheckBlockedThreads(void) #if defined(RTS_SUPPORTS_THREADS) // We shouldn't be here... barf("schedule: awaitEvent() in threaded RTS"); -#endif +#else awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking ); +#endif } } @@ -853,8 +867,14 @@ scheduleCheckBlackHoles( void ) * ------------------------------------------------------------------------- */ static void -scheduleDetectDeadlock(void) +scheduleDetectDeadlock() { + +#if defined(PARALLEL_HASKELL) + // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL + return; +#endif + /* * Detect deadlock: when we have no threads to run, there are no * threads blocked, waiting for I/O, or sleeping, and all the @@ -863,7 +883,16 @@ scheduleDetectDeadlock(void) */ if ( EMPTY_THREAD_QUEUES() ) { -#if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS) +#if defined(RTS_SUPPORTS_THREADS) + /* + * In the threaded RTS, we only check for deadlock if there + * has been no activity in a complete timeslice. This means + * we won't eagerly start a full GC just because we don't have + * any threads to run currently. + */ + if (recent_activity != ACTIVITY_INACTIVE) return; +#endif + IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC...")); // Garbage collection can release some new threads due to @@ -871,10 +900,12 @@ scheduleDetectDeadlock(void) // they are unreachable and will therefore be sent an // exception. Any threads thus released will be immediately // runnable. - GarbageCollect(GetRoots,rtsTrue); + + scheduleDoGC( rtsTrue/*force major GC*/ ); + recent_activity = ACTIVITY_DONE_GC; if ( !EMPTY_RUN_QUEUE() ) return; -#if defined(RTS_USER_SIGNALS) +#if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS) /* If we have user-installed signal handlers, then wait * for signals to arrive rather then bombing out with a * deadlock. @@ -896,6 +927,7 @@ scheduleDetectDeadlock(void) } #endif +#if !defined(RTS_SUPPORTS_THREADS) /* Probably a real deadlock. Send the current main thread the * Deadlock exception (or in the SMP build, send *all* main * threads the deadlock exception, since none of them can make @@ -905,6 +937,7 @@ scheduleDetectDeadlock(void) StgMainThread *m; m = main_threads; switch (m->tso->why_blocked) { + case BlockedOnSTM: case BlockedOnBlackHole: case BlockedOnException: case BlockedOnMVar: @@ -914,11 +947,6 @@ scheduleDetectDeadlock(void) barf("deadlock: main thread blocked in a strange way"); } } - -#elif defined(RTS_SUPPORTS_THREADS) - // ToDo: add deadlock detection in threaded RTS -#elif defined(PARALLEL_HASKELL) - // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL #endif } } @@ -1453,12 +1481,15 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", (long)t->id, whatNext_strs[t->what_next], blocks)); - // don't do this if it would push us over the - // alloc_blocks_lim limit; we'll GC first. - if (alloc_blocks + blocks < alloc_blocks_lim) { + // don't do this if the nursery is (nearly) full, we'll GC first. + if (cap->r.rCurrentNursery->link != NULL || + cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop + // if the nursery has only one block. - alloc_blocks += blocks; + ACQUIRE_SM_LOCK bd = allocGroup( blocks ); + RELEASE_SM_LOCK + cap->r.rNursery->n_blocks += blocks; // link the new group into the list bd->link = cap->r.rCurrentNursery; @@ -1466,13 +1497,11 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) if (cap->r.rCurrentNursery->u.back != NULL) { cap->r.rCurrentNursery->u.back->link = bd; } else { -#ifdef SMP - cap->r.rNursery = g0s0->blocks = bd; -#else +#if !defined(SMP) ASSERT(g0s0->blocks == cap->r.rCurrentNursery && - g0s0->blocks == cap->r.rNursery); - cap->r.rNursery = g0s0->blocks = bd; + g0s0 == cap->r.rNursery); #endif + cap->r.rNursery->blocks = bd; } cap->r.rCurrentNursery->u.back = bd; @@ -1486,20 +1515,15 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) { bdescr *x; for (x = bd; x < bd + blocks; x++) { - x->step = g0s0; + x->step = cap->r.rNursery; x->gen_no = 0; x->flags = 0; } } -#if !defined(SMP) - // don't forget to update the block count in g0s0. - g0s0->n_blocks += blocks; - // This assert can be a killer if the app is doing lots // of large block allocations. - ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks); -#endif + IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery)); // now update the nursery to point to the new block cap->r.rCurrentNursery = bd; @@ -1513,14 +1537,9 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) } } - /* make all the running tasks block on a condition variable, - * maybe set context_switch and wait till they all pile in, - * then have them wait on a GC condition variable. - */ IF_DEBUG(scheduler, debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", (long)t->id, whatNext_strs[t->what_next])); - threadPaused(t); #if defined(GRAN) ASSERT(!is_on_queue(t,CurrentProc)); #elif defined(PARALLEL_HASKELL) @@ -1552,7 +1571,6 @@ scheduleHandleStackOverflow( StgTSO *t) /* just adjust the stack for this thread, then pop it back * on the run queue. */ - threadPaused(t); { /* enlarge the stack */ StgTSO *new_t = threadStackOverflow(t); @@ -1611,8 +1629,6 @@ scheduleHandleYield( StgTSO *t, nat prev_what_next ) return rtsTrue; } - threadPaused(t); - #if defined(GRAN) ASSERT(!is_on_queue(t,CurrentProc)); @@ -1685,12 +1701,19 @@ scheduleHandleThreadBlocked( StgTSO *t emitSchedule = rtsTrue; #else /* !GRAN */ - /* don't need to do anything. Either the thread is blocked on - * I/O, in which case we'll have called addToBlockedQueue - * previously, or it's blocked on an MVar or Blackhole, in which - * case it'll be on the relevant queue already. - */ + + // We don't need to do anything. The thread is blocked, and it + // has tidied up its stack and placed itself on whatever queue + // it needs to be on. + +#if !defined(SMP) ASSERT(t->why_blocked != NotBlocked); + // This might not be true under SMP: we don't have + // exclusive access to this TSO, so someone might have + // woken it up by now. This actually happens: try + // conc023 +RTS -N2. +#endif + IF_DEBUG(scheduler, debugBelch("--<< thread %d (%s) stopped: ", t->id, whatNext_strs[t->what_next]); @@ -1807,12 +1830,13 @@ scheduleHandleThreadFinished( StgMainThread *mainThread removeThreadLabel((StgWord)mainThread->tso->id); #endif if (mainThread->prev == NULL) { + ASSERT(mainThread == main_threads); main_threads = mainThread->link; } else { mainThread->prev->link = mainThread->link; } if (mainThread->link != NULL) { - mainThread->link->prev = NULL; + mainThread->link->prev = mainThread->prev; } releaseCapability(cap); return rtsTrue; // tells schedule() to return @@ -1862,10 +1886,12 @@ scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED ) * -------------------------------------------------------------------------- */ static void -scheduleDoGC( Capability *cap STG_UNUSED ) +scheduleDoGC( rtsBool force_major ) { StgTSO *t; #ifdef SMP + Capability *cap; + static rtsBool waiting_for_gc; int n_capabilities = RtsFlags.ParFlags.nNodes - 1; // subtract one because we're already holding one. Capability *caps[n_capabilities]; @@ -1882,33 +1908,52 @@ scheduleDoGC( Capability *cap STG_UNUSED ) // actually did the GC. But it's quite hard to arrange for all // the other tasks to sleep and stay asleep. // + // This does mean that there will be multiple entries in the + // thread->capability hash table for the current thread, but + // they will be removed as normal when the capabilities are + // released again. + // - caps[n_capabilities] = cap; + // Someone else is already trying to GC + if (waiting_for_gc) return; + waiting_for_gc = rtsTrue; + while (n_capabilities > 0) { IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities)); waitForReturnCapability(&sched_mutex, &cap); n_capabilities--; caps[n_capabilities] = cap; } + + waiting_for_gc = rtsFalse; #endif /* Kick any transactions which are invalid back to their * atomically frames. When next scheduled they will try to * commit, this commit will fail and they will retry. */ - for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) { - if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) { - if (!stmValidateTransaction (t -> trec)) { - IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t)); - - // strip the stack back to the ATOMICALLY_FRAME, aborting - // the (nested) transaction, and saving the stack of any - // partially-evaluated thunks on the heap. - raiseAsync_(t, NULL, rtsTrue); - + { + StgTSO *next; + + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + if (t->what_next == ThreadRelocated) { + next = t->link; + } else { + next = t->global_link; + if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) { + if (!stmValidateNestOfTransactions (t -> trec)) { + IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t)); + + // strip the stack back to the ATOMICALLY_FRAME, aborting + // the (nested) transaction, and saving the stack of any + // partially-evaluated thunks on the heap. + raiseAsync_(t, NULL, rtsTrue); + #ifdef REG_R1 - ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); + ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); #endif + } + } } } } @@ -1916,6 +1961,8 @@ scheduleDoGC( Capability *cap STG_UNUSED ) // so this happens periodically: scheduleCheckBlackHoles(); + IF_DEBUG(scheduler, printAllThreads()); + /* everybody back, start the GC. * Could do it in this thread, or signal a condition var * to do it in another thread. Either way, we need to @@ -1924,7 +1971,7 @@ scheduleDoGC( Capability *cap STG_UNUSED ) #if defined(RTS_SUPPORTS_THREADS) IF_DEBUG(scheduler,sched_belch("doing GC")); #endif - GarbageCollect(GetRoots,rtsFalse); + GarbageCollect(GetRoots, force_major); #if defined(SMP) { @@ -1956,7 +2003,7 @@ scheduleDoGC( Capability *cap STG_UNUSED ) StgBool rtsSupportsBoundThreads(void) { -#ifdef THREADED_RTS +#if defined(RTS_SUPPORTS_THREADS) return rtsTrue; #else return rtsFalse; @@ -1970,7 +2017,7 @@ rtsSupportsBoundThreads(void) StgBool isThreadBound(StgTSO* tso USED_IN_THREADED_RTS) { -#ifdef THREADED_RTS +#if defined(RTS_SUPPORTS_THREADS) return (tso->main != NULL); #endif return rtsFalse; @@ -2064,8 +2111,12 @@ deleteAllThreads ( void ) StgTSO* t, *next; IF_DEBUG(scheduler,sched_belch("deleting all threads")); for (t = all_threads; t != END_TSO_QUEUE; t = next) { - next = t->global_link; - deleteThread(t); + if (t->what_next == ThreadRelocated) { + next = t->link; + } else { + next = t->global_link; + deleteThread(t); + } } // The run queue now contains a bunch of ThreadKilled threads. We @@ -2132,6 +2183,7 @@ suspendThread( StgRegTable *reg ) tok = cap->r.rCurrentTSO->id; /* Hand back capability */ + cap->r.rInHaskell = rtsFalse; releaseCapability(cap); #if defined(RTS_SUPPORTS_THREADS) @@ -2141,7 +2193,6 @@ suspendThread( StgRegTable *reg ) IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok)); #endif - in_haskell = rtsFalse; RELEASE_LOCK(&sched_mutex); errno = saved_errno; @@ -2189,7 +2240,7 @@ resumeThread( StgInt tok ) tso->why_blocked = NotBlocked; cap->r.rCurrentTSO = tso; - in_haskell = rtsTrue; + cap->r.rInHaskell = rtsTrue; RELEASE_LOCK(&sched_mutex); errno = saved_errno; return &cap->r; @@ -2262,7 +2313,6 @@ StgTSO * createThread(nat size) #endif { - StgTSO *tso; nat stack_size; @@ -2493,8 +2543,8 @@ activateSpark (rtsSpark spark) * on this thread's stack before the scheduler is invoked. * ------------------------------------------------------------------------ */ -static void -scheduleThread_(StgTSO *tso) +void +scheduleThreadLocked(StgTSO *tso) { // The thread goes at the *end* of the run-queue, to avoid possible // starvation of any threads already on the queue. @@ -2506,7 +2556,7 @@ void scheduleThread(StgTSO* tso) { ACQUIRE_LOCK(&sched_mutex); - scheduleThread_(tso); + scheduleThreadLocked(tso); RELEASE_LOCK(&sched_mutex); } @@ -2631,6 +2681,7 @@ initScheduler(void) #if defined(SMP) /* eagerly start some extra workers */ + startingWorkerThread = RtsFlags.ParFlags.nNodes; startTasks(RtsFlags.ParFlags.nNodes, taskStart); #endif @@ -2646,9 +2697,62 @@ exitScheduler( void ) { interrupted = rtsTrue; shutting_down_scheduler = rtsTrue; + #if defined(RTS_SUPPORTS_THREADS) if (threadIsTask(osThreadId())) { taskStop(); } stopTaskManager(); + // + // What can we do here? There are a bunch of worker threads, it + // might be nice to let them exit cleanly. There may be some main + // threads in the run queue; we should let them return to their + // callers with an Interrupted state. We can't in general wait + // for all the running Tasks to stop, because some might be off in + // a C call that is blocked. + // + // Letting the run queue drain is the safest thing. That lets any + // main threads return that can return, and cleans up all the + // runnable threads. Then we grab all the Capabilities to stop + // anything unexpected happening while we shut down. + // + // ToDo: this doesn't let us get the time stats from the worker + // tasks, because they haven't called taskStop(). + // + ACQUIRE_LOCK(&sched_mutex); + { + nat i; + for (i = 1000; i > 0; i--) { + if (EMPTY_RUN_QUEUE()) { + IF_DEBUG(scheduler, sched_belch("run queue is empty")); + break; + } + IF_DEBUG(scheduler, sched_belch("yielding")); + RELEASE_LOCK(&sched_mutex); + prodWorker(); + yieldThread(); + ACQUIRE_LOCK(&sched_mutex); + } + } + +#ifdef SMP + { + Capability *cap; + int n_capabilities = RtsFlags.ParFlags.nNodes; + Capability *caps[n_capabilities]; + nat i; + + while (n_capabilities > 0) { + IF_DEBUG(scheduler, sched_belch("exitScheduler: grabbing all the capabilies (%d left)", n_capabilities)); + waitForReturnCapability(&sched_mutex, &cap); + n_capabilities--; + caps[n_capabilities] = cap; + } + } +#else + { + Capability *cap; + waitForReturnCapability(&sched_mutex, &cap); + } +#endif #endif } @@ -2949,7 +3053,7 @@ unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node ) #endif #if defined(GRAN) -static StgBlockingQueueElement * +StgBlockingQueueElement * unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node) { StgTSO *tso; @@ -2989,7 +3093,7 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node) tso->id, tso)); } #elif defined(PARALLEL_HASKELL) -static StgBlockingQueueElement * +StgBlockingQueueElement * unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node) { StgBlockingQueueElement *next; @@ -3035,7 +3139,7 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node) } #else /* !GRAN && !PARALLEL_HASKELL */ -static StgTSO * +StgTSO * unblockOneLocked(StgTSO *tso) { StgTSO *next; @@ -3191,6 +3295,8 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node) void awakenBlockedQueueNoLock(StgTSO *tso) { + if (tso == NULL) return; // hack; see bug #1235728, and comments in + // Exception.cmm while (tso != END_TSO_QUEUE) { tso = unblockOneLocked(tso); } @@ -3199,6 +3305,8 @@ awakenBlockedQueueNoLock(StgTSO *tso) void awakenBlockedQueue(StgTSO *tso) { + if (tso == NULL) return; // hack; see bug #1235728, and comments in + // Exception.cmm ACQUIRE_LOCK(&sched_mutex); while (tso != END_TSO_QUEUE) { tso = unblockOneLocked(tso); @@ -3217,6 +3325,11 @@ interruptStgRts(void) { interrupted = 1; context_switch = 1; + threadRunnable(); + /* ToDo: if invoked from a signal handler, this threadRunnable + * only works if there's another thread (not this one) waiting to + * be woken up. + */ } /* ----------------------------------------------------------------------------- @@ -3340,6 +3453,12 @@ unblockThread(StgTSO *tso) blocked_queue_tl = (StgTSO *)prev; } } +#if defined(mingw32_HOST_OS) + /* (Cooperatively) signal that the worker thread should abort + * the request. + */ + abandonWorkRequest(tso->block_info.async_result->reqID); +#endif goto done; } } @@ -3474,6 +3593,12 @@ unblockThread(StgTSO *tso) blocked_queue_tl = prev; } } +#if defined(mingw32_HOST_OS) + /* (Cooperatively) signal that the worker thread should abort + * the request. + */ + abandonWorkRequest(tso->block_info.async_result->reqID); +#endif goto done; } } @@ -3540,6 +3665,7 @@ checkBlackHoles( void ) ASSERT(t->why_blocked == BlockedOnBlackHole); type = get_itbl(t->block_info.closure)->type; if (type != BLACKHOLE && type != CAF_BLACKHOLE) { + IF_DEBUG(sanity,checkTSO(t)); t = unblockOneLocked(t); *prev = t; any_woke_up = rtsTrue; @@ -3736,12 +3862,12 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) #ifdef PROFILING StgCatchFrame *cf = (StgCatchFrame *)frame; #endif - StgClosure *raise; + StgThunk *raise; // we've got an exception to raise, so let's pass it to the // handler in this frame. // - raise = (StgClosure *)allocate(sizeofW(StgClosure)+1); + raise = (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE); TICK_ALLOC_SE_THK(1,0); SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); raise->payload[0] = exception; @@ -3779,7 +3905,7 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) // fun field. // words = frame - sp - 1; - ap = (StgAP_STACK *)allocate(PAP_sizeW(words)); + ap = (StgAP_STACK *)allocate(AP_STACK_sizeW(words)); ap->size = words; ap->fun = (StgClosure *)sp[0]; @@ -3846,7 +3972,7 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) StgWord raiseExceptionHelper (StgTSO *tso, StgClosure *exception) { - StgClosure *raise_closure = NULL; + StgThunk *raise_closure = NULL; StgPtr p, next; StgRetInfoTable *info; // @@ -3883,11 +4009,11 @@ raiseExceptionHelper (StgTSO *tso, StgClosure *exception) // Only create raise_closure if we need to. if (raise_closure == NULL) { raise_closure = - (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE); + (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE); SET_HDR(raise_closure, &stg_raise_info, CCCS); raise_closure->payload[0] = exception; } - UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure); + UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure); p = next; continue; @@ -4022,10 +4148,10 @@ printThreadBlockage(StgTSO *tso) { switch (tso->why_blocked) { case BlockedOnRead: - debugBelch("is blocked on read from fd %ld", tso->block_info.fd); + debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd)); break; case BlockedOnWrite: - debugBelch("is blocked on write to fd %ld", tso->block_info.fd); + debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd)); break; #if defined(mingw32_HOST_OS) case BlockedOnDoProc: @@ -4033,10 +4159,10 @@ printThreadBlockage(StgTSO *tso) break; #endif case BlockedOnDelay: - debugBelch("is blocked until %ld", tso->block_info.target); + debugBelch("is blocked until %ld", (long)(tso->block_info.target)); break; case BlockedOnMVar: - debugBelch("is blocked on an MVar"); + debugBelch("is blocked on an MVar @ %p", tso->block_info.closure); break; case BlockedOnException: debugBelch("is blocked on delivering an exception to thread %d", @@ -4109,21 +4235,45 @@ printAllThreads(void) debugBelch("all threads:\n"); # endif - for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) { - debugBelch("\tthread %d @ %p ", t->id, (void *)t); + for (t = all_threads; t != END_TSO_QUEUE; ) { + debugBelch("\tthread %4d @ %p ", t->id, (void *)t); #if defined(DEBUG) { void *label = lookupThreadLabel(t->id); if (label) debugBelch("[\"%s\"] ",(char *)label); } #endif - printThreadStatus(t); - debugBelch("\n"); + if (t->what_next == ThreadRelocated) { + debugBelch("has been relocated...\n"); + t = t->link; + } else { + printThreadStatus(t); + debugBelch("\n"); + t = t->global_link; + } } } - + #ifdef DEBUG +// useful from gdb +void +printThreadQueue(StgTSO *t) +{ + nat i = 0; + for (; t != END_TSO_QUEUE; t = t->link) { + debugBelch("\tthread %d @ %p ", t->id, (void *)t); + if (t->what_next == ThreadRelocated) { + debugBelch("has been relocated...\n"); + } else { + printThreadStatus(t); + debugBelch("\n"); + } + i++; + } + debugBelch("%d threads on queue\n", i); +} + /* Print a whole blocking queue attached to node (debugging only). */