[project @ 2005-03-10 14:03:28 by simonmar]
[ghc-hetmet.git] / ghc / rts / Sparks.c
index 4a9bf00..07b3b6e 100644 (file)
@@ -1,5 +1,4 @@
 /* ---------------------------------------------------------------------------
- * $Id: Sparks.c,v 1.2 2000/03/31 03:09:36 hwloidl Exp $
  *
  * (c) The GHC Team, 2000
  *
 //* GUM code::                 
 //* GranSim code::             
 //@end menu
+//*/
 
 //@node Includes, GUM code, Spark Management Routines, Spark Management Routines
 //@subsection Includes
 
+#include "PosixSource.h"
 #include "Rts.h"
 #include "Schedule.h"
 #include "SchedAPI.h"
 #include "Storage.h"
 #include "RtsFlags.h"
 #include "RtsUtils.h"
+#include "ParTicky.h"
 # if defined(PAR)
 # include "ParallelRts.h"
+# include "GranSimRts.h"   // for GR_...
 # elif defined(GRAN)
 # include "GranSimRts.h"
 # endif
 #include "Sparks.h"
 
-#if defined(SMP) || defined(PAR)
+#if /*defined(SMP) ||*/ defined(PAR)
 
 //@node GUM code, GranSim code, Includes, Spark Management Routines
 //@subsection GUM code
 
 static void slide_spark_pool( StgSparkPool *pool );
 
-void
+rtsBool
 initSparkPools( void )
 {
   Capability *cap;
@@ -62,14 +65,21 @@ initSparkPools( void )
     pool->hd  = pool->base;
     pool->tl  = pool->base;
   }
+  return rtsTrue; /* Qapla' */
 }
 
+/* 
+   We traverse the spark pool until we find the 2nd usable (i.e. non-NF)
+   spark. Rationale, we don't want to give away the only work a PE has.
+   ToDo: introduce low- and high-water-marks for load balancing.
+*/
 StgClosure *
-findSpark( void )
+findSpark( rtsBool for_export )
 {
   Capability *cap;
   StgSparkPool *pool;
-  StgClosure *spark;
+  StgClosure *spark, *first=NULL;
+  rtsBool isIdlePE = EMPTY_RUN_QUEUE();
 
 #ifdef SMP
   /* walk over the capabilities, allocating a spark pool for each one */
@@ -82,14 +92,36 @@ findSpark( void )
     pool = &(cap->rSparks);
     while (pool->hd < pool->tl) {
       spark = *pool->hd++;
-      if (closure_SHOULD_SPARK(spark))
-       return spark;
+      if (closure_SHOULD_SPARK(spark)) {
+       if (for_export && isIdlePE) {
+         if (first==NULL) {
+           first = spark; // keep the first usable spark if PE is idle
+         } else {
+           pool->hd--;    // found a second spark; keep it in the pool 
+           ASSERT(*pool->hd==spark);
+           if (RtsFlags.ParFlags.ParStats.Sparks) 
+             DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
+                              GR_STEALING, ((StgTSO *)NULL), first, 
+                              0, 0 /* spark_queue_len(ADVISORY_POOL) */);
+           return first;  // and return the *first* spark found
+         }
+        } else {
+         if (RtsFlags.ParFlags.ParStats.Sparks && for_export) 
+           DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
+                            GR_STEALING, ((StgTSO *)NULL), spark, 
+                            0, 0 /* spark_queue_len(ADVISORY_POOL) */);
+         return spark;    // return first spark found
+       }
+      }
     }
     slide_spark_pool(pool);
   }
   return NULL;
 }
 
+/* 
+   activateSpark is defined in Schedule.c
+*/
 rtsBool
 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
 {
@@ -99,8 +131,25 @@ add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
   if (closure_SHOULD_SPARK(closure) && 
       pool->tl < pool->lim) {
     *(pool->tl++) = closure;
+
+#if defined(PAR)
+    // collect parallel global statistics (currently done together with GC stats)
+    if (RtsFlags.ParFlags.ParStats.Global &&
+       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+      // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime()); 
+      globalParStats.tot_sparks_created++;
+    }
+#endif
     return rtsTrue;
   } else {
+#if defined(PAR)
+    // collect parallel global statistics (currently done together with GC stats)
+    if (RtsFlags.ParFlags.ParStats.Global &&
+       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+      //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime()); 
+      globalParStats.tot_sparks_ignored++;
+    }
+#endif
     return rtsFalse;
   }
 }
