X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=d29b6bbad2582215bde3d6aead3466879aa40ee8;hb=85aa72b9dc6803685595936c61f3cab6faab815a;hp=c58584f6939691a91aa889903e8af8f57564e8b6;hpb=6f0dcafbc883c851aaf6d5a7c446ddaaebe23361;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index c58584f..d29b6bb 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,5 +1,5 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.173 2003/08/15 12:43:57 simonmar Exp $ + * $Id: Schedule.c,v 1.174 2003/09/21 22:20:56 wolfgang Exp $ * * (c) The GHC Team, 1998-2000 * @@ -126,6 +126,10 @@ #include #include +#ifdef HAVE_ERRNO_H +#include +#endif + //@node Variables and Data structures, Prototypes, Includes, Main scheduling code //@subsection Variables and Data structures @@ -134,15 +138,6 @@ */ StgMainThread *main_threads = NULL; -#ifdef THREADED_RTS -// Pointer to the thread that executes main -// When this thread is finished, the program terminates -// by calling shutdownHaskellAndExit. -// It would be better to add a call to shutdownHaskellAndExit -// to the Main.main wrapper and to remove this hack. -StgMainThread *main_main_thread = NULL; -#endif - /* Thread queues. * Locks required: sched_mutex. */ @@ -249,7 +244,7 @@ static rtsBool shutting_down_scheduler = rtsFalse; void addToBlockedQueue ( StgTSO *tso ); -static void schedule ( void ); +static void schedule ( StgMainThread *mainThread, Capability *initialCapability ); void interruptStgRts ( void ); static void detectBlackHoles ( void ); @@ -311,7 +306,7 @@ static void taskStart(void); static void taskStart(void) { - schedule(); + schedule(NULL,NULL); } #endif @@ -363,10 +358,10 @@ startSchedulerTask(void) ------------------------------------------------------------------------ */ //@cindex schedule static void -schedule( void ) +schedule( StgMainThread *mainThread, Capability *initialCapability ) { StgTSO *t; - Capability *cap; + Capability *cap = initialCapability; StgThreadReturnCode ret; #if defined(GRAN) rtsEvent *event; @@ -386,8 +381,16 @@ schedule( void ) ACQUIRE_LOCK(&sched_mutex); #if defined(RTS_SUPPORTS_THREADS) - waitForWorkCapability(&sched_mutex, &cap, rtsFalse); - IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId())); + /* in the threaded case, the capability is either passed in via the initialCapability + parameter, or initialized inside the scheduler loop */ + + IF_DEBUG(scheduler, + fprintf(stderr,"### NEW SCHEDULER LOOP in os thread %u(%p)\n", + osThreadId(), osThreadId())); + IF_DEBUG(scheduler, + fprintf(stderr,"### main thread: %p\n",mainThread)); + IF_DEBUG(scheduler, + fprintf(stderr,"### initial cap: %p\n",initialCapability)); #else /* simply initialise it in the non-threaded case */ grabCapability(&cap); @@ -431,8 +434,19 @@ schedule( void ) #if defined(RTS_SUPPORTS_THREADS) /* Check to see whether there are any worker threads waiting to deposit external call results. If so, - yield our capability */ - yieldToReturningWorker(&sched_mutex, &cap); + yield our capability... if we have a capability, that is. */ + if(cap) + yieldToReturningWorker(&sched_mutex, &cap, + mainThread ? &mainThread->bound_thread_cond : NULL); + + /* If we do not currently hold a capability, we wait for one */ + if(!cap) + { + waitForWorkCapability(&sched_mutex, &cap, + mainThread ? &mainThread->bound_thread_cond : NULL); + IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): got cap", + osThreadId())); + } #endif /* If we're interrupted (the user pressed ^C, or some other @@ -463,55 +477,63 @@ schedule( void ) */ #if defined(RTS_SUPPORTS_THREADS) { - StgMainThread *m, **prev; - prev = &main_threads; - for (m = main_threads; m != NULL; prev = &m->link, m = m->link) { - switch (m->tso->what_next) { - case ThreadComplete: - if (m->ret) { - // NOTE: return val is tso->sp[1] (see StgStartup.hc) - *(m->ret) = (StgClosure *)m->tso->sp[1]; - } - *prev = m->link; - m->stat = Success; - broadcastCondition(&m->wakeup); -#ifdef DEBUG - removeThreadLabel((StgWord)m->tso); -#endif - if(m == main_main_thread) - { - releaseCapability(cap); - startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit - RELEASE_LOCK(&sched_mutex); - shutdownHaskellAndExit(EXIT_SUCCESS); - } - break; - case ThreadKilled: - if (m->ret) *(m->ret) = NULL; - *prev = m->link; - if (was_interrupted) { - m->stat = Interrupted; - } else { - m->stat = Killed; - } - broadcastCondition(&m->wakeup); + StgMainThread *m, **prev; + prev = &main_threads; + for (m = main_threads; m != NULL; prev = &m->link, m = m->link) { + if (m->tso->what_next == ThreadComplete + || m->tso->what_next == ThreadKilled) + { + if(m == mainThread) + { + if(m->tso->what_next == ThreadComplete) + { + if (m->ret) + { + // NOTE: return val is tso->sp[1] (see StgStartup.hc) + *(m->ret) = (StgClosure *)m->tso->sp[1]; + } + m->stat = Success; + } + else + { + if (m->ret) + { + *(m->ret) = NULL; + } + if (was_interrupted) + { + m->stat = Interrupted; + } + else + { + m->stat = Killed; + } + } + *prev = m->link; + #ifdef DEBUG - removeThreadLabel((StgWord)m->tso); + removeThreadLabel((StgWord)m->tso); #endif - if(m == main_main_thread) - { releaseCapability(cap); - startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit RELEASE_LOCK(&sched_mutex); - shutdownHaskellAndExit(EXIT_SUCCESS); + return; + } + else + { + // The current OS thread can not handle the fact that the Haskell + // thread "m" has ended. + // "m" is bound; the scheduler loop in it's bound OS thread has + // to return, so let's pass our capability directly to that thread. + passCapability(&sched_mutex, cap, &m->bound_thread_cond); + cap = NULL; + } } - break; - default: - break; } - } } - + + if(!cap) // If we gave our capability away, + continue; // go to the top to get it back + #else /* not threaded */ # if defined(PAR) @@ -1067,6 +1089,63 @@ schedule( void ) IF_DEBUG(sanity,checkTSO(t)); #endif +#ifdef THREADED_RTS + { + StgMainThread *m; + for(m = main_threads; m; m = m->link) + { + if(m->tso == t) + break; + } + + if(m) + { + if(m == mainThread) + { + IF_DEBUG(scheduler, + fprintf(stderr,"### Running TSO %p in bound OS thread %u\n", + t, osThreadId())); + // yes, the Haskell thread is bound to the current native thread + } + else + { + IF_DEBUG(scheduler, + fprintf(stderr,"### TSO %p bound to other OS thread than %u\n", + t, osThreadId())); + // no, bound to a different Haskell thread: pass to that thread + PUSH_ON_RUN_QUEUE(t); + passCapability(&sched_mutex,cap,&m->bound_thread_cond); + cap = NULL; + continue; + } + } + else + { + // The thread we want to run is not bound. + if(mainThread == NULL) + { + IF_DEBUG(scheduler, + fprintf(stderr,"### Running TSO %p in worker OS thread %u\n", + t, osThreadId())); + // if we are a worker thread, + // we may run it here + } + else + { + IF_DEBUG(scheduler, + fprintf(stderr,"### TSO %p is not appropriate for main thread %p in OS thread %u\n", + t, mainThread, osThreadId())); + // no, the current native thread is bound to a different + // Haskell thread, so pass it to any worker thread + PUSH_ON_RUN_QUEUE(t); + releaseCapability(cap); + cap = NULL; + continue; + } + } + } +#endif + cap->r.rCurrentTSO = t; /* context switches are now initiated by the timer signal, unless @@ -1103,7 +1182,9 @@ run_thread: ret = ThreadFinished; break; case ThreadRunGHC: + errno = t->saved_errno; ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r); + t->saved_errno = errno; break; case ThreadInterpret: ret = interpretBCO(cap); @@ -1122,7 +1203,7 @@ run_thread: ACQUIRE_LOCK(&sched_mutex); #ifdef RTS_SUPPORTS_THREADS - IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId());); + IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %p): ", osThreadId());); #elif !defined(GRAN) && !defined(PAR) IF_DEBUG(scheduler,fprintf(stderr,"scheduler: ");); #endif @@ -1492,19 +1573,54 @@ run_thread: } /* --------------------------------------------------------------------------- + * rtsSupportsBoundThreads(): is the RTS built to support bound threads? + * used by Control.Concurrent for error checking. + * ------------------------------------------------------------------------- */ + +StgBool +rtsSupportsBoundThreads(void) +{ +#ifdef THREADED_RTS + return rtsTrue; +#else + return rtsFalse; +#endif +} + +/* --------------------------------------------------------------------------- + * isThreadBound(tso): check whether tso is bound to an OS thread. + * ------------------------------------------------------------------------- */ + +StgBool +isThreadBound(StgTSO* tso) +{ +#ifdef THREADED_RTS + StgMainThread *m; + for(m = main_threads; m; m = m->link) + { + if(m->tso == tso) + return rtsTrue; + } +#endif + return rtsFalse; +} + +/* --------------------------------------------------------------------------- * Singleton fork(). Do not copy any running threads. * ------------------------------------------------------------------------- */ +static void +deleteThreadImmediately(StgTSO *tso); + StgInt forkProcess(StgTSO* tso) { #ifndef mingw32_TARGET_OS pid_t pid; StgTSO* t,*next; - StgMainThread *m; - rtsBool doKill; IF_DEBUG(scheduler,sched_belch("forking!")); + ACQUIRE_LOCK(&sched_mutex); pid = fork(); if (pid) { /* parent */ @@ -1512,6 +1628,43 @@ forkProcess(StgTSO* tso) /* just return the pid */ } else { /* child */ +#ifdef THREADED_RTS + /* wipe all other threads */ + run_queue_hd = run_queue_tl = END_TSO_QUEUE; + tso->link = END_TSO_QUEUE; + + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + next = t->link; + + /* Don't kill the current thread.. */ + if (t->id == tso->id) { + continue; + } + + if (isThreadBound(t)) { + // If the thread is bound, the OS thread that the thread is bound to + // no longer exists after the fork() system call. + // The bound Haskell thread is therefore unable to run at all; + // we must not give it a chance to survive by catching the + // ThreadKilled exception. So we kill it "brutally" rather than + // using deleteThread. + deleteThreadImmediately(t); + } else { + deleteThread(t); + } + } + + if (isThreadBound(tso)) { + } else { + // If the current is not bound, then we should make it so. + // The OS thread left over by fork() is special in that the process + // will terminate as soon as the thread terminates; + // we'd expect forkProcess to behave similarily. + // FIXME - we don't do this. + } +#else + StgMainThread *m; + rtsBool doKill; /* wipe all other threads */ run_queue_hd = run_queue_tl = END_TSO_QUEUE; tso->link = END_TSO_QUEUE; @@ -1555,7 +1708,9 @@ forkProcess(StgTSO* tso) deleteThread(t); } } +#endif } + RELEASE_LOCK(&sched_mutex); return pid; #else /* mingw32 */ barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id); @@ -1618,6 +1773,7 @@ suspendThread( StgRegTable *reg, { nat tok; Capability *cap; + int saved_errno = errno; /* assume that *reg is a pointer to the StgRegTable part * of a Capability. @@ -1667,6 +1823,8 @@ suspendThread( StgRegTable *reg, /* Other threads _might_ be available for execution; signal this */ THREAD_RUNNABLE(); RELEASE_LOCK(&sched_mutex); + + errno = saved_errno; return tok; } @@ -1676,6 +1834,7 @@ resumeThread( StgInt tok, { StgTSO *tso, **prev; Capability *cap; + int saved_errno = errno; #if defined(RTS_SUPPORTS_THREADS) /* Wait for permission to re-enter the RTS with the result. */ @@ -1717,6 +1876,7 @@ resumeThread( StgInt tok, #if defined(RTS_SUPPORTS_THREADS) RELEASE_LOCK(&sched_mutex); #endif + errno = saved_errno; return &cap->r; } @@ -1845,6 +2005,8 @@ createThread(nat size) tso->why_blocked = NotBlocked; tso->blocked_exceptions = NULL; + tso->saved_errno = 0; + tso->stack_size = stack_size; tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) - TSO_STRUCT_SIZEW; @@ -2016,10 +2178,8 @@ activateSpark (rtsSpark spark) } #endif -static SchedulerStatus waitThread_(/*out*/StgMainThread* m -#if defined(THREADED_RTS) - , rtsBool blockWaiting -#endif +static SchedulerStatus waitThread_(/*out*/StgMainThread* m, + Capability *initialCapability ); @@ -2061,7 +2221,7 @@ void scheduleThread(StgTSO* tso) } SchedulerStatus -scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret) +scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCapability) { // Precondition: sched_mutex must be held StgMainThread *m; @@ -2071,6 +2231,9 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret) m->stat = NoStatus; #if defined(RTS_SUPPORTS_THREADS) initCondition(&m->wakeup); +#if defined(THREADED_RTS) + initCondition(&m->bound_thread_cond); +#endif #endif /* Put the thread on the main-threads list prior to scheduling the TSO. @@ -2088,11 +2251,8 @@ scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret) main_threads = m; scheduleThread_(tso); -#if defined(THREADED_RTS) - return waitThread_(m, rtsTrue); -#else - return waitThread_(m); -#endif + + return waitThread_(m, initialCapability); } /* --------------------------------------------------------------------------- @@ -2259,13 +2419,13 @@ finishAllThreads ( void ) { do { while (run_queue_hd != END_TSO_QUEUE) { - waitThread ( run_queue_hd, NULL); + waitThread ( run_queue_hd, NULL, NULL ); } while (blocked_queue_hd != END_TSO_QUEUE) { - waitThread ( blocked_queue_hd, NULL); + waitThread ( blocked_queue_hd, NULL, NULL ); } while (sleeping_queue != END_TSO_QUEUE) { - waitThread ( blocked_queue_hd, NULL); + waitThread ( blocked_queue_hd, NULL, NULL ); } } while (blocked_queue_hd != END_TSO_QUEUE || @@ -2274,7 +2434,7 @@ finishAllThreads ( void ) } SchedulerStatus -waitThread(StgTSO *tso, /*out*/StgClosure **ret) +waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability) { StgMainThread *m; SchedulerStatus stat; @@ -2285,6 +2445,9 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) m->stat = NoStatus; #if defined(RTS_SUPPORTS_THREADS) initCondition(&m->wakeup); +#if defined(THREADED_RTS) + initCondition(&m->bound_thread_cond); +#endif #endif /* see scheduleWaitThread() comment */ @@ -2293,45 +2456,25 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) main_threads = m; IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id)); -#if defined(THREADED_RTS) - stat = waitThread_(m, rtsFalse); -#else - stat = waitThread_(m); -#endif + + stat = waitThread_(m,initialCapability); + RELEASE_LOCK(&sched_mutex); return stat; } static SchedulerStatus -waitThread_(StgMainThread* m -#if defined(THREADED_RTS) - , rtsBool blockWaiting -#endif - ) +waitThread_(StgMainThread* m, Capability *initialCapability) { SchedulerStatus stat; // Precondition: sched_mutex must be held. IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id)); -#if defined(RTS_SUPPORTS_THREADS) - -# if defined(THREADED_RTS) - if (!blockWaiting) { - /* In the threaded case, the OS thread that called main() - * gets to enter the RTS directly without going via another - * task/thread. - */ - main_main_thread = m; - RELEASE_LOCK(&sched_mutex); - schedule(); - ACQUIRE_LOCK(&sched_mutex); - main_main_thread = NULL; - ASSERT(m->stat != NoStatus); - } else -# endif - { +#if defined(RTS_SUPPORTS_THREADS) && !defined(THREADED_RTS) + { // FIXME: does this still make sense? + // It's not for the threaded rts => SMP only do { waitCondition(&m->wakeup, &sched_mutex); } while (m->stat == NoStatus); @@ -2343,10 +2486,11 @@ waitThread_(StgMainThread* m CurrentProc = MainProc; // PE to run it on RELEASE_LOCK(&sched_mutex); - schedule(); + schedule(m,initialCapability); #else RELEASE_LOCK(&sched_mutex); - schedule(); + schedule(m,initialCapability); + ACQUIRE_LOCK(&sched_mutex); ASSERT(m->stat != NoStatus); #endif @@ -2354,6 +2498,9 @@ waitThread_(StgMainThread* m #if defined(RTS_SUPPORTS_THREADS) closeCondition(&m->wakeup); +#if defined(THREADED_RTS) + closeCondition(&m->bound_thread_cond); +#endif #endif IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", @@ -3327,6 +3474,18 @@ deleteThread(StgTSO *tso) raiseAsync(tso,NULL); } +static void +deleteThreadImmediately(StgTSO *tso) +{ // for forkProcess only: + // delete thread without giving it a chance to catch the KillThread exception + + if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { + return; + } + unblockThread(tso); + tso->what_next = ThreadKilled; +} + void raiseAsyncWithLock(StgTSO *tso, StgClosure *exception) {