From: sof Date: Mon, 4 Feb 2002 20:40:37 +0000 (+0000) Subject: [project @ 2002-02-04 20:40:36 by sof] X-Git-Tag: Approximately_9120_patches~156 X-Git-Url: http://git.megacz.com/?a=commitdiff_plain;h=be72dc058ab6bac6a5bf45e5852a4baaf74c2d23;p=ghc-hetmet.git [project @ 2002-02-04 20:40:36 by sof] Snapshot of 'native thread'-friendly extension: - call-outs now work, i.e., a Concurrent Haskell thread which makes an external (C) call no longer stop other CH threads dead in their tracks. [More testing and tightening up of invariants reqd, this is just a snapshot]. - separated task handling into sep. module. --- diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 026ffeb..bce5b2d 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,5 +1,5 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.114 2002/01/31 11:18:07 sof Exp $ + * $Id: Schedule.c,v 1.115 2002/02/04 20:40:36 sof Exp $ * * (c) The GHC Team, 1998-2000 * @@ -112,6 +112,7 @@ #include "Sparks.h" #include "Capability.h" #include "OSThreads.h" +#include "Task.h" #include @@ -138,7 +139,7 @@ typedef struct StgMainThread_ { SchedulerStatus stat; StgClosure ** ret; #if defined(RTS_SUPPORTS_THREADS) - CondVar wakeup; + Condition wakeup; #endif struct StgMainThread_ *link; } StgMainThread; @@ -228,10 +229,6 @@ StgThreadID next_thread_id = 1; #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2) -#if !defined(SMP) -Capability MainCapability; /* for non-SMP, we have one global capability */ -#endif - #if defined(GRAN) StgTSO *CurrentTSO; #endif @@ -244,13 +241,6 @@ StgTSO dummy_tso; rtsBool ready_to_gc; -/* All our current task ids, saved in case we need to kill them later. - */ -#if defined(SMP) -//@cindex task_ids -task_info *task_ids; -#endif - void addToBlockedQueue ( StgTSO *tso ); static void schedule ( void ); @@ -271,10 +261,56 @@ static void sched_belch(char *s, ...); /* ToDo: carefully document the invariants that go together * with these synchronisation objects. */ -MutexVar sched_mutex = INIT_MUTEX_VAR; -MutexVar term_mutex = INIT_MUTEX_VAR; -CondVar thread_ready_cond = INIT_COND_VAR; -CondVar gc_pending_cond = INIT_COND_VAR; +Mutex sched_mutex = INIT_MUTEX_VAR; +Mutex term_mutex = INIT_MUTEX_VAR; +#if defined(THREADED_RTS) +/* + * The rts_mutex is the 'big lock' that the active native + * thread within the RTS holds while executing code + * within the RTS. It is given up when the thread makes a + * transition out of the RTS (e.g., to perform an external + * C call), hopefully for another thread to enter the RTS. + * + */ +Mutex rts_mutex = INIT_MUTEX_VAR; +/* + * When a native thread has completed executing an external + * call, it needs to communicate the result back to the + * (Haskell) thread that made the call. Do this as follows: + * + * - in resumeThread(), the thread increments the counter + * ext_threads_waiting, and then blocks on the + * 'big' RTS lock. + * - upon entry to the scheduler, the thread that's currently + * holding the RTS lock checks ext_threads_waiting. If there + * are native threads waiting, it gives up its RTS lock + * and tries to re-grab the RTS lock [perhaps after having + * waited for a bit..?] + * - care must be taken to deal with the case where more than + * one external thread are waiting on the lock. [ToDo: more] + * + */ + +static nat ext_threads_waiting = 0; +/* + * thread_ready_aux_mutex is used to handle the scenario where the + * the RTS executing thread runs out of work, but there are + * active external threads. The RTS executing thread gives up + * its RTS mutex, and blocks waiting for the thread_ready_cond. + * Unfortunately, a condition variable needs to be associated + * with a mutex in pthreads, so rts_thread_waiting_mutex is + * used for just this purpose. + * + */ +Mutex thread_ready_aux_mutex = INIT_MUTEX_VAR; +#endif + + +/* thread_ready_cond: when signalled, a thread has + * become runnable. When used? + */ +Condition thread_ready_cond = INIT_COND_VAR; +Condition gc_pending_cond = INIT_COND_VAR; nat await_death; #endif @@ -314,6 +350,23 @@ StgTSO * activateSpark (rtsSpark spark); StgTSO *MainTSO; */ +#if defined(PAR) || defined(RTS_SUPPORTS_THREADS) +static void taskStart(void); +static void +taskStart(void) +{ + /* threads start up using 'taskStart', so make them + them grab the RTS lock. */ +#if defined(THREADED_RTS) + ACQUIRE_LOCK(&rts_mutex); +#endif + schedule(); +} +#endif + + + + //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code //@subsection Main scheduling loop @@ -374,6 +427,25 @@ schedule( void ) rtsBool was_interrupted = rtsFalse; ACQUIRE_LOCK(&sched_mutex); + +#if defined(THREADED_RTS) + /* ToDo: consider SMP support */ + if (ext_threads_waiting > 0) { + /* (At least) one external thread is waiting to + * to deposit the result of an external call. + * Give way to one of them by giving up the RTS + * lock. + */ + RELEASE_LOCK(&sched_mutex); + RELEASE_LOCK(&rts_mutex); + /* ToDo: come up with mechanism that guarantees that + * the main thread doesn't loop here. + */ + yieldThread(); + /* ToDo: longjmp() */ + taskStart(); + } +#endif #if defined(GRAN) @@ -439,7 +511,7 @@ schedule( void ) } *prev = m->link; m->stat = Success; - broadcastCondVar(&m->wakeup); + broadcastCondition(&m->wakeup); break; case ThreadKilled: if (m->ret) *(m->ret) = NULL; @@ -449,7 +521,7 @@ schedule( void ) } else { m->stat = Killed; } - broadcastCondVar(&m->wakeup); + broadcastCondition(&m->wakeup); break; default: break; @@ -525,7 +597,7 @@ schedule( void ) * work for them. */ if (getFreeCapabilities() - n > 1) { - signalCondVar ( &thread_ready_cond ); + signalCondition( &thread_ready_cond ); } } #endif // SMP @@ -571,11 +643,16 @@ schedule( void ) && sleeping_queue == END_TSO_QUEUE #if defined(SMP) && allFreeCapabilities() +#elif defined(THREADED_RTS) + && suspended_ccalling_threads == END_TSO_QUEUE #endif ) { IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC...")); + RELEASE_LOCK(&sched_mutex); GarbageCollect(GetRoots,rtsTrue); + ACQUIRE_LOCK(&sched_mutex); + IF_DEBUG(scheduler, sched_belch("GC done.")); if (blocked_queue_hd == END_TSO_QUEUE && run_queue_hd == END_TSO_QUEUE && sleeping_queue == END_TSO_QUEUE) { @@ -619,9 +696,14 @@ schedule( void ) } #endif } -#if !defined(RTS_SUPPORTS_THREADS) - ASSERT( run_queue_hd != END_TSO_QUEUE ); +#if defined(RTS_SUPPORTS_THREADS) + if ( run_queue_hd == END_TSO_QUEUE ) { + IF_DEBUG(scheduler, sched_belch("all done, it seems...shut down.")); + shutdownHaskellAndExit(0); + + } #endif + ASSERT( run_queue_hd != END_TSO_QUEUE ); } } #elif defined(PAR) @@ -634,23 +716,37 @@ schedule( void ) */ if (ready_to_gc) { IF_DEBUG(scheduler,sched_belch("waiting for GC")); - waitCondVar ( &gc_pending_cond, &sched_mutex ); + waitCondition( &gc_pending_cond, &sched_mutex ); } #endif -#if defined(RTS_SUPPORTS_THREADS) +#if defined(SMP) /* block until we've got a thread on the run queue and a free * capability. */ while ( run_queue_hd == END_TSO_QUEUE -#if defined(SMP) || noFreeCapabilities() -#endif ) { IF_DEBUG(scheduler, sched_belch("waiting for work")); - waitCondVar ( &thread_ready_cond, &sched_mutex ); + waitCondition( &thread_ready_cond, &sched_mutex ); IF_DEBUG(scheduler, sched_belch("work now available")); } +#elif defined(THREADED_RTS) + if ( run_queue_hd == END_TSO_QUEUE ) { + /* no work available, wait for external calls to complete. */ + IF_DEBUG(scheduler, sched_belch("worker thread (%d): waiting for external thread to complete..", osThreadId())); + RELEASE_LOCK(&sched_mutex); + RELEASE_LOCK(&rts_mutex); + /* Sigh - need to have a mutex locked in order to wait on the + condition variable. */ + ACQUIRE_LOCK(&thread_ready_aux_mutex); + waitCondition(&thread_ready_cond, &thread_ready_aux_mutex); + RELEASE_LOCK(&thread_ready_aux_mutex); + IF_DEBUG(scheduler, sched_belch("worker thread (%d): re-awakened from no-work slumber..\n", osThreadId())); + /* ToDo: longjmp() */ + taskStart(); + + } #endif #if defined(GRAN) @@ -932,19 +1028,12 @@ schedule( void ) */ ASSERT(run_queue_hd != END_TSO_QUEUE); t = POP_RUN_QUEUE(); - // Sanity check the thread we're about to run. This can be // expensive if there is lots of thread switching going on... IF_DEBUG(sanity,checkTSO(t)); - #endif -#ifdef SMP grabCapability(&cap); -#else - cap = &MainCapability; -#endif - cap->r.rCurrentTSO = t; /* context switches are now initiated by the timer signal, unless @@ -1323,7 +1412,7 @@ schedule( void ) GarbageCollect(GetRoots,rtsFalse); ready_to_gc = rtsFalse; #ifdef SMP - broadcastCondVar(&gc_pending_cond); + broadcastCondition(&gc_pending_cond); #endif #if defined(GRAN) /* add a ContinueThread event to continue execution of current thread */ @@ -1381,6 +1470,7 @@ void deleteAllThreads ( void ) /* startThread and insertThread are now in GranSim.c -- HWL */ + //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code //@subsection Suspend and Resume @@ -1405,7 +1495,9 @@ suspendThread( StgRegTable *reg ) nat tok; Capability *cap; - // assume that *reg is a pointer to the StgRegTable part of a Capability + /* assume that *reg is a pointer to the StgRegTable part + * of a Capability. + */ cap = (Capability *)((void *)reg - sizeof(StgFunTable)); ACQUIRE_LOCK(&sched_mutex); @@ -1420,12 +1512,16 @@ suspendThread( StgRegTable *reg ) /* Use the thread ID as the token; it should be unique */ tok = cap->r.rCurrentTSO->id; -#ifdef SMP /* Hand back capability */ releaseCapability(&cap); + +#if defined(RTS_SUPPORTS_THREADS) && !defined(SMP) + IF_DEBUG(scheduler, sched_belch("thread %d leaving RTS\n", tok)); + startTask(taskStart); #endif - + RELEASE_LOCK(&sched_mutex); + RELEASE_LOCK(&rts_mutex); return tok; } @@ -1435,8 +1531,23 @@ resumeThread( StgInt tok ) StgTSO *tso, **prev; Capability *cap; + IF_DEBUG(scheduler, sched_belch("thread %d returning, waiting for sched. lock.\n", tok)); ACQUIRE_LOCK(&sched_mutex); + ext_threads_waiting++; + IF_DEBUG(scheduler, sched_belch("thread %d returning, ext_thread count: %d.\n", tok, ext_threads_waiting)); + RELEASE_LOCK(&sched_mutex); + + IF_DEBUG(scheduler, sched_belch("thread %d waiting for RTS lock...\n", tok)); + ACQUIRE_LOCK(&rts_mutex); + ext_threads_waiting--; + IF_DEBUG(scheduler, sched_belch("thread %d acquired RTS lock...\n", tok)); +#if defined(THREADED_RTS) + /* Free up any RTS-blocked threads. */ + broadcastCondition(&thread_ready_cond); +#endif + + /* Remove the thread off of the suspended list */ prev = &suspended_ccalling_threads; for (tso = suspended_ccalling_threads; tso != END_TSO_QUEUE; @@ -1451,20 +1562,18 @@ resumeThread( StgInt tok ) } tso->link = END_TSO_QUEUE; -#ifdef SMP +#if defined(SMP) while ( noFreeCapabilities() ) { IF_DEBUG(scheduler, sched_belch("waiting to resume")); - waitCondVar(&thread_ready_cond, &sched_mutex); + waitCondition(&thread_ready_cond, &sched_mutex); IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id)); } - grabCapability(&cap); -#else - cap = &MainCapability; #endif + grabCapability(&cap); + cap->r.rCurrentTSO = tso; - RELEASE_LOCK(&sched_mutex); return &cap->r; } @@ -1789,29 +1898,12 @@ scheduleThread(StgTSO *tso) } /* --------------------------------------------------------------------------- - * startTasks() - * - * Start up Posix threads to run each of the scheduler tasks. - * I believe the task ids are not needed in the system as defined. - * KH @ 25/10/99 - * ------------------------------------------------------------------------ */ - -#if defined(PAR) || defined(SMP) -void -taskStart(void) /* ( void *arg STG_UNUSED) */ -{ - schedule(); -} -#endif - -/* --------------------------------------------------------------------------- * initScheduler() * * Initialise the scheduler. This resets all the queues - if the * queues contained any threads, they'll be garbage collected at the * next pass. * - * This now calls startTasks(), so should only be called once! KH @ 25/10/99 * ------------------------------------------------------------------------ */ #ifdef SMP @@ -1858,6 +1950,27 @@ initScheduler(void) RtsFlags.ConcFlags.ctxtSwitchTicks = RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS; + +#if defined(RTS_SUPPORTS_THREADS) + /* Initialise the mutex and condition variables used by + * the scheduler. */ + initMutex(&rts_mutex); + initMutex(&sched_mutex); + initMutex(&term_mutex); +#if defined(THREADED_RTS) + initMutex(&thread_ready_aux_mutex); +#endif + + initCondition(&thread_ready_cond); + initCondition(&gc_pending_cond); +#endif + +#if defined(THREADED_RTS) + /* Grab big lock */ + ACQUIRE_LOCK(&rts_mutex); + IF_DEBUG(scheduler, + sched_belch("worker thread (%d): acquired RTS lock\n", osThreadId())); +#endif /* Install the SIGHUP handler */ #ifdef SMP @@ -1873,11 +1986,19 @@ initScheduler(void) } #endif -#ifdef SMP - /* Allocate N Capabilities */ - initCapabilities(RtsFlags.ParFlags.nNodes); -#else - initCapability(&MainCapability); + /* A capability holds the state a native thread needs in + * order to execute STG code. At least one capability is + * floating around (only SMP builds have more than one). + */ + initCapabilities(); + +#if defined(RTS_SUPPORTS_THREADS) + /* start our haskell execution tasks */ +# if defined(SMP) + startTaskManager(RtsFlags.ParFlags.nNodes, taskStart); +# else + startTaskManager(0,taskStart); +# endif #endif #if /* defined(SMP) ||*/ defined(PAR) @@ -1885,68 +2006,11 @@ initScheduler(void) #endif } -#ifdef SMP -void -startTasks( void ) -{ - nat i; - int r; - OSThreadId tid; - - /* make some space for saving all the thread ids */ - task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info), - "initScheduler:task_ids"); - - /* and create all the threads */ - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - r = createOSThread(&tid,taskStart); - if (r != 0) { - barf("startTasks: Can't create new Posix thread"); - } - task_ids[i].id = tid; - task_ids[i].mut_time = 0.0; - task_ids[i].mut_etime = 0.0; - task_ids[i].gc_time = 0.0; - task_ids[i].gc_etime = 0.0; - task_ids[i].elapsedtimestart = stat_getElapsedTime(); - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid);); - } -} -#endif - void exitScheduler( void ) { -#ifdef SMP - nat i; - - /* Don't want to use pthread_cancel, since we'd have to install - * these silly exception handlers (pthread_cleanup_{push,pop}) around - * all our locks. - */ -#if 0 - /* Cancel all our tasks */ - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - pthread_cancel(task_ids[i].id); - } - - /* Wait for all the tasks to terminate */ - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", - task_ids[i].id)); - pthread_join(task_ids[i].id, NULL); - } -#endif - - /* Send 'em all a SIGHUP. That should shut 'em up. - */ - await_death = RtsFlags.ParFlags.nNodes; - for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { - pthread_kill(task_ids[i].id,SIGTERM); - } - while (await_death > 0) { - sched_yield(); - } +#if defined(RTS_SUPPORTS_THREADS) + stopTaskManager(); #endif } @@ -2022,7 +2086,7 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) m->ret = ret; m->stat = NoStatus; #if defined(RTS_SUPPORTS_THREADS) - initCondVar(&m->wakeup); + initCondition(&m->wakeup); #endif m->link = main_threads; @@ -2033,7 +2097,7 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) #ifdef SMP do { - waitCondVar(&m->wakeup, &sched_mutex); + waitCondition(&m->wakeup, &sched_mutex); } while (m->stat == NoStatus); #elif defined(GRAN) /* GranSim specific init */ @@ -2043,6 +2107,7 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) schedule(); #else + RELEASE_LOCK(&sched_mutex); schedule(); ASSERT(m->stat != NoStatus); #endif @@ -2050,7 +2115,7 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) stat = m->stat; #if defined(RTS_SUPPORTS_THREADS) - closeCondVar(&m->wakeup); + closeCondition(&m->wakeup); #endif IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", @@ -3589,7 +3654,6 @@ sched_belch(char *s, ...) //* sched_mutex:: @cindex\s-+sched_mutex //* schedule:: @cindex\s-+schedule //* take_off_run_queue:: @cindex\s-+take_off_run_queue -//* task_ids:: @cindex\s-+task_ids //* term_mutex:: @cindex\s-+term_mutex //* thread_ready_cond:: @cindex\s-+thread_ready_cond //@end index diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h index b81087d..d728292 100644 --- a/ghc/rts/Schedule.h +++ b/ghc/rts/Schedule.h @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: Schedule.h,v 1.25 2001/11/22 14:25:12 simonmar Exp $ + * $Id: Schedule.h,v 1.26 2002/02/04 20:40:37 sof Exp $ * * (c) The GHC Team 1998-1999 * @@ -7,6 +7,9 @@ * (RTS internal scheduler interface) * * -------------------------------------------------------------------------*/ +#ifndef __SCHEDULE_H__ +#define __SCHEDULE_H__ +#include "OSThreads.h" //@menu //* Scheduler Functions:: @@ -20,17 +23,13 @@ //@cindex initScheduler //@cindex exitScheduler -//@cindex startTasks /* initScheduler(), exitScheduler(), startTasks() * * Called from STG : no * Locks assumed : none */ -void initScheduler( void ); -void exitScheduler( void ); -#ifdef SMP -void startTasks( void ); -#endif +extern void initScheduler ( void ); +extern void exitScheduler ( void ); //@cindex awakenBlockedQueue /* awakenBlockedQueue() @@ -157,31 +156,13 @@ extern StgTSO *sleeping_queue; /* Linked list of all threads. */ extern StgTSO *all_threads; -#ifdef SMP -//@cindex sched_mutex -//@cindex thread_ready_cond -//@cindex gc_pending_cond -extern pthread_mutex_t sched_mutex; -extern pthread_cond_t thread_ready_cond; -extern pthread_cond_t gc_pending_cond; -#endif - -//@cindex task_info -#ifdef SMP -typedef struct { - pthread_t id; - double elapsedtimestart; - double mut_time; - double mut_etime; - double gc_time; - double gc_etime; -} task_info; - -extern task_info *task_ids; +#if defined(RTS_SUPPORTS_THREADS) +extern Mutex sched_mutex; +extern Condition thread_ready_cond; +extern Condition gc_pending_cond; #endif -/* Needed by Hugs. - */ +/* Called by shutdown_handler(). */ void interruptStgRts ( void ); void raiseAsync(StgTSO *tso, StgClosure *exception); @@ -268,7 +249,7 @@ void print_bqe (StgBlockingQueueElement *bqe); #ifdef SMP #define THREAD_RUNNABLE() \ if (free_capabilities != NULL) { \ - pthread_cond_signal(&thread_ready_cond); \ + signalCondition(&thread_ready_cond); \ } \ context_switch = 1; #else @@ -280,16 +261,16 @@ void print_bqe (StgBlockingQueueElement *bqe); */ #define EMPTY_RUN_QUEUE() (run_queue_hd == END_TSO_QUEUE) +#endif /* __SCHEDULE_H__ */ + //@node Index, , Some convenient macros //@subsection Index //@index //* APPEND_TO_BLOCKED_QUEUE:: @cindex\s-+APPEND_TO_BLOCKED_QUEUE //* APPEND_TO_RUN_QUEUE:: @cindex\s-+APPEND_TO_RUN_QUEUE -//* Capability:: @cindex\s-+Capability //* POP_RUN_QUEUE :: @cindex\s-+POP_RUN_QUEUE //* PUSH_ON_RUN_QUEUE:: @cindex\s-+PUSH_ON_RUN_QUEUE -//* THREAD_RUNNABLE:: @cindex\s-+THREAD_RUNNABLE //* awaitEvent:: @cindex\s-+awaitEvent //* awakenBlockedQueue:: @cindex\s-+awakenBlockedQueue //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue @@ -298,9 +279,6 @@ void print_bqe (StgBlockingQueueElement *bqe); //* gc_pending_cond:: @cindex\s-+gc_pending_cond //* initScheduler:: @cindex\s-+initScheduler //* raiseAsync:: @cindex\s-+raiseAsync -//* sched_mutex:: @cindex\s-+sched_mutex //* startTasks:: @cindex\s-+startTasks -//* task_info:: @cindex\s-+task_info -//* thread_ready_cond:: @cindex\s-+thread_ready_cond //* unblockOne:: @cindex\s-+unblockOne //@end index