@@ -141,12 +190,12 @@ void
 markSparkQueue( void )
 { 
   StgClosure **sparkp, **to_sparkp;
-#ifdef DEBUG
-  nat n, pruned_sparks;
-#endif
+  nat n, pruned_sparks; // stats only
   StgSparkPool *pool;
   Capability *cap;
 
+  PAR_TICKY_MARK_SPARK_QUEUE_START();
+
 #ifdef SMP
   /* walk over the capabilities, allocating a spark pool for each one */
   for (cap = free_capabilities; cap != NULL; cap = cap->link) {
@@ -156,8 +205,9 @@ markSparkQueue( void )
   {
 #endif
     pool = &(cap->rSparks);
-    
-#ifdef DEBUG
+
+#if defined(PAR)
+    // stats only
     n = 0;
     pruned_sparks = 0;
 #endif
@@ -172,11 +222,11 @@ markSparkQueue( void )
       if (closure_SHOULD_SPARK(*sparkp)) {
        *to_sparkp = MarkRoot(*sparkp);
        to_sparkp++;
-#ifdef DEBUG
+#ifdef PAR
        n++;
 #endif
       } else {
-#ifdef DEBUG
+#ifdef PAR
        pruned_sparks++;
 #endif
       }
@@ -185,22 +235,24 @@ markSparkQueue( void )
     pool->hd = pool->base;
     pool->tl = to_sparkp;
 
+    PAR_TICKY_MARK_SPARK_QUEUE_END(n);
+    
 #if defined(SMP)
     IF_DEBUG(scheduler,
-            belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
+            debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
                   n, pruned_sparks, pthread_self()));
 #elif defined(PAR)
     IF_DEBUG(scheduler,
-            belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
+            debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
                   n, pruned_sparks, mytid));
 #else
     IF_DEBUG(scheduler,
-            belch("markSparkQueue: marked %d sparks and pruned %d sparks",
+            debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks",
                   n, pruned_sparks));
 #endif
 
     IF_DEBUG(scheduler,
-            belch("markSparkQueue:   new spark queue len=%d; (hd=%p; tl=%p)",
+            debugBelch("markSparkQueue:   new spark queue len=%d; (hd=%p; tl=%p)",
                   spark_queue_len(pool), pool->hd, pool->tl));
 
   }
@@ -269,7 +321,7 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
        if (!closure_SHOULD_SPARK(node)) 
          {
           IF_GRAN_DEBUG(checkSparkQ,
-                        belch("^^ pruning spark %p (node %p) in gimme_spark",
+                        debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
                               spark, node));
 
            if (RtsFlags.GranFlags.GranSimStats.Sparks)
@@ -309,7 +361,7 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
 # if defined(GRAN) && defined(GRAN_CHECK)
            /* Should never happen; just for testing 
            if (spark==pending_sparks_tl) {
-             fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n");
+             debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
                stg_exit(EXIT_FAILURE);
                } */
 # endif
@@ -347,7 +399,7 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
   
            /* Should never happen; just for testing 
            if (spark==pending_sparks_tl) {
-             fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n");
+             debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
                stg_exit(EXIT_FAILURE);
              break;
           } */                
@@ -355,7 +407,7 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
           spark = spark->next;
 
           IF_GRAN_DEBUG(pri,
-                        belch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
+                        debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
                               spark->gran_info, RtsFlags.GranFlags.SparkPriority, 
                               spark->node, spark->name);)
            }
@@ -418,9 +470,9 @@ activateSpark (rtsEvent *event, rtsSparkQ spark)
 
       globalGranStats.tot_low_pri_sparks++;
       IF_GRAN_DEBUG(pri,
-                   belch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
+                   debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
                          spark->gran_info, 
-                         spark->node, spark->name);)
+                         spark->node, spark->name));
     } 
     
     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
@@ -441,7 +493,7 @@ activateSpark (rtsEvent *event, rtsSparkQ spark)
                  FindWork,
                  (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
       barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
-      GarbageCollect(GetRoots);
+      GarbageCollect(GetRoots, rtsFalse);
       // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
       // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
       spark = NULL;
@@ -473,10 +525,10 @@ activateSpark (rtsEvent *event, rtsSparkQ spark)
    Granularity info transformers. 
    Applied to the GRAN_INFO field of a spark.
 */
-static inline nat  ID(nat x) { return(x); };
-static inline nat  INV(nat x) { return(-x); };
-static inline nat  IGNORE(nat x) { return (0); };
-static inline nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
+STATIC_INLINE nat  ID(nat x) { return(x); };
+STATIC_INLINE nat  INV(nat x) { return(-x); };
+STATIC_INLINE nat  IGNORE(nat x) { return (0); };
+STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
 
 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
 //@cindex newSpark
@@ -496,7 +548,7 @@ nat name, gran_info, size_info, par_info, local;
   if ( RtsFlags.GranFlags.SparkPriority!=0 && 
        pri<RtsFlags.GranFlags.SparkPriority ) {
     IF_GRAN_DEBUG(pri,
-      belch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
+      debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
              pri, RtsFlags.GranFlags.SparkPriority, node, name));
     return ((rtsSpark*)NULL);
   }
