[project @ 2001-03-22 03:51:08 by hwloidl]
[ghc-hetmet.git] / ghc / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Sparks.c,v 1.3 2001/03/22 03:51:10 hwloidl Exp $
3  *
4  * (c) The GHC Team, 2000
5  *
6  * Sparking support for PAR and SMP versions of the RTS.
7  *
8  * -------------------------------------------------------------------------*/
9
10 //@node Spark Management Routines, , ,
11 //@section Spark Management Routines
12
13 //@menu
14 //* Includes::                  
15 //* GUM code::                  
16 //* GranSim code::              
17 //@end menu
18 //*/
19
20 //@node Includes, GUM code, Spark Management Routines, Spark Management Routines
21 //@subsection Includes
22
23 #include "Rts.h"
24 #include "Schedule.h"
25 #include "SchedAPI.h"
26 #include "Storage.h"
27 #include "RtsFlags.h"
28 #include "RtsUtils.h"
29 #include "ParTicky.h"
30 # if defined(PAR)
31 # include "ParallelRts.h"
32 # include "GranSimRts.h"   // for GR_...
33 # elif defined(GRAN)
34 # include "GranSimRts.h"
35 # endif
36 #include "Sparks.h"
37
38 #if defined(SMP) || defined(PAR)
39
40 //@node GUM code, GranSim code, Includes, Spark Management Routines
41 //@subsection GUM code
42
43 static void slide_spark_pool( StgSparkPool *pool );
44
45 rtsBool
46 initSparkPools( void )
47 {
48   Capability *cap;
49   StgSparkPool *pool;
50
51 #ifdef SMP
52   /* walk over the capabilities, allocating a spark pool for each one */
53   for (cap = free_capabilities; cap != NULL; cap = cap->link) {
54 #else
55   /* allocate a single spark pool */
56   cap = &MainRegTable;
57   {
58 #endif
59     pool = &(cap->rSparks);
60     
61     pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
62                                      * sizeof(StgClosure *),
63                                      "initSparkPools");
64     pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
65     pool->hd  = pool->base;
66     pool->tl  = pool->base;
67   }
68   return rtsTrue; /* Qapla' */
69 }
70
71 /* 
72    We traverse the spark pool until we find the 2nd usable (i.e. non-NF)
73    spark. Rationale, we don't want to give away the only work a PE has.
74    ToDo: introduce low- and high-water-marks for load balancing.
75 */
76 StgClosure *
77 findSpark( rtsBool for_export )
78 {
79   Capability *cap;
80   StgSparkPool *pool;
81   StgClosure *spark, *first=NULL;
82   rtsBool isIdlePE = EMPTY_RUN_QUEUE();
83
84 #ifdef SMP
85   /* walk over the capabilities, allocating a spark pool for each one */
86   for (cap = free_capabilities; cap != NULL; cap = cap->link) {
87 #else
88   /* allocate a single spark pool */
89   cap = &MainRegTable;
90   {
91 #endif
92     pool = &(cap->rSparks);
93     while (pool->hd < pool->tl) {
94       spark = *pool->hd++;
95       if (closure_SHOULD_SPARK(spark)) {
96         if (for_export && isIdlePE) {
97           if (first==NULL) {
98             first = spark; // keep the first usable spark if PE is idle
99           } else {
100             pool->hd--;    // found a second spark; keep it in the pool 
101             ASSERT(*pool->hd==spark);
102             if (RtsFlags.ParFlags.ParStats.Sparks) 
103               DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
104                                GR_STEALING, ((StgTSO *)NULL), first, 
105                                0, 0 /* spark_queue_len(ADVISORY_POOL) */);
106             return first;  // and return the *first* spark found
107           }
108         } else {
109           if (RtsFlags.ParFlags.ParStats.Sparks && for_export) 
110             DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
111                              GR_STEALING, ((StgTSO *)NULL), spark, 
112                              0, 0 /* spark_queue_len(ADVISORY_POOL) */);
113           return spark;    // return first spark found
114         }
115       }
116     }
117     slide_spark_pool(pool);
118   }
119   return NULL;
120 }
121
122 /* 
123    activateSpark is defined in Schedule.c
124 */
125 rtsBool
126 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
127 {
128   if (pool->tl == pool->lim)
129     slide_spark_pool(pool);
130
131   if (closure_SHOULD_SPARK(closure) && 
132       pool->tl < pool->lim) {
133     *(pool->tl++) = closure;
134
135 #if defined(PAR)
136     // collect parallel global statistics (currently done together with GC stats)
137     if (RtsFlags.ParFlags.ParStats.Global &&
138         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
139       // fprintf(stderr, "Creating spark for %x @ %11.2f\n", closure, usertime()); 
140       globalParStats.tot_sparks_created++;
141     }
142 #endif
143     return rtsTrue;
144   } else {
145 #if defined(PAR)
146     // collect parallel global statistics (currently done together with GC stats)
147     if (RtsFlags.ParFlags.ParStats.Global &&
148         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
149       //fprintf(stderr, "Ignoring spark for %x @ %11.2f\n", closure, usertime()); 
150       globalParStats.tot_sparks_ignored++;
151     }
152 #endif
153     return rtsFalse;
154   }
155 }
156
157 static void
158 slide_spark_pool( StgSparkPool *pool )
159 {
160   StgClosure **sparkp, **to_sparkp;
161
162   sparkp = pool->hd;
163   to_sparkp = pool->base;
164   while (sparkp < pool->tl) {
165     ASSERT(to_sparkp<=sparkp);
166     ASSERT(*sparkp!=NULL);
167     ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
168
169     if (closure_SHOULD_SPARK(*sparkp)) {
170       *to_sparkp++ = *sparkp++;
171     } else {
172       sparkp++;
173     }
174   }
175   pool->hd = pool->base;
176   pool->tl = to_sparkp;
177 }
178
179 nat
180 spark_queue_len( StgSparkPool *pool ) 
181 {
182   return (nat) (pool->tl - pool->hd);
183 }
184
185 /* Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
186    implicit slide i.e. after marking all sparks are at the beginning of the
187    spark pool and the spark pool only contains sparkable closures 
188 */
189 void
190 markSparkQueue( void )
191
192   StgClosure **sparkp, **to_sparkp;
193   nat n, pruned_sparks; // stats only
194   StgSparkPool *pool;
195   Capability *cap;
196
197   PAR_TICKY_MARK_SPARK_QUEUE_START();
198
199 #ifdef SMP
200   /* walk over the capabilities, allocating a spark pool for each one */
201   for (cap = free_capabilities; cap != NULL; cap = cap->link) {
202 #else
203   /* allocate a single spark pool */
204   cap = &MainRegTable;
205   {
206 #endif
207     pool = &(cap->rSparks);
208
209 #if defined(PAR)
210     // stats only
211     n = 0;
212     pruned_sparks = 0;
213 #endif
214
215     sparkp = pool->hd;
216     to_sparkp = pool->base;
217     while (sparkp < pool->tl) {
218       ASSERT(to_sparkp<=sparkp);
219       ASSERT(*sparkp!=NULL);
220       ASSERT(LOOKS_LIKE_GHC_INFO(((StgClosure *)*sparkp)->header.info));
221       // ToDo?: statistics gathering here (also for GUM!)
222       if (closure_SHOULD_SPARK(*sparkp)) {
223         *to_sparkp = MarkRoot(*sparkp);
224         to_sparkp++;
225 #ifdef PAR
226         n++;
227 #endif
228       } else {
229 #ifdef PAR
230         pruned_sparks++;
231 #endif
232       }
233       sparkp++;
234     }
235     pool->hd = pool->base;
236     pool->tl = to_sparkp;
237
238     PAR_TICKY_MARK_SPARK_QUEUE_END(n);
239     
240 #if defined(SMP)
241     IF_DEBUG(scheduler,
242              belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
243                    n, pruned_sparks, pthread_self()));
244 #elif defined(PAR)
245     IF_DEBUG(scheduler,
246              belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
247                    n, pruned_sparks, mytid));
248 #else
249     IF_DEBUG(scheduler,
250              belch("markSparkQueue: marked %d sparks and pruned %d sparks",
251                    n, pruned_sparks));
252 #endif
253
254     IF_DEBUG(scheduler,
255              belch("markSparkQueue:   new spark queue len=%d; (hd=%p; tl=%p)",
256                    spark_queue_len(pool), pool->hd, pool->tl));
257
258   }
259 }
260
261 void
262 disposeSpark(spark)
263 StgClosure *spark;
264 {
265 #if !defined(SMP)
266   Capability *cap;
267   StgSparkPool *pool;
268
269   cap = &MainRegTable;
270   pool = &(cap->rSparks);
271   ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
272 #endif
273   ASSERT(spark != (StgClosure *)NULL);
274   /* Do nothing */
275 }
276
277
278 #elif defined(GRAN)
279
280 //@node GranSim code,  , GUM code, Spark Management Routines
281 //@subsection GranSim code
282
283 //@menu
284 //* Basic interface to sparkq::  
285 //* Aux fcts::                  
286 //@end menu
287
288 //@node Basic interface to sparkq, Aux fcts, GranSim code, GranSim code
289 //@subsubsection Basic interface to sparkq
290 /* 
291    Search the spark queue of the proc in event for a spark that's worth
292    turning into a thread 
293    (was gimme_spark in the old RTS)
294 */
295 //@cindex findLocalSpark
296 void
297 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
298 {
299    PEs proc = event->proc,       /* proc to search for work */
300        creator = event->creator; /* proc that requested work */
301    StgClosure* node;
302    rtsBool found;
303    rtsSparkQ spark_of_non_local_node = NULL, 
304              spark_of_non_local_node_prev = NULL, 
305              low_priority_spark = NULL, 
306              low_priority_spark_prev = NULL,
307              spark = NULL, prev = NULL;
308   
309    /* Choose a spark from the local spark queue */
310    prev = (rtsSpark*)NULL;
311    spark = pending_sparks_hds[proc];
312    found = rtsFalse;
313
314    // ToDo: check this code & implement local sparking !! -- HWL  
315    while (!found && spark != (rtsSpark*)NULL)
316      {
317        ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
318               (prev==(rtsSpark*)NULL || prev->next==spark) &&
319               (spark->prev==prev));
320        node = spark->node;
321        if (!closure_SHOULD_SPARK(node)) 
322          {
323            IF_GRAN_DEBUG(checkSparkQ,
324                          belch("^^ pruning spark %p (node %p) in gimme_spark",
325                                spark, node));
326
327            if (RtsFlags.GranFlags.GranSimStats.Sparks)
328              DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
329                               spark->node, spark->name, spark_queue_len(proc));
330   
331            ASSERT(spark != (rtsSpark*)NULL);
332            ASSERT(SparksAvail>0);
333            --SparksAvail;
334
335            ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
336            spark = delete_from_sparkq (spark, proc, rtsTrue);
337            if (spark != (rtsSpark*)NULL)
338              prev = spark->prev;
339            continue;
340          }
341        /* -- node should eventually be sparked */
342        else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes && 
343                !IS_LOCAL_TO(PROCS(node),CurrentProc)) 
344          {
345            barf("Local sparking not yet implemented");
346
347            /* Remember first low priority spark */
348            if (spark_of_non_local_node==(rtsSpark*)NULL) {
349              spark_of_non_local_node_prev = prev;
350              spark_of_non_local_node = spark;
351               }
352   
353            if (spark->next == (rtsSpark*)NULL) { 
354              /* ASSERT(spark==SparkQueueTl);  just for testing */
355              prev = spark_of_non_local_node_prev;
356              spark = spark_of_non_local_node;
357              found = rtsTrue;
358              break;
359            }
360   
361 # if defined(GRAN) && defined(GRAN_CHECK)
362            /* Should never happen; just for testing 
363            if (spark==pending_sparks_tl) {
364              fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n");
365                 stg_exit(EXIT_FAILURE);
366                 } */
367 # endif
368            prev = spark; 
369            spark = spark->next;
370            ASSERT(SparksAvail>0);
371            --SparksAvail;
372            continue;
373          }
374        else if ( RtsFlags.GranFlags.DoPrioritySparking || 
375                  (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
376          {
377            if (RtsFlags.GranFlags.DoPrioritySparking)
378              barf("Priority sparking not yet implemented");
379
380            found = rtsTrue;
381          }
382 #if 0      
383        else /* only used if SparkPriority2 is defined */
384          {
385            /* ToDo: fix the code below and re-integrate it */
386            /* Remember first low priority spark */
387            if (low_priority_spark==(rtsSpark*)NULL) { 
388              low_priority_spark_prev = prev;
389              low_priority_spark = spark;
390            }
391   
392            if (spark->next == (rtsSpark*)NULL) { 
393                 /* ASSERT(spark==spark_queue_tl);  just for testing */
394              prev = low_priority_spark_prev;
395              spark = low_priority_spark;
396              found = rtsTrue;       /* take low pri spark => rc is 2  */
397              break;
398            }
399   
400            /* Should never happen; just for testing 
401            if (spark==pending_sparks_tl) {
402              fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n");
403                 stg_exit(EXIT_FAILURE);
404              break;
405            } */                
406            prev = spark; 
407            spark = spark->next;
408
409            IF_GRAN_DEBUG(pri,
410                          belch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
411                                spark->gran_info, RtsFlags.GranFlags.SparkPriority, 
412                                spark->node, spark->name);)
413            }
414 #endif
415    }  /* while (spark!=NULL && !found) */
416
417    *spark_res = spark;
418    *found_res = found;
419 }
420
421 /*
422   Turn the spark into a thread.
423   In GranSim this basically means scheduling a StartThread event for the
424   node pointed to by the spark at some point in the future.
425   (was munch_spark in the old RTS)
426 */
427 //@cindex activateSpark
428 rtsBool
429 activateSpark (rtsEvent *event, rtsSparkQ spark) 
430 {
431   PEs proc = event->proc,       /* proc to search for work */
432       creator = event->creator; /* proc that requested work */
433   StgTSO* tso;
434   StgClosure* node;
435   rtsTime spark_arrival_time;
436
437   /* 
438      We've found a node on PE proc requested by PE creator.
439      If proc==creator we can turn the spark into a thread immediately;
440      otherwise we schedule a MoveSpark event on the requesting PE
441   */
442      
443   /* DaH Qu' yIchen */
444   if (proc!=creator) { 
445
446     /* only possible if we simulate GUM style fishing */
447     ASSERT(RtsFlags.GranFlags.Fishing);
448
449     /* Message packing costs for sending a Fish; qeq jabbI'ID */
450     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
451   
452     if (RtsFlags.GranFlags.GranSimStats.Sparks)
453       DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
454                        (StgTSO*)NULL, spark->node,
455                        spark->name, spark_queue_len(proc));
456
457     /* time of the spark arrival on the remote PE */
458     spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
459
460     new_event(creator, proc, spark_arrival_time,
461               MoveSpark,
462               (StgTSO*)NULL, spark->node, spark);
463
464     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
465             
466   } else { /* proc==creator i.e. turn the spark into a thread */
467
468     if ( RtsFlags.GranFlags.GranSimStats.Global && 
469          spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
470
471       globalGranStats.tot_low_pri_sparks++;
472       IF_GRAN_DEBUG(pri,
473                     belch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
474                           spark->gran_info, 
475                           spark->node, spark->name));
476     } 
477     
478     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
479     
480     node = spark->node;
481     
482 # if 0
483     /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
484     if (GARBAGE COLLECTION IS NECESSARY) {
485       /* Some kind of backoff needed here in case there's too little heap */
486 #  if defined(GRAN_CHECK) && defined(GRAN)
487       if (RtsFlags.GcFlags.giveStats)
488         fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p;  name=%u\n", 
489                 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
490                 spark, node, spark->name);
491 #  endif
492       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
493                   FindWork,
494                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
495       barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
496       GarbageCollect(GetRoots, rtsFalse);
497       // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
498       // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
499       spark = NULL;
500       return; /* was: continue; */ /* to the next event, eventually */
501     }
502 # endif
503     
504     if (RtsFlags.GranFlags.GranSimStats.Sparks)
505       DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
506                        spark->node, spark->name,
507                        spark_queue_len(CurrentProc));
508     
509     new_event(proc, proc, CurrentTime[proc],
510               StartThread, 
511               END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
512     
513     procStatus[proc] = Starting;
514   }
515 }
516
517 /* -------------------------------------------------------------------------
518    This is the main point where handling granularity information comes into
519    play. 
520    ------------------------------------------------------------------------- */
521
522 #define MAX_RAND_PRI    100
523
524 /* 
525    Granularity info transformers. 
526    Applied to the GRAN_INFO field of a spark.
527 */
528 static inline nat  ID(nat x) { return(x); };
529 static inline nat  INV(nat x) { return(-x); };
530 static inline nat  IGNORE(nat x) { return (0); };
531 static inline nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
532
533 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
534 //@cindex newSpark
535 rtsSpark *
536 newSpark(node,name,gran_info,size_info,par_info,local)
537 StgClosure *node;
538 nat name, gran_info, size_info, par_info, local;
539 {
540   nat pri;
541   rtsSpark *newspark;
542
543   pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
544         RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
545         RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
546                            ID(gran_info);
547
548   if ( RtsFlags.GranFlags.SparkPriority!=0 && 
549        pri<RtsFlags.GranFlags.SparkPriority ) {
550     IF_GRAN_DEBUG(pri,
551       belch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
552               pri, RtsFlags.GranFlags.SparkPriority, node, name));
553     return ((rtsSpark*)NULL);
554   }
555
556   newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
557   newspark->prev = newspark->next = (rtsSpark*)NULL;
558   newspark->node = node;
559   newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
560   newspark->gran_info = pri;
561   newspark->global = !local;      /* Check that with parAt, parAtAbs !!*/
562
563   if (RtsFlags.GranFlags.GranSimStats.Global) {
564     globalGranStats.tot_sparks_created++;
565     globalGranStats.sparks_created_on_PE[CurrentProc]++;
566   }
567
568   return(newspark);
569 }
570
571 //@cindex disposeSpark
572 void
573 disposeSpark(spark)
574 rtsSpark *spark;
575 {
576   ASSERT(spark!=NULL);
577   free(spark);
578 }
579
580 //@cindex disposeSparkQ
581 void 
582 disposeSparkQ(spark)
583 rtsSparkQ spark;
584 {
585   if (spark==NULL) 
586     return;
587
588   disposeSparkQ(spark->next);
589
590 # ifdef GRAN_CHECK
591   if (SparksAvail < 0) {
592     fprintf(stderr,"disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
593     print_spark(spark);
594   }
595 # endif
596
597   free(spark);
598 }
599
600 /*
601    With PrioritySparking add_to_spark_queue performs an insert sort to keep
602    the spark queue sorted. Otherwise the spark is just added to the end of
603    the queue. 
604 */
605
606 //@cindex add_to_spark_queue
607 void
608 add_to_spark_queue(spark)
609 rtsSpark *spark;
610 {
611   rtsSpark *prev = NULL, *next = NULL;
612   nat count = 0;
613   rtsBool found = rtsFalse;
614
615   if ( spark == (rtsSpark *)NULL ) {
616     return;
617   }
618
619   if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
620     /* Priority sparking is enabled i.e. spark queues must be sorted */
621
622     for (prev = NULL, next = pending_sparks_hd, count=0;
623          (next != NULL) && 
624          !(found = (spark->gran_info >= next->gran_info));
625          prev = next, next = next->next, count++) 
626      {}
627
628   } else {   /* 'utQo' */
629     /* Priority sparking is disabled */
630     
631     found = rtsFalse;   /* to add it at the end */
632
633   }
634
635   if (found) {
636     /* next points to the first spark with a gran_info smaller than that
637        of spark; therefore, add spark before next into the spark queue */
638     spark->next = next;
639     if ( next == NULL ) {
640       pending_sparks_tl = spark;
641     } else {
642       next->prev = spark;
643     }
644     spark->prev = prev;
645     if ( prev == NULL ) {
646       pending_sparks_hd = spark;
647     } else {
648       prev->next = spark;
649     }
650   } else {  /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
651     /* add the spark at the end of the spark queue */
652     spark->next = NULL;                        
653     spark->prev = pending_sparks_tl;
654     if (pending_sparks_hd == NULL)
655       pending_sparks_hd = spark;
656     else
657       pending_sparks_tl->next = spark;
658     pending_sparks_tl = spark;    
659   } 
660   ++SparksAvail;
661
662   /* add costs for search in priority sparking */
663   if (RtsFlags.GranFlags.DoPrioritySparking) {
664     CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
665   }
666
667   IF_GRAN_DEBUG(checkSparkQ,
668                 belch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
669                       spark, spark->node, CurrentProc);
670                 print_sparkq_stats());
671
672 #  if defined(GRAN_CHECK)
673   if (RtsFlags.GranFlags.Debug.checkSparkQ) {
674     for (prev = NULL, next =  pending_sparks_hd;
675          (next != NULL);
676          prev = next, next = next->next) 
677       {}
678     if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
679       fprintf(stderr,"SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
680               spark,CurrentProc, 
681               pending_sparks_tl, prev);
682   }
683 #  endif
684
685 #  if defined(GRAN_CHECK)
686   /* Check if the sparkq is still sorted. Just for testing, really!  */
687   if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
688        RtsFlags.GranFlags.Debug.pri ) {
689     rtsBool sorted = rtsTrue;
690     rtsSpark *prev, *next;
691
692     if (pending_sparks_hd == NULL ||
693         pending_sparks_hd->next == NULL ) {
694       /* just 1 elem => ok */
695     } else {
696       for (prev = pending_sparks_hd,
697            next = pending_sparks_hd->next;
698            (next != NULL) ;
699            prev = next, next = next->next) {
700         sorted = sorted && 
701                  (prev->gran_info >= next->gran_info);
702       }
703     }
704     if (!sorted) {
705       fprintf(stderr,"ghuH: SPARKQ on PE %d is not sorted:\n",
706               CurrentProc);
707       print_sparkq(CurrentProc);
708     }
709   }
710 #  endif
711 }
712
713 //@node Aux fcts,  , Basic interface to sparkq, GranSim code
714 //@subsubsection Aux fcts
715
716 //@cindex spark_queue_len
717 nat
718 spark_queue_len(proc) 
719 PEs proc;
720 {
721  rtsSpark *prev, *spark;                     /* prev only for testing !! */
722  nat len;
723
724  for (len = 0, prev = NULL, spark = pending_sparks_hds[proc]; 
725       spark != NULL; 
726       len++, prev = spark, spark = spark->next)
727    {}
728
729 #  if defined(GRAN_CHECK)
730   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) 
731     if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
732       fprintf(stderr,"ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
733               proc, pending_sparks_tls[proc], prev);
734 #  endif
735
736  return (len);
737 }
738
739 /* 
740    Take spark out of the spark queue on PE p and nuke the spark. Adjusts
741    hd and tl pointers of the spark queue. Returns a pointer to the next
742    spark in the queue.
743 */
744 //@cindex delete_from_sparkq
745 rtsSpark *
746 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
747 rtsSpark *spark;
748 PEs p;
749 rtsBool dispose_too;
750 {
751   rtsSpark *new_spark;
752
753   if (spark==NULL) 
754     barf("delete_from_sparkq: trying to delete NULL spark\n");
755
756 #  if defined(GRAN_CHECK)
757   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
758     fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
759             pending_sparks_hd, pending_sparks_tl,
760             spark->prev, spark, spark->next, 
761             (spark->next==NULL ? 0 : spark->next->prev));
762   }
763 #  endif
764
765   if (spark->prev==NULL) {
766     /* spark is first spark of queue => adjust hd pointer */
767     ASSERT(pending_sparks_hds[p]==spark);
768     pending_sparks_hds[p] = spark->next;
769   } else {
770     spark->prev->next = spark->next;
771   }
772   if (spark->next==NULL) {
773     ASSERT(pending_sparks_tls[p]==spark);
774     /* spark is first spark of queue => adjust tl pointer */
775     pending_sparks_tls[p] = spark->prev;
776   } else {
777     spark->next->prev = spark->prev;
778   }
779   new_spark = spark->next;
780   
781 #  if defined(GRAN_CHECK)
782   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
783     fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
784             pending_sparks_hd, pending_sparks_tl,
785             spark->prev, spark, spark->next, 
786             (spark->next==NULL ? 0 : spark->next->prev), spark);
787   }
788 #  endif
789
790   if (dispose_too)
791     disposeSpark(spark);
792                   
793   return new_spark;
794 }
795
796 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
797 //@cindex markSparkQueue
798 void
799 markSparkQueue(void)
800
801   StgClosure *MarkRoot(StgClosure *root); // prototype
802   PEs p;
803   rtsSpark *sp;
804
805   for (p=0; p<RtsFlags.GranFlags.proc; p++)
806     for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
807       ASSERT(sp->node!=NULL);
808       ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
809       // ToDo?: statistics gathering here (also for GUM!)
810       sp->node = (StgClosure *)MarkRoot(sp->node);
811     }
812   IF_DEBUG(gc,
813            belch("@@ markSparkQueue: spark statistics at start of GC:");
814            print_sparkq_stats());
815 }
816
817 //@cindex print_spark
818 void
819 print_spark(spark)
820 rtsSpark *spark;
821
822   char str[16];
823
824   if (spark==NULL) {
825     fprintf(stderr,"Spark: NIL\n");
826     return;
827   } else {
828     sprintf(str,
829             ((spark->node==NULL) ? "______" : "%#6lx"), 
830             stgCast(StgPtr,spark->node));
831
832     fprintf(stderr,"Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
833             str, spark->name, 
834             ((spark->global)==rtsTrue?"True":"False"), spark->creator, 
835             spark->prev, spark->next);
836   }
837 }
838
839 //@cindex print_sparkq
840 void
841 print_sparkq(proc)
842 PEs proc;
843 // rtsSpark *hd;
844 {
845   rtsSpark *x = pending_sparks_hds[proc];
846
847   fprintf(stderr,"Spark Queue of PE %d with root at %p:\n", proc, x);
848   for (; x!=(rtsSpark*)NULL; x=x->next) {
849     print_spark(x);
850   }
851 }
852
853 /* 
854    Print a statistics of all spark queues.
855 */
856 //@cindex print_sparkq_stats
857 void
858 print_sparkq_stats(void)
859 {
860   PEs p;
861
862   fprintf(stderr, "SparkQs: [");
863   for (p=0; p<RtsFlags.GranFlags.proc; p++)
864     fprintf(stderr, ", PE %d: %d", p, spark_queue_len(p));
865   fprintf(stderr, "\n");
866 }
867
868 #endif