[project @ 2005-04-06 15:27:06 by simonmar]
[ghc-hetmet.git] / ghc / rts / Capability.c
index 74d50ac..bdb651d 100644 (file)
@@ -1,6 +1,5 @@
 /* ---------------------------------------------------------------------------
- *
- * (c) The GHC Team, 2002
+ * (c) The GHC Team, 2003
  *
  * Capabilities
  *
  * one global capability, namely MainCapability.
  * 
  * --------------------------------------------------------------------------*/
+
 #include "PosixSource.h"
 #include "Rts.h"
 #include "RtsUtils.h"
+#include "RtsFlags.h"
 #include "OSThreads.h"
 #include "Capability.h"
 #include "Schedule.h"  /* to get at EMPTY_RUN_QUEUE() */
@@ -30,6 +31,7 @@ Capability MainCapability;     /* for non-SMP, we have one global capability */
 nat rts_n_free_capabilities;
 
 #if defined(RTS_SUPPORTS_THREADS)
+
 /* returning_worker_cond: when a worker thread returns from executing an
  * external call, it needs to wait for an RTS Capability before passing
  * on the result of the call to the Haskell thread that made it.
@@ -54,8 +56,8 @@ nat rts_n_waiting_workers = 0;
  * exclusive access to the RTS and all its data structures (that are not
  * locked by the Scheduler's mutex).
  *
- * thread_ready_cond is signalled whenever noCapabilities doesn't hold.
- *
+ * thread_ready_cond is signalled whenever
+ *      !noCapabilities && !EMPTY_RUN_QUEUE().
  */
 Condition thread_ready_cond = INIT_COND_VAR;
 
@@ -69,144 +71,185 @@ Condition thread_ready_cond = INIT_COND_VAR;
  * Task.startTask() uses its current value.
  */
 nat rts_n_waiting_tasks = 0;
+
+static Condition *passTarget = NULL;
+static rtsBool passingCapability = rtsFalse;
 #endif
 
-/* -----------------------------------------------------------------------------
+#if defined(SMP)
+/*
+ * Free capability list. 
+ */
+Capability *free_capabilities;
+#endif
+
+#ifdef SMP
+#define UNUSED_IF_NOT_SMP
+#else
+#define UNUSED_IF_NOT_SMP STG_UNUSED
+#endif
+
+#if defined(RTS_USER_SIGNALS)
+#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted || blackholes_need_checking || signals_pending())
+#else
+#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted || blackholes_need_checking)
+#endif
+
+/* ----------------------------------------------------------------------------
    Initialisation
-   -------------------------------------------------------------------------- */
-static
-void
+   ------------------------------------------------------------------------- */
+
+static void
 initCapability( Capability *cap )
 {
     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
     cap->f.stgGCFun        = (F_)__stg_gc_fun;
 }
 
+/* -----------------------------------------------------------------------------
+ * Function: initCapabilities_(nat)
+ *
+ * Purpose:  upon startup, allocate and fill in table
+ *           holding 'n' Capabilities. Only for SMP, since
+ *           it is the only build that supports multiple
+ *           capabilities within the RTS.
+ * -------------------------------------------------------------------------- */
 #if defined(SMP)
-static void initCapabilities_(nat n);
-#endif
+static void
+initCapabilities_(nat n)
+{
+  nat i;
+  Capability *cap, *prev;
+  cap  = NULL;
+  prev = NULL;
+  for (i = 0; i < n; i++) {
+    cap = stgMallocBytes(sizeof(Capability), "initCapabilities");
+    initCapability(cap);
+    cap->link = prev;
+    prev = cap;
+  }
+  free_capabilities = cap;
+  rts_n_free_capabilities = n;
+  IF_DEBUG(scheduler,
+          sched_belch("allocated %d capabilities", rts_n_free_capabilities));
+}
+#endif /* SMP */
 
-/* 
+/* ---------------------------------------------------------------------------
  * Function:  initCapabilities()
  *
  * Purpose:   set up the Capability handling. For the SMP build,
  *            we keep a table of them, the size of which is
  *            controlled by the user via the RTS flag RtsFlags.ParFlags.nNodes
  *
- * Pre-conditions: no locks assumed held.
- */
+ * ------------------------------------------------------------------------- */
 void
-initCapabilities()
+initCapabilities( void )
 {
-#if defined(RTS_SUPPORTS_THREADS)
-  initCondition(&returning_worker_cond);
-  initCondition(&thread_ready_cond);
-#endif
-
 #if defined(SMP)
   initCapabilities_(RtsFlags.ParFlags.nNodes);
 #else
   initCapability(&MainCapability);
-  rts_n_free_capabilities = 1;
 #endif
 
-  return;
+#if defined(RTS_SUPPORTS_THREADS)
+  initCondition(&returning_worker_cond);
+  initCondition(&thread_ready_cond);
+#endif
+
+  rts_n_free_capabilities = 1;
 }
 
