debug message tweaks
[ghc-hetmet.git] / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2000-2006
4  *
5  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
6  *
7  * -------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "Storage.h"
12 #include "Schedule.h"
13 #include "SchedAPI.h"
14 #include "RtsFlags.h"
15 #include "RtsUtils.h"
16 #include "ParTicky.h"
17 # if defined(PARALLEL_HASKELL)
18 # include "ParallelRts.h"
19 # include "GranSimRts.h"   // for GR_...
20 # elif defined(GRAN)
21 # include "GranSimRts.h"
22 # endif
23 #include "Sparks.h"
24 #include "Trace.h"
25
26 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
27
28 static INLINE_ME void bump_hd (StgSparkPool *p)
29 { p->hd++; if (p->hd == p->lim) p->hd = p->base; }
30
31 static INLINE_ME void bump_tl (StgSparkPool *p)
32 { p->tl++; if (p->tl == p->lim) p->tl = p->base; }
33
34 /* -----------------------------------------------------------------------------
35  * 
36  * Initialising spark pools.
37  *
38  * -------------------------------------------------------------------------- */
39
40 static void 
41 initSparkPool(StgSparkPool *pool)
42 {
43     pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
44                                 * sizeof(StgClosure *),
45                                 "initSparkPools");
46     pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
47     pool->hd  = pool->base;
48     pool->tl  = pool->base;
49 }
50
51 void
52 initSparkPools( void )
53 {
54 #ifdef THREADED_RTS
55     /* walk over the capabilities, allocating a spark pool for each one */
56     nat i;
57     for (i = 0; i < n_capabilities; i++) {
58         initSparkPool(&capabilities[i].r.rSparks);
59     }
60 #else
61     /* allocate a single spark pool */
62     initSparkPool(&MainCapability.r.rSparks);
63 #endif
64 }
65
66 void
67 freeSparkPool(StgSparkPool *pool) {
68     stgFree(pool->base);
69 }
70
71 /* -----------------------------------------------------------------------------
72  * 
73  * findSpark: find a spark on the current Capability that we can fork
74  * into a thread.
75  *
76  * -------------------------------------------------------------------------- */
77
78 StgClosure *
79 findSpark (Capability *cap)
80 {
81     StgSparkPool *pool;
82     StgClosure *spark;
83     
84     pool = &(cap->r.rSparks);
85     ASSERT_SPARK_POOL_INVARIANTS(pool);
86
87     while (pool->hd != pool->tl) {
88         spark = *pool->hd;
89         bump_hd(pool);
90         if (closure_SHOULD_SPARK(spark)) {
91 #ifdef GRAN
92             if (RtsFlags.ParFlags.ParStats.Sparks) 
93                 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
94                                  GR_STEALING, ((StgTSO *)NULL), spark, 
95                                  0, 0 /* spark_queue_len(ADVISORY_POOL) */);
96 #endif
97             return spark;
98         }
99     }
100     // spark pool is now empty
101     return NULL;
102 }
103
104 /* -----------------------------------------------------------------------------
105  * 
106  * Turn a spark into a real thread
107  *
108  * -------------------------------------------------------------------------- */
109
110 void
111 createSparkThread (Capability *cap, StgClosure *p)
112 {
113     StgTSO *tso;
114
115     tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
116     appendToRunQueue(cap,tso);
117 }
118
119 /* -----------------------------------------------------------------------------
120  * 
121  * Create a new spark
122  *
123  * -------------------------------------------------------------------------- */
124
125 #define DISCARD_NEW
126
127 StgInt
128 newSpark (StgRegTable *reg, StgClosure *p)
129 {
130     StgSparkPool *pool = &(reg->rSparks);
131
132     /* I am not sure whether this is the right thing to do.
133      * Maybe it is better to exploit the tag information
134      * instead of throwing it away?
135      */
136     p = UNTAG_CLOSURE(p);
137
138     ASSERT_SPARK_POOL_INVARIANTS(pool);
139
140     if (closure_SHOULD_SPARK(p)) {
141 #ifdef DISCARD_NEW
142         StgClosure **new_tl;
143         new_tl = pool->tl + 1;
144         if (new_tl == pool->lim) { new_tl = pool->base; }
145         if (new_tl != pool->hd) {
146             *pool->tl = p;
147             pool->tl = new_tl;
148         } else if (!closure_SHOULD_SPARK(*pool->hd)) {
149             // if the old closure is not sparkable, discard it and
150             // keep the new one.  Otherwise, keep the old one.
151             *pool->tl = p;
152             bump_hd(pool);
153         }
154 #else  /* DISCARD OLD */
155         *pool->tl = p;
156         bump_tl(pool);
157         if (pool->tl == pool->hd) { bump_hd(pool); }
158 #endif
159     }   
160
161     ASSERT_SPARK_POOL_INVARIANTS(pool);
162     return 1;
163 }
164
165 /* -----------------------------------------------------------------------------
166  * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
167  * implicit slide i.e. after marking all sparks are at the beginning of the
168  * spark pool and the spark pool only contains sparkable closures 
169  * -------------------------------------------------------------------------- */
170
171 void
172 updateSparkQueue (Capability *cap)
173
174     StgClosure *spark, **sparkp, **to_sparkp;
175     nat n, pruned_sparks; // stats only
176     StgSparkPool *pool;
177     
178     PAR_TICKY_MARK_SPARK_QUEUE_START();
179     
180     n = 0;
181     pruned_sparks = 0;
182     
183     pool = &(cap->r.rSparks);
184     
185     ASSERT_SPARK_POOL_INVARIANTS(pool);
186     
187     sparkp = pool->hd;
188     to_sparkp = pool->hd;
189     while (sparkp != pool->tl) {
190         ASSERT(*sparkp!=NULL);
191         ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp)));
192         // ToDo?: statistics gathering here (also for GUM!)
193         spark = isAlive(*sparkp);
194         if (spark != NULL && closure_SHOULD_SPARK(spark)) {
195             *to_sparkp++ = spark;
196             if (to_sparkp == pool->lim) {
197                 to_sparkp = pool->base;
198             }
199             n++;
200         } else {
201             pruned_sparks++;
202         }
203         sparkp++;
204         if (sparkp == pool->lim) {
205             sparkp = pool->base;
206         }
207     }
208     pool->tl = to_sparkp;
209         
210     PAR_TICKY_MARK_SPARK_QUEUE_END(n);
211         
212     debugTrace(DEBUG_sched, 
213                "updated %d sparks and pruned %d sparks",
214                n, pruned_sparks);
215     
216     debugTrace(DEBUG_sched,
217                "new spark queue len=%d; (hd=%p; tl=%p)",
218                sparkPoolSize(pool), pool->hd, pool->tl);
219 }
220
221 void
222 traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
223 {
224     StgClosure **sparkp;
225     StgSparkPool *pool;
226     
227     pool = &(cap->r.rSparks);
228     sparkp = pool->hd;
229     while (sparkp != pool->tl) {
230         evac(sparkp, user);
231         sparkp++;
232         if (sparkp == pool->lim) {
233             sparkp = pool->base;
234         }
235     }
236 }
237
238 #else
239
240 StgInt
241 newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
242 {
243     /* nothing */
244     return 1;
245 }
246
247
248 #endif /* PARALLEL_HASKELL || THREADED_RTS */
249
250
251 /* -----------------------------------------------------------------------------
252  * 
253  * GRAN & PARALLEL_HASKELL stuff beyond here.
254  *
255  * -------------------------------------------------------------------------- */
256
257 #if defined(PARALLEL_HASKELL) || defined(GRAN)
258
259 static void slide_spark_pool( StgSparkPool *pool );
260
261 rtsBool
262 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
263 {
264   if (pool->tl == pool->lim)
265     slide_spark_pool(pool);
266
267   if (closure_SHOULD_SPARK(closure) && 
268       pool->tl < pool->lim) {
269     *(pool->tl++) = closure;
270
271 #if defined(PARALLEL_HASKELL)
272     // collect parallel global statistics (currently done together with GC stats)
273     if (RtsFlags.ParFlags.ParStats.Global &&
274         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
275       // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime()); 
276       globalParStats.tot_sparks_created++;
277     }
278 #endif
279     return rtsTrue;
280   } else {
281 #if defined(PARALLEL_HASKELL)
282     // collect parallel global statistics (currently done together with GC stats)
283     if (RtsFlags.ParFlags.ParStats.Global &&
284         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
285       //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime()); 
286       globalParStats.tot_sparks_ignored++;
287     }
288 #endif
289     return rtsFalse;
290   }
291 }
292
293 static void
294 slide_spark_pool( StgSparkPool *pool )
295 {
296   StgClosure **sparkp, **to_sparkp;
297
298   sparkp = pool->hd;
299   to_sparkp = pool->base;
300   while (sparkp < pool->tl) {
301     ASSERT(to_sparkp<=sparkp);
302     ASSERT(*sparkp!=NULL);
303     ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
304
305     if (closure_SHOULD_SPARK(*sparkp)) {
306       *to_sparkp++ = *sparkp++;
307     } else {
308       sparkp++;
309     }
310   }
311   pool->hd = pool->base;
312   pool->tl = to_sparkp;
313 }
314
315 void
316 disposeSpark(spark)
317 StgClosure *spark;
318 {
319 #if !defined(THREADED_RTS)
320   Capability *cap;
321   StgSparkPool *pool;
322
323   cap = &MainRegTable;
324   pool = &(cap->rSparks);
325   ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
326 #endif
327   ASSERT(spark != (StgClosure *)NULL);
328   /* Do nothing */
329 }
330
331
332 #elif defined(GRAN)
333
334 /* 
335    Search the spark queue of the proc in event for a spark that's worth
336    turning into a thread 
337    (was gimme_spark in the old RTS)
338 */
339 void
340 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
341 {
342    PEs proc = event->proc,       /* proc to search for work */
343        creator = event->creator; /* proc that requested work */
344    StgClosure* node;
345    rtsBool found;
346    rtsSparkQ spark_of_non_local_node = NULL, 
347              spark_of_non_local_node_prev = NULL, 
348              low_priority_spark = NULL, 
349              low_priority_spark_prev = NULL,
350              spark = NULL, prev = NULL;
351   
352    /* Choose a spark from the local spark queue */
353    prev = (rtsSpark*)NULL;
354    spark = pending_sparks_hds[proc];
355    found = rtsFalse;
356
357    // ToDo: check this code & implement local sparking !! -- HWL  
358    while (!found && spark != (rtsSpark*)NULL)
359      {
360        ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
361               (prev==(rtsSpark*)NULL || prev->next==spark) &&
362               (spark->prev==prev));
363        node = spark->node;
364        if (!closure_SHOULD_SPARK(node)) 
365          {
366            IF_GRAN_DEBUG(checkSparkQ,
367                          debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
368                                spark, node));
369
370            if (RtsFlags.GranFlags.GranSimStats.Sparks)
371              DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
372                               spark->node, spark->name, spark_queue_len(proc));
373   
374            ASSERT(spark != (rtsSpark*)NULL);
375            ASSERT(SparksAvail>0);
376            --SparksAvail;
377
378            ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
379            spark = delete_from_sparkq (spark, proc, rtsTrue);
380            if (spark != (rtsSpark*)NULL)
381              prev = spark->prev;
382            continue;
383          }
384        /* -- node should eventually be sparked */
385        else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes && 
386                !IS_LOCAL_TO(PROCS(node),CurrentProc)) 
387          {
388            barf("Local sparking not yet implemented");
389
390            /* Remember first low priority spark */
391            if (spark_of_non_local_node==(rtsSpark*)NULL) {
392              spark_of_non_local_node_prev = prev;
393              spark_of_non_local_node = spark;
394               }
395   
396            if (spark->next == (rtsSpark*)NULL) { 
397              /* ASSERT(spark==SparkQueueTl);  just for testing */
398              prev = spark_of_non_local_node_prev;
399              spark = spark_of_non_local_node;
400              found = rtsTrue;
401              break;
402            }
403   
404 # if defined(GRAN) && defined(GRAN_CHECK)
405            /* Should never happen; just for testing 
406            if (spark==pending_sparks_tl) {
407              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
408                 stg_exit(EXIT_FAILURE);
409                 } */
410 # endif
411            prev = spark; 
412            spark = spark->next;
413            ASSERT(SparksAvail>0);
414            --SparksAvail;
415            continue;
416          }
417        else if ( RtsFlags.GranFlags.DoPrioritySparking || 
418                  (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
419          {
420            if (RtsFlags.GranFlags.DoPrioritySparking)
421              barf("Priority sparking not yet implemented");
422
423            found = rtsTrue;
424          }
425 #if 0      
426        else /* only used if SparkPriority2 is defined */
427          {
428            /* ToDo: fix the code below and re-integrate it */
429            /* Remember first low priority spark */
430            if (low_priority_spark==(rtsSpark*)NULL) { 
431              low_priority_spark_prev = prev;
432              low_priority_spark = spark;
433            }
434   
435            if (spark->next == (rtsSpark*)NULL) { 
436                 /* ASSERT(spark==spark_queue_tl);  just for testing */
437              prev = low_priority_spark_prev;
438              spark = low_priority_spark;
439              found = rtsTrue;       /* take low pri spark => rc is 2  */
440              break;
441            }
442   
443            /* Should never happen; just for testing 
444            if (spark==pending_sparks_tl) {
445              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
446                 stg_exit(EXIT_FAILURE);
447              break;
448            } */                
449            prev = spark; 
450            spark = spark->next;
451
452            IF_GRAN_DEBUG(pri,
453                          debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
454                                spark->gran_info, RtsFlags.GranFlags.SparkPriority, 
455                                spark->node, spark->name);)
456            }
457 #endif
458    }  /* while (spark!=NULL && !found) */
459
460    *spark_res = spark;
461    *found_res = found;
462 }
463
464 /*
465   Turn the spark into a thread.
466   In GranSim this basically means scheduling a StartThread event for the
467   node pointed to by the spark at some point in the future.
468   (was munch_spark in the old RTS)
469 */
470 rtsBool
471 activateSpark (rtsEvent *event, rtsSparkQ spark) 
472 {
473   PEs proc = event->proc,       /* proc to search for work */
474       creator = event->creator; /* proc that requested work */
475   StgTSO* tso;
476   StgClosure* node;
477   rtsTime spark_arrival_time;
478
479   /* 
480      We've found a node on PE proc requested by PE creator.
481      If proc==creator we can turn the spark into a thread immediately;
482      otherwise we schedule a MoveSpark event on the requesting PE
483   */
484      
485   /* DaH Qu' yIchen */
486   if (proc!=creator) { 
487
488     /* only possible if we simulate GUM style fishing */
489     ASSERT(RtsFlags.GranFlags.Fishing);
490
491     /* Message packing costs for sending a Fish; qeq jabbI'ID */
492     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
493   
494     if (RtsFlags.GranFlags.GranSimStats.Sparks)
495       DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
496                        (StgTSO*)NULL, spark->node,
497                        spark->name, spark_queue_len(proc));
498
499     /* time of the spark arrival on the remote PE */
500     spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
501
502     new_event(creator, proc, spark_arrival_time,
503               MoveSpark,
504               (StgTSO*)NULL, spark->node, spark);
505
506     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
507             
508   } else { /* proc==creator i.e. turn the spark into a thread */
509
510     if ( RtsFlags.GranFlags.GranSimStats.Global && 
511          spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
512
513       globalGranStats.tot_low_pri_sparks++;
514       IF_GRAN_DEBUG(pri,
515                     debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
516                           spark->gran_info, 
517                           spark->node, spark->name));
518     } 
519     
520     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
521     
522     node = spark->node;
523     
524 # if 0
525     /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
526     if (GARBAGE COLLECTION IS NECESSARY) {
527       /* Some kind of backoff needed here in case there's too little heap */
528 #  if defined(GRAN_CHECK) && defined(GRAN)
529       if (RtsFlags.GcFlags.giveStats)
530         fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p;  name=%u\n", 
531                 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
532                 spark, node, spark->name);
533 #  endif
534       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
535                   FindWork,
536                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
537       barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
538       GarbageCollect(GetRoots, rtsFalse);
539       // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
540       // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
541       spark = NULL;
542       return; /* was: continue; */ /* to the next event, eventually */
543     }
544 # endif
545     
546     if (RtsFlags.GranFlags.GranSimStats.Sparks)
547       DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
548                        spark->node, spark->name,
549                        spark_queue_len(CurrentProc));
550     
551     new_event(proc, proc, CurrentTime[proc],
552               StartThread, 
553               END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
554     
555     procStatus[proc] = Starting;
556   }
557 }
558
559 /* -------------------------------------------------------------------------
560    This is the main point where handling granularity information comes into
561    play. 
562    ------------------------------------------------------------------------- */
563
564 #define MAX_RAND_PRI    100
565
566 /* 
567    Granularity info transformers. 
568    Applied to the GRAN_INFO field of a spark.
569 */
570 STATIC_INLINE nat  ID(nat x) { return(x); };
571 STATIC_INLINE nat  INV(nat x) { return(-x); };
572 STATIC_INLINE nat  IGNORE(nat x) { return (0); };
573 STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
574
575 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
576 rtsSpark *
577 newSpark(node,name,gran_info,size_info,par_info,local)
578 StgClosure *node;
579 nat name, gran_info, size_info, par_info, local;
580 {
581   nat pri;
582   rtsSpark *newspark;
583
584   pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
585         RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
586         RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
587                            ID(gran_info);
588
589   if ( RtsFlags.GranFlags.SparkPriority!=0 && 
590        pri<RtsFlags.GranFlags.SparkPriority ) {
591     IF_GRAN_DEBUG(pri,
592       debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
593               pri, RtsFlags.GranFlags.SparkPriority, node, name));
594     return ((rtsSpark*)NULL);
595   }
596
597   newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
598   newspark->prev = newspark->next = (rtsSpark*)NULL;
599   newspark->node = node;
600   newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
601   newspark->gran_info = pri;
602   newspark->global = !local;      /* Check that with parAt, parAtAbs !!*/
603
604   if (RtsFlags.GranFlags.GranSimStats.Global) {
605     globalGranStats.tot_sparks_created++;
606     globalGranStats.sparks_created_on_PE[CurrentProc]++;
607   }
608
609   return(newspark);
610 }
611
612 void
613 disposeSpark(spark)
614 rtsSpark *spark;
615 {
616   ASSERT(spark!=NULL);
617   stgFree(spark);
618 }
619
620 void 
621 disposeSparkQ(spark)
622 rtsSparkQ spark;
623 {
624   if (spark==NULL) 
625     return;
626
627   disposeSparkQ(spark->next);
628
629 # ifdef GRAN_CHECK
630   if (SparksAvail < 0) {
631     debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
632     print_spark(spark);
633   }
634 # endif
635
636   stgFree(spark);
637 }
638
639 /*
640    With PrioritySparking add_to_spark_queue performs an insert sort to keep
641    the spark queue sorted. Otherwise the spark is just added to the end of
642    the queue. 
643 */
644
645 void
646 add_to_spark_queue(spark)
647 rtsSpark *spark;
648 {
649   rtsSpark *prev = NULL, *next = NULL;
650   nat count = 0;
651   rtsBool found = rtsFalse;
652
653   if ( spark == (rtsSpark *)NULL ) {
654     return;
655   }
656
657   if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
658     /* Priority sparking is enabled i.e. spark queues must be sorted */
659
660     for (prev = NULL, next = pending_sparks_hd, count=0;
661          (next != NULL) && 
662          !(found = (spark->gran_info >= next->gran_info));
663          prev = next, next = next->next, count++) 
664      {}
665
666   } else {   /* 'utQo' */
667     /* Priority sparking is disabled */
668     
669     found = rtsFalse;   /* to add it at the end */
670
671   }
672
673   if (found) {
674     /* next points to the first spark with a gran_info smaller than that
675        of spark; therefore, add spark before next into the spark queue */
676     spark->next = next;
677     if ( next == NULL ) {
678       pending_sparks_tl = spark;
679     } else {
680       next->prev = spark;
681     }
682     spark->prev = prev;
683     if ( prev == NULL ) {
684       pending_sparks_hd = spark;
685     } else {
686       prev->next = spark;
687     }
688   } else {  /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
689     /* add the spark at the end of the spark queue */
690     spark->next = NULL;                        
691     spark->prev = pending_sparks_tl;
692     if (pending_sparks_hd == NULL)
693       pending_sparks_hd = spark;
694     else
695       pending_sparks_tl->next = spark;
696     pending_sparks_tl = spark;    
697   } 
698   ++SparksAvail;
699
700   /* add costs for search in priority sparking */
701   if (RtsFlags.GranFlags.DoPrioritySparking) {
702     CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
703   }
704
705   IF_GRAN_DEBUG(checkSparkQ,
706                 debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
707                       spark, spark->node, CurrentProc);
708                 print_sparkq_stats());
709
710 #  if defined(GRAN_CHECK)
711   if (RtsFlags.GranFlags.Debug.checkSparkQ) {
712     for (prev = NULL, next =  pending_sparks_hd;
713          (next != NULL);
714          prev = next, next = next->next) 
715       {}
716     if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
717       debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
718               spark,CurrentProc, 
719               pending_sparks_tl, prev);
720   }
721 #  endif
722
723 #  if defined(GRAN_CHECK)
724   /* Check if the sparkq is still sorted. Just for testing, really!  */
725   if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
726        RtsFlags.GranFlags.Debug.pri ) {
727     rtsBool sorted = rtsTrue;
728     rtsSpark *prev, *next;
729
730     if (pending_sparks_hd == NULL ||
731         pending_sparks_hd->next == NULL ) {
732       /* just 1 elem => ok */
733     } else {
734       for (prev = pending_sparks_hd,
735            next = pending_sparks_hd->next;
736            (next != NULL) ;
737            prev = next, next = next->next) {
738         sorted = sorted && 
739                  (prev->gran_info >= next->gran_info);
740       }
741     }
742     if (!sorted) {
743       debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
744               CurrentProc);
745       print_sparkq(CurrentProc);
746     }
747   }
748 #  endif
749 }
750
751 nat
752 spark_queue_len(proc) 
753 PEs proc;
754 {
755  rtsSpark *prev, *spark;                     /* prev only for testing !! */
756  nat len;
757
758  for (len = 0, prev = NULL, spark = pending_sparks_hds[proc]; 
759       spark != NULL; 
760       len++, prev = spark, spark = spark->next)
761    {}
762
763 #  if defined(GRAN_CHECK)
764   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) 
765     if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
766       debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
767               proc, pending_sparks_tls[proc], prev);
768 #  endif
769
770  return (len);
771 }
772
773 /* 
774    Take spark out of the spark queue on PE p and nuke the spark. Adjusts
775    hd and tl pointers of the spark queue. Returns a pointer to the next
776    spark in the queue.
777 */
778 rtsSpark *
779 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
780 rtsSpark *spark;
781 PEs p;
782 rtsBool dispose_too;
783 {
784   rtsSpark *new_spark;
785
786   if (spark==NULL) 
787     barf("delete_from_sparkq: trying to delete NULL spark\n");
788
789 #  if defined(GRAN_CHECK)
790   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
791     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
792             pending_sparks_hd, pending_sparks_tl,
793             spark->prev, spark, spark->next, 
794             (spark->next==NULL ? 0 : spark->next->prev));
795   }
796 #  endif
797
798   if (spark->prev==NULL) {
799     /* spark is first spark of queue => adjust hd pointer */
800     ASSERT(pending_sparks_hds[p]==spark);
801     pending_sparks_hds[p] = spark->next;
802   } else {
803     spark->prev->next = spark->next;
804   }
805   if (spark->next==NULL) {
806     ASSERT(pending_sparks_tls[p]==spark);
807     /* spark is first spark of queue => adjust tl pointer */
808     pending_sparks_tls[p] = spark->prev;
809   } else {
810     spark->next->prev = spark->prev;
811   }
812   new_spark = spark->next;
813   
814 #  if defined(GRAN_CHECK)
815   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
816     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
817             pending_sparks_hd, pending_sparks_tl,
818             spark->prev, spark, spark->next, 
819             (spark->next==NULL ? 0 : spark->next->prev), spark);
820   }
821 #  endif
822
823   if (dispose_too)
824     disposeSpark(spark);
825                   
826   return new_spark;
827 }
828
829 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
830 void
831 markSparkQueue(void)
832
833   StgClosure *MarkRoot(StgClosure *root); // prototype
834   PEs p;
835   rtsSpark *sp;
836
837   for (p=0; p<RtsFlags.GranFlags.proc; p++)
838     for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
839       ASSERT(sp->node!=NULL);
840       ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
841       // ToDo?: statistics gathering here (also for GUM!)
842       sp->node = (StgClosure *)MarkRoot(sp->node);
843     }
844
845   IF_DEBUG(gc,
846            debugBelch("markSparkQueue: spark statistics at start of GC:");
847            print_sparkq_stats());
848 }
849
850 void
851 print_spark(spark)
852 rtsSpark *spark;
853
854   char str[16];
855
856   if (spark==NULL) {
857     debugBelch("Spark: NIL\n");
858     return;
859   } else {
860     sprintf(str,
861             ((spark->node==NULL) ? "______" : "%#6lx"), 
862             stgCast(StgPtr,spark->node));
863
864     debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
865             str, spark->name, 
866             ((spark->global)==rtsTrue?"True":"False"), spark->creator, 
867             spark->prev, spark->next);
868   }
869 }
870
871 void
872 print_sparkq(proc)
873 PEs proc;
874 // rtsSpark *hd;
875 {
876   rtsSpark *x = pending_sparks_hds[proc];
877
878   debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
879   for (; x!=(rtsSpark*)NULL; x=x->next) {
880     print_spark(x);
881   }
882 }
883
884 /* 
885    Print a statistics of all spark queues.
886 */
887 void
888 print_sparkq_stats(void)
889 {
890   PEs p;
891
892   debugBelch("SparkQs: [");
893   for (p=0; p<RtsFlags.GranFlags.proc; p++)
894     debugBelch(", PE %d: %d", p, spark_queue_len(p));
895   debugBelch("\n");
896 }
897
898 #endif