remove empty dir
[ghc-hetmet.git] / ghc / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2000-2006
4  *
5  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
6  *
7  * -------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "Schedule.h"
12 #include "SchedAPI.h"
13 #include "Storage.h"
14 #include "RtsFlags.h"
15 #include "RtsUtils.h"
16 #include "ParTicky.h"
17 # if defined(PARALLEL_HASKELL)
18 # include "ParallelRts.h"
19 # include "GranSimRts.h"   // for GR_...
20 # elif defined(GRAN)
21 # include "GranSimRts.h"
22 # endif
23 #include "Sparks.h"
24
25 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
26
27 static INLINE_ME void bump_hd (StgSparkPool *p)
28 { p->hd++; if (p->hd == p->lim) p->hd = p->base; }
29
30 static INLINE_ME void bump_tl (StgSparkPool *p)
31 { p->tl++; if (p->tl == p->lim) p->tl = p->base; }
32
33 /* -----------------------------------------------------------------------------
34  * 
35  * Initialising spark pools.
36  *
37  * -------------------------------------------------------------------------- */
38
39 static void 
40 initSparkPool(StgSparkPool *pool)
41 {
42     pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
43                                 * sizeof(StgClosure *),
44                                 "initSparkPools");
45     pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
46     pool->hd  = pool->base;
47     pool->tl  = pool->base;
48 }
49
50 void
51 initSparkPools( void )
52 {
53 #ifdef THREADED_RTS
54     /* walk over the capabilities, allocating a spark pool for each one */
55     nat i;
56     for (i = 0; i < n_capabilities; i++) {
57         initSparkPool(&capabilities[i].r.rSparks);
58     }
59 #else
60     /* allocate a single spark pool */
61     initSparkPool(&MainCapability.r.rSparks);
62 #endif
63 }
64
65 /* -----------------------------------------------------------------------------
66  * 
67  * findSpark: find a spark on the current Capability that we can fork
68  * into a thread.
69  *
70  * -------------------------------------------------------------------------- */
71
72 StgClosure *
73 findSpark (Capability *cap)
74 {
75     StgSparkPool *pool;
76     StgClosure *spark;
77     
78     pool = &(cap->r.rSparks);
79     ASSERT_SPARK_POOL_INVARIANTS(pool);
80
81     while (pool->hd != pool->tl) {
82         spark = *pool->hd;
83         bump_hd(pool);
84         if (closure_SHOULD_SPARK(spark)) {
85 #ifdef GRAN
86             if (RtsFlags.ParFlags.ParStats.Sparks) 
87                 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
88                                  GR_STEALING, ((StgTSO *)NULL), spark, 
89                                  0, 0 /* spark_queue_len(ADVISORY_POOL) */);
90 #endif
91             return spark;
92         }
93     }
94     // spark pool is now empty
95     return NULL;
96 }
97
98 /* -----------------------------------------------------------------------------
99  * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
100  * implicit slide i.e. after marking all sparks are at the beginning of the
101  * spark pool and the spark pool only contains sparkable closures 
102  * -------------------------------------------------------------------------- */
103
104 void
105 markSparkQueue (evac_fn evac)
106
107     StgClosure **sparkp, **to_sparkp;
108     nat i, n, pruned_sparks; // stats only
109     StgSparkPool *pool;
110     Capability *cap;
111     
112     PAR_TICKY_MARK_SPARK_QUEUE_START();
113     
114     n = 0;
115     pruned_sparks = 0;
116     for (i = 0; i < n_capabilities; i++) {
117         cap = &capabilities[i];
118         pool = &(cap->r.rSparks);
119         
120         ASSERT_SPARK_POOL_INVARIANTS(pool);
121
122 #if defined(PARALLEL_HASKELL)
123         // stats only
124         n = 0;
125         pruned_sparks = 0;
126 #endif
127         
128         sparkp = pool->hd;
129         to_sparkp = pool->hd;
130         while (sparkp != pool->tl) {
131             ASSERT(to_sparkp<=sparkp);
132             ASSERT(*sparkp!=NULL);
133             ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp)));
134             // ToDo?: statistics gathering here (also for GUM!)
135             if (closure_SHOULD_SPARK(*sparkp)) {
136                 evac(sparkp);
137                 *to_sparkp++ = *sparkp;
138                 n++;
139             } else {
140                 pruned_sparks++;
141             }
142             sparkp++;
143             if (sparkp == pool->lim) {
144                 sparkp = pool->base;
145             }
146         }
147         pool->tl = to_sparkp;
148         
149         PAR_TICKY_MARK_SPARK_QUEUE_END(n);
150         
151 #if defined(PARALLEL_HASKELL)
152         IF_DEBUG(scheduler,
153                  debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
154                             n, pruned_sparks, mytid));
155 #else
156         IF_DEBUG(scheduler,
157                debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks\n",
158                           n, pruned_sparks));
159 #endif
160         
161         IF_DEBUG(scheduler,
162                  debugBelch("markSparkQueue:   new spark queue len=%d; (hd=%p; tl=%p)\n",
163                             sparkPoolSize(pool), pool->hd, pool->tl));
164         
165     }
166 }
167
168 /* -----------------------------------------------------------------------------
169  * 
170  * Turn a spark into a real thread
171  *
172  * -------------------------------------------------------------------------- */
173
174 void
175 createSparkThread (Capability *cap, StgClosure *p)
176 {
177     StgTSO *tso;
178
179     tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
180     appendToRunQueue(cap,tso);
181 }
182
183 /* -----------------------------------------------------------------------------
184  * 
185  * Create a new spark
186  *
187  * -------------------------------------------------------------------------- */
188
189 #define DISCARD_NEW
190
191 StgInt
192 newSpark (StgRegTable *reg, StgClosure *p)
193 {
194     StgSparkPool *pool = &(reg->rSparks);
195
196     ASSERT_SPARK_POOL_INVARIANTS(pool);
197
198     if (closure_SHOULD_SPARK(p)) {
199 #ifdef DISCARD_NEW
200         StgClosure **new_tl;
201         new_tl = pool->tl + 1;
202         if (new_tl == pool->lim) { new_tl = pool->base; }
203         if (new_tl != pool->hd) {
204             *pool->tl = p;
205             pool->tl = new_tl;
206         } else if (!closure_SHOULD_SPARK(*pool->hd)) {
207             // if the old closure is not sparkable, discard it and
208             // keep the new one.  Otherwise, keep the old one.
209             *pool->tl = p;
210             bump_hd(pool);
211         }
212 #else  /* DISCARD OLD */
213         *pool->tl = p;
214         bump_tl(pool);
215         if (pool->tl == pool->hd) { bump_hd(pool); }
216 #endif
217     }   
218
219     ASSERT_SPARK_POOL_INVARIANTS(pool);
220     return 1;
221 }
222
223 #else
224
225 StgInt
226 newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
227 {
228     /* nothing */
229     return 1;
230 }
231
232 #endif /* PARALLEL_HASKELL || THREADED_RTS */
233
234
235 /* -----------------------------------------------------------------------------
236  * 
237  * GRAN & PARALLEL_HASKELL stuff beyond here.
238  *
239  * -------------------------------------------------------------------------- */
240
241 #if defined(PARALLEL_HASKELL) || defined(GRAN)
242
243 static void slide_spark_pool( StgSparkPool *pool );
244
245 rtsBool
246 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
247 {
248   if (pool->tl == pool->lim)
249     slide_spark_pool(pool);
250
251   if (closure_SHOULD_SPARK(closure) && 
252       pool->tl < pool->lim) {
253     *(pool->tl++) = closure;
254
255 #if defined(PARALLEL_HASKELL)
256     // collect parallel global statistics (currently done together with GC stats)
257     if (RtsFlags.ParFlags.ParStats.Global &&
258         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
259       // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime()); 
260       globalParStats.tot_sparks_created++;
261     }
262 #endif
263     return rtsTrue;
264   } else {
265 #if defined(PARALLEL_HASKELL)
266     // collect parallel global statistics (currently done together with GC stats)
267     if (RtsFlags.ParFlags.ParStats.Global &&
268         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
269       //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime()); 
270       globalParStats.tot_sparks_ignored++;
271     }
272 #endif
273     return rtsFalse;
274   }
275 }
276
277 static void
278 slide_spark_pool( StgSparkPool *pool )
279 {
280   StgClosure **sparkp, **to_sparkp;
281
282   sparkp = pool->hd;
283   to_sparkp = pool->base;
284   while (sparkp < pool->tl) {
285     ASSERT(to_sparkp<=sparkp);
286     ASSERT(*sparkp!=NULL);
287     ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
288
289     if (closure_SHOULD_SPARK(*sparkp)) {
290       *to_sparkp++ = *sparkp++;
291     } else {
292       sparkp++;
293     }
294   }
295   pool->hd = pool->base;
296   pool->tl = to_sparkp;
297 }
298
299 void
300 disposeSpark(spark)
301 StgClosure *spark;
302 {
303 #if !defined(THREADED_RTS)
304   Capability *cap;
305   StgSparkPool *pool;
306
307   cap = &MainRegTable;
308   pool = &(cap->rSparks);
309   ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
310 #endif
311   ASSERT(spark != (StgClosure *)NULL);
312   /* Do nothing */
313 }
314
315
316 #elif defined(GRAN)
317
318 /* 
319    Search the spark queue of the proc in event for a spark that's worth
320    turning into a thread 
321    (was gimme_spark in the old RTS)
322 */
323 void
324 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
325 {
326    PEs proc = event->proc,       /* proc to search for work */
327        creator = event->creator; /* proc that requested work */
328    StgClosure* node;
329    rtsBool found;
330    rtsSparkQ spark_of_non_local_node = NULL, 
331              spark_of_non_local_node_prev = NULL, 
332              low_priority_spark = NULL, 
333              low_priority_spark_prev = NULL,
334              spark = NULL, prev = NULL;
335   
336    /* Choose a spark from the local spark queue */
337    prev = (rtsSpark*)NULL;
338    spark = pending_sparks_hds[proc];
339    found = rtsFalse;
340
341    // ToDo: check this code & implement local sparking !! -- HWL  
342    while (!found && spark != (rtsSpark*)NULL)
343      {
344        ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
345               (prev==(rtsSpark*)NULL || prev->next==spark) &&
346               (spark->prev==prev));
347        node = spark->node;
348        if (!closure_SHOULD_SPARK(node)) 
349          {
350            IF_GRAN_DEBUG(checkSparkQ,
351                          debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
352                                spark, node));
353
354            if (RtsFlags.GranFlags.GranSimStats.Sparks)
355              DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
356                               spark->node, spark->name, spark_queue_len(proc));
357   
358            ASSERT(spark != (rtsSpark*)NULL);
359            ASSERT(SparksAvail>0);
360            --SparksAvail;
361
362            ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
363            spark = delete_from_sparkq (spark, proc, rtsTrue);
364            if (spark != (rtsSpark*)NULL)
365              prev = spark->prev;
366            continue;
367          }
368        /* -- node should eventually be sparked */
369        else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes && 
370                !IS_LOCAL_TO(PROCS(node),CurrentProc)) 
371          {
372            barf("Local sparking not yet implemented");
373
374            /* Remember first low priority spark */
375            if (spark_of_non_local_node==(rtsSpark*)NULL) {
376              spark_of_non_local_node_prev = prev;
377              spark_of_non_local_node = spark;
378               }
379   
380            if (spark->next == (rtsSpark*)NULL) { 
381              /* ASSERT(spark==SparkQueueTl);  just for testing */
382              prev = spark_of_non_local_node_prev;
383              spark = spark_of_non_local_node;
384              found = rtsTrue;
385              break;
386            }
387   
388 # if defined(GRAN) && defined(GRAN_CHECK)
389            /* Should never happen; just for testing 
390            if (spark==pending_sparks_tl) {
391              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
392                 stg_exit(EXIT_FAILURE);
393                 } */
394 # endif
395            prev = spark; 
396            spark = spark->next;
397            ASSERT(SparksAvail>0);
398            --SparksAvail;
399            continue;
400          }
401        else if ( RtsFlags.GranFlags.DoPrioritySparking || 
402                  (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
403          {
404            if (RtsFlags.GranFlags.DoPrioritySparking)
405              barf("Priority sparking not yet implemented");
406
407            found = rtsTrue;
408          }
409 #if 0      
410        else /* only used if SparkPriority2 is defined */
411          {
412            /* ToDo: fix the code below and re-integrate it */
413            /* Remember first low priority spark */
414            if (low_priority_spark==(rtsSpark*)NULL) { 
415              low_priority_spark_prev = prev;
416              low_priority_spark = spark;
417            }
418   
419            if (spark->next == (rtsSpark*)NULL) { 
420                 /* ASSERT(spark==spark_queue_tl);  just for testing */
421              prev = low_priority_spark_prev;
422              spark = low_priority_spark;
423              found = rtsTrue;       /* take low pri spark => rc is 2  */
424              break;
425            }
426   
427            /* Should never happen; just for testing 
428            if (spark==pending_sparks_tl) {
429              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
430                 stg_exit(EXIT_FAILURE);
431              break;
432            } */                
433            prev = spark; 
434            spark = spark->next;
435
436            IF_GRAN_DEBUG(pri,
437                          debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
438                                spark->gran_info, RtsFlags.GranFlags.SparkPriority, 
439                                spark->node, spark->name);)
440            }
441 #endif
442    }  /* while (spark!=NULL && !found) */
443
444    *spark_res = spark;
445    *found_res = found;
446 }
447
448 /*
449   Turn the spark into a thread.
450   In GranSim this basically means scheduling a StartThread event for the
451   node pointed to by the spark at some point in the future.
452   (was munch_spark in the old RTS)
453 */
454 rtsBool
455 activateSpark (rtsEvent *event, rtsSparkQ spark) 
456 {
457   PEs proc = event->proc,       /* proc to search for work */
458       creator = event->creator; /* proc that requested work */
459   StgTSO* tso;
460   StgClosure* node;
461   rtsTime spark_arrival_time;
462
463   /* 
464      We've found a node on PE proc requested by PE creator.
465      If proc==creator we can turn the spark into a thread immediately;
466      otherwise we schedule a MoveSpark event on the requesting PE
467   */
468      
469   /* DaH Qu' yIchen */
470   if (proc!=creator) { 
471
472     /* only possible if we simulate GUM style fishing */
473     ASSERT(RtsFlags.GranFlags.Fishing);
474
475     /* Message packing costs for sending a Fish; qeq jabbI'ID */
476     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
477   
478     if (RtsFlags.GranFlags.GranSimStats.Sparks)
479       DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
480                        (StgTSO*)NULL, spark->node,
481                        spark->name, spark_queue_len(proc));
482
483     /* time of the spark arrival on the remote PE */
484     spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
485
486     new_event(creator, proc, spark_arrival_time,
487               MoveSpark,
488               (StgTSO*)NULL, spark->node, spark);
489
490     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
491             
492   } else { /* proc==creator i.e. turn the spark into a thread */
493
494     if ( RtsFlags.GranFlags.GranSimStats.Global && 
495          spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
496
497       globalGranStats.tot_low_pri_sparks++;
498       IF_GRAN_DEBUG(pri,
499                     debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
500                           spark->gran_info, 
501                           spark->node, spark->name));
502     } 
503     
504     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
505     
506     node = spark->node;
507     
508 # if 0
509     /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
510     if (GARBAGE COLLECTION IS NECESSARY) {
511       /* Some kind of backoff needed here in case there's too little heap */
512 #  if defined(GRAN_CHECK) && defined(GRAN)
513       if (RtsFlags.GcFlags.giveStats)
514         fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p;  name=%u\n", 
515                 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
516                 spark, node, spark->name);
517 #  endif
518       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
519                   FindWork,
520                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
521       barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
522       GarbageCollect(GetRoots, rtsFalse);
523       // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
524       // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
525       spark = NULL;
526       return; /* was: continue; */ /* to the next event, eventually */
527     }
528 # endif
529     
530     if (RtsFlags.GranFlags.GranSimStats.Sparks)
531       DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
532                        spark->node, spark->name,
533                        spark_queue_len(CurrentProc));
534     
535     new_event(proc, proc, CurrentTime[proc],
536               StartThread, 
537               END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
538     
539     procStatus[proc] = Starting;
540   }
541 }
542
543 /* -------------------------------------------------------------------------
544    This is the main point where handling granularity information comes into
545    play. 
546    ------------------------------------------------------------------------- */
547
548 #define MAX_RAND_PRI    100
549
550 /* 
551    Granularity info transformers. 
552    Applied to the GRAN_INFO field of a spark.
553 */
554 STATIC_INLINE nat  ID(nat x) { return(x); };
555 STATIC_INLINE nat  INV(nat x) { return(-x); };
556 STATIC_INLINE nat  IGNORE(nat x) { return (0); };
557 STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
558
559 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
560 rtsSpark *
561 newSpark(node,name,gran_info,size_info,par_info,local)
562 StgClosure *node;
563 nat name, gran_info, size_info, par_info, local;
564 {
565   nat pri;
566   rtsSpark *newspark;
567
568   pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
569         RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
570         RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
571                            ID(gran_info);
572
573   if ( RtsFlags.GranFlags.SparkPriority!=0 && 
574        pri<RtsFlags.GranFlags.SparkPriority ) {
575     IF_GRAN_DEBUG(pri,
576       debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
577               pri, RtsFlags.GranFlags.SparkPriority, node, name));
578     return ((rtsSpark*)NULL);
579   }
580
581   newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
582   newspark->prev = newspark->next = (rtsSpark*)NULL;
583   newspark->node = node;
584   newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
585   newspark->gran_info = pri;
586   newspark->global = !local;      /* Check that with parAt, parAtAbs !!*/
587
588   if (RtsFlags.GranFlags.GranSimStats.Global) {
589     globalGranStats.tot_sparks_created++;
590     globalGranStats.sparks_created_on_PE[CurrentProc]++;
591   }
592
593   return(newspark);
594 }
595
596 void
597 disposeSpark(spark)
598 rtsSpark *spark;
599 {
600   ASSERT(spark!=NULL);
601   stgFree(spark);
602 }
603
604 void 
605 disposeSparkQ(spark)
606 rtsSparkQ spark;
607 {
608   if (spark==NULL) 
609     return;
610
611   disposeSparkQ(spark->next);
612
613 # ifdef GRAN_CHECK
614   if (SparksAvail < 0) {
615     debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
616     print_spark(spark);
617   }
618 # endif
619
620   stgFree(spark);
621 }
622
623 /*
624    With PrioritySparking add_to_spark_queue performs an insert sort to keep
625    the spark queue sorted. Otherwise the spark is just added to the end of
626    the queue. 
627 */
628
629 void
630 add_to_spark_queue(spark)
631 rtsSpark *spark;
632 {
633   rtsSpark *prev = NULL, *next = NULL;
634   nat count = 0;
635   rtsBool found = rtsFalse;
636
637   if ( spark == (rtsSpark *)NULL ) {
638     return;
639   }
640
641   if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
642     /* Priority sparking is enabled i.e. spark queues must be sorted */
643
644     for (prev = NULL, next = pending_sparks_hd, count=0;
645          (next != NULL) && 
646          !(found = (spark->gran_info >= next->gran_info));
647          prev = next, next = next->next, count++) 
648      {}
649
650   } else {   /* 'utQo' */
651     /* Priority sparking is disabled */
652     
653     found = rtsFalse;   /* to add it at the end */
654
655   }
656
657   if (found) {
658     /* next points to the first spark with a gran_info smaller than that
659        of spark; therefore, add spark before next into the spark queue */
660     spark->next = next;
661     if ( next == NULL ) {
662       pending_sparks_tl = spark;
663     } else {
664       next->prev = spark;
665     }
666     spark->prev = prev;
667     if ( prev == NULL ) {
668       pending_sparks_hd = spark;
669     } else {
670       prev->next = spark;
671     }
672   } else {  /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
673     /* add the spark at the end of the spark queue */
674     spark->next = NULL;                        
675     spark->prev = pending_sparks_tl;
676     if (pending_sparks_hd == NULL)
677       pending_sparks_hd = spark;
678     else
679       pending_sparks_tl->next = spark;
680     pending_sparks_tl = spark;    
681   } 
682   ++SparksAvail;
683
684   /* add costs for search in priority sparking */
685   if (RtsFlags.GranFlags.DoPrioritySparking) {
686     CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
687   }
688
689   IF_GRAN_DEBUG(checkSparkQ,
690                 debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
691                       spark, spark->node, CurrentProc);
692                 print_sparkq_stats());
693
694 #  if defined(GRAN_CHECK)
695   if (RtsFlags.GranFlags.Debug.checkSparkQ) {
696     for (prev = NULL, next =  pending_sparks_hd;
697          (next != NULL);
698          prev = next, next = next->next) 
699       {}
700     if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
701       debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
702               spark,CurrentProc, 
703               pending_sparks_tl, prev);
704   }
705 #  endif
706
707 #  if defined(GRAN_CHECK)
708   /* Check if the sparkq is still sorted. Just for testing, really!  */
709   if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
710        RtsFlags.GranFlags.Debug.pri ) {
711     rtsBool sorted = rtsTrue;
712     rtsSpark *prev, *next;
713
714     if (pending_sparks_hd == NULL ||
715         pending_sparks_hd->next == NULL ) {
716       /* just 1 elem => ok */
717     } else {
718       for (prev = pending_sparks_hd,
719            next = pending_sparks_hd->next;
720            (next != NULL) ;
721            prev = next, next = next->next) {
722         sorted = sorted && 
723                  (prev->gran_info >= next->gran_info);
724       }
725     }
726     if (!sorted) {
727       debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
728               CurrentProc);
729       print_sparkq(CurrentProc);
730     }
731   }
732 #  endif
733 }
734
735 nat
736 spark_queue_len(proc) 
737 PEs proc;
738 {
739  rtsSpark *prev, *spark;                     /* prev only for testing !! */
740  nat len;
741
742  for (len = 0, prev = NULL, spark = pending_sparks_hds[proc]; 
743       spark != NULL; 
744       len++, prev = spark, spark = spark->next)
745    {}
746
747 #  if defined(GRAN_CHECK)
748   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) 
749     if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
750       debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
751               proc, pending_sparks_tls[proc], prev);
752 #  endif
753
754  return (len);
755 }
756
757 /* 
758    Take spark out of the spark queue on PE p and nuke the spark. Adjusts
759    hd and tl pointers of the spark queue. Returns a pointer to the next
760    spark in the queue.
761 */
762 rtsSpark *
763 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
764 rtsSpark *spark;
765 PEs p;
766 rtsBool dispose_too;
767 {
768   rtsSpark *new_spark;
769
770   if (spark==NULL) 
771     barf("delete_from_sparkq: trying to delete NULL spark\n");
772
773 #  if defined(GRAN_CHECK)
774   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
775     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
776             pending_sparks_hd, pending_sparks_tl,
777             spark->prev, spark, spark->next, 
778             (spark->next==NULL ? 0 : spark->next->prev));
779   }
780 #  endif
781
782   if (spark->prev==NULL) {
783     /* spark is first spark of queue => adjust hd pointer */
784     ASSERT(pending_sparks_hds[p]==spark);
785     pending_sparks_hds[p] = spark->next;
786   } else {
787     spark->prev->next = spark->next;
788   }
789   if (spark->next==NULL) {
790     ASSERT(pending_sparks_tls[p]==spark);
791     /* spark is first spark of queue => adjust tl pointer */
792     pending_sparks_tls[p] = spark->prev;
793   } else {
794     spark->next->prev = spark->prev;
795   }
796   new_spark = spark->next;
797   
798 #  if defined(GRAN_CHECK)
799   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
800     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
801             pending_sparks_hd, pending_sparks_tl,
802             spark->prev, spark, spark->next, 
803             (spark->next==NULL ? 0 : spark->next->prev), spark);
804   }
805 #  endif
806
807   if (dispose_too)
808     disposeSpark(spark);
809                   
810   return new_spark;
811 }
812
813 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
814 void
815 markSparkQueue(void)
816
817   StgClosure *MarkRoot(StgClosure *root); // prototype
818   PEs p;
819   rtsSpark *sp;
820
821   for (p=0; p<RtsFlags.GranFlags.proc; p++)
822     for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
823       ASSERT(sp->node!=NULL);
824       ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
825       // ToDo?: statistics gathering here (also for GUM!)
826       sp->node = (StgClosure *)MarkRoot(sp->node);
827     }
828   IF_DEBUG(gc,
829            debugBelch("@@ markSparkQueue: spark statistics at start of GC:");
830            print_sparkq_stats());
831 }
832
833 void
834 print_spark(spark)
835 rtsSpark *spark;
836
837   char str[16];
838
839   if (spark==NULL) {
840     debugBelch("Spark: NIL\n");
841     return;
842   } else {
843     sprintf(str,
844             ((spark->node==NULL) ? "______" : "%#6lx"), 
845             stgCast(StgPtr,spark->node));
846
847     debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
848             str, spark->name, 
849             ((spark->global)==rtsTrue?"True":"False"), spark->creator, 
850             spark->prev, spark->next);
851   }
852 }
853
854 void
855 print_sparkq(proc)
856 PEs proc;
857 // rtsSpark *hd;
858 {
859   rtsSpark *x = pending_sparks_hds[proc];
860
861   debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
862   for (; x!=(rtsSpark*)NULL; x=x->next) {
863     print_spark(x);
864   }
865 }
866
867 /* 
868    Print a statistics of all spark queues.
869 */
870 void
871 print_sparkq_stats(void)
872 {
873   PEs p;
874
875   debugBelch("SparkQs: [");
876   for (p=0; p<RtsFlags.GranFlags.proc; p++)
877     debugBelch(", PE %d: %d", p, spark_queue_len(p));
878   debugBelch("\n");
879 }
880
881 #endif