-#if defined(SMP)
-/* Free capability list. */
-static Capability *free_capabilities; /* Available capabilities for running threads */
-static Capability *returning_capabilities; 
-       /* Capabilities being passed to returning worker threads */
-#endif
+/* ----------------------------------------------------------------------------
+   grabCapability( Capability** )
 
-/* -----------------------------------------------------------------------------
-   Acquiring capabilities
-   -------------------------------------------------------------------------- */
+   (only externally visible when !RTS_SUPPORTS_THREADS.  In the
+   threaded RTS, clients must use waitFor*Capability()).
+   ------------------------------------------------------------------------- */
 
-/*
- * Function:  grabCapability(Capability**)
- * 
- * Purpose:   the act of grabbing a capability is easy; just 
- *            remove one from the free capabilities list (which
- *            may just have one entry). In threaded builds, worker
- *            threads are prevented from doing so willy-nilly
- *            via the condition variables thread_ready_cond and
- *            returning_worker_cond.
- *
- */ 
-void grabCapability(Capability** cap)
+#if defined(RTS_SUPPORTS_THREADS)
+static
+#endif
+void
+grabCapability( Capability** cap )
 {
-#ifdef RTS_SUPPORTS_THREADS
+#if defined(SMP)
   ASSERT(rts_n_free_capabilities > 0);
-#endif
-#if !defined(SMP)
-  rts_n_free_capabilities = 0;
-  *cap = &MainCapability;
-  handleSignalsInThisThread();
-#else
   *cap = free_capabilities;
   free_capabilities = (*cap)->link;
   rts_n_free_capabilities--;
+#else
+# if defined(RTS_SUPPORTS_THREADS)
+  ASSERT(rts_n_free_capabilities == 1);
+  rts_n_free_capabilities = 0;
+# endif
+  *cap = &MainCapability;
+  handleSignalsInThisThread();
 #endif
-#ifdef RTS_SUPPORTS_THREADS
-  IF_DEBUG(scheduler,
-          fprintf(stderr,"worker thread (%p): got capability\n",
-                  osThreadId()));
+#if defined(RTS_SUPPORTS_THREADS)
+  IF_DEBUG(scheduler, sched_belch("worker: got capability"));
 #endif
 }
 
