Refactoring and reorganisation of the scheduler
[ghc-hetmet.git] / rts / Schedule.c
index 09150fd..e17c653 100644 (file)
@@ -137,17 +137,21 @@ static Capability *schedule (Capability *initialCapability, Task *task);
 // scheduler clearer.
 //
 static void schedulePreLoop (void);
+static void scheduleFindWork (Capability *cap);
+#if defined(THREADED_RTS)
+static void scheduleYield (Capability **pcap, Task *task);
+#endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
-#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
 static void schedulePushWork(Capability *cap, Task *task);
-static rtsBool scheduleGetRemoteWork(Capability *cap);
 #if defined(PARALLEL_HASKELL)
+static rtsBool scheduleGetRemoteWork(Capability *cap);
 static void scheduleSendPendingMessages(void);
 #endif
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
 static void scheduleActivateSpark(Capability *cap);
 #endif
 static void schedulePostRunThread(Capability *cap, StgTSO *t);
@@ -281,25 +285,6 @@ schedule (Capability *initialCapability, Task *task)
 
   while (TERMINATION_CONDITION) {
 
-#if defined(THREADED_RTS)
-      if (first) {
-         // don't yield the first time, we want a chance to run this
-         // thread for a bit, even if there are others banging at the
-         // door.
-         first = rtsFalse;
-         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
-      } else {
-         // Yield the capability to higher-priority tasks if necessary.
-         yieldCapability(&cap, task);
-         /* inside yieldCapability, attempts to steal work from other
-            capabilities, unless the capability has own work. 
-            See (REMARK) below.
-         */
-      }
-#endif
-
-    /* THIS WAS THE PLACE FOR THREADED_RTS::schedulePushWork(cap,task) */
-      
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
     // call).
@@ -367,62 +352,11 @@ schedule (Capability *initialCapability, Task *task)
        barf("sched_state: %d", sched_state);
     }
 
-    /* this was the place to activate a spark, now below... */
-
-    scheduleStartSignalHandlers(cap);
+    scheduleFindWork(cap);
 
-    // Only check the black holes here if we've nothing else to do.
-    // During normal execution, the black hole list only gets checked
-    // at GC time, to avoid repeatedly traversing this possibly long
-    // list each time around the scheduler.
-    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
-
-    scheduleCheckWakeupThreads(cap);
-
-    scheduleCheckBlockedThreads(cap);
-
-#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
-    /* work distribution in multithreaded and parallel systems 
-
-       REMARK: IMHO best location for work-stealing as well.
-       tests above might yield some new jobs, so no need to steal a
-       spark in some cases. I believe the yieldCapability.. above
-       should be moved here.
-    */
-
-#if defined(PARALLEL_HASKELL)
-    /* if messages have been buffered... a NOOP in THREADED_RTS */
-    scheduleSendPendingMessages();
-#endif
-
-    /* If the run queue is empty,...*/
-    if (emptyRunQueue(cap)) {
-      /* ...take one of our own sparks and turn it into a thread */
-      scheduleActivateSpark(cap);
-
-       /* if this did not work, try to steal a spark from someone else */
-      if (emptyRunQueue(cap)) {
-#if defined(PARALLEL_HASKELL)
-       receivedFinish = scheduleGetRemoteWork(cap);
-       continue; //  a new round, (hopefully) with new work
-       /* 
-          in GUM, this a) sends out a FISH and returns IF no fish is
-                          out already
-                       b) (blocking) awaits and receives messages
-          
-          in Eden, this is only the blocking receive, as b) in GUM.
-
-          in Threaded-RTS, this does plain nothing. Stealing routine
-               is inside Capability.c and called from
-               yieldCapability() at the very beginning, see REMARK.
-       */
-#endif
-      }
-    } else { /* i.e. run queue was (initially) not empty */
-      schedulePushWork(cap,task);
-      /* work pushing, currently relevant only for THREADED_RTS:
-        (pushes threads, wakes up idle capabilities for stealing) */
-    }
+    /* work pushing, currently relevant only for THREADED_RTS:
+       (pushes threads, wakes up idle capabilities for stealing) */
+    schedulePushWork(cap,task);
 
 #if defined(PARALLEL_HASKELL)
     /* since we perform a blocking receive and continue otherwise,
@@ -439,9 +373,8 @@ schedule (Capability *initialCapability, Task *task)
     }
 #endif // PARALLEL_HASKELL: non-empty run queue!
 
-#endif /* THREADED_RTS || PARALLEL_HASKELL */
-
     scheduleDetectDeadlock(cap,task);
+
 #if defined(THREADED_RTS)
     cap = task->cap;    // reload cap, it might have changed
 #endif
@@ -454,12 +387,27 @@ schedule (Capability *initialCapability, Task *task)
     //
     // win32: might be here due to awaitEvent() being abandoned
     // as a result of a console event having been delivered.
