allow build settings to be overriden by adding mk/validate.mk
[ghc-hetmet.git] / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2000-2006
4  *
5  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
6  *
7  * -------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "Storage.h"
12 #include "Schedule.h"
13 #include "SchedAPI.h"
14 #include "RtsFlags.h"
15 #include "RtsUtils.h"
16 #include "ParTicky.h"
17 # if defined(PARALLEL_HASKELL)
18 # include "ParallelRts.h"
19 # include "GranSimRts.h"   // for GR_...
20 # elif defined(GRAN)
21 # include "GranSimRts.h"
22 # endif
23 #include "Sparks.h"
24 #include "Trace.h"
25
26 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
27
28 static INLINE_ME void bump_hd (StgSparkPool *p)
29 { p->hd++; if (p->hd == p->lim) p->hd = p->base; }
30
31 static INLINE_ME void bump_tl (StgSparkPool *p)
32 { p->tl++; if (p->tl == p->lim) p->tl = p->base; }
33
34 /* -----------------------------------------------------------------------------
35  * 
36  * Initialising spark pools.
37  *
38  * -------------------------------------------------------------------------- */
39
40 static void 
41 initSparkPool(StgSparkPool *pool)
42 {
43     pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
44                                 * sizeof(StgClosure *),
45                                 "initSparkPools");
46     pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
47     pool->hd  = pool->base;
48     pool->tl  = pool->base;
49 }
50
51 void
52 initSparkPools( void )
53 {
54 #ifdef THREADED_RTS
55     /* walk over the capabilities, allocating a spark pool for each one */
56     nat i;
57     for (i = 0; i < n_capabilities; i++) {
58         initSparkPool(&capabilities[i].r.rSparks);
59     }
60 #else
61     /* allocate a single spark pool */
62     initSparkPool(&MainCapability.r.rSparks);
63 #endif
64 }
65
66 void
67 freeSparkPool(StgSparkPool *pool) {
68     stgFree(pool->base);
69 }
70
71 /* -----------------------------------------------------------------------------
72  * 
73  * findSpark: find a spark on the current Capability that we can fork
74  * into a thread.
75  *
76  * -------------------------------------------------------------------------- */
77
78 StgClosure *
79 findSpark (Capability *cap)
80 {
81     StgSparkPool *pool;
82     StgClosure *spark;
83     
84     pool = &(cap->r.rSparks);
85     ASSERT_SPARK_POOL_INVARIANTS(pool);
86
87     while (pool->hd != pool->tl) {
88         spark = *pool->hd;
89         bump_hd(pool);
90         if (closure_SHOULD_SPARK(spark)) {
91 #ifdef GRAN
92             if (RtsFlags.ParFlags.ParStats.Sparks) 
93                 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
94                                  GR_STEALING, ((StgTSO *)NULL), spark, 
95                                  0, 0 /* spark_queue_len(ADVISORY_POOL) */);
96 #endif
97             return spark;
98         }
99     }
100     // spark pool is now empty
101     return NULL;
102 }
103
104 /* -----------------------------------------------------------------------------
105  * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
106  * implicit slide i.e. after marking all sparks are at the beginning of the
107  * spark pool and the spark pool only contains sparkable closures 
108  * -------------------------------------------------------------------------- */
109
110 void
111 markSparkQueue (evac_fn evac)
112
113     StgClosure **sparkp, **to_sparkp;
114     nat i, n, pruned_sparks; // stats only
115     StgSparkPool *pool;
116     Capability *cap;
117     
118     PAR_TICKY_MARK_SPARK_QUEUE_START();
119     
120     n = 0;
121     pruned_sparks = 0;
122     for (i = 0; i < n_capabilities; i++) {
123         cap = &capabilities[i];
124         pool = &(cap->r.rSparks);
125         
126         ASSERT_SPARK_POOL_INVARIANTS(pool);
127
128 #if defined(PARALLEL_HASKELL)
129         // stats only
130         n = 0;
131         pruned_sparks = 0;
132 #endif
133         
134         sparkp = pool->hd;
135         to_sparkp = pool->hd;
136         while (sparkp != pool->tl) {
137             ASSERT(*sparkp!=NULL);
138             ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp)));
139             // ToDo?: statistics gathering here (also for GUM!)
140             if (closure_SHOULD_SPARK(*sparkp)) {
141                 evac(sparkp);
142                 *to_sparkp++ = *sparkp;
143                 if (to_sparkp == pool->lim) {
144                     to_sparkp = pool->base;
145                 }
146                 n++;
147             } else {
148                 pruned_sparks++;
149             }
150             sparkp++;
151             if (sparkp == pool->lim) {
152                 sparkp = pool->base;
153             }
154         }
155         pool->tl = to_sparkp;
156         
157         PAR_TICKY_MARK_SPARK_QUEUE_END(n);
158         
159 #if defined(PARALLEL_HASKELL)
160         debugTrace(DEBUG_sched, 
161                    "marked %d sparks and pruned %d sparks on [%x]",
162                    n, pruned_sparks, mytid);
163 #else
164         debugTrace(DEBUG_sched, 
165                    "marked %d sparks and pruned %d sparks",
166                    n, pruned_sparks);
167 #endif
168         
169         debugTrace(DEBUG_sched,
170                    "new spark queue len=%d; (hd=%p; tl=%p)\n",
171                    sparkPoolSize(pool), pool->hd, pool->tl);
172     }
173 }
174
175 /* -----------------------------------------------------------------------------
176  * 
177  * Turn a spark into a real thread
178  *
179  * -------------------------------------------------------------------------- */
180
181 void
182 createSparkThread (Capability *cap, StgClosure *p)
183 {
184     StgTSO *tso;
185
186     tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
187     appendToRunQueue(cap,tso);
188 }
189
190 /* -----------------------------------------------------------------------------
191  * 
192  * Create a new spark
193  *
194  * -------------------------------------------------------------------------- */
195
196 #define DISCARD_NEW
197
198 StgInt
199 newSpark (StgRegTable *reg, StgClosure *p)
200 {
201     StgSparkPool *pool = &(reg->rSparks);
202
203     ASSERT_SPARK_POOL_INVARIANTS(pool);
204
205     if (closure_SHOULD_SPARK(p)) {
206 #ifdef DISCARD_NEW
207         StgClosure **new_tl;
208         new_tl = pool->tl + 1;
209         if (new_tl == pool->lim) { new_tl = pool->base; }
210         if (new_tl != pool->hd) {
211             *pool->tl = p;
212             pool->tl = new_tl;
213         } else if (!closure_SHOULD_SPARK(*pool->hd)) {
214             // if the old closure is not sparkable, discard it and
215             // keep the new one.  Otherwise, keep the old one.
216             *pool->tl = p;
217             bump_hd(pool);
218         }
219 #else  /* DISCARD OLD */
220         *pool->tl = p;
221         bump_tl(pool);
222         if (pool->tl == pool->hd) { bump_hd(pool); }
223 #endif
224     }   
225
226     ASSERT_SPARK_POOL_INVARIANTS(pool);
227     return 1;
228 }
229
230 #else
231
232 StgInt
233 newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
234 {
235     /* nothing */
236     return 1;
237 }
238
239 #endif /* PARALLEL_HASKELL || THREADED_RTS */
240
241
242 /* -----------------------------------------------------------------------------
243  * 
244  * GRAN & PARALLEL_HASKELL stuff beyond here.
245  *
246  * -------------------------------------------------------------------------- */
247
248 #if defined(PARALLEL_HASKELL) || defined(GRAN)
249
250 static void slide_spark_pool( StgSparkPool *pool );
251
252 rtsBool
253 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
254 {
255   if (pool->tl == pool->lim)
256     slide_spark_pool(pool);
257
258   if (closure_SHOULD_SPARK(closure) && 
259       pool->tl < pool->lim) {
260     *(pool->tl++) = closure;
261
262 #if defined(PARALLEL_HASKELL)
263     // collect parallel global statistics (currently done together with GC stats)
264     if (RtsFlags.ParFlags.ParStats.Global &&
265         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
266       // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime()); 
267       globalParStats.tot_sparks_created++;
268     }
269 #endif
270     return rtsTrue;
271   } else {
272 #if defined(PARALLEL_HASKELL)
273     // collect parallel global statistics (currently done together with GC stats)
274     if (RtsFlags.ParFlags.ParStats.Global &&
275         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
276       //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime()); 
277       globalParStats.tot_sparks_ignored++;
278     }
279 #endif
280     return rtsFalse;
281   }
282 }
283
284 static void
285 slide_spark_pool( StgSparkPool *pool )
286 {
287   StgClosure **sparkp, **to_sparkp;
288
289   sparkp = pool->hd;
290   to_sparkp = pool->base;
291   while (sparkp < pool->tl) {
292     ASSERT(to_sparkp<=sparkp);
293     ASSERT(*sparkp!=NULL);
294     ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
295
296     if (closure_SHOULD_SPARK(*sparkp)) {
297       *to_sparkp++ = *sparkp++;
298     } else {
299       sparkp++;
300     }
301   }
302   pool->hd = pool->base;
303   pool->tl = to_sparkp;
304 }
305
306 void
307 disposeSpark(spark)
308 StgClosure *spark;
309 {
310 #if !defined(THREADED_RTS)
311   Capability *cap;
312   StgSparkPool *pool;
313
314   cap = &MainRegTable;
315   pool = &(cap->rSparks);
316   ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
317 #endif
318   ASSERT(spark != (StgClosure *)NULL);
319   /* Do nothing */
320 }
321
322
323 #elif defined(GRAN)
324
325 /* 
326    Search the spark queue of the proc in event for a spark that's worth
327    turning into a thread 
328    (was gimme_spark in the old RTS)
329 */
330 void
331 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
332 {
333    PEs proc = event->proc,       /* proc to search for work */
334        creator = event->creator; /* proc that requested work */
335    StgClosure* node;
336    rtsBool found;
337    rtsSparkQ spark_of_non_local_node = NULL, 
338              spark_of_non_local_node_prev = NULL, 
339              low_priority_spark = NULL, 
340              low_priority_spark_prev = NULL,
341              spark = NULL, prev = NULL;
342   
343    /* Choose a spark from the local spark queue */
344    prev = (rtsSpark*)NULL;
345    spark = pending_sparks_hds[proc];
346    found = rtsFalse;
347
348    // ToDo: check this code & implement local sparking !! -- HWL  
349    while (!found && spark != (rtsSpark*)NULL)
350      {
351        ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
352               (prev==(rtsSpark*)NULL || prev->next==spark) &&
353               (spark->prev==prev));
354        node = spark->node;
355        if (!closure_SHOULD_SPARK(node)) 
356          {
357            IF_GRAN_DEBUG(checkSparkQ,
358                          debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
359                                spark, node));
360
361            if (RtsFlags.GranFlags.GranSimStats.Sparks)
362              DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
363                               spark->node, spark->name, spark_queue_len(proc));
364   
365            ASSERT(spark != (rtsSpark*)NULL);
366            ASSERT(SparksAvail>0);
367            --SparksAvail;
368
369            ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
370            spark = delete_from_sparkq (spark, proc, rtsTrue);
371            if (spark != (rtsSpark*)NULL)
372              prev = spark->prev;
373            continue;
374          }
375        /* -- node should eventually be sparked */
376        else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes && 
377                !IS_LOCAL_TO(PROCS(node),CurrentProc)) 
378          {
379            barf("Local sparking not yet implemented");
380
381            /* Remember first low priority spark */
382            if (spark_of_non_local_node==(rtsSpark*)NULL) {
383              spark_of_non_local_node_prev = prev;
384              spark_of_non_local_node = spark;
385               }
386   
387            if (spark->next == (rtsSpark*)NULL) { 
388              /* ASSERT(spark==SparkQueueTl);  just for testing */
389              prev = spark_of_non_local_node_prev;
390              spark = spark_of_non_local_node;
391              found = rtsTrue;
392              break;
393            }
394   
395 # if defined(GRAN) && defined(GRAN_CHECK)
396            /* Should never happen; just for testing 
397            if (spark==pending_sparks_tl) {
398              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
399                 stg_exit(EXIT_FAILURE);
400                 } */
401 # endif
402            prev = spark; 
403            spark = spark->next;
404            ASSERT(SparksAvail>0);
405            --SparksAvail;
406            continue;
407          }
408        else if ( RtsFlags.GranFlags.DoPrioritySparking || 
409                  (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
410          {
411            if (RtsFlags.GranFlags.DoPrioritySparking)
412              barf("Priority sparking not yet implemented");
413
414            found = rtsTrue;
415          }
416 #if 0      
417        else /* only used if SparkPriority2 is defined */
418          {
419            /* ToDo: fix the code below and re-integrate it */
420            /* Remember first low priority spark */
421            if (low_priority_spark==(rtsSpark*)NULL) { 
422              low_priority_spark_prev = prev;
423              low_priority_spark = spark;
424            }
425   
426            if (spark->next == (rtsSpark*)NULL) { 
427                 /* ASSERT(spark==spark_queue_tl);  just for testing */
428              prev = low_priority_spark_prev;
429              spark = low_priority_spark;
430              found = rtsTrue;       /* take low pri spark => rc is 2  */
431              break;
432            }
433   
434            /* Should never happen; just for testing 
435            if (spark==pending_sparks_tl) {
436              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
437                 stg_exit(EXIT_FAILURE);
438              break;
439            } */                
440            prev = spark; 
441            spark = spark->next;
442
443            IF_GRAN_DEBUG(pri,
444                          debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
445                                spark->gran_info, RtsFlags.GranFlags.SparkPriority, 
446                                spark->node, spark->name);)
447            }
448 #endif
449    }  /* while (spark!=NULL && !found) */
450
451    *spark_res = spark;
452    *found_res = found;
453 }
454
455 /*
456   Turn the spark into a thread.
457   In GranSim this basically means scheduling a StartThread event for the
458   node pointed to by the spark at some point in the future.
459   (was munch_spark in the old RTS)
460 */
461 rtsBool
462 activateSpark (rtsEvent *event, rtsSparkQ spark) 
463 {
464   PEs proc = event->proc,       /* proc to search for work */
465       creator = event->creator; /* proc that requested work */
466   StgTSO* tso;
467   StgClosure* node;
468   rtsTime spark_arrival_time;
469
470   /* 
471      We've found a node on PE proc requested by PE creator.
472      If proc==creator we can turn the spark into a thread immediately;
473      otherwise we schedule a MoveSpark event on the requesting PE
474   */
475      
476   /* DaH Qu' yIchen */
477   if (proc!=creator) { 
478
479     /* only possible if we simulate GUM style fishing */
480     ASSERT(RtsFlags.GranFlags.Fishing);
481
482     /* Message packing costs for sending a Fish; qeq jabbI'ID */
483     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
484   
485     if (RtsFlags.GranFlags.GranSimStats.Sparks)
486       DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
487                        (StgTSO*)NULL, spark->node,
488                        spark->name, spark_queue_len(proc));
489
490     /* time of the spark arrival on the remote PE */
491     spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
492
493     new_event(creator, proc, spark_arrival_time,
494               MoveSpark,
495               (StgTSO*)NULL, spark->node, spark);
496
497     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
498             
499   } else { /* proc==creator i.e. turn the spark into a thread */
500
501     if ( RtsFlags.GranFlags.GranSimStats.Global && 
502          spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
503
504       globalGranStats.tot_low_pri_sparks++;
505       IF_GRAN_DEBUG(pri,
506                     debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
507                           spark->gran_info, 
508                           spark->node, spark->name));
509     } 
510     
511     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
512     
513     node = spark->node;
514     
515 # if 0
516     /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
517     if (GARBAGE COLLECTION IS NECESSARY) {
518       /* Some kind of backoff needed here in case there's too little heap */
519 #  if defined(GRAN_CHECK) && defined(GRAN)
520       if (RtsFlags.GcFlags.giveStats)
521         fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p;  name=%u\n", 
522                 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
523                 spark, node, spark->name);
524 #  endif
525       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
526                   FindWork,
527                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
528       barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
529       GarbageCollect(GetRoots, rtsFalse);
530       // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
531       // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
532       spark = NULL;
533       return; /* was: continue; */ /* to the next event, eventually */
534     }
535 # endif
536     
537     if (RtsFlags.GranFlags.GranSimStats.Sparks)
538       DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
539                        spark->node, spark->name,
540                        spark_queue_len(CurrentProc));
541     
542     new_event(proc, proc, CurrentTime[proc],
543               StartThread, 
544               END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
545     
546     procStatus[proc] = Starting;
547   }
548 }
549
550 /* -------------------------------------------------------------------------
551    This is the main point where handling granularity information comes into
552    play. 
553    ------------------------------------------------------------------------- */
554
555 #define MAX_RAND_PRI    100
556
557 /* 
558    Granularity info transformers. 
559    Applied to the GRAN_INFO field of a spark.
560 */
561 STATIC_INLINE nat  ID(nat x) { return(x); };
562 STATIC_INLINE nat  INV(nat x) { return(-x); };
563 STATIC_INLINE nat  IGNORE(nat x) { return (0); };
564 STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
565
566 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
567 rtsSpark *
568 newSpark(node,name,gran_info,size_info,par_info,local)
569 StgClosure *node;
570 nat name, gran_info, size_info, par_info, local;
571 {
572   nat pri;
573   rtsSpark *newspark;
574
575   pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
576         RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
577         RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
578                            ID(gran_info);
579
580   if ( RtsFlags.GranFlags.SparkPriority!=0 && 
581        pri<RtsFlags.GranFlags.SparkPriority ) {
582     IF_GRAN_DEBUG(pri,
583       debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
584               pri, RtsFlags.GranFlags.SparkPriority, node, name));
585     return ((rtsSpark*)NULL);
586   }
587
588   newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
589   newspark->prev = newspark->next = (rtsSpark*)NULL;
590   newspark->node = node;
591   newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
592   newspark->gran_info = pri;
593   newspark->global = !local;      /* Check that with parAt, parAtAbs !!*/
594
595   if (RtsFlags.GranFlags.GranSimStats.Global) {
596     globalGranStats.tot_sparks_created++;
597     globalGranStats.sparks_created_on_PE[CurrentProc]++;
598   }
599
600   return(newspark);
601 }
602
603 void
604 disposeSpark(spark)
605 rtsSpark *spark;
606 {
607   ASSERT(spark!=NULL);
608   stgFree(spark);
609 }
610
611 void 
612 disposeSparkQ(spark)
613 rtsSparkQ spark;
614 {
615   if (spark==NULL) 
616     return;
617
618   disposeSparkQ(spark->next);
619
620 # ifdef GRAN_CHECK
621   if (SparksAvail < 0) {
622     debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
623     print_spark(spark);
624   }
625 # endif
626
627   stgFree(spark);
628 }
629
630 /*
631    With PrioritySparking add_to_spark_queue performs an insert sort to keep
632    the spark queue sorted. Otherwise the spark is just added to the end of
633    the queue. 
634 */
635
636 void
637 add_to_spark_queue(spark)
638 rtsSpark *spark;
639 {
640   rtsSpark *prev = NULL, *next = NULL;
641   nat count = 0;
642   rtsBool found = rtsFalse;
643
644   if ( spark == (rtsSpark *)NULL ) {
645     return;
646   }
647
648   if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
649     /* Priority sparking is enabled i.e. spark queues must be sorted */
650
651     for (prev = NULL, next = pending_sparks_hd, count=0;
652          (next != NULL) && 
653          !(found = (spark->gran_info >= next->gran_info));
654          prev = next, next = next->next, count++) 
655      {}
656
657   } else {   /* 'utQo' */
658     /* Priority sparking is disabled */
659     
660     found = rtsFalse;   /* to add it at the end */
661
662   }
663
664   if (found) {
665     /* next points to the first spark with a gran_info smaller than that
666        of spark; therefore, add spark before next into the spark queue */
667     spark->next = next;
668     if ( next == NULL ) {
669       pending_sparks_tl = spark;
670     } else {
671       next->prev = spark;
672     }
673     spark->prev = prev;
674     if ( prev == NULL ) {
675       pending_sparks_hd = spark;
676     } else {
677       prev->next = spark;
678     }
679   } else {  /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
680     /* add the spark at the end of the spark queue */
681     spark->next = NULL;                        
682     spark->prev = pending_sparks_tl;
683     if (pending_sparks_hd == NULL)
684       pending_sparks_hd = spark;
685     else
686       pending_sparks_tl->next = spark;
687     pending_sparks_tl = spark;    
688   } 
689   ++SparksAvail;
690
691   /* add costs for search in priority sparking */
692   if (RtsFlags.GranFlags.DoPrioritySparking) {
693     CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
694   }
695
696   IF_GRAN_DEBUG(checkSparkQ,
697                 debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
698                       spark, spark->node, CurrentProc);
699                 print_sparkq_stats());
700
701 #  if defined(GRAN_CHECK)
702   if (RtsFlags.GranFlags.Debug.checkSparkQ) {
703     for (prev = NULL, next =  pending_sparks_hd;
704          (next != NULL);
705          prev = next, next = next->next) 
706       {}
707     if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
708       debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
709               spark,CurrentProc, 
710               pending_sparks_tl, prev);
711   }
712 #  endif
713
714 #  if defined(GRAN_CHECK)
715   /* Check if the sparkq is still sorted. Just for testing, really!  */
716   if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
717        RtsFlags.GranFlags.Debug.pri ) {
718     rtsBool sorted = rtsTrue;
719     rtsSpark *prev, *next;
720
721     if (pending_sparks_hd == NULL ||
722         pending_sparks_hd->next == NULL ) {
723       /* just 1 elem => ok */
724     } else {
725       for (prev = pending_sparks_hd,
726            next = pending_sparks_hd->next;
727            (next != NULL) ;
728            prev = next, next = next->next) {
729         sorted = sorted && 
730                  (prev->gran_info >= next->gran_info);
731       }
732     }
733     if (!sorted) {
734       debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
735               CurrentProc);
736       print_sparkq(CurrentProc);
737     }
738   }
739 #  endif
740 }
741
742 nat
743 spark_queue_len(proc) 
744 PEs proc;
745 {
746  rtsSpark *prev, *spark;                     /* prev only for testing !! */
747  nat len;
748
749  for (len = 0, prev = NULL, spark = pending_sparks_hds[proc]; 
750       spark != NULL; 
751       len++, prev = spark, spark = spark->next)
752    {}
753
754 #  if defined(GRAN_CHECK)
755   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) 
756     if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
757       debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
758               proc, pending_sparks_tls[proc], prev);
759 #  endif
760
761  return (len);
762 }
763
764 /* 
765    Take spark out of the spark queue on PE p and nuke the spark. Adjusts
766    hd and tl pointers of the spark queue. Returns a pointer to the next
767    spark in the queue.
768 */
769 rtsSpark *
770 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
771 rtsSpark *spark;
772 PEs p;
773 rtsBool dispose_too;
774 {
775   rtsSpark *new_spark;
776
777   if (spark==NULL) 
778     barf("delete_from_sparkq: trying to delete NULL spark\n");
779
780 #  if defined(GRAN_CHECK)
781   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
782     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
783             pending_sparks_hd, pending_sparks_tl,
784             spark->prev, spark, spark->next, 
785             (spark->next==NULL ? 0 : spark->next->prev));
786   }
787 #  endif
788
789   if (spark->prev==NULL) {
790     /* spark is first spark of queue => adjust hd pointer */
791     ASSERT(pending_sparks_hds[p]==spark);
792     pending_sparks_hds[p] = spark->next;
793   } else {
794     spark->prev->next = spark->next;
795   }
796   if (spark->next==NULL) {
797     ASSERT(pending_sparks_tls[p]==spark);
798     /* spark is first spark of queue => adjust tl pointer */
799     pending_sparks_tls[p] = spark->prev;
800   } else {
801     spark->next->prev = spark->prev;
802   }
803   new_spark = spark->next;
804   
805 #  if defined(GRAN_CHECK)
806   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
807     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
808             pending_sparks_hd, pending_sparks_tl,
809             spark->prev, spark, spark->next, 
810             (spark->next==NULL ? 0 : spark->next->prev), spark);
811   }
812 #  endif
813
814   if (dispose_too)
815     disposeSpark(spark);
816                   
817   return new_spark;
818 }
819
820 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
821 void
822 markSparkQueue(void)
823
824   StgClosure *MarkRoot(StgClosure *root); // prototype
825   PEs p;
826   rtsSpark *sp;
827
828   for (p=0; p<RtsFlags.GranFlags.proc; p++)
829     for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
830       ASSERT(sp->node!=NULL);
831       ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
832       // ToDo?: statistics gathering here (also for GUM!)
833       sp->node = (StgClosure *)MarkRoot(sp->node);
834     }
835
836   IF_DEBUG(gc,
837            debugBelch("markSparkQueue: spark statistics at start of GC:");
838            print_sparkq_stats());
839 }
840
841 void
842 print_spark(spark)
843 rtsSpark *spark;
844
845   char str[16];
846
847   if (spark==NULL) {
848     debugBelch("Spark: NIL\n");
849     return;
850   } else {
851     sprintf(str,
852             ((spark->node==NULL) ? "______" : "%#6lx"), 
853             stgCast(StgPtr,spark->node));
854
855     debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
856             str, spark->name, 
857             ((spark->global)==rtsTrue?"True":"False"), spark->creator, 
858             spark->prev, spark->next);
859   }
860 }
861
862 void
863 print_sparkq(proc)
864 PEs proc;
865 // rtsSpark *hd;
866 {
867   rtsSpark *x = pending_sparks_hds[proc];
868
869   debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
870   for (; x!=(rtsSpark*)NULL; x=x->next) {
871     print_spark(x);
872   }
873 }
874
875 /* 
876    Print a statistics of all spark queues.
877 */
878 void
879 print_sparkq_stats(void)
880 {
881   PEs p;
882
883   debugBelch("SparkQs: [");
884   for (p=0; p<RtsFlags.GranFlags.proc; p++)
885     debugBelch(", PE %d: %d", p, spark_queue_len(p));
886   debugBelch("\n");
887 }
888
889 #endif