-/*
+/* ----------------------------------------------------------------------------
  * Function:  releaseCapability(Capability*)
  *
  * Purpose:   Letting go of a capability. Causes a
  *            'returning worker' thread or a 'waiting worker'
  *            to wake up, in that order.
- *
- */
-void releaseCapability(Capability* cap
+ * ------------------------------------------------------------------------- */
+
+void
+releaseCapability( Capability* cap UNUSED_IF_NOT_SMP )
+{
+    // Precondition: sched_mutex is held.
+#if defined(RTS_SUPPORTS_THREADS)
 #if !defined(SMP)
-                      STG_UNUSED
+    ASSERT(rts_n_free_capabilities == 0);
 #endif
-)
-{      // Precondition: sched_mutex must be held
-#if defined(RTS_SUPPORTS_THREADS)
-#ifndef SMP
-  ASSERT(rts_n_free_capabilities == 0);
+#if defined(SMP)
+    cap->link = free_capabilities;
+    free_capabilities = cap;
 #endif
-  /* Check to see whether a worker thread can be given
-     the go-ahead to return the result of an external call..*/
-  if (rts_n_waiting_workers > 0) {
-    /* Decrement the counter here to avoid livelock where the
-     * thread that is yielding its capability will repeatedly
-     * signal returning_worker_cond.
-     */
+    // Check to see whether a worker thread can be given
+    // the go-ahead to return the result of an external call..
+    if (rts_n_waiting_workers > 0) {
+       // Decrement the counter here to avoid livelock where the
+       // thread that is yielding its capability will repeatedly
+       // signal returning_worker_cond.
+
+       rts_n_waiting_workers--;
+       signalCondition(&returning_worker_cond);
+       IF_DEBUG(scheduler, sched_belch("worker: released capability to returning worker"));
+    } else if (passingCapability) {
+       if (passTarget == NULL) {
+           signalCondition(&thread_ready_cond);
+           startSchedulerTaskIfNecessary();
+       } else {
+           signalCondition(passTarget);
+       }
 #if defined(SMP)
-       // SMP variant untested
-    cap->link = returning_capabilities;
-    returning_capabilities = cap;
+       rts_n_free_capabilities++;
 #else
+       rts_n_free_capabilities = 1;
 #endif
-    rts_n_waiting_workers--;
-    signalCondition(&returning_worker_cond);
-  } else /*if ( !EMPTY_RUN_QUEUE() )*/ {
+       IF_DEBUG(scheduler, sched_belch("worker: released capability, passing it"));
+
+    } else {
 #if defined(SMP)
-    cap->link = free_capabilities;
-    free_capabilities = cap;
-    rts_n_free_capabilities++;
+       rts_n_free_capabilities++;
 #else
-    rts_n_free_capabilities = 1;
-#endif
-    /* Signal that a capability is available */
-    signalCondition(&thread_ready_cond);
-  }
+       rts_n_free_capabilities = 1;
 #endif
-#ifdef RTS_SUPPORTS_THREADS
-  IF_DEBUG(scheduler,
-          fprintf(stderr,"worker thread (%p): released capability\n",
-                  osThreadId()));
+       // Signal that a capability is available
+       if (rts_n_waiting_tasks > 0 && ANY_WORK_TO_DO()) {
+           signalCondition(&thread_ready_cond);
+       }
+       startSchedulerTaskIfNecessary();
+       IF_DEBUG(scheduler, sched_belch("worker: released capability"));
+    }
 #endif
- return;
+    return;
 }
 
 #if defined(RTS_SUPPORTS_THREADS)
@@ -215,8 +258,8 @@ void releaseCapability(Capability* cap
  * call, it needs to communicate the result back. This is done
  * as follows:
  *
- *  - in resumeThread(), the thread calls grabReturnCapability().
- *  - If no capabilities are readily available, grabReturnCapability()
+ *  - in resumeThread(), the thread calls waitForReturnCapability().
+ *  - If no capabilities are readily available, waitForReturnCapability()
  *    increments a counter rts_n_waiting_workers, and blocks
  *    waiting for the condition returning_worker_cond to become
  *    signalled.
@@ -229,8 +272,8 @@ void releaseCapability(Capability* cap
  *    to re-grab a capability and re-enter the Scheduler.
  */
 
-/*
- * Function: grabReturnCapability(Capability**)
+/* ----------------------------------------------------------------------------
+ * waitForReturnCapability( Mutext *pMutex, Capability** )
  *
  * Purpose:  when an OS thread returns from an external call,
  * it calls grabReturnCapability() (via Schedule.resumeThread())
@@ -238,77 +281,77 @@ void releaseCapability(Capability* cap
  * result of the external call back to the Haskell thread that
  * made it.
  *
- * Pre-condition:  pMutex is held.
- * Post-condition: pMutex is still held and a capability has
- *                 been assigned to the worker thread.
- */
+ * ------------------------------------------------------------------------- */
+
 void
-grabReturnCapability(Mutex* pMutex, Capability** pCap)
+waitForReturnCapability( Mutex* pMutex, Capability** pCap )
 {
-  IF_DEBUG(scheduler,
-          fprintf(stderr,"worker (%p): returning, waiting for lock.\n", osThreadId()));
-  IF_DEBUG(scheduler,
-          fprintf(stderr,"worker (%p): returning; workers waiting: %d\n",
-                  osThreadId(), rts_n_waiting_workers));
-  if ( noCapabilities() ) {
-    rts_n_waiting_workers++;
-    wakeBlockedWorkerThread();
-    context_switch = 1;        // make sure it's our turn soon
-    waitCondition(&returning_worker_cond, pMutex);
+    // Pre-condition: pMutex is held.
+
+    IF_DEBUG(scheduler, 
+            sched_belch("worker: returning; workers waiting: %d",
+                        rts_n_waiting_workers));
+
+    if ( noCapabilities() || passingCapability ) {
+       rts_n_waiting_workers++;
+       context_switch = 1;     // make sure it's our turn soon
+       waitCondition(&returning_worker_cond, pMutex);
 #if defined(SMP)
-    *pCap = returning_capabilities;
-    returning_capabilities = (*pCap)->link;
+       *pCap = free_capabilities;
+       free_capabilities = (*pCap)->link;
+       ASSERT(pCap != NULL);
 #else
-    *pCap = &MainCapability;
-    ASSERT(rts_n_free_capabilities == 0);
-    handleSignalsInThisThread();
+       *pCap = &MainCapability;
+       ASSERT(rts_n_free_capabilities == 0);
 #endif
-  } else {
-    grabCapability(pCap);
-  }
-  return;
+       handleSignalsInThisThread();
+    } else {
+       grabCapability(pCap);
+    }
+
+    // Post-condition: pMutex is held, pCap points to a capability
+    // which is now held by the current thread.
+    return;
 }
 
 
-/* -----------------------------------------------------------------------------
-   Yielding/waiting for capabilities
-   -------------------------------------------------------------------------- */
+/* ----------------------------------------------------------------------------
+ * yieldCapability( Mutex* pMutex, Capability** pCap )
+ * ------------------------------------------------------------------------- */
 
-/*
- * Function: yieldToReturningWorker(Mutex*,Capability*,Condition*)
- *
- * Purpose:  when, upon entry to the Scheduler, an OS worker thread
- *           spots that one or more threads are blocked waiting for
- *           permission to return back their result, it gives up
- *           its Capability.
- *           Immediately afterwards, it tries to reaquire the Capabilty
- *           using waitForWorkCapability.
- *
- *
- * Pre-condition:  pMutex is assumed held and the thread possesses
- *                 a Capability.
- * Post-condition: pMutex is held and the thread possesses
- *                 a Capability.
- */
 void
-yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
+yieldCapability( Capability** pCap )
 {
-  if ( rts_n_waiting_workers > 0 ) {
-    IF_DEBUG(scheduler,
-            fprintf(stderr,"worker thread (%p): giving up RTS token\n", osThreadId()));
-    releaseCapability(*pCap);
-        /* And wait for work */
-    waitForWorkCapability(pMutex, pCap, pThreadCond);
-    IF_DEBUG(scheduler,
-            fprintf(stderr,"worker thread (%p): got back RTS token (after yieldToReturningWorker)\n",
-               osThreadId()));
-  }
-  return;
+    // Pre-condition:  pMutex is assumed held, the current thread
+    // holds the capability pointed to by pCap.
+
+    if ( rts_n_waiting_workers > 0 || passingCapability || !ANY_WORK_TO_DO()) {
+       IF_DEBUG(scheduler, 
+                if (rts_n_waiting_workers > 0) {
+                    sched_belch("worker: giving up capability (returning wkr)");
+                } else if (passingCapability) {
+                    sched_belch("worker: giving up capability (passing capability)");
+                } else {
+                    sched_belch("worker: giving up capability (no threads to run)");
+                }
+           );
+       releaseCapability(*pCap);
+       *pCap = NULL;
+    }
+
+    // Post-condition:  either:
+    //
+    //  1. *pCap is NULL, in which case the current thread does not
+    //     hold a capability now, or
+    //  2. *pCap is not NULL, in which case the current thread still
+    //     holds the capability.
+    //
+    return;
 }
 
 
-/*
- * Function: waitForWorkCapability(Mutex*, Capability**, Condition*)
+/* ----------------------------------------------------------------------------
+ * waitForCapability( Mutex*, Capability**, Condition* )
  *
  * Purpose:  wait for a Capability to become available. In
  *           the process of doing so, updates the number
@@ -318,100 +361,99 @@ yieldToReturningWorker(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
  *           call is made.
  *           If pThreadCond is not NULL, a capability can be specifically
  *           passed to this thread using passCapability.
- *
- * Pre-condition: pMutex is held.
- * Post-condition: pMutex is held and *pCap is held by the current thread
- */
+ * ------------------------------------------------------------------------- */
  
-static Condition *passTarget = NULL;
-void 
-waitForWorkCapability(Mutex* pMutex, Capability** pCap, Condition* pThreadCond)
+void
+waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond )
 {
-#ifdef SMP
-  #error SMP version not implemented
-#endif
-  IF_DEBUG(scheduler,
-          fprintf(stderr,"worker thread (%p): wait for cap (cond: %p)\n",
-             osThreadId(),pThreadCond));
-  while ( noCapabilities() || (pThreadCond && passTarget != pThreadCond)
-      || (!pThreadCond && passTarget)) {
-    if(pThreadCond)
-    {
-      waitCondition(pThreadCond, pMutex);
-      IF_DEBUG(scheduler,
-              fprintf(stderr,"worker thread (%p): get passed capability\n",
-                 osThreadId()));
-    }
-    else
-    {
-      rts_n_waiting_tasks++;
-      waitCondition(&thread_ready_cond, pMutex);
-      rts_n_waiting_tasks--;
-      IF_DEBUG(scheduler,
-              fprintf(stderr,"worker thread (%p): get normal capability\n",
-                 osThreadId()));
+    // Pre-condition: pMutex is held.
+
+    while ( noCapabilities() ||
+           (passingCapability && passTarget != pThreadCond) ||
+           !ANY_WORK_TO_DO()) {
+       IF_DEBUG(scheduler,
+                sched_belch("worker: wait for capability (cond: %p)",
+                            pThreadCond));
+
+       if (pThreadCond != NULL) {
+           waitCondition(pThreadCond, pMutex);
+           IF_DEBUG(scheduler, sched_belch("worker: get passed capability"));
+       } else {
+           rts_n_waiting_tasks++;
+           waitCondition(&thread_ready_cond, pMutex);
+           rts_n_waiting_tasks--;
+           IF_DEBUG(scheduler, sched_belch("worker: get normal capability"));
+       }
     }
-  }
-  passTarget = NULL;
-  grabCapability(pCap);
-  return;
+    passingCapability = rtsFalse;
+    grabCapability(pCap);
+
+    // Post-condition: pMutex is held and *pCap is held by the current thread
+    return;
 }
 
-/*
- * Function: passCapability(Mutex*, Capability*, Condition*)
- *
- * Purpose:  Let go of the capability and make sure the thread associated
- *          with the Condition pTargetThreadCond gets it next.
- *
- * Pre-condition: pMutex is held and cap is held by the current thread
- * Post-condition: pMutex is held; cap will be grabbed by the "target"
- *                thread when pMutex is released.
- */
+/* ----------------------------------------------------------------------------
+   passCapability, passCapabilityToWorker
+   ------------------------------------------------------------------------- */
 
 void
-passCapability(Mutex* pMutex, Capability* cap, Condition *pTargetThreadCond)
+passCapability( Condition *pTargetThreadCond )
 {
-#ifdef SMP
-  #error SMP version not implemented
-#endif
-    rts_n_free_capabilities = 1;
-    signalCondition(pTargetThreadCond);
+    // Pre-condition: pMutex is held and cap is held by the current thread
+
     passTarget = pTargetThreadCond;
-    IF_DEBUG(scheduler,
-            fprintf(stderr,"worker thread (%p): passCapability\n",
-               osThreadId()));
+    passingCapability = rtsTrue;
+    IF_DEBUG(scheduler, sched_belch("worker: passCapability"));
+
+    // Post-condition: pMutex is held; cap is still held, but will be
+    //                 passed to the target thread when next released.
 }
 
+void
+passCapabilityToWorker( void )
+{
+    // Pre-condition: pMutex is held and cap is held by the current thread
+
+    passTarget = NULL;
+    passingCapability = rtsTrue;
+    IF_DEBUG(scheduler, sched_belch("worker: passCapabilityToWorker"));
+
+    // Post-condition: pMutex is held; cap is still held, but will be
+    //                 passed to a worker thread when next released.
+}
 
 #endif /* RTS_SUPPORTS_THREADS */
 
-#if defined(SMP)
-/*
- * Function: initCapabilities_(nat)
- *
- * Purpose:  upon startup, allocate and fill in table
- *           holding 'n' Capabilities. Only for SMP, since
- *           it is the only build that supports multiple
- *           capabilities within the RTS.
- */
-static void
-initCapabilities_(nat n)
+/* ----------------------------------------------------------------------------
+   threadRunnable()
+
+   Signals that a thread has been placed on the run queue, so a worker
+   might need to be woken up to run it.
+
+   ToDo: should check whether the thread at the front of the queue is
+   bound, and if so wake up the appropriate worker.
+   -------------------------------------------------------------------------- */
+void
+threadRunnable ( void )
 {
-  nat i;
-  Capability *cap, *prev;
-  cap  = NULL;
-  prev = NULL;
-  for (i = 0; i < n; i++) {
-    cap = stgMallocBytes(sizeof(Capability), "initCapabilities");
-    initCapability(cap);
-    cap->link = prev;
-    prev = cap;
-  }
-  free_capabilities = cap;
-  rts_n_free_capabilities = n;
-  returning_capabilities = NULL;
-  IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n", n_free_capabilities););
+#if defined(RTS_SUPPORTS_THREADS)
+    if ( !noCapabilities() && ANY_WORK_TO_DO() && rts_n_waiting_tasks > 0 ) {
+       signalCondition(&thread_ready_cond);
+    }
+    startSchedulerTaskIfNecessary();
+#endif
 }
-#endif /* SMP */
 
+
+/* ----------------------------------------------------------------------------
+   prodWorker()
+
+   Wake up... time to die.
+   -------------------------------------------------------------------------- */
+void
+prodWorker ( void )
+{
+#if defined(RTS_SUPPORTS_THREADS)
+    signalCondition(&thread_ready_cond);
+#endif
+}