Refactoring and reorganisation of the scheduler
authorSimon Marlow <marlowsd@gmail.com>
Wed, 22 Oct 2008 09:27:44 +0000 (09:27 +0000)
committerSimon Marlow <marlowsd@gmail.com>
Wed, 22 Oct 2008 09:27:44 +0000 (09:27 +0000)
Change the way we look for work in the scheduler.  Previously,
checking to see whether there was anything to do was a
non-side-effecting operation, but this has changed now that we do
work-stealing.  This lead to a refactoring of the inner loop of the
scheduler.

Also, lots of cleanup in the new work-stealing code, but no functional
changes.

One new statistic is added to the +RTS -s output:

  SPARKS: 1430 (2 converted, 1427 pruned)

lets you know something about the use of `par` in the program.

includes/RtsTypes.h
rts/Capability.c
rts/Capability.h
rts/Schedule.c
rts/Sparks.c
rts/Sparks.h
rts/Stable.c
rts/Stats.c

index 3510ee7..d497005 100644 (file)
@@ -1,8 +1,10 @@
-/*
-  Time-stamp: <2005-03-30 12:02:33 simonmar>
-
-  RTS specific types.
-*/
+/* -----------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 1998-2008
+ *
+ * RTS-specific types.
+ *
+ * ---------------------------------------------------------------------------*/
 
 /* -------------------------------------------------------------------------
    Generally useful typedefs
@@ -37,40 +39,6 @@ typedef enum {
    Types specific to the parallel runtime system.
 */
 
-
-/* Spark pools: used to store pending sparks 
- *  (THREADED_RTS & PARALLEL_HASKELL only)
- * Implementation uses a DeQue to enable concurrent read accesses at
- * the top end.
- */
-typedef struct  SparkPool_ {
-  /* Size of elements array. Used for modulo calculation: we round up
-     to powers of 2 and use the dyadic log (modulo == bitwise &) */
-  StgWord size; 
-  StgWord moduloSize; /* bitmask for modulo */
-
-  /* top, index where multiple readers steal() (protected by a cas) */
-  StgWord top;
-
-  /* bottom, index of next free place where one writer can push
-     elements. This happens unsynchronised. */
-  StgWord bottom;
-  /* both position indices are continuously incremented, and used as
-     an index modulo the current array size. */
-  
-  /* lower bound on the current top value. This is an internal
-     optimisation to avoid unnecessarily accessing the top field
-     inside pushBottom */
-  StgWord topBound;
-
-  /* The elements array */
-  StgClosurePtr* elements;
-  /*  Please note: the dataspace cannot follow the admin fields
-      immediately, as it should be possible to enlarge it without
-      disposing the old one automatically (as realloc would)! */
-
-} SparkPool;
-
 typedef ullong        rtsTime;
 
 #if defined(PAR)
index 516aaa5..948922a 100644 (file)
@@ -54,15 +54,17 @@ globalWorkToDo (void)
 #endif
 
 #if defined(THREADED_RTS)
