[project @ 2005-05-27 14:47:08 by tharris]
[ghc-hetmet.git] / ghc / rts / Capability.c
index 8a93dc9..9e28a16 100644 (file)
 #include "OSThreads.h"
 #include "Capability.h"
 #include "Schedule.h"  /* to get at EMPTY_RUN_QUEUE() */
-#include "Signals.h" /* to get at handleSignalsInThisThread() */
+#if defined(SMP)
+#include "Hash.h"
+#endif
 
 #if !defined(SMP)
 Capability MainCapability;     /* for non-SMP, we have one global capability */
 #endif
 
+Capability *capabilities = NULL;
 nat rts_n_free_capabilities;
 
 #if defined(RTS_SUPPORTS_THREADS)
@@ -71,9 +74,6 @@ Condition thread_ready_cond = INIT_COND_VAR;
  * Task.startTask() uses its current value.
  */
 nat rts_n_waiting_tasks = 0;
-
-static Condition *passTarget = NULL;
-static rtsBool passingCapability = rtsFalse;
 #endif
 
 #if defined(SMP)
@@ -81,6 +81,11 @@ static rtsBool passingCapability = rtsFalse;
  * Free capability list. 
  */
 Capability *free_capabilities;
+
+/* 
+ * Maps OSThreadId to Capability *
+ */
+HashTable *capability_hash;
 #endif
 
 #ifdef SMP
@@ -89,12 +94,43 @@ Capability *free_capabilities;
 #define UNUSED_IF_NOT_SMP STG_UNUSED
 #endif
 
+
+#if defined(RTS_SUPPORTS_THREADS)
+INLINE_HEADER rtsBool
+ANY_WORK_FOR_ME( Condition *cond )
+{
+    // If the run queue is not empty, then we only wake up the guy who
+    // can run the thread at the head, even if there is some other
+    // reason for this task to run (eg. interrupted=rtsTrue).
+    if (!EMPTY_RUN_QUEUE()) {
+       if (run_queue_hd->main == NULL) {
+           return (cond == NULL);
+       } else {
+           return (&run_queue_hd->main->bound_thread_cond == cond);
+       }
+    }
+
+    return blackholes_need_checking
+       || interrupted
 #if defined(RTS_USER_SIGNALS)
-#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted || blackholes_need_checking || signals_pending())
-#else
-#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted || blackholes_need_checking)
+       || signals_pending()
+#endif
+       ;
+}
 #endif
 
+INLINE_HEADER rtsBool
+ANY_WORK_TO_DO(void) 
+{
+    return (!EMPTY_RUN_QUEUE() 
+           || interrupted
+           || blackholes_need_checking
+#if defined(RTS_USER_SIGNALS)
+           || signals_pending()
+#endif
+       );
+}
+
 /* ----------------------------------------------------------------------------
    Initialisation
    ------------------------------------------------------------------------- */
@@ -102,39 +138,11 @@ Capability *free_capabilities;
 static void
 initCapability( Capability *cap )
 {
+    cap->r.rInHaskell      = rtsFalse;
     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
     cap->f.stgGCFun        = (F_)__stg_gc_fun;
 }
 
