Merge the smp and threaded RTS ways
[ghc-hetmet.git] / ghc / rts / Sparks.c
index 6e638d4..98d7102 100644 (file)
@@ -1,24 +1,11 @@
 /* ---------------------------------------------------------------------------
  *
- * (c) The GHC Team, 2000
+ * (c) The GHC Team, 2000-2006
  *
- * Sparking support for PAR and SMP versions of the RTS.
+ * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
  *
  * -------------------------------------------------------------------------*/
 
-//@node Spark Management Routines, , ,
-//@section Spark Management Routines
-
-//@menu
-//* Includes::                 
-//* GUM code::                 
-//* GranSim code::             
-//@end menu
-//*/
-
-//@node Includes, GUM code, Spark Management Routines, Spark Management Routines
-//@subsection Includes
-
 #include "PosixSource.h"
 #include "Rts.h"
 #include "Schedule.h"
@@ -27,7 +14,7 @@
 #include "RtsFlags.h"
 #include "RtsUtils.h"
 #include "ParTicky.h"
-# if defined(PAR)
+# if defined(PARALLEL_HASKELL)
 # include "ParallelRts.h"
 # include "GranSimRts.h"   // for GR_...
 # elif defined(GRAN)
 # endif
 #include "Sparks.h"
 
-#if /*defined(SMP) ||*/ defined(PAR)
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
 
-//@node GUM code, GranSim code, Includes, Spark Management Routines
-//@subsection GUM code
+/* Step the head cursor to the following slot; the slot after the last one
+ * (p->lim) is the first one (p->base), since the pool is circular. */
+static INLINE_ME void bump_hd (StgSparkPool *p)
+{
+    StgClosure **next = p->hd + 1;
+    p->hd = (next == p->lim) ? p->base : next;
+}
 
-static void slide_spark_pool( StgSparkPool *pool );
+/* Step the tail cursor to the following slot; the slot after the last one
+ * (p->lim) is the first one (p->base), since the pool is circular. */
+static INLINE_ME void bump_tl (StgSparkPool *p)
+{
+    StgClosure **next = p->tl + 1;
+    p->tl = (next == p->lim) ? p->base : next;
+}
 
-void
-initSparkPools( void )
-{
-  Capability *cap;
-  StgSparkPool *pool;
+/* -----------------------------------------------------------------------------
+ * 
+ * Initialising spark pools.
+ *
+ * -------------------------------------------------------------------------- */
 
-#ifdef SMP
-  /* walk over the capabilities, allocating a spark pool for each one */
-  for (cap = free_capabilities; cap != NULL; cap = cap->link) {
-#else
-  /* allocate a single spark pool */
-  cap = &MainRegTable;
-  {
-#endif
-    pool = &(cap->rSparks);
-    
+/* Allocate the backing array of a single spark pool and mark it empty.
+ * The array holds RtsFlags.ParFlags.maxLocalSparks closure pointers;
+ * lim points one past the last slot, and hd == tl == base afterwards.
+ * NOTE(review): the allocation tag string still reads "initSparkPools". */
+static void 
+initSparkPool(StgSparkPool *pool)
+{
     pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
-                                    * sizeof(StgClosure *),
-                                    "initSparkPools");
+                               * sizeof(StgClosure *),
+                               "initSparkPools");
     pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
     pool->hd  = pool->base;
     pool->tl  = pool->base;
-  }
 }
 
