RTS tidyup sweep, first phase
[ghc-hetmet.git] / rts / Capability.c
index a6a0f0a..ddaba69 100644 (file)
 
 #include "PosixSource.h"
 #include "Rts.h"
-#include "RtsUtils.h"
-#include "RtsFlags.h"
-#include "STM.h"
-#include "OSThreads.h"
+
 #include "Capability.h"
 #include "Schedule.h"
 #include "Sparks.h"
 #include "Trace.h"
+#include "sm/GC.h" // for gcWorkerThread()
+#include "STM.h"
+#include "RtsUtils.h"
 
 // one global capability, this is the Capability for non-threaded
 // builds, and for +RTS -N1
 Capability MainCapability;
 
-nat n_capabilities;
+nat n_capabilities = 0;
 Capability *capabilities = NULL;
 
 // Holds the Capability which last became free.  This is used so that
 // an in-call has a chance of quickly finding a free Capability.
 // Maintaining a global free list of Capabilities would require global
 // locking, so we don't do that.
-Capability *last_free_capability;
+Capability *last_free_capability = NULL;
 
 /* GC indicator, in scope for the scheduler, init'ed to false */
 volatile StgWord waiting_for_gc = 0;
 
+/* Let foreign code get the current Capability -- assuming there is one!
+ * This is useful for unsafe foreign calls because they are called with
+ * the current Capability held, but they are not passed it. For example,
+ * see see the integer-gmp package which calls allocateLocal() in its
+ * stgAllocForGMP() function (which gets called by gmp functions).
+ * */
+Capability * rts_unsafeGetMyCapability (void)
+{
+#if defined(THREADED_RTS)
+  return myTask()->cap;
+#else
+  return &MainCapability;
+#endif
+}
+
 #if defined(THREADED_RTS)
 STATIC_INLINE rtsBool
 globalWorkToDo (void)
@@ -54,25 +69,43 @@ globalWorkToDo (void)
 #endif
 
 #if defined(THREADED_RTS)
-rtsBool
-stealWork (Capability *cap)
+StgClosure *
+findSpark (Capability *cap)
 {
-  /* use the normal Sparks.h interface (internally modified to enable
-     concurrent stealing) 
-     and immediately turn the spark into a thread when successful
-  */
   Capability *robbed;
   StgClosurePtr spark;
-  rtsBool success = rtsFalse;
   rtsBool retry;
   nat i = 0;
 
+  if (!emptyRunQueue(cap)) {
+      // If there are other threads, don't try to run any new
+      // sparks: sparks might be speculative, we don't want to take
+      // resources away from the main computation.
+      return 0;
+  }
+
+  // first try to get a spark from our own pool.
+  // We should be using reclaimSpark(), because it works without
+  // needing any atomic instructions:
+  //   spark = reclaimSpark(cap->sparks);
+  // However, measurements show that this makes at least one benchmark
+  // slower (prsa) and doesn't affect the others.
+  spark = tryStealSpark(cap);
+  if (spark != NULL) {
+      cap->sparks_converted++;
+
+      // Post event for running a spark from capability's own pool.
+      postEvent(cap, EVENT_RUN_SPARK, cap->r.rCurrentTSO->id, 0);
+
+      return spark;
+  }
+
+  if (n_capabilities == 1) { return NULL; } // makes no sense...
+
   debugTrace(DEBUG_sched,
             "cap %d: Trying to steal work from other capabilities", 
             cap->no);
 
-  if (n_capabilities == 1) { return rtsFalse; } // makes no sense...
-
   do {
       retry = rtsFalse;
 
@@ -86,7 +119,7 @@ stealWork (Capability *cap)
           if (emptySparkPoolCap(robbed)) // nothing to steal here
               continue;
 
-          spark = tryStealSpark(robbed->sparks);
+          spark = tryStealSpark(robbed);
           if (spark == NULL && !emptySparkPoolCap(robbed)) {
               // we conflicted with another thread while trying to steal;
               // try again later.
@@ -97,16 +130,37 @@ stealWork (Capability *cap)
               debugTrace(DEBUG_sched,
                 "cap %d: Stole a spark from capability %d",
                          cap->no, robbed->no);
+              cap->sparks_converted++;
 
-              createSparkThread(cap,spark);
-              return rtsTrue;
+              postEvent(cap, EVENT_STEAL_SPARK, 
+                        cap->r.rCurrentTSO->id, robbed->no);
+                        
+              
+              return spark;
           }
           // otherwise: no success, try next one
       }
   } while (retry);
 
   debugTrace(DEBUG_sched, "No sparks stolen");
