[project @ 2002-02-14 07:52:05 by sof]
authorsof <unknown>
Thu, 14 Feb 2002 07:52:05 +0000 (07:52 +0000)
committersof <unknown>
Thu, 14 Feb 2002 07:52:05 +0000 (07:52 +0000)
Restructured / tidied a bit:

* Capability.grabReturnCapability() is now called by resumeThread().
  It takes care of waiting on the (Capability.c-local) condition
  variable, 'returning_worker_cond' (moved here from Schedule.c)

* If a worker notices upon entry to the Scheduler that there are
  worker threads waiting to deposit results of external calls,
  it gives up its capability by calling Capability.yieldCapability().

* Added Scheduler.waitForWork(), which takes care of blocking
  on 'thread_ready_cond' (+ 'rts_n_waiting_tasks' book-keeping).

Note: changes haven't been fully tested, due to HEAD instability.

ghc/rts/Capability.c
ghc/rts/Capability.h
ghc/rts/Schedule.c

index d2a2ef8..77096a9 100644 (file)
@@ -1,20 +1,18 @@
 /* ---------------------------------------------------------------------------
  *
- * (c) The GHC Team, 2001
+ * (c) The GHC Team, 2002
  *
  * Capabilities
  *
- * The notion of a capability is used when operating in multi-threaded
- * environments (which the SMP and Threads builds of the RTS do), to
- * hold all the state an OS thread/task needs to run Haskell code:
+ * A Capability represent the token required to execute STG code,
+ * and all the state an OS thread/task needs to run Haskell code:
  * its STG registers, a pointer to its  TSO, a nursery etc. During
- * STG execution, a pointer to the capabilitity is kept in a 
+ * STG execution, a pointer to the capabilitity is kept in a
  * register (BaseReg).
  *
  * Only in an SMP build will there be multiple capabilities, the threaded
  * RTS and other non-threaded builds, there is one global capability,
  * namely MainRegTable.
- *
  * 
  * --------------------------------------------------------------------------*/
 #include "PosixSource.h"
@@ -29,6 +27,27 @@ Capability MainCapability;     /* for non-SMP, we have one global capability */
 
 nat rts_n_free_capabilities;
 
+#if defined(RTS_SUPPORTS_THREADS)
+/* returning_worker_cond: when a worker thread returns from executing an
+ * external call, it needs to wait for an RTS Capability before passing
+ * on the result of the call to the Haskell thread that made it.
+ * 
+ * returning_worker_cond is signalled in Capability.releaseCapability().
+ *
+ */
+Condition returning_worker_cond = INIT_COND_VAR;
+
+/*
+ * To avoid starvation of threads blocked on worker_thread_cond,
+ * the task(s) that enter the Scheduler will check to see whether
+ * there are one or more worker threads blocked waiting on
+ * returning_worker_cond.
+ *
+ * Locks needed: sched_mutex
+ */
+nat rts_n_waiting_workers = 0;
+#endif
+
 static
 void
 initCapability( Capability *cap )