@@ -522,7 +574,7 @@ disposeSpark(spark)
 rtsSpark *spark;
 {
   ASSERT(spark!=NULL);
-  free(spark);
+  stgFree(spark);
 }
 
 //@cindex disposeSparkQ
@@ -537,12 +589,12 @@ rtsSparkQ spark;
 
 # ifdef GRAN_CHECK
   if (SparksAvail < 0) {
-    fprintf(stderr,"disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
+    debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
     print_spark(spark);
   }
 # endif
 
-  free(spark);
+  stgFree(spark);
 }
 
 /*
@@ -613,7 +665,7 @@ rtsSpark *spark;
   }
 
   IF_GRAN_DEBUG(checkSparkQ,
-               belch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
+               debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
                      spark, spark->node, CurrentProc);
                print_sparkq_stats());
 
@@ -624,7 +676,7 @@ rtsSpark *spark;
         prev = next, next = next->next) 
       {}
     if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
-      fprintf(stderr,"SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
+      debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
              spark,CurrentProc, 
              pending_sparks_tl, prev);
   }
@@ -650,7 +702,7 @@ rtsSpark *spark;
       }
     }
     if (!sorted) {
-      fprintf(stderr,"ghuH: SPARKQ on PE %d is not sorted:\n",
+      debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
              CurrentProc);
       print_sparkq(CurrentProc);
     }
@@ -677,7 +729,7 @@ PEs proc;
 #  if defined(GRAN_CHECK)
   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) 
     if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
-      fprintf(stderr,"ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
+      debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
              proc, pending_sparks_tls[proc], prev);
 #  endif
 
@@ -703,7 +755,7 @@ rtsBool dispose_too;
 
 #  if defined(GRAN_CHECK)
   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
-    fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
+    debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
            pending_sparks_hd, pending_sparks_tl,
            spark->prev, spark, spark->next, 
            (spark->next==NULL ? 0 : spark->next->prev));
@@ -728,7 +780,7 @@ rtsBool dispose_too;
   
 #  if defined(GRAN_CHECK)
   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
-    fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
+    debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
            pending_sparks_hd, pending_sparks_tl,
            spark->prev, spark, spark->next, 
            (spark->next==NULL ? 0 : spark->next->prev), spark);
@@ -758,7 +810,7 @@ markSparkQueue(void)
       sp->node = (StgClosure *)MarkRoot(sp->node);
     }
   IF_DEBUG(gc,
-          belch("@@ markSparkQueue: spark statistics at start of GC:");
+          debugBelch("@@ markSparkQueue: spark statistics at start of GC:");
           print_sparkq_stats());
 }
 
@@ -770,14 +822,14 @@ rtsSpark *spark;
   char str[16];
 
   if (spark==NULL) {
-    fprintf(stderr,"Spark: NIL\n");
+    debugBelch("Spark: NIL\n");
     return;
   } else {
     sprintf(str,
            ((spark->node==NULL) ? "______" : "%#6lx"), 
            stgCast(StgPtr,spark->node));
 
-    fprintf(stderr,"Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
+    debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
            str, spark->name, 
             ((spark->global)==rtsTrue?"True":"False"), spark->creator, 
             spark->prev, spark->next);
@@ -792,7 +844,7 @@ PEs proc;
 {
   rtsSpark *x = pending_sparks_hds[proc];
 
-  fprintf(stderr,"Spark Queue of PE %d with root at %p:\n", proc, x);
+  debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
   for (; x!=(rtsSpark*)NULL; x=x->next) {
     print_spark(x);
   }
@@ -807,10 +859,10 @@ print_sparkq_stats(void)
 {
   PEs p;
 
-  fprintf(stderr, "SparkQs: [");
+  debugBelch("SparkQs: [");
   for (p=0; p<RtsFlags.GranFlags.proc; p++)
-    fprintf(stderr, ", PE %d: %d", p, spark_queue_len(p));
-  fprintf(stderr, "\n");
+    debugBelch(", PE %d: %d", p, spark_queue_len(p));
+  debugBelch("\n");
 }
 
 #endif