-/* -----------------------------------------------------------------------------
- * Function: initCapabilities_(nat)
- *
- * Purpose:  upon startup, allocate and fill in table
- *           holding 'n' Capabilities. Only for SMP, since
- *           it is the only build that supports multiple
- *           capabilities within the RTS.
- * -------------------------------------------------------------------------- */
-#if defined(SMP)
-static void
-initCapabilities_(nat n)
-{
-  nat i;
-  Capability *cap, *prev;
-  cap  = NULL;
-  prev = NULL;
-  for (i = 0; i < n; i++) {
-    cap = stgMallocBytes(sizeof(Capability), "initCapabilities");
-    initCapability(cap);
-    cap->link = prev;
-    prev = cap;
-  }
-  free_capabilities = cap;
-  rts_n_free_capabilities = n;
-  IF_DEBUG(scheduler,
-          sched_belch("allocated %d capabilities", rts_n_free_capabilities));
-}
-#endif /* SMP */
-
 /* ---------------------------------------------------------------------------
  * Function:  initCapabilities()
  *
@@ -147,17 +155,33 @@ void
 initCapabilities( void )
 {
 #if defined(SMP)
-  initCapabilities_(RtsFlags.ParFlags.nNodes);
+    nat i,n;
+
+    n = RtsFlags.ParFlags.nNodes;
+    capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities");
+
+    for (i = 0; i < n; i++) {
+       initCapability(&capabilities[i]);
+       capabilities[i].link = &capabilities[i+1];
+    }
+    capabilities[n-1].link = NULL;
+    
+    free_capabilities = &capabilities[0];
+    rts_n_free_capabilities = n;
+
+    capability_hash = allocHashTable();
+
+    IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n));
 #else
-  initCapability(&MainCapability);
+    capabilities = &MainCapability;
+    initCapability(&MainCapability);
+    rts_n_free_capabilities = 1;
 #endif
 
 #if defined(RTS_SUPPORTS_THREADS)
-  initCondition(&returning_worker_cond);
-  initCondition(&thread_ready_cond);
+    initCondition(&returning_worker_cond);
+    initCondition(&thread_ready_cond);
 #endif
-
-  rts_n_free_capabilities = 1;
 }
 
 /* ----------------------------------------------------------------------------
@@ -178,13 +202,13 @@ grabCapability( Capability** cap )
   *cap = free_capabilities;
   free_capabilities = (*cap)->link;
   rts_n_free_capabilities--;
+  insertHashTable(capability_hash, osThreadId(), *cap);
 #else
 # if defined(RTS_SUPPORTS_THREADS)
   ASSERT(rts_n_free_capabilities == 1);
   rts_n_free_capabilities = 0;
 # endif
   *cap = &MainCapability;
-  handleSignalsInThisThread();
 #endif
 #if defined(RTS_SUPPORTS_THREADS)
   IF_DEBUG(scheduler, sched_belch("worker: got capability"));
@@ -192,6 +216,23 @@ grabCapability( Capability** cap )
 }
 
 /* ----------------------------------------------------------------------------
+ * Function:  myCapability(void)
+ *
+ * Purpose:   Return the capability owned by the current thread.
+ *            Should not be used if the current thread does not 
+ *            hold a Capability.
+ * ------------------------------------------------------------------------- */
+Capability *
+myCapability (void)
+{
+#if defined(SMP)
+    return lookupHashTable(capability_hash, osThreadId());
+#else
+    return &MainCapability;
+#endif
+}
+
+/* ----------------------------------------------------------------------------
  * Function:  releaseCapability(Capability*)
  *
  * Purpose:   Letting go of a capability. Causes a
@@ -207,46 +248,26 @@ releaseCapability( Capability* cap UNUSED_IF_NOT_SMP )
 #if !defined(SMP)
     ASSERT(rts_n_free_capabilities == 0);
 #endif
+#if defined(SMP)
+    cap->link = free_capabilities;
+    free_capabilities = cap;
+    ASSERT(myCapability() == cap);
+    removeHashTable(capability_hash, osThreadId(), NULL);
+#endif
     // Check to see whether a worker thread can be given
     // the go-ahead to return the result of an external call..
     if (rts_n_waiting_workers > 0) {
        // Decrement the counter here to avoid livelock where the
        // thread that is yielding its capability will repeatedly
        // signal returning_worker_cond.
-
-#if defined(SMP)
-       // SMP variant untested
-       cap->link = free_capabilities;
-       free_capabilities = cap;
-#endif
-
        rts_n_waiting_workers--;
        signalCondition(&returning_worker_cond);
-       IF_DEBUG(scheduler, sched_belch("worker: released capability to returning worker"));
-    } else if (passingCapability) {
-       if (passTarget == NULL) {
-           signalCondition(&thread_ready_cond);
-           startSchedulerTaskIfNecessary();
-       } else {
-           signalCondition(passTarget);
-       }
-       rts_n_free_capabilities = 1;
-       IF_DEBUG(scheduler, sched_belch("worker: released capability, passing it"));
-
+       IF_DEBUG(scheduler, 
+                sched_belch("worker: released capability to returning worker"));
     } else {
-#if defined(SMP)
-       cap->link = free_capabilities;
-       free_capabilities = cap;
        rts_n_free_capabilities++;
-#else
-       rts_n_free_capabilities = 1;
-#endif
-       // Signal that a capability is available
-       if (rts_n_waiting_tasks > 0 && ANY_WORK_TO_DO()) {
-           signalCondition(&thread_ready_cond);
-       }
-       startSchedulerTaskIfNecessary();
        IF_DEBUG(scheduler, sched_belch("worker: released capability"));
+       threadRunnable();
     }
 #endif
     return;
@@ -292,7 +313,7 @@ waitForReturnCapability( Mutex* pMutex, Capability** pCap )
             sched_belch("worker: returning; workers waiting: %d",
                         rts_n_waiting_workers));
 
-    if ( noCapabilities() || passingCapability ) {
+    if ( noCapabilities() ) {
        rts_n_waiting_workers++;
        context_switch = 1;     // make sure it's our turn soon
        waitCondition(&returning_worker_cond, pMutex);
@@ -300,11 +321,11 @@ waitForReturnCapability( Mutex* pMutex, Capability** pCap )
        *pCap = free_capabilities;
        free_capabilities = (*pCap)->link;
        ASSERT(pCap != NULL);
+       insertHashTable(capability_hash, osThreadId(), *pCap);
 #else
        *pCap = &MainCapability;
        ASSERT(rts_n_free_capabilities == 0);
 #endif
-       handleSignalsInThisThread();
     } else {
        grabCapability(pCap);
     }
@@ -320,16 +341,16 @@ waitForReturnCapability( Mutex* pMutex, Capability** pCap )
  * ------------------------------------------------------------------------- */
 
 void