-/* 
-   We traverse the spark pool until we find the 2nd usable (i.e. non-NF)
-   spark. Rationale, we don't want to give away the only work a PE has.
-   ToDo: introduce low- and high-water-marks for load balancing.
-*/
-StgClosure *
-findSpark( rtsBool for_export )
+/* Initialise the spark pool of every Capability: each entry of
+ * capabilities[] (n_capabilities of them) under THREADED_RTS, otherwise
+ * only the pool of MainCapability. */
+void
+initSparkPools( void )
 {
-  Capability *cap;
-  StgSparkPool *pool;
-  StgClosure *spark, *first=NULL;
-  rtsBool isIdlePE = EMPTY_RUN_QUEUE();
-
-#ifdef SMP
-  /* walk over the capabilities, allocating a spark pool for each one */
-  for (cap = free_capabilities; cap != NULL; cap = cap->link) {
+#ifdef THREADED_RTS
+    /* walk over the capabilities, allocating a spark pool for each one */
+    nat i;
+    for (i = 0; i < n_capabilities; i++) {
+       initSparkPool(&capabilities[i].r.rSparks);
+    }
 #else
-  /* allocate a single spark pool */
-  cap = &MainRegTable;
-  {
+    /* allocate a single spark pool */
+    initSparkPool(&MainCapability.r.rSparks);
 #endif
-    pool = &(cap->rSparks);
-    while (pool->hd < pool->tl) {
-      spark = *pool->hd++;
-      if (closure_SHOULD_SPARK(spark)) {
-       if (for_export && isIdlePE) {
-         if (first==NULL) {
-           first = spark; // keep the first usable spark if PE is idle
-         } else {
-           pool->hd--;    // found a second spark; keep it in the pool 
-           ASSERT(*pool->hd==spark);
+}
+
+/* -----------------------------------------------------------------------------
+ * 
+ * findSpark: find a spark on the current Capability that we can fork
+ * into a thread.
+ *
+ * -------------------------------------------------------------------------- */
+
+/* Returns the first entry (scanning from the head) of cap's spark pool
+ * that satisfies closure_SHOULD_SPARK, or NULL once the pool has been
+ * drained.  Every entry examined, including the one returned, is removed
+ * from the pool. */
+StgClosure *
+findSpark (Capability *cap)
+{
+    StgSparkPool *pool;
+    StgClosure *spark;
+    
+    pool = &(cap->r.rSparks);
+    ASSERT_SPARK_POOL_INVARIANTS(pool);
+
+    while (pool->hd != pool->tl) {
+       // Pop the head entry first; entries that fail the test below are
+       // simply dropped and the scan continues with the next one.
+       spark = *pool->hd;
+       bump_hd(pool);
+       if (closure_SHOULD_SPARK(spark)) {
+#ifdef GRAN
            if (RtsFlags.ParFlags.ParStats.Sparks) 
-             DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
-                              GR_STEALING, ((StgTSO *)NULL), first, 
-                              0, 0 /* spark_queue_len(ADVISORY_POOL) */);
-           return first;  // and return the *first* spark found
-         }
-        } else {
-         if (RtsFlags.ParFlags.ParStats.Sparks && for_export) 
-           DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
-                            GR_STEALING, ((StgTSO *)NULL), spark, 
-                            0, 0 /* spark_queue_len(ADVISORY_POOL) */);
-         return spark;    // return first spark found
+               DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
+                                GR_STEALING, ((StgTSO *)NULL), spark, 
+                                0, 0 /* spark_queue_len(ADVISORY_POOL) */);
+#endif
+           return spark;
        }
-      }
     }
-    slide_spark_pool(pool);
-  }
-  return NULL;
+    // spark pool is now empty
+    return NULL;
 }
 
