Add ASSERTs to all calls of nameModule
[ghc-hetmet.git] / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2000-2006
4  *
5  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
6  *
7  * -------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "Storage.h"
12 #include "Schedule.h"
13 #include "SchedAPI.h"
14 #include "RtsFlags.h"
15 #include "RtsUtils.h"
16 #include "ParTicky.h"
17 # if defined(PARALLEL_HASKELL)
18 # include "ParallelRts.h"
19 # include "GranSimRts.h"   // for GR_...
20 # elif defined(GRAN)
21 # include "GranSimRts.h"
22 # endif
23 #include "Sparks.h"
24 #include "Trace.h"
25
26 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
27
28 static INLINE_ME void bump_hd (StgSparkPool *p)
29 { p->hd++; if (p->hd == p->lim) p->hd = p->base; }
30
31 static INLINE_ME void bump_tl (StgSparkPool *p)
32 { p->tl++; if (p->tl == p->lim) p->tl = p->base; }
33
34 /* -----------------------------------------------------------------------------
35  * 
36  * Initialising spark pools.
37  *
38  * -------------------------------------------------------------------------- */
39
40 static void 
41 initSparkPool(StgSparkPool *pool)
42 {
43     pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
44                                 * sizeof(StgClosure *),
45                                 "initSparkPools");
46     pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
47     pool->hd  = pool->base;
48     pool->tl  = pool->base;
49 }
50
51 void
52 initSparkPools( void )
53 {
54 #ifdef THREADED_RTS
55     /* walk over the capabilities, allocating a spark pool for each one */
56     nat i;
57     for (i = 0; i < n_capabilities; i++) {
58         initSparkPool(&capabilities[i].r.rSparks);
59     }
60 #else
61     /* allocate a single spark pool */
62     initSparkPool(&MainCapability.r.rSparks);
63 #endif
64 }
65
66 void
67 freeSparkPool(StgSparkPool *pool) {
68     stgFree(pool->base);
69 }
70
71 /* -----------------------------------------------------------------------------
72  * 
73  * findSpark: find a spark on the current Capability that we can fork
74  * into a thread.
75  *
76  * -------------------------------------------------------------------------- */
77
78 StgClosure *
79 findSpark (Capability *cap)
80 {
81     StgSparkPool *pool;
82     StgClosure *spark;
83     
84     pool = &(cap->r.rSparks);
85     ASSERT_SPARK_POOL_INVARIANTS(pool);
86
87     while (pool->hd != pool->tl) {
88         spark = *pool->hd;
89         bump_hd(pool);
90         if (closure_SHOULD_SPARK(spark)) {
91 #ifdef GRAN
92             if (RtsFlags.ParFlags.ParStats.Sparks) 
93                 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
94                                  GR_STEALING, ((StgTSO *)NULL), spark, 
95                                  0, 0 /* spark_queue_len(ADVISORY_POOL) */);
96 #endif
97             return spark;
98         }
99     }
100     // spark pool is now empty
101     return NULL;
102 }
103
104 /* -----------------------------------------------------------------------------
105  * 
106  * Turn a spark into a real thread
107  *
108  * -------------------------------------------------------------------------- */
109
110 void
111 createSparkThread (Capability *cap, StgClosure *p)
112 {
113     StgTSO *tso;
114
115     tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
116     appendToRunQueue(cap,tso);
117 }
118
119 /* -----------------------------------------------------------------------------
120  * 
121  * Create a new spark
122  *
123  * -------------------------------------------------------------------------- */
124
125 #define DISCARD_NEW
126
127 StgInt
128 newSpark (StgRegTable *reg, StgClosure *p)
129 {
130     StgSparkPool *pool = &(reg->rSparks);
131
132     /* I am not sure whether this is the right thing to do.
133      * Maybe it is better to exploit the tag information
134      * instead of throwing it away?
135      */
136     p = UNTAG_CLOSURE(p);
137
138     ASSERT_SPARK_POOL_INVARIANTS(pool);
139
140     if (closure_SHOULD_SPARK(p)) {
141 #ifdef DISCARD_NEW
142         StgClosure **new_tl;
143         new_tl = pool->tl + 1;
144         if (new_tl == pool->lim) { new_tl = pool->base; }
145         if (new_tl != pool->hd) {
146             *pool->tl = p;
147             pool->tl = new_tl;
148         } else if (!closure_SHOULD_SPARK(*pool->hd)) {
149             // if the old closure is not sparkable, discard it and
150             // keep the new one.  Otherwise, keep the old one.
151             *pool->tl = p;
152             bump_hd(pool);
153         }
154 #else  /* DISCARD OLD */
155         *pool->tl = p;
156         bump_tl(pool);
157         if (pool->tl == pool->hd) { bump_hd(pool); }
158 #endif
159     }   
160
161     ASSERT_SPARK_POOL_INVARIANTS(pool);
162     return 1;
163 }
164
165 /* -----------------------------------------------------------------------------
166  * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
167  * implicit slide i.e. after marking all sparks are at the beginning of the
168  * spark pool and the spark pool only contains sparkable closures 
169  * -------------------------------------------------------------------------- */
170
171 static void
172 pruneSparkQueue (Capability *cap)
173
174     StgClosure *spark, **sparkp, **to_sparkp;
175     nat n, pruned_sparks; // stats only
176     StgSparkPool *pool;
177     
178     PAR_TICKY_MARK_SPARK_QUEUE_START();
179     
180     n = 0;
181     pruned_sparks = 0;
182     
183     pool = &(cap->r.rSparks);
184     
185     ASSERT_SPARK_POOL_INVARIANTS(pool);
186     
187     sparkp = pool->hd;
188     to_sparkp = pool->hd;
189     while (sparkp != pool->tl) {
190         ASSERT(*sparkp!=NULL);
191         ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp)));
192         // ToDo?: statistics gathering here (also for GUM!)
193         spark = *sparkp;
194         if (!closure_SHOULD_SPARK(spark)) {
195             pruned_sparks++;
196         } else{
197             *to_sparkp++ = spark;
198             if (to_sparkp == pool->lim) {
199                 to_sparkp = pool->base;
200             }
201             n++;
202         }
203         sparkp++;
204         if (sparkp == pool->lim) {
205             sparkp = pool->base;
206         }
207     }
208     pool->tl = to_sparkp;
209         
210     PAR_TICKY_MARK_SPARK_QUEUE_END(n);
211         
212     debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
213     
214     debugTrace(DEBUG_sched,
215                "new spark queue len=%d; (hd=%p; tl=%p)",
216                sparkPoolSize(pool), pool->hd, pool->tl);
217 }
218
219 void
220 pruneSparkQueues (void)
221 {
222     nat i;
223     for (i = 0; i < n_capabilities; i++) {
224         pruneSparkQueue(&capabilities[i]);
225     }
226 }
227
228 void
229 traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
230 {
231     StgClosure **sparkp;
232     StgSparkPool *pool;
233     
234     pool = &(cap->r.rSparks);
235     sparkp = pool->hd;
236     while (sparkp != pool->tl) {
237         evac(user, sparkp);
238         sparkp++;
239         if (sparkp == pool->lim) {
240             sparkp = pool->base;
241         }
242     }
243 }
244
245 #else
246
247 StgInt
248 newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
249 {
250     /* nothing */
251     return 1;
252 }
253
254
255 #endif /* PARALLEL_HASKELL || THREADED_RTS */
256
257
258 /* -----------------------------------------------------------------------------
259  * 
260  * GRAN & PARALLEL_HASKELL stuff beyond here.
261  *
262  * -------------------------------------------------------------------------- */
263
264 #if defined(PARALLEL_HASKELL) || defined(GRAN)
265
266 static void slide_spark_pool( StgSparkPool *pool );
267
268 rtsBool
269 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
270 {
271   if (pool->tl == pool->lim)
272     slide_spark_pool(pool);
273
274   if (closure_SHOULD_SPARK(closure) && 
275       pool->tl < pool->lim) {
276     *(pool->tl++) = closure;
277
278 #if defined(PARALLEL_HASKELL)
279     // collect parallel global statistics (currently done together with GC stats)
280     if (RtsFlags.ParFlags.ParStats.Global &&
281         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
282       // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime()); 
283       globalParStats.tot_sparks_created++;
284     }
285 #endif
286     return rtsTrue;
287   } else {
288 #if defined(PARALLEL_HASKELL)
289     // collect parallel global statistics (currently done together with GC stats)
290     if (RtsFlags.ParFlags.ParStats.Global &&
291         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
292       //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime()); 
293       globalParStats.tot_sparks_ignored++;
294     }
295 #endif
296     return rtsFalse;
297   }
298 }
299
300 static void
301 slide_spark_pool( StgSparkPool *pool )
302 {
303   StgClosure **sparkp, **to_sparkp;
304
305   sparkp = pool->hd;
306   to_sparkp = pool->base;
307   while (sparkp < pool->tl) {
308     ASSERT(to_sparkp<=sparkp);
309     ASSERT(*sparkp!=NULL);
310     ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
311
312     if (closure_SHOULD_SPARK(*sparkp)) {
313       *to_sparkp++ = *sparkp++;
314     } else {
315       sparkp++;
316     }
317   }
318   pool->hd = pool->base;
319   pool->tl = to_sparkp;
320 }
321
322 void
323 disposeSpark(spark)
324 StgClosure *spark;
325 {
326 #if !defined(THREADED_RTS)
327   Capability *cap;
328   StgSparkPool *pool;
329
330   cap = &MainRegTable;
331   pool = &(cap->rSparks);
332   ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
333 #endif
334   ASSERT(spark != (StgClosure *)NULL);
335   /* Do nothing */
336 }
337
338
339 #elif defined(GRAN)
340
341 /* 
342    Search the spark queue of the proc in event for a spark that's worth
343    turning into a thread 
344    (was gimme_spark in the old RTS)
345 */
346 void
347 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
348 {
349    PEs proc = event->proc,       /* proc to search for work */
350        creator = event->creator; /* proc that requested work */
351    StgClosure* node;
352    rtsBool found;
353    rtsSparkQ spark_of_non_local_node = NULL, 
354              spark_of_non_local_node_prev = NULL, 
355              low_priority_spark = NULL, 
356              low_priority_spark_prev = NULL,
357              spark = NULL, prev = NULL;
358   
359    /* Choose a spark from the local spark queue */
360    prev = (rtsSpark*)NULL;
361    spark = pending_sparks_hds[proc];
362    found = rtsFalse;
363
364    // ToDo: check this code & implement local sparking !! -- HWL  
365    while (!found && spark != (rtsSpark*)NULL)
366      {
367        ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
368               (prev==(rtsSpark*)NULL || prev->next==spark) &&
369               (spark->prev==prev));
370        node = spark->node;
371        if (!closure_SHOULD_SPARK(node)) 
372          {
373            IF_GRAN_DEBUG(checkSparkQ,
374                          debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
375                                spark, node));
376
377            if (RtsFlags.GranFlags.GranSimStats.Sparks)
378              DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
379                               spark->node, spark->name, spark_queue_len(proc));
380   
381            ASSERT(spark != (rtsSpark*)NULL);
382            ASSERT(SparksAvail>0);
383            --SparksAvail;
384
385            ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
386            spark = delete_from_sparkq (spark, proc, rtsTrue);
387            if (spark != (rtsSpark*)NULL)
388              prev = spark->prev;
389            continue;
390          }
391        /* -- node should eventually be sparked */
392        else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes && 
393                !IS_LOCAL_TO(PROCS(node),CurrentProc)) 
394          {
395            barf("Local sparking not yet implemented");
396
397            /* Remember first low priority spark */
398            if (spark_of_non_local_node==(rtsSpark*)NULL) {
399              spark_of_non_local_node_prev = prev;
400              spark_of_non_local_node = spark;
401               }
402   
403            if (spark->next == (rtsSpark*)NULL) { 
404              /* ASSERT(spark==SparkQueueTl);  just for testing */
405              prev = spark_of_non_local_node_prev;
406              spark = spark_of_non_local_node;
407              found = rtsTrue;
408              break;
409            }
410   
411 # if defined(GRAN) && defined(GRAN_CHECK)
412            /* Should never happen; just for testing 
413            if (spark==pending_sparks_tl) {
414              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
415                 stg_exit(EXIT_FAILURE);
416                 } */
417 # endif
418            prev = spark; 
419            spark = spark->next;
420            ASSERT(SparksAvail>0);
421            --SparksAvail;
422            continue;
423          }
424        else if ( RtsFlags.GranFlags.DoPrioritySparking || 
425                  (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
426          {
427            if (RtsFlags.GranFlags.DoPrioritySparking)
428              barf("Priority sparking not yet implemented");
429
430            found = rtsTrue;
431          }
432 #if 0      
433        else /* only used if SparkPriority2 is defined */
434          {
435            /* ToDo: fix the code below and re-integrate it */
436            /* Remember first low priority spark */
437            if (low_priority_spark==(rtsSpark*)NULL) { 
438              low_priority_spark_prev = prev;
439              low_priority_spark = spark;
440            }
441   
442            if (spark->next == (rtsSpark*)NULL) { 
443                 /* ASSERT(spark==spark_queue_tl);  just for testing */
444              prev = low_priority_spark_prev;
445              spark = low_priority_spark;
446              found = rtsTrue;       /* take low pri spark => rc is 2  */
447              break;
448            }
449   
450            /* Should never happen; just for testing 
451            if (spark==pending_sparks_tl) {
452              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
453                 stg_exit(EXIT_FAILURE);
454              break;
455            } */                
456            prev = spark; 
457            spark = spark->next;
458
459            IF_GRAN_DEBUG(pri,
460                          debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
461                                spark->gran_info, RtsFlags.GranFlags.SparkPriority, 
462                                spark->node, spark->name);)
463            }
464 #endif
465    }  /* while (spark!=NULL && !found) */
466
467    *spark_res = spark;
468    *found_res = found;
469 }
470
471 /*
472   Turn the spark into a thread.
473   In GranSim this basically means scheduling a StartThread event for the
474   node pointed to by the spark at some point in the future.
475   (was munch_spark in the old RTS)
476 */
477 rtsBool
478 activateSpark (rtsEvent *event, rtsSparkQ spark) 
479 {
480   PEs proc = event->proc,       /* proc to search for work */
481       creator = event->creator; /* proc that requested work */
482   StgTSO* tso;
483   StgClosure* node;
484   rtsTime spark_arrival_time;
485
486   /* 
487      We've found a node on PE proc requested by PE creator.
488      If proc==creator we can turn the spark into a thread immediately;
489      otherwise we schedule a MoveSpark event on the requesting PE
490   */
491      
492   /* DaH Qu' yIchen */
493   if (proc!=creator) { 
494
495     /* only possible if we simulate GUM style fishing */
496     ASSERT(RtsFlags.GranFlags.Fishing);
497
498     /* Message packing costs for sending a Fish; qeq jabbI'ID */
499     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
500   
501     if (RtsFlags.GranFlags.GranSimStats.Sparks)
502       DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
503                        (StgTSO*)NULL, spark->node,
504                        spark->name, spark_queue_len(proc));
505
506     /* time of the spark arrival on the remote PE */
507     spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
508
509     new_event(creator, proc, spark_arrival_time,
510               MoveSpark,
511               (StgTSO*)NULL, spark->node, spark);
512
513     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
514             
515   } else { /* proc==creator i.e. turn the spark into a thread */
516
517     if ( RtsFlags.GranFlags.GranSimStats.Global && 
518          spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
519
520       globalGranStats.tot_low_pri_sparks++;
521       IF_GRAN_DEBUG(pri,
522                     debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
523                           spark->gran_info, 
524                           spark->node, spark->name));
525     } 
526     
527     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
528     
529     node = spark->node;
530     
531 # if 0
532     /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
533     if (GARBAGE COLLECTION IS NECESSARY) {
534       /* Some kind of backoff needed here in case there's too little heap */
535 #  if defined(GRAN_CHECK) && defined(GRAN)
536       if (RtsFlags.GcFlags.giveStats)
537         fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p;  name=%u\n", 
538                 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
539                 spark, node, spark->name);
540 #  endif
541       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
542                   FindWork,
543                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
544       barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
545       GarbageCollect(GetRoots, rtsFalse);
546       // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
547       // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
548       spark = NULL;
549       return; /* was: continue; */ /* to the next event, eventually */
550     }
551 # endif
552     
553     if (RtsFlags.GranFlags.GranSimStats.Sparks)
554       DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
555                        spark->node, spark->name,
556                        spark_queue_len(CurrentProc));
557     
558     new_event(proc, proc, CurrentTime[proc],
559               StartThread, 
560               END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
561     
562     procStatus[proc] = Starting;
563   }
564 }
565
566 /* -------------------------------------------------------------------------
567    This is the main point where handling granularity information comes into
568    play. 
569    ------------------------------------------------------------------------- */
570
571 #define MAX_RAND_PRI    100
572
573 /* 
574    Granularity info transformers. 
575    Applied to the GRAN_INFO field of a spark.
576 */
577 STATIC_INLINE nat  ID(nat x) { return(x); };
578 STATIC_INLINE nat  INV(nat x) { return(-x); };
579 STATIC_INLINE nat  IGNORE(nat x) { return (0); };
580 STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
581
582 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
583 rtsSpark *
584 newSpark(node,name,gran_info,size_info,par_info,local)
585 StgClosure *node;
586 nat name, gran_info, size_info, par_info, local;
587 {
588   nat pri;
589   rtsSpark *newspark;
590
591   pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
592         RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
593         RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
594                            ID(gran_info);
595
596   if ( RtsFlags.GranFlags.SparkPriority!=0 && 
597        pri<RtsFlags.GranFlags.SparkPriority ) {
598     IF_GRAN_DEBUG(pri,
599       debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
600               pri, RtsFlags.GranFlags.SparkPriority, node, name));
601     return ((rtsSpark*)NULL);
602   }
603
604   newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
605   newspark->prev = newspark->next = (rtsSpark*)NULL;
606   newspark->node = node;
607   newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
608   newspark->gran_info = pri;
609   newspark->global = !local;      /* Check that with parAt, parAtAbs !!*/
610
611   if (RtsFlags.GranFlags.GranSimStats.Global) {
612     globalGranStats.tot_sparks_created++;
613     globalGranStats.sparks_created_on_PE[CurrentProc]++;
614   }
615
616   return(newspark);
617 }
618
619 void
620 disposeSpark(spark)
621 rtsSpark *spark;
622 {
623   ASSERT(spark!=NULL);
624   stgFree(spark);
625 }
626
627 void 
628 disposeSparkQ(spark)
629 rtsSparkQ spark;
630 {
631   if (spark==NULL) 
632     return;
633
634   disposeSparkQ(spark->next);
635
636 # ifdef GRAN_CHECK
637   if (SparksAvail < 0) {
638     debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
639     print_spark(spark);
640   }
641 # endif
642
643   stgFree(spark);
644 }
645
646 /*
647    With PrioritySparking add_to_spark_queue performs an insert sort to keep
648    the spark queue sorted. Otherwise the spark is just added to the end of
649    the queue. 
650 */
651
652 void
653 add_to_spark_queue(spark)
654 rtsSpark *spark;
655 {
656   rtsSpark *prev = NULL, *next = NULL;
657   nat count = 0;
658   rtsBool found = rtsFalse;
659
660   if ( spark == (rtsSpark *)NULL ) {
661     return;
662   }
663
664   if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
665     /* Priority sparking is enabled i.e. spark queues must be sorted */
666
667     for (prev = NULL, next = pending_sparks_hd, count=0;
668          (next != NULL) && 
669          !(found = (spark->gran_info >= next->gran_info));
670          prev = next, next = next->next, count++) 
671      {}
672
673   } else {   /* 'utQo' */
674     /* Priority sparking is disabled */
675     
676     found = rtsFalse;   /* to add it at the end */
677
678   }
679
680   if (found) {
681     /* next points to the first spark with a gran_info smaller than that
682        of spark; therefore, add spark before next into the spark queue */
683     spark->next = next;
684     if ( next == NULL ) {
685       pending_sparks_tl = spark;
686     } else {
687       next->prev = spark;
688     }
689     spark->prev = prev;
690     if ( prev == NULL ) {
691       pending_sparks_hd = spark;
692     } else {
693       prev->next = spark;
694     }
695   } else {  /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
696     /* add the spark at the end of the spark queue */
697     spark->next = NULL;                        
698     spark->prev = pending_sparks_tl;
699     if (pending_sparks_hd == NULL)
700       pending_sparks_hd = spark;
701     else
702       pending_sparks_tl->next = spark;
703     pending_sparks_tl = spark;    
704   } 
705   ++SparksAvail;
706
707   /* add costs for search in priority sparking */
708   if (RtsFlags.GranFlags.DoPrioritySparking) {
709     CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
710   }
711
712   IF_GRAN_DEBUG(checkSparkQ,
713                 debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
714                       spark, spark->node, CurrentProc);
715                 print_sparkq_stats());
716
717 #  if defined(GRAN_CHECK)
718   if (RtsFlags.GranFlags.Debug.checkSparkQ) {
719     for (prev = NULL, next =  pending_sparks_hd;
720          (next != NULL);
721          prev = next, next = next->next) 
722       {}
723     if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
724       debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
725               spark,CurrentProc, 
726               pending_sparks_tl, prev);
727   }
728 #  endif
729
730 #  if defined(GRAN_CHECK)
731   /* Check if the sparkq is still sorted. Just for testing, really!  */
732   if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
733        RtsFlags.GranFlags.Debug.pri ) {
734     rtsBool sorted = rtsTrue;
735     rtsSpark *prev, *next;
736
737     if (pending_sparks_hd == NULL ||
738         pending_sparks_hd->next == NULL ) {
739       /* just 1 elem => ok */
740     } else {
741       for (prev = pending_sparks_hd,
742            next = pending_sparks_hd->next;
743            (next != NULL) ;
744            prev = next, next = next->next) {
745         sorted = sorted && 
746                  (prev->gran_info >= next->gran_info);
747       }
748     }
749     if (!sorted) {
750       debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
751               CurrentProc);
752       print_sparkq(CurrentProc);
753     }
754   }
755 #  endif
756 }
757
758 nat
759 spark_queue_len(proc) 
760 PEs proc;
761 {
762  rtsSpark *prev, *spark;                     /* prev only for testing !! */
763  nat len;
764
765  for (len = 0, prev = NULL, spark = pending_sparks_hds[proc]; 
766       spark != NULL; 
767       len++, prev = spark, spark = spark->next)
768    {}
769
770 #  if defined(GRAN_CHECK)
771   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) 
772     if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
773       debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
774               proc, pending_sparks_tls[proc], prev);
775 #  endif
776
777  return (len);
778 }
779
780 /* 
781    Take spark out of the spark queue on PE p and nuke the spark. Adjusts
782    hd and tl pointers of the spark queue. Returns a pointer to the next
783    spark in the queue.
784 */
785 rtsSpark *
786 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
787 rtsSpark *spark;
788 PEs p;
789 rtsBool dispose_too;
790 {
791   rtsSpark *new_spark;
792
793   if (spark==NULL) 
794     barf("delete_from_sparkq: trying to delete NULL spark\n");
795
796 #  if defined(GRAN_CHECK)
797   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
798     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
799             pending_sparks_hd, pending_sparks_tl,
800             spark->prev, spark, spark->next, 
801             (spark->next==NULL ? 0 : spark->next->prev));
802   }
803 #  endif
804
805   if (spark->prev==NULL) {
806     /* spark is first spark of queue => adjust hd pointer */
807     ASSERT(pending_sparks_hds[p]==spark);
808     pending_sparks_hds[p] = spark->next;
809   } else {
810     spark->prev->next = spark->next;
811   }
812   if (spark->next==NULL) {
813     ASSERT(pending_sparks_tls[p]==spark);
814     /* spark is first spark of queue => adjust tl pointer */
815     pending_sparks_tls[p] = spark->prev;
816   } else {
817     spark->next->prev = spark->prev;
818   }
819   new_spark = spark->next;
820   
821 #  if defined(GRAN_CHECK)
822   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
823     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
824             pending_sparks_hd, pending_sparks_tl,
825             spark->prev, spark, spark->next, 
826             (spark->next==NULL ? 0 : spark->next->prev), spark);
827   }
828 #  endif
829
830   if (dispose_too)
831     disposeSpark(spark);
832                   
833   return new_spark;
834 }
835
836 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
837 void
838 markSparkQueue(void)
839
840   StgClosure *MarkRoot(StgClosure *root); // prototype
841   PEs p;
842   rtsSpark *sp;
843
844   for (p=0; p<RtsFlags.GranFlags.proc; p++)
845     for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
846       ASSERT(sp->node!=NULL);
847       ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
848       // ToDo?: statistics gathering here (also for GUM!)
849       sp->node = (StgClosure *)MarkRoot(sp->node);
850     }
851
852   IF_DEBUG(gc,
853            debugBelch("markSparkQueue: spark statistics at start of GC:");
854            print_sparkq_stats());
855 }
856
857 void
858 print_spark(spark)
859 rtsSpark *spark;
860
861   char str[16];
862
863   if (spark==NULL) {
864     debugBelch("Spark: NIL\n");
865     return;
866   } else {
867     sprintf(str,
868             ((spark->node==NULL) ? "______" : "%#6lx"), 
869             stgCast(StgPtr,spark->node));
870
871     debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
872             str, spark->name, 
873             ((spark->global)==rtsTrue?"True":"False"), spark->creator, 
874             spark->prev, spark->next);
875   }
876 }
877
878 void
879 print_sparkq(proc)
880 PEs proc;
881 // rtsSpark *hd;
882 {
883   rtsSpark *x = pending_sparks_hds[proc];
884
885   debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
886   for (; x!=(rtsSpark*)NULL; x=x->next) {
887     print_spark(x);
888   }
889 }
890
891 /* 
892    Print a statistics of all spark queues.
893 */
894 void
895 print_sparkq_stats(void)
896 {
897   PEs p;
898
899   debugBelch("SparkQs: [");
900   for (p=0; p<RtsFlags.GranFlags.proc; p++)
901     debugBelch(", PE %d: %d", p, spark_queue_len(p));
902   debugBelch("\n");
903 }
904
905 #endif