1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2000-2008
5 * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
7 * The implementation uses Double-Ended Queues with lock-free access
8 * (thereby often called "deque") as described in
10 * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
11 * SPAA'05, July 2005, Las Vegas, USA.
12 * ACM 1-58113-986-1/05/0007
14 * Author: Jost Berthold MSRC 07-09/2008
16 * The DeQue is held as a circular array with known length. Positions
17 * of top (read-end) and bottom (write-end) always increase, and the
18 * array is accessed with indices modulo array-size. While this bears
19 * the risk of overflow, we assume that (with 64 bit indices), a
20 * program must run very long to reach that point.
22 * The write end of the queue (position bottom) can only be used with
23 * mutual exclusion, i.e. by exactly one caller at a time. At this
24 * end, new items can be enqueued using pushBottom()/newSpark(), and
25 * removed using popBottom()/reclaimSpark() (the latter implying a cas
26 * synchronisation with potential concurrent readers for the case of
29 * Multiple readers can steal()/findSpark() from the read end
30 * (position top), and are synchronised without a lock, based on a cas
31 * of the top position. One reader wins, the others return NULL for a
34 * Both popBottom and steal also return NULL when the queue is empty.
36 -------------------------------------------------------------------------*/
38 #include "PosixSource.h"
49 #include "SMP.h" // for cas
53 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
/* NOTE(review): fragment of the static helper roundUp2(StgWord val) --
   the signature, local declarations, loop head and return statement are
   elided in this extract.  Visible logic: reject a requested size of 0,
   then compute the next power of two by shifting `rounded` left once per
   remaining bit of `val`.  Confirm the elided parts against the full
   rts/Sparks.c before relying on this. */
55 /* internal helpers ... */
62 /* StgWord is unsigned anyway, only catch 0 */
64 barf("DeQue,roundUp2: invalid size 0 requested");
66 /* at least 1 bit set, shift up to its place */
68 rounded = rounded << 1;
69 } while (0 != (val = val>>1));
/* CASTOP: single-winner compare-and-swap on *addr.  Evaluates to true
   iff this caller atomically replaced `old` with `new` (i.e. cas()
   returned the expected old value); false means another thread changed
   *addr first.  NB: function-like macro -- arguments are evaluated more
   than once, so pass only side-effect-free expressions. */
73 #define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
75 /* -----------------------------------------------------------------------------
77 * Initialising spark pools.
79 * -------------------------------------------------------------------------- */
/* NOTE(review): fragment of initPool(StgWord size) -- return type,
   local declarations, the second stgMallocBytes argument line, the
   top/bottom initialisation and the return are elided here.  Visible
   logic: round the requested size up to a power of two, allocate the
   SparkPool admin struct plus its element array, and initialise the
   cached-top and modulo fields.  moduloSize = realsize-1 lets callers
   compute n % size as n & moduloSize (valid only for powers of two). */
83 initPool(StgWord size)
88 realsize = roundUp2(size); /* to compute modulo as a bitwise & */
90 q = (SparkPool*) stgMallocBytes(sizeof(SparkPool), /* admin fields */
92 q->elements = (StgClosurePtr*)
93 stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
94 "newSparkPool:data space");
97 q->topBound=0; /* read by writer, updated each time top is read */
99 q->size = realsize; /* power of 2 */
100 q->moduloSize = realsize - 1; /* n % size == n & moduloSize */
102 ASSERT_SPARK_POOL_INVARIANTS(q);
/* NOTE(review): fragment of initSparkPools() -- the THREADED_RTS /
   non-threaded #ifdef around the two branches is elided.  Threaded
   build: one pool per capability; otherwise: a single pool on
   MainCapability.  Both sized by +RTS -e (maxLocalSparks). */
