X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;ds=sidebyside;f=ghc%2Frts%2FSchedule.c;h=33db7e685fc32811ca4309fb9aa2ad4c6c47a33b;hb=324e96d2ebfcb113cd97c43ef043d591ef87de71;hp=1a3843a50874c43a5445e9339675594bd8b1ab3d;hpb=6e2ea06c4a72866396f1b754ec8c2091a9b1e20b;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 1a3843a..33db7e6 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,5 +1,5 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.131 2002/02/18 13:26:13 sof Exp $ + * $Id: Schedule.c,v 1.176 2003/10/01 10:49:08 wolfgang Exp $ * * (c) The GHC Team, 1998-2000 * @@ -84,18 +84,19 @@ #include "StgRun.h" #include "StgStartup.h" #include "Hooks.h" +#define COMPILING_SCHEDULER #include "Schedule.h" #include "StgMiscClosures.h" #include "Storage.h" #include "Interpreter.h" #include "Exception.h" #include "Printer.h" -#include "Main.h" #include "Signals.h" #include "Sanity.h" #include "Stats.h" -#include "Itimer.h" +#include "Timer.h" #include "Prelude.h" +#include "ThreadLabels.h" #ifdef PROFILING #include "Proftimer.h" #include "ProfHeap.h" @@ -114,40 +115,40 @@ #include "OSThreads.h" #include "Task.h" +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#ifdef HAVE_UNISTD_H +#include +#endif + +#include +#include #include -//@node Variables and Data structures, Prototypes, Includes, Main scheduling code -//@subsection Variables and Data structures +#ifdef HAVE_ERRNO_H +#include +#endif -/* Main threads: - * - * These are the threads which clients have requested that we run. - * - * In a 'threaded' build, we might have several concurrent clients all - * waiting for results, and each one will wait on a condition variable - * until the result is available. - * - * In non-SMP, clients are strictly nested: the first client calls - * into the RTS, which might call out again to C with a _ccall_GC, and - * eventually re-enter the RTS. - * - * Main threads information is kept in a linked list: - */ -//@cindex StgMainThread -typedef struct StgMainThread_ { - StgTSO * tso; - SchedulerStatus stat; - StgClosure ** ret; -#if defined(RTS_SUPPORTS_THREADS) - Condition wakeup; +#ifdef THREADED_RTS +#define USED_IN_THREADED_RTS +#else +#define USED_IN_THREADED_RTS STG_UNUSED +#endif + +#ifdef RTS_SUPPORTS_THREADS +#define USED_WHEN_RTS_SUPPORTS_THREADS +#else +#define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED #endif - struct StgMainThread_ *link; -} StgMainThread; + +//@node Variables and Data structures, Prototypes, Includes, Main scheduling code +//@subsection Variables and Data structures /* Main thread queue. * Locks required: sched_mutex. */ -static StgMainThread *main_threads; +StgMainThread *main_threads = NULL; /* Thread queues. * Locks required: sched_mutex. @@ -174,16 +175,18 @@ StgTSO *ccalling_threadss[MAX_PROC]; #else /* !GRAN */ -StgTSO *run_queue_hd, *run_queue_tl; -StgTSO *blocked_queue_hd, *blocked_queue_tl; -StgTSO *sleeping_queue; /* perhaps replace with a hash table? */ +StgTSO *run_queue_hd = NULL; +StgTSO *run_queue_tl = NULL; +StgTSO *blocked_queue_hd = NULL; +StgTSO *blocked_queue_tl = NULL; +StgTSO *sleeping_queue = NULL; /* perhaps replace with a hash table? */ #endif /* Linked list of all threads. * Used for detecting garbage collected threads. */ -StgTSO *all_threads; +StgTSO *all_threads = NULL; /* When a thread performs a safe C call (_ccall_GC, using old * terminology), it gets put on the suspended_ccalling_threads @@ -200,17 +203,17 @@ static StgTSO *threadStackOverflow(StgTSO *tso); /* flag set by signal handler to precipitate a context switch */ //@cindex context_switch -nat context_switch; +nat context_switch = 0; /* if this flag is set as well, give up execution */ //@cindex interrupted -rtsBool interrupted; +rtsBool interrupted = rtsFalse; /* Next thread ID to allocate. - * Locks required: sched_mutex + * Locks required: thread_id_mutex */ //@cindex next_thread_id -StgThreadID next_thread_id = 1; +static StgThreadID next_thread_id = 1; /* * Pointers to the state of the current thread. @@ -221,14 +224,15 @@ StgThreadID next_thread_id = 1; /* The smallest stack size that makes any sense is: * RESERVED_STACK_WORDS (so we can get back from the stack overflow) * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame) - * + 1 (the realworld token for an IO thread) * + 1 (the closure to enter) + * + 1 (stg_ap_v_ret) + * + 1 (spare slot req'd by stg_ap_v_ret) * * A thread with this stack will bomb immediately with a stack * overflow, which will increase its stack size. */ -#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2) +#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3) #if defined(GRAN) @@ -241,17 +245,19 @@ StgTSO *CurrentTSO; */ StgTSO dummy_tso; -rtsBool ready_to_gc; +static rtsBool ready_to_gc; + +/* + * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) -- + * in an MT setting, needed to signal that a worker thread shouldn't hang around + * in the scheduler when it is out of work. + */ +static rtsBool shutting_down_scheduler = rtsFalse; void addToBlockedQueue ( StgTSO *tso ); -static void schedule ( void ); +static void schedule ( StgMainThread *mainThread, Capability *initialCapability ); void interruptStgRts ( void ); -#if defined(GRAN) -static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri ); -#else -static StgTSO * createThread_ ( nat size, rtsBool have_lock ); -#endif static void detectBlackHoles ( void ); @@ -266,6 +272,13 @@ static void sched_belch(char *s, ...); Mutex sched_mutex = INIT_MUTEX_VAR; Mutex term_mutex = INIT_MUTEX_VAR; +/* + * A heavyweight solution to the problem of protecting + * the thread_id from concurrent update. + */ +Mutex thread_id_mutex = INIT_MUTEX_VAR; + + # if defined(SMP) static Condition gc_pending_cond = INIT_COND_VAR; nat await_death; @@ -280,21 +293,13 @@ rtsBool emitSchedule = rtsTrue; #endif #if DEBUG -char *whatNext_strs[] = { - "ThreadEnterGHC", +static char *whatNext_strs[] = { "ThreadRunGHC", - "ThreadEnterInterp", + "ThreadInterpret", "ThreadKilled", + "ThreadRelocated", "ThreadComplete" }; - -char *threadReturnCode_strs[] = { - "HeapOverflow", /* might also be StackOverflow */ - "StackOverflow", - "ThreadYielding", - "ThreadBlocked", - "ThreadFinished" -}; #endif #if defined(PAR) @@ -308,17 +313,40 @@ StgTSO * activateSpark (rtsSpark spark); StgTSO *MainTSO; */ -#if defined(PAR) || defined(RTS_SUPPORTS_THREADS) +#if defined(RTS_SUPPORTS_THREADS) +static rtsBool startingWorkerThread = rtsFalse; + static void taskStart(void); static void taskStart(void) { - schedule(); + Capability *cap; + + ACQUIRE_LOCK(&sched_mutex); + startingWorkerThread = rtsFalse; + waitForWorkCapability(&sched_mutex, &cap, NULL); + RELEASE_LOCK(&sched_mutex); + + schedule(NULL,cap); } -#endif - - +void +startSchedulerTaskIfNecessary(void) +{ + if(run_queue_hd != END_TSO_QUEUE + || blocked_queue_hd != END_TSO_QUEUE + || sleeping_queue != END_TSO_QUEUE) + { + if(!startingWorkerThread) + { // we don't want to start another worker thread + // just because the last one hasn't yet reached the + // "waiting for capability" state + startingWorkerThread = rtsTrue; + startTask(taskStart); + } + } +} +#endif //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code //@subsection Main scheduling loop @@ -360,10 +388,11 @@ taskStart(void) ------------------------------------------------------------------------ */ //@cindex schedule static void -schedule( void ) +schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, + Capability *initialCapability ) { StgTSO *t; - Capability *cap; + Capability *cap = initialCapability; StgThreadReturnCode ret; #if defined(GRAN) rtsEvent *event; @@ -378,16 +407,24 @@ schedule( void ) # endif #endif rtsBool was_interrupted = rtsFalse; + StgTSOWhatNext prev_what_next; ACQUIRE_LOCK(&sched_mutex); #if defined(RTS_SUPPORTS_THREADS) - /* Check to see whether there are any worker threads - waiting to deposit external call results. If so, - yield our capability */ - yieldToReturningWorker(&sched_mutex, cap); + /* in the threaded case, the capability is either passed in via the initialCapability + parameter, or initialized inside the scheduler loop */ - waitForWorkCapability(&sched_mutex, &cap, rtsFalse); + IF_DEBUG(scheduler, + fprintf(stderr,"### NEW SCHEDULER LOOP in os thread %u(%p)\n", + osThreadId(), osThreadId())); + IF_DEBUG(scheduler, + fprintf(stderr,"### main thread: %p\n",mainThread)); + IF_DEBUG(scheduler, + fprintf(stderr,"### initial cap: %p\n",initialCapability)); +#else + /* simply initialise it in the non-threaded case */ + grabCapability(&cap); #endif #if defined(GRAN) @@ -425,15 +462,42 @@ schedule( void ) IF_DEBUG(scheduler, printAllThreads()); +#if defined(RTS_SUPPORTS_THREADS) + /* Check to see whether there are any worker threads + waiting to deposit external call results. If so, + yield our capability... if we have a capability, that is. */ + if(cap) + yieldToReturningWorker(&sched_mutex, &cap, + mainThread ? &mainThread->bound_thread_cond : NULL); + + /* If we do not currently hold a capability, we wait for one */ + if(!cap) + { + waitForWorkCapability(&sched_mutex, &cap, + mainThread ? &mainThread->bound_thread_cond : NULL); + IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): got cap", + osThreadId())); + } +#endif + /* If we're interrupted (the user pressed ^C, or some other * termination condition occurred), kill all the currently running * threads. */ if (interrupted) { IF_DEBUG(scheduler, sched_belch("interrupted")); - deleteAllThreads(); interrupted = rtsFalse; was_interrupted = rtsTrue; +#if defined(RTS_SUPPORTS_THREADS) + // In the threaded RTS, deadlock detection doesn't work, + // so just exit right away. + prog_belch("interrupted"); + releaseCapability(cap); + RELEASE_LOCK(&sched_mutex); + shutdownHaskellAndExit(EXIT_SUCCESS); +#else + deleteAllThreads(); +#endif } /* Go through the list of main threads and wake up any @@ -443,34 +507,63 @@ schedule( void ) */ #if defined(RTS_SUPPORTS_THREADS) { - StgMainThread *m, **prev; - prev = &main_threads; - for (m = main_threads; m != NULL; m = m->link) { - switch (m->tso->what_next) { - case ThreadComplete: - if (m->ret) { - *(m->ret) = (StgClosure *)m->tso->sp[0]; - } - *prev = m->link; - m->stat = Success; - broadcastCondition(&m->wakeup); - break; - case ThreadKilled: - if (m->ret) *(m->ret) = NULL; - *prev = m->link; - if (was_interrupted) { - m->stat = Interrupted; - } else { - m->stat = Killed; - } - broadcastCondition(&m->wakeup); - break; - default: - break; + StgMainThread *m, **prev; + prev = &main_threads; + for (m = main_threads; m != NULL; prev = &m->link, m = m->link) { + if (m->tso->what_next == ThreadComplete + || m->tso->what_next == ThreadKilled) + { + if(m == mainThread) + { + if(m->tso->what_next == ThreadComplete) + { + if (m->ret) + { + // NOTE: return val is tso->sp[1] (see StgStartup.hc) + *(m->ret) = (StgClosure *)m->tso->sp[1]; + } + m->stat = Success; + } + else + { + if (m->ret) + { + *(m->ret) = NULL; + } + if (was_interrupted) + { + m->stat = Interrupted; + } + else + { + m->stat = Killed; + } + } + *prev = m->link; + +#ifdef DEBUG + removeThreadLabel((StgWord)m->tso); +#endif + releaseCapability(cap); + RELEASE_LOCK(&sched_mutex); + return; + } + else + { + // The current OS thread can not handle the fact that the Haskell + // thread "m" has ended. + // "m" is bound; the scheduler loop in it's bound OS thread has + // to return, so let's pass our capability directly to that thread. + passCapability(&sched_mutex, cap, &m->bound_thread_cond); + cap = NULL; + } + } } - } } - + + if(!cap) // If we gave our capability away, + continue; // go to the top to get it back + #else /* not threaded */ # if defined(PAR) @@ -483,12 +576,16 @@ schedule( void ) StgMainThread *m = main_threads; if (m->tso->what_next == ThreadComplete || m->tso->what_next == ThreadKilled) { +#ifdef DEBUG + removeThreadLabel((StgWord)m->tso); +#endif main_threads = main_threads->link; if (m->tso->what_next == ThreadComplete) { - /* we finished successfully, fill in the return value */ - if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; }; - m->stat = Success; - return; + // We finished successfully, fill in the return value + // NOTE: return val is tso->sp[1] (see StgStartup.hc) + if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; }; + m->stat = Success; + return; } else { if (m->ret) { *(m->ret) = NULL; }; if (was_interrupted) { @@ -545,19 +642,24 @@ schedule( void ) #endif // SMP /* check for signals each time around the scheduler */ -#ifndef mingw32_TARGET_OS +#if defined(RTS_USER_SIGNALS) if (signals_pending()) { + RELEASE_LOCK(&sched_mutex); /* ToDo: kill */ startSignalHandlers(); + ACQUIRE_LOCK(&sched_mutex); } #endif /* Check whether any waiting threads need to be woken up. If the * run queue is empty, and there are no other tasks running, we * can wait indefinitely for something to happen. - * ToDo: what if another client comes along & requests another - * main thread? */ - if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) { + if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) +#if defined(RTS_SUPPORTS_THREADS) && !defined(SMP) + || EMPTY_RUN_QUEUE() +#endif + ) + { awaitEvent( EMPTY_RUN_QUEUE() #if defined(SMP) && allFreeCapabilities() @@ -578,10 +680,8 @@ schedule( void ) * If no threads are black holed, we have a deadlock situation, so * inform all the main threads. */ -#ifndef PAR - if ( EMPTY_RUN_QUEUE() - && EMPTY_QUEUE(blocked_queue_hd) - && EMPTY_QUEUE(sleeping_queue) +#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS) + if ( EMPTY_THREAD_QUEUES() #if defined(RTS_SUPPORTS_THREADS) && EMPTY_QUEUE(suspended_ccalling_threads) #endif @@ -595,37 +695,65 @@ schedule( void ) /* and SMP mode ..? */ releaseCapability(cap); #endif + // Garbage collection can release some new threads due to + // either (a) finalizers or (b) threads resurrected because + // they are about to be send BlockedOnDeadMVar. Any threads + // thus released will be immediately runnable. GarbageCollect(GetRoots,rtsTrue); - if ( EMPTY_QUEUE(blocked_queue_hd) - && EMPTY_RUN_QUEUE() - && EMPTY_QUEUE(sleeping_queue) ) { - IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes...")); - detectBlackHoles(); + if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; } - /* No black holes, so probably a real deadlock. Send the - * current main thread the Deadlock exception (or in the SMP - * build, send *all* main threads the deadlock exception, - * since none of them can make progress). - */ - if ( EMPTY_RUN_QUEUE() ) { - StgMainThread *m; + IF_DEBUG(scheduler, + sched_belch("still deadlocked, checking for black holes...")); + detectBlackHoles(); + + if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; } + +#if defined(RTS_USER_SIGNALS) + /* If we have user-installed signal handlers, then wait + * for signals to arrive rather then bombing out with a + * deadlock. + */ #if defined(RTS_SUPPORTS_THREADS) - for (m = main_threads; m != NULL; m = m->link) { - switch (m->tso->why_blocked) { - case BlockedOnBlackHole: - raiseAsync(m->tso, (StgClosure *)NonTermination_closure); - break; - case BlockedOnException: - case BlockedOnMVar: - raiseAsync(m->tso, (StgClosure *)Deadlock_closure); - break; - default: - barf("deadlock: main thread blocked in a strange way"); - } - } + if ( 0 ) { /* hmm..what to do? Simply stop waiting for + a signal with no runnable threads (or I/O + suspended ones) leads nowhere quick. + For now, simply shut down when we reach this + condition. + + ToDo: define precisely under what conditions + the Scheduler should shut down in an MT setting. + */ #else - m = main_threads; + if ( anyUserHandlers() ) { +#endif + IF_DEBUG(scheduler, + sched_belch("still deadlocked, waiting for signals...")); + + awaitUserSignals(); + + // we might be interrupted... + if (interrupted) { continue; } + + if (signals_pending()) { + RELEASE_LOCK(&sched_mutex); + startSignalHandlers(); + ACQUIRE_LOCK(&sched_mutex); + } + ASSERT(!EMPTY_RUN_QUEUE()); + goto not_deadlocked; + } +#endif + + /* Probably a real deadlock. Send the current main thread the + * Deadlock exception (or in the SMP build, send *all* main + * threads the deadlock exception, since none of them can make + * progress). + */ + { + StgMainThread *m; +#if defined(RTS_SUPPORTS_THREADS) + for (m = main_threads; m != NULL; m = m->link) { switch (m->tso->why_blocked) { case BlockedOnBlackHole: raiseAsync(m->tso, (StgClosure *)NonTermination_closure); @@ -637,19 +765,36 @@ schedule( void ) default: barf("deadlock: main thread blocked in a strange way"); } -#endif } -#if defined(RTS_SUPPORTS_THREADS) - /* ToDo: revisit conditions (and mechanism) for shutting - down a multi-threaded world */ - if ( EMPTY_RUN_QUEUE() ) { - IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down.")); - shutdownHaskellAndExit(0); +#else + m = main_threads; + switch (m->tso->why_blocked) { + case BlockedOnBlackHole: + raiseAsync(m->tso, (StgClosure *)NonTermination_closure); + break; + case BlockedOnException: + case BlockedOnMVar: + raiseAsync(m->tso, (StgClosure *)Deadlock_closure); + break; + default: + barf("deadlock: main thread blocked in a strange way"); } #endif - ASSERT( !EMPTY_RUN_QUEUE() ); } + +#if defined(RTS_SUPPORTS_THREADS) + /* ToDo: revisit conditions (and mechanism) for shutting + down a multi-threaded world */ + IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down.")); + RELEASE_LOCK(&sched_mutex); + shutdownHaskell(); + return; +#endif } + not_deadlocked: + +#elif defined(RTS_SUPPORTS_THREADS) + /* ToDo: add deadlock detection in threaded RTS */ #elif defined(PAR) /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */ #endif @@ -665,6 +810,7 @@ schedule( void ) #endif #if defined(RTS_SUPPORTS_THREADS) +#if defined(SMP) /* block until we've got a thread on the run queue and a free * capability. * @@ -672,17 +818,24 @@ schedule( void ) if ( EMPTY_RUN_QUEUE() ) { /* Give up our capability */ releaseCapability(cap); + + /* If we're in the process of shutting down (& running the + * a batch of finalisers), don't wait around. + */ + if ( shutting_down_scheduler ) { + RELEASE_LOCK(&sched_mutex); + return; + } IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId())); waitForWorkCapability(&sched_mutex, &cap, rtsTrue); IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId())); -#if 0 - while ( EMPTY_RUN_QUEUE() ) { - waitForWorkCapability(&sched_mutex, &cap); - IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId())); - } -#endif + } +#else + if ( EMPTY_RUN_QUEUE() ) { + continue; // nothing to do } #endif +#endif #if defined(GRAN) if (RtsFlags.GranFlags.Light) @@ -965,19 +1118,71 @@ schedule( void ) // expensive if there is lots of thread switching going on... IF_DEBUG(sanity,checkTSO(t)); #endif - - grabCapability(&cap); + +#ifdef THREADED_RTS + { + StgMainThread *m; + for(m = main_threads; m; m = m->link) + { + if(m->tso == t) + break; + } + + if(m) + { + if(m == mainThread) + { + IF_DEBUG(scheduler, + fprintf(stderr,"### Running TSO %p in bound OS thread %u\n", + t, osThreadId())); + // yes, the Haskell thread is bound to the current native thread + } + else + { + IF_DEBUG(scheduler, + fprintf(stderr,"### TSO %p bound to other OS thread than %u\n", + t, osThreadId())); + // no, bound to a different Haskell thread: pass to that thread + PUSH_ON_RUN_QUEUE(t); + passCapability(&sched_mutex,cap,&m->bound_thread_cond); + cap = NULL; + continue; + } + } + else + { + // The thread we want to run is not bound. + if(mainThread == NULL) + { + IF_DEBUG(scheduler, + fprintf(stderr,"### Running TSO %p in worker OS thread %u\n", + t, osThreadId())); + // if we are a worker thread, + // we may run it here + } + else + { + IF_DEBUG(scheduler, + fprintf(stderr,"### TSO %p is not appropriate for main thread %p in OS thread %u\n", + t, mainThread, osThreadId())); + // no, the current native thread is bound to a different + // Haskell thread, so pass it to any worker thread + PUSH_ON_RUN_QUEUE(t); + passCapabilityToWorker(&sched_mutex, cap); + cap = NULL; + continue; + } + } + } +#endif + cap->r.rCurrentTSO = t; /* context switches are now initiated by the timer signal, unless * the user specified "context switch as often as possible", with * +RTS -C0 */ - if ( -#ifdef PROFILING - RtsFlags.ProfFlags.profileInterval == 0 || -#endif - (RtsFlags.ConcFlags.ctxtSwitchTicks == 0 + if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0 && (run_queue_hd != END_TSO_QUEUE || blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE))) @@ -985,10 +1190,12 @@ schedule( void ) else context_switch = 0; +run_thread: + RELEASE_LOCK(&sched_mutex); - IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", - t->id, t, whatNext_strs[t->what_next])); + IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", + t->id, whatNext_strs[t->what_next])); #ifdef PROFILING startHeapProfTimer(); @@ -997,19 +1204,19 @@ schedule( void ) /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */ /* Run the current thread */ - switch (cap->r.rCurrentTSO->what_next) { + prev_what_next = t->what_next; + switch (prev_what_next) { case ThreadKilled: case ThreadComplete: /* Thread already finished, return to scheduler. */ ret = ThreadFinished; break; - case ThreadEnterGHC: - ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r); - break; case ThreadRunGHC: + errno = t->saved_errno; ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r); + t->saved_errno = errno; break; - case ThreadEnterInterp: + case ThreadInterpret: ret = interpretBCO(cap); break; default: @@ -1024,9 +1231,9 @@ schedule( void ) #endif ACQUIRE_LOCK(&sched_mutex); - -#ifdef SMP - IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId());); + +#ifdef RTS_SUPPORTS_THREADS + IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %p): ", osThreadId());); #elif !defined(GRAN) && !defined(PAR) IF_DEBUG(scheduler,fprintf(stderr,"scheduler: ");); #endif @@ -1057,9 +1264,8 @@ schedule( void ) blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE; - IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", - t->id, t, - whatNext_strs[t->what_next], blocks)); + IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", + t->id, whatNext_strs[t->what_next], blocks)); // don't do this if it would push us over the // alloc_blocks_lim limit; we'll GC first. @@ -1080,14 +1286,26 @@ schedule( void ) } cap->r.rCurrentNursery->u.back = bd; - // initialise it as a nursery block - bd->step = g0s0; - bd->gen_no = 0; - bd->flags = 0; - bd->free = bd->start; + // initialise it as a nursery block. We initialise the + // step, gen_no, and flags field of *every* sub-block in + // this large block, because this is easier than making + // sure that we always find the block head of a large + // block whenever we call Bdescr() (eg. evacuate() and + // isAlive() in the GC would both have to do this, at + // least). + { + bdescr *x; + for (x = bd; x < bd + blocks; x++) { + x->step = g0s0; + x->gen_no = 0; + x->flags = 0; + } + } // don't forget to update the block count in g0s0. g0s0->n_blocks += blocks; + // This assert can be a killer if the app is doing lots + // of large block allocations. ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks); // now update the nursery to point to the new block @@ -1106,8 +1324,8 @@ schedule( void ) * maybe set context_switch and wait till they all pile in, * then have them wait on a GC condition variable. */ - IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", - t->id, t, whatNext_strs[t->what_next])); + IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", + t->id, whatNext_strs[t->what_next])); threadPaused(t); #if defined(GRAN) ASSERT(!is_on_queue(t,CurrentProc)); @@ -1138,8 +1356,8 @@ schedule( void ) // DumpGranEvent(GR_DESCHEDULE, t); globalParStats.tot_stackover++; #endif - IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", - t->id, t, whatNext_strs[t->what_next])); + IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", + t->id, whatNext_strs[t->what_next])); /* just adjust the stack for this thread, then pop it back * on the run queue. */ @@ -1179,24 +1397,29 @@ schedule( void ) * GC is finished. */ IF_DEBUG(scheduler, - if (t->what_next == ThreadEnterInterp) { - /* ToDo: or maybe a timer expired when we were in Hugs? - * or maybe someone hit ctrl-C - */ - belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", - t->id, t, whatNext_strs[t->what_next]); + if (t->what_next != prev_what_next) { + belch("--<< thread %ld (%s) stopped to switch evaluators", + t->id, whatNext_strs[t->what_next]); } else { - belch("--<< thread %ld (%p; %s) stopped, yielding", - t->id, t, whatNext_strs[t->what_next]); + belch("--<< thread %ld (%s) stopped, yielding", + t->id, whatNext_strs[t->what_next]); } ); - threadPaused(t); - IF_DEBUG(sanity, //belch("&& Doing sanity check on yielding TSO %ld.", t->id); checkTSO(t)); ASSERT(t->link == END_TSO_QUEUE); + + // Shortcut if we're just switching evaluators: don't bother + // doing stack squeezing (which can be expensive), just run the + // thread. + if (t->what_next != prev_what_next) { + goto run_thread; + } + + threadPaused(t); + #if defined(GRAN) ASSERT(!is_on_queue(t,CurrentProc)); @@ -1204,6 +1427,7 @@ schedule( void ) //belch("&& Doing sanity check on all ThreadQueues (and their TSOs)."); checkThreadQsSanity(rtsTrue)); #endif + #if defined(PAR) if (RtsFlags.ParFlags.doFairScheduling) { /* this does round-robin scheduling; good for concurrency */ @@ -1213,9 +1437,10 @@ schedule( void ) PUSH_ON_RUN_QUEUE(t); } #else - /* this does round-robin scheduling; good for concurrency */ + // this does round-robin scheduling; good for concurrency APPEND_TO_RUN_QUEUE(t); #endif + #if defined(GRAN) /* add a ContinueThread event to actually process the thread */ new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc], @@ -1227,7 +1452,7 @@ schedule( void ) G_CURR_THREADQ(0)); #endif /* GRAN */ break; - + case ThreadBlocked: #if defined(GRAN) IF_DEBUG(scheduler, @@ -1271,7 +1496,8 @@ schedule( void ) * case it'll be on the relevant queue already. */ IF_DEBUG(scheduler, - fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t); + fprintf(stderr, "--<< thread %d (%s) stopped: ", + t->id, whatNext_strs[t->what_next]); printThreadBlockage(t); fprintf(stderr, "\n")); @@ -1292,7 +1518,8 @@ schedule( void ) /* We also end up here if the thread kills itself with an * uncaught exception, see Exception.hc. */ - IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t)); + IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", + t->id, whatNext_strs[t->what_next])); #if defined(GRAN) endThread(t, CurrentProc); // clean-up the thread #elif defined(PAR) @@ -1314,14 +1541,13 @@ schedule( void ) default: barf("schedule: invalid thread return code %d", (int)ret); } - -#if defined(RTS_SUPPORTS_THREADS) - /* I don't understand what this re-grab is doing -- sof */ - grabCapability(&cap); -#endif #ifdef PROFILING - if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) { + // When we have +RTS -i0 and we're heap profiling, do a census at + // every GC. This lets us get repeatable runs for debugging. + if (performHeapProfile || + (RtsFlags.ProfFlags.profileInterval==0 && + RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) { GarbageCollect(GetRoots, rtsTrue); heapCensus(); performHeapProfile = rtsFalse; @@ -1377,28 +1603,172 @@ schedule( void ) } /* --------------------------------------------------------------------------- + * rtsSupportsBoundThreads(): is the RTS built to support bound threads? + * used by Control.Concurrent for error checking. + * ------------------------------------------------------------------------- */ + +StgBool +rtsSupportsBoundThreads(void) +{ +#ifdef THREADED_RTS + return rtsTrue; +#else + return rtsFalse; +#endif +} + +/* --------------------------------------------------------------------------- + * isThreadBound(tso): check whether tso is bound to an OS thread. + * ------------------------------------------------------------------------- */ + +StgBool +isThreadBound(StgTSO* tso USED_IN_THREADED_RTS) +{ +#ifdef THREADED_RTS + StgMainThread *m; + for(m = main_threads; m; m = m->link) + { + if(m->tso == tso) + return rtsTrue; + } +#endif + return rtsFalse; +} + +/* --------------------------------------------------------------------------- + * Singleton fork(). Do not copy any running threads. + * ------------------------------------------------------------------------- */ + +#ifdef THREADED_RTS +static void +deleteThreadImmediately(StgTSO *tso); +#endif + +StgInt +forkProcess(StgTSO* tso) +{ +#ifndef mingw32_TARGET_OS + pid_t pid; + StgTSO* t,*next; + + IF_DEBUG(scheduler,sched_belch("forking!")); + ACQUIRE_LOCK(&sched_mutex); + + pid = fork(); + if (pid) { /* parent */ + + /* just return the pid */ + + } else { /* child */ +#ifdef THREADED_RTS + /* wipe all other threads */ + run_queue_hd = run_queue_tl = END_TSO_QUEUE; + tso->link = END_TSO_QUEUE; + + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + next = t->link; + + /* Don't kill the current thread.. */ + if (t->id == tso->id) { + continue; + } + + if (isThreadBound(t)) { + // If the thread is bound, the OS thread that the thread is bound to + // no longer exists after the fork() system call. + // The bound Haskell thread is therefore unable to run at all; + // we must not give it a chance to survive by catching the + // ThreadKilled exception. So we kill it "brutally" rather than + // using deleteThread. + deleteThreadImmediately(t); + } else { + deleteThread(t); + } + } + + if (isThreadBound(tso)) { + } else { + // If the current is not bound, then we should make it so. + // The OS thread left over by fork() is special in that the process + // will terminate as soon as the thread terminates; + // we'd expect forkProcess to behave similarily. + // FIXME - we don't do this. + } +#else + StgMainThread *m; + rtsBool doKill; + /* wipe all other threads */ + run_queue_hd = run_queue_tl = END_TSO_QUEUE; + tso->link = END_TSO_QUEUE; + + /* When clearing out the threads, we need to ensure + that a 'main thread' is left behind; if there isn't, + the Scheduler will shutdown next time it is entered. + + ==> we don't kill a thread that's on the main_threads + list (nor the current thread.) + + [ Attempts at implementing the more ambitious scheme of + killing the main_threads also, and then adding the + current thread onto the main_threads list if it wasn't + there already, failed -- waitThread() (for one) wasn't + up to it. If it proves to be desirable to also kill + the main threads, then this scheme will have to be + revisited (and fully debugged!) + + -- sof 7/2002 + ] + */ + /* DO NOT TOUCH THE QUEUES directly because most of the code around + us is picky about finding the thread still in its queue when + handling the deleteThread() */ + + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + next = t->link; + + /* Don't kill the current thread.. */ + if (t->id == tso->id) continue; + doKill=rtsTrue; + /* ..or a main thread */ + for (m = main_threads; m != NULL; m = m->link) { + if (m->tso->id == t->id) { + doKill=rtsFalse; + break; + } + } + if (doKill) { + deleteThread(t); + } + } +#endif + } + RELEASE_LOCK(&sched_mutex); + return pid; +#else /* mingw32 */ + barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id); + /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */ + return -1; +#endif /* mingw32 */ +} + +/* --------------------------------------------------------------------------- * deleteAllThreads(): kill all the live threads. * * This is used when we catch a user interrupt (^C), before performing * any necessary cleanups and running finalizers. + * + * Locks: sched_mutex held. * ------------------------------------------------------------------------- */ -void deleteAllThreads ( void ) +void +deleteAllThreads ( void ) { StgTSO* t, *next; IF_DEBUG(scheduler,sched_belch("deleting all threads")); - for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) { - next = t->link; + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + next = t->global_link; deleteThread(t); - } - for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) { - next = t->link; - deleteThread(t); - } - for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) { - next = t->link; - deleteThread(t); - } + } run_queue_hd = run_queue_tl = END_TSO_QUEUE; blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE; sleeping_queue = END_TSO_QUEUE; @@ -1426,10 +1796,16 @@ void deleteAllThreads ( void ) * ------------------------------------------------------------------------- */ StgInt -suspendThread( StgRegTable *reg, rtsBool concCall ) +suspendThread( StgRegTable *reg, + rtsBool concCall +#if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG) + STG_UNUSED +#endif + ) { nat tok; Capability *cap; + int saved_errno = errno; /* assume that *reg is a pointer to the StgRegTable part * of a Capability. @@ -1439,14 +1815,25 @@ suspendThread( StgRegTable *reg, rtsBool concCall ) ACQUIRE_LOCK(&sched_mutex); IF_DEBUG(scheduler, - sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id)); + sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall)); + + // XXX this might not be necessary --SDM + cap->r.rCurrentTSO->what_next = ThreadRunGHC; threadPaused(cap->r.rCurrentTSO); cap->r.rCurrentTSO->link = suspended_ccalling_threads; suspended_ccalling_threads = cap->r.rCurrentTSO; #if defined(RTS_SUPPORTS_THREADS) - cap->r.rCurrentTSO->why_blocked = BlockedOnCCall; + if(cap->r.rCurrentTSO->blocked_exceptions == NULL) + { + cap->r.rCurrentTSO->why_blocked = BlockedOnCCall; + cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE; + } + else + { + cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc; + } #endif /* Use the thread ID as the token; it should be unique */ @@ -1458,36 +1845,32 @@ suspendThread( StgRegTable *reg, rtsBool concCall ) #if defined(RTS_SUPPORTS_THREADS) /* Preparing to leave the RTS, so ensure there's a native thread/task waiting to take over. - - ToDo: optimise this and only create a new task if there's a need - for one (i.e., if there's only one Concurrent Haskell thread alive, - there's no need to create a new task). */ - IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok)); - if (concCall) { - startTask(taskStart); - } + IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId())); #endif /* Other threads _might_ be available for execution; signal this */ THREAD_RUNNABLE(); RELEASE_LOCK(&sched_mutex); + + errno = saved_errno; return tok; } StgRegTable * -resumeThread( StgInt tok, rtsBool concCall ) +resumeThread( StgInt tok, + rtsBool concCall STG_UNUSED ) { StgTSO *tso, **prev; Capability *cap; + int saved_errno = errno; #if defined(RTS_SUPPORTS_THREADS) /* Wait for permission to re-enter the RTS with the result. */ - if ( concCall ) { - grabReturnCapability(&sched_mutex, &cap); - } else { - grabCapability(&cap); - } + ACQUIRE_LOCK(&sched_mutex); + grabReturnCapability(&sched_mutex, &cap); + + IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId())); #else grabCapability(&cap); #endif @@ -1506,12 +1889,23 @@ resumeThread( StgInt tok, rtsBool concCall ) barf("resumeThread: thread not found"); } tso->link = END_TSO_QUEUE; + +#if defined(RTS_SUPPORTS_THREADS) + if(tso->why_blocked == BlockedOnCCall) + { + awakenBlockedQueueNoLock(tso->blocked_exceptions); + tso->blocked_exceptions = NULL; + } +#endif + /* Reset blocking status */ tso->why_blocked = NotBlocked; - RELEASE_LOCK(&sched_mutex); - cap->r.rCurrentTSO = tso; +#if defined(RTS_SUPPORTS_THREADS) + RELEASE_LOCK(&sched_mutex); +#endif + errno = saved_errno; return &cap->r; } @@ -1528,10 +1922,11 @@ static void unblockThread(StgTSO *tso); * instances of Eq/Ord for ThreadIds. * ------------------------------------------------------------------------ */ -int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) +int +cmp_thread(StgPtr tso1, StgPtr tso2) { - StgThreadID id1 = tso1->id; - StgThreadID id2 = tso2->id; + StgThreadID id1 = ((StgTSO *)tso1)->id; + StgThreadID id2 = ((StgTSO *)tso2)->id; if (id1 < id2) return (-1); if (id1 > id2) return 1; @@ -1543,11 +1938,28 @@ int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) * * This is used in the implementation of Show for ThreadIds. * ------------------------------------------------------------------------ */ -int rts_getThreadId(const StgTSO *tso) +int +rts_getThreadId(StgPtr tso) { - return tso->id; + return ((StgTSO *)tso)->id; } +#ifdef DEBUG +void +labelThread(StgPtr tso, char *label) +{ + int len; + void *buf; + + /* Caveat: Once set, you can only set the thread name to "" */ + len = strlen(label)+1; + buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()"); + strncpy(buf,label,len); + /* Update will free the old memory for us */ + updateThreadLabel((StgWord)tso,buf); +} +#endif /* DEBUG */ + /* --------------------------------------------------------------------------- Create a new thread. @@ -1565,25 +1977,12 @@ int rts_getThreadId(const StgTSO *tso) #if defined(GRAN) /* currently pri (priority) is only used in a GRAN setup -- HWL */ StgTSO * -createThread(nat stack_size, StgInt pri) -{ - return createThread_(stack_size, rtsFalse, pri); -} - -static StgTSO * -createThread_(nat size, rtsBool have_lock, StgInt pri) -{ +createThread(nat size, StgInt pri) #else StgTSO * -createThread(nat stack_size) -{ - return createThread_(stack_size, rtsFalse); -} - -static StgTSO * -createThread_(nat size, rtsBool have_lock) -{ +createThread(nat size) #endif +{ StgTSO *tso; nat stack_size; @@ -1620,19 +2019,23 @@ createThread_(nat size, rtsBool have_lock) #if defined(GRAN) SET_GRAN_HDR(tso, ThisPE); #endif - tso->what_next = ThreadEnterGHC; + + // Always start with the compiled code evaluator + tso->what_next = ThreadRunGHC; /* tso->id needs to be unique. For now we use a heavyweight mutex to * protect the increment operation on next_thread_id. * In future, we could use an atomic increment instead. */ - if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); } + ACQUIRE_LOCK(&thread_id_mutex); tso->id = next_thread_id++; - if (!have_lock) { RELEASE_LOCK(&sched_mutex); } + RELEASE_LOCK(&thread_id_mutex); tso->why_blocked = NotBlocked; tso->blocked_exceptions = NULL; + tso->saved_errno = 0; + tso->stack_size = stack_size; tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) - TSO_STRUCT_SIZEW; @@ -1645,8 +2048,6 @@ createThread_(nat size, rtsBool have_lock) /* put a stop frame on the stack */ tso->sp -= sizeofW(StgStopFrame); SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM); - tso->su = (StgUpdateFrame*)tso->sp; - // ToDo: check this #if defined(GRAN) tso->link = END_TSO_QUEUE; @@ -1765,7 +2166,7 @@ createSparkThread(rtsSpark spark) } else { threadsCreated++; - tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue); + tso = createThread(RtsFlags.GcFlags.initialStkSize); if (tso==END_TSO_QUEUE) barf("createSparkThread: Cannot create TSO"); #if defined(DIST) @@ -1806,6 +2207,11 @@ activateSpark (rtsSpark spark) } #endif +static SchedulerStatus waitThread_(/*out*/StgMainThread* m, + Capability *initialCapability + ); + + /* --------------------------------------------------------------------------- * scheduleThread() * @@ -1816,17 +2222,12 @@ activateSpark (rtsSpark spark) * on this thread's stack before the scheduler is invoked. * ------------------------------------------------------------------------ */ -static void scheduleThread_ (StgTSO* tso, rtsBool createTask); +static void scheduleThread_ (StgTSO* tso); void -scheduleThread_(StgTSO *tso - , rtsBool createTask -#if !defined(THREADED_RTS) - STG_UNUSED -#endif - ) +scheduleThread_(StgTSO *tso) { - ACQUIRE_LOCK(&sched_mutex); + // Precondition: sched_mutex must be held. /* Put the new thread on the head of the runnable queue. The caller * better push an appropriate closure on this thread's stack @@ -1834,30 +2235,54 @@ scheduleThread_(StgTSO *tso * soon as we release the scheduler lock below. */ PUSH_ON_RUN_QUEUE(tso); -#if defined(THREADED_RTS) - /* If main() is scheduling a thread, don't bother creating a - * new task. - */ - if ( createTask ) { - startTask(taskStart); - } -#endif THREAD_RUNNABLE(); #if 0 IF_DEBUG(scheduler,printTSO(tso)); #endif - RELEASE_LOCK(&sched_mutex); } void scheduleThread(StgTSO* tso) { - return scheduleThread_(tso, rtsFalse); + ACQUIRE_LOCK(&sched_mutex); + scheduleThread_(tso); + RELEASE_LOCK(&sched_mutex); } -void scheduleExtThread(StgTSO* tso) -{ - return scheduleThread_(tso, rtsTrue); +SchedulerStatus +scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCapability) +{ // Precondition: sched_mutex must be held + StgMainThread *m; + + m = stgMallocBytes(sizeof(StgMainThread), "waitThread"); + m->tso = tso; + m->ret = ret; + m->stat = NoStatus; +#if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) + initCondition(&m->bound_thread_cond); +#else + initCondition(&m->wakeup); +#endif +#endif + + /* Put the thread on the main-threads list prior to scheduling the TSO. + Failure to do so introduces a race condition in the MT case (as + identified by Wolfgang Thaller), whereby the new task/OS thread + created by scheduleThread_() would complete prior to the thread + that spawned it managed to put 'itself' on the main-threads list. + The upshot of it all being that the worker thread wouldn't get to + signal the completion of the its work item for the main thread to + see (==> it got stuck waiting.) -- sof 6/02. + */ + IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id)); + + m->link = main_threads; + main_threads = m; + + scheduleThread_(tso); + + return waitThread_(m, initialCapability); } /* --------------------------------------------------------------------------- @@ -1919,6 +2344,7 @@ initScheduler(void) * the scheduler. */ initMutex(&sched_mutex); initMutex(&term_mutex); + initMutex(&thread_id_mutex); initCondition(&thread_ready_cond); #endif @@ -1976,6 +2402,7 @@ exitScheduler( void ) #if defined(RTS_SUPPORTS_THREADS) stopTaskManager(); #endif + shutting_down_scheduler = rtsTrue; } /* ----------------------------------------------------------------------------- @@ -2022,13 +2449,13 @@ finishAllThreads ( void ) { do { while (run_queue_hd != END_TSO_QUEUE) { - waitThread ( run_queue_hd, NULL); + waitThread ( run_queue_hd, NULL, NULL ); } while (blocked_queue_hd != END_TSO_QUEUE) { - waitThread ( blocked_queue_hd, NULL); + waitThread ( blocked_queue_hd, NULL, NULL ); } while (sleeping_queue != END_TSO_QUEUE) { - waitThread ( blocked_queue_hd, NULL); + waitThread ( blocked_queue_hd, NULL, NULL ); } } while (blocked_queue_hd != END_TSO_QUEUE || @@ -2037,57 +2464,48 @@ finishAllThreads ( void ) } SchedulerStatus -waitThread(StgTSO *tso, /*out*/StgClosure **ret) +waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability) { -#if defined(THREADED_RTS) - return waitThread_(tso,ret, rtsFalse); -#else - return waitThread_(tso,ret); -#endif -} - -SchedulerStatus -waitThread_(StgTSO *tso, - /*out*/StgClosure **ret -#if defined(THREADED_RTS) - , rtsBool blockWaiting -#endif - ) -{ StgMainThread *m; SchedulerStatus stat; - ACQUIRE_LOCK(&sched_mutex); - m = stgMallocBytes(sizeof(StgMainThread), "waitThread"); - m->tso = tso; m->ret = ret; m->stat = NoStatus; #if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) + initCondition(&m->bound_thread_cond); +#else initCondition(&m->wakeup); #endif +#endif + /* see scheduleWaitThread() comment */ + ACQUIRE_LOCK(&sched_mutex); m->link = main_threads; main_threads = m; - IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id)); + IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id)); -#if defined(RTS_SUPPORTS_THREADS) + stat = waitThread_(m,initialCapability); + + RELEASE_LOCK(&sched_mutex); + return stat; +} -# if defined(THREADED_RTS) - if (!blockWaiting) { - /* In the threaded case, the OS thread that called main() - * gets to enter the RTS directly without going via another - * task/thread. - */ - RELEASE_LOCK(&sched_mutex); - schedule(); - ASSERT(m->stat != NoStatus); - } else -# endif - { - IF_DEBUG(scheduler, sched_belch("sfoo")); +static +SchedulerStatus +waitThread_(StgMainThread* m, Capability *initialCapability) +{ + SchedulerStatus stat; + + // Precondition: sched_mutex must be held. + IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id)); + +#if defined(RTS_SUPPORTS_THREADS) && !defined(THREADED_RTS) + { // FIXME: does this still make sense? + // It's not for the threaded rts => SMP only do { waitCondition(&m->wakeup, &sched_mutex); } while (m->stat == NoStatus); @@ -2098,28 +2516,30 @@ waitThread_(StgTSO *tso, procStatus[MainProc] = Busy; // status of main PE CurrentProc = MainProc; // PE to run it on - schedule(); + RELEASE_LOCK(&sched_mutex); + schedule(m,initialCapability); #else RELEASE_LOCK(&sched_mutex); - schedule(); + schedule(m,initialCapability); + ACQUIRE_LOCK(&sched_mutex); ASSERT(m->stat != NoStatus); #endif stat = m->stat; #if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) + closeCondition(&m->bound_thread_cond); +#else closeCondition(&m->wakeup); #endif +#endif IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", m->tso->id)); - free(m); - -#if defined(THREADED_RTS) - if (blockWaiting) -#endif - RELEASE_LOCK(&sched_mutex); + stgFree(m); + // Postcondition: sched_mutex still held return stat; } @@ -2246,8 +2666,6 @@ take_off_run_queue(StgTSO *tso) { void GetRoots(evac_fn evac) { - StgMainThread *m; - #if defined(GRAN) { nat i; @@ -2286,9 +2704,6 @@ GetRoots(evac_fn evac) } #endif - for (m = main_threads; m != NULL; m = m->link) { - evac((StgClosure **)&m->tso); - } if (suspended_ccalling_threads != END_TSO_QUEUE) { evac((StgClosure **)&suspended_ccalling_threads); } @@ -2296,6 +2711,30 @@ GetRoots(evac_fn evac) #if defined(PAR) || defined(GRAN) markSparkQueue(evac); #endif + +#if defined(RTS_USER_SIGNALS) + // mark the signal handlers (signals should be already blocked) + markSignalHandlers(evac); +#endif + + // main threads which have completed need to be retained until they + // are dealt with in the main scheduler loop. They won't be + // retained any other way: the GC will drop them from the + // all_threads list, so we have to be careful to treat them as roots + // here. + { + StgMainThread *m; + for (m = main_threads; m != NULL; m = m->link) { + switch (m->tso->what_next) { + case ThreadComplete: + case ThreadKilled: + evac((StgClosure **)&m->tso); + break; + default: + break; + } + } + } } /* ----------------------------------------------------------------------------- @@ -2311,7 +2750,7 @@ GetRoots(evac_fn evac) This needs to be protected by the GC condition variable above. KH. -------------------------------------------------------------------------- */ -void (*extra_roots)(evac_fn); +static void (*extra_roots)(evac_fn); void performGC(void) @@ -2358,7 +2797,7 @@ performGCWithRoots(void (*get_roots)(evac_fn)) static StgTSO * threadStackOverflow(StgTSO *tso) { - nat new_stack_size, new_tso_size, diff, stack_words; + nat new_stack_size, new_tso_size, stack_words; StgPtr new_sp; StgTSO *dest; @@ -2399,25 +2838,19 @@ threadStackOverflow(StgTSO *tso) memcpy(new_sp, tso->sp, stack_words * sizeof(W_)); /* relocate the stack pointers... */ - diff = (P_)new_sp - (P_)tso->sp; /* In *words* */ - dest->su = (StgUpdateFrame *) ((P_)dest->su + diff); - dest->sp = new_sp; + dest->sp = new_sp; dest->stack_size = new_stack_size; - /* and relocate the update frame list */ - relocate_stack(dest, diff); - /* Mark the old TSO as relocated. We have to check for relocated * TSOs in the garbage collector and any primops that deal with TSOs. * - * It's important to set the sp and su values to just beyond the end + * It's important to set the sp value to just beyond the end * of the stack, so we don't attempt to scavenge any part of the * dead TSO's stack. */ tso->what_next = ThreadRelocated; tso->link = dest; tso->sp = (P_)&(tso->stack[tso->stack_size]); - tso->su = (StgUpdateFrame *)tso->sp; tso->why_blocked = NotBlocked; dest->mut_link = NULL; @@ -2719,6 +3152,17 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node) } #else /* !GRAN && !PAR */ + +#ifdef RTS_SUPPORTS_THREADS +void +awakenBlockedQueueNoLock(StgTSO *tso) +{ + while (tso != END_TSO_QUEUE) { + tso = unblockOneLocked(tso); + } +} +#endif + void awakenBlockedQueue(StgTSO *tso) { @@ -2758,13 +3202,15 @@ interruptStgRts(void) NB: only the type of the blocking queue is different in GranSim and GUM the operations on the queue-elements are the same long live polymorphism! + + Locks: sched_mutex is held upon entry and exit. + */ static void unblockThread(StgTSO *tso) { StgBlockingQueueElement *t, **last; - ACQUIRE_LOCK(&sched_mutex); switch (tso->why_blocked) { case NotBlocked: @@ -2836,6 +3282,9 @@ unblockThread(StgTSO *tso) case BlockedOnRead: case BlockedOnWrite: +#if defined(mingw32_TARGET_OS) + case BlockedOnDoProc: +#endif { /* take TSO off blocked_queue */ StgBlockingQueueElement *prev = NULL; @@ -2874,7 +3323,7 @@ unblockThread(StgTSO *tso) goto done; } } - barf("unblockThread (I/O): TSO not found"); + barf("unblockThread (delay): TSO not found"); } default: @@ -2886,20 +3335,20 @@ unblockThread(StgTSO *tso) tso->why_blocked = NotBlocked; tso->block_info.closure = NULL; PUSH_ON_RUN_QUEUE(tso); - RELEASE_LOCK(&sched_mutex); } #else static void unblockThread(StgTSO *tso) { StgTSO *t, **last; + + /* To avoid locking unnecessarily. */ + if (tso->why_blocked == NotBlocked) { + return; + } - ACQUIRE_LOCK(&sched_mutex); switch (tso->why_blocked) { - case NotBlocked: - return; /* not blocked */ - case BlockedOnMVar: ASSERT(get_itbl(tso->block_info.closure)->type == MVAR); { @@ -2963,6 +3412,9 @@ unblockThread(StgTSO *tso) case BlockedOnRead: case BlockedOnWrite: +#if defined(mingw32_TARGET_OS) + case BlockedOnDoProc: +#endif { StgTSO *prev = NULL; for (t = blocked_queue_hd; t != END_TSO_QUEUE; @@ -2999,7 +3451,7 @@ unblockThread(StgTSO *tso) goto done; } } - barf("unblockThread (I/O): TSO not found"); + barf("unblockThread (delay): TSO not found"); } default: @@ -3011,7 +3463,6 @@ unblockThread(StgTSO *tso) tso->why_blocked = NotBlocked; tso->block_info.closure = NULL; PUSH_ON_RUN_QUEUE(tso); - RELEASE_LOCK(&sched_mutex); } #endif @@ -3031,12 +3482,12 @@ unblockThread(StgTSO *tso) * the top of the stack. * * How exactly do we save all the active computations? We create an - * AP_UPD for every UpdateFrame on the stack. Entering one of these - * AP_UPDs pushes everything from the corresponding update frame + * AP_STACK for every UpdateFrame on the stack. Entering one of these + * AP_STACKs pushes everything from the corresponding update frame * upwards onto the stack. (Actually, it pushes everything up to the - * next update frame plus a pointer to the next AP_UPD object. - * Entering the next AP_UPD object pushes more onto the stack until we - * reach the last AP_UPD object - at which point the stack should look + * next update frame plus a pointer to the next AP_STACK object. + * Entering the next AP_STACK object pushes more onto the stack until we + * reach the last AP_STACK object - at which point the stack should look * exactly as it did when we killed the TSO and we can continue * execution by entering the closure on top of the stack. * @@ -3045,6 +3496,8 @@ unblockThread(StgTSO *tso) * CATCH_FRAME on the stack. In either case, we strip the entire * stack and replace the thread with a zombie. * + * Locks: sched_mutex held upon entry nor exit. + * * -------------------------------------------------------------------------- */ void @@ -3053,195 +3506,200 @@ deleteThread(StgTSO *tso) raiseAsync(tso,NULL); } -void -raiseAsync(StgTSO *tso, StgClosure *exception) -{ - StgUpdateFrame* su = tso->su; - StgPtr sp = tso->sp; - - /* Thread already dead? */ - if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { - return; - } - - IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id)); - - /* Remove it from any blocking queues */ - unblockThread(tso); +#ifdef THREADED_RTS +static void +deleteThreadImmediately(StgTSO *tso) +{ // for forkProcess only: + // delete thread without giving it a chance to catch the KillThread exception - /* The stack freezing code assumes there's a closure pointer on - * the top of the stack. This isn't always the case with compiled - * code, so we have to push a dummy closure on the top which just - * returns to the next return address on the stack. - */ - if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) { - *(--sp) = (W_)&stg_dummy_ret_closure; + if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { + return; } +#if defined(RTS_SUPPORTS_THREADS) + if (tso->why_blocked != BlockedOnCCall + && tso->why_blocked != BlockedOnCCall_NoUnblockExc) +#endif + unblockThread(tso); + tso->what_next = ThreadKilled; +} +#endif - while (1) { - nat words = ((P_)su - (P_)sp) - 1; - nat i; - StgAP_UPD * ap; - - /* If we find a CATCH_FRAME, and we've got an exception to raise, - * then build the THUNK raise(exception), and leave it on - * top of the CATCH_FRAME ready to enter. - */ - if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) { - StgCatchFrame *cf = (StgCatchFrame *)su; - StgClosure *raise; - - /* we've got an exception to raise, so let's pass it to the - * handler in this frame. - */ - raise = (StgClosure *)allocate(sizeofW(StgClosure)+1); - TICK_ALLOC_SE_THK(1,0); - SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); - raise->payload[0] = exception; - - /* throw away the stack from Sp up to the CATCH_FRAME. - */ - sp = (P_)su - 1; - - /* Ensure that async excpetions are blocked now, so we don't get - * a surprise exception before we get around to executing the - * handler. - */ - if (tso->blocked_exceptions == NULL) { - tso->blocked_exceptions = END_TSO_QUEUE; - } +void +raiseAsyncWithLock(StgTSO *tso, StgClosure *exception) +{ + /* When raising async exs from contexts where sched_mutex isn't held; + use raiseAsyncWithLock(). */ + ACQUIRE_LOCK(&sched_mutex); + raiseAsync(tso,exception); + RELEASE_LOCK(&sched_mutex); +} - /* Put the newly-built THUNK on top of the stack, ready to execute - * when the thread restarts. - */ - sp[0] = (W_)raise; - tso->sp = sp; - tso->su = su; - tso->what_next = ThreadEnterGHC; - IF_DEBUG(sanity, checkTSO(tso)); - return; +void +raiseAsync(StgTSO *tso, StgClosure *exception) +{ + StgRetInfoTable *info; + StgPtr sp; + + // Thread already dead? + if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { + return; } - /* First build an AP_UPD consisting of the stack chunk above the - * current update frame, with the top word on the stack as the - * fun field. - */ - ap = (StgAP_UPD *)allocate(AP_sizeW(words)); + IF_DEBUG(scheduler, + sched_belch("raising exception in thread %ld.", tso->id)); - ASSERT(words >= 0); + // Remove it from any blocking queues + unblockThread(tso); + + sp = tso->sp; - ap->n_args = words; - ap->fun = (StgClosure *)sp[0]; - sp++; - for(i=0; i < (nat)words; ++i) { - ap->payload[i] = (StgClosure *)*sp++; + // The stack freezing code assumes there's a closure pointer on + // the top of the stack, so we have to arrange that this is the case... + // + if (sp[0] == (W_)&stg_enter_info) { + sp++; + } else { + sp--; + sp[0] = (W_)&stg_dummy_ret_closure; } - - switch (get_itbl(su)->type) { - - case UPDATE_FRAME: - { - SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); - TICK_ALLOC_UP_THK(words+1,0); - - IF_DEBUG(scheduler, - fprintf(stderr, "scheduler: Updating "); - printPtr((P_)su->updatee); - fprintf(stderr, " with "); - printObj((StgClosure *)ap); - ); - - /* Replace the updatee with an indirection - happily - * this will also wake up any threads currently - * waiting on the result. - * - * Warning: if we're in a loop, more than one update frame on - * the stack may point to the same object. Be careful not to - * overwrite an IND_OLDGEN in this case, because we'll screw - * up the mutable lists. To be on the safe side, don't - * overwrite any kind of indirection at all. See also - * threadSqueezeStack in GC.c, where we have to make a similar - * check. - */ - if (!closure_IND(su->updatee)) { - UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */ - } - su = su->link; - sp += sizeofW(StgUpdateFrame) -1; - sp[0] = (W_)ap; /* push onto stack */ - break; - } - case CATCH_FRAME: - { - StgCatchFrame *cf = (StgCatchFrame *)su; - StgClosure* o; - - /* We want a PAP, not an AP_UPD. Fortunately, the - * layout's the same. - */ - SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */); - TICK_ALLOC_UPD_PAP(words+1,0); + while (1) { + nat i; + + // 1. Let the top of the stack be the "current closure" + // + // 2. Walk up the stack until we find either an UPDATE_FRAME or a + // CATCH_FRAME. + // + // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the + // current closure applied to the chunk of stack up to (but not + // including) the update frame. This closure becomes the "current + // closure". Go back to step 2. + // + // 4. If it's a CATCH_FRAME, then leave the exception handler on + // top of the stack applied to the exception. + // + // 5. If it's a STOP_FRAME, then kill the thread. - /* now build o = FUN(catch,ap,handler) */ - o = (StgClosure *)allocate(sizeofW(StgClosure)+2); - TICK_ALLOC_FUN(2,0); - SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */); - o->payload[0] = (StgClosure *)ap; - o->payload[1] = cf->handler; + StgPtr frame; - IF_DEBUG(scheduler, - fprintf(stderr, "scheduler: Built "); - printObj((StgClosure *)o); - ); + frame = sp + 1; + info = get_ret_itbl((StgClosure *)frame); - /* pop the old handler and put o on the stack */ - su = cf->link; - sp += sizeofW(StgCatchFrame) - 1; - sp[0] = (W_)o; - break; - } - - case SEQ_FRAME: - { - StgSeqFrame *sf = (StgSeqFrame *)su; - StgClosure* o; - - SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */); - TICK_ALLOC_UPD_PAP(words+1,0); + while (info->i.type != UPDATE_FRAME + && (info->i.type != CATCH_FRAME || exception == NULL) + && info->i.type != STOP_FRAME) { + frame += stack_frame_sizeW((StgClosure *)frame); + info = get_ret_itbl((StgClosure *)frame); + } - /* now build o = FUN(seq,ap) */ - o = (StgClosure *)allocate(sizeofW(StgClosure)+1); - TICK_ALLOC_SE_THK(1,0); - SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */); - o->payload[0] = (StgClosure *)ap; + switch (info->i.type) { + + case CATCH_FRAME: + // If we find a CATCH_FRAME, and we've got an exception to raise, + // then build the THUNK raise(exception), and leave it on + // top of the CATCH_FRAME ready to enter. + // + { +#ifdef PROFILING + StgCatchFrame *cf = (StgCatchFrame *)frame; +#endif + StgClosure *raise; + + // we've got an exception to raise, so let's pass it to the + // handler in this frame. + // + raise = (StgClosure *)allocate(sizeofW(StgClosure)+1); + TICK_ALLOC_SE_THK(1,0); + SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); + raise->payload[0] = exception; + + // throw away the stack from Sp up to the CATCH_FRAME. + // + sp = frame - 1; + + /* Ensure that async excpetions are blocked now, so we don't get + * a surprise exception before we get around to executing the + * handler. + */ + if (tso->blocked_exceptions == NULL) { + tso->blocked_exceptions = END_TSO_QUEUE; + } + + /* Put the newly-built THUNK on top of the stack, ready to execute + * when the thread restarts. + */ + sp[0] = (W_)raise; + sp[-1] = (W_)&stg_enter_info; + tso->sp = sp-1; + tso->what_next = ThreadRunGHC; + IF_DEBUG(sanity, checkTSO(tso)); + return; + } - IF_DEBUG(scheduler, - fprintf(stderr, "scheduler: Built "); - printObj((StgClosure *)o); - ); + case UPDATE_FRAME: + { + StgAP_STACK * ap; + nat words; + + // First build an AP_STACK consisting of the stack chunk above the + // current update frame, with the top word on the stack as the + // fun field. + // + words = frame - sp - 1; + ap = (StgAP_STACK *)allocate(PAP_sizeW(words)); + + ap->size = words; + ap->fun = (StgClosure *)sp[0]; + sp++; + for(i=0; i < (nat)words; ++i) { + ap->payload[i] = (StgClosure *)*sp++; + } + + SET_HDR(ap,&stg_AP_STACK_info, + ((StgClosure *)frame)->header.prof.ccs /* ToDo */); + TICK_ALLOC_UP_THK(words+1,0); + + IF_DEBUG(scheduler, + fprintf(stderr, "scheduler: Updating "); + printPtr((P_)((StgUpdateFrame *)frame)->updatee); + fprintf(stderr, " with "); + printObj((StgClosure *)ap); + ); + + // Replace the updatee with an indirection - happily + // this will also wake up any threads currently + // waiting on the result. + // + // Warning: if we're in a loop, more than one update frame on + // the stack may point to the same object. Be careful not to + // overwrite an IND_OLDGEN in this case, because we'll screw + // up the mutable lists. To be on the safe side, don't + // overwrite any kind of indirection at all. See also + // threadSqueezeStack in GC.c, where we have to make a similar + // check. + // + if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) { + // revert the black hole + UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap); + } + sp += sizeofW(StgUpdateFrame) - 1; + sp[0] = (W_)ap; // push onto stack + break; + } - /* pop the old handler and put o on the stack */ - su = sf->link; - sp += sizeofW(StgSeqFrame) - 1; - sp[0] = (W_)o; - break; - } - - case STOP_FRAME: - /* We've stripped the entire stack, the thread is now dead. */ - sp += sizeofW(StgStopFrame) - 1; - sp[0] = (W_)exception; /* save the exception */ - tso->what_next = ThreadKilled; - tso->su = (StgUpdateFrame *)(sp+1); - tso->sp = sp; - return; - - default: - barf("raiseAsync"); + case STOP_FRAME: + // We've stripped the entire stack, the thread is now dead. + sp += sizeofW(StgStopFrame); + tso->what_next = ThreadKilled; + tso->sp = sp; + return; + + default: + barf("raiseAsync"); + } } - } - barf("raiseAsync"); + barf("raiseAsync"); } /* ----------------------------------------------------------------------------- @@ -3250,6 +3708,8 @@ raiseAsync(StgTSO *tso, StgClosure *exception) up and sent a signal: BlockedOnDeadMVar if the thread was blocked on an MVar, or NonTermination if the thread was blocked on a Black Hole. + + Locks: sched_mutex isn't held upon entry nor exit. -------------------------------------------------------------------------- */ void @@ -3266,6 +3726,7 @@ resurrectThreads( StgTSO *threads ) switch (tso->why_blocked) { case BlockedOnMVar: case BlockedOnException: + /* Called by GC - sched_mutex lock is currently held. */ raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure); break; case BlockedOnBlackHole: @@ -3290,68 +3751,71 @@ resurrectThreads( StgTSO *threads ) * * This is only done in a deadlock situation in order to avoid * performance overhead in the normal case. + * + * Locks: sched_mutex is held upon entry and exit. * -------------------------------------------------------------------------- */ static void detectBlackHoles( void ) { - StgTSO *t = all_threads; - StgUpdateFrame *frame; + StgTSO *tso = all_threads; + StgClosure *frame; StgClosure *blocked_on; + StgRetInfoTable *info; - for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) { + for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) { - while (t->what_next == ThreadRelocated) { - t = t->link; - ASSERT(get_itbl(t)->type == TSO); + while (tso->what_next == ThreadRelocated) { + tso = tso->link; + ASSERT(get_itbl(tso)->type == TSO); } - if (t->why_blocked != BlockedOnBlackHole) { + if (tso->why_blocked != BlockedOnBlackHole) { continue; } + blocked_on = tso->block_info.closure; - blocked_on = t->block_info.closure; - - for (frame = t->su; ; frame = frame->link) { - switch (get_itbl(frame)->type) { + frame = (StgClosure *)tso->sp; + while(1) { + info = get_ret_itbl(frame); + switch (info->i.type) { case UPDATE_FRAME: - if (frame->updatee == blocked_on) { + if (((StgUpdateFrame *)frame)->updatee == blocked_on) { /* We are blocking on one of our own computations, so * send this thread the NonTermination exception. */ IF_DEBUG(scheduler, - sched_belch("thread %d is blocked on itself", t->id)); - raiseAsync(t, (StgClosure *)NonTermination_closure); + sched_belch("thread %d is blocked on itself", tso->id)); + raiseAsync(tso, (StgClosure *)NonTermination_closure); goto done; } - else { - continue; - } - - case CATCH_FRAME: - case SEQ_FRAME: - continue; + frame = (StgClosure *) ((StgUpdateFrame *)frame + 1); + continue; + case STOP_FRAME: - break; - } - break; - } + goto done; - done: ; - } + // normal stack frames; do nothing except advance the pointer + default: + (StgPtr)frame += stack_frame_sizeW(frame); + } + } + done: ; + } } //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code //@subsection Debugging Routines /* ----------------------------------------------------------------------------- - Debugging: why is a thread blocked + * Debugging: why is a thread blocked + * [Also provides useful information when debugging threaded programs + * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02] -------------------------------------------------------------------------- */ -#ifdef DEBUG - +static void printThreadBlockage(StgTSO *tso) { @@ -3362,6 +3826,11 @@ printThreadBlockage(StgTSO *tso) case BlockedOnWrite: fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd); break; +#if defined(mingw32_TARGET_OS) + case BlockedOnDoProc: + fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID); + break; +#endif case BlockedOnDelay: fprintf(stderr,"is blocked until %d", tso->block_info.target); break; @@ -3392,6 +3861,9 @@ printThreadBlockage(StgTSO *tso) case BlockedOnCCall: fprintf(stderr,"is blocked on an external call"); break; + case BlockedOnCCall_NoUnblockExc: + fprintf(stderr,"is blocked on an external call (exceptions were already blocked)"); + break; #endif default: barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)", @@ -3399,6 +3871,7 @@ printThreadBlockage(StgTSO *tso) } } +static void printThreadStatus(StgTSO *tso) { @@ -3418,30 +3891,35 @@ void printAllThreads(void) { StgTSO *t; + void *label; # if defined(GRAN) char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; ullong_format_string(TIME_ON_PROC(CurrentProc), time_string, rtsFalse/*no commas!*/); - sched_belch("all threads at [%s]:", time_string); + fprintf(stderr, "all threads at [%s]:\n", time_string); # elif defined(PAR) char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; ullong_format_string(CURRENT_TIME, time_string, rtsFalse/*no commas!*/); - sched_belch("all threads at [%s]:", time_string); + fprintf(stderr,"all threads at [%s]:\n", time_string); # else - sched_belch("all threads:"); + fprintf(stderr,"all threads:\n"); # endif for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) { - fprintf(stderr, "\tthread %d ", t->id); + fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t); + label = lookupThreadLabel((StgWord)t); + if (label) fprintf(stderr,"[\"%s\"] ",(char *)label); printThreadStatus(t); fprintf(stderr,"\n"); } } +#ifdef DEBUG + /* Print a whole blocking queue attached to node (debugging only). */ @@ -3624,6 +4102,7 @@ sched_belch(char *s, ...) #endif vfprintf(stderr, s, ap); fprintf(stderr, "\n"); + va_end(ap); } #endif /* DEBUG */