1 /* ---------------------------------------------------------------------------
2 * $Id: Sparks.c,v 1.3 2001/03/22 03:51:10 hwloidl Exp $
4 * (c) The GHC Team, 2000
6 * Sparking support for PAR and SMP versions of the RTS.
8 * -------------------------------------------------------------------------*/
10 //@node Spark Management Routines, , ,
11 //@section Spark Management Routines
20 //@node Includes, GUM code, Spark Management Routines, Spark Management Routines
21 //@subsection Includes
31 # include "ParallelRts.h"
32 # include "GranSimRts.h" // for GR_...
34 # include "GranSimRts.h"
38 #if defined(SMP) || defined(PAR)
40 //@node GUM code, GranSim code, Includes, Spark Management Routines
41 //@subsection GUM code
/* Forward declaration: compacts a spark pool in place by sliding live
 * sparks down to its base (defined later in this file). */
43 static void slide_spark_pool( StgSparkPool *pool );
/* initSparkPools: allocate and initialise one spark pool per capability.
 * NOTE(review): this listing is elided (the embedded line numbers jump),
 * so the return type, local declarations and closing braces are not
 * visible; the comments below describe only what the visible lines do. */
46 initSparkPools( void )
52 /* walk over the capabilities, allocating a spark pool for each one */
53 for (cap = free_capabilities; cap != NULL; cap = cap->link) {
55 /* allocate a single spark pool */
59 pool = &(cap->rSparks);
/* base..lim bounds an array of maxLocalSparks closure pointers */
61 pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
62 * sizeof(StgClosure *),
64 pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
/* empty pool: head and tail both start at the base */
65 pool->hd = pool->base;
66 pool->tl = pool->base;
68 return rtsTrue; /* Qapla' */
/* findSpark: return a spark worth turning into a thread, or NULL-ish
 * fall-through after compacting the pool.  When for_export is set and
 * this PE is idle, the first usable spark is kept in the pool and the
 * *second* one found is handed out, so the PE never gives away its only
 * work (rationale below).  Elided listing — some branches not visible. */
72 We traverse the spark pool until we find the 2nd usable (i.e. non-NF)
73 spark. Rationale, we don't want to give away the only work a PE has.
74 ToDo: introduce low- and high-water-marks for load balancing.
77 findSpark( rtsBool for_export )
81 StgClosure *spark, *first=NULL;
82 rtsBool isIdlePE = EMPTY_RUN_QUEUE();
85 /* walk over the capabilities, scanning each one's spark pool */
86 for (cap = free_capabilities; cap != NULL; cap = cap->link) {
88 /* examine this capability's spark pool */
92 pool = &(cap->rSparks);
/* scan from head to tail; NOTE(review): the line reading *pool->hd into
 * `spark` (and advancing hd) is elided here — confirm in full source */
93 while (pool->hd < pool->tl) {
95 if (closure_SHOULD_SPARK(spark)) {
96 if (for_export && isIdlePE) {
98 first = spark; // keep the first usable spark if PE is idle
100 pool->hd--; // found a second spark; keep it in the pool
101 ASSERT(*pool->hd==spark);
102 if (RtsFlags.ParFlags.ParStats.Sparks)
103 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
104 GR_STEALING, ((StgTSO *)NULL), first,
105 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
106 return first; // and return the *first* spark found
109 if (RtsFlags.ParFlags.ParStats.Sparks && for_export)
110 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
111 GR_STEALING, ((StgTSO *)NULL), spark,
112 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
113 return spark; // return first spark found
/* no usable spark found in this pool: compact it before moving on */
117 slide_spark_pool(pool);
123 activateSpark is defined in Schedule.c
/* add_to_spark_queue (PAR/SMP): append `closure` to `pool` if it is
 * still worth sparking and there is room; otherwise the spark is
 * silently dropped.  Bumps the global created/ignored counters when
 * parallel stats are enabled.  (activateSpark lives in Schedule.c.) */
126 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
/* tail hit the limit: compact the pool first to reclaim dead slots */
128 if (pool->tl == pool->lim)
129 slide_spark_pool(pool);
131 if (closure_SHOULD_SPARK(closure) &&
132 pool->tl < pool->lim) {
133 *(pool->tl++) = closure;
136 // collect parallel global statistics (currently done together with GC stats)
137 if (RtsFlags.ParFlags.ParStats.Global &&
138 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
139 // fprintf(stderr, "Creating spark for %x @ %11.2f\n", closure, usertime());
140 globalParStats.tot_sparks_created++;
/* else-branch (not sparkable, or pool still full): count as ignored */
146 // collect parallel global statistics (currently done together with GC stats)
147 if (RtsFlags.ParFlags.ParStats.Global &&
148 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
149 //fprintf(stderr, "Ignoring spark for %x @ %11.2f\n", closure, usertime());
150 globalParStats.tot_sparks_ignored++;
/* slide_spark_pool: compact the pool in place, copying down only those
 * sparks that are still worth running (closure_SHOULD_SPARK); dead
 * entries are dropped.  Afterwards hd==base and tl points one past the
 * last live spark. */
158 slide_spark_pool( StgSparkPool *pool )
160 StgClosure **sparkp, **to_sparkp;
163 to_sparkp = pool->base;
/* NOTE(review): sparkp's initialisation is elided from this listing —
 * presumably sparkp = pool->hd; confirm against the full source. */
164 while (sparkp < pool->tl) {
165 ASSERT(to_sparkp<=sparkp);
166 ASSERT(*sparkp!=NULL);
167 ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
169 if (closure_SHOULD_SPARK(*sparkp)) {
170 *to_sparkp++ = *sparkp++;
/* reset the pool bounds to the compacted region */
175 pool->hd = pool->base;
176 pool->tl = to_sparkp;
/* spark_queue_len (PAR/SMP): number of sparks currently in the pool,
 * i.e. the distance between head and tail pointers. */
180 spark_queue_len( StgSparkPool *pool )
182 return (nat) (pool->tl - pool->hd);
/* markSparkQueue (PAR/SMP): GC root-marking for every capability's
 * spark pool.  Each live spark is passed through MarkRoot; the pool is
 * implicitly slid, so after GC it holds only sparkable closures packed
 * at the base.  Elided listing — pruning/counting lines not visible. */
185 /* Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
186 implicit slide i.e. after marking all sparks are at the beginning of the
187 spark pool and the spark pool only contains sparkable closures
190 markSparkQueue( void )
192 StgClosure **sparkp, **to_sparkp;
193 nat n, pruned_sparks; // stats only
197 PAR_TICKY_MARK_SPARK_QUEUE_START();
200 /* walk over the capabilities, marking each one's spark pool */
201 for (cap = free_capabilities; cap != NULL; cap = cap->link) {
203 /* mark this capability's spark pool */
207 pool = &(cap->rSparks);
216 to_sparkp = pool->base;
217 while (sparkp < pool->tl) {
218 ASSERT(to_sparkp<=sparkp);
219 ASSERT(*sparkp!=NULL);
220 ASSERT(LOOKS_LIKE_GHC_INFO(((StgClosure *)*sparkp)->header.info));
221 // ToDo?: statistics gathering here (also for GUM!)
222 if (closure_SHOULD_SPARK(*sparkp)) {
223 *to_sparkp = MarkRoot(*sparkp);
235 pool->hd = pool->base;
236 pool->tl = to_sparkp;
238 PAR_TICKY_MARK_SPARK_QUEUE_END(n);
/* Debug reporting: per-platform variants — SMP tags the message with
 * pthread_self(), PAR with mytid, otherwise no owner id is printed. */
242 belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
243 n, pruned_sparks, pthread_self()));
246 belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
247 n, pruned_sparks, mytid));
250 belch("markSparkQueue: marked %d sparks and pruned %d sparks",
255 belch("markSparkQueue: new spark queue len=%d; (hd=%p; tl=%p)",
256 spark_queue_len(pool), pool->hd, pool->tl));
/* sanity pass over the pools after marking */
270 pool = &(cap->rSparks);
271 ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
273 ASSERT(spark != (StgClosure *)NULL);
280 //@node GranSim code, , GUM code, Spark Management Routines
281 //@subsection GranSim code
284 //* Basic interface to sparkq::
288 //@node Basic interface to sparkq, Aux fcts, GranSim code, GranSim code
289 //@subsubsection Basic interface to sparkq
/* findLocalSpark (GranSim): scan pending_sparks_hds[proc] for a spark
 * worth turning into a thread.  Prunes entries whose closure is no
 * longer sparkable (via delete_from_sparkq), and remembers two fallback
 * candidates — the first spark of a non-local node and the first
 * low-priority spark — to fall back on when the end of the queue is
 * reached.  Results are reported through *found_res / *spark_res.
 * Elided listing: several branches and the closing code are missing. */
291 Search the spark queue of the proc in event for a spark that's worth
292 turning into a thread
293 (was gimme_spark in the old RTS)
295 //@cindex findLocalSpark
297 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
299 PEs proc = event->proc, /* proc to search for work */
300 creator = event->creator; /* proc that requested work */
303 rtsSparkQ spark_of_non_local_node = NULL,
304 spark_of_non_local_node_prev = NULL,
305 low_priority_spark = NULL,
306 low_priority_spark_prev = NULL,
307 spark = NULL, prev = NULL;
309 /* Choose a spark from the local spark queue */
310 prev = (rtsSpark*)NULL;
311 spark = pending_sparks_hds[proc];
314 // ToDo: check this code & implement local sparking !! -- HWL
315 while (!found && spark != (rtsSpark*)NULL)
/* invariant: prev/spark form a consistent doubly-linked pair */
317 ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
318 (prev==(rtsSpark*)NULL || prev->next==spark) &&
319 (spark->prev==prev));
/* case 1: closure already evaluated — prune this spark from the queue */
321 if (!closure_SHOULD_SPARK(node))
323 IF_GRAN_DEBUG(checkSparkQ,
324 belch("^^ pruning spark %p (node %p) in gimme_spark",
327 if (RtsFlags.GranFlags.GranSimStats.Sparks)
328 DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
329 spark->node, spark->name, spark_queue_len(proc));
331 ASSERT(spark != (rtsSpark*)NULL);
332 ASSERT(SparksAvail>0);
335 ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
336 spark = delete_from_sparkq (spark, proc, rtsTrue);
337 if (spark != (rtsSpark*)NULL)
/* case 2: spark of a non-local node while preferring local nodes */
341 /* -- node should eventually be sparked */
342 else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes &&
343 !IS_LOCAL_TO(PROCS(node),CurrentProc))
345 barf("Local sparking not yet implemented");
347 /* Remember first low priority spark */
348 if (spark_of_non_local_node==(rtsSpark*)NULL) {
349 spark_of_non_local_node_prev = prev;
350 spark_of_non_local_node = spark;
/* end of queue: fall back to the remembered non-local-node spark */
353 if (spark->next == (rtsSpark*)NULL) {
354 /* ASSERT(spark==SparkQueueTl); just for testing */
355 prev = spark_of_non_local_node_prev;
356 spark = spark_of_non_local_node;
361 # if defined(GRAN) && defined(GRAN_CHECK)
362 /* Should never happen; just for testing
363 if (spark==pending_sparks_tl) {
364 fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n");
365 stg_exit(EXIT_FAILURE);
370 ASSERT(SparksAvail>0);
/* case 3: priority sparking, or priority above the SparkPriority2 bar */
374 else if ( RtsFlags.GranFlags.DoPrioritySparking ||
375 (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
377 if (RtsFlags.GranFlags.DoPrioritySparking)
378 barf("Priority sparking not yet implemented");
/* case 4: low-priority spark — remember the first one as a fallback */
383 else /* only used if SparkPriority2 is defined */
385 /* ToDo: fix the code below and re-integrate it */
386 /* Remember first low priority spark */
387 if (low_priority_spark==(rtsSpark*)NULL) {
388 low_priority_spark_prev = prev;
389 low_priority_spark = spark;
/* end of queue: settle for the remembered low-priority spark */
392 if (spark->next == (rtsSpark*)NULL) {
393 /* ASSERT(spark==spark_queue_tl); just for testing */
394 prev = low_priority_spark_prev;
395 spark = low_priority_spark;
396 found = rtsTrue; /* take low pri spark => rc is 2 */
400 /* Should never happen; just for testing
401 if (spark==pending_sparks_tl) {
402 fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n");
403 stg_exit(EXIT_FAILURE);
410 belch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n",
411 spark->gran_info, RtsFlags.GranFlags.SparkPriority,
412 spark->node, spark->name);)
415 } /* while (spark!=NULL && !found) */
/* activateSpark (GranSim): turn a found spark into a thread.
 * If proc==creator the thread is started on this PE (a StartThread-style
 * event via new_event); otherwise a MoveSpark is simulated — the spark
 * is charged message-packing + latency costs and scheduled to arrive on
 * the requesting PE.  Elided listing: event names, some branches and
 * the closing code are not visible here. */
422 Turn the spark into a thread.
423 In GranSim this basically means scheduling a StartThread event for the
424 node pointed to by the spark at some point in the future.
425 (was munch_spark in the old RTS)
427 //@cindex activateSpark
429 activateSpark (rtsEvent *event, rtsSparkQ spark)
431 PEs proc = event->proc, /* proc to search for work */
432 creator = event->creator; /* proc that requested work */
435 rtsTime spark_arrival_time;
438 We've found a node on PE proc requested by PE creator.
439 If proc==creator we can turn the spark into a thread immediately;
440 otherwise we schedule a MoveSpark event on the requesting PE
/* --- remote case: export the spark to the requesting PE --- */
446 /* only possible if we simulate GUM style fishing */
447 ASSERT(RtsFlags.GranFlags.Fishing);
449 /* Message packing costs for sending a Fish; qeq jabbI'ID */
450 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
452 if (RtsFlags.GranFlags.GranSimStats.Sparks)
453 DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
454 (StgTSO*)NULL, spark->node,
455 spark->name, spark_queue_len(proc));
457 /* time of the spark arrival on the remote PE */
458 spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
460 new_event(creator, proc, spark_arrival_time,
462 (StgTSO*)NULL, spark->node, spark);
464 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
466 } else { /* proc==creator i.e. turn the spark into a thread */
/* count sparks chosen despite being below the SparkPriority2 bar */
468 if ( RtsFlags.GranFlags.GranSimStats.Global &&
469 spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
471 globalGranStats.tot_low_pri_sparks++;
473 belch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
475 spark->node, spark->name));
/* charge thread-creation cost to this PE's simulated clock */
478 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
483 /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
484 if (GARBAGE COLLECTION IS NECESSARY) {
485 /* Some kind of backoff needed here in case there's too little heap */
486 # if defined(GRAN_CHECK) && defined(GRAN)
487 if (RtsFlags.GcFlags.giveStats)
488 fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p; name=%u\n",
489 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
490 spark, node, spark->name);
492 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
494 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
/* NOTE(review): barf() before GarbageCollect() makes the GC call
 * unreachable — matches the ToDo above; left as in the original. */
495 barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
496 GarbageCollect(GetRoots, rtsFalse);
497 // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
498 // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
500 return; /* was: continue; */ /* to the next event, eventually */
504 if (RtsFlags.GranFlags.GranSimStats.Sparks)
505 DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
506 spark->node, spark->name,
507 spark_queue_len(CurrentProc));
/* schedule the thread start on this PE, now */
509 new_event(proc, proc, CurrentTime[proc],
511 END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
513 procStatus[proc] = Starting;
517 /* -------------------------------------------------------------------------
518 This is the main point where handling granularity information comes into
520 ------------------------------------------------------------------------- */
/* Upper bound (inclusive) for randomly generated spark priorities. */
522 #define MAX_RAND_PRI 100
525 Granularity info transformers.
526 Applied to the GRAN_INFO field of a spark.
/* ID: identity — use the gran_info value as the priority unchanged. */
528 static inline nat ID(nat x) { return(x); };
/* INV: inverse priority.  NOTE(review): negating an unsigned nat wraps
 * modulo the type width rather than producing a negative value —
 * confirm that the wrapped ordering is the intended "inverse". */
529 static inline nat INV(nat x) { return(-x); };
/* IGNORE: flatten all priorities to 0. */
530 static inline nat IGNORE(nat x) { return (0); };
/* RAND: uniform pseudo-random priority in [1, MAX_RAND_PRI]; input ignored. */
531 static inline nat RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
533 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
/* newSpark (GranSim): allocate and initialise a new rtsSpark for `node`
 * (K&R-style definition).  The priority is derived from gran_info via
 * the transformer selected by the RTS flags; sparks whose priority
 * falls below SparkPriority are discarded and NULL is returned.
 * Caller owns the returned heap-allocated rtsSpark. */
536 newSpark(node,name,gran_info,size_info,par_info,local)
538 nat name, gran_info, size_info, par_info, local;
/* select the priority transformer: random, inverse, ignore, or (elided
 * final arm, presumably ID) according to the GranSim flags */
543 pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
544 RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
545 RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
/* drop sparks below the configured priority threshold */
548 if ( RtsFlags.GranFlags.SparkPriority!=0 &&
549 pri<RtsFlags.GranFlags.SparkPriority ) {
551 belch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n",
552 pri, RtsFlags.GranFlags.SparkPriority, node, name));
553 return ((rtsSpark*)NULL);
556 newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
557 newspark->prev = newspark->next = (rtsSpark*)NULL;
558 newspark->node = node;
/* name==1 is a sentinel meaning "inherit the current TSO's spark name" */
559 newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
560 newspark->gran_info = pri;
561 newspark->global = !local; /* Check that with parAt, parAtAbs !!*/
563 if (RtsFlags.GranFlags.GranSimStats.Global) {
564 globalGranStats.tot_sparks_created++;
565 globalGranStats.sparks_created_on_PE[CurrentProc]++;
571 //@cindex disposeSpark
580 //@cindex disposeSparkQ
/* disposeSparkQ: free a whole spark queue by recursing on ->next.
 * Fragment — the function headers and most of both bodies are elided
 * from this listing; only the recursive call and an underflow check on
 * the SparksAvail counter are visible. */
588 disposeSparkQ(spark->next);
591 if (SparksAvail < 0) {
592 fprintf(stderr,"disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
/* add_to_spark_queue (GranSim, K&R-style): insert `spark` into the
 * doubly-linked pending-sparks queue of the current PE.  With
 * PrioritySparking an insertion sort keeps the queue ordered by
 * descending gran_info; otherwise the spark is appended at the tail.
 * Charges search costs and, under GRAN_CHECK, validates the queue. */
601 With PrioritySparking add_to_spark_queue performs an insert sort to keep
602 the spark queue sorted. Otherwise the spark is just added to the end of
606 //@cindex add_to_spark_queue
608 add_to_spark_queue(spark)
611 rtsSpark *prev = NULL, *next = NULL;
613 rtsBool found = rtsFalse;
/* nothing to do for a NULL spark (newSpark may return NULL) */
615 if ( spark == (rtsSpark *)NULL ) {
619 if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
620 /* Priority sparking is enabled i.e. spark queues must be sorted */
/* find the first entry whose gran_info is <= the new spark's */
622 for (prev = NULL, next = pending_sparks_hd, count=0;
624 !(found = (spark->gran_info >= next->gran_info));
625 prev = next, next = next->next, count++)
628 } else { /* 'utQo' */
629 /* Priority sparking is disabled */
631 found = rtsFalse; /* to add it at the end */
636 /* next points to the first spark with a gran_info smaller than that
637 of spark; therefore, add spark before next into the spark queue */
639 if ( next == NULL ) {
640 pending_sparks_tl = spark;
645 if ( prev == NULL ) {
646 pending_sparks_hd = spark;
650 } else { /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
651 /* add the spark at the end of the spark queue */
653 spark->prev = pending_sparks_tl;
654 if (pending_sparks_hd == NULL)
655 pending_sparks_hd = spark;
657 pending_sparks_tl->next = spark;
658 pending_sparks_tl = spark;
662 /* add costs for search in priority sparking */
663 if (RtsFlags.GranFlags.DoPrioritySparking) {
664 CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
667 IF_GRAN_DEBUG(checkSparkQ,
668 belch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
669 spark, spark->node, CurrentProc);
670 print_sparkq_stats());
/* sanity: walk the queue and check that tl really is the last node */
672 # if defined(GRAN_CHECK)
673 if (RtsFlags.GranFlags.Debug.checkSparkQ) {
674 for (prev = NULL, next = pending_sparks_hd;
676 prev = next, next = next->next)
678 if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
679 fprintf(stderr,"SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
681 pending_sparks_tl, prev);
/* sanity: with priority debugging on, verify the queue is sorted */
685 # if defined(GRAN_CHECK)
686 /* Check if the sparkq is still sorted. Just for testing, really! */
687 if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
688 RtsFlags.GranFlags.Debug.pri ) {
689 rtsBool sorted = rtsTrue;
690 rtsSpark *prev, *next;
692 if (pending_sparks_hd == NULL ||
693 pending_sparks_hd->next == NULL ) {
694 /* just 1 elem => ok */
696 for (prev = pending_sparks_hd,
697 next = pending_sparks_hd->next;
699 prev = next, next = next->next) {
701 (prev->gran_info >= next->gran_info);
705 fprintf(stderr,"ghuH: SPARKQ on PE %d is not sorted:\n",
707 print_sparkq(CurrentProc);
713 //@node Aux fcts, , Basic interface to sparkq, GranSim code
714 //@subsubsection Aux fcts
716 //@cindex spark_queue_len
/* spark_queue_len (GranSim): length of PE `proc`'s pending-sparks list,
 * counted by walking the linked list from its head.  Under GRAN_CHECK
 * also verifies that the recorded tail really is the last node. */
718 spark_queue_len(proc)
721 rtsSpark *prev, *spark; /* prev only for testing !! */
724 for (len = 0, prev = NULL, spark = pending_sparks_hds[proc];
726 len++, prev = spark, spark = spark->next)
729 # if defined(GRAN_CHECK)
730 if ( RtsFlags.GranFlags.Debug.checkSparkQ )
731 if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
732 fprintf(stderr,"ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
733 proc, pending_sparks_tls[proc], prev);
/* delete_from_sparkq: unlink `spark` from PE `p`'s doubly-linked queue,
 * fixing the hd/tl pointers at the boundaries, and return the next
 * spark in the queue.  When dispose_too is set the unlinked spark is
 * also freed (the dispose call itself is elided from this listing). */
740 Take spark out of the spark queue on PE p and nuke the spark. Adjusts
741 hd and tl pointers of the spark queue. Returns a pointer to the next
744 //@cindex delete_from_sparkq
746 delete_from_sparkq (spark, p, dispose_too) /* unlink and dispose spark */
754 barf("delete_from_sparkq: trying to delete NULL spark\n");
756 # if defined(GRAN_CHECK)
757 if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
758 fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
759 pending_sparks_hd, pending_sparks_tl,
760 spark->prev, spark, spark->next,
761 (spark->next==NULL ? 0 : spark->next->prev));
/* unlink at the head side: fix hd pointer or the predecessor's next */
765 if (spark->prev==NULL) {
766 /* spark is first spark of queue => adjust hd pointer */
767 ASSERT(pending_sparks_hds[p]==spark);
768 pending_sparks_hds[p] = spark->next;
770 spark->prev->next = spark->next;
/* unlink at the tail side: fix tl pointer or the successor's prev */
772 if (spark->next==NULL) {
773 ASSERT(pending_sparks_tls[p]==spark);
/* NOTE(review): this comment says "first" but the branch handles the
 * *last* spark of the queue (spark->next==NULL) — stale copy-paste. */
774 /* spark is first spark of queue => adjust tl pointer */
775 pending_sparks_tls[p] = spark->prev;
777 spark->next->prev = spark->prev;
779 new_spark = spark->next;
781 # if defined(GRAN_CHECK)
782 if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
783 fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
784 pending_sparks_hd, pending_sparks_tl,
785 spark->prev, spark, spark->next,
786 (spark->next==NULL ? 0 : spark->next->prev), spark);
796 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
797 //@cindex markSparkQueue
/* markSparkQueue (GranSim): GC root-marking — walk every PE's
 * pending-sparks list and mark each spark's node via MarkRoot.
 * Fragment: the function header is elided from this listing. */
801 StgClosure *MarkRoot(StgClosure *root); // prototype
805 for (p=0; p<RtsFlags.GranFlags.proc; p++)
806 for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
807 ASSERT(sp->node!=NULL);
808 ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
809 // ToDo?: statistics gathering here (also for GUM!)
810 sp->node = (StgClosure *)MarkRoot(sp->node);
813 belch("@@ markSparkQueue: spark statistics at start of GC:");
814 print_sparkq_stats());
817 //@cindex print_spark
/* print_spark: debug-print a single spark to stderr — "Spark: NIL" for
 * a NULL spark, otherwise its node, name, global flag, creator and the
 * prev/next links.  Fragment: header and format assembly are elided. */
825 fprintf(stderr,"Spark: NIL\n");
829 ((spark->node==NULL) ? "______" : "%#6lx"),
830 stgCast(StgPtr,spark->node));
832 fprintf(stderr,"Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
834 ((spark->global)==rtsTrue?"True":"False"), spark->creator,
835 spark->prev, spark->next);
839 //@cindex print_sparkq
/* print_sparkq: debug-print PE `proc`'s whole spark queue, one entry
 * per node from the head pointer (presumably via print_spark —
 * the loop body is elided from this listing). */
845 rtsSpark *x = pending_sparks_hds[proc];
847 fprintf(stderr,"Spark Queue of PE %d with root at %p:\n", proc, x);
848 for (; x!=(rtsSpark*)NULL; x=x->next) {
/* print_sparkq_stats: debug-print the spark-queue length of every PE. */
854 Print a statistics of all spark queues.
856 //@cindex print_sparkq_stats
858 print_sparkq_stats(void)
862 fprintf(stderr, "SparkQs: [");
863 for (p=0; p<RtsFlags.GranFlags.proc; p++)
864 fprintf(stderr, ", PE %d: %d", p, spark_queue_len(p));
865 fprintf(stderr, "\n");