@@ -48,6 +67,10 @@ static void initCapabilities_(nat n);
 void
 initCapabilities()
 {
+#if defined(RTS_SUPPORTS_THREADS)
+  initCondition(returning_worker_cond);
+#endif
+
 #if defined(SMP)
   initCapabilities_(RtsFlags.ParFlags.nNodes);
 #else
@@ -78,9 +101,12 @@ void grabCapability(Capability** cap)
 }
 
 /*
- * Letting go of a capability
+ * Function:  releaseCapability(Capability*)
+ *
+ * Purpose:   Letting go of a capability.
  *
- * Locks required: sched_mutex
+ * Pre-condition: sched_mutex is assumed held by current thread.
+ * Post-condition:
  */
 void releaseCapability(Capability* cap
 #if !defined(SMP)
@@ -100,9 +126,11 @@ void releaseCapability(Capability* cap
   /* Check to see whether a worker thread can be given
      the go-ahead to return the result of an external call..*/
   if (rts_n_waiting_workers > 0) {
-    /* The worker is responsible for grabbing the capability and
-     * decrementing the rts_n_returning_workers count
+    /* Decrement the counter here to avoid livelock where the
+     * thread that is yielding its capability will repeatedly
+     * signal returning_worker_cond.
      */
+    rts_n_waiting_workers--;
     signalCondition(&returning_worker_cond);
   } else if ( !EMPTY_RUN_QUEUE() ) {
     /* Signal that work is available */
@@ -112,8 +140,100 @@ void releaseCapability(Capability* cap
   return;
 }
 
+#if defined(RTS_SUPPORTS_THREADS)
+/*
+ * When a native thread has completed the execution of an external
+ * call, it needs to communicate the result back. This is done
+ * as follows:
+ *
+ *  - in resumeThread(), the thread calls grabReturnCapability().
+ *  - If no capabilities are readily available, grabReturnCapability()
+ *    increments a counter rts_n_waiting_workers, and blocks
+ *    waiting for the condition returning_worker_cond to become
+ *    signalled.
+ *  - upon entry to the Scheduler, a worker thread checks the
+ *    value of rts_n_waiting_workers. If > 0, the worker thread
+ *    will yield its capability to let a returning worker thread
+ *    proceed with returning its result -- this is done via
+ *    yieldCapability().
+ *  - the worker thread that yielded its capability then tries
+ *    to re-grab a capability and re-enter the Scheduler.
+ */
+
+/*
+ * Function: grabReturnCapability(Capability**)
+ *
+ * Purpose:  when an OS thread returns from an external call,
+ * it calls grabReturningCapability() (via Schedule.resumeThread())
+ * to wait for permissions to enter the RTS & communicate the
+ * result of the ext. call back to the Haskell thread that
+ * made it.
+ *
+ * Pre-condition:  sched_mutex isn't held.
+ * Post-condition: sched_mutex is held and a capability has
+ *                 been assigned to the worker thread.
+ */
+void
+grabReturnCapability(Capability** pCap)
+{
+  IF_DEBUG(scheduler,
+          sched_belch("thread %d: returning, waiting for sched. lock.\n", osThreadId()));
+  ACQUIRE_LOCK(&sched_mutex);
+  rts_n_waiting_workers++;
+  IF_DEBUG(scheduler,
+          sched_belch("worker (%d,%d): returning; workers waiting: %d\n",
+                      tok, osThreadId(), rts_n_waiting_workers));
+  while ( noCapabilities() ) {
+    waitCondition(&returning_worker_cond, &sched_mutex);
+  }
+  
+  grabCapability(pCap);
+  return;
+}
+
+/*
+ * Function: yieldCapability(Capability**)
+ *
+ * Purpose:  when, upon entry to the Scheduler, an OS worker thread
+ *           spots that one or more threads are blocked waiting for
+ *           permission to return back their result, it gives up
+ *           its Capability. 
+ *
+ * Pre-condition:  sched_mutex is held and the thread possesses
+ *                 a Capability.
+ * Post-condition: sched_mutex isn't held and the Capability has
+ *                 been given back.
+ */
+void
+yieldCapability(Capability* cap)
+{
+    IF_DEBUG(scheduler,
+            sched_belch("worker thread (%d): giving up RTS token\n", osThreadId()));
+    releaseCapability(cap);
+    RELEASE_LOCK(&sched_mutex);
+    yieldThread();
+    /* At this point, sched_mutex has been given up & we've 
+     * forced a thread context switch. Guaranteed to be
+     * enough for the signalled worker thread to race
+     * ahead?
+     */
+    return;
+}
+
+#endif /* RTS_SUPPORTS_THREADS */
+
 #if defined(SMP)
-/* Allocate 'n' capabilities */
+/*
+ * Function: initCapabilities_(nat)
+ *
+ * Purpose:  upon startup, allocate and fill in table
+ *           holding 'n' Capabilities. Only for SMP, since
+ *           it is the only build that supports multiple
+ *           capabilities within the RTS.
+ * 
+ * Pre-condition: sched_mutex is held.
+ *
+ */
 static void
 initCapabilities_(nat n)
 {
index b878507..6aef3db 100644 (file)
 extern Capability MainCapability;
 #endif
 
-
 extern void initCapabilities(void);
-extern void grabCapability(Capability** cap);
+extern void grabCapability(Capability** pCap);
 extern void releaseCapability(Capability* cap);
 
 #if defined(RTS_SUPPORTS_THREADS)
 /* total number of available capabilities */
 extern nat rts_n_free_capabilities;  
+extern nat rts_n_waiting_workers;
+
+extern void grabReturnCapability(Capability** pCap);
+extern void yieldCapability(Capability* cap);
 
 static inline nat getFreeCapabilities (void)
 {
index d73559e..fef2795 100644 (file)
@@ -1,5 +1,5 @@
 /* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.122 2002/02/13 08:48:06 sof Exp $
+ * $Id: Schedule.c,v 1.123 2002/02/14 07:52:05 sof Exp $
  *
  * (c) The GHC Team, 1998-2000
  *
@@ -267,21 +267,6 @@ Mutex     sched_mutex       = INIT_MUTEX_VAR;
 Mutex     term_mutex        = INIT_MUTEX_VAR;
 
 
-/*
- * When a native thread has completed executing an external
- * call, it needs to communicate the result back to the
- * (Haskell) thread that made the call. Do this as follows:
- *
- *  - in resumeThread(), the thread increments the counter
- *    rts_n_returning_workers, and then blocks waiting on the
- *    condition returning_worker_cond.
- *  - upon entry to the scheduler, a worker/task checks 
- *    rts_n_returning_workers. If it is > 0, worker threads
- *    are waiting to return, so it gives up its capability
- *    to let a worker deposit its result.
- *  - the worker thread that gave up its capability then tries
- *    to re-grab a capability and re-enter the Scheduler.
- */
 
 
 /* thread_ready_cond: when signalled, a thread has become runnable for a
@@ -305,28 +290,13 @@ Condition thread_ready_cond = INIT_COND_VAR;
  * to create a new task when making an external call, keep track of
  * the number of tasks currently blocked waiting on thread_ready_cond.
  * (if > 0 => no need for a new task, just unblock an existing one).
- */
-nat rts_n_waiting_tasks = 0;
-
-/* returning_worker_cond: when a worker thread returns from executing an
- * external call, it needs to wait for an RTS Capability before passing
- * on the result of the call to the Haskell thread that made it.
- * 
- * returning_worker_cond is signalled in Capability.releaseCapability().
- *
- */
-Condition returning_worker_cond = INIT_COND_VAR;
-
-/*
- * To avoid starvation of threads blocked on worker_thread_cond,
- * the task(s) that enter the Scheduler will check to see whether
- * there are one or more worker threads blocked waiting on
- * returning_worker_cond.
  *
- * Locks needed: sched_mutex
+ * waitForWork() takes care of keeping it up-to-date; Task.startTask()
+ * uses its current value.
  */
-nat rts_n_waiting_workers = 0;
+nat rts_n_waiting_tasks = 0;
 
+static void waitForWork(void);
 
 # if defined(SMP)
 static Condition gc_pending_cond = INIT_COND_VAR;
@@ -456,20 +426,15 @@ schedule_start:
      * deposit the result of an external call. So,
      * be nice and hand over our capability.
      */
-    IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (waiting workers: %d)\n", osThreadId(), rts_n_waiting_workers));
-    releaseCapability(cap);
-    RELEASE_LOCK(&sched_mutex);
-
-    yieldThread();
+    yieldCapability(cap);
+    /* Lost our sched_mutex lock, try to re-enter the scheduler. */
     goto schedule_start;
   }
 #endif
 
 #if defined(RTS_SUPPORTS_THREADS)
   while ( noCapabilities() ) {
-    rts_n_waiting_tasks++;
-    waitCondition(&thread_ready_cond, &sched_mutex);
-    rts_n_waiting_tasks--;
+    waitForWork();
   }
 #endif
 
@@ -731,7 +696,6 @@ schedule_start:
            if ( EMPTY_RUN_QUEUE() ) {
              IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
              shutdownHaskellAndExit(0);
-           
            }
 #endif
            ASSERT( !EMPTY_RUN_QUEUE() );
@@ -761,9 +725,7 @@ schedule_start:
       releaseCapability(cap);
       while ( noCapabilities() || EMPTY_RUN_QUEUE() ) {
        IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
-       rts_n_waiting_tasks++;
-       waitCondition( &thread_ready_cond, &sched_mutex );
-       rts_n_waiting_tasks--;
+       waitForWork();
        IF_DEBUG(scheduler, sched_belch("thread %d: work now available %d %d", osThreadId(), getFreeCapabilities(),EMPTY_RUN_QUEUE()));
       }
     }
@@ -1566,21 +1528,10 @@ resumeThread( StgInt tok )
   Capability *cap;
 
 #if defined(RTS_SUPPORTS_THREADS)
-  IF_DEBUG(scheduler, sched_belch("worker %d: returning, waiting for sched. lock.\n", tok));
-  ACQUIRE_LOCK(&sched_mutex);
-  rts_n_waiting_workers++;
-  IF_DEBUG(scheduler, sched_belch("worker %d: returning; workers waiting: %d.\n", tok, rts_n_waiting_workers));
-
-  /*
-   * Wait for the go ahead
-   */
-  IF_DEBUG(scheduler, sched_belch("worker %d: waiting for capability %d...\n", tok, rts_n_free_capabilities));
-  while ( noCapabilities() ) {
-    waitCondition(&returning_worker_cond, &sched_mutex);
-  }
-  rts_n_waiting_workers--;
-
-  IF_DEBUG(scheduler, sched_belch("worker %d: acquired capability...\n", tok));
+  /* Wait for permission to re-enter the RTS with the result.. */
+  grabReturnCapability(&cap);
+#else
+  grabCapability(&cap);
 #endif
 
   /* Remove the thread off of the suspended list */
@@ -1597,32 +1548,28 @@ resumeThread( StgInt tok )
     barf("resumeThread: thread not found");
   }
   tso->link = END_TSO_QUEUE;
-
-#if defined(RTS_SUPPORTS_THREADS)
-  /* Is it clever to block here with the TSO off the list,
-   * but not hooked up to a capability?
-   */
-  while ( noCapabilities() ) {
-    IF_DEBUG(scheduler, sched_belch("waiting to resume"));
-    rts_n_waiting_tasks++;
-    waitCondition(&thread_ready_cond, &sched_mutex);
-    rts_n_waiting_tasks--;
-    IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
-  }
-#endif
-
-  grabCapability(&cap);
-  RELEASE_LOCK(&sched_mutex);
-
   /* Reset blocking status */
   tso->why_blocked  = NotBlocked;
 
-  cap->r.rCurrentTSO = tso;
+  RELEASE_LOCK(&sched_mutex);
 
+  cap->r.rCurrentTSO = tso;
   return &cap->r;
 }
 
 
+#if defined(RTS_SUPPORTS_THREADS)
+static void
+waitForWork()
+{
+  rts_n_waiting_tasks++;
+  waitCondition(&thread_ready_cond, &sched_mutex);
+  rts_n_waiting_tasks--;
+  return;
+}
+#endif
+
+
 /* ---------------------------------------------------------------------------
  * Static functions
  * ------------------------------------------------------------------------ */
@@ -2024,7 +1971,6 @@ initScheduler(void)
   initMutex(&term_mutex);
 
   initCondition(&thread_ready_cond);
-  initCondition(&returning_worker_cond);
 #endif
   
 #if defined(SMP)