[project @ 2005-05-23 15:44:10 by simonmar]
authorsimonmar <unknown>
Mon, 23 May 2005 15:44:10 +0000 (15:44 +0000)
committersimonmar <unknown>
Mon, 23 May 2005 15:44:10 +0000 (15:44 +0000)
Simplify and improve the Capability-passing machinery for bound
threads.

The old story was quite complicated: if you find a thread on the run
queue which the current task can't run, you had to call
passCapability(), which set a flag saying where the next Capability
was to go, and then release the Capability.  When multiple
Capabilities are flying around, it's not clear how this story should
extend.

The new story is much simpler: each time around the scheduler loop,
the task looks to see whether it can make any progress, and if not, it
releases its Capability and wakes up a task which *can* make some
progress.  The predicate for whether we can make any progress is
encapsulated in the (inline) function ANY_WORK_FOR_ME(Condition).
Waking up an appropriate task is encapsulated in the function
threadRunnable() (previously it was in two places).

The logic in Capability.c is simpler, but unfortunately it is now more
closely connected with the Scheduler, because it inspects the run
queue.  However, performance when communicating between bound and
unbound threads might be better.

The concurrency tests still work, so hopefully this hasn't broken
anything.

ghc/rts/Capability.c
ghc/rts/Capability.h
ghc/rts/Schedule.c

index 0d58e75..9e28a16 100644 (file)
@@ -74,9 +74,6 @@ Condition thread_ready_cond = INIT_COND_VAR;
  * Task.startTask() uses its current value.
  */
 nat rts_n_waiting_tasks = 0;
-
-static Condition *passTarget = NULL;
-static rtsBool passingCapability = rtsFalse;
 #endif
 
 #if defined(SMP)
@@ -97,12 +94,43 @@ HashTable *capability_hash;
 #define UNUSED_IF_NOT_SMP STG_UNUSED
 #endif
 
+
+#if defined(RTS_SUPPORTS_THREADS)
+INLINE_HEADER rtsBool
+ANY_WORK_FOR_ME( Condition *cond )
+{
+    // If the run queue is not empty, then we only wake up the guy who
+    // can run the thread at the head, even if there is some other
+    // reason for this task to run (eg. interrupted=rtsTrue).
+    if (!EMPTY_RUN_QUEUE()) {
+       if (run_queue_hd->main == NULL) {
+           return (cond == NULL);
+       } else {
+           return (&run_queue_hd->main->bound_thread_cond == cond);
+       }
+    }
+
+    return blackholes_need_checking
+       || interrupted
 #if defined(RTS_USER_SIGNALS)
-#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted || blackholes_need_checking || signals_pending())
-#else
-#define ANY_WORK_TO_DO() (!EMPTY_RUN_QUEUE() || interrupted || blackholes_need_checking)
+       || signals_pending()
+#endif
+       ;
+}
 #endif
 
+INLINE_HEADER rtsBool
+ANY_WORK_TO_DO(void) 
+{
+    return (!EMPTY_RUN_QUEUE() 
+           || interrupted
+           || blackholes_need_checking
+#if defined(RTS_USER_SIGNALS)
+           || signals_pending()
+#endif
+       );
+}
+
 /* ----------------------------------------------------------------------------
    Initialisation
    ------------------------------------------------------------------------- */
@@ -232,28 +260,14 @@ releaseCapability( Capability* cap UNUSED_IF_NOT_SMP )
        // Decrement the counter here to avoid livelock where the
        // thread that is yielding its capability will repeatedly
        // signal returning_worker_cond.
-
        rts_n_waiting_workers--;
        signalCondition(&returning_worker_cond);
-       IF_DEBUG(scheduler, sched_belch("worker: released capability to returning worker"));
-    } else if (passingCapability) {
-       if (passTarget == NULL) {
-           signalCondition(&thread_ready_cond);
-           startSchedulerTaskIfNecessary();
-       } else {
-           signalCondition(passTarget);
-       }
-       rts_n_free_capabilities++;
-       IF_DEBUG(scheduler, sched_belch("worker: released capability, passing it"));
-
+       IF_DEBUG(scheduler, 
+                sched_belch("worker: released capability to returning worker"));
     } else {
        rts_n_free_capabilities++;
-       // Signal that a capability is available
-       if (rts_n_waiting_tasks > 0 && ANY_WORK_TO_DO()) {
-           signalCondition(&thread_ready_cond);
-       }
-       startSchedulerTaskIfNecessary();
        IF_DEBUG(scheduler, sched_belch("worker: released capability"));
+       threadRunnable();
     }
 #endif
     return;
@@ -299,7 +313,7 @@ waitForReturnCapability( Mutex* pMutex, Capability** pCap )
             sched_belch("worker: returning; workers waiting: %d",
                         rts_n_waiting_workers));
 