-/* 
-   activateSpark is defined in Schedule.c
-*/
+/* -----------------------------------------------------------------------------
+ * Mark all nodes pointed to by sparks in the spark queues (for GC).  Does an
+ * implicit slide, i.e. after marking, the surviving sparks are compacted
+ * towards the head of each spark pool and the pool only contains sparkable
+ * closures.
+ * -------------------------------------------------------------------------- */
+
+void
+markSparkQueue (evac_fn evac)
+{ 
+    /* For every Capability's spark pool: pass each still-sparkable entry
+     * to evac (the GC's root-evacuation callback) and compact the
+     * survivors towards pool->hd, dropping entries that no longer
+     * satisfy closure_SHOULD_SPARK.  pool->hd is left untouched;
+     * pool->tl is moved back to just after the last survivor.
+     *
+     * The pool is a circular buffer, so BOTH the read cursor (sparkp)
+     * and the write cursor (to_sparkp) must wrap from lim to base.
+     */
+    StgClosure **sparkp, **to_sparkp;
+    nat i, n, pruned_sparks; // stats only
+    StgSparkPool *pool;
+    Capability *cap;
+    
+    PAR_TICKY_MARK_SPARK_QUEUE_START();
+    
+    n = 0;
+    pruned_sparks = 0;
+    for (i = 0; i < n_capabilities; i++) {
+       cap = &capabilities[i];
+       pool = &(cap->r.rSparks);
+       
+       ASSERT_SPARK_POOL_INVARIANTS(pool);
+
+#if defined(PARALLEL_HASKELL)
+       // stats only
+       n = 0;
+       pruned_sparks = 0;
+#endif
+       
+       sparkp = pool->hd;
+       to_sparkp = pool->hd;
+       while (sparkp != pool->tl) {
+           // No ordering assertion between to_sparkp and sparkp here:
+           // once sparkp has wrapped round to base, to_sparkp may
+           // legitimately sit at a higher address.
+           ASSERT(*sparkp!=NULL);
+           ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp)));
+           // ToDo?: statistics gathering here (also for GUM!)
+           if (closure_SHOULD_SPARK(*sparkp)) {
+               evac(sparkp);
+               *to_sparkp++ = *sparkp;
+               // wrap the write cursor too, otherwise it runs past
+               // the end of the buffer when the live region wraps
+               if (to_sparkp == pool->lim) {
+                   to_sparkp = pool->base;
+               }
+               n++;
+           } else {
+               pruned_sparks++;
+           }
+           sparkp++;
+           if (sparkp == pool->lim) {
+               sparkp = pool->base;
+           }
+       }
+       pool->tl = to_sparkp;
+       
+       PAR_TICKY_MARK_SPARK_QUEUE_END(n);
+       
+#if defined(PARALLEL_HASKELL)
+       IF_DEBUG(scheduler,
+                debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]\n",
+                           n, pruned_sparks, mytid));
+#else
+       IF_DEBUG(scheduler,
+              debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks\n",
+                         n, pruned_sparks));
+#endif
+       
+       IF_DEBUG(scheduler,
+                debugBelch("markSparkQueue:   new spark queue len=%d; (hd=%p; tl=%p)\n",
+                           sparkPoolSize(pool), pool->hd, pool->tl));
+       
+    }
+}
+
+/* -----------------------------------------------------------------------------
+ * 
+ * Turn a spark into a real thread
+ *
+ * -------------------------------------------------------------------------- */
+
+void
+createSparkThread (Capability *cap, StgClosure *p)
+{
+    // Build a thread for closure p with the default initial stack size...
+    StgTSO *spark_thread =
+       createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
+
+    // ...and queue it at the end of cap's run queue.
+    appendToRunQueue(cap, spark_thread);
+}
+
+/* -----------------------------------------------------------------------------
+ * 
+ * Create a new spark
+ *
+ * -------------------------------------------------------------------------- */
+
+#define DISCARD_NEW
+
+StgInt
+newSpark (StgRegTable *reg, StgClosure *p)
+{
+    /* Record closure p as a spark in the pool belonging to reg, provided
+     * it satisfies closure_SHOULD_SPARK.  Always returns 1.
+     *
+     * The pool is a circular buffer holding the entries [hd, tl); it is
+     * full when advancing tl by one slot would make it equal to hd, so
+     * the slot at tl itself is always free to write into.
+     */
+    StgSparkPool *pool = &(reg->rSparks);
+
+    ASSERT_SPARK_POOL_INVARIANTS(pool);
+
+    if (closure_SHOULD_SPARK(p)) {
+#ifdef DISCARD_NEW
+       StgClosure **new_tl;
+       new_tl = pool->tl + 1;
+       if (new_tl == pool->lim) { new_tl = pool->base; }
+       if (new_tl != pool->hd) {
+           // room left: append at the tail
+           *pool->tl = p;
+           pool->tl = new_tl;
+       } else if (!closure_SHOULD_SPARK(*pool->hd)) {
+           // if the old closure is not sparkable, discard it and
+           // keep the new one.  Otherwise, keep the old one.
+           // tl must advance as well as hd: without that the new
+           // entry would lie outside [hd, tl) and be lost.
+           *pool->tl = p;
+           pool->tl = new_tl;
+           bump_hd(pool);
+       }
+#else  /* DISCARD OLD */
+       *pool->tl = p;
+       bump_tl(pool);
+       if (pool->tl == pool->hd) { bump_hd(pool); }
+#endif
+    }  
+
+    ASSERT_SPARK_POOL_INVARIANTS(pool);
+    return 1;
+}
+
+#else
+
+StgInt
+newSpark (StgRegTable *reg, StgClosure *p)
+{
+    /* No spark pools in this configuration: the request is ignored,
+     * and 1 is returned exactly as in the pool-backed version. */
+    (void)reg;
+    (void)p;
+    return 1;
+}
+
+#endif /* PARALLEL_HASKELL || THREADED_RTS */
+
+
+/* -----------------------------------------------------------------------------
+ * 
+ * GRAN & PARALLEL_HASKELL stuff beyond here.
+ *
+ * -------------------------------------------------------------------------- */
+
+#if defined(PARALLEL_HASKELL) || defined(GRAN)
+
+static void slide_spark_pool( StgSparkPool *pool );
+
 rtsBool
 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
 {
@@ -131,7 +252,7 @@ add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
       pool->tl < pool->lim) {
     *(pool->tl++) = closure;
 
-#if defined(PAR)
+#if defined(PARALLEL_HASKELL)
     // collect parallel global statistics (currently done together with GC stats)
     if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
@@ -141,7 +262,7 @@ add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
 #endif
     return rtsTrue;
   } else {
-#if defined(PAR)
+#if defined(PARALLEL_HASKELL)
     // collect parallel global statistics (currently done together with GC stats)
     if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
@@ -175,93 +296,11 @@ slide_spark_pool( StgSparkPool *pool )
   pool->tl = to_sparkp;
 }
 
