From 6d7576ef2853317e690a7c4e823d7f0bb8d9aaf0 Mon Sep 17 00:00:00 2001 From: sof Date: Fri, 15 Feb 2002 07:50:37 +0000 Subject: [PATCH] [project @ 2002-02-15 07:50:36 by sof] Tighten up the Scheduler synchronisation story some more: - moved thread_ready_cond + the counter rts_n_waiting_tasks to Capability.c, leaving only sched_mutex as a synchro variable in Scheduler (the less stuff that inhabit Schedule.c, the better, methinks.) - upon entry to the Scheduler, a worker thread will now call Capability.yieldToReturningWorker() to check whether it needs to give up its capability. - Worker threads that are either idle or lack a capability, will now call Capability.waitForWorkCapability() and block. --- ghc/rts/Capability.c | 122 +++++++++++++++++++++++++++++++++++++------------- ghc/rts/Capability.h | 5 ++- ghc/rts/Schedule.c | 119 +++++++++++++----------------------------------- ghc/rts/Schedule.h | 7 +-- 4 files changed, 126 insertions(+), 127 deletions(-) diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index 6f4a945..2dec782 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -17,9 +17,10 @@ * --------------------------------------------------------------------------*/ #include "PosixSource.h" #include "Rts.h" -#include "Schedule.h" #include "RtsUtils.h" +#include "OSThreads.h" #include "Capability.h" +#include "Schedule.h" /* to get at EMPTY_RUN_QUEUE() */ #if !defined(SMP) Capability MainCapability; /* for non-SMP, we have one global capability */ @@ -42,12 +43,40 @@ Condition returning_worker_cond = INIT_COND_VAR; * the task(s) that enter the Scheduler will check to see whether * there are one or more worker threads blocked waiting on * returning_worker_cond. + */ +static nat rts_n_waiting_workers = 0; + +/* thread_ready_cond: when signalled, a thread has become runnable for a + * task to execute. + * + * In the non-SMP case, it also implies that the thread that is woken up has + * exclusive access to the RTS and all its data structures (that are not + * locked by the Scheduler's mutex). + * + * thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold. * - * Locks needed: sched_mutex */ -nat rts_n_waiting_workers = 0; +Condition thread_ready_cond = INIT_COND_VAR; +#if 0 +/* For documentation purposes only */ +#define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE()) #endif +/* + * To be able to make an informed decision about whether or not + * to create a new task when making an external call, keep track of + * the number of tasks currently blocked waiting on thread_ready_cond. + * (if > 0 => no need for a new task, just unblock an existing one). + * + * waitForWork() takes care of keeping it up-to-date; Task.startTask() + * uses its current value. + */ +nat rts_n_waiting_tasks = 0; +#endif + +/* ----------------------------------------------------------------------------- + Initialisation + -------------------------------------------------------------------------- */ static void initCapability( Capability *cap ) @@ -76,6 +105,7 @@ initCapabilities() { #if defined(RTS_SUPPORTS_THREADS) initCondition(&returning_worker_cond); + initCondition(&thread_ready_cond); #endif #if defined(SMP) @@ -88,13 +118,15 @@ initCapabilities() return; } -/* Free capability list. - * Locks required: sched_mutex. - */ #if defined(SMP) +/* Free capability list. */ static Capability *free_capabilities; /* Available capabilities for running threads */ #endif +/* ----------------------------------------------------------------------------- + Acquiring capabilities + -------------------------------------------------------------------------- */ + /* * Function: grabCapability(Capability**) * @@ -102,12 +134,9 @@ static Capability *free_capabilities; /* Available capabilities for running thre * remove one from the free capabilities list (which * may just have one entry). In threaded builds, worker * threads are prevented from doing so willy-nilly - * through the use of the sched_mutex lock along with - * condition variables thread_ready_cond and + * via the condition variables thread_ready_cond and * returning_worker_cond. * - * Pre-condition: sched_mutex is held (in threaded builds only). - * */ void grabCapability(Capability** cap) { @@ -124,10 +153,10 @@ void grabCapability(Capability** cap) /* * Function: releaseCapability(Capability*) * - * Purpose: Letting go of a capability. + * Purpose: Letting go of a capability. Causes a + * 'returning worker' thread or a 'waiting worker' + * to wake up, in that order. * - * Pre-condition: sched_mutex is assumed held by current thread. - * Post-condition: */ void releaseCapability(Capability* cap #if !defined(SMP) @@ -176,7 +205,7 @@ void releaseCapability(Capability* cap * value of rts_n_waiting_workers. If > 0, the worker thread * will yield its capability to let a returning worker thread * proceed with returning its result -- this is done via - * yieldCapability(). + * yieldToReturningWorker(). * - the worker thread that yielded its capability then tries * to re-grab a capability and re-enter the Scheduler. */ @@ -190,57 +219,91 @@ void releaseCapability(Capability* cap * result of the ext. call back to the Haskell thread that * made it. * - * Pre-condition: sched_mutex isn't held. - * Post-condition: sched_mutex is held and a capability has + * Pre-condition: pMutex isn't held. + * Post-condition: pMutex is held and a capability has * been assigned to the worker thread. */ void -grabReturnCapability(Capability** pCap) +grabReturnCapability(Mutex* pMutex, Capability** pCap) { IF_DEBUG(scheduler, - fprintf(stderr,"worker (%ld): returning, waiting for sched. lock.\n", osThreadId())); - ACQUIRE_LOCK(&sched_mutex); + fprintf(stderr,"worker (%ld): returning, waiting for lock.\n", osThreadId())); + ACQUIRE_LOCK(pMutex); rts_n_waiting_workers++; IF_DEBUG(scheduler, fprintf(stderr,"worker (%ld): returning; workers waiting: %d\n", osThreadId(), rts_n_waiting_workers)); while ( noCapabilities() ) { - waitCondition(&returning_worker_cond, &sched_mutex); + waitCondition(&returning_worker_cond, pMutex); } grabCapability(pCap); return; } + +/* ----------------------------------------------------------------------------- + Yielding/waiting for capabilities + -------------------------------------------------------------------------- */ + /* - * Function: yieldCapability(Capability**) + * Function: yieldToReturningWorker(Mutex*,Capability*) * * Purpose: when, upon entry to the Scheduler, an OS worker thread * spots that one or more threads are blocked waiting for * permission to return back their result, it gives up * its Capability. * - * Pre-condition: sched_mutex is held and the thread possesses + * Pre-condition: pMutex is assumed held and the thread possesses * a Capability. - * Post-condition: sched_mutex isn't held and the Capability has + * Post-condition: pMutex isn't held and the Capability has * been given back. */ void -yieldCapability(Capability* cap) +yieldToReturningWorker(Mutex* pMutex, Capability* cap) { + if ( rts_n_waiting_workers > 0 && noCapabilities() ) { IF_DEBUG(scheduler, fprintf(stderr,"worker thread (%ld): giving up RTS token\n", osThreadId())); releaseCapability(cap); - RELEASE_LOCK(&sched_mutex); + RELEASE_LOCK(pMutex); yieldThread(); - /* At this point, sched_mutex has been given up & we've + /* At this point, pMutex has been given up & we've * forced a thread context switch. Guaranteed to be * enough for the signalled worker thread to race - * ahead? + * ahead of us? */ - return; + + /* Re-grab the mutex */ + ACQUIRE_LOCK(pMutex); + } + return; } + +/* + * Function: waitForWorkCapability(Mutex*, Capability**, rtsBool) + * + * Purpose: wait for a Capability to become available. In + * the process of doing so, updates the number + * of tasks currently blocked waiting for a capability/more + * work. That counter is used when deciding whether or + * not to create a new worker thread when an external + * call is made. + * + * Pre-condition: pMutex is held. + */ +void +waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable) +{ + while ( noCapabilities() || (runnable && EMPTY_RUN_QUEUE()) ) { + rts_n_waiting_tasks++; + waitCondition(&thread_ready_cond, pMutex); + rts_n_waiting_tasks--; + } + grabCapability(pCap); + return; +} #endif /* RTS_SUPPORTS_THREADS */ #if defined(SMP) @@ -251,9 +314,6 @@ yieldCapability(Capability* cap) * holding 'n' Capabilities. Only for SMP, since * it is the only build that supports multiple * capabilities within the RTS. - * - * Pre-condition: sched_mutex is held. - * */ static void initCapabilities_(nat n) diff --git a/ghc/rts/Capability.h b/ghc/rts/Capability.h index 6aef3db..71359b6 100644 --- a/ghc/rts/Capability.h +++ b/ghc/rts/Capability.h @@ -37,8 +37,9 @@ extern void releaseCapability(Capability* cap); extern nat rts_n_free_capabilities; extern nat rts_n_waiting_workers; -extern void grabReturnCapability(Capability** pCap); -extern void yieldCapability(Capability* cap); +extern void grabReturnCapability(Mutex* pMutex, Capability** pCap); +extern void yieldToReturningWorker(Mutex* pMutex, Capability* cap); +extern void waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable); static inline nat getFreeCapabilities (void) { diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index fef2795..4b2425f 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,5 +1,5 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.123 2002/02/14 07:52:05 sof Exp $ + * $Id: Schedule.c,v 1.124 2002/02/15 07:50:36 sof Exp $ * * (c) The GHC Team, 1998-2000 * @@ -266,38 +266,6 @@ static void sched_belch(char *s, ...); Mutex sched_mutex = INIT_MUTEX_VAR; Mutex term_mutex = INIT_MUTEX_VAR; - - - -/* thread_ready_cond: when signalled, a thread has become runnable for a - * task to execute. - * - * In the non-SMP case, it also implies that the thread that is woken up has - * exclusive access to the RTS and all its data structures (that are not - * under sched_mutex's control). - * - * thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold. - * - */ -Condition thread_ready_cond = INIT_COND_VAR; -#if 0 -/* For documentation purposes only */ -#define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE()) -#endif - -/* - * To be able to make an informed decision about whether or not - * to create a new task when making an external call, keep track of - * the number of tasks currently blocked waiting on thread_ready_cond. - * (if > 0 => no need for a new task, just unblock an existing one). - * - * waitForWork() takes care of keeping it up-to-date; Task.startTask() - * uses its current value. - */ -nat rts_n_waiting_tasks = 0; - -static void waitForWork(void); - # if defined(SMP) static Condition gc_pending_cond = INIT_COND_VAR; nat await_death; @@ -410,36 +378,19 @@ schedule( void ) # endif #endif rtsBool was_interrupted = rtsFalse; - -#if defined(RTS_SUPPORTS_THREADS) -schedule_start: -#endif -#if defined(RTS_SUPPORTS_THREADS) ACQUIRE_LOCK(&sched_mutex); -#endif #if defined(RTS_SUPPORTS_THREADS) - /* ToDo: consider SMP support */ - if ( rts_n_waiting_workers > 0 && noCapabilities() ) { - /* (At least) one native thread is waiting to - * deposit the result of an external call. So, - * be nice and hand over our capability. - */ - yieldCapability(cap); - /* Lost our sched_mutex lock, try to re-enter the scheduler. */ - goto schedule_start; - } -#endif + /* Check to see whether there are any worker threads + waiting to deposit external call results. If so, + yield our capability */ + yieldToReturningWorker(&sched_mutex, cap); -#if defined(RTS_SUPPORTS_THREADS) - while ( noCapabilities() ) { - waitForWork(); - } + waitForWorkCapability(&sched_mutex, &cap, rtsFalse); #endif #if defined(GRAN) - /* set up first event to get things going */ /* ToDo: assign costs for system setup and init MainTSO ! */ new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc], @@ -723,16 +674,19 @@ schedule_start: if ( EMPTY_RUN_QUEUE() ) { /* Give up our capability */ releaseCapability(cap); - while ( noCapabilities() || EMPTY_RUN_QUEUE() ) { - IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId())); - waitForWork(); - IF_DEBUG(scheduler, sched_belch("thread %d: work now available %d %d", osThreadId(), getFreeCapabilities(),EMPTY_RUN_QUEUE())); + IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId())); + waitForWorkCapability(&sched_mutex, &cap, rtsTrue); + IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId())); +#if 0 + while ( EMPTY_RUN_QUEUE() ) { + waitForWorkCapability(&sched_mutex, &cap); + IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId())); } +#endif } #endif #if defined(GRAN) - if (RtsFlags.GranFlags.Light) GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc @@ -978,7 +932,7 @@ schedule_start: belch("--=^ %d threads, %d sparks on [%#x]", run_queue_len(), spark_queue_len(pool), CURRENT_PROC)); -#if 1 +# if 1 if (0 && RtsFlags.ParFlags.ParStats.Full && t && LastTSO && t->id != LastTSO->id && LastTSO->why_blocked == NotBlocked && @@ -1003,7 +957,7 @@ schedule_start: emitSchedule = rtsFalse; } -#endif +# endif #else /* !GRAN && !PAR */ /* grab a thread from the run queue */ @@ -1377,12 +1331,11 @@ schedule_start: } #endif + if (ready_to_gc #ifdef SMP - if (ready_to_gc && allFreeCapabilities() ) -#else - if (ready_to_gc) + && allFreeCapabilities() #endif - { + ) { /* everybody back, start the GC. * Could do it in this thread, or signal a condition var * to do it in another thread. Either way, we need to @@ -1512,7 +1465,7 @@ suspendThread( StgRegTable *reg ) for one (i.e., if there's only one Concurrent Haskell thread alive, there's no need to create a new task). */ - IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS\n", tok)); + IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok)); startTask(taskStart); #endif @@ -1528,8 +1481,8 @@ resumeThread( StgInt tok ) Capability *cap; #if defined(RTS_SUPPORTS_THREADS) - /* Wait for permission to re-enter the RTS with the result.. */ - grabReturnCapability(&cap); + /* Wait for permission to re-enter the RTS with the result. */ + grabReturnCapability(&sched_mutex, &cap); #else grabCapability(&cap); #endif @@ -1558,18 +1511,6 @@ resumeThread( StgInt tok ) } -#if defined(RTS_SUPPORTS_THREADS) -static void -waitForWork() -{ - rts_n_waiting_tasks++; - waitCondition(&thread_ready_cond, &sched_mutex); - rts_n_waiting_tasks--; - return; -} -#endif - - /* --------------------------------------------------------------------------- * Static functions * ------------------------------------------------------------------------ */ @@ -1870,10 +1811,13 @@ activateSpark (rtsSpark spark) * on this thread's stack before the scheduler is invoked. * ------------------------------------------------------------------------ */ +static void scheduleThread_ (StgTSO* tso, rtsBool createTask); + void scheduleThread_(StgTSO *tso -#if defined(THREADED_RTS) , rtsBool createTask +#if !defined(THREADED_RTS) + STG_UNUSED #endif ) { @@ -1903,11 +1847,12 @@ scheduleThread_(StgTSO *tso void scheduleThread(StgTSO* tso) { -#if defined(THREADED_RTS) + return scheduleThread_(tso, rtsFalse); +} + +void scheduleExtThread(StgTSO* tso) +{ return scheduleThread_(tso, rtsTrue); -#else - return scheduleThread_(tso); -#endif } /* --------------------------------------------------------------------------- @@ -3688,7 +3633,6 @@ sched_belch(char *s, ...) //@subsection Index //@index -//* MainRegTable:: @cindex\s-+MainRegTable //* StgMainThread:: @cindex\s-+StgMainThread //* awaken_blocked_queue:: @cindex\s-+awaken_blocked_queue //* blocked_queue_hd:: @cindex\s-+blocked_queue_hd @@ -3706,5 +3650,4 @@ sched_belch(char *s, ...) //* schedule:: @cindex\s-+schedule //* take_off_run_queue:: @cindex\s-+take_off_run_queue //* term_mutex:: @cindex\s-+term_mutex -//* thread_ready_cond:: @cindex\s-+thread_ready_cond //@end index diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h index 93ef030..cb12eb1 100644 --- a/ghc/rts/Schedule.h +++ b/ghc/rts/Schedule.h @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: Schedule.h,v 1.28 2002/02/13 08:48:07 sof Exp $ + * $Id: Schedule.h,v 1.29 2002/02/15 07:50:37 sof Exp $ * * (c) The GHC Team 1998-1999 * @@ -164,11 +164,6 @@ extern SchedulerStatus waitThread_(StgTSO *tso, , rtsBool blockWaiting #endif ); -extern void scheduleThread_(StgTSO *tso -#if defined(THREADED_RTS) - , rtsBool createTask -#endif - ); extern SchedulerStatus rts_mainEvalIO(HaskellObj p, /*out*/HaskellObj *ret); -- 1.7.10.4