Refactoring and reorganisation of the scheduler
[ghc-hetmet.git] / rts / Capability.c
index 516aaa5..948922a 100644 (file)
@@ -54,15 +54,17 @@ globalWorkToDo (void)
 #endif
 
 #if defined(THREADED_RTS)
-rtsBool stealWork( Capability *cap) {
+rtsBool
+stealWork (Capability *cap)
+{
   /* use the normal Sparks.h interface (internally modified to enable
      concurrent stealing) 
      and immediately turn the spark into a thread when successful
   */
   Capability *robbed;
-  SparkPool *pool;
   StgClosurePtr spark;
   rtsBool success = rtsFalse;
+  rtsBool retry;
   nat i = 0;
 
   debugTrace(DEBUG_sched,
@@ -71,63 +73,40 @@ rtsBool stealWork( Capability *cap) {
 
   if (n_capabilities == 1) { return rtsFalse; } // makes no sense...
 
-  /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
-     start at a random place instead of 0 as well.  */
-  for ( i=0 ; i < n_capabilities ; i++ ) {
-    robbed = &capabilities[i];
-    if (cap == robbed)  // ourselves...
-      continue;
+  do {
+      retry = rtsFalse;
 
-    if (emptySparkPoolCap(robbed)) // nothing to steal here
-      continue;
-    
-    spark = findSpark(robbed);
+      /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
+      start at a random place instead of 0 as well.  */
+      for ( i=0 ; i < n_capabilities ; i++ ) {
+          robbed = &capabilities[i];
+          if (cap == robbed)  // ourselves...
+              continue;
 
-    if (spark == NULL && !emptySparkPoolCap(robbed)) {
-      spark = findSpark(robbed); // lost race in concurrent access, try again
-    }
-    if (spark != NULL) {
-      debugTrace(DEBUG_sched,
+          if (emptySparkPoolCap(robbed)) // nothing to steal here
+              continue;
+
+          spark = tryStealSpark(robbed->sparks);
+          if (spark == NULL && !emptySparkPoolCap(robbed)) {
+              // we conflicted with another thread while trying to steal;
+              // try again later.
+              retry = rtsTrue;
+          }
+
+          if (spark != NULL) {
+              debugTrace(DEBUG_sched,
                 "cap %d: Stole a spark from capability %d",
-                cap->no, robbed->no);
+                         cap->no, robbed->no);
 
-      createSparkThread(cap,spark);
-      success = rtsTrue;
-      break; // got one, leave the loop
-    }
- // otherwise: no success, try next one
-  }
-  debugTrace(DEBUG_sched,
-            "Leaving work stealing routine (%s)",
-            success?"one spark stolen":"thefts did not succeed");
-  return success;
-}
+              createSparkThread(cap,spark);
+              return rtsTrue;
+          }
+          // otherwise: no success, try next one
+      }
+  } while (retry);
 
-STATIC_INLINE rtsBool
-anyWorkForMe( Capability *cap, Task *task )
-{
-    if (task->tso != NULL) {
-       // A bound task only runs if its thread is on the run queue of
-       // the capability on which it was woken up.  Otherwise, we
-       // can't be sure that we have the right capability: the thread
-       // might be woken up on some other capability, and task->cap
-       // could change under our feet.
-       return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task;
-    } else {
-       // A vanilla worker task runs if either there is a lightweight
-       // thread at the head of the run queue, or the run queue is
-       // empty and (there are sparks to execute, or there is some
-       // other global condition to check, such as threads blocked on
-       // blackholes).
-       if (emptyRunQueue(cap)) {
-           return !emptySparkPoolCap(cap)
-               || !emptyWakeupQueue(cap)
-               || globalWorkToDo()
-               || stealWork(cap); /* if all false: try to steal work */
-       } else {
-           return cap->run_queue_hd->bound == NULL;
-       }
-    }
+  debugTrace(DEBUG_sched, "No sparks stolen");
+  return rtsFalse;
 }
 #endif
 
@@ -194,6 +173,9 @@ initCapability( Capability *cap, nat i )
     cap->returning_tasks_tl = NULL;
     cap->wakeup_queue_hd    = END_TSO_QUEUE;
     cap->wakeup_queue_tl    = END_TSO_QUEUE;
+    cap->sparks_created     = 0;
+    cap->sparks_converted   = 0;
+    cap->sparks_pruned      = 0;
 #endif
 
     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
@@ -326,7 +308,8 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
 
 #if defined(THREADED_RTS)
 void
-releaseCapability_ (Capability* cap)
+releaseCapability_ (Capability* cap, 
+                    rtsBool always_wakeup)
 {
     Task *task;
 
@@ -384,8 +367,9 @@ releaseCapability_ (Capability* cap)
 
     // If we have an unbound thread on the run queue, or if there's
     // anything else to do, give the Capability to a worker thread.
-    if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
-             || !emptySparkPoolCap(cap) || globalWorkToDo()) {
+    if (always_wakeup || 
+        !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
+        !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue;
@@ -401,7 +385,15 @@ void
 releaseCapability (Capability* cap USED_IF_THREADS)
 {
     ACQUIRE_LOCK(&cap->lock);
-    releaseCapability_(cap);
+    releaseCapability_(cap, rtsFalse);
+    RELEASE_LOCK(&cap->lock);
+}
+
+void
+releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
+{
+    ACQUIRE_LOCK(&cap->lock);
+    releaseCapability_(cap, rtsTrue);
     RELEASE_LOCK(&cap->lock);
 }
 
@@ -427,7 +419,7 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
     }
     // Bound tasks just float around attached to their TSOs.
 
-    releaseCapability_(cap);
+    releaseCapability_(cap,rtsFalse);
 
     RELEASE_LOCK(&cap->lock);
 }
@@ -534,16 +526,6 @@ yieldCapability (Capability** pCap, Task *task)
 {
     Capability *cap = *pCap;
 
-    // The fast path has no locking, if we don't enter this while loop
-
-    while ( waiting_for_gc
-           /* i.e. another capability triggered HeapOverflow, is busy
-              getting capabilities (stopping their owning tasks) */
-           || cap->returning_tasks_hd != NULL 
-               /* cap reserved for another task */
-           || !anyWorkForMe(cap,task) 
-               /* cap/task have no work */
-           ) {
        debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
 
        // We must now release the capability and wait to be woken up
@@ -588,7 +570,6 @@ yieldCapability (Capability** pCap, Task *task)
 
        trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
        ASSERT(cap->running_task == task);
-    }
 
     *pCap = cap;
 
@@ -630,7 +611,7 @@ wakeupThreadOnCapability (Capability *my_cap,
        appendToRunQueue(other_cap,tso);
 
        trace(TRACE_sched, "resuming capability %d", other_cap->no);
-       releaseCapability_(other_cap);
+       releaseCapability_(other_cap,rtsFalse);
     } else {
        appendToWakeupQueue(my_cap,other_cap,tso);
         other_cap->context_switch = 1;
@@ -765,7 +746,7 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe)
        if (!emptyRunQueue(cap) || cap->spare_workers) {
            debugTrace(DEBUG_sched, 
                       "runnable threads or workers still alive, yielding");
-           releaseCapability_(cap); // this will wake up a worker
+           releaseCapability_(cap,rtsFalse); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;