X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=67984ebb183f5605a231fba03f50e7b102ed12de;hb=a9190910d12a90557d8d3d5fe7423cef77883bcd;hp=c58584f6939691a91aa889903e8af8f57564e8b6;hpb=3072ee8dfd68c5a89d4c756ba0dabbc84670a8e4;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index c58584f..67984eb 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,5 +1,5 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.173 2003/08/15 12:43:57 simonmar Exp $ + * $Id: Schedule.c,v 1.178 2003/10/01 21:16:12 wolfgang Exp $ * * (c) The GHC Team, 1998-2000 * @@ -126,6 +126,22 @@ #include #include +#ifdef HAVE_ERRNO_H +#include +#endif + +#ifdef THREADED_RTS +#define USED_IN_THREADED_RTS +#else +#define USED_IN_THREADED_RTS STG_UNUSED +#endif + +#ifdef RTS_SUPPORTS_THREADS +#define USED_WHEN_RTS_SUPPORTS_THREADS +#else +#define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED +#endif + //@node Variables and Data structures, Prototypes, Includes, Main scheduling code //@subsection Variables and Data structures @@ -134,15 +150,6 @@ */ StgMainThread *main_threads = NULL; -#ifdef THREADED_RTS -// Pointer to the thread that executes main -// When this thread is finished, the program terminates -// by calling shutdownHaskellAndExit. -// It would be better to add a call to shutdownHaskellAndExit -// to the Main.main wrapper and to remove this hack. -StgMainThread *main_main_thread = NULL; -#endif - /* Thread queues. * Locks required: sched_mutex. */ @@ -249,7 +256,7 @@ static rtsBool shutting_down_scheduler = rtsFalse; void addToBlockedQueue ( StgTSO *tso ); -static void schedule ( void ); +static void schedule ( StgMainThread *mainThread, Capability *initialCapability ); void interruptStgRts ( void ); static void detectBlackHoles ( void ); @@ -306,20 +313,38 @@ StgTSO * activateSpark (rtsSpark spark); StgTSO *MainTSO; */ -#if defined(PAR) || defined(RTS_SUPPORTS_THREADS) +#if defined(RTS_SUPPORTS_THREADS) +static rtsBool startingWorkerThread = rtsFalse; + static void taskStart(void); static void taskStart(void) { - schedule(); + Capability *cap; + + ACQUIRE_LOCK(&sched_mutex); + startingWorkerThread = rtsFalse; + waitForWorkCapability(&sched_mutex, &cap, NULL); + RELEASE_LOCK(&sched_mutex); + + schedule(NULL,cap); } -#endif -#if defined(RTS_SUPPORTS_THREADS) void -startSchedulerTask(void) +startSchedulerTaskIfNecessary(void) { - startTask(taskStart); + if(run_queue_hd != END_TSO_QUEUE + || blocked_queue_hd != END_TSO_QUEUE + || sleeping_queue != END_TSO_QUEUE) + { + if(!startingWorkerThread) + { // we don't want to start another worker thread + // just because the last one hasn't yet reached the + // "waiting for capability" state + startingWorkerThread = rtsTrue; + startTask(taskStart); + } + } } #endif @@ -363,10 +388,11 @@ startSchedulerTask(void) ------------------------------------------------------------------------ */ //@cindex schedule static void -schedule( void ) +schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, + Capability *initialCapability ) { StgTSO *t; - Capability *cap; + Capability *cap = initialCapability; StgThreadReturnCode ret; #if defined(GRAN) rtsEvent *event; @@ -386,8 +412,16 @@ schedule( void ) ACQUIRE_LOCK(&sched_mutex); #if defined(RTS_SUPPORTS_THREADS) - waitForWorkCapability(&sched_mutex, &cap, rtsFalse); - IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId())); + /* in the threaded case, the capability is either passed in via the initialCapability + parameter, or initialized inside the scheduler loop */ + + IF_DEBUG(scheduler, + fprintf(stderr,"### NEW SCHEDULER LOOP in os thread %u(%p)\n", + osThreadId(), osThreadId())); + IF_DEBUG(scheduler, + fprintf(stderr,"### main thread: %p\n",mainThread)); + IF_DEBUG(scheduler, + fprintf(stderr,"### initial cap: %p\n",initialCapability)); #else /* simply initialise it in the non-threaded case */ grabCapability(&cap); @@ -431,8 +465,19 @@ schedule( void ) #if defined(RTS_SUPPORTS_THREADS) /* Check to see whether there are any worker threads waiting to deposit external call results. If so, - yield our capability */ - yieldToReturningWorker(&sched_mutex, &cap); + yield our capability... if we have a capability, that is. */ + if(cap) + yieldToReturningWorker(&sched_mutex, &cap, + mainThread ? &mainThread->bound_thread_cond : NULL); + + /* If we do not currently hold a capability, we wait for one */ + if(!cap) + { + waitForWorkCapability(&sched_mutex, &cap, + mainThread ? &mainThread->bound_thread_cond : NULL); + IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): got cap", + osThreadId())); + } #endif /* If we're interrupted (the user pressed ^C, or some other @@ -448,7 +493,6 @@ schedule( void ) // so just exit right away. prog_belch("interrupted"); releaseCapability(cap); - startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit RELEASE_LOCK(&sched_mutex); shutdownHaskellAndExit(EXIT_SUCCESS); #else @@ -463,55 +507,63 @@ schedule( void ) */ #if defined(RTS_SUPPORTS_THREADS) { - StgMainThread *m, **prev; - prev = &main_threads; - for (m = main_threads; m != NULL; prev = &m->link, m = m->link) { - switch (m->tso->what_next) { - case ThreadComplete: - if (m->ret) { - // NOTE: return val is tso->sp[1] (see StgStartup.hc) - *(m->ret) = (StgClosure *)m->tso->sp[1]; - } - *prev = m->link; - m->stat = Success; - broadcastCondition(&m->wakeup); -#ifdef DEBUG - removeThreadLabel((StgWord)m->tso); -#endif - if(m == main_main_thread) - { - releaseCapability(cap); - startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit - RELEASE_LOCK(&sched_mutex); - shutdownHaskellAndExit(EXIT_SUCCESS); - } - break; - case ThreadKilled: - if (m->ret) *(m->ret) = NULL; - *prev = m->link; - if (was_interrupted) { - m->stat = Interrupted; - } else { - m->stat = Killed; - } - broadcastCondition(&m->wakeup); + StgMainThread *m, **prev; + prev = &main_threads; + for (m = main_threads; m != NULL; prev = &m->link, m = m->link) { + if (m->tso->what_next == ThreadComplete + || m->tso->what_next == ThreadKilled) + { + if(m == mainThread) + { + if(m->tso->what_next == ThreadComplete) + { + if (m->ret) + { + // NOTE: return val is tso->sp[1] (see StgStartup.hc) + *(m->ret) = (StgClosure *)m->tso->sp[1]; + } + m->stat = Success; + } + else + { + if (m->ret) + { + *(m->ret) = NULL; + } + if (was_interrupted) + { + m->stat = Interrupted; + } + else + { + m->stat = Killed; + } + } + *prev = m->link; + #ifdef DEBUG - removeThreadLabel((StgWord)m->tso); + removeThreadLabel((StgWord)m->tso); #endif - if(m == main_main_thread) - { releaseCapability(cap); - startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit RELEASE_LOCK(&sched_mutex); - shutdownHaskellAndExit(EXIT_SUCCESS); + return; + } + else + { + // The current OS thread can not handle the fact that the Haskell + // thread "m" has ended. + // "m" is bound; the scheduler loop in it's bound OS thread has + // to return, so let's pass our capability directly to that thread. + passCapability(&sched_mutex, cap, &m->bound_thread_cond); + cap = NULL; + } } - break; - default: - break; } - } } - + + if(!cap) // If we gave our capability away, + continue; // go to the top to get it back + #else /* not threaded */ # if defined(PAR) @@ -1067,6 +1119,63 @@ schedule( void ) IF_DEBUG(sanity,checkTSO(t)); #endif +#ifdef THREADED_RTS + { + StgMainThread *m; + for(m = main_threads; m; m = m->link) + { + if(m->tso == t) + break; + } + + if(m) + { + if(m == mainThread) + { + IF_DEBUG(scheduler, + fprintf(stderr,"### Running TSO %p in bound OS thread %u\n", + t, osThreadId())); + // yes, the Haskell thread is bound to the current native thread + } + else + { + IF_DEBUG(scheduler, + fprintf(stderr,"### TSO %p bound to other OS thread than %u\n", + t, osThreadId())); + // no, bound to a different Haskell thread: pass to that thread + PUSH_ON_RUN_QUEUE(t); + passCapability(&sched_mutex,cap,&m->bound_thread_cond); + cap = NULL; + continue; + } + } + else + { + // The thread we want to run is not bound. + if(mainThread == NULL) + { + IF_DEBUG(scheduler, + fprintf(stderr,"### Running TSO %p in worker OS thread %u\n", + t, osThreadId())); + // if we are a worker thread, + // we may run it here + } + else + { + IF_DEBUG(scheduler, + fprintf(stderr,"### TSO %p is not appropriate for main thread %p in OS thread %u\n", + t, mainThread, osThreadId())); + // no, the current native thread is bound to a different + // Haskell thread, so pass it to any worker thread + PUSH_ON_RUN_QUEUE(t); + passCapabilityToWorker(&sched_mutex, cap); + cap = NULL; + continue; + } + } + } +#endif + cap->r.rCurrentTSO = t; /* context switches are now initiated by the timer signal, unless @@ -1103,7 +1212,9 @@ run_thread: ret = ThreadFinished; break; case ThreadRunGHC: + errno = t->saved_errno; ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r); + t->saved_errno = errno; break; case ThreadInterpret: ret = interpretBCO(cap); @@ -1122,7 +1233,7 @@ run_thread: ACQUIRE_LOCK(&sched_mutex); #ifdef RTS_SUPPORTS_THREADS - IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId());); + IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %p): ", osThreadId());); #elif !defined(GRAN) && !defined(PAR) IF_DEBUG(scheduler,fprintf(stderr,"scheduler: ");); #endif @@ -1492,74 +1603,104 @@ run_thread: } /* --------------------------------------------------------------------------- + * rtsSupportsBoundThreads(): is the RTS built to support bound threads? + * used by Control.Concurrent for error checking. + * ------------------------------------------------------------------------- */ + +StgBool +rtsSupportsBoundThreads(void) +{ +#ifdef THREADED_RTS + return rtsTrue; +#else + return rtsFalse; +#endif +} + +/* --------------------------------------------------------------------------- + * isThreadBound(tso): check whether tso is bound to an OS thread. + * ------------------------------------------------------------------------- */ + +StgBool +isThreadBound(StgTSO* tso USED_IN_THREADED_RTS) +{ +#ifdef THREADED_RTS + StgMainThread *m; + for(m = main_threads; m; m = m->link) + { + if(m->tso == tso) + return rtsTrue; + } +#endif + return rtsFalse; +} + +/* --------------------------------------------------------------------------- * Singleton fork(). Do not copy any running threads. * ------------------------------------------------------------------------- */ +static void +deleteThreadImmediately(StgTSO *tso); + StgInt -forkProcess(StgTSO* tso) +forkProcess(HsStablePtr *entry) { #ifndef mingw32_TARGET_OS pid_t pid; StgTSO* t,*next; StgMainThread *m; - rtsBool doKill; + SchedulerStatus rc; IF_DEBUG(scheduler,sched_belch("forking!")); + rts_lock(); // This not only acquires sched_mutex, it also + // makes sure that no other threads are running pid = fork(); + if (pid) { /* parent */ /* just return the pid */ + rts_unlock(); + return pid; } else { /* child */ - /* wipe all other threads */ - run_queue_hd = run_queue_tl = END_TSO_QUEUE; - tso->link = END_TSO_QUEUE; - - /* When clearing out the threads, we need to ensure - that a 'main thread' is left behind; if there isn't, - the Scheduler will shutdown next time it is entered. - - ==> we don't kill a thread that's on the main_threads - list (nor the current thread.) - [ Attempts at implementing the more ambitious scheme of - killing the main_threads also, and then adding the - current thread onto the main_threads list if it wasn't - there already, failed -- waitThread() (for one) wasn't - up to it. If it proves to be desirable to also kill - the main threads, then this scheme will have to be - revisited (and fully debugged!) - - -- sof 7/2002 - ] - */ - /* DO NOT TOUCH THE QUEUES directly because most of the code around - us is picky about finding the thread still in its queue when - handling the deleteThread() */ - - for (t = all_threads; t != END_TSO_QUEUE; t = next) { - next = t->link; - /* Don't kill the current thread.. */ - if (t->id == tso->id) continue; - doKill=rtsTrue; - /* ..or a main thread */ - for (m = main_threads; m != NULL; m = m->link) { - if (m->tso->id == t->id) { - doKill=rtsFalse; - break; - } + // delete all threads + run_queue_hd = run_queue_tl = END_TSO_QUEUE; + + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + next = t->link; + + // don't allow threads to catch the ThreadKilled exception + deleteThreadImmediately(t); } - if (doKill) { - deleteThread(t); + + // wipe the main thread list + while((m = main_threads) != NULL) { + main_threads = m->link; +#ifdef THREADED_RTS + closeCondition(&m->bound_thread_cond); +#endif + stgFree(m); } + +#ifdef RTS_SUPPORTS_THREADS + resetTaskManagerAfterFork(); // tell startTask() and friends that + startingWorkerThread = rtsFalse; // we have no worker threads any more + resetWorkerWakeupPipeAfterFork(); +#endif + + rc = rts_evalStableIO(entry, NULL); // run the action + rts_checkSchedStatus("forkProcess",rc); + + rts_unlock(); + + hs_exit(); // clean up and exit + stg_exit(0); } - } - return pid; #else /* mingw32 */ - barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id); - /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */ + barf("forkProcess#: primop not implemented for mingw32, sorry!\n"); return -1; #endif /* mingw32 */ } @@ -1618,6 +1759,7 @@ suspendThread( StgRegTable *reg, { nat tok; Capability *cap; + int saved_errno = errno; /* assume that *reg is a pointer to the StgRegTable part * of a Capability. @@ -1659,14 +1801,13 @@ suspendThread( StgRegTable *reg, waiting to take over. */ IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId())); - //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult - startTask(taskStart); - //} #endif /* Other threads _might_ be available for execution; signal this */ THREAD_RUNNABLE(); RELEASE_LOCK(&sched_mutex); + + errno = saved_errno; return tok; } @@ -1676,6 +1817,7 @@ resumeThread( StgInt tok, { StgTSO *tso, **prev; Capability *cap; + int saved_errno = errno; #if defined(RTS_SUPPORTS_THREADS) /* Wait for permission to re-enter the RTS with the result. */ @@ -1717,6 +1859,7 @@ resumeThread( StgInt tok, #if defined(RTS_SUPPORTS_THREADS) RELEASE_LOCK(&sched_mutex); #endif + errno = saved_errno; return &cap->r; } @@ -1845,6 +1988,8 @@ createThread(nat size) tso->why_blocked = NotBlocked; tso->blocked_exceptions = NULL; + tso->saved_errno = 0; + tso->stack_size = stack_size; tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) - TSO_STRUCT_SIZEW; @@ -2016,10 +2161,8 @@ activateSpark (rtsSpark spark) } #endif -static SchedulerStatus waitThread_(/*out*/StgMainThread* m -#if defined(THREADED_RTS) - , rtsBool blockWaiting -#endif +static SchedulerStatus waitThread_(/*out*/StgMainThread* m, + Capability *initialCapability ); @@ -2061,7 +2204,7 @@ void scheduleThread(StgTSO* tso) } SchedulerStatus -scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret) +scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCapability) { // Precondition: sched_mutex must be held StgMainThread *m; @@ -2070,8 +2213,12 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret) m->ret = ret; m->stat = NoStatus; #if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) + initCondition(&m->bound_thread_cond); +#else initCondition(&m->wakeup); #endif +#endif /* Put the thread on the main-threads list prior to scheduling the TSO. Failure to do so introduces a race condition in the MT case (as @@ -2088,11 +2235,8 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret) main_threads = m; scheduleThread_(tso); -#if defined(THREADED_RTS) - return waitThread_(m, rtsTrue); -#else - return waitThread_(m); -#endif + + return waitThread_(m, initialCapability); } /* --------------------------------------------------------------------------- @@ -2259,13 +2403,13 @@ finishAllThreads ( void ) { do { while (run_queue_hd != END_TSO_QUEUE) { - waitThread ( run_queue_hd, NULL); + waitThread ( run_queue_hd, NULL, NULL ); } while (blocked_queue_hd != END_TSO_QUEUE) { - waitThread ( blocked_queue_hd, NULL); + waitThread ( blocked_queue_hd, NULL, NULL ); } while (sleeping_queue != END_TSO_QUEUE) { - waitThread ( blocked_queue_hd, NULL); + waitThread ( blocked_queue_hd, NULL, NULL ); } } while (blocked_queue_hd != END_TSO_QUEUE || @@ -2274,7 +2418,7 @@ finishAllThreads ( void ) } SchedulerStatus -waitThread(StgTSO *tso, /*out*/StgClosure **ret) +waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability) { StgMainThread *m; SchedulerStatus stat; @@ -2284,8 +2428,12 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) m->ret = ret; m->stat = NoStatus; #if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) + initCondition(&m->bound_thread_cond); +#else initCondition(&m->wakeup); #endif +#endif /* see scheduleWaitThread() comment */ ACQUIRE_LOCK(&sched_mutex); @@ -2293,45 +2441,25 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) main_threads = m; IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id)); -#if defined(THREADED_RTS) - stat = waitThread_(m, rtsFalse); -#else - stat = waitThread_(m); -#endif + + stat = waitThread_(m,initialCapability); + RELEASE_LOCK(&sched_mutex); return stat; } static SchedulerStatus -waitThread_(StgMainThread* m -#if defined(THREADED_RTS) - , rtsBool blockWaiting -#endif - ) +waitThread_(StgMainThread* m, Capability *initialCapability) { SchedulerStatus stat; // Precondition: sched_mutex must be held. IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id)); -#if defined(RTS_SUPPORTS_THREADS) - -# if defined(THREADED_RTS) - if (!blockWaiting) { - /* In the threaded case, the OS thread that called main() - * gets to enter the RTS directly without going via another - * task/thread. - */ - main_main_thread = m; - RELEASE_LOCK(&sched_mutex); - schedule(); - ACQUIRE_LOCK(&sched_mutex); - main_main_thread = NULL; - ASSERT(m->stat != NoStatus); - } else -# endif - { +#if defined(RTS_SUPPORTS_THREADS) && !defined(THREADED_RTS) + { // FIXME: does this still make sense? + // It's not for the threaded rts => SMP only do { waitCondition(&m->wakeup, &sched_mutex); } while (m->stat == NoStatus); @@ -2343,18 +2471,23 @@ waitThread_(StgMainThread* m CurrentProc = MainProc; // PE to run it on RELEASE_LOCK(&sched_mutex); - schedule(); + schedule(m,initialCapability); #else RELEASE_LOCK(&sched_mutex); - schedule(); + schedule(m,initialCapability); + ACQUIRE_LOCK(&sched_mutex); ASSERT(m->stat != NoStatus); #endif stat = m->stat; #if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) + closeCondition(&m->bound_thread_cond); +#else closeCondition(&m->wakeup); #endif +#endif IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", m->tso->id)); @@ -3008,6 +3141,7 @@ interruptStgRts(void) { interrupted = 1; context_switch = 1; + wakeBlockedWorkerThread(); } /* ----------------------------------------------------------------------------- @@ -3327,6 +3461,22 @@ deleteThread(StgTSO *tso) raiseAsync(tso,NULL); } +static void +deleteThreadImmediately(StgTSO *tso) +{ // for forkProcess only: + // delete thread without giving it a chance to catch the KillThread exception + + if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { + return; + } +#if defined(RTS_SUPPORTS_THREADS) + if (tso->why_blocked != BlockedOnCCall + && tso->why_blocked != BlockedOnCCall_NoUnblockExc) +#endif + unblockThread(tso); + tso->what_next = ThreadKilled; +} + void raiseAsyncWithLock(StgTSO *tso, StgClosure *exception) {