X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=71c3ec933733518a45574eecf6f77dfedad85730;hb=048304d347f5d18b60d8b346ff2ad9c6666a9b35;hp=74c561bd815758be1662af25f211facc9f49b54b;hpb=dfbf36a9588da7d50467950603fc3385088b4f72;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 74c561b..71c3ec9 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,7 +1,6 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.168 2003/04/08 15:53:51 sof Exp $ * - * (c) The GHC Team, 1998-2000 + * (c) The GHC Team, 1998-2004 * * Scheduler * @@ -10,36 +9,14 @@ * * WAY Name CPP flag What's it for * -------------------------------------- - * mp GUM PAR Parallel execution on a distributed memory machine + * mp GUM PAR Parallel execution on a distrib. memory machine * s SMP SMP Parallel execution on a shared memory machine * mg GranSim GRAN Simulation of parallel execution * md GUM/GdH DIST Distributed execution (based on GUM) * * --------------------------------------------------------------------------*/ -//@node Main scheduling code, , , -//@section Main scheduling code - /* - * Version with scheduler monitor support for SMPs (WAY=s): - - This design provides a high-level API to create and schedule threads etc. - as documented in the SMP design document. - - It uses a monitor design controlled by a single mutex to exercise control - over accesses to shared data structures, and builds on the Posix threads - library. - - The majority of state is shared. In order to keep essential per-task state, - there is a Capability structure, which contains all the information - needed to run a thread: its STG registers, a pointer to its TSO, a - nursery etc. During STG execution, a pointer to the capability is - kept in a register (BaseReg). - - In a non-SMP build, there is one global capability, namely MainRegTable. - - SDM & KH, 10/99 - * Version with support for distributed memory parallelism aka GUM (WAY=mp): The main scheduling loop in GUM iterates until a finish message is received. @@ -59,30 +36,14 @@ over the events in the global event queue. -- HWL */ -//@menu -//* Includes:: -//* Variables and Data structures:: -//* Main scheduling loop:: -//* Suspend and Resume:: -//* Run queue code:: -//* Garbage Collextion Routines:: -//* Blocking Queue Routines:: -//* Exception Handling Routines:: -//* Debugging Routines:: -//* Index:: -//@end menu - -//@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code -//@subsection Includes - #include "PosixSource.h" #include "Rts.h" #include "SchedAPI.h" #include "RtsUtils.h" #include "RtsFlags.h" +#include "BlockAlloc.h" #include "Storage.h" #include "StgRun.h" -#include "StgStartup.h" #include "Hooks.h" #define COMPILING_SCHEDULER #include "Schedule.h" @@ -97,6 +58,8 @@ #include "Timer.h" #include "Prelude.h" #include "ThreadLabels.h" +#include "LdvProfile.h" +#include "Updates.h" #ifdef PROFILING #include "Proftimer.h" #include "ProfHeap.h" @@ -126,23 +89,27 @@ #include #include -//@node Variables and Data structures, Prototypes, Includes, Main scheduling code -//@subsection Variables and Data structures +#ifdef HAVE_ERRNO_H +#include +#endif + +#ifdef THREADED_RTS +#define USED_IN_THREADED_RTS +#else +#define USED_IN_THREADED_RTS STG_UNUSED +#endif + +#ifdef RTS_SUPPORTS_THREADS +#define USED_WHEN_RTS_SUPPORTS_THREADS +#else +#define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED +#endif /* Main thread queue. * Locks required: sched_mutex. */ StgMainThread *main_threads = NULL; -#ifdef THREADED_RTS -// Pointer to the thread that executes main -// When this thread is finished, the program terminates -// by calling shutdownHaskellAndExit. -// It would be better to add a call to shutdownHaskellAndExit -// to the Main.main wrapper and to remove this hack. -StgMainThread *main_main_thread = NULL; -#endif - /* Thread queues. * Locks required: sched_mutex. */ @@ -195,17 +162,14 @@ static StgTSO *threadStackOverflow(StgTSO *tso); */ /* flag set by signal handler to precipitate a context switch */ -//@cindex context_switch nat context_switch = 0; /* if this flag is set as well, give up execution */ -//@cindex interrupted rtsBool interrupted = rtsFalse; /* Next thread ID to allocate. * Locks required: thread_id_mutex */ -//@cindex next_thread_id static StgThreadID next_thread_id = 1; /* @@ -217,14 +181,15 @@ static StgThreadID next_thread_id = 1; /* The smallest stack size that makes any sense is: * RESERVED_STACK_WORDS (so we can get back from the stack overflow) * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame) - * + 1 (the realworld token for an IO thread) * + 1 (the closure to enter) + * + 1 (stg_ap_v_ret) + * + 1 (spare slot req'd by stg_ap_v_ret) * * A thread with this stack will bomb immediately with a stack * overflow, which will increase its stack size. */ -#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2) +#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3) #if defined(GRAN) @@ -248,15 +213,11 @@ static rtsBool shutting_down_scheduler = rtsFalse; void addToBlockedQueue ( StgTSO *tso ); -static void schedule ( void ); +static void schedule ( StgMainThread *mainThread, Capability *initialCapability ); void interruptStgRts ( void ); static void detectBlackHoles ( void ); -#ifdef DEBUG -static void sched_belch(char *s, ...); -#endif - #if defined(RTS_SUPPORTS_THREADS) /* ToDo: carefully document the invariants that go together * with these synchronisation objects. @@ -264,18 +225,6 @@ static void sched_belch(char *s, ...); Mutex sched_mutex = INIT_MUTEX_VAR; Mutex term_mutex = INIT_MUTEX_VAR; -/* - * A heavyweight solution to the problem of protecting - * the thread_id from concurrent update. - */ -Mutex thread_id_mutex = INIT_MUTEX_VAR; - - -# if defined(SMP) -static Condition gc_pending_cond = INIT_COND_VAR; -nat await_death; -# endif - #endif /* RTS_SUPPORTS_THREADS */ #if defined(PAR) @@ -286,6 +235,7 @@ rtsBool emitSchedule = rtsTrue; #if DEBUG static char *whatNext_strs[] = { + "(unknown)", "ThreadRunGHC", "ThreadInterpret", "ThreadKilled", @@ -299,32 +249,44 @@ StgTSO * createSparkThread(rtsSpark spark); StgTSO * activateSpark (rtsSpark spark); #endif -/* - * The thread state for the main thread. -// ToDo: check whether not needed any more -StgTSO *MainTSO; - */ +/* ---------------------------------------------------------------------------- + * Starting Tasks + * ------------------------------------------------------------------------- */ + +#if defined(RTS_SUPPORTS_THREADS) +static rtsBool startingWorkerThread = rtsFalse; -#if defined(PAR) || defined(RTS_SUPPORTS_THREADS) static void taskStart(void); static void taskStart(void) { - schedule(); + ACQUIRE_LOCK(&sched_mutex); + startingWorkerThread = rtsFalse; + schedule(NULL,NULL); + RELEASE_LOCK(&sched_mutex); } -#endif -#if defined(RTS_SUPPORTS_THREADS) void -startSchedulerTask(void) +startSchedulerTaskIfNecessary(void) { - startTask(taskStart); + if(run_queue_hd != END_TSO_QUEUE + || blocked_queue_hd != END_TSO_QUEUE + || sleeping_queue != END_TSO_QUEUE) + { + if(!startingWorkerThread) + { // we don't want to start another worker thread + // just because the last one hasn't yet reached the + // "waiting for capability" state + startingWorkerThread = rtsTrue; + if(!startTask(taskStart)) + { + startingWorkerThread = rtsFalse; + } + } + } } #endif -//@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code -//@subsection Main scheduling loop - /* --------------------------------------------------------------------------- Main scheduling loop. @@ -360,9 +322,9 @@ startSchedulerTask(void) This is not the ugliest code you could imagine, but it's bloody close. ------------------------------------------------------------------------ */ -//@cindex schedule static void -schedule( void ) +schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, + Capability *initialCapability ) { StgTSO *t; Capability *cap; @@ -380,15 +342,24 @@ schedule( void ) # endif #endif rtsBool was_interrupted = rtsFalse; - StgTSOWhatNext prev_what_next; + nat prev_what_next; - ACQUIRE_LOCK(&sched_mutex); - + // Pre-condition: sched_mutex is held. + // We might have a capability, passed in as initialCapability. + cap = initialCapability; + #if defined(RTS_SUPPORTS_THREADS) - waitForWorkCapability(&sched_mutex, &cap, rtsFalse); - IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId())); + // + // in the threaded case, the capability is either passed in via the + // initialCapability parameter, or initialized inside the scheduler + // loop + // + IF_DEBUG(scheduler, + sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)", + mainThread, initialCapability); + ); #else - /* simply initialise it in the non-threaded case */ + // simply initialise it in the non-threaded case grabCapability(&cap); #endif @@ -419,177 +390,55 @@ schedule( void ) while (!receivedFinish) { /* set by processMessages */ /* when receiving PP_FINISH message */ -#else + +#else // everything except GRAN and PAR while (1) { #endif - IF_DEBUG(scheduler, printAllThreads()); + IF_DEBUG(scheduler, printAllThreads()); #if defined(RTS_SUPPORTS_THREADS) - /* Check to see whether there are any worker threads - waiting to deposit external call results. If so, - yield our capability */ - yieldToReturningWorker(&sched_mutex, &cap); + // Yield the capability to higher-priority tasks if necessary. + // + if (cap != NULL) { + yieldCapability(&cap); + } + + // If we do not currently hold a capability, we wait for one + // + if (cap == NULL) { + waitForCapability(&sched_mutex, &cap, + mainThread ? &mainThread->bound_thread_cond : NULL); + } + + // We now have a capability... #endif - /* If we're interrupted (the user pressed ^C, or some other - * termination condition occurred), kill all the currently running - * threads. - */ + // + // If we're interrupted (the user pressed ^C, or some other + // termination condition occurred), kill all the currently running + // threads. + // if (interrupted) { - IF_DEBUG(scheduler, sched_belch("interrupted")); - interrupted = rtsFalse; - was_interrupted = rtsTrue; + IF_DEBUG(scheduler, sched_belch("interrupted")); + interrupted = rtsFalse; + was_interrupted = rtsTrue; #if defined(RTS_SUPPORTS_THREADS) - // In the threaded RTS, deadlock detection doesn't work, - // so just exit right away. - prog_belch("interrupted"); - releaseCapability(cap); - startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit - RELEASE_LOCK(&sched_mutex); - shutdownHaskellAndExit(EXIT_SUCCESS); + // In the threaded RTS, deadlock detection doesn't work, + // so just exit right away. + prog_belch("interrupted"); + releaseCapability(cap); + RELEASE_LOCK(&sched_mutex); + shutdownHaskellAndExit(EXIT_SUCCESS); #else - deleteAllThreads(); -#endif - } - - /* Go through the list of main threads and wake up any - * clients whose computations have finished. ToDo: this - * should be done more efficiently without a linear scan - * of the main threads list, somehow... - */ -#if defined(RTS_SUPPORTS_THREADS) - { - StgMainThread *m, **prev; - prev = &main_threads; - for (m = main_threads; m != NULL; prev = &m->link, m = m->link) { - switch (m->tso->what_next) { - case ThreadComplete: - if (m->ret) { - // NOTE: return val is tso->sp[1] (see StgStartup.hc) - *(m->ret) = (StgClosure *)m->tso->sp[1]; - } - *prev = m->link; - m->stat = Success; - broadcastCondition(&m->wakeup); -#ifdef DEBUG - removeThreadLabel((StgWord)m->tso); -#endif - if(m == main_main_thread) - { - releaseCapability(cap); - startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit - RELEASE_LOCK(&sched_mutex); - shutdownHaskellAndExit(EXIT_SUCCESS); - } - break; - case ThreadKilled: - if (m->ret) *(m->ret) = NULL; - *prev = m->link; - if (was_interrupted) { - m->stat = Interrupted; - } else { - m->stat = Killed; - } - broadcastCondition(&m->wakeup); -#ifdef DEBUG - removeThreadLabel((StgWord)m->tso); -#endif - if(m == main_main_thread) - { - releaseCapability(cap); - startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit - RELEASE_LOCK(&sched_mutex); - shutdownHaskellAndExit(EXIT_SUCCESS); - } - break; - default: - break; - } - } - } - -#else /* not threaded */ - -# if defined(PAR) - /* in GUM do this only on the Main PE */ - if (IAmMainThread) -# endif - /* If our main thread has finished or been killed, return. - */ - { - StgMainThread *m = main_threads; - if (m->tso->what_next == ThreadComplete - || m->tso->what_next == ThreadKilled) { -#ifdef DEBUG - removeThreadLabel((StgWord)m->tso); -#endif - main_threads = main_threads->link; - if (m->tso->what_next == ThreadComplete) { - // We finished successfully, fill in the return value - // NOTE: return val is tso->sp[1] (see StgStartup.hc) - if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; }; - m->stat = Success; - return; - } else { - if (m->ret) { *(m->ret) = NULL; }; - if (was_interrupted) { - m->stat = Interrupted; - } else { - m->stat = Killed; - } - return; - } - } - } + deleteAllThreads(); #endif - - /* Top up the run queue from our spark pool. We try to make the - * number of threads in the run queue equal to the number of - * free capabilities. - * - * Disable spark support in SMP for now, non-essential & requires - * a little bit of work to make it compile cleanly. -- sof 1/02. - */ -#if 0 /* defined(SMP) */ - { - nat n = getFreeCapabilities(); - StgTSO *tso = run_queue_hd; - - /* Count the run queue */ - while (n > 0 && tso != END_TSO_QUEUE) { - tso = tso->link; - n--; - } - - for (; n > 0; n--) { - StgClosure *spark; - spark = findSpark(rtsFalse); - if (spark == NULL) { - break; /* no more sparks in the pool */ - } else { - /* I'd prefer this to be done in activateSpark -- HWL */ - /* tricky - it needs to hold the scheduler lock and - * not try to re-acquire it -- SDM */ - createSparkThread(spark); - IF_DEBUG(scheduler, - sched_belch("==^^ turning spark of closure %p into a thread", - (StgClosure *)spark)); - } - } - /* We need to wake up the other tasks if we just created some - * work for them. - */ - if (getFreeCapabilities() - n > 1) { - signalCondition( &thread_ready_cond ); - } } -#endif // SMP - /* check for signals each time around the scheduler */ #if defined(RTS_USER_SIGNALS) + // check for signals each time around the scheduler if (signals_pending()) { RELEASE_LOCK(&sched_mutex); /* ToDo: kill */ startSignalHandlers(); @@ -597,23 +446,20 @@ schedule( void ) } #endif - /* Check whether any waiting threads need to be woken up. If the - * run queue is empty, and there are no other tasks running, we - * can wait indefinitely for something to happen. - */ - if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) -#if defined(RTS_SUPPORTS_THREADS) && !defined(SMP) + // + // Check whether any waiting threads need to be woken up. If the + // run queue is empty, and there are no other tasks running, we + // can wait indefinitely for something to happen. + // + if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) +#if defined(RTS_SUPPORTS_THREADS) || EMPTY_RUN_QUEUE() #endif - ) + ) { - awaitEvent( EMPTY_RUN_QUEUE() -#if defined(SMP) - && allFreeCapabilities() -#endif - ); + awaitEvent( EMPTY_RUN_QUEUE() ); } - /* we can be interrupted while waiting for I/O... */ + // we can be interrupted while waiting for I/O... if (interrupted) continue; /* @@ -628,20 +474,9 @@ schedule( void ) * inform all the main threads. */ #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS) - if ( EMPTY_THREAD_QUEUES() -#if defined(RTS_SUPPORTS_THREADS) - && EMPTY_QUEUE(suspended_ccalling_threads) -#endif -#ifdef SMP - && allFreeCapabilities() -#endif - ) + if ( EMPTY_THREAD_QUEUES() ) { IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC...")); -#if defined(THREADED_RTS) - /* and SMP mode ..? */ - releaseCapability(cap); -#endif // Garbage collection can release some new threads due to // either (a) finalizers or (b) threads resurrected because // they are about to be send BlockedOnDeadMVar. Any threads @@ -661,19 +496,7 @@ schedule( void ) * for signals to arrive rather then bombing out with a * deadlock. */ -#if defined(RTS_SUPPORTS_THREADS) - if ( 0 ) { /* hmm..what to do? Simply stop waiting for - a signal with no runnable threads (or I/O - suspended ones) leads nowhere quick. - For now, simply shut down when we reach this - condition. - - ToDo: define precisely under what conditions - the Scheduler should shut down in an MT setting. - */ -#else if ( anyUserHandlers() ) { -#endif IF_DEBUG(scheduler, sched_belch("still deadlocked, waiting for signals...")); @@ -699,89 +522,30 @@ schedule( void ) */ { StgMainThread *m; -#if defined(RTS_SUPPORTS_THREADS) - for (m = main_threads; m != NULL; m = m->link) { - switch (m->tso->why_blocked) { - case BlockedOnBlackHole: - raiseAsync(m->tso, (StgClosure *)NonTermination_closure); - break; - case BlockedOnException: - case BlockedOnMVar: - raiseAsync(m->tso, (StgClosure *)Deadlock_closure); - break; - default: - barf("deadlock: main thread blocked in a strange way"); - } - } -#else m = main_threads; switch (m->tso->why_blocked) { case BlockedOnBlackHole: - raiseAsync(m->tso, (StgClosure *)NonTermination_closure); - break; case BlockedOnException: case BlockedOnMVar: - raiseAsync(m->tso, (StgClosure *)Deadlock_closure); + raiseAsync(m->tso, (StgClosure *)NonTermination_closure); break; default: barf("deadlock: main thread blocked in a strange way"); } -#endif } - -#if defined(RTS_SUPPORTS_THREADS) - /* ToDo: revisit conditions (and mechanism) for shutting - down a multi-threaded world */ - IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down.")); - RELEASE_LOCK(&sched_mutex); - shutdownHaskell(); - return; -#endif } not_deadlocked: #elif defined(RTS_SUPPORTS_THREADS) - /* ToDo: add deadlock detection in threaded RTS */ + // ToDo: add deadlock detection in threaded RTS #elif defined(PAR) - /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */ + // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL #endif -#if defined(SMP) - /* If there's a GC pending, don't do anything until it has - * completed. - */ - if (ready_to_gc) { - IF_DEBUG(scheduler,sched_belch("waiting for GC")); - waitCondition( &gc_pending_cond, &sched_mutex ); - } -#endif - #if defined(RTS_SUPPORTS_THREADS) -#if defined(SMP) - /* block until we've got a thread on the run queue and a free - * capability. - * - */ if ( EMPTY_RUN_QUEUE() ) { - /* Give up our capability */ - releaseCapability(cap); - - /* If we're in the process of shutting down (& running the - * a batch of finalisers), don't wait around. - */ - if ( shutting_down_scheduler ) { - RELEASE_LOCK(&sched_mutex); - return; - } - IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId())); - waitForWorkCapability(&sched_mutex, &cap, rtsTrue); - IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId())); + continue; // nothing to do } -#else - if ( EMPTY_RUN_QUEUE() ) { - continue; // nothing to do - } -#endif #endif #if defined(GRAN) @@ -907,7 +671,7 @@ schedule( void ) /* in a GranSim setup the TSO stays on the run queue */ t = CurrentTSO; /* Take a thread from the run queue. */ - t = POP_RUN_QUEUE(); // take_off_run_queue(t); + POP_RUN_QUEUE(t); // take_off_run_queue(t); IF_DEBUG(gran, fprintf(stderr, "GRAN: About to run current thread, which is\n"); @@ -1014,7 +778,7 @@ schedule( void ) ASSERT(run_queue_hd != END_TSO_QUEUE); /* Take a thread from the run queue, if we have work */ - t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE); + POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE); IF_DEBUG(sanity,checkTSO(t)); /* ToDo: write something to the log-file @@ -1058,14 +822,54 @@ schedule( void ) # endif #else /* !GRAN && !PAR */ - /* grab a thread from the run queue */ + // grab a thread from the run queue ASSERT(run_queue_hd != END_TSO_QUEUE); - t = POP_RUN_QUEUE(); + POP_RUN_QUEUE(t); + // Sanity check the thread we're about to run. This can be // expensive if there is lots of thread switching going on... IF_DEBUG(sanity,checkTSO(t)); #endif +#ifdef THREADED_RTS + { + StgMainThread *m = t->main; + + if(m) + { + if(m == mainThread) + { + IF_DEBUG(scheduler, + sched_belch("### Running thread %d in bound thread", t->id)); + // yes, the Haskell thread is bound to the current native thread + } + else + { + IF_DEBUG(scheduler, + sched_belch("### thread %d bound to another OS thread", t->id)); + // no, bound to a different Haskell thread: pass to that thread + PUSH_ON_RUN_QUEUE(t); + passCapability(&m->bound_thread_cond); + continue; + } + } + else + { + if(mainThread != NULL) + // The thread we want to run is bound. + { + IF_DEBUG(scheduler, + sched_belch("### this OS thread cannot run thread %d", t->id)); + // no, the current native thread is bound to a different + // Haskell thread, so pass it to any worker thread + PUSH_ON_RUN_QUEUE(t); + passCapabilityToWorker(); + continue; + } + } + } +#endif + cap->r.rCurrentTSO = t; /* context switches are now initiated by the timer signal, unless @@ -1077,8 +881,6 @@ schedule( void ) || blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE))) context_switch = 1; - else - context_switch = 0; run_thread: @@ -1095,21 +897,35 @@ run_thread: /* Run the current thread */ prev_what_next = t->what_next; + + errno = t->saved_errno; + switch (prev_what_next) { + case ThreadKilled: case ThreadComplete: /* Thread already finished, return to scheduler. */ ret = ThreadFinished; break; + case ThreadRunGHC: ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r); break; + case ThreadInterpret: ret = interpretBCO(cap); break; + default: barf("schedule: invalid what_next field"); } + + // The TSO might have moved, so find the new location: + t = cap->r.rCurrentTSO; + + // And save the current errno in this thread. + t->saved_errno = errno; + /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */ /* Costs for the scheduler are assigned to CCS_SYSTEM */ @@ -1121,11 +937,10 @@ run_thread: ACQUIRE_LOCK(&sched_mutex); #ifdef RTS_SUPPORTS_THREADS - IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId());); + IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId());); #elif !defined(GRAN) && !defined(PAR) - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: ");); + IF_DEBUG(scheduler,fprintf(stderr,"sched: ");); #endif - t = cap->r.rCurrentTSO; #if defined(PAR) /* HACK 675: if the last thread didn't yield, make sure to print a @@ -1145,12 +960,12 @@ run_thread: #endif // did the task ask for a large block? - if (cap->r.rHpAlloc > BLOCK_SIZE_W) { + if (cap->r.rHpAlloc > BLOCK_SIZE) { // if so, get one and push it on the front of the nursery. bdescr *bd; nat blocks; - blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE; + blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE; IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", t->id, whatNext_strs[t->what_next], blocks)); @@ -1251,7 +1066,6 @@ run_thread: */ threadPaused(t); { - StgMainThread *m; /* enlarge the stack */ StgTSO *new_t = threadStackOverflow(t); @@ -1259,17 +1073,22 @@ run_thread: * main thread stack. It better not be on any other queues... * (it shouldn't be). */ - for (m = main_threads; m != NULL; m = m->link) { - if (m->tso == t) { - m->tso = new_t; - } + if (t->main != NULL) { + t->main->tso = new_t; } - threadPaused(new_t); PUSH_ON_RUN_QUEUE(new_t); } break; case ThreadYielding: + // Reset the context switch flag. We don't do this just before + // running the thread, because that would mean we would lose ticks + // during GC, which can lead to unfair scheduling (a thread hogs + // the CPU because the tick always arrives during GC). This way + // penalises threads that do a lot of allocation, but that seems + // better than the alternative. + context_switch = 0; + #if defined(GRAN) IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t)); @@ -1388,6 +1207,7 @@ run_thread: t->id, whatNext_strs[t->what_next]); printThreadBlockage(t); fprintf(stderr, "\n")); + fflush(stderr); /* Only for dumping event to log file ToDo: do I need this in GranSim, too? @@ -1396,7 +1216,7 @@ run_thread: #endif threadPaused(t); break; - + case ThreadFinished: /* Need to check whether this was a main thread, and if so, signal * the task that started it with the return value. If we have no @@ -1424,8 +1244,74 @@ run_thread: !RtsFlags.ParFlags.ParStats.Suppressed) DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */); #endif + + // + // Check whether the thread that just completed was a main + // thread, and if so return with the result. + // + // There is an assumption here that all thread completion goes + // through this point; we need to make sure that if a thread + // ends up in the ThreadKilled state, that it stays on the run + // queue so it can be dealt with here. + // + if ( +#if defined(RTS_SUPPORTS_THREADS) + mainThread != NULL +#else + mainThread->tso == t +#endif + ) + { + // We are a bound thread: this must be our thread that just + // completed. + ASSERT(mainThread->tso == t); + + if (t->what_next == ThreadComplete) { + if (mainThread->ret) { + // NOTE: return val is tso->sp[1] (see StgStartup.hc) + *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; + } + mainThread->stat = Success; + } else { + if (mainThread->ret) { + *(mainThread->ret) = NULL; + } + if (was_interrupted) { + mainThread->stat = Interrupted; + } else { + mainThread->stat = Killed; + } + } +#ifdef DEBUG + removeThreadLabel((StgWord)mainThread->tso->id); +#endif + if (mainThread->prev == NULL) { + main_threads = mainThread->link; + } else { + mainThread->prev->link = mainThread->link; + } + if (mainThread->link != NULL) { + mainThread->link->prev = NULL; + } + releaseCapability(cap); + return; + } + +#ifdef RTS_SUPPORTS_THREADS + ASSERT(t->main == NULL); +#else + if (t->main != NULL) { + // Must be a main thread that is not the topmost one. Leave + // it on the run queue until the stack has unwound to the + // point where we can deal with this. Leaving it on the run + // queue also ensures that the garbage collector knows about + // this thread and its return value (it gets dropped from the + // all_threads list so there's no other way to find it). + APPEND_TO_RUN_QUEUE(t); + } +#endif break; - + default: barf("schedule: invalid thread return code %d", (int)ret); } @@ -1443,11 +1329,7 @@ run_thread: } #endif - if (ready_to_gc -#ifdef SMP - && allFreeCapabilities() -#endif - ) { + if (ready_to_gc) { /* everybody back, start the GC. * Could do it in this thread, or signal a condition var * to do it in another thread. Either way, we need to @@ -1458,9 +1340,6 @@ run_thread: #endif GarbageCollect(GetRoots,rtsFalse); ready_to_gc = rtsFalse; -#ifdef SMP - broadcastCondition(&gc_pending_cond); -#endif #if defined(GRAN) /* add a ContinueThread event to continue execution of current thread */ new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc], @@ -1491,75 +1370,110 @@ run_thread: } /* --------------------------------------------------------------------------- - * Singleton fork(). Do not copy any running threads. + * rtsSupportsBoundThreads(): is the RTS built to support bound threads? + * used by Control.Concurrent for error checking. * ------------------------------------------------------------------------- */ + +StgBool +rtsSupportsBoundThreads(void) +{ +#ifdef THREADED_RTS + return rtsTrue; +#else + return rtsFalse; +#endif +} -StgInt forkProcess(StgTSO* tso) { +/* --------------------------------------------------------------------------- + * isThreadBound(tso): check whether tso is bound to an OS thread. + * ------------------------------------------------------------------------- */ + +StgBool +isThreadBound(StgTSO* tso USED_IN_THREADED_RTS) +{ +#ifdef THREADED_RTS + return (tso->main != NULL); +#endif + return rtsFalse; +} + +/* --------------------------------------------------------------------------- + * Singleton fork(). Do not copy any running threads. + * ------------------------------------------------------------------------- */ #ifndef mingw32_TARGET_OS +#define FORKPROCESS_PRIMOP_SUPPORTED +#endif + +#ifdef FORKPROCESS_PRIMOP_SUPPORTED +static void +deleteThreadImmediately(StgTSO *tso); +#endif +StgInt +forkProcess(HsStablePtr *entry +#ifndef FORKPROCESS_PRIMOP_SUPPORTED + STG_UNUSED +#endif + ) +{ +#ifdef FORKPROCESS_PRIMOP_SUPPORTED pid_t pid; StgTSO* t,*next; StgMainThread *m; - rtsBool doKill; + SchedulerStatus rc; IF_DEBUG(scheduler,sched_belch("forking!")); + rts_lock(); // This not only acquires sched_mutex, it also + // makes sure that no other threads are running pid = fork(); + if (pid) { /* parent */ /* just return the pid */ + rts_unlock(); + return pid; } else { /* child */ - /* wipe all other threads */ - run_queue_hd = run_queue_tl = tso; - tso->link = END_TSO_QUEUE; - - /* When clearing out the threads, we need to ensure - that a 'main thread' is left behind; if there isn't, - the Scheduler will shutdown next time it is entered. - - ==> we don't kill a thread that's on the main_threads - list (nor the current thread.) - [ Attempts at implementing the more ambitious scheme of - killing the main_threads also, and then adding the - current thread onto the main_threads list if it wasn't - there already, failed -- waitThread() (for one) wasn't - up to it. If it proves to be desirable to also kill - the main threads, then this scheme will have to be - revisited (and fully debugged!) - - -- sof 7/2002 - ] - */ - /* DO NOT TOUCH THE QUEUES directly because most of the code around - us is picky about finding the thread still in its queue when - handling the deleteThread() */ - - for (t = all_threads; t != END_TSO_QUEUE; t = next) { - next = t->link; - /* Don't kill the current thread.. */ - if (t->id == tso->id) continue; - doKill=rtsTrue; - /* ..or a main thread */ - for (m = main_threads; m != NULL; m = m->link) { - if (m->tso->id == t->id) { - doKill=rtsFalse; - break; - } + // delete all threads + run_queue_hd = run_queue_tl = END_TSO_QUEUE; + + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + next = t->link; + + // don't allow threads to catch the ThreadKilled exception + deleteThreadImmediately(t); } - if (doKill) { - deleteThread(t); + + // wipe the main thread list + while((m = main_threads) != NULL) { + main_threads = m->link; +# ifdef THREADED_RTS + closeCondition(&m->bound_thread_cond); +# endif + stgFree(m); } + +# ifdef RTS_SUPPORTS_THREADS + resetTaskManagerAfterFork(); // tell startTask() and friends that + startingWorkerThread = rtsFalse; // we have no worker threads any more + resetWorkerWakeupPipeAfterFork(); +# endif + + rc = rts_evalStableIO(entry, NULL); // run the action + rts_checkSchedStatus("forkProcess",rc); + + rts_unlock(); + + hs_exit(); // clean up and exit + stg_exit(0); } - } - return pid; -#else /* mingw32 */ - barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id); - /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */ +#else /* !FORKPROCESS_PRIMOP_SUPPORTED */ + barf("forkProcess#: primop not supported, sorry!\n"); return -1; -#endif /* mingw32 */ +#endif } /* --------------------------------------------------------------------------- @@ -1571,7 +1485,8 @@ StgInt forkProcess(StgTSO* tso) { * Locks: sched_mutex held. * ------------------------------------------------------------------------- */ -void deleteAllThreads ( void ) +void +deleteAllThreads ( void ) { StgTSO* t, *next; IF_DEBUG(scheduler,sched_belch("deleting all threads")); @@ -1579,17 +1494,20 @@ void deleteAllThreads ( void ) next = t->global_link; deleteThread(t); } - run_queue_hd = run_queue_tl = END_TSO_QUEUE; - blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE; - sleeping_queue = END_TSO_QUEUE; + + // The run queue now contains a bunch of ThreadKilled threads. We + // must not throw these away: the main thread(s) will be in there + // somewhere, and the main scheduler loop has to deal with it. + // Also, the run queue is the only thing keeping these threads from + // being GC'd, and we don't want the "main thread has been GC'd" panic. + + ASSERT(blocked_queue_hd == END_TSO_QUEUE); + ASSERT(sleeping_queue == END_TSO_QUEUE); } /* startThread and insertThread are now in GranSim.c -- HWL */ -//@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code -//@subsection Suspend and Resume - /* --------------------------------------------------------------------------- * Suspending & resuming Haskell threads. * @@ -1606,25 +1524,21 @@ void deleteAllThreads ( void ) * ------------------------------------------------------------------------- */ StgInt -suspendThread( StgRegTable *reg, - rtsBool concCall -#if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG) - STG_UNUSED -#endif - ) +suspendThread( StgRegTable *reg ) { nat tok; Capability *cap; + int saved_errno = errno; /* assume that *reg is a pointer to the StgRegTable part * of a Capability. */ - cap = (Capability *)((void *)reg - sizeof(StgFunTable)); + cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable))); ACQUIRE_LOCK(&sched_mutex); IF_DEBUG(scheduler, - sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall)); + sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id)); // XXX this might not be necessary --SDM cap->r.rCurrentTSO->what_next = ThreadRunGHC; @@ -1633,17 +1547,12 @@ suspendThread( StgRegTable *reg, cap->r.rCurrentTSO->link = suspended_ccalling_threads; suspended_ccalling_threads = cap->r.rCurrentTSO; -#if defined(RTS_SUPPORTS_THREADS) - if(cap->r.rCurrentTSO->blocked_exceptions == NULL) - { + if(cap->r.rCurrentTSO->blocked_exceptions == NULL) { cap->r.rCurrentTSO->why_blocked = BlockedOnCCall; cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE; - } - else - { + } else { cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc; } -#endif /* Use the thread ID as the token; it should be unique */ tok = cap->r.rCurrentTSO->id; @@ -1655,31 +1564,30 @@ suspendThread( StgRegTable *reg, /* Preparing to leave the RTS, so ensure there's a native thread/task waiting to take over. */ - IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId())); - //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult - startTask(taskStart); - //} + IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok)); #endif /* Other threads _might_ be available for execution; signal this */ THREAD_RUNNABLE(); RELEASE_LOCK(&sched_mutex); + + errno = saved_errno; return tok; } StgRegTable * -resumeThread( StgInt tok, - rtsBool concCall STG_UNUSED ) +resumeThread( StgInt tok ) { StgTSO *tso, **prev; Capability *cap; + int saved_errno = errno; #if defined(RTS_SUPPORTS_THREADS) /* Wait for permission to re-enter the RTS with the result. */ ACQUIRE_LOCK(&sched_mutex); - grabReturnCapability(&sched_mutex, &cap); + waitForReturnCapability(&sched_mutex, &cap); - IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId())); + IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok)); #else grabCapability(&cap); #endif @@ -1699,21 +1607,17 @@ resumeThread( StgInt tok, } tso->link = END_TSO_QUEUE; -#if defined(RTS_SUPPORTS_THREADS) - if(tso->why_blocked == BlockedOnCCall) - { + if(tso->why_blocked == BlockedOnCCall) { awakenBlockedQueueNoLock(tso->blocked_exceptions); tso->blocked_exceptions = NULL; } -#endif /* Reset blocking status */ tso->why_blocked = NotBlocked; cap->r.rCurrentTSO = tso; -#if defined(RTS_SUPPORTS_THREADS) RELEASE_LOCK(&sched_mutex); -#endif + errno = saved_errno; return &cap->r; } @@ -1764,7 +1668,7 @@ labelThread(StgPtr tso, char *label) buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()"); strncpy(buf,label,len); /* Update will free the old memory for us */ - updateThreadLabel((StgWord)tso,buf); + updateThreadLabel(((StgTSO *)tso)->id,buf); } #endif /* DEBUG */ @@ -1781,7 +1685,6 @@ labelThread(StgPtr tso, char *label) currently pri (priority) is only used in a GRAN setup -- HWL ------------------------------------------------------------------------ */ -//@cindex createThread #if defined(GRAN) /* currently pri (priority) is only used in a GRAN setup -- HWL */ StgTSO * @@ -1831,17 +1734,13 @@ createThread(nat size) // Always start with the compiled code evaluator tso->what_next = ThreadRunGHC; - /* tso->id needs to be unique. For now we use a heavyweight mutex to - * protect the increment operation on next_thread_id. - * In future, we could use an atomic increment instead. - */ - ACQUIRE_LOCK(&thread_id_mutex); tso->id = next_thread_id++; - RELEASE_LOCK(&thread_id_mutex); - tso->why_blocked = NotBlocked; tso->blocked_exceptions = NULL; + tso->saved_errno = 0; + tso->main = NULL; + tso->stack_size = stack_size; tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) - TSO_STRUCT_SIZEW; @@ -1854,9 +1753,10 @@ createThread(nat size) /* put a stop frame on the stack */ tso->sp -= sizeofW(StgStopFrame); SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM); + tso->link = END_TSO_QUEUE; + // ToDo: check this #if defined(GRAN) - tso->link = END_TSO_QUEUE; /* uses more flexible routine in GranSim */ insertThread(tso, CurrentProc); #else @@ -1991,7 +1891,6 @@ createSparkThread(rtsSpark spark) ToDo: fix for SMP (needs to acquire SCHED_MUTEX!) */ #if defined(PAR) -//@cindex activateSpark StgTSO * activateSpark (rtsSpark spark) { @@ -2013,10 +1912,8 @@ activateSpark (rtsSpark spark) } #endif -static SchedulerStatus waitThread_(/*out*/StgMainThread* m -#if defined(THREADED_RTS) - , rtsBool blockWaiting -#endif +static SchedulerStatus waitThread_(/*out*/StgMainThread* m, + Capability *initialCapability ); @@ -2036,60 +1933,74 @@ void scheduleThread_(StgTSO *tso) { // Precondition: sched_mutex must be held. - - /* Put the new thread on the head of the runnable queue. The caller - * better push an appropriate closure on this thread's stack - * beforehand. In the SMP case, the thread may start running as - * soon as we release the scheduler lock below. - */ - PUSH_ON_RUN_QUEUE(tso); + // The thread goes at the *end* of the run-queue, to avoid possible + // starvation of any threads already on the queue. + APPEND_TO_RUN_QUEUE(tso); THREAD_RUNNABLE(); - -#if 0 - IF_DEBUG(scheduler,printTSO(tso)); -#endif } -void scheduleThread(StgTSO* tso) +void +scheduleThread(StgTSO* tso) { ACQUIRE_LOCK(&sched_mutex); scheduleThread_(tso); RELEASE_LOCK(&sched_mutex); } +#if defined(RTS_SUPPORTS_THREADS) +static Condition bound_cond_cache; +static int bound_cond_cache_full = 0; +#endif + + SchedulerStatus -scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret) -{ // Precondition: sched_mutex must be held - StgMainThread *m; +scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, + Capability *initialCapability) +{ + // Precondition: sched_mutex must be held + StgMainThread *m; + + m = stgMallocBytes(sizeof(StgMainThread), "waitThread"); + m->tso = tso; + tso->main = m; + m->ret = ret; + m->stat = NoStatus; + m->link = main_threads; + m->prev = NULL; + if (main_threads != NULL) { + main_threads->prev = m; + } + main_threads = m; - m = stgMallocBytes(sizeof(StgMainThread), "waitThread"); - m->tso = tso; - m->ret = ret; - m->stat = NoStatus; #if defined(RTS_SUPPORTS_THREADS) - initCondition(&m->wakeup); + // Allocating a new condition for each thread is expensive, so we + // cache one. This is a pretty feeble hack, but it helps speed up + // consecutive call-ins quite a bit. + if (bound_cond_cache_full) { + m->bound_thread_cond = bound_cond_cache; + bound_cond_cache_full = 0; + } else { + initCondition(&m->bound_thread_cond); + } #endif - /* Put the thread on the main-threads list prior to scheduling the TSO. - Failure to do so introduces a race condition in the MT case (as - identified by Wolfgang Thaller), whereby the new task/OS thread - created by scheduleThread_() would complete prior to the thread - that spawned it managed to put 'itself' on the main-threads list. - The upshot of it all being that the worker thread wouldn't get to - signal the completion of the its work item for the main thread to - see (==> it got stuck waiting.) -- sof 6/02. - */ - IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id)); - - m->link = main_threads; - main_threads = m; + /* Put the thread on the main-threads list prior to scheduling the TSO. + Failure to do so introduces a race condition in the MT case (as + identified by Wolfgang Thaller), whereby the new task/OS thread + created by scheduleThread_() would complete prior to the thread + that spawned it managed to put 'itself' on the main-threads list. + The upshot of it all being that the worker thread wouldn't get to + signal the completion of the its work item for the main thread to + see (==> it got stuck waiting.) -- sof 6/02. + */ + IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id)); + + APPEND_TO_RUN_QUEUE(tso); + // NB. Don't call THREAD_RUNNABLE() here, because the thread is + // bound and only runnable by *this* OS thread, so waking up other + // workers will just slow things down. - scheduleThread_(tso); -#if defined(THREADED_RTS) - return waitThread_(m, rtsTrue); -#else - return waitThread_(m); -#endif + return waitThread_(m, initialCapability); } /* --------------------------------------------------------------------------- @@ -2101,18 +2012,6 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret) * * ------------------------------------------------------------------------ */ -#ifdef SMP -static void -term_handler(int sig STG_UNUSED) -{ - stat_workerStop(); - ACQUIRE_LOCK(&term_mutex); - await_death--; - RELEASE_LOCK(&term_mutex); - shutdownThread(); -} -#endif - void initScheduler(void) { @@ -2151,32 +2050,9 @@ initScheduler(void) * the scheduler. */ initMutex(&sched_mutex); initMutex(&term_mutex); - initMutex(&thread_id_mutex); - - initCondition(&thread_ready_cond); #endif -#if defined(SMP) - initCondition(&gc_pending_cond); -#endif - -#if defined(RTS_SUPPORTS_THREADS) ACQUIRE_LOCK(&sched_mutex); -#endif - - /* Install the SIGHUP handler */ -#if defined(SMP) - { - struct sigaction action,oact; - - action.sa_handler = term_handler; - sigemptyset(&action.sa_mask); - action.sa_flags = 0; - if (sigaction(SIGTERM, &action, &oact) != 0) { - barf("can't install TERM handler"); - } - } -#endif /* A capability holds the state a native thread needs in * order to execute STG code. At least one capability is @@ -2186,21 +2062,14 @@ initScheduler(void) #if defined(RTS_SUPPORTS_THREADS) /* start our haskell execution tasks */ -# if defined(SMP) - startTaskManager(RtsFlags.ParFlags.nNodes, taskStart); -# else startTaskManager(0,taskStart); -# endif #endif #if /* defined(SMP) ||*/ defined(PAR) initSparkPools(); #endif -#if defined(RTS_SUPPORTS_THREADS) RELEASE_LOCK(&sched_mutex); -#endif - } void @@ -2212,259 +2081,54 @@ exitScheduler( void ) shutting_down_scheduler = rtsTrue; } -/* ----------------------------------------------------------------------------- +/* ---------------------------------------------------------------------------- Managing the per-task allocation areas. Each capability comes with an allocation area. These are fixed-length block lists into which allocation can be done. ToDo: no support for two-space collection at the moment??? - -------------------------------------------------------------------------- */ - -/* ----------------------------------------------------------------------------- - * waitThread is the external interface for running a new computation - * and waiting for the result. - * - * In the non-SMP case, we create a new main thread, push it on the - * main-thread stack, and invoke the scheduler to run it. The - * scheduler will return when the top main thread on the stack has - * completed or died, and fill in the necessary fields of the - * main_thread structure. - * - * In the SMP case, we create a main thread as before, but we then - * create a new condition variable and sleep on it. When our new - * main thread has completed, we'll be woken up and the status/result - * will be in the main_thread struct. - * -------------------------------------------------------------------------- */ - -int -howManyThreadsAvail ( void ) -{ - int i = 0; - StgTSO* q; - for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link) - i++; - for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link) - i++; - for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link) - i++; - return i; -} - -void -finishAllThreads ( void ) -{ - do { - while (run_queue_hd != END_TSO_QUEUE) { - waitThread ( run_queue_hd, NULL); - } - while (blocked_queue_hd != END_TSO_QUEUE) { - waitThread ( blocked_queue_hd, NULL); - } - while (sleeping_queue != END_TSO_QUEUE) { - waitThread ( blocked_queue_hd, NULL); - } - } while - (blocked_queue_hd != END_TSO_QUEUE || - run_queue_hd != END_TSO_QUEUE || - sleeping_queue != END_TSO_QUEUE); -} - -SchedulerStatus -waitThread(StgTSO *tso, /*out*/StgClosure **ret) -{ - StgMainThread *m; - SchedulerStatus stat; - - m = stgMallocBytes(sizeof(StgMainThread), "waitThread"); - m->tso = tso; - m->ret = ret; - m->stat = NoStatus; -#if defined(RTS_SUPPORTS_THREADS) - initCondition(&m->wakeup); -#endif - - /* see scheduleWaitThread() comment */ - ACQUIRE_LOCK(&sched_mutex); - m->link = main_threads; - main_threads = m; - - IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id)); -#if defined(THREADED_RTS) - stat = waitThread_(m, rtsFalse); -#else - stat = waitThread_(m); -#endif - RELEASE_LOCK(&sched_mutex); - return stat; -} + ------------------------------------------------------------------------- */ static SchedulerStatus -waitThread_(StgMainThread* m -#if defined(THREADED_RTS) - , rtsBool blockWaiting -#endif - ) +waitThread_(StgMainThread* m, Capability *initialCapability) { SchedulerStatus stat; // Precondition: sched_mutex must be held. - IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id)); + IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id)); -#if defined(RTS_SUPPORTS_THREADS) - -# if defined(THREADED_RTS) - if (!blockWaiting) { - /* In the threaded case, the OS thread that called main() - * gets to enter the RTS directly without going via another - * task/thread. - */ - main_main_thread = m; - RELEASE_LOCK(&sched_mutex); - schedule(); - ACQUIRE_LOCK(&sched_mutex); - main_main_thread = NULL; - ASSERT(m->stat != NoStatus); - } else -# endif - { - do { - waitCondition(&m->wakeup, &sched_mutex); - } while (m->stat == NoStatus); - } -#elif defined(GRAN) +#if defined(GRAN) /* GranSim specific init */ CurrentTSO = m->tso; // the TSO to run procStatus[MainProc] = Busy; // status of main PE CurrentProc = MainProc; // PE to run it on - - RELEASE_LOCK(&sched_mutex); - schedule(); + schedule(m,initialCapability); #else - RELEASE_LOCK(&sched_mutex); - schedule(); + schedule(m,initialCapability); ASSERT(m->stat != NoStatus); #endif stat = m->stat; #if defined(RTS_SUPPORTS_THREADS) - closeCondition(&m->wakeup); + // Free the condition variable, returning it to the cache if possible. + if (!bound_cond_cache_full) { + bound_cond_cache = m->bound_thread_cond; + bound_cond_cache_full = 1; + } else { + closeCondition(&m->bound_thread_cond); + } #endif - IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", - m->tso->id)); + IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id)); stgFree(m); // Postcondition: sched_mutex still held return stat; } -//@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code -//@subsection Run queue code - -#if 0 -/* - NB: In GranSim we have many run queues; run_queue_hd is actually a macro - unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an - implicit global variable that has to be correct when calling these - fcts -- HWL -*/ - -/* Put the new thread on the head of the runnable queue. - * The caller of createThread better push an appropriate closure - * on this thread's stack before the scheduler is invoked. - */ -static /* inline */ void -add_to_run_queue(tso) -StgTSO* tso; -{ - ASSERT(tso!=run_queue_hd && tso!=run_queue_tl); - tso->link = run_queue_hd; - run_queue_hd = tso; - if (run_queue_tl == END_TSO_QUEUE) { - run_queue_tl = tso; - } -} - -/* Put the new thread at the end of the runnable queue. */ -static /* inline */ void -push_on_run_queue(tso) -StgTSO* tso; -{ - ASSERT(get_itbl((StgClosure *)tso)->type == TSO); - ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL); - ASSERT(tso!=run_queue_hd && tso!=run_queue_tl); - if (run_queue_hd == END_TSO_QUEUE) { - run_queue_hd = tso; - } else { - run_queue_tl->link = tso; - } - run_queue_tl = tso; -} - -/* - Should be inlined because it's used very often in schedule. The tso - argument is actually only needed in GranSim, where we want to have the - possibility to schedule *any* TSO on the run queue, irrespective of the - actual ordering. Therefore, if tso is not the nil TSO then we traverse - the run queue and dequeue the tso, adjusting the links in the queue. -*/ -//@cindex take_off_run_queue -static /* inline */ StgTSO* -take_off_run_queue(StgTSO *tso) { - StgTSO *t, *prev; - - /* - qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq! - - if tso is specified, unlink that tso from the run_queue (doesn't have - to be at the beginning of the queue); GranSim only - */ - if (tso!=END_TSO_QUEUE) { - /* find tso in queue */ - for (t=run_queue_hd, prev=END_TSO_QUEUE; - t!=END_TSO_QUEUE && t!=tso; - prev=t, t=t->link) - /* nothing */ ; - ASSERT(t==tso); - /* now actually dequeue the tso */ - if (prev!=END_TSO_QUEUE) { - ASSERT(run_queue_hd!=t); - prev->link = t->link; - } else { - /* t is at beginning of thread queue */ - ASSERT(run_queue_hd==t); - run_queue_hd = t->link; - } - /* t is at end of thread queue */ - if (t->link==END_TSO_QUEUE) { - ASSERT(t==run_queue_tl); - run_queue_tl = prev; - } else { - ASSERT(run_queue_tl!=t); - } - t->link = END_TSO_QUEUE; - } else { - /* take tso from the beginning of the queue; std concurrent code */ - t = run_queue_hd; - if (t != END_TSO_QUEUE) { - run_queue_hd = t->link; - t->link = END_TSO_QUEUE; - if (run_queue_hd == END_TSO_QUEUE) { - run_queue_tl = END_TSO_QUEUE; - } - } - } - return t; -} - -#endif /* 0 */ - -//@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code -//@subsection Garbage Collextion Routines - /* --------------------------------------------------------------------------- Where are the roots that we know about? @@ -2482,7 +2146,7 @@ take_off_run_queue(StgTSO *tso) { */ void -GetRoots(evac_fn evac) +GetRoots( evac_fn evac ) { #if defined(GRAN) { @@ -2534,25 +2198,6 @@ GetRoots(evac_fn evac) // mark the signal handlers (signals should be already blocked) markSignalHandlers(evac); #endif - - // main threads which have completed need to be retained until they - // are dealt with in the main scheduler loop. They won't be - // retained any other way: the GC will drop them from the - // all_threads list, so we have to be careful to treat them as roots - // here. - { - StgMainThread *m; - for (m = main_threads; m != NULL; m = m->link) { - switch (m->tso->what_next) { - case ThreadComplete: - case ThreadKilled: - evac((StgClosure **)&m->tso); - break; - default: - break; - } - } - } } /* ----------------------------------------------------------------------------- @@ -2623,7 +2268,7 @@ threadStackOverflow(StgTSO *tso) if (tso->stack_size >= tso->max_stack_size) { IF_DEBUG(gc, - belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld", + belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)", tso->id, tso, tso->stack_size, tso->max_stack_size); /* If we're debugging, just print out the top of the stack */ printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, @@ -2644,7 +2289,7 @@ threadStackOverflow(StgTSO *tso) new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */ new_stack_size = new_tso_size - TSO_STRUCT_SIZEW; - IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size)); + IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size)); dest = (StgTSO *)allocate(new_tso_size); TICK_ALLOC_TSO(new_stack_size,0); @@ -2687,20 +2332,17 @@ threadStackOverflow(StgTSO *tso) return dest; } -//@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code -//@subsection Blocking Queue Routines - /* --------------------------------------------------------------------------- Wake up a queue that was blocked on some resource. ------------------------------------------------------------------------ */ #if defined(GRAN) -static inline void +STATIC_INLINE void unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node ) { } #elif defined(PAR) -static inline void +STATIC_INLINE void unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node ) { /* write RESUME events to log file and @@ -2783,8 +2425,8 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node) ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked); /* if it's a TSO just push it onto the run_queue */ next = bqe->link; - // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging? - PUSH_ON_RUN_QUEUE((StgTSO *)bqe); + ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging? + APPEND_TO_RUN_QUEUE((StgTSO *)bqe); THREAD_RUNNABLE(); unblockCount(bqe, node); /* reset blocking status after dumping event */ @@ -2828,7 +2470,8 @@ unblockOneLocked(StgTSO *tso) ASSERT(tso->why_blocked != NotBlocked); tso->why_blocked = NotBlocked; next = tso->link; - PUSH_ON_RUN_QUEUE(tso); + tso->link = END_TSO_QUEUE; + APPEND_TO_RUN_QUEUE(tso); THREAD_RUNNABLE(); IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id)); return next; @@ -2836,7 +2479,7 @@ unblockOneLocked(StgTSO *tso) #endif #if defined(GRAN) || defined(PAR) -inline StgBlockingQueueElement * +INLINE_ME StgBlockingQueueElement * unblockOne(StgBlockingQueueElement *bqe, StgClosure *node) { ACQUIRE_LOCK(&sched_mutex); @@ -2845,7 +2488,7 @@ unblockOne(StgBlockingQueueElement *bqe, StgClosure *node) return bqe; } #else -inline StgTSO * +INLINE_ME StgTSO * unblockOne(StgTSO *tso) { ACQUIRE_LOCK(&sched_mutex); @@ -2971,7 +2614,6 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node) #else /* !GRAN && !PAR */ -#ifdef RTS_SUPPORTS_THREADS void awakenBlockedQueueNoLock(StgTSO *tso) { @@ -2979,7 +2621,6 @@ awakenBlockedQueueNoLock(StgTSO *tso) tso = unblockOneLocked(tso); } } -#endif void awakenBlockedQueue(StgTSO *tso) @@ -2992,9 +2633,6 @@ awakenBlockedQueue(StgTSO *tso) } #endif -//@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code -//@subsection Exception Handling Routines - /* --------------------------------------------------------------------------- Interrupt execution - usually called inside a signal handler so it mustn't do anything fancy. @@ -3005,6 +2643,9 @@ interruptStgRts(void) { interrupted = 1; context_switch = 1; +#ifdef RTS_SUPPORTS_THREADS + wakeBlockedWorkerThread(); +#endif } /* ----------------------------------------------------------------------------- @@ -3100,6 +2741,9 @@ unblockThread(StgTSO *tso) case BlockedOnRead: case BlockedOnWrite: +#if defined(mingw32_TARGET_OS) + case BlockedOnDoProc: +#endif { /* take TSO off blocked_queue */ StgBlockingQueueElement *prev = NULL; @@ -3138,7 +2782,7 @@ unblockThread(StgTSO *tso) goto done; } } - barf("unblockThread (I/O): TSO not found"); + barf("unblockThread (delay): TSO not found"); } default: @@ -3227,6 +2871,9 @@ unblockThread(StgTSO *tso) case BlockedOnRead: case BlockedOnWrite: +#if defined(mingw32_TARGET_OS) + case BlockedOnDoProc: +#endif { StgTSO *prev = NULL; for (t = blocked_queue_hd; t != END_TSO_QUEUE; @@ -3263,7 +2910,7 @@ unblockThread(StgTSO *tso) goto done; } } - barf("unblockThread (I/O): TSO not found"); + barf("unblockThread (delay): TSO not found"); } default: @@ -3274,7 +2921,7 @@ unblockThread(StgTSO *tso) tso->link = END_TSO_QUEUE; tso->why_blocked = NotBlocked; tso->block_info.closure = NULL; - PUSH_ON_RUN_QUEUE(tso); + APPEND_TO_RUN_QUEUE(tso); } #endif @@ -3318,6 +2965,25 @@ deleteThread(StgTSO *tso) raiseAsync(tso,NULL); } +#ifdef FORKPROCESS_PRIMOP_SUPPORTED +static void +deleteThreadImmediately(StgTSO *tso) +{ // for forkProcess only: + // delete thread without giving it a chance to catch the KillThread exception + + if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { + return; + } + + if (tso->why_blocked != BlockedOnCCall && + tso->why_blocked != BlockedOnCCall_NoUnblockExc) { + unblockThread(tso); + } + + tso->what_next = ThreadKilled; +} +#endif + void raiseAsyncWithLock(StgTSO *tso, StgClosure *exception) { @@ -3455,7 +3121,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) TICK_ALLOC_UP_THK(words+1,0); IF_DEBUG(scheduler, - fprintf(stderr, "scheduler: Updating "); + fprintf(stderr, "sched: Updating "); printPtr((P_)((StgUpdateFrame *)frame)->updatee); fprintf(stderr, " with "); printObj((StgClosure *)ap); @@ -3475,7 +3141,8 @@ raiseAsync(StgTSO *tso, StgClosure *exception) // if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) { // revert the black hole - UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap); + UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee, + (StgClosure *)ap); } sp += sizeofW(StgUpdateFrame) - 1; sp[0] = (W_)ap; // push onto stack @@ -3497,6 +3164,77 @@ raiseAsync(StgTSO *tso, StgClosure *exception) } /* ----------------------------------------------------------------------------- + raiseExceptionHelper + + This function is called by the raise# primitve, just so that we can + move some of the tricky bits of raising an exception from C-- into + C. Who knows, it might be a useful re-useable thing here too. + -------------------------------------------------------------------------- */ + +StgWord +raiseExceptionHelper (StgTSO *tso, StgClosure *exception) +{ + StgClosure *raise_closure = NULL; + StgPtr p, next; + StgRetInfoTable *info; + // + // This closure represents the expression 'raise# E' where E + // is the exception raise. It is used to overwrite all the + // thunks which are currently under evaluataion. + // + + // + // LDV profiling: stg_raise_info has THUNK as its closure + // type. Since a THUNK takes at least MIN_UPD_SIZE words in its + // payload, MIN_UPD_SIZE is more approprate than 1. It seems that + // 1 does not cause any problem unless profiling is performed. + // However, when LDV profiling goes on, we need to linearly scan + // small object pool, where raise_closure is stored, so we should + // use MIN_UPD_SIZE. + // + // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate, + // sizeofW(StgClosure)+1); + // + + // + // Walk up the stack, looking for the catch frame. On the way, + // we update any closures pointed to from update frames with the + // raise closure that we just built. + // + p = tso->sp; + while(1) { + info = get_ret_itbl((StgClosure *)p); + next = p + stack_frame_sizeW((StgClosure *)p); + switch (info->i.type) { + + case UPDATE_FRAME: + // Only create raise_closure if we need to. + if (raise_closure == NULL) { + raise_closure = + (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE); + SET_HDR(raise_closure, &stg_raise_info, CCCS); + raise_closure->payload[0] = exception; + } + UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure); + p = next; + continue; + + case CATCH_FRAME: + tso->sp = p; + return CATCH_FRAME; + + case STOP_FRAME: + tso->sp = p; + return STOP_FRAME; + + default: + p = next; + continue; + } + } +} + +/* ----------------------------------------------------------------------------- resurrectThreads is called after garbage collection on the list of threads found to be garbage. Each of these threads will be woken up and sent a signal: BlockedOnDeadMVar if the thread was blocked @@ -3600,14 +3338,11 @@ detectBlackHoles( void ) } } -//@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code -//@subsection Debugging Routines - -/* ----------------------------------------------------------------------------- +/* ---------------------------------------------------------------------------- * Debugging: why is a thread blocked * [Also provides useful information when debugging threaded programs * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02] - -------------------------------------------------------------------------- */ + ------------------------------------------------------------------------- */ static void @@ -3620,6 +3355,11 @@ printThreadBlockage(StgTSO *tso) case BlockedOnWrite: fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd); break; +#if defined(mingw32_TARGET_OS) + case BlockedOnDoProc: + fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID); + break; +#endif case BlockedOnDelay: fprintf(stderr,"is blocked until %d", tso->block_info.target); break; @@ -3646,14 +3386,12 @@ printThreadBlockage(StgTSO *tso) tso->block_info.closure, info_type(tso->block_info.closure)); break; #endif -#if defined(RTS_SUPPORTS_THREADS) case BlockedOnCCall: fprintf(stderr,"is blocked on an external call"); break; case BlockedOnCCall_NoUnblockExc: fprintf(stderr,"is blocked on an external call (exceptions were already blocked)"); break; -#endif default: barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)", tso->why_blocked, tso->id, tso); @@ -3700,7 +3438,7 @@ printAllThreads(void) for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) { fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t); - label = lookupThreadLabel((StgWord)t); + label = lookupThreadLabel(t->id); if (label) fprintf(stderr,"[\"%s\"] ",(char *)label); printThreadStatus(t); fprintf(stderr,"\n"); @@ -3712,7 +3450,6 @@ printAllThreads(void) /* Print a whole blocking queue attached to node (debugging only). */ -//@cindex print_bq # if defined(PAR) void print_bq (StgClosure *node) @@ -3877,45 +3614,22 @@ run_queue_len(void) } #endif -static void +void sched_belch(char *s, ...) { va_list ap; va_start(ap,s); -#ifdef SMP - fprintf(stderr, "scheduler (task %ld): ", osThreadId()); +#ifdef RTS_SUPPORTS_THREADS + fprintf(stderr, "sched (task %p): ", osThreadId()); #elif defined(PAR) fprintf(stderr, "== "); #else - fprintf(stderr, "scheduler: "); + fprintf(stderr, "sched: "); #endif vfprintf(stderr, s, ap); fprintf(stderr, "\n"); + fflush(stderr); va_end(ap); } #endif /* DEBUG */ - - -//@node Index, , Debugging Routines, Main scheduling code -//@subsection Index - -//@index -//* StgMainThread:: @cindex\s-+StgMainThread -//* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue -//* blocked_queue_hd:: @cindex\s-+blocked_queue_hd -//* blocked_queue_tl:: @cindex\s-+blocked_queue_tl -//* context_switch:: @cindex\s-+context_switch -//* createThread:: @cindex\s-+createThread -//* gc_pending_cond:: @cindex\s-+gc_pending_cond -//* initScheduler:: @cindex\s-+initScheduler -//* interrupted:: @cindex\s-+interrupted -//* next_thread_id:: @cindex\s-+next_thread_id -//* print_bq:: @cindex\s-+print_bq -//* run_queue_hd:: @cindex\s-+run_queue_hd -//* run_queue_tl:: @cindex\s-+run_queue_tl -//* sched_mutex:: @cindex\s-+sched_mutex -//* schedule:: @cindex\s-+schedule -//* take_off_run_queue:: @cindex\s-+take_off_run_queue -//* term_mutex:: @cindex\s-+term_mutex -//@end index