X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSparks.c;h=615d832e33e8de18e45e095030da158dc419d843;hb=28a464a75e14cece5db40f2765a29348273ff2d2;hp=46c3225256026b41d13468ba6ebdd0d4d9d88ca2;hpb=bc5c802181b513216bc88f0d1ec9574157ee05fe;p=ghc-hetmet.git diff --git a/ghc/rts/Sparks.c b/ghc/rts/Sparks.c index 46c3225..615d832 100644 --- a/ghc/rts/Sparks.c +++ b/ghc/rts/Sparks.c @@ -1,25 +1,11 @@ /* --------------------------------------------------------------------------- - * $Id: Sparks.c,v 1.4 2001/08/14 13:40:09 sewardj Exp $ * - * (c) The GHC Team, 2000 + * (c) The GHC Team, 2000-2006 * - * Sparking support for PAR and SMP versions of the RTS. + * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS. * * -------------------------------------------------------------------------*/ -//@node Spark Management Routines, , , -//@section Spark Management Routines - -//@menu -//* Includes:: -//* GUM code:: -//* GranSim code:: -//@end menu -//*/ - -//@node Includes, GUM code, Spark Management Routines, Spark Management Routines -//@subsection Includes - #include "PosixSource.h" #include "Rts.h" #include "Schedule.h" @@ -28,7 +14,7 @@ #include "RtsFlags.h" #include "RtsUtils.h" #include "ParTicky.h" -# if defined(PAR) +# if defined(PARALLEL_HASKELL) # include "ParallelRts.h" # include "GranSimRts.h" // for GR_... # elif defined(GRAN) @@ -36,93 +22,226 @@ # endif #include "Sparks.h" -#if defined(SMP) || defined(PAR) +#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) -//@node GUM code, GranSim code, Includes, Spark Management Routines -//@subsection GUM code +static INLINE_ME void bump_hd (StgSparkPool *p) +{ p->hd++; if (p->hd == p->lim) p->hd = p->base; } -static void slide_spark_pool( StgSparkPool *pool ); +static INLINE_ME void bump_tl (StgSparkPool *p) +{ p->tl++; if (p->tl == p->lim) p->tl = p->base; } -rtsBool -initSparkPools( void ) -{ - Capability *cap; - StgSparkPool *pool; +/* ----------------------------------------------------------------------------- + * + * Initialising spark pools. + * + * -------------------------------------------------------------------------- */ -#ifdef SMP - /* walk over the capabilities, allocating a spark pool for each one */ - for (cap = free_capabilities; cap != NULL; cap = cap->link) { -#else - /* allocate a single spark pool */ - cap = &MainRegTable; - { -#endif - pool = &(cap->rSparks); - +static void +initSparkPool(StgSparkPool *pool) +{ pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks - * sizeof(StgClosure *), - "initSparkPools"); + * sizeof(StgClosure *), + "initSparkPools"); pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks; pool->hd = pool->base; pool->tl = pool->base; - } - return rtsTrue; /* Qapla' */ } -/* - We traverse the spark pool until we find the 2nd usable (i.e. non-NF) - spark. Rationale, we don't want to give away the only work a PE has. - ToDo: introduce low- and high-water-marks for load balancing. -*/ -StgClosure * -findSpark( rtsBool for_export ) +void +initSparkPools( void ) { - Capability *cap; - StgSparkPool *pool; - StgClosure *spark, *first=NULL; - rtsBool isIdlePE = EMPTY_RUN_QUEUE(); - -#ifdef SMP - /* walk over the capabilities, allocating a spark pool for each one */ - for (cap = free_capabilities; cap != NULL; cap = cap->link) { +#ifdef THREADED_RTS + /* walk over the capabilities, allocating a spark pool for each one */ + nat i; + for (i = 0; i < n_capabilities; i++) { + initSparkPool(&capabilities[i].r.rSparks); + } #else - /* allocate a single spark pool */ - cap = &MainRegTable; - { + /* allocate a single spark pool */ + initSparkPool(&MainCapability.r.rSparks); #endif - pool = &(cap->rSparks); - while (pool->hd < pool->tl) { - spark = *pool->hd++; - if (closure_SHOULD_SPARK(spark)) { - if (for_export && isIdlePE) { - if (first==NULL) { - first = spark; // keep the first usable spark if PE is idle - } else { - pool->hd--; // found a second spark; keep it in the pool - ASSERT(*pool->hd==spark); +} + +/* ----------------------------------------------------------------------------- + * + * findSpark: find a spark on the current Capability that we can fork + * into a thread. + * + * -------------------------------------------------------------------------- */ + +StgClosure * +findSpark (Capability *cap) +{ + StgSparkPool *pool; + StgClosure *spark; + + pool = &(cap->r.rSparks); + ASSERT_SPARK_POOL_INVARIANTS(pool); + + while (pool->hd != pool->tl) { + spark = *pool->hd; + bump_hd(pool); + if (closure_SHOULD_SPARK(spark)) { +#ifdef GRAN if (RtsFlags.ParFlags.ParStats.Sparks) - DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, - GR_STEALING, ((StgTSO *)NULL), first, - 0, 0 /* spark_queue_len(ADVISORY_POOL) */); - return first; // and return the *first* spark found - } - } else { - if (RtsFlags.ParFlags.ParStats.Sparks && for_export) - DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, - GR_STEALING, ((StgTSO *)NULL), spark, - 0, 0 /* spark_queue_len(ADVISORY_POOL) */); - return spark; // return first spark found + DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, + GR_STEALING, ((StgTSO *)NULL), spark, + 0, 0 /* spark_queue_len(ADVISORY_POOL) */); +#endif + return spark; } - } } - slide_spark_pool(pool); - } - return NULL; + // spark pool is now empty + return NULL; } -/* - activateSpark is defined in Schedule.c -*/ +/* ----------------------------------------------------------------------------- + * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an + * implicit slide i.e. after marking all sparks are at the beginning of the + * spark pool and the spark pool only contains sparkable closures + * -------------------------------------------------------------------------- */ + +void +markSparkQueue (evac_fn evac) +{ + StgClosure **sparkp, **to_sparkp; + nat i, n, pruned_sparks; // stats only + StgSparkPool *pool; + Capability *cap; + + PAR_TICKY_MARK_SPARK_QUEUE_START(); + + n = 0; + pruned_sparks = 0; + for (i = 0; i < n_capabilities; i++) { + cap = &capabilities[i]; + pool = &(cap->r.rSparks); + + ASSERT_SPARK_POOL_INVARIANTS(pool); + +#if defined(PARALLEL_HASKELL) + // stats only + n = 0; + pruned_sparks = 0; +#endif + + sparkp = pool->hd; + to_sparkp = pool->hd; + while (sparkp != pool->tl) { + ASSERT(to_sparkp<=sparkp); + ASSERT(*sparkp!=NULL); + ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp))); + // ToDo?: statistics gathering here (also for GUM!) + if (closure_SHOULD_SPARK(*sparkp)) { + evac(sparkp); + *to_sparkp++ = *sparkp; + n++; + } else { + pruned_sparks++; + } + sparkp++; + if (sparkp == pool->lim) { + sparkp = pool->base; + } + } + pool->tl = to_sparkp; + + PAR_TICKY_MARK_SPARK_QUEUE_END(n); + +#if defined(PARALLEL_HASKELL) + IF_DEBUG(scheduler, + debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]", + n, pruned_sparks, mytid)); +#else + IF_DEBUG(scheduler, + debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks\n", + n, pruned_sparks)); +#endif + + IF_DEBUG(scheduler, + debugBelch("markSparkQueue: new spark queue len=%d; (hd=%p; tl=%p)\n", + sparkPoolSize(pool), pool->hd, pool->tl)); + + } +} + +/* ----------------------------------------------------------------------------- + * + * Turn a spark into a real thread + * + * -------------------------------------------------------------------------- */ + +void +createSparkThread (Capability *cap, StgClosure *p) +{ + StgTSO *tso; + + tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p); + appendToRunQueue(cap,tso); +} + +/* ----------------------------------------------------------------------------- + * + * Create a new spark + * + * -------------------------------------------------------------------------- */ + +#define DISCARD_NEW + +StgInt +newSpark (StgRegTable *reg, StgClosure *p) +{ + StgSparkPool *pool = &(reg->rSparks); + + ASSERT_SPARK_POOL_INVARIANTS(pool); + + if (closure_SHOULD_SPARK(p)) { +#ifdef DISCARD_NEW + StgClosure **new_tl; + new_tl = pool->tl + 1; + if (new_tl == pool->lim) { new_tl = pool->base; } + if (new_tl != pool->hd) { + *pool->tl = p; + pool->tl = new_tl; + } else if (!closure_SHOULD_SPARK(*pool->hd)) { + // if the old closure is not sparkable, discard it and + // keep the new one. Otherwise, keep the old one. + *pool->tl = p; + bump_hd(pool); + } +#else /* DISCARD OLD */ + *pool->tl = p; + bump_tl(pool); + if (pool->tl == pool->hd) { bump_hd(pool); } +#endif + } + + ASSERT_SPARK_POOL_INVARIANTS(pool); + return 1; +} + +#else + +StgInt +newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED) +{ + /* nothing */ + return 1; +} + +#endif /* PARALLEL_HASKELL || THREADED_RTS */ + + +/* ----------------------------------------------------------------------------- + * + * GRAN & PARALLEL_HASKELL stuff beyond here. + * + * -------------------------------------------------------------------------- */ + +#if defined(PARALLEL_HASKELL) || defined(GRAN) + +static void slide_spark_pool( StgSparkPool *pool ); + rtsBool add_to_spark_queue( StgClosure *closure, StgSparkPool *pool ) { @@ -133,21 +252,21 @@ add_to_spark_queue( StgClosure *closure, StgSparkPool *pool ) pool->tl < pool->lim) { *(pool->tl++) = closure; -#if defined(PAR) +#if defined(PARALLEL_HASKELL) // collect parallel global statistics (currently done together with GC stats) if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { - // fprintf(stderr, "Creating spark for %x @ %11.2f\n", closure, usertime()); + // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime()); globalParStats.tot_sparks_created++; } #endif return rtsTrue; } else { -#if defined(PAR) +#if defined(PARALLEL_HASKELL) // collect parallel global statistics (currently done together with GC stats) if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { - //fprintf(stderr, "Ignoring spark for %x @ %11.2f\n", closure, usertime()); + //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime()); globalParStats.tot_sparks_ignored++; } #endif @@ -177,93 +296,11 @@ slide_spark_pool( StgSparkPool *pool ) pool->tl = to_sparkp; } -nat -spark_queue_len( StgSparkPool *pool ) -{ - return (nat) (pool->tl - pool->hd); -} - -/* Mark all nodes pointed to by sparks in the spark queues (for GC) Does an - implicit slide i.e. after marking all sparks are at the beginning of the - spark pool and the spark pool only contains sparkable closures -*/ -void -markSparkQueue( void ) -{ - StgClosure **sparkp, **to_sparkp; - nat n, pruned_sparks; // stats only - StgSparkPool *pool; - Capability *cap; - - PAR_TICKY_MARK_SPARK_QUEUE_START(); - -#ifdef SMP - /* walk over the capabilities, allocating a spark pool for each one */ - for (cap = free_capabilities; cap != NULL; cap = cap->link) { -#else - /* allocate a single spark pool */ - cap = &MainRegTable; - { -#endif - pool = &(cap->rSparks); - -#if defined(PAR) - // stats only - n = 0; - pruned_sparks = 0; -#endif - - sparkp = pool->hd; - to_sparkp = pool->base; - while (sparkp < pool->tl) { - ASSERT(to_sparkp<=sparkp); - ASSERT(*sparkp!=NULL); - ASSERT(LOOKS_LIKE_GHC_INFO(((StgClosure *)*sparkp)->header.info)); - // ToDo?: statistics gathering here (also for GUM!) - if (closure_SHOULD_SPARK(*sparkp)) { - *to_sparkp = MarkRoot(*sparkp); - to_sparkp++; -#ifdef PAR - n++; -#endif - } else { -#ifdef PAR - pruned_sparks++; -#endif - } - sparkp++; - } - pool->hd = pool->base; - pool->tl = to_sparkp; - - PAR_TICKY_MARK_SPARK_QUEUE_END(n); - -#if defined(SMP) - IF_DEBUG(scheduler, - belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]", - n, pruned_sparks, pthread_self())); -#elif defined(PAR) - IF_DEBUG(scheduler, - belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]", - n, pruned_sparks, mytid)); -#else - IF_DEBUG(scheduler, - belch("markSparkQueue: marked %d sparks and pruned %d sparks", - n, pruned_sparks)); -#endif - - IF_DEBUG(scheduler, - belch("markSparkQueue: new spark queue len=%d; (hd=%p; tl=%p)", - spark_queue_len(pool), pool->hd, pool->tl)); - - } -} - void disposeSpark(spark) StgClosure *spark; { -#if !defined(SMP) +#if !defined(THREADED_RTS) Capability *cap; StgSparkPool *pool; @@ -278,22 +315,11 @@ StgClosure *spark; #elif defined(GRAN) -//@node GranSim code, , GUM code, Spark Management Routines -//@subsection GranSim code - -//@menu -//* Basic interface to sparkq:: -//* Aux fcts:: -//@end menu - -//@node Basic interface to sparkq, Aux fcts, GranSim code, GranSim code -//@subsubsection Basic interface to sparkq /* Search the spark queue of the proc in event for a spark that's worth turning into a thread (was gimme_spark in the old RTS) */ -//@cindex findLocalSpark void findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res) { @@ -322,7 +348,7 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res) if (!closure_SHOULD_SPARK(node)) { IF_GRAN_DEBUG(checkSparkQ, - belch("^^ pruning spark %p (node %p) in gimme_spark", + debugBelch("^^ pruning spark %p (node %p) in gimme_spark", spark, node)); if (RtsFlags.GranFlags.GranSimStats.Sparks) @@ -362,7 +388,7 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res) # if defined(GRAN) && defined(GRAN_CHECK) /* Should never happen; just for testing if (spark==pending_sparks_tl) { - fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n"); + debugBelch("ReSchedule: Last spark != SparkQueueTl\n"); stg_exit(EXIT_FAILURE); } */ # endif @@ -400,7 +426,7 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res) /* Should never happen; just for testing if (spark==pending_sparks_tl) { - fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n"); + debugBelch("ReSchedule: Last spark != SparkQueueTl\n"); stg_exit(EXIT_FAILURE); break; } */ @@ -408,7 +434,7 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res) spark = spark->next; IF_GRAN_DEBUG(pri, - belch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", + debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", spark->gran_info, RtsFlags.GranFlags.SparkPriority, spark->node, spark->name);) } @@ -425,7 +451,6 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res) node pointed to by the spark at some point in the future. (was munch_spark in the old RTS) */ -//@cindex activateSpark rtsBool activateSpark (rtsEvent *event, rtsSparkQ spark) { @@ -471,7 +496,7 @@ activateSpark (rtsEvent *event, rtsSparkQ spark) globalGranStats.tot_low_pri_sparks++; IF_GRAN_DEBUG(pri, - belch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n", + debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n", spark->gran_info, spark->node, spark->name)); } @@ -526,13 +551,12 @@ activateSpark (rtsEvent *event, rtsSparkQ spark) Granularity info transformers. Applied to the GRAN_INFO field of a spark. */ -static inline nat ID(nat x) { return(x); }; -static inline nat INV(nat x) { return(-x); }; -static inline nat IGNORE(nat x) { return (0); }; -static inline nat RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); } +STATIC_INLINE nat ID(nat x) { return(x); }; +STATIC_INLINE nat INV(nat x) { return(-x); }; +STATIC_INLINE nat IGNORE(nat x) { return (0); }; +STATIC_INLINE nat RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); } /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */ -//@cindex newSpark rtsSpark * newSpark(node,name,gran_info,size_info,par_info,local) StgClosure *node; @@ -549,7 +573,7 @@ nat name, gran_info, size_info, par_info, local; if ( RtsFlags.GranFlags.SparkPriority!=0 && prinode, CurrentProc); print_sparkq_stats()); @@ -677,7 +698,7 @@ rtsSpark *spark; prev = next, next = next->next) {} if ( (prev!=NULL) && (prev!=pending_sparks_tl) ) - fprintf(stderr,"SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n", + debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n", spark,CurrentProc, pending_sparks_tl, prev); } @@ -703,7 +724,7 @@ rtsSpark *spark; } } if (!sorted) { - fprintf(stderr,"ghuH: SPARKQ on PE %d is not sorted:\n", + debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n", CurrentProc); print_sparkq(CurrentProc); } @@ -711,10 +732,6 @@ rtsSpark *spark; # endif } -//@node Aux fcts, , Basic interface to sparkq, GranSim code -//@subsubsection Aux fcts - -//@cindex spark_queue_len nat spark_queue_len(proc) PEs proc; @@ -730,7 +747,7 @@ PEs proc; # if defined(GRAN_CHECK) if ( RtsFlags.GranFlags.Debug.checkSparkQ ) if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) ) - fprintf(stderr,"ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n", + debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n", proc, pending_sparks_tls[proc], prev); # endif @@ -742,7 +759,6 @@ PEs proc; hd and tl pointers of the spark queue. Returns a pointer to the next spark in the queue. */ -//@cindex delete_from_sparkq rtsSpark * delete_from_sparkq (spark, p, dispose_too) /* unlink and dispose spark */ rtsSpark *spark; @@ -756,7 +772,7 @@ rtsBool dispose_too; # if defined(GRAN_CHECK) if ( RtsFlags.GranFlags.Debug.checkSparkQ ) { - fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n", + debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n", pending_sparks_hd, pending_sparks_tl, spark->prev, spark, spark->next, (spark->next==NULL ? 0 : spark->next->prev)); @@ -781,7 +797,7 @@ rtsBool dispose_too; # if defined(GRAN_CHECK) if ( RtsFlags.GranFlags.Debug.checkSparkQ ) { - fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n", + debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n", pending_sparks_hd, pending_sparks_tl, spark->prev, spark, spark->next, (spark->next==NULL ? 0 : spark->next->prev), spark); @@ -795,7 +811,6 @@ rtsBool dispose_too; } /* Mark all nodes pointed to by sparks in the spark queues (for GC) */ -//@cindex markSparkQueue void markSparkQueue(void) { @@ -811,11 +826,10 @@ markSparkQueue(void) sp->node = (StgClosure *)MarkRoot(sp->node); } IF_DEBUG(gc, - belch("@@ markSparkQueue: spark statistics at start of GC:"); + debugBelch("@@ markSparkQueue: spark statistics at start of GC:"); print_sparkq_stats()); } -//@cindex print_spark void print_spark(spark) rtsSpark *spark; @@ -823,21 +837,20 @@ rtsSpark *spark; char str[16]; if (spark==NULL) { - fprintf(stderr,"Spark: NIL\n"); + debugBelch("Spark: NIL\n"); return; } else { sprintf(str, ((spark->node==NULL) ? "______" : "%#6lx"), stgCast(StgPtr,spark->node)); - fprintf(stderr,"Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n", + debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n", str, spark->name, ((spark->global)==rtsTrue?"True":"False"), spark->creator, spark->prev, spark->next); } } -//@cindex print_sparkq void print_sparkq(proc) PEs proc; @@ -845,7 +858,7 @@ PEs proc; { rtsSpark *x = pending_sparks_hds[proc]; - fprintf(stderr,"Spark Queue of PE %d with root at %p:\n", proc, x); + debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x); for (; x!=(rtsSpark*)NULL; x=x->next) { print_spark(x); } @@ -854,16 +867,15 @@ PEs proc; /* Print a statistics of all spark queues. */ -//@cindex print_sparkq_stats void print_sparkq_stats(void) { PEs p; - fprintf(stderr, "SparkQs: ["); + debugBelch("SparkQs: ["); for (p=0; p