[project @ 2002-02-15 07:50:36 by sof]
authorsof <unknown>
Fri, 15 Feb 2002 07:50:37 +0000 (07:50 +0000)
committersof <unknown>
Fri, 15 Feb 2002 07:50:37 +0000 (07:50 +0000)
Tighten up the Scheduler synchronisation story some more:

- moved thread_ready_cond + the counter rts_n_waiting_tasks
  to Capability.c, leaving only sched_mutex as a synchro
  variable in Scheduler (the less stuff that inhabit
  Schedule.c, the better, methinks.)
- upon entry to the Scheduler, a worker thread will now call
  Capability.yieldToReturningWorker() to check whether it
  needs to give up its capability.
- Worker threads that are either idle or lack a capability,
  will now call Capability.waitForWorkCapability() and block.

ghc/rts/Capability.c
ghc/rts/Capability.h
ghc/rts/Schedule.c
ghc/rts/Schedule.h

index 6f4a945..2dec782 100644 (file)
  * --------------------------------------------------------------------------*/
 #include "PosixSource.h"
 #include "Rts.h"
-#include "Schedule.h"
 #include "RtsUtils.h"
+#include "OSThreads.h"
 #include "Capability.h"
+#include "Schedule.h"  /* to get at EMPTY_RUN_QUEUE() */
 
 #if !defined(SMP)
 Capability MainCapability;     /* for non-SMP, we have one global capability */
@@ -42,12 +43,40 @@ Condition returning_worker_cond = INIT_COND_VAR;
  * the task(s) that enter the Scheduler will check to see whether
  * there are one or more worker threads blocked waiting on
  * returning_worker_cond.
+ */
+static nat rts_n_waiting_workers = 0;
+
+/* thread_ready_cond: when signalled, a thread has become runnable for a
+ * task to execute.
+ *
+ * In the non-SMP case, it also implies that the thread that is woken up has
+ * exclusive access to the RTS and all its data structures (that are not
+ * locked by the Scheduler's mutex).
+ *
+ * thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold.
  *
- * Locks needed: sched_mutex
  */
-nat rts_n_waiting_workers = 0;
+Condition thread_ready_cond = INIT_COND_VAR;
+#if 0
+/* For documentation purposes only */
+#define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE())
 #endif
 
+/*
+ * To be able to make an informed decision about whether or not 
+ * to create a new task when making an external call, keep track of
+ * the number of tasks currently blocked waiting on thread_ready_cond.
+ * (if > 0 => no need for a new task, just unblock an existing one).
+ *
+ * waitForWork() takes care of keeping it up-to-date; Task.startTask()
+ * uses its current value.
+ */
+nat rts_n_waiting_tasks = 0;
+#endif
+
+/* -----------------------------------------------------------------------------
+   Initialisation
+   -------------------------------------------------------------------------- */
 static
 void
 initCapability( Capability *cap )
@@ -76,6 +105,7 @@ initCapabilities()
 {
 #if defined(RTS_SUPPORTS_THREADS)
   initCondition(&returning_worker_cond);
+  initCondition(&thread_ready_cond);
 #endif
 
 #if defined(SMP)
@@ -88,13 +118,15 @@ initCapabilities()
   return;
 }
 
-/* Free capability list.
- * Locks required: sched_mutex.
- */
 #if defined(SMP)
+/* Free capability list. */
 static Capability *free_capabilities; /* Available capabilities for running threads */
 #endif
 
