Do not link ghc stage1 using -threaded, only for stage2 or 3
[ghc-hetmet.git] / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2000-2008
4  *
5  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
6  *
7  -------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "Storage.h"
12 #include "Schedule.h"
13 #include "SchedAPI.h"
14 #include "RtsFlags.h"
15 #include "RtsUtils.h"
16 #include "ParTicky.h"
17 #include "Trace.h"
18 #include "Prelude.h"
19
20 #include "SMP.h" // for cas
21
22 #include "Sparks.h"
23
24 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
25
26 void
27 initSparkPools( void )
28 {
29 #ifdef THREADED_RTS
30     /* walk over the capabilities, allocating a spark pool for each one */
31     nat i;
32     for (i = 0; i < n_capabilities; i++) {
33       capabilities[i].sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks);
34     }
35 #else
36     /* allocate a single spark pool */
37     MainCapability->sparks = newWSDeque(RtsFlags.ParFlags.maxLocalSparks);
38 #endif
39 }
40
41 void
42 freeSparkPool (SparkPool *pool)
43 {
44     freeWSDeque(pool);
45 }
46
47 /* -----------------------------------------------------------------------------
48  * 
49  * Turn a spark into a real thread
50  *
51  * -------------------------------------------------------------------------- */
52
53 void
54 createSparkThread (Capability *cap)
55 {
56     StgTSO *tso;
57
58     tso = createIOThread (cap, RtsFlags.GcFlags.initialStkSize, 
59                           &base_GHCziConc_runSparks_closure);
60
61     postEvent(cap, EVENT_CREATE_SPARK_THREAD, 0, tso->id);
62
63     appendToRunQueue(cap,tso);
64 }
65
66 /* --------------------------------------------------------------------------
67  * newSpark: create a new spark, as a result of calling "par"
68  * Called directly from STG.
69  * -------------------------------------------------------------------------- */
70
71 StgInt
72 newSpark (StgRegTable *reg, StgClosure *p)
73 {
74     Capability *cap = regTableToCapability(reg);
75     SparkPool *pool = cap->sparks;
76
77     /* I am not sure whether this is the right thing to do.
78      * Maybe it is better to exploit the tag information
79      * instead of throwing it away?
80      */
81     p = UNTAG_CLOSURE(p);
82
83     if (closure_SHOULD_SPARK(p)) {
84         pushWSDeque(pool,p);
85     }   
86
87     cap->sparks_created++;
88
89     postEvent(cap, EVENT_CREATE_SPARK, cap->r.rCurrentTSO->id, 0);
90
91     return 1;
92 }
93
94 /* -----------------------------------------------------------------------------
95  * 
96  * tryStealSpark: try to steal a spark from a Capability.
97  *
98  * Returns a valid spark, or NULL if the pool was empty, and can
99  * occasionally return NULL if there was a race with another thread
100  * stealing from the same pool.  In this case, try again later.
101  *
102  -------------------------------------------------------------------------- */
103
104 StgClosure *
105 tryStealSpark (Capability *cap)
106 {
107   SparkPool *pool = cap->sparks;
108   StgClosure *stolen;
109
110   do { 
111       stolen = stealWSDeque_(pool); 
112       // use the no-loopy version, stealWSDeque_(), since if we get a
113       // spurious NULL here the caller may want to try stealing from
114       // other pools before trying again.
115   } while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
116
117   return stolen;
118 }
119
120 /* --------------------------------------------------------------------------
121  * Remove all sparks from the spark queues which should not spark any
122  * more.  Called after GC. We assume exclusive access to the structure
123  * and replace  all sparks in the queue, see explanation below. At exit,
124  * the spark pool only contains sparkable closures.
125  * -------------------------------------------------------------------------- */
126
127 void
128 pruneSparkQueue (evac_fn evac, void *user, Capability *cap)
129
130     SparkPool *pool;
131     StgClosurePtr spark, tmp, *elements;
132     nat n, pruned_sparks; // stats only
133     StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
134     const StgInfoTable *info;
135     
136     PAR_TICKY_MARK_SPARK_QUEUE_START();
137     
138     n = 0;
139     pruned_sparks = 0;
140     
141     pool = cap->sparks;
142     
143     // it is possible that top > bottom, indicating an empty pool.  We
144     // fix that here; this is only necessary because the loop below
145     // assumes it.
146     if (pool->top > pool->bottom)
147         pool->top = pool->bottom;
148
149     // Take this opportunity to reset top/bottom modulo the size of
150     // the array, to avoid overflow.  This is only possible because no
151     // stealing is happening during GC.
152     pool->bottom  -= pool->top & ~pool->moduloSize;
153     pool->top     &= pool->moduloSize;
154     pool->topBound = pool->top;
155
156     debugTrace(DEBUG_sched,
157                "markSparkQueue: current spark queue len=%ld; (hd=%ld; tl=%ld)",
158                sparkPoolSize(pool), pool->bottom, pool->top);
159
160     ASSERT_WSDEQUE_INVARIANTS(pool);
161
162     elements = (StgClosurePtr *)pool->elements;
163
164     /* We have exclusive access to the structure here, so we can reset
165        bottom and top counters, and prune invalid sparks. Contents are
166        copied in-place if they are valuable, otherwise discarded. The
167        routine uses "real" indices t and b, starts by computing them
168        as the modulus size of top and bottom,
169
170        Copying:
171
172        At the beginning, the pool structure can look like this:
173        ( bottom % size >= top % size , no wrap-around)
174                   t          b
175        ___________***********_________________
176
177        or like this ( bottom % size < top % size, wrap-around )
178                   b         t
179        ***********__________******************
180        As we need to remove useless sparks anyway, we make one pass
181        between t and b, moving valuable content to b and subsequent
182        cells (wrapping around when the size is reached).
183
184                      b      t
185        ***********OOO_______XX_X__X?**********
186                      ^____move?____/
187
188        After this movement, botInd becomes the new bottom, and old
189        bottom becomes the new top index, both as indices in the array
190        size range.
191     */
192     // starting here
193     currInd = (pool->top) & (pool->moduloSize); // mod
194
195     // copies of evacuated closures go to space from botInd on
196     // we keep oldBotInd to know when to stop
197     oldBotInd = botInd = (pool->bottom) & (pool->moduloSize); // mod
198
199     // on entry to loop, we are within the bounds
200     ASSERT( currInd < pool->size && botInd  < pool->size );
201
202     while (currInd != oldBotInd ) {
203       /* must use != here, wrap-around at size
204          subtle: loop not entered if queue empty
205        */
206
207       /* check element at currInd. if valuable, evacuate and move to
208          botInd, otherwise move on */
209       spark = elements[currInd];
210
211       // We have to be careful here: in the parallel GC, another
212       // thread might evacuate this closure while we're looking at it,
213       // so grab the info pointer just once.
214       info = spark->header.info;
215       if (IS_FORWARDING_PTR(info)) {
216           tmp = (StgClosure*)UN_FORWARDING_PTR(info);
217           /* if valuable work: shift inside the pool */
218           if (closure_SHOULD_SPARK(tmp)) {
219               elements[botInd] = tmp; // keep entry (new address)
220               botInd++;
221               n++;
222           } else {
223               pruned_sparks++; // discard spark
224               cap->sparks_pruned++;
225           }
226       } else {
227           if (!(closure_flags[INFO_PTR_TO_STRUCT(info)->type] & _NS)) {
228               elements[botInd] = spark; // keep entry (new address)
229               evac (user, &elements[botInd]);
230               botInd++;
231               n++;
232           } else {
233               pruned_sparks++; // discard spark
234               cap->sparks_pruned++;
235           }
236       }
237       currInd++;
238
239       // in the loop, we may reach the bounds, and instantly wrap around
240       ASSERT( currInd <= pool->size && botInd <= pool->size );
241       if ( currInd == pool->size ) { currInd = 0; }
242       if ( botInd == pool->size )  { botInd = 0;  }
243
244     } // while-loop over spark pool elements
245
246     ASSERT(currInd == oldBotInd);
247
248     pool->top = oldBotInd; // where we started writing
249     pool->topBound = pool->top;
250
251     pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size); 
252     // first free place we did not use (corrected by wraparound)
253
254     PAR_TICKY_MARK_SPARK_QUEUE_END(n);
255
256     debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
257     
258     debugTrace(DEBUG_sched,
259                "new spark queue len=%ld; (hd=%ld; tl=%ld)",
260                sparkPoolSize(pool), pool->bottom, pool->top);
261
262     ASSERT_WSDEQUE_INVARIANTS(pool);
263 }
264
265 /* GC for the spark pool, called inside Capability.c for all
266    capabilities in turn. Blindly "evac"s complete spark pool. */
267 void
268 traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
269 {
270     StgClosure **sparkp;
271     SparkPool *pool;
272     StgWord top,bottom, modMask;
273     
274     pool = cap->sparks;
275
276     ASSERT_WSDEQUE_INVARIANTS(pool);
277
278     top = pool->top;
279     bottom = pool->bottom;
280     sparkp = (StgClosurePtr*)pool->elements;
281     modMask = pool->moduloSize;
282
283     while (top < bottom) {
284     /* call evac for all closures in range (wrap-around via modulo)
285      * In GHC-6.10, evac takes an additional 1st argument to hold a
286      * GC-specific register, see rts/sm/GC.c::mark_root()
287      */
288       evac( user , sparkp + (top & modMask) ); 
289       top++;
290     }
291
292     debugTrace(DEBUG_sched,
293                "traversed spark queue, len=%ld; (hd=%ld; tl=%ld)",
294                sparkPoolSize(pool), pool->bottom, pool->top);
295 }
296
297 /* ----------------------------------------------------------------------------
298  * balanceSparkPoolsCaps: takes an array of capabilities (usually: all
299  * capabilities) and its size. Accesses all spark pools and equally
300  * distributes the sparks among them.
301  *
302  * Could be called after GC, before Cap. release, from scheduler. 
303  * -------------------------------------------------------------------------- */
304 void balanceSparkPoolsCaps(nat n_caps, Capability caps[]);
305
306 void balanceSparkPoolsCaps(nat n_caps STG_UNUSED, 
307                            Capability caps[] STG_UNUSED) {
308   barf("not implemented");
309 }
310
311 #else
312
313 StgInt
314 newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
315 {
316     /* nothing */
317     return 1;
318 }
319
320
321 #endif /* PARALLEL_HASKELL || THREADED_RTS */
322
323
324 /* -----------------------------------------------------------------------------
325  * 
326  * GRAN & PARALLEL_HASKELL stuff beyond here.
327  *
328  *  TODO "nuke" this!
329  *
330  * -------------------------------------------------------------------------- */
331
332 #if defined(PARALLEL_HASKELL) || defined(GRAN)
333
334 static void slide_spark_pool( StgSparkPool *pool );
335
336 rtsBool
337 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
338 {
339   if (pool->tl == pool->lim)
340     slide_spark_pool(pool);
341
342   if (closure_SHOULD_SPARK(closure) && 
343       pool->tl < pool->lim) {
344     *(pool->tl++) = closure;
345
346 #if defined(PARALLEL_HASKELL)
347     // collect parallel global statistics (currently done together with GC stats)
348     if (RtsFlags.ParFlags.ParStats.Global &&
349         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
350       // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime()); 
351       globalParStats.tot_sparks_created++;
352     }
353 #endif
354     return rtsTrue;
355   } else {
356 #if defined(PARALLEL_HASKELL)
357     // collect parallel global statistics (currently done together with GC stats)
358     if (RtsFlags.ParFlags.ParStats.Global &&
359         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
360       //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime()); 
361       globalParStats.tot_sparks_ignored++;
362     }
363 #endif
364     return rtsFalse;
365   }
366 }
367
368 static void
369 slide_spark_pool( StgSparkPool *pool )
370 {
371   StgClosure **sparkp, **to_sparkp;
372
373   sparkp = pool->hd;
374   to_sparkp = pool->base;
375   while (sparkp < pool->tl) {
376     ASSERT(to_sparkp<=sparkp);
377     ASSERT(*sparkp!=NULL);
378     ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
379
380     if (closure_SHOULD_SPARK(*sparkp)) {
381       *to_sparkp++ = *sparkp++;
382     } else {
383       sparkp++;
384     }
385   }
386   pool->hd = pool->base;
387   pool->tl = to_sparkp;
388 }
389
390 void
391 disposeSpark(spark)
392 StgClosure *spark;
393 {
394 #if !defined(THREADED_RTS)
395   Capability *cap;
396   StgSparkPool *pool;
397
398   cap = &MainRegTable;
399   pool = &(cap->rSparks);
400   ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
401 #endif
402   ASSERT(spark != (StgClosure *)NULL);
403   /* Do nothing */
404 }
405
406
407 #elif defined(GRAN)
408
409 /* 
410    Search the spark queue of the proc in event for a spark that's worth
411    turning into a thread 
412    (was gimme_spark in the old RTS)
413 */
414 void
415 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
416 {
417    PEs proc = event->proc,       /* proc to search for work */
418        creator = event->creator; /* proc that requested work */
419    StgClosure* node;
420    rtsBool found;
421    rtsSparkQ spark_of_non_local_node = NULL, 
422              spark_of_non_local_node_prev = NULL, 
423              low_priority_spark = NULL, 
424              low_priority_spark_prev = NULL,
425              spark = NULL, prev = NULL;
426   
427    /* Choose a spark from the local spark queue */
428    prev = (rtsSpark*)NULL;
429    spark = pending_sparks_hds[proc];
430    found = rtsFalse;
431
432    // ToDo: check this code & implement local sparking !! -- HWL  
433    while (!found && spark != (rtsSpark*)NULL)
434      {
435        ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
436               (prev==(rtsSpark*)NULL || prev->next==spark) &&
437               (spark->prev==prev));
438        node = spark->node;
439        if (!closure_SHOULD_SPARK(node)) 
440          {
441            IF_GRAN_DEBUG(checkSparkQ,
442                          debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
443                                spark, node));
444
445            if (RtsFlags.GranFlags.GranSimStats.Sparks)
446              DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
447                               spark->node, spark->name, spark_queue_len(proc));
448   
449            ASSERT(spark != (rtsSpark*)NULL);
450            ASSERT(SparksAvail>0);
451            --SparksAvail;
452
453            ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
454            spark = delete_from_sparkq (spark, proc, rtsTrue);
455            if (spark != (rtsSpark*)NULL)
456              prev = spark->prev;
457            continue;
458          }
459        /* -- node should eventually be sparked */
460        else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes && 
461                !IS_LOCAL_TO(PROCS(node),CurrentProc)) 
462          {
463            barf("Local sparking not yet implemented");
464
465            /* Remember first low priority spark */
466            if (spark_of_non_local_node==(rtsSpark*)NULL) {
467              spark_of_non_local_node_prev = prev;
468              spark_of_non_local_node = spark;
469               }
470   
471            if (spark->next == (rtsSpark*)NULL) { 
472              /* ASSERT(spark==SparkQueueTl);  just for testing */
473              prev = spark_of_non_local_node_prev;
474              spark = spark_of_non_local_node;
475              found = rtsTrue;
476              break;
477            }
478   
479 # if defined(GRAN) && defined(GRAN_CHECK)
480            /* Should never happen; just for testing 
481            if (spark==pending_sparks_tl) {
482              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
483                 stg_exit(EXIT_FAILURE);
484                 } */
485 # endif
486            prev = spark; 
487            spark = spark->next;
488            ASSERT(SparksAvail>0);
489            --SparksAvail;
490            continue;
491          }
492        else if ( RtsFlags.GranFlags.DoPrioritySparking || 
493                  (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
494          {
495            if (RtsFlags.GranFlags.DoPrioritySparking)
496              barf("Priority sparking not yet implemented");
497
498            found = rtsTrue;
499          }
500 #if 0      
501        else /* only used if SparkPriority2 is defined */
502          {
503            /* ToDo: fix the code below and re-integrate it */
504            /* Remember first low priority spark */
505            if (low_priority_spark==(rtsSpark*)NULL) { 
506              low_priority_spark_prev = prev;
507              low_priority_spark = spark;
508            }
509   
510            if (spark->next == (rtsSpark*)NULL) { 
511                 /* ASSERT(spark==spark_queue_tl);  just for testing */
512              prev = low_priority_spark_prev;
513              spark = low_priority_spark;
514              found = rtsTrue;       /* take low pri spark => rc is 2  */
515              break;
516            }
517   
518            /* Should never happen; just for testing 
519            if (spark==pending_sparks_tl) {
520              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
521                 stg_exit(EXIT_FAILURE);
522              break;
523            } */                
524            prev = spark; 
525            spark = spark->next;
526
527            IF_GRAN_DEBUG(pri,
528                          debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
529                                spark->gran_info, RtsFlags.GranFlags.SparkPriority, 
530                                spark->node, spark->name);)
531            }
532 #endif
533    }  /* while (spark!=NULL && !found) */
534
535    *spark_res = spark;
536    *found_res = found;
537 }
538
539 /*
540   Turn the spark into a thread.
541   In GranSim this basically means scheduling a StartThread event for the
542   node pointed to by the spark at some point in the future.
543   (was munch_spark in the old RTS)
544 */
545 rtsBool
546 activateSpark (rtsEvent *event, rtsSparkQ spark) 
547 {
548   PEs proc = event->proc,       /* proc to search for work */
549       creator = event->creator; /* proc that requested work */
550   StgTSO* tso;
551   StgClosure* node;
552   rtsTime spark_arrival_time;
553
554   /* 
555      We've found a node on PE proc requested by PE creator.
556      If proc==creator we can turn the spark into a thread immediately;
557      otherwise we schedule a MoveSpark event on the requesting PE
558   */
559      
560   /* DaH Qu' yIchen */
561   if (proc!=creator) { 
562
563     /* only possible if we simulate GUM style fishing */
564     ASSERT(RtsFlags.GranFlags.Fishing);
565
566     /* Message packing costs for sending a Fish; qeq jabbI'ID */
567     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
568   
569     if (RtsFlags.GranFlags.GranSimStats.Sparks)
570       DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
571                        (StgTSO*)NULL, spark->node,
572                        spark->name, spark_queue_len(proc));
573
574     /* time of the spark arrival on the remote PE */
575     spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
576
577     new_event(creator, proc, spark_arrival_time,
578               MoveSpark,
579               (StgTSO*)NULL, spark->node, spark);
580
581     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
582             
583   } else { /* proc==creator i.e. turn the spark into a thread */
584
585     if ( RtsFlags.GranFlags.GranSimStats.Global && 
586          spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
587
588       globalGranStats.tot_low_pri_sparks++;
589       IF_GRAN_DEBUG(pri,
590                     debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
591                           spark->gran_info, 
592                           spark->node, spark->name));
593     } 
594     
595     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
596     
597     node = spark->node;
598     
599 # if 0
600     /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
601     if (GARBAGE COLLECTION IS NECESSARY) {
602       /* Some kind of backoff needed here in case there's too little heap */
603 #  if defined(GRAN_CHECK) && defined(GRAN)
604       if (RtsFlags.GcFlags.giveStats)
605         fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p;  name=%u\n", 
606                 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
607                 spark, node, spark->name);
608 #  endif
609       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
610                   FindWork,
611                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
612       barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
613       GarbageCollect(GetRoots, rtsFalse);
614       // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
615       // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
616       spark = NULL;
617       return; /* was: continue; */ /* to the next event, eventually */
618     }
619 # endif
620     
621     if (RtsFlags.GranFlags.GranSimStats.Sparks)
622       DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
623                        spark->node, spark->name,
624                        spark_queue_len(CurrentProc));
625     
626     new_event(proc, proc, CurrentTime[proc],
627               StartThread, 
628               END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
629     
630     procStatus[proc] = Starting;
631   }
632 }
633
634 /* -------------------------------------------------------------------------
635    This is the main point where handling granularity information comes into
636    play. 
637    ------------------------------------------------------------------------- */
638
639 #define MAX_RAND_PRI    100
640
641 /* 
642    Granularity info transformers. 
643    Applied to the GRAN_INFO field of a spark.
644 */
645 STATIC_INLINE nat  ID(nat x) { return(x); };
646 STATIC_INLINE nat  INV(nat x) { return(-x); };
647 STATIC_INLINE nat  IGNORE(nat x) { return (0); };
648 STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
649
650 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
651 rtsSpark *
652 newSpark(node,name,gran_info,size_info,par_info,local)
653 StgClosure *node;
654 nat name, gran_info, size_info, par_info, local;
655 {
656   nat pri;
657   rtsSpark *newspark;
658
659   pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
660         RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
661         RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
662                            ID(gran_info);
663
664   if ( RtsFlags.GranFlags.SparkPriority!=0 && 
665        pri<RtsFlags.GranFlags.SparkPriority ) {
666     IF_GRAN_DEBUG(pri,
667       debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
668               pri, RtsFlags.GranFlags.SparkPriority, node, name));
669     return ((rtsSpark*)NULL);
670   }
671
672   newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
673   newspark->prev = newspark->next = (rtsSpark*)NULL;
674   newspark->node = node;
675   newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
676   newspark->gran_info = pri;
677   newspark->global = !local;      /* Check that with parAt, parAtAbs !!*/
678
679   if (RtsFlags.GranFlags.GranSimStats.Global) {
680     globalGranStats.tot_sparks_created++;
681     globalGranStats.sparks_created_on_PE[CurrentProc]++;
682   }
683
684   return(newspark);
685 }
686
687 void
688 disposeSpark(spark)
689 rtsSpark *spark;
690 {
691   ASSERT(spark!=NULL);
692   stgFree(spark);
693 }
694
695 void 
696 disposeSparkQ(spark)
697 rtsSparkQ spark;
698 {
699   if (spark==NULL) 
700     return;
701
702   disposeSparkQ(spark->next);
703
704 # ifdef GRAN_CHECK
705   if (SparksAvail < 0) {
706     debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
707     print_spark(spark);
708   }
709 # endif
710
711   stgFree(spark);
712 }
713
714 /*
715    With PrioritySparking add_to_spark_queue performs an insert sort to keep
716    the spark queue sorted. Otherwise the spark is just added to the end of
717    the queue. 
718 */
719
720 void
721 add_to_spark_queue(spark)
722 rtsSpark *spark;
723 {
724   rtsSpark *prev = NULL, *next = NULL;
725   nat count = 0;
726   rtsBool found = rtsFalse;
727
728   if ( spark == (rtsSpark *)NULL ) {
729     return;
730   }
731
732   if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
733     /* Priority sparking is enabled i.e. spark queues must be sorted */
734
735     for (prev = NULL, next = pending_sparks_hd, count=0;
736          (next != NULL) && 
737          !(found = (spark->gran_info >= next->gran_info));
738          prev = next, next = next->next, count++) 
739      {}
740
741   } else {   /* 'utQo' */
742     /* Priority sparking is disabled */
743     
744     found = rtsFalse;   /* to add it at the end */
745
746   }
747
748   if (found) {
749     /* next points to the first spark with a gran_info smaller than that
750        of spark; therefore, add spark before next into the spark queue */
751     spark->next = next;
752     if ( next == NULL ) {
753       pending_sparks_tl = spark;
754     } else {
755       next->prev = spark;
756     }
757     spark->prev = prev;
758     if ( prev == NULL ) {
759       pending_sparks_hd = spark;
760     } else {
761       prev->next = spark;
762     }
763   } else {  /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
764     /* add the spark at the end of the spark queue */
765     spark->next = NULL;                        
766     spark->prev = pending_sparks_tl;
767     if (pending_sparks_hd == NULL)
768       pending_sparks_hd = spark;
769     else
770       pending_sparks_tl->next = spark;
771     pending_sparks_tl = spark;    
772   } 
773   ++SparksAvail;
774
775   /* add costs for search in priority sparking */
776   if (RtsFlags.GranFlags.DoPrioritySparking) {
777     CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
778   }
779
780   IF_GRAN_DEBUG(checkSparkQ,
781                 debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
782                       spark, spark->node, CurrentProc);
783                 print_sparkq_stats());
784
785 #  if defined(GRAN_CHECK)
786   if (RtsFlags.GranFlags.Debug.checkSparkQ) {
787     for (prev = NULL, next =  pending_sparks_hd;
788          (next != NULL);
789          prev = next, next = next->next) 
790       {}
791     if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
792       debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
793               spark,CurrentProc, 
794               pending_sparks_tl, prev);
795   }
796 #  endif
797
798 #  if defined(GRAN_CHECK)
799   /* Check if the sparkq is still sorted. Just for testing, really!  */
800   if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
801        RtsFlags.GranFlags.Debug.pri ) {
802     rtsBool sorted = rtsTrue;
803     rtsSpark *prev, *next;
804
805     if (pending_sparks_hd == NULL ||
806         pending_sparks_hd->next == NULL ) {
807       /* just 1 elem => ok */
808     } else {
809       for (prev = pending_sparks_hd,
810            next = pending_sparks_hd->next;
811            (next != NULL) ;
812            prev = next, next = next->next) {
813         sorted = sorted && 
814                  (prev->gran_info >= next->gran_info);
815       }
816     }
817     if (!sorted) {
818       debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
819               CurrentProc);
820       print_sparkq(CurrentProc);
821     }
822   }
823 #  endif
824 }
825
826 nat
827 spark_queue_len(proc) 
828 PEs proc;
829 {
830  rtsSpark *prev, *spark;                     /* prev only for testing !! */
831  nat len;
832
833  for (len = 0, prev = NULL, spark = pending_sparks_hds[proc]; 
834       spark != NULL; 
835       len++, prev = spark, spark = spark->next)
836    {}
837
838 #  if defined(GRAN_CHECK)
839   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) 
840     if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
841       debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
842               proc, pending_sparks_tls[proc], prev);
843 #  endif
844
845  return (len);
846 }
847
848 /* 
849    Take spark out of the spark queue on PE p and nuke the spark. Adjusts
850    hd and tl pointers of the spark queue. Returns a pointer to the next
851    spark in the queue.
852 */
853 rtsSpark *
854 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
855 rtsSpark *spark;
856 PEs p;
857 rtsBool dispose_too;
858 {
859   rtsSpark *new_spark;
860
861   if (spark==NULL) 
862     barf("delete_from_sparkq: trying to delete NULL spark\n");
863
864 #  if defined(GRAN_CHECK)
865   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
866     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
867             pending_sparks_hd, pending_sparks_tl,
868             spark->prev, spark, spark->next, 
869             (spark->next==NULL ? 0 : spark->next->prev));
870   }
871 #  endif
872
873   if (spark->prev==NULL) {
874     /* spark is first spark of queue => adjust hd pointer */
875     ASSERT(pending_sparks_hds[p]==spark);
876     pending_sparks_hds[p] = spark->next;
877   } else {
878     spark->prev->next = spark->next;
879   }
880   if (spark->next==NULL) {
881     ASSERT(pending_sparks_tls[p]==spark);
882     /* spark is first spark of queue => adjust tl pointer */
883     pending_sparks_tls[p] = spark->prev;
884   } else {
885     spark->next->prev = spark->prev;
886   }
887   new_spark = spark->next;
888   
889 #  if defined(GRAN_CHECK)
890   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
891     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
892             pending_sparks_hd, pending_sparks_tl,
893             spark->prev, spark, spark->next, 
894             (spark->next==NULL ? 0 : spark->next->prev), spark);
895   }
896 #  endif
897
898   if (dispose_too)
899     disposeSpark(spark);
900                   
901   return new_spark;
902 }
903
904 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
905 void
906 markSparkQueue(void)
907
908   StgClosure *MarkRoot(StgClosure *root); // prototype
909   PEs p;
910   rtsSpark *sp;
911
912   for (p=0; p<RtsFlags.GranFlags.proc; p++)
913     for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
914       ASSERT(sp->node!=NULL);
915       ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
916       // ToDo?: statistics gathering here (also for GUM!)
917       sp->node = (StgClosure *)MarkRoot(sp->node);
918     }
919
920   IF_DEBUG(gc,
921            debugBelch("markSparkQueue: spark statistics at start of GC:");
922            print_sparkq_stats());
923 }
924
925 void
926 print_spark(spark)
927 rtsSpark *spark;
928
929   char str[16];
930
931   if (spark==NULL) {
932     debugBelch("Spark: NIL\n");
933     return;
934   } else {
935     sprintf(str,
936             ((spark->node==NULL) ? "______" : "%#6lx"), 
937             stgCast(StgPtr,spark->node));
938
939     debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
940             str, spark->name, 
941             ((spark->global)==rtsTrue?"True":"False"), spark->creator, 
942             spark->prev, spark->next);
943   }
944 }
945
946 void
947 print_sparkq(proc)
948 PEs proc;
949 // rtsSpark *hd;
950 {
951   rtsSpark *x = pending_sparks_hds[proc];
952
953   debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
954   for (; x!=(rtsSpark*)NULL; x=x->next) {
955     print_spark(x);
956   }
957 }
958
959 /* 
960    Print a statistics of all spark queues.
961 */
962 void
963 print_sparkq_stats(void)
964 {
965   PEs p;
966
967   debugBelch("SparkQs: [");
968   for (p=0; p<RtsFlags.GranFlags.proc; p++)
969     debugBelch(", PE %d: %d", p, spark_queue_len(p));
970   debugBelch("\n");
971 }
972
973 #endif