X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=4430f5a2aa16eca5dc321a267b7161909fea3624;hb=d1447bea8845e1c77ad8e76b16f9f5180dede4d8;hp=026ffebe417cdfcba33545023337fb88c572f34c;hpb=3b9c5eb29bbb47a5733e37c9940789342d9d6f49;p=ghc-hetmet.git

diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c
index 026ffeb..4430f5a 100644
--- a/ghc/rts/Schedule.c
+++ b/ghc/rts/Schedule.c
@@ -1,5 +1,5 @@
 /* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.114 2002/01/31 11:18:07 sof Exp $
+ * $Id: Schedule.c,v 1.121 2002/02/12 15:38:08 sof Exp $
 *
 * (c) The GHC Team, 1998-2000
 *
@@ -112,6 +112,7 @@
 #include "Sparks.h"
 #include "Capability.h"
 #include "OSThreads.h"
+#include "Task.h"
 
 #include <stdarg.h>
 
@@ -138,7 +139,7 @@ typedef struct StgMainThread_ {
 SchedulerStatus stat;
 StgClosure ** ret;
 #if defined(RTS_SUPPORTS_THREADS)
- CondVar wakeup;
+ Condition wakeup;
 #endif
 struct StgMainThread_ *link;
 } StgMainThread;
@@ -184,7 +185,9 @@ StgTSO *sleeping_queue; /* perhaps replace with a hash table? */
 */
 StgTSO *all_threads;
 
-/* Threads suspended in _ccall_GC.
+/* When a thread performs a safe C call (_ccall_GC, using old
+ * terminology), it gets put on the suspended_ccalling_threads
+ * list. Used by the garbage collector.
 */
 static StgTSO *suspended_ccalling_threads;
 
@@ -228,10 +231,6 @@ StgThreadID next_thread_id = 1;
 
 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
 
-#if !defined(SMP)
-Capability MainCapability;     /* for non-SMP, we have one global capability */
-#endif
-
 #if defined(GRAN)
 StgTSO *CurrentTSO;
 #endif
@@ -244,13 +243,6 @@ StgTSO dummy_tso;
 
 rtsBool ready_to_gc;
 
-/* All our current task ids, saved in case we need to kill them later.
- */
-#if defined(SMP)
-//@cindex task_ids
-task_info *task_ids;
-#endif
-
 void addToBlockedQueue ( StgTSO *tso );
 
 static void schedule ( void );
@@ -271,14 +263,63 @@ static void sched_belch(char *s, ...);
 /* ToDo: carefully document the invariants that go together
 * with these synchronisation objects.
 */
-MutexVar sched_mutex = INIT_MUTEX_VAR;
-MutexVar term_mutex = INIT_MUTEX_VAR;
-CondVar thread_ready_cond = INIT_COND_VAR;
-CondVar gc_pending_cond = INIT_COND_VAR;
+Mutex sched_mutex = INIT_MUTEX_VAR;
+Mutex term_mutex = INIT_MUTEX_VAR;
+#if defined(THREADED_RTS)
+/*
+ * The rts_mutex is the 'big lock' that the active native
+ * thread within the RTS holds while executing code.
+ * It is given up when the thread makes a transition out of
+ * the RTS (e.g., to perform an external C call), hopefully
+ * for another thread to take over its chores and enter
+ * the RTS.
+ *
+ */
+Mutex rts_mutex = INIT_MUTEX_VAR;
+/*
+ * When a native thread has completed executing an external
+ * call, it needs to communicate the result back to the
+ * (Haskell) thread that made the call. Do this as follows:
+ *
+ *  - in resumeThread(), the thread increments the counter
+ *    threads_waiting, and then blocks on the 'big' RTS lock.
+ *  - upon entry to the scheduler, the thread that's currently
+ *    holding the RTS lock checks threads_waiting. If there
+ *    are native threads waiting, it gives up its RTS lock
+ *    and tries to re-grab the RTS lock [perhaps after having
+ *    waited for a bit..?]
+ *  - care must be taken to deal with the case where more than
+ *    one external thread is waiting on the lock. [ToDo: more]
+ *
+ */
+
+static nat threads_waiting = 0;
+#endif
+
+
+/* thread_ready_cond: when signalled, a thread has become runnable for a
+ * task to execute.
+ *
+ * In the non-SMP case, it also implies that the thread that is woken up has
+ * exclusive access to the RTS and all its data structures (that are not
+ * under sched_mutex's control).
+ *
+ * thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold.
+ *
+ */
+Condition thread_ready_cond = INIT_COND_VAR;
+#if 0
+/* For documentation purposes only */
+#define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE())
+#endif
+#if defined(SMP)
+Condition gc_pending_cond = INIT_COND_VAR;
 nat await_death;
 #endif
+#endif
+
 #if defined(PAR)
 StgTSO *LastTSO;
 rtsTime TimeOfLastYield;
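
[Aside: the hand-off protocol described in the comments above, in outline.
This is a condensed illustration, not code from the patch; it reuses the
patch's own names (threads_waiting, rts_mutex, sched_mutex, yieldThread())
and leaves out the taskAvailable()/taskNotAvailable() bookkeeping and the
SMP case.]

    /* A native thread returning from an external call
     * (cf. resumeThread() further down): */
    ACQUIRE_LOCK(&sched_mutex);
    threads_waiting++;            /* advertise that we want back in    */
    RELEASE_LOCK(&sched_mutex);
    ACQUIRE_LOCK(&rts_mutex);     /* block until the big lock is free  */
    threads_waiting--;

    /* The thread currently holding the big lock
     * (cf. the check at the top of schedule() below): */
    if (threads_waiting > 0) {
        RELEASE_LOCK(&rts_mutex); /* let a waiter deposit its result   */
        yieldThread();
        ACQUIRE_LOCK(&rts_mutex); /* then compete to get back in       */
    }
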
@@ -314,6 +355,24 @@ StgTSO * activateSpark (rtsSpark spark);
 StgTSO *MainTSO;
 */
 
+#if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
+static void taskStart(void);
+static void
+taskStart(void)
+{
+  /* threads start up using 'taskStart', so make
+     them grab the RTS lock. */
+#if defined(THREADED_RTS)
+  ACQUIRE_LOCK(&rts_mutex);
+  taskNotAvailable();
+#endif
+  schedule();
+}
+#endif
+
+
+
+
 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
 //@subsection Main scheduling loop
 
@@ -374,6 +433,28 @@ schedule( void )
 rtsBool was_interrupted = rtsFalse;
 
 ACQUIRE_LOCK(&sched_mutex);
+
+#if defined(THREADED_RTS)
+  /* ToDo: consider SMP support */
+  if (threads_waiting > 0) {
+    /* (At least) one native thread is waiting to
+     * deposit the result of an external call. So,
+     * give up our RTS executing privileges and let
+     * one of them continue.
+     *
+     */
+    taskAvailable();
+    RELEASE_LOCK(&sched_mutex);
+    IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (threads_waiting=%d)\n", osThreadId(), threads_waiting));
+    RELEASE_LOCK(&rts_mutex);
+    /* ToDo: come up with mechanism that guarantees that
+     * the main thread doesn't loop here.
+     */
+    yieldThread();
+    /* ToDo: longjmp() */
+    taskStart();
+  }
+#endif
 
 #if defined(GRAN)
@@ -439,7 +520,7 @@
 }
 *prev = m->link;
 m->stat = Success;
- broadcastCondVar(&m->wakeup);
+ broadcastCondition(&m->wakeup);
 break;
 case ThreadKilled:
 if (m->ret) *(m->ret) = NULL;
@@ -449,7 +530,7 @@
 } else {
 m->stat = Killed;
 }
- broadcastCondVar(&m->wakeup);
+ broadcastCondition(&m->wakeup);
 break;
 default:
 break;
@@ -525,7 +606,7 @@
 * work for them.
 */
 if (getFreeCapabilities() - n > 1) {
- signalCondVar ( &thread_ready_cond );
+ signalCondition( &thread_ready_cond );
 }
 }
 #endif // SMP
@@ -543,9 +624,8 @@
 * ToDo: what if another client comes along & requests another
 * main thread?
 */
- if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
- awaitEvent(
- (run_queue_hd == END_TSO_QUEUE)
+ if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
+ awaitEvent( EMPTY_RUN_QUEUE()
 #if defined(SMP)
 && allFreeCapabilities()
 #endif
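
[Aside: the EMPTY_QUEUE()/EMPTY_RUN_QUEUE() predicates that this patch
starts using here are not defined in this diff. Judging by the explicit
comparisons they replace, they presumably expand along these lines:]

    #define EMPTY_QUEUE(q)    ((q) == END_TSO_QUEUE)
    #define EMPTY_RUN_QUEUE() (EMPTY_QUEUE(run_queue_hd))
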
@@ -566,19 +646,24 @@
 * inform all the main threads.
 */
 #ifndef PAR
- if (blocked_queue_hd == END_TSO_QUEUE
- && run_queue_hd == END_TSO_QUEUE
- && sleeping_queue == END_TSO_QUEUE
+ if ( EMPTY_QUEUE(blocked_queue_hd)
+ && EMPTY_RUN_QUEUE()
+ && EMPTY_QUEUE(sleeping_queue)
 #if defined(SMP)
 && allFreeCapabilities()
+#elif defined(THREADED_RTS)
+ && EMPTY_QUEUE(suspended_ccalling_threads)
 #endif
 )
 {
 IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
+ RELEASE_LOCK(&sched_mutex);
 GarbageCollect(GetRoots,rtsTrue);
- if (blocked_queue_hd == END_TSO_QUEUE
- && run_queue_hd == END_TSO_QUEUE
- && sleeping_queue == END_TSO_QUEUE) {
+ ACQUIRE_LOCK(&sched_mutex);
+ IF_DEBUG(scheduler, sched_belch("GC done."));
+ if ( EMPTY_QUEUE(blocked_queue_hd)
+ && EMPTY_RUN_QUEUE()
+ && EMPTY_QUEUE(sleeping_queue) ) {
 
 IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
 detectBlackHoles();
@@ -588,7 +673,7 @@
 * build, send *all* main threads the deadlock exception,
 * since none of them can make progress).
 */
- if (run_queue_hd == END_TSO_QUEUE) {
+ if ( EMPTY_RUN_QUEUE() ) {
 StgMainThread *m;
 #if defined(RTS_SUPPORTS_THREADS)
 for (m = main_threads; m != NULL; m = m->link) {
@@ -619,9 +704,14 @@
 }
 #endif
 }
-#if !defined(RTS_SUPPORTS_THREADS)
- ASSERT( run_queue_hd != END_TSO_QUEUE );
+#if defined(RTS_SUPPORTS_THREADS)
+ if ( EMPTY_RUN_QUEUE() ) {
+ IF_DEBUG(scheduler, sched_belch("all done, it seems...shut down."));
+ shutdownHaskellAndExit(0);
+
+ }
 #endif
+ ASSERT( !EMPTY_RUN_QUEUE() );
 }
 }
 #elif defined(PAR)
@@ -634,23 +724,35 @@
 */
 if (ready_to_gc) {
 IF_DEBUG(scheduler,sched_belch("waiting for GC"));
- waitCondVar ( &gc_pending_cond, &sched_mutex );
+ waitCondition( &gc_pending_cond, &sched_mutex );
 }
 #endif
 
-#if defined(RTS_SUPPORTS_THREADS)
+#if defined(SMP)
 /* block until we've got a thread on the run queue and a free
 * capability.
 */
- while ( run_queue_hd == END_TSO_QUEUE
-#if defined(SMP)
- || noFreeCapabilities()
-#endif
- ) {
+ while ( noCapabilities() || EMPTY_RUN_QUEUE() ) {
 IF_DEBUG(scheduler, sched_belch("waiting for work"));
- waitCondVar ( &thread_ready_cond, &sched_mutex );
+ waitCondition( &thread_ready_cond, &sched_mutex );
 IF_DEBUG(scheduler, sched_belch("work now available"));
 }
+#elif defined(THREADED_RTS)
+ if ( EMPTY_RUN_QUEUE() ) {
+ /* no work available, wait for external calls to complete. */
+ IF_DEBUG(scheduler, sched_belch("worker thread (%d): waiting for external thread to complete..", osThreadId()));
+ taskAvailable();
+ RELEASE_LOCK(&rts_mutex);
+
+ while ( EMPTY_RUN_QUEUE() ) {
+ waitCondition(&thread_ready_cond, &sched_mutex);
+ }
+ RELEASE_LOCK(&sched_mutex);
+
+ IF_DEBUG(scheduler, sched_belch("worker thread (%d): re-awakened from no-work slumber..\n", osThreadId()));
+ /* ToDo: longjmp() */
+ taskStart();
+ }
 #endif
 
 #if defined(GRAN)
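
[Aside: the while loop around waitCondition() above is the standard
condition-variable idiom: the wait atomically releases sched_mutex,
sleeps, and re-acquires the mutex before returning, so the predicate has
to be re-tested on every wake-up. On POSIX platforms the OSThreads
wrapper presumably comes down to something like the following; this is an
illustration, not the actual OSThreads code:]

    #include <pthread.h>

    typedef pthread_cond_t  Condition;
    typedef pthread_mutex_t Mutex;

    void waitCondition( Condition *c, Mutex *m )
    {
        pthread_cond_wait(c, m);  /* unlocks m, sleeps, re-locks m */
    }
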
@@ -932,19 +1034,12 @@
 */
 ASSERT(run_queue_hd != END_TSO_QUEUE);
 t = POP_RUN_QUEUE();
-
 // Sanity check the thread we're about to run. This can be
 // expensive if there is lots of thread switching going on...
 IF_DEBUG(sanity,checkTSO(t));
-
 #endif
 
-#ifdef SMP
 grabCapability(&cap);
-#else
- cap = &MainCapability;
-#endif
-
 cap->r.rCurrentTSO = t;
 
 /* context switches are now initiated by the timer signal, unless
@@ -1323,7 +1418,7 @@
 GarbageCollect(GetRoots,rtsFalse);
 ready_to_gc = rtsFalse;
 #ifdef SMP
- broadcastCondVar(&gc_pending_cond);
+ broadcastCondition(&gc_pending_cond);
 #endif
 #if defined(GRAN)
 /* add a ContinueThread event to continue execution of current thread */
@@ -1363,15 +1458,18 @@
 void deleteAllThreads ( void )
 {
- StgTSO* t;
+ StgTSO* t, *next;
 IF_DEBUG(scheduler,sched_belch("deleting all threads"));
- for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
+ for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) {
+ next = t->link;
 deleteThread(t);
 }
- for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
+ for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) {
+ next = t->link;
 deleteThread(t);
 }
- for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
+ for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) {
+ next = t->link;
 deleteThread(t);
 }
 run_queue_hd = run_queue_tl = END_TSO_QUEUE;
@@ -1381,6 +1479,7 @@ void deleteAllThreads ( void )
 
 /* startThread and insertThread are now in GranSim.c -- HWL */
 
+
 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
 //@subsection Suspend and Resume
 
@@ -1405,7 +1504,9 @@ suspendThread( StgRegTable *reg )
 nat tok;
 Capability *cap;
 
- // assume that *reg is a pointer to the StgRegTable part of a Capability
+ /* assume that *reg is a pointer to the StgRegTable part
+ * of a Capability.
+ */
 cap = (Capability *)((void *)reg - sizeof(StgFunTable));
 
 ACQUIRE_LOCK(&sched_mutex);
@@ -1420,12 +1521,25 @@ suspendThread( StgRegTable *reg )
 /* Use the thread ID as the token; it should be unique */
 tok = cap->r.rCurrentTSO->id;
 
-#ifdef SMP
 /* Hand back capability */
- releaseCapability(&cap);
+ releaseCapability(cap);
+
+#if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
+ /* Preparing to leave the RTS, so ensure there's a native thread/task
+    waiting to take over.
+
+    ToDo: optimise this and only create a new task if there's a need
+    for one (i.e., if there's only one Concurrent Haskell thread alive,
+    there's no need to create a new task).
+ */
+ IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS\n", tok));
+ startTask(taskStart);
+
 #endif
+
 THREAD_RUNNABLE();
 RELEASE_LOCK(&sched_mutex);
+ // RELEASE_LOCK(&rts_mutex);
 return tok;
 }
 
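
[Aside: suspendThread()/resumeThread() are the bracket around a 'safe'
foreign call: the code emitted for such a call is shaped roughly like the
sketch below. Variable names are illustrative, not the literal generated
code.]

    StgInt tok;

    tok = suspendThread(BaseReg);  /* park the Haskell thread and      */
                                   /* give up the capability           */
    ret = foreign_fn(args);        /* may block; other tasks can run   */
                                   /* Haskell threads in the meantime  */
    BaseReg = resumeThread(tok);   /* reclaim a capability and put the */
                                   /* suspended thread back on track   */
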
@@ -1435,8 +1549,26 @@ resumeThread( StgInt tok )
 StgTSO *tso, **prev;
 Capability *cap;
 
+#if defined(THREADED_RTS)
+ IF_DEBUG(scheduler, sched_belch("thread %d returning, waiting for sched. lock.\n", tok));
 ACQUIRE_LOCK(&sched_mutex);
+ threads_waiting++;
+ IF_DEBUG(scheduler, sched_belch("thread %d returning, threads waiting: %d.\n", tok, threads_waiting));
+ RELEASE_LOCK(&sched_mutex);
+
+ IF_DEBUG(scheduler, sched_belch("thread %d waiting for RTS lock...\n", tok));
+ ACQUIRE_LOCK(&rts_mutex);
+ threads_waiting--;
+ taskNotAvailable();
+ IF_DEBUG(scheduler, sched_belch("thread %d acquired RTS lock...\n", tok));
+#endif
+
+#if defined(THREADED_RTS)
+ /* Free up any RTS-blocked threads.
+ */
+ broadcastCondition(&thread_ready_cond);
+#endif
+
 /* Remove the thread from the suspended list */
 prev = &suspended_ccalling_threads;
 for (tso = suspended_ccalling_threads; tso != END_TSO_QUEUE;
@@ -1451,20 +1583,18 @@ resumeThread( StgInt tok )
 }
 tso->link = END_TSO_QUEUE;
 
-#ifdef SMP
- while ( noFreeCapabilities() ) {
+#if defined(RTS_SUPPORTS_THREADS)
+ while ( noCapabilities() ) {
 IF_DEBUG(scheduler, sched_belch("waiting to resume"));
- waitCondVar(&thread_ready_cond, &sched_mutex);
+ waitCondition(&thread_ready_cond, &sched_mutex);
 IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
 }
- grabCapability(&cap);
-#else
- cap = &MainCapability;
 #endif
+
+ grabCapability(&cap);
+
 cap->r.rCurrentTSO = tso;
-
 RELEASE_LOCK(&sched_mutex);
 return &cap->r;
 }
@@ -1789,29 +1919,12 @@ scheduleThread(StgTSO *tso)
 }
 
 /* ---------------------------------------------------------------------------
- * startTasks()
- *
- * Start up Posix threads to run each of the scheduler tasks.
- * I believe the task ids are not needed in the system as defined.
- * KH @ 25/10/99
- * ------------------------------------------------------------------------ */
-
-#if defined(PAR) || defined(SMP)
-void
-taskStart(void) /* ( void *arg STG_UNUSED) */
-{
- schedule();
-}
-#endif
-
-/* ---------------------------------------------------------------------------
 * initScheduler()
 *
 * Initialise the scheduler. This resets all the queues - if the
 * queues contained any threads, they'll be garbage collected at the
 * next pass.
 *
- * This now calls startTasks(), so should only be called once! KH @ 25/10/99
 * ------------------------------------------------------------------------ */
 
 #ifdef SMP
@@ -1858,6 +1971,27 @@ initScheduler(void)
 RtsFlags.ConcFlags.ctxtSwitchTicks =
 RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
+
+#if defined(RTS_SUPPORTS_THREADS)
+ /* Initialise the mutex and condition variables used by
+  * the scheduler. */
+ initMutex(&sched_mutex);
+ initMutex(&term_mutex);
+
+ initCondition(&thread_ready_cond);
+#if defined(THREADED_RTS)
+ initMutex(&rts_mutex);
+#endif
+
+ initCondition(&gc_pending_cond);
+#endif
+
+#if defined(THREADED_RTS)
+ /* Grab big lock */
+ ACQUIRE_LOCK(&rts_mutex);
+ IF_DEBUG(scheduler,
+          sched_belch("worker thread (%d): acquired RTS lock\n", osThreadId()));
+#endif
 
 /* Install the SIGHUP handler */
 #ifdef SMP
@@ -1873,11 +2007,19 @@ initScheduler(void)
 }
 #endif
 
-#ifdef SMP
- /* Allocate N Capabilities */
- initCapabilities(RtsFlags.ParFlags.nNodes);
-#else
- initCapability(&MainCapability);
+ /* A capability holds the state a native thread needs in
+ * order to execute STG code. At least one capability is
+ * floating around (only SMP builds have more than one).
+ */ + initCapabilities(); + +#if defined(RTS_SUPPORTS_THREADS) + /* start our haskell execution tasks */ +# if defined(SMP) + startTaskManager(RtsFlags.ParFlags.nNodes, taskStart); +# else + startTaskManager(0,taskStart); +# endif #endif #if /* defined(SMP) ||*/ defined(PAR) @@ -1885,68 +2027,11 @@ initScheduler(void) #endif } -#ifdef SMP -void -startTasks( void ) -{ - nat i; - int r; - OSThreadId tid; - - /* make some space for saving all the thread ids */ - task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info), - "initScheduler:task_ids"); - - /* and create all the threads */ - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - r = createOSThread(&tid,taskStart); - if (r != 0) { - barf("startTasks: Can't create new Posix thread"); - } - task_ids[i].id = tid; - task_ids[i].mut_time = 0.0; - task_ids[i].mut_etime = 0.0; - task_ids[i].gc_time = 0.0; - task_ids[i].gc_etime = 0.0; - task_ids[i].elapsedtimestart = stat_getElapsedTime(); - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid);); - } -} -#endif - void exitScheduler( void ) { -#ifdef SMP - nat i; - - /* Don't want to use pthread_cancel, since we'd have to install - * these silly exception handlers (pthread_cleanup_{push,pop}) around - * all our locks. - */ -#if 0 - /* Cancel all our tasks */ - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - pthread_cancel(task_ids[i].id); - } - - /* Wait for all the tasks to terminate */ - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", - task_ids[i].id)); - pthread_join(task_ids[i].id, NULL); - } -#endif - - /* Send 'em all a SIGHUP. That should shut 'em up. - */ - await_death = RtsFlags.ParFlags.nNodes; - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - pthread_kill(task_ids[i].id,SIGTERM); - } - while (await_death > 0) { - sched_yield(); - } +#if defined(RTS_SUPPORTS_THREADS) + stopTaskManager(); #endif } @@ -2022,7 +2107,7 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) m->ret = ret; m->stat = NoStatus; #if defined(RTS_SUPPORTS_THREADS) - initCondVar(&m->wakeup); + initCondition(&m->wakeup); #endif m->link = main_threads; @@ -2033,7 +2118,7 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) #ifdef SMP do { - waitCondVar(&m->wakeup, &sched_mutex); + waitCondition(&m->wakeup, &sched_mutex); } while (m->stat == NoStatus); #elif defined(GRAN) /* GranSim specific init */ @@ -2043,6 +2128,7 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) schedule(); #else + RELEASE_LOCK(&sched_mutex); schedule(); ASSERT(m->stat != NoStatus); #endif @@ -2050,7 +2136,7 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) stat = m->stat; #if defined(RTS_SUPPORTS_THREADS) - closeCondVar(&m->wakeup); + closeCondition(&m->wakeup); #endif IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", @@ -3589,7 +3675,6 @@ sched_belch(char *s, ...) //* sched_mutex:: @cindex\s-+sched_mutex //* schedule:: @cindex\s-+schedule //* take_off_run_queue:: @cindex\s-+take_off_run_queue -//* task_ids:: @cindex\s-+task_ids //* term_mutex:: @cindex\s-+term_mutex //* thread_ready_cond:: @cindex\s-+thread_ready_cond //@end index
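
[Aside: startTaskManager()/stopTaskManager() live in the new Task.c and
take over from the startTasks()/pthread_kill() code deleted above. As a
sketch of the presumed behaviour only: under SMP the task manager spawns
RtsFlags.ParFlags.nNodes OS threads up front, each entering the scheduler
through taskStart(); in the THREADED_RTS case it starts none eagerly
(hence the 0) and creates tasks on demand via startTask(), as in
suspendThread() above.]

    /* illustrative sketch only; see Task.c for the real thing */
    void
    startTaskManager( nat howMany, void (*tskStart)(void) )
    {
        OSThreadId tid;
        nat i;

        for (i = 0; i < howMany; i++) {
            /* createOSThread() is the OSThreads wrapper already used
             * by the startTasks() code this replaces. */
            if (createOSThread(&tid, tskStart) != 0) {
                barf("startTaskManager: can't create new task");
            }
        }
    }
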