X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=72229d43dd33dade33beced38bf340fe43e0fa72;hb=85126d5203ec5344f6a5b2e77da23d34444e48c6;hp=1a3843a50874c43a5445e9339675594bd8b1ab3d;hpb=6e2ea06c4a72866396f1b754ec8c2091a9b1e20b;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 1a3843a..72229d4 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,5 +1,5 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.131 2002/02/18 13:26:13 sof Exp $ + * $Id: Schedule.c,v 1.139 2002/04/23 09:56:28 stolz Exp $ * * (c) The GHC Team, 1998-2000 * @@ -114,40 +114,22 @@ #include "OSThreads.h" #include "Task.h" +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#ifdef HAVE_UNISTD_H +#include +#endif + #include //@node Variables and Data structures, Prototypes, Includes, Main scheduling code //@subsection Variables and Data structures -/* Main threads: - * - * These are the threads which clients have requested that we run. - * - * In a 'threaded' build, we might have several concurrent clients all - * waiting for results, and each one will wait on a condition variable - * until the result is available. - * - * In non-SMP, clients are strictly nested: the first client calls - * into the RTS, which might call out again to C with a _ccall_GC, and - * eventually re-enter the RTS. - * - * Main threads information is kept in a linked list: - */ -//@cindex StgMainThread -typedef struct StgMainThread_ { - StgTSO * tso; - SchedulerStatus stat; - StgClosure ** ret; -#if defined(RTS_SUPPORTS_THREADS) - Condition wakeup; -#endif - struct StgMainThread_ *link; -} StgMainThread; - /* Main thread queue. * Locks required: sched_mutex. */ -static StgMainThread *main_threads; +StgMainThread *main_threads; /* Thread queues. * Locks required: sched_mutex. @@ -382,12 +364,10 @@ schedule( void ) ACQUIRE_LOCK(&sched_mutex); #if defined(RTS_SUPPORTS_THREADS) - /* Check to see whether there are any worker threads - waiting to deposit external call results. If so, - yield our capability */ - yieldToReturningWorker(&sched_mutex, cap); - waitForWorkCapability(&sched_mutex, &cap, rtsFalse); +#else + /* simply initialise it in the non-threaded case */ + grabCapability(&cap); #endif #if defined(GRAN) @@ -425,6 +405,13 @@ schedule( void ) IF_DEBUG(scheduler, printAllThreads()); +#if defined(RTS_SUPPORTS_THREADS) + /* Check to see whether there are any worker threads + waiting to deposit external call results. If so, + yield our capability */ + yieldToReturningWorker(&sched_mutex, &cap); +#endif + /* If we're interrupted (the user pressed ^C, or some other * termination condition occurred), kill all the currently running * threads. @@ -454,6 +441,10 @@ schedule( void ) *prev = m->link; m->stat = Success; broadcastCondition(&m->wakeup); +#ifdef DEBUG + free(m->tso->label); + m->tso->label = NULL; +#endif break; case ThreadKilled: if (m->ret) *(m->ret) = NULL; @@ -464,6 +455,10 @@ schedule( void ) m->stat = Killed; } broadcastCondition(&m->wakeup); +#ifdef DEBUG + free(m->tso->label); + m->tso->label = NULL; +#endif break; default: break; @@ -483,6 +478,10 @@ schedule( void ) StgMainThread *m = main_threads; if (m->tso->what_next == ThreadComplete || m->tso->what_next == ThreadKilled) { +#ifdef DEBUG + free(m->tso->label); + m->tso->label = NULL; +#endif main_threads = main_threads->link; if (m->tso->what_next == ThreadComplete) { /* we finished successfully, fill in the return value */ @@ -547,7 +546,9 @@ schedule( void ) /* check for signals each time around the scheduler */ #ifndef mingw32_TARGET_OS if (signals_pending()) { + RELEASE_LOCK(&sched_mutex); /* ToDo: kill */ startSignalHandlers(); + ACQUIRE_LOCK(&sched_mutex); } #endif @@ -579,9 +580,7 @@ schedule( void ) * inform all the main threads. */ #ifndef PAR - if ( EMPTY_RUN_QUEUE() - && EMPTY_QUEUE(blocked_queue_hd) - && EMPTY_QUEUE(sleeping_queue) + if ( EMPTY_THREAD_QUEUES() #if defined(RTS_SUPPORTS_THREADS) && EMPTY_QUEUE(suspended_ccalling_threads) #endif @@ -595,37 +594,65 @@ schedule( void ) /* and SMP mode ..? */ releaseCapability(cap); #endif + // Garbage collection can release some new threads due to + // either (a) finalizers or (b) threads resurrected because + // they are about to be send BlockedOnDeadMVar. Any threads + // thus released will be immediately runnable. GarbageCollect(GetRoots,rtsTrue); - if ( EMPTY_QUEUE(blocked_queue_hd) - && EMPTY_RUN_QUEUE() - && EMPTY_QUEUE(sleeping_queue) ) { - - IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes...")); - detectBlackHoles(); - - /* No black holes, so probably a real deadlock. Send the - * current main thread the Deadlock exception (or in the SMP - * build, send *all* main threads the deadlock exception, - * since none of them can make progress). - */ - if ( EMPTY_RUN_QUEUE() ) { - StgMainThread *m; + + if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; } + + IF_DEBUG(scheduler, + sched_belch("still deadlocked, checking for black holes...")); + detectBlackHoles(); + + if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; } + +#ifndef mingw32_TARGET_OS + /* If we have user-installed signal handlers, then wait + * for signals to arrive rather then bombing out with a + * deadlock. + */ #if defined(RTS_SUPPORTS_THREADS) - for (m = main_threads; m != NULL; m = m->link) { - switch (m->tso->why_blocked) { - case BlockedOnBlackHole: - raiseAsync(m->tso, (StgClosure *)NonTermination_closure); - break; - case BlockedOnException: - case BlockedOnMVar: - raiseAsync(m->tso, (StgClosure *)Deadlock_closure); - break; - default: - barf("deadlock: main thread blocked in a strange way"); - } - } + if ( 0 ) { /* hmm..what to do? Simply stop waiting for + a signal with no runnable threads (or I/O + suspended ones) leads nowhere quick. + For now, simply shut down when we reach this + condition. + + ToDo: define precisely under what conditions + the Scheduler should shut down in an MT setting. + */ #else - m = main_threads; + if ( anyUserHandlers() ) { +#endif + IF_DEBUG(scheduler, + sched_belch("still deadlocked, waiting for signals...")); + + awaitUserSignals(); + + // we might be interrupted... + if (interrupted) { continue; } + + if (signals_pending()) { + RELEASE_LOCK(&sched_mutex); + startSignalHandlers(); + ACQUIRE_LOCK(&sched_mutex); + } + ASSERT(!EMPTY_RUN_QUEUE()); + goto not_deadlocked; + } +#endif + + /* Probably a real deadlock. Send the current main thread the + * Deadlock exception (or in the SMP build, send *all* main + * threads the deadlock exception, since none of them can make + * progress). + */ + { + StgMainThread *m; +#if defined(RTS_SUPPORTS_THREADS) + for (m = main_threads; m != NULL; m = m->link) { switch (m->tso->why_blocked) { case BlockedOnBlackHole: raiseAsync(m->tso, (StgClosure *)NonTermination_closure); @@ -637,19 +664,32 @@ schedule( void ) default: barf("deadlock: main thread blocked in a strange way"); } -#endif } -#if defined(RTS_SUPPORTS_THREADS) - /* ToDo: revisit conditions (and mechanism) for shutting - down a multi-threaded world */ - if ( EMPTY_RUN_QUEUE() ) { - IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down.")); - shutdownHaskellAndExit(0); +#else + m = main_threads; + switch (m->tso->why_blocked) { + case BlockedOnBlackHole: + raiseAsync(m->tso, (StgClosure *)NonTermination_closure); + break; + case BlockedOnException: + case BlockedOnMVar: + raiseAsync(m->tso, (StgClosure *)Deadlock_closure); + break; + default: + barf("deadlock: main thread blocked in a strange way"); } #endif - ASSERT( !EMPTY_RUN_QUEUE() ); } + +#if defined(RTS_SUPPORTS_THREADS) + /* ToDo: revisit conditions (and mechanism) for shutting + down a multi-threaded world */ + IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down.")); + shutdownHaskellAndExit(0); +#endif } + not_deadlocked: + #elif defined(PAR) /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */ #endif @@ -966,7 +1006,6 @@ schedule( void ) IF_DEBUG(sanity,checkTSO(t)); #endif - grabCapability(&cap); cap->r.rCurrentTSO = t; /* context switches are now initiated by the timer signal, unless @@ -1314,11 +1353,6 @@ schedule( void ) default: barf("schedule: invalid thread return code %d", (int)ret); } - -#if defined(RTS_SUPPORTS_THREADS) - /* I don't understand what this re-grab is doing -- sof */ - grabCapability(&cap); -#endif #ifdef PROFILING if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) { @@ -1377,28 +1411,62 @@ schedule( void ) } /* --------------------------------------------------------------------------- + * Singleton fork(). Do not copy any running threads. + * ------------------------------------------------------------------------- */ + +StgInt forkProcess(StgTSO* tso) { + +#ifndef mingw32_TARGET_OS + pid_t pid; + StgTSO* t,*next; + + IF_DEBUG(scheduler,sched_belch("forking!")); + + pid = fork(); + if (pid) { /* parent */ + + /* just return the pid */ + + } else { /* child */ + /* wipe all other threads */ + run_queue_hd = tso; + tso->link = END_TSO_QUEUE; + + /* DO NOT TOUCH THE QUEUES directly because most of the code around + us is picky about finding the threat still in its queue when + handling the deleteThread() */ + + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + next = t->link; + if (t->id != tso->id) { + deleteThread(t); + } + } + } + return pid; +#else /* mingw32 */ + barf("forkProcess#: primop not implemented for mingw32, sorry!"); + return -1; +#endif /* mingw32 */ +} + +/* --------------------------------------------------------------------------- * deleteAllThreads(): kill all the live threads. * * This is used when we catch a user interrupt (^C), before performing * any necessary cleanups and running finalizers. + * + * Locks: sched_mutex held. * ------------------------------------------------------------------------- */ void deleteAllThreads ( void ) { StgTSO* t, *next; IF_DEBUG(scheduler,sched_belch("deleting all threads")); - for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) { - next = t->link; - deleteThread(t); - } - for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) { - next = t->link; - deleteThread(t); - } - for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) { - next = t->link; + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + next = t->global_link; deleteThread(t); - } + } run_queue_hd = run_queue_tl = END_TSO_QUEUE; blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE; sleeping_queue = END_TSO_QUEUE; @@ -1426,7 +1494,12 @@ void deleteAllThreads ( void ) * ------------------------------------------------------------------------- */ StgInt -suspendThread( StgRegTable *reg, rtsBool concCall ) +suspendThread( StgRegTable *reg, + rtsBool concCall +#if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG) + STG_UNUSED +#endif + ) { nat tok; Capability *cap; @@ -1439,7 +1512,7 @@ suspendThread( StgRegTable *reg, rtsBool concCall ) ACQUIRE_LOCK(&sched_mutex); IF_DEBUG(scheduler, - sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id)); + sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall)); threadPaused(cap->r.rCurrentTSO); cap->r.rCurrentTSO->link = suspended_ccalling_threads; @@ -1476,7 +1549,12 @@ suspendThread( StgRegTable *reg, rtsBool concCall ) } StgRegTable * -resumeThread( StgInt tok, rtsBool concCall ) +resumeThread( StgInt tok, + rtsBool concCall +#if !defined(RTS_SUPPORTS_THREADS) + STG_UNUSED +#endif + ) { StgTSO *tso, **prev; Capability *cap; @@ -1484,6 +1562,7 @@ resumeThread( StgInt tok, rtsBool concCall ) #if defined(RTS_SUPPORTS_THREADS) /* Wait for permission to re-enter the RTS with the result. */ if ( concCall ) { + ACQUIRE_LOCK(&sched_mutex); grabReturnCapability(&sched_mutex, &cap); } else { grabCapability(&cap); @@ -1509,9 +1588,8 @@ resumeThread( StgInt tok, rtsBool concCall ) /* Reset blocking status */ tso->why_blocked = NotBlocked; - RELEASE_LOCK(&sched_mutex); - cap->r.rCurrentTSO = tso; + RELEASE_LOCK(&sched_mutex); return &cap->r; } @@ -1548,6 +1626,25 @@ int rts_getThreadId(const StgTSO *tso) return tso->id; } +#ifdef DEBUG +void labelThread(StgTSO *tso, char *label) +{ + int len; + void *buf; + + /* Caveat: Once set, you can only set the thread name to "" */ + len = strlen(label)+1; + buf = realloc(tso->label,len); + if (buf == NULL) { + fprintf(stderr,"insufficient memory for labelThread!\n"); + free(tso->label); + tso->label = NULL; + } else + strncpy(buf,label,len); + tso->label = buf; +} +#endif /* DEBUG */ + /* --------------------------------------------------------------------------- Create a new thread. @@ -1622,13 +1719,21 @@ createThread_(nat size, rtsBool have_lock) #endif tso->what_next = ThreadEnterGHC; +#ifdef DEBUG + tso->label = NULL; +#endif + /* tso->id needs to be unique. For now we use a heavyweight mutex to * protect the increment operation on next_thread_id. * In future, we could use an atomic increment instead. */ +#ifdef SMP if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); } +#endif tso->id = next_thread_id++; +#ifdef SMP if (!have_lock) { RELEASE_LOCK(&sched_mutex); } +#endif tso->why_blocked = NotBlocked; tso->blocked_exceptions = NULL; @@ -2039,6 +2144,7 @@ finishAllThreads ( void ) SchedulerStatus waitThread(StgTSO *tso, /*out*/StgClosure **ret) { + IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id)); #if defined(THREADED_RTS) return waitThread_(tso,ret, rtsFalse); #else @@ -2058,6 +2164,7 @@ waitThread_(StgTSO *tso, SchedulerStatus stat; ACQUIRE_LOCK(&sched_mutex); + IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id)); m = stgMallocBytes(sizeof(StgMainThread), "waitThread"); @@ -2246,8 +2353,6 @@ take_off_run_queue(StgTSO *tso) { void GetRoots(evac_fn evac) { - StgMainThread *m; - #if defined(GRAN) { nat i; @@ -2286,9 +2391,6 @@ GetRoots(evac_fn evac) } #endif - for (m = main_threads; m != NULL; m = m->link) { - evac((StgClosure **)&m->tso); - } if (suspended_ccalling_threads != END_TSO_QUEUE) { evac((StgClosure **)&suspended_ccalling_threads); } @@ -2758,13 +2860,15 @@ interruptStgRts(void) NB: only the type of the blocking queue is different in GranSim and GUM the operations on the queue-elements are the same long live polymorphism! + + Locks: sched_mutex is held upon entry and exit. + */ static void unblockThread(StgTSO *tso) { StgBlockingQueueElement *t, **last; - ACQUIRE_LOCK(&sched_mutex); switch (tso->why_blocked) { case NotBlocked: @@ -2886,20 +2990,20 @@ unblockThread(StgTSO *tso) tso->why_blocked = NotBlocked; tso->block_info.closure = NULL; PUSH_ON_RUN_QUEUE(tso); - RELEASE_LOCK(&sched_mutex); } #else static void unblockThread(StgTSO *tso) { StgTSO *t, **last; + + /* To avoid locking unnecessarily. */ + if (tso->why_blocked == NotBlocked) { + return; + } - ACQUIRE_LOCK(&sched_mutex); switch (tso->why_blocked) { - case NotBlocked: - return; /* not blocked */ - case BlockedOnMVar: ASSERT(get_itbl(tso->block_info.closure)->type == MVAR); { @@ -3011,7 +3115,6 @@ unblockThread(StgTSO *tso) tso->why_blocked = NotBlocked; tso->block_info.closure = NULL; PUSH_ON_RUN_QUEUE(tso); - RELEASE_LOCK(&sched_mutex); } #endif @@ -3045,6 +3148,8 @@ unblockThread(StgTSO *tso) * CATCH_FRAME on the stack. In either case, we strip the entire * stack and replace the thread with a zombie. * + * Locks: sched_mutex held upon entry nor exit. + * * -------------------------------------------------------------------------- */ void @@ -3054,6 +3159,16 @@ deleteThread(StgTSO *tso) } void +raiseAsyncWithLock(StgTSO *tso, StgClosure *exception) +{ + /* When raising async exs from contexts where sched_mutex isn't held; + use raiseAsyncWithLock(). */ + ACQUIRE_LOCK(&sched_mutex); + raiseAsync(tso,exception); + RELEASE_LOCK(&sched_mutex); +} + +void raiseAsync(StgTSO *tso, StgClosure *exception) { StgUpdateFrame* su = tso->su; @@ -3069,6 +3184,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) /* Remove it from any blocking queues */ unblockThread(tso); + IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id)); /* The stack freezing code assumes there's a closure pointer on * the top of the stack. This isn't always the case with compiled * code, so we have to push a dummy closure on the top which just @@ -3088,7 +3204,9 @@ raiseAsync(StgTSO *tso, StgClosure *exception) * top of the CATCH_FRAME ready to enter. */ if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) { +#ifdef PROFILING StgCatchFrame *cf = (StgCatchFrame *)su; +#endif StgClosure *raise; /* we've got an exception to raise, so let's pass it to the @@ -3250,6 +3368,8 @@ raiseAsync(StgTSO *tso, StgClosure *exception) up and sent a signal: BlockedOnDeadMVar if the thread was blocked on an MVar, or NonTermination if the thread was blocked on a Black Hole. + + Locks: sched_mutex isn't held upon entry nor exit. -------------------------------------------------------------------------- */ void @@ -3266,6 +3386,7 @@ resurrectThreads( StgTSO *threads ) switch (tso->why_blocked) { case BlockedOnMVar: case BlockedOnException: + /* Called by GC - sched_mutex lock is currently held. */ raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure); break; case BlockedOnBlackHole: @@ -3290,6 +3411,8 @@ resurrectThreads( StgTSO *threads ) * * This is only done in a deadlock situation in order to avoid * performance overhead in the normal case. + * + * Locks: sched_mutex is held upon entry and exit. * -------------------------------------------------------------------------- */ static void @@ -3437,6 +3560,7 @@ printAllThreads(void) for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) { fprintf(stderr, "\tthread %d ", t->id); + if (t->label) fprintf(stderr,"[\"%s\"] ",t->label); printThreadStatus(t); fprintf(stderr,"\n"); }