-rtsBool stealWork( Capability *cap) {
+rtsBool
+stealWork (Capability *cap)
+{
   /* use the normal Sparks.h interface (internally modified to enable
      concurrent stealing) 
      and immediately turn the spark into a thread when successful
   */
   Capability *robbed;
-  SparkPool *pool;
   StgClosurePtr spark;
   rtsBool success = rtsFalse;
+  rtsBool retry;
   nat i = 0;
 
   debugTrace(DEBUG_sched,
@@ -71,63 +73,40 @@ rtsBool stealWork( Capability *cap) {
 
   if (n_capabilities == 1) { return rtsFalse; } // makes no sense...
 
-  /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
-     start at a random place instead of 0 as well.  */
-  for ( i=0 ; i < n_capabilities ; i++ ) {
-    robbed = &capabilities[i];
-    if (cap == robbed)  // ourselves...
-      continue;
+  do {
+      retry = rtsFalse;
 
-    if (emptySparkPoolCap(robbed)) // nothing to steal here
-      continue;
-    
-    spark = findSpark(robbed);
+      /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
+      start at a random place instead of 0 as well.  */
+      for ( i=0 ; i < n_capabilities ; i++ ) {
+          robbed = &capabilities[i];
+          if (cap == robbed)  // ourselves...
+              continue;
 
-    if (spark == NULL && !emptySparkPoolCap(robbed)) {
-      spark = findSpark(robbed); // lost race in concurrent access, try again
-    }
-    if (spark != NULL) {
-      debugTrace(DEBUG_sched,
+          if (emptySparkPoolCap(robbed)) // nothing to steal here
+              continue;
+
+          spark = tryStealSpark(robbed->sparks);
+          if (spark == NULL && !emptySparkPoolCap(robbed)) {
+              // we conflicted with another thread while trying to steal;
+              // try again later.
+              retry = rtsTrue;
+          }
+
+          if (spark != NULL) {
+              debugTrace(DEBUG_sched,
                 "cap %d: Stole a spark from capability %d",
-                cap->no, robbed->no);
+                         cap->no, robbed->no);
 
-      createSparkThread(cap,spark);
-      success = rtsTrue;
-      break; // got one, leave the loop
-    }
- // otherwise: no success, try next one
-  }
-  debugTrace(DEBUG_sched,
-            "Leaving work stealing routine (%s)",
-            success?"one spark stolen":"thefts did not succeed");
-  return success;
-}
+              createSparkThread(cap,spark);
+              return rtsTrue;
+          }
+          // otherwise: no success, try next one
+      }
+  } while (retry);
 
-STATIC_INLINE rtsBool
-anyWorkForMe( Capability *cap, Task *task )
-{
-    if (task->tso != NULL) {
-       // A bound task only runs if its thread is on the run queue of
-       // the capability on which it was woken up.  Otherwise, we
-       // can't be sure that we have the right capability: the thread
-       // might be woken up on some other capability, and task->cap
-       // could change under our feet.
-       return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task;
-    } else {
-       // A vanilla worker task runs if either there is a lightweight
-       // thread at the head of the run queue, or the run queue is
-       // empty and (there are sparks to execute, or there is some
-       // other global condition to check, such as threads blocked on
-       // blackholes).
-       if (emptyRunQueue(cap)) {
-           return !emptySparkPoolCap(cap)
-               || !emptyWakeupQueue(cap)
-               || globalWorkToDo()
-               || stealWork(cap); /* if all false: try to steal work */
-       } else {
-           return cap->run_queue_hd->bound == NULL;
-       }
-    }
+  debugTrace(DEBUG_sched, "No sparks stolen");
+  return rtsFalse;
 }
 #endif
 
@@ -194,6 +173,9 @@ initCapability( Capability *cap, nat i )
     cap->returning_tasks_tl = NULL;
     cap->wakeup_queue_hd    = END_TSO_QUEUE;
     cap->wakeup_queue_tl    = END_TSO_QUEUE;
+    cap->sparks_created     = 0;
+    cap->sparks_converted   = 0;
+    cap->sparks_pruned      = 0;
 #endif
 
     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
@@ -326,7 +308,8 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
 
 #if defined(THREADED_RTS)
 void
-releaseCapability_ (Capability* cap)
+releaseCapability_ (Capability* cap, 
+                    rtsBool always_wakeup)
 {
     Task *task;
 
@@ -384,8 +367,9 @@ releaseCapability_ (Capability* cap)
 
     // If we have an unbound thread on the run queue, or if there's
     // anything else to do, give the Capability to a worker thread.
-    if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
-             || !emptySparkPoolCap(cap) || globalWorkToDo()) {
+    if (always_wakeup || 
+        !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
+        !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue;
@@ -401,7 +385,15 @@ void
 releaseCapability (Capability* cap USED_IF_THREADS)
 {
     ACQUIRE_LOCK(&cap->lock);
-    releaseCapability_(cap);
+    releaseCapability_(cap, rtsFalse);
+    RELEASE_LOCK(&cap->lock);
+}
+
+void
+releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
+{
+    ACQUIRE_LOCK(&cap->lock);
+    releaseCapability_(cap, rtsTrue);
     RELEASE_LOCK(&cap->lock);
 }
 
@@ -427,7 +419,7 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
     }
     // Bound tasks just float around attached to their TSOs.
 
-    releaseCapability_(cap);
+    releaseCapability_(cap,rtsFalse);
 
     RELEASE_LOCK(&cap->lock);
 }
@@ -534,16 +526,6 @@ yieldCapability (Capability** pCap, Task *task)
 {
     Capability *cap = *pCap;
 
-    // The fast path has no locking, if we don't enter this while loop
-
-    while ( waiting_for_gc
-           /* i.e. another capability triggered HeapOverflow, is busy
-              getting capabilities (stopping their owning tasks) */
-           || cap->returning_tasks_hd != NULL 
-               /* cap reserved for another task */
-           || !anyWorkForMe(cap,task) 
-               /* cap/task have no work */
-           ) {
        debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
 
        // We must now release the capability and wait to be woken up
@@ -588,7 +570,6 @@ yieldCapability (Capability** pCap, Task *task)
 
        trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
        ASSERT(cap->running_task == task);
-    }
 
     *pCap = cap;
 
@@ -630,7 +611,7 @@ wakeupThreadOnCapability (Capability *my_cap,
        appendToRunQueue(other_cap,tso);
 
        trace(TRACE_sched, "resuming capability %d", other_cap->no);
-       releaseCapability_(other_cap);
+       releaseCapability_(other_cap,rtsFalse);
     } else {
        appendToWakeupQueue(my_cap,other_cap,tso);
         other_cap->context_switch = 1;
@@ -765,7 +746,7 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe)
        if (!emptyRunQueue(cap) || cap->spare_workers) {
            debugTrace(DEBUG_sched, 
                       "runnable threads or workers still alive, yielding");
-           releaseCapability_(cap); // this will wake up a worker
+           releaseCapability_(cap,rtsFalse); // this will wake up a worker
            RELEASE_LOCK(&cap->lock);
            yieldThread();
            continue;
index 5945895..779a194 100644 (file)
@@ -23,9 +23,9 @@
 #ifndef CAPABILITY_H
 #define CAPABILITY_H
 
-#include "RtsTypes.h"
 #include "RtsFlags.h"
 #include "Task.h"
+#include "Sparks.h"
 
 struct Capability_ {
     // State required by the STG virtual machine when running Haskell
@@ -91,6 +91,13 @@ struct Capability_ {
     // woken up by another Capability.
     StgTSO *wakeup_queue_hd;
     StgTSO *wakeup_queue_tl;
+
+    SparkPool *sparks;
+
+    // Stats on spark creation/conversion
+    nat sparks_created;
+    nat sparks_converted;
+    nat sparks_pruned;
 #endif
 
     // Per-capability STM-related data
@@ -100,8 +107,6 @@ struct Capability_ {
     StgTRecHeader *free_trec_headers;
     nat transaction_tokens;
 
-    SparkPool *sparks;
-
 }; // typedef Capability, defined in RtsAPI.h
 
 
@@ -147,12 +152,16 @@ void initCapabilities (void);
 // ASSUMES: cap->running_task is the current Task.
 //
 #if defined(THREADED_RTS)
-void releaseCapability  (Capability* cap);
-void releaseCapability_ (Capability* cap); // assumes cap->lock is held
+void releaseCapability           (Capability* cap);
+void releaseAndWakeupCapability  (Capability* cap);
+void releaseCapability_ (Capability* cap, rtsBool always_wakeup); 
+// assumes cap->lock is held
 #else
 // releaseCapability() is empty in non-threaded RTS
 INLINE_HEADER void releaseCapability  (Capability* cap STG_UNUSED) {};
-INLINE_HEADER void releaseCapability_ (Capability* cap STG_UNUSED) {};
+INLINE_HEADER void releaseAndWakeupCapability  (Capability* cap STG_UNUSED) {};
+INLINE_HEADER void releaseCapability_ (Capability* cap STG_UNUSED, 
+                                       rtsBool always_wakeup STG_UNUSED) {};
 #endif
 
 #if !IN_STG_CODE
@@ -231,6 +240,14 @@ void shutdownCapability (Capability *cap, Task *task, rtsBool wait_foreign);
 //
 rtsBool tryGrabCapability (Capability *cap, Task *task);
 
+// Try to steal a spark from other Capabilities
+//
+rtsBool stealWork (Capability *cap);
+
+INLINE_HEADER rtsBool emptySparkPoolCap (Capability *cap);
+INLINE_HEADER nat     sparkPoolSizeCap  (Capability *cap);
+INLINE_HEADER void    discardSparksCap  (Capability *cap);
+
 #else // !THREADED_RTS
 
 // Grab a capability.  (Only in the non-threaded RTS; in the threaded
@@ -273,4 +290,18 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen)
     *bd->free++ = (StgWord)p;
 }
 
+#if defined(THREADED_RTS)
+INLINE_HEADER rtsBool
+emptySparkPoolCap (Capability *cap) 
+{ return looksEmpty(cap->sparks); }
+
+INLINE_HEADER nat
+sparkPoolSizeCap (Capability *cap) 
+{ return sparkPoolSize(cap->sparks); }
+
+INLINE_HEADER void
+discardSparksCap (Capability *cap) 
+{ return discardSparks(cap->sparks); }
+#endif
+
 #endif /* CAPABILITY_H */
index 09150fd..e17c653 100644 (file)
@@ -137,17 +137,21 @@ static Capability *schedule (Capability *initialCapability, Task *task);
 // scheduler clearer.
 //
 static void schedulePreLoop (void);
+static void scheduleFindWork (Capability *cap);
+#if defined(THREADED_RTS)
+static void scheduleYield (Capability **pcap, Task *task);
+#endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
-#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
 static void schedulePushWork(Capability *cap, Task *task);
-static rtsBool scheduleGetRemoteWork(Capability *cap);
 #if defined(PARALLEL_HASKELL)
+static rtsBool scheduleGetRemoteWork(Capability *cap);
 static void scheduleSendPendingMessages(void);
 #endif
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
 static void scheduleActivateSpark(Capability *cap);
 #endif
 static void schedulePostRunThread(Capability *cap, StgTSO *t);
@@ -281,25 +285,6 @@ schedule (Capability *initialCapability, Task *task)
 
   while (TERMINATION_CONDITION) {
 
-#if defined(THREADED_RTS)
-      if (first) {
-         // don't yield the first time, we want a chance to run this
-         // thread for a bit, even if there are others banging at the
-         // door.
-         first = rtsFalse;
-         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
-      } else {
-         // Yield the capability to higher-priority tasks if necessary.
-         yieldCapability(&cap, task);
-         /* inside yieldCapability, attempts to steal work from other
-            capabilities, unless the capability has own work. 
-            See (REMARK) below.
-         */
-      }
-#endif
-
-    /* THIS WAS THE PLACE FOR THREADED_RTS::schedulePushWork(cap,task) */
-      
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
     // call).
@@ -367,62 +352,11 @@ schedule (Capability *initialCapability, Task *task)
        barf("sched_state: %d", sched_state);
     }
 
-    /* this was the place to activate a spark, now below... */
-
-    scheduleStartSignalHandlers(cap);
+    scheduleFindWork(cap);
 
-    // Only check the black holes here if we've nothing else to do.
-    // During normal execution, the black hole list only gets checked
-    // at GC time, to avoid repeatedly traversing this possibly long
-    // list each time around the scheduler.
-    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
-
-    scheduleCheckWakeupThreads(cap);
-
-    scheduleCheckBlockedThreads(cap);
-
-#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
-    /* work distribution in multithreaded and parallel systems 
-
-       REMARK: IMHO best location for work-stealing as well.
-       tests above might yield some new jobs, so no need to steal a
-       spark in some cases. I believe the yieldCapability.. above
-       should be moved here.
-    */
-
-#if defined(PARALLEL_HASKELL)
-    /* if messages have been buffered... a NOOP in THREADED_RTS */
-    scheduleSendPendingMessages();
-#endif
-
-    /* If the run queue is empty,...*/
-    if (emptyRunQueue(cap)) {
-      /* ...take one of our own sparks and turn it into a thread */
-      scheduleActivateSpark(cap);
-
-       /* if this did not work, try to steal a spark from someone else */
-      if (emptyRunQueue(cap)) {
-#if defined(PARALLEL_HASKELL)
-       receivedFinish = scheduleGetRemoteWork(cap);
-       continue; //  a new round, (hopefully) with new work
-       /* 
-          in GUM, this a) sends out a FISH and returns IF no fish is
-                          out already
-                       b) (blocking) awaits and receives messages
-          
-          in Eden, this is only the blocking receive, as b) in GUM.
-
-          in Threaded-RTS, this does plain nothing. Stealing routine
-               is inside Capability.c and called from
-               yieldCapability() at the very beginning, see REMARK.
-       */
-#endif
-      }
-    } else { /* i.e. run queue was (initially) not empty */
-      schedulePushWork(cap,task);
-      /* work pushing, currently relevant only for THREADED_RTS:
-        (pushes threads, wakes up idle capabilities for stealing) */
-    }
+    /* work pushing, currently relevant only for THREADED_RTS:
+       (pushes threads, wakes up idle capabilities for stealing) */
+    schedulePushWork(cap,task);
 
 #if defined(PARALLEL_HASKELL)
     /* since we perform a blocking receive and continue otherwise,
@@ -439,9 +373,8 @@ schedule (Capability *initialCapability, Task *task)
     }
 #endif // PARALLEL_HASKELL: non-empty run queue!
 
-#endif /* THREADED_RTS || PARALLEL_HASKELL */
-
     scheduleDetectDeadlock(cap,task);
+
 #if defined(THREADED_RTS)
     cap = task->cap;    // reload cap, it might have changed
 #endif
@@ -454,12 +387,27 @@ schedule (Capability *initialCapability, Task *task)
     //
     // win32: might be here due to awaitEvent() being abandoned
     // as a result of a console event having been delivered.
-    if ( emptyRunQueue(cap) ) {
+    
+#if defined(THREADED_RTS)
+    if (first) 
+    {
+    // XXX: ToDo
+    //     // don't yield the first time, we want a chance to run this
+    //     // thread for a bit, even if there are others banging at the
+    //     // door.
+    //     first = rtsFalse;
+    //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+    }
+
+    scheduleYield(&cap,task);
+    if (emptyRunQueue(cap)) continue; // look for work again
+#endif
+
 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
+    if ( emptyRunQueue(cap) ) {
        ASSERT(sched_state >= SCHED_INTERRUPTING);
-#endif
-       continue; // nothing to do
     }
+#endif
 
     // 
     // Get a thread to run
@@ -683,12 +631,110 @@ schedulePreLoop(void)
 }
 
 /* -----------------------------------------------------------------------------
+ * scheduleFindWork()
+ *
+ * Search for work to do, and handle messages from elsewhere.
+ * -------------------------------------------------------------------------- */
+
+static void
+scheduleFindWork (Capability *cap)
+{
+    scheduleStartSignalHandlers(cap);
+
+    // Only check the black holes here if we've nothing else to do.
+    // During normal execution, the black hole list only gets checked
+    // at GC time, to avoid repeatedly traversing this possibly long
+    // list each time around the scheduler.
+    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
+
+    scheduleCheckWakeupThreads(cap);
+
+    scheduleCheckBlockedThreads(cap);
+
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+    // Try to activate one of our own sparks
+    if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
+#endif
+
+#if defined(THREADED_RTS)
+    // Try to steak work if we don't have any
+    if (emptyRunQueue(cap)) { stealWork(cap); }
+#endif
+    
+#if defined(PARALLEL_HASKELL)
+    // if messages have been buffered...
+    scheduleSendPendingMessages();
+#endif
+
+#if defined(PARALLEL_HASKELL)
+    if (emptyRunQueue(cap)) {
+       receivedFinish = scheduleGetRemoteWork(cap);
+       continue; //  a new round, (hopefully) with new work
+       /* 
+          in GUM, this a) sends out a FISH and returns IF no fish is
+                          out already
+                       b) (blocking) awaits and receives messages
+          
+          in Eden, this is only the blocking receive, as b) in GUM.
+       */
+    }
+#endif
+}
+
+#if defined(THREADED_RTS)
+STATIC_INLINE rtsBool
+shouldYieldCapability (Capability *cap, Task *task)
+{
+    // we need to yield this capability to someone else if..
+    //   - another thread is initiating a GC
+    //   - another Task is returning from a foreign call
+    //   - the thread at the head of the run queue cannot be run
+    //     by this Task (it is bound to another Task, or it is unbound
+    //     and this task it bound).
+    return (waiting_for_gc || 
+            cap->returning_tasks_hd != NULL ||
+            (!emptyRunQueue(cap) && (task->tso == NULL
+                                     ? cap->run_queue_hd->bound != NULL
+                                     : cap->run_queue_hd->bound != task)));
+}
+
+// This is the single place where a Task goes to sleep.  There are
+// two reasons it might need to sleep:
+//    - there are no threads to run
+//    - we need to yield this Capability to someone else 
+//      (see shouldYieldCapability())
+//
+// The return value indicates whether 
+
+static void
+scheduleYield (Capability **pcap, Task *task)
+{
+    Capability *cap = *pcap;
+
+    // if we have work, and we don't need to give up the Capability, continue.
+    if (!emptyRunQueue(cap) && !shouldYieldCapability(cap,task))
+        return;
+
+    // otherwise yield (sleep), and keep yielding if necessary.
+    do {
+        yieldCapability(&cap,task);
+    } 
+    while (shouldYieldCapability(cap,task));
+
+    // note there may still be no threads on the run queue at this
+    // point, the caller has to check.
+
+    *pcap = cap;
+    return;
+}
+#endif
+    
+/* -----------------------------------------------------------------------------
  * schedulePushWork()
  *
  * Push work to other Capabilities if we have some.
  * -------------------------------------------------------------------------- */
 
-#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
 static void
 schedulePushWork(Capability *cap USED_IF_THREADS, 
                 Task *task      USED_IF_THREADS)
@@ -788,7 +834,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
            // i is the next free capability to push to
            for (; i < n_free_caps; i++) {
                if (emptySparkPoolCap(free_caps[i])) {
-                   spark = findSpark(cap);
+                   spark = tryStealSpark(cap->sparks);
                    if (spark != NULL) {
                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
                        newSpark(&(free_caps[i]->r), spark);
@@ -801,18 +847,14 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
-           releaseCapability(free_caps[i]);
+           releaseAndWakeupCapability(free_caps[i]);
        }
-       // now wake them all up, and they might steal sparks if
-       // the did not get a thread
-       prodAllCapabilities();
     }
     task->cap = cap; // reset to point to our Capability.
 
 #endif /* THREADED_RTS */
 
 }
-#endif /* THREADED_RTS || PARALLEL_HASKELL */
 
 /* ----------------------------------------------------------------------------
  * Start any pending signal handlers
@@ -1031,7 +1073,12 @@ scheduleActivateSpark(Capability *cap)
         on our run queue in the meantime ? But would need a lock.. */
       return;
 
-    spark = findSpark(cap); // defined in Sparks.c
+    // Really we should be using reclaimSpark() here, but
+    // experimentally it doesn't seem to perform as well as just
+    // stealing from our own spark pool:
+    // spark = reclaimSpark(cap->sparks);
+    spark = tryStealSpark(cap->sparks); // defined in Sparks.c
 
     if (spark != NULL) {
       debugTrace(DEBUG_sched,
@@ -1046,9 +1093,9 @@ scheduleActivateSpark(Capability *cap)
  * Get work from a remote node (PARALLEL_HASKELL only)
  * ------------------------------------------------------------------------- */
     
-#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+#if defined(PARALLEL_HASKELL)
 static rtsBool /* return value used in PARALLEL_HASKELL only */
-scheduleGetRemoteWork(Capability *cap)
+scheduleGetRemoteWork (Capability *cap STG_UNUSED)
 {
 #if defined(PARALLEL_HASKELL)
   rtsBool receivedFinish = rtsFalse;
@@ -1800,7 +1847,7 @@ suspendThread (StgRegTable *reg)
 
   suspendTask(cap,task);
   cap->in_haskell = rtsFalse;
-  releaseCapability_(cap);
+  releaseCapability_(cap,rtsFalse);
   
   RELEASE_LOCK(&cap->lock);
 
index ac11172..360ea41 100644 (file)
@@ -53,9 +53,9 @@
 
 /* internal helpers ... */
 
-StgWord roundUp2(StgWord val);
-
-StgWord roundUp2(StgWord val) {
+static StgWord
+roundUp2(StgWord val)
+{
   StgWord rounded = 1;
 
   /* StgWord is unsigned anyway, only catch 0 */
@@ -69,25 +69,6 @@ StgWord roundUp2(StgWord val) {
   return rounded;
 }
 
-INLINE_HEADER
-rtsBool casTop(StgPtr addr, StgWord old, StgWord new);
-
-#if !defined(THREADED_RTS)
-/* missing def. in non THREADED RTS, and makes no sense anyway... */
-StgWord cas(StgPtr addr,StgWord old,StgWord new);
-StgWord cas(StgPtr addr,StgWord old,StgWord new) { 
-  barf("cas: not implemented without multithreading");
-  old = new = *addr; /* to avoid gcc warnings */
-}
-#endif
-
-INLINE_HEADER
-rtsBool casTop(StgWord* addr, StgWord old, StgWord new) {
-  StgWord res = cas((StgPtr) addr, old, new);
-  return ((res == old));
-}
-
-/* or simply like this */
 #define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
 
 /* -----------------------------------------------------------------------------
@@ -97,8 +78,9 @@ rtsBool casTop(StgWord* addr, StgWord old, StgWord new) {
  * -------------------------------------------------------------------------- */
 
 /* constructor */
-SparkPool* initPool(StgWord size) {
-
+static SparkPool*
+initPool(StgWord size)
+{
   StgWord realsize; 
   SparkPool *q;
 
@@ -136,14 +118,17 @@ initSparkPools( void )
 }
 
 void
-freeSparkPool(SparkPool *pool) {
+freeSparkPool (SparkPool *pool)
+{
   /* should not interfere with concurrent findSpark() calls! And
      nobody should use the pointer any more. We cross our fingers...*/
   stgFree(pool->elements);
   stgFree(pool);
 }
 
-/* reclaimSpark(cap): remove a spark from the write end of the queue.
+/* -----------------------------------------------------------------------------
+ * 
+ * reclaimSpark: remove a spark from the write end of the queue.
  * Returns the removed spark, and NULL if a race is lost or the pool
  * empty.
  *
@@ -151,9 +136,12 @@ freeSparkPool(SparkPool *pool) {
  * concurrently stealing threads by using cas to modify the top field.
  * This routine should NEVER be called by a task which does not own
  * the capability. Can this be checked here?
- */
-StgClosure* reclaimSpark(Capability *cap) {
-  SparkPool *deque = cap->sparks;
+ *
+ * -------------------------------------------------------------------------- */
+
+StgClosure *
+reclaimSpark (SparkPool *deque)
+{
   /* also a bit tricky, has to avoid concurrent steal() calls by
      accessing top with cas, when there is only one element left */
   StgWord t, b;
@@ -196,19 +184,17 @@ StgClosure* reclaimSpark(Capability *cap) {
 
 /* -----------------------------------------------------------------------------
  * 
- * findSpark: find a spark on the current Capability that we can fork
- * into a thread.
+ * tryStealSpark: try to steal a spark from a Capability.
  *
- * May be called by concurrent threads, which synchronise on top
- * variable. Returns a spark, or NULL if pool empty or race lost.
+ * Returns a valid spark, or NULL if the pool was empty, and can
+ * occasionally return NULL if there was a race with another thread
+ * stealing from the same pool.  In this case, try again later.
  *
  -------------------------------------------------------------------------- */
 
-StgClosurePtr steal(SparkPool *deque);
-
-/* steal an element from the read end. Synchronises multiple callers
-   by failing with NULL return. Returns NULL when deque is empty. */
-StgClosurePtr steal(SparkPool *deque) {
+static StgClosurePtr
+steal(SparkPool *deque)
+{
   StgClosurePtr* pos;
   StgClosurePtr* arraybase;
   StgWord sz;
@@ -231,43 +217,39 @@ StgClosurePtr steal(SparkPool *deque) {
 
   /* now decide whether we have won */
   if ( !(CASTOP(&(deque->top),t,t+1)) ) {
-    /* lost the race, someon else has changed top in the meantime */
-    stolen = NULL;    
+      /* lost the race, someon else has changed top in the meantime */
+      return NULL;
   }  /* else: OK, top has been incremented by the cas call */
 
-
   ASSERT_SPARK_POOL_INVARIANTS(deque); 
-  /* return NULL or stolen element */
+  /* return stolen element */
   return stolen;
 }
 
 StgClosure *
-findSpark (Capability *cap)
+tryStealSpark (SparkPool *pool)
 {
-  SparkPool *deque = (cap->sparks);
   StgClosure *stolen;
 
-  ASSERT_SPARK_POOL_INVARIANTS(deque); 
-
   do { 
-    /* keep trying until good spark found or pool looks empty. 
-       TODO is this a good idea? */
-
-    stolen = steal(deque);
-    
-  } while ( ( !stolen /* nothing stolen*/
-             || !closure_SHOULD_SPARK(stolen)) /* spark not OK */
-           && !looksEmpty(deque)); /* run empty, give up */
+      stolen = steal(pool);
+  } while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
 
-  /* return stolen element */
   return stolen;
 }
 
 
-/* "guesses" whether a deque is empty. Can return false negatives in
-   presence of concurrent steal() calls, and false positives in
-   presence of a concurrent pushBottom().*/
-rtsBool looksEmpty(SparkPool* deque) {
+/* -----------------------------------------------------------------------------
+ * 
+ * "guesses" whether a deque is empty. Can return false negatives in
+ *  presence of concurrent steal() calls, and false positives in
+ *  presence of a concurrent pushBottom().
+ *
+ * -------------------------------------------------------------------------- */
+
+rtsBool
+looksEmpty(SparkPool* deque)
+{
   StgWord t = deque->top;
   StgWord b = deque->bottom;
   /* try to prefer false negatives by reading top first */
@@ -288,6 +270,7 @@ createSparkThread (Capability *cap, StgClosure *p)
 
     tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
     appendToRunQueue(cap,tso);
+    cap->sparks_converted++;
 }
 
 /* -----------------------------------------------------------------------------
@@ -297,11 +280,12 @@ createSparkThread (Capability *cap, StgClosure *p)
  * -------------------------------------------------------------------------- */
 
 #define DISCARD_NEW
-void pushBottom(SparkPool* deque, StgClosurePtr elem);
 
 /* enqueue an element. Should always succeed by resizing the array
    (not implemented yet, silently fails in that case). */
-void pushBottom(SparkPool* deque, StgClosurePtr elem) {
+static void
+pushBottom (SparkPool* deque, StgClosurePtr elem)
+{
   StgWord t;
   StgClosurePtr* pos;
   StgWord sz = deque->moduloSize; 
@@ -349,12 +333,16 @@ void pushBottom(SparkPool* deque, StgClosurePtr elem) {
 }
 
 
-/* this is called as a direct C-call from Stg => we need to keep the
-   pool in a register (???) */
+/* --------------------------------------------------------------------------
+ * newSpark: create a new spark, as a result of calling "par"
+ * Called directly from STG.
+ * -------------------------------------------------------------------------- */
+
 StgInt
 newSpark (StgRegTable *reg, StgClosure *p)
 {
-    SparkPool *pool = (reg->rCurrentTSO->cap->sparks);
+    Capability *cap = regTableToCapability(reg);
+    SparkPool *pool = cap->sparks;
 
     /* I am not sure whether this is the right thing to do.
      * Maybe it is better to exploit the tag information
@@ -368,6 +356,8 @@ newSpark (StgRegTable *reg, StgClosure *p)
       pushBottom(pool,p);
     }  
 
+    cap->sparks_created++;
+
     ASSERT_SPARK_POOL_INVARIANTS(pool);
     return 1;
 }
@@ -385,7 +375,7 @@ static void
 pruneSparkQueue (Capability *cap)
 { 
     SparkPool *pool;
-    StgClosurePtr spark, evacspark, *elements;
+    StgClosurePtr spark, *elements;
     nat n, pruned_sparks; // stats only
     StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
     
@@ -457,6 +447,7 @@ pruneSparkQueue (Capability *cap)
        n++;
       } else { 
        pruned_sparks++; // discard spark
+        cap->sparks_pruned++;
       }
       currInd++;
 
@@ -528,7 +519,6 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
 }
 
 /* ----------------------------------------------------------------------------
-
  * balanceSparkPoolsCaps: takes an array of capabilities (usually: all
  * capabilities) and its size. Accesses all spark pools and equally
  * distributes the sparks among them.
@@ -537,7 +527,8 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
  * -------------------------------------------------------------------------- */
 void balanceSparkPoolsCaps(nat n_caps, Capability caps[]);
 
-void balanceSparkPoolsCaps(nat n_caps, Capability caps[]) {
+void balanceSparkPoolsCaps(nat n_caps STG_UNUSED, 
+                           Capability caps[] STG_UNUSED) {
   barf("not implemented");
 }
 
index dbbf268..4062a0b 100644 (file)
 
 #if defined(THREADED_RTS)
 
+/* Spark pools: used to store pending sparks 
+ *  (THREADED_RTS & PARALLEL_HASKELL only)
+ * Implementation uses a DeQue to enable concurrent read accesses at
+ * the top end.
+ */
+typedef struct  SparkPool_ {
+  /* Size of elements array. Used for modulo calculation: we round up
+     to powers of 2 and use the dyadic log (modulo == bitwise &) */
+  StgWord size; 
+  StgWord moduloSize; /* bitmask for modulo */
+
+  /* top, index where multiple readers steal() (protected by a cas) */
+  volatile StgWord top;
+
+  /* bottom, index of next free place where one writer can push
+     elements. This happens unsynchronised. */
+  volatile StgWord bottom;
+  /* both position indices are continuously incremented, and used as
+     an index modulo the current array size. */
+  
+  /* lower bound on the current top value. This is an internal
+     optimisation to avoid unnecessarily accessing the top field
+     inside pushBottom */
+  volatile StgWord topBound;
+
+  /* The elements array */
+  StgClosurePtr* elements;
+  /*  Please note: the dataspace cannot follow the admin fields
+      immediately, as it should be possible to enlarge it without
+      disposing the old one automatically (as realloc would)! */
+
+} SparkPool;
+
+
 /* INVARIANTS, in this order: bottom/top consistent, reasonable size,
    topBound consistent, space pointer, space accessible to us */
 #define ASSERT_SPARK_POOL_INVARIANTS(p)         \
   ASSERT(*((p)->elements) || 1);                \
   ASSERT(*((p)->elements - 1  + ((p)->size)) || 1);
 
-// missing in old interface. Currently called by initSparkPools
-// internally.
-SparkPool* initPool(StgWord size);
+// Initialisation
+void initSparkPools (void);
 
-// special case: accessing our own pool, at the write end
-// otherwise, we can always steal from our pool as the others do...
-StgClosure* reclaimSpark(Capability *cap);
+// Take a spark from the "write" end of the pool.  Can be called
+// by the pool owner only.
+StgClosure* reclaimSpark(SparkPool *pool);
 
+// Returns True if the spark pool is empty (can give a false positive
+// if the pool is almost empty).
 rtsBool looksEmpty(SparkPool* deque);
 
-// rest: same as old interface
-StgClosure * findSpark         (Capability *cap);
-void         initSparkPools    (void);
+StgClosure * tryStealSpark     (SparkPool *pool);
 void         freeSparkPool     (SparkPool *pool);
 void         createSparkThread (Capability *cap, StgClosure *p);
 void         pruneSparkQueues  (void);
 void         traverseSparkQueue(evac_fn evac, void *user, Capability *cap);
 
-INLINE_HEADER void     discardSparks  (SparkPool *pool);
-INLINE_HEADER nat      sparkPoolSize  (SparkPool *pool);
-
-INLINE_HEADER void     discardSparksCap  (Capability *cap);
-INLINE_HEADER nat      sparkPoolSizeCap  (Capability *cap);
-INLINE_HEADER rtsBool  emptySparkPoolCap (Capability *cap);
+INLINE_HEADER void discardSparks  (SparkPool *pool);
+INLINE_HEADER nat  sparkPoolSize  (SparkPool *pool);
 #endif
 
 /* -----------------------------------------------------------------------------
@@ -64,30 +93,16 @@ INLINE_HEADER rtsBool
 emptySparkPool (SparkPool *pool) 
 { return looksEmpty(pool); }
 
-INLINE_HEADER rtsBool
-emptySparkPoolCap (Capability *cap) 
-{ return looksEmpty(cap->sparks); }
-
 INLINE_HEADER nat
 sparkPoolSize (SparkPool *pool) 
-{
-  return (pool->bottom - pool->top);
-}
-
-INLINE_HEADER nat
-sparkPoolSizeCap (Capability *cap) 
-{ return sparkPoolSize(cap->sparks); }
+{ return (pool->bottom - pool->top); }
 
 INLINE_HEADER void
 discardSparks (SparkPool *pool)
 {
-    pool->top = pool->bottom = 0;
+    pool->top = pool->topBound = pool->bottom = 0;
 }
 
-INLINE_HEADER void
-discardSparksCap (Capability *cap) 
-{ return discardSparks(cap->sparks); }
-
 #endif
 
 #endif /* SPARKS_H */
index a2c47d7..94a756a 100644 (file)
@@ -6,9 +6,6 @@
  *
  * ---------------------------------------------------------------------------*/
 
-// Make static versions of inline functions in Stable.h:
-#define RTS_STABLE_C
-
 #include "PosixSource.h"
 #include "Rts.h"
 #include "Hash.h"
index 2e15613..228f0c0 100644 (file)
@@ -641,6 +641,21 @@ stat_exit(int alloc)
                                TICK_TO_DBL(task->gc_etime));
                }
            }
+
+            {
+                nat i;
+                lnat sparks_created   = 0;
+                lnat sparks_converted = 0;
+                lnat sparks_pruned    = 0;
+                for (i = 0; i < n_capabilities; i++) {
+                    sparks_created   += capabilities[i].sparks_created;
+                    sparks_converted += capabilities[i].sparks_converted;
+                    sparks_pruned    += capabilities[i].sparks_pruned;
+                }
+
+                statsPrintf("  SPARKS: %ld (%ld converted, %ld pruned)\n\n",
+                            sparks_created, sparks_converted, sparks_pruned);
+            }
 #endif
 
            statsPrintf("  INIT  time  %6.2fs  (%6.2fs elapsed)\n",