New tracing interface
[ghc-hetmet.git] / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2000-2006
4  *
5  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
6  *
7  * -------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "Schedule.h"
12 #include "SchedAPI.h"
13 #include "Storage.h"
14 #include "RtsFlags.h"
15 #include "RtsUtils.h"
16 #include "ParTicky.h"
17 # if defined(PARALLEL_HASKELL)
18 # include "ParallelRts.h"
19 # include "GranSimRts.h"   // for GR_...
20 # elif defined(GRAN)
21 # include "GranSimRts.h"
22 # endif
23 #include "Sparks.h"
24 #include "Trace.h"
25
26 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
27
28 static INLINE_ME void bump_hd (StgSparkPool *p)
29 { p->hd++; if (p->hd == p->lim) p->hd = p->base; }
30
31 static INLINE_ME void bump_tl (StgSparkPool *p)
32 { p->tl++; if (p->tl == p->lim) p->tl = p->base; }
33
34 /* -----------------------------------------------------------------------------
35  * 
36  * Initialising spark pools.
37  *
38  * -------------------------------------------------------------------------- */
39
40 static void 
41 initSparkPool(StgSparkPool *pool)
42 {
43     pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
44                                 * sizeof(StgClosure *),
45                                 "initSparkPools");
46     pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
47     pool->hd  = pool->base;
48     pool->tl  = pool->base;
49 }
50
51 void
52 initSparkPools( void )
53 {
54 #ifdef THREADED_RTS
55     /* walk over the capabilities, allocating a spark pool for each one */
56     nat i;
57     for (i = 0; i < n_capabilities; i++) {
58         initSparkPool(&capabilities[i].r.rSparks);
59     }
60 #else
61     /* allocate a single spark pool */
62     initSparkPool(&MainCapability.r.rSparks);
63 #endif
64 }
65
66 /* -----------------------------------------------------------------------------
67  * 
68  * findSpark: find a spark on the current Capability that we can fork
69  * into a thread.
70  *
71  * -------------------------------------------------------------------------- */
72
73 StgClosure *
74 findSpark (Capability *cap)
75 {
76     StgSparkPool *pool;
77     StgClosure *spark;
78     
79     pool = &(cap->r.rSparks);
80     ASSERT_SPARK_POOL_INVARIANTS(pool);
81
82     while (pool->hd != pool->tl) {
83         spark = *pool->hd;
84         bump_hd(pool);
85         if (closure_SHOULD_SPARK(spark)) {
86 #ifdef GRAN
87             if (RtsFlags.ParFlags.ParStats.Sparks) 
88                 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
89                                  GR_STEALING, ((StgTSO *)NULL), spark, 
90                                  0, 0 /* spark_queue_len(ADVISORY_POOL) */);
91 #endif
92             return spark;
93         }
94     }
95     // spark pool is now empty
96     return NULL;
97 }
98
99 /* -----------------------------------------------------------------------------
100  * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
101  * implicit slide i.e. after marking all sparks are at the beginning of the
102  * spark pool and the spark pool only contains sparkable closures 
103  * -------------------------------------------------------------------------- */
104
105 void
106 markSparkQueue (evac_fn evac)
107
108     StgClosure **sparkp, **to_sparkp;
109     nat i, n, pruned_sparks; // stats only
110     StgSparkPool *pool;
111     Capability *cap;
112     
113     PAR_TICKY_MARK_SPARK_QUEUE_START();
114     
115     n = 0;
116     pruned_sparks = 0;
117     for (i = 0; i < n_capabilities; i++) {
118         cap = &capabilities[i];
119         pool = &(cap->r.rSparks);
120         
121         ASSERT_SPARK_POOL_INVARIANTS(pool);
122
123 #if defined(PARALLEL_HASKELL)
124         // stats only
125         n = 0;
126         pruned_sparks = 0;
127 #endif
128         
129         sparkp = pool->hd;
130         to_sparkp = pool->hd;
131         while (sparkp != pool->tl) {
132             ASSERT(to_sparkp<=sparkp);
133             ASSERT(*sparkp!=NULL);
134             ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp)));
135             // ToDo?: statistics gathering here (also for GUM!)
136             if (closure_SHOULD_SPARK(*sparkp)) {
137                 evac(sparkp);
138                 *to_sparkp++ = *sparkp;
139                 n++;
140             } else {
141                 pruned_sparks++;
142             }
143             sparkp++;
144             if (sparkp == pool->lim) {
145                 sparkp = pool->base;
146             }
147         }
148         pool->tl = to_sparkp;
149         
150         PAR_TICKY_MARK_SPARK_QUEUE_END(n);
151         
152 #if defined(PARALLEL_HASKELL)
153         debugTrace(DEBUG_sched, 
154                    "marked %d sparks and pruned %d sparks on [%x]",
155                    n, pruned_sparks, mytid);
156 #else
157         debugTrace(DEBUG_sched, 
158                    "marked %d sparks and pruned %d sparks",
159                    n, pruned_sparks);
160 #endif
161         
162         debugTrace(DEBUG_sched,
163                    "new spark queue len=%d; (hd=%p; tl=%p)\n",
164                    sparkPoolSize(pool), pool->hd, pool->tl);
165     }
166 }
167
168 /* -----------------------------------------------------------------------------
169  * 
170  * Turn a spark into a real thread
171  *
172  * -------------------------------------------------------------------------- */
173
174 void
175 createSparkThread (Capability *cap, StgClosure *p)
176 {
177     StgTSO *tso;
178
179     tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
180     appendToRunQueue(cap,tso);
181 }
182
183 /* -----------------------------------------------------------------------------
184  * 
185  * Create a new spark
186  *
187  * -------------------------------------------------------------------------- */
188
189 #define DISCARD_NEW
190
191 StgInt
192 newSpark (StgRegTable *reg, StgClosure *p)
193 {
194     StgSparkPool *pool = &(reg->rSparks);
195
196     ASSERT_SPARK_POOL_INVARIANTS(pool);
197
198     if (closure_SHOULD_SPARK(p)) {
199 #ifdef DISCARD_NEW
200         StgClosure **new_tl;
201         new_tl = pool->tl + 1;
202         if (new_tl == pool->lim) { new_tl = pool->base; }
203         if (new_tl != pool->hd) {
204             *pool->tl = p;
205             pool->tl = new_tl;
206         } else if (!closure_SHOULD_SPARK(*pool->hd)) {
207             // if the old closure is not sparkable, discard it and
208             // keep the new one.  Otherwise, keep the old one.
209             *pool->tl = p;
210             bump_hd(pool);
211         }
212 #else  /* DISCARD OLD */
213         *pool->tl = p;
214         bump_tl(pool);
215         if (pool->tl == pool->hd) { bump_hd(pool); }
216 #endif
217     }   
218
219     ASSERT_SPARK_POOL_INVARIANTS(pool);
220     return 1;
221 }
222
223 #else
224
225 StgInt
226 newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
227 {
228     /* nothing */
229     return 1;
230 }
231
232 #endif /* PARALLEL_HASKELL || THREADED_RTS */
233
234
235 /* -----------------------------------------------------------------------------
236  * 
237  * GRAN & PARALLEL_HASKELL stuff beyond here.
238  *
239  * -------------------------------------------------------------------------- */
240
241 #if defined(PARALLEL_HASKELL) || defined(GRAN)
242
243 static void slide_spark_pool( StgSparkPool *pool );
244
245 rtsBool
246 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
247 {
248   if (pool->tl == pool->lim)
249     slide_spark_pool(pool);
250
251   if (closure_SHOULD_SPARK(closure) && 
252       pool->tl < pool->lim) {
253     *(pool->tl++) = closure;
254
255 #if defined(PARALLEL_HASKELL)
256     // collect parallel global statistics (currently done together with GC stats)
257     if (RtsFlags.ParFlags.ParStats.Global &&
258         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
259       // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime()); 
260       globalParStats.tot_sparks_created++;
261     }
262 #endif
263     return rtsTrue;
264   } else {
265 #if defined(PARALLEL_HASKELL)
266     // collect parallel global statistics (currently done together with GC stats)
267     if (RtsFlags.ParFlags.ParStats.Global &&
268         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
269       //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime()); 
270       globalParStats.tot_sparks_ignored++;
271     }
272 #endif
273     return rtsFalse;
274   }
275 }
276
277 static void
278 slide_spark_pool( StgSparkPool *pool )
279 {
280   StgClosure **sparkp, **to_sparkp;
281
282   sparkp = pool->hd;
283   to_sparkp = pool->base;
284   while (sparkp < pool->tl) {
285     ASSERT(to_sparkp<=sparkp);
286     ASSERT(*sparkp!=NULL);
287     ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
288
289     if (closure_SHOULD_SPARK(*sparkp)) {
290       *to_sparkp++ = *sparkp++;
291     } else {
292       sparkp++;
293     }
294   }
295   pool->hd = pool->base;
296   pool->tl = to_sparkp;
297 }
298
299 void
300 disposeSpark(spark)
301 StgClosure *spark;
302 {
303 #if !defined(THREADED_RTS)
304   Capability *cap;
305   StgSparkPool *pool;
306
307   cap = &MainRegTable;
308   pool = &(cap->rSparks);
309   ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
310 #endif
311   ASSERT(spark != (StgClosure *)NULL);
312   /* Do nothing */
313 }
314
315
316 #elif defined(GRAN)
317
318 /* 
319    Search the spark queue of the proc in event for a spark that's worth
320    turning into a thread 
321    (was gimme_spark in the old RTS)
322 */
323 void
324 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
325 {
326    PEs proc = event->proc,       /* proc to search for work */
327        creator = event->creator; /* proc that requested work */
328    StgClosure* node;
329    rtsBool found;
330    rtsSparkQ spark_of_non_local_node = NULL, 
331              spark_of_non_local_node_prev = NULL, 
332              low_priority_spark = NULL, 
333              low_priority_spark_prev = NULL,
334              spark = NULL, prev = NULL;
335   
336    /* Choose a spark from the local spark queue */
337    prev = (rtsSpark*)NULL;
338    spark = pending_sparks_hds[proc];
339    found = rtsFalse;
340
341    // ToDo: check this code & implement local sparking !! -- HWL  
342    while (!found && spark != (rtsSpark*)NULL)
343      {
344        ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
345               (prev==(rtsSpark*)NULL || prev->next==spark) &&
346               (spark->prev==prev));
347        node = spark->node;
348        if (!closure_SHOULD_SPARK(node)) 
349          {
350            IF_GRAN_DEBUG(checkSparkQ,
351                          debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
352                                spark, node));
353
354            if (RtsFlags.GranFlags.GranSimStats.Sparks)
355              DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
356                               spark->node, spark->name, spark_queue_len(proc));
357   
358            ASSERT(spark != (rtsSpark*)NULL);
359            ASSERT(SparksAvail>0);
360            --SparksAvail;
361
362            ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
363            spark = delete_from_sparkq (spark, proc, rtsTrue);
364            if (spark != (rtsSpark*)NULL)
365              prev = spark->prev;
366            continue;
367          }
368        /* -- node should eventually be sparked */
369        else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes && 
370                !IS_LOCAL_TO(PROCS(node),CurrentProc)) 
371          {
372            barf("Local sparking not yet implemented");
373
374            /* Remember first low priority spark */
375            if (spark_of_non_local_node==(rtsSpark*)NULL) {
376              spark_of_non_local_node_prev = prev;
377              spark_of_non_local_node = spark;
378               }
379   
380            if (spark->next == (rtsSpark*)NULL) { 
381              /* ASSERT(spark==SparkQueueTl);  just for testing */
382              prev = spark_of_non_local_node_prev;
383              spark = spark_of_non_local_node;
384              found = rtsTrue;
385              break;
386            }
387   
388 # if defined(GRAN) && defined(GRAN_CHECK)
389            /* Should never happen; just for testing 
390            if (spark==pending_sparks_tl) {
391              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
392                 stg_exit(EXIT_FAILURE);
393                 } */
394 # endif
395            prev = spark; 
396            spark = spark->next;
397            ASSERT(SparksAvail>0);
398            --SparksAvail;
399            continue;
400          }
401        else if ( RtsFlags.GranFlags.DoPrioritySparking || 
402                  (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
403          {
404            if (RtsFlags.GranFlags.DoPrioritySparking)
405              barf("Priority sparking not yet implemented");
406
407            found = rtsTrue;
408          }
409 #if 0      
410        else /* only used if SparkPriority2 is defined */
411          {
412            /* ToDo: fix the code below and re-integrate it */
413            /* Remember first low priority spark */
414            if (low_priority_spark==(rtsSpark*)NULL) { 
415              low_priority_spark_prev = prev;
416              low_priority_spark = spark;
417            }
418   
419            if (spark->next == (rtsSpark*)NULL) { 
420                 /* ASSERT(spark==spark_queue_tl);  just for testing */
421              prev = low_priority_spark_prev;
422              spark = low_priority_spark;
423              found = rtsTrue;       /* take low pri spark => rc is 2  */
424              break;
425            }
426   
427            /* Should never happen; just for testing 
428            if (spark==pending_sparks_tl) {
429              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
430                 stg_exit(EXIT_FAILURE);
431              break;
432            } */                
433            prev = spark; 
434            spark = spark->next;
435
436            IF_GRAN_DEBUG(pri,
437                          debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
438                                spark->gran_info, RtsFlags.GranFlags.SparkPriority, 
439                                spark->node, spark->name);)
440            }
441 #endif
442    }  /* while (spark!=NULL && !found) */
443
444    *spark_res = spark;
445    *found_res = found;
446 }
447
448 /*
449   Turn the spark into a thread.
450   In GranSim this basically means scheduling a StartThread event for the
451   node pointed to by the spark at some point in the future.
452   (was munch_spark in the old RTS)
453 */
454 rtsBool
455 activateSpark (rtsEvent *event, rtsSparkQ spark) 
456 {
457   PEs proc = event->proc,       /* proc to search for work */
458       creator = event->creator; /* proc that requested work */
459   StgTSO* tso;
460   StgClosure* node;
461   rtsTime spark_arrival_time;
462
463   /* 
464      We've found a node on PE proc requested by PE creator.
465      If proc==creator we can turn the spark into a thread immediately;
466      otherwise we schedule a MoveSpark event on the requesting PE
467   */
468      
469   /* DaH Qu' yIchen */
470   if (proc!=creator) { 
471
472     /* only possible if we simulate GUM style fishing */
473     ASSERT(RtsFlags.GranFlags.Fishing);
474
475     /* Message packing costs for sending a Fish; qeq jabbI'ID */
476     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
477   
478     if (RtsFlags.GranFlags.GranSimStats.Sparks)
479       DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
480                        (StgTSO*)NULL, spark->node,
481                        spark->name, spark_queue_len(proc));
482
483     /* time of the spark arrival on the remote PE */
484     spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
485
486     new_event(creator, proc, spark_arrival_time,
487               MoveSpark,
488               (StgTSO*)NULL, spark->node, spark);
489
490     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
491             
492   } else { /* proc==creator i.e. turn the spark into a thread */
493
494     if ( RtsFlags.GranFlags.GranSimStats.Global && 
495          spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
496
497       globalGranStats.tot_low_pri_sparks++;
498       IF_GRAN_DEBUG(pri,
499                     debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
500                           spark->gran_info, 
501                           spark->node, spark->name));
502     } 
503     
504     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
505     
506     node = spark->node;
507     
508 # if 0
509     /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
510     if (GARBAGE COLLECTION IS NECESSARY) {
511       /* Some kind of backoff needed here in case there's too little heap */
512 #  if defined(GRAN_CHECK) && defined(GRAN)
513       if (RtsFlags.GcFlags.giveStats)
514         fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p;  name=%u\n", 
515                 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
516                 spark, node, spark->name);
517 #  endif
518       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
519                   FindWork,
520                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
521       barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
522       GarbageCollect(GetRoots, rtsFalse);
523       // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
524       // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
525       spark = NULL;
526       return; /* was: continue; */ /* to the next event, eventually */
527     }
528 # endif
529     
530     if (RtsFlags.GranFlags.GranSimStats.Sparks)
531       DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
532                        spark->node, spark->name,
533                        spark_queue_len(CurrentProc));
534     
535     new_event(proc, proc, CurrentTime[proc],
536               StartThread, 
537               END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
538     
539     procStatus[proc] = Starting;
540   }
541 }
542
543 /* -------------------------------------------------------------------------
544    This is the main point where handling granularity information comes into
545    play. 
546    ------------------------------------------------------------------------- */
547
548 #define MAX_RAND_PRI    100
549
550 /* 
551    Granularity info transformers. 
552    Applied to the GRAN_INFO field of a spark.
553 */
554 STATIC_INLINE nat  ID(nat x) { return(x); };
555 STATIC_INLINE nat  INV(nat x) { return(-x); };
556 STATIC_INLINE nat  IGNORE(nat x) { return (0); };
557 STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
558
559 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
560 rtsSpark *
561 newSpark(node,name,gran_info,size_info,par_info,local)
562 StgClosure *node;
563 nat name, gran_info, size_info, par_info, local;
564 {
565   nat pri;
566   rtsSpark *newspark;
567
568   pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
569         RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
570         RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
571                            ID(gran_info);
572
573   if ( RtsFlags.GranFlags.SparkPriority!=0 && 
574        pri<RtsFlags.GranFlags.SparkPriority ) {
575     IF_GRAN_DEBUG(pri,
576       debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
577               pri, RtsFlags.GranFlags.SparkPriority, node, name));
578     return ((rtsSpark*)NULL);
579   }
580
581   newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
582   newspark->prev = newspark->next = (rtsSpark*)NULL;
583   newspark->node = node;
584   newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
585   newspark->gran_info = pri;
586   newspark->global = !local;      /* Check that with parAt, parAtAbs !!*/
587
588   if (RtsFlags.GranFlags.GranSimStats.Global) {
589     globalGranStats.tot_sparks_created++;
590     globalGranStats.sparks_created_on_PE[CurrentProc]++;
591   }
592
593   return(newspark);
594 }
595
596 void
597 disposeSpark(spark)
598 rtsSpark *spark;
599 {
600   ASSERT(spark!=NULL);
601   stgFree(spark);
602 }
603
604 void 
605 disposeSparkQ(spark)
606 rtsSparkQ spark;
607 {
608   if (spark==NULL) 
609     return;
610
611   disposeSparkQ(spark->next);
612
613 # ifdef GRAN_CHECK
614   if (SparksAvail < 0) {
615     debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
616     print_spark(spark);
617   }
618 # endif
619
620   stgFree(spark);
621 }
622
623 /*
624    With PrioritySparking add_to_spark_queue performs an insert sort to keep
625    the spark queue sorted. Otherwise the spark is just added to the end of
626    the queue. 
627 */
628
629 void
630 add_to_spark_queue(spark)
631 rtsSpark *spark;
632 {
633   rtsSpark *prev = NULL, *next = NULL;
634   nat count = 0;
635   rtsBool found = rtsFalse;
636
637   if ( spark == (rtsSpark *)NULL ) {
638     return;
639   }
640
641   if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
642     /* Priority sparking is enabled i.e. spark queues must be sorted */
643
644     for (prev = NULL, next = pending_sparks_hd, count=0;
645          (next != NULL) && 
646          !(found = (spark->gran_info >= next->gran_info));
647          prev = next, next = next->next, count++) 
648      {}
649
650   } else {   /* 'utQo' */
651     /* Priority sparking is disabled */
652     
653     found = rtsFalse;   /* to add it at the end */
654
655   }
656
657   if (found) {
658     /* next points to the first spark with a gran_info smaller than that
659        of spark; therefore, add spark before next into the spark queue */
660     spark->next = next;
661     if ( next == NULL ) {
662       pending_sparks_tl = spark;
663     } else {
664       next->prev = spark;
665     }
666     spark->prev = prev;
667     if ( prev == NULL ) {
668       pending_sparks_hd = spark;
669     } else {
670       prev->next = spark;
671     }
672   } else {  /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
673     /* add the spark at the end of the spark queue */
674     spark->next = NULL;                        
675     spark->prev = pending_sparks_tl;
676     if (pending_sparks_hd == NULL)
677       pending_sparks_hd = spark;
678     else
679       pending_sparks_tl->next = spark;
680     pending_sparks_tl = spark;    
681   } 
682   ++SparksAvail;
683
684   /* add costs for search in priority sparking */
685   if (RtsFlags.GranFlags.DoPrioritySparking) {
686     CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
687   }
688
689   IF_GRAN_DEBUG(checkSparkQ,
690                 debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
691                       spark, spark->node, CurrentProc);
692                 print_sparkq_stats());
693
694 #  if defined(GRAN_CHECK)
695   if (RtsFlags.GranFlags.Debug.checkSparkQ) {
696     for (prev = NULL, next =  pending_sparks_hd;
697          (next != NULL);
698          prev = next, next = next->next) 
699       {}
700     if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
701       debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
702               spark,CurrentProc, 
703               pending_sparks_tl, prev);
704   }
705 #  endif
706
707 #  if defined(GRAN_CHECK)
708   /* Check if the sparkq is still sorted. Just for testing, really!  */
709   if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
710        RtsFlags.GranFlags.Debug.pri ) {
711     rtsBool sorted = rtsTrue;
712     rtsSpark *prev, *next;
713
714     if (pending_sparks_hd == NULL ||
715         pending_sparks_hd->next == NULL ) {
716       /* just 1 elem => ok */
717     } else {
718       for (prev = pending_sparks_hd,
719            next = pending_sparks_hd->next;
720            (next != NULL) ;
721            prev = next, next = next->next) {
722         sorted = sorted && 
723                  (prev->gran_info >= next->gran_info);
724       }
725     }
726     if (!sorted) {
727       debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
728               CurrentProc);
729       print_sparkq(CurrentProc);
730     }
731   }
732 #  endif
733 }
734
735 nat
736 spark_queue_len(proc) 
737 PEs proc;
738 {
739  rtsSpark *prev, *spark;                     /* prev only for testing !! */
740  nat len;
741
742  for (len = 0, prev = NULL, spark = pending_sparks_hds[proc]; 
743       spark != NULL; 
744       len++, prev = spark, spark = spark->next)
745    {}
746
747 #  if defined(GRAN_CHECK)
748   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) 
749     if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
750       debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
751               proc, pending_sparks_tls[proc], prev);
752 #  endif
753
754  return (len);
755 }
756
757 /* 
758    Take spark out of the spark queue on PE p and nuke the spark. Adjusts
759    hd and tl pointers of the spark queue. Returns a pointer to the next
760    spark in the queue.
761 */
762 rtsSpark *
763 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
764 rtsSpark *spark;
765 PEs p;
766 rtsBool dispose_too;
767 {
768   rtsSpark *new_spark;
769
770   if (spark==NULL) 
771     barf("delete_from_sparkq: trying to delete NULL spark\n");
772
773 #  if defined(GRAN_CHECK)
774   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
775     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
776             pending_sparks_hd, pending_sparks_tl,
777             spark->prev, spark, spark->next, 
778             (spark->next==NULL ? 0 : spark->next->prev));
779   }
780 #  endif
781
782   if (spark->prev==NULL) {
783     /* spark is first spark of queue => adjust hd pointer */
784     ASSERT(pending_sparks_hds[p]==spark);
785     pending_sparks_hds[p] = spark->next;
786   } else {
787     spark->prev->next = spark->next;
788   }
789   if (spark->next==NULL) {
790     ASSERT(pending_sparks_tls[p]==spark);
791     /* spark is first spark of queue => adjust tl pointer */
792     pending_sparks_tls[p] = spark->prev;
793   } else {
794     spark->next->prev = spark->prev;
795   }
796   new_spark = spark->next;
797   
798 #  if defined(GRAN_CHECK)
799   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
800     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
801             pending_sparks_hd, pending_sparks_tl,
802             spark->prev, spark, spark->next, 
803             (spark->next==NULL ? 0 : spark->next->prev), spark);
804   }
805 #  endif
806
807   if (dispose_too)
808     disposeSpark(spark);
809                   
810   return new_spark;
811 }
812
813 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
814 void
815 markSparkQueue(void)
816
817   StgClosure *MarkRoot(StgClosure *root); // prototype
818   PEs p;
819   rtsSpark *sp;
820
821   for (p=0; p<RtsFlags.GranFlags.proc; p++)
822     for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
823       ASSERT(sp->node!=NULL);
824       ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
825       // ToDo?: statistics gathering here (also for GUM!)
826       sp->node = (StgClosure *)MarkRoot(sp->node);
827     }
828
829   IF_DEBUG(gc,
830            debugBelch("markSparkQueue: spark statistics at start of GC:");
831            print_sparkq_stats());
832 }
833
834 void
835 print_spark(spark)
836 rtsSpark *spark;
837
838   char str[16];
839
840   if (spark==NULL) {
841     debugBelch("Spark: NIL\n");
842     return;
843   } else {
844     sprintf(str,
845             ((spark->node==NULL) ? "______" : "%#6lx"), 
846             stgCast(StgPtr,spark->node));
847
848     debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
849             str, spark->name, 
850             ((spark->global)==rtsTrue?"True":"False"), spark->creator, 
851             spark->prev, spark->next);
852   }
853 }
854
855 void
856 print_sparkq(proc)
857 PEs proc;
858 // rtsSpark *hd;
859 {
860   rtsSpark *x = pending_sparks_hds[proc];
861
862   debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
863   for (; x!=(rtsSpark*)NULL; x=x->next) {
864     print_spark(x);
865   }
866 }
867
868 /* 
869    Print a statistics of all spark queues.
870 */
871 void
872 print_sparkq_stats(void)
873 {
874   PEs p;
875
876   debugBelch("SparkQs: [");
877   for (p=0; p<RtsFlags.GranFlags.proc; p++)
878     debugBelch(", PE %d: %d", p, spark_queue_len(p));
879   debugBelch("\n");
880 }
881
882 #endif