X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=1afc9fe067cbf499a862cb71887658b2796b366b;hb=1f730da425bb17f8c80f0fb3d576a37823558565;hp=d8491ae0b6c6ea264834b8dd37e17c58c53674ea;hpb=d1d8706d2ebfb8898ba86977420b89b9854d5213;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index d8491ae..1afc9fe 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,20 +1,27 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.85 2000/12/19 16:38:15 sewardj Exp $ + * $Id: Schedule.c,v 1.170 2003/06/19 10:35:37 simonmar Exp $ * * (c) The GHC Team, 1998-2000 * * Scheduler * - * The main scheduling code in GranSim is quite different from that in std - * (concurrent) Haskell: while concurrent Haskell just iterates over the - * threads in the runnable queue, GranSim is event driven, i.e. it iterates - * over the events in the global event queue. -- HWL + * Different GHC ways use this scheduler quite differently (see comments below) + * Here is the global picture: + * + * WAY Name CPP flag What's it for + * -------------------------------------- + * mp GUM PAR Parallel execution on a distributed memory machine + * s SMP SMP Parallel execution on a shared memory machine + * mg GranSim GRAN Simulation of parallel execution + * md GUM/GdH DIST Distributed execution (based on GUM) + * * --------------------------------------------------------------------------*/ //@node Main scheduling code, , , //@section Main scheduling code -/* Version with scheduler monitor support for SMPs. +/* + * Version with scheduler monitor support for SMPs (WAY=s): This design provides a high-level API to create and schedule threads etc. as documented in the SMP design document. @@ -32,6 +39,24 @@ In a non-SMP build, there is one global capability, namely MainRegTable. SDM & KH, 10/99 + + * Version with support for distributed memory parallelism aka GUM (WAY=mp): + + The main scheduling loop in GUM iterates until a finish message is received. + In that case a global flag @receivedFinish@ is set and this instance of + the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages() + for the handling of incoming messages, such as PP_FINISH. + Note that in the parallel case we have a system manager that coordinates + different PEs, each of which are running one instance of the RTS. + See ghc/rts/parallel/SysMan.c for the main routine of the parallel program. + From this routine processes executing ghc/rts/Main.c are spawned. -- HWL + + * Version with support for simulating parallel execution aka GranSim (WAY=mg): + + The main scheduling code in GranSim is quite different from that in std + (concurrent) Haskell: while concurrent Haskell just iterates over the + threads in the runnable queue, GranSim is event driven, i.e. it iterates + over the events in the global event queue. 
-- HWL */ //@menu @@ -50,6 +75,7 @@ //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code //@subsection Includes +#include "PosixSource.h" #include "Rts.h" #include "SchedAPI.h" #include "RtsUtils.h" @@ -57,20 +83,24 @@ #include "Storage.h" #include "StgRun.h" #include "StgStartup.h" -#include "GC.h" #include "Hooks.h" +#define COMPILING_SCHEDULER #include "Schedule.h" #include "StgMiscClosures.h" #include "Storage.h" #include "Interpreter.h" #include "Exception.h" #include "Printer.h" -#include "Main.h" #include "Signals.h" #include "Sanity.h" #include "Stats.h" -#include "Itimer.h" +#include "Timer.h" #include "Prelude.h" +#include "ThreadLabels.h" +#ifdef PROFILING +#include "Proftimer.h" +#include "ProfHeap.h" +#endif #if defined(GRAN) || defined(PAR) # include "GranSimRts.h" # include "GranSim.h" @@ -81,41 +111,37 @@ # include "HLC.h" #endif #include "Sparks.h" +#include "Capability.h" +#include "OSThreads.h" +#include "Task.h" + +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#ifdef HAVE_UNISTD_H +#include +#endif +#include +#include #include //@node Variables and Data structures, Prototypes, Includes, Main scheduling code //@subsection Variables and Data structures -/* Main threads: - * - * These are the threads which clients have requested that we run. - * - * In an SMP build, we might have several concurrent clients all - * waiting for results, and each one will wait on a condition variable - * until the result is available. - * - * In non-SMP, clients are strictly nested: the first client calls - * into the RTS, which might call out again to C with a _ccall_GC, and - * eventually re-enter the RTS. - * - * Main threads information is kept in a linked list: - */ -//@cindex StgMainThread -typedef struct StgMainThread_ { - StgTSO * tso; - SchedulerStatus stat; - StgClosure ** ret; -#ifdef SMP - pthread_cond_t wakeup; -#endif - struct StgMainThread_ *link; -} StgMainThread; - /* Main thread queue. * Locks required: sched_mutex. */ -static StgMainThread *main_threads; +StgMainThread *main_threads = NULL; + +#ifdef THREADED_RTS +// Pointer to the thread that executes main +// When this thread is finished, the program terminates +// by calling shutdownHaskellAndExit. +// It would be better to add a call to shutdownHaskellAndExit +// to the Main.main wrapper and to remove this hack. +StgMainThread *main_main_thread = NULL; +#endif /* Thread queues. * Locks required: sched_mutex. @@ -126,7 +152,7 @@ StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */ /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */ /* - In GranSim we have a runable and a blocked queue for each processor. + In GranSim we have a runnable and a blocked queue for each processor. In order to minimise code changes new arrays run_queue_hds/tls are created. run_queue_hd is then a short cut (macro) for run_queue_hds[CurrentProc] (see GranSim.h). @@ -142,22 +168,25 @@ StgTSO *ccalling_threadss[MAX_PROC]; #else /* !GRAN */ -StgTSO *run_queue_hd, *run_queue_tl; -StgTSO *blocked_queue_hd, *blocked_queue_tl; -StgTSO *sleeping_queue; /* perhaps replace with a hash table? */ +StgTSO *run_queue_hd = NULL; +StgTSO *run_queue_tl = NULL; +StgTSO *blocked_queue_hd = NULL; +StgTSO *blocked_queue_tl = NULL; +StgTSO *sleeping_queue = NULL; /* perhaps replace with a hash table? */ #endif /* Linked list of all threads. * Used for detecting garbage collected threads. */ -StgTSO *all_threads; +StgTSO *all_threads = NULL; -/* Threads suspended in _ccall_GC. 
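/* ---------------------------------------------------------------------------
 * Sketch: the replacement comment just below describes parking a thread on
 * suspended_ccalling_threads for the duration of a safe C call.  This is a
 * minimal, self-contained model of that token protocol; TSO here is a
 * simplified stand-in (id plus link), not the real StgTSO, and suspend/
 * resume are hypothetical names for the push/unlink steps that
 * suspendThread()/resumeThread() perform later in this file.
 * ------------------------------------------------------------------------- */
#include <assert.h>
#include <stddef.h>

typedef struct TSO_ { int id; struct TSO_ *link; } TSO;

static TSO *suspended_list = NULL;

static int suspend(TSO *current)      /* push; the thread id is the token */
{
    current->link = suspended_list;
    suspended_list = current;
    return current->id;
}

static TSO *resume(int tok)           /* unlink the thread matching tok */
{
    TSO **prev, *t;
    for (prev = &suspended_list, t = *prev; t != NULL;
         prev = &t->link, t = t->link) {
        if (t->id == tok) { *prev = t->link; t->link = NULL; return t; }
    }
    return NULL;                      /* the real code barfs here */
}

int main(void)
{
    TSO a = { 42, NULL }, b = { 43, NULL };
    int ta = suspend(&a), tb = suspend(&b);
    assert(resume(ta) == &a && resume(tb) == &b && suspended_list == NULL);
    return 0;
}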
+/* When a thread performs a safe C call (_ccall_GC, using old + * terminology), it gets put on the suspended_ccalling_threads + * list. Used by the garbage collector. */ static StgTSO *suspended_ccalling_threads; -static void GetRoots(void); static StgTSO *threadStackOverflow(StgTSO *tso); /* KH: The following two flags are shared memory locations. There is no need @@ -167,17 +196,17 @@ static StgTSO *threadStackOverflow(StgTSO *tso); /* flag set by signal handler to precipitate a context switch */ //@cindex context_switch -nat context_switch; +nat context_switch = 0; /* if this flag is set as well, give up execution */ //@cindex interrupted -rtsBool interrupted; +rtsBool interrupted = rtsFalse; /* Next thread ID to allocate. - * Locks required: sched_mutex + * Locks required: thread_id_mutex */ //@cindex next_thread_id -StgThreadID next_thread_id = 1; +static StgThreadID next_thread_id = 1; /* * Pointers to the state of the current thread. @@ -188,27 +217,16 @@ StgThreadID next_thread_id = 1; /* The smallest stack size that makes any sense is: * RESERVED_STACK_WORDS (so we can get back from the stack overflow) * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame) - * + 1 (the realworld token for an IO thread) * + 1 (the closure to enter) + * + 1 (stg_ap_v_ret) + * + 1 (spare slot req'd by stg_ap_v_ret) * * A thread with this stack will bomb immediately with a stack * overflow, which will increase its stack size. */ -#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2) +#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3) -/* Free capability list. - * Locks required: sched_mutex. - */ -#ifdef SMP -//@cindex free_capabilities -//@cindex n_free_capabilities -Capability *free_capabilities; /* Available capabilities for running threads */ -nat n_free_capabilities; /* total number of available capabilities */ -#else -//@cindex MainRegTable -Capability MainRegTable; /* for non-SMP, we have one global capability */ -#endif #if defined(GRAN) StgTSO *CurrentTSO; @@ -220,24 +238,19 @@ StgTSO *CurrentTSO; */ StgTSO dummy_tso; -rtsBool ready_to_gc; +static rtsBool ready_to_gc; -/* All our current task ids, saved in case we need to kill them later. +/* + * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) -- + * in an MT setting, needed to signal that a worker thread shouldn't hang around + * in the scheduler when it is out of work. */ -#ifdef SMP -//@cindex task_ids -task_info *task_ids; -#endif +static rtsBool shutting_down_scheduler = rtsFalse; void addToBlockedQueue ( StgTSO *tso ); static void schedule ( void ); void interruptStgRts ( void ); -#if defined(GRAN) -static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri ); -#else -static StgTSO * createThread_ ( nat size, rtsBool have_lock ); -#endif static void detectBlackHoles ( void ); @@ -245,40 +258,46 @@ static void detectBlackHoles ( void ); static void sched_belch(char *s, ...); #endif -#ifdef SMP -//@cindex sched_mutex -//@cindex term_mutex -//@cindex thread_ready_cond -//@cindex gc_pending_cond -pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER; -pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER; -pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER; -pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER; +#if defined(RTS_SUPPORTS_THREADS) +/* ToDo: carefully document the invariants that go together + * with these synchronisation objects. 
+ */ +Mutex sched_mutex = INIT_MUTEX_VAR; +Mutex term_mutex = INIT_MUTEX_VAR; +/* + * A heavyweight solution to the problem of protecting + * the thread_id from concurrent update. + */ +Mutex thread_id_mutex = INIT_MUTEX_VAR; + + +# if defined(SMP) +static Condition gc_pending_cond = INIT_COND_VAR; nat await_death; -#endif +# endif + +#endif /* RTS_SUPPORTS_THREADS */ #if defined(PAR) StgTSO *LastTSO; rtsTime TimeOfLastYield; +rtsBool emitSchedule = rtsTrue; #endif #if DEBUG -char *whatNext_strs[] = { - "ThreadEnterGHC", +static char *whatNext_strs[] = { "ThreadRunGHC", - "ThreadEnterInterp", + "ThreadInterpret", "ThreadKilled", + "ThreadRelocated", "ThreadComplete" }; +#endif -char *threadReturnCode_strs[] = { - "HeapOverflow", /* might also be StackOverflow */ - "StackOverflow", - "ThreadYielding", - "ThreadBlocked", - "ThreadFinished" -}; +#if defined(PAR) +StgTSO * createSparkThread(rtsSpark spark); +StgTSO * activateSpark (rtsSpark spark); #endif /* @@ -287,6 +306,23 @@ char *threadReturnCode_strs[] = { StgTSO *MainTSO; */ +#if defined(PAR) || defined(RTS_SUPPORTS_THREADS) +static void taskStart(void); +static void +taskStart(void) +{ + schedule(); +} +#endif + +#if defined(RTS_SUPPORTS_THREADS) +void +startSchedulerTask(void) +{ + startTask(taskStart); +} +#endif + //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code //@subsection Main scheduling loop @@ -339,13 +375,25 @@ schedule( void ) rtsSpark spark; StgTSO *tso; GlobalTaskId pe; + rtsBool receivedFinish = rtsFalse; +# if defined(DEBUG) + nat tp_size, sp_size; // stats only +# endif #endif rtsBool was_interrupted = rtsFalse; + StgTSOWhatNext prev_what_next; ACQUIRE_LOCK(&sched_mutex); + +#if defined(RTS_SUPPORTS_THREADS) + waitForWorkCapability(&sched_mutex, &cap, rtsFalse); + IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId())); +#else + /* simply initialise it in the non-threaded case */ + grabCapability(&cap); +#endif #if defined(GRAN) - /* set up first event to get things going */ /* ToDo: assign costs for system setup and init MainTSO ! */ new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc], @@ -370,8 +418,8 @@ schedule( void ) #elif defined(PAR) - while (!GlobalStopPending) { /* GlobalStopPending set in par_exit */ - + while (!receivedFinish) { /* set by processMessages */ + /* when receiving PP_FINISH message */ #else while (1) { @@ -380,15 +428,32 @@ schedule( void ) IF_DEBUG(scheduler, printAllThreads()); +#if defined(RTS_SUPPORTS_THREADS) + /* Check to see whether there are any worker threads + waiting to deposit external call results. If so, + yield our capability */ + yieldToReturningWorker(&sched_mutex, &cap); +#endif + /* If we're interrupted (the user pressed ^C, or some other * termination condition occurred), kill all the currently running * threads. */ if (interrupted) { IF_DEBUG(scheduler, sched_belch("interrupted")); - deleteAllThreads(); interrupted = rtsFalse; was_interrupted = rtsTrue; +#if defined(RTS_SUPPORTS_THREADS) + // In the threaded RTS, deadlock detection doesn't work, + // so just exit right away. + prog_belch("interrupted"); + releaseCapability(cap); + startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit + RELEASE_LOCK(&sched_mutex); + shutdownHaskellAndExit(EXIT_SUCCESS); +#else + deleteAllThreads(); +#endif } /* Go through the list of main threads and wake up any @@ -396,28 +461,50 @@ schedule( void ) * should be done more efficiently without a linear scan * of the main threads list, somehow... 
*/ -#ifdef SMP +#if defined(RTS_SUPPORTS_THREADS) { StgMainThread *m, **prev; prev = &main_threads; - for (m = main_threads; m != NULL; m = m->link) { + for (m = main_threads; m != NULL; prev = &m->link, m = m->link) { switch (m->tso->what_next) { case ThreadComplete: if (m->ret) { - *(m->ret) = (StgClosure *)m->tso->sp[0]; + // NOTE: return val is tso->sp[1] (see StgStartup.hc) + *(m->ret) = (StgClosure *)m->tso->sp[1]; } *prev = m->link; m->stat = Success; - pthread_cond_broadcast(&m->wakeup); + broadcastCondition(&m->wakeup); +#ifdef DEBUG + removeThreadLabel((StgWord)m->tso); +#endif + if(m == main_main_thread) + { + releaseCapability(cap); + startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit + RELEASE_LOCK(&sched_mutex); + shutdownHaskellAndExit(EXIT_SUCCESS); + } break; case ThreadKilled: + if (m->ret) *(m->ret) = NULL; *prev = m->link; if (was_interrupted) { m->stat = Interrupted; } else { m->stat = Killed; } - pthread_cond_broadcast(&m->wakeup); + broadcastCondition(&m->wakeup); +#ifdef DEBUG + removeThreadLabel((StgWord)m->tso); +#endif + if(m == main_main_thread) + { + releaseCapability(cap); + startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit + RELEASE_LOCK(&sched_mutex); + shutdownHaskellAndExit(EXIT_SUCCESS); + } break; default: break; @@ -425,7 +512,8 @@ schedule( void ) } } -#else +#else /* not threaded */ + # if defined(PAR) /* in GUM do this only on the Main PE */ if (IAmMainThread) @@ -436,13 +524,18 @@ schedule( void ) StgMainThread *m = main_threads; if (m->tso->what_next == ThreadComplete || m->tso->what_next == ThreadKilled) { +#ifdef DEBUG + removeThreadLabel((StgWord)m->tso); +#endif main_threads = main_threads->link; if (m->tso->what_next == ThreadComplete) { - /* we finished successfully, fill in the return value */ - if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; }; - m->stat = Success; - return; + // We finished successfully, fill in the return value + // NOTE: return val is tso->sp[1] (see StgStartup.hc) + if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; }; + m->stat = Success; + return; } else { + if (m->ret) { *(m->ret) = NULL; }; if (was_interrupted) { m->stat = Interrupted; } else { @@ -457,10 +550,13 @@ schedule( void ) /* Top up the run queue from our spark pool. We try to make the * number of threads in the run queue equal to the number of * free capabilities. + * + * Disable spark support in SMP for now, non-essential & requires + * a little bit of work to make it compile cleanly. -- sof 1/02. */ -#if defined(SMP) +#if 0 /* defined(SMP) */ { - nat n = n_free_capabilities; + nat n = getFreeCapabilities(); StgTSO *tso = run_queue_hd; /* Count the run queue */ @@ -471,59 +567,56 @@ schedule( void ) for (; n > 0; n--) { StgClosure *spark; - spark = findSpark(); + spark = findSpark(rtsFalse); if (spark == NULL) { break; /* no more sparks in the pool */ } else { /* I'd prefer this to be done in activateSpark -- HWL */ /* tricky - it needs to hold the scheduler lock and * not try to re-acquire it -- SDM */ - StgTSO *tso; - tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue); - pushClosure(tso,spark); - PUSH_ON_RUN_QUEUE(tso); -#ifdef PAR - advisory_thread_count++; -#endif - + createSparkThread(spark); IF_DEBUG(scheduler, - sched_belch("turning spark of closure %p into a thread", + sched_belch("==^^ turning spark of closure %p into a thread", (StgClosure *)spark)); } } /* We need to wake up the other tasks if we just created some * work for them. 
*/ - if (n_free_capabilities - n > 1) { - pthread_cond_signal(&thread_ready_cond); + if (getFreeCapabilities() - n > 1) { + signalCondition( &thread_ready_cond ); } } -#endif /* SMP */ +#endif // SMP + + /* check for signals each time around the scheduler */ +#if defined(RTS_USER_SIGNALS) + if (signals_pending()) { + RELEASE_LOCK(&sched_mutex); /* ToDo: kill */ + startSignalHandlers(); + ACQUIRE_LOCK(&sched_mutex); + } +#endif /* Check whether any waiting threads need to be woken up. If the * run queue is empty, and there are no other tasks running, we * can wait indefinitely for something to happen. - * ToDo: what if another client comes along & requests another - * main thread? */ - if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) { - awaitEvent( - (run_queue_hd == END_TSO_QUEUE) -#ifdef SMP - && (n_free_capabilities == RtsFlags.ParFlags.nNodes) + if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) +#if defined(RTS_SUPPORTS_THREADS) && !defined(SMP) + || EMPTY_RUN_QUEUE() +#endif + ) + { + awaitEvent( EMPTY_RUN_QUEUE() +#if defined(SMP) + && allFreeCapabilities() #endif ); } /* we can be interrupted while waiting for I/O... */ if (interrupted) continue; - /* check for signals each time around the scheduler */ -#ifndef mingw32_TARGET_OS - if (signals_pending()) { - start_signal_handlers(); - } -#endif - /* * Detect deadlock: when we have no threads to run, there are no * threads waiting on I/O or sleeping, and all the other tasks are @@ -535,62 +628,164 @@ schedule( void ) * If no threads are black holed, we have a deadlock situation, so * inform all the main threads. */ +#if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS) + if ( EMPTY_THREAD_QUEUES() +#if defined(RTS_SUPPORTS_THREADS) + && EMPTY_QUEUE(suspended_ccalling_threads) +#endif #ifdef SMP - if (blocked_queue_hd == END_TSO_QUEUE - && run_queue_hd == END_TSO_QUEUE - && sleeping_queue == END_TSO_QUEUE - && (n_free_capabilities == RtsFlags.ParFlags.nNodes)) + && allFreeCapabilities() +#endif + ) { - IF_DEBUG(scheduler, sched_belch("deadlocked, checking for black holes...")); + IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC...")); +#if defined(THREADED_RTS) + /* and SMP mode ..? */ + releaseCapability(cap); +#endif + // Garbage collection can release some new threads due to + // either (a) finalizers or (b) threads resurrected because + // they are about to be send BlockedOnDeadMVar. Any threads + // thus released will be immediately runnable. + GarbageCollect(GetRoots,rtsTrue); + + if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; } + + IF_DEBUG(scheduler, + sched_belch("still deadlocked, checking for black holes...")); detectBlackHoles(); - if (run_queue_hd == END_TSO_QUEUE) { + + if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; } + +#if defined(RTS_USER_SIGNALS) + /* If we have user-installed signal handlers, then wait + * for signals to arrive rather then bombing out with a + * deadlock. + */ +#if defined(RTS_SUPPORTS_THREADS) + if ( 0 ) { /* hmm..what to do? Simply stop waiting for + a signal with no runnable threads (or I/O + suspended ones) leads nowhere quick. + For now, simply shut down when we reach this + condition. + + ToDo: define precisely under what conditions + the Scheduler should shut down in an MT setting. + */ +#else + if ( anyUserHandlers() ) { +#endif + IF_DEBUG(scheduler, + sched_belch("still deadlocked, waiting for signals...")); + + awaitUserSignals(); + + // we might be interrupted... 
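/* ---------------------------------------------------------------------------
 * Sketch: the deadlock-recovery ladder implemented in this stretch of the
 * scheduler, restated as a stand-alone outline.  Each step may create
 * runnable threads; only if all of them fail are the blocked main threads
 * sent an exception.  The helper below is hypothetical -- it merely prints
 * the RTS action it stands in for.
 * ------------------------------------------------------------------------- */
#include <stdbool.h>
#include <stdio.h>

static bool runnable_after(const char *step)
{
    printf("trying: %s\n", step);
    return false;                 /* pretend nothing became runnable */
}

static void recover_from_empty_queues(void)
{
    /* GC first: finalizers, and threads resurrected to receive
     * BlockedOnDeadMVar, may become runnable. */
    if (runnable_after("GarbageCollect(GetRoots, rtsTrue)")) return;
    if (runnable_after("detectBlackHoles()")) return;
    if (runnable_after("awaitUserSignals() / startSignalHandlers()")) return;
    puts("deadlock: raise Deadlock/NonTermination in blocked main threads");
}

int main(void) { recover_from_empty_queues(); return 0; }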
+ if (interrupted) { continue; } + + if (signals_pending()) { + RELEASE_LOCK(&sched_mutex); + startSignalHandlers(); + ACQUIRE_LOCK(&sched_mutex); + } + ASSERT(!EMPTY_RUN_QUEUE()); + goto not_deadlocked; + } +#endif + + /* Probably a real deadlock. Send the current main thread the + * Deadlock exception (or in the SMP build, send *all* main + * threads the deadlock exception, since none of them can make + * progress). + */ + { StgMainThread *m; +#if defined(RTS_SUPPORTS_THREADS) for (m = main_threads; m != NULL; m = m->link) { - m->ret = NULL; - m->stat = Deadlock; - pthread_cond_broadcast(&m->wakeup); + switch (m->tso->why_blocked) { + case BlockedOnBlackHole: + raiseAsync(m->tso, (StgClosure *)NonTermination_closure); + break; + case BlockedOnException: + case BlockedOnMVar: + raiseAsync(m->tso, (StgClosure *)Deadlock_closure); + break; + default: + barf("deadlock: main thread blocked in a strange way"); + } } - main_threads = NULL; - } - } -#else /* ! SMP */ - if (blocked_queue_hd == END_TSO_QUEUE - && run_queue_hd == END_TSO_QUEUE - && sleeping_queue == END_TSO_QUEUE) - { - IF_DEBUG(scheduler, sched_belch("deadlocked, checking for black holes...")); - detectBlackHoles(); - if (run_queue_hd == END_TSO_QUEUE) { - StgMainThread *m = main_threads; - m->ret = NULL; - m->stat = Deadlock; - main_threads = m->link; - return; +#else + m = main_threads; + switch (m->tso->why_blocked) { + case BlockedOnBlackHole: + raiseAsync(m->tso, (StgClosure *)NonTermination_closure); + break; + case BlockedOnException: + case BlockedOnMVar: + raiseAsync(m->tso, (StgClosure *)Deadlock_closure); + break; + default: + barf("deadlock: main thread blocked in a strange way"); + } +#endif } + +#if defined(RTS_SUPPORTS_THREADS) + /* ToDo: revisit conditions (and mechanism) for shutting + down a multi-threaded world */ + IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down.")); + RELEASE_LOCK(&sched_mutex); + shutdownHaskell(); + return; +#endif } + not_deadlocked: + +#elif defined(RTS_SUPPORTS_THREADS) + /* ToDo: add deadlock detection in threaded RTS */ +#elif defined(PAR) + /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */ #endif -#ifdef SMP +#if defined(SMP) /* If there's a GC pending, don't do anything until it has * completed. */ if (ready_to_gc) { IF_DEBUG(scheduler,sched_belch("waiting for GC")); - pthread_cond_wait(&gc_pending_cond, &sched_mutex); + waitCondition( &gc_pending_cond, &sched_mutex ); } - +#endif + +#if defined(RTS_SUPPORTS_THREADS) +#if defined(SMP) /* block until we've got a thread on the run queue and a free * capability. + * */ - while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) { - IF_DEBUG(scheduler, sched_belch("waiting for work")); - pthread_cond_wait(&thread_ready_cond, &sched_mutex); - IF_DEBUG(scheduler, sched_belch("work now available")); + if ( EMPTY_RUN_QUEUE() ) { + /* Give up our capability */ + releaseCapability(cap); + + /* If we're in the process of shutting down (& running the + * a batch of finalisers), don't wait around. 
+ */ + if ( shutting_down_scheduler ) { + RELEASE_LOCK(&sched_mutex); + return; + } + IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId())); + waitForWorkCapability(&sched_mutex, &cap, rtsTrue); + IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId())); + } +#else + if ( EMPTY_RUN_QUEUE() ) { + continue; // nothing to do } #endif +#endif #if defined(GRAN) - if (RtsFlags.GranFlags.Light) GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc @@ -603,7 +798,7 @@ schedule( void ) if (!RtsFlags.GranFlags.Light) handleIdlePEs(); - IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n")) + IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n")); /* main event dispatcher in GranSim */ switch (event->evttype) { @@ -717,7 +912,7 @@ schedule( void ) IF_DEBUG(gran, fprintf(stderr, "GRAN: About to run current thread, which is\n"); - G_TSO(t,5)) + G_TSO(t,5)); context_switch = 0; // turned on via GranYield, checking events and time slice @@ -727,14 +922,13 @@ schedule( void ) procStatus[CurrentProc] = Busy; #elif defined(PAR) - if (PendingFetches != END_BF_QUEUE) { processFetches(); } /* ToDo: phps merge with spark activation above */ /* check whether we have local work and send requests if we have none */ - if (run_queue_hd == END_TSO_QUEUE) { /* no runnable threads */ + if (EMPTY_RUN_QUEUE()) { /* no runnable threads */ /* :-[ no local threads => look out for local sparks */ /* the spark pool for the current PE */ pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable @@ -748,8 +942,8 @@ schedule( void ) * to turn one of those pending sparks into a * thread... */ - - spark = findSpark(); /* get a spark */ + + spark = findSpark(rtsFalse); /* get a spark */ if (spark != (rtsSpark) NULL) { tso = activateSpark(spark); /* turn the spark into a thread */ IF_PAR_DEBUG(schedule, @@ -766,9 +960,13 @@ schedule( void ) spark_queue_len(pool))); goto next_thread; } - } else + } + + /* If we still have no work we need to send a FISH to get a spark + from another PE + */ + if (EMPTY_RUN_QUEUE()) { /* =8-[ no local sparks => look for work on other PEs */ - { /* * We really have absolutely no work. Send out a fish * (there may be some out there already), and wait for @@ -777,28 +975,48 @@ schedule( void ) * we're hoping to see. (Of course, we still have to * respond to other types of messages.) */ - if (//!fishing && - outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // && - // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) { - /* fishing set in sendFish, processFish; + TIME now = msTime() /*CURRENT_TIME*/; + IF_PAR_DEBUG(verbose, + belch("-- now=%ld", now)); + IF_PAR_DEBUG(verbose, + if (outstandingFishes < RtsFlags.ParFlags.maxFishes && + (last_fish_arrived_at!=0 && + last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) { + belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)", + last_fish_arrived_at+RtsFlags.ParFlags.fishDelay, + last_fish_arrived_at, + RtsFlags.ParFlags.fishDelay, now); + }); + + if (outstandingFishes < RtsFlags.ParFlags.maxFishes && + (last_fish_arrived_at==0 || + (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) { + /* outstandingFishes is set in sendFish, processFish; avoid flooding system with fishes via delay */ pe = choosePE(); sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, NEW_FISH_HUNGER); + + // Global statistics: count no. 
of fishes + if (RtsFlags.ParFlags.ParStats.Global && + RtsFlags.GcFlags.giveStats > NO_GC_STATS) { + globalParStats.tot_fish_mess++; + } } - - processMessages(); + + receivedFinish = processMessages(); goto next_thread; - // ReSchedule(0); } } else if (PacketsWaiting()) { /* Look for incoming messages */ - processMessages(); + receivedFinish = processMessages(); } /* Now we are sure that we have some work available */ ASSERT(run_queue_hd != END_TSO_QUEUE); + /* Take a thread from the run queue, if we have work */ t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE); + IF_DEBUG(sanity,checkTSO(t)); /* ToDo: write something to the log-file if (RTSflags.ParFlags.granSimStats && !sameThread) @@ -809,17 +1027,23 @@ schedule( void ) /* the spark pool for the current PE */ pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable - IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)", - spark_queue_len(pool), - CURRENT_PROC, - pool->hd, pool->tl, pool->base, pool->lim)); - - IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)", - run_queue_len(), CURRENT_PROC, - run_queue_hd, run_queue_tl)); + IF_DEBUG(scheduler, + belch("--=^ %d threads, %d sparks on [%#x]", + run_queue_len(), spark_queue_len(pool), CURRENT_PROC)); + +# if 1 + if (0 && RtsFlags.ParFlags.ParStats.Full && + t && LastTSO && t->id != LastTSO->id && + LastTSO->why_blocked == NotBlocked && + LastTSO->what_next != ThreadComplete) { + // if previously scheduled TSO not blocked we have to record the context switch + DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC, + GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0); + } -#if 0 - if (t != LastTSO) { + if (RtsFlags.ParFlags.ParStats.Full && + (emitSchedule /* forced emit */ || + (t && LastTSO && t->id != LastTSO->id))) { /* we are running a different TSO, so write a schedule event to log file NB: If we use fair scheduling we also have to write a deschedule @@ -829,73 +1053,61 @@ schedule( void ) */ DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0); - + emitSchedule = rtsFalse; } -#endif + +# endif #else /* !GRAN && !PAR */ - /* grab a thread from the run queue - */ + /* grab a thread from the run queue */ ASSERT(run_queue_hd != END_TSO_QUEUE); t = POP_RUN_QUEUE(); + // Sanity check the thread we're about to run. This can be + // expensive if there is lots of thread switching going on... 
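/* ---------------------------------------------------------------------------
 * Sketch: the FISH rate-limiter used in the GUM work-stealing code above.
 * A new FISH (work-request message) goes out only while the number of
 * outstanding fishes is under the configured cap and the delay since the
 * last fish arrived has elapsed.  The function below restates just that
 * predicate with plain types; the names mirror, but are not, the real
 * RtsFlags.ParFlags fields.
 * ------------------------------------------------------------------------- */
#include <assert.h>
#include <stdbool.h>

typedef long TIME;

static bool may_send_fish(int outstanding, int max_fishes,
                          TIME last_arrived, TIME delay, TIME now)
{
    return outstanding < max_fishes &&
           (last_arrived == 0 || last_arrived + delay <= now);
}

int main(void)
{
    assert( may_send_fish(0, 1, 0,  50, 100));  /* never fished before */
    assert(!may_send_fish(1, 1, 0,  50, 100));  /* too many outstanding */
    assert(!may_send_fish(0, 1, 90, 50, 100));  /* still inside the delay */
    assert( may_send_fish(0, 1, 40, 50, 100));  /* delay has elapsed */
    return 0;
}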
IF_DEBUG(sanity,checkTSO(t)); - -#endif - - /* grab a capability - */ -#ifdef SMP - cap = free_capabilities; - free_capabilities = cap->link; - n_free_capabilities--; -#else - cap = &MainRegTable; #endif - - cap->rCurrentTSO = t; + + cap->r.rCurrentTSO = t; /* context switches are now initiated by the timer signal, unless * the user specified "context switch as often as possible", with * +RTS -C0 */ - if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0 - && (run_queue_hd != END_TSO_QUEUE - || blocked_queue_hd != END_TSO_QUEUE - || sleeping_queue != END_TSO_QUEUE)) + if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0 + && (run_queue_hd != END_TSO_QUEUE + || blocked_queue_hd != END_TSO_QUEUE + || sleeping_queue != END_TSO_QUEUE))) context_switch = 1; else context_switch = 0; +run_thread: + RELEASE_LOCK(&sched_mutex); - IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", - t->id, t, whatNext_strs[t->what_next])); + IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", + t->id, whatNext_strs[t->what_next])); + +#ifdef PROFILING + startHeapProfTimer(); +#endif /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */ /* Run the current thread */ - switch (cap->rCurrentTSO->what_next) { + prev_what_next = t->what_next; + switch (prev_what_next) { case ThreadKilled: case ThreadComplete: - /* Thread already finished, return to scheduler. */ - ret = ThreadFinished; - break; - case ThreadEnterGHC: - ret = StgRun((StgFunPtr) stg_enterStackTop, cap); - break; + /* Thread already finished, return to scheduler. */ + ret = ThreadFinished; + break; case ThreadRunGHC: - ret = StgRun((StgFunPtr) stg_returnToStackTop, cap); - break; - case ThreadEnterInterp: -#ifdef GHCI - { - IF_DEBUG(scheduler,sched_belch("entering interpreter")); - ret = interpretBCO(cap); - break; - } -#else - barf("Panic: entered a BCO but no bytecode interpreter in this build"); -#endif + ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r); + break; + case ThreadInterpret: + ret = interpretBCO(cap); + break; default: barf("schedule: invalid what_next field"); } @@ -903,37 +1115,118 @@ schedule( void ) /* Costs for the scheduler are assigned to CCS_SYSTEM */ #ifdef PROFILING + stopHeapProfTimer(); CCCS = CCS_SYSTEM; #endif ACQUIRE_LOCK(&sched_mutex); - -#ifdef SMP - IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self());); + +#ifdef RTS_SUPPORTS_THREADS + IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId());); #elif !defined(GRAN) && !defined(PAR) IF_DEBUG(scheduler,fprintf(stderr,"scheduler: ");); #endif - t = cap->rCurrentTSO; + t = cap->r.rCurrentTSO; #if defined(PAR) /* HACK 675: if the last thread didn't yield, make sure to print a SCHEDULE event to the log file when StgRunning the next thread, even if it is the same one as before */ - LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t; + LastTSO = t; TimeOfLastYield = CURRENT_TIME; #endif switch (ret) { case HeapOverflow: +#if defined(GRAN) + IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t)); + globalGranStats.tot_heapover++; +#elif defined(PAR) + globalParStats.tot_heapover++; +#endif + + // did the task ask for a large block? + if (cap->r.rHpAlloc > BLOCK_SIZE_W) { + // if so, get one and push it on the front of the nursery. 
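/* ---------------------------------------------------------------------------
 * Sketch: the block-sizing arithmetic used in the HeapOverflow case just
 * below -- the request in words (rHpAlloc) is converted to bytes and
 * rounded up to whole blocks.  BLOCK_SIZE_BYTES is an assumed illustrative
 * value; the real constants come from the RTS block allocator headers.
 * ------------------------------------------------------------------------- */
#include <assert.h>
#include <stddef.h>

#define BLOCK_SIZE_BYTES 4096u                   /* assumed for the demo */

static size_t blocks_for(size_t alloc_words)
{
    size_t bytes = alloc_words * sizeof(size_t); /* words -> bytes */
    return (bytes + BLOCK_SIZE_BYTES - 1) / BLOCK_SIZE_BYTES; /* round up */
}

int main(void)
{
    /* a 10000-word request on a 64-bit machine: 80000 bytes -> 20 blocks */
    if (sizeof(size_t) == 8) assert(blocks_for(10000) == 20);
    return 0;
}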
+ bdescr *bd; + nat blocks; + + blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE; + + IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", + t->id, whatNext_strs[t->what_next], blocks)); + + // don't do this if it would push us over the + // alloc_blocks_lim limit; we'll GC first. + if (alloc_blocks + blocks < alloc_blocks_lim) { + + alloc_blocks += blocks; + bd = allocGroup( blocks ); + + // link the new group into the list + bd->link = cap->r.rCurrentNursery; + bd->u.back = cap->r.rCurrentNursery->u.back; + if (cap->r.rCurrentNursery->u.back != NULL) { + cap->r.rCurrentNursery->u.back->link = bd; + } else { + ASSERT(g0s0->blocks == cap->r.rCurrentNursery && + g0s0->blocks == cap->r.rNursery); + cap->r.rNursery = g0s0->blocks = bd; + } + cap->r.rCurrentNursery->u.back = bd; + + // initialise it as a nursery block. We initialise the + // step, gen_no, and flags field of *every* sub-block in + // this large block, because this is easier than making + // sure that we always find the block head of a large + // block whenever we call Bdescr() (eg. evacuate() and + // isAlive() in the GC would both have to do this, at + // least). + { + bdescr *x; + for (x = bd; x < bd + blocks; x++) { + x->step = g0s0; + x->gen_no = 0; + x->flags = 0; + } + } + + // don't forget to update the block count in g0s0. + g0s0->n_blocks += blocks; + // This assert can be a killer if the app is doing lots + // of large block allocations. + ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks); + + // now update the nursery to point to the new block + cap->r.rCurrentNursery = bd; + + // we might be unlucky and have another thread get on the + // run queue before us and steal the large block, but in that + // case the thread will just end up requesting another large + // block. + PUSH_ON_RUN_QUEUE(t); + break; + } + } + /* make all the running tasks block on a condition variable, * maybe set context_switch and wait till they all pile in, * then have them wait on a GC condition variable. */ - IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", - t->id, t, whatNext_strs[t->what_next])); + IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", + t->id, whatNext_strs[t->what_next])); threadPaused(t); #if defined(GRAN) ASSERT(!is_on_queue(t,CurrentProc)); +#elif defined(PAR) + /* Currently we emit a DESCHEDULE event before GC in GUM. + ToDo: either add separate event to distinguish SYSTEM time from rest + or just nuke this DESCHEDULE (and the following SCHEDULE) */ + if (0 && RtsFlags.ParFlags.ParStats.Full) { + DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, + GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0); + emitSchedule = rtsTrue; + } #endif ready_to_gc = rtsTrue; @@ -943,8 +1236,17 @@ schedule( void ) break; case StackOverflow: - IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", - t->id, t, whatNext_strs[t->what_next])); +#if defined(GRAN) + IF_DEBUG(gran, + DumpGranEvent(GR_DESCHEDULE, t)); + globalGranStats.tot_stackover++; +#elif defined(PAR) + // IF_DEBUG(par, + // DumpGranEvent(GR_DESCHEDULE, t); + globalParStats.tot_stackover++; +#endif + IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", + t->id, whatNext_strs[t->what_next])); /* just adjust the stack for this thread, then pop it back * on the run queue. 
*/ @@ -974,8 +1276,9 @@ schedule( void ) DumpGranEvent(GR_DESCHEDULE, t)); globalGranStats.tot_yields++; #elif defined(PAR) - IF_DEBUG(par, - DumpGranEvent(GR_DESCHEDULE, t)); + // IF_DEBUG(par, + // DumpGranEvent(GR_DESCHEDULE, t); + globalParStats.tot_yields++; #endif /* put the thread back on the run queue. Then, if we're ready to * GC, check whether this is the last task to stop. If so, wake @@ -983,24 +1286,29 @@ schedule( void ) * GC is finished. */ IF_DEBUG(scheduler, - if (t->what_next == ThreadEnterInterp) { - /* ToDo: or maybe a timer expired when we were in Hugs? - * or maybe someone hit ctrl-C - */ - belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", - t->id, t, whatNext_strs[t->what_next]); + if (t->what_next != prev_what_next) { + belch("--<< thread %ld (%s) stopped to switch evaluators", + t->id, whatNext_strs[t->what_next]); } else { - belch("--<< thread %ld (%p; %s) stopped, yielding", - t->id, t, whatNext_strs[t->what_next]); + belch("--<< thread %ld (%s) stopped, yielding", + t->id, whatNext_strs[t->what_next]); } ); - threadPaused(t); - IF_DEBUG(sanity, //belch("&& Doing sanity check on yielding TSO %ld.", t->id); checkTSO(t)); ASSERT(t->link == END_TSO_QUEUE); + + // Shortcut if we're just switching evaluators: don't bother + // doing stack squeezing (which can be expensive), just run the + // thread. + if (t->what_next != prev_what_next) { + goto run_thread; + } + + threadPaused(t); + #if defined(GRAN) ASSERT(!is_on_queue(t,CurrentProc)); @@ -1008,7 +1316,20 @@ schedule( void ) //belch("&& Doing sanity check on all ThreadQueues (and their TSOs)."); checkThreadQsSanity(rtsTrue)); #endif + +#if defined(PAR) + if (RtsFlags.ParFlags.doFairScheduling) { + /* this does round-robin scheduling; good for concurrency */ + APPEND_TO_RUN_QUEUE(t); + } else { + /* this does unfair scheduling; good for parallelism */ + PUSH_ON_RUN_QUEUE(t); + } +#else + // this does round-robin scheduling; good for concurrency APPEND_TO_RUN_QUEUE(t); +#endif + #if defined(GRAN) /* add a ContinueThread event to actually process the thread */ new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc], @@ -1017,10 +1338,10 @@ schedule( void ) IF_GRAN_DEBUG(bq, belch("GRAN: eventq and runnableq after adding yielded thread to queue again:"); G_EVENTQ(0); - G_CURR_THREADQ(0)) + G_CURR_THREADQ(0)); #endif /* GRAN */ break; - + case ThreadBlocked: #if defined(GRAN) IF_DEBUG(scheduler, @@ -1043,16 +1364,19 @@ schedule( void ) procStatus[CurrentProc] = Idle; */ #elif defined(PAR) - IF_DEBUG(par, - DumpGranEvent(GR_DESCHEDULE, t)); + IF_DEBUG(scheduler, + belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", + t->id, t, whatNext_strs[t->what_next], t->block_info.closure)); + IF_PAR_DEBUG(bq, + + if (t->block_info.closure!=(StgClosure*)NULL) + print_bq(t->block_info.closure)); /* Send a fetch (if BlockedOnGA) and dump event to log file */ blockThread(t); - IF_DEBUG(scheduler, - belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", - t->id, t, whatNext_strs[t->what_next], t->block_info.closure); - if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure)); + /* whatever we schedule next, we must log that schedule */ + emitSchedule = rtsTrue; #else /* !GRAN */ /* don't need to do anything. Either the thread is blocked on @@ -1061,7 +1385,8 @@ schedule( void ) * case it'll be on the relevant queue already. 
*/ IF_DEBUG(scheduler, - fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t); + fprintf(stderr, "--<< thread %d (%s) stopped: ", + t->id, whatNext_strs[t->what_next]); printThreadBlockage(t); fprintf(stderr, "\n")); @@ -1082,12 +1407,22 @@ schedule( void ) /* We also end up here if the thread kills itself with an * uncaught exception, see Exception.hc. */ - IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t)); + IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", + t->id, whatNext_strs[t->what_next])); #if defined(GRAN) endThread(t, CurrentProc); // clean-up the thread #elif defined(PAR) + /* For now all are advisory -- HWL */ + //if(t->priority==AdvisoryPriority) ?? advisory_thread_count--; - if (RtsFlags.ParFlags.ParStats.Full) + +# ifdef DIST + if(t->dist.priority==RevalPriority) + FinishReval(t); +# endif + + if (RtsFlags.ParFlags.ParStats.Full && + !RtsFlags.ParFlags.ParStats.Suppressed) DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */); #endif break; @@ -1095,31 +1430,37 @@ schedule( void ) default: barf("schedule: invalid thread return code %d", (int)ret); } - -#ifdef SMP - cap->link = free_capabilities; - free_capabilities = cap; - n_free_capabilities++; + +#ifdef PROFILING + // When we have +RTS -i0 and we're heap profiling, do a census at + // every GC. This lets us get repeatable runs for debugging. + if (performHeapProfile || + (RtsFlags.ProfFlags.profileInterval==0 && + RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) { + GarbageCollect(GetRoots, rtsTrue); + heapCensus(); + performHeapProfile = rtsFalse; + ready_to_gc = rtsFalse; // we already GC'd + } #endif + if (ready_to_gc #ifdef SMP - if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) -#else - if (ready_to_gc) + && allFreeCapabilities() #endif - { + ) { /* everybody back, start the GC. * Could do it in this thread, or signal a condition var * to do it in another thread. Either way, we need to * broadcast on gc_pending_cond afterward. */ -#ifdef SMP +#if defined(RTS_SUPPORTS_THREADS) IF_DEBUG(scheduler,sched_belch("doing GC")); #endif GarbageCollect(GetRoots,rtsFalse); ready_to_gc = rtsFalse; #ifdef SMP - pthread_cond_broadcast(&gc_pending_cond); + broadcastCondition(&gc_pending_cond); #endif #if defined(GRAN) /* add a ContinueThread event to continue execution of current thread */ @@ -1129,27 +1470,98 @@ schedule( void ) IF_GRAN_DEBUG(bq, fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n"); G_EVENTQ(0); - G_CURR_THREADQ(0)) + G_CURR_THREADQ(0)); #endif /* GRAN */ } + #if defined(GRAN) next_thread: IF_GRAN_DEBUG(unused, print_eventq(EventHd)); event = get_next_event(); - #elif defined(PAR) next_thread: /* ToDo: wait for next message to arrive rather than busy wait */ - -#else /* GRAN */ - /* not any more - next_thread: - t = take_off_run_queue(END_TSO_QUEUE); - */ #endif /* GRAN */ + } /* end of while(1) */ + + IF_PAR_DEBUG(verbose, + belch("== Leaving schedule() after having received Finish")); +} + +/* --------------------------------------------------------------------------- + * Singleton fork(). Do not copy any running threads. 
+ * ------------------------------------------------------------------------- */ + +StgInt +forkProcess(StgTSO* tso) +{ +#ifndef mingw32_TARGET_OS + pid_t pid; + StgTSO* t,*next; + StgMainThread *m; + rtsBool doKill; + + IF_DEBUG(scheduler,sched_belch("forking!")); + + pid = fork(); + if (pid) { /* parent */ + + /* just return the pid */ + + } else { /* child */ + /* wipe all other threads */ + run_queue_hd = run_queue_tl = tso; + tso->link = END_TSO_QUEUE; + + /* When clearing out the threads, we need to ensure + that a 'main thread' is left behind; if there isn't, + the Scheduler will shutdown next time it is entered. + + ==> we don't kill a thread that's on the main_threads + list (nor the current thread.) + + [ Attempts at implementing the more ambitious scheme of + killing the main_threads also, and then adding the + current thread onto the main_threads list if it wasn't + there already, failed -- waitThread() (for one) wasn't + up to it. If it proves to be desirable to also kill + the main threads, then this scheme will have to be + revisited (and fully debugged!) + + -- sof 7/2002 + ] + */ + /* DO NOT TOUCH THE QUEUES directly because most of the code around + us is picky about finding the thread still in its queue when + handling the deleteThread() */ + + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + next = t->link; + + /* Don't kill the current thread.. */ + if (t->id == tso->id) continue; + doKill=rtsTrue; + /* ..or a main thread */ + for (m = main_threads; m != NULL; m = m->link) { + if (m->tso->id == t->id) { + doKill=rtsFalse; + break; + } + } + if (doKill) { + deleteThread(t); + } + } + } + return pid; +#else /* mingw32 */ + barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id); + /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */ + return -1; +#endif /* mingw32 */ } /* --------------------------------------------------------------------------- @@ -1157,21 +1569,19 @@ schedule( void ) * * This is used when we catch a user interrupt (^C), before performing * any necessary cleanups and running finalizers. + * + * Locks: sched_mutex held. * ------------------------------------------------------------------------- */ -void deleteAllThreads ( void ) +void +deleteAllThreads ( void ) { - StgTSO* t; + StgTSO* t, *next; IF_DEBUG(scheduler,sched_belch("deleting all threads")); - for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) { + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + next = t->global_link; deleteThread(t); - } - for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) { - deleteThread(t); - } - for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) { - deleteThread(t); - } + } run_queue_hd = run_queue_tl = END_TSO_QUEUE; blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE; sleeping_queue = END_TSO_QUEUE; @@ -1179,6 +1589,7 @@ void deleteAllThreads ( void ) /* startThread and insertThread are now in GranSim.c -- HWL */ + //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code //@subsection Suspend and Resume @@ -1198,40 +1609,85 @@ void deleteAllThreads ( void ) * ------------------------------------------------------------------------- */ StgInt -suspendThread( Capability *cap ) +suspendThread( StgRegTable *reg, + rtsBool concCall +#if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG) + STG_UNUSED +#endif + ) { nat tok; + Capability *cap; + + /* assume that *reg is a pointer to the StgRegTable part + * of a Capability. 
+ */ + cap = (Capability *)((void *)reg - sizeof(StgFunTable)); ACQUIRE_LOCK(&sched_mutex); IF_DEBUG(scheduler, - sched_belch("thread %d did a _ccall_gc", cap->rCurrentTSO->id)); + sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall)); + + // XXX this might not be necessary --SDM + cap->r.rCurrentTSO->what_next = ThreadRunGHC; - threadPaused(cap->rCurrentTSO); - cap->rCurrentTSO->link = suspended_ccalling_threads; - suspended_ccalling_threads = cap->rCurrentTSO; + threadPaused(cap->r.rCurrentTSO); + cap->r.rCurrentTSO->link = suspended_ccalling_threads; + suspended_ccalling_threads = cap->r.rCurrentTSO; + +#if defined(RTS_SUPPORTS_THREADS) + if(cap->r.rCurrentTSO->blocked_exceptions == NULL) + { + cap->r.rCurrentTSO->why_blocked = BlockedOnCCall; + cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE; + } + else + { + cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc; + } +#endif /* Use the thread ID as the token; it should be unique */ - tok = cap->rCurrentTSO->id; + tok = cap->r.rCurrentTSO->id; -#ifdef SMP - cap->link = free_capabilities; - free_capabilities = cap; - n_free_capabilities++; + /* Hand back capability */ + releaseCapability(cap); + +#if defined(RTS_SUPPORTS_THREADS) + /* Preparing to leave the RTS, so ensure there's a native thread/task + waiting to take over. + */ + IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId())); + //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult + startTask(taskStart); + //} #endif + /* Other threads _might_ be available for execution; signal this */ + THREAD_RUNNABLE(); RELEASE_LOCK(&sched_mutex); return tok; } -Capability * -resumeThread( StgInt tok ) +StgRegTable * +resumeThread( StgInt tok, + rtsBool concCall STG_UNUSED ) { StgTSO *tso, **prev; Capability *cap; +#if defined(RTS_SUPPORTS_THREADS) + /* Wait for permission to re-enter the RTS with the result. */ ACQUIRE_LOCK(&sched_mutex); + grabReturnCapability(&sched_mutex, &cap); + + IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId())); +#else + grabCapability(&cap); +#endif + /* Remove the thread off of the suspended list */ prev = &suspended_ccalling_threads; for (tso = suspended_ccalling_threads; tso != END_TSO_QUEUE; @@ -1245,24 +1701,23 @@ resumeThread( StgInt tok ) barf("resumeThread: thread not found"); } tso->link = END_TSO_QUEUE; - -#ifdef SMP - while (free_capabilities == NULL) { - IF_DEBUG(scheduler, sched_belch("waiting to resume")); - pthread_cond_wait(&thread_ready_cond, &sched_mutex); - IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id)); + +#if defined(RTS_SUPPORTS_THREADS) + if(tso->why_blocked == BlockedOnCCall) + { + awakenBlockedQueueNoLock(tso->blocked_exceptions); + tso->blocked_exceptions = NULL; } - cap = free_capabilities; - free_capabilities = cap->link; - n_free_capabilities--; -#else - cap = &MainRegTable; #endif + + /* Reset blocking status */ + tso->why_blocked = NotBlocked; - cap->rCurrentTSO = tso; - + cap->r.rCurrentTSO = tso; +#if defined(RTS_SUPPORTS_THREADS) RELEASE_LOCK(&sched_mutex); - return cap; +#endif + return &cap->r; } @@ -1278,10 +1733,11 @@ static void unblockThread(StgTSO *tso); * instances of Eq/Ord for ThreadIds. 
* ------------------------------------------------------------------------ */ -int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) +int +cmp_thread(StgPtr tso1, StgPtr tso2) { - StgThreadID id1 = tso1->id; - StgThreadID id2 = tso2->id; + StgThreadID id1 = ((StgTSO *)tso1)->id; + StgThreadID id2 = ((StgTSO *)tso2)->id; if (id1 < id2) return (-1); if (id1 > id2) return 1; @@ -1289,6 +1745,33 @@ int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) } /* --------------------------------------------------------------------------- + * Fetching the ThreadID from an StgTSO. + * + * This is used in the implementation of Show for ThreadIds. + * ------------------------------------------------------------------------ */ +int +rts_getThreadId(StgPtr tso) +{ + return ((StgTSO *)tso)->id; +} + +#ifdef DEBUG +void +labelThread(StgPtr tso, char *label) +{ + int len; + void *buf; + + /* Caveat: Once set, you can only set the thread name to "" */ + len = strlen(label)+1; + buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()"); + strncpy(buf,label,len); + /* Update will free the old memory for us */ + updateThreadLabel((StgWord)tso,buf); +} +#endif /* DEBUG */ + +/* --------------------------------------------------------------------------- Create a new thread. The new thread starts with the given stack size. Before the @@ -1305,25 +1788,12 @@ int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) #if defined(GRAN) /* currently pri (priority) is only used in a GRAN setup -- HWL */ StgTSO * -createThread(nat stack_size, StgInt pri) -{ - return createThread_(stack_size, rtsFalse, pri); -} - -static StgTSO * -createThread_(nat size, rtsBool have_lock, StgInt pri) -{ +createThread(nat size, StgInt pri) #else StgTSO * -createThread(nat stack_size) -{ - return createThread_(stack_size, rtsFalse); -} - -static StgTSO * -createThread_(nat size, rtsBool have_lock) -{ +createThread(nat size) #endif +{ StgTSO *tso; nat stack_size; @@ -1354,21 +1824,23 @@ createThread_(nat size, rtsBool have_lock) stack_size = size - TSO_STRUCT_SIZEW; tso = (StgTSO *)allocate(size); - TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0); + TICK_ALLOC_TSO(stack_size, 0); SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM); #if defined(GRAN) SET_GRAN_HDR(tso, ThisPE); #endif - tso->what_next = ThreadEnterGHC; + + // Always start with the compiled code evaluator + tso->what_next = ThreadRunGHC; /* tso->id needs to be unique. For now we use a heavyweight mutex to * protect the increment operation on next_thread_id. * In future, we could use an atomic increment instead. 
*/ - if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); } + ACQUIRE_LOCK(&thread_id_mutex); tso->id = next_thread_id++; - if (!have_lock) { RELEASE_LOCK(&sched_mutex); } + RELEASE_LOCK(&thread_id_mutex); tso->why_blocked = NotBlocked; tso->blocked_exceptions = NULL; @@ -1385,8 +1857,6 @@ createThread_(nat size, rtsBool have_lock) /* put a stop frame on the stack */ tso->sp -= sizeofW(StgStopFrame); SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM); - tso->su = (StgUpdateFrame*)tso->sp; - // ToDo: check this #if defined(GRAN) tso->link = END_TSO_QUEUE; @@ -1398,8 +1868,14 @@ createThread_(nat size, rtsBool have_lock) */ #endif -#if defined(GRAN) || defined(PAR) - DumpGranEvent(GR_START,tso); +#if defined(GRAN) + if (RtsFlags.GranFlags.GranSimStats.Full) + DumpGranEvent(GR_START,tso); +#elif defined(PAR) + if (RtsFlags.ParFlags.ParStats.Full) + DumpGranEvent(GR_STARTQ,tso); + /* HACk to avoid SCHEDULE + LastTSO = tso; */ #endif /* Link the new thread on the global thread list. @@ -1407,6 +1883,10 @@ createThread_(nat size, rtsBool have_lock) tso->global_link = all_threads; all_threads = tso; +#if defined(DIST) + tso->dist.priority = MandatoryPriority; //by default that is... +#endif + #if defined(GRAN) tso->gran.pri = pri; # if defined(DEBUG) @@ -1455,6 +1935,13 @@ createThread_(nat size, rtsBool have_lock) globalGranStats.threads_created_on_PE[CurrentProc]++; globalGranStats.tot_sq_len += spark_queue_len(CurrentProc); globalGranStats.tot_sq_probes++; +#elif defined(PAR) + // collect parallel global statistics (currently done together with GC stats) + if (RtsFlags.ParFlags.ParStats.Global && + RtsFlags.GcFlags.giveStats > NO_GC_STATS) { + //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); + globalParStats.tot_threads_created++; + } #endif #if defined(GRAN) @@ -1472,6 +1959,36 @@ createThread_(nat size, rtsBool have_lock) return tso; } +#if defined(PAR) +/* RFP: + all parallel thread creation calls should fall through the following routine. +*/ +StgTSO * +createSparkThread(rtsSpark spark) +{ StgTSO *tso; + ASSERT(spark != (rtsSpark)NULL); + if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) + { threadsIgnored++; + barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)", + RtsFlags.ParFlags.maxThreads, advisory_thread_count); + return END_TSO_QUEUE; + } + else + { threadsCreated++; + tso = createThread(RtsFlags.GcFlags.initialStkSize); + if (tso==END_TSO_QUEUE) + barf("createSparkThread: Cannot create TSO"); +#if defined(DIST) + tso->priority = AdvisoryPriority; +#endif + pushClosure(tso,spark); + PUSH_ON_RUN_QUEUE(tso); + advisory_thread_count++; + } + return tso; +} +#endif + /* Turn a spark into a thread. ToDo: fix for SMP (needs to acquire SCHED_MUTEX!) @@ -1482,22 +1999,13 @@ StgTSO * activateSpark (rtsSpark spark) { StgTSO *tso; - - ASSERT(spark != (rtsSpark)NULL); - tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue); - if (tso!=END_TSO_QUEUE) { - pushClosure(tso,spark); - PUSH_ON_RUN_QUEUE(tso); - advisory_thread_count++; - if (RtsFlags.ParFlags.ParStats.Full) { - //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ... 
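/* ---------------------------------------------------------------------------
 * Sketch: the admission control in createSparkThread above.  A spark is
 * promoted to a thread only while the advisory thread count is below the
 * configured ceiling; otherwise it is counted as ignored and rejected
 * (the real code barfs at that point).  Plain ints stand in for the RTS
 * counters and for RtsFlags.ParFlags.maxThreads.
 * ------------------------------------------------------------------------- */
#include <assert.h>
#include <stdbool.h>

static int advisory_threads = 0;
static int threads_ignored  = 0;

static bool admit_spark(int max_threads)
{
    if (advisory_threads >= max_threads) {
        threads_ignored++;      /* spark dropped, not turned into a TSO */
        return false;
    }
    advisory_threads++;         /* spark becomes a runnable thread */
    return true;
}

int main(void)
{
    assert(admit_spark(1));     /* first spark fits under the cap */
    assert(!admit_spark(1));    /* second one is refused */
    assert(threads_ignored == 1);
    return 0;
}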
- IF_PAR_DEBUG(verbose, - belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread", - (StgClosure *)spark, info_type((StgClosure *)spark))); - } - } else { - barf("activateSpark: Cannot create TSO"); + tso = createSparkThread(spark); + if (RtsFlags.ParFlags.ParStats.Full) { + //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ... + IF_PAR_DEBUG(verbose, + belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread", + (StgClosure *)spark, info_type((StgClosure *)spark))); } // ToDo: fwd info on local/global spark to thread -- HWL // tso->gran.exported = spark->exported; @@ -1508,6 +2016,13 @@ activateSpark (rtsSpark spark) } #endif +static SchedulerStatus waitThread_(/*out*/StgMainThread* m +#if defined(THREADED_RTS) + , rtsBool blockWaiting +#endif + ); + + /* --------------------------------------------------------------------------- * scheduleThread() * @@ -1518,15 +2033,12 @@ activateSpark (rtsSpark spark) * on this thread's stack before the scheduler is invoked. * ------------------------------------------------------------------------ */ +static void scheduleThread_ (StgTSO* tso); + void -scheduleThread(StgTSO *tso) +scheduleThread_(StgTSO *tso) { - if (tso==END_TSO_QUEUE){ - schedule(); - return; - } - - ACQUIRE_LOCK(&sched_mutex); + // Precondition: sched_mutex must be held. /* Put the new thread on the head of the runnable queue. The caller * better push an appropriate closure on this thread's stack @@ -1539,24 +2051,49 @@ scheduleThread(StgTSO *tso) #if 0 IF_DEBUG(scheduler,printTSO(tso)); #endif +} + +void scheduleThread(StgTSO* tso) +{ + ACQUIRE_LOCK(&sched_mutex); + scheduleThread_(tso); RELEASE_LOCK(&sched_mutex); } -/* --------------------------------------------------------------------------- - * startTasks() - * - * Start up Posix threads to run each of the scheduler tasks. - * I believe the task ids are not needed in the system as defined. - * KH @ 25/10/99 - * ------------------------------------------------------------------------ */ +SchedulerStatus +scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret) +{ // Precondition: sched_mutex must be held + StgMainThread *m; -#if defined(PAR) || defined(SMP) -void * -taskStart( void *arg STG_UNUSED ) -{ - rts_evalNothing(NULL); -} + m = stgMallocBytes(sizeof(StgMainThread), "waitThread"); + m->tso = tso; + m->ret = ret; + m->stat = NoStatus; +#if defined(RTS_SUPPORTS_THREADS) + initCondition(&m->wakeup); +#endif + + /* Put the thread on the main-threads list prior to scheduling the TSO. + Failure to do so introduces a race condition in the MT case (as + identified by Wolfgang Thaller), whereby the new task/OS thread + created by scheduleThread_() would complete prior to the thread + that spawned it managed to put 'itself' on the main-threads list. + The upshot of it all being that the worker thread wouldn't get to + signal the completion of the its work item for the main thread to + see (==> it got stuck waiting.) -- sof 6/02. + */ + IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id)); + + m->link = main_threads; + main_threads = m; + + scheduleThread_(tso); +#if defined(THREADED_RTS) + return waitThread_(m, rtsTrue); +#else + return waitThread_(m); #endif +} /* --------------------------------------------------------------------------- * initScheduler() @@ -1565,7 +2102,6 @@ taskStart( void *arg STG_UNUSED ) * queues contained any threads, they'll be garbage collected at the * next pass. * - * This now calls startTasks(), so should only be called once! 
KH @ 25/10/99 * ------------------------------------------------------------------------ */ #ifdef SMP @@ -1576,11 +2112,10 @@ term_handler(int sig STG_UNUSED) ACQUIRE_LOCK(&term_mutex); await_death--; RELEASE_LOCK(&term_mutex); - pthread_exit(NULL); + shutdownThread(); } #endif -//@cindex initScheduler void initScheduler(void) { @@ -1613,14 +2148,27 @@ initScheduler(void) RtsFlags.ConcFlags.ctxtSwitchTicks = RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS; + +#if defined(RTS_SUPPORTS_THREADS) + /* Initialise the mutex and condition variables used by + * the scheduler. */ + initMutex(&sched_mutex); + initMutex(&term_mutex); + initMutex(&thread_id_mutex); + + initCondition(&thread_ready_cond); +#endif + +#if defined(SMP) + initCondition(&gc_pending_cond); +#endif -#ifdef INTERPRETER - ecafList = END_ECAF_LIST; - clearECafTable(); +#if defined(RTS_SUPPORTS_THREADS) + ACQUIRE_LOCK(&sched_mutex); #endif /* Install the SIGHUP handler */ -#ifdef SMP +#if defined(SMP) { struct sigaction action,oact; @@ -1633,93 +2181,38 @@ initScheduler(void) } #endif -#ifdef SMP - /* Allocate N Capabilities */ - { - nat i; - Capability *cap, *prev; - cap = NULL; - prev = NULL; - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities"); - cap->link = prev; - prev = cap; - } - free_capabilities = cap; - n_free_capabilities = RtsFlags.ParFlags.nNodes; - } - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n", - n_free_capabilities);); + /* A capability holds the state a native thread needs in + * order to execute STG code. At least one capability is + * floating around (only SMP builds have more than one). + */ + initCapabilities(); + +#if defined(RTS_SUPPORTS_THREADS) + /* start our haskell execution tasks */ +# if defined(SMP) + startTaskManager(RtsFlags.ParFlags.nNodes, taskStart); +# else + startTaskManager(0,taskStart); +# endif #endif -#if defined(SMP) || defined(PAR) +#if /* defined(SMP) ||*/ defined(PAR) initSparkPools(); #endif -} -#ifdef SMP -void -startTasks( void ) -{ - nat i; - int r; - pthread_t tid; - - /* make some space for saving all the thread ids */ - task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info), - "initScheduler:task_ids"); - - /* and create all the threads */ - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - r = pthread_create(&tid,NULL,taskStart,NULL); - if (r != 0) { - barf("startTasks: Can't create new Posix thread"); - } - task_ids[i].id = tid; - task_ids[i].mut_time = 0.0; - task_ids[i].mut_etime = 0.0; - task_ids[i].gc_time = 0.0; - task_ids[i].gc_etime = 0.0; - task_ids[i].elapsedtimestart = elapsedtime(); - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid);); - } -} +#if defined(RTS_SUPPORTS_THREADS) + RELEASE_LOCK(&sched_mutex); #endif +} + void exitScheduler( void ) { -#ifdef SMP - nat i; - - /* Don't want to use pthread_cancel, since we'd have to install - * these silly exception handlers (pthread_cleanup_{push,pop}) around - * all our locks. - */ -#if 0 - /* Cancel all our tasks */ - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - pthread_cancel(task_ids[i].id); - } - - /* Wait for all the tasks to terminate */ - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", - task_ids[i].id)); - pthread_join(task_ids[i].id, NULL); - } -#endif - - /* Send 'em all a SIGHUP. That should shut 'em up. 
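// ---------------------------------------------------------------------------
// Illustrative sketch (editorial addition, not part of the patch): the old
// code being removed here shut workers down by signalling raw pthreads,
// while the new exitScheduler() defers to stopTaskManager().  One plausible
// start/stop shape for such a task manager, assuming POSIX threads; the
// real API lives in Task.c and these names are stand-ins:

#include <pthread.h>
#include <sched.h>
#include <stdlib.h>

#define N_TASKS 4

static pthread_t    task_tbl[N_TASKS];
static volatile int shut_down = 0;     // cf. shutting_down_scheduler

static void *task_main(void *arg)
{
    (void)arg;
    while (!shut_down) {
        // ... grab a capability and run Haskell threads ...
        sched_yield();                 // placeholder for real work
    }
    return 0;
}

static void start_task_manager(void)   // cf. startTaskManager()
{
    int i;
    for (i = 0; i < N_TASKS; i++)
        if (pthread_create(&task_tbl[i], 0, task_main, 0) != 0)
            abort();                   // cf. barf() in the old startTasks()
}

static void stop_task_manager(void)    // cf. stopTaskManager()
{
    int i;
    shut_down = 1;                     // workers drain out cooperatively
    for (i = 0; i < N_TASKS; i++)
        pthread_join(task_tbl[i], 0);  // no SIGTERM, no await_death spin
}

int main(void)
{
    start_task_manager();
    stop_task_manager();
    return 0;
}
// ---------------------------------------------------------------------------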
-   */
-  await_death = RtsFlags.ParFlags.nNodes;
-  for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
-    pthread_kill(task_ids[i].id,SIGTERM);
-  }
-  while (await_death > 0) {
-    sched_yield();
-  }
+#if defined(RTS_SUPPORTS_THREADS)
+  stopTaskManager();
 #endif
+  shutting_down_scheduler = rtsTrue;
 }
 
 /* -----------------------------------------------------------------------------
@@ -1766,13 +2259,13 @@ finishAllThreads ( void )
 {
    do {
       while (run_queue_hd != END_TSO_QUEUE) {
-	 waitThread ( run_queue_hd, NULL );
+	 waitThread ( run_queue_hd, NULL);
       }
       while (blocked_queue_hd != END_TSO_QUEUE) {
-	 waitThread ( blocked_queue_hd, NULL );
+	 waitThread ( blocked_queue_hd, NULL);
       }
       while (sleeping_queue != END_TSO_QUEUE) {
-	 waitThread ( blocked_queue_hd, NULL );
+	 waitThread ( sleeping_queue, NULL);
      }
    } while (blocked_queue_hd != END_TSO_QUEUE ||
@@ -1782,55 +2275,92 @@ finishAllThreads ( void )
 
 SchedulerStatus
 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
-{
+{ 
   StgMainThread *m;
   SchedulerStatus stat;
 
-  ACQUIRE_LOCK(&sched_mutex);
-  
   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
-
   m->tso = tso;
   m->ret = ret;
   m->stat = NoStatus;
-#ifdef SMP
-  pthread_cond_init(&m->wakeup, NULL);
+#if defined(RTS_SUPPORTS_THREADS)
+  initCondition(&m->wakeup);
 #endif
+
+  /* see scheduleWaitThread() comment */
+  ACQUIRE_LOCK(&sched_mutex);
   m->link = main_threads;
   main_threads = m;
 
-  IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n",
-			      m->tso->id));
+  IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
 
+#if defined(THREADED_RTS)
+  stat = waitThread_(m, rtsFalse);
+#else
+  stat = waitThread_(m);
+#endif
+  RELEASE_LOCK(&sched_mutex);
+  return stat;
+}
 
-#ifdef SMP
-  do {
-    pthread_cond_wait(&m->wakeup, &sched_mutex);
-  } while (m->stat == NoStatus);
+static
+SchedulerStatus
+waitThread_(StgMainThread* m
+#if defined(THREADED_RTS)
+	    , rtsBool blockWaiting
+#endif
+	    )
+{
+  SchedulerStatus stat;
+
+  // Precondition: sched_mutex must be held.
+  IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
+
+#if defined(RTS_SUPPORTS_THREADS)
+
+# if defined(THREADED_RTS)
+  if (!blockWaiting) {
+    /* In the threaded case, the OS thread that called main()
+     * gets to enter the RTS directly without going via another
+     * task/thread.
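// ---------------------------------------------------------------------------
// Illustrative sketch (editorial addition, not part of the patch): the
// THREADED_RTS branch of waitThread_() just below releases sched_mutex
// before running schedule() on the caller's own OS thread and re-acquires
// it afterwards; holding a global lock across a long-running call would
// stall every other task.  The same pattern in miniature, assuming a plain
// pthread mutex; names are stand-ins:

#include <pthread.h>

static pthread_mutex_t sched_lock = PTHREAD_MUTEX_INITIALIZER;
static int shared_stat = 0;            // stands in for m->stat

static void run_scheduler_loop(void)   // stands in for schedule()
{
    // long-running; takes and drops sched_lock internally as needed
    shared_stat = 1;
}

static int enter_and_wait(void)
{
    int result;
    pthread_mutex_lock(&sched_lock);
    // ... registration and queue manipulation under the lock ...
    pthread_mutex_unlock(&sched_lock); // drop it across the blocking call
    run_scheduler_loop();
    pthread_mutex_lock(&sched_lock);   // re-take it to read shared state
    result = shared_stat;
    pthread_mutex_unlock(&sched_lock);
    return result;
}

int main(void)
{
    return enter_and_wait() == 1 ? 0 : 1;
}
// ---------------------------------------------------------------------------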
+     */
+    main_main_thread = m;
+    RELEASE_LOCK(&sched_mutex);
+    schedule();
+    ACQUIRE_LOCK(&sched_mutex);
+    main_main_thread = NULL;
+    ASSERT(m->stat != NoStatus);
+  } else
+# endif
+  {
+    do {
+      waitCondition(&m->wakeup, &sched_mutex);
+    } while (m->stat == NoStatus);
+  }
 #elif defined(GRAN)
   /* GranSim specific init */
   CurrentTSO = m->tso;                // the TSO to run
   procStatus[MainProc] = Busy;        // status of main PE
   CurrentProc = MainProc;             // PE to run it on
 
+  RELEASE_LOCK(&sched_mutex);
   schedule();
 #else
+  RELEASE_LOCK(&sched_mutex);
   schedule();
   ASSERT(m->stat != NoStatus);
 #endif
 
   stat = m->stat;
 
-#ifdef SMP
-  pthread_cond_destroy(&m->wakeup);
+#if defined(RTS_SUPPORTS_THREADS)
+  closeCondition(&m->wakeup);
 #endif
 
-  IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n", 
+  IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
 			      m->tso->id));
-  free(m);
-
-  RELEASE_LOCK(&sched_mutex);
+  stgFree(m);
 
+  // Postcondition: sched_mutex still held
   return stat;
 }
 
@@ -1954,25 +2484,24 @@ take_off_run_queue(StgTSO *tso) {
    KH @ 25/10/99
 */
 
-static void GetRoots(void)
+void
+GetRoots(evac_fn evac)
 {
-  StgMainThread *m;
-
 #if defined(GRAN)
   {
     nat i;
     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
-	  run_queue_hds[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
+	  evac((StgClosure **)&run_queue_hds[i]);
      if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
-	  run_queue_tls[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
+	  evac((StgClosure **)&run_queue_tls[i]);
      if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
-	  blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
+	  evac((StgClosure **)&blocked_queue_hds[i]);
     if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
-	  blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
+	  evac((StgClosure **)&blocked_queue_tls[i]);
     if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
-	  ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
+	  evac((StgClosure **)&ccalling_threadss[i]);
     }
   }
 
@@ -1980,32 +2509,53 @@ static void GetRoots(void)
 
 #else /* !GRAN */
   if (run_queue_hd != END_TSO_QUEUE) {
-    ASSERT(run_queue_tl != END_TSO_QUEUE);
-    run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
-    run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
+      ASSERT(run_queue_tl != END_TSO_QUEUE);
+      evac((StgClosure **)&run_queue_hd);
+      evac((StgClosure **)&run_queue_tl);
  }
 
  if (blocked_queue_hd != END_TSO_QUEUE) {
-    ASSERT(blocked_queue_tl != END_TSO_QUEUE);
-    blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
-    blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
+      ASSERT(blocked_queue_tl != END_TSO_QUEUE);
+      evac((StgClosure **)&blocked_queue_hd);
+      evac((StgClosure **)&blocked_queue_tl);
  }
 
  if (sleeping_queue != END_TSO_QUEUE) {
-    sleeping_queue  = (StgTSO *)MarkRoot((StgClosure *)sleeping_queue);
+      evac((StgClosure **)&sleeping_queue);
  }
 #endif 
 
-  for (m = main_threads; m != NULL; m = m->link) {
-    m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
+  if (suspended_ccalling_threads != END_TSO_QUEUE) {
+      evac((StgClosure **)&suspended_ccalling_threads);
  }
-  if (suspended_ccalling_threads != END_TSO_QUEUE)
-    suspended_ccalling_threads = 
-      (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
 
-#if defined(SMP) || defined(PAR) || defined(GRAN)
-  markSparkQueue();
+#if defined(PAR) || defined(GRAN)
+  markSparkQueue(evac);
+#endif
+
+#if defined(RTS_USER_SIGNALS)
+  // mark the signal handlers (signals should be already blocked)
+  markSignalHandlers(evac);
 #endif
+
+  // main threads which have completed need to be retained until they
+  // are dealt with in the main scheduler loop.  They won't be
+  // retained any other way: the GC will drop them from the
+  // all_threads list, so we have to be careful to treat them as roots
+  // here.
+  { 
+      StgMainThread *m;
+      for (m = main_threads; m != NULL; m = m->link) {
+	  switch (m->tso->what_next) {
+	  case ThreadComplete:
+	  case ThreadKilled:
+	      evac((StgClosure **)&m->tso);
+	      break;
+	  default:
	      break;
+	  }
+      }
+  }
 }
 
 /* -----------------------------------------------------------------------------
@@ -2021,33 +2571,39 @@ static void GetRoots(void)
    This needs to be protected by the GC condition variable above.  KH.
    -------------------------------------------------------------------------- */
 
-void (*extra_roots)(void);
+static void (*extra_roots)(evac_fn);
 
 void
 performGC(void)
 {
+  /* We must hold sched_mutex while the GC examines the roots. */
+  ACQUIRE_LOCK(&sched_mutex);
   GarbageCollect(GetRoots,rtsFalse);
+  RELEASE_LOCK(&sched_mutex);
 }
 
 void
 performMajorGC(void)
 {
+  ACQUIRE_LOCK(&sched_mutex);
   GarbageCollect(GetRoots,rtsTrue);
+  RELEASE_LOCK(&sched_mutex);
 }
 
 static void
-AllRoots(void)
+AllRoots(evac_fn evac)
 {
-  GetRoots();			/* the scheduler's roots */
-  extra_roots();		/* the user's roots */
+    GetRoots(evac);		// the scheduler's roots
+    extra_roots(evac);		// the user's roots
 }
 
 void
-performGCWithRoots(void (*get_roots)(void))
+performGCWithRoots(void (*get_roots)(evac_fn))
 {
+  ACQUIRE_LOCK(&sched_mutex);
   extra_roots = get_roots;
   GarbageCollect(AllRoots,rtsFalse);
+  RELEASE_LOCK(&sched_mutex);
 }
 
 /* -----------------------------------------------------------------------------
@@ -2062,7 +2618,7 @@ performGCWithRoots(void (*get_roots)(void))
 static StgTSO *
 threadStackOverflow(StgTSO *tso)
 {
-  nat new_stack_size, new_tso_size, diff, stack_words;
+  nat new_stack_size, new_tso_size, stack_words;
   StgPtr new_sp;
   StgTSO *dest;
 
@@ -2076,13 +2632,8 @@ threadStackOverflow(StgTSO *tso)
 		   printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
 						    tso->sp+64)));
 
-#ifdef INTERPRETER
-    fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
-    exit(1);
-#else
     /* Send this thread the StackOverflow exception */
     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
-#endif
    return tso;
   }
 
@@ -2096,10 +2647,10 @@ threadStackOverflow(StgTSO *tso)
   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
 
-  IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
+  IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
 
   dest = (StgTSO *)allocate(new_tso_size);
-  TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
+  TICK_ALLOC_TSO(new_stack_size,0);
 
   /* copy the TSO block and the old stack into the new area */
   memcpy(dest,tso,TSO_STRUCT_SIZE);
@@ -2108,25 +2659,19 @@ threadStackOverflow(StgTSO *tso)
   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
 
   /* relocate the stack pointers... 
   */
-  diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
-  dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
-  dest->sp    = new_sp;
+  dest->sp    = new_sp;
   dest->stack_size = new_stack_size;
 
-  /* and relocate the update frame list */
-  relocate_TSO(tso, dest);
-
   /* Mark the old TSO as relocated.  We have to check for relocated
    * TSOs in the garbage collector and any primops that deal with TSOs.
    *
-   * It's important to set the sp and su values to just beyond the end
+   * It's important to set the sp value to just beyond the end
    * of the stack, so we don't attempt to scavenge any part of the
    * dead TSO's stack.
    */
   tso->what_next = ThreadRelocated;
   tso->link = dest;
   tso->sp = (P_)&(tso->stack[tso->stack_size]);
-  tso->su = (StgUpdateFrame *)tso->sp;
   tso->why_blocked = NotBlocked;
   dest->mut_link = NULL;
 
@@ -2165,8 +2710,10 @@ unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
     update blocked and fetch time (depending on type of the orig closure) */
   if (RtsFlags.ParFlags.ParStats.Full) {
     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
-		     GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
+		     GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
 		     0, 0 /* spark_queue_len(ADVISORY_POOL) */);
+    if (EMPTY_RUN_QUEUE())
+      emitSchedule = rtsTrue;
 
     switch (get_itbl(node)->type) {
 	case FETCH_ME_BQ:
@@ -2177,6 +2724,10 @@ unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
 	case BLACKHOLE_BQ:
 	  ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
 	  break;
+#ifdef DIST
+	case MVAR:
+	  break;
+#endif
 	default:
 	  barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
 	}
@@ -2246,8 +2797,8 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
     case BLOCKED_FETCH:
       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
       next = bqe->link;
-      bqe->link = PendingFetches;
-      PendingFetches = bqe;
+      bqe->link = (StgBlockingQueueElement *)PendingFetches;
+      PendingFetches = (StgBlockedFetch *)bqe;
       break;
 
 # if defined(DEBUG)
@@ -2266,7 +2817,7 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
 	     (StgClosure *)bqe);
 # endif
     }
-  //  IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
+  IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
   return next;
 }
 
@@ -2316,13 +2867,14 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
   nat len = 0;
 
   IF_GRAN_DEBUG(bq, 
-		belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
+		belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
 		      node, CurrentProc, CurrentTime[CurrentProc], 
 		      CurrentTSO->id, CurrentTSO));
 
   node_loc = where_is(node);
 
-  ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
+  ASSERT(q == END_BQ_QUEUE ||
+	 get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
 	 get_itbl(q)->type == CONSTR); // closure (type constructor)
   ASSERT(is_unique(node));
 
@@ -2392,15 +2944,23 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
 void 
 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
 {
-  StgBlockingQueueElement *bqe, *next;
+  StgBlockingQueueElement *bqe;
 
   ACQUIRE_LOCK(&sched_mutex);
   IF_PAR_DEBUG(verbose, 
-	       belch("## AwBQ for node %p on [%x]: ",
+	       belch("##-_ AwBQ for node %p on [%x]: ",
 		     node, mytid));
-
-  ASSERT(get_itbl(q)->type == TSO ||
+#ifdef DIST  
+  //RFP
+  if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
+    IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so let's just return. RFP (BUG?)"));
+    RELEASE_LOCK(&sched_mutex);
+    return;
+  }
+#endif
+
+  ASSERT(q == END_BQ_QUEUE ||
+	 get_itbl(q)->type == TSO ||           
 	 get_itbl(q)->type == BLOCKED_FETCH ||
 	 get_itbl(q)->type == CONSTR); 
 
@@ -2413,6 +2973,17 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
 }
 
 #else   /* !GRAN && !PAR */
+
+#ifdef RTS_SUPPORTS_THREADS
+void
+awakenBlockedQueueNoLock(StgTSO *tso)
+{
+  while (tso != END_TSO_QUEUE) {
+    tso = unblockOneLocked(tso);
+  }
+}
+#endif
+
 void
 awakenBlockedQueue(StgTSO *tso)
 {
@@ -2452,13 +3023,15 @@ interruptStgRts(void)
    NB: only the type of the blocking queue is different in GranSim and GUM
       the operations on the queue-elements are the same
       long live polymorphism!
+
+   Locks: sched_mutex is held upon entry and exit.
+
 */
 static void
 unblockThread(StgTSO *tso)
 {
   StgBlockingQueueElement *t, **last;
 
-  ACQUIRE_LOCK(&sched_mutex);
   switch (tso->why_blocked) {
 
   case NotBlocked:
@@ -2507,6 +3080,12 @@ unblockThread(StgTSO *tso)
       StgTSO *target  = tso->block_info.tso;
 
       ASSERT(get_itbl(target)->type == TSO);
+
+      if (target->what_next == ThreadRelocated) {
+	  target = target->link;
+	  ASSERT(get_itbl(target)->type == TSO);
+      }
+
      ASSERT(target->blocked_exceptions != NULL);
 
      last = (StgBlockingQueueElement **)&target->blocked_exceptions;
@@ -2525,6 +3104,7 @@ unblockThread(StgTSO *tso)
   case BlockedOnRead:
   case BlockedOnWrite:
     {
+      /* take TSO off blocked_queue */
      StgBlockingQueueElement *prev = NULL;
      for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
 	   prev = t, t = t->link) {
@@ -2548,6 +3128,7 @@ unblockThread(StgTSO *tso)
 
   case BlockedOnDelay:
     {
+      /* take TSO off sleeping_queue */
      StgBlockingQueueElement *prev = NULL;
      for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
 	   prev = t, t = t->link) {
@@ -2572,20 +3153,20 @@ unblockThread(StgTSO *tso)
   tso->why_blocked = NotBlocked;
   tso->block_info.closure = NULL;
   PUSH_ON_RUN_QUEUE(tso);
-  RELEASE_LOCK(&sched_mutex);
 }
 
 #else
 static void
 unblockThread(StgTSO *tso)
 {
  StgTSO *t, **last;
+  
+  /* To avoid locking unnecessarily. */
+  if (tso->why_blocked == NotBlocked) {
+    return;
+  }
 
-  ACQUIRE_LOCK(&sched_mutex);
  switch (tso->why_blocked) {

-  case NotBlocked:
-    return;  /* not blocked */
-
  case BlockedOnMVar:
    ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
    {
@@ -2627,6 +3208,12 @@ unblockThread(StgTSO *tso)
      StgTSO *target  = tso->block_info.tso;

      ASSERT(get_itbl(target)->type == TSO);
+
+      while (target->what_next == ThreadRelocated) {
+	  target = target->link;
+	  ASSERT(get_itbl(target)->type == TSO);
+      }
+
      ASSERT(target->blocked_exceptions != NULL);

      last = &target->blocked_exceptions;
@@ -2691,7 +3278,6 @@ unblockThread(StgTSO *tso)
  tso->why_blocked = NotBlocked;
  tso->block_info.closure = NULL;
  PUSH_ON_RUN_QUEUE(tso);
-  RELEASE_LOCK(&sched_mutex);
 }
 #endif
 
@@ -2711,12 +3297,12 @@ unblockThread(StgTSO *tso)
 * the top of the stack.
 * 
 * How exactly do we save all the active computations?  We create an
- * AP_UPD for every UpdateFrame on the stack.  Entering one of these
- * AP_UPDs pushes everything from the corresponding update frame
+ * AP_STACK for every UpdateFrame on the stack.  Entering one of these
+ * AP_STACKs pushes everything from the corresponding update frame
 * upwards onto the stack.  (Actually, it pushes everything up to the
- * next update frame plus a pointer to the next AP_UPD object.
- * Entering the next AP_UPD object pushes more onto the stack until we
- * reach the last AP_UPD object - at which point the stack should look
+ * next update frame plus a pointer to the next AP_STACK object.
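// ---------------------------------------------------------------------------
// Illustrative sketch (editorial addition, not part of the patch): the new
// raiseAsync() below walks the stack one frame at a time, freezing each
// chunk between update frames into an AP_STACK closure.  The walk itself,
// modelled on a toy frame array; none of these are real RTS types:

#include <stddef.h>

typedef enum { RET_FRAME, UPDATE_FRAME_K, CATCH_FRAME_K, STOP_FRAME_K } Kind;

typedef struct {
    Kind   kind;
    size_t size_words;                 // cf. stack_frame_sizeW(); always >= 1
} Frame;

// Scan from 'sp' towards the stack bottom and stop at the first frame the
// exception machinery cares about.  A CATCH_FRAME only stops the walk when
// there is an exception to deliver, mirroring the condition
// (info->i.type != CATCH_FRAME || exception == NULL) in the loop below.
static size_t next_stop(const Frame *stk, size_t sp, size_t len, int have_exn)
{
    size_t i = sp;
    while (i < len) {
        Kind k = stk[i].kind;
        if (k == UPDATE_FRAME_K || k == STOP_FRAME_K ||
            (k == CATCH_FRAME_K && have_exn))
            break;
        i += stk[i].size_words;        // ordinary return frames are skipped
    }
    return i;
}
// ---------------------------------------------------------------------------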
+ * Entering the next AP_STACK object pushes more onto the stack until we
+ * reach the last AP_STACK object - at which point the stack should look
 * exactly as it did when we killed the TSO and we can continue
 * execution by entering the closure on top of the stack.
 *
@@ -2725,6 +3311,8 @@ unblockThread(StgTSO *tso)
 * CATCH_FRAME on the stack.  In either case, we strip the entire
 * stack and replace the thread with a zombie.
 *
+ * Locks: sched_mutex held upon entry and exit.
+ *
 * -------------------------------------------------------------------------- */

void
@@ -2734,196 +3322,181 @@ deleteThread(StgTSO *tso)
}

void
+raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
+{
+  /* When raising asynchronous exceptions from contexts where sched_mutex
+     isn't held, use raiseAsyncWithLock(). */
+  ACQUIRE_LOCK(&sched_mutex);
+  raiseAsync(tso,exception);
+  RELEASE_LOCK(&sched_mutex);
+}
+
+void
 raiseAsync(StgTSO *tso, StgClosure *exception)
 {
-  StgUpdateFrame* su = tso->su;
-  StgPtr	  sp = tso->sp;
+    StgRetInfoTable *info;
+    StgPtr sp;
  
-  /* Thread already dead? */
-  if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
-    return;
-  }
-
-  IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
-
-  /* Remove it from any blocking queues */
-  unblockThread(tso);
-
-  /* The stack freezing code assumes there's a closure pointer on
-   * the top of the stack. This isn't always the case with compiled
-   * code, so we have to push a dummy closure on the top which just
-   * returns to the next return address on the stack.
-   */
-  if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
-    *(--sp) = (W_)&stg_dummy_ret_closure;
-  }
-
-  while (1) {
-    int words = ((P_)su - (P_)sp) - 1;
-    nat i;
-    StgAP_UPD * ap;
-
-    /* If we find a CATCH_FRAME, and we've got an exception to raise,
-     * then build PAP(handler,exception,realworld#), and leave it on
-     * top of the stack ready to enter.
-     */
-    if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
-      StgCatchFrame *cf = (StgCatchFrame *)su;
-      /* we've got an exception to raise, so let's pass it to the
-       * handler in this frame.
-       */
-      ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
-      TICK_ALLOC_UPD_PAP(3,0);
-      SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
-	      
-      ap->n_args = 2;
-      ap->fun = cf->handler;	/* :: Exception -> IO a */
-      ap->payload[0] = exception;
-      ap->payload[1] = ARG_TAG(0); /* realworld token */
-
-      /* throw away the stack from Sp up to and including the
-       * CATCH_FRAME.
-       */
-      sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
-      tso->su = cf->link;
-
-      /* Restore the blocked/unblocked state for asynchronous exceptions
-       * at the CATCH_FRAME.
-       *
-       * If exceptions were unblocked at the catch, arrange that they
-       * are unblocked again after executing the handler by pushing an
-       * unblockAsyncExceptions_ret stack frame.
-       */
-      if (!cf->exceptions_blocked) {
-	*(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
-      }
-      
-      /* Ensure that async exceptions are blocked when running the handler.
-       */
-      if (tso->blocked_exceptions == NULL) {
-	tso->blocked_exceptions = END_TSO_QUEUE;
-      }
-      
-      /* Put the newly-built PAP on top of the stack, ready to execute
-       * when the thread restarts.
-       */
-      sp[0] = (W_)ap;
-      tso->sp = sp;
-      tso->what_next = ThreadEnterGHC;
-      IF_DEBUG(sanity, checkTSO(tso));
-      return;
+    // Thread already dead?
+    if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
+	return;
    }

-    /* First build an AP_UPD consisting of the stack chunk above the
-     * current update frame, with the top word on the stack as the
-     * fun field.
- */ - ap = (StgAP_UPD *)allocate(AP_sizeW(words)); + IF_DEBUG(scheduler, + sched_belch("raising exception in thread %ld.", tso->id)); - ASSERT(words >= 0); + // Remove it from any blocking queues + unblockThread(tso); + + sp = tso->sp; - ap->n_args = words; - ap->fun = (StgClosure *)sp[0]; - sp++; - for(i=0; i < (nat)words; ++i) { - ap->payload[i] = (StgClosure *)*sp++; + // The stack freezing code assumes there's a closure pointer on + // the top of the stack, so we have to arrange that this is the case... + // + if (sp[0] == (W_)&stg_enter_info) { + sp++; + } else { + sp--; + sp[0] = (W_)&stg_dummy_ret_closure; } - - switch (get_itbl(su)->type) { - - case UPDATE_FRAME: - { - SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); - TICK_ALLOC_UP_THK(words+1,0); - - IF_DEBUG(scheduler, - fprintf(stderr, "scheduler: Updating "); - printPtr((P_)su->updatee); - fprintf(stderr, " with "); - printObj((StgClosure *)ap); - ); - - /* Replace the updatee with an indirection - happily - * this will also wake up any threads currently - * waiting on the result. - */ - UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */ - su = su->link; - sp += sizeofW(StgUpdateFrame) -1; - sp[0] = (W_)ap; /* push onto stack */ - break; - } - - case CATCH_FRAME: - { - StgCatchFrame *cf = (StgCatchFrame *)su; - StgClosure* o; - - /* We want a PAP, not an AP_UPD. Fortunately, the - * layout's the same. - */ - SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */); - TICK_ALLOC_UPD_PAP(words+1,0); - - /* now build o = FUN(catch,ap,handler) */ - o = (StgClosure *)allocate(sizeofW(StgClosure)+2); - TICK_ALLOC_FUN(2,0); - SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */); - o->payload[0] = (StgClosure *)ap; - o->payload[1] = cf->handler; + + while (1) { + nat i; + + // 1. Let the top of the stack be the "current closure" + // + // 2. Walk up the stack until we find either an UPDATE_FRAME or a + // CATCH_FRAME. + // + // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the + // current closure applied to the chunk of stack up to (but not + // including) the update frame. This closure becomes the "current + // closure". Go back to step 2. + // + // 4. If it's a CATCH_FRAME, then leave the exception handler on + // top of the stack applied to the exception. + // + // 5. If it's a STOP_FRAME, then kill the thread. - IF_DEBUG(scheduler, - fprintf(stderr, "scheduler: Built "); - printObj((StgClosure *)o); - ); + StgPtr frame; - /* pop the old handler and put o on the stack */ - su = cf->link; - sp += sizeofW(StgCatchFrame) - 1; - sp[0] = (W_)o; - break; - } - - case SEQ_FRAME: - { - StgSeqFrame *sf = (StgSeqFrame *)su; - StgClosure* o; + frame = sp + 1; + info = get_ret_itbl((StgClosure *)frame); - SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */); - TICK_ALLOC_UPD_PAP(words+1,0); + while (info->i.type != UPDATE_FRAME + && (info->i.type != CATCH_FRAME || exception == NULL) + && info->i.type != STOP_FRAME) { + frame += stack_frame_sizeW((StgClosure *)frame); + info = get_ret_itbl((StgClosure *)frame); + } - /* now build o = FUN(seq,ap) */ - o = (StgClosure *)allocate(sizeofW(StgClosure)+1); - TICK_ALLOC_SE_THK(1,0); - SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */); - o->payload[0] = (StgClosure *)ap; + switch (info->i.type) { + + case CATCH_FRAME: + // If we find a CATCH_FRAME, and we've got an exception to raise, + // then build the THUNK raise(exception), and leave it on + // top of the CATCH_FRAME ready to enter. 
+ // + { +#ifdef PROFILING + StgCatchFrame *cf = (StgCatchFrame *)frame; +#endif + StgClosure *raise; + + // we've got an exception to raise, so let's pass it to the + // handler in this frame. + // + raise = (StgClosure *)allocate(sizeofW(StgClosure)+1); + TICK_ALLOC_SE_THK(1,0); + SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); + raise->payload[0] = exception; + + // throw away the stack from Sp up to the CATCH_FRAME. + // + sp = frame - 1; + + /* Ensure that async excpetions are blocked now, so we don't get + * a surprise exception before we get around to executing the + * handler. + */ + if (tso->blocked_exceptions == NULL) { + tso->blocked_exceptions = END_TSO_QUEUE; + } + + /* Put the newly-built THUNK on top of the stack, ready to execute + * when the thread restarts. + */ + sp[0] = (W_)raise; + sp[-1] = (W_)&stg_enter_info; + tso->sp = sp-1; + tso->what_next = ThreadRunGHC; + IF_DEBUG(sanity, checkTSO(tso)); + return; + } - IF_DEBUG(scheduler, - fprintf(stderr, "scheduler: Built "); - printObj((StgClosure *)o); - ); + case UPDATE_FRAME: + { + StgAP_STACK * ap; + nat words; + + // First build an AP_STACK consisting of the stack chunk above the + // current update frame, with the top word on the stack as the + // fun field. + // + words = frame - sp - 1; + ap = (StgAP_STACK *)allocate(PAP_sizeW(words)); + + ap->size = words; + ap->fun = (StgClosure *)sp[0]; + sp++; + for(i=0; i < (nat)words; ++i) { + ap->payload[i] = (StgClosure *)*sp++; + } + + SET_HDR(ap,&stg_AP_STACK_info, + ((StgClosure *)frame)->header.prof.ccs /* ToDo */); + TICK_ALLOC_UP_THK(words+1,0); + + IF_DEBUG(scheduler, + fprintf(stderr, "scheduler: Updating "); + printPtr((P_)((StgUpdateFrame *)frame)->updatee); + fprintf(stderr, " with "); + printObj((StgClosure *)ap); + ); + + // Replace the updatee with an indirection - happily + // this will also wake up any threads currently + // waiting on the result. + // + // Warning: if we're in a loop, more than one update frame on + // the stack may point to the same object. Be careful not to + // overwrite an IND_OLDGEN in this case, because we'll screw + // up the mutable lists. To be on the safe side, don't + // overwrite any kind of indirection at all. See also + // threadSqueezeStack in GC.c, where we have to make a similar + // check. + // + if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) { + // revert the black hole + UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap); + } + sp += sizeofW(StgUpdateFrame) - 1; + sp[0] = (W_)ap; // push onto stack + break; + } - /* pop the old handler and put o on the stack */ - su = sf->link; - sp += sizeofW(StgSeqFrame) - 1; - sp[0] = (W_)o; - break; - } - - case STOP_FRAME: - /* We've stripped the entire stack, the thread is now dead. */ - sp += sizeofW(StgStopFrame) - 1; - sp[0] = (W_)exception; /* save the exception */ - tso->what_next = ThreadKilled; - tso->su = (StgUpdateFrame *)(sp+1); - tso->sp = sp; - return; - - default: - barf("raiseAsync"); + case STOP_FRAME: + // We've stripped the entire stack, the thread is now dead. + sp += sizeofW(StgStopFrame); + tso->what_next = ThreadKilled; + tso->sp = sp; + return; + + default: + barf("raiseAsync"); + } } - } - barf("raiseAsync"); + barf("raiseAsync"); } /* ----------------------------------------------------------------------------- @@ -2932,6 +3505,8 @@ raiseAsync(StgTSO *tso, StgClosure *exception) up and sent a signal: BlockedOnDeadMVar if the thread was blocked on an MVar, or NonTermination if the thread was blocked on a Black Hole. 
+
+   Locks: sched_mutex isn't held upon entry nor exit.
   -------------------------------------------------------------------------- */
 
 void
@@ -2948,6 +3523,7 @@ resurrectThreads( StgTSO *threads )
     switch (tso->why_blocked) {
     case BlockedOnMVar:
     case BlockedOnException:
+      /* Called by GC - sched_mutex lock is currently held. */
      raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
      break;
     case BlockedOnBlackHole:
@@ -2972,63 +3548,71 @@ resurrectThreads( StgTSO *threads )
 *
 * This is only done in a deadlock situation in order to avoid
 * performance overhead in the normal case.
+ *
+ * Locks: sched_mutex is held upon entry and exit.
 * -------------------------------------------------------------------------- */

static void
detectBlackHoles( void )
{
-    StgTSO *t = all_threads;
-    StgUpdateFrame *frame;
+    StgTSO *tso = all_threads;
+    StgClosure *frame;
    StgClosure *blocked_on;
+    StgRetInfoTable *info;

-    for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
+    for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {

-	if (t->why_blocked != BlockedOnBlackHole) {
+	while (tso->what_next == ThreadRelocated) {
+	    tso = tso->link;
+	    ASSERT(get_itbl(tso)->type == TSO);
+	}
+      
+	if (tso->why_blocked != BlockedOnBlackHole) {
	    continue;
	}
+	blocked_on = tso->block_info.closure;

-	blocked_on = t->block_info.closure;
-
-	for (frame = t->su; ; frame = frame->link) {
-	    switch (get_itbl(frame)->type) {
+	frame = (StgClosure *)tso->sp;

+	while(1) {
+	    info = get_ret_itbl(frame);
+	    switch (info->i.type) {
	    case UPDATE_FRAME:
-		if (frame->updatee == blocked_on) {
+		if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
		    /* We are blocking on one of our own computations, so
		     * send this thread the NonTermination exception.
		     */
		    IF_DEBUG(scheduler, 
-			     sched_belch("thread %d is blocked on itself", t->id));
-		    raiseAsync(t, (StgClosure *)NonTermination_closure);
+			     sched_belch("thread %d is blocked on itself", tso->id));
+		    raiseAsync(tso, (StgClosure *)NonTermination_closure);
		    goto done;
		}
-		else {
-		    continue;
-		}
-
-	    case CATCH_FRAME:
-	    case SEQ_FRAME:
-		continue;
+		
+		frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
+		continue;

	    case STOP_FRAME:
-		break;
-	    }
-	    break;
-	}
+		goto done;

-    done:
-    }
+	    // normal stack frames; do nothing except advance the pointer
+	    default:
+		frame = (StgClosure *)((StgPtr)frame + stack_frame_sizeW(frame));
+	    }
+	}
+	done: ;
+    }
}

//@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
//@subsection Debugging Routines

/* -----------------------------------------------------------------------------
- Debugging: why is a thread blocked
+ * Debugging: why is a thread blocked
+ * [Also provides useful information when debugging threaded programs
+ * at the Haskell source code level, so enable outside of DEBUG.
--sof 7/02] -------------------------------------------------------------------------- */ -#ifdef DEBUG - +static void printThreadBlockage(StgTSO *tso) { @@ -3065,12 +3649,21 @@ printThreadBlockage(StgTSO *tso) tso->block_info.closure, info_type(tso->block_info.closure)); break; #endif +#if defined(RTS_SUPPORTS_THREADS) + case BlockedOnCCall: + fprintf(stderr,"is blocked on an external call"); + break; + case BlockedOnCCall_NoUnblockExc: + fprintf(stderr,"is blocked on an external call (exceptions were already blocked)"); + break; +#endif default: barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)", tso->why_blocked, tso->id, tso); } } +static void printThreadStatus(StgTSO *tso) { @@ -3090,15 +3683,35 @@ void printAllThreads(void) { StgTSO *t; + void *label; + +# if defined(GRAN) + char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; + ullong_format_string(TIME_ON_PROC(CurrentProc), + time_string, rtsFalse/*no commas!*/); + + fprintf(stderr, "all threads at [%s]:\n", time_string); +# elif defined(PAR) + char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; + ullong_format_string(CURRENT_TIME, + time_string, rtsFalse/*no commas!*/); + + fprintf(stderr,"all threads at [%s]:\n", time_string); +# else + fprintf(stderr,"all threads:\n"); +# endif - sched_belch("all threads:"); for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) { - fprintf(stderr, "\tthread %d ", t->id); + fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t); + label = lookupThreadLabel((StgWord)t); + if (label) fprintf(stderr,"[\"%s\"] ",(char *)label); printThreadStatus(t); fprintf(stderr,"\n"); } } +#ifdef DEBUG + /* Print a whole blocking queue attached to node (debugging only). */ @@ -3117,27 +3730,41 @@ print_bq (StgClosure *node) /* should cover all closures that may have a blocking queue */ ASSERT(get_itbl(node)->type == BLACKHOLE_BQ || get_itbl(node)->type == FETCH_ME_BQ || - get_itbl(node)->type == RBH); + get_itbl(node)->type == RBH || + get_itbl(node)->type == MVAR); ASSERT(node!=(StgClosure*)NULL); // sanity check + + print_bqe(((StgBlockingQueue*)node)->blocking_queue); +} + +/* + Print a whole blocking queue starting with the element bqe. +*/ +void +print_bqe (StgBlockingQueueElement *bqe) +{ + rtsBool end; + /* NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure; */ - for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE); + for (end = (bqe==END_BQ_QUEUE); !end; // iterate until bqe points to a CONSTR - end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) { - ASSERT(bqe != END_BQ_QUEUE); // sanity check - ASSERT(bqe != (StgTSO*)NULL); // sanity check + end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), + bqe = end ? 
END_BQ_QUEUE : bqe->link) { + ASSERT(bqe != END_BQ_QUEUE); // sanity check + ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check /* types of closures that may appear in a blocking queue */ ASSERT(get_itbl(bqe)->type == TSO || get_itbl(bqe)->type == BLOCKED_FETCH || get_itbl(bqe)->type == CONSTR); /* only BQs of an RBH end with an RBH_Save closure */ - ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH); + //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH); switch (get_itbl(bqe)->type) { case TSO: - fprintf(stderr," TSO %d (%x),", + fprintf(stderr," TSO %u (%x),", ((StgTSO *)bqe)->id, ((StgTSO *)bqe)); break; case BLOCKED_FETCH: @@ -3155,8 +3782,8 @@ print_bq (StgClosure *node) "RBH_Save_?"), get_itbl(bqe)); break; default: - barf("Unexpected closure type %s in blocking queue of %p (%s)", - info_type(bqe), node, info_type(node)); + barf("Unexpected closure type %s in blocking queue", // of %p (%s)", + info_type((StgClosure *)bqe)); // , node, info_type(node)); break; } } /* for */ @@ -3259,12 +3886,15 @@ sched_belch(char *s, ...) va_list ap; va_start(ap,s); #ifdef SMP - fprintf(stderr, "scheduler (task %ld): ", pthread_self()); + fprintf(stderr, "scheduler (task %ld): ", osThreadId()); +#elif defined(PAR) + fprintf(stderr, "== "); #else fprintf(stderr, "scheduler: "); #endif vfprintf(stderr, s, ap); fprintf(stderr, "\n"); + va_end(ap); } #endif /* DEBUG */ @@ -3274,18 +3904,15 @@ sched_belch(char *s, ...) //@subsection Index //@index -//* MainRegTable:: @cindex\s-+MainRegTable //* StgMainThread:: @cindex\s-+StgMainThread //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd //* blocked_queue_tl:: @cindex\s-+blocked_queue_tl //* context_switch:: @cindex\s-+context_switch //* createThread:: @cindex\s-+createThread -//* free_capabilities:: @cindex\s-+free_capabilities //* gc_pending_cond:: @cindex\s-+gc_pending_cond //* initScheduler:: @cindex\s-+initScheduler //* interrupted:: @cindex\s-+interrupted -//* n_free_capabilities:: @cindex\s-+n_free_capabilities //* next_thread_id:: @cindex\s-+next_thread_id //* print_bq:: @cindex\s-+print_bq //* run_queue_hd:: @cindex\s-+run_queue_hd @@ -3293,7 +3920,5 @@ sched_belch(char *s, ...) //* sched_mutex:: @cindex\s-+sched_mutex //* schedule:: @cindex\s-+schedule //* take_off_run_queue:: @cindex\s-+take_off_run_queue -//* task_ids:: @cindex\s-+task_ids //* term_mutex:: @cindex\s-+term_mutex -//* thread_ready_cond:: @cindex\s-+thread_ready_cond //@end index
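// ---------------------------------------------------------------------------
// Illustrative appendix (editorial addition, not part of the patch): the
// GetRoots() hunk earlier in this diff replaces the old
// "root = MarkRoot(root)" protocol with an evac_fn callback that updates
// each root slot in place.  A toy model of that protocol, with stand-in
// types; none of this is RTS code:

typedef struct Obj {
    int         forwarded;             // did the collector move this object?
    struct Obj *new_addr;              // where it moved to, if so
} Obj;

typedef void (*evac_fn)(Obj **root);

static Obj *root_a, *root_b;           // stand-ins for run_queue_hd etc.

// Scheduler side: hand the collector the *address* of every root slot so it
// can overwrite the slot directly; no more assigning MarkRoot()'s return
// value back by hand at each call site.
static void get_roots(evac_fn evac)
{
    if (root_a) evac(&root_a);
    if (root_b) evac(&root_b);
}

// Collector side: a trivial evacuator that chases a forwarding pointer when
// the object has moved.
static void evacuate(Obj **slot)
{
    if ((*slot)->forwarded)
        *slot = (*slot)->new_addr;
}

// Usage, cf. GarbageCollect(GetRoots, ...):
static void collect(void) { get_roots(evacuate); }
// ---------------------------------------------------------------------------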