+/* -----------------------------------------------------------------------------
+   Acquiring capabilities
+   -------------------------------------------------------------------------- */
+
 /*
  * Function:  grabCapability(Capability**)
  * 
@@ -102,12 +134,9 @@ static Capability *free_capabilities; /* Available capabilities for running thre
  *            remove one from the free capabilities list (which
  *            may just have one entry). In threaded builds, worker
  *            threads are prevented from doing so willy-nilly
- *            through the use of the sched_mutex lock along with
- *            condition variables thread_ready_cond and
+ *            via the condition variables thread_ready_cond and
  *            returning_worker_cond.
  *
- * Pre-condition:  sched_mutex is held (in threaded builds only).
- *
  */ 
 void grabCapability(Capability** cap)
 {
@@ -124,10 +153,10 @@ void grabCapability(Capability** cap)
 /*
  * Function:  releaseCapability(Capability*)
  *
- * Purpose:   Letting go of a capability.
+ * Purpose:   Letting go of a capability. Causes a
+ *            'returning worker' thread or a 'waiting worker'
+ *            to wake up, in that order.
  *
- * Pre-condition: sched_mutex is assumed held by current thread.
- * Post-condition:
  */
 void releaseCapability(Capability* cap
 #if !defined(SMP)
@@ -176,7 +205,7 @@ void releaseCapability(Capability* cap
  *    value of rts_n_waiting_workers. If > 0, the worker thread
  *    will yield its capability to let a returning worker thread
  *    proceed with returning its result -- this is done via
- *    yieldCapability().
+ *    yieldToReturningWorker().
  *  - the worker thread that yielded its capability then tries
  *    to re-grab a capability and re-enter the Scheduler.
  */
@@ -190,57 +219,91 @@ void releaseCapability(Capability* cap
  * result of the ext. call back to the Haskell thread that
  * made it.
  *
- * Pre-condition:  sched_mutex isn't held.
- * Post-condition: sched_mutex is held and a capability has
+ * Pre-condition:  pMutex isn't held.
+ * Post-condition: pMutex is held and a capability has
  *                 been assigned to the worker thread.
  */
 void
-grabReturnCapability(Capability** pCap)
+grabReturnCapability(Mutex* pMutex, Capability** pCap)
 {
   IF_DEBUG(scheduler,
-          fprintf(stderr,"worker (%ld): returning, waiting for sched. lock.\n", osThreadId()));
-  ACQUIRE_LOCK(&sched_mutex);
+          fprintf(stderr,"worker (%ld): returning, waiting for lock.\n", osThreadId()));
+  ACQUIRE_LOCK(pMutex);
   rts_n_waiting_workers++;
   IF_DEBUG(scheduler,
           fprintf(stderr,"worker (%ld): returning; workers waiting: %d\n",
                   osThreadId(), rts_n_waiting_workers));
   while ( noCapabilities() ) {
-    waitCondition(&returning_worker_cond, &sched_mutex);
+    waitCondition(&returning_worker_cond, pMutex);
   }
   
   grabCapability(pCap);
   return;
 }
 
+
+/* -----------------------------------------------------------------------------
+   Yielding/waiting for capabilities
+   -------------------------------------------------------------------------- */
+
 /*
- * Function: yieldCapability(Capability**)
+ * Function: yieldToReturningWorker(Mutex*,Capability*)
  *
  * Purpose:  when, upon entry to the Scheduler, an OS worker thread
  *           spots that one or more threads are blocked waiting for
  *           permission to return back their result, it gives up
  *           its Capability. 
  *
- * Pre-condition:  sched_mutex is held and the thread possesses
+ * Pre-condition:  pMutex is assumed held and the thread possesses
  *                 a Capability.
- * Post-condition: sched_mutex isn't held and the Capability has
+ * Post-condition: pMutex isn't held and the Capability has
  *                 been given back.
  */
 void
-yieldCapability(Capability* cap)
+yieldToReturningWorker(Mutex* pMutex, Capability* cap)
 {
+  if ( rts_n_waiting_workers > 0 && noCapabilities() ) {
     IF_DEBUG(scheduler,
             fprintf(stderr,"worker thread (%ld): giving up RTS token\n", osThreadId()));
     releaseCapability(cap);
-    RELEASE_LOCK(&sched_mutex);
+    RELEASE_LOCK(pMutex);
     yieldThread();
-    /* At this point, sched_mutex has been given up & we've 
+    /* At this point, pMutex has been given up & we've 
      * forced a thread context switch. Guaranteed to be
      * enough for the signalled worker thread to race
-     * ahead?
+     * ahead of us?
      */
-    return;
+
+    /* Re-grab the mutex */
+    ACQUIRE_LOCK(pMutex);
+  }
+  return;
 }
 
+
+/*
+ * Function: waitForWorkCapability(Mutex*, Capability**, rtsBool)
+ *
+ * Purpose:  wait for a Capability to become available. In
+ *           the process of doing so, updates the number
+ *           of tasks currently blocked waiting for a capability/more
+ *           work. That counter is used when deciding whether or
+ *           not to create a new worker thread when an external
+ *           call is made.
+ *
+ * Pre-condition: pMutex is held.
+ */
+void 
+waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable)
+{
+  while ( noCapabilities() || (runnable && EMPTY_RUN_QUEUE()) ) {
+    rts_n_waiting_tasks++;
+    waitCondition(&thread_ready_cond, pMutex);
+    rts_n_waiting_tasks--;
+  }
+  grabCapability(pCap);
+  return;
+}
 #endif /* RTS_SUPPORTS_THREADS */
 
 #if defined(SMP)
@@ -251,9 +314,6 @@ yieldCapability(Capability* cap)
  *           holding 'n' Capabilities. Only for SMP, since
  *           it is the only build that supports multiple
  *           capabilities within the RTS.
- * 
- * Pre-condition: sched_mutex is held.
- *
  */
 static void
 initCapabilities_(nat n)
index 6aef3db..71359b6 100644 (file)
@@ -37,8 +37,9 @@ extern void releaseCapability(Capability* cap);
 extern nat rts_n_free_capabilities;  
 extern nat rts_n_waiting_workers;
 
-extern void grabReturnCapability(Capability** pCap);
-extern void yieldCapability(Capability* cap);
+extern void grabReturnCapability(Mutex* pMutex, Capability** pCap);
+extern void yieldToReturningWorker(Mutex* pMutex, Capability* cap);
+extern void waitForWorkCapability(Mutex* pMutex, Capability** pCap, rtsBool runnable);
 
 static inline nat getFreeCapabilities (void)
 {
index fef2795..4b2425f 100644 (file)
@@ -1,5 +1,5 @@
 /* ---------------------------------------------------------------------------
- * $Id: Schedule.c,v 1.123 2002/02/14 07:52:05 sof Exp $
+ * $Id: Schedule.c,v 1.124 2002/02/15 07:50:36 sof Exp $
  *
  * (c) The GHC Team, 1998-2000
  *
@@ -266,38 +266,6 @@ static void sched_belch(char *s, ...);
 Mutex     sched_mutex       = INIT_MUTEX_VAR;
 Mutex     term_mutex        = INIT_MUTEX_VAR;
 
-
-
-
-/* thread_ready_cond: when signalled, a thread has become runnable for a
- * task to execute.
- *
- * In the non-SMP case, it also implies that the thread that is woken up has
- * exclusive access to the RTS and all its data structures (that are not
- * under sched_mutex's control).
- *
- * thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold.
- *
- */
-Condition thread_ready_cond = INIT_COND_VAR;
-#if 0
-/* For documentation purposes only */
-#define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE())
-#endif
-
-/*
- * To be able to make an informed decision about whether or not 
- * to create a new task when making an external call, keep track of
- * the number of tasks currently blocked waiting on thread_ready_cond.
- * (if > 0 => no need for a new task, just unblock an existing one).
- *
- * waitForWork() takes care of keeping it up-to-date; Task.startTask()
- * uses its current value.
- */
-nat rts_n_waiting_tasks = 0;
-
-static void waitForWork(void);
-
 # if defined(SMP)
 static Condition gc_pending_cond = INIT_COND_VAR;
 nat await_death;
@@ -410,36 +378,19 @@ schedule( void )
 # endif
 #endif
   rtsBool was_interrupted = rtsFalse;
-
-#if defined(RTS_SUPPORTS_THREADS)
-schedule_start:
-#endif
   
-#if defined(RTS_SUPPORTS_THREADS)
   ACQUIRE_LOCK(&sched_mutex);
-#endif
  
 #if defined(RTS_SUPPORTS_THREADS)
-  /* ToDo: consider SMP support */
-  if ( rts_n_waiting_workers > 0 && noCapabilities() ) {
-    /* (At least) one native thread is waiting to
-     * deposit the result of an external call. So,
-     * be nice and hand over our capability.
-     */
-    yieldCapability(cap);
-    /* Lost our sched_mutex lock, try to re-enter the scheduler. */
-    goto schedule_start;
-  }
-#endif
+  /* Check to see whether there are any worker threads
+     waiting to deposit external call results. If so,
+     yield our capability */
+  yieldToReturningWorker(&sched_mutex, cap);
 
-#if defined(RTS_SUPPORTS_THREADS)
-  while ( noCapabilities() ) {
-    waitForWork();
-  }
+  waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
 #endif
 
 #if defined(GRAN)
-
   /* set up first event to get things going */
   /* ToDo: assign costs for system setup and init MainTSO ! */
   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
@@ -723,16 +674,19 @@ schedule_start:
     if ( EMPTY_RUN_QUEUE() ) {
       /* Give up our capability */
       releaseCapability(cap);
-      while ( noCapabilities() || EMPTY_RUN_QUEUE() ) {
-       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
-       waitForWork();
-       IF_DEBUG(scheduler, sched_belch("thread %d: work now available %d %d", osThreadId(), getFreeCapabilities(),EMPTY_RUN_QUEUE()));
+      IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
+      waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
+      IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
+#if 0
+      while ( EMPTY_RUN_QUEUE() ) {
+       waitForWorkCapability(&sched_mutex, &cap);
+       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
       }
+#endif
     }
 #endif
 
 #if defined(GRAN)
-
     if (RtsFlags.GranFlags.Light)
       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
 
@@ -978,7 +932,7 @@ schedule_start:
             belch("--=^ %d threads, %d sparks on [%#x]", 
                   run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
 
-#if 1
+# if 1
     if (0 && RtsFlags.ParFlags.ParStats.Full && 
        t && LastTSO && t->id != LastTSO->id && 
        LastTSO->why_blocked == NotBlocked && 
@@ -1003,7 +957,7 @@ schedule_start:
       emitSchedule = rtsFalse;
     }
      
-#endif
+# endif
 #else /* !GRAN && !PAR */
   
     /* grab a thread from the run queue */
@@ -1377,12 +1331,11 @@ schedule_start:
     }
 #endif
 
+    if (ready_to_gc 
 #ifdef SMP
-    if (ready_to_gc && allFreeCapabilities() )
-#else
-    if (ready_to_gc) 
+       && allFreeCapabilities() 
 #endif
-      {
+       ) {
       /* everybody back, start the GC.
        * Could do it in this thread, or signal a condition var
        * to do it in another thread.  Either way, we need to
@@ -1512,7 +1465,7 @@ suspendThread( StgRegTable *reg )
      for one (i.e., if there's only one Concurrent Haskell thread alive,
      there's no need to create a new task).
   */
-  IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS\n", tok));
+  IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
   startTask(taskStart);
 #endif
 
@@ -1528,8 +1481,8 @@ resumeThread( StgInt tok )
   Capability *cap;
 
 #if defined(RTS_SUPPORTS_THREADS)
-  /* Wait for permission to re-enter the RTS with the result.. */
-  grabReturnCapability(&cap);
+  /* Wait for permission to re-enter the RTS with the result. */
+  grabReturnCapability(&sched_mutex, &cap);
 #else
   grabCapability(&cap);
 #endif
@@ -1558,18 +1511,6 @@ resumeThread( StgInt tok )
 }
 
 
-#if defined(RTS_SUPPORTS_THREADS)
-static void
-waitForWork()
-{
-  rts_n_waiting_tasks++;
-  waitCondition(&thread_ready_cond, &sched_mutex);
-  rts_n_waiting_tasks--;
-  return;
-}
-#endif
-
-
 /* ---------------------------------------------------------------------------
  * Static functions
  * ------------------------------------------------------------------------ */
@@ -1870,10 +1811,13 @@ activateSpark (rtsSpark spark)
  * on this thread's stack before the scheduler is invoked.
  * ------------------------------------------------------------------------ */
 
+static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
+
 void
 scheduleThread_(StgTSO *tso
-#if defined(THREADED_RTS)
               , rtsBool createTask
+#if !defined(THREADED_RTS)
+                STG_UNUSED
 #endif
              )
 {
@@ -1903,11 +1847,12 @@ scheduleThread_(StgTSO *tso
 
 void scheduleThread(StgTSO* tso)
 {
-#if defined(THREADED_RTS)
+  return scheduleThread_(tso, rtsFalse);
+}
+
+void scheduleExtThread(StgTSO* tso)
+{
   return scheduleThread_(tso, rtsTrue);
-#else
-  return scheduleThread_(tso);
-#endif
 }
 
 /* ---------------------------------------------------------------------------
@@ -3688,7 +3633,6 @@ sched_belch(char *s, ...)
 //@subsection Index
 
 //@index
-//* MainRegTable::  @cindex\s-+MainRegTable
 //* StgMainThread::  @cindex\s-+StgMainThread
 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
@@ -3706,5 +3650,4 @@ sched_belch(char *s, ...)
 //* schedule::  @cindex\s-+schedule
 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
 //* term_mutex::  @cindex\s-+term_mutex
-//* thread_ready_cond::  @cindex\s-+thread_ready_cond
 //@end index
index 93ef030..cb12eb1 100644 (file)
@@ -1,5 +1,5 @@
 /* -----------------------------------------------------------------------------
- * $Id: Schedule.h,v 1.28 2002/02/13 08:48:07 sof Exp $
+ * $Id: Schedule.h,v 1.29 2002/02/15 07:50:37 sof Exp $
  *
  * (c) The GHC Team 1998-1999
  *
@@ -164,11 +164,6 @@ extern SchedulerStatus waitThread_(StgTSO *tso,
                                   , rtsBool blockWaiting
 #endif
                                   );
-extern void scheduleThread_(StgTSO *tso
-#if defined(THREADED_RTS)
-                          , rtsBool createTask
-#endif
-                           );
 extern SchedulerStatus rts_mainEvalIO(HaskellObj p, /*out*/HaskellObj *ret);