retreat the top/bottom fields of the spark pool in pruneSparkPool()
[ghc-hetmet.git] / rts / Sparks.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 2000-2008
4  *
5  * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
6  *
7  * The implementation uses Double-Ended Queues with lock-free access
8  * (thereby often called "deque") as described in
9  *
10  * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
11  * SPAA'05, July 2005, Las Vegas, USA.
12  * ACM 1-58113-986-1/05/0007
13  *
14  * Author: Jost Berthold MSRC 07-09/2008
15  *
16  * The DeQue is held as a circular array with known length. Positions
17  * of top (read-end) and bottom (write-end) always increase, and the
18  * array is accessed with indices modulo array-size. While this bears
19  * the risk of overflow, we assume that (with 64 bit indices), a
20  * program must run very long to reach that point.
21  * 
22  * The write end of the queue (position bottom) can only be used with
23  * mutual exclusion, i.e. by exactly one caller at a time.  At this
24  * end, new items can be enqueued using pushBottom()/newSpark(), and
25  * removed using popBottom()/reclaimSpark() (the latter implying a cas
26  * synchronisation with potential concurrent readers for the case of
27  * just one element).
28  * 
29  * Multiple readers can steal()/findSpark() from the read end
30  * (position top), and are synchronised without a lock, based on a cas
31  * of the top position. One reader wins, the others return NULL for a
32  * failure.
33  * 
34  * Both popBottom and steal also return NULL when the queue is empty.
35  * 
36  -------------------------------------------------------------------------*/
37
38 #include "PosixSource.h"
39 #include "Rts.h"
40 #include "Storage.h"
41 #include "Schedule.h"
42 #include "SchedAPI.h"
43 #include "RtsFlags.h"
44 #include "RtsUtils.h"
45 #include "ParTicky.h"
46 #include "Trace.h"
47
48 #include "SMP.h" // for cas
49
50 #include "Sparks.h"
51
52 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
53
54 /* internal helpers ... */
55
56 static StgWord
57 roundUp2(StgWord val)
58 {
59   StgWord rounded = 1;
60
61   /* StgWord is unsigned anyway, only catch 0 */
62   if (val == 0) {
63     barf("DeQue,roundUp2: invalid size 0 requested");
64   }
65   /* at least 1 bit set, shift up to its place */
66   do {
67     rounded = rounded << 1;
68   } while (0 != (val = val>>1));
69   return rounded;
70 }
71
72 #define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
73
74 /* -----------------------------------------------------------------------------
75  * 
76  * Initialising spark pools.
77  *
78  * -------------------------------------------------------------------------- */
79
80 /* constructor */
81 static SparkPool*
82 initPool(StgWord size)
83 {
84   StgWord realsize; 
85   SparkPool *q;
86
87   realsize = roundUp2(size); /* to compute modulo as a bitwise & */
88
89   q = (SparkPool*) stgMallocBytes(sizeof(SparkPool),   /* admin fields */
90                               "newSparkPool");
91   q->elements = (StgClosurePtr*) 
92                 stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
93                                "newSparkPool:data space");
94   q->top=0;
95   q->bottom=0;
96   q->topBound=0; /* read by writer, updated each time top is read */
97
98   q->size = realsize;  /* power of 2 */
99   q->moduloSize = realsize - 1; /* n % size == n & moduloSize  */
100
101   ASSERT_SPARK_POOL_INVARIANTS(q); 
102   return q;
103 }
104
105 void
106 initSparkPools( void )
107 {
108 #ifdef THREADED_RTS
109     /* walk over the capabilities, allocating a spark pool for each one */
110     nat i;
111     for (i = 0; i < n_capabilities; i++) {
112       capabilities[i].sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
113     }
114 #else
115     /* allocate a single spark pool */
116     MainCapability->sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
117 #endif
118 }
119
120 void
121 freeSparkPool (SparkPool *pool)
122 {
123   /* should not interfere with concurrent findSpark() calls! And
124      nobody should use the pointer any more. We cross our fingers...*/
125   stgFree(pool->elements);
126   stgFree(pool);
127 }
128
129 /* -----------------------------------------------------------------------------
130  * 
131  * reclaimSpark: remove a spark from the write end of the queue.
132  * Returns the removed spark, and NULL if a race is lost or the pool
133  * empty.
134  *
135  * If only one spark is left in the pool, we synchronise with
136  * concurrently stealing threads by using cas to modify the top field.
137  * This routine should NEVER be called by a task which does not own
138  * the capability. Can this be checked here?
139  *
140  * -------------------------------------------------------------------------- */
141
142 StgClosure *
143 reclaimSpark (SparkPool *deque)
144 {
145   /* also a bit tricky, has to avoid concurrent steal() calls by
146      accessing top with cas, when there is only one element left */
147   StgWord t, b;
148   StgClosurePtr* pos;
149   long  currSize;
150   StgClosurePtr removed;
151
152   ASSERT_SPARK_POOL_INVARIANTS(deque); 
153
154   b = deque->bottom;
155   /* "decrement b as a test, see what happens" */
156   deque->bottom = --b; 
157   pos = (deque->elements) + (b & (deque->moduloSize));
158   t = deque->top; /* using topBound would give an *upper* bound, we
159                      need a lower bound. We use the real top here, but
160                      can update the topBound value */
161   deque->topBound = t;
162   currSize = b - t;
163   if (currSize < 0) { /* was empty before decrementing b, set b
164                          consistently and abort */
165     deque->bottom = t;
166     return NULL;
167   }
168   removed = *pos;
169   if (currSize > 0) { /* no danger, still elements in buffer after b-- */
170     return removed;
171   } 
172   /* otherwise, has someone meanwhile stolen the same (last) element?
173      Check and increment top value to know  */
174   if ( !(CASTOP(&(deque->top),t,t+1)) ) {
175     removed = NULL; /* no success, but continue adjusting bottom */
176   }
177   deque->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */
178   deque->topBound = t+1; /* ...and cached top value as well */
179
180   ASSERT_SPARK_POOL_INVARIANTS(deque); 
181
182   return removed;
183 }
184
185 /* -----------------------------------------------------------------------------
186  * 
187  * tryStealSpark: try to steal a spark from a Capability.
188  *
189  * Returns a valid spark, or NULL if the pool was empty, and can
190  * occasionally return NULL if there was a race with another thread
191  * stealing from the same pool.  In this case, try again later.
192  *
193  -------------------------------------------------------------------------- */
194
195 static StgClosurePtr
196 steal(SparkPool *deque)
197 {
198   StgClosurePtr* pos;
199   StgClosurePtr* arraybase;
200   StgWord sz;
201   StgClosurePtr stolen;
202   StgWord b,t; 
203
204   ASSERT_SPARK_POOL_INVARIANTS(deque); 
205
206   b = deque->bottom;
207   t = deque->top;
208   if (b - t <= 0 ) { 
209     return NULL; /* already looks empty, abort */
210   }
211
212   /* now access array, see pushBottom() */
213   arraybase = deque->elements;
214   sz = deque->moduloSize;
215   pos = arraybase + (t & sz);  
216   stolen = *pos;
217
218   /* now decide whether we have won */
219   if ( !(CASTOP(&(deque->top),t,t+1)) ) {
220       /* lost the race, someon else has changed top in the meantime */
221       return NULL;
222   }  /* else: OK, top has been incremented by the cas call */
223
224   ASSERT_SPARK_POOL_INVARIANTS(deque); 
225   /* return stolen element */
226   return stolen;
227 }
228
229 StgClosure *
230 tryStealSpark (SparkPool *pool)
231 {
232   StgClosure *stolen;
233
234   do { 
235       stolen = steal(pool);
236   } while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
237
238   return stolen;
239 }
240
241
242 /* -----------------------------------------------------------------------------
243  * 
244  * "guesses" whether a deque is empty. Can return false negatives in
245  *  presence of concurrent steal() calls, and false positives in
246  *  presence of a concurrent pushBottom().
247  *
248  * -------------------------------------------------------------------------- */
249
250 rtsBool
251 looksEmpty(SparkPool* deque)
252 {
253   StgWord t = deque->top;
254   StgWord b = deque->bottom;
255   /* try to prefer false negatives by reading top first */
256   return (b - t <= 0);
257   /* => array is *never* completely filled, always 1 place free! */
258 }
259
260 /* -----------------------------------------------------------------------------
261  * 
262  * Turn a spark into a real thread
263  *
264  * -------------------------------------------------------------------------- */
265
266 void
267 createSparkThread (Capability *cap, StgClosure *p)
268 {
269     StgTSO *tso;
270
271     tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
272     appendToRunQueue(cap,tso);
273     cap->sparks_converted++;
274 }
275
276 /* -----------------------------------------------------------------------------
277  * 
278  * Create a new spark
279  *
280  * -------------------------------------------------------------------------- */
281
282 #define DISCARD_NEW
283
284 /* enqueue an element. Should always succeed by resizing the array
285    (not implemented yet, silently fails in that case). */
286 static void
287 pushBottom (SparkPool* deque, StgClosurePtr elem)
288 {
289   StgWord t;
290   StgClosurePtr* pos;
291   StgWord sz = deque->moduloSize; 
292   StgWord b = deque->bottom;
293
294   ASSERT_SPARK_POOL_INVARIANTS(deque); 
295
296   /* we try to avoid reading deque->top (accessed by all) and use
297      deque->topBound (accessed only by writer) instead. 
298      This is why we do not just call empty(deque) here.
299   */
300   t = deque->topBound;
301   if ( b - t >= sz ) { /* nota bene: sz == deque->size - 1, thus ">=" */
302     /* could be full, check the real top value in this case */
303     t = deque->top;
304     deque->topBound = t;
305     if (b - t >= sz) { /* really no space left :-( */
306       /* reallocate the array, copying the values. Concurrent steal()s
307          will in the meantime use the old one and modify only top.
308          This means: we cannot safely free the old space! Can keep it
309          on a free list internally here...
310
311          Potential bug in combination with steal(): if array is
312          replaced, it is unclear which one concurrent steal operations
313          use. Must read the array base address in advance in steal().
314       */
315 #if defined(DISCARD_NEW)
316       ASSERT_SPARK_POOL_INVARIANTS(deque); 
317       return; /* for now, silently fail */
318 #else
319       /* could make room by incrementing the top position here.  In
320        * this case, should use CASTOP. If this fails, someone else has
321        * removed something, and new room will be available.
322        */
323       ASSERT_SPARK_POOL_INVARIANTS(deque); 
324 #endif
325     }
326   }
327   pos = (deque->elements) + (b & sz);
328   *pos = elem;
329   (deque->bottom)++;
330
331   ASSERT_SPARK_POOL_INVARIANTS(deque); 
332   return;
333 }
334
335
336 /* --------------------------------------------------------------------------
337  * newSpark: create a new spark, as a result of calling "par"
338  * Called directly from STG.
339  * -------------------------------------------------------------------------- */
340
341 StgInt
342 newSpark (StgRegTable *reg, StgClosure *p)
343 {
344     Capability *cap = regTableToCapability(reg);
345     SparkPool *pool = cap->sparks;
346
347     /* I am not sure whether this is the right thing to do.
348      * Maybe it is better to exploit the tag information
349      * instead of throwing it away?
350      */
351     p = UNTAG_CLOSURE(p);
352
353     ASSERT_SPARK_POOL_INVARIANTS(pool);
354
355     if (closure_SHOULD_SPARK(p)) {
356       pushBottom(pool,p);
357     }   
358
359     cap->sparks_created++;
360
361     ASSERT_SPARK_POOL_INVARIANTS(pool);
362     return 1;
363 }
364
365
366
367 /* --------------------------------------------------------------------------
368  * Remove all sparks from the spark queues which should not spark any
369  * more.  Called after GC. We assume exclusive access to the structure
370  * and replace  all sparks in the queue, see explanation below. At exit,
371  * the spark pool only contains sparkable closures.
372  * -------------------------------------------------------------------------- */
373
374 void
375 pruneSparkQueue (evac_fn evac, void *user, Capability *cap)
376
377     SparkPool *pool;
378     StgClosurePtr spark, tmp, *elements;
379     nat n, pruned_sparks; // stats only
380     StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
381     const StgInfoTable *info;
382     
383     PAR_TICKY_MARK_SPARK_QUEUE_START();
384     
385     n = 0;
386     pruned_sparks = 0;
387     
388     pool = cap->sparks;
389     
390     // Take this opportunity to reset top/bottom modulo the size of
391     // the array, to avoid overflow.  This is only possible because no
392     // stealing is happening during GC.
393     pool->bottom  -= pool->top & ~pool->moduloSize;
394     pool->top     &= pool->moduloSize;
395     pool->topBound = pool->top;
396
397     debugTrace(DEBUG_sched,
398                "markSparkQueue: current spark queue len=%d; (hd=%ld; tl=%ld)",
399                sparkPoolSize(pool), pool->bottom, pool->top);
400     ASSERT_SPARK_POOL_INVARIANTS(pool);
401
402     elements = pool->elements;
403
404     /* We have exclusive access to the structure here, so we can reset
405        bottom and top counters, and prune invalid sparks. Contents are
406        copied in-place if they are valuable, otherwise discarded. The
407        routine uses "real" indices t and b, starts by computing them
408        as the modulus size of top and bottom,
409
410        Copying:
411
412        At the beginning, the pool structure can look like this:
413        ( bottom % size >= top % size , no wrap-around)
414                   t          b
415        ___________***********_________________
416
417        or like this ( bottom % size < top % size, wrap-around )
418                   b         t
419        ***********__________******************
420        As we need to remove useless sparks anyway, we make one pass
421        between t and b, moving valuable content to b and subsequent
422        cells (wrapping around when the size is reached).
423
424                      b      t
425        ***********OOO_______XX_X__X?**********
426                      ^____move?____/
427
428        After this movement, botInd becomes the new bottom, and old
429        bottom becomes the new top index, both as indices in the array
430        size range.
431     */
432     // starting here
433     currInd = (pool->top) & (pool->moduloSize); // mod
434
435     // copies of evacuated closures go to space from botInd on
436     // we keep oldBotInd to know when to stop
437     oldBotInd = botInd = (pool->bottom) & (pool->moduloSize); // mod
438
439     // on entry to loop, we are within the bounds
440     ASSERT( currInd < pool->size && botInd  < pool->size );
441
442     while (currInd != oldBotInd ) {
443       /* must use != here, wrap-around at size
444          subtle: loop not entered if queue empty
445        */
446
447       /* check element at currInd. if valuable, evacuate and move to
448          botInd, otherwise move on */
449       spark = elements[currInd];
450
451       // We have to be careful here: in the parallel GC, another
452       // thread might evacuate this closure while we're looking at it,
453       // so grab the info pointer just once.
454       info = spark->header.info;
455       if (IS_FORWARDING_PTR(info)) {
456           tmp = (StgClosure*)UN_FORWARDING_PTR(info);
457           /* if valuable work: shift inside the pool */
458           if (closure_SHOULD_SPARK(tmp)) {
459               elements[botInd] = tmp; // keep entry (new address)
460               botInd++;
461               n++;
462           } else {
463               pruned_sparks++; // discard spark
464               cap->sparks_pruned++;
465           }
466       } else {
467           if (!(closure_flags[INFO_PTR_TO_STRUCT(info)->type] & _NS)) {
468               elements[botInd] = spark; // keep entry (new address)
469               evac (user, &elements[botInd]);
470               botInd++;
471               n++;
472           } else {
473               pruned_sparks++; // discard spark
474               cap->sparks_pruned++;
475           }
476       }
477       currInd++;
478
479       // in the loop, we may reach the bounds, and instantly wrap around
480       ASSERT( currInd <= pool->size && botInd <= pool->size );
481       if ( currInd == pool->size ) { currInd = 0; }
482       if ( botInd == pool->size )  { botInd = 0;  }
483
484     } // while-loop over spark pool elements
485
486     ASSERT(currInd == oldBotInd);
487
488     pool->top = oldBotInd; // where we started writing
489     pool->topBound = pool->top;
490
491     pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size); 
492     // first free place we did not use (corrected by wraparound)
493
494     PAR_TICKY_MARK_SPARK_QUEUE_END(n);
495
496     debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
497     
498     debugTrace(DEBUG_sched,
499                "new spark queue len=%d; (hd=%ld; tl=%ld)",
500                sparkPoolSize(pool), pool->bottom, pool->top);
501
502     ASSERT_SPARK_POOL_INVARIANTS(pool);
503 }
504
505 /* GC for the spark pool, called inside Capability.c for all
506    capabilities in turn. Blindly "evac"s complete spark pool. */
507 void
508 traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
509 {
510     StgClosure **sparkp;
511     SparkPool *pool;
512     StgWord top,bottom, modMask;
513     
514     pool = cap->sparks;
515
516     ASSERT_SPARK_POOL_INVARIANTS(pool);
517
518     top = pool->top;
519     bottom = pool->bottom;
520     sparkp = pool->elements;
521     modMask = pool->moduloSize;
522
523     while (top < bottom) {
524     /* call evac for all closures in range (wrap-around via modulo)
525      * In GHC-6.10, evac takes an additional 1st argument to hold a
526      * GC-specific register, see rts/sm/GC.c::mark_root()
527      */
528       evac( user , sparkp + (top & modMask) ); 
529       top++;
530     }
531
532     debugTrace(DEBUG_sched,
533                "traversed spark queue, len=%d; (hd=%ld; tl=%ld)",
534                sparkPoolSize(pool), pool->bottom, pool->top);
535 }
536
537 /* ----------------------------------------------------------------------------
538  * balanceSparkPoolsCaps: takes an array of capabilities (usually: all
539  * capabilities) and its size. Accesses all spark pools and equally
540  * distributes the sparks among them.
541  *
542  * Could be called after GC, before Cap. release, from scheduler. 
543  * -------------------------------------------------------------------------- */
544 void balanceSparkPoolsCaps(nat n_caps, Capability caps[]);
545
546 void balanceSparkPoolsCaps(nat n_caps STG_UNUSED, 
547                            Capability caps[] STG_UNUSED) {
548   barf("not implemented");
549 }
550
551 #else
552
553 StgInt
554 newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
555 {
556     /* nothing */
557     return 1;
558 }
559
560
561 #endif /* PARALLEL_HASKELL || THREADED_RTS */
562
563
564 /* -----------------------------------------------------------------------------
565  * 
566  * GRAN & PARALLEL_HASKELL stuff beyond here.
567  *
568  *  TODO "nuke" this!
569  *
570  * -------------------------------------------------------------------------- */
571
572 #if defined(PARALLEL_HASKELL) || defined(GRAN)
573
574 static void slide_spark_pool( StgSparkPool *pool );
575
576 rtsBool
577 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
578 {
579   if (pool->tl == pool->lim)
580     slide_spark_pool(pool);
581
582   if (closure_SHOULD_SPARK(closure) && 
583       pool->tl < pool->lim) {
584     *(pool->tl++) = closure;
585
586 #if defined(PARALLEL_HASKELL)
587     // collect parallel global statistics (currently done together with GC stats)
588     if (RtsFlags.ParFlags.ParStats.Global &&
589         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
590       // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime()); 
591       globalParStats.tot_sparks_created++;
592     }
593 #endif
594     return rtsTrue;
595   } else {
596 #if defined(PARALLEL_HASKELL)
597     // collect parallel global statistics (currently done together with GC stats)
598     if (RtsFlags.ParFlags.ParStats.Global &&
599         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
600       //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime()); 
601       globalParStats.tot_sparks_ignored++;
602     }
603 #endif
604     return rtsFalse;
605   }
606 }
607
608 static void
609 slide_spark_pool( StgSparkPool *pool )
610 {
611   StgClosure **sparkp, **to_sparkp;
612
613   sparkp = pool->hd;
614   to_sparkp = pool->base;
615   while (sparkp < pool->tl) {
616     ASSERT(to_sparkp<=sparkp);
617     ASSERT(*sparkp!=NULL);
618     ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
619
620     if (closure_SHOULD_SPARK(*sparkp)) {
621       *to_sparkp++ = *sparkp++;
622     } else {
623       sparkp++;
624     }
625   }
626   pool->hd = pool->base;
627   pool->tl = to_sparkp;
628 }
629
630 void
631 disposeSpark(spark)
632 StgClosure *spark;
633 {
634 #if !defined(THREADED_RTS)
635   Capability *cap;
636   StgSparkPool *pool;
637
638   cap = &MainRegTable;
639   pool = &(cap->rSparks);
640   ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
641 #endif
642   ASSERT(spark != (StgClosure *)NULL);
643   /* Do nothing */
644 }
645
646
647 #elif defined(GRAN)
648
649 /* 
650    Search the spark queue of the proc in event for a spark that's worth
651    turning into a thread 
652    (was gimme_spark in the old RTS)
653 */
654 void
655 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
656 {
657    PEs proc = event->proc,       /* proc to search for work */
658        creator = event->creator; /* proc that requested work */
659    StgClosure* node;
660    rtsBool found;
661    rtsSparkQ spark_of_non_local_node = NULL, 
662              spark_of_non_local_node_prev = NULL, 
663              low_priority_spark = NULL, 
664              low_priority_spark_prev = NULL,
665              spark = NULL, prev = NULL;
666   
667    /* Choose a spark from the local spark queue */
668    prev = (rtsSpark*)NULL;
669    spark = pending_sparks_hds[proc];
670    found = rtsFalse;
671
672    // ToDo: check this code & implement local sparking !! -- HWL  
673    while (!found && spark != (rtsSpark*)NULL)
674      {
675        ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
676               (prev==(rtsSpark*)NULL || prev->next==spark) &&
677               (spark->prev==prev));
678        node = spark->node;
679        if (!closure_SHOULD_SPARK(node)) 
680          {
681            IF_GRAN_DEBUG(checkSparkQ,
682                          debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
683                                spark, node));
684
685            if (RtsFlags.GranFlags.GranSimStats.Sparks)
686              DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
687                               spark->node, spark->name, spark_queue_len(proc));
688   
689            ASSERT(spark != (rtsSpark*)NULL);
690            ASSERT(SparksAvail>0);
691            --SparksAvail;
692
693            ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
694            spark = delete_from_sparkq (spark, proc, rtsTrue);
695            if (spark != (rtsSpark*)NULL)
696              prev = spark->prev;
697            continue;
698          }
699        /* -- node should eventually be sparked */
700        else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes && 
701                !IS_LOCAL_TO(PROCS(node),CurrentProc)) 
702          {
703            barf("Local sparking not yet implemented");
704
705            /* Remember first low priority spark */
706            if (spark_of_non_local_node==(rtsSpark*)NULL) {
707              spark_of_non_local_node_prev = prev;
708              spark_of_non_local_node = spark;
709               }
710   
711            if (spark->next == (rtsSpark*)NULL) { 
712              /* ASSERT(spark==SparkQueueTl);  just for testing */
713              prev = spark_of_non_local_node_prev;
714              spark = spark_of_non_local_node;
715              found = rtsTrue;
716              break;
717            }
718   
719 # if defined(GRAN) && defined(GRAN_CHECK)
720            /* Should never happen; just for testing 
721            if (spark==pending_sparks_tl) {
722              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
723                 stg_exit(EXIT_FAILURE);
724                 } */
725 # endif
726            prev = spark; 
727            spark = spark->next;
728            ASSERT(SparksAvail>0);
729            --SparksAvail;
730            continue;
731          }
732        else if ( RtsFlags.GranFlags.DoPrioritySparking || 
733                  (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
734          {
735            if (RtsFlags.GranFlags.DoPrioritySparking)
736              barf("Priority sparking not yet implemented");
737
738            found = rtsTrue;
739          }
740 #if 0      
741        else /* only used if SparkPriority2 is defined */
742          {
743            /* ToDo: fix the code below and re-integrate it */
744            /* Remember first low priority spark */
745            if (low_priority_spark==(rtsSpark*)NULL) { 
746              low_priority_spark_prev = prev;
747              low_priority_spark = spark;
748            }
749   
750            if (spark->next == (rtsSpark*)NULL) { 
751                 /* ASSERT(spark==spark_queue_tl);  just for testing */
752              prev = low_priority_spark_prev;
753              spark = low_priority_spark;
754              found = rtsTrue;       /* take low pri spark => rc is 2  */
755              break;
756            }
757   
758            /* Should never happen; just for testing 
759            if (spark==pending_sparks_tl) {
760              debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
761                 stg_exit(EXIT_FAILURE);
762              break;
763            } */                
764            prev = spark; 
765            spark = spark->next;
766
767            IF_GRAN_DEBUG(pri,
768                          debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", 
769                                spark->gran_info, RtsFlags.GranFlags.SparkPriority, 
770                                spark->node, spark->name);)
771            }
772 #endif
773    }  /* while (spark!=NULL && !found) */
774
775    *spark_res = spark;
776    *found_res = found;
777 }
778
779 /*
780   Turn the spark into a thread.
781   In GranSim this basically means scheduling a StartThread event for the
782   node pointed to by the spark at some point in the future.
783   (was munch_spark in the old RTS)
784 */
785 rtsBool
786 activateSpark (rtsEvent *event, rtsSparkQ spark) 
787 {
788   PEs proc = event->proc,       /* proc to search for work */
789       creator = event->creator; /* proc that requested work */
790   StgTSO* tso;
791   StgClosure* node;
792   rtsTime spark_arrival_time;
793
794   /* 
795      We've found a node on PE proc requested by PE creator.
796      If proc==creator we can turn the spark into a thread immediately;
797      otherwise we schedule a MoveSpark event on the requesting PE
798   */
799      
800   /* DaH Qu' yIchen */
801   if (proc!=creator) { 
802
803     /* only possible if we simulate GUM style fishing */
804     ASSERT(RtsFlags.GranFlags.Fishing);
805
806     /* Message packing costs for sending a Fish; qeq jabbI'ID */
807     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
808   
809     if (RtsFlags.GranFlags.GranSimStats.Sparks)
810       DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
811                        (StgTSO*)NULL, spark->node,
812                        spark->name, spark_queue_len(proc));
813
814     /* time of the spark arrival on the remote PE */
815     spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
816
817     new_event(creator, proc, spark_arrival_time,
818               MoveSpark,
819               (StgTSO*)NULL, spark->node, spark);
820
821     CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
822             
823   } else { /* proc==creator i.e. turn the spark into a thread */
824
825     if ( RtsFlags.GranFlags.GranSimStats.Global && 
826          spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
827
828       globalGranStats.tot_low_pri_sparks++;
829       IF_GRAN_DEBUG(pri,
830                     debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
831                           spark->gran_info, 
832                           spark->node, spark->name));
833     } 
834     
835     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
836     
837     node = spark->node;
838     
839 # if 0
840     /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
841     if (GARBAGE COLLECTION IS NECESSARY) {
842       /* Some kind of backoff needed here in case there's too little heap */
843 #  if defined(GRAN_CHECK) && defined(GRAN)
844       if (RtsFlags.GcFlags.giveStats)
845         fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p;  name=%u\n", 
846                 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
847                 spark, node, spark->name);
848 #  endif
849       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
850                   FindWork,
851                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
852       barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
853       GarbageCollect(GetRoots, rtsFalse);
854       // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
855       // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
856       spark = NULL;
857       return; /* was: continue; */ /* to the next event, eventually */
858     }
859 # endif
860     
861     if (RtsFlags.GranFlags.GranSimStats.Sparks)
862       DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
863                        spark->node, spark->name,
864                        spark_queue_len(CurrentProc));
865     
866     new_event(proc, proc, CurrentTime[proc],
867               StartThread, 
868               END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
869     
870     procStatus[proc] = Starting;
871   }
872 }
873
874 /* -------------------------------------------------------------------------
875    This is the main point where handling granularity information comes into
876    play. 
877    ------------------------------------------------------------------------- */
878
879 #define MAX_RAND_PRI    100
880
881 /* 
882    Granularity info transformers. 
883    Applied to the GRAN_INFO field of a spark.
884 */
885 STATIC_INLINE nat  ID(nat x) { return(x); };
886 STATIC_INLINE nat  INV(nat x) { return(-x); };
887 STATIC_INLINE nat  IGNORE(nat x) { return (0); };
888 STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
889
890 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
891 rtsSpark *
892 newSpark(node,name,gran_info,size_info,par_info,local)
893 StgClosure *node;
894 nat name, gran_info, size_info, par_info, local;
895 {
896   nat pri;
897   rtsSpark *newspark;
898
899   pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
900         RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
901         RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
902                            ID(gran_info);
903
904   if ( RtsFlags.GranFlags.SparkPriority!=0 && 
905        pri<RtsFlags.GranFlags.SparkPriority ) {
906     IF_GRAN_DEBUG(pri,
907       debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n", 
908               pri, RtsFlags.GranFlags.SparkPriority, node, name));
909     return ((rtsSpark*)NULL);
910   }
911
912   newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
913   newspark->prev = newspark->next = (rtsSpark*)NULL;
914   newspark->node = node;
915   newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
916   newspark->gran_info = pri;
917   newspark->global = !local;      /* Check that with parAt, parAtAbs !!*/
918
919   if (RtsFlags.GranFlags.GranSimStats.Global) {
920     globalGranStats.tot_sparks_created++;
921     globalGranStats.sparks_created_on_PE[CurrentProc]++;
922   }
923
924   return(newspark);
925 }
926
927 void
928 disposeSpark(spark)
929 rtsSpark *spark;
930 {
931   ASSERT(spark!=NULL);
932   stgFree(spark);
933 }
934
935 void 
936 disposeSparkQ(spark)
937 rtsSparkQ spark;
938 {
939   if (spark==NULL) 
940     return;
941
942   disposeSparkQ(spark->next);
943
944 # ifdef GRAN_CHECK
945   if (SparksAvail < 0) {
946     debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
947     print_spark(spark);
948   }
949 # endif
950
951   stgFree(spark);
952 }
953
954 /*
955    With PrioritySparking add_to_spark_queue performs an insert sort to keep
956    the spark queue sorted. Otherwise the spark is just added to the end of
957    the queue. 
958 */
959
960 void
961 add_to_spark_queue(spark)
962 rtsSpark *spark;
963 {
964   rtsSpark *prev = NULL, *next = NULL;
965   nat count = 0;
966   rtsBool found = rtsFalse;
967
968   if ( spark == (rtsSpark *)NULL ) {
969     return;
970   }
971
972   if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
973     /* Priority sparking is enabled i.e. spark queues must be sorted */
974
975     for (prev = NULL, next = pending_sparks_hd, count=0;
976          (next != NULL) && 
977          !(found = (spark->gran_info >= next->gran_info));
978          prev = next, next = next->next, count++) 
979      {}
980
981   } else {   /* 'utQo' */
982     /* Priority sparking is disabled */
983     
984     found = rtsFalse;   /* to add it at the end */
985
986   }
987
988   if (found) {
989     /* next points to the first spark with a gran_info smaller than that
990        of spark; therefore, add spark before next into the spark queue */
991     spark->next = next;
992     if ( next == NULL ) {
993       pending_sparks_tl = spark;
994     } else {
995       next->prev = spark;
996     }
997     spark->prev = prev;
998     if ( prev == NULL ) {
999       pending_sparks_hd = spark;
1000     } else {
1001       prev->next = spark;
1002     }
1003   } else {  /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
1004     /* add the spark at the end of the spark queue */
1005     spark->next = NULL;                        
1006     spark->prev = pending_sparks_tl;
1007     if (pending_sparks_hd == NULL)
1008       pending_sparks_hd = spark;
1009     else
1010       pending_sparks_tl->next = spark;
1011     pending_sparks_tl = spark;    
1012   } 
1013   ++SparksAvail;
1014
1015   /* add costs for search in priority sparking */
1016   if (RtsFlags.GranFlags.DoPrioritySparking) {
1017     CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
1018   }
1019
1020   IF_GRAN_DEBUG(checkSparkQ,
1021                 debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
1022                       spark, spark->node, CurrentProc);
1023                 print_sparkq_stats());
1024
1025 #  if defined(GRAN_CHECK)
1026   if (RtsFlags.GranFlags.Debug.checkSparkQ) {
1027     for (prev = NULL, next =  pending_sparks_hd;
1028          (next != NULL);
1029          prev = next, next = next->next) 
1030       {}
1031     if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
1032       debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
1033               spark,CurrentProc, 
1034               pending_sparks_tl, prev);
1035   }
1036 #  endif
1037
1038 #  if defined(GRAN_CHECK)
1039   /* Check if the sparkq is still sorted. Just for testing, really!  */
1040   if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
1041        RtsFlags.GranFlags.Debug.pri ) {
1042     rtsBool sorted = rtsTrue;
1043     rtsSpark *prev, *next;
1044
1045     if (pending_sparks_hd == NULL ||
1046         pending_sparks_hd->next == NULL ) {
1047       /* just 1 elem => ok */
1048     } else {
1049       for (prev = pending_sparks_hd,
1050            next = pending_sparks_hd->next;
1051            (next != NULL) ;
1052            prev = next, next = next->next) {
1053         sorted = sorted && 
1054                  (prev->gran_info >= next->gran_info);
1055       }
1056     }
1057     if (!sorted) {
1058       debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
1059               CurrentProc);
1060       print_sparkq(CurrentProc);
1061     }
1062   }
1063 #  endif
1064 }
1065
1066 nat
1067 spark_queue_len(proc) 
1068 PEs proc;
1069 {
1070  rtsSpark *prev, *spark;                     /* prev only for testing !! */
1071  nat len;
1072
1073  for (len = 0, prev = NULL, spark = pending_sparks_hds[proc]; 
1074       spark != NULL; 
1075       len++, prev = spark, spark = spark->next)
1076    {}
1077
1078 #  if defined(GRAN_CHECK)
1079   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) 
1080     if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
1081       debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
1082               proc, pending_sparks_tls[proc], prev);
1083 #  endif
1084
1085  return (len);
1086 }
1087
1088 /* 
1089    Take spark out of the spark queue on PE p and nuke the spark. Adjusts
1090    hd and tl pointers of the spark queue. Returns a pointer to the next
1091    spark in the queue.
1092 */
1093 rtsSpark *
1094 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
1095 rtsSpark *spark;
1096 PEs p;
1097 rtsBool dispose_too;
1098 {
1099   rtsSpark *new_spark;
1100
1101   if (spark==NULL) 
1102     barf("delete_from_sparkq: trying to delete NULL spark\n");
1103
1104 #  if defined(GRAN_CHECK)
1105   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
1106     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
1107             pending_sparks_hd, pending_sparks_tl,
1108             spark->prev, spark, spark->next, 
1109             (spark->next==NULL ? 0 : spark->next->prev));
1110   }
1111 #  endif
1112
1113   if (spark->prev==NULL) {
1114     /* spark is first spark of queue => adjust hd pointer */
1115     ASSERT(pending_sparks_hds[p]==spark);
1116     pending_sparks_hds[p] = spark->next;
1117   } else {
1118     spark->prev->next = spark->next;
1119   }
1120   if (spark->next==NULL) {
1121     ASSERT(pending_sparks_tls[p]==spark);
1122     /* spark is first spark of queue => adjust tl pointer */
1123     pending_sparks_tls[p] = spark->prev;
1124   } else {
1125     spark->next->prev = spark->prev;
1126   }
1127   new_spark = spark->next;
1128   
1129 #  if defined(GRAN_CHECK)
1130   if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
1131     debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
1132             pending_sparks_hd, pending_sparks_tl,
1133             spark->prev, spark, spark->next, 
1134             (spark->next==NULL ? 0 : spark->next->prev), spark);
1135   }
1136 #  endif
1137
1138   if (dispose_too)
1139     disposeSpark(spark);
1140                   
1141   return new_spark;
1142 }
1143
1144 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
1145 void
1146 markSparkQueue(void)
1147
1148   StgClosure *MarkRoot(StgClosure *root); // prototype
1149   PEs p;
1150   rtsSpark *sp;
1151
1152   for (p=0; p<RtsFlags.GranFlags.proc; p++)
1153     for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
1154       ASSERT(sp->node!=NULL);
1155       ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
1156       // ToDo?: statistics gathering here (also for GUM!)
1157       sp->node = (StgClosure *)MarkRoot(sp->node);
1158     }
1159
1160   IF_DEBUG(gc,
1161            debugBelch("markSparkQueue: spark statistics at start of GC:");
1162            print_sparkq_stats());
1163 }
1164
1165 void
1166 print_spark(spark)
1167 rtsSpark *spark;
1168
1169   char str[16];
1170
1171   if (spark==NULL) {
1172     debugBelch("Spark: NIL\n");
1173     return;
1174   } else {
1175     sprintf(str,
1176             ((spark->node==NULL) ? "______" : "%#6lx"), 
1177             stgCast(StgPtr,spark->node));
1178
1179     debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
1180             str, spark->name, 
1181             ((spark->global)==rtsTrue?"True":"False"), spark->creator, 
1182             spark->prev, spark->next);
1183   }
1184 }
1185
1186 void
1187 print_sparkq(proc)
1188 PEs proc;
1189 // rtsSpark *hd;
1190 {
1191   rtsSpark *x = pending_sparks_hds[proc];
1192
1193   debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
1194   for (; x!=(rtsSpark*)NULL; x=x->next) {
1195     print_spark(x);
1196   }
1197 }
1198
1199 /* 
1200    Print a statistics of all spark queues.
1201 */
1202 void
1203 print_sparkq_stats(void)
1204 {
1205   PEs p;
1206
1207   debugBelch("SparkQs: [");
1208   for (p=0; p<RtsFlags.GranFlags.proc; p++)
1209     debugBelch(", PE %d: %d", p, spark_queue_len(p));
1210   debugBelch("\n");
1211 }
1212
1213 #endif