1 /* ---------------------------------------------------------------------------
2 * $Id: Sparks.c,v 1.2 2000/03/31 03:09:36 hwloidl Exp $
4 * (c) The GHC Team, 2000
6 * Sparking support for PAR and SMP versions of the RTS.
8 * -------------------------------------------------------------------------*/
10 //@node Spark Management Routines, , ,
11 //@section Spark Management Routines
19 //@node Includes, GUM code, Spark Management Routines, Spark Management Routines
20 //@subsection Includes
29 # include "ParallelRts.h"
31 # include "GranSimRts.h"
35 #if defined(SMP) || defined(PAR)
37 //@node GUM code, GranSim code, Includes, Spark Management Routines
38 //@subsection GUM code
// Forward declaration: compacts a spark pool in place (see definition below).
40 static void slide_spark_pool( StgSparkPool *pool );

// initSparkPools: one-time setup of the per-Capability spark pools
// (SMP/PAR build).  Gives each Capability a fixed-size array of
// StgClosure* slots and resets hd/tl so the pool starts empty.
// NOTE(review): this excerpt is a numbered listing with lines elided
// (return type, braces, the stgMallocBytes tag argument are missing) --
// consult the full source before editing.
43 initSparkPools( void )
49 /* walk over the capabilities, allocating a spark pool for each one */
50 for (cap = free_capabilities; cap != NULL; cap = cap->link) {
52 /* allocate a single spark pool */
56 pool = &(cap->rSparks);
// Room for maxLocalSparks closure pointers (RtsFlags.ParFlags tunable).
58 pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
59 * sizeof(StgClosure *),
// lim is one-past-the-end; hd == tl == base encodes "empty pool".
61 pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
62 pool->hd = pool->base;
63 pool->tl = pool->base;
// NOTE(review): the enclosing function's header is elided in this excerpt.
// The shape (scan each Capability's pool head-to-tail, test
// closure_SHOULD_SPARK, then slide_spark_pool) resembles the old RTS
// findSpark / spark-lookup routine -- confirm against the full source.
75 /* walk over the capabilities, allocating a spark pool for each one */
76 for (cap = free_capabilities; cap != NULL; cap = cap->link) {
78 /* allocate a single spark pool */
82 pool = &(cap->rSparks);
// Consume entries from the head while any remain.
83 while (pool->hd < pool->tl) {
// NOTE(review): the line that loads `spark' (presumably *pool->hd++) is elided.
85 if (closure_SHOULD_SPARK(spark))
// No useful spark found in this pool: compact the remainder.
88 slide_spark_pool(pool);
// add_to_spark_queue (GUM/SMP pool version): append `closure' to the pool
// if it is still worth sparking and space can be found.  A full pool is
// first compacted with slide_spark_pool; if it is still full after that,
// the spark is silently dropped (no slot is written).
// NOTE(review): return type and braces elided in this excerpt.
94 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
// tl at lim means no free slot: try to reclaim space by pruning dead sparks.
96 if (pool->tl == pool->lim)
97 slide_spark_pool(pool);
// Only enqueue if the closure is sparkable AND compaction freed a slot.
99 if (closure_SHOULD_SPARK(closure) &&
100 pool->tl < pool->lim) {
101 *(pool->tl++) = closure;
// slide_spark_pool: compact a pool in place.  Copies every spark that is
// still worth evaluating (closure_SHOULD_SPARK) down to the start of the
// array and drops the rest, then resets hd to base and tl to the new end.
// NOTE(review): the initialization of `sparkp' (presumably pool->hd) and
// the else-branch that skips pruned sparks are elided in this excerpt.
109 slide_spark_pool( StgSparkPool *pool )
111 StgClosure **sparkp, **to_sparkp;
114 to_sparkp = pool->base;
115 while (sparkp < pool->tl) {
// Write cursor never overtakes the read cursor; entries must be live closures.
116 ASSERT(to_sparkp<=sparkp);
117 ASSERT(*sparkp!=NULL);
118 ASSERT(LOOKS_LIKE_GHC_INFO((*sparkp)->header.info));
// Keep sparkable closures; anything else is dropped by not copying it.
120 if (closure_SHOULD_SPARK(*sparkp)) {
121 *to_sparkp++ = *sparkp++;
// Pool now holds only sparkable closures, packed at the front.
126 pool->hd = pool->base;
127 pool->tl = to_sparkp;
// spark_queue_len (GUM/SMP pool version): number of sparks currently in
// the pool, i.e. the distance between the tl and hd cursors.
131 spark_queue_len( StgSparkPool *pool )
133 return (nat) (pool->tl - pool->hd);
136 /* Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
137 implicit slide i.e. after marking all sparks are at the beginning of the
138 spark pool and the spark pool only contains sparkable closures
// markSparkQueue (GUM/SMP version): GC root-marking pass over every
// Capability's spark pool.  Combines the compaction of slide_spark_pool
// with MarkRoot on each surviving spark, and reports mark/prune counts.
// NOTE(review): heavily elided excerpt -- declarations of cap/pool, the
// initialization of sparkp/n/pruned_sparks, the else-branch that prunes,
// and the IF_DEBUG/#if wrappers around the belch() calls are missing.
141 markSparkQueue( void )
143 StgClosure **sparkp, **to_sparkp;
145 nat n, pruned_sparks;
151 /* walk over the capabilities, allocating a spark pool for each one */
152 for (cap = free_capabilities; cap != NULL; cap = cap->link) {
154 /* allocate a single spark pool */
158 pool = &(cap->rSparks);
166 to_sparkp = pool->base;
167 while (sparkp < pool->tl) {
168 ASSERT(to_sparkp<=sparkp);
169 ASSERT(*sparkp!=NULL);
170 ASSERT(LOOKS_LIKE_GHC_INFO(((StgClosure *)*sparkp)->header.info));
171 // ToDo?: statistics gathering here (also for GUM!)
// Surviving sparks are re-rooted for GC; MarkRoot may return a new address.
172 if (closure_SHOULD_SPARK(*sparkp)) {
173 *to_sparkp = MarkRoot(*sparkp);
// Implicit slide: pool again runs from base to the compacted end.
185 pool->hd = pool->base;
186 pool->tl = to_sparkp;
// Three reporting variants -- presumably SMP (pthread_self), PAR (mytid)
// and sequential; the surrounding #if/#elif lines are elided here.
190 belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
191 n, pruned_sparks, pthread_self()));
194 belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
195 n, pruned_sparks, mytid));
198 belch("markSparkQueue: marked %d sparks and pruned %d sparks",
203 belch("markSparkQueue: new spark queue len=%d; (hd=%p; tl=%p)",
204 spark_queue_len(pool), pool->hd, pool->tl));
// Post-condition sanity checks over the pools (loop header elided).
218 pool = &(cap->rSparks);
219 ASSERT(pool->hd <= pool->tl && pool->tl <= pool->lim);
221 ASSERT(spark != (StgClosure *)NULL);
228 //@node GranSim code, , GUM code, Spark Management Routines
229 //@subsection GranSim code
232 //* Basic interface to sparkq::
236 //@node Basic interface to sparkq, Aux fcts, GranSim code, GranSim code
237 //@subsubsection Basic interface to sparkq
239 Search the spark queue of the proc in event for a spark that's worth
240 turning into a thread
241 (was gimme_spark in the old RTS)
243 //@cindex findLocalSpark
// findLocalSpark (GranSim): walk the doubly-linked spark queue of
// event->proc looking for a spark worth turning into a thread.  Pruned
// (non-sparkable) entries are unlinked as we go.  Fallback candidates --
// first spark of a non-local node, first low-priority spark -- are
// remembered and taken only when the end of the queue is reached.
// Results are returned through *found_res and *spark_res (visible from
// the signature; the lines that store into them are elided here).
// NOTE(review): numbered listing with many lines elided (braces, the
// load of `node', loop-advance statements, out-parameter stores).
245 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
247 PEs proc = event->proc, /* proc to search for work */
248 creator = event->creator; /* proc that requested work */
// Fallback candidates plus the (prev, spark) walking pair.
251 rtsSparkQ spark_of_non_local_node = NULL,
252 spark_of_non_local_node_prev = NULL,
253 low_priority_spark = NULL,
254 low_priority_spark_prev = NULL,
255 spark = NULL, prev = NULL;
257 /* Choose a spark from the local spark queue */
258 prev = (rtsSpark*)NULL;
259 spark = pending_sparks_hds[proc];
262 // ToDo: check this code & implement local sparking !! -- HWL
263 while (!found && spark != (rtsSpark*)NULL)
// Invariant: (prev, spark) are adjacent in a consistent doubly-linked list.
265 ASSERT((prev!=(rtsSpark*)NULL || spark==pending_sparks_hds[proc]) &&
266 (prev==(rtsSpark*)NULL || prev->next==spark) &&
267 (spark->prev==prev));
// Case 1: node no longer worth sparking -> prune it from the queue.
269 if (!closure_SHOULD_SPARK(node))
271 IF_GRAN_DEBUG(checkSparkQ,
272 belch("^^ pruning spark %p (node %p) in gimme_spark",
275 if (RtsFlags.GranFlags.GranSimStats.Sparks)
276 DumpRawGranEvent(proc, (PEs)0, SP_PRUNED,(StgTSO*)NULL,
277 spark->node, spark->name, spark_queue_len(proc));
279 ASSERT(spark != (rtsSpark*)NULL);
280 ASSERT(SparksAvail>0);
283 ASSERT(prev==(rtsSpark*)NULL || prev->next==spark);
// delete_from_sparkq unlinks and (dispose_too=rtsTrue) frees; returns next.
284 spark = delete_from_sparkq (spark, proc, rtsTrue);
285 if (spark != (rtsSpark*)NULL)
289 /* -- node should eventually be sparked */
// Case 2: prefer sparks of local nodes; non-local ones are only fallbacks.
290 else if (RtsFlags.GranFlags.PreferSparksOfLocalNodes &&
291 !IS_LOCAL_TO(PROCS(node),CurrentProc))
293 barf("Local sparking not yet implemented");
295 /* Remember first low priority spark */
296 if (spark_of_non_local_node==(rtsSpark*)NULL) {
297 spark_of_non_local_node_prev = prev;
298 spark_of_non_local_node = spark;
// End of queue: fall back to the remembered non-local-node spark.
301 if (spark->next == (rtsSpark*)NULL) {
302 /* ASSERT(spark==SparkQueueTl); just for testing */
303 prev = spark_of_non_local_node_prev;
304 spark = spark_of_non_local_node;
309 # if defined(GRAN) && defined(GRAN_CHECK)
310 /* Should never happen; just for testing
311 if (spark==pending_sparks_tl) {
312 fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n");
313 stg_exit(EXIT_FAILURE);
318 ASSERT(SparksAvail>0);
// Case 3: priority is acceptable (or priority sparking is on, which barfs).
322 else if ( RtsFlags.GranFlags.DoPrioritySparking ||
323 (spark->gran_info >= RtsFlags.GranFlags.SparkPriority2) )
325 if (RtsFlags.GranFlags.DoPrioritySparking)
326 barf("Priority sparking not yet implemented");
// Case 4: low-priority spark; remember the first one as a fallback.
331 else /* only used if SparkPriority2 is defined */
333 /* ToDo: fix the code below and re-integrate it */
334 /* Remember first low priority spark */
335 if (low_priority_spark==(rtsSpark*)NULL) {
336 low_priority_spark_prev = prev;
337 low_priority_spark = spark;
// End of queue: accept the remembered low-priority spark after all.
340 if (spark->next == (rtsSpark*)NULL) {
341 /* ASSERT(spark==spark_queue_tl); just for testing */
342 prev = low_priority_spark_prev;
343 spark = low_priority_spark;
344 found = rtsTrue; /* take low pri spark => rc is 2 */
348 /* Should never happen; just for testing
349 if (spark==pending_sparks_tl) {
350 fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n");
351 stg_exit(EXIT_FAILURE);
358 belch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n",
359 spark->gran_info, RtsFlags.GranFlags.SparkPriority,
360 spark->node, spark->name);)
363 } /* while (spark!=NULL && !found) */
370 Turn the spark into a thread.
371 In GranSim this basically means scheduling a StartThread event for the
372 node pointed to by the spark at some point in the future.
373 (was munch_spark in the old RTS)
375 //@cindex activateSpark
// activateSpark (GranSim): convert a chosen spark into (simulated) work.
// If the requesting PE (creator) differs from the PE holding the spark
// (proc), a MoveSpark-style event is scheduled after simulated packing
// and latency costs; otherwise a StartThread event is scheduled locally.
// NOTE(review): numbered listing with lines elided (braces, the
// proc!=creator test, event-type arguments to new_event, the #if arms,
// and the heap-check condition which is literally placeholder text).
377 activateSpark (rtsEvent *event, rtsSparkQ spark)
379 PEs proc = event->proc, /* proc to search for work */
380 creator = event->creator; /* proc that requested work */
383 rtsTime spark_arrival_time;
386 We've found a node on PE proc requested by PE creator.
387 If proc==creator we can turn the spark into a thread immediately;
388 otherwise we schedule a MoveSpark event on the requesting PE
// --- Remote branch: proc != creator (test line elided) ---
394 /* only possible if we simulate GUM style fishing */
395 ASSERT(RtsFlags.GranFlags.Fishing);
397 /* Message packing costs for sending a Fish; qeq jabbI'ID */
398 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
400 if (RtsFlags.GranFlags.GranSimStats.Sparks)
401 DumpRawGranEvent(proc, (PEs)0, SP_EXPORTED,
402 (StgTSO*)NULL, spark->node,
403 spark->name, spark_queue_len(proc));
405 /* time of the spark arrival on the remote PE */
406 spark_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
// Schedule the spark's arrival on the requesting PE (event type elided).
408 new_event(creator, proc, spark_arrival_time,
410 (StgTSO*)NULL, spark->node, spark);
412 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
414 } else { /* proc==creator i.e. turn the spark into a thread */
// Account for having to settle for a low-priority spark.
416 if ( RtsFlags.GranFlags.GranSimStats.Global &&
417 spark->gran_info < RtsFlags.GranFlags.SparkPriority2 ) {
419 globalGranStats.tot_low_pri_sparks++;
421 belch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
423 spark->node, spark->name);)
426 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
431 /* ToDo: fix the GC interface and move to StartThread handling-- HWL */
// NOTE(review): "GARBAGE COLLECTION IS NECESSARY" is placeholder
// pseudocode, not C -- this branch cannot compile as written.
432 if (GARBAGE COLLECTION IS NECESSARY) {
433 /* Some kind of backoff needed here in case there's too little heap */
434 # if defined(GRAN_CHECK) && defined(GRAN)
435 if (RtsFlags.GcFlags.giveStats)
436 fprintf(RtsFlags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%p, node=%p; name=%u\n",
437 /* (found==2 ? "no hi pri spark" : "hi pri spark"), */
438 spark, node, spark->name);
// Re-schedule the current event and bail out; barf() fires before
// GarbageCollect is ever reached, so the GC call is dead code here.
440 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+1,
442 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
443 barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()");
444 GarbageCollect(GetRoots);
445 // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse);
446 // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE;
448 return; /* was: continue; */ /* to the next event, eventually */
452 if (RtsFlags.GranFlags.GranSimStats.Sparks)
453 DumpRawGranEvent(CurrentProc,(PEs)0,SP_USED,(StgTSO*)NULL,
454 spark->node, spark->name,
455 spark_queue_len(CurrentProc));
// Schedule the StartThread-style event locally and mark the PE as starting.
457 new_event(proc, proc, CurrentTime[proc],
459 END_TSO_QUEUE, node, spark); // (rtsSpark*)NULL);
461 procStatus[proc] = Starting;
465 /* -------------------------------------------------------------------------
466 This is the main point where handling granularity information comes into
468 ------------------------------------------------------------------------- */
// Upper bound (exclusive) for randomly assigned spark priorities.
470 #define MAX_RAND_PRI 100
473 Granularity info transformers.
474 Applied to the GRAN_INFO field of a spark.
// ID: identity -- use the annotation's granularity value as the priority.
476 static inline nat ID(nat x) { return(x); };
// INV: inverted priority.  NOTE(review): negating an unsigned `nat' wraps
// to a large value -- presumably intentional to reverse the ordering.
477 static inline nat INV(nat x) { return(-x); };
// IGNORE: discard granularity info entirely (every spark gets priority 0).
478 static inline nat IGNORE(nat x) { return (0); };
// RAND: random priority in [1, MAX_RAND_PRI]; input is ignored.
479 static inline nat RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
481 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
// newSpark (GranSim): allocate and initialise a fresh rtsSpark for `node'.
// The priority is derived from gran_info via exactly one of the RAND/INV/
// IGNORE transformers, selected by RTS flags (default arm elided -- the
// excerpt skips the line that presumably yields ID(gran_info)).  Sparks
// below the SparkPriority cutoff are discarded (NULL returned).
// K&R-style definition; node's declaration line is elided in this excerpt.
484 newSpark(node,name,gran_info,size_info,par_info,local)
486 nat name, gran_info, size_info, par_info, local;
491 pri = RtsFlags.GranFlags.RandomPriorities ? RAND(gran_info) :
492 RtsFlags.GranFlags.InversePriorities ? INV(gran_info) :
493 RtsFlags.GranFlags.IgnorePriorities ? IGNORE(gran_info) :
// Cutoff: too-low-priority sparks are never created at all.
496 if ( RtsFlags.GranFlags.SparkPriority!=0 &&
497 pri<RtsFlags.GranFlags.SparkPriority ) {
499 belch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n",
500 pri, RtsFlags.GranFlags.SparkPriority, node, name));
501 return ((rtsSpark*)NULL);
504 newspark = (rtsSpark*) stgMallocBytes(sizeof(rtsSpark), "NewSpark");
505 newspark->prev = newspark->next = (rtsSpark*)NULL;
506 newspark->node = node;
// name==1 is a sentinel meaning "inherit the current TSO's spark name".
507 newspark->name = (name==1) ? CurrentTSO->gran.sparkname : name;
508 newspark->gran_info = pri;
509 newspark->global = !local; /* Check that with parAt, parAtAbs !!*/
// Global spark-creation statistics, kept per PE.
511 if (RtsFlags.GranFlags.GranSimStats.Global) {
512 globalGranStats.tot_sparks_created++;
513 globalGranStats.sparks_created_on_PE[CurrentProc]++;
519 //@cindex disposeSpark
// NOTE(review): the bodies of disposeSpark and disposeSparkQ are almost
// entirely elided in this excerpt; only the recursive tail of
// disposeSparkQ and an underflow sanity check survive.
528 //@cindex disposeSparkQ
// Recursively free the rest of the queue (recursion base case elided).
536 disposeSparkQ(spark->next);
// SparksAvail going negative indicates the bookkeeping counter was
// decremented more times than sparks existed.
539 if (SparksAvail < 0) {
540 fprintf(stderr,"disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
549 With PrioritySparking add_to_spark_queue performs an insert sort to keep
550 the spark queue sorted. Otherwise the spark is just added to the end of
554 //@cindex add_to_spark_queue
// add_to_spark_queue (GranSim, doubly-linked-queue version): insert
// `spark' into the current PE's pending_sparks queue.  With priority
// sparking the queue is kept sorted by descending gran_info via a linear
// scan (insert sort); otherwise the spark goes to the tail.  K&R-style
// definition; the parameter declaration line and many braces are elided
// in this excerpt.
556 add_to_spark_queue(spark)
559 rtsSpark *prev = NULL, *next = NULL;
561 rtsBool found = rtsFalse;
// Guard: ignore NULL sparks (body of the guard elided).
563 if ( spark == (rtsSpark *)NULL ) {
567 if (RtsFlags.GranFlags.DoPrioritySparking && (spark->gran_info != 0) ) {
568 /* Priority sparking is enabled i.e. spark queues must be sorted */
// Scan until the first entry with priority <= the new spark's;
// `count' feeds the simulated search cost below.
570 for (prev = NULL, next = pending_sparks_hd, count=0;
572 !(found = (spark->gran_info >= next->gran_info));
573 prev = next, next = next->next, count++)
576 } else { /* 'utQo' */
577 /* Priority sparking is disabled */
579 found = rtsFalse; /* to add it at the end */
// --- Insert before `next' (sorted case) ---
584 /* next points to the first spark with a gran_info smaller than that
585 of spark; therefore, add spark before next into the spark queue */
587 if ( next == NULL ) {
588 pending_sparks_tl = spark;
593 if ( prev == NULL ) {
594 pending_sparks_hd = spark;
// --- Append at the tail (unsorted case, or no smaller entry found) ---
598 } else { /* (RtsFlags.GranFlags.DoPrioritySparking && !found) || !DoPrioritySparking */
599 /* add the spark at the end of the spark queue */
601 spark->prev = pending_sparks_tl;
602 if (pending_sparks_hd == NULL)
603 pending_sparks_hd = spark;
605 pending_sparks_tl->next = spark;
606 pending_sparks_tl = spark;
610 /* add costs for search in priority sparking */
611 if (RtsFlags.GranFlags.DoPrioritySparking) {
612 CurrentTime[CurrentProc] += count * RtsFlags.GranFlags.Costs.pri_spark_overhead;
615 IF_GRAN_DEBUG(checkSparkQ,
616 belch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
617 spark, spark->node, CurrentProc);
618 print_sparkq_stats());
// Debug-only consistency check: tail pointer must be the last element.
620 # if defined(GRAN_CHECK)
621 if (RtsFlags.GranFlags.Debug.checkSparkQ) {
622 for (prev = NULL, next = pending_sparks_hd;
624 prev = next, next = next->next)
626 if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
627 fprintf(stderr,"SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
629 pending_sparks_tl, prev);
// Debug-only sortedness check when priority sparking is active.
633 # if defined(GRAN_CHECK)
634 /* Check if the sparkq is still sorted. Just for testing, really! */
635 if ( RtsFlags.GranFlags.Debug.checkSparkQ &&
636 RtsFlags.GranFlags.Debug.pri ) {
637 rtsBool sorted = rtsTrue;
638 rtsSpark *prev, *next;
640 if (pending_sparks_hd == NULL ||
641 pending_sparks_hd->next == NULL ) {
642 /* just 1 elem => ok */
644 for (prev = pending_sparks_hd,
645 next = pending_sparks_hd->next;
647 prev = next, next = next->next) {
649 (prev->gran_info >= next->gran_info);
653 fprintf(stderr,"ghuH: SPARKQ on PE %d is not sorted:\n",
655 print_sparkq(CurrentProc);
661 //@node Aux fcts, , Basic interface to sparkq, GranSim code
662 //@subsubsection Aux fcts
664 //@cindex spark_queue_len
// spark_queue_len (GranSim version): count the sparks on PE `proc' by
// walking its linked queue.  K&R-style definition; the parameter
// declaration, loop condition and return statement are elided in this
// excerpt.  `prev' exists only for the tail-pointer sanity check below.
666 spark_queue_len(proc)
669 rtsSpark *prev, *spark; /* prev only for testing !! */
672 for (len = 0, prev = NULL, spark = pending_sparks_hds[proc];
674 len++, prev = spark, spark = spark->next)
// Debug-only: the last node reached must be the recorded tail pointer.
677 # if defined(GRAN_CHECK)
678 if ( RtsFlags.GranFlags.Debug.checkSparkQ )
679 if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
680 fprintf(stderr,"ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
681 proc, pending_sparks_tls[proc], prev);
688 Take spark out of the spark queue on PE p and nuke the spark. Adjusts
689 hd and tl pointers of the spark queue. Returns a pointer to the next
692 //@cindex delete_from_sparkq
// delete_from_sparkq: unlink `spark' from PE p's doubly-linked queue,
// fixing up hd/tl and neighbour pointers, and (per the dispose_too flag)
// free it.  Returns the successor node.  K&R-style definition; parameter
// declarations, the else-keywords pairing the hd/tl fixups with the
// neighbour fixups, and the dispose/return lines are elided here.
694 delete_from_sparkq (spark, p, dispose_too) /* unlink and dispose spark */
702 barf("delete_from_sparkq: trying to delete NULL spark\n");
// Debug dump of the queue ends and the node's links before unlinking.
704 # if defined(GRAN_CHECK)
705 if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
706 fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
707 pending_sparks_hd, pending_sparks_tl,
708 spark->prev, spark, spark->next,
709 (spark->next==NULL ? 0 : spark->next->prev));
// No predecessor => spark is the head; otherwise bypass it going forward.
713 if (spark->prev==NULL) {
714 /* spark is first spark of queue => adjust hd pointer */
715 ASSERT(pending_sparks_hds[p]==spark);
716 pending_sparks_hds[p] = spark->next;
718 spark->prev->next = spark->next;
// No successor => spark is the tail; otherwise bypass it going backward.
// (The comment on 722 says "first" but this is clearly the tail case.)
720 if (spark->next==NULL) {
721 ASSERT(pending_sparks_tls[p]==spark);
722 /* spark is first spark of queue => adjust tl pointer */
723 pending_sparks_tls[p] = spark->prev;
725 spark->next->prev = spark->prev;
// Successor saved for the return value before the node is disposed of.
727 new_spark = spark->next;
729 # if defined(GRAN_CHECK)
730 if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
731 fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
732 pending_sparks_hd, pending_sparks_tl,
733 spark->prev, spark, spark->next,
734 (spark->next==NULL ? 0 : spark->next->prev), spark);
744 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
745 //@cindex markSparkQueue
// markSparkQueue (GranSim version): GC root-marking pass over every PE's
// linked spark queue -- each spark's node is re-rooted via MarkRoot.
// Unlike the pool version above, no pruning/slide happens here.
// NOTE(review): function header and local declarations (p, sp) are
// elided in this excerpt.
749 StgClosure *MarkRoot(StgClosure *root); // prototype
753 for (p=0; p<RtsFlags.GranFlags.proc; p++)
754 for (sp=pending_sparks_hds[p]; sp!=NULL; sp=sp->next) {
755 ASSERT(sp->node!=NULL);
756 ASSERT(LOOKS_LIKE_GHC_INFO(sp->node->header.info));
757 // ToDo?: statistics gathering here (also for GUM!)
// MarkRoot may move the closure; store the updated address back.
758 sp->node = (StgClosure *)MarkRoot(sp->node);
761 belch("@@ markSparkQueue: spark statistics at start of GC:");
762 print_sparkq_stats());
765 //@cindex print_spark
// print_spark: debug dump of a single rtsSpark to stderr.  Prints "NIL"
// for a NULL spark; otherwise builds a node-field format ("______" for a
// NULL node) and prints node/name/global/creator plus the prev/next links.
// NOTE(review): function header, the str/spark declarations and the
// sprintf that uses the format picked on line 777 are elided here.
773 fprintf(stderr,"Spark: NIL\n");
777 ((spark->node==NULL) ? "______" : "%#6lx"),
778 stgCast(StgPtr,spark->node));
780 fprintf(stderr,"Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
782 ((spark->global)==rtsTrue?"True":"False"), spark->creator,
783 spark->prev, spark->next);
787 //@cindex print_sparkq
// print_sparkq: debug dump of PE `proc's entire spark queue, one
// print_spark line per entry (the call inside the loop is elided here,
// along with the function header).
793 rtsSpark *x = pending_sparks_hds[proc];
795 fprintf(stderr,"Spark Queue of PE %d with root at %p:\n", proc, x);
796 for (; x!=(rtsSpark*)NULL; x=x->next) {
802 Print a statistics of all spark queues.
804 //@cindex print_sparkq_stats
// print_sparkq_stats: one-line summary of per-PE spark queue lengths.
// NOTE(review): the leading "PE 0" entry is printed with the same ", "
// prefix as the rest, and no closing "]" is visible before the newline --
// cosmetic oddities in the debug output (or elided lines).
806 print_sparkq_stats(void)
810 fprintf(stderr, "SparkQs: [");
811 for (p=0; p<RtsFlags.GranFlags.proc; p++)
812 fprintf(stderr, ", PE %d: %d", p, spark_queue_len(p));
813 fprintf(stderr, "\n");