/* ---------------------------------------------------------------------------
- * $Id: Sparks.c,v 1.6 2003/03/25 17:58:50 sof Exp $
*
- * (c) The GHC Team, 2000
+ * (c) The GHC Team, 2000-2006
*
- * Sparking support for PAR and SMP versions of the RTS.
+ * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
*
* -------------------------------------------------------------------------*/
-//@node Spark Management Routines, , ,
-//@section Spark Management Routines
-
-//@menu
-//* Includes::
-//* GUM code::
-//* GranSim code::
-//@end menu
-//*/
-
-//@node Includes, GUM code, Spark Management Routines, Spark Management Routines
-//@subsection Includes
-
#include "PosixSource.h"
#include "Rts.h"
#include "Schedule.h"
#include "RtsFlags.h"
#include "RtsUtils.h"
#include "ParTicky.h"
-# if defined(PAR)
+# if defined(PARALLEL_HASKELL)
# include "ParallelRts.h"
# include "GranSimRts.h" // for GR_...
# elif defined(GRAN)
# endif
#include "Sparks.h"
-#if /*defined(SMP) ||*/ defined(PAR)
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
-//@node GUM code, GranSim code, Includes, Spark Management Routines
-//@subsection GUM code
+static INLINE_ME void bump_hd (StgSparkPool *p)
+{ p->hd++; if (p->hd == p->lim) p->hd = p->base; }
-static void slide_spark_pool( StgSparkPool *pool );
+static INLINE_ME void bump_tl (StgSparkPool *p)
+{ p->tl++; if (p->tl == p->lim) p->tl = p->base; }
-rtsBool
-initSparkPools( void )
-{
- Capability *cap;
- StgSparkPool *pool;
+/* -----------------------------------------------------------------------------
+ *
+ * Initialising spark pools.
+ *
+ * -------------------------------------------------------------------------- */
-#ifdef SMP
- /* walk over the capabilities, allocating a spark pool for each one */
- for (cap = free_capabilities; cap != NULL; cap = cap->link) {
-#else
- /* allocate a single spark pool */
- cap = &MainRegTable;
- {
-#endif
- pool = &(cap->rSparks);
-
+static void
+initSparkPool(StgSparkPool *pool)
+{
pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
- * sizeof(StgClosure *),
- "initSparkPools");
+ * sizeof(StgClosure *),
+ "initSparkPools");
pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
pool->hd = pool->base;
pool->tl = pool->base;
- }
- return rtsTrue; /* Qapla' */
}
-/*
- We traverse the spark pool until we find the 2nd usable (i.e. non-NF)
- spark. Rationale, we don't want to give away the only work a PE has.
- ToDo: introduce low- and high-water-marks for load balancing.
-*/
-StgClosure *
-findSpark( rtsBool for_export )
+void
+initSparkPools( void )
{
- Capability *cap;
- StgSparkPool *pool;
- StgClosure *spark, *first=NULL;
- rtsBool isIdlePE = EMPTY_RUN_QUEUE();
-
-#ifdef SMP
- /* walk over the capabilities, allocating a spark pool for each one */
- for (cap = free_capabilities; cap != NULL; cap = cap->link) {
+#ifdef THREADED_RTS
+ /* walk over the capabilities, allocating a spark pool for each one */
+ nat i;
+ for (i = 0; i < n_capabilities; i++) {
+ initSparkPool(&capabilities[i].r.rSparks);
+ }
#else
- /* allocate a single spark pool */
- cap = &MainRegTable;
- {
+ /* allocate a single spark pool */
+ initSparkPool(&MainCapability.r.rSparks);
#endif
- pool = &(cap->rSparks);
- while (pool->hd < pool->tl) {
- spark = *pool->hd++;
- if (closure_SHOULD_SPARK(spark)) {
- if (for_export && isIdlePE) {
- if (first==NULL) {
- first = spark; // keep the first usable spark if PE is idle
- } else {
- pool->hd--; // found a second spark; keep it in the pool
- ASSERT(*pool->hd==spark);
+}
+
+/* -----------------------------------------------------------------------------
+ *
+ * findSpark: find a spark on the current Capability that we can fork
+ * into a thread.
+ *
+ * -------------------------------------------------------------------------- */
+
+StgClosure *
+findSpark (Capability *cap)
+{
+ StgSparkPool *pool;
+ StgClosure *spark;
+
+ pool = &(cap->r.rSparks);
+ ASSERT_SPARK_POOL_INVARIANTS(pool);
+
+ while (pool->hd != pool->tl) {
+ spark = *pool->hd;
+ bump_hd(pool);
+ if (closure_SHOULD_SPARK(spark)) {
+#ifdef GRAN
if (RtsFlags.ParFlags.ParStats.Sparks)
- DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
- GR_STEALING, ((StgTSO *)NULL), first,
- 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
- return first; // and return the *first* spark found
- }
- } else {
- if (RtsFlags.ParFlags.ParStats.Sparks && for_export)
- DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
- GR_STEALING, ((StgTSO *)NULL), spark,
- 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
- return spark; // return first spark found
+ DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
+ GR_STEALING, ((StgTSO *)NULL), spark,
+ 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
+#endif
+ return spark;
}
- }
}
- slide_spark_pool(pool);
- }
- return NULL;
+ // spark pool is now empty
+ return NULL;
}
-/*
- activateSpark is defined in Schedule.c
-*/
+/* -----------------------------------------------------------------------------
+ * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
+ * implicit slide i.e. after marking all sparks are at the beginning of the
+ * spark pool and the spark pool only contains sparkable closures
+ * -------------------------------------------------------------------------- */
+
+void
+markSparkQueue (evac_fn evac)
+{
+ StgClosure **sparkp, **to_sparkp;
+ nat i, n, pruned_sparks; // stats only
+ StgSparkPool *pool;
+ Capability *cap;
+
+ PAR_TICKY_MARK_SPARK_QUEUE_START();
+
+ n = 0;
+ pruned_sparks = 0;
+ for (i = 0; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+ pool = &(cap->r.rSparks);
+
+ ASSERT_SPARK_POOL_INVARIANTS(pool);
+
+#if defined(PARALLEL_HASKELL)
+ // stats only
+ n = 0;
+ pruned_sparks = 0;
+#endif
+
+ sparkp = pool->hd;
+ to_sparkp = pool->hd;
+ while (sparkp != pool->tl) {
+ ASSERT(to_sparkp<=sparkp);
+ ASSERT(*sparkp!=NULL);
+ ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp)));
+ // ToDo?: statistics gathering here (also for GUM!)
+ if (closure_SHOULD_SPARK(*sparkp)) {
+ evac(sparkp);
+ *to_sparkp++ = *sparkp;
+ n++;
+ } else {
+ pruned_sparks++;
+ }
+ sparkp++;
+ if (sparkp == pool->lim) {
+ sparkp = pool->base;
+ }
+ }
+ pool->tl = to_sparkp;
+
+ PAR_TICKY_MARK_SPARK_QUEUE_END(n);
+
+#if defined(PARALLEL_HASKELL)
+ IF_DEBUG(scheduler,
+ debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
+ n, pruned_sparks, mytid));
+#else
+ IF_DEBUG(scheduler,
+ debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks\n",
+ n, pruned_sparks));
+#endif
+
+ IF_DEBUG(scheduler,
+ debugBelch("markSparkQueue: new spark queue len=%d; (hd=%p; tl=%p)\n",
+ sparkPoolSize(pool), pool->hd, pool->tl));
+
+ }
+}
+
+/* -----------------------------------------------------------------------------
+ *
+ * Turn a spark into a real thread
+ *
+ * -------------------------------------------------------------------------- */
+
+void
+createSparkThread (Capability *cap, StgClosure *p)
+{
+ StgTSO *tso;
+
+ tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
+ appendToRunQueue(cap,tso);
+}
+
+/* -----------------------------------------------------------------------------
+ *
+ * Create a new spark
+ *
+ * -------------------------------------------------------------------------- */
+
+#define DISCARD_NEW
+
+StgInt
+newSpark (StgRegTable *reg, StgClosure *p)
+{
+ StgSparkPool *pool = &(reg->rSparks);
+
+ ASSERT_SPARK_POOL_INVARIANTS(pool);
+
+ if (closure_SHOULD_SPARK(p)) {
+#ifdef DISCARD_NEW
+ StgClosure **new_tl;
+ new_tl = pool->tl + 1;
+ if (new_tl == pool->lim) { new_tl = pool->base; }
+ if (new_tl != pool->hd) {
+ *pool->tl = p;
+ pool->tl = new_tl;
+ } else if (!closure_SHOULD_SPARK(*pool->hd)) {
+ // if the old closure is not sparkable, discard it and
+ // keep the new one. Otherwise, keep the old one.
+ *pool->tl = p;
+ bump_hd(pool);
+ }
+#else /* DISCARD OLD */
+ *pool->tl = p;
+ bump_tl(pool);
+ if (pool->tl == pool->hd) { bump_hd(pool); }
+#endif
+ }
+
+ ASSERT_SPARK_POOL_INVARIANTS(pool);
+ return 1;
+}
+
+#else
+
+StgInt
+newSpark (StgRegTable *reg, StgClosure *p)
+{
+ /* nothing */
+ return 1;
+}
+
+#endif /* PARALLEL_HASKELL || THREADED_RTS */
+
+
+/* -----------------------------------------------------------------------------
+ *
+ * GRAN & PARALLEL_HASKELL stuff beyond here.
+ *
+ * -------------------------------------------------------------------------- */
+
+#if defined(PARALLEL_HASKELL) || defined(GRAN)
+
+static void slide_spark_pool( StgSparkPool *pool );
+
rtsBool
add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
{
pool->tl < pool->lim) {
*(pool->tl++) = closure;
-#if defined(PAR)
+#if defined(PARALLEL_HASKELL)
// collect parallel global statistics (currently done together with GC stats)
if (RtsFlags.ParFlags.ParStats.Global &&
RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
- // fprintf(stderr, "Creating spark for %x @ %11.2f\n", closure, usertime());
+ // debugBelch("Creating spark for %x @ %11.2f\n", closure, usertime());
globalParStats.tot_sparks_created++;
}
#endif
return rtsTrue;
} else {
-#if defined(PAR)
+#if defined(PARALLEL_HASKELL)
// collect parallel global statistics (currently done together with GC stats)
if (RtsFlags.ParFlags.ParStats.Global &&
RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
- //fprintf(stderr, "Ignoring spark for %x @ %11.2f\n", closure, usertime());
+ //debugBelch("Ignoring spark for %x @ %11.2f\n", closure, usertime());
globalParStats.tot_sparks_ignored++;
}
#endif
pool->tl = to_sparkp;
}
-nat
-spark_queue_len( StgSparkPool *pool )
-{
- return (nat) (pool->tl - pool->hd);
-}
-
-/* Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
- implicit slide i.e. after marking all sparks are at the beginning of the
- spark pool and the spark pool only contains sparkable closures
-*/
-void
-markSparkQueue( void )
-{
- StgClosure **sparkp, **to_sparkp;
- nat n, pruned_sparks; // stats only
- StgSparkPool *pool;
- Capability *cap;
-
- PAR_TICKY_MARK_SPARK_QUEUE_START();
-
-#ifdef SMP
- /* walk over the capabilities, allocating a spark pool for each one */
- for (cap = free_capabilities; cap != NULL; cap = cap->link) {
-#else
- /* allocate a single spark pool */
- cap = &MainRegTable;
- {
-#endif
- pool = &(cap->rSparks);
-
-#if defined(PAR)
- // stats only
- n = 0;
- pruned_sparks = 0;
-#endif
-
- sparkp = pool->hd;
- to_sparkp = pool->base;
- while (sparkp < pool->tl) {
- ASSERT(to_sparkp<=sparkp);
- ASSERT(*sparkp!=NULL);
- ASSERT(LOOKS_LIKE_GHC_INFO(((StgClosure *)*sparkp)->header.info));
- // ToDo?: statistics gathering here (also for GUM!)
- if (closure_SHOULD_SPARK(*sparkp)) {
- *to_sparkp = MarkRoot(*sparkp);
- to_sparkp++;
-#ifdef PAR
- n++;
-#endif
- } else {
-#ifdef PAR
- pruned_sparks++;
-#endif
- }
- sparkp++;
- }
- pool->hd = pool->base;
- pool->tl = to_sparkp;
-
- PAR_TICKY_MARK_SPARK_QUEUE_END(n);
-
-#if defined(SMP)
- IF_DEBUG(scheduler,
- belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
- n, pruned_sparks, pthread_self()));
-#elif defined(PAR)
- IF_DEBUG(scheduler,
- belch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
- n, pruned_sparks, mytid));
-#else
- IF_DEBUG(scheduler,
- belch("markSparkQueue: marked %d sparks and pruned %d sparks",
- n, pruned_sparks));
-#endif
-
- IF_DEBUG(scheduler,
- belch("markSparkQueue: new spark queue len=%d; (hd=%p; tl=%p)",
- spark_queue_len(pool), pool->hd, pool->tl));
-
- }
-}
-
void
disposeSpark(spark)
StgClosure *spark;
{
-#if !defined(SMP)
+#if !defined(THREADED_RTS)
Capability *cap;
StgSparkPool *pool;
#elif defined(GRAN)
-//@node GranSim code, , GUM code, Spark Management Routines
-//@subsection GranSim code
-
-//@menu
-//* Basic interface to sparkq::
-//* Aux fcts::
-//@end menu
-
-//@node Basic interface to sparkq, Aux fcts, GranSim code, GranSim code
-//@subsubsection Basic interface to sparkq
/*
Search the spark queue of the proc in event for a spark that's worth
turning into a thread
(was gimme_spark in the old RTS)
*/
-//@cindex findLocalSpark
void
findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
{
if (!closure_SHOULD_SPARK(node))
{
IF_GRAN_DEBUG(checkSparkQ,
- belch("^^ pruning spark %p (node %p) in gimme_spark",
+ debugBelch("^^ pruning spark %p (node %p) in gimme_spark",
spark, node));
if (RtsFlags.GranFlags.GranSimStats.Sparks)
# if defined(GRAN) && defined(GRAN_CHECK)
/* Should never happen; just for testing
if (spark==pending_sparks_tl) {
- fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n");
+ debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
stg_exit(EXIT_FAILURE);
} */
# endif
/* Should never happen; just for testing
if (spark==pending_sparks_tl) {
- fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n");
+ debugBelch("ReSchedule: Last spark != SparkQueueTl\n");
stg_exit(EXIT_FAILURE);
break;
} */
spark = spark->next;
IF_GRAN_DEBUG(pri,
- belch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n",
+ debugBelch("++ Ignoring spark of priority %u (SparkPriority=%u); node=%p; name=%u\n",
spark->gran_info, RtsFlags.GranFlags.SparkPriority,
spark->node, spark->name);)
}
node pointed to by the spark at some point in the future.
(was munch_spark in the old RTS)
*/
-//@cindex activateSpark
rtsBool
activateSpark (rtsEvent *event, rtsSparkQ spark)
{
globalGranStats.tot_low_pri_sparks++;
IF_GRAN_DEBUG(pri,
- belch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
+ debugBelch("++ No high priority spark available; low priority (%u) spark chosen: node=%p; name=%u\n",
spark->gran_info,
spark->node, spark->name));
}
Granularity info transformers.
Applied to the GRAN_INFO field of a spark.
*/
-static inline nat ID(nat x) { return(x); };
-static inline nat INV(nat x) { return(-x); };
-static inline nat IGNORE(nat x) { return (0); };
-static inline nat RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
+STATIC_INLINE nat ID(nat x) { return(x); };
+STATIC_INLINE nat INV(nat x) { return(-x); };
+STATIC_INLINE nat IGNORE(nat x) { return (0); };
+STATIC_INLINE nat RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
/* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
-//@cindex newSpark
rtsSpark *
newSpark(node,name,gran_info,size_info,par_info,local)
StgClosure *node;
if ( RtsFlags.GranFlags.SparkPriority!=0 &&
pri<RtsFlags.GranFlags.SparkPriority ) {
IF_GRAN_DEBUG(pri,
- belch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n",
+ debugBelch(",, NewSpark: Ignoring spark of priority %u (SparkPriority=%u); node=%#x; name=%u\n",
pri, RtsFlags.GranFlags.SparkPriority, node, name));
return ((rtsSpark*)NULL);
}
return(newspark);
}
-//@cindex disposeSpark
void
disposeSpark(spark)
rtsSpark *spark;
stgFree(spark);
}
-//@cindex disposeSparkQ
void
disposeSparkQ(spark)
rtsSparkQ spark;
# ifdef GRAN_CHECK
if (SparksAvail < 0) {
- fprintf(stderr,"disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
+ debugBelch("disposeSparkQ: SparksAvail<0 after disposing sparkq @ %p\n", &spark);
print_spark(spark);
}
# endif
the queue.
*/
-//@cindex add_to_spark_queue
void
add_to_spark_queue(spark)
rtsSpark *spark;
}
IF_GRAN_DEBUG(checkSparkQ,
- belch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
+ debugBelch("++ Spark stats after adding spark %p (node %p) to queue on PE %d",
spark, spark->node, CurrentProc);
print_sparkq_stats());
prev = next, next = next->next)
{}
if ( (prev!=NULL) && (prev!=pending_sparks_tl) )
- fprintf(stderr,"SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
+ debugBelch("SparkQ inconsistency after adding spark %p: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
spark,CurrentProc,
pending_sparks_tl, prev);
}
}
}
if (!sorted) {
- fprintf(stderr,"ghuH: SPARKQ on PE %d is not sorted:\n",
+ debugBelch("ghuH: SPARKQ on PE %d is not sorted:\n",
CurrentProc);
print_sparkq(CurrentProc);
}
# endif
}
-//@node Aux fcts, , Basic interface to sparkq, GranSim code
-//@subsubsection Aux fcts
-
-//@cindex spark_queue_len
nat
spark_queue_len(proc)
PEs proc;
# if defined(GRAN_CHECK)
if ( RtsFlags.GranFlags.Debug.checkSparkQ )
if ( (prev!=NULL) && (prev!=pending_sparks_tls[proc]) )
- fprintf(stderr,"ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
+ debugBelch("ERROR in spark_queue_len: (PE %u) pending_sparks_tl (%p) not end of queue (%p)\n",
proc, pending_sparks_tls[proc], prev);
# endif
hd and tl pointers of the spark queue. Returns a pointer to the next
spark in the queue.
*/
-//@cindex delete_from_sparkq
rtsSpark *
delete_from_sparkq (spark, p, dispose_too) /* unlink and dispose spark */
rtsSpark *spark;
# if defined(GRAN_CHECK)
if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
- fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
+ debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p)\n",
pending_sparks_hd, pending_sparks_tl,
spark->prev, spark, spark->next,
(spark->next==NULL ? 0 : spark->next->prev));
# if defined(GRAN_CHECK)
if ( RtsFlags.GranFlags.Debug.checkSparkQ ) {
- fprintf(stderr,"## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
+ debugBelch("## |%p:%p| (%p)<-spark=%p->(%p) <-(%p); spark=%p will be deleted NOW \n",
pending_sparks_hd, pending_sparks_tl,
spark->prev, spark, spark->next,
(spark->next==NULL ? 0 : spark->next->prev), spark);
}
/* Mark all nodes pointed to by sparks in the spark queues (for GC) */
-//@cindex markSparkQueue
void
markSparkQueue(void)
{
sp->node = (StgClosure *)MarkRoot(sp->node);
}
IF_DEBUG(gc,
- belch("@@ markSparkQueue: spark statistics at start of GC:");
+ debugBelch("@@ markSparkQueue: spark statistics at start of GC:");
print_sparkq_stats());
}
-//@cindex print_spark
void
print_spark(spark)
rtsSpark *spark;
char str[16];
if (spark==NULL) {
- fprintf(stderr,"Spark: NIL\n");
+ debugBelch("Spark: NIL\n");
return;
} else {
sprintf(str,
((spark->node==NULL) ? "______" : "%#6lx"),
stgCast(StgPtr,spark->node));
- fprintf(stderr,"Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
+ debugBelch("Spark: Node %8s, Name %#6x, Global %5s, Creator %5x, Prev %6p, Next %6p\n",
str, spark->name,
((spark->global)==rtsTrue?"True":"False"), spark->creator,
spark->prev, spark->next);
}
}
-//@cindex print_sparkq
void
print_sparkq(proc)
PEs proc;
{
rtsSpark *x = pending_sparks_hds[proc];
- fprintf(stderr,"Spark Queue of PE %d with root at %p:\n", proc, x);
+ debugBelch("Spark Queue of PE %d with root at %p:\n", proc, x);
for (; x!=(rtsSpark*)NULL; x=x->next) {
print_spark(x);
}
/*
Print a statistics of all spark queues.
*/
-//@cindex print_sparkq_stats
void
print_sparkq_stats(void)
{
PEs p;
- fprintf(stderr, "SparkQs: [");
+ debugBelch("SparkQs: [");
for (p=0; p<RtsFlags.GranFlags.proc; p++)
- fprintf(stderr, ", PE %d: %d", p, spark_queue_len(p));
- fprintf(stderr, "\n");
+ debugBelch(", PE %d: %d", p, spark_queue_len(p));
+ debugBelch("\n");
}
#endif