Work stealing for sparks
[ghc-hetmet.git] / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2000-2008
4  *
5  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
6  *
7  * The implementation uses Double-Ended Queues with lock-free access
8  * (thereby often called "deque") as described in
9  *
10  * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
11  * SPAA'05, July 2005, Las Vegas, USA.
12  * ACM 1-58113-986-1/05/0007
13  *
14  * Author: Jost Berthold MSRC 07-09/2008
15  *
16  * The DeQue is held as a circular array with known length. Positions
17  * of top (read-end) and bottom (write-end) always increase, and the
18  * array is accessed with indices modulo array-size. While this bears
19  * the risk of overflow, we assume that (with 64 bit indices), a
20  * program must run very long to reach that point.
21  * 
22  * The write end of the queue (position bottom) can only be used with
23  * mutual exclusion, i.e. by exactly one caller at a time.  At this
24  * end, new items can be enqueued using pushBottom()/newSpark(), and
25  * removed using popBottom()/reclaimSpark() (the latter implying a cas
26  * synchronisation with potential concurrent readers for the case of
27  * just one element).
28  * 
29  * Multiple readers can steal()/findSpark() from the read end
30  * (position top), and are synchronised without a lock, based on a cas
31  * of the top position. One reader wins, the others return NULL for a
32  * failure.
33  * 
34  * Both popBottom and steal also return NULL when the queue is empty.
35  * 
36  -------------------------------------------------------------------------*/
37
38 #include "PosixSource.h"
39 #include "Rts.h"
40 #include "Storage.h"
41 #include "Schedule.h"
42 #include "SchedAPI.h"
43 #include "RtsFlags.h"
44 #include "RtsUtils.h"
45 #include "ParTicky.h"
46 #include "Trace.h"
47
48 #include "SMP.h" // for cas
49
50 #include "Sparks.h"
51
52 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
53
54 /* internal helpers ... */
55
56 StgWord roundUp2(StgWord val);
57
58 StgWord roundUp2(StgWord val) {
59   StgWord rounded = 1;
60
61   /* StgWord is unsigned anyway, only catch 0 */
62   if (val == 0) {
63     barf("DeQue,roundUp2: invalid size 0 requested");
64   }
65   /* at least 1 bit set, shift up to its place */
66   do {
67     rounded = rounded << 1;
68   } while (0 != (val = val>>1));
69   return rounded;
70 }
71
72 INLINE_HEADER
73 rtsBool casTop(StgPtr addr, StgWord old, StgWord new);
74
75 #if !defined(THREADED_RTS)
76 /* missing def. in non THREADED RTS, and makes no sense anyway... */
77 StgWord cas(StgPtr addr,StgWord old,StgWord new);
78 StgWord cas(StgPtr addr,StgWord old,StgWord new) { 
79   barf("cas: not implemented without multithreading");
80   old = new = *addr; /* to avoid gcc warnings */
81 }
82 #endif
83
84 INLINE_HEADER
85 rtsBool casTop(StgWord* addr, StgWord old, StgWord new) {
86   StgWord res = cas((StgPtr) addr, old, new);
87   return ((res == old));
88 }
89
90 /* or simply like this */
91 #define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
92
93 /* -----------------------------------------------------------------------------
94  * 
95  * Initialising spark pools.
96  *
97  * -------------------------------------------------------------------------- */
98
99 /* constructor */
100 SparkPool* initPool(StgWord size) {
101
102   StgWord realsize; 
103   SparkPool *q;
104
105   realsize = roundUp2(size); /* to compute modulo as a bitwise & */
106
107   q = (SparkPool*) stgMallocBytes(sizeof(SparkPool),   /* admin fields */
108                               "newSparkPool");
109   q->elements = (StgClosurePtr*) 
110                 stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
111                                "newSparkPool:data space");
112   q->top=0;
113   q->bottom=0;
114   q->topBound=0; /* read by writer, updated each time top is read */
115
116   q->size = realsize;  /* power of 2 */
117   q->moduloSize = realsize - 1; /* n % size == n & moduloSize  */
118
119   ASSERT_SPARK_POOL_INVARIANTS(q); 
120   return q;
121 }
122
123 void
124 initSparkPools( void )
125 {
126 #ifdef THREADED_RTS
127     /* walk over the capabilities, allocating a spark pool for each one */
128     nat i;
129     for (i = 0; i < n_capabilities; i++) {
130       capabilities[i].sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
131     }
132 #else
133     /* allocate a single spark pool */
134     MainCapability->sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
135 #endif
136 }
137
138 void
139 freeSparkPool(SparkPool *pool) {
140   /* should not interfere with concurrent findSpark() calls! And
141      nobody should use the pointer any more. We cross our fingers...*/
142   stgFree(pool->elements);
143   stgFree(pool);
144 }
145
146 /* reclaimSpark(cap): remove a spark from the write end of the queue.
147  * Returns the removed spark, and NULL if a race is lost or the pool
148  * empty.
149  *
150  * If only one spark is left in the pool, we synchronise with
151  * concurrently stealing threads by using cas to modify the top field.
152  * This routine should NEVER be called by a task which does not own
153  * the capability. Can this be checked here?
154  */
155 StgClosure* reclaimSpark(Capability *cap) {
156   SparkPool *deque = cap->sparks;
157   /* also a bit tricky, has to avoid concurrent steal() calls by
158      accessing top with cas, when there is only one element left */
159   StgWord t, b;
160   StgClosurePtr* pos;
161   long  currSize;
162   StgClosurePtr removed;
163
164   ASSERT_SPARK_POOL_INVARIANTS(deque); 
165
166   b = deque->bottom;
167   /* "decrement b as a test, see what happens" */
168   deque->bottom = --b; 
169   pos = (deque->elements) + (b & (deque->moduloSize));
170   t = deque->top; /* using topBound would give an *upper* bound, we
171                      need a lower bound. We use the real top here, but
172                      can update the topBound value */
173   deque->topBound = t;
174   currSize = b - t;
175   if (currSize < 0) { /* was empty before decrementing b, set b
176                          consistently and abort */
177     deque->bottom = t;
178     return NULL;
179   }
180   removed = *pos;
181   if (currSize > 0) { /* no danger, still elements in buffer after b-- */
182     return removed;
183   } 
184   /* otherwise, has someone meanwhile stolen the same (last) element?
185      Check and increment top value to know  */
186   if ( !(CASTOP(&(deque->top),t,t+1)) ) {
187     removed = NULL; /* no success, but continue adjusting bottom */
188   }
189   deque->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */
190   deque->topBound = t+1; /* ...and cached top value as well */
191
192   ASSERT_SPARK_POOL_INVARIANTS(deque); 
193
194   return removed;
195 }
196
197 /* -----------------------------------------------------------------------------
198  * 
199  * findSpark: find a spark on the current Capability that we can fork
200  * into a thread.
201  *
202  * May be called by concurrent threads, which synchronise on top
203  * variable. Returns a spark, or NULL if pool empty or race lost.
204  *
205  -------------------------------------------------------------------------- */
206
207 StgClosurePtr steal(SparkPool *deque);
208
209 /* steal an element from the read end. Synchronises multiple callers
210    by failing with NULL return. Returns NULL when deque is empty. */
211 StgClosurePtr steal(SparkPool *deque) {
212   StgClosurePtr* pos;
213   StgClosurePtr* arraybase;
214   StgWord sz;
215   StgClosurePtr stolen;
216   StgWord b,t; 
217
218   ASSERT_SPARK_POOL_INVARIANTS(deque); 
219
220   b = deque->bottom;
221   t = deque->top;
222   if (b - t <= 0 ) { 
223     return NULL; /* already looks empty, abort */
224   }
225
226   /* now access array, see pushBottom() */
227   arraybase = deque->elements;
228   sz = deque->moduloSize;
229   pos = arraybase + (t & sz);  
230   stolen = *pos;
231
232   /* now decide whether we have won */
233   if ( !(CASTOP(&(deque->top),t,t+1)) ) {
234     /* lost the race, someon else has changed top in the meantime */
235     stolen = NULL;    
236   }  /* else: OK, top has been incremented by the cas call */
237
238
239   ASSERT_SPARK_POOL_INVARIANTS(deque); 
240   /* return NULL or stolen element */
241   return stolen;
242 }
243
244 StgClosure *
245 findSpark (Capability *cap)
246 {
247   SparkPool *deque = (cap->sparks);
248   StgClosure *stolen;
249
250   ASSERT_SPARK_POOL_INVARIANTS(deque); 
251
252   do { 
253     /* keep trying until good spark found or pool looks empty. 
254        TODO is this a good idea? */
255
256     stolen = steal(deque);
257     
258   } while ( ( !stolen /* nothing stolen*/
259               || !closure_SHOULD_SPARK(stolen)) /* spark not OK */
260             && !looksEmpty(deque)); /* run empty, give up */
261
262   /* return stolen element */
263   return stolen;
264 }
265
266
267 /* "guesses" whether a deque is empty. Can return false negatives in
268    presence of concurrent steal() calls, and false positives in
269    presence of a concurrent pushBottom().*/
270 rtsBool looksEmpty(SparkPool* deque) {
271   StgWord t = deque->top;
272   StgWord b = deque->bottom;
273   /* try to prefer false negatives by reading top first */
274   return (b - t <= 0);
275   /* => array is *never* completely filled, always 1 place free! */
276 }
277
278 /* -----------------------------------------------------------------------------
279  * 
280  * Turn a spark into a real thread
281  *
282  * -------------------------------------------------------------------------- */
283
284 void
285 createSparkThread (Capability *cap, StgClosure *p)
286 {
287     StgTSO *tso;
288
289     tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
290     appendToRunQueue(cap,tso);
291 }
292
293 /* -----------------------------------------------------------------------------
294  * 
295  * Create a new spark
296  *
297  * -------------------------------------------------------------------------- */
298
299 #define DISCARD_NEW
300 void pushBottom(SparkPool* deque, StgClosurePtr elem);
301
302 /* enqueue an element. Should always succeed by resizing the array
303    (not implemented yet, silently fails in that case). */
304 void pushBottom(SparkPool* deque, StgClosurePtr elem) {
305   StgWord t;
306   StgClosurePtr* pos;
307   StgWord sz = deque->moduloSize; 
308   StgWord b = deque->bottom;
309
310   ASSERT_SPARK_POOL_INVARIANTS(deque); 
311
312   /* we try to avoid reading deque->top (accessed by all) and use
313      deque->topBound (accessed only by writer) instead. 
314      This is why we do not just call empty(deque) here.
315   */
316   t = deque->topBound;
317   if ( b - t >= sz ) { /* nota bene: sz == deque->size - 1, thus ">=" */
318     /* could be full, check the real top value in this case */
319     t = deque->top;
320     deque->topBound = t;
321     if (b - t >= sz) { /* really no space left :-( */
322       /* reallocate the array, copying the values. Concurrent steal()s
323          will in the meantime use the old one and modify only top.
324          This means: we cannot safely free the old space! Can keep it
325          on a free list internally here...
326
327          Potential bug in combination with steal(): if array is
328          replaced, it is unclear which one concurrent steal operations
329          use. Must read the array base address in advance in steal().
330       */
331 #if defined(DISCARD_NEW)
332       ASSERT_SPARK_POOL_INVARIANTS(deque); 
333       return; /* for now, silently fail */
334 #else
335       /* could make room by incrementing the top position here.  In
336        * this case, should use CASTOP. If this fails, someone else has
337        * removed something, and new room will be available.
338        */
339       ASSERT_SPARK_POOL_INVARIANTS(deque); 
340 #endif
341     }
342   }
343   pos = (deque->elements) + (b & sz);
344   *pos = elem;
345   (deque->bottom)++;
346
347   ASSERT_SPARK_POOL_INVARIANTS(deque); 
348   return;
349 }
350
351
352 /* this is called as a direct C-call from Stg => we need to keep the
353    pool in a register (???) */
354 StgInt
355 newSpark (StgRegTable *reg, StgClosure *p)
356 {
357     SparkPool *pool = (reg->rCurrentTSO->cap->sparks);
358
359     /* I am not sure whether this is the right thing to do.
360      * Maybe it is better to exploit the tag information
361      * instead of throwing it away?
362      */
363     p = UNTAG_CLOSURE(p);
364
365     ASSERT_SPARK_POOL_INVARIANTS(pool);
366
367     if (closure_SHOULD_SPARK(p)) {
368       pushBottom(pool,p);
369     }   
370
371     ASSERT_SPARK_POOL_INVARIANTS(pool);
372     return 1;
373 }
374
375
376
377 /* --------------------------------------------------------------------------
378  * Remove all sparks from the spark queues which should not spark any
379  * more.  Called after GC. We assume exclusive access to the structure
380  * and replace  all sparks in the queue, see explanation below. At exit,
381  * the spark pool only contains sparkable closures.
382  * -------------------------------------------------------------------------- */
383
384 static void
385 pruneSparkQueue (Capability *cap)
386
387     SparkPool *pool;
388     StgClosurePtr spark, evacspark, *elements;
389     nat n, pruned_sparks; // stats only
390     StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
391     
392     PAR_TICKY_MARK_SPARK_QUEUE_START();
393     
394     n = 0;
395     pruned_sparks = 0;
396     
397     pool = cap->sparks;
398     
399     debugTrace(DEBUG_sched,
400                "markSparkQueue: current spark queue len=%d; (hd=%ld; tl=%ld)",
401                sparkPoolSize(pool), pool->bottom, pool->top);
402     ASSERT_SPARK_POOL_INVARIANTS(pool);
403
404     elements = pool->elements;
405
406     /* We have exclusive access to the structure here, so we can reset
407        bottom and top counters, and prune invalid sparks. Contents are
408        copied in-place if they are valuable, otherwise discarded. The
409        routine uses "real" indices t and b, starts by computing them
410        as the modulus size of top and bottom,
411
412        Copying:
413
414        At the beginning, the pool structure can look like this:
415        ( bottom % size >= top % size , no wrap-around)
416                   t          b
417        ___________***********_________________
418
419        or like this ( bottom % size < top % size, wrap-around )
420                   b         t
421        ***********__________******************
422        As we need to remove useless sparks anyway, we make one pass
423        between t and b, moving valuable content to b and subsequent
424        cells (wrapping around when the size is reached).
425
426                      b      t
427        ***********OOO_______XX_X__X?**********
428                      ^____move?____/
429
430        After this movement, botInd becomes the new bottom, and old
431        bottom becomes the new top index, both as indices in the array
432        size range.
433     */
434     // starting here
435     currInd = (pool->top) & (pool->moduloSize); // mod
436
437     // copies of evacuated closures go to space from botInd on
438     // we keep oldBotInd to know when to stop
439     oldBotInd = botInd = (pool->bottom) & (pool->moduloSize); // mod
440
441     // on entry to loop, we are within the bounds
442     ASSERT( currInd < pool->size && botInd  < pool->size );
443
444     while (currInd != oldBotInd ) {
445       /* must use != here, wrap-around at size
446          subtle: loop not entered if queue empty
447        */
448
449       /* check element at currInd. if valuable, evacuate and move to
450          botInd, otherwise move on */
451       spark = elements[currInd];
452
453       /* if valuable work: shift inside the pool */
454       if ( closure_SHOULD_SPARK(spark) ) {
455         elements[botInd] = spark; // keep entry (new address)
456         botInd++;
457         n++;
458       } else { 
459         pruned_sparks++; // discard spark
460       }
461       currInd++;
462
463       // in the loop, we may reach the bounds, and instantly wrap around
464       ASSERT( currInd <= pool->size && botInd <= pool->size );
465       if ( currInd == pool->size ) { currInd = 0; }
466       if ( botInd == pool->size )  { botInd = 0;  }
467
468     } // while-loop over spark pool elements
469
470     ASSERT(currInd == oldBotInd);
471
472     pool->top = oldBotInd; // where we started writing
473     pool->topBound = pool->top;
474
475     pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size); 
476     // first free place we did not use (corrected by wraparound)
477
478     PAR_TICKY_MARK_SPARK_QUEUE_END(n);
479
480     debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
481     
482     debugTrace(DEBUG_sched,
483                "new spark queue len=%d; (hd=%ld; tl=%ld)",
484                sparkPoolSize(pool), pool->bottom, pool->top);
485
486     ASSERT_SPARK_POOL_INVARIANTS(pool);
487 }
488
489 void
490 pruneSparkQueues (void)
491 {
492     nat i;
493     for (i = 0; i < n_capabilities; i++) {
494         pruneSparkQueue(&capabilities[i]);
495     }
496 }
497
498 /* GC for the spark pool, called inside Capability.c for all
499    capabilities in turn. Blindly "evac"s complete spark pool. */
500 void
501 traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
502 {
503     StgClosure **sparkp;
504     SparkPool *pool;
505     StgWord top,bottom, modMask;
506     
507     pool = cap->sparks;
508
509     ASSERT_SPARK_POOL_INVARIANTS(pool);
510
511     top = pool->top;
512     bottom = pool->bottom;
513     sparkp = pool->elements;
514     modMask = pool->moduloSize;
515
516     while (top < bottom) {
517     /* call evac for all closures in range (wrap-around via modulo)
518      * In GHC-6.10, evac takes an additional 1st argument to hold a
519      * GC-specific register, see rts/sm/GC.c::mark_root()
520      */
521       evac( user , sparkp + (top & modMask) ); 
522       top++;
523     }
524
525     debugTrace(DEBUG_sched,
526                "traversed spark queue, len=%d; (hd=%ld; tl=%ld)",
527                sparkPoolSize(pool), pool->bottom, pool->top);
528 }
529
530 /* ----------------------------------------------------------------------------
531
532  * balanceSparkPoolsCaps: takes an array of capabilities (usually: all
533  * capabilities) and its size. Accesses all spark pools and equally
534  * distributes the sparks among them.
535  *
536  * Could be called after GC, before Cap. release, from scheduler. 
537  * -------------------------------------------------------------------------- */
538 void balanceSparkPoolsCaps(nat n_caps, Capability caps[]);
539
540 void balanceSparkPoolsCaps(nat n_caps, Capability caps[]) {
541   barf("not implemented");
542 }
543
544 #else
545
546 StgInt
547 newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
548 {
549     /* nothing */
550     return 1;
551 }
552
553
554 #endif /* PARALLEL_HASKELL || THREADED_RTS */
555
556
557 /* -----------------------------------------------------------------------------
558  * 
559  * GRAN & PARALLEL_HASKELL stuff beyond here.
560  *
561  *  TODO "nuke" this!
562  *
563  * -------------------------------------------------------------------------- */
564
565 #if defined(PARALLEL_HASKELL) || defined(GRAN)
566
567 static void slide_spark_pool( StgSparkPool *pool );
568
569 rtsBool
570 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
571 {
572   if (pool->tl == pool->lim)
573     slide_spark_pool(pool);
574
575   if (closure_SHOULD_SPARK(closure) && 
576       pool->tl < pool->lim) {
577     *(pool->tl++) = closure;
578
579 #if defined(PARALLEL_HASKELL)
580     // collect parallel global statistics (currently done together with GC stats)
581     if (RtsFlags.ParFlags.ParStats.Global &&
582         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
583       // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime()); 
584       globalParStats.tot_sparks_created++;
585     }
586 #endif
587     return rtsTrue;
588   } else {
589 #if defined(PARALLEL_HASKELL)
590     // collect parallel global statistics (currently done together with GC stats)
591     if (RtsFlags.ParFlags.ParStats.Global &&
592         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
593       //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime()); 
594       globalParStats.tot_sparks_ignored++;
595     }
596 #endif
597     return rtsFalse;
598   }
599 }
600
601 static void
602 slide_spark_pool( StgSparkPool *pool )
603 {
604   StgClosure **sparkp, **to_sparkp;
605
606   sparkp = pool->hd;
607   to_sparkp = pool->base;
608   while (sparkp < pool->tl) {
609     ASSERT(to_sparkp<=sparkp);
610     ASSERT(*sparkp!=NULL);
611     ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
612
613     if (closure_SHOULD_SPARK(*sparkp)) {
614       *to_sparkp++ = *sparkp++;
615     } else {
616       sparkp++;
617     }
618   }
619   pool->hd = pool->base;
620   pool->tl = to_sparkp;
621 }
622
623 void
624 disposeSpark(spark)
625 StgClosure *spark;
626 {
627 #if !defined(THREADED_RTS)
628   Capability *cap;
629   StgSparkPool *pool;
630
631   cap = &MainRegTable;
632   pool = &(cap->rSparks);
633   ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
634 #endif
635   ASSERT(spark != (StgClosure *)NULL);
636   /* Do nothing */
637 }
638
639
640 #elif defined(GRAN)
641
642 /* 
643    Search the spark queue of the proc in event for a spark that's worth
644    turning into a thread 
645    (was gimme_spark in the old RTS)
646 */
647 void
648 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
649 {
650    PEs proc = event->proc,       /* proc to search for work */
651        creator = event->creator; /* proc that requested work */
652    StgClosure* node;
653    rtsBool found;
654    rtsSparkQ spark_of_non_local_node = NULL, 
655              spark_of_non_local_node_prev = NULL, 
656              low_priority_spark = NULL, 
657              low_priority_spark_prev = NULL,
658              spark = NULL, prev = NULL;
659   
660    /* Choose a spark from the local spark queue */
661    prev = (rtsSpark*)NULL;
662    spark = pending_sparks_hds[proc];
663    found = rtsFalse;
664
665    // ToDo: check this code & implement local sparking !! -- HWL  
666    while (!found && spark != (rtsSpark*)NULL)
667      {
668        ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
669               (prev==(rtsSpark*)NULL || prev->next==spark) &&
670               (spark->prev==prev));
671        node = spark->node;
672        if (!closure_SHOULD_SPARK(node)) 
673          {
674            IF_GRAN_DEBUG(checkSparkQ,
675                          debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
676                                spark, node));
677
678            if (RtsFlags.GranFlags.GranSimStats.Sparks)
679              DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
680                               spark->node, spark->name, spark_queue_len(proc));
681   
682            ASSERT(spark != (rtsSpark*)NULL);
683            ASSERT(SparksAvail>0);
684            --SparksAvail;
685
686            ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
687            spark = delete_from_sparkq (spark, proc, rtsTrue);
688            if (spark != (rtsSpark*)NULL)
689              prev = spark->prev;
690            continue;
691          }
692        /* -- node should eventually be sparked */
693        else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes && 
694                !IS_LOCAL_TO(PROCS(node),CurrentProc)) 
695          {
696            barf("Local sparking not yet implemented");
697
698            /* Remember first low priority spark */
699            if (spark_of_non_local_node==(rtsSpark*)NULL) {
700              spark_of_non_local_node_prev = prev;
701              spark_of_non_local_node = spark;
702               }
703   
704            if (spark->next == (rtsSpark*)NULL) { 
705              /* ASSERT(spark==SparkQueueTl);  just for testing */
706              prev = spark_of_non_local_node_prev;
707              spark = spark_of_non_local_node;
708              found = rtsTrue;
709              break;
710            }
711   
712 # if defined(GRAN) && defined(GRAN_CHECK)
713            /* Should never happen; just for testing 
714            if (spark==pending_sparks_tl) {
715              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
716                 stg_exit(EXIT_FAILURE);
717                 } */
718 # endif
719            prev = spark; 
720            spark = spark->next;
721            ASSERT(SparksAvail>0);
722            --SparksAvail;
723            continue;
724          }
725        else if ( RtsFlags.GranFlags.DoPrioritySparking || 
726                  (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
727          {
728            if (RtsFlags.GranFlags.DoPrioritySparking)
729              barf("Priority sparking not yet implemented");
730
731            found = rtsTrue;
732          }
733 #if 0      
734        else /* only used if SparkPriority2 is defined */
735          {
736            /* ToDo: fix the code below and re-integrate it */
737            /* Remember first low priority spark */
738            if (low_priority_spark==(rtsSpark*)NULL) { 
739              low_priority_spark_prev = prev;
740              low_priority_spark = spark;
741            }
742   
743            if (spark->next == (rtsSpark*)NULL) { 
744                 /* ASSERT(spark==spark_queue_tl);  just for testing */
745              prev = low_priority_spark_prev;
746              spark = low_priority_spark;
747              found = rtsTrue;       /* take low pri spark => rc is 2  */
748              break;
749            }
750   
751            /* Should never happen; just for testing 
752            if (spark==pending_sparks_tl) {
753              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
754                 stg_exit(EXIT_FAILURE);
755              break;
756            } */                
757            prev = spark; 
758            spark = spark->next;
759
760            IF_GRAN_DEBUG(pri,
761                          debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
762                                spark->gran_info, RtsFlags.GranFlags.SparkPriority, 
763                                spark->node, spark->name);)
764            }
765 #endif
766    }  /* while (spark!=NULL && !found) */
767
768    *spark_res = spark;
769    *found_res = found;
770 }
771
772 /*
773   Turn the spark into a thread.
774   In GranSim this basically means scheduling a StartThread event for the
775   node pointed to by the spark at some point in the future.
776   (was munch_spark in the old RTS)
777 */
778 rtsBool
779 activateSpark (rtsEvent *event, rtsSparkQ spark) 
780 {
781   PEs proc = event->proc,       /* proc to search for work */
782       creator = event->creator; /* proc that requested work */
783   StgTSO* tso;
784   StgClosure* node;
785   rtsTime spark_arrival_time;
786
787   /* 
788      We've found a node on PE proc requested by PE creator.
789      If proc==creator we can turn the spark into a thread immediately;
790      otherwise we schedule a MoveSpark event on the requesting PE
791   */
792      
793   /* DaH Qu' yIchen */
794   if (proc!=creator) { 
795
796     /* only possible if we simulate GUM style fishing */
797     ASSERT(RtsFlags.GranFlags.Fishing);
798
799     /* Message packing costs for sending a Fish; qeq jabbI'ID */
800     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
801   
802     if (RtsFlags.GranFlags.GranSimStats.Sparks)
803       DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
804                        (StgTSO*)NULL, spark->node,
805                        spark->name, spark_queue_len(proc));
806
807     /* time of the spark arrival on the remote PE */
808     spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
809
810     new_event(creator, proc, spark_arrival_time,
811               MoveSpark,
812               (StgTSO*)NULL, spark->node, spark);
813
814     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
815             
816   } else { /* proc==creator i.e. turn the spark into a thread */
817
818     if ( RtsFlags.GranFlags.GranSimStats.Global && 
819          spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
820
821       globalGranStats.tot_low_pri_sparks++;
822       IF_GRAN_DEBUG(pri,
823                     debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
824                           spark->gran_info, 
825                           spark->node, spark->name));
826     } 
827     
828     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
829     
830     node = spark->node;
831     
832 # if 0
833     /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
834     if (GARBAGE COLLECTION IS NECESSARY) {
835       /* Some kind of backoff needed here in case there's too little heap */
836 #  if defined(GRAN_CHECK) && defined(GRAN)
837       if (RtsFlags.GcFlags.giveStats)
838         fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p;  name=%u\n", 
839                 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
840                 spark, node, spark->name);
841 #  endif
842       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
843                   FindWork,
844                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
845       barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
846       GarbageCollect(GetRoots, rtsFalse);
847       // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
848       // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
849       spark = NULL;
850       return; /* was: continue; */ /* to the next event, eventually */
851     }
852 # endif
853     
854     if (RtsFlags.GranFlags.GranSimStats.Sparks)
855       DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
856                        spark->node, spark->name,
857                        spark_queue_len(CurrentProc));
858     
859     new_event(proc, proc, CurrentTime[proc],
860               StartThread, 
861               END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
862     
863     procStatus[proc] = Starting;
864   }
865 }
866
867 /* -------------------------------------------------------------------------
868    This is the main point where handling granularity information comes into
869    play. 
870    ------------------------------------------------------------------------- */
871
872 #define MAX_RAND_PRI    100
873
874 /* 
875    Granularity info transformers. 
876    Applied to the GRAN_INFO field of a spark.
877 */
878 STATIC_INLINE nat  ID(nat x) { return(x); };
879 STATIC_INLINE nat  INV(nat x) { return(-x); };
880 STATIC_INLINE nat  IGNORE(nat x) { return (0); };
881 STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
882
883 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
884 rtsSpark *
885 newSpark(node,name,gran_info,size_info,par_info,local)
886 StgClosure *node;
887 nat name, gran_info, size_info, par_info, local;
888 {
889   nat pri;
890   rtsSpark *newspark;
891
892   pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
893         RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
894         RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
895                            ID(gran_info);
896
897   if ( RtsFlags.GranFlags.SparkPriority!=0 && 
898        pri<RtsFlags.GranFlags.SparkPriority ) {
899     IF_GRAN_DEBUG(pri,
900       debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
901               pri, RtsFlags.GranFlags.SparkPriority, node, name));
902     return ((rtsSpark*)NULL);
903   }
904
905   newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
906   newspark->prev = newspark->next = (rtsSpark*)NULL;
907   newspark->node = node;
908   newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
909   newspark->gran_info = pri;
910   newspark->global = !local;      /* Check that with parAt, parAtAbs !!*/
911
912   if (RtsFlags.GranFlags.GranSimStats.Global) {
913     globalGranStats.tot_sparks_created++;
914     globalGranStats.sparks_created_on_PE[CurrentProc]++;
915   }
916
917   return(newspark);
918 }
919
920 void
921 disposeSpark(spark)
922 rtsSpark *spark;
923 {
924   ASSERT(spark!=NULL);
925   stgFree(spark);
926 }
927
928 void 
929 disposeSparkQ(spark)
930 rtsSparkQ spark;
931 {
932   if (spark==NULL) 
933     return;
934
935   disposeSparkQ(spark->next);
936
937 # ifdef GRAN_CHECK
938   if (SparksAvail < 0) {
939     debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
940     print_spark(spark);
941   }
942 # endif
943
944   stgFree(spark);
945 }
946
947 /*
948    With PrioritySparking add_to_spark_queue performs an insert sort to keep
949    the spark queue sorted. Otherwise the spark is just added to the end of
950    the queue. 
951 */
952
953 void
954 add_to_spark_queue(spark)
955 rtsSpark *spark;
956 {
957   rtsSpark *prev = NULL, *next = NULL;
958   nat count = 0;
959   rtsBool found = rtsFalse;
960
961   if ( spark == (rtsSpark *)NULL ) {
962     return;
963   }
964
965   if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
966     /* Priority sparking is enabled i.e. spark queues must be sorted */
967
968     for (prev = NULL, next = pending_sparks_hd, count=0;
969          (next != NULL) && 
970          !(found = (spark->gran_info >= next->gran_info));
971          prev = next, next = next->next, count++) 
972      {}
973
974   } else {   /* 'utQo' */
975     /* Priority sparking is disabled */
976     
977     found = rtsFalse;   /* to add it at the end */
978
979   }
980
981   if (found) {
982     /* next points to the first spark with a gran_info smaller than that
983        of spark; therefore, add spark before next into the spark queue */
984     spark->next = next;
985     if ( next == NULL ) {
986       pending_sparks_tl = spark;
987     } else {
988       next->prev = spark;
989     }
990     spark->prev = prev;
991     if ( prev == NULL ) {
992       pending_sparks_hd = spark;
993     } else {
994       prev->next = spark;
995     }
996   } else {  /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
997     /* add the spark at the end of the spark queue */
998     spark->next = NULL;                        
999     spark->prev = pending_sparks_tl;
1000     if (pending_sparks_hd == NULL)
1001       pending_sparks_hd = spark;
1002     else
1003       pending_sparks_tl->next = spark;
1004     pending_sparks_tl = spark;    
1005   } 
1006   ++SparksAvail;
1007
1008   /* add costs for search in priority sparking */
1009   if (RtsFlags.GranFlags.DoPrioritySparking) {
1010     CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
1011   }
1012
1013   IF_GRAN_DEBUG(checkSparkQ,
1014                 debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
1015                       spark, spark->node, CurrentProc);
1016                 print_sparkq_stats());
1017
1018 #  if defined(GRAN_CHECK)
1019   if (RtsFlags.GranFlags.Debug.checkSparkQ) {
1020     for (prev = NULL, next =  pending_sparks_hd;
1021          (next != NULL);
1022          prev = next, next = next->next) 
1023       {}
1024     if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
1025       debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
1026               spark,CurrentProc, 
1027               pending_sparks_tl, prev);
1028   }
1029 #  endif
1030
1031 #  if defined(GRAN_CHECK)
1032   /* Check if the sparkq is still sorted. Just for testing, really!  */
1033   if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
1034        RtsFlags.GranFlags.Debug.pri ) {
1035     rtsBool sorted = rtsTrue;
1036     rtsSpark *prev, *next;
1037
1038     if (pending_sparks_hd == NULL ||
1039         pending_sparks_hd->next == NULL ) {
1040       /* just 1 elem => ok */
1041     } else {
1042       for (prev = pending_sparks_hd,
1043            next = pending_sparks_hd->next;
1044            (next != NULL) ;
1045            prev = next, next = next->next) {
1046         sorted = sorted && 
1047                  (prev->gran_info >= next->gran_info);
1048       }
1049     }
1050     if (!sorted) {
1051       debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
1052               CurrentProc);
1053       print_sparkq(CurrentProc);
1054     }
1055   }
1056 #  endif
1057 }
1058
1059 nat
1060 spark_queue_len(proc) 
1061 PEs proc;
1062 {
1063  rtsSpark *prev, *spark;                     /* prev only for testing !! */
1064  nat len;
1065
1066  for (len = 0, prev = NULL, spark = pending_sparks_hds[proc]; 
1067       spark != NULL; 
1068       len++, prev = spark, spark = spark->next)
1069    {}
1070
1071 #  if defined(GRAN_CHECK)
1072   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) 
1073     if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
1074       debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
1075               proc, pending_sparks_tls[proc], prev);
1076 #  endif
1077
1078  return (len);
1079 }
1080
1081 /* 
1082    Take spark out of the spark queue on PE p and nuke the spark. Adjusts
1083    hd and tl pointers of the spark queue. Returns a pointer to the next
1084    spark in the queue.
1085 */
1086 rtsSpark *
1087 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
1088 rtsSpark *spark;
1089 PEs p;
1090 rtsBool dispose_too;
1091 {
1092   rtsSpark *new_spark;
1093
1094   if (spark==NULL) 
1095     barf("delete_from_sparkq: trying to delete NULL spark\n");
1096
1097 #  if defined(GRAN_CHECK)
1098   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
1099     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
1100             pending_sparks_hd, pending_sparks_tl,
1101             spark->prev, spark, spark->next, 
1102             (spark->next==NULL ? 0 : spark->next->prev));
1103   }
1104 #  endif
1105
1106   if (spark->prev==NULL) {
1107     /* spark is first spark of queue => adjust hd pointer */
1108     ASSERT(pending_sparks_hds[p]==spark);
1109     pending_sparks_hds[p] = spark->next;
1110   } else {
1111     spark->prev->next = spark->next;
1112   }
1113   if (spark->next==NULL) {
1114     ASSERT(pending_sparks_tls[p]==spark);
1115     /* spark is first spark of queue => adjust tl pointer */
1116     pending_sparks_tls[p] = spark->prev;
1117   } else {
1118     spark->next->prev = spark->prev;
1119   }
1120   new_spark = spark->next;
1121   
1122 #  if defined(GRAN_CHECK)
1123   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
1124     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
1125             pending_sparks_hd, pending_sparks_tl,
1126             spark->prev, spark, spark->next, 
1127             (spark->next==NULL ? 0 : spark->next->prev), spark);
1128   }
1129 #  endif
1130
1131   if (dispose_too)
1132     disposeSpark(spark);
1133                   
1134   return new_spark;
1135 }
1136
1137 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
1138 void
1139 markSparkQueue(void)
1140
1141   StgClosure *MarkRoot(StgClosure *root); // prototype
1142   PEs p;
1143   rtsSpark *sp;
1144
1145   for (p=0; p<RtsFlags.GranFlags.proc; p++)
1146     for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
1147       ASSERT(sp->node!=NULL);
1148       ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
1149       // ToDo?: statistics gathering here (also for GUM!)
1150       sp->node = (StgClosure *)MarkRoot(sp->node);
1151     }
1152
1153   IF_DEBUG(gc,
1154            debugBelch("markSparkQueue: spark statistics at start of GC:");
1155            print_sparkq_stats());
1156 }
1157
1158 void
1159 print_spark(spark)
1160 rtsSpark *spark;
1161
1162   char str[16];
1163
1164   if (spark==NULL) {
1165     debugBelch("Spark: NIL\n");
1166     return;
1167   } else {
1168     sprintf(str,
1169             ((spark->node==NULL) ? "______" : "%#6lx"), 
1170             stgCast(StgPtr,spark->node));
1171
1172     debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
1173             str, spark->name, 
1174             ((spark->global)==rtsTrue?"True":"False"), spark->creator, 
1175             spark->prev, spark->next);
1176   }
1177 }
1178
1179 void
1180 print_sparkq(proc)
1181 PEs proc;
1182 // rtsSpark *hd;
1183 {
1184   rtsSpark *x = pending_sparks_hds[proc];
1185
1186   debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
1187   for (; x!=(rtsSpark*)NULL; x=x->next) {
1188     print_spark(x);
1189   }
1190 }
1191
1192 /* 
1193    Print a statistics of all spark queues.
1194 */
1195 void
1196 print_sparkq_stats(void)
1197 {
1198   PEs p;
1199
1200   debugBelch("SparkQs: [");
1201   for (p=0; p<RtsFlags.GranFlags.proc; p++)
1202     debugBelch(", PE %d: %d", p, spark_queue_len(p));
1203   debugBelch("\n");
1204 }
1205
1206 #endif