1 /* ---------------------------------------------------------------------------
3 * (c) The GHC Team, 2000
5 * Sparking support for PAR and SMP versions of the RTS.
7 * -------------------------------------------------------------------------*/
9 //@node Spark Management Routines, , ,
10 //@section Spark Management Routines
19 //@node Includes, GUM code, Spark Management Routines, Spark Management Routines
20 //@subsection Includes
22 #include "PosixSource.h"
31 # include "ParallelRts.h"
32 # include "GranSimRts.h" // for GR_...
34 # include "GranSimRts.h"
38 #if /*defined(SMP) ||*/ defined(PAR)
40 //@node GUM code, GranSim code, Includes, Spark Management Routines
41 //@subsection GUM code
43 static void slide_spark_pool( StgSparkPool *pool );

/* initSparkPools: allocate one spark pool per capability.
   NOTE(review): this is an elided extract (leading numbers are the
   original file's line numbers); the return type, locals (`cap`, `pool`)
   and the closing brace of this function are not visible here. */
46 initSparkPools( void )
52 /* walk over the capabilities, allocating a spark pool for each one */
53 for (cap = free_capabilities; cap != NULL; cap = cap->link) {
55 /* allocate a single spark pool */
59 pool = &(cap->rSparks);
/* base..lim delimits a fixed array of maxLocalSparks closure pointers;
   the stgMallocBytes "tag" argument is on an elided line. */
61 pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
62 * sizeof(StgClosure *),
64 pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
/* hd == tl means the pool starts out empty */
65 pool->hd = pool->base;
66 pool->tl = pool->base;
/* findSpark: pick a usable spark from some capability's pool.
   NOTE(review): elided extract — the function's return type, the
   assignment of `spark` from the pool (presumably *pool->hd++), and
   several closing braces are on lines not visible here. */
71 We traverse the spark pool until we find the 2nd usable (i.e. non-NF)
72 spark. Rationale, we don't want to give away the only work a PE has.
73 ToDo: introduce low- and high-water-marks for load balancing.
76 findSpark( rtsBool for_export )
80 StgClosure *spark, *first=NULL;
/* an idle PE keeps its first usable spark for itself when exporting */
81 rtsBool isIdlePE = EMPTY_RUN_QUEUE();
84 /* walk over the capabilities, allocating a spark pool for each one */
85 for (cap = free_capabilities; cap != NULL; cap = cap->link) {
87 /* allocate a single spark pool */
91 pool = &(cap->rSparks);
92 while (pool->hd < pool->tl) {
94 if (closure_SHOULD_SPARK(spark)) {
95 if (for_export && isIdlePE) {
97 first = spark; // keep the first usable spark if PE is idle
/* back up hd so the 2nd spark stays in the pool; only the 1st is given away */
99 pool->hd--; // found a second spark; keep it in the pool
100 ASSERT(*pool->hd==spark);
101 if (RtsFlags.ParFlags.ParStats.Sparks)
102 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
103 GR_STEALING, ((StgTSO *)NULL), first,
104 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
105 return first; // and return the *first* spark found
108 if (RtsFlags.ParFlags.ParStats.Sparks && for_export)
109 DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
110 GR_STEALING, ((StgTSO *)NULL), spark,
111 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
112 return spark; // return first spark found
/* compact the pool before giving up on this capability */
116 slide_spark_pool(pool);
122 activateSpark is defined in Schedule.c

/* add_to_spark_queue (PAR version): append `closure` to `pool` if it is
   still worth sparking and there is room; otherwise count it as ignored.
   NOTE(review): elided extract — return type, braces and the else-branch
   structure between the two statistics sections are not visible. */
125 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
/* make room by compacting out already-evaluated sparks first */
127 if (pool->tl == pool->lim)
128 slide_spark_pool(pool);
130 if (closure_SHOULD_SPARK(closure) &&
131 pool->tl < pool->lim) {
132 *(pool->tl++) = closure;
135 // collect parallel global statistics (currently done together with GC stats)
136 if (RtsFlags.ParFlags.ParStats.Global &&
137 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
138 // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime());
139 globalParStats.tot_sparks_created++;
/* presumably the branch below is the "pool full or not sparkable" case --
   TODO confirm against the unelided source */
145 // collect parallel global statistics (currently done together with GC stats)
146 if (RtsFlags.ParFlags.ParStats.Global &&
147 RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
148 //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime());
149 globalParStats.tot_sparks_ignored++;
/* slide_spark_pool: compact the pool in place, keeping only sparks whose
   closure is still worth evaluating; resets hd to base.
   NOTE(review): elided extract — the initialisation of `sparkp`
   (presumably sparkp = pool->hd) and the else-branch that drops
   non-sparkable entries are on lines not visible here. */
157 slide_spark_pool( StgSparkPool *pool )
159 StgClosure **sparkp, **to_sparkp;
162 to_sparkp = pool->base;
163 while (sparkp < pool->tl) {
/* to_sparkp trails sparkp: surviving sparks are packed to the front */
164 ASSERT(to_sparkp<=sparkp);
165 ASSERT(*sparkp!=NULL);
166 ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
168 if (closure_SHOULD_SPARK(*sparkp)) {
169 *to_sparkp++ = *sparkp++;
174 pool->hd = pool->base;
175 pool->tl = to_sparkp;
/* spark_queue_len (PAR version): number of sparks currently in the pool,
   i.e. the distance between hd and tl.  Return type (nat) is on an
   elided line. */
179 spark_queue_len( StgSparkPool *pool )
181 return (nat) (pool->tl - pool->hd);
184 /* Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
185 implicit slide i.e. after marking all sparks are at the beginning of the
186 spark pool and the spark pool only contains sparkable closures
/* NOTE(review): elided extract — return type, `sparkp` initialisation,
   the prune branch (which presumably bumps pruned_sparks), the debug
   conditionals selecting among the three debugBelch variants, and the
   final sanity-check loop header are on lines not visible here. */
189 markSparkQueue( void )
191 StgClosure **sparkp, **to_sparkp;
192 nat n, pruned_sparks; // stats only
196 PAR_TICKY_MARK_SPARK_QUEUE_START();
199 /* walk over the capabilities, allocating a spark pool for each one */
200 for (cap = free_capabilities; cap != NULL; cap = cap->link) {
202 /* allocate a single spark pool */
206 pool = &(cap->rSparks);
/* same compaction pattern as slide_spark_pool, but each survivor is
   also registered as a GC root via MarkRoot */
215 to_sparkp = pool->base;
216 while (sparkp < pool->tl) {
217 ASSERT(to_sparkp<=sparkp);
218 ASSERT(*sparkp!=NULL);
219 ASSERT(LOOKS_LIKE_GHC_INFO(((StgClosure *)*sparkp)->header.info));
220 // ToDo?: statistics gathering here (also for GUM!)
221 if (closure_SHOULD_SPARK(*sparkp)) {
222 *to_sparkp = MarkRoot(*sparkp);
234 pool->hd = pool->base;
235 pool->tl = to_sparkp;
237 PAR_TICKY_MARK_SPARK_QUEUE_END(n);
/* three debug-output variants: SMP (pthread_self), PAR (mytid), other */
241 debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
242 n, pruned_sparks, pthread_self()));
245 debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
246 n, pruned_sparks, mytid));
249 debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks",
254 debugBelch("markSparkQueue: new spark queue len=%d; (hd=%p; tl=%p)",
255 spark_queue_len(pool), pool->hd, pool->tl));
/* post-condition sanity checks over each pool */
269 pool = &(cap->rSparks);
270 ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
272 ASSERT(spark != (StgClosure *)NULL);
279 //@node GranSim code, , GUM code, Spark Management Routines
280 //@subsection GranSim code
283 //* Basic interface to sparkq::
287 //@node Basic interface to sparkq, Aux fcts, GranSim code, GranSim code
288 //@subsubsection Basic interface to sparkq
290 Search the spark queue of the proc in event for a spark that's worth
291 turning into a thread
292 (was gimme_spark in the old RTS)
294 //@cindex findLocalSpark
/* NOTE(review): elided extract — return type, the declarations of
   `found` and `node`, several branch bodies and closing braces are on
   lines not visible here.  Results are reported through the out
   parameters found_res / spark_res (see signature). */
296 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
298 PEs proc = event->proc, /* proc to search for work */
299 creator = event->creator; /* proc that requested work */
/* fallback candidates remembered while scanning, with their list
   predecessors so the queue can be re-positioned on them */
302 rtsSparkQ spark_of_non_local_node = NULL,
303 spark_of_non_local_node_prev = NULL,
304 low_priority_spark = NULL,
305 low_priority_spark_prev = NULL,
306 spark = NULL, prev = NULL;
308 /* Choose a spark from the local spark queue */
309 prev = (rtsSpark*)NULL;
310 spark = pending_sparks_hds[proc];
313 // ToDo: check this code & implement local sparking !! -- HWL
314 while (!found && spark != (rtsSpark*)NULL)
/* doubly-linked-queue invariant: prev/next/head pointers consistent */
316 ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
317 (prev==(rtsSpark*)NULL || prev->next==spark) &&
318 (spark->prev==prev));
/* case 1: node already evaluated => prune the spark from the queue */
320 if (!closure_SHOULD_SPARK(node))
322 IF_GRAN_DEBUG(checkSparkQ,
323 debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
326 if (RtsFlags.GranFlags.GranSimStats.Sparks)
327 DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
328 spark->node, spark->name, spark_queue_len(proc));
330 ASSERT(spark != (rtsSpark*)NULL);
331 ASSERT(SparksAvail>0);
334 ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
/* delete_from_sparkq returns the next spark in the queue */
335 spark = delete_from_sparkq (spark, proc, rtsTrue);
336 if (spark != (rtsSpark*)NULL)
340 /* -- node should eventually be sparked */
/* case 2: prefer sparks of local nodes; non-local ones are only a
   fallback, taken when the end of the queue is reached */
341 else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes &&
342 !IS_LOCAL_TO(PROCS(node),CurrentProc))
344 barf("Local sparking not yet implemented");
346 /* Remember first low priority spark */
347 if (spark_of_non_local_node==(rtsSpark*)NULL) {
348 spark_of_non_local_node_prev = prev;
349 spark_of_non_local_node = spark;
352 if (spark->next == (rtsSpark*)NULL) {
353 /* ASSERT(spark==SparkQueueTl); just for testing */
354 prev = spark_of_non_local_node_prev;
355 spark = spark_of_non_local_node;
360 # if defined(GRAN) && defined(GRAN_CHECK)
361 /* Should never happen; just for testing
362 if (spark==pending_sparks_tl) {
363 debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
364 stg_exit(EXIT_FAILURE);
369 ASSERT(SparksAvail>0);
/* case 3: priority high enough (or priority sparking on) => take it */
373 else if ( RtsFlags.GranFlags.DoPrioritySparking ||
374 (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
376 if (RtsFlags.GranFlags.DoPrioritySparking)
377 barf("Priority sparking not yet implemented");
/* case 4: low-priority spark; remember the first one, take it only if
   nothing better turns up by the end of the queue */
382 else /* only used if SparkPriority2 is defined */
384 /* ToDo: fix the code below and re-integrate it */
385 /* Remember first low priority spark */
386 if (low_priority_spark==(rtsSpark*)NULL) {
387 low_priority_spark_prev = prev;
388 low_priority_spark = spark;
391 if (spark->next == (rtsSpark*)NULL) {
392 /* ASSERT(spark==spark_queue_tl); just for testing */
393 prev = low_priority_spark_prev;
394 spark = low_priority_spark;
395 found = rtsTrue; /* take low pri spark => rc is 2 */
399 /* Should never happen; just for testing
400 if (spark==pending_sparks_tl) {
401 debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
402 stg_exit(EXIT_FAILURE);
409 debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n",
410 spark->gran_info, RtsFlags.GranFlags.SparkPriority,
411 spark->node, spark->name);)
414 } /* while (spark!=NULL && !found) */
421 Turn the spark into a thread.
422 In GranSim this basically means scheduling a StartThread event for the
423 node pointed to by the spark at some point in the future.
424 (was munch_spark in the old RTS)
426 //@cindex activateSpark
/* NOTE(review): elided extract — the `node`/`tso` locals, the event-type
   arguments to new_event, the proc!=creator test, and several braces
   are on lines not visible here. */
428 activateSpark (rtsEvent *event, rtsSparkQ spark)
430 PEs proc = event->proc, /* proc to search for work */
431 creator = event->creator; /* proc that requested work */
434 rtsTime spark_arrival_time;
437 We've found a node on PE proc requested by PE creator.
438 If proc==creator we can turn the spark into a thread immediately;
439 otherwise we schedule a MoveSpark event on the requesting PE
/* --- remote case: export the spark to the requesting PE ------------- */
445 /* only possible if we simulate GUM style fishing */
446 ASSERT(RtsFlags.GranFlags.Fishing);
448 /* Message packing costs for sending a Fish; qeq jabbI'ID */
449 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
451 if (RtsFlags.GranFlags.GranSimStats.Sparks)
452 DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
453 (StgTSO*)NULL, spark->node,
454 spark->name, spark_queue_len(proc));
456 /* time of the spark arrival on the remote PE */
457 spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
459 new_event(creator, proc, spark_arrival_time,
461 (StgTSO*)NULL, spark->node, spark);
463 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
465 } else { /* proc==creator i.e. turn the spark into a thread */
/* --- local case: create a thread for the spark's node --------------- */
467 if ( RtsFlags.GranFlags.GranSimStats.Global &&
468 spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
470 globalGranStats.tot_low_pri_sparks++;
472 debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
474 spark->node, spark->name));
477 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
482 /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
/* pseudo-code placeholder, deliberately not valid C in the original */
483 if (GARBAGE COLLECTION IS NECESSARY) {
484 /* Some kind of backoff needed here in case there's too little heap */
485 # if defined(GRAN_CHECK) && defined(GRAN)
486 if (RtsFlags.GcFlags.giveStats)
487 fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p; name=%u\n",
488 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
489 spark, node, spark->name);
491 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
493 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
494 barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
495 GarbageCollect(GetRoots, rtsFalse);
496 // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
497 // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
499 return; /* was: continue; */ /* to the next event, eventually */
503 if (RtsFlags.GranFlags.GranSimStats.Sparks)
504 DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
505 spark->node, spark->name,
506 spark_queue_len(CurrentProc));
/* schedule the StartThread (presumably) event for now on this proc */
508 new_event(proc, proc, CurrentTime[proc],
510 END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
512 procStatus[proc] = Starting;
516 /* -------------------------------------------------------------------------
517 This is the main point where handling granularity information comes into
519 ------------------------------------------------------------------------- */
521 #define MAX_RAND_PRI 100
524 Granularity info transformers.
525 Applied to the GRAN_INFO field of a spark.
527 STATIC_INLINE nat ID(nat x) { return(x); };
528 STATIC_INLINE nat INV(nat x) { return(-x); };
529 STATIC_INLINE nat IGNORE(nat x) { return (0); };
530 STATIC_INLINE nat RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
532 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
/* newSpark (K&R-style definition): allocate and initialise an rtsSpark
   for `node`, computing its priority from gran_info via the transformer
   selected by the Gran flags.  Returns NULL if the priority falls below
   SparkPriority (spark is discarded).
   NOTE(review): elided extract — the `node` parameter declaration, the
   `pri`/`newspark` locals, the default (identity) priority arm, and the
   final return are on lines not visible here. */
535 newSpark(node,name,gran_info,size_info,par_info,local)
537 nat name, gran_info, size_info, par_info, local;
/* chained conditional selecting the priority transformer; the final
   else-arm (plain gran_info, presumably) is elided */
542 pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
543 RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
544 RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
/* discard sparks below the configured priority threshold */
547 if ( RtsFlags.GranFlags.SparkPriority!=0 &&
548 pri<RtsFlags.GranFlags.SparkPriority ) {
550 debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n",
551 pri, RtsFlags.GranFlags.SparkPriority, node, name));
552 return ((rtsSpark*)NULL);
555 newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
556 newspark->prev = newspark->next = (rtsSpark*)NULL;
557 newspark->node = node;
/* name==1 is treated as "inherit the current TSO's spark name" */
558 newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
559 newspark->gran_info = pri;
560 newspark->global = !local; /* Check that with parAt, parAtAbs !!*/
562 if (RtsFlags.GranFlags.GranSimStats.Global) {
563 globalGranStats.tot_sparks_created++;
564 globalGranStats.sparks_created_on_PE[CurrentProc]++;
570 //@cindex disposeSpark
/* NOTE(review): only fragments of disposeSpark/disposeSparkQ survive in
   this extract.  disposeSparkQ appears to recurse down spark->next and
   sanity-checks the SparksAvail counter afterwards. */
579 //@cindex disposeSparkQ
587 disposeSparkQ(spark->next);
590 if (SparksAvail < 0) {
591 debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
600 With PrioritySparking add_to_spark_queue performs an insert sort to keep
601 the spark queue sorted. Otherwise the spark is just added to the end of
605 //@cindex add_to_spark_queue
/* add_to_spark_queue (GranSim version): insert `spark` into the current
   proc's doubly-linked pending-sparks queue; insert-sorted by gran_info
   when priority sparking is on, appended otherwise.
   NOTE(review): elided extract — return type, `count` declaration, the
   linking statements of the insert-before-`next` case, and the loop
   termination conditions are on lines not visible here. */
607 add_to_spark_queue(spark)
610 rtsSpark *prev = NULL, *next = NULL;
612 rtsBool found = rtsFalse;
/* silently ignore NULL sparks (e.g. ones pruned by newSpark) */
614 if ( spark == (rtsSpark *)NULL ) {
618 if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
619 /* Priority sparking is enabled i.e. spark queues must be sorted */
/* scan until the first spark with gran_info <= the new spark's */
621 for (prev = NULL, next = pending_sparks_hd, count=0;
623 !(found = (spark->gran_info >= next->gran_info));
624 prev = next, next = next->next, count++)
627 } else { /* 'utQo' */
628 /* Priority sparking is disabled */
630 found = rtsFalse; /* to add it at the end */
635 /* next points to the first spark with a gran_info smaller than that
636 of spark; therefore, add spark before next into the spark queue */
638 if ( next == NULL ) {
639 pending_sparks_tl = spark;
644 if ( prev == NULL ) {
645 pending_sparks_hd = spark;
649 } else { /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
650 /* add the spark at the end of the spark queue */
652 spark->prev = pending_sparks_tl;
653 if (pending_sparks_hd == NULL)
654 pending_sparks_hd = spark;
656 pending_sparks_tl->next = spark;
657 pending_sparks_tl = spark;
661 /* add costs for search in priority sparking */
662 if (RtsFlags.GranFlags.DoPrioritySparking) {
663 CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
666 IF_GRAN_DEBUG(checkSparkQ,
667 debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
668 spark, spark->node, CurrentProc);
669 print_sparkq_stats());
/* debug-only: walk the queue and check tl really is the last element */
671 # if defined(GRAN_CHECK)
672 if (RtsFlags.GranFlags.Debug.checkSparkQ) {
673 for (prev = NULL, next = pending_sparks_hd;
675 prev = next, next = next->next)
677 if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
678 debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
680 pending_sparks_tl, prev);
/* debug-only: verify the queue is still sorted by gran_info */
684 # if defined(GRAN_CHECK)
685 /* Check if the sparkq is still sorted. Just for testing, really! */
686 if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
687 RtsFlags.GranFlags.Debug.pri ) {
688 rtsBool sorted = rtsTrue;
689 rtsSpark *prev, *next;
691 if (pending_sparks_hd == NULL ||
692 pending_sparks_hd->next == NULL ) {
693 /* just 1 elem => ok */
695 for (prev = pending_sparks_hd,
696 next = pending_sparks_hd->next;
698 prev = next, next = next->next) {
700 (prev->gran_info >= next->gran_info);
704 debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
706 print_sparkq(CurrentProc);
712 //@node Aux fcts, , Basic interface to sparkq, GranSim code
713 //@subsubsection Aux fcts
715 //@cindex spark_queue_len
/* spark_queue_len (GranSim version): walk proc's pending-sparks list and
   count its elements; `prev` is kept only for the GRAN_CHECK tail test.
   NOTE(review): elided extract — return type, the `proc` parameter
   declaration, `len` declaration, loop condition and final return are on
   lines not visible here. */
717 spark_queue_len(proc)
720 rtsSpark *prev, *spark; /* prev only for testing !! */
723 for (len = 0, prev = NULL, spark = pending_sparks_hds[proc];
725 len++, prev = spark, spark = spark->next)
728 # if defined(GRAN_CHECK)
729 if ( RtsFlags.GranFlags.Debug.checkSparkQ )
/* debug-only: the last node visited must be the recorded tail */
730 if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
731 debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
732 proc, pending_sparks_tls[proc], prev);
739 Take spark out of the spark queue on PE p and nuke the spark. Adjusts
740 hd and tl pointers of the spark queue. Returns a pointer to the next
743 //@cindex delete_from_sparkq
/* NOTE(review): elided extract (K&R-style definition) — return type,
   parameter declarations, the `new_spark` declaration, the dispose_too
   branch that frees the spark, and the final return are on lines not
   visible here. */
745 delete_from_sparkq (spark, p, dispose_too) /* unlink and dispose spark */
753 barf("delete_from_sparkq: trying to delete NULL spark\n");
755 # if defined(GRAN_CHECK)
756 if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
757 debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
758 pending_sparks_hd, pending_sparks_tl,
759 spark->prev, spark, spark->next,
760 (spark->next==NULL ? 0 : spark->next->prev));
/* unlink from the doubly-linked list, fixing hd/tl where needed */
764 if (spark->prev==NULL) {
765 /* spark is first spark of queue => adjust hd pointer */
766 ASSERT(pending_sparks_hds[p]==spark);
767 pending_sparks_hds[p] = spark->next;
769 spark->prev->next = spark->next;
771 if (spark->next==NULL) {
772 ASSERT(pending_sparks_tls[p]==spark);
/* comment below looks copy-pasted from the hd case; this is the LAST
   spark => adjust tl pointer */
773 /* spark is first spark of queue => adjust tl pointer */
774 pending_sparks_tls[p] = spark->prev;
776 spark->next->prev = spark->prev;
778 new_spark = spark->next;
780 # if defined(GRAN_CHECK)
781 if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
782 debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
783 pending_sparks_hd, pending_sparks_tl,
784 spark->prev, spark, spark->next,
785 (spark->next==NULL ? 0 : spark->next->prev), spark);
795 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
796 //@cindex markSparkQueue
/* GranSim markSparkQueue: for every PE, walk its spark list and register
   each spark's node as a GC root, writing the (possibly moved) closure
   pointer back into the spark.
   NOTE(review): elided extract — function header, the `p`/`sp` local
   declarations, and the IF_GRAN_DEBUG wrapper around the debugBelch are
   on lines not visible here. */
800 StgClosure *MarkRoot(StgClosure *root); // prototype
804 for (p=0; p<RtsFlags.GranFlags.proc; p++)
805 for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
806 ASSERT(sp->node!=NULL);
807 ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
808 // ToDo?: statistics gathering here (also for GUM!)
809 sp->node = (StgClosure *)MarkRoot(sp->node);
812 debugBelch("@@ markSparkQueue: spark statistics at start of GC:");
813 print_sparkq_stats());
816 //@cindex print_spark
/* print_spark: debug dump of one rtsSpark (or "NIL" for NULL).
   NOTE(review): elided extract — function header, the NULL test, and the
   construction of the node-format string are on lines not visible. */
824 debugBelch("Spark: NIL\n");
/* builds a "%#6lx" or "______" field depending on whether node is NULL */
828 ((spark->node==NULL) ? "______" : "%#6lx"),
829 stgCast(StgPtr,spark->node));
831 debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
833 ((spark->global)==rtsTrue?"True":"False"), spark->creator,
834 spark->prev, spark->next);
838 //@cindex print_sparkq
/* print_sparkq: debug dump of PE `proc`'s whole spark queue, one
   print_spark (presumably) per element.  Function header and loop body
   are on elided lines. */
844 rtsSpark *x = pending_sparks_hds[proc];
846 debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
847 for (; x!=(rtsSpark*)NULL; x=x->next) {
853 Print a statistics of all spark queues.
855 //@cindex print_sparkq_stats
/* NOTE(review): elided extract — the `p` declaration and whatever closes
   the "[" bracket after the loop run past the visible lines. */
857 print_sparkq_stats(void)
861 debugBelch("SparkQs: [");
862 for (p=0; p<RtsFlags.GranFlags.proc; p++)
863 debugBelch(", PE %d: %d", p, spark_queue_len(p));