107 initSparkPools( void )
110 /* walk over the capabilities, allocating a spark pool for each one */
112 for (i = 0; i < n_capabilities; i++) {
113 capabilities[i].sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
116 /* allocate a single spark pool */
117 MainCapability->sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
/* NOTE(review): fragment of freeSparkPool -- the matching
   stgFree(pool) for the admin struct and the closing brace are elided;
   presumably the pool struct itself is freed right after the element
   array (verify against the full source).  There is no synchronisation
   here: callers must guarantee no concurrent findSpark()/steal(). */
122 freeSparkPool (SparkPool *pool)
124 /* should not interfere with concurrent findSpark() calls! And
125 nobody should use the pointer any more. We cross our fingers...*/
126 stgFree(pool->elements);
130 /* -----------------------------------------------------------------------------
132 * reclaimSpark: remove a spark from the write end of the queue.
133 * Returns the removed spark, and NULL if a race is lost or the pool
136 * If only one spark is left in the pool, we synchronise with
137 * concurrently stealing threads by using cas to modify the top field.
138 * This routine should NEVER be called by a task which does not own
139 * the capability. Can this be checked here?
141 * -------------------------------------------------------------------------- */
/* NOTE(review): fragment of reclaimSpark (owner-side pop from the
   write end).  Elided here: return type/locals, the speculative
   `deque->bottom = --b` write, the currSize computation, the fast-path
   read of *pos, and the final return of `removed`.  The visible shape
   matches popBottom of the Chase-Lev deque: decrement bottom first,
   then reconcile with top; the cas on top arbitrates the race for the
   single remaining element.  Must only be called by the capability
   that owns the pool (it is the unique writer of `bottom`). */
144 reclaimSpark (SparkPool *deque)
146 /* also a bit tricky, has to avoid concurrent steal() calls by
147 accessing top with cas, when there is only one element left */
151 StgClosurePtr removed;
153 ASSERT_SPARK_POOL_INVARIANTS(deque);
156 /* "decrement b as a test, see what happens" */
158 pos = (deque->elements) + (b & (deque->moduloSize));
159 t = deque->top; /* using topBound would give an *upper* bound, we
160 need a lower bound. We use the real top here, but
161 can update the topBound value */
164 if (currSize < 0) { /* was empty before decrementing b, set b
165 consistently and abort */
170 if (currSize > 0) { /* no danger, still elements in buffer after b-- */
173 /* otherwise, has someone meanwhile stolen the same (last) element?
174 Check and increment top value to know */
175 if ( !(CASTOP(&(deque->top),t,t+1)) ) {
176 removed = NULL; /* no success, but continue adjusting bottom */
178 deque->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */
179 deque->topBound = t+1; /* ...and cached top value as well */
181 ASSERT_SPARK_POOL_INVARIANTS(deque);
186 /* -----------------------------------------------------------------------------
188 * tryStealSpark: try to steal a spark from a Capability.
190 * Returns a valid spark, or NULL if the pool was empty, and can
191 * occasionally return NULL if there was a race with another thread
192 * stealing from the same pool. In this case, try again later.
194 -------------------------------------------------------------------------- */
/* NOTE(review): fragment of the static steal() routine (read-end pop,
   callable by any thread).  Elided: return type/remaining locals, the
   reads of top/bottom into t/b, the `stolen = *pos` load, the NULL
   assignment on a lost cas, and the final `return stolen`.  Protocol:
   read top, check apparent emptiness, read the element, then cas top
   from t to t+1 -- exactly one concurrent stealer wins; losers return
   NULL and the caller may retry. */
197 steal(SparkPool *deque)
200 StgClosurePtr* arraybase;
202 StgClosurePtr stolen;
205 // Can't do this on someone else's spark pool:
206 // ASSERT_SPARK_POOL_INVARIANTS(deque);
211 // NB. b and t are unsigned; we need a signed value for the test
213 if ((long)b - (long)t <= 0 ) {
214 return NULL; /* already looks empty, abort */
217 /* now access array, see pushBottom() */
218 arraybase = deque->elements;
219 sz = deque->moduloSize;
220 pos = arraybase + (t & sz);
223 /* now decide whether we have won */
224 if ( !(CASTOP(&(deque->top),t,t+1)) ) {
225 /* lost the race, someone else has changed top in the meantime */
227 } /* else: OK, top has been incremented by the cas call */
229 // Can't do this on someone else's spark pool:
230 // ASSERT_SPARK_POOL_INVARIANTS(deque);
232 /* return stolen element */
/* NOTE(review): fragment of tryStealSpark -- return type, the `do {`
   opening the loop, the `stolen` declaration and the final return are
   elided.  Visible logic: repeatedly steal() from cap's pool,
   discarding sparks whose closure no longer passes
   closure_SHOULD_SPARK (already evaluated), until either a useful
   spark or NULL (empty / transient race) is obtained. */
237 tryStealSpark (Capability *cap)
239 SparkPool *pool = cap->sparks;
243 stolen = steal(pool);
244 } while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
250 /* -----------------------------------------------------------------------------
252 * "guesses" whether a deque is empty. Can return false negatives in
253 * presence of concurrent steal() calls, and false positives in
254 * presence of a concurrent pushBottom().
256 * -------------------------------------------------------------------------- */
259 looksEmpty(SparkPool* deque)
261 StgWord t = deque->top;
262 StgWord b = deque->bottom;
263 /* try to prefer false negatives by reading top first */
264 return ((long)b - (long)t <= 0);
265 /* => array is *never* completely filled, always 1 place free! */
268 /* -----------------------------------------------------------------------------
270 * Turn a spark into a real thread
272 * -------------------------------------------------------------------------- */
/* NOTE(review): fragment of createSparkThread -- the return type and
   the `StgTSO *tso;` declaration are elided.  Visible logic: spawn a
   fresh I/O thread running base's GHC.Conc.runSparks (the spark
   evaluation loop) and put it on cap's run queue.  This is how idle
   capabilities turn queued sparks into real threads. */
275 createSparkThread (Capability *cap)
279 tso = createIOThread (cap, RtsFlags.GcFlags.initialStkSize,
280 &base_GHCziConc_runSparks_closure);
281 appendToRunQueue(cap,tso);
284 /* -----------------------------------------------------------------------------
288 * -------------------------------------------------------------------------- */
/* NOTE(review): fragment of pushBottom (owner-side enqueue).  Elided:
   return type, the topBound read into t, the refresh of t from the
   real top on the slow path, the #else branch of DISCARD_NEW, the
   store `*pos = elem` and the `deque->bottom = b + 1` publication.
   Only the pool-owning capability may call this (unique writer of
   `bottom`).  The fullness test deliberately uses the cached topBound
   first to avoid touching the contended `top` word; only on apparent
   fullness is the real top consulted. */
292 /* enqueue an element. Should always succeed by resizing the array
293 (not implemented yet, silently fails in that case). */
295 pushBottom (SparkPool* deque, StgClosurePtr elem)
299 StgWord sz = deque->moduloSize;
300 StgWord b = deque->bottom;
302 ASSERT_SPARK_POOL_INVARIANTS(deque);
304 /* we try to avoid reading deque->top (accessed by all) and use
305 deque->topBound (accessed only by writer) instead.
306 This is why we do not just call empty(deque) here.
309 if ( (StgInt)b - (StgInt)t >= (StgInt)sz ) {
310 /* NB. 1. sz == deque->size - 1, thus ">="
311 2. signed comparison, it is possible that t > b
313 /* could be full, check the real top value in this case */
316 if (b - t >= sz) { /* really no space left :-( */
317 /* reallocate the array, copying the values. Concurrent steal()s
318 will in the meantime use the old one and modify only top.
319 This means: we cannot safely free the old space! Can keep it
320 on a free list internally here...
322 Potential bug in combination with steal(): if array is
323 replaced, it is unclear which one concurrent steal operations
324 use. Must read the array base address in advance in steal().
326 #if defined(DISCARD_NEW)
327 ASSERT_SPARK_POOL_INVARIANTS(deque);
328 return; /* for now, silently fail */
330 /* could make room by incrementing the top position here. In
331 * this case, should use CASTOP. If this fails, someone else has
332 * removed something, and new room will be available.
334 ASSERT_SPARK_POOL_INVARIANTS(deque);
338 pos = (deque->elements) + (b & sz);
342 ASSERT_SPARK_POOL_INVARIANTS(deque);
347 /* --------------------------------------------------------------------------
348 * newSpark: create a new spark, as a result of calling "par"
349 * Called directly from STG.
350 * -------------------------------------------------------------------------- */
/* NOTE(review): fragment of newSpark (the `par` primitive's entry
   point, called from STG-generated code).  Elided: return type
   (presumably StgInt), the pushBottom(pool, p) call inside the
   SHOULD_SPARK branch, the else-branch counting sparks_dud, and the
   final return.  Visible logic: untag the closure, and only enqueue it
   if it is still unevaluated work (closure_SHOULD_SPARK). */
353 newSpark (StgRegTable *reg, StgClosure *p)
355 Capability *cap = regTableToCapability(reg);
356 SparkPool *pool = cap->sparks;
358 /* I am not sure whether this is the right thing to do.
359 * Maybe it is better to exploit the tag information
360 * instead of throwing it away?
362 p = UNTAG_CLOSURE(p);
364 ASSERT_SPARK_POOL_INVARIANTS(pool);
366 if (closure_SHOULD_SPARK(p)) {
370 cap->sparks_created++;
372 ASSERT_SPARK_POOL_INVARIANTS(pool);
378 /* --------------------------------------------------------------------------
379 * Remove all sparks from the spark queues which should not spark any
380 * more. Called after GC. We assume exclusive access to the structure
381 * and replace all sparks in the queue, see explanation below. At exit,
382 * the spark pool only contains sparkable closures.
383 * -------------------------------------------------------------------------- */
/* NOTE(review): fragment of pruneSparkQueue, run during GC with
   exclusive access to the pool (no concurrent push/steal).  Elided:
   the `SparkPool *pool = cap->sparks;` setup, the else-branches that
   advance currInd/botInd and count n, and the closing braces.  The
   routine (a) normalises top/bottom, (b) makes one pass from top to
   bottom compacting still-sparkable closures towards botInd while
   evacuating them as GC roots, and (c) rewrites top/bottom so the pool
   afterwards contains exactly the surviving sparks. */
386 pruneSparkQueue (evac_fn evac, void *user, Capability *cap)
389 StgClosurePtr spark, tmp, *elements;
390 nat n, pruned_sparks; // stats only
391 StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
392 const StgInfoTable *info;
394 PAR_TICKY_MARK_SPARK_QUEUE_START();
401 // it is possible that top > bottom, indicating an empty pool. We
402 // fix that here; this is only necessary because the loop below
404 if (pool->top > pool->bottom)
405 pool->top = pool->bottom;
407 // Take this opportunity to reset top/bottom modulo the size of
408 // the array, to avoid overflow. This is only possible because no
409 // stealing is happening during GC.
410 pool->bottom -= pool->top & ~pool->moduloSize;
411 pool->top &= pool->moduloSize;
412 pool->topBound = pool->top;
414 debugTrace(DEBUG_sched,
415 "markSparkQueue: current spark queue len=%d; (hd=%ld; tl=%ld)",
416 sparkPoolSize(pool), pool->bottom, pool->top);
417 ASSERT_SPARK_POOL_INVARIANTS(pool);
419 elements = pool->elements;
421 /* We have exclusive access to the structure here, so we can reset
422 bottom and top counters, and prune invalid sparks. Contents are
423 copied in-place if they are valuable, otherwise discarded. The
424 routine uses "real" indices t and b, starts by computing them
425 as the modulus size of top and bottom,
429 At the beginning, the pool structure can look like this:
430 ( bottom % size >= top % size , no wrap-around)
432 ___________***********_________________
434 or like this ( bottom % size < top % size, wrap-around )
436 ***********__________******************
437 As we need to remove useless sparks anyway, we make one pass
438 between t and b, moving valuable content to b and subsequent
439 cells (wrapping around when the size is reached).
442 ***********OOO_______XX_X__X?**********
445 After this movement, botInd becomes the new bottom, and old
446 bottom becomes the new top index, both as indices in the array
450 currInd = (pool->top) & (pool->moduloSize); // mod
452 // copies of evacuated closures go to space from botInd on
453 // we keep oldBotInd to know when to stop
454 oldBotInd = botInd = (pool->bottom) & (pool->moduloSize); // mod
456 // on entry to loop, we are within the bounds
457 ASSERT( currInd < pool->size && botInd < pool->size );
459 while (currInd != oldBotInd ) {
460 /* must use != here, wrap-around at size
461 subtle: loop not entered if queue empty
464 /* check element at currInd. if valuable, evacuate and move to
465 botInd, otherwise move on */
466 spark = elements[currInd];
468 // We have to be careful here: in the parallel GC, another
469 // thread might evacuate this closure while we're looking at it,
470 // so grab the info pointer just once.
471 info = spark->header.info;
472 if (IS_FORWARDING_PTR(info)) {
473 tmp = (StgClosure*)UN_FORWARDING_PTR(info);
474 /* if valuable work: shift inside the pool */
475 if (closure_SHOULD_SPARK(tmp)) {
476 elements[botInd] = tmp; // keep entry (new address)
480 pruned_sparks++; // discard spark
481 cap->sparks_pruned++;
484 if (!(closure_flags[INFO_PTR_TO_STRUCT(info)->type] & _NS)) {
485 elements[botInd] = spark; // keep entry (new address)
486 evac (user, &elements[botInd]);
490 pruned_sparks++; // discard spark
491 cap->sparks_pruned++;
496 // in the loop, we may reach the bounds, and instantly wrap around
497 ASSERT( currInd <= pool->size && botInd <= pool->size );
498 if ( currInd == pool->size ) { currInd = 0; }
499 if ( botInd == pool->size ) { botInd = 0; }
501 } // while-loop over spark pool elements
503 ASSERT(currInd == oldBotInd);
505 pool->top = oldBotInd; // where we started writing
506 pool->topBound = pool->top;
508 pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size);
509 // first free place we did not use (corrected by wraparound)
511 PAR_TICKY_MARK_SPARK_QUEUE_END(n);
513 debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
515 debugTrace(DEBUG_sched,
516 "new spark queue len=%d; (hd=%ld; tl=%ld)",
517 sparkPoolSize(pool), pool->bottom, pool->top);
519 ASSERT_SPARK_POOL_INVARIANTS(pool);
/* NOTE(review): fragment of traverseSparkQueue -- the return type,
   the `SparkPool *pool = cap->sparks;` setup, the `top = pool->top;`
   read, the `top++` loop increment and closing brace are elided.
   Unlike pruneSparkQueue this does no filtering: it evacuates every
   element slot between top and bottom as a GC root. */
522 /* GC for the spark pool, called inside Capability.c for all
523 capabilities in turn. Blindly "evac"s complete spark pool. */
525 traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
529 StgWord top,bottom, modMask;
533 ASSERT_SPARK_POOL_INVARIANTS(pool);
536 bottom = pool->bottom;
537 sparkp = pool->elements;
538 modMask = pool->moduloSize;
540 while (top < bottom) {
541 /* call evac for all closures in range (wrap-around via modulo)
542 * In GHC-6.10, evac takes an additional 1st argument to hold a
543 * GC-specific register, see rts/sm/GC.c::mark_root()
545 evac( user , sparkp + (top & modMask) );
549 debugTrace(DEBUG_sched,
550 "traversed spark queue, len=%d; (hd=%ld; tl=%ld)",
551 sparkPoolSize(pool), pool->bottom, pool->top);
554 /* ----------------------------------------------------------------------------
555 * balanceSparkPoolsCaps: takes an array of capabilities (usually: all
556 * capabilities) and its size. Accesses all spark pools and equally
557 * distributes the sparks among them.
559 * Could be called after GC, before Cap. release, from scheduler.
560 * -------------------------------------------------------------------------- */
561 void balanceSparkPoolsCaps(nat n_caps, Capability caps[]);
563 void balanceSparkPoolsCaps(nat n_caps STG_UNUSED,
564 Capability caps[] STG_UNUSED) {
565 barf("not implemented");
/* NOTE(review): signature line of the non-threaded newSpark stub; its
   return type and body (which presumably ignores the spark) are elided
   in this extract. */
571 newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
578 #endif /* PARALLEL_HASKELL || THREADED_RTS */
581 /* -----------------------------------------------------------------------------
583 * GRAN & PARALLEL_HASKELL stuff beyond here.
587 * -------------------------------------------------------------------------- */
589 #if defined(PARALLEL_HASKELL) || defined(GRAN)
/* NOTE(review): fragment of the PARALLEL_HASKELL/GRAN-era
   add_to_spark_queue (array-based pool with hd/tl/lim pointers).
   Elided: return type, braces, and the else-branch structure around
   the two statistics sections.  Visible logic: compact the pool when
   tl hits lim, then append the closure if it is still worth sparking
   and space remains; global spark statistics are counted for both the
   created and the ignored case. */
591 static void slide_spark_pool( StgSparkPool *pool );
594 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
596 if (pool->tl == pool->lim)
597 slide_spark_pool(pool);
599 if (closure_SHOULD_SPARK(closure) &&
600 pool->tl < pool->lim) {
601 *(pool->tl++) = closure;
603 #if defined(PARALLEL_HASKELL)
604 // collect parallel global statistics (currently done together with GC stats)
605 if (RtsFlags.ParFlags.ParStats.Global &&
606 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
607 // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime());
608 globalParStats.tot_sparks_created++;
613 #if defined(PARALLEL_HASKELL)
614 // collect parallel global statistics (currently done together with GC stats)
615 if (RtsFlags.ParFlags.ParStats.Global &&
616 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
617 //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime());
618 globalParStats.tot_sparks_ignored++;
/* NOTE(review): fragment of slide_spark_pool -- the `static void`
   line, the `sparkp = pool->hd;` initialisation, the else-branch that
   skips non-sparkable entries, and closing braces are elided.  Visible
   logic: one-pass in-place compaction of the array pool, keeping only
   closures that still pass closure_SHOULD_SPARK and resetting hd/tl. */
626 slide_spark_pool( StgSparkPool *pool )
628 StgClosure **sparkp, **to_sparkp;
631 to_sparkp = pool->base;
632 while (sparkp < pool->tl) {
633 ASSERT(to_sparkp<=sparkp);
634 ASSERT(*sparkp!=NULL);
635 ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
637 if (closure_SHOULD_SPARK(*sparkp)) {
638 *to_sparkp++ = *sparkp++;
643 pool->hd = pool->base;
644 pool->tl = to_sparkp;
/* NOTE(review): fragment from the !THREADED_RTS path -- the enclosing
   function's name and most of its body are elided; from the visible
   lines it operates on cap->rSparks (the register-table spark pool)
   and asserts the hd <= tl <= lim layout invariant before handling a
   non-NULL spark.  Confirm against the full source. */
651 #if !defined(THREADED_RTS)
656 pool = &(cap->rSparks);
657 ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
659 ASSERT(spark != (StgClosure *)NULL);
/* NOTE(review): fragment of the GranSim findLocalSpark (historically
   gimme_spark).  Large parts are elided: the enclosing comment
   delimiters, return type, several locals (node, found), loop braces,
   the found_res/spark_res out-parameter assignments and the return.
   Visible logic: walk the doubly-linked pending-sparks list of `proc`,
   pruning entries whose node no longer needs sparking, remembering
   fallback candidates (first spark of a non-local node, first
   low-priority spark) and falling back to them at the end of the list. */
667 Search the spark queue of the proc in event for a spark that's worth
668 turning into a thread
669 (was gimme_spark in the old RTS)
672 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
674 PEs proc = event->proc, /* proc to search for work */
675 creator = event->creator; /* proc that requested work */
678 rtsSparkQ spark_of_non_local_node = NULL,
679 spark_of_non_local_node_prev = NULL,
680 low_priority_spark = NULL,
681 low_priority_spark_prev = NULL,
682 spark = NULL, prev = NULL;
684 /* Choose a spark from the local spark queue */
685 prev = (rtsSpark*)NULL;
686 spark = pending_sparks_hds[proc];
689 // ToDo: check this code & implement local sparking !! -- HWL
690 while (!found && spark != (rtsSpark*)NULL)
692 ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
693 (prev==(rtsSpark*)NULL || prev->next==spark) &&
694 (spark->prev==prev));
696 if (!closure_SHOULD_SPARK(node))
698 IF_GRAN_DEBUG(checkSparkQ,
699 debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
702 if (RtsFlags.GranFlags.GranSimStats.Sparks)
703 DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
704 spark->node, spark->name, spark_queue_len(proc));
706 ASSERT(spark != (rtsSpark*)NULL);
707 ASSERT(SparksAvail>0);
710 ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
711 spark = delete_from_sparkq (spark, proc, rtsTrue);
712 if (spark != (rtsSpark*)NULL)
716 /* -- node should eventually be sparked */
717 else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes &&
718 !IS_LOCAL_TO(PROCS(node),CurrentProc))
720 barf("Local sparking not yet implemented");
722 /* Remember first low priority spark */
723 if (spark_of_non_local_node==(rtsSpark*)NULL) {
724 spark_of_non_local_node_prev = prev;
725 spark_of_non_local_node = spark;
728 if (spark->next == (rtsSpark*)NULL) {
729 /* ASSERT(spark==SparkQueueTl); just for testing */
730 prev = spark_of_non_local_node_prev;
731 spark = spark_of_non_local_node;
736 # if defined(GRAN) && defined(GRAN_CHECK)
737 /* Should never happen; just for testing
738 if (spark==pending_sparks_tl) {
739 debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
740 stg_exit(EXIT_FAILURE);
745 ASSERT(SparksAvail>0);
749 else if ( RtsFlags.GranFlags.DoPrioritySparking ||
750 (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
752 if (RtsFlags.GranFlags.DoPrioritySparking)
753 barf("Priority sparking not yet implemented");
758 else /* only used if SparkPriority2 is defined */
760 /* ToDo: fix the code below and re-integrate it */
761 /* Remember first low priority spark */
762 if (low_priority_spark==(rtsSpark*)NULL) {
763 low_priority_spark_prev = prev;
764 low_priority_spark = spark;
767 if (spark->next == (rtsSpark*)NULL) {
768 /* ASSERT(spark==spark_queue_tl); just for testing */
769 prev = low_priority_spark_prev;
770 spark = low_priority_spark;
771 found = rtsTrue; /* take low pri spark => rc is 2 */
775 /* Should never happen; just for testing
776 if (spark==pending_sparks_tl) {
777 debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
778 stg_exit(EXIT_FAILURE);
785 debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n",
786 spark->gran_info, RtsFlags.GranFlags.SparkPriority,
787 spark->node, spark->name);)
790 } /* while (spark!=NULL && !found) */
/* NOTE(review): fragment of the GranSim activateSpark (historically
   munch_spark).  Elided: comment delimiters, return type, several
   locals (node, tso), the proc!=creator branch head, the
   MoveSpark event constant in new_event, and closing braces.
   Visible logic: if the requester is remote, charge packing/latency
   costs and schedule the spark's arrival on the requesting PE;
   otherwise charge thread-creation cost and schedule a StartThread
   event locally, with a (dead/aborting) out-of-heap path guarded by
   the unfinished GC interface. */
797 Turn the spark into a thread.
798 In GranSim this basically means scheduling a StartThread event for the
799 node pointed to by the spark at some point in the future.
800 (was munch_spark in the old RTS)
803 activateSpark (rtsEvent *event, rtsSparkQ spark)
805 PEs proc = event->proc, /* proc to search for work */
806 creator = event->creator; /* proc that requested work */
809 rtsTime spark_arrival_time;
812 We've found a node on PE proc requested by PE creator.
813 If proc==creator we can turn the spark into a thread immediately;
814 otherwise we schedule a MoveSpark event on the requesting PE
820 /* only possible if we simulate GUM style fishing */
821 ASSERT(RtsFlags.GranFlags.Fishing);
823 /* Message packing costs for sending a Fish; qeq jabbI'ID */
824 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
826 if (RtsFlags.GranFlags.GranSimStats.Sparks)
827 DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
828 (StgTSO*)NULL, spark->node,
829 spark->name, spark_queue_len(proc));
831 /* time of the spark arrival on the remote PE */
832 spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
834 new_event(creator, proc, spark_arrival_time,
836 (StgTSO*)NULL, spark->node, spark);
838 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
840 } else { /* proc==creator i.e. turn the spark into a thread */
842 if ( RtsFlags.GranFlags.GranSimStats.Global &&
843 spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
845 globalGranStats.tot_low_pri_sparks++;
847 debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
849 spark->node, spark->name));
852 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
857 /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
858 if (GARBAGE COLLECTION IS NECESSARY) {
859 /* Some kind of backoff needed here in case there's too little heap */
860 # if defined(GRAN_CHECK) && defined(GRAN)
861 if (RtsFlags.GcFlags.giveStats)
862 fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p; name=%u\n",
863 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
864 spark, node, spark->name);
866 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
868 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
869 barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
870 GarbageCollect(GetRoots, rtsFalse);
871 // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
872 // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
874 return; /* was: continue; */ /* to the next event, eventually */
878 if (RtsFlags.GranFlags.GranSimStats.Sparks)
879 DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
880 spark->node, spark->name,
881 spark_queue_len(CurrentProc));
883 new_event(proc, proc, CurrentTime[proc],
885 END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
887 procStatus[proc] = Starting;
891 /* -------------------------------------------------------------------------
892 This is the main point where handling granularity information comes into
894 ------------------------------------------------------------------------- */
896 #define MAX_RAND_PRI 100
899 Granularity info transformers.
900 Applied to the GRAN_INFO field of a spark.
902 STATIC_INLINE nat ID(nat x) { return(x); };
903 STATIC_INLINE nat INV(nat x) { return(-x); };
904 STATIC_INLINE nat IGNORE(nat x) { return (0); };
905 STATIC_INLINE nat RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
/* NOTE(review): fragment of the K&R-style GranSim newSpark.  Elided:
   the `rtsSpark *` return type, the StgClosure* node declaration,
   locals (pri, newspark), the fallthrough arm of the priority
   conditional, the debug macro wrapper around the "Ignoring spark"
   belch, SparksAvail accounting and the final return of newspark.
   Visible logic: map gran_info to a priority via RAND/INV/IGNORE,
   drop sparks below the SparkPriority threshold, otherwise allocate
   and initialise an rtsSpark record and bump global statistics. */
907 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
909 newSpark(node,name,gran_info,size_info,par_info,local)
911 nat name, gran_info, size_info, par_info, local;
916 pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
917 RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
918 RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
921 if ( RtsFlags.GranFlags.SparkPriority!=0 &&
922 pri<RtsFlags.GranFlags.SparkPriority ) {
924 debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n",
925 pri, RtsFlags.GranFlags.SparkPriority, node, name));
926 return ((rtsSpark*)NULL);
929 newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
930 newspark->prev = newspark->next = (rtsSpark*)NULL;
931 newspark->node = node;
932 newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
933 newspark->gran_info = pri;
934 newspark->global = !local; /* Check that with parAt, parAtAbs !!*/
936 if (RtsFlags.GranFlags.GranSimStats.Global) {
937 globalGranStats.tot_sparks_created++;
938 globalGranStats.sparks_created_on_PE[CurrentProc]++;
/* NOTE(review): fragment of disposeSparkQ -- signature, NULL check,
   the stgFree of the spark record and SparksAvail decrement are
   elided.  Visible logic: recursively free the rest of the queue,
   then (under debug) complain if the SparksAvail counter went
   negative. */
959 disposeSparkQ(spark->next);
962 if (SparksAvail < 0) {
963 debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
/* NOTE(review): fragment of the GranSim (linked-list) add_to_spark_queue.
   Elided: comment delimiters, return type, K&R parameter declaration,
   the `nat count` local, the loop-exit condition of the insert-sort
   for, the spark->prev/next link fixups in the insert branch, and
   several closing braces.  Visible logic: with priority sparking on,
   insert-sort by gran_info (charging pri_spark_overhead per comparison);
   otherwise append at the tail; then extensive GRAN_CHECK consistency
   and sortedness checks. */
972 With PrioritySparking add_to_spark_queue performs an insert sort to keep
973 the spark queue sorted. Otherwise the spark is just added to the end of
978 add_to_spark_queue(spark)
981 rtsSpark *prev = NULL, *next = NULL;
983 rtsBool found = rtsFalse;
985 if ( spark == (rtsSpark *)NULL ) {
989 if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
990 /* Priority sparking is enabled i.e. spark queues must be sorted */
992 for (prev = NULL, next = pending_sparks_hd, count=0;
994 !(found = (spark->gran_info >= next->gran_info));
995 prev = next, next = next->next, count++)
998 } else { /* 'utQo' */
999 /* Priority sparking is disabled */
1001 found = rtsFalse; /* to add it at the end */
1006 /* next points to the first spark with a gran_info smaller than that
1007 of spark; therefore, add spark before next into the spark queue */
1009 if ( next == NULL ) {
1010 pending_sparks_tl = spark;
1015 if ( prev == NULL ) {
1016 pending_sparks_hd = spark;
1020 } else { /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
1021 /* add the spark at the end of the spark queue */
1023 spark->prev = pending_sparks_tl;
1024 if (pending_sparks_hd == NULL)
1025 pending_sparks_hd = spark;
1027 pending_sparks_tl->next = spark;
1028 pending_sparks_tl = spark;
1032 /* add costs for search in priority sparking */
1033 if (RtsFlags.GranFlags.DoPrioritySparking) {
1034 CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
1037 IF_GRAN_DEBUG(checkSparkQ,
1038 debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
1039 spark, spark->node, CurrentProc);
1040 print_sparkq_stats());
1042 # if defined(GRAN_CHECK)
1043 if (RtsFlags.GranFlags.Debug.checkSparkQ) {
1044 for (prev = NULL, next = pending_sparks_hd;
1046 prev = next, next = next->next)
1048 if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
1049 debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
1051 pending_sparks_tl, prev);
1055 # if defined(GRAN_CHECK)
1056 /* Check if the sparkq is still sorted. Just for testing, really! */
1057 if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
1058 RtsFlags.GranFlags.Debug.pri ) {
1059 rtsBool sorted = rtsTrue;
1060 rtsSpark *prev, *next;
1062 if (pending_sparks_hd == NULL ||
1063 pending_sparks_hd->next == NULL ) {
1064 /* just 1 elem => ok */
1066 for (prev = pending_sparks_hd,
1067 next = pending_sparks_hd->next;
1069 prev = next, next = next->next) {
1071 (prev->gran_info >= next->gran_info);
1075 debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
1077 print_sparkq(CurrentProc);
/* NOTE(review): fragment of spark_queue_len -- return type, K&R
   parameter declaration, the `nat len` local, the for-loop's
   termination condition and the final `return len` are elided.
   Visible logic: count the linked pending-sparks list of `proc`,
   optionally cross-checking that the recorded tail pointer really is
   the last element. */
1084 spark_queue_len(proc)
1087 rtsSpark *prev, *spark; /* prev only for testing !! */
1090 for (len = 0, prev = NULL, spark = pending_sparks_hds[proc];
1092 len++, prev = spark, spark = spark->next)
1095 # if defined(GRAN_CHECK)
1096 if ( RtsFlags.GranFlags.Debug.checkSparkQ )
1097 if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
1098 debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
1099 proc, pending_sparks_tls[proc], prev);
/* NOTE(review): fragment of delete_from_sparkq -- comment delimiters,
   return type, the K&R declarations of `spark` and `p`, the NULL
   guard's condition, the else-arms of the unlink branches, and the
   final `return new_spark` are elided.  Visible logic: standard
   doubly-linked-list unlink with head/tail pointer maintenance per
   PE, optional debug dumps before and after, and an optional
   disposeSpark of the removed record. */
1106 Take spark out of the spark queue on PE p and nuke the spark. Adjusts
1107 hd and tl pointers of the spark queue. Returns a pointer to the next
1111 delete_from_sparkq (spark, p, dispose_too) /* unlink and dispose spark */
1114 rtsBool dispose_too;
1116 rtsSpark *new_spark;
1119 barf("delete_from_sparkq: trying to delete NULL spark\n");
1121 # if defined(GRAN_CHECK)
1122 if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
1123 debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
1124 pending_sparks_hd, pending_sparks_tl,
1125 spark->prev, spark, spark->next,
1126 (spark->next==NULL ? 0 : spark->next->prev));
1130 if (spark->prev==NULL) {
1131 /* spark is first spark of queue => adjust hd pointer */
1132 ASSERT(pending_sparks_hds[p]==spark);
1133 pending_sparks_hds[p] = spark->next;
1135 spark->prev->next = spark->next;
1137 if (spark->next==NULL) {
1138 ASSERT(pending_sparks_tls[p]==spark);
1139 /* spark is last spark of queue => adjust tl pointer */
1140 pending_sparks_tls[p] = spark->prev;
1142 spark->next->prev = spark->prev;
1144 new_spark = spark->next;
1146 # if defined(GRAN_CHECK)
1147 if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
1148 debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
1149 pending_sparks_hd, pending_sparks_tl,
1150 spark->prev, spark, spark->next,
1151 (spark->next==NULL ? 0 : spark->next->prev), spark);
1156 disposeSpark(spark);
/* NOTE(review): fragment of the GranSim markSparkQueue -- return
   type, the `PEs p` / `rtsSpark *sp` locals, the loop braces and the
   trailing debug-macro wrapper are elided.  Visible logic: for every
   PE, walk its pending-sparks list and replace each node pointer by
   its MarkRoot result so sparked closures survive GC. */
1161 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
1163 markSparkQueue(void)
1165 StgClosure *MarkRoot(StgClosure *root); // prototype
1169 for (p=0; p<RtsFlags.GranFlags.proc; p++)
1170 for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
1171 ASSERT(sp->node!=NULL);
1172 ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
1173 // ToDo?: statistics gathering here (also for GUM!)
1174 sp->node = (StgClosure *)MarkRoot(sp->node);
1178 debugBelch("markSparkQueue: spark statistics at start of GC:");
1179 print_sparkq_stats());
/* NOTE(review): interleaved fragments of three debug helpers --
   print_spark (dump one rtsSpark record, or "NIL"), print_sparkq
   (dump the whole list of one PE), and print_sparkq_stats (per-PE
   queue lengths).  Signatures, braces and several argument lines are
   elided in this extract; the visible lines are their debugBelch
   bodies and loop heads only. */
1189 debugBelch("Spark: NIL\n");
1193 ((spark->node==NULL) ? "______" : "%#6lx"),
1194 stgCast(StgPtr,spark->node));
1196 debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
1198 ((spark->global)==rtsTrue?"True":"False"), spark->creator,
1199 spark->prev, spark->next);
1208 rtsSpark *x = pending_sparks_hds[proc];
1210 debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
1211 for (; x!=(rtsSpark*)NULL; x=x->next) {
1217 Print statistics of all spark queues.
1220 print_sparkq_stats(void)
1224 debugBelch("SparkQs: [");
1225 for (p=0; p<RtsFlags.GranFlags.proc; p++)
1226 debugBelch(", PE %d: %d", p, spark_queue_len(p));