-  return rtsFalse;
+  return NULL;
+}
+
+// Returns True if any spark pool is non-empty at this moment in time
+// The result is only valid for an instant, of course, so in a sense
+// is immediately invalid, and should not be relied upon for
+// correctness.
+rtsBool
+anySparks (void)
+{
+    nat i;
+
+    for (i=0; i < n_capabilities; i++) {
+        if (!emptySparkPoolCap(&capabilities[i])) {
+            return rtsTrue;
+        }
+    }
+    return rtsFalse;
 }
 #endif
 
@@ -160,6 +214,7 @@ initCapability( Capability *cap, nat i )
 
     cap->no = i;
     cap->in_haskell        = rtsFalse;
+    cap->in_gc             = rtsFalse;
 
     cap->run_queue_hd      = END_TSO_QUEUE;
     cap->run_queue_tl      = END_TSO_QUEUE;
@@ -178,12 +233,16 @@ initCapability( Capability *cap, nat i )
     cap->sparks_pruned      = 0;
 #endif
 
-    cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
-    cap->f.stgGCFun        = (F_)__stg_gc_fun;
+    cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info;
+    cap->f.stgGCEnter1     = (StgFunPtr)__stg_gc_enter_1;
+    cap->f.stgGCFun        = (StgFunPtr)__stg_gc_fun;
 
     cap->mut_lists  = stgMallocBytes(sizeof(bdescr *) *
                                     RtsFlags.GcFlags.generations,
                                     "initCapability");
+    cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) *
+                                          RtsFlags.GcFlags.generations,
+                                          "initCapability");
 
     for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
        cap->mut_lists[g] = NULL;
@@ -258,10 +317,10 @@ initCapabilities( void )
 
 void setContextSwitches(void)
 {
-  nat i;
-  for (i=0; i < n_capabilities; i++) {
-    capabilities[i].context_switch = 1;
-  }
+    nat i;
+    for (i=0; i < n_capabilities; i++) {
+        contextSwitchCapability(&capabilities[i]);
+    }
 }
 
 /* ----------------------------------------------------------------------------
@@ -284,10 +343,9 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
 {
     ASSERT_LOCK_HELD(&cap->lock);
     ASSERT(task->cap == cap);
-    trace(TRACE_sched | DEBUG_sched,
-         "passing capability %d to %s %p",
-         cap->no, task->tso ? "bound task" : "worker",
-         (void *)task->id);
+    debugTrace(DEBUG_sched, "passing capability %d to %s %p",
+               cap->no, task->tso ? "bound task" : "worker",
+               (void *)task->id);
     ACQUIRE_LOCK(&task->lock);
     task->wakeup = rtsTrue;
     // the wakeup flag is needed because signalCondition() doesn't
@@ -327,17 +385,9 @@ releaseCapability_ (Capability* cap,
        return;
     }
 
-    /* if waiting_for_gc was the reason to release the cap: thread
-       comes from yieldCap->releaseAndQueueWorker. Unconditionally set
-       cap. free and return (see default after the if-protected other
-       special cases). Thread will wait on cond.var and re-acquire the
-       same cap after GC (GC-triggering cap. calls releaseCap and
-       enters the spare_workers case)
-    */
-    if (waiting_for_gc) {
+    if (waiting_for_gc == PENDING_GC_SEQ) {
       last_free_capability = cap; // needed?
-      trace(TRACE_sched | DEBUG_sched, 
-           "GC pending, set capability %d free", cap->no);
+      debugTrace(DEBUG_sched, "GC pending, set capability %d free", cap->no);
       return;
     } 
 
@@ -378,7 +428,7 @@ releaseCapability_ (Capability* cap,
     }
 
     last_free_capability = cap;
-    trace(TRACE_sched | DEBUG_sched, "freeing capability %d", cap->no);
+    debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
 }
 
 void
