[project @ 2004-02-12 02:04:59 by mthomas]
[ghc-hetmet.git] / ghc / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Sparks.c,v 1.7 2003/11/12 17:49:11 sof Exp $
3  *
4  * (c) The GHC Team, 2000
5  *
6  * Sparking support for PAR and SMP versions of the RTS.
7  *
8  * -------------------------------------------------------------------------*/
9
10 //@node Spark Management Routines, , ,
11 //@section Spark Management Routines
12
13 //@menu
14 //* Includes::                  
15 //* GUM code::                  
16 //* GranSim code::              
17 //@end menu
18 //*/
19
20 //@node Includes, GUM code, Spark Management Routines, Spark Management Routines
21 //@subsection Includes
22
23 #include "PosixSource.h"
24 #include "Rts.h"
25 #include "Schedule.h"
26 #include "SchedAPI.h"
27 #include "Storage.h"
28 #include "RtsFlags.h"
29 #include "RtsUtils.h"
30 #include "ParTicky.h"
31 # if defined(PAR)
32 # include "ParallelRts.h"
33 # include "GranSimRts.h"   // for GR_...
34 # elif defined(GRAN)
35 # include "GranSimRts.h"
36 # endif
37 #include "Sparks.h"
38
39 #if /*defined(SMP) ||*/ defined(PAR)
40
41 //@node GUM code, GranSim code, Includes, Spark Management Routines
42 //@subsection GUM code
43
44 static void slide_spark_pool( StgSparkPool *pool );
45
46 rtsBool
47 initSparkPools( void )
48 {
49   Capability *cap;
50   StgSparkPool *pool;
51
52 #ifdef SMP
53   /* walk over the capabilities, allocating a spark pool for each one */
54   for (cap = free_capabilities; cap != NULL; cap = cap->link) {
55 #else
56   /* allocate a single spark pool */
57   cap = &MainRegTable;
58   {
59 #endif
60     pool = &(cap->rSparks);
61     
62     pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
63                                      * sizeof(StgClosure *),
64                                      "initSparkPools");
65     pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
66     pool->hd  = pool->base;
67     pool->tl  = pool->base;
68   }
69   return rtsTrue; /* Qapla' */
70 }
71
72 /* 
73    We traverse the spark pool until we find the 2nd usable (i.e. non-NF)
74    spark. Rationale, we don't want to give away the only work a PE has.
75    ToDo: introduce low- and high-water-marks for load balancing.
76 */
77 StgClosure *
78 findSpark( rtsBool for_export )
79 {
80   Capability *cap;
81   StgSparkPool *pool;
82   StgClosure *spark, *first=NULL;
83   rtsBool isIdlePE = EMPTY_RUN_QUEUE();
84
85 #ifdef SMP
86   /* walk over the capabilities, allocating a spark pool for each one */
87   for (cap = free_capabilities; cap != NULL; cap = cap->link) {
88 #else
89   /* allocate a single spark pool */
90   cap = &MainRegTable;
91   {
92 #endif
93     pool = &(cap->rSparks);
94     while (pool->hd < pool->tl) {
95       spark = *pool->hd++;
96       if (closure_SHOULD_SPARK(spark)) {
97         if (for_export && isIdlePE) {
98           if (first==NULL) {
99             first = spark; // keep the first usable spark if PE is idle
100           } else {
101             pool->hd--;    // found a second spark; keep it in the pool 
102             ASSERT(*pool->hd==spark);
103             if (RtsFlags.ParFlags.ParStats.Sparks) 
104               DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
105                                GR_STEALING, ((StgTSO *)NULL), first, 
106                                0, 0 /* spark_queue_len(ADVISORY_POOL) */);
107             return first;  // and return the *first* spark found
108           }
109         } else {
110           if (RtsFlags.ParFlags.ParStats.Sparks && for_export) 
111             DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
112                              GR_STEALING, ((StgTSO *)NULL), spark, 
113                              0, 0 /* spark_queue_len(ADVISORY_POOL) */);
114           return spark;    // return first spark found
115         }
116       }
117     }
118     slide_spark_pool(pool);
119   }
120   return NULL;
121 }
122
123 /* 
124    activateSpark is defined in Schedule.c
125 */
126 rtsBool
127 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
128 {
129   if (pool->tl == pool->lim)
130     slide_spark_pool(pool);
131
132   if (closure_SHOULD_SPARK(closure) && 
133       pool->tl < pool->lim) {
134     *(pool->tl++) = closure;
135
136 #if defined(PAR)
137     // collect parallel global statistics (currently done together with GC stats)
138     if (RtsFlags.ParFlags.ParStats.Global &&
139         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
140       // fprintf(stderr, "Creating spark for %x @ %11.2f\n", closure, usertime()); 
141       globalParStats.tot_sparks_created++;
142     }
143 #endif
144     return rtsTrue;
145   } else {
146 #if defined(PAR)
147     // collect parallel global statistics (currently done together with GC stats)
148     if (RtsFlags.ParFlags.ParStats.Global &&
149         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
150       //fprintf(stderr, "Ignoring spark for %x @ %11.2f\n", closure, usertime()); 
151       globalParStats.tot_sparks_ignored++;
152     }
153 #endif
154     return rtsFalse;
155   }
156 }
157
158 static void
159 slide_spark_pool( StgSparkPool *pool )
160 {
161   StgClosure **sparkp, **to_sparkp;
162
163   sparkp = pool->hd;
164   to_sparkp = pool->base;
165   while (sparkp < pool->tl) {
166     ASSERT(to_sparkp<=sparkp);
167     ASSERT(*sparkp!=NULL);
168     ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
169
170     if (closure_SHOULD_SPARK(*sparkp)) {
171       *to_sparkp++ = *sparkp++;
172     } else {
173       sparkp++;
174     }
175   }
176   pool->hd = pool->base;
177   pool->tl = to_sparkp;
178 }
179
180 nat
181 spark_queue_len( StgSparkPool *pool ) 
182 {
183   return (nat) (pool->tl - pool->hd);
184 }
185
186 /* Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
187    implicit slide i.e. after marking all sparks are at the beginning of the
188    spark pool and the spark pool only contains sparkable closures 
189 */
190 void
191 markSparkQueue( void )
192
193   StgClosure **sparkp, **to_sparkp;
194   nat n, pruned_sparks; // stats only
195   StgSparkPool *pool;
196   Capability *cap;
197
198   PAR_TICKY_MARK_SPARK_QUEUE_START();
199
200 #ifdef SMP
201   /* walk over the capabilities, allocating a spark pool for each one */
202   for (cap = free_capabilities; cap != NULL; cap = cap->link) {
203 #else
204   /* allocate a single spark pool */
205   cap = &MainRegTable;
206   {
207 #endif
208     pool = &(cap->rSparks);
209
210 #if defined(PAR)
211     // stats only
212     n = 0;
213     pruned_sparks = 0;
214 #endif
215
216     sparkp = pool->hd;
217     to_sparkp = pool->base;
218     while (sparkp < pool->tl) {
219       ASSERT(to_sparkp<=sparkp);
220       ASSERT(*sparkp!=NULL);
221       ASSERT(LOOKS_LIKE_GHC_INFO(((StgClosure *)*sparkp)->header.info));
222       // ToDo?: statistics gathering here (also for GUM!)
223       if (closure_SHOULD_SPARK(*sparkp)) {
224         *to_sparkp = MarkRoot(*sparkp);
225         to_sparkp++;
226 #ifdef PAR
227         n++;
228 #endif
229       } else {
230 #ifdef PAR
231         pruned_sparks++;
232 #endif
233       }
234       sparkp++;
235     }
236     pool->hd = pool->base;
237     pool->tl = to_sparkp;
238
239     PAR_TICKY_MARK_SPARK_QUEUE_END(n);
240     
241 #if defined(SMP)
242     IF_DEBUG(scheduler,
243              belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
244                    n, pruned_sparks, pthread_self()));
245 #elif defined(PAR)
246     IF_DEBUG(scheduler,
247              belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
248                    n, pruned_sparks, mytid));
249 #else
250     IF_DEBUG(scheduler,
251              belch("markSparkQueue: marked %d sparks and pruned %d sparks",
252                    n, pruned_sparks));
253 #endif
254
255     IF_DEBUG(scheduler,
256              belch("markSparkQueue:   new spark queue len=%d; (hd=%p; tl=%p)",
257                    spark_queue_len(pool), pool->hd, pool->tl));
258
259   }
260 }
261
262 void
263 disposeSpark(spark)
264 StgClosure *spark;
265 {
266 #if !defined(SMP)
267   Capability *cap;
268   StgSparkPool *pool;
269
270   cap = &MainRegTable;
271   pool = &(cap->rSparks);
272   ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
273 #endif
274   ASSERT(spark != (StgClosure *)NULL);
275   /* Do nothing */
276 }
277
278
279 #elif defined(GRAN)
280
281 //@node GranSim code,  , GUM code, Spark Management Routines
282 //@subsection GranSim code
283
284 //@menu
285 //* Basic interface to sparkq::  
286 //* Aux fcts::                  
287 //@end menu
288
289 //@node Basic interface to sparkq, Aux fcts, GranSim code, GranSim code
290 //@subsubsection Basic interface to sparkq
291 /* 
292    Search the spark queue of the proc in event for a spark that's worth
293    turning into a thread 
294    (was gimme_spark in the old RTS)
295 */
296 //@cindex findLocalSpark
297 void
298 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
299 {
300    PEs proc = event->proc,       /* proc to search for work */
301        creator = event->creator; /* proc that requested work */
302    StgClosure* node;
303    rtsBool found;
304    rtsSparkQ spark_of_non_local_node = NULL, 
305              spark_of_non_local_node_prev = NULL, 
306              low_priority_spark = NULL, 
307              low_priority_spark_prev = NULL,
308              spark = NULL, prev = NULL;
309   
310    /* Choose a spark from the local spark queue */
311    prev = (rtsSpark*)NULL;
312    spark = pending_sparks_hds[proc];
313    found = rtsFalse;
314
315    // ToDo: check this code & implement local sparking !! -- HWL  
316    while (!found && spark != (rtsSpark*)NULL)
317      {
318        ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
319               (prev==(rtsSpark*)NULL || prev->next==spark) &&
320               (spark->prev==prev));
321        node = spark->node;
322        if (!closure_SHOULD_SPARK(node)) 
323          {
324            IF_GRAN_DEBUG(checkSparkQ,
325                          belch("^^ pruning spark %p (node %p) in gimme_spark",
326                                spark, node));
327
328            if (RtsFlags.GranFlags.GranSimStats.Sparks)
329              DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
330                               spark->node, spark->name, spark_queue_len(proc));
331   
332            ASSERT(spark != (rtsSpark*)NULL);
333            ASSERT(SparksAvail>0);
334            --SparksAvail;
335
336            ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
337            spark = delete_from_sparkq (spark, proc, rtsTrue);
338            if (spark != (rtsSpark*)NULL)
339              prev = spark->prev;
340            continue;
341          }
342        /* -- node should eventually be sparked */
343        else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes && 
344                !IS_LOCAL_TO(PROCS(node),CurrentProc)) 
345          {
346            barf("Local sparking not yet implemented");
347
348            /* Remember first low priority spark */
349            if (spark_of_non_local_node==(rtsSpark*)NULL) {
350              spark_of_non_local_node_prev = prev;
351              spark_of_non_local_node = spark;
352               }
353   
354            if (spark->next == (rtsSpark*)NULL) { 
355              /* ASSERT(spark==SparkQueueTl);  just for testing */
356              prev = spark_of_non_local_node_prev;
357              spark = spark_of_non_local_node;
358              found = rtsTrue;
359              break;
360            }
361   
362 # if defined(GRAN) && defined(GRAN_CHECK)
363            /* Should never happen; just for testing 
364            if (spark==pending_sparks_tl) {
365              fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n");
366                 stg_exit(EXIT_FAILURE);
367                 } */
368 # endif
369            prev = spark; 
370            spark = spark->next;
371            ASSERT(SparksAvail>0);
372            --SparksAvail;
373            continue;
374          }
375        else if ( RtsFlags.GranFlags.DoPrioritySparking || 
376                  (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
377          {
378            if (RtsFlags.GranFlags.DoPrioritySparking)
379              barf("Priority sparking not yet implemented");
380
381            found = rtsTrue;
382          }
383 #if 0      
384        else /* only used if SparkPriority2 is defined */
385          {
386            /* ToDo: fix the code below and re-integrate it */
387            /* Remember first low priority spark */
388            if (low_priority_spark==(rtsSpark*)NULL) { 
389              low_priority_spark_prev = prev;
390              low_priority_spark = spark;
391            }
392   
393            if (spark->next == (rtsSpark*)NULL) { 
394                 /* ASSERT(spark==spark_queue_tl);  just for testing */
395              prev = low_priority_spark_prev;
396              spark = low_priority_spark;
397              found = rtsTrue;       /* take low pri spark => rc is 2  */
398              break;
399            }
400   
401            /* Should never happen; just for testing 
402            if (spark==pending_sparks_tl) {
403              fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n");
404                 stg_exit(EXIT_FAILURE);
405              break;
406            } */                
407            prev = spark; 
408            spark = spark->next;
409
410            IF_GRAN_DEBUG(pri,
411                          belch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
412                                spark->gran_info, RtsFlags.GranFlags.SparkPriority, 
413                                spark->node, spark->name);)
414            }
415 #endif
416    }  /* while (spark!=NULL && !found) */
417
418    *spark_res = spark;
419    *found_res = found;
420 }
421
422 /*
423   Turn the spark into a thread.
424   In GranSim this basically means scheduling a StartThread event for the
425   node pointed to by the spark at some point in the future.
426   (was munch_spark in the old RTS)
427 */
428 //@cindex activateSpark
429 rtsBool
430 activateSpark (rtsEvent *event, rtsSparkQ spark) 
431 {
432   PEs proc = event->proc,       /* proc to search for work */
433       creator = event->creator; /* proc that requested work */
434   StgTSO* tso;
435   StgClosure* node;
436   rtsTime spark_arrival_time;
437
438   /* 
439      We've found a node on PE proc requested by PE creator.
440      If proc==creator we can turn the spark into a thread immediately;
441      otherwise we schedule a MoveSpark event on the requesting PE
442   */
443      
444   /* DaH Qu' yIchen */
445   if (proc!=creator) { 
446
447     /* only possible if we simulate GUM style fishing */
448     ASSERT(RtsFlags.GranFlags.Fishing);
449
450     /* Message packing costs for sending a Fish; qeq jabbI'ID */
451     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
452   
453     if (RtsFlags.GranFlags.GranSimStats.Sparks)
454       DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
455                        (StgTSO*)NULL, spark->node,
456                        spark->name, spark_queue_len(proc));
457
458     /* time of the spark arrival on the remote PE */
459     spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
460
461     new_event(creator, proc, spark_arrival_time,
462               MoveSpark,
463               (StgTSO*)NULL, spark->node, spark);
464
465     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
466             
467   } else { /* proc==creator i.e. turn the spark into a thread */
468
469     if ( RtsFlags.GranFlags.GranSimStats.Global && 
470          spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
471
472       globalGranStats.tot_low_pri_sparks++;
473       IF_GRAN_DEBUG(pri,
474                     belch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
475                           spark->gran_info, 
476                           spark->node, spark->name));
477     } 
478     
479     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
480     
481     node = spark->node;
482     
483 # if 0
484     /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
485     if (GARBAGE COLLECTION IS NECESSARY) {
486       /* Some kind of backoff needed here in case there's too little heap */
487 #  if defined(GRAN_CHECK) && defined(GRAN)
488       if (RtsFlags.GcFlags.giveStats)
489         fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p;  name=%u\n", 
490                 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
491                 spark, node, spark->name);
492 #  endif
493       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
494                   FindWork,
495                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
496       barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
497       GarbageCollect(GetRoots, rtsFalse);
498       // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
499       // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
500       spark = NULL;
501       return; /* was: continue; */ /* to the next event, eventually */
502     }
503 # endif
504     
505     if (RtsFlags.GranFlags.GranSimStats.Sparks)
506       DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
507                        spark->node, spark->name,
508                        spark_queue_len(CurrentProc));
509     
510     new_event(proc, proc, CurrentTime[proc],
511               StartThread, 
512               END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
513     
514     procStatus[proc] = Starting;
515   }
516 }
517
518 /* -------------------------------------------------------------------------
519    This is the main point where handling granularity information comes into
520    play. 
521    ------------------------------------------------------------------------- */
522
523 #define MAX_RAND_PRI    100
524
525 /* 
526    Granularity info transformers. 
527    Applied to the GRAN_INFO field of a spark.
528 */
529 STATIC_INLINE nat  ID(nat x) { return(x); };
530 STATIC_INLINE nat  INV(nat x) { return(-x); };
531 STATIC_INLINE nat  IGNORE(nat x) { return (0); };
532 STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
533
534 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
535 //@cindex newSpark
536 rtsSpark *
537 newSpark(node,name,gran_info,size_info,par_info,local)
538 StgClosure *node;
539 nat name, gran_info, size_info, par_info, local;
540 {
541   nat pri;
542   rtsSpark *newspark;
543
544   pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
545         RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
546         RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
547                            ID(gran_info);
548
549   if ( RtsFlags.GranFlags.SparkPriority!=0 && 
550        pri<RtsFlags.GranFlags.SparkPriority ) {
551     IF_GRAN_DEBUG(pri,
552       belch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
553               pri, RtsFlags.GranFlags.SparkPriority, node, name));
554     return ((rtsSpark*)NULL);
555   }
556
557   newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
558   newspark->prev = newspark->next = (rtsSpark*)NULL;
559   newspark->node = node;
560   newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
561   newspark->gran_info = pri;
562   newspark->global = !local;      /* Check that with parAt, parAtAbs !!*/
563
564   if (RtsFlags.GranFlags.GranSimStats.Global) {
565     globalGranStats.tot_sparks_created++;
566     globalGranStats.sparks_created_on_PE[CurrentProc]++;
567   }
568
569   return(newspark);
570 }
571
572 //@cindex disposeSpark
573 void
574 disposeSpark(spark)
575 rtsSpark *spark;
576 {
577   ASSERT(spark!=NULL);
578   stgFree(spark);
579 }
580
581 //@cindex disposeSparkQ
582 void 
583 disposeSparkQ(spark)
584 rtsSparkQ spark;
585 {
586   if (spark==NULL) 
587     return;
588
589   disposeSparkQ(spark->next);
590
591 # ifdef GRAN_CHECK
592   if (SparksAvail < 0) {
593     fprintf(stderr,"disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
594     print_spark(spark);
595   }
596 # endif
597
598   stgFree(spark);
599 }
600
601 /*
602    With PrioritySparking add_to_spark_queue performs an insert sort to keep
603    the spark queue sorted. Otherwise the spark is just added to the end of
604    the queue. 
605 */
606
607 //@cindex add_to_spark_queue
608 void
609 add_to_spark_queue(spark)
610 rtsSpark *spark;
611 {
612   rtsSpark *prev = NULL, *next = NULL;
613   nat count = 0;
614   rtsBool found = rtsFalse;
615
616   if ( spark == (rtsSpark *)NULL ) {
617     return;
618   }
619
620   if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
621     /* Priority sparking is enabled i.e. spark queues must be sorted */
622
623     for (prev = NULL, next = pending_sparks_hd, count=0;
624          (next != NULL) && 
625          !(found = (spark->gran_info >= next->gran_info));
626          prev = next, next = next->next, count++) 
627      {}
628
629   } else {   /* 'utQo' */
630     /* Priority sparking is disabled */
631     
632     found = rtsFalse;   /* to add it at the end */
633
634   }
635
636   if (found) {
637     /* next points to the first spark with a gran_info smaller than that
638        of spark; therefore, add spark before next into the spark queue */
639     spark->next = next;
640     if ( next == NULL ) {
641       pending_sparks_tl = spark;
642     } else {
643       next->prev = spark;
644     }
645     spark->prev = prev;
646     if ( prev == NULL ) {
647       pending_sparks_hd = spark;
648     } else {
649       prev->next = spark;
650     }
651   } else {  /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
652     /* add the spark at the end of the spark queue */
653     spark->next = NULL;                        
654     spark->prev = pending_sparks_tl;
655     if (pending_sparks_hd == NULL)
656       pending_sparks_hd = spark;
657     else
658       pending_sparks_tl->next = spark;
659     pending_sparks_tl = spark;    
660   } 
661   ++SparksAvail;
662
663   /* add costs for search in priority sparking */
664   if (RtsFlags.GranFlags.DoPrioritySparking) {
665     CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
666   }
667
668   IF_GRAN_DEBUG(checkSparkQ,
669                 belch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
670                       spark, spark->node, CurrentProc);
671                 print_sparkq_stats());
672
673 #  if defined(GRAN_CHECK)
674   if (RtsFlags.GranFlags.Debug.checkSparkQ) {
675     for (prev = NULL, next =  pending_sparks_hd;
676          (next != NULL);
677          prev = next, next = next->next) 
678       {}
679     if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
680       fprintf(stderr,"SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
681               spark,CurrentProc, 
682               pending_sparks_tl, prev);
683   }
684 #  endif
685
686 #  if defined(GRAN_CHECK)
687   /* Check if the sparkq is still sorted. Just for testing, really!  */
688   if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
689        RtsFlags.GranFlags.Debug.pri ) {
690     rtsBool sorted = rtsTrue;
691     rtsSpark *prev, *next;
692
693     if (pending_sparks_hd == NULL ||
694         pending_sparks_hd->next == NULL ) {
695       /* just 1 elem => ok */
696     } else {
697       for (prev = pending_sparks_hd,
698            next = pending_sparks_hd->next;
699            (next != NULL) ;
700            prev = next, next = next->next) {
701         sorted = sorted && 
702                  (prev->gran_info >= next->gran_info);
703       }
704     }
705     if (!sorted) {
706       fprintf(stderr,"ghuH: SPARKQ on PE %d is not sorted:\n",
707               CurrentProc);
708       print_sparkq(CurrentProc);
709     }
710   }
711 #  endif
712 }
713
714 //@node Aux fcts,  , Basic interface to sparkq, GranSim code
715 //@subsubsection Aux fcts
716
717 //@cindex spark_queue_len
718 nat
719 spark_queue_len(proc) 
720 PEs proc;
721 {
722  rtsSpark *prev, *spark;                     /* prev only for testing !! */
723  nat len;
724
725  for (len = 0, prev = NULL, spark = pending_sparks_hds[proc]; 
726       spark != NULL; 
727       len++, prev = spark, spark = spark->next)
728    {}
729
730 #  if defined(GRAN_CHECK)
731   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) 
732     if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
733       fprintf(stderr,"ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
734               proc, pending_sparks_tls[proc], prev);
735 #  endif
736
737  return (len);
738 }
739
740 /* 
741    Take spark out of the spark queue on PE p and nuke the spark. Adjusts
742    hd and tl pointers of the spark queue. Returns a pointer to the next
743    spark in the queue.
744 */
745 //@cindex delete_from_sparkq
746 rtsSpark *
747 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
748 rtsSpark *spark;
749 PEs p;
750 rtsBool dispose_too;
751 {
752   rtsSpark *new_spark;
753
754   if (spark==NULL) 
755     barf("delete_from_sparkq: trying to delete NULL spark\n");
756
757 #  if defined(GRAN_CHECK)
758   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
759     fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
760             pending_sparks_hd, pending_sparks_tl,
761             spark->prev, spark, spark->next, 
762             (spark->next==NULL ? 0 : spark->next->prev));
763   }
764 #  endif
765
766   if (spark->prev==NULL) {
767     /* spark is first spark of queue => adjust hd pointer */
768     ASSERT(pending_sparks_hds[p]==spark);
769     pending_sparks_hds[p] = spark->next;
770   } else {
771     spark->prev->next = spark->next;
772   }
773   if (spark->next==NULL) {
774     ASSERT(pending_sparks_tls[p]==spark);
775     /* spark is first spark of queue => adjust tl pointer */
776     pending_sparks_tls[p] = spark->prev;
777   } else {
778     spark->next->prev = spark->prev;
779   }
780   new_spark = spark->next;
781   
782 #  if defined(GRAN_CHECK)
783   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
784     fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
785             pending_sparks_hd, pending_sparks_tl,
786             spark->prev, spark, spark->next, 
787             (spark->next==NULL ? 0 : spark->next->prev), spark);
788   }
789 #  endif
790
791   if (dispose_too)
792     disposeSpark(spark);
793                   
794   return new_spark;
795 }
796
797 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
798 //@cindex markSparkQueue
799 void
800 markSparkQueue(void)
801
802   StgClosure *MarkRoot(StgClosure *root); // prototype
803   PEs p;
804   rtsSpark *sp;
805
806   for (p=0; p<RtsFlags.GranFlags.proc; p++)
807     for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
808       ASSERT(sp->node!=NULL);
809       ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
810       // ToDo?: statistics gathering here (also for GUM!)
811       sp->node = (StgClosure *)MarkRoot(sp->node);
812     }
813   IF_DEBUG(gc,
814            belch("@@ markSparkQueue: spark statistics at start of GC:");
815            print_sparkq_stats());
816 }
817
818 //@cindex print_spark
819 void
820 print_spark(spark)
821 rtsSpark *spark;
822
823   char str[16];
824
825   if (spark==NULL) {
826     fprintf(stderr,"Spark: NIL\n");
827     return;
828   } else {
829     sprintf(str,
830             ((spark->node==NULL) ? "______" : "%#6lx"), 
831             stgCast(StgPtr,spark->node));
832
833     fprintf(stderr,"Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
834             str, spark->name, 
835             ((spark->global)==rtsTrue?"True":"False"), spark->creator, 
836             spark->prev, spark->next);
837   }
838 }
839
840 //@cindex print_sparkq
841 void
842 print_sparkq(proc)
843 PEs proc;
844 // rtsSpark *hd;
845 {
846   rtsSpark *x = pending_sparks_hds[proc];
847
848   fprintf(stderr,"Spark Queue of PE %d with root at %p:\n", proc, x);
849   for (; x!=(rtsSpark*)NULL; x=x->next) {
850     print_spark(x);
851   }
852 }
853
854 /* 
855    Print a statistics of all spark queues.
856 */
857 //@cindex print_sparkq_stats
858 void
859 print_sparkq_stats(void)
860 {
861   PEs p;
862
863   fprintf(stderr, "SparkQs: [");
864   for (p=0; p<RtsFlags.GranFlags.proc; p++)
865     fprintf(stderr, ", PE %d: %d", p, spark_queue_len(p));
866   fprintf(stderr, "\n");
867 }
868
869 #endif