diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c
index 4430f5a..530bdf9 100644
--- a/ghc/rts/Schedule.c
+++ b/ghc/rts/Schedule.c
@@ -1,5 +1,5 @@
 /* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.121 2002/02/12 15:38:08 sof Exp $
+ * $Id: Schedule.c,v 1.146 2002/06/26 08:18:42 stolz Exp $
 *
 * (c) The GHC Team, 1998-2000
 *
@@ -96,6 +96,7 @@
 #include "Stats.h"
 #include "Itimer.h"
 #include "Prelude.h"
+#include "ThreadLabels.h"
 #ifdef PROFILING
 #include "Proftimer.h"
 #include "ProfHeap.h"
@@ -114,40 +115,22 @@
 #include "OSThreads.h"
 #include "Task.h"
 
+#ifdef HAVE_SYS_TYPES_H
+#include <sys/types.h>
+#endif
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+
 #include <stdarg.h>
 
 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
 //@subsection Variables and Data structures
 
-/* Main threads:
- *
- * These are the threads which clients have requested that we run.
- *
- * In a 'threaded' build, we might have several concurrent clients all
- * waiting for results, and each one will wait on a condition variable
- * until the result is available.
- *
- * In non-SMP, clients are strictly nested: the first client calls
- * into the RTS, which might call out again to C with a _ccall_GC, and
- * eventually re-enter the RTS.
- *
- * Main threads information is kept in a linked list:
- */
-//@cindex StgMainThread
-typedef struct StgMainThread_ {
-  StgTSO *         tso;
-  SchedulerStatus  stat;
-  StgClosure **    ret;
-#if defined(RTS_SUPPORTS_THREADS)
-  Condition        wakeup;
-#endif
-  struct StgMainThread_ *link;
-} StgMainThread;
-
 /* Main thread queue.
 * Locks required: sched_mutex.
 */
-static StgMainThread *main_threads;
+StgMainThread *main_threads;
 
 /* Thread queues.
 * Locks required: sched_mutex.
@@ -158,7 +141,7 @@ StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
/* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */
 
 /*
-  In GranSim we have a runnable and a blocked queue for each processor.
+  In GranSim we have a runnable and a blocked queue for each processor.
   In order to minimise code changes new arrays run_queue_hds/tls
   are created. run_queue_hd is then a short cut (macro) for
   run_queue_hds[CurrentProc] (see GranSim.h).
@@ -207,7 +190,7 @@ nat context_switch;
 rtsBool interrupted;
 
 /* Next thread ID to allocate.
- * Locks required: sched_mutex
+ * Locks required: thread_id_mutex
 */
 //@cindex next_thread_id
 StgThreadID next_thread_id = 1;
@@ -243,15 +226,17 @@ StgTSO dummy_tso;
 
 rtsBool ready_to_gc;
 
+/*
+ * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
+ * in an MT setting, needed to signal that a worker thread shouldn't hang around
+ * in the scheduler when it is out of work.
+ */ +static rtsBool shutting_down_scheduler = rtsFalse; + void addToBlockedQueue ( StgTSO *tso ); static void schedule ( void ); void interruptStgRts ( void ); -#if defined(GRAN) -static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri ); -#else -static StgTSO * createThread_ ( nat size, rtsBool have_lock ); -#endif static void detectBlackHoles ( void ); @@ -265,60 +250,20 @@ static void sched_belch(char *s, ...); */ Mutex sched_mutex = INIT_MUTEX_VAR; Mutex term_mutex = INIT_MUTEX_VAR; -#if defined(THREADED_RTS) -/* - * The rts_mutex is the 'big lock' that the active native - * thread within the RTS holds while executing code. - * It is given up when the thread makes a transition out of - * the RTS (e.g., to perform an external C call), hopefully - * for another thread to take over its chores and enter - * the RTS. - * - */ -Mutex rts_mutex = INIT_MUTEX_VAR; + /* - * When a native thread has completed executing an external - * call, it needs to communicate the result back to the - * (Haskell) thread that made the call. Do this as follows: - * - * - in resumeThread(), the thread increments the counter - * threads_waiting, and then blocks on the 'big' RTS lock. - * - upon entry to the scheduler, the thread that's currently - * holding the RTS lock checks threads_waiting. If there - * are native threads waiting, it gives up its RTS lock - * and tries to re-grab the RTS lock [perhaps after having - * waited for a bit..?] - * - care must be taken to deal with the case where more than - * one external thread are waiting on the lock. [ToDo: more] - * + * A heavyweight solution to the problem of protecting + * the thread_id from concurrent update. */ +Mutex thread_id_mutex = INIT_MUTEX_VAR; -static nat threads_waiting = 0; -#endif - - -/* thread_ready_cond: when signalled, a thread has become runnable for a - * task to execute. - * - * In the non-SMP case, it also implies that the thread that is woken up has - * exclusive access to the RTS and all its DS (that are not under sched_mutex's - * control). - * - * thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold. - * - */ -Condition thread_ready_cond = INIT_COND_VAR; -#if 0 -/* For documentation purposes only */ -#define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE()) -#endif -#if defined(SMP) -Condition gc_pending_cond = INIT_COND_VAR; +# if defined(SMP) +static Condition gc_pending_cond = INIT_COND_VAR; nat await_death; -#endif +# endif -#endif +#endif /* RTS_SUPPORTS_THREADS */ #if defined(PAR) StgTSO *LastTSO; @@ -360,12 +305,6 @@ static void taskStart(void); static void taskStart(void) { - /* threads start up using 'taskStart', so make them - them grab the RTS lock. */ -#if defined(THREADED_RTS) - ACQUIRE_LOCK(&rts_mutex); - taskNotAvailable(); -#endif schedule(); } #endif @@ -433,31 +372,15 @@ schedule( void ) rtsBool was_interrupted = rtsFalse; ACQUIRE_LOCK(&sched_mutex); - -#if defined(THREADED_RTS) - /* ToDo: consider SMP support */ - if (threads_waiting > 0) { - /* (At least) one native thread is waiting to - * deposit the result of an external call. So, - * give up our RTS executing privileges and let - * one of them continue. - * - */ - taskAvailable(); - RELEASE_LOCK(&sched_mutex); - IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (threads_waiting=%d)\n", osThreadId(), threads_waiting)); - RELEASE_LOCK(&rts_mutex); - /* ToDo: come up with mechanism that guarantees that - * the main thread doesn't loop here. 
- */
-    yieldThread();
-    /* ToDo: longjmp() */
-    taskStart();
-  }
+
+#if defined(RTS_SUPPORTS_THREADS)
+  waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
+#else
+  /* simply initialise it in the non-threaded case */
+  grabCapability(&cap);
 #endif
 
 #if defined(GRAN)
-
   /* set up first event to get things going */
   /* ToDo: assign costs for system setup and init MainTSO ! */
   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
@@ -492,6 +415,13 @@ schedule( void )
 
     IF_DEBUG(scheduler, printAllThreads());
 
+#if defined(RTS_SUPPORTS_THREADS)
+    /* Check to see whether there are any worker threads
+       waiting to deposit external call results. If so,
+       yield our capability */
+    yieldToReturningWorker(&sched_mutex, &cap);
+#endif
+
     /* If we're interrupted (the user pressed ^C, or some other
     * termination condition occurred), kill all the currently running
     * threads.
@@ -521,6 +451,9 @@ schedule( void )
 	    *prev = m->link;
 	    m->stat = Success;
 	    broadcastCondition(&m->wakeup);
+#ifdef DEBUG
+	    removeThreadLabel(m->tso);
+#endif
 	    break;
 	  case ThreadKilled:
 	    if (m->ret) *(m->ret) = NULL;
@@ -531,6 +464,9 @@ schedule( void )
 	      m->stat = Killed;
 	    }
 	    broadcastCondition(&m->wakeup);
+#ifdef DEBUG
+	    removeThreadLabel(m->tso);
+#endif
 	    break;
 	  default:
 	    break;
@@ -550,6 +486,9 @@ schedule( void )
       StgMainThread *m = main_threads;
       if (m->tso->what_next == ThreadComplete
 	  || m->tso->what_next == ThreadKilled) {
+#ifdef DEBUG
+	removeThreadLabel((StgWord)m->tso);
+#endif
 	main_threads = main_threads->link;
 	if (m->tso->what_next == ThreadComplete) {
 	  /* we finished successfully, fill in the return value */
@@ -614,7 +553,9 @@ schedule( void )
     /* check for signals each time around the scheduler */
 #ifndef mingw32_TARGET_OS
     if (signals_pending()) {
+      RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
       startSignalHandlers();
+      ACQUIRE_LOCK(&sched_mutex);
     }
 #endif
 
@@ -646,51 +587,79 @@ schedule( void )
     * inform all the main threads.
     */
 #ifndef PAR
-    if (   EMPTY_QUEUE(blocked_queue_hd)
-	&& EMPTY_RUN_QUEUE()
-	&& EMPTY_QUEUE(sleeping_queue)
-#if defined(SMP)
-	&& allFreeCapabilities()
-#elif defined(THREADED_RTS)
+    if (   EMPTY_THREAD_QUEUES()
+#if defined(RTS_SUPPORTS_THREADS)
 	&& EMPTY_QUEUE(suspended_ccalling_threads)
 #endif
+#ifdef SMP
+	&& allFreeCapabilities()
+#endif
 	)
     {
 	IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
-	RELEASE_LOCK(&sched_mutex);
+#if defined(THREADED_RTS)
+	/* and SMP mode ..? */
+	releaseCapability(cap);
+#endif
+	// Garbage collection can release some new threads due to
+	// either (a) finalizers or (b) threads resurrected because
+	// they are about to be sent BlockedOnDeadMVar.  Any threads
+	// thus released will be immediately runnable.
 	GarbageCollect(GetRoots,rtsTrue);
-	ACQUIRE_LOCK(&sched_mutex);
-	IF_DEBUG(scheduler, sched_belch("GC done."));
-	if (   EMPTY_QUEUE(blocked_queue_hd)
-	    && EMPTY_RUN_QUEUE()
-	    && EMPTY_QUEUE(sleeping_queue) ) {
-
-	    IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
-	    detectBlackHoles();
-
-	    /* No black holes, so probably a real deadlock.  Send the
-	     * current main thread the Deadlock exception (or in the SMP
-	     * build, send *all* main threads the deadlock exception,
-	     * since none of them can make progress).
- */
-	    if ( EMPTY_RUN_QUEUE() ) {
-		StgMainThread *m;
+
+	if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
+
+	IF_DEBUG(scheduler,
+		 sched_belch("still deadlocked, checking for black holes..."));
+	detectBlackHoles();
+
+	if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
+
+#ifndef mingw32_TARGET_OS
+	/* If we have user-installed signal handlers, then wait
+	 * for signals to arrive rather than bombing out with a
+	 * deadlock.
+	 */
 #if defined(RTS_SUPPORTS_THREADS)
-		for (m = main_threads; m != NULL; m = m->link) {
-		    switch (m->tso->why_blocked) {
-		    case BlockedOnBlackHole:
-			raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
-			break;
-		    case BlockedOnException:
-		    case BlockedOnMVar:
-			raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
-			break;
-		    default:
-			barf("deadlock: main thread blocked in a strange way");
-		    }
-		}
+	if ( 0 ) { /* hmm..what to do? Simply stop waiting for
+		      a signal with no runnable threads (or I/O
+		      suspended ones) leads nowhere quick.
+		      For now, simply shut down when we reach this
+		      condition.
+
+		      ToDo: define precisely under what conditions
+		      the Scheduler should shut down in an MT setting.
+		   */
 #else
-		m = main_threads;
+	if ( anyUserHandlers() ) {
+#endif
+	    IF_DEBUG(scheduler,
+		     sched_belch("still deadlocked, waiting for signals..."));
+
+	    awaitUserSignals();
+
+	    // we might be interrupted...
+	    if (interrupted) { continue; }
+
+	    if (signals_pending()) {
+		RELEASE_LOCK(&sched_mutex);
+		startSignalHandlers();
+		ACQUIRE_LOCK(&sched_mutex);
+	    }
+	    ASSERT(!EMPTY_RUN_QUEUE());
+	    goto not_deadlocked;
+	}
+#endif
+
+	/* Probably a real deadlock.  Send the current main thread the
+	 * Deadlock exception (or in the SMP build, send *all* main
+	 * threads the deadlock exception, since none of them can make
+	 * progress).
+	 */
+	{
+	    StgMainThread *m;
+#if defined(RTS_SUPPORTS_THREADS)
+	    for (m = main_threads; m != NULL; m = m->link) {
 		switch (m->tso->why_blocked) {
 		case BlockedOnBlackHole:
 		    raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
@@ -702,18 +671,34 @@ schedule( void )
 		default:
 		    barf("deadlock: main thread blocked in a strange way");
 		}
-#endif
 	    }
-#if defined(RTS_SUPPORTS_THREADS)
-	    if ( EMPTY_RUN_QUEUE() ) {
-		IF_DEBUG(scheduler, sched_belch("all done, it seems...shut down."));
-		shutdownHaskellAndExit(0);
-
+#else
+	    m = main_threads;
+	    switch (m->tso->why_blocked) {
+	    case BlockedOnBlackHole:
+		raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
+		break;
+	    case BlockedOnException:
+	    case BlockedOnMVar:
+		raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
+		break;
+	    default:
+		barf("deadlock: main thread blocked in a strange way");
 	    }
 #endif
-	    ASSERT( !EMPTY_RUN_QUEUE() );
 	}
+
+#if defined(RTS_SUPPORTS_THREADS)
+	/* ToDo: revisit conditions (and mechanism) for shutting
+	   down a multi-threaded world  */
+	IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
+	RELEASE_LOCK(&sched_mutex);
+	shutdownHaskell();
+	return;
+#endif
     }
+  not_deadlocked:
+
 #elif defined(PAR)
     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
 #endif
@@ -728,35 +713,29 @@ schedule( void )
     }
 #endif
 
-#if defined(SMP)
+#if defined(RTS_SUPPORTS_THREADS)
     /* block until we've got a thread on the run queue and a free
     * capability.
+     *
     */
-    while ( noCapabilities() || EMPTY_RUN_QUEUE() ) {
-      IF_DEBUG(scheduler, sched_belch("waiting for work"));
-      waitCondition( &thread_ready_cond, &sched_mutex );
-      IF_DEBUG(scheduler, sched_belch("work now available"));
-    }
-#elif defined(THREADED_RTS)
-    if ( EMPTY_RUN_QUEUE() ) {
-	/* no work available, wait for external calls to complete. */
-	IF_DEBUG(scheduler, sched_belch("worker thread (%d): waiting for external thread to complete..", osThreadId()));
-	taskAvailable();
-	RELEASE_LOCK(&rts_mutex);
+    if ( EMPTY_RUN_QUEUE() ) {
+      /* Give up our capability */
+      releaseCapability(cap);
 
-	while ( EMPTY_RUN_QUEUE() ) {
-	    waitCondition(&thread_ready_cond, &sched_mutex);
-	};
-	RELEASE_LOCK(&sched_mutex);
-
-	IF_DEBUG(scheduler, sched_belch("worker thread (%d): re-awakened from no-work slumber..\n", osThreadId()));
-	/* ToDo: longjmp() */
-	taskStart();
-    }
+      /* If we're in the process of shutting down (& running a
+       * batch of finalisers), don't wait around.
+       */
+      if ( shutting_down_scheduler ) {
+	RELEASE_LOCK(&sched_mutex);
+	return;
+      }
+      IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
+      waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
+      IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
+    }
 #endif
 
 #if defined(GRAN)
-
     if (RtsFlags.GranFlags.Light)
       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
 
@@ -1002,7 +981,7 @@ schedule( void )
 	     belch("--=^ %d threads, %d sparks on [%#x]",
 		   run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
 
-#if 1
+# if 1
     if (0 && RtsFlags.ParFlags.ParStats.Full &&
 	t && LastTSO && t->id != LastTSO->id &&
 	LastTSO->why_blocked == NotBlocked &&
@@ -1027,11 +1006,10 @@ schedule( void )
 	     emitSchedule = rtsFalse;
         }
 
-#endif
+# endif
 #else /* !GRAN && !PAR */
 
-    /* grab a thread from the run queue
-     */
+    /* grab a thread from the run queue */
     ASSERT(run_queue_hd != END_TSO_QUEUE);
     t = POP_RUN_QUEUE();
     // Sanity check the thread we're about to run.  This can be
@@ -1039,7 +1017,6 @@ schedule( void )
     IF_DEBUG(sanity,checkTSO(t));
 #endif
 
-    grabCapability(&cap);
     cap->r.rCurrentTSO = t;
 
     /* context switches are now initiated by the timer signal, unless
@@ -1387,10 +1364,6 @@ schedule( void )
     default:
       barf("schedule: invalid thread return code %d", (int)ret);
     }
-
-#ifdef SMP
-    grabCapability(&cap);
-#endif
 
 #ifdef PROFILING
     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
@@ -1401,12 +1374,11 @@ schedule( void )
     }
 #endif
 
+    if (ready_to_gc
 #ifdef SMP
-    if (ready_to_gc && allFreeCapabilities() )
-#else
-    if (ready_to_gc)
+	&& allFreeCapabilities()
 #endif
-      {
+	) {
       /* everybody back, start the GC.
       * Could do it in this thread, or signal a condition var
       * to do it in another thread.  Either way, we need to
@@ -1450,28 +1422,63 @@ schedule( void )
 }
 
 /* ---------------------------------------------------------------------------
 * Singleton fork(). Do not copy any running threads.
+ * ------------------------------------------------------------------------- */ + +StgInt forkProcess(StgTSO* tso) { + +#ifndef mingw32_TARGET_OS + pid_t pid; + StgTSO* t,*next; + + IF_DEBUG(scheduler,sched_belch("forking!")); + + pid = fork(); + if (pid) { /* parent */ + + /* just return the pid */ + + } else { /* child */ + /* wipe all other threads */ + run_queue_hd = tso; + tso->link = END_TSO_QUEUE; + + /* DO NOT TOUCH THE QUEUES directly because most of the code around + us is picky about finding the threat still in its queue when + handling the deleteThread() */ + + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + next = t->link; + if (t->id != tso->id) { + deleteThread(t); + } + } + } + return pid; +#else /* mingw32 */ + barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id); + /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */ + return -1; +#endif /* mingw32 */ +} + +/* --------------------------------------------------------------------------- * deleteAllThreads(): kill all the live threads. * * This is used when we catch a user interrupt (^C), before performing * any necessary cleanups and running finalizers. + * + * Locks: sched_mutex held. * ------------------------------------------------------------------------- */ void deleteAllThreads ( void ) { StgTSO* t, *next; IF_DEBUG(scheduler,sched_belch("deleting all threads")); - for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) { - next = t->link; - deleteThread(t); - } - for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) { - next = t->link; - deleteThread(t); - } - for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) { - next = t->link; + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + next = t->global_link; deleteThread(t); - } + } run_queue_hd = run_queue_tl = END_TSO_QUEUE; blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE; sleeping_queue = END_TSO_QUEUE; @@ -1499,7 +1506,12 @@ void deleteAllThreads ( void ) * ------------------------------------------------------------------------- */ StgInt -suspendThread( StgRegTable *reg ) +suspendThread( StgRegTable *reg, + rtsBool concCall +#if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG) + STG_UNUSED +#endif + ) { nat tok; Capability *cap; @@ -1512,19 +1524,23 @@ suspendThread( StgRegTable *reg ) ACQUIRE_LOCK(&sched_mutex); IF_DEBUG(scheduler, - sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id)); + sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall)); threadPaused(cap->r.rCurrentTSO); cap->r.rCurrentTSO->link = suspended_ccalling_threads; suspended_ccalling_threads = cap->r.rCurrentTSO; +#if defined(RTS_SUPPORTS_THREADS) + cap->r.rCurrentTSO->why_blocked = BlockedOnCCall; +#endif + /* Use the thread ID as the token; it should be unique */ tok = cap->r.rCurrentTSO->id; /* Hand back capability */ releaseCapability(cap); -#if defined(RTS_SUPPORTS_THREADS) && !defined(SMP) +#if defined(RTS_SUPPORTS_THREADS) /* Preparing to leave the RTS, so ensure there's a native thread/task waiting to take over. @@ -1532,40 +1548,39 @@ suspendThread( StgRegTable *reg ) for one (i.e., if there's only one Concurrent Haskell thread alive, there's no need to create a new task). 
*/ - IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS\n", tok)); - startTask(taskStart); - + IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok)); + if (concCall) { + startTask(taskStart); + } #endif + /* Other threads _might_ be available for execution; signal this */ THREAD_RUNNABLE(); RELEASE_LOCK(&sched_mutex); - // RELEASE_LOCK(&rts_mutex); return tok; } StgRegTable * -resumeThread( StgInt tok ) +resumeThread( StgInt tok, + rtsBool concCall +#if !defined(RTS_SUPPORTS_THREADS) + STG_UNUSED +#endif + ) { StgTSO *tso, **prev; Capability *cap; -#if defined(THREADED_RTS) - IF_DEBUG(scheduler, sched_belch("thread %d returning, waiting for sched. lock.\n", tok)); - ACQUIRE_LOCK(&sched_mutex); - threads_waiting++; - IF_DEBUG(scheduler, sched_belch("thread %d returning, threads waiting: %d.\n", tok, threads_waiting)); - RELEASE_LOCK(&sched_mutex); - - IF_DEBUG(scheduler, sched_belch("thread %d waiting for RTS lock...\n", tok)); - ACQUIRE_LOCK(&rts_mutex); - threads_waiting--; - taskNotAvailable(); - IF_DEBUG(scheduler, sched_belch("thread %d acquired RTS lock...\n", tok)); -#endif - -#if defined(THREADED_RTS) - /* Free up any RTS-blocked threads. */ - broadcastCondition(&thread_ready_cond); +#if defined(RTS_SUPPORTS_THREADS) + /* Wait for permission to re-enter the RTS with the result. */ + if ( concCall ) { + ACQUIRE_LOCK(&sched_mutex); + grabReturnCapability(&sched_mutex, &cap); + } else { + grabCapability(&cap); + } +#else + grabCapability(&cap); #endif /* Remove the thread off of the suspended list */ @@ -1582,19 +1597,11 @@ resumeThread( StgInt tok ) barf("resumeThread: thread not found"); } tso->link = END_TSO_QUEUE; - -#if defined(RTS_SUPPORTS_THREADS) - while ( noCapabilities() ) { - IF_DEBUG(scheduler, sched_belch("waiting to resume")); - waitCondition(&thread_ready_cond, &sched_mutex); - IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id)); - } -#endif - - grabCapability(&cap); + /* Reset blocking status */ + tso->why_blocked = NotBlocked; cap->r.rCurrentTSO = tso; - + RELEASE_LOCK(&sched_mutex); return &cap->r; } @@ -1631,6 +1638,24 @@ int rts_getThreadId(const StgTSO *tso) return tso->id; } +#ifdef DEBUG +void labelThread(StgTSO *tso, char *label) +{ + int len; + void *buf; + + /* Caveat: Once set, you can only set the thread name to "" */ + len = strlen(label)+1; + buf = malloc(len); + if (buf == NULL) { + fprintf(stderr,"insufficient memory for labelThread!\n"); + } else + strncpy(buf,label,len); + /* Update will free the old memory for us */ + updateThreadLabel((StgWord)tso,buf); +} +#endif /* DEBUG */ + /* --------------------------------------------------------------------------- Create a new thread. @@ -1648,25 +1673,12 @@ int rts_getThreadId(const StgTSO *tso) #if defined(GRAN) /* currently pri (priority) is only used in a GRAN setup -- HWL */ StgTSO * -createThread(nat stack_size, StgInt pri) -{ - return createThread_(stack_size, rtsFalse, pri); -} - -static StgTSO * -createThread_(nat size, rtsBool have_lock, StgInt pri) -{ +createThread(nat size, StgInt pri) #else StgTSO * -createThread(nat stack_size) -{ - return createThread_(stack_size, rtsFalse); -} - -static StgTSO * -createThread_(nat size, rtsBool have_lock) -{ +createThread(nat size) #endif +{ StgTSO *tso; nat stack_size; @@ -1709,9 +1721,9 @@ createThread_(nat size, rtsBool have_lock) * protect the increment operation on next_thread_id. * In future, we could use an atomic increment instead. 
 */
-  if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
+  ACQUIRE_LOCK(&thread_id_mutex);
  tso->id = next_thread_id++; 
-  if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
+  RELEASE_LOCK(&thread_id_mutex);

  tso->why_blocked  = NotBlocked;
  tso->blocked_exceptions = NULL;
@@ -1848,7 +1860,7 @@ createSparkThread(rtsSpark spark)
  }
  else
  { threadsCreated++;
-    tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
+    tso = createThread(RtsFlags.GcFlags.initialStkSize);
    if (tso==END_TSO_QUEUE)	
      barf("createSparkThread: Cannot create TSO");
#if defined(DIST)
@@ -1889,6 +1901,13 @@ activateSpark (rtsSpark spark)
}
#endif

+static SchedulerStatus waitThread_(/*out*/StgMainThread* m
+#if defined(THREADED_RTS)
+				   , rtsBool blockWaiting
+#endif
+				   );
+
+
/* ---------------------------------------------------------------------------
 * scheduleThread()
 *
@@ -1899,8 +1918,15 @@ activateSpark (rtsSpark spark)
 * on this thread's stack before the scheduler is invoked.
 * ------------------------------------------------------------------------ */

+static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
+
void
-scheduleThread(StgTSO *tso)
+scheduleThread_(StgTSO *tso
+	       , rtsBool createTask
+#if !defined(THREADED_RTS)
+		 STG_UNUSED
+#endif
+	       )
{
  ACQUIRE_LOCK(&sched_mutex);

@@ -1910,6 +1936,14 @@ schedule...
  * soon as we release the scheduler lock below.
  */
  PUSH_ON_RUN_QUEUE(tso);
+#if defined(THREADED_RTS)
+  /* If main() is scheduling a thread, don't bother creating a
+   * new task.
+   */
+  if ( createTask ) {
+    startTask(taskStart);
+  }
+#endif
  THREAD_RUNNABLE();

#if 0
@@ -1918,6 +1952,52 @@ schedule...
  RELEASE_LOCK(&sched_mutex);
}

+void scheduleThread(StgTSO* tso)
+{
+  scheduleThread_(tso, rtsFalse);
+}
+
+SchedulerStatus
+scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
+{
+  StgMainThread *m;
+
+  m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
+  m->tso = tso;
+  m->ret = ret;
+  m->stat = NoStatus;
+#if defined(RTS_SUPPORTS_THREADS)
+  initCondition(&m->wakeup);
+#endif
+
+  /* Put the thread on the main-threads list prior to scheduling the TSO.
+     Failure to do so introduces a race condition in the MT case (as
+     identified by Wolfgang Thaller), whereby the new task/OS thread
+     created by scheduleThread_() would complete before the thread
+     that spawned it had managed to put 'itself' on the main-threads list.
+     The upshot of it all being that the worker thread wouldn't get to
+     signal the completion of its work item for the main thread to
+     see (==> it got stuck waiting.)    -- sof 6/02.
+  */
+  ACQUIRE_LOCK(&sched_mutex);
+  IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
+
+  m->link = main_threads;
+  main_threads = m;
+
+  /* Inefficient (scheduleThread_() acquires it again right away),
+   * but obviously correct.
+   */
+  RELEASE_LOCK(&sched_mutex);
+
+  scheduleThread_(tso, rtsTrue);
+#if defined(THREADED_RTS)
+  return waitThread_(m, rtsTrue);
+#else
+  return waitThread_(m);
+#endif
+}
+
/* ---------------------------------------------------------------------------
 * initScheduler()
 *
@@ -1977,24 +2057,21 @@ initScheduler(void)
 * the scheduler.
*/ initMutex(&sched_mutex); initMutex(&term_mutex); + initMutex(&thread_id_mutex); initCondition(&thread_ready_cond); -#if defined(THREADED_RTS) - initMutex(&rts_mutex); #endif +#if defined(SMP) initCondition(&gc_pending_cond); #endif -#if defined(THREADED_RTS) - /* Grab big lock */ - ACQUIRE_LOCK(&rts_mutex); - IF_DEBUG(scheduler, - sched_belch("worker thread (%d): acquired RTS lock\n", osThreadId())); +#if defined(RTS_SUPPORTS_THREADS) + ACQUIRE_LOCK(&sched_mutex); #endif /* Install the SIGHUP handler */ -#ifdef SMP +#if defined(SMP) { struct sigaction action,oact; @@ -2025,6 +2102,11 @@ initScheduler(void) #if /* defined(SMP) ||*/ defined(PAR) initSparkPools(); #endif + +#if defined(RTS_SUPPORTS_THREADS) + RELEASE_LOCK(&sched_mutex); +#endif + } void @@ -2033,6 +2115,7 @@ exitScheduler( void ) #if defined(RTS_SUPPORTS_THREADS) stopTaskManager(); #endif + shutting_down_scheduler = rtsTrue; } /* ----------------------------------------------------------------------------- @@ -2079,13 +2162,13 @@ finishAllThreads ( void ) { do { while (run_queue_hd != END_TSO_QUEUE) { - waitThread ( run_queue_hd, NULL ); + waitThread ( run_queue_hd, NULL); } while (blocked_queue_hd != END_TSO_QUEUE) { - waitThread ( blocked_queue_hd, NULL ); + waitThread ( blocked_queue_hd, NULL); } while (sleeping_queue != END_TSO_QUEUE) { - waitThread ( blocked_queue_hd, NULL ); + waitThread ( blocked_queue_hd, NULL); } } while (blocked_queue_hd != END_TSO_QUEUE || @@ -2095,14 +2178,10 @@ finishAllThreads ( void ) SchedulerStatus waitThread(StgTSO *tso, /*out*/StgClosure **ret) -{ +{ StgMainThread *m; - SchedulerStatus stat; - ACQUIRE_LOCK(&sched_mutex); - m = stgMallocBytes(sizeof(StgMainThread), "waitThread"); - m->tso = tso; m->ret = ret; m->stat = NoStatus; @@ -2110,16 +2189,51 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) initCondition(&m->wakeup); #endif + /* see scheduleWaitThread() comment */ + ACQUIRE_LOCK(&sched_mutex); + IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id)); m->link = main_threads; main_threads = m; + RELEASE_LOCK(&sched_mutex); - IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", - m->tso->id)); + IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id)); +#if defined(THREADED_RTS) + return waitThread_(m, rtsFalse); +#else + return waitThread_(m); +#endif +} -#ifdef SMP - do { - waitCondition(&m->wakeup, &sched_mutex); - } while (m->stat == NoStatus); +static +SchedulerStatus +waitThread_(StgMainThread* m +#if defined(THREADED_RTS) + , rtsBool blockWaiting +#endif + ) +{ + SchedulerStatus stat; + + IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id)); + +#if defined(RTS_SUPPORTS_THREADS) + +# if defined(THREADED_RTS) + if (!blockWaiting) { + /* In the threaded case, the OS thread that called main() + * gets to enter the RTS directly without going via another + * task/thread. 
+ */ + schedule(); + ASSERT(m->stat != NoStatus); + } else +# endif + { + ACQUIRE_LOCK(&sched_mutex); + do { + waitCondition(&m->wakeup, &sched_mutex); + } while (m->stat == NoStatus); + } #elif defined(GRAN) /* GranSim specific init */ CurrentTSO = m->tso; // the TSO to run @@ -2143,7 +2257,10 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) m->tso->id)); free(m); - RELEASE_LOCK(&sched_mutex); +#if defined(THREADED_RTS) + if (blockWaiting) +#endif + RELEASE_LOCK(&sched_mutex); return stat; } @@ -2271,8 +2388,6 @@ take_off_run_queue(StgTSO *tso) { void GetRoots(evac_fn evac) { - StgMainThread *m; - #if defined(GRAN) { nat i; @@ -2311,9 +2426,6 @@ GetRoots(evac_fn evac) } #endif - for (m = main_threads; m != NULL; m = m->link) { - evac((StgClosure **)&m->tso); - } if (suspended_ccalling_threads != END_TSO_QUEUE) { evac((StgClosure **)&suspended_ccalling_threads); } @@ -2341,13 +2453,18 @@ void (*extra_roots)(evac_fn); void performGC(void) { + /* Obligated to hold this lock upon entry */ + ACQUIRE_LOCK(&sched_mutex); GarbageCollect(GetRoots,rtsFalse); + RELEASE_LOCK(&sched_mutex); } void performMajorGC(void) { + ACQUIRE_LOCK(&sched_mutex); GarbageCollect(GetRoots,rtsTrue); + RELEASE_LOCK(&sched_mutex); } static void @@ -2360,8 +2477,10 @@ AllRoots(evac_fn evac) void performGCWithRoots(void (*get_roots)(evac_fn)) { + ACQUIRE_LOCK(&sched_mutex); extra_roots = get_roots; GarbageCollect(AllRoots,rtsFalse); + RELEASE_LOCK(&sched_mutex); } /* ----------------------------------------------------------------------------- @@ -2776,13 +2895,15 @@ interruptStgRts(void) NB: only the type of the blocking queue is different in GranSim and GUM the operations on the queue-elements are the same long live polymorphism! + + Locks: sched_mutex is held upon entry and exit. + */ static void unblockThread(StgTSO *tso) { StgBlockingQueueElement *t, **last; - ACQUIRE_LOCK(&sched_mutex); switch (tso->why_blocked) { case NotBlocked: @@ -2904,20 +3025,20 @@ unblockThread(StgTSO *tso) tso->why_blocked = NotBlocked; tso->block_info.closure = NULL; PUSH_ON_RUN_QUEUE(tso); - RELEASE_LOCK(&sched_mutex); } #else static void unblockThread(StgTSO *tso) { StgTSO *t, **last; + + /* To avoid locking unnecessarily. */ + if (tso->why_blocked == NotBlocked) { + return; + } - ACQUIRE_LOCK(&sched_mutex); switch (tso->why_blocked) { - case NotBlocked: - return; /* not blocked */ - case BlockedOnMVar: ASSERT(get_itbl(tso->block_info.closure)->type == MVAR); { @@ -3029,7 +3150,6 @@ unblockThread(StgTSO *tso) tso->why_blocked = NotBlocked; tso->block_info.closure = NULL; PUSH_ON_RUN_QUEUE(tso); - RELEASE_LOCK(&sched_mutex); } #endif @@ -3063,6 +3183,8 @@ unblockThread(StgTSO *tso) * CATCH_FRAME on the stack. In either case, we strip the entire * stack and replace the thread with a zombie. * + * Locks: sched_mutex held upon entry nor exit. + * * -------------------------------------------------------------------------- */ void @@ -3072,6 +3194,16 @@ deleteThread(StgTSO *tso) } void +raiseAsyncWithLock(StgTSO *tso, StgClosure *exception) +{ + /* When raising async exs from contexts where sched_mutex isn't held; + use raiseAsyncWithLock(). 
 */
+  ACQUIRE_LOCK(&sched_mutex);
+  raiseAsync(tso,exception);
+  RELEASE_LOCK(&sched_mutex);
+}
+
+void
raiseAsync(StgTSO *tso, StgClosure *exception)
{
  StgUpdateFrame* su = tso->su;
@@ -3087,6 +3219,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
  /* Remove it from any blocking queues */
  unblockThread(tso);

+  IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
  /* The stack freezing code assumes there's a closure pointer on
   * the top of the stack. This isn't always the case with compiled
   * code, so we have to push a dummy closure on the top which just
@@ -3102,51 +3235,41 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
	    StgAP_UPD * ap;

	    /* If we find a CATCH_FRAME, and we've got an exception to raise,
-	     * then build PAP(handler,exception,realworld#), and leave it on
-	     * top of the stack ready to enter.
+	     * then build the THUNK raise(exception), and leave it on
+	     * top of the CATCH_FRAME ready to enter.
	     */
	    if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
+#ifdef PROFILING
		StgCatchFrame *cf = (StgCatchFrame *)su;
+#endif
+		StgClosure *raise;
+
		/* we've got an exception to raise, so let's pass it to the
		 * handler in this frame.
		 */
-		ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
-		TICK_ALLOC_UPD_PAP(3,0);
-		SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
-
-		ap->n_args = 2;
-		ap->fun = cf->handler;	/* :: Exception -> IO a */
-		ap->payload[0] = exception;
-		ap->payload[1] = ARG_TAG(0); /* realworld token */
-
-		/* throw away the stack from Sp up to and including the
-		 * CATCH_FRAME.
-		 */
-		sp = (P_)su + sizeofW(StgCatchFrame) - 1;
-		tso->su = cf->link;
-
-		/* Restore the blocked/unblocked state for asynchronous exceptions
-		 * at the CATCH_FRAME.
-		 *
-		 * If exceptions were unblocked at the catch, arrange that they
-		 * are unblocked again after executing the handler by pushing an
-		 * unblockAsyncExceptions_ret stack frame.
+		raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
+		TICK_ALLOC_SE_THK(1,0);
+		SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
+		raise->payload[0] = exception;
+
+		/* throw away the stack from Sp up to the CATCH_FRAME.
		 */
-		if (!cf->exceptions_blocked) {
-		    *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
-		}
-
-		/* Ensure that async exceptions are blocked when running the handler.
+		sp = (P_)su - 1;
+
+		/* Ensure that async exceptions are blocked now, so we don't get
+		 * a surprise exception before we get around to executing the
+		 * handler.
		 */
		if (tso->blocked_exceptions == NULL) {
-		    tso->blocked_exceptions = END_TSO_QUEUE;
+		    tso->blocked_exceptions = END_TSO_QUEUE;
		}
-
-		/* Put the newly-built PAP on top of the stack, ready to execute
+
+		/* Put the newly-built THUNK on top of the stack, ready to execute
		 * when the thread restarts.
		 */
-		sp[0] = (W_)ap;
+		sp[0] = (W_)raise;
		tso->sp = sp;
+		tso->su = su;
		tso->what_next = ThreadEnterGHC;
		IF_DEBUG(sanity, checkTSO(tso));
		return;
@@ -3280,6 +3403,8 @@ raiseAsync(StgTSO *tso, StgClosure *exception)
   up and sent a signal: BlockedOnDeadMVar if the thread was blocked
   on an MVar, or NonTermination if the thread was blocked on a Black
   Hole.
+
+  Locks: sched_mutex isn't held upon entry or exit.
   -------------------------------------------------------------------------- */

void
@@ -3296,6 +3421,7 @@ resurrectThreads( StgTSO *threads )
    switch (tso->why_blocked) {
    case BlockedOnMVar:
    case BlockedOnException:
+      /* Called by GC - sched_mutex lock is currently held.
*/ raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure); break; case BlockedOnBlackHole: @@ -3320,6 +3446,8 @@ resurrectThreads( StgTSO *threads ) * * This is only done in a deadlock situation in order to avoid * performance overhead in the normal case. + * + * Locks: sched_mutex is held upon entry and exit. * -------------------------------------------------------------------------- */ static void @@ -3418,6 +3546,11 @@ printThreadBlockage(StgTSO *tso) tso->block_info.closure, info_type(tso->block_info.closure)); break; #endif +#if defined(RTS_SUPPORTS_THREADS) + case BlockedOnCCall: + fprintf(stderr,"is blocked on an external call"); + break; +#endif default: barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)", tso->why_blocked, tso->id, tso); @@ -3443,6 +3576,7 @@ void printAllThreads(void) { StgTSO *t; + void *label; # if defined(GRAN) char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; @@ -3461,7 +3595,9 @@ printAllThreads(void) # endif for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) { - fprintf(stderr, "\tthread %d ", t->id); + fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t); + label = lookupThreadLabel((StgWord)t); + if (label) fprintf(stderr,"[\"%s\"] ",(char *)label); printThreadStatus(t); fprintf(stderr,"\n"); } @@ -3649,6 +3785,7 @@ sched_belch(char *s, ...) #endif vfprintf(stderr, s, ap); fprintf(stderr, "\n"); + va_end(ap); } #endif /* DEBUG */ @@ -3658,7 +3795,6 @@ sched_belch(char *s, ...) //@subsection Index //@index -//* MainRegTable:: @cindex\s-+MainRegTable //* StgMainThread:: @cindex\s-+StgMainThread //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd @@ -3676,5 +3812,4 @@ sched_belch(char *s, ...) //* schedule:: @cindex\s-+schedule //* take_off_run_queue:: @cindex\s-+take_off_run_queue //* term_mutex:: @cindex\s-+term_mutex -//* thread_ready_cond:: @cindex\s-+thread_ready_cond //@end index
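
As a usage illustration (not part of the patch): the revised suspendThread()/resumeThread() pair is designed to bracket an external call, releasing the Capability for the duration of the call and reclaiming one afterwards. A minimal sketch, assuming only the signatures visible in this diff; do_external_call() is a hypothetical placeholder for the C function actually being invoked:

/* Sketch only: bracketing a foreign call with the revised API.
 * do_external_call() stands in for whatever C function the Haskell
 * thread calls out to. */
StgRegTable *
example_ccall_gc(StgRegTable *reg, rtsBool concCall)
{
    StgInt tok;

    /* Park the current TSO on suspended_ccalling_threads and release
     * the Capability; the token is the thread's id. */
    tok = suspendThread(reg, concCall);

    do_external_call();    /* runs without the scheduler lock held */

    /* Re-acquire a Capability (via the returning-worker protocol when
     * concCall is true) and reinstate the suspended TSO. */
    return resumeThread(tok, concCall);
}

When concCall is true, suspendThread() also starts a fresh task so that other Haskell threads can keep running while the call is in progress, which is exactly the behaviour the startTask(taskStart) hunk above introduces.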
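
Similarly, the DEBUG-only thread-labelling support added in this patch (labelThread(), removeThreadLabel(), lookupThreadLabel()) pairs naturally with thread creation. A hypothetical sketch, assuming a non-GRAN build where createThread() takes only a stack size; create_labelled_thread() and the "worker" label are purely illustrative:

#ifdef DEBUG
/* Sketch only: create a thread, attach a human-readable label, and
 * make it runnable.  printAllThreads() will then print the label
 * alongside the thread id. */
static void
create_labelled_thread(void)
{
    StgTSO *tso = createThread(RtsFlags.GcFlags.initialStkSize);
    labelThread(tso, "worker");    /* label is copied to malloc'd store */
    scheduleThread(tso);           /* push onto the run queue */
}
#endif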