1 /* ---------------------------------------------------------------------------
2 * $Id: Sparks.c,v 1.6 2003/03/25 17:58:50 sof Exp $
4 * (c) The GHC Team, 2000
6 * Sparking support for PAR and SMP versions of the RTS.
8 * -------------------------------------------------------------------------*/
10 //@node Spark Management Routines, , ,
11 //@section Spark Management Routines
20 //@node Includes, GUM code, Spark Management Routines, Spark Management Routines
21 //@subsection Includes
23 #include "PosixSource.h"
32 # include "ParallelRts.h"
33 # include "GranSimRts.h" // for GR_...
35 # include "GranSimRts.h"
39 #if /*defined(SMP) ||*/ defined(PAR)
41 //@node GUM code, GranSim code, Includes, Spark Management Routines
42 //@subsection GUM code
44 static void slide_spark_pool( StgSparkPool *pool );
// initSparkPools: allocate one spark pool per capability (PAR/SMP build)
// and reset it to empty.  A pool is a flat array of StgClosure* of length
// RtsFlags.ParFlags.maxLocalSparks; hd/tl are cursors into it and lim is
// one past the end.  An empty pool has hd == tl == base.
// NOTE(review): this is an elided view — the return type, local
// declarations and closing braces are not visible here; comments only.
47 initSparkPools( void )
53 /* walk over the capabilities, allocating a spark pool for each one */
54 for (cap = free_capabilities; cap != NULL; cap = cap->link) {
56 /* allocate a single spark pool */
60 pool = &(cap->rSparks);
// Backing array for the pool; allocation arguments are partly elided in
// this view (the "bytes" tag string presumably follows) — TODO confirm.
62 pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
63 * sizeof(StgClosure *),
// lim marks one-past-end; hd==tl==base means the pool starts empty.
65 pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
66 pool->hd = pool->base;
67 pool->tl = pool->base;
// "Qapla'" is Klingon for "success" — long-standing GUM in-joke.
69 return rtsTrue; /* Qapla' */
73 We traverse the spark pool until we find the 2nd usable (i.e. non-NF)
74 spark. Rationale: we don't want to give away the only work a PE has.
75 ToDo: introduce low- and high-water-marks for load balancing.
// findSpark: scan the capabilities' spark pools for a usable (i.e. still
// sparkable, non-normal-form) spark and return it, or fall through after
// compacting the pool.  Special case: when exporting work (for_export)
// from an idle PE, the FIRST usable spark is held back — we only give a
// spark away once a SECOND usable one is found, so the PE is never left
// with no work of its own (see the comment block above this function).
// NOTE(review): elided view — return type, pool->hd advance and several
// closing braces are not visible here; comments only.
78 findSpark( rtsBool for_export )
82 StgClosure *spark, *first=NULL;
83 rtsBool isIdlePE = EMPTY_RUN_QUEUE();
86 /* walk over the capabilities, allocating a spark pool for each one */
87 for (cap = free_capabilities; cap != NULL; cap = cap->link) {
89 /* allocate a single spark pool */
93 pool = &(cap->rSparks);
// hd < tl  <=>  pool non-empty; sparks live in [hd, tl).
94 while (pool->hd < pool->tl) {
96 if (closure_SHOULD_SPARK(spark)) {
97 if (for_export && isIdlePE) {
99 first = spark; // keep the first usable spark if PE is idle
// Second usable spark found: push it back into the pool (hd had been
// advanced past it by the elided dequeue) and export the remembered one.
101 pool->hd--; // found a second spark; keep it in the pool
102 ASSERT(*pool->hd==spark);
103 if (RtsFlags.ParFlags.ParStats.Sparks)
104 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
105 GR_STEALING, ((StgTSO *)NULL), first,
106 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
107 return first; // and return the *first* spark found
// Non-idle or local case: the first usable spark can be handed out at once.
110 if (RtsFlags.ParFlags.ParStats.Sparks && for_export)
111 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
112 GR_STEALING, ((StgTSO *)NULL), spark,
113 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
114 return spark; // return first spark found
// No spark taken from this pool: compact it (drop unsparkable entries).
118 slide_spark_pool(pool);
124 activateSpark is defined in Schedule.c
// add_to_spark_queue (GUM/SMP variant): append a closure to the given
// spark pool if it is still worth sparking and there is room.  If tl has
// hit lim, first compact the pool to reclaim slots freed by pruning.
// Global spark-creation/ignored statistics are bumped when enabled.
// NOTE(review): elided view — the else-branch header and closing braces
// between the created/ignored counters are not visible; comments only.
127 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
129 if (pool->tl == pool->lim)
130 slide_spark_pool(pool);
132 if (closure_SHOULD_SPARK(closure) &&
133 pool->tl < pool->lim) {
// Store the spark and advance the tail cursor.
134 *(pool->tl++) = closure;
137 // collect parallel global statistics (currently done together with GC stats)
138 if (RtsFlags.ParFlags.ParStats.Global &&
139 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
140 // fprintf(stderr, "Creating spark for %x @ %11.2f\n", closure, usertime());
141 globalParStats.tot_sparks_created++;
// (Elided else-branch: closure not sparkable or pool full — count it as
// ignored rather than created.)
147 // collect parallel global statistics (currently done together with GC stats)
148 if (RtsFlags.ParFlags.ParStats.Global &&
149 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
150 //fprintf(stderr, "Ignoring spark for %x @ %11.2f\n", closure, usertime());
151 globalParStats.tot_sparks_ignored++;
// slide_spark_pool: compact a spark pool in place.  Walks [hd, tl),
// copies entries that are still worth sparking down towards base, and
// drops the rest; afterwards hd == base and tl is the new end.  This
// reclaims the slack created by pruned (already-evaluated) sparks.
// NOTE(review): elided view — sparkp's initialisation and the else/skip
// branch of the loop are not visible here; comments only.
159 slide_spark_pool( StgSparkPool *pool )
161 StgClosure **sparkp, **to_sparkp;
164 to_sparkp = pool->base;
165 while (sparkp < pool->tl) {
// Invariant: the write cursor never overtakes the read cursor, so the
// copy is safe to do in place.
166 ASSERT(to_sparkp<=sparkp);
167 ASSERT(*sparkp!=NULL);
168 ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
170 if (closure_SHOULD_SPARK(*sparkp)) {
171 *to_sparkp++ = *sparkp++;
// Pool now holds only sparkable closures, packed from base.
176 pool->hd = pool->base;
177 pool->tl = to_sparkp;
// spark_queue_len (GUM variant): number of sparks currently in the pool,
// i.e. the distance between the head and tail cursors.
181 spark_queue_len( StgSparkPool *pool )
183 return (nat) (pool->tl - pool->hd);
186 /* Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
187 implicit slide i.e. after marking all sparks are at the beginning of the
188 spark pool and the spark pool only contains sparkable closures
// markSparkQueue (GUM variant): GC root-marking pass over every
// capability's spark pool.  Marks the node of each still-sparkable spark
// via MarkRoot and simultaneously performs an implicit slide: after this
// pass all surviving sparks sit at the front of the pool and everything
// unsparkable has been pruned (see the comment block above).
// NOTE(review): elided view — sparkp init, the prune/else branch, the
// n/pruned_sparks counting and several #if/#endif lines are not visible.
191 markSparkQueue( void )
193 StgClosure **sparkp, **to_sparkp;
194 nat n, pruned_sparks; // stats only
198 PAR_TICKY_MARK_SPARK_QUEUE_START();
201 /* walk over the capabilities, allocating a spark pool for each one */
202 for (cap = free_capabilities; cap != NULL; cap = cap->link) {
204 /* allocate a single spark pool */
208 pool = &(cap->rSparks);
// Same in-place compaction scheme as slide_spark_pool, but marking roots
// as it copies.
217 to_sparkp = pool->base;
218 while (sparkp < pool->tl) {
219 ASSERT(to_sparkp<=sparkp);
220 ASSERT(*sparkp!=NULL);
221 ASSERT(LOOKS_LIKE_GHC_INFO(((StgClosure *)*sparkp)->header.info));
222 // ToDo?: statistics gathering here (also for GUM!)
223 if (closure_SHOULD_SPARK(*sparkp)) {
224 *to_sparkp = MarkRoot(*sparkp);
236 pool->hd = pool->base;
237 pool->tl = to_sparkp;
239 PAR_TICKY_MARK_SPARK_QUEUE_END(n);
// Debug reporting — the three belch variants below are presumably guarded
// by SMP/PAR/sequential #ifdefs that are elided in this view.
243 belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
244 n, pruned_sparks, pthread_self()));
247 belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
248 n, pruned_sparks, mytid));
251 belch("markSparkQueue: marked %d sparks and pruned %d sparks",
256 belch("markSparkQueue: new spark queue len=%d; (hd=%p; tl=%p)",
257 spark_queue_len(pool), pool->hd, pool->tl));
// Post-condition sanity check over all pools: cursors ordered and no
// NULL entries left behind.
271 pool = &(cap->rSparks);
272 ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
274 ASSERT(spark != (StgClosure *)NULL);
281 //@node GranSim code, , GUM code, Spark Management Routines
282 //@subsection GranSim code
285 //* Basic interface to sparkq::
289 //@node Basic interface to sparkq, Aux fcts, GranSim code, GranSim code
290 //@subsubsection Basic interface to sparkq
292 Search the spark queue of the proc in event for a spark that's worth
293 turning into a thread
294 (was gimme_spark in the old RTS)
296 //@cindex findLocalSpark
// findLocalSpark (GranSim; was gimme_spark): search the spark queue of
// the PE named in `event` for a spark worth turning into a thread.
// Preference order while scanning: prune dead sparks; prefer sparks of
// local nodes (remembering the first non-local one as a fallback);
// prefer high-priority sparks (remembering the first low-priority one as
// a fallback).  Results are returned through *found_res / *spark_res.
// NOTE(review): elided view — the `found` declaration, node binding,
// several branch bodies and closing braces are not visible; comments only.
298 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
300 PEs proc = event->proc, /* proc to search for work */
301 creator = event->creator; /* proc that requested work */
// Fallback candidates remembered during the scan, each with its queue
// predecessor so the list links can be fixed up on removal.
304 rtsSparkQ spark_of_non_local_node = NULL,
305 spark_of_non_local_node_prev = NULL,
306 low_priority_spark = NULL,
307 low_priority_spark_prev = NULL,
308 spark = NULL, prev = NULL;
310 /* Choose a spark from the local spark queue */
311 prev = (rtsSpark*)NULL;
312 spark = pending_sparks_hds[proc];
315 // ToDo: check this code & implement local sparking !! -- HWL
316 while (!found && spark != (rtsSpark*)NULL)
// Doubly-linked-queue invariant: prev is spark's predecessor (or spark
// is the head).
318 ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
319 (prev==(rtsSpark*)NULL || prev->next==spark) &&
320 (spark->prev==prev));
// Case 1: node already evaluated — prune the spark from the queue.
322 if (!closure_SHOULD_SPARK(node))
324 IF_GRAN_DEBUG(checkSparkQ,
325 belch("^^ pruning spark %p (node %p) in gimme_spark",
328 if (RtsFlags.GranFlags.GranSimStats.Sparks)
329 DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
330 spark->node, spark->name, spark_queue_len(proc));
332 ASSERT(spark != (rtsSpark*)NULL);
333 ASSERT(SparksAvail>0);
336 ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
// delete_from_sparkq unlinks and (dispose_too=rtsTrue) frees the spark,
// returning the next one in the queue.
337 spark = delete_from_sparkq (spark, proc, rtsTrue);
338 if (spark != (rtsSpark*)NULL)
342 /* -- node should eventually be sparked */
// Case 2: prefer-local-nodes policy and this node is remote — remember
// it as a fallback; only take it if we reach the end of the queue.
343 else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes &&
344 !IS_LOCAL_TO(PROCS(node),CurrentProc))
346 barf("Local sparking not yet implemented");
348 /* Remember first low priority spark */
349 if (spark_of_non_local_node==(rtsSpark*)NULL) {
350 spark_of_non_local_node_prev = prev;
351 spark_of_non_local_node = spark;
354 if (spark->next == (rtsSpark*)NULL) {
355 /* ASSERT(spark==SparkQueueTl); just for testing */
356 prev = spark_of_non_local_node_prev;
357 spark = spark_of_non_local_node;
362 # if defined(GRAN) && defined(GRAN_CHECK)
363 /* Should never happen; just for testing
364 if (spark==pending_sparks_tl) {
365 fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n");
366 stg_exit(EXIT_FAILURE);
371 ASSERT(SparksAvail>0);
// Case 3: priority is acceptable (or priority sparking chooses for us).
375 else if ( RtsFlags.GranFlags.DoPrioritySparking ||
376 (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
378 if (RtsFlags.GranFlags.DoPrioritySparking)
379 barf("Priority sparking not yet implemented");
// Case 4: low-priority spark — remember the first one as a fallback and
// only take it if nothing better turns up by the end of the queue.
384 else /* only used if SparkPriority2 is defined */
386 /* ToDo: fix the code below and re-integrate it */
387 /* Remember first low priority spark */
388 if (low_priority_spark==(rtsSpark*)NULL) {
389 low_priority_spark_prev = prev;
390 low_priority_spark = spark;
393 if (spark->next == (rtsSpark*)NULL) {
394 /* ASSERT(spark==spark_queue_tl); just for testing */
395 prev = low_priority_spark_prev;
396 spark = low_priority_spark;
397 found = rtsTrue; /* take low pri spark => rc is 2 */
401 /* Should never happen; just for testing
402 if (spark==pending_sparks_tl) {
403 fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n");
404 stg_exit(EXIT_FAILURE);
411 belch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n",
412 spark->gran_info, RtsFlags.GranFlags.SparkPriority,
413 spark->node, spark->name);)
416 } /* while (spark!=NULL && !found) */
423 Turn the spark into a thread.
424 In GranSim this basically means scheduling a StartThread event for the
425 node pointed to by the spark at some point in the future.
426 (was munch_spark in the old RTS)
428 //@cindex activateSpark
// activateSpark (GranSim; was munch_spark): turn a chosen spark into a
// thread.  If the requesting PE (creator) differs from the PE holding
// the spark (proc), schedule a MoveSpark event to simulate shipping the
// spark (GUM-style fishing); otherwise schedule a StartThread event on
// proc itself.  All "work" is accounted for by bumping CurrentTime[]
// with the configured simulation cost constants.
// NOTE(review): elided view — the `if (proc != creator)` header, `node`
// binding, event-type arguments to new_event and closing braces are not
// visible here; comments only.
430 activateSpark (rtsEvent *event, rtsSparkQ spark)
432 PEs proc = event->proc, /* proc to search for work */
433 creator = event->creator; /* proc that requested work */
436 rtsTime spark_arrival_time;
439 We've found a node on PE proc requested by PE creator.
440 If proc==creator we can turn the spark into a thread immediately;
441 otherwise we schedule a MoveSpark event on the requesting PE
447 /* only possible if we simulate GUM style fishing */
448 ASSERT(RtsFlags.GranFlags.Fishing);
450 /* Message packing costs for sending a Fish; qeq jabbI'ID */
451 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
453 if (RtsFlags.GranFlags.GranSimStats.Sparks)
454 DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
455 (StgTSO*)NULL, spark->node,
456 spark->name, spark_queue_len(proc));
458 /* time of the spark arrival on the remote PE */
459 spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
// Schedule the spark's arrival at the requesting PE (event type elided).
461 new_event(creator, proc, spark_arrival_time,
463 (StgTSO*)NULL, spark->node, spark);
465 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
467 } else { /* proc==creator i.e. turn the spark into a thread */
// Account a low-priority-spark activation in the global stats.
469 if ( RtsFlags.GranFlags.GranSimStats.Global &&
470 spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
472 globalGranStats.tot_low_pri_sparks++;
474 belch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
476 spark->node, spark->name));
479 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
// GC back-off path: pseudo-code placeholder (note the non-C condition
// "GARBAGE COLLECTION IS NECESSARY") ending in barf() — this branch was
// never finished; it reschedules the event one tick later and aborts.
484 /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
485 if (GARBAGE COLLECTION IS NECESSARY) {
486 /* Some kind of backoff needed here in case there's too little heap */
487 # if defined(GRAN_CHECK) && defined(GRAN)
488 if (RtsFlags.GcFlags.giveStats)
489 fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p; name=%u\n",
490 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
491 spark, node, spark->name);
493 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
495 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
496 barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
497 GarbageCollect(GetRoots, rtsFalse);
498 // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
499 // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
501 return; /* was: continue; */ /* to the next event, eventually */
505 if (RtsFlags.GranFlags.GranSimStats.Sparks)
506 DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
507 spark->node, spark->name,
508 spark_queue_len(CurrentProc));
// Schedule thread creation on this PE (event type elided) and mark the
// PE as starting a thread.
510 new_event(proc, proc, CurrentTime[proc],
512 END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
514 procStatus[proc] = Starting;
518 /* -------------------------------------------------------------------------
519 This is the main point where handling granularity information comes into
521 ------------------------------------------------------------------------- */
523 #define MAX_RAND_PRI 100
526 Granularity info transformers.
527 Applied to the GRAN_INFO field of a spark.
// Priority transformers applied to a spark's GRAN_INFO field (see
// newSpark below): identity, inverse (NB: negating a nat wraps to a
// large value), constant zero, and uniform random in [1, MAX_RAND_PRI].
// The stray ';' after the first three bodies is harmless at file scope.
529 static inline nat ID(nat x) { return(x); };
530 static inline nat INV(nat x) { return(-x); };
531 static inline nat IGNORE(nat x) { return (0); };
532 static inline nat RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
534 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
// newSpark (GranSim; K&R-style definition): heap-allocate a new rtsSpark
// for `node`, computing its priority from gran_info via the configured
// transformer (random / inverse / ignore / identity).  Returns NULL —
// i.e. refuses to create the spark — when a SparkPriority threshold is
// set and the computed priority falls below it.
// NOTE(review): elided view — the node parameter declaration, `pri` and
// `newspark` declarations, the default arm of the conditional chain and
// the function's return are not visible here; comments only.
537 newSpark(node,name,gran_info,size_info,par_info,local)
539 nat name, gran_info, size_info, par_info, local;
// Choose the priority transformer per RTS flags; falls through to the
// identity case (elided) when none of the flags is set.
544 pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
545 RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
546 RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
// Threshold filter: sparks below SparkPriority are discarded at birth.
549 if ( RtsFlags.GranFlags.SparkPriority!=0 &&
550 pri<RtsFlags.GranFlags.SparkPriority ) {
552 belch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n",
553 pri, RtsFlags.GranFlags.SparkPriority, node, name));
554 return ((rtsSpark*)NULL);
557 newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
558 newspark->prev = newspark->next = (rtsSpark*)NULL;
559 newspark->node = node;
// name==1 is a sentinel meaning "inherit the current TSO's spark name".
560 newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
561 newspark->gran_info = pri;
562 newspark->global = !local; /* Check that with parAt, parAtAbs !!*/
564 if (RtsFlags.GranFlags.GranSimStats.Global) {
565 globalGranStats.tot_sparks_created++;
566 globalGranStats.sparks_created_on_PE[CurrentProc]++;
572 //@cindex disposeSpark
581 //@cindex disposeSparkQ
// disposeSparkQ (fragment): recursively free a whole spark queue,
// presumably freeing `spark` after disposing of its tail, and sanity-
// checking the global SparksAvail counter afterwards — TODO confirm,
// most of the function body is elided in this view.
589 disposeSparkQ(spark->next);
592 if (SparksAvail < 0) {
593 fprintf(stderr,"disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
602 With PrioritySparking add_to_spark_queue performs an insert sort to keep
603 the spark queue sorted. Otherwise the spark is just added to the end of
607 //@cindex add_to_spark_queue
// add_to_spark_queue (GranSim; K&R-style): insert `spark` into the
// current PE's doubly-linked pending-sparks queue.  With priority
// sparking enabled the queue is kept sorted by descending gran_info via
// an insertion scan (costed in simulated time); otherwise the spark is
// appended at the tail.  NULL sparks are silently ignored.
// NOTE(review): elided view — the parameter declaration, `count`, loop
// termination conditions, link-splice statements and closing braces are
// not visible here; comments only.
609 add_to_spark_queue(spark)
612 rtsSpark *prev = NULL, *next = NULL;
614 rtsBool found = rtsFalse;
616 if ( spark == (rtsSpark *)NULL ) {
620 if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
621 /* Priority sparking is enabled i.e. spark queues must be sorted */
// Insertion scan: stop at the first entry with priority <= ours; count
// tracks the scan length for the cost model below.
623 for (prev = NULL, next = pending_sparks_hd, count=0;
625 !(found = (spark->gran_info >= next->gran_info));
626 prev = next, next = next->next, count++)
629 } else { /* 'utQo' */
630 /* Priority sparking is disabled */
632 found = rtsFalse; /* to add it at the end */
637 /* next points to the first spark with a gran_info smaller than that
638 of spark; therefore, add spark before next into the spark queue */
// Splice before `next`, fixing head/tail pointers at the boundaries.
640 if ( next == NULL ) {
641 pending_sparks_tl = spark;
646 if ( prev == NULL ) {
647 pending_sparks_hd = spark;
651 } else { /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
652 /* add the spark at the end of the spark queue */
654 spark->prev = pending_sparks_tl;
655 if (pending_sparks_hd == NULL)
656 pending_sparks_hd = spark;
658 pending_sparks_tl->next = spark;
659 pending_sparks_tl = spark;
663 /* add costs for search in priority sparking */
664 if (RtsFlags.GranFlags.DoPrioritySparking) {
665 CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
668 IF_GRAN_DEBUG(checkSparkQ,
669 belch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
670 spark, spark->node, CurrentProc);
671 print_sparkq_stats());
// Debug-only consistency check: walk to the end and verify that the
// recorded tail pointer really is the last element.
673 # if defined(GRAN_CHECK)
674 if (RtsFlags.GranFlags.Debug.checkSparkQ) {
675 for (prev = NULL, next = pending_sparks_hd;
677 prev = next, next = next->next)
679 if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
680 fprintf(stderr,"SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
682 pending_sparks_tl, prev);
// Debug-only sortedness check when priority debugging is on.
686 # if defined(GRAN_CHECK)
687 /* Check if the sparkq is still sorted. Just for testing, really! */
688 if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
689 RtsFlags.GranFlags.Debug.pri ) {
690 rtsBool sorted = rtsTrue;
691 rtsSpark *prev, *next;
693 if (pending_sparks_hd == NULL ||
694 pending_sparks_hd->next == NULL ) {
695 /* just 1 elem => ok */
697 for (prev = pending_sparks_hd,
698 next = pending_sparks_hd->next;
700 prev = next, next = next->next) {
702 (prev->gran_info >= next->gran_info);
706 fprintf(stderr,"ghuH: SPARKQ on PE %d is not sorted:\n",
708 print_sparkq(CurrentProc);
714 //@node Aux fcts, , Basic interface to sparkq, GranSim code
715 //@subsubsection Aux fcts
717 //@cindex spark_queue_len
// spark_queue_len (GranSim; K&R-style): length of PE `proc`'s pending-
// sparks list, counted by walking it; with GRAN_CHECK it also verifies
// that the recorded tail pointer is the last element reached.
// NOTE(review): elided view — parameter declaration, `len`, the loop
// condition and the return are not visible here; comments only.
719 spark_queue_len(proc)
722 rtsSpark *prev, *spark; /* prev only for testing !! */
725 for (len = 0, prev = NULL, spark = pending_sparks_hds[proc];
727 len++, prev = spark, spark = spark->next)
730 # if defined(GRAN_CHECK)
731 if ( RtsFlags.GranFlags.Debug.checkSparkQ )
732 if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
733 fprintf(stderr,"ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
734 proc, pending_sparks_tls[proc], prev);
741 Take spark out of the spark queue on PE p and nuke the spark. Adjusts
742 hd and tl pointers of the spark queue. Returns a pointer to the next
745 //@cindex delete_from_sparkq
// delete_from_sparkq (K&R-style): unlink `spark` from PE `p`'s doubly-
// linked pending-sparks queue, adjusting the head/tail pointers when the
// spark is at either end, and (per dispose_too) free it.  Returns the
// spark that followed the deleted one, so callers can continue a scan.
// NOTE(review): elided view — parameter declarations, `new_spark`
// declaration, the dispose call and the return are not visible here.
747 delete_from_sparkq (spark, p, dispose_too) /* unlink and dispose spark */
755 barf("delete_from_sparkq: trying to delete NULL spark\n");
757 # if defined(GRAN_CHECK)
758 if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
759 fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
760 pending_sparks_hd, pending_sparks_tl,
761 spark->prev, spark, spark->next,
762 (spark->next==NULL ? 0 : spark->next->prev));
// Unlink on the prev side: head pointer if first, else predecessor link.
766 if (spark->prev==NULL) {
767 /* spark is first spark of queue => adjust hd pointer */
768 ASSERT(pending_sparks_hds[p]==spark);
769 pending_sparks_hds[p] = spark->next;
771 spark->prev->next = spark->next;
// Unlink on the next side: tail pointer if last, else successor link.
// (The comment on line 775 says "first" but this is the LAST-spark case
// — the original comment is a copy-paste slip; code is correct.)
773 if (spark->next==NULL) {
774 ASSERT(pending_sparks_tls[p]==spark);
775 /* spark is first spark of queue => adjust tl pointer */
776 pending_sparks_tls[p] = spark->prev;
778 spark->next->prev = spark->prev;
780 new_spark = spark->next;
782 # if defined(GRAN_CHECK)
783 if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
784 fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
785 pending_sparks_hd, pending_sparks_tl,
786 spark->prev, spark, spark->next,
787 (spark->next==NULL ? 0 : spark->next->prev), spark);
797 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
798 //@cindex markSparkQueue
// markSparkQueue (GranSim variant, fragment): GC root-marking pass over
// every PE's pending-sparks list — marks each spark's node via MarkRoot
// and stores the (possibly moved) pointer back.  Unlike the GUM variant
// above, the list structure itself is left untouched.
// NOTE(review): elided view — the function header and loop-variable
// declarations are not visible here; comments only.
802 StgClosure *MarkRoot(StgClosure *root); // prototype
806 for (p=0; p<RtsFlags.GranFlags.proc; p++)
807 for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
808 ASSERT(sp->node!=NULL);
809 ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
810 // ToDo?: statistics gathering here (also for GUM!)
// MarkRoot may move the closure during GC; record its new address.
811 sp->node = (StgClosure *)MarkRoot(sp->node);
814 belch("@@ markSparkQueue: spark statistics at start of GC:");
815 print_sparkq_stats());
818 //@cindex print_spark
// print_spark (fragment): debug pretty-printer for a single rtsSpark.
// Prints "Spark: NIL" for NULL, otherwise formats node address (or
// "______" placeholder), name, global flag, creator and the prev/next
// links.  NOTE(review): the function header and the format-string
// assembly around lines 830-831 are elided in this view; comments only.
826 fprintf(stderr,"Spark: NIL\n");
830 ((spark->node==NULL) ? "______" : "%#6lx"),
831 stgCast(StgPtr,spark->node));
833 fprintf(stderr,"Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
835 ((spark->global)==rtsTrue?"True":"False"), spark->creator,
836 spark->prev, spark->next);
840 //@cindex print_sparkq
// print_sparkq (fragment): debug dump of PE `proc`'s whole spark queue —
// prints a header, then walks the list printing each spark (the per-
// element print call inside the loop body is elided in this view).
846 rtsSpark *x = pending_sparks_hds[proc];
848 fprintf(stderr,"Spark Queue of PE %d with root at %p:\n", proc, x);
849 for (; x!=(rtsSpark*)NULL; x=x->next) {
855 Print a statistics of all spark queues.
857 //@cindex print_sparkq_stats
// print_sparkq_stats: debug one-liner printing the spark-queue length of
// every PE.  NOTE(review): the leading "[" is never matched by a closing
// "]" in the visible lines, and the first PE is preceded by ", " — both
// cosmetic quirks of this debug output; closing brace elided from view.
859 print_sparkq_stats(void)
863 fprintf(stderr, "SparkQs: [");
864 for (p=0; p<RtsFlags.GranFlags.proc; p++)
865 fprintf(stderr, ", PE %d: %d", p, spark_queue_len(p));
866 fprintf(stderr, "\n");