X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=17c7e74b598d855f6fb7dc5b0be0fce9c2974aab;hb=f0f6913272d9475a6ac1b2a3cc59105c5756fba3;hp=8a4744309194dda4ef63d162ad3e1eb0f3ed47c2;hpb=a55ebc70b9874176d4892fbfb03935580a9c3837;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 8a47443..17c7e74 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,5 +1,5 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.135 2002/04/01 11:18:19 panne Exp $ + * $Id: Schedule.c,v 1.157 2002/10/22 11:01:19 simonmar Exp $ * * (c) The GHC Team, 1998-2000 * @@ -84,6 +84,7 @@ #include "StgRun.h" #include "StgStartup.h" #include "Hooks.h" +#define COMPILING_SCHEDULER #include "Schedule.h" #include "StgMiscClosures.h" #include "Storage.h" @@ -96,6 +97,7 @@ #include "Stats.h" #include "Itimer.h" #include "Prelude.h" +#include "ThreadLabels.h" #ifdef PROFILING #include "Proftimer.h" #include "ProfHeap.h" @@ -114,6 +116,15 @@ #include "OSThreads.h" #include "Task.h" +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#ifdef HAVE_UNISTD_H +#include +#endif + +#include +#include #include //@node Variables and Data structures, Prototypes, Includes, Main scheduling code @@ -122,7 +133,7 @@ /* Main thread queue. * Locks required: sched_mutex. */ -StgMainThread *main_threads; +StgMainThread *main_threads = NULL; /* Thread queues. * Locks required: sched_mutex. @@ -149,16 +160,18 @@ StgTSO *ccalling_threadss[MAX_PROC]; #else /* !GRAN */ -StgTSO *run_queue_hd, *run_queue_tl; -StgTSO *blocked_queue_hd, *blocked_queue_tl; -StgTSO *sleeping_queue; /* perhaps replace with a hash table? */ +StgTSO *run_queue_hd = NULL; +StgTSO *run_queue_tl = NULL; +StgTSO *blocked_queue_hd = NULL; +StgTSO *blocked_queue_tl = NULL; +StgTSO *sleeping_queue = NULL; /* perhaps replace with a hash table? */ #endif /* Linked list of all threads. * Used for detecting garbage collected threads. */ -StgTSO *all_threads; +StgTSO *all_threads = NULL; /* When a thread performs a safe C call (_ccall_GC, using old * terminology), it gets put on the suspended_ccalling_threads @@ -175,17 +188,17 @@ static StgTSO *threadStackOverflow(StgTSO *tso); /* flag set by signal handler to precipitate a context switch */ //@cindex context_switch -nat context_switch; +nat context_switch = 0; /* if this flag is set as well, give up execution */ //@cindex interrupted -rtsBool interrupted; +rtsBool interrupted = rtsFalse; /* Next thread ID to allocate. - * Locks required: sched_mutex + * Locks required: thread_id_mutex */ //@cindex next_thread_id -StgThreadID next_thread_id = 1; +static StgThreadID next_thread_id = 1; /* * Pointers to the state of the current thread. @@ -216,17 +229,19 @@ StgTSO *CurrentTSO; */ StgTSO dummy_tso; -rtsBool ready_to_gc; +static rtsBool ready_to_gc; + +/* + * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) -- + * in an MT setting, needed to signal that a worker thread shouldn't hang around + * in the scheduler when it is out of work. + */ +static rtsBool shutting_down_scheduler = rtsFalse; void addToBlockedQueue ( StgTSO *tso ); static void schedule ( void ); void interruptStgRts ( void ); -#if defined(GRAN) -static StgTSO * createThread_ ( nat size, rtsBool have_lock, StgInt pri ); -#else -static StgTSO * createThread_ ( nat size, rtsBool have_lock ); -#endif static void detectBlackHoles ( void ); @@ -241,6 +256,13 @@ static void sched_belch(char *s, ...); Mutex sched_mutex = INIT_MUTEX_VAR; Mutex term_mutex = INIT_MUTEX_VAR; +/* + * A heavyweight solution to the problem of protecting + * the thread_id from concurrent update. + */ +Mutex thread_id_mutex = INIT_MUTEX_VAR; + + # if defined(SMP) static Condition gc_pending_cond = INIT_COND_VAR; nat await_death; @@ -255,21 +277,13 @@ rtsBool emitSchedule = rtsTrue; #endif #if DEBUG -char *whatNext_strs[] = { +static char *whatNext_strs[] = { "ThreadEnterGHC", "ThreadRunGHC", "ThreadEnterInterp", "ThreadKilled", "ThreadComplete" }; - -char *threadReturnCode_strs[] = { - "HeapOverflow", /* might also be StackOverflow */ - "StackOverflow", - "ThreadYielding", - "ThreadBlocked", - "ThreadFinished" -}; #endif #if defined(PAR) @@ -357,12 +371,10 @@ schedule( void ) ACQUIRE_LOCK(&sched_mutex); #if defined(RTS_SUPPORTS_THREADS) - /* Check to see whether there are any worker threads - waiting to deposit external call results. If so, - yield our capability */ - yieldToReturningWorker(&sched_mutex, cap); - waitForWorkCapability(&sched_mutex, &cap, rtsFalse); +#else + /* simply initialise it in the non-threaded case */ + grabCapability(&cap); #endif #if defined(GRAN) @@ -400,6 +412,13 @@ schedule( void ) IF_DEBUG(scheduler, printAllThreads()); +#if defined(RTS_SUPPORTS_THREADS) + /* Check to see whether there are any worker threads + waiting to deposit external call results. If so, + yield our capability */ + yieldToReturningWorker(&sched_mutex, &cap); +#endif + /* If we're interrupted (the user pressed ^C, or some other * termination condition occurred), kill all the currently running * threads. @@ -429,6 +448,9 @@ schedule( void ) *prev = m->link; m->stat = Success; broadcastCondition(&m->wakeup); +#ifdef DEBUG + removeThreadLabel((StgWord)m->tso); +#endif break; case ThreadKilled: if (m->ret) *(m->ret) = NULL; @@ -439,6 +461,9 @@ schedule( void ) m->stat = Killed; } broadcastCondition(&m->wakeup); +#ifdef DEBUG + removeThreadLabel((StgWord)m->tso); +#endif break; default: break; @@ -458,6 +483,9 @@ schedule( void ) StgMainThread *m = main_threads; if (m->tso->what_next == ThreadComplete || m->tso->what_next == ThreadKilled) { +#ifdef DEBUG + removeThreadLabel((StgWord)m->tso); +#endif main_threads = main_threads->link; if (m->tso->what_next == ThreadComplete) { /* we finished successfully, fill in the return value */ @@ -522,7 +550,9 @@ schedule( void ) /* check for signals each time around the scheduler */ #ifndef mingw32_TARGET_OS if (signals_pending()) { + RELEASE_LOCK(&sched_mutex); /* ToDo: kill */ startSignalHandlers(); + ACQUIRE_LOCK(&sched_mutex); } #endif @@ -587,7 +617,19 @@ schedule( void ) * for signals to arrive rather then bombing out with a * deadlock. */ +#if defined(RTS_SUPPORTS_THREADS) + if ( 0 ) { /* hmm..what to do? Simply stop waiting for + a signal with no runnable threads (or I/O + suspended ones) leads nowhere quick. + For now, simply shut down when we reach this + condition. + + ToDo: define precisely under what conditions + the Scheduler should shut down in an MT setting. + */ +#else if ( anyUserHandlers() ) { +#endif IF_DEBUG(scheduler, sched_belch("still deadlocked, waiting for signals...")); @@ -597,7 +639,9 @@ schedule( void ) if (interrupted) { continue; } if (signals_pending()) { + RELEASE_LOCK(&sched_mutex); startSignalHandlers(); + ACQUIRE_LOCK(&sched_mutex); } ASSERT(!EMPTY_RUN_QUEUE()); goto not_deadlocked; @@ -645,7 +689,9 @@ schedule( void ) /* ToDo: revisit conditions (and mechanism) for shutting down a multi-threaded world */ IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down.")); - shutdownHaskellAndExit(0); + RELEASE_LOCK(&sched_mutex); + shutdownHaskell(); + return; #endif } not_deadlocked: @@ -672,15 +718,17 @@ schedule( void ) if ( EMPTY_RUN_QUEUE() ) { /* Give up our capability */ releaseCapability(cap); + + /* If we're in the process of shutting down (& running the + * a batch of finalisers), don't wait around. + */ + if ( shutting_down_scheduler ) { + RELEASE_LOCK(&sched_mutex); + return; + } IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId())); waitForWorkCapability(&sched_mutex, &cap, rtsTrue); IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId())); -#if 0 - while ( EMPTY_RUN_QUEUE() ) { - waitForWorkCapability(&sched_mutex, &cap); - IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId())); - } -#endif } #endif @@ -966,7 +1014,6 @@ schedule( void ) IF_DEBUG(sanity,checkTSO(t)); #endif - grabCapability(&cap); cap->r.rCurrentTSO = t; /* context switches are now initiated by the timer signal, unless @@ -1080,11 +1127,22 @@ schedule( void ) } cap->r.rCurrentNursery->u.back = bd; - // initialise it as a nursery block - bd->step = g0s0; - bd->gen_no = 0; - bd->flags = 0; - bd->free = bd->start; + // initialise it as a nursery block. We initialise the + // step, gen_no, and flags field of *every* sub-block in + // this large block, because this is easier than making + // sure that we always find the block head of a large + // block whenever we call Bdescr() (eg. evacuate() and + // isAlive() in the GC would both have to do this, at + // least). + { + bdescr *x; + for (x = bd; x < bd + blocks; x++) { + x->step = g0s0; + x->gen_no = 0; + x->flags = 0; + x->free = x->start; + } + } // don't forget to update the block count in g0s0. g0s0->n_blocks += blocks; @@ -1314,11 +1372,6 @@ schedule( void ) default: barf("schedule: invalid thread return code %d", (int)ret); } - -#if defined(RTS_SUPPORTS_THREADS) - /* I don't understand what this re-grab is doing -- sof */ - grabCapability(&cap); -#endif #ifdef PROFILING if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) { @@ -1377,10 +1430,84 @@ schedule( void ) } /* --------------------------------------------------------------------------- + * Singleton fork(). Do not copy any running threads. + * ------------------------------------------------------------------------- */ + +StgInt forkProcess(StgTSO* tso) { + +#ifndef mingw32_TARGET_OS + pid_t pid; + StgTSO* t,*next; + StgMainThread *m; + rtsBool doKill; + + IF_DEBUG(scheduler,sched_belch("forking!")); + + pid = fork(); + if (pid) { /* parent */ + + /* just return the pid */ + + } else { /* child */ + /* wipe all other threads */ + run_queue_hd = run_queue_tl = tso; + tso->link = END_TSO_QUEUE; + + /* When clearing out the threads, we need to ensure + that a 'main thread' is left behind; if there isn't, + the Scheduler will shutdown next time it is entered. + + ==> we don't kill a thread that's on the main_threads + list (nor the current thread.) + + [ Attempts at implementing the more ambitious scheme of + killing the main_threads also, and then adding the + current thread onto the main_threads list if it wasn't + there already, failed -- waitThread() (for one) wasn't + up to it. If it proves to be desirable to also kill + the main threads, then this scheme will have to be + revisited (and fully debugged!) + + -- sof 7/2002 + ] + */ + /* DO NOT TOUCH THE QUEUES directly because most of the code around + us is picky about finding the thread still in its queue when + handling the deleteThread() */ + + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + next = t->link; + + /* Don't kill the current thread.. */ + if (t->id == tso->id) continue; + doKill=rtsTrue; + /* ..or a main thread */ + for (m = main_threads; m != NULL; m = m->link) { + if (m->tso->id == t->id) { + doKill=rtsFalse; + break; + } + } + if (doKill) { + deleteThread(t); + } + } + } + return pid; +#else /* mingw32 */ + barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id); + /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */ + return -1; +#endif /* mingw32 */ +} + +/* --------------------------------------------------------------------------- * deleteAllThreads(): kill all the live threads. * * This is used when we catch a user interrupt (^C), before performing * any necessary cleanups and running finalizers. + * + * Locks: sched_mutex held. * ------------------------------------------------------------------------- */ void deleteAllThreads ( void ) @@ -1420,7 +1547,7 @@ void deleteAllThreads ( void ) StgInt suspendThread( StgRegTable *reg, rtsBool concCall -#if !defined(RTS_SUPPORTS_THREADS) +#if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG) STG_UNUSED #endif ) @@ -1436,7 +1563,7 @@ suspendThread( StgRegTable *reg, ACQUIRE_LOCK(&sched_mutex); IF_DEBUG(scheduler, - sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id)); + sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall)); threadPaused(cap->r.rCurrentTSO); cap->r.rCurrentTSO->link = suspended_ccalling_threads; @@ -1486,6 +1613,7 @@ resumeThread( StgInt tok, #if defined(RTS_SUPPORTS_THREADS) /* Wait for permission to re-enter the RTS with the result. */ if ( concCall ) { + ACQUIRE_LOCK(&sched_mutex); grabReturnCapability(&sched_mutex, &cap); } else { grabCapability(&cap); @@ -1511,9 +1639,8 @@ resumeThread( StgInt tok, /* Reset blocking status */ tso->why_blocked = NotBlocked; - RELEASE_LOCK(&sched_mutex); - cap->r.rCurrentTSO = tso; + RELEASE_LOCK(&sched_mutex); return &cap->r; } @@ -1530,10 +1657,11 @@ static void unblockThread(StgTSO *tso); * instances of Eq/Ord for ThreadIds. * ------------------------------------------------------------------------ */ -int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) +int +cmp_thread(StgPtr tso1, StgPtr tso2) { - StgThreadID id1 = tso1->id; - StgThreadID id2 = tso2->id; + StgThreadID id1 = ((StgTSO *)tso1)->id; + StgThreadID id2 = ((StgTSO *)tso2)->id; if (id1 < id2) return (-1); if (id1 > id2) return 1; @@ -1545,10 +1673,30 @@ int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) * * This is used in the implementation of Show for ThreadIds. * ------------------------------------------------------------------------ */ -int rts_getThreadId(const StgTSO *tso) +int +rts_getThreadId(StgPtr tso) +{ + return ((StgTSO *)tso)->id; +} + +#ifdef DEBUG +void +labelThread(StgPtr tso, char *label) { - return tso->id; + int len; + void *buf; + + /* Caveat: Once set, you can only set the thread name to "" */ + len = strlen(label)+1; + buf = malloc(len); + if (buf == NULL) { + fprintf(stderr,"insufficient memory for labelThread!\n"); + } else + strncpy(buf,label,len); + /* Update will free the old memory for us */ + updateThreadLabel((StgWord)tso,buf); } +#endif /* DEBUG */ /* --------------------------------------------------------------------------- Create a new thread. @@ -1567,25 +1715,12 @@ int rts_getThreadId(const StgTSO *tso) #if defined(GRAN) /* currently pri (priority) is only used in a GRAN setup -- HWL */ StgTSO * -createThread(nat stack_size, StgInt pri) -{ - return createThread_(stack_size, rtsFalse, pri); -} - -static StgTSO * -createThread_(nat size, rtsBool have_lock, StgInt pri) -{ +createThread(nat size, StgInt pri) #else StgTSO * -createThread(nat stack_size) -{ - return createThread_(stack_size, rtsFalse); -} - -static StgTSO * -createThread_(nat size, rtsBool have_lock) -{ +createThread(nat size) #endif +{ StgTSO *tso; nat stack_size; @@ -1628,9 +1763,9 @@ createThread_(nat size, rtsBool have_lock) * protect the increment operation on next_thread_id. * In future, we could use an atomic increment instead. */ - if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); } + ACQUIRE_LOCK(&thread_id_mutex); tso->id = next_thread_id++; - if (!have_lock) { RELEASE_LOCK(&sched_mutex); } + RELEASE_LOCK(&thread_id_mutex); tso->why_blocked = NotBlocked; tso->blocked_exceptions = NULL; @@ -1767,7 +1902,7 @@ createSparkThread(rtsSpark spark) } else { threadsCreated++; - tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue); + tso = createThread(RtsFlags.GcFlags.initialStkSize); if (tso==END_TSO_QUEUE) barf("createSparkThread: Cannot create TSO"); #if defined(DIST) @@ -1808,6 +1943,13 @@ activateSpark (rtsSpark spark) } #endif +static SchedulerStatus waitThread_(/*out*/StgMainThread* m +#if defined(THREADED_RTS) + , rtsBool blockWaiting +#endif + ); + + /* --------------------------------------------------------------------------- * scheduleThread() * @@ -1854,12 +1996,48 @@ scheduleThread_(StgTSO *tso void scheduleThread(StgTSO* tso) { - return scheduleThread_(tso, rtsFalse); + scheduleThread_(tso, rtsFalse); } -void scheduleExtThread(StgTSO* tso) +SchedulerStatus +scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret) { - return scheduleThread_(tso, rtsTrue); + StgMainThread *m; + + m = stgMallocBytes(sizeof(StgMainThread), "waitThread"); + m->tso = tso; + m->ret = ret; + m->stat = NoStatus; +#if defined(RTS_SUPPORTS_THREADS) + initCondition(&m->wakeup); +#endif + + /* Put the thread on the main-threads list prior to scheduling the TSO. + Failure to do so introduces a race condition in the MT case (as + identified by Wolfgang Thaller), whereby the new task/OS thread + created by scheduleThread_() would complete prior to the thread + that spawned it managed to put 'itself' on the main-threads list. + The upshot of it all being that the worker thread wouldn't get to + signal the completion of the its work item for the main thread to + see (==> it got stuck waiting.) -- sof 6/02. + */ + ACQUIRE_LOCK(&sched_mutex); + IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id)); + + m->link = main_threads; + main_threads = m; + + /* Inefficient (scheduleThread_() acquires it again right away), + * but obviously correct. + */ + RELEASE_LOCK(&sched_mutex); + + scheduleThread_(tso, rtsTrue); +#if defined(THREADED_RTS) + return waitThread_(m, rtsTrue); +#else + return waitThread_(m); +#endif } /* --------------------------------------------------------------------------- @@ -1921,6 +2099,7 @@ initScheduler(void) * the scheduler. */ initMutex(&sched_mutex); initMutex(&term_mutex); + initMutex(&thread_id_mutex); initCondition(&thread_ready_cond); #endif @@ -1978,6 +2157,7 @@ exitScheduler( void ) #if defined(RTS_SUPPORTS_THREADS) stopTaskManager(); #endif + shutting_down_scheduler = rtsTrue; } /* ----------------------------------------------------------------------------- @@ -2041,28 +2221,9 @@ finishAllThreads ( void ) SchedulerStatus waitThread(StgTSO *tso, /*out*/StgClosure **ret) { -#if defined(THREADED_RTS) - return waitThread_(tso,ret, rtsFalse); -#else - return waitThread_(tso,ret); -#endif -} - -SchedulerStatus -waitThread_(StgTSO *tso, - /*out*/StgClosure **ret -#if defined(THREADED_RTS) - , rtsBool blockWaiting -#endif - ) -{ StgMainThread *m; - SchedulerStatus stat; - ACQUIRE_LOCK(&sched_mutex); - m = stgMallocBytes(sizeof(StgMainThread), "waitThread"); - m->tso = tso; m->ret = ret; m->stat = NoStatus; @@ -2070,8 +2231,30 @@ waitThread_(StgTSO *tso, initCondition(&m->wakeup); #endif + /* see scheduleWaitThread() comment */ + ACQUIRE_LOCK(&sched_mutex); + IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id)); m->link = main_threads; main_threads = m; + RELEASE_LOCK(&sched_mutex); + + IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id)); +#if defined(THREADED_RTS) + return waitThread_(m, rtsFalse); +#else + return waitThread_(m); +#endif +} + +static +SchedulerStatus +waitThread_(StgMainThread* m +#if defined(THREADED_RTS) + , rtsBool blockWaiting +#endif + ) +{ + SchedulerStatus stat; IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id)); @@ -2083,13 +2266,12 @@ waitThread_(StgTSO *tso, * gets to enter the RTS directly without going via another * task/thread. */ - RELEASE_LOCK(&sched_mutex); schedule(); ASSERT(m->stat != NoStatus); } else # endif { - IF_DEBUG(scheduler, sched_belch("sfoo")); + ACQUIRE_LOCK(&sched_mutex); do { waitCondition(&m->wakeup, &sched_mutex); } while (m->stat == NoStatus); @@ -2293,6 +2475,30 @@ GetRoots(evac_fn evac) #if defined(PAR) || defined(GRAN) markSparkQueue(evac); #endif + +#ifndef mingw32_TARGET_OS + // mark the signal handlers (signals should be already blocked) + markSignalHandlers(evac); +#endif + + // main threads which have completed need to be retained until they + // are dealt with in the main scheduler loop. They won't be + // retained any other way: the GC will drop them from the + // all_threads list, so we have to be careful to treat them as roots + // here. + { + StgMainThread *m; + for (m = main_threads; m != NULL; m = m->link) { + switch (m->tso->what_next) { + case ThreadComplete: + case ThreadKilled: + evac((StgClosure **)&m->tso); + break; + default: + break; + } + } + } } /* ----------------------------------------------------------------------------- @@ -2308,7 +2514,7 @@ GetRoots(evac_fn evac) This needs to be protected by the GC condition variable above. KH. -------------------------------------------------------------------------- */ -void (*extra_roots)(evac_fn); +static void (*extra_roots)(evac_fn); void performGC(void) @@ -2755,13 +2961,15 @@ interruptStgRts(void) NB: only the type of the blocking queue is different in GranSim and GUM the operations on the queue-elements are the same long live polymorphism! + + Locks: sched_mutex is held upon entry and exit. + */ static void unblockThread(StgTSO *tso) { StgBlockingQueueElement *t, **last; - ACQUIRE_LOCK(&sched_mutex); switch (tso->why_blocked) { case NotBlocked: @@ -2883,20 +3091,20 @@ unblockThread(StgTSO *tso) tso->why_blocked = NotBlocked; tso->block_info.closure = NULL; PUSH_ON_RUN_QUEUE(tso); - RELEASE_LOCK(&sched_mutex); } #else static void unblockThread(StgTSO *tso) { StgTSO *t, **last; + + /* To avoid locking unnecessarily. */ + if (tso->why_blocked == NotBlocked) { + return; + } - ACQUIRE_LOCK(&sched_mutex); switch (tso->why_blocked) { - case NotBlocked: - return; /* not blocked */ - case BlockedOnMVar: ASSERT(get_itbl(tso->block_info.closure)->type == MVAR); { @@ -3008,7 +3216,6 @@ unblockThread(StgTSO *tso) tso->why_blocked = NotBlocked; tso->block_info.closure = NULL; PUSH_ON_RUN_QUEUE(tso); - RELEASE_LOCK(&sched_mutex); } #endif @@ -3042,6 +3249,8 @@ unblockThread(StgTSO *tso) * CATCH_FRAME on the stack. In either case, we strip the entire * stack and replace the thread with a zombie. * + * Locks: sched_mutex held upon entry nor exit. + * * -------------------------------------------------------------------------- */ void @@ -3051,6 +3260,16 @@ deleteThread(StgTSO *tso) } void +raiseAsyncWithLock(StgTSO *tso, StgClosure *exception) +{ + /* When raising async exs from contexts where sched_mutex isn't held; + use raiseAsyncWithLock(). */ + ACQUIRE_LOCK(&sched_mutex); + raiseAsync(tso,exception); + RELEASE_LOCK(&sched_mutex); +} + +void raiseAsync(StgTSO *tso, StgClosure *exception) { StgUpdateFrame* su = tso->su; @@ -3080,6 +3299,8 @@ raiseAsync(StgTSO *tso, StgClosure *exception) nat i; StgAP_UPD * ap; + ASSERT((P_)su > (P_)sp); + /* If we find a CATCH_FRAME, and we've got an exception to raise, * then build the THUNK raise(exception), and leave it on * top of the CATCH_FRAME ready to enter. @@ -3127,8 +3348,6 @@ raiseAsync(StgTSO *tso, StgClosure *exception) */ ap = (StgAP_UPD *)allocate(AP_sizeW(words)); - ASSERT(words >= 0); - ap->n_args = words; ap->fun = (StgClosure *)sp[0]; sp++; @@ -3249,6 +3468,8 @@ raiseAsync(StgTSO *tso, StgClosure *exception) up and sent a signal: BlockedOnDeadMVar if the thread was blocked on an MVar, or NonTermination if the thread was blocked on a Black Hole. + + Locks: sched_mutex isn't held upon entry nor exit. -------------------------------------------------------------------------- */ void @@ -3265,6 +3486,7 @@ resurrectThreads( StgTSO *threads ) switch (tso->why_blocked) { case BlockedOnMVar: case BlockedOnException: + /* Called by GC - sched_mutex lock is currently held. */ raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure); break; case BlockedOnBlackHole: @@ -3289,6 +3511,8 @@ resurrectThreads( StgTSO *threads ) * * This is only done in a deadlock situation in order to avoid * performance overhead in the normal case. + * + * Locks: sched_mutex is held upon entry and exit. * -------------------------------------------------------------------------- */ static void @@ -3346,11 +3570,12 @@ detectBlackHoles( void ) //@subsection Debugging Routines /* ----------------------------------------------------------------------------- - Debugging: why is a thread blocked + * Debugging: why is a thread blocked + * [Also provides useful information when debugging threaded programs + * at the Haskell source code level, so enable outside of DEBUG. --sof 7/02] -------------------------------------------------------------------------- */ -#ifdef DEBUG - +static void printThreadBlockage(StgTSO *tso) { @@ -3398,6 +3623,7 @@ printThreadBlockage(StgTSO *tso) } } +static void printThreadStatus(StgTSO *tso) { @@ -3417,30 +3643,35 @@ void printAllThreads(void) { StgTSO *t; + void *label; # if defined(GRAN) char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; ullong_format_string(TIME_ON_PROC(CurrentProc), time_string, rtsFalse/*no commas!*/); - sched_belch("all threads at [%s]:", time_string); + fprintf(stderr, "all threads at [%s]:\n", time_string); # elif defined(PAR) char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; ullong_format_string(CURRENT_TIME, time_string, rtsFalse/*no commas!*/); - sched_belch("all threads at [%s]:", time_string); + fprintf(stderr,"all threads at [%s]:\n", time_string); # else - sched_belch("all threads:"); + fprintf(stderr,"all threads:\n"); # endif for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) { - fprintf(stderr, "\tthread %d ", t->id); + fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t); + label = lookupThreadLabel((StgWord)t); + if (label) fprintf(stderr,"[\"%s\"] ",(char *)label); printThreadStatus(t); fprintf(stderr,"\n"); } } +#ifdef DEBUG + /* Print a whole blocking queue attached to node (debugging only). */ @@ -3623,6 +3854,7 @@ sched_belch(char *s, ...) #endif vfprintf(stderr, s, ap); fprintf(stderr, "\n"); + va_end(ap); } #endif /* DEBUG */