-    if ( emptyRunQueue(cap) ) {
+    
+#if defined(THREADED_RTS)
+    if (first) 
+    {
+    // XXX: ToDo
+    //     // don't yield the first time, we want a chance to run this
+    //     // thread for a bit, even if there are others banging at the
+    //     // door.
+    //     first = rtsFalse;
+    //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+    }
+
+    scheduleYield(&cap,task);
+    if (emptyRunQueue(cap)) continue; // look for work again
+#endif
+
 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
+    if ( emptyRunQueue(cap) ) {
        ASSERT(sched_state >= SCHED_INTERRUPTING);
-#endif
-       continue; // nothing to do
     }
+#endif
 
     // 
     // Get a thread to run
@@ -683,12 +631,110 @@ schedulePreLoop(void)
 }
 
 /* -----------------------------------------------------------------------------
+ * scheduleFindWork()
+ *
+ * Search for work to do, and handle messages from elsewhere.
+ * -------------------------------------------------------------------------- */
+
+static void
+scheduleFindWork (Capability *cap)
+{
+    scheduleStartSignalHandlers(cap);
+
+    // Only check the black holes here if we've nothing else to do.
+    // During normal execution, the black hole list only gets checked
+    // at GC time, to avoid repeatedly traversing this possibly long
+    // list each time around the scheduler.
+    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
+
+    scheduleCheckWakeupThreads(cap);
+
+    scheduleCheckBlockedThreads(cap);
+
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+    // Try to activate one of our own sparks
+    if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
+#endif
+
+#if defined(THREADED_RTS)
+    // Try to steak work if we don't have any
+    if (emptyRunQueue(cap)) { stealWork(cap); }
+#endif
+    
+#if defined(PARALLEL_HASKELL)
+    // if messages have been buffered...
+    scheduleSendPendingMessages();
+#endif
+
+#if defined(PARALLEL_HASKELL)
+    if (emptyRunQueue(cap)) {
+       receivedFinish = scheduleGetRemoteWork(cap);
+       continue; //  a new round, (hopefully) with new work
+       /* 
+          in GUM, this a) sends out a FISH and returns IF no fish is
+                          out already
+                       b) (blocking) awaits and receives messages
+          
+          in Eden, this is only the blocking receive, as b) in GUM.
+       */
+    }
+#endif
+}
+
+#if defined(THREADED_RTS)
+STATIC_INLINE rtsBool
+shouldYieldCapability (Capability *cap, Task *task)
+{
+    // we need to yield this capability to someone else if..
+    //   - another thread is initiating a GC
+    //   - another Task is returning from a foreign call
+    //   - the thread at the head of the run queue cannot be run
+    //     by this Task (it is bound to another Task, or it is unbound
+    //     and this task it bound).
+    return (waiting_for_gc || 
+            cap->returning_tasks_hd != NULL ||
+            (!emptyRunQueue(cap) && (task->tso == NULL
+                                     ? cap->run_queue_hd->bound != NULL
+                                     : cap->run_queue_hd->bound != task)));
+}
+
+// This is the single place where a Task goes to sleep.  There are
+// two reasons it might need to sleep:
+//    - there are no threads to run
+//    - we need to yield this Capability to someone else 
+//      (see shouldYieldCapability())
+//
+// The return value indicates whether 
+
+static void
+scheduleYield (Capability **pcap, Task *task)
+{
+    Capability *cap = *pcap;
+
+    // if we have work, and we don't need to give up the Capability, continue.
+    if (!emptyRunQueue(cap) && !shouldYieldCapability(cap,task))
+        return;
+
+    // otherwise yield (sleep), and keep yielding if necessary.
+    do {
+        yieldCapability(&cap,task);
+    } 
+    while (shouldYieldCapability(cap,task));
+
+    // note there may still be no threads on the run queue at this
+    // point, the caller has to check.
+
+    *pcap = cap;
+    return;
+}
+#endif
+    
+/* -----------------------------------------------------------------------------
  * schedulePushWork()
  *
  * Push work to other Capabilities if we have some.
  * -------------------------------------------------------------------------- */
 
-#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
 static void
 schedulePushWork(Capability *cap USED_IF_THREADS, 
                 Task *task      USED_IF_THREADS)
@@ -788,7 +834,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
            // i is the next free capability to push to
            for (; i < n_free_caps; i++) {
                if (emptySparkPoolCap(free_caps[i])) {
-                   spark = findSpark(cap);
+                   spark = tryStealSpark(cap->sparks);
                    if (spark != NULL) {
                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
                        newSpark(&(free_caps[i]->r), spark);
@@ -801,18 +847,14 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
-           releaseCapability(free_caps[i]);
+           releaseAndWakeupCapability(free_caps[i]);
        }
-       // now wake them all up, and they might steal sparks if
-       // the did not get a thread
-       prodAllCapabilities();
     }
     task->cap = cap; // reset to point to our Capability.
 
 #endif /* THREADED_RTS */
 
 }
-#endif /* THREADED_RTS || PARALLEL_HASKELL */
 
 /* ----------------------------------------------------------------------------
  * Start any pending signal handlers
@@ -1031,7 +1073,12 @@ scheduleActivateSpark(Capability *cap)
         on our run queue in the meantime ? But would need a lock.. */
       return;
 
-    spark = findSpark(cap); // defined in Sparks.c
+    // Really we should be using reclaimSpark() here, but
+    // experimentally it doesn't seem to perform as well as just
+    // stealing from our own spark pool:
+    // spark = reclaimSpark(cap->sparks);
+    spark = tryStealSpark(cap->sparks); // defined in Sparks.c
 
     if (spark != NULL) {
       debugTrace(DEBUG_sched,
@@ -1046,9 +1093,9 @@ scheduleActivateSpark(Capability *cap)
  * Get work from a remote node (PARALLEL_HASKELL only)
  * ------------------------------------------------------------------------- */
     
-#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+#if defined(PARALLEL_HASKELL)
 static rtsBool /* return value used in PARALLEL_HASKELL only */
-scheduleGetRemoteWork(Capability *cap)
+scheduleGetRemoteWork (Capability *cap STG_UNUSED)
 {
 #if defined(PARALLEL_HASKELL)
   rtsBool receivedFinish = rtsFalse;
@@ -1800,7 +1847,7 @@ suspendThread (StgRegTable *reg)
 
   suspendTask(cap,task);
   cap->in_haskell = rtsFalse;
-  releaseCapability_(cap);
+  releaseCapability_(cap,rtsFalse);
   
   RELEASE_LOCK(&cap->lock);