X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=b78f9d206f19321808246191a9395d96632679ce;hb=4f0f4342c0268e239fd8bb6bd98ad2583b3485dd;hp=dcd32dad766457390730bf1cd820459d1ccc75be;hpb=6b7f3c93728b29302748b5cde56101588bdcb9bf;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index dcd32da..b78f9d2 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -707,12 +707,6 @@ run_thread: cap = myCapability(); #endif - // We have run some Haskell code: there might be blackhole-blocked - // threads to wake up now. - if ( blackhole_queue != END_TSO_QUEUE ) { - blackholes_need_checking = rtsTrue; - } - cap->r.rInHaskell = rtsFalse; // The TSO might have moved, eg. if it re-entered the RTS and a GC @@ -731,6 +725,12 @@ run_thread: #endif ACQUIRE_LOCK(&sched_mutex); + + // We have run some Haskell code: there might be blackhole-blocked + // threads to wake up now. + if ( blackhole_queue != END_TSO_QUEUE ) { + blackholes_need_checking = rtsTrue; + } #if defined(RTS_SUPPORTS_THREADS) IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId());); @@ -841,8 +841,9 @@ scheduleCheckBlockedThreads(void) // We shouldn't be here... barf("schedule: awaitEvent() in threaded RTS"); #else - awaitEvent( EMPTY_RUN_QUEUE() ); + awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking ); #endif + } } @@ -1485,7 +1486,9 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop // if the nursery has only one block. + ACQUIRE_SM_LOCK bd = allocGroup( blocks ); + RELEASE_SM_LOCK cap->r.rNursery->n_blocks += blocks; // link the new group into the list @@ -1675,7 +1678,709 @@ scheduleHandleThreadBlocked( StgTSO *t /* ngoq Dogh! ASSERT(procStatus[CurrentProc]==Busy || - ((procStatus[Curren + ((procStatus[CurrentProc]==Fetching) && + (t->block_info.closure!=(StgClosure*)NULL))); + if (run_queue_hds[CurrentProc] == END_TSO_QUEUE && + !(!RtsFlags.GranFlags.DoAsyncFetch && + procStatus[CurrentProc]==Fetching)) + procStatus[CurrentProc] = Idle; + */ +#elif defined(PAR) + IF_DEBUG(scheduler, + debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", + t->id, t, whatNext_strs[t->what_next], t->block_info.closure)); + IF_PAR_DEBUG(bq, + + if (t->block_info.closure!=(StgClosure*)NULL) + print_bq(t->block_info.closure)); + + /* Send a fetch (if BlockedOnGA) and dump event to log file */ + blockThread(t); + + /* whatever we schedule next, we must log that schedule */ + emitSchedule = rtsTrue; + +#else /* !GRAN */ + + // We don't need to do anything. The thread is blocked, and it + // has tidied up its stack and placed itself on whatever queue + // it needs to be on. + +#if !defined(SMP) + ASSERT(t->why_blocked != NotBlocked); + // This might not be true under SMP: we don't have + // exclusive access to this TSO, so someone might have + // woken it up by now. This actually happens: try + // conc023 +RTS -N2. +#endif + + IF_DEBUG(scheduler, + debugBelch("--<< thread %d (%s) stopped: ", + t->id, whatNext_strs[t->what_next]); + printThreadBlockage(t); + debugBelch("\n")); + + /* Only for dumping event to log file + ToDo: do I need this in GranSim, too? + blockThread(t); + */ +#endif +} + +/* ----------------------------------------------------------------------------- + * Handle a thread that returned to the scheduler with ThreadFinished + * ASSUMES: sched_mutex + * -------------------------------------------------------------------------- */ + +static rtsBool +scheduleHandleThreadFinished( StgMainThread *mainThread + USED_WHEN_RTS_SUPPORTS_THREADS, + Capability *cap, + StgTSO *t ) +{ + /* Need to check whether this was a main thread, and if so, + * return with the return value. + * + * We also end up here if the thread kills itself with an + * uncaught exception, see Exception.cmm. + */ + IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", + t->id, whatNext_strs[t->what_next])); + +#if defined(GRAN) + endThread(t, CurrentProc); // clean-up the thread +#elif defined(PARALLEL_HASKELL) + /* For now all are advisory -- HWL */ + //if(t->priority==AdvisoryPriority) ?? + advisory_thread_count--; // JB: Caution with this counter, buggy! + +# if defined(DIST) + if(t->dist.priority==RevalPriority) + FinishReval(t); +# endif + +# if defined(EDENOLD) + // the thread could still have an outport... (BUG) + if (t->eden.outport != -1) { + // delete the outport for the tso which has finished... + IF_PAR_DEBUG(eden_ports, + debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n", + t->eden.outport, t->id)); + deleteOPT(t); + } + // thread still in the process (HEAVY BUG! since outport has just been closed...) + if (t->eden.epid != -1) { + IF_PAR_DEBUG(eden_ports, + debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n", + t->id, t->eden.epid)); + removeTSOfromProcess(t); + } +# endif + +# if defined(PAR) + if (RtsFlags.ParFlags.ParStats.Full && + !RtsFlags.ParFlags.ParStats.Suppressed) + DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */); + + // t->par only contains statistics: left out for now... + IF_PAR_DEBUG(fish, + debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n", + t->id,t,t->par.sparkname)); +# endif +#endif // PARALLEL_HASKELL + + // + // Check whether the thread that just completed was a main + // thread, and if so return with the result. + // + // There is an assumption here that all thread completion goes + // through this point; we need to make sure that if a thread + // ends up in the ThreadKilled state, that it stays on the run + // queue so it can be dealt with here. + // + if ( +#if defined(RTS_SUPPORTS_THREADS) + mainThread != NULL +#else + mainThread->tso == t +#endif + ) + { + // We are a bound thread: this must be our thread that just + // completed. + ASSERT(mainThread->tso == t); + + if (t->what_next == ThreadComplete) { + if (mainThread->ret) { + // NOTE: return val is tso->sp[1] (see StgStartup.hc) + *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; + } + mainThread->stat = Success; + } else { + if (mainThread->ret) { + *(mainThread->ret) = NULL; + } + if (interrupted) { + mainThread->stat = Interrupted; + } else { + mainThread->stat = Killed; + } + } +#ifdef DEBUG + removeThreadLabel((StgWord)mainThread->tso->id); +#endif + if (mainThread->prev == NULL) { + ASSERT(mainThread == main_threads); + main_threads = mainThread->link; + } else { + mainThread->prev->link = mainThread->link; + } + if (mainThread->link != NULL) { + mainThread->link->prev = mainThread->prev; + } + releaseCapability(cap); + return rtsTrue; // tells schedule() to return + } + +#ifdef RTS_SUPPORTS_THREADS + ASSERT(t->main == NULL); +#else + if (t->main != NULL) { + // Must be a main thread that is not the topmost one. Leave + // it on the run queue until the stack has unwound to the + // point where we can deal with this. Leaving it on the run + // queue also ensures that the garbage collector knows about + // this thread and its return value (it gets dropped from the + // all_threads list so there's no other way to find it). + APPEND_TO_RUN_QUEUE(t); + } +#endif + return rtsFalse; +} + +/* ----------------------------------------------------------------------------- + * Perform a heap census, if PROFILING + * -------------------------------------------------------------------------- */ + +static rtsBool +scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED ) +{ +#if defined(PROFILING) + // When we have +RTS -i0 and we're heap profiling, do a census at + // every GC. This lets us get repeatable runs for debugging. + if (performHeapProfile || + (RtsFlags.ProfFlags.profileInterval==0 && + RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) { + GarbageCollect(GetRoots, rtsTrue); + heapCensus(); + performHeapProfile = rtsFalse; + return rtsTrue; // true <=> we already GC'd + } +#endif + return rtsFalse; +} + +/* ----------------------------------------------------------------------------- + * Perform a garbage collection if necessary + * ASSUMES: sched_mutex + * -------------------------------------------------------------------------- */ + +static void +scheduleDoGC( rtsBool force_major ) +{ + StgTSO *t; +#ifdef SMP + Capability *cap; + static rtsBool waiting_for_gc; + int n_capabilities = RtsFlags.ParFlags.nNodes - 1; + // subtract one because we're already holding one. + Capability *caps[n_capabilities]; +#endif + +#ifdef SMP + // In order to GC, there must be no threads running Haskell code. + // Therefore, the GC thread needs to hold *all* the capabilities, + // and release them after the GC has completed. + // + // This seems to be the simplest way: previous attempts involved + // making all the threads with capabilities give up their + // capabilities and sleep except for the *last* one, which + // actually did the GC. But it's quite hard to arrange for all + // the other tasks to sleep and stay asleep. + // + // This does mean that there will be multiple entries in the + // thread->capability hash table for the current thread, but + // they will be removed as normal when the capabilities are + // released again. + // + + // Someone else is already trying to GC + if (waiting_for_gc) return; + waiting_for_gc = rtsTrue; + + while (n_capabilities > 0) { + IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities)); + waitForReturnCapability(&sched_mutex, &cap); + n_capabilities--; + caps[n_capabilities] = cap; + } + + waiting_for_gc = rtsFalse; +#endif + + /* Kick any transactions which are invalid back to their + * atomically frames. When next scheduled they will try to + * commit, this commit will fail and they will retry. + */ + { + StgTSO *next; + + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + if (t->what_next == ThreadRelocated) { + next = t->link; + } else { + next = t->global_link; + if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) { + if (!stmValidateNestOfTransactions (t -> trec)) { + IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t)); + + // strip the stack back to the ATOMICALLY_FRAME, aborting + // the (nested) transaction, and saving the stack of any + // partially-evaluated thunks on the heap. + raiseAsync_(t, NULL, rtsTrue); + +#ifdef REG_R1 + ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); +#endif + } + } + } + } + } + + // so this happens periodically: + scheduleCheckBlackHoles(); + + IF_DEBUG(scheduler, printAllThreads()); + + /* everybody back, start the GC. + * Could do it in this thread, or signal a condition var + * to do it in another thread. Either way, we need to + * broadcast on gc_pending_cond afterward. + */ +#if defined(RTS_SUPPORTS_THREADS) + IF_DEBUG(scheduler,sched_belch("doing GC")); +#endif + GarbageCollect(GetRoots, force_major); + +#if defined(SMP) + { + // release our stash of capabilities. + nat i; + for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) { + releaseCapability(caps[i]); + } + } +#endif + +#if defined(GRAN) + /* add a ContinueThread event to continue execution of current thread */ + new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc], + ContinueThread, + t, (StgClosure*)NULL, (rtsSpark*)NULL); + IF_GRAN_DEBUG(bq, + debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n"); + G_EVENTQ(0); + G_CURR_THREADQ(0)); +#endif /* GRAN */ +} + +/* --------------------------------------------------------------------------- + * rtsSupportsBoundThreads(): is the RTS built to support bound threads? + * used by Control.Concurrent for error checking. + * ------------------------------------------------------------------------- */ + +StgBool +rtsSupportsBoundThreads(void) +{ +#if defined(RTS_SUPPORTS_THREADS) + return rtsTrue; +#else + return rtsFalse; +#endif +} + +/* --------------------------------------------------------------------------- + * isThreadBound(tso): check whether tso is bound to an OS thread. + * ------------------------------------------------------------------------- */ + +StgBool +isThreadBound(StgTSO* tso USED_IN_THREADED_RTS) +{ +#if defined(RTS_SUPPORTS_THREADS) + return (tso->main != NULL); +#endif + return rtsFalse; +} + +/* --------------------------------------------------------------------------- + * Singleton fork(). Do not copy any running threads. + * ------------------------------------------------------------------------- */ + +#ifndef mingw32_HOST_OS +#define FORKPROCESS_PRIMOP_SUPPORTED +#endif + +#ifdef FORKPROCESS_PRIMOP_SUPPORTED +static void +deleteThreadImmediately(StgTSO *tso); +#endif +StgInt +forkProcess(HsStablePtr *entry +#ifndef FORKPROCESS_PRIMOP_SUPPORTED + STG_UNUSED +#endif + ) +{ +#ifdef FORKPROCESS_PRIMOP_SUPPORTED + pid_t pid; + StgTSO* t,*next; + StgMainThread *m; + SchedulerStatus rc; + + IF_DEBUG(scheduler,sched_belch("forking!")); + rts_lock(); // This not only acquires sched_mutex, it also + // makes sure that no other threads are running + + pid = fork(); + + if (pid) { /* parent */ + + /* just return the pid */ + rts_unlock(); + return pid; + + } else { /* child */ + + + // delete all threads + run_queue_hd = run_queue_tl = END_TSO_QUEUE; + + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + next = t->link; + + // don't allow threads to catch the ThreadKilled exception + deleteThreadImmediately(t); + } + + // wipe the main thread list + while((m = main_threads) != NULL) { + main_threads = m->link; +# ifdef THREADED_RTS + closeCondition(&m->bound_thread_cond); +# endif + stgFree(m); + } + + rc = rts_evalStableIO(entry, NULL); // run the action + rts_checkSchedStatus("forkProcess",rc); + + rts_unlock(); + + hs_exit(); // clean up and exit + stg_exit(0); + } +#else /* !FORKPROCESS_PRIMOP_SUPPORTED */ + barf("forkProcess#: primop not supported, sorry!\n"); + return -1; +#endif +} + +/* --------------------------------------------------------------------------- + * deleteAllThreads(): kill all the live threads. + * + * This is used when we catch a user interrupt (^C), before performing + * any necessary cleanups and running finalizers. + * + * Locks: sched_mutex held. + * ------------------------------------------------------------------------- */ + +void +deleteAllThreads ( void ) +{ + StgTSO* t, *next; + IF_DEBUG(scheduler,sched_belch("deleting all threads")); + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + if (t->what_next == ThreadRelocated) { + next = t->link; + } else { + next = t->global_link; + deleteThread(t); + } + } + + // The run queue now contains a bunch of ThreadKilled threads. We + // must not throw these away: the main thread(s) will be in there + // somewhere, and the main scheduler loop has to deal with it. + // Also, the run queue is the only thing keeping these threads from + // being GC'd, and we don't want the "main thread has been GC'd" panic. + + ASSERT(blocked_queue_hd == END_TSO_QUEUE); + ASSERT(blackhole_queue == END_TSO_QUEUE); + ASSERT(sleeping_queue == END_TSO_QUEUE); +} + +/* startThread and insertThread are now in GranSim.c -- HWL */ + + +/* --------------------------------------------------------------------------- + * Suspending & resuming Haskell threads. + * + * When making a "safe" call to C (aka _ccall_GC), the task gives back + * its capability before calling the C function. This allows another + * task to pick up the capability and carry on running Haskell + * threads. It also means that if the C call blocks, it won't lock + * the whole system. + * + * The Haskell thread making the C call is put to sleep for the + * duration of the call, on the susepended_ccalling_threads queue. We + * give out a token to the task, which it can use to resume the thread + * on return from the C function. + * ------------------------------------------------------------------------- */ + +StgInt +suspendThread( StgRegTable *reg ) +{ + nat tok; + Capability *cap; + int saved_errno = errno; + + /* assume that *reg is a pointer to the StgRegTable part + * of a Capability. + */ + cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable))); + + ACQUIRE_LOCK(&sched_mutex); + + IF_DEBUG(scheduler, + sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id)); + + // XXX this might not be necessary --SDM + cap->r.rCurrentTSO->what_next = ThreadRunGHC; + + threadPaused(cap->r.rCurrentTSO); + cap->r.rCurrentTSO->link = suspended_ccalling_threads; + suspended_ccalling_threads = cap->r.rCurrentTSO; + + if(cap->r.rCurrentTSO->blocked_exceptions == NULL) { + cap->r.rCurrentTSO->why_blocked = BlockedOnCCall; + cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE; + } else { + cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc; + } + + /* Use the thread ID as the token; it should be unique */ + tok = cap->r.rCurrentTSO->id; + + /* Hand back capability */ + cap->r.rInHaskell = rtsFalse; + releaseCapability(cap); + +#if defined(RTS_SUPPORTS_THREADS) + /* Preparing to leave the RTS, so ensure there's a native thread/task + waiting to take over. + */ + IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok)); +#endif + + RELEASE_LOCK(&sched_mutex); + + errno = saved_errno; + return tok; +} + +StgRegTable * +resumeThread( StgInt tok ) +{ + StgTSO *tso, **prev; + Capability *cap; + int saved_errno = errno; + +#if defined(RTS_SUPPORTS_THREADS) + /* Wait for permission to re-enter the RTS with the result. */ + ACQUIRE_LOCK(&sched_mutex); + waitForReturnCapability(&sched_mutex, &cap); + + IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok)); +#else + grabCapability(&cap); +#endif + + /* Remove the thread off of the suspended list */ + prev = &suspended_ccalling_threads; + for (tso = suspended_ccalling_threads; + tso != END_TSO_QUEUE; + prev = &tso->link, tso = tso->link) { + if (tso->id == (StgThreadID)tok) { + *prev = tso->link; + break; + } + } + if (tso == END_TSO_QUEUE) { + barf("resumeThread: thread not found"); + } + tso->link = END_TSO_QUEUE; + + if(tso->why_blocked == BlockedOnCCall) { + awakenBlockedQueueNoLock(tso->blocked_exceptions); + tso->blocked_exceptions = NULL; + } + + /* Reset blocking status */ + tso->why_blocked = NotBlocked; + + cap->r.rCurrentTSO = tso; + cap->r.rInHaskell = rtsTrue; + RELEASE_LOCK(&sched_mutex); + errno = saved_errno; + return &cap->r; +} + +/* --------------------------------------------------------------------------- + * Comparing Thread ids. + * + * This is used from STG land in the implementation of the + * instances of Eq/Ord for ThreadIds. + * ------------------------------------------------------------------------ */ + +int +cmp_thread(StgPtr tso1, StgPtr tso2) +{ + StgThreadID id1 = ((StgTSO *)tso1)->id; + StgThreadID id2 = ((StgTSO *)tso2)->id; + + if (id1 < id2) return (-1); + if (id1 > id2) return 1; + return 0; +} + +/* --------------------------------------------------------------------------- + * Fetching the ThreadID from an StgTSO. + * + * This is used in the implementation of Show for ThreadIds. + * ------------------------------------------------------------------------ */ +int +rts_getThreadId(StgPtr tso) +{ + return ((StgTSO *)tso)->id; +} + +#ifdef DEBUG +void +labelThread(StgPtr tso, char *label) +{ + int len; + void *buf; + + /* Caveat: Once set, you can only set the thread name to "" */ + len = strlen(label)+1; + buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()"); + strncpy(buf,label,len); + /* Update will free the old memory for us */ + updateThreadLabel(((StgTSO *)tso)->id,buf); +} +#endif /* DEBUG */ + +/* --------------------------------------------------------------------------- + Create a new thread. + + The new thread starts with the given stack size. Before the + scheduler can run, however, this thread needs to have a closure + (and possibly some arguments) pushed on its stack. See + pushClosure() in Schedule.h. + + createGenThread() and createIOThread() (in SchedAPI.h) are + convenient packaged versions of this function. + + currently pri (priority) is only used in a GRAN setup -- HWL + ------------------------------------------------------------------------ */ +#if defined(GRAN) +/* currently pri (priority) is only used in a GRAN setup -- HWL */ +StgTSO * +createThread(nat size, StgInt pri) +#else +StgTSO * +createThread(nat size) +#endif +{ + StgTSO *tso; + nat stack_size; + + /* First check whether we should create a thread at all */ +#if defined(PARALLEL_HASKELL) + /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */ + if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) { + threadsIgnored++; + debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n", + RtsFlags.ParFlags.maxThreads, advisory_thread_count); + return END_TSO_QUEUE; + } + threadsCreated++; +#endif + +#if defined(GRAN) + ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0); +#endif + + // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW + + /* catch ridiculously small stack sizes */ + if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) { + size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW; + } + + stack_size = size - TSO_STRUCT_SIZEW; + + tso = (StgTSO *)allocate(size); + TICK_ALLOC_TSO(stack_size, 0); + + SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM); +#if defined(GRAN) + SET_GRAN_HDR(tso, ThisPE); +#endif + + // Always start with the compiled code evaluator + tso->what_next = ThreadRunGHC; + + tso->id = next_thread_id++; + tso->why_blocked = NotBlocked; + tso->blocked_exceptions = NULL; + + tso->saved_errno = 0; + tso->main = NULL; + + tso->stack_size = stack_size; + tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) + - TSO_STRUCT_SIZEW; + tso->sp = (P_)&(tso->stack) + stack_size; + + tso->trec = NO_TREC; + +#ifdef PROFILING + tso->prof.CCCS = CCS_MAIN; +#endif + + /* put a stop frame on the stack */ + tso->sp -= sizeofW(StgStopFrame); + SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM); + tso->link = END_TSO_QUEUE; + + // ToDo: check this +#if defined(GRAN) + /* uses more flexible routine in GranSim */ + insertThread(tso, CurrentProc); +#else + /* In a non-GranSim setup the pushing of a TSO onto the runq is separated * from its creation */ #endif @@ -1838,8 +2543,8 @@ activateSpark (rtsSpark spark) * on this thread's stack before the scheduler is invoked. * ------------------------------------------------------------------------ */ -static void -scheduleThread_(StgTSO *tso) +void +scheduleThreadLocked(StgTSO *tso) { // The thread goes at the *end* of the run-queue, to avoid possible // starvation of any threads already on the queue. @@ -1851,7 +2556,7 @@ void scheduleThread(StgTSO* tso) { ACQUIRE_LOCK(&sched_mutex); - scheduleThread_(tso); + scheduleThreadLocked(tso); RELEASE_LOCK(&sched_mutex); } @@ -1992,9 +2697,62 @@ exitScheduler( void ) { interrupted = rtsTrue; shutting_down_scheduler = rtsTrue; + #if defined(RTS_SUPPORTS_THREADS) if (threadIsTask(osThreadId())) { taskStop(); } stopTaskManager(); + // + // What can we do here? There are a bunch of worker threads, it + // might be nice to let them exit cleanly. There may be some main + // threads in the run queue; we should let them return to their + // callers with an Interrupted state. We can't in general wait + // for all the running Tasks to stop, because some might be off in + // a C call that is blocked. + // + // Letting the run queue drain is the safest thing. That lets any + // main threads return that can return, and cleans up all the + // runnable threads. Then we grab all the Capabilities to stop + // anything unexpected happening while we shut down. + // + // ToDo: this doesn't let us get the time stats from the worker + // tasks, because they haven't called taskStop(). + // + ACQUIRE_LOCK(&sched_mutex); + { + nat i; + for (i = 1000; i > 0; i--) { + if (EMPTY_RUN_QUEUE()) { + IF_DEBUG(scheduler, sched_belch("run queue is empty")); + break; + } + IF_DEBUG(scheduler, sched_belch("yielding")); + RELEASE_LOCK(&sched_mutex); + prodWorker(); + yieldThread(); + ACQUIRE_LOCK(&sched_mutex); + } + } + +#ifdef SMP + { + Capability *cap; + int n_capabilities = RtsFlags.ParFlags.nNodes; + Capability *caps[n_capabilities]; + nat i; + + while (n_capabilities > 0) { + IF_DEBUG(scheduler, sched_belch("exitScheduler: grabbing all the capabilies (%d left)", n_capabilities)); + waitForReturnCapability(&sched_mutex, &cap); + n_capabilities--; + caps[n_capabilities] = cap; + } + } +#else + { + Capability *cap; + waitForReturnCapability(&sched_mutex, &cap); + } +#endif #endif } @@ -2537,6 +3295,8 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node) void awakenBlockedQueueNoLock(StgTSO *tso) { + if (tso == NULL) return; // hack; see bug #1235728, and comments in + // Exception.cmm while (tso != END_TSO_QUEUE) { tso = unblockOneLocked(tso); } @@ -2545,6 +3305,8 @@ awakenBlockedQueueNoLock(StgTSO *tso) void awakenBlockedQueue(StgTSO *tso) { + if (tso == NULL) return; // hack; see bug #1235728, and comments in + // Exception.cmm ACQUIRE_LOCK(&sched_mutex); while (tso != END_TSO_QUEUE) { tso = unblockOneLocked(tso); @@ -2903,6 +3665,7 @@ checkBlackHoles( void ) ASSERT(t->why_blocked == BlockedOnBlackHole); type = get_itbl(t->block_info.closure)->type; if (type != BLACKHOLE && type != CAF_BLACKHOLE) { + IF_DEBUG(sanity,checkTSO(t)); t = unblockOneLocked(t); *prev = t; any_woke_up = rtsTrue; @@ -3104,7 +3867,7 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) // we've got an exception to raise, so let's pass it to the // handler in this frame. // - raise = (StgThunk *)allocate(sizeofW(StgThunk)+1); + raise = (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE); TICK_ALLOC_SE_THK(1,0); SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); raise->payload[0] = exception; @@ -3441,7 +4204,159 @@ printThreadStatus(StgTSO *tso) { switch (tso->what_next) { case ThreadKilled: - debugBelch("has + debugBelch("has been killed"); + break; + case ThreadComplete: + debugBelch("has completed"); + break; + default: + printThreadBlockage(tso); + } +} + +void +printAllThreads(void) +{ + StgTSO *t; + +# if defined(GRAN) + char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; + ullong_format_string(TIME_ON_PROC(CurrentProc), + time_string, rtsFalse/*no commas!*/); + + debugBelch("all threads at [%s]:\n", time_string); +# elif defined(PARALLEL_HASKELL) + char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; + ullong_format_string(CURRENT_TIME, + time_string, rtsFalse/*no commas!*/); + + debugBelch("all threads at [%s]:\n", time_string); +# else + debugBelch("all threads:\n"); +# endif + + for (t = all_threads; t != END_TSO_QUEUE; ) { + debugBelch("\tthread %4d @ %p ", t->id, (void *)t); +#if defined(DEBUG) + { + void *label = lookupThreadLabel(t->id); + if (label) debugBelch("[\"%s\"] ",(char *)label); + } +#endif + if (t->what_next == ThreadRelocated) { + debugBelch("has been relocated...\n"); + t = t->link; + } else { + printThreadStatus(t); + debugBelch("\n"); + t = t->global_link; + } + } +} + +#ifdef DEBUG + +// useful from gdb +void +printThreadQueue(StgTSO *t) +{ + nat i = 0; + for (; t != END_TSO_QUEUE; t = t->link) { + debugBelch("\tthread %d @ %p ", t->id, (void *)t); + if (t->what_next == ThreadRelocated) { + debugBelch("has been relocated...\n"); + } else { + printThreadStatus(t); + debugBelch("\n"); + } + i++; + } + debugBelch("%d threads on queue\n", i); +} + +/* + Print a whole blocking queue attached to node (debugging only). +*/ +# if defined(PARALLEL_HASKELL) +void +print_bq (StgClosure *node) +{ + StgBlockingQueueElement *bqe; + StgTSO *tso; + rtsBool end; + + debugBelch("## BQ of closure %p (%s): ", + node, info_type(node)); + + /* should cover all closures that may have a blocking queue */ + ASSERT(get_itbl(node)->type == BLACKHOLE_BQ || + get_itbl(node)->type == FETCH_ME_BQ || + get_itbl(node)->type == RBH || + get_itbl(node)->type == MVAR); + + ASSERT(node!=(StgClosure*)NULL); // sanity check + + print_bqe(((StgBlockingQueue*)node)->blocking_queue); +} + +/* + Print a whole blocking queue starting with the element bqe. +*/ +void +print_bqe (StgBlockingQueueElement *bqe) +{ + rtsBool end; + + /* + NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure; + */ + for (end = (bqe==END_BQ_QUEUE); + !end; // iterate until bqe points to a CONSTR + end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), + bqe = end ? END_BQ_QUEUE : bqe->link) { + ASSERT(bqe != END_BQ_QUEUE); // sanity check + ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check + /* types of closures that may appear in a blocking queue */ + ASSERT(get_itbl(bqe)->type == TSO || + get_itbl(bqe)->type == BLOCKED_FETCH || + get_itbl(bqe)->type == CONSTR); + /* only BQs of an RBH end with an RBH_Save closure */ + //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH); + + switch (get_itbl(bqe)->type) { + case TSO: + debugBelch(" TSO %u (%x),", + ((StgTSO *)bqe)->id, ((StgTSO *)bqe)); + break; + case BLOCKED_FETCH: + debugBelch(" BF (node=%p, ga=((%x, %d, %x)),", + ((StgBlockedFetch *)bqe)->node, + ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid, + ((StgBlockedFetch *)bqe)->ga.payload.gc.slot, + ((StgBlockedFetch *)bqe)->ga.weight); + break; + case CONSTR: + debugBelch(" %s (IP %p),", + (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" : + get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" : + get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" : + "RBH_Save_?"), get_itbl(bqe)); + break; + default: + barf("Unexpected closure type %s in blocking queue", // of %p (%s)", + info_type((StgClosure *)bqe)); // , node, info_type(node)); + break; + } + } /* for */ + debugBelch("\n"); +} +# elif defined(GRAN) +void +print_bq (StgClosure *node) +{ + StgBlockingQueueElement *bqe; + PEs node_loc, tso_loc; + rtsBool end; /* should cover all closures that may have a blocking queue */ ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||