have each GC thread call GetRoots()
[ghc-hetmet.git] / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2000-2006
4  *
5  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
6  *
7  * -------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "Storage.h"
12 #include "Schedule.h"
13 #include "SchedAPI.h"
14 #include "RtsFlags.h"
15 #include "RtsUtils.h"
16 #include "ParTicky.h"
17 # if defined(PARALLEL_HASKELL)
18 # include "ParallelRts.h"
19 # include "GranSimRts.h"   // for GR_...
20 # elif defined(GRAN)
21 # include "GranSimRts.h"
22 # endif
23 #include "Sparks.h"
24 #include "Trace.h"
25
26 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
27
28 static INLINE_ME void bump_hd (StgSparkPool *p)
29 { p->hd++; if (p->hd == p->lim) p->hd = p->base; }
30
31 static INLINE_ME void bump_tl (StgSparkPool *p)
32 { p->tl++; if (p->tl == p->lim) p->tl = p->base; }
33
34 /* -----------------------------------------------------------------------------
35  * 
36  * Initialising spark pools.
37  *
38  * -------------------------------------------------------------------------- */
39
40 static void 
41 initSparkPool(StgSparkPool *pool)
42 {
43     pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
44                                 * sizeof(StgClosure *),
45                                 "initSparkPools");
46     pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
47     pool->hd  = pool->base;
48     pool->tl  = pool->base;
49 }
50
51 void
52 initSparkPools( void )
53 {
54 #ifdef THREADED_RTS
55     /* walk over the capabilities, allocating a spark pool for each one */
56     nat i;
57     for (i = 0; i < n_capabilities; i++) {
58         initSparkPool(&capabilities[i].r.rSparks);
59     }
60 #else
61     /* allocate a single spark pool */
62     initSparkPool(&MainCapability.r.rSparks);
63 #endif
64 }
65
66 void
67 freeSparkPool(StgSparkPool *pool) {
68     stgFree(pool->base);
69 }
70
71 /* -----------------------------------------------------------------------------
72  * 
73  * findSpark: find a spark on the current Capability that we can fork
74  * into a thread.
75  *
76  * -------------------------------------------------------------------------- */
77
78 StgClosure *
79 findSpark (Capability *cap)
80 {
81     StgSparkPool *pool;
82     StgClosure *spark;
83     
84     pool = &(cap->r.rSparks);
85     ASSERT_SPARK_POOL_INVARIANTS(pool);
86
87     while (pool->hd != pool->tl) {
88         spark = *pool->hd;
89         bump_hd(pool);
90         if (closure_SHOULD_SPARK(spark)) {
91 #ifdef GRAN
92             if (RtsFlags.ParFlags.ParStats.Sparks) 
93                 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
94                                  GR_STEALING, ((StgTSO *)NULL), spark, 
95                                  0, 0 /* spark_queue_len(ADVISORY_POOL) */);
96 #endif
97             return spark;
98         }
99     }
100     // spark pool is now empty
101     return NULL;
102 }
103
104 /* -----------------------------------------------------------------------------
105  * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
106  * implicit slide i.e. after marking all sparks are at the beginning of the
107  * spark pool and the spark pool only contains sparkable closures 
108  * -------------------------------------------------------------------------- */
109
110 void
111 markSparkQueue (evac_fn evac, Capability *cap)
112
113     StgClosure **sparkp, **to_sparkp;
114     nat n, pruned_sparks; // stats only
115     StgSparkPool *pool;
116     
117     PAR_TICKY_MARK_SPARK_QUEUE_START();
118     
119     n = 0;
120     pruned_sparks = 0;
121     
122     pool = &(cap->r.rSparks);
123     
124     ASSERT_SPARK_POOL_INVARIANTS(pool);
125     
126 #if defined(PARALLEL_HASKELL)
127     // stats only
128     n = 0;
129     pruned_sparks = 0;
130 #endif
131         
132     sparkp = pool->hd;
133     to_sparkp = pool->hd;
134     while (sparkp != pool->tl) {
135         ASSERT(*sparkp!=NULL);
136         ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp)));
137         // ToDo?: statistics gathering here (also for GUM!)
138         if (closure_SHOULD_SPARK(*sparkp)) {
139             evac(sparkp);
140             *to_sparkp++ = *sparkp;
141             if (to_sparkp == pool->lim) {
142                 to_sparkp = pool->base;
143             }
144             n++;
145         } else {
146             pruned_sparks++;
147         }
148         sparkp++;
149         if (sparkp == pool->lim) {
150             sparkp = pool->base;
151         }
152     }
153     pool->tl = to_sparkp;
154         
155     PAR_TICKY_MARK_SPARK_QUEUE_END(n);
156         
157 #if defined(PARALLEL_HASKELL)
158     debugTrace(DEBUG_sched, 
159                "marked %d sparks and pruned %d sparks on [%x]",
160                n, pruned_sparks, mytid);
161 #else
162     debugTrace(DEBUG_sched, 
163                "marked %d sparks and pruned %d sparks",
164                n, pruned_sparks);
165 #endif
166     
167     debugTrace(DEBUG_sched,
168                "new spark queue len=%d; (hd=%p; tl=%p)\n",
169                sparkPoolSize(pool), pool->hd, pool->tl);
170 }
171
172 /* -----------------------------------------------------------------------------
173  * 
174  * Turn a spark into a real thread
175  *
176  * -------------------------------------------------------------------------- */
177
178 void
179 createSparkThread (Capability *cap, StgClosure *p)
180 {
181     StgTSO *tso;
182
183     tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
184     appendToRunQueue(cap,tso);
185 }
186
187 /* -----------------------------------------------------------------------------
188  * 
189  * Create a new spark
190  *
191  * -------------------------------------------------------------------------- */
192
193 #define DISCARD_NEW
194
195 StgInt
196 newSpark (StgRegTable *reg, StgClosure *p)
197 {
198     StgSparkPool *pool = &(reg->rSparks);
199
200     /* I am not sure whether this is the right thing to do.
201      * Maybe it is better to exploit the tag information
202      * instead of throwing it away?
203      */
204     p = UNTAG_CLOSURE(p);
205
206     ASSERT_SPARK_POOL_INVARIANTS(pool);
207
208     if (closure_SHOULD_SPARK(p)) {
209 #ifdef DISCARD_NEW
210         StgClosure **new_tl;
211         new_tl = pool->tl + 1;
212         if (new_tl == pool->lim) { new_tl = pool->base; }
213         if (new_tl != pool->hd) {
214             *pool->tl = p;
215             pool->tl = new_tl;
216         } else if (!closure_SHOULD_SPARK(*pool->hd)) {
217             // if the old closure is not sparkable, discard it and
218             // keep the new one.  Otherwise, keep the old one.
219             *pool->tl = p;
220             bump_hd(pool);
221         }
222 #else  /* DISCARD OLD */
223         *pool->tl = p;
224         bump_tl(pool);
225         if (pool->tl == pool->hd) { bump_hd(pool); }
226 #endif
227     }   
228
229     ASSERT_SPARK_POOL_INVARIANTS(pool);
230     return 1;
231 }
232
233 #else
234
235 StgInt
236 newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
237 {
238     /* nothing */
239     return 1;
240 }
241
242 #endif /* PARALLEL_HASKELL || THREADED_RTS */
243
244
245 /* -----------------------------------------------------------------------------
246  * 
247  * GRAN & PARALLEL_HASKELL stuff beyond here.
248  *
249  * -------------------------------------------------------------------------- */
250
251 #if defined(PARALLEL_HASKELL) || defined(GRAN)
252
253 static void slide_spark_pool( StgSparkPool *pool );
254
255 rtsBool
256 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
257 {
258   if (pool->tl == pool->lim)
259     slide_spark_pool(pool);
260
261   if (closure_SHOULD_SPARK(closure) && 
262       pool->tl < pool->lim) {
263     *(pool->tl++) = closure;
264
265 #if defined(PARALLEL_HASKELL)
266     // collect parallel global statistics (currently done together with GC stats)
267     if (RtsFlags.ParFlags.ParStats.Global &&
268         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
269       // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime()); 
270       globalParStats.tot_sparks_created++;
271     }
272 #endif
273     return rtsTrue;
274   } else {
275 #if defined(PARALLEL_HASKELL)
276     // collect parallel global statistics (currently done together with GC stats)
277     if (RtsFlags.ParFlags.ParStats.Global &&
278         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
279       //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime()); 
280       globalParStats.tot_sparks_ignored++;
281     }
282 #endif
283     return rtsFalse;
284   }
285 }
286
287 static void
288 slide_spark_pool( StgSparkPool *pool )
289 {
290   StgClosure **sparkp, **to_sparkp;
291
292   sparkp = pool->hd;
293   to_sparkp = pool->base;
294   while (sparkp < pool->tl) {
295     ASSERT(to_sparkp<=sparkp);
296     ASSERT(*sparkp!=NULL);
297     ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
298
299     if (closure_SHOULD_SPARK(*sparkp)) {
300       *to_sparkp++ = *sparkp++;
301     } else {
302       sparkp++;
303     }
304   }
305   pool->hd = pool->base;
306   pool->tl = to_sparkp;
307 }
308
309 void
310 disposeSpark(spark)
311 StgClosure *spark;
312 {
313 #if !defined(THREADED_RTS)
314   Capability *cap;
315   StgSparkPool *pool;
316
317   cap = &MainRegTable;
318   pool = &(cap->rSparks);
319   ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
320 #endif
321   ASSERT(spark != (StgClosure *)NULL);
322   /* Do nothing */
323 }
324
325
326 #elif defined(GRAN)
327
328 /* 
329    Search the spark queue of the proc in event for a spark that's worth
330    turning into a thread 
331    (was gimme_spark in the old RTS)
332 */
333 void
334 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
335 {
336    PEs proc = event->proc,       /* proc to search for work */
337        creator = event->creator; /* proc that requested work */
338    StgClosure* node;
339    rtsBool found;
340    rtsSparkQ spark_of_non_local_node = NULL, 
341              spark_of_non_local_node_prev = NULL, 
342              low_priority_spark = NULL, 
343              low_priority_spark_prev = NULL,
344              spark = NULL, prev = NULL;
345   
346    /* Choose a spark from the local spark queue */
347    prev = (rtsSpark*)NULL;
348    spark = pending_sparks_hds[proc];
349    found = rtsFalse;
350
351    // ToDo: check this code & implement local sparking !! -- HWL  
352    while (!found && spark != (rtsSpark*)NULL)
353      {
354        ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
355               (prev==(rtsSpark*)NULL || prev->next==spark) &&
356               (spark->prev==prev));
357        node = spark->node;
358        if (!closure_SHOULD_SPARK(node)) 
359          {
360            IF_GRAN_DEBUG(checkSparkQ,
361                          debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
362                                spark, node));
363
364            if (RtsFlags.GranFlags.GranSimStats.Sparks)
365              DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
366                               spark->node, spark->name, spark_queue_len(proc));
367   
368            ASSERT(spark != (rtsSpark*)NULL);
369            ASSERT(SparksAvail>0);
370            --SparksAvail;
371
372            ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
373            spark = delete_from_sparkq (spark, proc, rtsTrue);
374            if (spark != (rtsSpark*)NULL)
375              prev = spark->prev;
376            continue;
377          }
378        /* -- node should eventually be sparked */
379        else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes && 
380                !IS_LOCAL_TO(PROCS(node),CurrentProc)) 
381          {
382            barf("Local sparking not yet implemented");
383
384            /* Remember first low priority spark */
385            if (spark_of_non_local_node==(rtsSpark*)NULL) {
386              spark_of_non_local_node_prev = prev;
387              spark_of_non_local_node = spark;
388               }
389   
390            if (spark->next == (rtsSpark*)NULL) { 
391              /* ASSERT(spark==SparkQueueTl);  just for testing */
392              prev = spark_of_non_local_node_prev;
393              spark = spark_of_non_local_node;
394              found = rtsTrue;
395              break;
396            }
397   
398 # if defined(GRAN) && defined(GRAN_CHECK)
399            /* Should never happen; just for testing 
400            if (spark==pending_sparks_tl) {
401              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
402                 stg_exit(EXIT_FAILURE);
403                 } */
404 # endif
405            prev = spark; 
406            spark = spark->next;
407            ASSERT(SparksAvail>0);
408            --SparksAvail;
409            continue;
410          }
411        else if ( RtsFlags.GranFlags.DoPrioritySparking || 
412                  (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
413          {
414            if (RtsFlags.GranFlags.DoPrioritySparking)
415              barf("Priority sparking not yet implemented");
416
417            found = rtsTrue;
418          }
419 #if 0      
420        else /* only used if SparkPriority2 is defined */
421          {
422            /* ToDo: fix the code below and re-integrate it */
423            /* Remember first low priority spark */
424            if (low_priority_spark==(rtsSpark*)NULL) { 
425              low_priority_spark_prev = prev;
426              low_priority_spark = spark;
427            }
428   
429            if (spark->next == (rtsSpark*)NULL) { 
430                 /* ASSERT(spark==spark_queue_tl);  just for testing */
431              prev = low_priority_spark_prev;
432              spark = low_priority_spark;
433              found = rtsTrue;       /* take low pri spark => rc is 2  */
434              break;
435            }
436   
437            /* Should never happen; just for testing 
438            if (spark==pending_sparks_tl) {
439              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
440                 stg_exit(EXIT_FAILURE);
441              break;
442            } */                
443            prev = spark; 
444            spark = spark->next;
445
446            IF_GRAN_DEBUG(pri,
447                          debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
448                                spark->gran_info, RtsFlags.GranFlags.SparkPriority, 
449                                spark->node, spark->name);)
450            }
451 #endif
452    }  /* while (spark!=NULL && !found) */
453
454    *spark_res = spark;
455    *found_res = found;
456 }
457
458 /*
459   Turn the spark into a thread.
460   In GranSim this basically means scheduling a StartThread event for the
461   node pointed to by the spark at some point in the future.
462   (was munch_spark in the old RTS)
463 */
464 rtsBool
465 activateSpark (rtsEvent *event, rtsSparkQ spark) 
466 {
467   PEs proc = event->proc,       /* proc to search for work */
468       creator = event->creator; /* proc that requested work */
469   StgTSO* tso;
470   StgClosure* node;
471   rtsTime spark_arrival_time;
472
473   /* 
474      We've found a node on PE proc requested by PE creator.
475      If proc==creator we can turn the spark into a thread immediately;
476      otherwise we schedule a MoveSpark event on the requesting PE
477   */
478      
479   /* DaH Qu' yIchen */
480   if (proc!=creator) { 
481
482     /* only possible if we simulate GUM style fishing */
483     ASSERT(RtsFlags.GranFlags.Fishing);
484
485     /* Message packing costs for sending a Fish; qeq jabbI'ID */
486     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
487   
488     if (RtsFlags.GranFlags.GranSimStats.Sparks)
489       DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
490                        (StgTSO*)NULL, spark->node,
491                        spark->name, spark_queue_len(proc));
492
493     /* time of the spark arrival on the remote PE */
494     spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
495
496     new_event(creator, proc, spark_arrival_time,
497               MoveSpark,
498               (StgTSO*)NULL, spark->node, spark);
499
500     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
501             
502   } else { /* proc==creator i.e. turn the spark into a thread */
503
504     if ( RtsFlags.GranFlags.GranSimStats.Global && 
505          spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
506
507       globalGranStats.tot_low_pri_sparks++;
508       IF_GRAN_DEBUG(pri,
509                     debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
510                           spark->gran_info, 
511                           spark->node, spark->name));
512     } 
513     
514     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
515     
516     node = spark->node;
517     
518 # if 0
519     /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
520     if (GARBAGE COLLECTION IS NECESSARY) {
521       /* Some kind of backoff needed here in case there's too little heap */
522 #  if defined(GRAN_CHECK) && defined(GRAN)
523       if (RtsFlags.GcFlags.giveStats)
524         fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p;  name=%u\n", 
525                 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
526                 spark, node, spark->name);
527 #  endif
528       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
529                   FindWork,
530                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
531       barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
532       GarbageCollect(GetRoots, rtsFalse);
533       // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
534       // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
535       spark = NULL;
536       return; /* was: continue; */ /* to the next event, eventually */
537     }
538 # endif
539     
540     if (RtsFlags.GranFlags.GranSimStats.Sparks)
541       DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
542                        spark->node, spark->name,
543                        spark_queue_len(CurrentProc));
544     
545     new_event(proc, proc, CurrentTime[proc],
546               StartThread, 
547               END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
548     
549     procStatus[proc] = Starting;
550   }
551 }
552
553 /* -------------------------------------------------------------------------
554    This is the main point where handling granularity information comes into
555    play. 
556    ------------------------------------------------------------------------- */
557
558 #define MAX_RAND_PRI    100
559
560 /* 
561    Granularity info transformers. 
562    Applied to the GRAN_INFO field of a spark.
563 */
564 STATIC_INLINE nat  ID(nat x) { return(x); };
565 STATIC_INLINE nat  INV(nat x) { return(-x); };
566 STATIC_INLINE nat  IGNORE(nat x) { return (0); };
567 STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
568
569 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
570 rtsSpark *
571 newSpark(node,name,gran_info,size_info,par_info,local)
572 StgClosure *node;
573 nat name, gran_info, size_info, par_info, local;
574 {
575   nat pri;
576   rtsSpark *newspark;
577
578   pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
579         RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
580         RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
581                            ID(gran_info);
582
583   if ( RtsFlags.GranFlags.SparkPriority!=0 && 
584        pri<RtsFlags.GranFlags.SparkPriority ) {
585     IF_GRAN_DEBUG(pri,
586       debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
587               pri, RtsFlags.GranFlags.SparkPriority, node, name));
588     return ((rtsSpark*)NULL);
589   }
590
591   newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
592   newspark->prev = newspark->next = (rtsSpark*)NULL;
593   newspark->node = node;
594   newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
595   newspark->gran_info = pri;
596   newspark->global = !local;      /* Check that with parAt, parAtAbs !!*/
597
598   if (RtsFlags.GranFlags.GranSimStats.Global) {
599     globalGranStats.tot_sparks_created++;
600     globalGranStats.sparks_created_on_PE[CurrentProc]++;
601   }
602
603   return(newspark);
604 }
605
606 void
607 disposeSpark(spark)
608 rtsSpark *spark;
609 {
610   ASSERT(spark!=NULL);
611   stgFree(spark);
612 }
613
614 void 
615 disposeSparkQ(spark)
616 rtsSparkQ spark;
617 {
618   if (spark==NULL) 
619     return;
620
621   disposeSparkQ(spark->next);
622
623 # ifdef GRAN_CHECK
624   if (SparksAvail < 0) {
625     debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
626     print_spark(spark);
627   }
628 # endif
629
630   stgFree(spark);
631 }
632
633 /*
634    With PrioritySparking add_to_spark_queue performs an insert sort to keep
635    the spark queue sorted. Otherwise the spark is just added to the end of
636    the queue. 
637 */
638
639 void
640 add_to_spark_queue(spark)
641 rtsSpark *spark;
642 {
643   rtsSpark *prev = NULL, *next = NULL;
644   nat count = 0;
645   rtsBool found = rtsFalse;
646
647   if ( spark == (rtsSpark *)NULL ) {
648     return;
649   }
650
651   if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
652     /* Priority sparking is enabled i.e. spark queues must be sorted */
653
654     for (prev = NULL, next = pending_sparks_hd, count=0;
655          (next != NULL) && 
656          !(found = (spark->gran_info >= next->gran_info));
657          prev = next, next = next->next, count++) 
658      {}
659
660   } else {   /* 'utQo' */
661     /* Priority sparking is disabled */
662     
663     found = rtsFalse;   /* to add it at the end */
664
665   }
666
667   if (found) {
668     /* next points to the first spark with a gran_info smaller than that
669        of spark; therefore, add spark before next into the spark queue */
670     spark->next = next;
671     if ( next == NULL ) {
672       pending_sparks_tl = spark;
673     } else {
674       next->prev = spark;
675     }
676     spark->prev = prev;
677     if ( prev == NULL ) {
678       pending_sparks_hd = spark;
679     } else {
680       prev->next = spark;
681     }
682   } else {  /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
683     /* add the spark at the end of the spark queue */
684     spark->next = NULL;                        
685     spark->prev = pending_sparks_tl;
686     if (pending_sparks_hd == NULL)
687       pending_sparks_hd = spark;
688     else
689       pending_sparks_tl->next = spark;
690     pending_sparks_tl = spark;    
691   } 
692   ++SparksAvail;
693
694   /* add costs for search in priority sparking */
695   if (RtsFlags.GranFlags.DoPrioritySparking) {
696     CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
697   }
698
699   IF_GRAN_DEBUG(checkSparkQ,
700                 debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
701                       spark, spark->node, CurrentProc);
702                 print_sparkq_stats());
703
704 #  if defined(GRAN_CHECK)
705   if (RtsFlags.GranFlags.Debug.checkSparkQ) {
706     for (prev = NULL, next =  pending_sparks_hd;
707          (next != NULL);
708          prev = next, next = next->next) 
709       {}
710     if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
711       debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
712               spark,CurrentProc, 
713               pending_sparks_tl, prev);
714   }
715 #  endif
716
717 #  if defined(GRAN_CHECK)
718   /* Check if the sparkq is still sorted. Just for testing, really!  */
719   if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
720        RtsFlags.GranFlags.Debug.pri ) {
721     rtsBool sorted = rtsTrue;
722     rtsSpark *prev, *next;
723
724     if (pending_sparks_hd == NULL ||
725         pending_sparks_hd->next == NULL ) {
726       /* just 1 elem => ok */
727     } else {
728       for (prev = pending_sparks_hd,
729            next = pending_sparks_hd->next;
730            (next != NULL) ;
731            prev = next, next = next->next) {
732         sorted = sorted && 
733                  (prev->gran_info >= next->gran_info);
734       }
735     }
736     if (!sorted) {
737       debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
738               CurrentProc);
739       print_sparkq(CurrentProc);
740     }
741   }
742 #  endif
743 }
744
745 nat
746 spark_queue_len(proc) 
747 PEs proc;
748 {
749  rtsSpark *prev, *spark;                     /* prev only for testing !! */
750  nat len;
751
752  for (len = 0, prev = NULL, spark = pending_sparks_hds[proc]; 
753       spark != NULL; 
754       len++, prev = spark, spark = spark->next)
755    {}
756
757 #  if defined(GRAN_CHECK)
758   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) 
759     if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
760       debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
761               proc, pending_sparks_tls[proc], prev);
762 #  endif
763
764  return (len);
765 }
766
767 /* 
768    Take spark out of the spark queue on PE p and nuke the spark. Adjusts
769    hd and tl pointers of the spark queue. Returns a pointer to the next
770    spark in the queue.
771 */
772 rtsSpark *
773 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
774 rtsSpark *spark;
775 PEs p;
776 rtsBool dispose_too;
777 {
778   rtsSpark *new_spark;
779
780   if (spark==NULL) 
781     barf("delete_from_sparkq: trying to delete NULL spark\n");
782
783 #  if defined(GRAN_CHECK)
784   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
785     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
786             pending_sparks_hd, pending_sparks_tl,
787             spark->prev, spark, spark->next, 
788             (spark->next==NULL ? 0 : spark->next->prev));
789   }
790 #  endif
791
792   if (spark->prev==NULL) {
793     /* spark is first spark of queue => adjust hd pointer */
794     ASSERT(pending_sparks_hds[p]==spark);
795     pending_sparks_hds[p] = spark->next;
796   } else {
797     spark->prev->next = spark->next;
798   }
799   if (spark->next==NULL) {
800     ASSERT(pending_sparks_tls[p]==spark);
801     /* spark is first spark of queue => adjust tl pointer */
802     pending_sparks_tls[p] = spark->prev;
803   } else {
804     spark->next->prev = spark->prev;
805   }
806   new_spark = spark->next;
807   
808 #  if defined(GRAN_CHECK)
809   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
810     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
811             pending_sparks_hd, pending_sparks_tl,
812             spark->prev, spark, spark->next, 
813             (spark->next==NULL ? 0 : spark->next->prev), spark);
814   }
815 #  endif
816
817   if (dispose_too)
818     disposeSpark(spark);
819                   
820   return new_spark;
821 }
822
823 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
824 void
825 markSparkQueue(void)
826
827   StgClosure *MarkRoot(StgClosure *root); // prototype
828   PEs p;
829   rtsSpark *sp;
830
831   for (p=0; p<RtsFlags.GranFlags.proc; p++)
832     for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
833       ASSERT(sp->node!=NULL);
834       ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
835       // ToDo?: statistics gathering here (also for GUM!)
836       sp->node = (StgClosure *)MarkRoot(sp->node);
837     }
838
839   IF_DEBUG(gc,
840            debugBelch("markSparkQueue: spark statistics at start of GC:");
841            print_sparkq_stats());
842 }
843
844 void
845 print_spark(spark)
846 rtsSpark *spark;
847
848   char str[16];
849
850   if (spark==NULL) {
851     debugBelch("Spark: NIL\n");
852     return;
853   } else {
854     sprintf(str,
855             ((spark->node==NULL) ? "______" : "%#6lx"), 
856             stgCast(StgPtr,spark->node));
857
858     debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
859             str, spark->name, 
860             ((spark->global)==rtsTrue?"True":"False"), spark->creator, 
861             spark->prev, spark->next);
862   }
863 }
864
865 void
866 print_sparkq(proc)
867 PEs proc;
868 // rtsSpark *hd;
869 {
870   rtsSpark *x = pending_sparks_hds[proc];
871
872   debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
873   for (; x!=(rtsSpark*)NULL; x=x->next) {
874     print_spark(x);
875   }
876 }
877
878 /* 
879    Print a statistics of all spark queues.
880 */
881 void
882 print_sparkq_stats(void)
883 {
884   PEs p;
885
886   debugBelch("SparkQs: [");
887   for (p=0; p<RtsFlags.GranFlags.proc; p++)
888     debugBelch(", PE %d: %d", p, spark_queue_len(p));
889   debugBelch("\n");
890 }
891
892 #endif