6e638d4106990f708b291617d7ce830ead3ec180
[ghc-hetmet.git] / ghc / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2000
4  *
5  * Sparking support for PAR and SMP versions of the RTS.
6  *
7  * -------------------------------------------------------------------------*/
8
9 //@node Spark Management Routines, , ,
10 //@section Spark Management Routines
11
12 //@menu
13 //* Includes::                  
14 //* GUM code::                  
15 //* GranSim code::              
16 //@end menu
17 //*/
18
19 //@node Includes, GUM code, Spark Management Routines, Spark Management Routines
20 //@subsection Includes
21
22 #include "PosixSource.h"
23 #include "Rts.h"
24 #include "Schedule.h"
25 #include "SchedAPI.h"
26 #include "Storage.h"
27 #include "RtsFlags.h"
28 #include "RtsUtils.h"
29 #include "ParTicky.h"
30 # if defined(PAR)
31 # include "ParallelRts.h"
32 # include "GranSimRts.h"   // for GR_...
33 # elif defined(GRAN)
34 # include "GranSimRts.h"
35 # endif
36 #include "Sparks.h"
37
38 #if /*defined(SMP) ||*/ defined(PAR)
39
40 //@node GUM code, GranSim code, Includes, Spark Management Routines
41 //@subsection GUM code
42
43 static void slide_spark_pool( StgSparkPool *pool );
44
45 void
46 initSparkPools( void )
47 {
48   Capability *cap;
49   StgSparkPool *pool;
50
51 #ifdef SMP
52   /* walk over the capabilities, allocating a spark pool for each one */
53   for (cap = free_capabilities; cap != NULL; cap = cap->link) {
54 #else
55   /* allocate a single spark pool */
56   cap = &MainRegTable;
57   {
58 #endif
59     pool = &(cap->rSparks);
60     
61     pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
62                                      * sizeof(StgClosure *),
63                                      "initSparkPools");
64     pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
65     pool->hd  = pool->base;
66     pool->tl  = pool->base;
67   }
68 }
69
70 /* 
71    We traverse the spark pool until we find the 2nd usable (i.e. non-NF)
72    spark. Rationale, we don't want to give away the only work a PE has.
73    ToDo: introduce low- and high-water-marks for load balancing.
74 */
75 StgClosure *
76 findSpark( rtsBool for_export )
77 {
78   Capability *cap;
79   StgSparkPool *pool;
80   StgClosure *spark, *first=NULL;
81   rtsBool isIdlePE = EMPTY_RUN_QUEUE();
82
83 #ifdef SMP
84   /* walk over the capabilities, allocating a spark pool for each one */
85   for (cap = free_capabilities; cap != NULL; cap = cap->link) {
86 #else
87   /* allocate a single spark pool */
88   cap = &MainRegTable;
89   {
90 #endif
91     pool = &(cap->rSparks);
92     while (pool->hd < pool->tl) {
93       spark = *pool->hd++;
94       if (closure_SHOULD_SPARK(spark)) {
95         if (for_export && isIdlePE) {
96           if (first==NULL) {
97             first = spark; // keep the first usable spark if PE is idle
98           } else {
99             pool->hd--;    // found a second spark; keep it in the pool 
100             ASSERT(*pool->hd==spark);
101             if (RtsFlags.ParFlags.ParStats.Sparks) 
102               DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
103                                GR_STEALING, ((StgTSO *)NULL), first, 
104                                0, 0 /* spark_queue_len(ADVISORY_POOL) */);
105             return first;  // and return the *first* spark found
106           }
107         } else {
108           if (RtsFlags.ParFlags.ParStats.Sparks && for_export) 
109             DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
110                              GR_STEALING, ((StgTSO *)NULL), spark, 
111                              0, 0 /* spark_queue_len(ADVISORY_POOL) */);
112           return spark;    // return first spark found
113         }
114       }
115     }
116     slide_spark_pool(pool);
117   }
118   return NULL;
119 }
120
121 /* 
122    activateSpark is defined in Schedule.c
123 */
124 rtsBool
125 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
126 {
127   if (pool->tl == pool->lim)
128     slide_spark_pool(pool);
129
130   if (closure_SHOULD_SPARK(closure) && 
131       pool->tl < pool->lim) {
132     *(pool->tl++) = closure;
133
134 #if defined(PAR)
135     // collect parallel global statistics (currently done together with GC stats)
136     if (RtsFlags.ParFlags.ParStats.Global &&
137         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
138       // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime()); 
139       globalParStats.tot_sparks_created++;
140     }
141 #endif
142     return rtsTrue;
143   } else {
144 #if defined(PAR)
145     // collect parallel global statistics (currently done together with GC stats)
146     if (RtsFlags.ParFlags.ParStats.Global &&
147         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
148       //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime()); 
149       globalParStats.tot_sparks_ignored++;
150     }
151 #endif
152     return rtsFalse;
153   }
154 }
155
156 static void
157 slide_spark_pool( StgSparkPool *pool )
158 {
159   StgClosure **sparkp, **to_sparkp;
160
161   sparkp = pool->hd;
162   to_sparkp = pool->base;
163   while (sparkp < pool->tl) {
164     ASSERT(to_sparkp<=sparkp);
165     ASSERT(*sparkp!=NULL);
166     ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
167
168     if (closure_SHOULD_SPARK(*sparkp)) {
169       *to_sparkp++ = *sparkp++;
170     } else {
171       sparkp++;
172     }
173   }
174   pool->hd = pool->base;
175   pool->tl = to_sparkp;
176 }
177
178 nat
179 spark_queue_len( StgSparkPool *pool ) 
180 {
181   return (nat) (pool->tl - pool->hd);
182 }
183
184 /* Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
185    implicit slide i.e. after marking all sparks are at the beginning of the
186    spark pool and the spark pool only contains sparkable closures 
187 */
188 void
189 markSparkQueue( void )
190
191   StgClosure **sparkp, **to_sparkp;
192   nat n, pruned_sparks; // stats only
193   StgSparkPool *pool;
194   Capability *cap;
195
196   PAR_TICKY_MARK_SPARK_QUEUE_START();
197
198 #ifdef SMP
199   /* walk over the capabilities, allocating a spark pool for each one */
200   for (cap = free_capabilities; cap != NULL; cap = cap->link) {
201 #else
202   /* allocate a single spark pool */
203   cap = &MainRegTable;
204   {
205 #endif
206     pool = &(cap->rSparks);
207
208 #if defined(PAR)
209     // stats only
210     n = 0;
211     pruned_sparks = 0;
212 #endif
213
214     sparkp = pool->hd;
215     to_sparkp = pool->base;
216     while (sparkp < pool->tl) {
217       ASSERT(to_sparkp<=sparkp);
218       ASSERT(*sparkp!=NULL);
219       ASSERT(LOOKS_LIKE_GHC_INFO(((StgClosure *)*sparkp)->header.info));
220       // ToDo?: statistics gathering here (also for GUM!)
221       if (closure_SHOULD_SPARK(*sparkp)) {
222         *to_sparkp = MarkRoot(*sparkp);
223         to_sparkp++;
224 #ifdef PAR
225         n++;
226 #endif
227       } else {
228 #ifdef PAR
229         pruned_sparks++;
230 #endif
231       }
232       sparkp++;
233     }
234     pool->hd = pool->base;
235     pool->tl = to_sparkp;
236
237     PAR_TICKY_MARK_SPARK_QUEUE_END(n);
238     
239 #if defined(SMP)
240     IF_DEBUG(scheduler,
241              debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
242                    n, pruned_sparks, pthread_self()));
243 #elif defined(PAR)
244     IF_DEBUG(scheduler,
245              debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
246                    n, pruned_sparks, mytid));
247 #else
248     IF_DEBUG(scheduler,
249              debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks",
250                    n, pruned_sparks));
251 #endif
252
253     IF_DEBUG(scheduler,
254              debugBelch("markSparkQueue:   new spark queue len=%d; (hd=%p; tl=%p)",
255                    spark_queue_len(pool), pool->hd, pool->tl));
256
257   }
258 }
259
260 void
261 disposeSpark(spark)
262 StgClosure *spark;
263 {
264 #if !defined(SMP)
265   Capability *cap;
266   StgSparkPool *pool;
267
268   cap = &MainRegTable;
269   pool = &(cap->rSparks);
270   ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
271 #endif
272   ASSERT(spark != (StgClosure *)NULL);
273   /* Do nothing */
274 }
275
276
277 #elif defined(GRAN)
278
279 //@node GranSim code,  , GUM code, Spark Management Routines
280 //@subsection GranSim code
281
282 //@menu
283 //* Basic interface to sparkq::  
284 //* Aux fcts::                  
285 //@end menu
286
287 //@node Basic interface to sparkq, Aux fcts, GranSim code, GranSim code
288 //@subsubsection Basic interface to sparkq
289 /* 
290    Search the spark queue of the proc in event for a spark that's worth
291    turning into a thread 
292    (was gimme_spark in the old RTS)
293 */
294 //@cindex findLocalSpark
295 void
296 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
297 {
298    PEs proc = event->proc,       /* proc to search for work */
299        creator = event->creator; /* proc that requested work */
300    StgClosure* node;
301    rtsBool found;
302    rtsSparkQ spark_of_non_local_node = NULL, 
303              spark_of_non_local_node_prev = NULL, 
304              low_priority_spark = NULL, 
305              low_priority_spark_prev = NULL,
306              spark = NULL, prev = NULL;
307   
308    /* Choose a spark from the local spark queue */
309    prev = (rtsSpark*)NULL;
310    spark = pending_sparks_hds[proc];
311    found = rtsFalse;
312
313    // ToDo: check this code & implement local sparking !! -- HWL  
314    while (!found && spark != (rtsSpark*)NULL)
315      {
316        ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
317               (prev==(rtsSpark*)NULL || prev->next==spark) &&
318               (spark->prev==prev));
319        node = spark->node;
320        if (!closure_SHOULD_SPARK(node)) 
321          {
322            IF_GRAN_DEBUG(checkSparkQ,
323                          debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
324                                spark, node));
325
326            if (RtsFlags.GranFlags.GranSimStats.Sparks)
327              DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
328                               spark->node, spark->name, spark_queue_len(proc));
329   
330            ASSERT(spark != (rtsSpark*)NULL);
331            ASSERT(SparksAvail>0);
332            --SparksAvail;
333
334            ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
335            spark = delete_from_sparkq (spark, proc, rtsTrue);
336            if (spark != (rtsSpark*)NULL)
337              prev = spark->prev;
338            continue;
339          }
340        /* -- node should eventually be sparked */
341        else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes && 
342                !IS_LOCAL_TO(PROCS(node),CurrentProc)) 
343          {
344            barf("Local sparking not yet implemented");
345
346            /* Remember first low priority spark */
347            if (spark_of_non_local_node==(rtsSpark*)NULL) {
348              spark_of_non_local_node_prev = prev;
349              spark_of_non_local_node = spark;
350               }
351   
352            if (spark->next == (rtsSpark*)NULL) { 
353              /* ASSERT(spark==SparkQueueTl);  just for testing */
354              prev = spark_of_non_local_node_prev;
355              spark = spark_of_non_local_node;
356              found = rtsTrue;
357              break;
358            }
359   
360 # if defined(GRAN) && defined(GRAN_CHECK)
361            /* Should never happen; just for testing 
362            if (spark==pending_sparks_tl) {
363              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
364                 stg_exit(EXIT_FAILURE);
365                 } */
366 # endif
367            prev = spark; 
368            spark = spark->next;
369            ASSERT(SparksAvail>0);
370            --SparksAvail;
371            continue;
372          }
373        else if ( RtsFlags.GranFlags.DoPrioritySparking || 
374                  (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
375          {
376            if (RtsFlags.GranFlags.DoPrioritySparking)
377              barf("Priority sparking not yet implemented");
378
379            found = rtsTrue;
380          }
381 #if 0      
382        else /* only used if SparkPriority2 is defined */
383          {
384            /* ToDo: fix the code below and re-integrate it */
385            /* Remember first low priority spark */
386            if (low_priority_spark==(rtsSpark*)NULL) { 
387              low_priority_spark_prev = prev;
388              low_priority_spark = spark;
389            }
390   
391            if (spark->next == (rtsSpark*)NULL) { 
392                 /* ASSERT(spark==spark_queue_tl);  just for testing */
393              prev = low_priority_spark_prev;
394              spark = low_priority_spark;
395              found = rtsTrue;       /* take low pri spark => rc is 2  */
396              break;
397            }
398   
399            /* Should never happen; just for testing 
400            if (spark==pending_sparks_tl) {
401              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
402                 stg_exit(EXIT_FAILURE);
403              break;
404            } */                
405            prev = spark; 
406            spark = spark->next;
407
408            IF_GRAN_DEBUG(pri,
409                          debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
410                                spark->gran_info, RtsFlags.GranFlags.SparkPriority, 
411                                spark->node, spark->name);)
412            }
413 #endif
414    }  /* while (spark!=NULL && !found) */
415
416    *spark_res = spark;
417    *found_res = found;
418 }
419
420 /*
421   Turn the spark into a thread.
422   In GranSim this basically means scheduling a StartThread event for the
423   node pointed to by the spark at some point in the future.
424   (was munch_spark in the old RTS)
425 */
426 //@cindex activateSpark
427 rtsBool
428 activateSpark (rtsEvent *event, rtsSparkQ spark) 
429 {
430   PEs proc = event->proc,       /* proc to search for work */
431       creator = event->creator; /* proc that requested work */
432   StgTSO* tso;
433   StgClosure* node;
434   rtsTime spark_arrival_time;
435
436   /* 
437      We've found a node on PE proc requested by PE creator.
438      If proc==creator we can turn the spark into a thread immediately;
439      otherwise we schedule a MoveSpark event on the requesting PE
440   */
441      
442   /* DaH Qu' yIchen */
443   if (proc!=creator) { 
444
445     /* only possible if we simulate GUM style fishing */
446     ASSERT(RtsFlags.GranFlags.Fishing);
447
448     /* Message packing costs for sending a Fish; qeq jabbI'ID */
449     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
450   
451     if (RtsFlags.GranFlags.GranSimStats.Sparks)
452       DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
453                        (StgTSO*)NULL, spark->node,
454                        spark->name, spark_queue_len(proc));
455
456     /* time of the spark arrival on the remote PE */
457     spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
458
459     new_event(creator, proc, spark_arrival_time,
460               MoveSpark,
461               (StgTSO*)NULL, spark->node, spark);
462
463     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
464             
465   } else { /* proc==creator i.e. turn the spark into a thread */
466
467     if ( RtsFlags.GranFlags.GranSimStats.Global && 
468          spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
469
470       globalGranStats.tot_low_pri_sparks++;
471       IF_GRAN_DEBUG(pri,
472                     debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
473                           spark->gran_info, 
474                           spark->node, spark->name));
475     } 
476     
477     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
478     
479     node = spark->node;
480     
481 # if 0
482     /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
483     if (GARBAGE COLLECTION IS NECESSARY) {
484       /* Some kind of backoff needed here in case there's too little heap */
485 #  if defined(GRAN_CHECK) && defined(GRAN)
486       if (RtsFlags.GcFlags.giveStats)
487         fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p;  name=%u\n", 
488                 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
489                 spark, node, spark->name);
490 #  endif
491       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
492                   FindWork,
493                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
494       barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
495       GarbageCollect(GetRoots, rtsFalse);
496       // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
497       // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
498       spark = NULL;
499       return; /* was: continue; */ /* to the next event, eventually */
500     }
501 # endif
502     
503     if (RtsFlags.GranFlags.GranSimStats.Sparks)
504       DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
505                        spark->node, spark->name,
506                        spark_queue_len(CurrentProc));
507     
508     new_event(proc, proc, CurrentTime[proc],
509               StartThread, 
510               END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
511     
512     procStatus[proc] = Starting;
513   }
514 }
515
516 /* -------------------------------------------------------------------------
517    This is the main point where handling granularity information comes into
518    play. 
519    ------------------------------------------------------------------------- */
520
521 #define MAX_RAND_PRI    100
522
523 /* 
524    Granularity info transformers. 
525    Applied to the GRAN_INFO field of a spark.
526 */
527 STATIC_INLINE nat  ID(nat x) { return(x); };
528 STATIC_INLINE nat  INV(nat x) { return(-x); };
529 STATIC_INLINE nat  IGNORE(nat x) { return (0); };
530 STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
531
532 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
533 //@cindex newSpark
534 rtsSpark *
535 newSpark(node,name,gran_info,size_info,par_info,local)
536 StgClosure *node;
537 nat name, gran_info, size_info, par_info, local;
538 {
539   nat pri;
540   rtsSpark *newspark;
541
542   pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
543         RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
544         RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
545                            ID(gran_info);
546
547   if ( RtsFlags.GranFlags.SparkPriority!=0 && 
548        pri<RtsFlags.GranFlags.SparkPriority ) {
549     IF_GRAN_DEBUG(pri,
550       debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
551               pri, RtsFlags.GranFlags.SparkPriority, node, name));
552     return ((rtsSpark*)NULL);
553   }
554
555   newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
556   newspark->prev = newspark->next = (rtsSpark*)NULL;
557   newspark->node = node;
558   newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
559   newspark->gran_info = pri;
560   newspark->global = !local;      /* Check that with parAt, parAtAbs !!*/
561
562   if (RtsFlags.GranFlags.GranSimStats.Global) {
563     globalGranStats.tot_sparks_created++;
564     globalGranStats.sparks_created_on_PE[CurrentProc]++;
565   }
566
567   return(newspark);
568 }
569
570 //@cindex disposeSpark
571 void
572 disposeSpark(spark)
573 rtsSpark *spark;
574 {
575   ASSERT(spark!=NULL);
576   stgFree(spark);
577 }
578
579 //@cindex disposeSparkQ
580 void 
581 disposeSparkQ(spark)
582 rtsSparkQ spark;
583 {
584   if (spark==NULL) 
585     return;
586
587   disposeSparkQ(spark->next);
588
589 # ifdef GRAN_CHECK
590   if (SparksAvail < 0) {
591     debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
592     print_spark(spark);
593   }
594 # endif
595
596   stgFree(spark);
597 }
598
599 /*
600    With PrioritySparking add_to_spark_queue performs an insert sort to keep
601    the spark queue sorted. Otherwise the spark is just added to the end of
602    the queue. 
603 */
604
605 //@cindex add_to_spark_queue
606 void
607 add_to_spark_queue(spark)
608 rtsSpark *spark;
609 {
610   rtsSpark *prev = NULL, *next = NULL;
611   nat count = 0;
612   rtsBool found = rtsFalse;
613
614   if ( spark == (rtsSpark *)NULL ) {
615     return;
616   }
617
618   if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
619     /* Priority sparking is enabled i.e. spark queues must be sorted */
620
621     for (prev = NULL, next = pending_sparks_hd, count=0;
622          (next != NULL) && 
623          !(found = (spark->gran_info >= next->gran_info));
624          prev = next, next = next->next, count++) 
625      {}
626
627   } else {   /* 'utQo' */
628     /* Priority sparking is disabled */
629     
630     found = rtsFalse;   /* to add it at the end */
631
632   }
633
634   if (found) {
635     /* next points to the first spark with a gran_info smaller than that
636        of spark; therefore, add spark before next into the spark queue */
637     spark->next = next;
638     if ( next == NULL ) {
639       pending_sparks_tl = spark;
640     } else {
641       next->prev = spark;
642     }
643     spark->prev = prev;
644     if ( prev == NULL ) {
645       pending_sparks_hd = spark;
646     } else {
647       prev->next = spark;
648     }
649   } else {  /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
650     /* add the spark at the end of the spark queue */
651     spark->next = NULL;                        
652     spark->prev = pending_sparks_tl;
653     if (pending_sparks_hd == NULL)
654       pending_sparks_hd = spark;
655     else
656       pending_sparks_tl->next = spark;
657     pending_sparks_tl = spark;    
658   } 
659   ++SparksAvail;
660
661   /* add costs for search in priority sparking */
662   if (RtsFlags.GranFlags.DoPrioritySparking) {
663     CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
664   }
665
666   IF_GRAN_DEBUG(checkSparkQ,
667                 debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
668                       spark, spark->node, CurrentProc);
669                 print_sparkq_stats());
670
671 #  if defined(GRAN_CHECK)
672   if (RtsFlags.GranFlags.Debug.checkSparkQ) {
673     for (prev = NULL, next =  pending_sparks_hd;
674          (next != NULL);
675          prev = next, next = next->next) 
676       {}
677     if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
678       debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
679               spark,CurrentProc, 
680               pending_sparks_tl, prev);
681   }
682 #  endif
683
684 #  if defined(GRAN_CHECK)
685   /* Check if the sparkq is still sorted. Just for testing, really!  */
686   if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
687        RtsFlags.GranFlags.Debug.pri ) {
688     rtsBool sorted = rtsTrue;
689     rtsSpark *prev, *next;
690
691     if (pending_sparks_hd == NULL ||
692         pending_sparks_hd->next == NULL ) {
693       /* just 1 elem => ok */
694     } else {
695       for (prev = pending_sparks_hd,
696            next = pending_sparks_hd->next;
697            (next != NULL) ;
698            prev = next, next = next->next) {
699         sorted = sorted && 
700                  (prev->gran_info >= next->gran_info);
701       }
702     }
703     if (!sorted) {
704       debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
705               CurrentProc);
706       print_sparkq(CurrentProc);
707     }
708   }
709 #  endif
710 }
711
712 //@node Aux fcts,  , Basic interface to sparkq, GranSim code
713 //@subsubsection Aux fcts
714
715 //@cindex spark_queue_len
716 nat
717 spark_queue_len(proc) 
718 PEs proc;
719 {
720  rtsSpark *prev, *spark;                     /* prev only for testing !! */
721  nat len;
722
723  for (len = 0, prev = NULL, spark = pending_sparks_hds[proc]; 
724       spark != NULL; 
725       len++, prev = spark, spark = spark->next)
726    {}
727
728 #  if defined(GRAN_CHECK)
729   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) 
730     if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
731       debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
732               proc, pending_sparks_tls[proc], prev);
733 #  endif
734
735  return (len);
736 }
737
738 /* 
739    Take spark out of the spark queue on PE p and nuke the spark. Adjusts
740    hd and tl pointers of the spark queue. Returns a pointer to the next
741    spark in the queue.
742 */
743 //@cindex delete_from_sparkq
744 rtsSpark *
745 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
746 rtsSpark *spark;
747 PEs p;
748 rtsBool dispose_too;
749 {
750   rtsSpark *new_spark;
751
752   if (spark==NULL) 
753     barf("delete_from_sparkq: trying to delete NULL spark\n");
754
755 #  if defined(GRAN_CHECK)
756   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
757     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
758             pending_sparks_hd, pending_sparks_tl,
759             spark->prev, spark, spark->next, 
760             (spark->next==NULL ? 0 : spark->next->prev));
761   }
762 #  endif
763
764   if (spark->prev==NULL) {
765     /* spark is first spark of queue => adjust hd pointer */
766     ASSERT(pending_sparks_hds[p]==spark);
767     pending_sparks_hds[p] = spark->next;
768   } else {
769     spark->prev->next = spark->next;
770   }
771   if (spark->next==NULL) {
772     ASSERT(pending_sparks_tls[p]==spark);
773     /* spark is first spark of queue => adjust tl pointer */
774     pending_sparks_tls[p] = spark->prev;
775   } else {
776     spark->next->prev = spark->prev;
777   }
778   new_spark = spark->next;
779   
780 #  if defined(GRAN_CHECK)
781   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
782     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
783             pending_sparks_hd, pending_sparks_tl,
784             spark->prev, spark, spark->next, 
785             (spark->next==NULL ? 0 : spark->next->prev), spark);
786   }
787 #  endif
788
789   if (dispose_too)
790     disposeSpark(spark);
791                   
792   return new_spark;
793 }
794
795 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
796 //@cindex markSparkQueue
797 void
798 markSparkQueue(void)
799
800   StgClosure *MarkRoot(StgClosure *root); // prototype
801   PEs p;
802   rtsSpark *sp;
803
804   for (p=0; p<RtsFlags.GranFlags.proc; p++)
805     for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
806       ASSERT(sp->node!=NULL);
807       ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
808       // ToDo?: statistics gathering here (also for GUM!)
809       sp->node = (StgClosure *)MarkRoot(sp->node);
810     }
811   IF_DEBUG(gc,
812            debugBelch("@@ markSparkQueue: spark statistics at start of GC:");
813            print_sparkq_stats());
814 }
815
816 //@cindex print_spark
817 void
818 print_spark(spark)
819 rtsSpark *spark;
820
821   char str[16];
822
823   if (spark==NULL) {
824     debugBelch("Spark: NIL\n");
825     return;
826   } else {
827     sprintf(str,
828             ((spark->node==NULL) ? "______" : "%#6lx"), 
829             stgCast(StgPtr,spark->node));
830
831     debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
832             str, spark->name, 
833             ((spark->global)==rtsTrue?"True":"False"), spark->creator, 
834             spark->prev, spark->next);
835   }
836 }
837
838 //@cindex print_sparkq
839 void
840 print_sparkq(proc)
841 PEs proc;
842 // rtsSpark *hd;
843 {
844   rtsSpark *x = pending_sparks_hds[proc];
845
846   debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
847   for (; x!=(rtsSpark*)NULL; x=x->next) {
848     print_spark(x);
849   }
850 }
851
852 /* 
853    Print a statistics of all spark queues.
854 */
855 //@cindex print_sparkq_stats
856 void
857 print_sparkq_stats(void)
858 {
859   PEs p;
860
861   debugBelch("SparkQs: [");
862   for (p=0; p<RtsFlags.GranFlags.proc; p++)
863     debugBelch(", PE %d: %d", p, spark_queue_len(p));
864   debugBelch("\n");
865 }
866
867 #endif