X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=6e14fc58db263f5941b83ce5d95e97305144cac7;hb=0542fa233d82cabb178505ce7e5ebab8ac0ba0e9;hp=61cae7ae17429c28ad120d4ec05b39018ae0c480;hpb=fbe64026eac8b559693868f38e6a9e6dad8a8ec6;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 61cae7a..6e14fc5 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,7 +1,7 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.163 2003/03/19 18:41:19 sof Exp $ + * $Id: Schedule.c,v 1.184 2003/12/18 12:24:59 simonmar Exp $ * - * (c) The GHC Team, 1998-2000 + * (c) The GHC Team, 1998-2003 * * Scheduler * @@ -10,36 +10,14 @@ * * WAY Name CPP flag What's it for * -------------------------------------- - * mp GUM PAR Parallel execution on a distributed memory machine + * mp GUM PAR Parallel execution on a distrib. memory machine * s SMP SMP Parallel execution on a shared memory machine * mg GranSim GRAN Simulation of parallel execution * md GUM/GdH DIST Distributed execution (based on GUM) * * --------------------------------------------------------------------------*/ -//@node Main scheduling code, , , -//@section Main scheduling code - /* - * Version with scheduler monitor support for SMPs (WAY=s): - - This design provides a high-level API to create and schedule threads etc. - as documented in the SMP design document. - - It uses a monitor design controlled by a single mutex to exercise control - over accesses to shared data structures, and builds on the Posix threads - library. - - The majority of state is shared. In order to keep essential per-task state, - there is a Capability structure, which contains all the information - needed to run a thread: its STG registers, a pointer to its TSO, a - nursery etc. During STG execution, a pointer to the capability is - kept in a register (BaseReg). - - In a non-SMP build, there is one global capability, namely MainRegTable. - - SDM & KH, 10/99 - * Version with support for distributed memory parallelism aka GUM (WAY=mp): The main scheduling loop in GUM iterates until a finish message is received. @@ -59,22 +37,6 @@ over the events in the global event queue. -- HWL */ -//@menu -//* Includes:: -//* Variables and Data structures:: -//* Main scheduling loop:: -//* Suspend and Resume:: -//* Run queue code:: -//* Garbage Collextion Routines:: -//* Blocking Queue Routines:: -//* Exception Handling Routines:: -//* Debugging Routines:: -//* Index:: -//@end menu - -//@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code -//@subsection Includes - #include "PosixSource.h" #include "Rts.h" #include "SchedAPI.h" @@ -126,23 +88,27 @@ #include #include -//@node Variables and Data structures, Prototypes, Includes, Main scheduling code -//@subsection Variables and Data structures +#ifdef HAVE_ERRNO_H +#include +#endif + +#ifdef THREADED_RTS +#define USED_IN_THREADED_RTS +#else +#define USED_IN_THREADED_RTS STG_UNUSED +#endif + +#ifdef RTS_SUPPORTS_THREADS +#define USED_WHEN_RTS_SUPPORTS_THREADS +#else +#define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED +#endif /* Main thread queue. * Locks required: sched_mutex. */ StgMainThread *main_threads = NULL; -#ifdef THREADED_RTS -// Pointer to the thread that executes main -// When this thread is finished, the program terminates -// by calling shutdownHaskellAndExit. -// It would be better to add a call to shutdownHaskellAndExit -// to the Main.main wrapper and to remove this hack. -StgMainThread *main_main_thread = NULL; -#endif - /* Thread queues. * Locks required: sched_mutex. */ @@ -195,17 +161,14 @@ static StgTSO *threadStackOverflow(StgTSO *tso); */ /* flag set by signal handler to precipitate a context switch */ -//@cindex context_switch nat context_switch = 0; /* if this flag is set as well, give up execution */ -//@cindex interrupted rtsBool interrupted = rtsFalse; /* Next thread ID to allocate. * Locks required: thread_id_mutex */ -//@cindex next_thread_id static StgThreadID next_thread_id = 1; /* @@ -217,14 +180,15 @@ static StgThreadID next_thread_id = 1; /* The smallest stack size that makes any sense is: * RESERVED_STACK_WORDS (so we can get back from the stack overflow) * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame) - * + 1 (the realworld token for an IO thread) * + 1 (the closure to enter) + * + 1 (stg_ap_v_ret) + * + 1 (spare slot req'd by stg_ap_v_ret) * * A thread with this stack will bomb immediately with a stack * overflow, which will increase its stack size. */ -#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2) +#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3) #if defined(GRAN) @@ -248,15 +212,11 @@ static rtsBool shutting_down_scheduler = rtsFalse; void addToBlockedQueue ( StgTSO *tso ); -static void schedule ( void ); +static void schedule ( StgMainThread *mainThread, Capability *initialCapability ); void interruptStgRts ( void ); static void detectBlackHoles ( void ); -#ifdef DEBUG -static void sched_belch(char *s, ...); -#endif - #if defined(RTS_SUPPORTS_THREADS) /* ToDo: carefully document the invariants that go together * with these synchronisation objects. @@ -270,12 +230,6 @@ Mutex term_mutex = INIT_MUTEX_VAR; */ Mutex thread_id_mutex = INIT_MUTEX_VAR; - -# if defined(SMP) -static Condition gc_pending_cond = INIT_COND_VAR; -nat await_death; -# endif - #endif /* RTS_SUPPORTS_THREADS */ #if defined(PAR) @@ -299,32 +253,40 @@ StgTSO * createSparkThread(rtsSpark spark); StgTSO * activateSpark (rtsSpark spark); #endif -/* - * The thread state for the main thread. -// ToDo: check whether not needed any more -StgTSO *MainTSO; - */ +/* ---------------------------------------------------------------------------- + * Starting Tasks + * ------------------------------------------------------------------------- */ + +#if defined(RTS_SUPPORTS_THREADS) +static rtsBool startingWorkerThread = rtsFalse; -#if defined(PAR) || defined(RTS_SUPPORTS_THREADS) static void taskStart(void); static void taskStart(void) { - schedule(); + ACQUIRE_LOCK(&sched_mutex); + schedule(NULL,NULL); + RELEASE_LOCK(&sched_mutex); } -#endif -#if defined(RTS_SUPPORTS_THREADS) void -startSchedulerTask(void) +startSchedulerTaskIfNecessary(void) { - startTask(taskStart); + if(run_queue_hd != END_TSO_QUEUE + || blocked_queue_hd != END_TSO_QUEUE + || sleeping_queue != END_TSO_QUEUE) + { + if(!startingWorkerThread) + { // we don't want to start another worker thread + // just because the last one hasn't yet reached the + // "waiting for capability" state + startingWorkerThread = rtsTrue; + startTask(taskStart); + } + } } #endif -//@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code -//@subsection Main scheduling loop - /* --------------------------------------------------------------------------- Main scheduling loop. @@ -360,12 +322,12 @@ startSchedulerTask(void) This is not the ugliest code you could imagine, but it's bloody close. ------------------------------------------------------------------------ */ -//@cindex schedule static void -schedule( void ) +schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, + Capability *initialCapability ) { StgTSO *t; - Capability *cap; + Capability *cap = initialCapability; StgThreadReturnCode ret; #if defined(GRAN) rtsEvent *event; @@ -382,13 +344,20 @@ schedule( void ) rtsBool was_interrupted = rtsFalse; StgTSOWhatNext prev_what_next; - ACQUIRE_LOCK(&sched_mutex); + // Pre-condition: sched_mutex is held. #if defined(RTS_SUPPORTS_THREADS) - waitForWorkCapability(&sched_mutex, &cap, rtsFalse); - IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId())); + // + // in the threaded case, the capability is either passed in via the + // initialCapability parameter, or initialized inside the scheduler + // loop + // + IF_DEBUG(scheduler, + sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)", + mainThread, initialCapability); + ); #else - /* simply initialise it in the non-threaded case */ + // simply initialise it in the non-threaded case grabCapability(&cap); #endif @@ -419,98 +388,115 @@ schedule( void ) while (!receivedFinish) { /* set by processMessages */ /* when receiving PP_FINISH message */ -#else + +#else // everything except GRAN and PAR while (1) { #endif - IF_DEBUG(scheduler, printAllThreads()); + IF_DEBUG(scheduler, printAllThreads()); #if defined(RTS_SUPPORTS_THREADS) - /* Check to see whether there are any worker threads - waiting to deposit external call results. If so, - yield our capability */ - yieldToReturningWorker(&sched_mutex, &cap); + // Yield the capability to higher-priority tasks if necessary. + // + if (cap != NULL) { + yieldCapability(&cap); + } + + // If we do not currently hold a capability, we wait for one + // + if (cap == NULL) { + waitForCapability(&sched_mutex, &cap, + mainThread ? &mainThread->bound_thread_cond : NULL); + } + + // We now have a capability... #endif - /* If we're interrupted (the user pressed ^C, or some other - * termination condition occurred), kill all the currently running - * threads. - */ + // + // If we're interrupted (the user pressed ^C, or some other + // termination condition occurred), kill all the currently running + // threads. + // if (interrupted) { - IF_DEBUG(scheduler, sched_belch("interrupted")); - interrupted = rtsFalse; - was_interrupted = rtsTrue; + IF_DEBUG(scheduler, sched_belch("interrupted")); + interrupted = rtsFalse; + was_interrupted = rtsTrue; #if defined(RTS_SUPPORTS_THREADS) - // In the threaded RTS, deadlock detection doesn't work, - // so just exit right away. - prog_belch("interrupted"); - releaseCapability(cap); - startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit - RELEASE_LOCK(&sched_mutex); - shutdownHaskellAndExit(EXIT_SUCCESS); + // In the threaded RTS, deadlock detection doesn't work, + // so just exit right away. + prog_belch("interrupted"); + releaseCapability(cap); + RELEASE_LOCK(&sched_mutex); + shutdownHaskellAndExit(EXIT_SUCCESS); #else - deleteAllThreads(); + deleteAllThreads(); #endif } - /* Go through the list of main threads and wake up any - * clients whose computations have finished. ToDo: this - * should be done more efficiently without a linear scan - * of the main threads list, somehow... - */ + // + // Go through the list of main threads and wake up any + // clients whose computations have finished. ToDo: this + // should be done more efficiently without a linear scan + // of the main threads list, somehow... + // #if defined(RTS_SUPPORTS_THREADS) { - StgMainThread *m, **prev; - prev = &main_threads; - for (m = main_threads; m != NULL; prev = &m->link, m = m->link) { - switch (m->tso->what_next) { - case ThreadComplete: - if (m->ret) { - // NOTE: return val is tso->sp[1] (see StgStartup.hc) - *(m->ret) = (StgClosure *)m->tso->sp[1]; - } - *prev = m->link; - m->stat = Success; - broadcastCondition(&m->wakeup); -#ifdef DEBUG - removeThreadLabel((StgWord)m->tso); -#endif - if(m == main_main_thread) - { - releaseCapability(cap); - startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit - RELEASE_LOCK(&sched_mutex); - shutdownHaskellAndExit(EXIT_SUCCESS); - } - break; - case ThreadKilled: - if (m->ret) *(m->ret) = NULL; - *prev = m->link; - if (was_interrupted) { - m->stat = Interrupted; - } else { - m->stat = Killed; - } - broadcastCondition(&m->wakeup); + StgMainThread *m, **prev; + prev = &main_threads; + for (m = main_threads; m != NULL; prev = &m->link, m = m->link) { + if (m->tso->what_next == ThreadComplete + || m->tso->what_next == ThreadKilled) + { + if (m == mainThread) + { + if (m->tso->what_next == ThreadComplete) + { + if (m->ret) + { + // NOTE: return val is tso->sp[1] (see StgStartup.hc) + *(m->ret) = (StgClosure *)m->tso->sp[1]; + } + m->stat = Success; + } + else + { + if (m->ret) + { + *(m->ret) = NULL; + } + if (was_interrupted) + { + m->stat = Interrupted; + } + else + { + m->stat = Killed; + } + } + *prev = m->link; + #ifdef DEBUG - removeThreadLabel((StgWord)m->tso); + removeThreadLabel((StgWord)m->tso->id); #endif - if(m == main_main_thread) - { releaseCapability(cap); - startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit - RELEASE_LOCK(&sched_mutex); - shutdownHaskellAndExit(EXIT_SUCCESS); + return; + } + else + { + // The current OS thread can not handle the fact that + // the Haskell thread "m" has ended. "m" is bound; + // the scheduler loop in it's bound OS thread has to + // return, so let's pass our capability directly to + // that thread. + passCapability(&m->bound_thread_cond); + continue; + } } - break; - default: - break; } - } } - + #else /* not threaded */ # if defined(PAR) @@ -524,7 +510,7 @@ schedule( void ) if (m->tso->what_next == ThreadComplete || m->tso->what_next == ThreadKilled) { #ifdef DEBUG - removeThreadLabel((StgWord)m->tso); + removeThreadLabel((StgWord)m->tso->id); #endif main_threads = main_threads->link; if (m->tso->what_next == ThreadComplete) { @@ -546,50 +532,9 @@ schedule( void ) } #endif - /* Top up the run queue from our spark pool. We try to make the - * number of threads in the run queue equal to the number of - * free capabilities. - * - * Disable spark support in SMP for now, non-essential & requires - * a little bit of work to make it compile cleanly. -- sof 1/02. - */ -#if 0 /* defined(SMP) */ - { - nat n = getFreeCapabilities(); - StgTSO *tso = run_queue_hd; - - /* Count the run queue */ - while (n > 0 && tso != END_TSO_QUEUE) { - tso = tso->link; - n--; - } - for (; n > 0; n--) { - StgClosure *spark; - spark = findSpark(rtsFalse); - if (spark == NULL) { - break; /* no more sparks in the pool */ - } else { - /* I'd prefer this to be done in activateSpark -- HWL */ - /* tricky - it needs to hold the scheduler lock and - * not try to re-acquire it -- SDM */ - createSparkThread(spark); - IF_DEBUG(scheduler, - sched_belch("==^^ turning spark of closure %p into a thread", - (StgClosure *)spark)); - } - } - /* We need to wake up the other tasks if we just created some - * work for them. - */ - if (getFreeCapabilities() - n > 1) { - signalCondition( &thread_ready_cond ); - } - } -#endif // SMP - - /* check for signals each time around the scheduler */ -#ifndef mingw32_TARGET_OS +#if defined(RTS_USER_SIGNALS) + // check for signals each time around the scheduler if (signals_pending()) { RELEASE_LOCK(&sched_mutex); /* ToDo: kill */ startSignalHandlers(); @@ -602,16 +547,12 @@ schedule( void ) * can wait indefinitely for something to happen. */ if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) -#if defined(RTS_SUPPORTS_THREADS) && !defined(SMP) +#if defined(RTS_SUPPORTS_THREADS) || EMPTY_RUN_QUEUE() #endif ) { - awaitEvent( EMPTY_RUN_QUEUE() -#if defined(SMP) - && allFreeCapabilities() -#endif - ); + awaitEvent( EMPTY_RUN_QUEUE() ); } /* we can be interrupted while waiting for I/O... */ if (interrupted) continue; @@ -628,20 +569,9 @@ schedule( void ) * inform all the main threads. */ #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS) - if ( EMPTY_THREAD_QUEUES() -#if defined(RTS_SUPPORTS_THREADS) - && EMPTY_QUEUE(suspended_ccalling_threads) -#endif -#ifdef SMP - && allFreeCapabilities() -#endif - ) + if ( EMPTY_THREAD_QUEUES() ) { IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC...")); -#if defined(THREADED_RTS) - /* and SMP mode ..? */ - releaseCapability(cap); -#endif // Garbage collection can release some new threads due to // either (a) finalizers or (b) threads resurrected because // they are about to be send BlockedOnDeadMVar. Any threads @@ -656,24 +586,12 @@ schedule( void ) if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; } -#ifndef mingw32_TARGET_OS +#if defined(RTS_USER_SIGNALS) /* If we have user-installed signal handlers, then wait * for signals to arrive rather then bombing out with a * deadlock. */ -#if defined(RTS_SUPPORTS_THREADS) - if ( 0 ) { /* hmm..what to do? Simply stop waiting for - a signal with no runnable threads (or I/O - suspended ones) leads nowhere quick. - For now, simply shut down when we reach this - condition. - - ToDo: define precisely under what conditions - the Scheduler should shut down in an MT setting. - */ -#else if ( anyUserHandlers() ) { -#endif IF_DEBUG(scheduler, sched_belch("still deadlocked, waiting for signals...")); @@ -699,21 +617,6 @@ schedule( void ) */ { StgMainThread *m; -#if defined(RTS_SUPPORTS_THREADS) - for (m = main_threads; m != NULL; m = m->link) { - switch (m->tso->why_blocked) { - case BlockedOnBlackHole: - raiseAsync(m->tso, (StgClosure *)NonTermination_closure); - break; - case BlockedOnException: - case BlockedOnMVar: - raiseAsync(m->tso, (StgClosure *)Deadlock_closure); - break; - default: - barf("deadlock: main thread blocked in a strange way"); - } - } -#else m = main_threads; switch (m->tso->why_blocked) { case BlockedOnBlackHole: @@ -726,63 +629,21 @@ schedule( void ) default: barf("deadlock: main thread blocked in a strange way"); } -#endif } - -#if defined(RTS_SUPPORTS_THREADS) - /* ToDo: revisit conditions (and mechanism) for shutting - down a multi-threaded world */ - IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down.")); - RELEASE_LOCK(&sched_mutex); - shutdownHaskell(); - return; -#endif } not_deadlocked: #elif defined(RTS_SUPPORTS_THREADS) - /* ToDo: add deadlock detection in threaded RTS */ + // ToDo: add deadlock detection in threaded RTS #elif defined(PAR) - /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */ + // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL #endif -#if defined(SMP) - /* If there's a GC pending, don't do anything until it has - * completed. - */ - if (ready_to_gc) { - IF_DEBUG(scheduler,sched_belch("waiting for GC")); - waitCondition( &gc_pending_cond, &sched_mutex ); - } -#endif - #if defined(RTS_SUPPORTS_THREADS) -#if defined(SMP) - /* block until we've got a thread on the run queue and a free - * capability. - * - */ - if ( EMPTY_RUN_QUEUE() ) { - /* Give up our capability */ - releaseCapability(cap); - - /* If we're in the process of shutting down (& running the - * a batch of finalisers), don't wait around. - */ - if ( shutting_down_scheduler ) { - RELEASE_LOCK(&sched_mutex); - return; - } - IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId())); - waitForWorkCapability(&sched_mutex, &cap, rtsTrue); - IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId())); - } -#else if ( EMPTY_RUN_QUEUE() ) { continue; // nothing to do } #endif -#endif #if defined(GRAN) if (RtsFlags.GranFlags.Light) @@ -907,7 +768,7 @@ schedule( void ) /* in a GranSim setup the TSO stays on the run queue */ t = CurrentTSO; /* Take a thread from the run queue. */ - t = POP_RUN_QUEUE(); // take_off_run_queue(t); + POP_RUN_QUEUE(t); // take_off_run_queue(t); IF_DEBUG(gran, fprintf(stderr, "GRAN: About to run current thread, which is\n"); @@ -1014,7 +875,7 @@ schedule( void ) ASSERT(run_queue_hd != END_TSO_QUEUE); /* Take a thread from the run queue, if we have work */ - t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE); + POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE); IF_DEBUG(sanity,checkTSO(t)); /* ToDo: write something to the log-file @@ -1058,14 +919,59 @@ schedule( void ) # endif #else /* !GRAN && !PAR */ - /* grab a thread from the run queue */ + // grab a thread from the run queue ASSERT(run_queue_hd != END_TSO_QUEUE); - t = POP_RUN_QUEUE(); + POP_RUN_QUEUE(t); + // Sanity check the thread we're about to run. This can be // expensive if there is lots of thread switching going on... IF_DEBUG(sanity,checkTSO(t)); #endif +#ifdef THREADED_RTS + { + StgMainThread *m; + for(m = main_threads; m; m = m->link) + { + if(m->tso == t) + break; + } + + if(m) + { + if(m == mainThread) + { + IF_DEBUG(scheduler, + sched_belch("### Running thread %d in bound thread", t->id)); + // yes, the Haskell thread is bound to the current native thread + } + else + { + IF_DEBUG(scheduler, + sched_belch("### thread %d bound to another OS thread", t->id)); + // no, bound to a different Haskell thread: pass to that thread + PUSH_ON_RUN_QUEUE(t); + passCapability(&m->bound_thread_cond); + continue; + } + } + else + { + if(mainThread != NULL) + // The thread we want to run is bound. + { + IF_DEBUG(scheduler, + sched_belch("### this OS thread cannot run thread %d", t->id)); + // no, the current native thread is bound to a different + // Haskell thread, so pass it to any worker thread + PUSH_ON_RUN_QUEUE(t); + passCapabilityToWorker(); + continue; + } + } + } +#endif + cap->r.rCurrentTSO = t; /* context switches are now initiated by the timer signal, unless @@ -1102,7 +1008,9 @@ run_thread: ret = ThreadFinished; break; case ThreadRunGHC: + errno = t->saved_errno; ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r); + t->saved_errno = errno; break; case ThreadInterpret: ret = interpretBCO(cap); @@ -1121,9 +1029,9 @@ run_thread: ACQUIRE_LOCK(&sched_mutex); #ifdef RTS_SUPPORTS_THREADS - IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId());); + IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId());); #elif !defined(GRAN) && !defined(PAR) - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: ");); + IF_DEBUG(scheduler,fprintf(stderr,"sched: ");); #endif t = cap->r.rCurrentTSO; @@ -1388,6 +1296,7 @@ run_thread: t->id, whatNext_strs[t->what_next]); printThreadBlockage(t); fprintf(stderr, "\n")); + fflush(stderr); /* Only for dumping event to log file ToDo: do I need this in GranSim, too? @@ -1396,7 +1305,7 @@ run_thread: #endif threadPaused(t); break; - + case ThreadFinished: /* Need to check whether this was a main thread, and if so, signal * the task that started it with the return value. If we have no @@ -1443,11 +1352,7 @@ run_thread: } #endif - if (ready_to_gc -#ifdef SMP - && allFreeCapabilities() -#endif - ) { + if (ready_to_gc) { /* everybody back, start the GC. * Could do it in this thread, or signal a condition var * to do it in another thread. Either way, we need to @@ -1458,9 +1363,6 @@ run_thread: #endif GarbageCollect(GetRoots,rtsFalse); ready_to_gc = rtsFalse; -#ifdef SMP - broadcastCondition(&gc_pending_cond); -#endif #if defined(GRAN) /* add a ContinueThread event to continue execution of current thread */ new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc], @@ -1491,73 +1393,104 @@ run_thread: } /* --------------------------------------------------------------------------- + * rtsSupportsBoundThreads(): is the RTS built to support bound threads? + * used by Control.Concurrent for error checking. + * ------------------------------------------------------------------------- */ + +StgBool +rtsSupportsBoundThreads(void) +{ +#ifdef THREADED_RTS + return rtsTrue; +#else + return rtsFalse; +#endif +} + +/* --------------------------------------------------------------------------- + * isThreadBound(tso): check whether tso is bound to an OS thread. + * ------------------------------------------------------------------------- */ + +StgBool +isThreadBound(StgTSO* tso USED_IN_THREADED_RTS) +{ +#ifdef THREADED_RTS + StgMainThread *m; + for(m = main_threads; m; m = m->link) + { + if(m->tso == tso) + return rtsTrue; + } +#endif + return rtsFalse; +} + +/* --------------------------------------------------------------------------- * Singleton fork(). Do not copy any running threads. * ------------------------------------------------------------------------- */ -StgInt forkProcess(StgTSO* tso) { +static void +deleteThreadImmediately(StgTSO *tso); +StgInt +forkProcess(HsStablePtr *entry) +{ #ifndef mingw32_TARGET_OS pid_t pid; StgTSO* t,*next; StgMainThread *m; - rtsBool doKill; + SchedulerStatus rc; IF_DEBUG(scheduler,sched_belch("forking!")); + rts_lock(); // This not only acquires sched_mutex, it also + // makes sure that no other threads are running pid = fork(); + if (pid) { /* parent */ /* just return the pid */ + rts_unlock(); + return pid; } else { /* child */ - /* wipe all other threads */ - run_queue_hd = run_queue_tl = tso; - tso->link = END_TSO_QUEUE; - - /* When clearing out the threads, we need to ensure - that a 'main thread' is left behind; if there isn't, - the Scheduler will shutdown next time it is entered. - - ==> we don't kill a thread that's on the main_threads - list (nor the current thread.) - [ Attempts at implementing the more ambitious scheme of - killing the main_threads also, and then adding the - current thread onto the main_threads list if it wasn't - there already, failed -- waitThread() (for one) wasn't - up to it. If it proves to be desirable to also kill - the main threads, then this scheme will have to be - revisited (and fully debugged!) - - -- sof 7/2002 - ] - */ - /* DO NOT TOUCH THE QUEUES directly because most of the code around - us is picky about finding the thread still in its queue when - handling the deleteThread() */ - - for (t = all_threads; t != END_TSO_QUEUE; t = next) { - next = t->link; - /* Don't kill the current thread.. */ - if (t->id == tso->id) continue; - doKill=rtsTrue; - /* ..or a main thread */ - for (m = main_threads; m != NULL; m = m->link) { - if (m->tso->id == t->id) { - doKill=rtsFalse; - break; - } + // delete all threads + run_queue_hd = run_queue_tl = END_TSO_QUEUE; + + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + next = t->link; + + // don't allow threads to catch the ThreadKilled exception + deleteThreadImmediately(t); } - if (doKill) { - deleteThread(t); + + // wipe the main thread list + while((m = main_threads) != NULL) { + main_threads = m->link; +#ifdef THREADED_RTS + closeCondition(&m->bound_thread_cond); +#endif + stgFree(m); } + +#ifdef RTS_SUPPORTS_THREADS + resetTaskManagerAfterFork(); // tell startTask() and friends that + startingWorkerThread = rtsFalse; // we have no worker threads any more + resetWorkerWakeupPipeAfterFork(); +#endif + + rc = rts_evalStableIO(entry, NULL); // run the action + rts_checkSchedStatus("forkProcess",rc); + + rts_unlock(); + + hs_exit(); // clean up and exit + stg_exit(0); } - } - return pid; #else /* mingw32 */ - barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id); - /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */ + barf("forkProcess#: primop not implemented for mingw32, sorry!\n"); return -1; #endif /* mingw32 */ } @@ -1571,7 +1504,8 @@ StgInt forkProcess(StgTSO* tso) { * Locks: sched_mutex held. * ------------------------------------------------------------------------- */ -void deleteAllThreads ( void ) +void +deleteAllThreads ( void ) { StgTSO* t, *next; IF_DEBUG(scheduler,sched_belch("deleting all threads")); @@ -1587,9 +1521,6 @@ void deleteAllThreads ( void ) /* startThread and insertThread are now in GranSim.c -- HWL */ -//@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code -//@subsection Suspend and Resume - /* --------------------------------------------------------------------------- * Suspending & resuming Haskell threads. * @@ -1608,18 +1539,19 @@ void deleteAllThreads ( void ) StgInt suspendThread( StgRegTable *reg, rtsBool concCall -#if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG) +#if !defined(DEBUG) STG_UNUSED #endif ) { nat tok; Capability *cap; + int saved_errno = errno; /* assume that *reg is a pointer to the StgRegTable part * of a Capability. */ - cap = (Capability *)((void *)reg - sizeof(StgFunTable)); + cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable))); ACQUIRE_LOCK(&sched_mutex); @@ -1655,35 +1587,31 @@ suspendThread( StgRegTable *reg, /* Preparing to leave the RTS, so ensure there's a native thread/task waiting to take over. */ - IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId())); - //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult - startTask(taskStart); - //} + IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok)); #endif /* Other threads _might_ be available for execution; signal this */ THREAD_RUNNABLE(); RELEASE_LOCK(&sched_mutex); + + errno = saved_errno; return tok; } StgRegTable * resumeThread( StgInt tok, - rtsBool concCall -#if !defined(RTS_SUPPORTS_THREADS) - STG_UNUSED -#endif - ) + rtsBool concCall STG_UNUSED ) { StgTSO *tso, **prev; Capability *cap; + int saved_errno = errno; #if defined(RTS_SUPPORTS_THREADS) /* Wait for permission to re-enter the RTS with the result. */ ACQUIRE_LOCK(&sched_mutex); - grabReturnCapability(&sched_mutex, &cap); + waitForReturnCapability(&sched_mutex, &cap); - IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId())); + IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok)); #else grabCapability(&cap); #endif @@ -1715,9 +1643,8 @@ resumeThread( StgInt tok, tso->why_blocked = NotBlocked; cap->r.rCurrentTSO = tso; -#if defined(RTS_SUPPORTS_THREADS) RELEASE_LOCK(&sched_mutex); -#endif + errno = saved_errno; return &cap->r; } @@ -1765,13 +1692,10 @@ labelThread(StgPtr tso, char *label) /* Caveat: Once set, you can only set the thread name to "" */ len = strlen(label)+1; - buf = malloc(len); - if (buf == NULL) { - fprintf(stderr,"insufficient memory for labelThread!\n"); - } else - strncpy(buf,label,len); + buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()"); + strncpy(buf,label,len); /* Update will free the old memory for us */ - updateThreadLabel((StgWord)tso,buf); + updateThreadLabel(((StgTSO *)tso)->id,buf); } #endif /* DEBUG */ @@ -1788,7 +1712,6 @@ labelThread(StgPtr tso, char *label) currently pri (priority) is only used in a GRAN setup -- HWL ------------------------------------------------------------------------ */ -//@cindex createThread #if defined(GRAN) /* currently pri (priority) is only used in a GRAN setup -- HWL */ StgTSO * @@ -1849,6 +1772,8 @@ createThread(nat size) tso->why_blocked = NotBlocked; tso->blocked_exceptions = NULL; + tso->saved_errno = 0; + tso->stack_size = stack_size; tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) - TSO_STRUCT_SIZEW; @@ -1998,7 +1923,6 @@ createSparkThread(rtsSpark spark) ToDo: fix for SMP (needs to acquire SCHED_MUTEX!) */ #if defined(PAR) -//@cindex activateSpark StgTSO * activateSpark (rtsSpark spark) { @@ -2020,10 +1944,8 @@ activateSpark (rtsSpark spark) } #endif -static SchedulerStatus waitThread_(/*out*/StgMainThread* m -#if defined(THREADED_RTS) - , rtsBool blockWaiting -#endif +static SchedulerStatus waitThread_(/*out*/StgMainThread* m, + Capability *initialCapability ); @@ -2065,38 +1987,37 @@ void scheduleThread(StgTSO* tso) } SchedulerStatus -scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret) -{ // Precondition: sched_mutex must be held - StgMainThread *m; +scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, + Capability *initialCapability) +{ + // Precondition: sched_mutex must be held + StgMainThread *m; - m = stgMallocBytes(sizeof(StgMainThread), "waitThread"); - m->tso = tso; - m->ret = ret; - m->stat = NoStatus; + m = stgMallocBytes(sizeof(StgMainThread), "waitThread"); + m->tso = tso; + m->ret = ret; + m->stat = NoStatus; #if defined(RTS_SUPPORTS_THREADS) - initCondition(&m->wakeup); + initCondition(&m->bound_thread_cond); #endif - /* Put the thread on the main-threads list prior to scheduling the TSO. - Failure to do so introduces a race condition in the MT case (as - identified by Wolfgang Thaller), whereby the new task/OS thread - created by scheduleThread_() would complete prior to the thread - that spawned it managed to put 'itself' on the main-threads list. - The upshot of it all being that the worker thread wouldn't get to - signal the completion of the its work item for the main thread to - see (==> it got stuck waiting.) -- sof 6/02. - */ - IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id)); - - m->link = main_threads; - main_threads = m; + /* Put the thread on the main-threads list prior to scheduling the TSO. + Failure to do so introduces a race condition in the MT case (as + identified by Wolfgang Thaller), whereby the new task/OS thread + created by scheduleThread_() would complete prior to the thread + that spawned it managed to put 'itself' on the main-threads list. + The upshot of it all being that the worker thread wouldn't get to + signal the completion of the its work item for the main thread to + see (==> it got stuck waiting.) -- sof 6/02. + */ + IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id)); + + m->link = main_threads; + main_threads = m; + + scheduleThread_(tso); - scheduleThread_(tso); -#if defined(THREADED_RTS) - return waitThread_(m, rtsTrue); -#else - return waitThread_(m); -#endif + return waitThread_(m, initialCapability); } /* --------------------------------------------------------------------------- @@ -2108,18 +2029,6 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret) * * ------------------------------------------------------------------------ */ -#ifdef SMP -static void -term_handler(int sig STG_UNUSED) -{ - stat_workerStop(); - ACQUIRE_LOCK(&term_mutex); - await_death--; - RELEASE_LOCK(&term_mutex); - shutdownThread(); -} -#endif - void initScheduler(void) { @@ -2163,28 +2072,10 @@ initScheduler(void) initCondition(&thread_ready_cond); #endif -#if defined(SMP) - initCondition(&gc_pending_cond); -#endif - #if defined(RTS_SUPPORTS_THREADS) ACQUIRE_LOCK(&sched_mutex); #endif - /* Install the SIGHUP handler */ -#if defined(SMP) - { - struct sigaction action,oact; - - action.sa_handler = term_handler; - sigemptyset(&action.sa_mask); - action.sa_flags = 0; - if (sigaction(SIGTERM, &action, &oact) != 0) { - barf("can't install TERM handler"); - } - } -#endif - /* A capability holds the state a native thread needs in * order to execute STG code. At least one capability is * floating around (only SMP builds have more than one). @@ -2193,20 +2084,14 @@ initScheduler(void) #if defined(RTS_SUPPORTS_THREADS) /* start our haskell execution tasks */ -# if defined(SMP) - startTaskManager(RtsFlags.ParFlags.nNodes, taskStart); -# else startTaskManager(0,taskStart); -# endif #endif #if /* defined(SMP) ||*/ defined(PAR) initSparkPools(); #endif -#if defined(RTS_SUPPORTS_THREADS) RELEASE_LOCK(&sched_mutex); -#endif } @@ -2219,259 +2104,48 @@ exitScheduler( void ) shutting_down_scheduler = rtsTrue; } -/* ----------------------------------------------------------------------------- +/* ---------------------------------------------------------------------------- Managing the per-task allocation areas. Each capability comes with an allocation area. These are fixed-length block lists into which allocation can be done. ToDo: no support for two-space collection at the moment??? - -------------------------------------------------------------------------- */ - -/* ----------------------------------------------------------------------------- - * waitThread is the external interface for running a new computation - * and waiting for the result. - * - * In the non-SMP case, we create a new main thread, push it on the - * main-thread stack, and invoke the scheduler to run it. The - * scheduler will return when the top main thread on the stack has - * completed or died, and fill in the necessary fields of the - * main_thread structure. - * - * In the SMP case, we create a main thread as before, but we then - * create a new condition variable and sleep on it. When our new - * main thread has completed, we'll be woken up and the status/result - * will be in the main_thread struct. - * -------------------------------------------------------------------------- */ - -int -howManyThreadsAvail ( void ) -{ - int i = 0; - StgTSO* q; - for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link) - i++; - for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link) - i++; - for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link) - i++; - return i; -} - -void -finishAllThreads ( void ) -{ - do { - while (run_queue_hd != END_TSO_QUEUE) { - waitThread ( run_queue_hd, NULL); - } - while (blocked_queue_hd != END_TSO_QUEUE) { - waitThread ( blocked_queue_hd, NULL); - } - while (sleeping_queue != END_TSO_QUEUE) { - waitThread ( blocked_queue_hd, NULL); - } - } while - (blocked_queue_hd != END_TSO_QUEUE || - run_queue_hd != END_TSO_QUEUE || - sleeping_queue != END_TSO_QUEUE); -} - -SchedulerStatus -waitThread(StgTSO *tso, /*out*/StgClosure **ret) -{ - StgMainThread *m; - SchedulerStatus stat; - - m = stgMallocBytes(sizeof(StgMainThread), "waitThread"); - m->tso = tso; - m->ret = ret; - m->stat = NoStatus; -#if defined(RTS_SUPPORTS_THREADS) - initCondition(&m->wakeup); -#endif - - /* see scheduleWaitThread() comment */ - ACQUIRE_LOCK(&sched_mutex); - m->link = main_threads; - main_threads = m; - - IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id)); -#if defined(THREADED_RTS) - stat = waitThread_(m, rtsFalse); -#else - stat = waitThread_(m); -#endif - RELEASE_LOCK(&sched_mutex); - return stat; -} + ------------------------------------------------------------------------- */ static SchedulerStatus -waitThread_(StgMainThread* m -#if defined(THREADED_RTS) - , rtsBool blockWaiting -#endif - ) +waitThread_(StgMainThread* m, Capability *initialCapability) { SchedulerStatus stat; // Precondition: sched_mutex must be held. - IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id)); - -#if defined(RTS_SUPPORTS_THREADS) + IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id)); -# if defined(THREADED_RTS) - if (!blockWaiting) { - /* In the threaded case, the OS thread that called main() - * gets to enter the RTS directly without going via another - * task/thread. - */ - main_main_thread = m; - RELEASE_LOCK(&sched_mutex); - schedule(); - ACQUIRE_LOCK(&sched_mutex); - main_main_thread = NULL; - ASSERT(m->stat != NoStatus); - } else -# endif - { - do { - waitCondition(&m->wakeup, &sched_mutex); - } while (m->stat == NoStatus); - } -#elif defined(GRAN) +#if defined(GRAN) /* GranSim specific init */ CurrentTSO = m->tso; // the TSO to run procStatus[MainProc] = Busy; // status of main PE CurrentProc = MainProc; // PE to run it on - - RELEASE_LOCK(&sched_mutex); - schedule(); + schedule(m,initialCapability); #else - RELEASE_LOCK(&sched_mutex); - schedule(); + schedule(m,initialCapability); ASSERT(m->stat != NoStatus); #endif stat = m->stat; #if defined(RTS_SUPPORTS_THREADS) - closeCondition(&m->wakeup); + closeCondition(&m->bound_thread_cond); #endif - IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", - m->tso->id)); - free(m); + IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id)); + stgFree(m); // Postcondition: sched_mutex still held return stat; } -//@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code -//@subsection Run queue code - -#if 0 -/* - NB: In GranSim we have many run queues; run_queue_hd is actually a macro - unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an - implicit global variable that has to be correct when calling these - fcts -- HWL -*/ - -/* Put the new thread on the head of the runnable queue. - * The caller of createThread better push an appropriate closure - * on this thread's stack before the scheduler is invoked. - */ -static /* inline */ void -add_to_run_queue(tso) -StgTSO* tso; -{ - ASSERT(tso!=run_queue_hd && tso!=run_queue_tl); - tso->link = run_queue_hd; - run_queue_hd = tso; - if (run_queue_tl == END_TSO_QUEUE) { - run_queue_tl = tso; - } -} - -/* Put the new thread at the end of the runnable queue. */ -static /* inline */ void -push_on_run_queue(tso) -StgTSO* tso; -{ - ASSERT(get_itbl((StgClosure *)tso)->type == TSO); - ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL); - ASSERT(tso!=run_queue_hd && tso!=run_queue_tl); - if (run_queue_hd == END_TSO_QUEUE) { - run_queue_hd = tso; - } else { - run_queue_tl->link = tso; - } - run_queue_tl = tso; -} - -/* - Should be inlined because it's used very often in schedule. The tso - argument is actually only needed in GranSim, where we want to have the - possibility to schedule *any* TSO on the run queue, irrespective of the - actual ordering. Therefore, if tso is not the nil TSO then we traverse - the run queue and dequeue the tso, adjusting the links in the queue. -*/ -//@cindex take_off_run_queue -static /* inline */ StgTSO* -take_off_run_queue(StgTSO *tso) { - StgTSO *t, *prev; - - /* - qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq! - - if tso is specified, unlink that tso from the run_queue (doesn't have - to be at the beginning of the queue); GranSim only - */ - if (tso!=END_TSO_QUEUE) { - /* find tso in queue */ - for (t=run_queue_hd, prev=END_TSO_QUEUE; - t!=END_TSO_QUEUE && t!=tso; - prev=t, t=t->link) - /* nothing */ ; - ASSERT(t==tso); - /* now actually dequeue the tso */ - if (prev!=END_TSO_QUEUE) { - ASSERT(run_queue_hd!=t); - prev->link = t->link; - } else { - /* t is at beginning of thread queue */ - ASSERT(run_queue_hd==t); - run_queue_hd = t->link; - } - /* t is at end of thread queue */ - if (t->link==END_TSO_QUEUE) { - ASSERT(t==run_queue_tl); - run_queue_tl = prev; - } else { - ASSERT(run_queue_tl!=t); - } - t->link = END_TSO_QUEUE; - } else { - /* take tso from the beginning of the queue; std concurrent code */ - t = run_queue_hd; - if (t != END_TSO_QUEUE) { - run_queue_hd = t->link; - t->link = END_TSO_QUEUE; - if (run_queue_hd == END_TSO_QUEUE) { - run_queue_tl = END_TSO_QUEUE; - } - } - } - return t; -} - -#endif /* 0 */ - -//@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code -//@subsection Garbage Collextion Routines - /* --------------------------------------------------------------------------- Where are the roots that we know about? @@ -2489,7 +2163,7 @@ take_off_run_queue(StgTSO *tso) { */ void -GetRoots(evac_fn evac) +GetRoots( evac_fn evac ) { #if defined(GRAN) { @@ -2537,7 +2211,7 @@ GetRoots(evac_fn evac) markSparkQueue(evac); #endif -#ifndef mingw32_TARGET_OS +#if defined(RTS_USER_SIGNALS) // mark the signal handlers (signals should be already blocked) markSignalHandlers(evac); #endif @@ -2622,7 +2296,7 @@ performGCWithRoots(void (*get_roots)(evac_fn)) static StgTSO * threadStackOverflow(StgTSO *tso) { - nat new_stack_size, new_tso_size, diff, stack_words; + nat new_stack_size, new_tso_size, stack_words; StgPtr new_sp; StgTSO *dest; @@ -2630,7 +2304,7 @@ threadStackOverflow(StgTSO *tso) if (tso->stack_size >= tso->max_stack_size) { IF_DEBUG(gc, - belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld", + belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)", tso->id, tso, tso->stack_size, tso->max_stack_size); /* If we're debugging, just print out the top of the stack */ printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, @@ -2651,7 +2325,7 @@ threadStackOverflow(StgTSO *tso) new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */ new_stack_size = new_tso_size - TSO_STRUCT_SIZEW; - IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size)); + IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size)); dest = (StgTSO *)allocate(new_tso_size); TICK_ALLOC_TSO(new_stack_size,0); @@ -2663,8 +2337,7 @@ threadStackOverflow(StgTSO *tso) memcpy(new_sp, tso->sp, stack_words * sizeof(W_)); /* relocate the stack pointers... */ - diff = (P_)new_sp - (P_)tso->sp; /* In *words* */ - dest->sp = new_sp; + dest->sp = new_sp; dest->stack_size = new_stack_size; /* Mark the old TSO as relocated. We have to check for relocated @@ -2695,20 +2368,17 @@ threadStackOverflow(StgTSO *tso) return dest; } -//@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code -//@subsection Blocking Queue Routines - /* --------------------------------------------------------------------------- Wake up a queue that was blocked on some resource. ------------------------------------------------------------------------ */ #if defined(GRAN) -static inline void +STATIC_INLINE void unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node ) { } #elif defined(PAR) -static inline void +STATIC_INLINE void unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node ) { /* write RESUME events to log file and @@ -2844,7 +2514,7 @@ unblockOneLocked(StgTSO *tso) #endif #if defined(GRAN) || defined(PAR) -inline StgBlockingQueueElement * +INLINE_ME StgBlockingQueueElement * unblockOne(StgBlockingQueueElement *bqe, StgClosure *node) { ACQUIRE_LOCK(&sched_mutex); @@ -2853,7 +2523,7 @@ unblockOne(StgBlockingQueueElement *bqe, StgClosure *node) return bqe; } #else -inline StgTSO * +INLINE_ME StgTSO * unblockOne(StgTSO *tso) { ACQUIRE_LOCK(&sched_mutex); @@ -3000,9 +2670,6 @@ awakenBlockedQueue(StgTSO *tso) } #endif -//@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code -//@subsection Exception Handling Routines - /* --------------------------------------------------------------------------- Interrupt execution - usually called inside a signal handler so it mustn't do anything fancy. @@ -3013,6 +2680,9 @@ interruptStgRts(void) { interrupted = 1; context_switch = 1; +#ifdef RTS_SUPPORTS_THREADS + wakeBlockedWorkerThread(); +#endif } /* ----------------------------------------------------------------------------- @@ -3108,6 +2778,9 @@ unblockThread(StgTSO *tso) case BlockedOnRead: case BlockedOnWrite: +#if defined(mingw32_TARGET_OS) + case BlockedOnDoProc: +#endif { /* take TSO off blocked_queue */ StgBlockingQueueElement *prev = NULL; @@ -3146,7 +2819,7 @@ unblockThread(StgTSO *tso) goto done; } } - barf("unblockThread (I/O): TSO not found"); + barf("unblockThread (delay): TSO not found"); } default: @@ -3235,6 +2908,9 @@ unblockThread(StgTSO *tso) case BlockedOnRead: case BlockedOnWrite: +#if defined(mingw32_TARGET_OS) + case BlockedOnDoProc: +#endif { StgTSO *prev = NULL; for (t = blocked_queue_hd; t != END_TSO_QUEUE; @@ -3271,7 +2947,7 @@ unblockThread(StgTSO *tso) goto done; } } - barf("unblockThread (I/O): TSO not found"); + barf("unblockThread (delay): TSO not found"); } default: @@ -3326,6 +3002,22 @@ deleteThread(StgTSO *tso) raiseAsync(tso,NULL); } +static void +deleteThreadImmediately(StgTSO *tso) +{ // for forkProcess only: + // delete thread without giving it a chance to catch the KillThread exception + + if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { + return; + } +#if defined(RTS_SUPPORTS_THREADS) + if (tso->why_blocked != BlockedOnCCall + && tso->why_blocked != BlockedOnCCall_NoUnblockExc) +#endif + unblockThread(tso); + tso->what_next = ThreadKilled; +} + void raiseAsyncWithLock(StgTSO *tso, StgClosure *exception) { @@ -3463,7 +3155,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) TICK_ALLOC_UP_THK(words+1,0); IF_DEBUG(scheduler, - fprintf(stderr, "scheduler: Updating "); + fprintf(stderr, "sched: Updating "); printPtr((P_)((StgUpdateFrame *)frame)->updatee); fprintf(stderr, " with "); printObj((StgClosure *)ap); @@ -3608,14 +3300,11 @@ detectBlackHoles( void ) } } -//@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code -//@subsection Debugging Routines - -/* ----------------------------------------------------------------------------- +/* ---------------------------------------------------------------------------- * Debugging: why is a thread blocked * [Also provides useful information when debugging threaded programs * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02] - -------------------------------------------------------------------------- */ + ------------------------------------------------------------------------- */ static void @@ -3628,6 +3317,11 @@ printThreadBlockage(StgTSO *tso) case BlockedOnWrite: fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd); break; +#if defined(mingw32_TARGET_OS) + case BlockedOnDoProc: + fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID); + break; +#endif case BlockedOnDelay: fprintf(stderr,"is blocked until %d", tso->block_info.target); break; @@ -3708,7 +3402,7 @@ printAllThreads(void) for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) { fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t); - label = lookupThreadLabel((StgWord)t); + label = lookupThreadLabel(t->id); if (label) fprintf(stderr,"[\"%s\"] ",(char *)label); printThreadStatus(t); fprintf(stderr,"\n"); @@ -3720,7 +3414,6 @@ printAllThreads(void) /* Print a whole blocking queue attached to node (debugging only). */ -//@cindex print_bq # if defined(PAR) void print_bq (StgClosure *node) @@ -3885,45 +3578,22 @@ run_queue_len(void) } #endif -static void +void sched_belch(char *s, ...) { va_list ap; va_start(ap,s); -#ifdef SMP - fprintf(stderr, "scheduler (task %ld): ", osThreadId()); +#ifdef RTS_SUPPORTS_THREADS + fprintf(stderr, "sched (task %p): ", osThreadId()); #elif defined(PAR) fprintf(stderr, "== "); #else - fprintf(stderr, "scheduler: "); + fprintf(stderr, "sched: "); #endif vfprintf(stderr, s, ap); fprintf(stderr, "\n"); + fflush(stderr); va_end(ap); } #endif /* DEBUG */ - - -//@node Index, , Debugging Routines, Main scheduling code -//@subsection Index - -//@index -//* StgMainThread:: @cindex\s-+StgMainThread -//* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue -//* blocked_queue_hd:: @cindex\s-+blocked_queue_hd -//* blocked_queue_tl:: @cindex\s-+blocked_queue_tl -//* context_switch:: @cindex\s-+context_switch -//* createThread:: @cindex\s-+createThread -//* gc_pending_cond:: @cindex\s-+gc_pending_cond -//* initScheduler:: @cindex\s-+initScheduler -//* interrupted:: @cindex\s-+interrupted -//* next_thread_id:: @cindex\s-+next_thread_id -//* print_bq:: @cindex\s-+print_bq -//* run_queue_hd:: @cindex\s-+run_queue_hd -//* run_queue_tl:: @cindex\s-+run_queue_tl -//* sched_mutex:: @cindex\s-+sched_mutex -//* schedule:: @cindex\s-+schedule -//* take_off_run_queue:: @cindex\s-+take_off_run_queue -//* term_mutex:: @cindex\s-+term_mutex -//@end index