X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FCapability.c;h=a1c7ea3269fac363ed296bd8f6051a65da0f1479;hb=2df439cce75fec8c3620f88ee193c06f2a766050;hp=2dec782fd90f269b34c31af41f6c73a08fddedf3;hpb=6d7576ef2853317e690a7c4e823d7f0bb8d9aaf0;p=ghc-hetmet.git diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index 2dec782..a1c7ea3 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -21,6 +21,7 @@ #include "OSThreads.h" #include "Capability.h" #include "Schedule.h" /* to get at EMPTY_RUN_QUEUE() */ +#include "Signals.h" /* to get at handleSignalsInThisThread() */ #if !defined(SMP) Capability MainCapability; /* for non-SMP, we have one global capability */ @@ -44,7 +45,7 @@ Condition returning_worker_cond = INIT_COND_VAR; * there are one or more worker threads blocked waiting on * returning_worker_cond. */ -static nat rts_n_waiting_workers = 0; +nat rts_n_waiting_workers = 0; /* thread_ready_cond: when signalled, a thread has become runnable for a * task to execute. @@ -53,14 +54,10 @@ static nat rts_n_waiting_workers = 0; * exclusive access to the RTS and all its data structures (that are not * locked by the Scheduler's mutex). * - * thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold. + * thread_ready_cond is signalled whenever noCapabilities doesn't hold. * */ Condition thread_ready_cond = INIT_COND_VAR; -#if 0 -/* For documentation purposes only */ -#define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE()) -#endif /* * To be able to make an informed decision about whether or not @@ -68,8 +65,8 @@ Condition thread_ready_cond = INIT_COND_VAR; * the number of tasks currently blocked waiting on thread_ready_cond. * (if > 0 => no need for a new task, just unblock an existing one). * - * waitForWork() takes care of keeping it up-to-date; Task.startTask() - * uses its current value. + * waitForWorkCapability() takes care of keeping it up-to-date; + * Task.startTask() uses its current value. */ nat rts_n_waiting_tasks = 0; #endif @@ -81,10 +78,8 @@ static void initCapability( Capability *cap ) { - cap->f.stgChk0 = (F_)__stg_chk_0; - cap->f.stgChk1 = (F_)__stg_chk_1; cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1; - cap->f.stgUpdatePAP = (F_)__stg_update_PAP; + cap->f.stgGCFun = (F_)__stg_gc_fun; } #if defined(SMP) @@ -121,6 +116,8 @@ initCapabilities() #if defined(SMP) /* Free capability list. */ static Capability *free_capabilities; /* Available capabilities for running threads */ +static Capability *returning_capabilities; + /* Capabilities being passed to returning worker threads */ #endif /* ----------------------------------------------------------------------------- @@ -140,14 +137,23 @@ static Capability *free_capabilities; /* Available capabilities for running thre */ void grabCapability(Capability** cap) { +#ifdef RTS_SUPPORTS_THREADS + ASSERT(rts_n_free_capabilities > 0); +#endif #if !defined(SMP) rts_n_free_capabilities = 0; *cap = &MainCapability; + handleSignalsInThisThread(); #else *cap = free_capabilities; free_capabilities = (*cap)->link; rts_n_free_capabilities--; #endif +#ifdef RTS_SUPPORTS_THREADS + IF_DEBUG(scheduler, + fprintf(stderr,"worker thread (%p): got capability\n", + osThreadId())); +#endif } /* @@ -163,16 +169,11 @@ void releaseCapability(Capability* cap STG_UNUSED #endif ) -{ -#if defined(SMP) - cap->link = free_capabilities; - free_capabilities = cap; - rts_n_free_capabilities++; -#else - rts_n_free_capabilities = 1; -#endif - +{ // Precondition: sched_mutex must be held #if defined(RTS_SUPPORTS_THREADS) +#ifndef SMP + ASSERT(rts_n_free_capabilities == 0); +#endif /* Check to see whether a worker thread can be given the go-ahead to return the result of an external call..*/ if (rts_n_waiting_workers > 0) { @@ -180,13 +181,33 @@ void releaseCapability(Capability* cap * thread that is yielding its capability will repeatedly * signal returning_worker_cond. */ +#if defined(SMP) + // SMP variant untested + cap->link = returning_capabilities; + returning_capabilities = cap; +#else +#endif rts_n_waiting_workers--; signalCondition(&returning_worker_cond); - } else if ( !EMPTY_RUN_QUEUE() ) { - /* Signal that work is available */ + } else /*if ( !EMPTY_RUN_QUEUE() )*/ { +#if defined(SMP) + cap->link = free_capabilities; + free_capabilities = cap; + rts_n_free_capabilities++; +#else + rts_n_free_capabilities = 1; +#endif + /* Signal that a capability is available */ signalCondition(&thread_ready_cond); + startSchedulerTaskIfNecessary(); // if there is more work to be done, + // we'll need a new thread } #endif +#ifdef RTS_SUPPORTS_THREADS + IF_DEBUG(scheduler, + fprintf(stderr,"worker thread (%p): released capability\n", + osThreadId())); +#endif return; } @@ -214,30 +235,39 @@ void releaseCapability(Capability* cap * Function: grabReturnCapability(Capability**) * * Purpose: when an OS thread returns from an external call, - * it calls grabReturningCapability() (via Schedule.resumeThread()) + * it calls grabReturnCapability() (via Schedule.resumeThread()) * to wait for permissions to enter the RTS & communicate the - * result of the ext. call back to the Haskell thread that + * result of the external call back to the Haskell thread that * made it. * - * Pre-condition: pMutex isn't held. - * Post-condition: pMutex is held and a capability has + * Pre-condition: pMutex is held. + * Post-condition: pMutex is still held and a capability has * been assigned to the worker thread. */ void grabReturnCapability(Mutex* pMutex, Capability** pCap) { IF_DEBUG(scheduler, - fprintf(stderr,"worker (%ld): returning, waiting for lock.\n", osThreadId())); - ACQUIRE_LOCK(pMutex); - rts_n_waiting_workers++; + fprintf(stderr,"worker (%p): returning, waiting for lock.\n", osThreadId())); IF_DEBUG(scheduler, - fprintf(stderr,"worker (%ld): returning; workers waiting: %d\n", + fprintf(stderr,"worker (%p): returning; workers waiting: %d\n", osThreadId(), rts_n_waiting_workers)); - while ( noCapabilities() ) { + if ( noCapabilities() ) { + rts_n_waiting_workers++; + wakeBlockedWorkerThread(); + context_switch = 1; // make sure it's our turn soon waitCondition(&returning_worker_cond, pMutex); +#if defined(SMP) + *pCap = returning_capabilities; + returning_capabilities = (*pCap)->link; +#else + *pCap = &MainCapability; + ASSERT(rts_n_free_capabilities == 0); + handleSignalsInThisThread(); +#endif + } else { + grabCapability(pCap); } - - grabCapability(pCap); return; } @@ -247,42 +277,40 @@ grabReturnCapability(Mutex* pMutex, Capability** pCap) -------------------------------------------------------------------------- */ /* - * Function: yieldToReturningWorker(Mutex*,Capability*) + * Function: yieldToReturningWorker(Mutex*,Capability*,Condition*) * * Purpose: when, upon entry to the Scheduler, an OS worker thread * spots that one or more threads are blocked waiting for * permission to return back their result, it gives up - * its Capability. + * its Capability. + * Immediately afterwards, it tries to reaquire the Capabilty + * using waitForWorkCapability. + * * * Pre-condition: pMutex is assumed held and the thread possesses * a Capability. - * Post-condition: pMutex isn't held and the Capability has - * been given back. + * Post-condition: pMutex is held and the thread possesses + * a Capability. */ void -yieldToReturningWorker(Mutex* pMutex, Capability* cap) +yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition* pThreadCond) { - if ( rts_n_waiting_workers > 0 && noCapabilities() ) { + if ( rts_n_waiting_workers > 0 ) { IF_DEBUG(scheduler, - fprintf(stderr,"worker thread (%ld): giving up RTS token\n", osThreadId())); - releaseCapability(cap); - RELEASE_LOCK(pMutex); - yieldThread(); - /* At this point, pMutex has been given up & we've - * forced a thread context switch. Guaranteed to be - * enough for the signalled worker thread to race - * ahead of us? - */ - - /* Re-grab the mutex */ - ACQUIRE_LOCK(pMutex); + fprintf(stderr,"worker thread (%p): giving up RTS token\n", osThreadId())); + releaseCapability(*pCap); + /* And wait for work */ + waitForWorkCapability(pMutex, pCap, pThreadCond); + IF_DEBUG(scheduler, + fprintf(stderr,"worker thread (%p): got back RTS token (after yieldToReturningWorker)\n", + osThreadId())); } return; } /* - * Function: waitForWorkCapability(Mutex*, Capability**, rtsBool) + * Function: waitForWorkCapability(Mutex*, Capability**, Condition*) * * Purpose: wait for a Capability to become available. In * the process of doing so, updates the number @@ -290,20 +318,103 @@ yieldToReturningWorker(Mutex* pMutex, Capability* cap) * work. That counter is used when deciding whether or * not to create a new worker thread when an external * call is made. + * If pThreadCond is not NULL, a capability can be specifically + * passed to this thread using passCapability. * * Pre-condition: pMutex is held. + * Post-condition: pMutex is held and *pCap is held by the current thread */ + +static Condition *passTarget = NULL; +static rtsBool passingCapability = rtsFalse; + void -waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable) +waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond) { - while ( noCapabilities() || (runnable && EMPTY_RUN_QUEUE()) ) { - rts_n_waiting_tasks++; - waitCondition(&thread_ready_cond, pMutex); - rts_n_waiting_tasks--; +#ifdef SMP + #error SMP version not implemented +#endif + IF_DEBUG(scheduler, + fprintf(stderr,"worker thread (%p): wait for cap (cond: %p)\n", + osThreadId(),pThreadCond)); + while ( noCapabilities() || (passingCapability && passTarget != pThreadCond)) { + if(pThreadCond) + { + waitCondition(pThreadCond, pMutex); + IF_DEBUG(scheduler, + fprintf(stderr,"worker thread (%p): get passed capability\n", + osThreadId())); + } + else + { + rts_n_waiting_tasks++; + waitCondition(&thread_ready_cond, pMutex); + rts_n_waiting_tasks--; + IF_DEBUG(scheduler, + fprintf(stderr,"worker thread (%p): get normal capability\n", + osThreadId())); + } } + passingCapability = rtsFalse; grabCapability(pCap); return; } + +/* + * Function: passCapability(Mutex*, Capability*, Condition*) + * + * Purpose: Let go of the capability and make sure the thread associated + * with the Condition pTargetThreadCond gets it next. + * + * Pre-condition: pMutex is held and cap is held by the current thread + * Post-condition: pMutex is held; cap will be grabbed by the "target" + * thread when pMutex is released. + */ + +void +passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond) +{ +#ifdef SMP + #error SMP version not implemented +#endif + rts_n_free_capabilities = 1; + signalCondition(pTargetThreadCond); + passTarget = pTargetThreadCond; + passingCapability = rtsTrue; + IF_DEBUG(scheduler, + fprintf(stderr,"worker thread (%p): passCapability\n", + osThreadId())); +} + +/* + * Function: passCapabilityToWorker(Mutex*, Capability*) + * + * Purpose: Let go of the capability and make sure that a + * "plain" worker thread (not a bound thread) gets it next. + * + * Pre-condition: pMutex is held and cap is held by the current thread + * Post-condition: pMutex is held; cap will be grabbed by the "target" + * thread when pMutex is released. + */ + +void +passCapabilityToWorker(Mutex* pMutex, Capability* cap) +{ +#ifdef SMP + #error SMP version not implemented +#endif + rts_n_free_capabilities = 1; + signalCondition(&thread_ready_cond); + startSchedulerTaskIfNecessary(); + passTarget = NULL; + passingCapability = rtsTrue; + IF_DEBUG(scheduler, + fprintf(stderr,"worker thread (%p): passCapabilityToWorker\n", + osThreadId())); +} + + + #endif /* RTS_SUPPORTS_THREADS */ #if defined(SMP) @@ -330,6 +441,7 @@ initCapabilities_(nat n) } free_capabilities = cap; rts_n_free_capabilities = n; + returning_capabilities = NULL; IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n", n_free_capabilities);); } #endif /* SMP */