Detab TcUnify
[ghc-hetmet.git] / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2000-2006
4  *
5  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
6  *
7  * -------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "Storage.h"
12 #include "Schedule.h"
13 #include "SchedAPI.h"
14 #include "RtsFlags.h"
15 #include "RtsUtils.h"
16 #include "ParTicky.h"
17 # if defined(PARALLEL_HASKELL)
18 # include "ParallelRts.h"
19 # include "GranSimRts.h"   // for GR_...
20 # elif defined(GRAN)
21 # include "GranSimRts.h"
22 # endif
23 #include "Sparks.h"
24 #include "Trace.h"
25
26 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
27
28 static INLINE_ME void bump_hd (StgSparkPool *p)
29 { p->hd++; if (p->hd == p->lim) p->hd = p->base; }
30
31 static INLINE_ME void bump_tl (StgSparkPool *p)
32 { p->tl++; if (p->tl == p->lim) p->tl = p->base; }
33
34 /* -----------------------------------------------------------------------------
35  * 
36  * Initialising spark pools.
37  *
38  * -------------------------------------------------------------------------- */
39
40 static void 
41 initSparkPool(StgSparkPool *pool)
42 {
43     pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
44                                 * sizeof(StgClosure *),
45                                 "initSparkPools");
46     pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
47     pool->hd  = pool->base;
48     pool->tl  = pool->base;
49 }
50
51 void
52 initSparkPools( void )
53 {
54 #ifdef THREADED_RTS
55     /* walk over the capabilities, allocating a spark pool for each one */
56     nat i;
57     for (i = 0; i < n_capabilities; i++) {
58         initSparkPool(&capabilities[i].r.rSparks);
59     }
60 #else
61     /* allocate a single spark pool */
62     initSparkPool(&MainCapability.r.rSparks);
63 #endif
64 }
65
66 void
67 freeSparkPool(StgSparkPool *pool) {
68     stgFree(pool->base);
69 }
70
71 /* -----------------------------------------------------------------------------
72  * 
73  * findSpark: find a spark on the current Capability that we can fork
74  * into a thread.
75  *
76  * -------------------------------------------------------------------------- */
77
78 StgClosure *
79 findSpark (Capability *cap)
80 {
81     StgSparkPool *pool;
82     StgClosure *spark;
83     
84     pool = &(cap->r.rSparks);
85     ASSERT_SPARK_POOL_INVARIANTS(pool);
86
87     while (pool->hd != pool->tl) {
88         spark = *pool->hd;
89         bump_hd(pool);
90         if (closure_SHOULD_SPARK(spark)) {
91 #ifdef GRAN
92             if (RtsFlags.ParFlags.ParStats.Sparks) 
93                 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
94                                  GR_STEALING, ((StgTSO *)NULL), spark, 
95                                  0, 0 /* spark_queue_len(ADVISORY_POOL) */);
96 #endif
97             return spark;
98         }
99     }
100     // spark pool is now empty
101     return NULL;
102 }
103
104 /* -----------------------------------------------------------------------------
105  * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
106  * implicit slide i.e. after marking all sparks are at the beginning of the
107  * spark pool and the spark pool only contains sparkable closures 
108  * -------------------------------------------------------------------------- */
109
110 void
111 markSparkQueue (evac_fn evac)
112
113     StgClosure **sparkp, **to_sparkp;
114     nat i, n, pruned_sparks; // stats only
115     StgSparkPool *pool;
116     Capability *cap;
117     
118     PAR_TICKY_MARK_SPARK_QUEUE_START();
119     
120     n = 0;
121     pruned_sparks = 0;
122     for (i = 0; i < n_capabilities; i++) {
123         cap = &capabilities[i];
124         pool = &(cap->r.rSparks);
125         
126         ASSERT_SPARK_POOL_INVARIANTS(pool);
127
128 #if defined(PARALLEL_HASKELL)
129         // stats only
130         n = 0;
131         pruned_sparks = 0;
132 #endif
133         
134         sparkp = pool->hd;
135         to_sparkp = pool->hd;
136         while (sparkp != pool->tl) {
137             ASSERT(*sparkp!=NULL);
138             ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp)));
139             // ToDo?: statistics gathering here (also for GUM!)
140             if (closure_SHOULD_SPARK(*sparkp)) {
141                 evac(sparkp);
142                 *to_sparkp++ = *sparkp;
143                 if (to_sparkp == pool->lim) {
144                     to_sparkp = pool->base;
145                 }
146                 n++;
147             } else {
148                 pruned_sparks++;
149             }
150             sparkp++;
151             if (sparkp == pool->lim) {
152                 sparkp = pool->base;
153             }
154         }
155         pool->tl = to_sparkp;
156         
157         PAR_TICKY_MARK_SPARK_QUEUE_END(n);
158         
159 #if defined(PARALLEL_HASKELL)
160         debugTrace(DEBUG_sched, 
161                    "marked %d sparks and pruned %d sparks on [%x]",
162                    n, pruned_sparks, mytid);
163 #else
164         debugTrace(DEBUG_sched, 
165                    "marked %d sparks and pruned %d sparks",
166                    n, pruned_sparks);
167 #endif
168         
169         debugTrace(DEBUG_sched,
170                    "new spark queue len=%d; (hd=%p; tl=%p)\n",
171                    sparkPoolSize(pool), pool->hd, pool->tl);
172     }
173 }
174
175 /* -----------------------------------------------------------------------------
176  * 
177  * Turn a spark into a real thread
178  *
179  * -------------------------------------------------------------------------- */
180
181 void
182 createSparkThread (Capability *cap, StgClosure *p)
183 {
184     StgTSO *tso;
185
186     tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
187     appendToRunQueue(cap,tso);
188 }
189
190 /* -----------------------------------------------------------------------------
191  * 
192  * Create a new spark
193  *
194  * -------------------------------------------------------------------------- */
195
196 #define DISCARD_NEW
197
198 StgInt
199 newSpark (StgRegTable *reg, StgClosure *p)
200 {
201     StgSparkPool *pool = &(reg->rSparks);
202
203     /* I am not sure whether this is the right thing to do.
204      * Maybe it is better to exploit the tag information
205      * instead of throwing it away?
206      */
207     p = UNTAG_CLOSURE(p);
208
209     ASSERT_SPARK_POOL_INVARIANTS(pool);
210
211     if (closure_SHOULD_SPARK(p)) {
212 #ifdef DISCARD_NEW
213         StgClosure **new_tl;
214         new_tl = pool->tl + 1;
215         if (new_tl == pool->lim) { new_tl = pool->base; }
216         if (new_tl != pool->hd) {
217             *pool->tl = p;
218             pool->tl = new_tl;
219         } else if (!closure_SHOULD_SPARK(*pool->hd)) {
220             // if the old closure is not sparkable, discard it and
221             // keep the new one.  Otherwise, keep the old one.
222             *pool->tl = p;
223             bump_hd(pool);
224         }
225 #else  /* DISCARD OLD */
226         *pool->tl = p;
227         bump_tl(pool);
228         if (pool->tl == pool->hd) { bump_hd(pool); }
229 #endif
230     }   
231
232     ASSERT_SPARK_POOL_INVARIANTS(pool);
233     return 1;
234 }
235
236 #else
237
238 StgInt
239 newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
240 {
241     /* nothing */
242     return 1;
243 }
244
245 #endif /* PARALLEL_HASKELL || THREADED_RTS */
246
247
248 /* -----------------------------------------------------------------------------
249  * 
250  * GRAN & PARALLEL_HASKELL stuff beyond here.
251  *
252  * -------------------------------------------------------------------------- */
253
254 #if defined(PARALLEL_HASKELL) || defined(GRAN)
255
256 static void slide_spark_pool( StgSparkPool *pool );
257
258 rtsBool
259 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
260 {
261   if (pool->tl == pool->lim)
262     slide_spark_pool(pool);
263
264   if (closure_SHOULD_SPARK(closure) && 
265       pool->tl < pool->lim) {
266     *(pool->tl++) = closure;
267
268 #if defined(PARALLEL_HASKELL)
269     // collect parallel global statistics (currently done together with GC stats)
270     if (RtsFlags.ParFlags.ParStats.Global &&
271         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
272       // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime()); 
273       globalParStats.tot_sparks_created++;
274     }
275 #endif
276     return rtsTrue;
277   } else {
278 #if defined(PARALLEL_HASKELL)
279     // collect parallel global statistics (currently done together with GC stats)
280     if (RtsFlags.ParFlags.ParStats.Global &&
281         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
282       //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime()); 
283       globalParStats.tot_sparks_ignored++;
284     }
285 #endif
286     return rtsFalse;
287   }
288 }
289
290 static void
291 slide_spark_pool( StgSparkPool *pool )
292 {
293   StgClosure **sparkp, **to_sparkp;
294
295   sparkp = pool->hd;
296   to_sparkp = pool->base;
297   while (sparkp < pool->tl) {
298     ASSERT(to_sparkp<=sparkp);
299     ASSERT(*sparkp!=NULL);
300     ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
301
302     if (closure_SHOULD_SPARK(*sparkp)) {
303       *to_sparkp++ = *sparkp++;
304     } else {
305       sparkp++;
306     }
307   }
308   pool->hd = pool->base;
309   pool->tl = to_sparkp;
310 }
311
312 void
313 disposeSpark(spark)
314 StgClosure *spark;
315 {
316 #if !defined(THREADED_RTS)
317   Capability *cap;
318   StgSparkPool *pool;
319
320   cap = &MainRegTable;
321   pool = &(cap->rSparks);
322   ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
323 #endif
324   ASSERT(spark != (StgClosure *)NULL);
325   /* Do nothing */
326 }
327
328
329 #elif defined(GRAN)
330
331 /* 
332    Search the spark queue of the proc in event for a spark that's worth
333    turning into a thread 
334    (was gimme_spark in the old RTS)
335 */
336 void
337 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
338 {
339    PEs proc = event->proc,       /* proc to search for work */
340        creator = event->creator; /* proc that requested work */
341    StgClosure* node;
342    rtsBool found;
343    rtsSparkQ spark_of_non_local_node = NULL, 
344              spark_of_non_local_node_prev = NULL, 
345              low_priority_spark = NULL, 
346              low_priority_spark_prev = NULL,
347              spark = NULL, prev = NULL;
348   
349    /* Choose a spark from the local spark queue */
350    prev = (rtsSpark*)NULL;
351    spark = pending_sparks_hds[proc];
352    found = rtsFalse;
353
354    // ToDo: check this code & implement local sparking !! -- HWL  
355    while (!found && spark != (rtsSpark*)NULL)
356      {
357        ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
358               (prev==(rtsSpark*)NULL || prev->next==spark) &&
359               (spark->prev==prev));
360        node = spark->node;
361        if (!closure_SHOULD_SPARK(node)) 
362          {
363            IF_GRAN_DEBUG(checkSparkQ,
364                          debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
365                                spark, node));
366
367            if (RtsFlags.GranFlags.GranSimStats.Sparks)
368              DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
369                               spark->node, spark->name, spark_queue_len(proc));
370   
371            ASSERT(spark != (rtsSpark*)NULL);
372            ASSERT(SparksAvail>0);
373            --SparksAvail;
374
375            ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
376            spark = delete_from_sparkq (spark, proc, rtsTrue);
377            if (spark != (rtsSpark*)NULL)
378              prev = spark->prev;
379            continue;
380          }
381        /* -- node should eventually be sparked */
382        else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes && 
383                !IS_LOCAL_TO(PROCS(node),CurrentProc)) 
384          {
385            barf("Local sparking not yet implemented");
386
387            /* Remember first low priority spark */
388            if (spark_of_non_local_node==(rtsSpark*)NULL) {
389              spark_of_non_local_node_prev = prev;
390              spark_of_non_local_node = spark;
391               }
392   
393            if (spark->next == (rtsSpark*)NULL) { 
394              /* ASSERT(spark==SparkQueueTl);  just for testing */
395              prev = spark_of_non_local_node_prev;
396              spark = spark_of_non_local_node;
397              found = rtsTrue;
398              break;
399            }
400   
401 # if defined(GRAN) && defined(GRAN_CHECK)
402            /* Should never happen; just for testing 
403            if (spark==pending_sparks_tl) {
404              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
405                 stg_exit(EXIT_FAILURE);
406                 } */
407 # endif
408            prev = spark; 
409            spark = spark->next;
410            ASSERT(SparksAvail>0);
411            --SparksAvail;
412            continue;
413          }
414        else if ( RtsFlags.GranFlags.DoPrioritySparking || 
415                  (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
416          {
417            if (RtsFlags.GranFlags.DoPrioritySparking)
418              barf("Priority sparking not yet implemented");
419
420            found = rtsTrue;
421          }
422 #if 0      
423        else /* only used if SparkPriority2 is defined */
424          {
425            /* ToDo: fix the code below and re-integrate it */
426            /* Remember first low priority spark */
427            if (low_priority_spark==(rtsSpark*)NULL) { 
428              low_priority_spark_prev = prev;
429              low_priority_spark = spark;
430            }
431   
432            if (spark->next == (rtsSpark*)NULL) { 
433                 /* ASSERT(spark==spark_queue_tl);  just for testing */
434              prev = low_priority_spark_prev;
435              spark = low_priority_spark;
436              found = rtsTrue;       /* take low pri spark => rc is 2  */
437              break;
438            }
439   
440            /* Should never happen; just for testing 
441            if (spark==pending_sparks_tl) {
442              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
443                 stg_exit(EXIT_FAILURE);
444              break;
445            } */                
446            prev = spark; 
447            spark = spark->next;
448
449            IF_GRAN_DEBUG(pri,
450                          debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
451                                spark->gran_info, RtsFlags.GranFlags.SparkPriority, 
452                                spark->node, spark->name);)
453            }
454 #endif
455    }  /* while (spark!=NULL && !found) */
456
457    *spark_res = spark;
458    *found_res = found;
459 }
460
461 /*
462   Turn the spark into a thread.
463   In GranSim this basically means scheduling a StartThread event for the
464   node pointed to by the spark at some point in the future.
465   (was munch_spark in the old RTS)
466 */
467 rtsBool
468 activateSpark (rtsEvent *event, rtsSparkQ spark) 
469 {
470   PEs proc = event->proc,       /* proc to search for work */
471       creator = event->creator; /* proc that requested work */
472   StgTSO* tso;
473   StgClosure* node;
474   rtsTime spark_arrival_time;
475
476   /* 
477      We've found a node on PE proc requested by PE creator.
478      If proc==creator we can turn the spark into a thread immediately;
479      otherwise we schedule a MoveSpark event on the requesting PE
480   */
481      
482   /* DaH Qu' yIchen */
483   if (proc!=creator) { 
484
485     /* only possible if we simulate GUM style fishing */
486     ASSERT(RtsFlags.GranFlags.Fishing);
487
488     /* Message packing costs for sending a Fish; qeq jabbI'ID */
489     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
490   
491     if (RtsFlags.GranFlags.GranSimStats.Sparks)
492       DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
493                        (StgTSO*)NULL, spark->node,
494                        spark->name, spark_queue_len(proc));
495
496     /* time of the spark arrival on the remote PE */
497     spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
498
499     new_event(creator, proc, spark_arrival_time,
500               MoveSpark,
501               (StgTSO*)NULL, spark->node, spark);
502
503     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
504             
505   } else { /* proc==creator i.e. turn the spark into a thread */
506
507     if ( RtsFlags.GranFlags.GranSimStats.Global && 
508          spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
509
510       globalGranStats.tot_low_pri_sparks++;
511       IF_GRAN_DEBUG(pri,
512                     debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
513                           spark->gran_info, 
514                           spark->node, spark->name));
515     } 
516     
517     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
518     
519     node = spark->node;
520     
521 # if 0
522     /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
523     if (GARBAGE COLLECTION IS NECESSARY) {
524       /* Some kind of backoff needed here in case there's too little heap */
525 #  if defined(GRAN_CHECK) && defined(GRAN)
526       if (RtsFlags.GcFlags.giveStats)
527         fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p;  name=%u\n", 
528                 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
529                 spark, node, spark->name);
530 #  endif
531       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
532                   FindWork,
533                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
534       barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
535       GarbageCollect(GetRoots, rtsFalse);
536       // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
537       // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
538       spark = NULL;
539       return; /* was: continue; */ /* to the next event, eventually */
540     }
541 # endif
542     
543     if (RtsFlags.GranFlags.GranSimStats.Sparks)
544       DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
545                        spark->node, spark->name,
546                        spark_queue_len(CurrentProc));
547     
548     new_event(proc, proc, CurrentTime[proc],
549               StartThread, 
550               END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
551     
552     procStatus[proc] = Starting;
553   }
554 }
555
556 /* -------------------------------------------------------------------------
557    This is the main point where handling granularity information comes into
558    play. 
559    ------------------------------------------------------------------------- */
560
561 #define MAX_RAND_PRI    100
562
563 /* 
564    Granularity info transformers. 
565    Applied to the GRAN_INFO field of a spark.
566 */
567 STATIC_INLINE nat  ID(nat x) { return(x); };
568 STATIC_INLINE nat  INV(nat x) { return(-x); };
569 STATIC_INLINE nat  IGNORE(nat x) { return (0); };
570 STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
571
572 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
573 rtsSpark *
574 newSpark(node,name,gran_info,size_info,par_info,local)
575 StgClosure *node;
576 nat name, gran_info, size_info, par_info, local;
577 {
578   nat pri;
579   rtsSpark *newspark;
580
581   pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
582         RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
583         RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
584                            ID(gran_info);
585
586   if ( RtsFlags.GranFlags.SparkPriority!=0 && 
587        pri<RtsFlags.GranFlags.SparkPriority ) {
588     IF_GRAN_DEBUG(pri,
589       debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
590               pri, RtsFlags.GranFlags.SparkPriority, node, name));
591     return ((rtsSpark*)NULL);
592   }
593
594   newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
595   newspark->prev = newspark->next = (rtsSpark*)NULL;
596   newspark->node = node;
597   newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
598   newspark->gran_info = pri;
599   newspark->global = !local;      /* Check that with parAt, parAtAbs !!*/
600
601   if (RtsFlags.GranFlags.GranSimStats.Global) {
602     globalGranStats.tot_sparks_created++;
603     globalGranStats.sparks_created_on_PE[CurrentProc]++;
604   }
605
606   return(newspark);
607 }
608
609 void
610 disposeSpark(spark)
611 rtsSpark *spark;
612 {
613   ASSERT(spark!=NULL);
614   stgFree(spark);
615 }
616
617 void 
618 disposeSparkQ(spark)
619 rtsSparkQ spark;
620 {
621   if (spark==NULL) 
622     return;
623
624   disposeSparkQ(spark->next);
625
626 # ifdef GRAN_CHECK
627   if (SparksAvail < 0) {
628     debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
629     print_spark(spark);
630   }
631 # endif
632
633   stgFree(spark);
634 }
635
636 /*
637    With PrioritySparking add_to_spark_queue performs an insert sort to keep
638    the spark queue sorted. Otherwise the spark is just added to the end of
639    the queue. 
640 */
641
642 void
643 add_to_spark_queue(spark)
644 rtsSpark *spark;
645 {
646   rtsSpark *prev = NULL, *next = NULL;
647   nat count = 0;
648   rtsBool found = rtsFalse;
649
650   if ( spark == (rtsSpark *)NULL ) {
651     return;
652   }
653
654   if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
655     /* Priority sparking is enabled i.e. spark queues must be sorted */
656
657     for (prev = NULL, next = pending_sparks_hd, count=0;
658          (next != NULL) && 
659          !(found = (spark->gran_info >= next->gran_info));
660          prev = next, next = next->next, count++) 
661      {}
662
663   } else {   /* 'utQo' */
664     /* Priority sparking is disabled */
665     
666     found = rtsFalse;   /* to add it at the end */
667
668   }
669
670   if (found) {
671     /* next points to the first spark with a gran_info smaller than that
672        of spark; therefore, add spark before next into the spark queue */
673     spark->next = next;
674     if ( next == NULL ) {
675       pending_sparks_tl = spark;
676     } else {
677       next->prev = spark;
678     }
679     spark->prev = prev;
680     if ( prev == NULL ) {
681       pending_sparks_hd = spark;
682     } else {
683       prev->next = spark;
684     }
685   } else {  /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
686     /* add the spark at the end of the spark queue */
687     spark->next = NULL;                        
688     spark->prev = pending_sparks_tl;
689     if (pending_sparks_hd == NULL)
690       pending_sparks_hd = spark;
691     else
692       pending_sparks_tl->next = spark;
693     pending_sparks_tl = spark;    
694   } 
695   ++SparksAvail;
696
697   /* add costs for search in priority sparking */
698   if (RtsFlags.GranFlags.DoPrioritySparking) {
699     CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
700   }
701
702   IF_GRAN_DEBUG(checkSparkQ,
703                 debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
704                       spark, spark->node, CurrentProc);
705                 print_sparkq_stats());
706
707 #  if defined(GRAN_CHECK)
708   if (RtsFlags.GranFlags.Debug.checkSparkQ) {
709     for (prev = NULL, next =  pending_sparks_hd;
710          (next != NULL);
711          prev = next, next = next->next) 
712       {}
713     if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
714       debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
715               spark,CurrentProc, 
716               pending_sparks_tl, prev);
717   }
718 #  endif
719
720 #  if defined(GRAN_CHECK)
721   /* Check if the sparkq is still sorted. Just for testing, really!  */
722   if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
723        RtsFlags.GranFlags.Debug.pri ) {
724     rtsBool sorted = rtsTrue;
725     rtsSpark *prev, *next;
726
727     if (pending_sparks_hd == NULL ||
728         pending_sparks_hd->next == NULL ) {
729       /* just 1 elem => ok */
730     } else {
731       for (prev = pending_sparks_hd,
732            next = pending_sparks_hd->next;
733            (next != NULL) ;
734            prev = next, next = next->next) {
735         sorted = sorted && 
736                  (prev->gran_info >= next->gran_info);
737       }
738     }
739     if (!sorted) {
740       debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
741               CurrentProc);
742       print_sparkq(CurrentProc);
743     }
744   }
745 #  endif
746 }
747
748 nat
749 spark_queue_len(proc) 
750 PEs proc;
751 {
752  rtsSpark *prev, *spark;                     /* prev only for testing !! */
753  nat len;
754
755  for (len = 0, prev = NULL, spark = pending_sparks_hds[proc]; 
756       spark != NULL; 
757       len++, prev = spark, spark = spark->next)
758    {}
759
760 #  if defined(GRAN_CHECK)
761   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) 
762     if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
763       debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
764               proc, pending_sparks_tls[proc], prev);
765 #  endif
766
767  return (len);
768 }
769
770 /* 
771    Take spark out of the spark queue on PE p and nuke the spark. Adjusts
772    hd and tl pointers of the spark queue. Returns a pointer to the next
773    spark in the queue.
774 */
775 rtsSpark *
776 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
777 rtsSpark *spark;
778 PEs p;
779 rtsBool dispose_too;
780 {
781   rtsSpark *new_spark;
782
783   if (spark==NULL) 
784     barf("delete_from_sparkq: trying to delete NULL spark\n");
785
786 #  if defined(GRAN_CHECK)
787   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
788     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
789             pending_sparks_hd, pending_sparks_tl,
790             spark->prev, spark, spark->next, 
791             (spark->next==NULL ? 0 : spark->next->prev));
792   }
793 #  endif
794
795   if (spark->prev==NULL) {
796     /* spark is first spark of queue => adjust hd pointer */
797     ASSERT(pending_sparks_hds[p]==spark);
798     pending_sparks_hds[p] = spark->next;
799   } else {
800     spark->prev->next = spark->next;
801   }
802   if (spark->next==NULL) {
803     ASSERT(pending_sparks_tls[p]==spark);
804     /* spark is first spark of queue => adjust tl pointer */
805     pending_sparks_tls[p] = spark->prev;
806   } else {
807     spark->next->prev = spark->prev;
808   }
809   new_spark = spark->next;
810   
811 #  if defined(GRAN_CHECK)
812   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
813     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
814             pending_sparks_hd, pending_sparks_tl,
815             spark->prev, spark, spark->next, 
816             (spark->next==NULL ? 0 : spark->next->prev), spark);
817   }
818 #  endif
819
820   if (dispose_too)
821     disposeSpark(spark);
822                   
823   return new_spark;
824 }
825
826 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
827 void
828 markSparkQueue(void)
829
830   StgClosure *MarkRoot(StgClosure *root); // prototype
831   PEs p;
832   rtsSpark *sp;
833
834   for (p=0; p<RtsFlags.GranFlags.proc; p++)
835     for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
836       ASSERT(sp->node!=NULL);
837       ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
838       // ToDo?: statistics gathering here (also for GUM!)
839       sp->node = (StgClosure *)MarkRoot(sp->node);
840     }
841
842   IF_DEBUG(gc,
843            debugBelch("markSparkQueue: spark statistics at start of GC:");
844            print_sparkq_stats());
845 }
846
847 void
848 print_spark(spark)
849 rtsSpark *spark;
850
851   char str[16];
852
853   if (spark==NULL) {
854     debugBelch("Spark: NIL\n");
855     return;
856   } else {
857     sprintf(str,
858             ((spark->node==NULL) ? "______" : "%#6lx"), 
859             stgCast(StgPtr,spark->node));
860
861     debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
862             str, spark->name, 
863             ((spark->global)==rtsTrue?"True":"False"), spark->creator, 
864             spark->prev, spark->next);
865   }
866 }
867
868 void
869 print_sparkq(proc)
870 PEs proc;
871 // rtsSpark *hd;
872 {
873   rtsSpark *x = pending_sparks_hds[proc];
874
875   debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
876   for (; x!=(rtsSpark*)NULL; x=x->next) {
877     print_spark(x);
878   }
879 }
880
881 /* 
882    Print a statistics of all spark queues.
883 */
884 void
885 print_sparkq_stats(void)
886 {
887   PEs p;
888
889   debugBelch("SparkQs: [");
890   for (p=0; p<RtsFlags.GranFlags.proc; p++)
891     debugBelch(", PE %d: %d", p, spark_queue_len(p));
892   debugBelch("\n");
893 }
894
895 #endif