Fix some unsigned comparisions that should be signed
[ghc-hetmet.git] / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2000-2008
4  *
5  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
6  *
7  * The implementation uses Double-Ended Queues with lock-free access
8  * (thereby often called "deque") as described in
9  *
10  * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
11  * SPAA'05, July 2005, Las Vegas, USA.
12  * ACM 1-58113-986-1/05/0007
13  *
14  * Author: Jost Berthold MSRC 07-09/2008
15  *
16  * The DeQue is held as a circular array with known length. Positions
17  * of top (read-end) and bottom (write-end) always increase, and the
18  * array is accessed with indices modulo array-size. While this bears
19  * the risk of overflow, we assume that (with 64 bit indices), a
20  * program must run very long to reach that point.
21  * 
22  * The write end of the queue (position bottom) can only be used with
23  * mutual exclusion, i.e. by exactly one caller at a time.  At this
24  * end, new items can be enqueued using pushBottom()/newSpark(), and
25  * removed using popBottom()/reclaimSpark() (the latter implying a cas
26  * synchronisation with potential concurrent readers for the case of
27  * just one element).
28  * 
29  * Multiple readers can steal()/findSpark() from the read end
30  * (position top), and are synchronised without a lock, based on a cas
31  * of the top position. One reader wins, the others return NULL for a
32  * failure.
33  * 
34  * Both popBottom and steal also return NULL when the queue is empty.
35  * 
36  -------------------------------------------------------------------------*/
37
38 #include "PosixSource.h"
39 #include "Rts.h"
40 #include "Storage.h"
41 #include "Schedule.h"
42 #include "SchedAPI.h"
43 #include "RtsFlags.h"
44 #include "RtsUtils.h"
45 #include "ParTicky.h"
46 #include "Trace.h"
47 #include "Prelude.h"
48
49 #include "SMP.h" // for cas
50
51 #include "Sparks.h"
52
53 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
54
55 /* internal helpers ... */
56
57 static StgWord
58 roundUp2(StgWord val)
59 {
60   StgWord rounded = 1;
61
62   /* StgWord is unsigned anyway, only catch 0 */
63   if (val == 0) {
64     barf("DeQue,roundUp2: invalid size 0 requested");
65   }
66   /* at least 1 bit set, shift up to its place */
67   do {
68     rounded = rounded << 1;
69   } while (0 != (val = val>>1));
70   return rounded;
71 }
72
73 #define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
74
75 /* -----------------------------------------------------------------------------
76  * 
77  * Initialising spark pools.
78  *
79  * -------------------------------------------------------------------------- */
80
81 /* constructor */
82 static SparkPool*
83 initPool(StgWord size)
84 {
85   StgWord realsize; 
86   SparkPool *q;
87
88   realsize = roundUp2(size); /* to compute modulo as a bitwise & */
89
90   q = (SparkPool*) stgMallocBytes(sizeof(SparkPool),   /* admin fields */
91                               "newSparkPool");
92   q->elements = (StgClosurePtr*) 
93                 stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
94                                "newSparkPool:data space");
95   q->top=0;
96   q->bottom=0;
97   q->topBound=0; /* read by writer, updated each time top is read */
98
99   q->size = realsize;  /* power of 2 */
100   q->moduloSize = realsize - 1; /* n % size == n & moduloSize  */
101
102   ASSERT_SPARK_POOL_INVARIANTS(q); 
103   return q;
104 }
105
106 void
107 initSparkPools( void )
108 {
109 #ifdef THREADED_RTS
110     /* walk over the capabilities, allocating a spark pool for each one */
111     nat i;
112     for (i = 0; i < n_capabilities; i++) {
113       capabilities[i].sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
114     }
115 #else
116     /* allocate a single spark pool */
117     MainCapability->sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
118 #endif
119 }
120
121 void
122 freeSparkPool (SparkPool *pool)
123 {
124   /* should not interfere with concurrent findSpark() calls! And
125      nobody should use the pointer any more. We cross our fingers...*/
126   stgFree(pool->elements);
127   stgFree(pool);
128 }
129
130 /* -----------------------------------------------------------------------------
131  * 
132  * reclaimSpark: remove a spark from the write end of the queue.
133  * Returns the removed spark, and NULL if a race is lost or the pool
134  * empty.
135  *
136  * If only one spark is left in the pool, we synchronise with
137  * concurrently stealing threads by using cas to modify the top field.
138  * This routine should NEVER be called by a task which does not own
139  * the capability. Can this be checked here?
140  *
141  * -------------------------------------------------------------------------- */
142
143 StgClosure *
144 reclaimSpark (SparkPool *deque)
145 {
146   /* also a bit tricky, has to avoid concurrent steal() calls by
147      accessing top with cas, when there is only one element left */
148   StgWord t, b;
149   StgClosurePtr* pos;
150   long  currSize;
151   StgClosurePtr removed;
152
153   ASSERT_SPARK_POOL_INVARIANTS(deque); 
154
155   b = deque->bottom;
156   /* "decrement b as a test, see what happens" */
157   deque->bottom = --b; 
158   pos = (deque->elements) + (b & (deque->moduloSize));
159   t = deque->top; /* using topBound would give an *upper* bound, we
160                      need a lower bound. We use the real top here, but
161                      can update the topBound value */
162   deque->topBound = t;
163   currSize = b - t;
164   if (currSize < 0) { /* was empty before decrementing b, set b
165                          consistently and abort */
166     deque->bottom = t;
167     return NULL;
168   }
169   removed = *pos;
170   if (currSize > 0) { /* no danger, still elements in buffer after b-- */
171     return removed;
172   } 
173   /* otherwise, has someone meanwhile stolen the same (last) element?
174      Check and increment top value to know  */
175   if ( !(CASTOP(&(deque->top),t,t+1)) ) {
176     removed = NULL; /* no success, but continue adjusting bottom */
177   }
178   deque->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */
179   deque->topBound = t+1; /* ...and cached top value as well */
180
181   ASSERT_SPARK_POOL_INVARIANTS(deque); 
182
183   return removed;
184 }
185
186 /* -----------------------------------------------------------------------------
187  * 
188  * tryStealSpark: try to steal a spark from a Capability.
189  *
190  * Returns a valid spark, or NULL if the pool was empty, and can
191  * occasionally return NULL if there was a race with another thread
192  * stealing from the same pool.  In this case, try again later.
193  *
194  -------------------------------------------------------------------------- */
195
196 static StgClosurePtr
197 steal(SparkPool *deque)
198 {
199   StgClosurePtr* pos;
200   StgClosurePtr* arraybase;
201   StgWord sz;
202   StgClosurePtr stolen;
203   StgWord b,t; 
204
205 // Can't do this on someone else's spark pool:
206 // ASSERT_SPARK_POOL_INVARIANTS(deque); 
207
208   b = deque->bottom;
209   t = deque->top;
210
211   // NB. b and t are unsigned; we need a signed value for the test
212   // below.
213   if ((long)b - (long)t <= 0 ) { 
214     return NULL; /* already looks empty, abort */
215   }
216
217   /* now access array, see pushBottom() */
218   arraybase = deque->elements;
219   sz = deque->moduloSize;
220   pos = arraybase + (t & sz);  
221   stolen = *pos;
222
223   /* now decide whether we have won */
224   if ( !(CASTOP(&(deque->top),t,t+1)) ) {
225       /* lost the race, someon else has changed top in the meantime */
226       return NULL;
227   }  /* else: OK, top has been incremented by the cas call */
228
229 // Can't do this on someone else's spark pool:
230 // ASSERT_SPARK_POOL_INVARIANTS(deque); 
231
232   /* return stolen element */
233   return stolen;
234 }
235
236 StgClosure *
237 tryStealSpark (Capability *cap)
238 {
239   SparkPool *pool = cap->sparks;
240   StgClosure *stolen;
241
242   do { 
243       stolen = steal(pool);
244   } while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
245
246   return stolen;
247 }
248
249
250 /* -----------------------------------------------------------------------------
251  * 
252  * "guesses" whether a deque is empty. Can return false negatives in
253  *  presence of concurrent steal() calls, and false positives in
254  *  presence of a concurrent pushBottom().
255  *
256  * -------------------------------------------------------------------------- */
257
258 rtsBool
259 looksEmpty(SparkPool* deque)
260 {
261   StgWord t = deque->top;
262   StgWord b = deque->bottom;
263   /* try to prefer false negatives by reading top first */
264   return ((long)b - (long)t <= 0);
265   /* => array is *never* completely filled, always 1 place free! */
266 }
267
268 /* -----------------------------------------------------------------------------
269  * 
270  * Turn a spark into a real thread
271  *
272  * -------------------------------------------------------------------------- */
273
274 void
275 createSparkThread (Capability *cap)
276 {
277     StgTSO *tso;
278
279     tso = createIOThread (cap, RtsFlags.GcFlags.initialStkSize, 
280                           &base_GHCziConc_runSparks_closure);
281     appendToRunQueue(cap,tso);
282 }
283
284 /* -----------------------------------------------------------------------------
285  * 
286  * Create a new spark
287  *
288  * -------------------------------------------------------------------------- */
289
290 #define DISCARD_NEW
291
292 /* enqueue an element. Should always succeed by resizing the array
293    (not implemented yet, silently fails in that case). */
294 static void
295 pushBottom (SparkPool* deque, StgClosurePtr elem)
296 {
297   StgWord t;
298   StgClosurePtr* pos;
299   StgWord sz = deque->moduloSize; 
300   StgWord b = deque->bottom;
301
302   ASSERT_SPARK_POOL_INVARIANTS(deque); 
303
304   /* we try to avoid reading deque->top (accessed by all) and use
305      deque->topBound (accessed only by writer) instead. 
306      This is why we do not just call empty(deque) here.
307   */
308   t = deque->topBound;
309   if ( (StgInt)b - (StgInt)t >= (StgInt)sz ) { 
310     /* NB. 1. sz == deque->size - 1, thus ">="
311            2. signed comparison, it is possible that t > b
312     */
313     /* could be full, check the real top value in this case */
314     t = deque->top;
315     deque->topBound = t;
316     if (b - t >= sz) { /* really no space left :-( */
317       /* reallocate the array, copying the values. Concurrent steal()s
318          will in the meantime use the old one and modify only top.
319          This means: we cannot safely free the old space! Can keep it
320          on a free list internally here...
321
322          Potential bug in combination with steal(): if array is
323          replaced, it is unclear which one concurrent steal operations
324          use. Must read the array base address in advance in steal().
325       */
326 #if defined(DISCARD_NEW)
327       ASSERT_SPARK_POOL_INVARIANTS(deque); 
328       return; /* for now, silently fail */
329 #else
330       /* could make room by incrementing the top position here.  In
331        * this case, should use CASTOP. If this fails, someone else has
332        * removed something, and new room will be available.
333        */
334       ASSERT_SPARK_POOL_INVARIANTS(deque); 
335 #endif
336     }
337   }
338   pos = (deque->elements) + (b & sz);
339   *pos = elem;
340   (deque->bottom)++;
341
342   ASSERT_SPARK_POOL_INVARIANTS(deque); 
343   return;
344 }
345
346
347 /* --------------------------------------------------------------------------
348  * newSpark: create a new spark, as a result of calling "par"
349  * Called directly from STG.
350  * -------------------------------------------------------------------------- */
351
352 StgInt
353 newSpark (StgRegTable *reg, StgClosure *p)
354 {
355     Capability *cap = regTableToCapability(reg);
356     SparkPool *pool = cap->sparks;
357
358     /* I am not sure whether this is the right thing to do.
359      * Maybe it is better to exploit the tag information
360      * instead of throwing it away?
361      */
362     p = UNTAG_CLOSURE(p);
363
364     ASSERT_SPARK_POOL_INVARIANTS(pool);
365
366     if (closure_SHOULD_SPARK(p)) {
367       pushBottom(pool,p);
368     }   
369
370     cap->sparks_created++;
371
372     ASSERT_SPARK_POOL_INVARIANTS(pool);
373     return 1;
374 }
375
376
377
378 /* --------------------------------------------------------------------------
379  * Remove all sparks from the spark queues which should not spark any
380  * more.  Called after GC. We assume exclusive access to the structure
381  * and replace  all sparks in the queue, see explanation below. At exit,
382  * the spark pool only contains sparkable closures.
383  * -------------------------------------------------------------------------- */
384
385 void
386 pruneSparkQueue (evac_fn evac, void *user, Capability *cap)
387
388     SparkPool *pool;
389     StgClosurePtr spark, tmp, *elements;
390     nat n, pruned_sparks; // stats only
391     StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
392     const StgInfoTable *info;
393     
394     PAR_TICKY_MARK_SPARK_QUEUE_START();
395     
396     n = 0;
397     pruned_sparks = 0;
398     
399     pool = cap->sparks;
400     
401     // it is possible that top > bottom, indicating an empty pool.  We
402     // fix that here; this is only necessary because the loop below
403     // assumes it.
404     if (pool->top > pool->bottom)
405         pool->top = pool->bottom;
406
407     // Take this opportunity to reset top/bottom modulo the size of
408     // the array, to avoid overflow.  This is only possible because no
409     // stealing is happening during GC.
410     pool->bottom  -= pool->top & ~pool->moduloSize;
411     pool->top     &= pool->moduloSize;
412     pool->topBound = pool->top;
413
414     debugTrace(DEBUG_sched,
415                "markSparkQueue: current spark queue len=%d; (hd=%ld; tl=%ld)",
416                sparkPoolSize(pool), pool->bottom, pool->top);
417     ASSERT_SPARK_POOL_INVARIANTS(pool);
418
419     elements = pool->elements;
420
421     /* We have exclusive access to the structure here, so we can reset
422        bottom and top counters, and prune invalid sparks. Contents are
423        copied in-place if they are valuable, otherwise discarded. The
424        routine uses "real" indices t and b, starts by computing them
425        as the modulus size of top and bottom,
426
427        Copying:
428
429        At the beginning, the pool structure can look like this:
430        ( bottom % size >= top % size , no wrap-around)
431                   t          b
432        ___________***********_________________
433
434        or like this ( bottom % size < top % size, wrap-around )
435                   b         t
436        ***********__________******************
437        As we need to remove useless sparks anyway, we make one pass
438        between t and b, moving valuable content to b and subsequent
439        cells (wrapping around when the size is reached).
440
441                      b      t
442        ***********OOO_______XX_X__X?**********
443                      ^____move?____/
444
445        After this movement, botInd becomes the new bottom, and old
446        bottom becomes the new top index, both as indices in the array
447        size range.
448     */
449     // starting here
450     currInd = (pool->top) & (pool->moduloSize); // mod
451
452     // copies of evacuated closures go to space from botInd on
453     // we keep oldBotInd to know when to stop
454     oldBotInd = botInd = (pool->bottom) & (pool->moduloSize); // mod
455
456     // on entry to loop, we are within the bounds
457     ASSERT( currInd < pool->size && botInd  < pool->size );
458
459     while (currInd != oldBotInd ) {
460       /* must use != here, wrap-around at size
461          subtle: loop not entered if queue empty
462        */
463
464       /* check element at currInd. if valuable, evacuate and move to
465          botInd, otherwise move on */
466       spark = elements[currInd];
467
468       // We have to be careful here: in the parallel GC, another
469       // thread might evacuate this closure while we're looking at it,
470       // so grab the info pointer just once.
471       info = spark->header.info;
472       if (IS_FORWARDING_PTR(info)) {
473           tmp = (StgClosure*)UN_FORWARDING_PTR(info);
474           /* if valuable work: shift inside the pool */
475           if (closure_SHOULD_SPARK(tmp)) {
476               elements[botInd] = tmp; // keep entry (new address)
477               botInd++;
478               n++;
479           } else {
480               pruned_sparks++; // discard spark
481               cap->sparks_pruned++;
482           }
483       } else {
484           if (!(closure_flags[INFO_PTR_TO_STRUCT(info)->type] & _NS)) {
485               elements[botInd] = spark; // keep entry (new address)
486               evac (user, &elements[botInd]);
487               botInd++;
488               n++;
489           } else {
490               pruned_sparks++; // discard spark
491               cap->sparks_pruned++;
492           }
493       }
494       currInd++;
495
496       // in the loop, we may reach the bounds, and instantly wrap around
497       ASSERT( currInd <= pool->size && botInd <= pool->size );
498       if ( currInd == pool->size ) { currInd = 0; }
499       if ( botInd == pool->size )  { botInd = 0;  }
500
501     } // while-loop over spark pool elements
502
503     ASSERT(currInd == oldBotInd);
504
505     pool->top = oldBotInd; // where we started writing
506     pool->topBound = pool->top;
507
508     pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size); 
509     // first free place we did not use (corrected by wraparound)
510
511     PAR_TICKY_MARK_SPARK_QUEUE_END(n);
512
513     debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
514     
515     debugTrace(DEBUG_sched,
516                "new spark queue len=%d; (hd=%ld; tl=%ld)",
517                sparkPoolSize(pool), pool->bottom, pool->top);
518
519     ASSERT_SPARK_POOL_INVARIANTS(pool);
520 }
521
522 /* GC for the spark pool, called inside Capability.c for all
523    capabilities in turn. Blindly "evac"s complete spark pool. */
524 void
525 traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
526 {
527     StgClosure **sparkp;
528     SparkPool *pool;
529     StgWord top,bottom, modMask;
530     
531     pool = cap->sparks;
532
533     ASSERT_SPARK_POOL_INVARIANTS(pool);
534
535     top = pool->top;
536     bottom = pool->bottom;
537     sparkp = pool->elements;
538     modMask = pool->moduloSize;
539
540     while (top < bottom) {
541     /* call evac for all closures in range (wrap-around via modulo)
542      * In GHC-6.10, evac takes an additional 1st argument to hold a
543      * GC-specific register, see rts/sm/GC.c::mark_root()
544      */
545       evac( user , sparkp + (top & modMask) ); 
546       top++;
547     }
548
549     debugTrace(DEBUG_sched,
550                "traversed spark queue, len=%d; (hd=%ld; tl=%ld)",
551                sparkPoolSize(pool), pool->bottom, pool->top);
552 }
553
554 /* ----------------------------------------------------------------------------
555  * balanceSparkPoolsCaps: takes an array of capabilities (usually: all
556  * capabilities) and its size. Accesses all spark pools and equally
557  * distributes the sparks among them.
558  *
559  * Could be called after GC, before Cap. release, from scheduler. 
560  * -------------------------------------------------------------------------- */
561 void balanceSparkPoolsCaps(nat n_caps, Capability caps[]);
562
563 void balanceSparkPoolsCaps(nat n_caps STG_UNUSED, 
564                            Capability caps[] STG_UNUSED) {
565   barf("not implemented");
566 }
567
568 #else
569
570 StgInt
571 newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
572 {
573     /* nothing */
574     return 1;
575 }
576
577
578 #endif /* PARALLEL_HASKELL || THREADED_RTS */
579
580
581 /* -----------------------------------------------------------------------------
582  * 
583  * GRAN & PARALLEL_HASKELL stuff beyond here.
584  *
585  *  TODO "nuke" this!
586  *
587  * -------------------------------------------------------------------------- */
588
589 #if defined(PARALLEL_HASKELL) || defined(GRAN)
590
591 static void slide_spark_pool( StgSparkPool *pool );
592
593 rtsBool
594 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
595 {
596   if (pool->tl == pool->lim)
597     slide_spark_pool(pool);
598
599   if (closure_SHOULD_SPARK(closure) && 
600       pool->tl < pool->lim) {
601     *(pool->tl++) = closure;
602
603 #if defined(PARALLEL_HASKELL)
604     // collect parallel global statistics (currently done together with GC stats)
605     if (RtsFlags.ParFlags.ParStats.Global &&
606         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
607       // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime()); 
608       globalParStats.tot_sparks_created++;
609     }
610 #endif
611     return rtsTrue;
612   } else {
613 #if defined(PARALLEL_HASKELL)
614     // collect parallel global statistics (currently done together with GC stats)
615     if (RtsFlags.ParFlags.ParStats.Global &&
616         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
617       //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime()); 
618       globalParStats.tot_sparks_ignored++;
619     }
620 #endif
621     return rtsFalse;
622   }
623 }
624
625 static void
626 slide_spark_pool( StgSparkPool *pool )
627 {
628   StgClosure **sparkp, **to_sparkp;
629
630   sparkp = pool->hd;
631   to_sparkp = pool->base;
632   while (sparkp < pool->tl) {
633     ASSERT(to_sparkp<=sparkp);
634     ASSERT(*sparkp!=NULL);
635     ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
636
637     if (closure_SHOULD_SPARK(*sparkp)) {
638       *to_sparkp++ = *sparkp++;
639     } else {
640       sparkp++;
641     }
642   }
643   pool->hd = pool->base;
644   pool->tl = to_sparkp;
645 }
646
647 void
648 disposeSpark(spark)
649 StgClosure *spark;
650 {
651 #if !defined(THREADED_RTS)
652   Capability *cap;
653   StgSparkPool *pool;
654
655   cap = &MainRegTable;
656   pool = &(cap->rSparks);
657   ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
658 #endif
659   ASSERT(spark != (StgClosure *)NULL);
660   /* Do nothing */
661 }
662
663
664 #elif defined(GRAN)
665
666 /* 
667    Search the spark queue of the proc in event for a spark that's worth
668    turning into a thread 
669    (was gimme_spark in the old RTS)
670 */
671 void
672 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
673 {
674    PEs proc = event->proc,       /* proc to search for work */
675        creator = event->creator; /* proc that requested work */
676    StgClosure* node;
677    rtsBool found;
678    rtsSparkQ spark_of_non_local_node = NULL, 
679              spark_of_non_local_node_prev = NULL, 
680              low_priority_spark = NULL, 
681              low_priority_spark_prev = NULL,
682              spark = NULL, prev = NULL;
683   
684    /* Choose a spark from the local spark queue */
685    prev = (rtsSpark*)NULL;
686    spark = pending_sparks_hds[proc];
687    found = rtsFalse;
688
689    // ToDo: check this code & implement local sparking !! -- HWL  
690    while (!found && spark != (rtsSpark*)NULL)
691      {
692        ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
693               (prev==(rtsSpark*)NULL || prev->next==spark) &&
694               (spark->prev==prev));
695        node = spark->node;
696        if (!closure_SHOULD_SPARK(node)) 
697          {
698            IF_GRAN_DEBUG(checkSparkQ,
699                          debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
700                                spark, node));
701
702            if (RtsFlags.GranFlags.GranSimStats.Sparks)
703              DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
704                               spark->node, spark->name, spark_queue_len(proc));
705   
706            ASSERT(spark != (rtsSpark*)NULL);
707            ASSERT(SparksAvail>0);
708            --SparksAvail;
709
710            ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
711            spark = delete_from_sparkq (spark, proc, rtsTrue);
712            if (spark != (rtsSpark*)NULL)
713              prev = spark->prev;
714            continue;
715          }
716        /* -- node should eventually be sparked */
717        else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes && 
718                !IS_LOCAL_TO(PROCS(node),CurrentProc)) 
719          {
720            barf("Local sparking not yet implemented");
721
722            /* Remember first low priority spark */
723            if (spark_of_non_local_node==(rtsSpark*)NULL) {
724              spark_of_non_local_node_prev = prev;
725              spark_of_non_local_node = spark;
726               }
727   
728            if (spark->next == (rtsSpark*)NULL) { 
729              /* ASSERT(spark==SparkQueueTl);  just for testing */
730              prev = spark_of_non_local_node_prev;
731              spark = spark_of_non_local_node;
732              found = rtsTrue;
733              break;
734            }
735   
736 # if defined(GRAN) && defined(GRAN_CHECK)
737            /* Should never happen; just for testing 
738            if (spark==pending_sparks_tl) {
739              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
740                 stg_exit(EXIT_FAILURE);
741                 } */
742 # endif
743            prev = spark; 
744            spark = spark->next;
745            ASSERT(SparksAvail>0);
746            --SparksAvail;
747            continue;
748          }
749        else if ( RtsFlags.GranFlags.DoPrioritySparking || 
750                  (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
751          {
752            if (RtsFlags.GranFlags.DoPrioritySparking)
753              barf("Priority sparking not yet implemented");
754
755            found = rtsTrue;
756          }
757 #if 0      
758        else /* only used if SparkPriority2 is defined */
759          {
760            /* ToDo: fix the code below and re-integrate it */
761            /* Remember first low priority spark */
762            if (low_priority_spark==(rtsSpark*)NULL) { 
763              low_priority_spark_prev = prev;
764              low_priority_spark = spark;
765            }
766   
767            if (spark->next == (rtsSpark*)NULL) { 
768                 /* ASSERT(spark==spark_queue_tl);  just for testing */
769              prev = low_priority_spark_prev;
770              spark = low_priority_spark;
771              found = rtsTrue;       /* take low pri spark => rc is 2  */
772              break;
773            }
774   
775            /* Should never happen; just for testing 
776            if (spark==pending_sparks_tl) {
777              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
778                 stg_exit(EXIT_FAILURE);
779              break;
780            } */                
781            prev = spark; 
782            spark = spark->next;
783
784            IF_GRAN_DEBUG(pri,
785                          debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
786                                spark->gran_info, RtsFlags.GranFlags.SparkPriority, 
787                                spark->node, spark->name);)
788            }
789 #endif
790    }  /* while (spark!=NULL && !found) */
791
792    *spark_res = spark;
793    *found_res = found;
794 }
795
796 /*
797   Turn the spark into a thread.
798   In GranSim this basically means scheduling a StartThread event for the
799   node pointed to by the spark at some point in the future.
800   (was munch_spark in the old RTS)
801 */
802 rtsBool
803 activateSpark (rtsEvent *event, rtsSparkQ spark) 
804 {
805   PEs proc = event->proc,       /* proc to search for work */
806       creator = event->creator; /* proc that requested work */
807   StgTSO* tso;
808   StgClosure* node;
809   rtsTime spark_arrival_time;
810
811   /* 
812      We've found a node on PE proc requested by PE creator.
813      If proc==creator we can turn the spark into a thread immediately;
814      otherwise we schedule a MoveSpark event on the requesting PE
815   */
816      
817   /* DaH Qu' yIchen */
818   if (proc!=creator) { 
819
820     /* only possible if we simulate GUM style fishing */
821     ASSERT(RtsFlags.GranFlags.Fishing);
822
823     /* Message packing costs for sending a Fish; qeq jabbI'ID */
824     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
825   
826     if (RtsFlags.GranFlags.GranSimStats.Sparks)
827       DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
828                        (StgTSO*)NULL, spark->node,
829                        spark->name, spark_queue_len(proc));
830
831     /* time of the spark arrival on the remote PE */
832     spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
833
834     new_event(creator, proc, spark_arrival_time,
835               MoveSpark,
836               (StgTSO*)NULL, spark->node, spark);
837
838     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
839             
840   } else { /* proc==creator i.e. turn the spark into a thread */
841
842     if ( RtsFlags.GranFlags.GranSimStats.Global && 
843          spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
844
845       globalGranStats.tot_low_pri_sparks++;
846       IF_GRAN_DEBUG(pri,
847                     debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
848                           spark->gran_info, 
849                           spark->node, spark->name));
850     } 
851     
852     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
853     
854     node = spark->node;
855     
856 # if 0
857     /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
858     if (GARBAGE COLLECTION IS NECESSARY) {
859       /* Some kind of backoff needed here in case there's too little heap */
860 #  if defined(GRAN_CHECK) && defined(GRAN)
861       if (RtsFlags.GcFlags.giveStats)
862         fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p;  name=%u\n", 
863                 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
864                 spark, node, spark->name);
865 #  endif
866       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
867                   FindWork,
868                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
869       barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
870       GarbageCollect(GetRoots, rtsFalse);
871       // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
872       // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
873       spark = NULL;
874       return; /* was: continue; */ /* to the next event, eventually */
875     }
876 # endif
877     
878     if (RtsFlags.GranFlags.GranSimStats.Sparks)
879       DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
880                        spark->node, spark->name,
881                        spark_queue_len(CurrentProc));
882     
883     new_event(proc, proc, CurrentTime[proc],
884               StartThread, 
885               END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
886     
887     procStatus[proc] = Starting;
888   }
889 }
890
891 /* -------------------------------------------------------------------------
892    This is the main point where handling granularity information comes into
893    play. 
894    ------------------------------------------------------------------------- */
895
896 #define MAX_RAND_PRI    100
897
898 /* 
899    Granularity info transformers. 
900    Applied to the GRAN_INFO field of a spark.
901 */
902 STATIC_INLINE nat  ID(nat x) { return(x); };
903 STATIC_INLINE nat  INV(nat x) { return(-x); };
904 STATIC_INLINE nat  IGNORE(nat x) { return (0); };
905 STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
906
907 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
908 rtsSpark *
909 newSpark(node,name,gran_info,size_info,par_info,local)
910 StgClosure *node;
911 nat name, gran_info, size_info, par_info, local;
912 {
913   nat pri;
914   rtsSpark *newspark;
915
916   pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
917         RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
918         RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
919                            ID(gran_info);
920
921   if ( RtsFlags.GranFlags.SparkPriority!=0 && 
922        pri<RtsFlags.GranFlags.SparkPriority ) {
923     IF_GRAN_DEBUG(pri,
924       debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
925               pri, RtsFlags.GranFlags.SparkPriority, node, name));
926     return ((rtsSpark*)NULL);
927   }
928
929   newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
930   newspark->prev = newspark->next = (rtsSpark*)NULL;
931   newspark->node = node;
932   newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
933   newspark->gran_info = pri;
934   newspark->global = !local;      /* Check that with parAt, parAtAbs !!*/
935
936   if (RtsFlags.GranFlags.GranSimStats.Global) {
937     globalGranStats.tot_sparks_created++;
938     globalGranStats.sparks_created_on_PE[CurrentProc]++;
939   }
940
941   return(newspark);
942 }
943
944 void
945 disposeSpark(spark)
946 rtsSpark *spark;
947 {
948   ASSERT(spark!=NULL);
949   stgFree(spark);
950 }
951
952 void 
953 disposeSparkQ(spark)
954 rtsSparkQ spark;
955 {
956   if (spark==NULL) 
957     return;
958
959   disposeSparkQ(spark->next);
960
961 # ifdef GRAN_CHECK
962   if (SparksAvail < 0) {
963     debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
964     print_spark(spark);
965   }
966 # endif
967
968   stgFree(spark);
969 }
970
971 /*
972    With PrioritySparking add_to_spark_queue performs an insert sort to keep
973    the spark queue sorted. Otherwise the spark is just added to the end of
974    the queue. 
975 */
976
977 void
978 add_to_spark_queue(spark)
979 rtsSpark *spark;
980 {
981   rtsSpark *prev = NULL, *next = NULL;
982   nat count = 0;
983   rtsBool found = rtsFalse;
984
985   if ( spark == (rtsSpark *)NULL ) {
986     return;
987   }
988
989   if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
990     /* Priority sparking is enabled i.e. spark queues must be sorted */
991
992     for (prev = NULL, next = pending_sparks_hd, count=0;
993          (next != NULL) && 
994          !(found = (spark->gran_info >= next->gran_info));
995          prev = next, next = next->next, count++) 
996      {}
997
998   } else {   /* 'utQo' */
999     /* Priority sparking is disabled */
1000     
1001     found = rtsFalse;   /* to add it at the end */
1002
1003   }
1004
1005   if (found) {
1006     /* next points to the first spark with a gran_info smaller than that
1007        of spark; therefore, add spark before next into the spark queue */
1008     spark->next = next;
1009     if ( next == NULL ) {
1010       pending_sparks_tl = spark;
1011     } else {
1012       next->prev = spark;
1013     }
1014     spark->prev = prev;
1015     if ( prev == NULL ) {
1016       pending_sparks_hd = spark;
1017     } else {
1018       prev->next = spark;
1019     }
1020   } else {  /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
1021     /* add the spark at the end of the spark queue */
1022     spark->next = NULL;                        
1023     spark->prev = pending_sparks_tl;
1024     if (pending_sparks_hd == NULL)
1025       pending_sparks_hd = spark;
1026     else
1027       pending_sparks_tl->next = spark;
1028     pending_sparks_tl = spark;    
1029   } 
1030   ++SparksAvail;
1031
1032   /* add costs for search in priority sparking */
1033   if (RtsFlags.GranFlags.DoPrioritySparking) {
1034     CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
1035   }
1036
1037   IF_GRAN_DEBUG(checkSparkQ,
1038                 debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
1039                       spark, spark->node, CurrentProc);
1040                 print_sparkq_stats());
1041
1042 #  if defined(GRAN_CHECK)
1043   if (RtsFlags.GranFlags.Debug.checkSparkQ) {
1044     for (prev = NULL, next =  pending_sparks_hd;
1045          (next != NULL);
1046          prev = next, next = next->next) 
1047       {}
1048     if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
1049       debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
1050               spark,CurrentProc, 
1051               pending_sparks_tl, prev);
1052   }
1053 #  endif
1054
1055 #  if defined(GRAN_CHECK)
1056   /* Check if the sparkq is still sorted. Just for testing, really!  */
1057   if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
1058        RtsFlags.GranFlags.Debug.pri ) {
1059     rtsBool sorted = rtsTrue;
1060     rtsSpark *prev, *next;
1061
1062     if (pending_sparks_hd == NULL ||
1063         pending_sparks_hd->next == NULL ) {
1064       /* just 1 elem => ok */
1065     } else {
1066       for (prev = pending_sparks_hd,
1067            next = pending_sparks_hd->next;
1068            (next != NULL) ;
1069            prev = next, next = next->next) {
1070         sorted = sorted && 
1071                  (prev->gran_info >= next->gran_info);
1072       }
1073     }
1074     if (!sorted) {
1075       debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
1076               CurrentProc);
1077       print_sparkq(CurrentProc);
1078     }
1079   }
1080 #  endif
1081 }
1082
1083 nat
1084 spark_queue_len(proc) 
1085 PEs proc;
1086 {
1087  rtsSpark *prev, *spark;                     /* prev only for testing !! */
1088  nat len;
1089
1090  for (len = 0, prev = NULL, spark = pending_sparks_hds[proc]; 
1091       spark != NULL; 
1092       len++, prev = spark, spark = spark->next)
1093    {}
1094
1095 #  if defined(GRAN_CHECK)
1096   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) 
1097     if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
1098       debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
1099               proc, pending_sparks_tls[proc], prev);
1100 #  endif
1101
1102  return (len);
1103 }
1104
1105 /* 
1106    Take spark out of the spark queue on PE p and nuke the spark. Adjusts
1107    hd and tl pointers of the spark queue. Returns a pointer to the next
1108    spark in the queue.
1109 */
1110 rtsSpark *
1111 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
1112 rtsSpark *spark;
1113 PEs p;
1114 rtsBool dispose_too;
1115 {
1116   rtsSpark *new_spark;
1117
1118   if (spark==NULL) 
1119     barf("delete_from_sparkq: trying to delete NULL spark\n");
1120
1121 #  if defined(GRAN_CHECK)
1122   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
1123     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
1124             pending_sparks_hd, pending_sparks_tl,
1125             spark->prev, spark, spark->next, 
1126             (spark->next==NULL ? 0 : spark->next->prev));
1127   }
1128 #  endif
1129
1130   if (spark->prev==NULL) {
1131     /* spark is first spark of queue => adjust hd pointer */
1132     ASSERT(pending_sparks_hds[p]==spark);
1133     pending_sparks_hds[p] = spark->next;
1134   } else {
1135     spark->prev->next = spark->next;
1136   }
1137   if (spark->next==NULL) {
1138     ASSERT(pending_sparks_tls[p]==spark);
1139     /* spark is first spark of queue => adjust tl pointer */
1140     pending_sparks_tls[p] = spark->prev;
1141   } else {
1142     spark->next->prev = spark->prev;
1143   }
1144   new_spark = spark->next;
1145   
1146 #  if defined(GRAN_CHECK)
1147   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
1148     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
1149             pending_sparks_hd, pending_sparks_tl,
1150             spark->prev, spark, spark->next, 
1151             (spark->next==NULL ? 0 : spark->next->prev), spark);
1152   }
1153 #  endif
1154
1155   if (dispose_too)
1156     disposeSpark(spark);
1157                   
1158   return new_spark;
1159 }
1160
1161 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
1162 void
1163 markSparkQueue(void)
1164
1165   StgClosure *MarkRoot(StgClosure *root); // prototype
1166   PEs p;
1167   rtsSpark *sp;
1168
1169   for (p=0; p<RtsFlags.GranFlags.proc; p++)
1170     for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
1171       ASSERT(sp->node!=NULL);
1172       ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
1173       // ToDo?: statistics gathering here (also for GUM!)
1174       sp->node = (StgClosure *)MarkRoot(sp->node);
1175     }
1176
1177   IF_DEBUG(gc,
1178            debugBelch("markSparkQueue: spark statistics at start of GC:");
1179            print_sparkq_stats());
1180 }
1181
1182 void
1183 print_spark(spark)
1184 rtsSpark *spark;
1185
1186   char str[16];
1187
1188   if (spark==NULL) {
1189     debugBelch("Spark: NIL\n");
1190     return;
1191   } else {
1192     sprintf(str,
1193             ((spark->node==NULL) ? "______" : "%#6lx"), 
1194             stgCast(StgPtr,spark->node));
1195
1196     debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
1197             str, spark->name, 
1198             ((spark->global)==rtsTrue?"True":"False"), spark->creator, 
1199             spark->prev, spark->next);
1200   }
1201 }
1202
1203 void
1204 print_sparkq(proc)
1205 PEs proc;
1206 // rtsSpark *hd;
1207 {
1208   rtsSpark *x = pending_sparks_hds[proc];
1209
1210   debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
1211   for (; x!=(rtsSpark*)NULL; x=x->next) {
1212     print_spark(x);
1213   }
1214 }
1215
1216 /* 
1217    Print a statistics of all spark queues.
1218 */
1219 void
1220 print_sparkq_stats(void)
1221 {
1222   PEs p;
1223
1224   debugBelch("SparkQs: [");
1225   for (p=0; p<RtsFlags.GranFlags.proc; p++)
1226     debugBelch(", PE %d: %d", p, spark_queue_len(p));
1227   debugBelch("\n");
1228 }
1229
1230 #endif