Refactor the spark queue implementation into a generic work-stealing deque
[ghc-hetmet.git] / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2000-2008
4  *
5  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
6  *
7  -------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "Storage.h"
12 #include "Schedule.h"
13 #include "SchedAPI.h"
14 #include "RtsFlags.h"
15 #include "RtsUtils.h"
16 #include "ParTicky.h"
17 #include "Trace.h"
18 #include "Prelude.h"
19
20 #include "SMP.h" // for cas
21
22 #include "Sparks.h"
23
24 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
25
26 void
27 initSparkPools( void )
28 {
29 #ifdef THREADED_RTS
30     /* walk over the capabilities, allocating a spark pool for each one */
31     nat i;
32     for (i = 0; i < n_capabilities; i++) {
33       capabilities[i].sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks);
34     }
35 #else
36     /* allocate a single spark pool */
37     MainCapability->sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks);
38 #endif
39 }
40
41 void
42 freeSparkPool (SparkPool *pool)
43 {
44     freeWSDeque(pool);
45 }
46
47 /* -----------------------------------------------------------------------------
48  * 
49  * Turn a spark into a real thread
50  *
51  * -------------------------------------------------------------------------- */
52
53 void
54 createSparkThread (Capability *cap)
55 {
56     StgTSO *tso;
57
58     tso = createIOThread (cap, RtsFlags.GcFlags.initialStkSize, 
59                           &base_GHCziConc_runSparks_closure);
60     appendToRunQueue(cap,tso);
61 }
62
63 /* --------------------------------------------------------------------------
64  * newSpark: create a new spark, as a result of calling "par"
65  * Called directly from STG.
66  * -------------------------------------------------------------------------- */
67
68 StgInt
69 newSpark (StgRegTable *reg, StgClosure *p)
70 {
71     Capability *cap = regTableToCapability(reg);
72     SparkPool *pool = cap->sparks;
73
74     /* I am not sure whether this is the right thing to do.
75      * Maybe it is better to exploit the tag information
76      * instead of throwing it away?
77      */
78     p = UNTAG_CLOSURE(p);
79
80     if (closure_SHOULD_SPARK(p)) {
81         pushWSDeque(pool,p);
82     }   
83
84     cap->sparks_created++;
85
86     return 1;
87 }
88
89 /* -----------------------------------------------------------------------------
90  * 
91  * tryStealSpark: try to steal a spark from a Capability.
92  *
93  * Returns a valid spark, or NULL if the pool was empty, and can
94  * occasionally return NULL if there was a race with another thread
95  * stealing from the same pool.  In this case, try again later.
96  *
97  -------------------------------------------------------------------------- */
98
99 StgClosure *
100 tryStealSpark (Capability *cap)
101 {
102   SparkPool *pool = cap->sparks;
103   StgClosure *stolen;
104
105   do { 
106       stolen = stealWSDeque_(pool); 
107       // use the no-loopy version, stealWSDeque_(), since if we get a
108       // spurious NULL here the caller may want to try stealing from
109       // other pools before trying again.
110   } while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
111
112   return stolen;
113 }
114
115 /* --------------------------------------------------------------------------
116  * Remove all sparks from the spark queues which should not spark any
117  * more.  Called after GC. We assume exclusive access to the structure
118  * and replace  all sparks in the queue, see explanation below. At exit,
119  * the spark pool only contains sparkable closures.
120  * -------------------------------------------------------------------------- */
121
122 void
123 pruneSparkQueue (evac_fn evac, void *user, Capability *cap)
124
125     SparkPool *pool;
126     StgClosurePtr spark, tmp, *elements;
127     nat n, pruned_sparks; // stats only
128     StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
129     const StgInfoTable *info;
130     
131     PAR_TICKY_MARK_SPARK_QUEUE_START();
132     
133     n = 0;
134     pruned_sparks = 0;
135     
136     pool = cap->sparks;
137     
138     // it is possible that top > bottom, indicating an empty pool.  We
139     // fix that here; this is only necessary because the loop below
140     // assumes it.
141     if (pool->top > pool->bottom)
142         pool->top = pool->bottom;
143
144     // Take this opportunity to reset top/bottom modulo the size of
145     // the array, to avoid overflow.  This is only possible because no
146     // stealing is happening during GC.
147     pool->bottom  -= pool->top & ~pool->moduloSize;
148     pool->top     &= pool->moduloSize;
149     pool->topBound = pool->top;
150
151     debugTrace(DEBUG_sched,
152                "markSparkQueue: current spark queue len=%ld; (hd=%ld; tl=%ld)",
153                sparkPoolSize(pool), pool->bottom, pool->top);
154
155     ASSERT_WSDEQUE_INVARIANTS(pool);
156
157     elements = (StgClosurePtr *)pool->elements;
158
159     /* We have exclusive access to the structure here, so we can reset
160        bottom and top counters, and prune invalid sparks. Contents are
161        copied in-place if they are valuable, otherwise discarded. The
162        routine uses "real" indices t and b, starts by computing them
163        as the modulus size of top and bottom,
164
165        Copying:
166
167        At the beginning, the pool structure can look like this:
168        ( bottom % size >= top % size , no wrap-around)
169                   t          b
170        ___________***********_________________
171
172        or like this ( bottom % size < top % size, wrap-around )
173                   b         t
174        ***********__________******************
175        As we need to remove useless sparks anyway, we make one pass
176        between t and b, moving valuable content to b and subsequent
177        cells (wrapping around when the size is reached).
178
179                      b      t
180        ***********OOO_______XX_X__X?**********
181                      ^____move?____/
182
183        After this movement, botInd becomes the new bottom, and old
184        bottom becomes the new top index, both as indices in the array
185        size range.
186     */
187     // starting here
188     currInd = (pool->top) & (pool->moduloSize); // mod
189
190     // copies of evacuated closures go to space from botInd on
191     // we keep oldBotInd to know when to stop
192     oldBotInd = botInd = (pool->bottom) & (pool->moduloSize); // mod
193
194     // on entry to loop, we are within the bounds
195     ASSERT( currInd < pool->size && botInd  < pool->size );
196
197     while (currInd != oldBotInd ) {
198       /* must use != here, wrap-around at size
199          subtle: loop not entered if queue empty
200        */
201
202       /* check element at currInd. if valuable, evacuate and move to
203          botInd, otherwise move on */
204       spark = elements[currInd];
205
206       // We have to be careful here: in the parallel GC, another
207       // thread might evacuate this closure while we're looking at it,
208       // so grab the info pointer just once.
209       info = spark->header.info;
210       if (IS_FORWARDING_PTR(info)) {
211           tmp = (StgClosure*)UN_FORWARDING_PTR(info);
212           /* if valuable work: shift inside the pool */
213           if (closure_SHOULD_SPARK(tmp)) {
214               elements[botInd] = tmp; // keep entry (new address)
215               botInd++;
216               n++;
217           } else {
218               pruned_sparks++; // discard spark
219               cap->sparks_pruned++;
220           }
221       } else {
222           if (!(closure_flags[INFO_PTR_TO_STRUCT(info)->type] & _NS)) {
223               elements[botInd] = spark; // keep entry (new address)
224               evac (user, &elements[botInd]);
225               botInd++;
226               n++;
227           } else {
228               pruned_sparks++; // discard spark
229               cap->sparks_pruned++;
230           }
231       }
232       currInd++;
233
234       // in the loop, we may reach the bounds, and instantly wrap around
235       ASSERT( currInd <= pool->size && botInd <= pool->size );
236       if ( currInd == pool->size ) { currInd = 0; }
237       if ( botInd == pool->size )  { botInd = 0;  }
238
239     } // while-loop over spark pool elements
240
241     ASSERT(currInd == oldBotInd);
242
243     pool->top = oldBotInd; // where we started writing
244     pool->topBound = pool->top;
245
246     pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size); 
247     // first free place we did not use (corrected by wraparound)
248
249     PAR_TICKY_MARK_SPARK_QUEUE_END(n);
250
251     debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
252     
253     debugTrace(DEBUG_sched,
254                "new spark queue len=%ld; (hd=%ld; tl=%ld)",
255                sparkPoolSize(pool), pool->bottom, pool->top);
256
257     ASSERT_WSDEQUE_INVARIANTS(pool);
258 }
259
260 /* GC for the spark pool, called inside Capability.c for all
261    capabilities in turn. Blindly "evac"s complete spark pool. */
262 void
263 traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
264 {
265     StgClosure **sparkp;
266     SparkPool *pool;
267     StgWord top,bottom, modMask;
268     
269     pool = cap->sparks;
270
271     ASSERT_WSDEQUE_INVARIANTS(pool);
272
273     top = pool->top;
274     bottom = pool->bottom;
275     sparkp = (StgClosurePtr*)pool->elements;
276     modMask = pool->moduloSize;
277
278     while (top < bottom) {
279     /* call evac for all closures in range (wrap-around via modulo)
280      * In GHC-6.10, evac takes an additional 1st argument to hold a
281      * GC-specific register, see rts/sm/GC.c::mark_root()
282      */
283       evac( user , sparkp + (top & modMask) ); 
284       top++;
285     }
286
287     debugTrace(DEBUG_sched,
288                "traversed spark queue, len=%ld; (hd=%ld; tl=%ld)",
289                sparkPoolSize(pool), pool->bottom, pool->top);
290 }
291
292 /* ----------------------------------------------------------------------------
293  * balanceSparkPoolsCaps: takes an array of capabilities (usually: all
294  * capabilities) and its size. Accesses all spark pools and equally
295  * distributes the sparks among them.
296  *
297  * Could be called after GC, before Cap. release, from scheduler. 
298  * -------------------------------------------------------------------------- */
299 void balanceSparkPoolsCaps(nat n_caps, Capability caps[]);
300
301 void balanceSparkPoolsCaps(nat n_caps STG_UNUSED, 
302                            Capability caps[] STG_UNUSED) {
303   barf("not implemented");
304 }
305
306 #else
307
308 StgInt
309 newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
310 {
311     /* nothing */
312     return 1;
313 }
314
315
316 #endif /* PARALLEL_HASKELL || THREADED_RTS */
317
318
319 /* -----------------------------------------------------------------------------
320  * 
321  * GRAN & PARALLEL_HASKELL stuff beyond here.
322  *
323  *  TODO "nuke" this!
324  *
325  * -------------------------------------------------------------------------- */
326
327 #if defined(PARALLEL_HASKELL) || defined(GRAN)
328
329 static void slide_spark_pool( StgSparkPool *pool );
330
331 rtsBool
332 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
333 {
334   if (pool->tl == pool->lim)
335     slide_spark_pool(pool);
336
337   if (closure_SHOULD_SPARK(closure) && 
338       pool->tl < pool->lim) {
339     *(pool->tl++) = closure;
340
341 #if defined(PARALLEL_HASKELL)
342     // collect parallel global statistics (currently done together with GC stats)
343     if (RtsFlags.ParFlags.ParStats.Global &&
344         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
345       // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime()); 
346       globalParStats.tot_sparks_created++;
347     }
348 #endif
349     return rtsTrue;
350   } else {
351 #if defined(PARALLEL_HASKELL)
352     // collect parallel global statistics (currently done together with GC stats)
353     if (RtsFlags.ParFlags.ParStats.Global &&
354         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
355       //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime()); 
356       globalParStats.tot_sparks_ignored++;
357     }
358 #endif
359     return rtsFalse;
360   }
361 }
362
363 static void
364 slide_spark_pool( StgSparkPool *pool )
365 {
366   StgClosure **sparkp, **to_sparkp;
367
368   sparkp = pool->hd;
369   to_sparkp = pool->base;
370   while (sparkp < pool->tl) {
371     ASSERT(to_sparkp<=sparkp);
372     ASSERT(*sparkp!=NULL);
373     ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
374
375     if (closure_SHOULD_SPARK(*sparkp)) {
376       *to_sparkp++ = *sparkp++;
377     } else {
378       sparkp++;
379     }
380   }
381   pool->hd = pool->base;
382   pool->tl = to_sparkp;
383 }
384
385 void
386 disposeSpark(spark)
387 StgClosure *spark;
388 {
389 #if !defined(THREADED_RTS)
390   Capability *cap;
391   StgSparkPool *pool;
392
393   cap = &MainRegTable;
394   pool = &(cap->rSparks);
395   ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
396 #endif
397   ASSERT(spark != (StgClosure *)NULL);
398   /* Do nothing */
399 }
400
401
402 #elif defined(GRAN)
403
404 /* 
405    Search the spark queue of the proc in event for a spark that's worth
406    turning into a thread 
407    (was gimme_spark in the old RTS)
408 */
409 void
410 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
411 {
412    PEs proc = event->proc,       /* proc to search for work */
413        creator = event->creator; /* proc that requested work */
414    StgClosure* node;
415    rtsBool found;
416    rtsSparkQ spark_of_non_local_node = NULL, 
417              spark_of_non_local_node_prev = NULL, 
418              low_priority_spark = NULL, 
419              low_priority_spark_prev = NULL,
420              spark = NULL, prev = NULL;
421   
422    /* Choose a spark from the local spark queue */
423    prev = (rtsSpark*)NULL;
424    spark = pending_sparks_hds[proc];
425    found = rtsFalse;
426
427    // ToDo: check this code & implement local sparking !! -- HWL  
428    while (!found && spark != (rtsSpark*)NULL)
429      {
430        ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
431               (prev==(rtsSpark*)NULL || prev->next==spark) &&
432               (spark->prev==prev));
433        node = spark->node;
434        if (!closure_SHOULD_SPARK(node)) 
435          {
436            IF_GRAN_DEBUG(checkSparkQ,
437                          debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
438                                spark, node));
439
440            if (RtsFlags.GranFlags.GranSimStats.Sparks)
441              DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
442                               spark->node, spark->name, spark_queue_len(proc));
443   
444            ASSERT(spark != (rtsSpark*)NULL);
445            ASSERT(SparksAvail>0);
446            --SparksAvail;
447
448            ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
449            spark = delete_from_sparkq (spark, proc, rtsTrue);
450            if (spark != (rtsSpark*)NULL)
451              prev = spark->prev;
452            continue;
453          }
454        /* -- node should eventually be sparked */
455        else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes && 
456                !IS_LOCAL_TO(PROCS(node),CurrentProc)) 
457          {
458            barf("Local sparking not yet implemented");
459
460            /* Remember first low priority spark */
461            if (spark_of_non_local_node==(rtsSpark*)NULL) {
462              spark_of_non_local_node_prev = prev;
463              spark_of_non_local_node = spark;
464               }
465   
466            if (spark->next == (rtsSpark*)NULL) { 
467              /* ASSERT(spark==SparkQueueTl);  just for testing */
468              prev = spark_of_non_local_node_prev;
469              spark = spark_of_non_local_node;
470              found = rtsTrue;
471              break;
472            }
473   
474 # if defined(GRAN) && defined(GRAN_CHECK)
475            /* Should never happen; just for testing 
476            if (spark==pending_sparks_tl) {
477              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
478                 stg_exit(EXIT_FAILURE);
479                 } */
480 # endif
481            prev = spark; 
482            spark = spark->next;
483            ASSERT(SparksAvail>0);
484            --SparksAvail;
485            continue;
486          }
487        else if ( RtsFlags.GranFlags.DoPrioritySparking || 
488                  (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
489          {
490            if (RtsFlags.GranFlags.DoPrioritySparking)
491              barf("Priority sparking not yet implemented");
492
493            found = rtsTrue;
494          }
495 #if 0      
496        else /* only used if SparkPriority2 is defined */
497          {
498            /* ToDo: fix the code below and re-integrate it */
499            /* Remember first low priority spark */
500            if (low_priority_spark==(rtsSpark*)NULL) { 
501              low_priority_spark_prev = prev;
502              low_priority_spark = spark;
503            }
504   
505            if (spark->next == (rtsSpark*)NULL) { 
506                 /* ASSERT(spark==spark_queue_tl);  just for testing */
507              prev = low_priority_spark_prev;
508              spark = low_priority_spark;
509              found = rtsTrue;       /* take low pri spark => rc is 2  */
510              break;
511            }
512   
513            /* Should never happen; just for testing 
514            if (spark==pending_sparks_tl) {
515              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
516                 stg_exit(EXIT_FAILURE);
517              break;
518            } */                
519            prev = spark; 
520            spark = spark->next;
521
522            IF_GRAN_DEBUG(pri,
523                          debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
524                                spark->gran_info, RtsFlags.GranFlags.SparkPriority, 
525                                spark->node, spark->name);)
526            }
527 #endif
528    }  /* while (spark!=NULL && !found) */
529
530    *spark_res = spark;
531    *found_res = found;
532 }
533
534 /*
535   Turn the spark into a thread.
536   In GranSim this basically means scheduling a StartThread event for the
537   node pointed to by the spark at some point in the future.
538   (was munch_spark in the old RTS)
539 */
540 rtsBool
541 activateSpark (rtsEvent *event, rtsSparkQ spark) 
542 {
543   PEs proc = event->proc,       /* proc to search for work */
544       creator = event->creator; /* proc that requested work */
545   StgTSO* tso;
546   StgClosure* node;
547   rtsTime spark_arrival_time;
548
549   /* 
550      We've found a node on PE proc requested by PE creator.
551      If proc==creator we can turn the spark into a thread immediately;
552      otherwise we schedule a MoveSpark event on the requesting PE
553   */
554      
555   /* DaH Qu' yIchen */
556   if (proc!=creator) { 
557
558     /* only possible if we simulate GUM style fishing */
559     ASSERT(RtsFlags.GranFlags.Fishing);
560
561     /* Message packing costs for sending a Fish; qeq jabbI'ID */
562     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
563   
564     if (RtsFlags.GranFlags.GranSimStats.Sparks)
565       DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
566                        (StgTSO*)NULL, spark->node,
567                        spark->name, spark_queue_len(proc));
568
569     /* time of the spark arrival on the remote PE */
570     spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
571
572     new_event(creator, proc, spark_arrival_time,
573               MoveSpark,
574               (StgTSO*)NULL, spark->node, spark);
575
576     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
577             
578   } else { /* proc==creator i.e. turn the spark into a thread */
579
580     if ( RtsFlags.GranFlags.GranSimStats.Global && 
581          spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
582
583       globalGranStats.tot_low_pri_sparks++;
584       IF_GRAN_DEBUG(pri,
585                     debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
586                           spark->gran_info, 
587                           spark->node, spark->name));
588     } 
589     
590     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
591     
592     node = spark->node;
593     
594 # if 0
595     /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
596     if (GARBAGE COLLECTION IS NECESSARY) {
597       /* Some kind of backoff needed here in case there's too little heap */
598 #  if defined(GRAN_CHECK) && defined(GRAN)
599       if (RtsFlags.GcFlags.giveStats)
600         fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p;  name=%u\n", 
601                 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
602                 spark, node, spark->name);
603 #  endif
604       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
605                   FindWork,
606                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
607       barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
608       GarbageCollect(GetRoots, rtsFalse);
609       // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
610       // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
611       spark = NULL;
612       return; /* was: continue; */ /* to the next event, eventually */
613     }
614 # endif
615     
616     if (RtsFlags.GranFlags.GranSimStats.Sparks)
617       DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
618                        spark->node, spark->name,
619                        spark_queue_len(CurrentProc));
620     
621     new_event(proc, proc, CurrentTime[proc],
622               StartThread, 
623               END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
624     
625     procStatus[proc] = Starting;
626   }
627 }
628
629 /* -------------------------------------------------------------------------
630    This is the main point where handling granularity information comes into
631    play. 
632    ------------------------------------------------------------------------- */
633
634 #define MAX_RAND_PRI    100
635
636 /* 
637    Granularity info transformers. 
638    Applied to the GRAN_INFO field of a spark.
639 */
640 STATIC_INLINE nat  ID(nat x) { return(x); };
641 STATIC_INLINE nat  INV(nat x) { return(-x); };
642 STATIC_INLINE nat  IGNORE(nat x) { return (0); };
643 STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
644
645 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
646 rtsSpark *
647 newSpark(node,name,gran_info,size_info,par_info,local)
648 StgClosure *node;
649 nat name, gran_info, size_info, par_info, local;
650 {
651   nat pri;
652   rtsSpark *newspark;
653
654   pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
655         RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
656         RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
657                            ID(gran_info);
658
659   if ( RtsFlags.GranFlags.SparkPriority!=0 && 
660        pri<RtsFlags.GranFlags.SparkPriority ) {
661     IF_GRAN_DEBUG(pri,
662       debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
663               pri, RtsFlags.GranFlags.SparkPriority, node, name));
664     return ((rtsSpark*)NULL);
665   }
666
667   newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
668   newspark->prev = newspark->next = (rtsSpark*)NULL;
669   newspark->node = node;
670   newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
671   newspark->gran_info = pri;
672   newspark->global = !local;      /* Check that with parAt, parAtAbs !!*/
673
674   if (RtsFlags.GranFlags.GranSimStats.Global) {
675     globalGranStats.tot_sparks_created++;
676     globalGranStats.sparks_created_on_PE[CurrentProc]++;
677   }
678
679   return(newspark);
680 }
681
682 void
683 disposeSpark(spark)
684 rtsSpark *spark;
685 {
686   ASSERT(spark!=NULL);
687   stgFree(spark);
688 }
689
690 void 
691 disposeSparkQ(spark)
692 rtsSparkQ spark;
693 {
694   if (spark==NULL) 
695     return;
696
697   disposeSparkQ(spark->next);
698
699 # ifdef GRAN_CHECK
700   if (SparksAvail < 0) {
701     debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
702     print_spark(spark);
703   }
704 # endif
705
706   stgFree(spark);
707 }
708
709 /*
710    With PrioritySparking add_to_spark_queue performs an insert sort to keep
711    the spark queue sorted. Otherwise the spark is just added to the end of
712    the queue. 
713 */
714
715 void
716 add_to_spark_queue(spark)
717 rtsSpark *spark;
718 {
719   rtsSpark *prev = NULL, *next = NULL;
720   nat count = 0;
721   rtsBool found = rtsFalse;
722
723   if ( spark == (rtsSpark *)NULL ) {
724     return;
725   }
726
727   if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
728     /* Priority sparking is enabled i.e. spark queues must be sorted */
729
730     for (prev = NULL, next = pending_sparks_hd, count=0;
731          (next != NULL) && 
732          !(found = (spark->gran_info >= next->gran_info));
733          prev = next, next = next->next, count++) 
734      {}
735
736   } else {   /* 'utQo' */
737     /* Priority sparking is disabled */
738     
739     found = rtsFalse;   /* to add it at the end */
740
741   }
742
743   if (found) {
744     /* next points to the first spark with a gran_info smaller than that
745        of spark; therefore, add spark before next into the spark queue */
746     spark->next = next;
747     if ( next == NULL ) {
748       pending_sparks_tl = spark;
749     } else {
750       next->prev = spark;
751     }
752     spark->prev = prev;
753     if ( prev == NULL ) {
754       pending_sparks_hd = spark;
755     } else {
756       prev->next = spark;
757     }
758   } else {  /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
759     /* add the spark at the end of the spark queue */
760     spark->next = NULL;                        
761     spark->prev = pending_sparks_tl;
762     if (pending_sparks_hd == NULL)
763       pending_sparks_hd = spark;
764     else
765       pending_sparks_tl->next = spark;
766     pending_sparks_tl = spark;    
767   } 
768   ++SparksAvail;
769
770   /* add costs for search in priority sparking */
771   if (RtsFlags.GranFlags.DoPrioritySparking) {
772     CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
773   }
774
775   IF_GRAN_DEBUG(checkSparkQ,
776                 debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
777                       spark, spark->node, CurrentProc);
778                 print_sparkq_stats());
779
780 #  if defined(GRAN_CHECK)
781   if (RtsFlags.GranFlags.Debug.checkSparkQ) {
782     for (prev = NULL, next =  pending_sparks_hd;
783          (next != NULL);
784          prev = next, next = next->next) 
785       {}
786     if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
787       debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
788               spark,CurrentProc, 
789               pending_sparks_tl, prev);
790   }
791 #  endif
792
793 #  if defined(GRAN_CHECK)
794   /* Check if the sparkq is still sorted. Just for testing, really!  */
795   if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
796        RtsFlags.GranFlags.Debug.pri ) {
797     rtsBool sorted = rtsTrue;
798     rtsSpark *prev, *next;
799
800     if (pending_sparks_hd == NULL ||
801         pending_sparks_hd->next == NULL ) {
802       /* just 1 elem => ok */
803     } else {
804       for (prev = pending_sparks_hd,
805            next = pending_sparks_hd->next;
806            (next != NULL) ;
807            prev = next, next = next->next) {
808         sorted = sorted && 
809                  (prev->gran_info >= next->gran_info);
810       }
811     }
812     if (!sorted) {
813       debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
814               CurrentProc);
815       print_sparkq(CurrentProc);
816     }
817   }
818 #  endif
819 }
820
821 nat
822 spark_queue_len(proc) 
823 PEs proc;
824 {
825  rtsSpark *prev, *spark;                     /* prev only for testing !! */
826  nat len;
827
828  for (len = 0, prev = NULL, spark = pending_sparks_hds[proc]; 
829       spark != NULL; 
830       len++, prev = spark, spark = spark->next)
831    {}
832
833 #  if defined(GRAN_CHECK)
834   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) 
835     if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
836       debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
837               proc, pending_sparks_tls[proc], prev);
838 #  endif
839
840  return (len);
841 }
842
843 /* 
844    Take spark out of the spark queue on PE p and nuke the spark. Adjusts
845    hd and tl pointers of the spark queue. Returns a pointer to the next
846    spark in the queue.
847 */
848 rtsSpark *
849 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
850 rtsSpark *spark;
851 PEs p;
852 rtsBool dispose_too;
853 {
854   rtsSpark *new_spark;
855
856   if (spark==NULL) 
857     barf("delete_from_sparkq: trying to delete NULL spark\n");
858
859 #  if defined(GRAN_CHECK)
860   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
861     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
862             pending_sparks_hd, pending_sparks_tl,
863             spark->prev, spark, spark->next, 
864             (spark->next==NULL ? 0 : spark->next->prev));
865   }
866 #  endif
867
868   if (spark->prev==NULL) {
869     /* spark is first spark of queue => adjust hd pointer */
870     ASSERT(pending_sparks_hds[p]==spark);
871     pending_sparks_hds[p] = spark->next;
872   } else {
873     spark->prev->next = spark->next;
874   }
875   if (spark->next==NULL) {
876     ASSERT(pending_sparks_tls[p]==spark);
877     /* spark is first spark of queue => adjust tl pointer */
878     pending_sparks_tls[p] = spark->prev;
879   } else {
880     spark->next->prev = spark->prev;
881   }
882   new_spark = spark->next;
883   
884 #  if defined(GRAN_CHECK)
885   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
886     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
887             pending_sparks_hd, pending_sparks_tl,
888             spark->prev, spark, spark->next, 
889             (spark->next==NULL ? 0 : spark->next->prev), spark);
890   }
891 #  endif
892
893   if (dispose_too)
894     disposeSpark(spark);
895                   
896   return new_spark;
897 }
898
899 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
900 void
901 markSparkQueue(void)
902
903   StgClosure *MarkRoot(StgClosure *root); // prototype
904   PEs p;
905   rtsSpark *sp;
906
907   for (p=0; p<RtsFlags.GranFlags.proc; p++)
908     for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
909       ASSERT(sp->node!=NULL);
910       ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
911       // ToDo?: statistics gathering here (also for GUM!)
912       sp->node = (StgClosure *)MarkRoot(sp->node);
913     }
914
915   IF_DEBUG(gc,
916            debugBelch("markSparkQueue: spark statistics at start of GC:");
917            print_sparkq_stats());
918 }
919
920 void
921 print_spark(spark)
922 rtsSpark *spark;
923
924   char str[16];
925
926   if (spark==NULL) {
927     debugBelch("Spark: NIL\n");
928     return;
929   } else {
930     sprintf(str,
931             ((spark->node==NULL) ? "______" : "%#6lx"), 
932             stgCast(StgPtr,spark->node));
933
934     debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
935             str, spark->name, 
936             ((spark->global)==rtsTrue?"True":"False"), spark->creator, 
937             spark->prev, spark->next);
938   }
939 }
940
941 void
942 print_sparkq(proc)
943 PEs proc;
944 // rtsSpark *hd;
945 {
946   rtsSpark *x = pending_sparks_hds[proc];
947
948   debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
949   for (; x!=(rtsSpark*)NULL; x=x->next) {
950     print_spark(x);
951   }
952 }
953
954 /* 
955    Print a statistics of all spark queues.
956 */
957 void
958 print_sparkq_stats(void)
959 {
960   PEs p;
961
962   debugBelch("SparkQs: [");
963   for (p=0; p<RtsFlags.GranFlags.proc; p++)
964     debugBelch(", PE %d: %d", p, spark_queue_len(p));
965   debugBelch("\n");
966 }
967
968 #endif