-nat
-spark_queue_len( StgSparkPool *pool ) 
-{
-  return (nat) (pool->tl - pool->hd);
-}
-
-/* Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
-   implicit slide i.e. after marking all sparks are at the beginning of the
-   spark pool and the spark pool only contains sparkable closures 
-*/
-void
-markSparkQueue( void )
-{ 
-  StgClosure **sparkp, **to_sparkp;
-  nat n, pruned_sparks; // stats only
-  StgSparkPool *pool;
-  Capability *cap;
-
-  PAR_TICKY_MARK_SPARK_QUEUE_START();
-
-#ifdef SMP
-  /* walk over the capabilities, allocating a spark pool for each one */
-  for (cap = free_capabilities; cap != NULL; cap = cap->link) {
-#else
-  /* allocate a single spark pool */
-  cap = &MainRegTable;
-  {
-#endif
-    pool = &(cap->rSparks);
-
-#if defined(PAR)
-    // stats only
-    n = 0;
-    pruned_sparks = 0;
-#endif
-
-    sparkp = pool->hd;
-    to_sparkp = pool->base;
-    while (sparkp < pool->tl) {
-      ASSERT(to_sparkp<=sparkp);
-      ASSERT(*sparkp!=NULL);
-      ASSERT(LOOKS_LIKE_GHC_INFO(((StgClosure *)*sparkp)->header.info));
-      // ToDo?: statistics gathering here (also for GUM!)
-      if (closure_SHOULD_SPARK(*sparkp)) {
-       *to_sparkp = MarkRoot(*sparkp);
-       to_sparkp++;
-#ifdef PAR
-       n++;
-#endif
-      } else {
-#ifdef PAR
-       pruned_sparks++;
-#endif
-      }
-      sparkp++;
-    }
-    pool->hd = pool->base;
-    pool->tl = to_sparkp;
-
-    PAR_TICKY_MARK_SPARK_QUEUE_END(n);
-    
-#if defined(SMP)
-    IF_DEBUG(scheduler,
-            debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
-                  n, pruned_sparks, pthread_self()));
-#elif defined(PAR)
-    IF_DEBUG(scheduler,
-            debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
-                  n, pruned_sparks, mytid));
-#else
-    IF_DEBUG(scheduler,
-            debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks",
-                  n, pruned_sparks));
-#endif
-
-    IF_DEBUG(scheduler,
-            debugBelch("markSparkQueue:   new spark queue len=%d; (hd=%p; tl=%p)",
-                  spark_queue_len(pool), pool->hd, pool->tl));
-
-  }
-}
-
 void
 disposeSpark(spark)
 StgClosure *spark;
 {
-#if !defined(SMP)
+#if !defined(THREADED_RTS)
   Capability *cap;
   StgSparkPool *pool;
 
@@ -276,22 +315,11 @@ StgClosure *spark;
 
 #elif defined(GRAN)
 
-//@node GranSim code,  , GUM code, Spark Management Routines
-//@subsection GranSim code
-
-//@menu
-//* Basic interface to sparkq::         
-//* Aux fcts::                 
-//@end menu
-
-//@node Basic interface to sparkq, Aux fcts, GranSim code, GranSim code
-//@subsubsection Basic interface to sparkq
 /* 
    Search the spark queue of the proc in event for a spark that's worth
    turning into a thread 
    (was gimme_spark in the old RTS)
 */
-//@cindex findLocalSpark
 void
 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
 {
@@ -423,7 +451,6 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
   node pointed to by the spark at some point in the future.
   (was munch_spark in the old RTS)
 */
-//@cindex activateSpark
 rtsBool
 activateSpark (rtsEvent *event, rtsSparkQ spark) 
 {
@@ -530,7 +557,6 @@ STATIC_INLINE nat  IGNORE(nat x) { return (0); };
 STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
 
 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
-//@cindex newSpark
 rtsSpark *
 newSpark(node,name,gran_info,size_info,par_info,local)
 StgClosure *node;
@@ -567,7 +593,6 @@ nat name, gran_info, size_info, par_info, local;
   return(newspark);
 }
 
-//@cindex disposeSpark
 void
 disposeSpark(spark)
 rtsSpark *spark;
@@ -576,7 +601,6 @@ rtsSpark *spark;
   stgFree(spark);
 }
 
-//@cindex disposeSparkQ
 void 
 disposeSparkQ(spark)
 rtsSparkQ spark;
@@ -602,7 +626,6 @@ rtsSparkQ spark;
    the queue. 
 */
 
-//@cindex add_to_spark_queue
 void
 add_to_spark_queue(spark)
 rtsSpark *spark;
@@ -709,10 +732,6 @@ rtsSpark *spark;
 #  endif
 }
 
-//@node Aux fcts,  , Basic interface to sparkq, GranSim code
-//@subsubsection Aux fcts
-
-//@cindex spark_queue_len
 nat
 spark_queue_len(proc) 
 PEs proc;
@@ -740,7 +759,6 @@ PEs proc;
    hd and tl pointers of the spark queue. Returns a pointer to the next
    spark in the queue.
 */
-//@cindex delete_from_sparkq
 rtsSpark *
 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
 rtsSpark *spark;
@@ -793,7 +811,6 @@ rtsBool dispose_too;
 }
 
 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
-//@cindex markSparkQueue
 void
 markSparkQueue(void)
 { 
@@ -813,7 +830,6 @@ markSparkQueue(void)
           print_sparkq_stats());
 }
 
-//@cindex print_spark
 void
 print_spark(spark)
 rtsSpark *spark;
@@ -835,7 +851,6 @@ rtsSpark *spark;
   }
 }
 
-//@cindex print_sparkq
 void
 print_sparkq(proc)
 PEs proc;
@@ -852,7 +867,6 @@ PEs proc;
 /* 
    Print a statistics of all spark queues.
 */
-//@cindex print_sparkq_stats
 void
 print_sparkq_stats(void)
 {