@@ -453,14 +503,17 @@ waitForReturnCapability (Capability **pCap, Task *task)
        if (!cap->running_task) {
            nat i;
            // otherwise, search for a free capability
+            cap = NULL;
            for (i = 0; i < n_capabilities; i++) {
-               cap = &capabilities[i];
-               if (!cap->running_task) {
+               if (!capabilities[i].running_task) {
+                    cap = &capabilities[i];
                    break;
                }
            }
-           // Can't find a free one, use last_free_capability.
-           cap = last_free_capability;
+            if (cap == NULL) {
+                // Can't find a free one, use last_free_capability.
+                cap = last_free_capability;
+            }
        }
 
        // record the Capability as the one this Task is now assocated with.
@@ -510,7 +563,7 @@ waitForReturnCapability (Capability **pCap, Task *task)
 
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
 
-    trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
+    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
 
     *pCap = cap;
 #endif
@@ -526,6 +579,14 @@ yieldCapability (Capability** pCap, Task *task)
 {
     Capability *cap = *pCap;
 
+    if (waiting_for_gc == PENDING_GC_PAR) {
+       debugTrace(DEBUG_sched, "capability %d: becoming a GC thread", cap->no);
+        postEvent(cap, EVENT_GC_START, 0, 0);
+        gcWorkerThread(cap);
+        postEvent(cap, EVENT_GC_END, 0, 0);
+        return;
+    }
+
        debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
 
        // We must now release the capability and wait to be woken up
@@ -568,7 +629,7 @@ yieldCapability (Capability** pCap, Task *task)
            break;
        }
 
-       trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
+       debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
        ASSERT(cap->running_task == task);
 
     *pCap = cap;
@@ -610,7 +671,6 @@ wakeupThreadOnCapability (Capability *my_cap,
 
        appendToRunQueue(other_cap,tso);
 
-       trace(TRACE_sched, "resuming capability %d", other_cap->no);
        releaseCapability_(other_cap,rtsFalse);
     } else {
        appendToWakeupQueue(my_cap,other_cap,tso);
@@ -624,58 +684,21 @@ wakeupThreadOnCapability (Capability *my_cap,
 }
 
 /* ----------------------------------------------------------------------------
- * prodCapabilities
+ * prodCapability
  *
- * Used to indicate that the interrupted flag is now set, or some
- * other global condition that might require waking up a Task on each
- * Capability.
- * ------------------------------------------------------------------------- */
-
-static void
-prodCapabilities(rtsBool all)
-{
-    nat i;
-    Capability *cap;
-    Task *task;
-
-    for (i=0; i < n_capabilities; i++) {
-       cap = &capabilities[i];
-       ACQUIRE_LOCK(&cap->lock);
-       if (!cap->running_task) {
-           if (cap->spare_workers) {
-               trace(TRACE_sched, "resuming capability %d", cap->no);
-               task = cap->spare_workers;
-               ASSERT(!task->stopped);
-               giveCapabilityToTask(cap,task);
-               if (!all) {
-                   RELEASE_LOCK(&cap->lock);
-                   return;
-               }
-           }
-       }
-       RELEASE_LOCK(&cap->lock);
-    }
-    return;
-}
-
-void
-prodAllCapabilities (void)
-{
-    prodCapabilities(rtsTrue);
-}
-
-/* ----------------------------------------------------------------------------
- * prodOneCapability
- *
- * Like prodAllCapabilities, but we only require a single Task to wake
- * up in order to service some global event, such as checking for
- * deadlock after some idle time has passed.
+ * If a Capability is currently idle, wake up a Task on it.  Used to 
+ * get every Capability into the GC.
  * ------------------------------------------------------------------------- */
 
 void
-prodOneCapability (void)
+prodCapability (Capability *cap, Task *task)
 {
-    prodCapabilities(rtsFalse);
+    ACQUIRE_LOCK(&cap->lock);
+    if (!cap->running_task) {
+        cap->running_task = task;
+        releaseCapability_(cap,rtsTrue);
+    }
+    RELEASE_LOCK(&cap->lock);
 }
 
 /* ----------------------------------------------------------------------------
@@ -698,8 +721,6 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe)
 {
     nat i;
 
-    ASSERT(sched_state == SCHED_SHUTTING_DOWN);
-
     task->cap = cap;
 
     // Loop indefinitely until all the workers have exited and there
@@ -709,6 +730,8 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe)
     // isn't safe, for one thing).
 
     for (i = 0; /* i < 50 */; i++) {
+        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
+
        debugTrace(DEBUG_sched, 
                   "shutting down capability %d, attempt %d", cap->no, i);
        ACQUIRE_LOCK(&cap->lock);
@@ -767,8 +790,8 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe)
             continue;
         }
             
+        postEvent(cap, EVENT_SHUTDOWN, 0, 0);
        debugTrace(DEBUG_sched, "capability %d is stopped.", cap->no);
-        freeCapability(cap);
        RELEASE_LOCK(&cap->lock);
        break;
     }
@@ -806,14 +829,28 @@ tryGrabCapability (Capability *cap, Task *task)
 
 #endif /* THREADED_RTS */
 
-void
-freeCapability (Capability *cap) {
+static void
+freeCapability (Capability *cap)
+{
     stgFree(cap->mut_lists);
-#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+#if defined(THREADED_RTS)
     freeSparkPool(cap->sparks);
 #endif
 }
 
+void
+freeCapabilities (void)
+{
+#if defined(THREADED_RTS)
+    nat i;
+    for (i=0; i < n_capabilities; i++) {
+        freeCapability(&capabilities[i]);
+    }
+#else
+    freeCapability(&MainCapability);
+#endif
+}
+
 /* ---------------------------------------------------------------------------
    Mark everything directly reachable from the Capabilities.  When
    using multiple GC threads, each GC thread marks all Capabilities