-    if ( noCapabilities() || passingCapability ) {
+    if ( noCapabilities() ) {
        rts_n_waiting_workers++;
        context_switch = 1;     // make sure it's our turn soon
        waitCondition(&returning_worker_cond, pMutex);
@@ -327,16 +341,16 @@ waitForReturnCapability( Mutex* pMutex, Capability** pCap )
  * ------------------------------------------------------------------------- */
 
 void
-yieldCapability( Capability** pCap )
+yieldCapability( Capability** pCap, Condition *cond )
 {
     // Pre-condition:  pMutex is assumed held, the current thread
     // holds the capability pointed to by pCap.
 
-    if ( rts_n_waiting_workers > 0 || passingCapability || !ANY_WORK_TO_DO()) {
+    if ( rts_n_waiting_workers > 0 || !ANY_WORK_FOR_ME(cond)) {
        IF_DEBUG(scheduler, 
                 if (rts_n_waiting_workers > 0) {
                     sched_belch("worker: giving up capability (returning wkr)");
-                } else if (passingCapability) {
+                } else if (!EMPTY_RUN_QUEUE()) {
                     sched_belch("worker: giving up capability (passing capability)");
                 } else {
                     sched_belch("worker: giving up capability (no threads to run)");
@@ -367,7 +381,7 @@ yieldCapability( Capability** pCap )
  *           not to create a new worker thread when an external
  *           call is made.
  *           If pThreadCond is not NULL, a capability can be specifically
- *           passed to this thread using passCapability.
+ *           passed to this thread.
  * ------------------------------------------------------------------------- */
  
 void
@@ -375,9 +389,7 @@ waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond )
 {
     // Pre-condition: pMutex is held.
 
-    while ( noCapabilities() ||
-           (passingCapability && passTarget != pThreadCond) ||
-           !ANY_WORK_TO_DO()) {
+    while ( noCapabilities() || !ANY_WORK_FOR_ME(pThreadCond)) {
        IF_DEBUG(scheduler,
                 sched_belch("worker: wait for capability (cond: %p)",
                             pThreadCond));
@@ -392,43 +404,12 @@ waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond )
            IF_DEBUG(scheduler, sched_belch("worker: get normal capability"));
        }
     }
-    passingCapability = rtsFalse;
     grabCapability(pCap);
 
     // Post-condition: pMutex is held and *pCap is held by the current thread
     return;
 }
 
-/* ----------------------------------------------------------------------------
-   passCapability, passCapabilityToWorker
-   ------------------------------------------------------------------------- */
-
-void
-passCapability( Condition *pTargetThreadCond )
-{
-    // Pre-condition: pMutex is held and cap is held by the current thread
-
-    passTarget = pTargetThreadCond;
-    passingCapability = rtsTrue;
-    IF_DEBUG(scheduler, sched_belch("worker: passCapability"));
-
-    // Post-condition: pMutex is held; cap is still held, but will be
-    //                 passed to the target thread when next released.
-}
-
-void
-passCapabilityToWorker( void )
-{
-    // Pre-condition: pMutex is held and cap is held by the current thread
-
-    passTarget = NULL;
-    passingCapability = rtsTrue;
-    IF_DEBUG(scheduler, sched_belch("worker: passCapabilityToWorker"));
-
-    // Post-condition: pMutex is held; cap is still held, but will be
-    //                 passed to a worker thread when next released.
-}
-
 #endif /* RTS_SUPPORTS_THREADS */
 
 /* ----------------------------------------------------------------------------
@@ -444,10 +425,17 @@ void
 threadRunnable ( void )
 {
 #if defined(RTS_SUPPORTS_THREADS)
-    if ( !noCapabilities() && ANY_WORK_TO_DO() && rts_n_waiting_tasks > 0 ) {
-       signalCondition(&thread_ready_cond);
+    if ( !noCapabilities() && ANY_WORK_TO_DO() ) {
+       if (!EMPTY_RUN_QUEUE() && run_queue_hd->main != NULL) {
+           signalCondition(&run_queue_hd->main->bound_thread_cond);
+           return;
+       }
+       if (rts_n_waiting_tasks > 0) {
+           signalCondition(&thread_ready_cond);
+       } else {
+           startSchedulerTaskIfNecessary();
+       }
     }
-    startSchedulerTaskIfNecessary();
 #endif
 }
 
index 7034b6b..5f1649e 100644 (file)
@@ -59,7 +59,7 @@ extern void prodWorker ( void );
 // current worker thread should then re-acquire it using
 // waitForCapability().
 //
-extern void yieldCapability( Capability **pCap );
+extern void yieldCapability( Capability** pCap, Condition *cond );
 
 // Acquires a capability for doing some work.
 //
index 7f85690..ea85c83 100644 (file)
@@ -463,7 +463,8 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
       // Yield the capability to higher-priority tasks if necessary.
       //
       if (cap != NULL) {
-         yieldCapability(&cap);
+         yieldCapability(&cap, 
+                         mainThread ? &mainThread->bound_thread_cond : NULL );
       }
 
       // If we do not currently hold a capability, we wait for one
@@ -475,7 +476,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
 
       // We now have a capability...
 #endif
-      
+
 #if 0 /* extra sanity checking */
       { 
          StgMainThread *m;
@@ -627,21 +628,19 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
            sched_belch("### thread %d bound to another OS thread", t->id));
          // no, bound to a different Haskell thread: pass to that thread
          PUSH_ON_RUN_QUEUE(t);
-         passCapability(&m->bound_thread_cond);
          continue;
        }
       }
       else
       {
        if(mainThread != NULL)
-        // The thread we want to run is bound.
+        // The thread we want to run is unbound.
        {
          IF_DEBUG(scheduler,
            sched_belch("### this OS thread cannot run thread %d", t->id));
          // no, the current native thread is bound to a different
          // Haskell thread, so pass it to any worker thread
          PUSH_ON_RUN_QUEUE(t);
-         passCapabilityToWorker();
          continue; 
        }
       }