X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSparks.c;h=98d710280b14246491c76f3def5e6ed3f39e8ce1;hb=3eacdc7faf0d0e87a7201253f9f12c1fb4db7249;hp=4a9bf005fd09c61b190983b6a77bfb7384ca2539;hpb=dd4c28a9c706cce09ecc2c6f532969efa925532f;p=ghc-hetmet.git diff --git a/ghc/rts/Sparks.c b/ghc/rts/Sparks.c index 4a9bf00..98d7102 100644 --- a/ghc/rts/Sparks.c +++ b/ghc/rts/Sparks.c @@ -1,95 +1,247 @@ /* --------------------------------------------------------------------------- - * $Id: Sparks.c,v 1.2 2000/03/31 03:09:36 hwloidl Exp $ * - * (c) The GHC Team, 2000 + * (c) The GHC Team, 2000-2006 * - * Sparking support for PAR and SMP versions of the RTS. + * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS. * * -------------------------------------------------------------------------*/ -//@node Spark Management Routines, , , -//@section Spark Management Routines - -//@menu -//* Includes:: -//* GUM code:: -//* GranSim code:: -//@end menu - -//@node Includes, GUM code, Spark Management Routines, Spark Management Routines -//@subsection Includes - +#include "PosixSource.h" #include "Rts.h" #include "Schedule.h" #include "SchedAPI.h" #include "Storage.h" #include "RtsFlags.h" #include "RtsUtils.h" -# if defined(PAR) +#include "ParTicky.h" +# if defined(PARALLEL_HASKELL) # include "ParallelRts.h" +# include "GranSimRts.h" // for GR_... # elif defined(GRAN) # include "GranSimRts.h" # endif #include "Sparks.h" -#if defined(SMP) || defined(PAR) +#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) -//@node GUM code, GranSim code, Includes, Spark Management Routines -//@subsection GUM code +static INLINE_ME void bump_hd (StgSparkPool *p) +{ p->hd++; if (p->hd == p->lim) p->hd = p->base; } -static void slide_spark_pool( StgSparkPool *pool ); +static INLINE_ME void bump_tl (StgSparkPool *p) +{ p->tl++; if (p->tl == p->lim) p->tl = p->base; } -void -initSparkPools( void ) -{ - Capability *cap; - StgSparkPool *pool; +/* ----------------------------------------------------------------------------- + * + * Initialising spark pools. + * + * -------------------------------------------------------------------------- */ -#ifdef SMP - /* walk over the capabilities, allocating a spark pool for each one */ - for (cap = free_capabilities; cap != NULL; cap = cap->link) { -#else - /* allocate a single spark pool */ - cap = &MainRegTable; - { -#endif - pool = &(cap->rSparks); - +static void +initSparkPool(StgSparkPool *pool) +{ pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks - * sizeof(StgClosure *), - "initSparkPools"); + * sizeof(StgClosure *), + "initSparkPools"); pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks; pool->hd = pool->base; pool->tl = pool->base; - } } +void +initSparkPools( void ) +{ +#ifdef THREADED_RTS + /* walk over the capabilities, allocating a spark pool for each one */ + nat i; + for (i = 0; i < n_capabilities; i++) { + initSparkPool(&capabilities[i].r.rSparks); + } +#else + /* allocate a single spark pool */ + initSparkPool(&MainCapability.r.rSparks); +#endif +} + +/* ----------------------------------------------------------------------------- + * + * findSpark: find a spark on the current Capability that we can fork + * into a thread. + * + * -------------------------------------------------------------------------- */ + StgClosure * -findSpark( void ) +findSpark (Capability *cap) { - Capability *cap; - StgSparkPool *pool; - StgClosure *spark; + StgSparkPool *pool; + StgClosure *spark; + + pool = &(cap->r.rSparks); + ASSERT_SPARK_POOL_INVARIANTS(pool); + + while (pool->hd != pool->tl) { + spark = *pool->hd; + bump_hd(pool); + if (closure_SHOULD_SPARK(spark)) { +#ifdef GRAN + if (RtsFlags.ParFlags.ParStats.Sparks) + DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, + GR_STEALING, ((StgTSO *)NULL), spark, + 0, 0 /* spark_queue_len(ADVISORY_POOL) */); +#endif + return spark; + } + } + // spark pool is now empty + return NULL; +} -#ifdef SMP - /* walk over the capabilities, allocating a spark pool for each one */ - for (cap = free_capabilities; cap != NULL; cap = cap->link) { +/* ----------------------------------------------------------------------------- + * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an + * implicit slide i.e. after marking all sparks are at the beginning of the + * spark pool and the spark pool only contains sparkable closures + * -------------------------------------------------------------------------- */ + +void +markSparkQueue (evac_fn evac) +{ + StgClosure **sparkp, **to_sparkp; + nat i, n, pruned_sparks; // stats only + StgSparkPool *pool; + Capability *cap; + + PAR_TICKY_MARK_SPARK_QUEUE_START(); + + n = 0; + pruned_sparks = 0; + for (i = 0; i < n_capabilities; i++) { + cap = &capabilities[i]; + pool = &(cap->r.rSparks); + + ASSERT_SPARK_POOL_INVARIANTS(pool); + +#if defined(PARALLEL_HASKELL) + // stats only + n = 0; + pruned_sparks = 0; +#endif + + sparkp = pool->hd; + to_sparkp = pool->hd; + while (sparkp != pool->tl) { + ASSERT(to_sparkp<=sparkp); + ASSERT(*sparkp!=NULL); + ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp))); + // ToDo?: statistics gathering here (also for GUM!) + if (closure_SHOULD_SPARK(*sparkp)) { + evac(sparkp); + *to_sparkp++ = *sparkp; + n++; + } else { + pruned_sparks++; + } + sparkp++; + if (sparkp == pool->lim) { + sparkp = pool->base; + } + } + pool->tl = to_sparkp; + + PAR_TICKY_MARK_SPARK_QUEUE_END(n); + +#if defined(PARALLEL_HASKELL) + IF_DEBUG(scheduler, + debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]", + n, pruned_sparks, mytid)); #else - /* allocate a single spark pool */ - cap = &MainRegTable; - { + IF_DEBUG(scheduler, + debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks\n", + n, pruned_sparks)); #endif - pool = &(cap->rSparks); - while (pool->hd < pool->tl) { - spark = *pool->hd++; - if (closure_SHOULD_SPARK(spark)) - return spark; + + IF_DEBUG(scheduler, + debugBelch("markSparkQueue: new spark queue len=%d; (hd=%p; tl=%p)\n", + sparkPoolSize(pool), pool->hd, pool->tl)); + } - slide_spark_pool(pool); - } - return NULL; } +/* ----------------------------------------------------------------------------- + * + * Turn a spark into a real thread + * + * -------------------------------------------------------------------------- */ + +void +createSparkThread (Capability *cap, StgClosure *p) +{ + StgTSO *tso; + + tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p); + appendToRunQueue(cap,tso); +} + +/* ----------------------------------------------------------------------------- + * + * Create a new spark + * + * -------------------------------------------------------------------------- */ + +#define DISCARD_NEW + +StgInt +newSpark (StgRegTable *reg, StgClosure *p) +{ + StgSparkPool *pool = &(reg->rSparks); + + ASSERT_SPARK_POOL_INVARIANTS(pool); + + if (closure_SHOULD_SPARK(p)) { +#ifdef DISCARD_NEW + StgClosure **new_tl; + new_tl = pool->tl + 1; + if (new_tl == pool->lim) { new_tl = pool->base; } + if (new_tl != pool->hd) { + *pool->tl = p; + pool->tl = new_tl; + } else if (!closure_SHOULD_SPARK(*pool->hd)) { + // if the old closure is not sparkable, discard it and + // keep the new one. Otherwise, keep the old one. + *pool->tl = p; + bump_hd(pool); + } +#else /* DISCARD OLD */ + *pool->tl = p; + bump_tl(pool); + if (pool->tl == pool->hd) { bump_hd(pool); } +#endif + } + + ASSERT_SPARK_POOL_INVARIANTS(pool); + return 1; +} + +#else + +StgInt +newSpark (StgRegTable *reg, StgClosure *p) +{ + /* nothing */ + return 1; +} + +#endif /* PARALLEL_HASKELL || THREADED_RTS */ + + +/* ----------------------------------------------------------------------------- + * + * GRAN & PARALLEL_HASKELL stuff beyond here. + * + * -------------------------------------------------------------------------- */ + +#if defined(PARALLEL_HASKELL) || defined(GRAN) + +static void slide_spark_pool( StgSparkPool *pool ); + rtsBool add_to_spark_queue( StgClosure *closure, StgSparkPool *pool ) { @@ -99,8 +251,25 @@ add_to_spark_queue( StgClosure *closure, StgSparkPool *pool ) if (closure_SHOULD_SPARK(closure) && pool->tl < pool->lim) { *(pool->tl++) = closure; + +#if defined(PARALLEL_HASKELL) + // collect parallel global statistics (currently done together with GC stats) + if (RtsFlags.ParFlags.ParStats.Global && + RtsFlags.GcFlags.giveStats > NO_GC_STATS) { + // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime()); + globalParStats.tot_sparks_created++; + } +#endif return rtsTrue; } else { +#if defined(PARALLEL_HASKELL) + // collect parallel global statistics (currently done together with GC stats) + if (RtsFlags.ParFlags.ParStats.Global && + RtsFlags.GcFlags.giveStats > NO_GC_STATS) { + //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime()); + globalParStats.tot_sparks_ignored++; + } +#endif return rtsFalse; } } @@ -127,90 +296,11 @@ slide_spark_pool( StgSparkPool *pool ) pool->tl = to_sparkp; } -nat -spark_queue_len( StgSparkPool *pool ) -{ - return (nat) (pool->tl - pool->hd); -} - -/* Mark all nodes pointed to by sparks in the spark queues (for GC) Does an - implicit slide i.e. after marking all sparks are at the beginning of the - spark pool and the spark pool only contains sparkable closures -*/ -void -markSparkQueue( void ) -{ - StgClosure **sparkp, **to_sparkp; -#ifdef DEBUG - nat n, pruned_sparks; -#endif - StgSparkPool *pool; - Capability *cap; - -#ifdef SMP - /* walk over the capabilities, allocating a spark pool for each one */ - for (cap = free_capabilities; cap != NULL; cap = cap->link) { -#else - /* allocate a single spark pool */ - cap = &MainRegTable; - { -#endif - pool = &(cap->rSparks); - -#ifdef DEBUG - n = 0; - pruned_sparks = 0; -#endif - - sparkp = pool->hd; - to_sparkp = pool->base; - while (sparkp < pool->tl) { - ASSERT(to_sparkp<=sparkp); - ASSERT(*sparkp!=NULL); - ASSERT(LOOKS_LIKE_GHC_INFO(((StgClosure *)*sparkp)->header.info)); - // ToDo?: statistics gathering here (also for GUM!) - if (closure_SHOULD_SPARK(*sparkp)) { - *to_sparkp = MarkRoot(*sparkp); - to_sparkp++; -#ifdef DEBUG - n++; -#endif - } else { -#ifdef DEBUG - pruned_sparks++; -#endif - } - sparkp++; - } - pool->hd = pool->base; - pool->tl = to_sparkp; - -#if defined(SMP) - IF_DEBUG(scheduler, - belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]", - n, pruned_sparks, pthread_self())); -#elif defined(PAR) - IF_DEBUG(scheduler, - belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]", - n, pruned_sparks, mytid)); -#else - IF_DEBUG(scheduler, - belch("markSparkQueue: marked %d sparks and pruned %d sparks", - n, pruned_sparks)); -#endif - - IF_DEBUG(scheduler, - belch("markSparkQueue: new spark queue len=%d; (hd=%p; tl=%p)", - spark_queue_len(pool), pool->hd, pool->tl)); - - } -} - void disposeSpark(spark) StgClosure *spark; { -#if !defined(SMP) +#if !defined(THREADED_RTS) Capability *cap; StgSparkPool *pool; @@ -225,22 +315,11 @@ StgClosure *spark; #elif defined(GRAN) -//@node GranSim code, , GUM code, Spark Management Routines -//@subsection GranSim code - -//@menu -//* Basic interface to sparkq:: -//* Aux fcts:: -//@end menu - -//@node Basic interface to sparkq, Aux fcts, GranSim code, GranSim code -//@subsubsection Basic interface to sparkq /* Search the spark queue of the proc in event for a spark that's worth turning into a thread (was gimme_spark in the old RTS) */ -//@cindex findLocalSpark void findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res) { @@ -269,7 +348,7 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res) if (!closure_SHOULD_SPARK(node)) { IF_GRAN_DEBUG(checkSparkQ, - belch("^^ pruning spark %p (node %p) in gimme_spark", + debugBelch("^^ pruning spark %p (node %p) in gimme_spark", spark, node)); if (RtsFlags.GranFlags.GranSimStats.Sparks) @@ -309,7 +388,7 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res) # if defined(GRAN) && defined(GRAN_CHECK) /* Should never happen; just for testing if (spark==pending_sparks_tl) { - fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n"); + debugBelch("ReSchedule: Last spark != SparkQueueTl\n"); stg_exit(EXIT_FAILURE); } */ # endif @@ -347,7 +426,7 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res) /* Should never happen; just for testing if (spark==pending_sparks_tl) { - fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n"); + debugBelch("ReSchedule: Last spark != SparkQueueTl\n"); stg_exit(EXIT_FAILURE); break; } */ @@ -355,7 +434,7 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res) spark = spark->next; IF_GRAN_DEBUG(pri, - belch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", + debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n", spark->gran_info, RtsFlags.GranFlags.SparkPriority, spark->node, spark->name);) } @@ -372,7 +451,6 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res) node pointed to by the spark at some point in the future. (was munch_spark in the old RTS) */ -//@cindex activateSpark rtsBool activateSpark (rtsEvent *event, rtsSparkQ spark) { @@ -418,9 +496,9 @@ activateSpark (rtsEvent *event, rtsSparkQ spark) globalGranStats.tot_low_pri_sparks++; IF_GRAN_DEBUG(pri, - belch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n", + debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n", spark->gran_info, - spark->node, spark->name);) + spark->node, spark->name)); } CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime; @@ -441,7 +519,7 @@ activateSpark (rtsEvent *event, rtsSparkQ spark) FindWork, (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL); barf("//// activateSpark: out of heap ; ToDo: call GarbageCollect()"); - GarbageCollect(GetRoots); + GarbageCollect(GetRoots, rtsFalse); // HWL old: ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse); // HWL old: SAVE_Hp -= TSO_HS+TSO_CTS_SIZE; spark = NULL; @@ -473,13 +551,12 @@ activateSpark (rtsEvent *event, rtsSparkQ spark) Granularity info transformers. Applied to the GRAN_INFO field of a spark. */ -static inline nat ID(nat x) { return(x); }; -static inline nat INV(nat x) { return(-x); }; -static inline nat IGNORE(nat x) { return (0); }; -static inline nat RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); } +STATIC_INLINE nat ID(nat x) { return(x); }; +STATIC_INLINE nat INV(nat x) { return(-x); }; +STATIC_INLINE nat IGNORE(nat x) { return (0); }; +STATIC_INLINE nat RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); } /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */ -//@cindex newSpark rtsSpark * newSpark(node,name,gran_info,size_info,par_info,local) StgClosure *node; @@ -496,7 +573,7 @@ nat name, gran_info, size_info, par_info, local; if ( RtsFlags.GranFlags.SparkPriority!=0 && prinode, CurrentProc); print_sparkq_stats()); @@ -624,7 +698,7 @@ rtsSpark *spark; prev = next, next = next->next) {} if ( (prev!=NULL) && (prev!=pending_sparks_tl) ) - fprintf(stderr,"SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n", + debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n", spark,CurrentProc, pending_sparks_tl, prev); } @@ -650,7 +724,7 @@ rtsSpark *spark; } } if (!sorted) { - fprintf(stderr,"ghuH: SPARKQ on PE %d is not sorted:\n", + debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n", CurrentProc); print_sparkq(CurrentProc); } @@ -658,10 +732,6 @@ rtsSpark *spark; # endif } -//@node Aux fcts, , Basic interface to sparkq, GranSim code -//@subsubsection Aux fcts - -//@cindex spark_queue_len nat spark_queue_len(proc) PEs proc; @@ -677,7 +747,7 @@ PEs proc; # if defined(GRAN_CHECK) if ( RtsFlags.GranFlags.Debug.checkSparkQ ) if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) ) - fprintf(stderr,"ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n", + debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n", proc, pending_sparks_tls[proc], prev); # endif @@ -689,7 +759,6 @@ PEs proc; hd and tl pointers of the spark queue. Returns a pointer to the next spark in the queue. */ -//@cindex delete_from_sparkq rtsSpark * delete_from_sparkq (spark, p, dispose_too) /* unlink and dispose spark */ rtsSpark *spark; @@ -703,7 +772,7 @@ rtsBool dispose_too; # if defined(GRAN_CHECK) if ( RtsFlags.GranFlags.Debug.checkSparkQ ) { - fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n", + debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n", pending_sparks_hd, pending_sparks_tl, spark->prev, spark, spark->next, (spark->next==NULL ? 0 : spark->next->prev)); @@ -728,7 +797,7 @@ rtsBool dispose_too; # if defined(GRAN_CHECK) if ( RtsFlags.GranFlags.Debug.checkSparkQ ) { - fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n", + debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n", pending_sparks_hd, pending_sparks_tl, spark->prev, spark, spark->next, (spark->next==NULL ? 0 : spark->next->prev), spark); @@ -742,7 +811,6 @@ rtsBool dispose_too; } /* Mark all nodes pointed to by sparks in the spark queues (for GC) */ -//@cindex markSparkQueue void markSparkQueue(void) { @@ -758,11 +826,10 @@ markSparkQueue(void) sp->node = (StgClosure *)MarkRoot(sp->node); } IF_DEBUG(gc, - belch("@@ markSparkQueue: spark statistics at start of GC:"); + debugBelch("@@ markSparkQueue: spark statistics at start of GC:"); print_sparkq_stats()); } -//@cindex print_spark void print_spark(spark) rtsSpark *spark; @@ -770,21 +837,20 @@ rtsSpark *spark; char str[16]; if (spark==NULL) { - fprintf(stderr,"Spark: NIL\n"); + debugBelch("Spark: NIL\n"); return; } else { sprintf(str, ((spark->node==NULL) ? "______" : "%#6lx"), stgCast(StgPtr,spark->node)); - fprintf(stderr,"Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n", + debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n", str, spark->name, ((spark->global)==rtsTrue?"True":"False"), spark->creator, spark->prev, spark->next); } } -//@cindex print_sparkq void print_sparkq(proc) PEs proc; @@ -792,7 +858,7 @@ PEs proc; { rtsSpark *x = pending_sparks_hds[proc]; - fprintf(stderr,"Spark Queue of PE %d with root at %p:\n", proc, x); + debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x); for (; x!=(rtsSpark*)NULL; x=x->next) { print_spark(x); } @@ -801,16 +867,15 @@ PEs proc; /* Print a statistics of all spark queues. */ -//@cindex print_sparkq_stats void print_sparkq_stats(void) { PEs p; - fprintf(stderr, "SparkQs: ["); + debugBelch("SparkQs: ["); for (p=0; p