12af2963809e0f414ac04ec41ce99719f7cc3a20
[ghc-hetmet.git] / ghc / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2000-2005
4  *
5  * Sparking support for PARALLEL_HASKELL and SMP versions of the RTS.
6  *
7  * -------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "Schedule.h"
12 #include "SchedAPI.h"
13 #include "Storage.h"
14 #include "RtsFlags.h"
15 #include "RtsUtils.h"
16 #include "ParTicky.h"
17 # if defined(PARALLEL_HASKELL)
18 # include "ParallelRts.h"
19 # include "GranSimRts.h"   // for GR_...
20 # elif defined(GRAN)
21 # include "GranSimRts.h"
22 # endif
23 #include "Sparks.h"
24
25 #if defined(SMP) || defined(PARALLEL_HASKELL)
26
27 static INLINE_ME void bump_hd (StgSparkPool *p)
28 { p->hd++; if (p->hd == p->lim) p->hd = p->base; }
29
30 static INLINE_ME void bump_tl (StgSparkPool *p)
31 { p->tl++; if (p->tl == p->lim) p->tl = p->base; }
32
33 /* -----------------------------------------------------------------------------
34  * 
35  * Initialising spark pools.
36  *
37  * -------------------------------------------------------------------------- */
38
39 static void 
40 initSparkPool(StgSparkPool *pool)
41 {
42     pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
43                                 * sizeof(StgClosure *),
44                                 "initSparkPools");
45     pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
46     pool->hd  = pool->base;
47     pool->tl  = pool->base;
48 }
49
50 void
51 initSparkPools( void )
52 {
53 #ifdef SMP
54     /* walk over the capabilities, allocating a spark pool for each one */
55     nat i;
56     for (i = 0; i < n_capabilities; i++) {
57         initSparkPool(&capabilities[i].r.rSparks);
58     }
59 #else
60     /* allocate a single spark pool */
61     initSparkPool(&MainCapability.r.rSparks);
62 #endif
63 }
64
65 /* -----------------------------------------------------------------------------
66  * 
67  * findSpark: find a spark on the current Capability that we can fork
68  * into a thread.
69  *
70  * -------------------------------------------------------------------------- */
71
72 StgClosure *
73 findSpark (Capability *cap)
74 {
75     StgSparkPool *pool;
76     StgClosure *spark;
77     
78     pool = &(cap->r.rSparks);
79     ASSERT_SPARK_POOL_INVARIANTS(pool);
80
81     while (pool->hd != pool->tl) {
82         spark = *pool->hd;
83         bump_hd(pool);
84         if (closure_SHOULD_SPARK(spark)) {
85 #ifdef GRAN
86             if (RtsFlags.ParFlags.ParStats.Sparks) 
87                 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
88                                  GR_STEALING, ((StgTSO *)NULL), spark, 
89                                  0, 0 /* spark_queue_len(ADVISORY_POOL) */);
90 #endif
91             return spark;
92         }
93     }
94     // spark pool is now empty
95     return NULL;
96 }
97
98 /* -----------------------------------------------------------------------------
99  * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
100  * implicit slide i.e. after marking all sparks are at the beginning of the
101  * spark pool and the spark pool only contains sparkable closures 
102  * -------------------------------------------------------------------------- */
103
104 void
105 markSparkQueue (evac_fn evac)
106
107     StgClosure **sparkp, **to_sparkp;
108     nat i, n, pruned_sparks; // stats only
109     StgSparkPool *pool;
110     Capability *cap;
111     
112     PAR_TICKY_MARK_SPARK_QUEUE_START();
113     
114     n = 0;
115     pruned_sparks = 0;
116     for (i = 0; i < n_capabilities; i++) {
117         cap = &capabilities[i];
118         pool = &(cap->r.rSparks);
119         
120         ASSERT_SPARK_POOL_INVARIANTS(pool);
121
122 #if defined(PARALLEL_HASKELL)
123         // stats only
124         n = 0;
125         pruned_sparks = 0;
126 #endif
127         
128         sparkp = pool->hd;
129         to_sparkp = pool->hd;
130         while (sparkp != pool->tl) {
131             ASSERT(to_sparkp<=sparkp);
132             ASSERT(*sparkp!=NULL);
133             ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp)));
134             // ToDo?: statistics gathering here (also for GUM!)
135             if (closure_SHOULD_SPARK(*sparkp)) {
136                 evac(sparkp);
137                 *to_sparkp++ = *sparkp;
138                 n++;
139             } else {
140                 pruned_sparks++;
141             }
142             sparkp++;
143             if (sparkp == pool->lim) {
144                 sparkp = pool->base;
145             }
146         }
147         pool->tl = to_sparkp;
148         
149         PAR_TICKY_MARK_SPARK_QUEUE_END(n);
150         
151 #if defined(PARALLEL_HASKELL)
152         IF_DEBUG(scheduler,
153                  debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
154                             n, pruned_sparks, mytid));
155 #else
156         IF_DEBUG(scheduler,
157                debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks\n",
158                           n, pruned_sparks));
159 #endif
160         
161         IF_DEBUG(scheduler,
162                  debugBelch("markSparkQueue:   new spark queue len=%d; (hd=%p; tl=%p)\n",
163                             sparkPoolSize(pool), pool->hd, pool->tl));
164         
165     }
166 }
167
168 /* -----------------------------------------------------------------------------
169  * 
170  * Turn a spark into a real thread
171  *
172  * -------------------------------------------------------------------------- */
173
174 void
175 createSparkThread (Capability *cap, StgClosure *p)
176 {
177     StgTSO *tso;
178
179     tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
180     appendToRunQueue(cap,tso);
181 }
182
183 /* -----------------------------------------------------------------------------
184  * 
185  * Create a new spark
186  *
187  * -------------------------------------------------------------------------- */
188
189 #define DISCARD_NEW
190
191 StgInt
192 newSpark (StgRegTable *reg, StgClosure *p)
193 {
194     StgSparkPool *pool = &(reg->rSparks);
195
196     ASSERT_SPARK_POOL_INVARIANTS(pool);
197
198     if (closure_SHOULD_SPARK(p)) {
199 #ifdef DISCARD_NEW
200         StgClosure **new_tl;
201         new_tl = pool->tl + 1;
202         if (new_tl == pool->lim) { new_tl = pool->base; }
203         if (new_tl != pool->hd) {
204             *pool->tl = p;
205             pool->tl = new_tl;
206         } else if (!closure_SHOULD_SPARK(*pool->hd)) {
207             // if the old closure is not sparkable, discard it and
208             // keep the new one.  Otherwise, keep the old one.
209             *pool->tl = p;
210             bump_hd(pool);
211         }
212 #else  /* DISCARD OLD */
213         *pool->tl = p;
214         bump_tl(pool);
215         if (pool->tl == pool->hd) { bump_hd(pool); }
216 #endif
217     }   
218
219     ASSERT_SPARK_POOL_INVARIANTS(pool);
220     return 1;
221 }
222
223 #endif /* PARALLEL_HASKELL || SMP */
224
225 /* -----------------------------------------------------------------------------
226  * 
227  * GRAN & PARALLEL_HASKELL stuff beyond here.
228  *
229  * -------------------------------------------------------------------------- */
230
231 #if defined(PARALLEL_HASKELL) || defined(GRAN)
232
233 static void slide_spark_pool( StgSparkPool *pool );
234
235 rtsBool
236 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
237 {
238   if (pool->tl == pool->lim)
239     slide_spark_pool(pool);
240
241   if (closure_SHOULD_SPARK(closure) && 
242       pool->tl < pool->lim) {
243     *(pool->tl++) = closure;
244
245 #if defined(PARALLEL_HASKELL)
246     // collect parallel global statistics (currently done together with GC stats)
247     if (RtsFlags.ParFlags.ParStats.Global &&
248         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
249       // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime()); 
250       globalParStats.tot_sparks_created++;
251     }
252 #endif
253     return rtsTrue;
254   } else {
255 #if defined(PARALLEL_HASKELL)
256     // collect parallel global statistics (currently done together with GC stats)
257     if (RtsFlags.ParFlags.ParStats.Global &&
258         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
259       //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime()); 
260       globalParStats.tot_sparks_ignored++;
261     }
262 #endif
263     return rtsFalse;
264   }
265 }
266
267 static void
268 slide_spark_pool( StgSparkPool *pool )
269 {
270   StgClosure **sparkp, **to_sparkp;
271
272   sparkp = pool->hd;
273   to_sparkp = pool->base;
274   while (sparkp < pool->tl) {
275     ASSERT(to_sparkp<=sparkp);
276     ASSERT(*sparkp!=NULL);
277     ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
278
279     if (closure_SHOULD_SPARK(*sparkp)) {
280       *to_sparkp++ = *sparkp++;
281     } else {
282       sparkp++;
283     }
284   }
285   pool->hd = pool->base;
286   pool->tl = to_sparkp;
287 }
288
289 void
290 disposeSpark(spark)
291 StgClosure *spark;
292 {
293 #if !defined(SMP)
294   Capability *cap;
295   StgSparkPool *pool;
296
297   cap = &MainRegTable;
298   pool = &(cap->rSparks);
299   ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
300 #endif
301   ASSERT(spark != (StgClosure *)NULL);
302   /* Do nothing */
303 }
304
305
306 #elif defined(GRAN)
307
308 /* 
309    Search the spark queue of the proc in event for a spark that's worth
310    turning into a thread 
311    (was gimme_spark in the old RTS)
312 */
313 void
314 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
315 {
316    PEs proc = event->proc,       /* proc to search for work */
317        creator = event->creator; /* proc that requested work */
318    StgClosure* node;
319    rtsBool found;
320    rtsSparkQ spark_of_non_local_node = NULL, 
321              spark_of_non_local_node_prev = NULL, 
322              low_priority_spark = NULL, 
323              low_priority_spark_prev = NULL,
324              spark = NULL, prev = NULL;
325   
326    /* Choose a spark from the local spark queue */
327    prev = (rtsSpark*)NULL;
328    spark = pending_sparks_hds[proc];
329    found = rtsFalse;
330
331    // ToDo: check this code & implement local sparking !! -- HWL  
332    while (!found && spark != (rtsSpark*)NULL)
333      {
334        ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
335               (prev==(rtsSpark*)NULL || prev->next==spark) &&
336               (spark->prev==prev));
337        node = spark->node;
338        if (!closure_SHOULD_SPARK(node)) 
339          {
340            IF_GRAN_DEBUG(checkSparkQ,
341                          debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
342                                spark, node));
343
344            if (RtsFlags.GranFlags.GranSimStats.Sparks)
345              DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
346                               spark->node, spark->name, spark_queue_len(proc));
347   
348            ASSERT(spark != (rtsSpark*)NULL);
349            ASSERT(SparksAvail>0);
350            --SparksAvail;
351
352            ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
353            spark = delete_from_sparkq (spark, proc, rtsTrue);
354            if (spark != (rtsSpark*)NULL)
355              prev = spark->prev;
356            continue;
357          }
358        /* -- node should eventually be sparked */
359        else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes && 
360                !IS_LOCAL_TO(PROCS(node),CurrentProc)) 
361          {
362            barf("Local sparking not yet implemented");
363
364            /* Remember first low priority spark */
365            if (spark_of_non_local_node==(rtsSpark*)NULL) {
366              spark_of_non_local_node_prev = prev;
367              spark_of_non_local_node = spark;
368               }
369   
370            if (spark->next == (rtsSpark*)NULL) { 
371              /* ASSERT(spark==SparkQueueTl);  just for testing */
372              prev = spark_of_non_local_node_prev;
373              spark = spark_of_non_local_node;
374              found = rtsTrue;
375              break;
376            }
377   
378 # if defined(GRAN) && defined(GRAN_CHECK)
379            /* Should never happen; just for testing 
380            if (spark==pending_sparks_tl) {
381              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
382                 stg_exit(EXIT_FAILURE);
383                 } */
384 # endif
385            prev = spark; 
386            spark = spark->next;
387            ASSERT(SparksAvail>0);
388            --SparksAvail;
389            continue;
390          }
391        else if ( RtsFlags.GranFlags.DoPrioritySparking || 
392                  (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
393          {
394            if (RtsFlags.GranFlags.DoPrioritySparking)
395              barf("Priority sparking not yet implemented");
396
397            found = rtsTrue;
398          }
399 #if 0      
400        else /* only used if SparkPriority2 is defined */
401          {
402            /* ToDo: fix the code below and re-integrate it */
403            /* Remember first low priority spark */
404            if (low_priority_spark==(rtsSpark*)NULL) { 
405              low_priority_spark_prev = prev;
406              low_priority_spark = spark;
407            }
408   
409            if (spark->next == (rtsSpark*)NULL) { 
410                 /* ASSERT(spark==spark_queue_tl);  just for testing */
411              prev = low_priority_spark_prev;
412              spark = low_priority_spark;
413              found = rtsTrue;       /* take low pri spark => rc is 2  */
414              break;
415            }
416   
417            /* Should never happen; just for testing 
418            if (spark==pending_sparks_tl) {
419              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
420                 stg_exit(EXIT_FAILURE);
421              break;
422            } */                
423            prev = spark; 
424            spark = spark->next;
425
426            IF_GRAN_DEBUG(pri,
427                          debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
428                                spark->gran_info, RtsFlags.GranFlags.SparkPriority, 
429                                spark->node, spark->name);)
430            }
431 #endif
432    }  /* while (spark!=NULL && !found) */
433
434    *spark_res = spark;
435    *found_res = found;
436 }
437
438 /*
439   Turn the spark into a thread.
440   In GranSim this basically means scheduling a StartThread event for the
441   node pointed to by the spark at some point in the future.
442   (was munch_spark in the old RTS)
443 */
444 rtsBool
445 activateSpark (rtsEvent *event, rtsSparkQ spark) 
446 {
447   PEs proc = event->proc,       /* proc to search for work */
448       creator = event->creator; /* proc that requested work */
449   StgTSO* tso;
450   StgClosure* node;
451   rtsTime spark_arrival_time;
452
453   /* 
454      We've found a node on PE proc requested by PE creator.
455      If proc==creator we can turn the spark into a thread immediately;
456      otherwise we schedule a MoveSpark event on the requesting PE
457   */
458      
459   /* DaH Qu' yIchen */
460   if (proc!=creator) { 
461
462     /* only possible if we simulate GUM style fishing */
463     ASSERT(RtsFlags.GranFlags.Fishing);
464
465     /* Message packing costs for sending a Fish; qeq jabbI'ID */
466     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
467   
468     if (RtsFlags.GranFlags.GranSimStats.Sparks)
469       DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
470                        (StgTSO*)NULL, spark->node,
471                        spark->name, spark_queue_len(proc));
472
473     /* time of the spark arrival on the remote PE */
474     spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
475
476     new_event(creator, proc, spark_arrival_time,
477               MoveSpark,
478               (StgTSO*)NULL, spark->node, spark);
479
480     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
481             
482   } else { /* proc==creator i.e. turn the spark into a thread */
483
484     if ( RtsFlags.GranFlags.GranSimStats.Global && 
485          spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
486
487       globalGranStats.tot_low_pri_sparks++;
488       IF_GRAN_DEBUG(pri,
489                     debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
490                           spark->gran_info, 
491                           spark->node, spark->name));
492     } 
493     
494     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
495     
496     node = spark->node;
497     
498 # if 0
499     /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
500     if (GARBAGE COLLECTION IS NECESSARY) {
501       /* Some kind of backoff needed here in case there's too little heap */
502 #  if defined(GRAN_CHECK) && defined(GRAN)
503       if (RtsFlags.GcFlags.giveStats)
504         fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p;  name=%u\n", 
505                 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
506                 spark, node, spark->name);
507 #  endif
508       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
509                   FindWork,
510                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
511       barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
512       GarbageCollect(GetRoots, rtsFalse);
513       // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
514       // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
515       spark = NULL;
516       return; /* was: continue; */ /* to the next event, eventually */
517     }
518 # endif
519     
520     if (RtsFlags.GranFlags.GranSimStats.Sparks)
521       DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
522                        spark->node, spark->name,
523                        spark_queue_len(CurrentProc));
524     
525     new_event(proc, proc, CurrentTime[proc],
526               StartThread, 
527               END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
528     
529     procStatus[proc] = Starting;
530   }
531 }
532
533 /* -------------------------------------------------------------------------
534    This is the main point where handling granularity information comes into
535    play. 
536    ------------------------------------------------------------------------- */
537
538 #define MAX_RAND_PRI    100
539
540 /* 
541    Granularity info transformers. 
542    Applied to the GRAN_INFO field of a spark.
543 */
544 STATIC_INLINE nat  ID(nat x) { return(x); };
545 STATIC_INLINE nat  INV(nat x) { return(-x); };
546 STATIC_INLINE nat  IGNORE(nat x) { return (0); };
547 STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
548
549 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
550 rtsSpark *
551 newSpark(node,name,gran_info,size_info,par_info,local)
552 StgClosure *node;
553 nat name, gran_info, size_info, par_info, local;
554 {
555   nat pri;
556   rtsSpark *newspark;
557
558   pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
559         RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
560         RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
561                            ID(gran_info);
562
563   if ( RtsFlags.GranFlags.SparkPriority!=0 && 
564        pri<RtsFlags.GranFlags.SparkPriority ) {
565     IF_GRAN_DEBUG(pri,
566       debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
567               pri, RtsFlags.GranFlags.SparkPriority, node, name));
568     return ((rtsSpark*)NULL);
569   }
570
571   newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
572   newspark->prev = newspark->next = (rtsSpark*)NULL;
573   newspark->node = node;
574   newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
575   newspark->gran_info = pri;
576   newspark->global = !local;      /* Check that with parAt, parAtAbs !!*/
577
578   if (RtsFlags.GranFlags.GranSimStats.Global) {
579     globalGranStats.tot_sparks_created++;
580     globalGranStats.sparks_created_on_PE[CurrentProc]++;
581   }
582
583   return(newspark);
584 }
585
586 void
587 disposeSpark(spark)
588 rtsSpark *spark;
589 {
590   ASSERT(spark!=NULL);
591   stgFree(spark);
592 }
593
594 void 
595 disposeSparkQ(spark)
596 rtsSparkQ spark;
597 {
598   if (spark==NULL) 
599     return;
600
601   disposeSparkQ(spark->next);
602
603 # ifdef GRAN_CHECK
604   if (SparksAvail < 0) {
605     debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
606     print_spark(spark);
607   }
608 # endif
609
610   stgFree(spark);
611 }
612
613 /*
614    With PrioritySparking add_to_spark_queue performs an insert sort to keep
615    the spark queue sorted. Otherwise the spark is just added to the end of
616    the queue. 
617 */
618
619 void
620 add_to_spark_queue(spark)
621 rtsSpark *spark;
622 {
623   rtsSpark *prev = NULL, *next = NULL;
624   nat count = 0;
625   rtsBool found = rtsFalse;
626
627   if ( spark == (rtsSpark *)NULL ) {
628     return;
629   }
630
631   if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
632     /* Priority sparking is enabled i.e. spark queues must be sorted */
633
634     for (prev = NULL, next = pending_sparks_hd, count=0;
635          (next != NULL) && 
636          !(found = (spark->gran_info >= next->gran_info));
637          prev = next, next = next->next, count++) 
638      {}
639
640   } else {   /* 'utQo' */
641     /* Priority sparking is disabled */
642     
643     found = rtsFalse;   /* to add it at the end */
644
645   }
646
647   if (found) {
648     /* next points to the first spark with a gran_info smaller than that
649        of spark; therefore, add spark before next into the spark queue */
650     spark->next = next;
651     if ( next == NULL ) {
652       pending_sparks_tl = spark;
653     } else {
654       next->prev = spark;
655     }
656     spark->prev = prev;
657     if ( prev == NULL ) {
658       pending_sparks_hd = spark;
659     } else {
660       prev->next = spark;
661     }
662   } else {  /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
663     /* add the spark at the end of the spark queue */
664     spark->next = NULL;                        
665     spark->prev = pending_sparks_tl;
666     if (pending_sparks_hd == NULL)
667       pending_sparks_hd = spark;
668     else
669       pending_sparks_tl->next = spark;
670     pending_sparks_tl = spark;    
671   } 
672   ++SparksAvail;
673
674   /* add costs for search in priority sparking */
675   if (RtsFlags.GranFlags.DoPrioritySparking) {
676     CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
677   }
678
679   IF_GRAN_DEBUG(checkSparkQ,
680                 debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
681                       spark, spark->node, CurrentProc);
682                 print_sparkq_stats());
683
684 #  if defined(GRAN_CHECK)
685   if (RtsFlags.GranFlags.Debug.checkSparkQ) {
686     for (prev = NULL, next =  pending_sparks_hd;
687          (next != NULL);
688          prev = next, next = next->next) 
689       {}
690     if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
691       debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
692               spark,CurrentProc, 
693               pending_sparks_tl, prev);
694   }
695 #  endif
696
697 #  if defined(GRAN_CHECK)
698   /* Check if the sparkq is still sorted. Just for testing, really!  */
699   if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
700        RtsFlags.GranFlags.Debug.pri ) {
701     rtsBool sorted = rtsTrue;
702     rtsSpark *prev, *next;
703
704     if (pending_sparks_hd == NULL ||
705         pending_sparks_hd->next == NULL ) {
706       /* just 1 elem => ok */
707     } else {
708       for (prev = pending_sparks_hd,
709            next = pending_sparks_hd->next;
710            (next != NULL) ;
711            prev = next, next = next->next) {
712         sorted = sorted && 
713                  (prev->gran_info >= next->gran_info);
714       }
715     }
716     if (!sorted) {
717       debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
718               CurrentProc);
719       print_sparkq(CurrentProc);
720     }
721   }
722 #  endif
723 }
724
725 nat
726 spark_queue_len(proc) 
727 PEs proc;
728 {
729  rtsSpark *prev, *spark;                     /* prev only for testing !! */
730  nat len;
731
732  for (len = 0, prev = NULL, spark = pending_sparks_hds[proc]; 
733       spark != NULL; 
734       len++, prev = spark, spark = spark->next)
735    {}
736
737 #  if defined(GRAN_CHECK)
738   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) 
739     if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
740       debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
741               proc, pending_sparks_tls[proc], prev);
742 #  endif
743
744  return (len);
745 }
746
747 /* 
748    Take spark out of the spark queue on PE p and nuke the spark. Adjusts
749    hd and tl pointers of the spark queue. Returns a pointer to the next
750    spark in the queue.
751 */
752 rtsSpark *
753 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
754 rtsSpark *spark;
755 PEs p;
756 rtsBool dispose_too;
757 {
758   rtsSpark *new_spark;
759
760   if (spark==NULL) 
761     barf("delete_from_sparkq: trying to delete NULL spark\n");
762
763 #  if defined(GRAN_CHECK)
764   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
765     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
766             pending_sparks_hd, pending_sparks_tl,
767             spark->prev, spark, spark->next, 
768             (spark->next==NULL ? 0 : spark->next->prev));
769   }
770 #  endif
771
772   if (spark->prev==NULL) {
773     /* spark is first spark of queue => adjust hd pointer */
774     ASSERT(pending_sparks_hds[p]==spark);
775     pending_sparks_hds[p] = spark->next;
776   } else {
777     spark->prev->next = spark->next;
778   }
779   if (spark->next==NULL) {
780     ASSERT(pending_sparks_tls[p]==spark);
781     /* spark is first spark of queue => adjust tl pointer */
782     pending_sparks_tls[p] = spark->prev;
783   } else {
784     spark->next->prev = spark->prev;
785   }
786   new_spark = spark->next;
787   
788 #  if defined(GRAN_CHECK)
789   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
790     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
791             pending_sparks_hd, pending_sparks_tl,
792             spark->prev, spark, spark->next, 
793             (spark->next==NULL ? 0 : spark->next->prev), spark);
794   }
795 #  endif
796
797   if (dispose_too)
798     disposeSpark(spark);
799                   
800   return new_spark;
801 }
802
803 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
804 void
805 markSparkQueue(void)
806
807   StgClosure *MarkRoot(StgClosure *root); // prototype
808   PEs p;
809   rtsSpark *sp;
810
811   for (p=0; p<RtsFlags.GranFlags.proc; p++)
812     for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
813       ASSERT(sp->node!=NULL);
814       ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
815       // ToDo?: statistics gathering here (also for GUM!)
816       sp->node = (StgClosure *)MarkRoot(sp->node);
817     }
818   IF_DEBUG(gc,
819            debugBelch("@@ markSparkQueue: spark statistics at start of GC:");
820            print_sparkq_stats());
821 }
822
823 void
824 print_spark(spark)
825 rtsSpark *spark;
826
827   char str[16];
828
829   if (spark==NULL) {
830     debugBelch("Spark: NIL\n");
831     return;
832   } else {
833     sprintf(str,
834             ((spark->node==NULL) ? "______" : "%#6lx"), 
835             stgCast(StgPtr,spark->node));
836
837     debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
838             str, spark->name, 
839             ((spark->global)==rtsTrue?"True":"False"), spark->creator, 
840             spark->prev, spark->next);
841   }
842 }
843
844 void
845 print_sparkq(proc)
846 PEs proc;
847 // rtsSpark *hd;
848 {
849   rtsSpark *x = pending_sparks_hds[proc];
850
851   debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
852   for (; x!=(rtsSpark*)NULL; x=x->next) {
853     print_spark(x);
854   }
855 }
856
857 /* 
858    Print a statistics of all spark queues.
859 */
860 void
861 print_sparkq_stats(void)
862 {
863   PEs p;
864
865   debugBelch("SparkQs: [");
866   for (p=0; p<RtsFlags.GranFlags.proc; p++)
867     debugBelch(", PE %d: %d", p, spark_queue_len(p));
868   debugBelch("\n");
869 }
870
871 #endif