-yieldCapability( Capability** pCap )
+yieldCapability( Capability** pCap, Condition *cond )
 {
     // Pre-condition:  pMutex is assumed held, the current thread
     // holds the capability pointed to by pCap.
 
-    if ( rts_n_waiting_workers > 0 || passingCapability || !ANY_WORK_TO_DO()) {
+    if ( rts_n_waiting_workers > 0 || !ANY_WORK_FOR_ME(cond)) {
        IF_DEBUG(scheduler, 
                 if (rts_n_waiting_workers > 0) {
                     sched_belch("worker: giving up capability (returning wkr)");
-                } else if (passingCapability) {
+                } else if (!EMPTY_RUN_QUEUE()) {
                     sched_belch("worker: giving up capability (passing capability)");
                 } else {
                     sched_belch("worker: giving up capability (no threads to run)");
@@ -360,7 +381,7 @@ yieldCapability( Capability** pCap )
  *           not to create a new worker thread when an external
  *           call is made.
  *           If pThreadCond is not NULL, a capability can be specifically
- *           passed to this thread using passCapability.
+ *           passed to this thread.
  * ------------------------------------------------------------------------- */
  
 void
@@ -368,9 +389,7 @@ waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond )
 {
     // Pre-condition: pMutex is held.
 
-    while ( noCapabilities() ||
-           (passingCapability && passTarget != pThreadCond) ||
-           !ANY_WORK_TO_DO()) {
+    while ( noCapabilities() || !ANY_WORK_FOR_ME(pThreadCond)) {
        IF_DEBUG(scheduler,
                 sched_belch("worker: wait for capability (cond: %p)",
                             pThreadCond));
@@ -385,43 +404,12 @@ waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond )
            IF_DEBUG(scheduler, sched_belch("worker: get normal capability"));
        }
     }
-    passingCapability = rtsFalse;
     grabCapability(pCap);
 
     // Post-condition: pMutex is held and *pCap is held by the current thread
     return;
 }
 
-/* ----------------------------------------------------------------------------
-   passCapability, passCapabilityToWorker
-   ------------------------------------------------------------------------- */
-
-void
-passCapability( Condition *pTargetThreadCond )
-{
-    // Pre-condition: pMutex is held and cap is held by the current thread
-
-    passTarget = pTargetThreadCond;
-    passingCapability = rtsTrue;
-    IF_DEBUG(scheduler, sched_belch("worker: passCapability"));
-
-    // Post-condition: pMutex is held; cap is still held, but will be
-    //                 passed to the target thread when next released.
-}
-
-void
-passCapabilityToWorker( void )
-{
-    // Pre-condition: pMutex is held and cap is held by the current thread
-
-    passTarget = NULL;
-    passingCapability = rtsTrue;
-    IF_DEBUG(scheduler, sched_belch("worker: passCapabilityToWorker"));
-
-    // Post-condition: pMutex is held; cap is still held, but will be
-    //                 passed to a worker thread when next released.
-}
-
 #endif /* RTS_SUPPORTS_THREADS */
 
 /* ----------------------------------------------------------------------------
@@ -433,14 +421,34 @@ passCapabilityToWorker( void )
    ToDo: should check whether the thread at the front of the queue is
    bound, and if so wake up the appropriate worker.
    -------------------------------------------------------------------------- */
-
 void
 threadRunnable ( void )
 {
 #if defined(RTS_SUPPORTS_THREADS)
-    if ( !noCapabilities() && ANY_WORK_TO_DO() && rts_n_waiting_tasks > 0 ) {
-       signalCondition(&thread_ready_cond);
+    if ( !noCapabilities() && ANY_WORK_TO_DO() ) {
+       if (!EMPTY_RUN_QUEUE() && run_queue_hd->main != NULL) {
+           signalCondition(&run_queue_hd->main->bound_thread_cond);
+           return;
+       }
+       if (rts_n_waiting_tasks > 0) {
+           signalCondition(&thread_ready_cond);
+       } else {
+           startSchedulerTaskIfNecessary();
+       }
     }
-    startSchedulerTaskIfNecessary();
+#endif
+}
+
+
+/* ----------------------------------------------------------------------------
+   prodWorker()
+
+   Wake up... time to die.
+   -------------------------------------------------------------------------- */
+void
+prodWorker ( void )
+{
+#if defined(RTS_SUPPORTS_THREADS)
+    signalCondition(&thread_ready_cond);
 #endif
 }