From efa41d9d5eada7aa3230a2bd03b97a8b7025ef2e Mon Sep 17 00:00:00 2001
From: sof
Date: Thu, 14 Feb 2002 07:52:05 +0000
Subject: [PATCH] [project @ 2002-02-14 07:52:05 by sof]

Restructured / tidied a bit:

* Capability.grabReturnCapability() is now called by resumeThread().
  It takes care of waiting on the (Capability.c-local) condition
  variable 'returning_worker_cond' (moved here from Schedule.c).

* If a worker notices upon entry to the Scheduler that there are
  worker threads waiting to deposit results of external calls, it
  gives up its capability by calling Capability.yieldCapability().

* Added Scheduler.waitForWork(), which takes care of blocking on
  'thread_ready_cond' (+ 'rts_n_waiting_tasks' book-keeping).

Note: changes haven't been fully tested, due to HEAD instability.
---
 ghc/rts/Capability.c | 142 ++++++++++++++++++++++++++++++++++++++++++++++----
 ghc/rts/Capability.h |   7 ++-
 ghc/rts/Schedule.c   | 108 ++++++++++----------------------
 3 files changed, 163 insertions(+), 94 deletions(-)

diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c
index d2a2ef8..77096a9 100644
--- a/ghc/rts/Capability.c
+++ b/ghc/rts/Capability.c
@@ -1,20 +1,18 @@
 /* ---------------------------------------------------------------------------
  *
- * (c) The GHC Team, 2001
+ * (c) The GHC Team, 2002
  *
  * Capabilities
  *
- * The notion of a capability is used when operating in multi-threaded
- * environments (which the SMP and Threads builds of the RTS do), to
- * hold all the state an OS thread/task needs to run Haskell code:
+ * A Capability represents the token required to execute STG code,
+ * and all the state an OS thread/task needs to run Haskell code:
  * its STG registers, a pointer to its TSO, a nursery etc. During
- * STG execution, a pointer to the capabilitity is kept in a
+ * STG execution, a pointer to the capability is kept in a
  * register (BaseReg).
  *
  * Only in an SMP build will there be multiple capabilities, the threaded
  * RTS and other non-threaded builds, there is one global capability,
  * namely MainRegTable.
- *
  *
  * --------------------------------------------------------------------------*/
 #include "PosixSource.h"
@@ -29,6 +27,27 @@
 Capability MainCapability;     /* for non-SMP, we have one global capability */
 nat rts_n_free_capabilities;
 
+#if defined(RTS_SUPPORTS_THREADS)
+/* returning_worker_cond: when a worker thread returns from executing an
+ * external call, it needs to wait for an RTS Capability before passing
+ * on the result of the call to the Haskell thread that made it.
+ *
+ * returning_worker_cond is signalled in Capability.releaseCapability().
+ *
+ */
+Condition returning_worker_cond = INIT_COND_VAR;
+
+/*
+ * To avoid starvation of threads blocked on returning_worker_cond,
+ * the task(s) that enter the Scheduler will check to see whether
+ * there are one or more worker threads blocked waiting on
+ * returning_worker_cond.
+ *
+ * Locks needed: sched_mutex
+ */
+nat rts_n_waiting_workers = 0;
+#endif
+
 static
 void
 initCapability( Capability *cap )
@@ -48,6 +67,10 @@ static void initCapabilities_(nat n);
 
 void initCapabilities()
 {
+#if defined(RTS_SUPPORTS_THREADS)
+  initCondition(&returning_worker_cond);
+#endif
+
 #if defined(SMP)
   initCapabilities_(RtsFlags.ParFlags.nNodes);
 #else
@@ -78,9 +101,12 @@ void grabCapability(Capability** cap)
 }
 
 /*
- * Letting go of a capability
+ * Function:  releaseCapability(Capability*)
+ *
+ * Purpose:   Letting go of a capability.
- *
- * Locks required: sched_mutex
+ *
+ * Pre-condition:  sched_mutex is assumed held by current thread.
+ * Post-condition:
  */
 void releaseCapability(Capability* cap
 #if !defined(SMP)
@@ -100,9 +126,11 @@ void releaseCapability(Capability* cap
   /* Check to see whether a worker thread can be given the go-ahead
      to return the result of an external call..*/
   if (rts_n_waiting_workers > 0) {
-    /* The worker is responsible for grabbing the capability and
-     * decrementing the rts_n_returning_workers count
+    /* Decrement the counter here to avoid livelock where the
+     * thread that is yielding its capability will repeatedly
+     * signal returning_worker_cond.
      */
+    rts_n_waiting_workers--;
     signalCondition(&returning_worker_cond);
   } else if ( !EMPTY_RUN_QUEUE() ) {
     /* Signal that work is available */
@@ -112,8 +140,100 @@ void releaseCapability(Capability* cap
   return;
 }
 
+#if defined(RTS_SUPPORTS_THREADS)
+/*
+ * When a native thread has completed the execution of an external
+ * call, it needs to communicate the result back. This is done
+ * as follows:
+ *
+ *  - in resumeThread(), the thread calls grabReturnCapability().
+ *  - If no capabilities are readily available, grabReturnCapability()
+ *    increments a counter rts_n_waiting_workers, and blocks
+ *    waiting for the condition returning_worker_cond to become
+ *    signalled.
+ *  - upon entry to the Scheduler, a worker thread checks the
+ *    value of rts_n_waiting_workers. If > 0, the worker thread
+ *    will yield its capability to let a returning worker thread
+ *    proceed with returning its result -- this is done via
+ *    yieldCapability().
+ *  - the worker thread that yielded its capability then tries
+ *    to re-grab a capability and re-enter the Scheduler.
+ */
+
+/*
+ * Function: grabReturnCapability(Capability**)
+ *
+ * Purpose:  when an OS thread returns from an external call,
+ * it calls grabReturnCapability() (via Schedule.resumeThread())
+ * to wait for permission to enter the RTS & communicate the
+ * result of the ext. call back to the Haskell thread that
+ * made it.
+ *
+ * Pre-condition:  sched_mutex isn't held.
+ * Post-condition: sched_mutex is held and a capability has
+ *                 been assigned to the worker thread.
+ */
+void
+grabReturnCapability(Capability** pCap)
+{
+  IF_DEBUG(scheduler,
+           sched_belch("thread %d: returning, waiting for sched. lock.\n", osThreadId()));
+  ACQUIRE_LOCK(&sched_mutex);
+  rts_n_waiting_workers++;
+  IF_DEBUG(scheduler,
+           sched_belch("worker (%d): returning; workers waiting: %d\n",
+                       osThreadId(), rts_n_waiting_workers));
+  while ( noCapabilities() ) {
+    waitCondition(&returning_worker_cond, &sched_mutex);
+  }
+
+  grabCapability(pCap);
+  return;
+}
+
+/*
+ * Function: yieldCapability(Capability*)
+ *
+ * Purpose:  when, upon entry to the Scheduler, an OS worker thread
+ * spots that one or more threads are blocked waiting for
+ * permission to return their result, it gives up
+ * its Capability.
+ *
+ * Pre-condition:  sched_mutex is held and the thread possesses
+ *                 a Capability.
+ * Post-condition: sched_mutex isn't held and the Capability has
+ *                 been given back.
+ */
+void
+yieldCapability(Capability* cap)
+{
+  IF_DEBUG(scheduler,
+           sched_belch("worker thread (%d): giving up RTS token\n", osThreadId()));
+  releaseCapability(cap);
+  RELEASE_LOCK(&sched_mutex);
+  yieldThread();
+  /* At this point, sched_mutex has been given up & we've
+   * forced a thread context switch. Guaranteed to be
+   * enough for the signalled worker thread to race
+   * ahead?
+   */
+  return;
+}
+
+#endif /* RTS_SUPPORTS_THREADS */
+
 #if defined(SMP)
-/* Allocate 'n' capabilities */
+/*
+ * Function: initCapabilities_(nat)
+ *
+ * Purpose:  upon startup, allocate and fill in table
+ *           holding 'n' Capabilities. Only for SMP, since
+ *           it is the only build that supports multiple
+ *           capabilities within the RTS.
+ *
+ * Pre-condition: sched_mutex is held.
+ *
+ */
 static void
 initCapabilities_(nat n)
 {
diff --git a/ghc/rts/Capability.h b/ghc/rts/Capability.h
index b878507..6aef3db 100644
--- a/ghc/rts/Capability.h
+++ b/ghc/rts/Capability.h
@@ -28,14 +28,17 @@
 extern Capability MainCapability;
 #endif
-
 extern void initCapabilities(void);
-extern void grabCapability(Capability** cap);
+extern void grabCapability(Capability** pCap);
 extern void releaseCapability(Capability* cap);
 
 #if defined(RTS_SUPPORTS_THREADS)
 /* total number of available capabilities */
 extern nat rts_n_free_capabilities;
+extern nat rts_n_waiting_workers;
+
+extern void grabReturnCapability(Capability** pCap);
+extern void yieldCapability(Capability* cap);
 
 static inline nat
 getFreeCapabilities (void)
 {
diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c
index d73559e..fef2795 100644
--- a/ghc/rts/Schedule.c
+++ b/ghc/rts/Schedule.c
@@ -1,5 +1,5 @@
 /* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.122 2002/02/13 08:48:06 sof Exp $
+ * $Id: Schedule.c,v 1.123 2002/02/14 07:52:05 sof Exp $
  *
  * (c) The GHC Team, 1998-2000
  *
@@ -267,21 +267,6 @@
 Mutex     sched_mutex       = INIT_MUTEX_VAR;
 Mutex     term_mutex        = INIT_MUTEX_VAR;
 
-/*
- * When a native thread has completed executing an external
- * call, it needs to communicate the result back to the
- * (Haskell) thread that made the call. Do this as follows:
- *
- *  - in resumeThread(), the thread increments the counter
- *    rts_n_returning_workers, and then blocks waiting on the
- *    condition returning_worker_cond.
- *  - upon entry to the scheduler, a worker/task checks
- *    rts_n_returning_workers. If it is > 0, worker threads
- *    are waiting to return, so it gives up its capability
- *    to let a worker deposit its result.
- *  - the worker thread that gave up its capability then tries
- *    to re-grab a capability and re-enter the Scheduler.
- */
 
 /* thread_ready_cond: when signalled, a thread has become runnable for a
@@ -305,28 +290,13 @@ Condition thread_ready_cond = INIT_COND_VAR;
  * to create a new task when making an external call, keep track of
  * the number of tasks currently blocked waiting on thread_ready_cond.
  * (if > 0 => no need for a new task, just unblock an existing one).
- */
-nat rts_n_waiting_tasks = 0;
-
-/* returning_worker_cond: when a worker thread returns from executing an
- * external call, it needs to wait for an RTS Capability before passing
- * on the result of the call to the Haskell thread that made it.
- *
- * returning_worker_cond is signalled in Capability.releaseCapability().
- *
- */
-Condition returning_worker_cond = INIT_COND_VAR;
-
-/*
- * To avoid starvation of threads blocked on worker_thread_cond,
- * the task(s) that enter the Scheduler will check to see whether
- * there are one or more worker threads blocked waiting on
- * returning_worker_cond.
  *
- * Locks needed: sched_mutex
+ * waitForWork() takes care of keeping it up-to-date; Task.startTask()
+ * uses its current value.
  */
-nat rts_n_waiting_workers = 0;
+nat rts_n_waiting_tasks = 0;
+static void waitForWork(void);
 
 # if defined(SMP)
 static Condition gc_pending_cond = INIT_COND_VAR;
@@ -456,20 +426,15 @@ schedule_start:
      * deposit the result of an external call. So,
      * be nice and hand over our capability.
      */
-      IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (waiting workers: %d)\n", osThreadId(), rts_n_waiting_workers));
-      releaseCapability(cap);
-      RELEASE_LOCK(&sched_mutex);
-
-      yieldThread();
+      yieldCapability(cap);
+      /* Lost our sched_mutex lock, try to re-enter the scheduler. */
       goto schedule_start;
     }
 #endif
 
 #if defined(RTS_SUPPORTS_THREADS)
     while ( noCapabilities() ) {
-      rts_n_waiting_tasks++;
-      waitCondition(&thread_ready_cond, &sched_mutex);
-      rts_n_waiting_tasks--;
+      waitForWork();
     }
 #endif
 
@@ -731,7 +696,6 @@ schedule_start:
     if ( EMPTY_RUN_QUEUE() ) {
       IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
       shutdownHaskellAndExit(0);
-
     }
 #endif
     ASSERT( !EMPTY_RUN_QUEUE() );
@@ -761,9 +725,7 @@ schedule_start:
     releaseCapability(cap);
     while ( noCapabilities() || EMPTY_RUN_QUEUE() ) {
       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
-      rts_n_waiting_tasks++;
-      waitCondition( &thread_ready_cond, &sched_mutex );
-      rts_n_waiting_tasks--;
+      waitForWork();
       IF_DEBUG(scheduler, sched_belch("thread %d: work now available %d %d", osThreadId(), getFreeCapabilities(),EMPTY_RUN_QUEUE()));
     }
   }
@@ -1566,21 +1528,10 @@ resumeThread( StgInt tok )
   Capability *cap;
 
 #if defined(RTS_SUPPORTS_THREADS)
-  IF_DEBUG(scheduler, sched_belch("worker %d: returning, waiting for sched. lock.\n", tok));
-  ACQUIRE_LOCK(&sched_mutex);
-  rts_n_waiting_workers++;
-  IF_DEBUG(scheduler, sched_belch("worker %d: returning; workers waiting: %d.\n", tok, rts_n_waiting_workers));
-
-  /*
-   * Wait for the go ahead
-   */
-  IF_DEBUG(scheduler, sched_belch("worker %d: waiting for capability %d...\n", tok, rts_n_free_capabilities));
-  while ( noCapabilities() ) {
-    waitCondition(&returning_worker_cond, &sched_mutex);
-  }
-  rts_n_waiting_workers--;
-
-  IF_DEBUG(scheduler, sched_belch("worker %d: acquired capability...\n", tok));
+  /* Wait for permission to re-enter the RTS with the result.. */
+  grabReturnCapability(&cap);
+#else
+  grabCapability(&cap);
 #endif
 
   /* Remove the thread off of the suspended list */
@@ -1597,32 +1548,28 @@ resumeThread( StgInt tok )
     barf("resumeThread: thread not found");
   }
   tso->link = END_TSO_QUEUE;
-
-#if defined(RTS_SUPPORTS_THREADS)
-  /* Is it clever to block here with the TSO off the list,
-   * but not hooked up to a capability?
-   */
-  while ( noCapabilities() ) {
-    IF_DEBUG(scheduler, sched_belch("waiting to resume"));
-    rts_n_waiting_tasks++;
-    waitCondition(&thread_ready_cond, &sched_mutex);
-    rts_n_waiting_tasks--;
-    IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
-  }
-#endif
-
-  grabCapability(&cap);
-  RELEASE_LOCK(&sched_mutex);
-
   /* Reset blocking status */
   tso->why_blocked  = NotBlocked;
-  cap->r.rCurrentTSO = tso;
+  RELEASE_LOCK(&sched_mutex);
+  cap->r.rCurrentTSO = tso;
   return &cap->r;
 }
 
+#if defined(RTS_SUPPORTS_THREADS)
+static void
+waitForWork()
+{
+  rts_n_waiting_tasks++;
+  waitCondition(&thread_ready_cond, &sched_mutex);
+  rts_n_waiting_tasks--;
+  return;
+}
+#endif
+
+
 /* ---------------------------------------------------------------------------
  * Static functions
  * ------------------------------------------------------------------------ */
@@ -2024,7 +1971,6 @@ initScheduler(void)
   initMutex(&term_mutex);
 
   initCondition(&thread_ready_cond);
-  initCondition(&returning_worker_cond);
 #endif
 
 #if defined(SMP)
-- 
1.7.10.4
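
For reference, below is a minimal, self-contained sketch of the hand-off
protocol this patch implements, written against plain pthreads rather than
the RTS's Mutex/Condition wrappers. Everything in it (names such as
grab_return_capability, the single free_caps counter, the polling loop in
main) is illustrative and simplified, not the RTS code; it only shows the
shape of the protocol: a returning worker registers itself in a
waiting-workers counter and blocks on a condition variable, while the thread
releasing the capability decrements that counter *before* signalling, which
is the livelock-avoidance point made in releaseCapability() above.

/* Toy model of the capability hand-off protocol (illustrative only).
 * Build with: cc -pthread handoff.c */
#include <pthread.h>
#include <sched.h>
#include <stdio.h>

static pthread_mutex_t sched_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  returning  = PTHREAD_COND_INITIALIZER; /* ~ returning_worker_cond */
static int free_caps       = 1;   /* ~ rts_n_free_capabilities */
static int waiting_workers = 0;   /* ~ rts_n_waiting_workers    */

/* ~ grabReturnCapability(): a worker returning from an external call
 * registers itself and blocks until it is handed the capability.
 * Returns holding sched_lock and the capability. */
static void grab_return_capability(void)
{
    pthread_mutex_lock(&sched_lock);
    waiting_workers++;
    while (free_caps == 0)
        pthread_cond_wait(&returning, &sched_lock);
    free_caps--;
}

/* ~ releaseCapability(): called with sched_lock held; prefer a worker
 * that is waiting to return a result.  The counter is decremented
 * before signalling, so the releaser cannot keep re-signalling the
 * same waiter. */
static void release_capability(void)
{
    free_caps++;
    if (waiting_workers > 0) {
        waiting_workers--;
        pthread_cond_signal(&returning);
    }
}

/* ~ yieldCapability(): a scheduler thread gives up its capability and
 * the lock, then yields so the signalled worker can run. */
static void yield_capability(void)
{
    release_capability();
    pthread_mutex_unlock(&sched_lock);
    sched_yield();
}

static void *returning_worker(void *arg)
{
    (void)arg;
    grab_return_capability();            /* blocks until handed the capability */
    printf("worker: got the capability, delivering result\n");
    release_capability();                /* hand it back and leave */
    pthread_mutex_unlock(&sched_lock);
    return NULL;
}

int main(void)
{
    pthread_t t;

    pthread_mutex_lock(&sched_lock);     /* "scheduler" owns the lock ...   */
    free_caps = 0;                       /* ... and the only capability     */
    pthread_create(&t, NULL, returning_worker, NULL);

    /* Upon "entry to the Scheduler": once a worker is waiting to return,
     * give up the capability.  (Polling keeps the toy simple.) */
    while (waiting_workers == 0) {
        pthread_mutex_unlock(&sched_lock);
        sched_yield();
        pthread_mutex_lock(&sched_lock);
    }
    yield_capability();

    pthread_join(t, NULL);
    return 0;
}

The sched_yield() after unlocking plays the role of yieldThread() in
yieldCapability(): it encourages, but does not guarantee, that the signalled
worker runs next -- the same caveat the comment at the end of
yieldCapability() raises.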