[project @ 2005-11-18 15:24:12 by simonmar]
author     simonmar <unknown>
           Fri, 18 Nov 2005 15:24:12 +0000 (15:24 +0000)
committer  simonmar <unknown>
           Fri, 18 Nov 2005 15:24:12 +0000 (15:24 +0000)
Two improvements to the SMP runtime:

  - support for 'par', aka sparks.  Load balancing is still very
    primitive, but I have seen programs that go faster using par
    (a small usage sketch follows this list).

  - support for backing off when a thread is found to be duplicating
    a computation currently underway in another thread.  This also
    fixes some instability in SMP: it turned out that when an update
    frame points to an indirection (which can happen if a thunk is
    under evaluation in multiple threads), the update will trash the
    value once GC has shorted out the indirection.  Now we suspend the
    duplicate computation to the heap before this can happen (a sketch
    of how the duplication arises follows this list).
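
  To illustrate the first point (this example is not part of the patch):
  a program that can now go faster on the SMP runtime might use 'par'
  from GHC.Conc like this, sparking one recursive call so that an idle
  Capability can pick it up:

    import GHC.Conc (par)

    -- naive parallel fibonacci: spark x, evaluate y here, then combine
    nfib :: Int -> Int
    nfib n
      | n < 2     = 1
      | otherwise = x `par` (y `seq` x + y)
      where
        x = nfib (n - 1)
        y = nfib (n - 2)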
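
  And a sketch of how the duplication in the second point can arise
  (again not from the patch): two Haskell threads demand the same shared
  thunk, so on SMP both may start evaluating it.  With this change, one
  of the duplicate evaluations is suspended to the heap instead of both
  threads racing on the update.  Run on the SMP runtime with more than
  one CPU (e.g. +RTS -N2):

    import Control.Concurrent

    main :: IO ()
    main = do
      let x = sum [1 .. 1000000 :: Integer]  -- a single shared thunk
      done <- newEmptyMVar
      _ <- forkIO (print x >> putMVar done ())  -- may start evaluating x ...
      print x                                   -- ... while we evaluate it too
      takeMVar done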

Additionally:

  - stack squeezing is separate from lazy blackholing, and now only
    happens if there's a reasonable amount of squeezing to be done
    relative to the number of words of stack that have to be moved.
    This means we won't try to shift 10Mb of stack just to save 2
    words at the bottom (it probably never happened, but still); see
    the note after this list for how this maps onto the new counters
    in threadPaused().

  - update frames are now marked when they have been visited by lazy
    blackholing, as per the SMP paper.

  - cleaned up raiseAsync() a bit.
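
  (A note on the squeezing heuristic: in threadPaused() below,
  words_to_squeeze counts the update-frame words we could reclaim by
  collapsing adjacent update frames, and weight counts the words of
  ordinary frames that would have to be shifted down to close the gaps.
  The intention is to squeeze only when weight < words_to_squeeze,
  although in this patch that test is still hard-wired on.  The new
  stg_marked_upd_frame_info is what lets the walk stop early when it
  reaches frames that have already been visited.)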

17 files changed:
ghc/includes/Closures.h
ghc/includes/Regs.h
ghc/includes/RtsFlags.h
ghc/includes/StgMiscClosures.h
ghc/includes/Storage.h
ghc/rts/Capability.c
ghc/rts/GC.c
ghc/rts/Interpreter.c
ghc/rts/RtsFlags.c
ghc/rts/Schedule.c
ghc/rts/Schedule.h
ghc/rts/Sparks.c
ghc/rts/Sparks.h
ghc/rts/StgCRun.c
ghc/rts/StgStartup.cmm
ghc/rts/Updates.cmm
ghc/rts/Updates.h

ghc/includes/Closures.h
index f9bfeb4..df3bca3 100644
@@ -339,6 +339,11 @@ typedef struct {
  *   - In StgTRecHeader, it might be worthwhile having separate chunks
  *     of read-only and read-write locations.  This would save a
  *     new_value field in the read-only locations.
+ *
+ *   - In StgAtomicallyFrame, we could combine the waiting bit into
+ *     the header (maybe a different info tbl for a waiting transaction).
+ *     This means we can specialise the code for the atomically frame
+ *     (it immediately switches on frame->waiting anyway).
  */
 
 typedef struct StgTVarWaitQueue_ {
ghc/includes/Regs.h
index f1b8597..def36c3 100644
 
 #include "gmp.h" // Needs MP_INT definition 
 
-/* 
- * This is the table that holds shadow-locations for all the STG
- * registers.  The shadow locations are used when:
- *
- *     1) the particular register isn't mapped to a real machine
- *        register, probably because there's a shortage of real registers.
- *     2) caller-saves registers are saved across a CCall
+/*
+ * Spark pools: used to store pending sparks (SMP & PARALLEL_HASKELL only)
+ * This is a circular buffer.  Invariants:
+ *    - base <= hd < lim
+ *    - base <= tl < lim
+ *    - if hd==tl, then the pool is empty.
+ *    - if hd == tl+1, then the pool is full.
+ * Adding to the pool is done by assigning to *tl++ (wrapping round as
+ * necessary).  When adding to a full pool, we have the option of
+ * throwing away either the oldest (hd++) or the most recent (tl--) entry.
  */
-
 typedef struct StgSparkPool_ {
   StgClosure **base;
   StgClosure **lim;
@@ -40,6 +42,12 @@ typedef struct StgSparkPool_ {
   StgClosure **tl;
 } StgSparkPool;
 
+#define ASSERT_SPARK_POOL_INVARIANTS(p)                \
+  ASSERT((p)->base <= (p)->hd);                        \
+  ASSERT((p)->hd < (p)->lim);                  \
+  ASSERT((p)->base <= (p)->tl);                        \
+  ASSERT((p)->tl < (p)->lim);
+
 typedef struct {
   StgFunPtr      stgGCEnter1;
   StgFunPtr      stgGCFun;
@@ -64,6 +72,14 @@ typedef union {
     StgTSOPtr      t;
 } StgUnion;
 
+/* 
+ * This is the table that holds shadow-locations for all the STG
+ * registers.  The shadow locations are used when:
+ *
+ *     1) the particular register isn't mapped to a real machine
+ *        register, probably because there's a shortage of real registers.
+ *     2) caller-saves registers are saved across a CCall
+ */
 typedef struct StgRegTable_ {
   StgUnion       rR1;
   StgUnion       rR2;
ghc/includes/RtsFlags.h
index a4c6d9b..4a37d48 100644
@@ -62,6 +62,7 @@ struct DEBUG_FLAGS {
     rtsBool linker;         /* 'l'   the object linker */
     rtsBool apply;          /* 'a' */
     rtsBool stm;            /* 'm' */
+    rtsBool squeeze;        /* 'z'  stack squeezing & lazy blackholing */
 };
 
 struct COST_CENTRE_FLAGS {
ghc/includes/StgMiscClosures.h
index 1d381e6..a9938aa 100644
@@ -37,6 +37,7 @@
 
 /* Stack frames */
 RTS_RET_INFO(stg_upd_frame_info);
+RTS_RET_INFO(stg_marked_upd_frame_info);
 RTS_RET_INFO(stg_noupd_frame_info);
 RTS_RET_INFO(stg_seq_frame_info);
 RTS_RET_INFO(stg_catch_frame_info);
@@ -45,6 +46,7 @@ RTS_RET_INFO(stg_atomically_frame_info);
 RTS_RET_INFO(stg_catch_stm_frame_info);
 
 RTS_ENTRY(stg_upd_frame_ret);
+RTS_ENTRY(stg_marked_upd_frame_ret);
 RTS_ENTRY(stg_seq_frame_ret);
 
 /* Entry code for constructors created by the bytecode interpreter */
ghc/includes/Storage.h
index 1f6ef3f..e37c50d 100644
@@ -392,7 +392,7 @@ extern lnat     countNurseryBlocks   ( void );
    Functions from GC.c 
    -------------------------------------------------------------------------- */
 
-extern void         threadPaused ( StgTSO * );
+extern void         threadPaused ( Capability *cap, StgTSO * );
 extern StgClosure * isAlive      ( StgClosure *p );
 extern void         markCAFs     ( evac_fn evac );
 
ghc/rts/Capability.c
index d08bf23..5872f42 100644
@@ -23,6 +23,7 @@
 #include "OSThreads.h"
 #include "Capability.h"
 #include "Schedule.h"
+#include "Sparks.h"
 
 #if !defined(SMP)
 Capability MainCapability;     // for non-SMP, we have one global capability
@@ -74,6 +75,8 @@ anyWorkForMe( Capability *cap, Task *task )
        } else {
            return (cap->run_queue_hd->bound == task);
        }
+    } else if (task->tso == NULL && !emptySparkPoolCap(cap)) {
+       return rtsTrue;
     }
     return globalWorkToDo();
 }
@@ -263,7 +266,7 @@ releaseCapability_ (Capability* cap)
 
     // If we have an unbound thread on the run queue, or if there's
     // anything else to do, give the Capability to a worker thread.
-    if (!emptyRunQueue(cap) || globalWorkToDo()) {
+    if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
            // The worker Task pops itself from the queue;
ghc/rts/GC.c
index 0399d60..bc8546a 100644
@@ -4250,74 +4250,6 @@ gcCAFs(void)
 
 
 /* -----------------------------------------------------------------------------
-   Lazy black holing.
-
-   Whenever a thread returns to the scheduler after possibly doing
-   some work, we have to run down the stack and black-hole all the
-   closures referred to by update frames.
-   -------------------------------------------------------------------------- */
-
-static void
-threadLazyBlackHole(StgTSO *tso)
-{
-    StgClosure *frame;
-    StgRetInfoTable *info;
-    StgClosure *bh;
-    StgPtr stack_end;
-    
-    stack_end = &tso->stack[tso->stack_size];
-    
-    frame = (StgClosure *)tso->sp;
-
-    while (1) {
-       info = get_ret_itbl(frame);
-       
-       switch (info->i.type) {
-           
-       case UPDATE_FRAME:
-           bh = ((StgUpdateFrame *)frame)->updatee;
-           
-           /* if the thunk is already blackholed, it means we've also
-            * already blackholed the rest of the thunks on this stack,
-            * so we can stop early.
-            *
-            * The blackhole made for a CAF is a CAF_BLACKHOLE, so they
-            * don't interfere with this optimisation.
-            */
-           if (bh->header.info == &stg_BLACKHOLE_info) {
-               return;
-           }
-           
-           if (bh->header.info != &stg_CAF_BLACKHOLE_info) {
-#if (!defined(LAZY_BLACKHOLING)) && defined(DEBUG)
-               debugBelch("Unexpected lazy BHing required at 0x%04lx\n",(long)bh);
-#endif
-#ifdef PROFILING
-               // @LDV profiling
-               // We pretend that bh is now dead.
-               LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)bh);
-#endif
-               SET_INFO(bh,&stg_BLACKHOLE_info);
-
-               // We pretend that bh has just been created.
-               LDV_RECORD_CREATE(bh);
-           }
-           
-           frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
-           break;
-           
-       case STOP_FRAME:
-           return;
-           
-           // normal stack frames; do nothing except advance the pointer
-       default:
-           frame = (StgClosure *)((StgPtr)frame + stack_frame_sizeW(frame));
-       }
-    }
-}
-
-
-/* -----------------------------------------------------------------------------
  * Stack squeezing
  *
  * Code largely pinched from old RTS, then hacked to bits.  We also do
@@ -4328,12 +4260,11 @@ threadLazyBlackHole(StgTSO *tso)
 struct stack_gap { StgWord gap_size; struct stack_gap *next_gap; };
 
 static void
-threadSqueezeStack(StgTSO *tso)
+stackSqueeze(StgTSO *tso, StgPtr bottom)
 {
     StgPtr frame;
     rtsBool prev_was_update_frame;
     StgClosure *updatee = NULL;
-    StgPtr bottom;
     StgRetInfoTable *info;
     StgWord current_gap_size;
     struct stack_gap *gap;
@@ -4344,8 +4275,6 @@ threadSqueezeStack(StgTSO *tso)
     //    contains two values: the size of the gap, and the distance
     //    to the next gap (or the stack top).
 
-    bottom = &(tso->stack[tso->stack_size]);
-
     frame = tso->sp;
 
     ASSERT(frame < bottom);
@@ -4363,20 +4292,6 @@ threadSqueezeStack(StgTSO *tso)
        { 
            StgUpdateFrame *upd = (StgUpdateFrame *)frame;
 
-           if (upd->updatee->header.info == &stg_BLACKHOLE_info) {
-
-               // found a BLACKHOLE'd update frame; we've been here
-               // before, in a previous GC, so just break out.
-
-               // Mark the end of the gap, if we're in one.
-               if (current_gap_size != 0) {
-                   gap = (struct stack_gap *)(frame-sizeofW(StgUpdateFrame));
-               }
-               
-               frame += sizeofW(StgUpdateFrame);
-               goto done_traversing;
-           }
-
            if (prev_was_update_frame) {
 
                TICK_UPD_SQUEEZED();
@@ -4409,31 +4324,6 @@ threadSqueezeStack(StgTSO *tso)
 
            // single update frame, or the topmost update frame in a series
            else {
-               StgClosure *bh = upd->updatee;
-
-               // Do lazy black-holing
-               if (bh->header.info != &stg_BLACKHOLE_info &&
-                   bh->header.info != &stg_CAF_BLACKHOLE_info) {
-#if (!defined(LAZY_BLACKHOLING)) && defined(DEBUG)
-                   debugBelch("Unexpected lazy BHing required at 0x%04lx",(long)bh);
-#endif
-#ifdef DEBUG
-                   // zero out the slop so that the sanity checker can tell
-                   // where the next closure is.
-                   DEBUG_FILL_SLOP(bh);
-#endif
-#ifdef PROFILING
-                   // We pretend that bh is now dead.
-                   // ToDo: is the slop filling the same as DEBUG_FILL_SLOP?
-                   LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)bh);
-#endif
-                   // Todo: maybe use SET_HDR() and remove LDV_RECORD_CREATE()?
-                   SET_INFO(bh,&stg_BLACKHOLE_info);
-
-                   // We pretend that bh has just been created.
-                   LDV_RECORD_CREATE(bh);
-               }
-
                prev_was_update_frame = rtsTrue;
                updatee = upd->updatee;
                frame += sizeofW(StgUpdateFrame);
@@ -4456,8 +4346,10 @@ threadSqueezeStack(StgTSO *tso)
        }
     }
 
-done_traversing:
-           
+    if (current_gap_size != 0) {
+       gap = (struct stack_gap *) (frame - sizeofW(StgUpdateFrame));
+    }
+
     // Now we have a stack with gaps in it, and we have to walk down
     // shoving the stack up to fill in the gaps.  A diagram might
     // help:
@@ -4515,12 +4407,110 @@ done_traversing:
  * turned on.
  * -------------------------------------------------------------------------- */
 void
-threadPaused(StgTSO *tso)
+threadPaused(Capability *cap, StgTSO *tso)
 {
-  if ( RtsFlags.GcFlags.squeezeUpdFrames == rtsTrue )
-    threadSqueezeStack(tso);   // does black holing too 
-  else
-    threadLazyBlackHole(tso);
+    StgClosure *frame;
+    StgRetInfoTable *info;
+    StgClosure *bh;
+    StgPtr stack_end;
+    nat words_to_squeeze = 0;
+    nat weight           = 0;
+    nat weight_pending   = 0;
+    rtsBool prev_was_update_frame;
+    
+    stack_end = &tso->stack[tso->stack_size];
+    
+    frame = (StgClosure *)tso->sp;
+
+    while (1) {
+       // If we've already marked this frame, then stop here.
+       if (frame->header.info == (StgInfoTable *)&stg_marked_upd_frame_info) {
+           goto end;
+       }
+
+       info = get_ret_itbl(frame);
+       
+       switch (info->i.type) {
+           
+       case UPDATE_FRAME:
+
+           SET_INFO(frame, (StgInfoTable *)&stg_marked_upd_frame_info);
+
+           bh = ((StgUpdateFrame *)frame)->updatee;
+
+           if (closure_IND(bh) || bh->header.info == &stg_BLACKHOLE_info) {
+               IF_DEBUG(squeeze, debugBelch("suspending duplicate work: %d words of stack\n", (StgPtr)frame - tso->sp));
+
+               // If this closure is already an indirection, then
+               // suspend the computation up to this point:
+               suspendComputation(cap,tso,(StgPtr)frame);
+
+               // Now drop the update frame, and arrange to return
+               // the value to the frame underneath:
+               tso->sp = (StgPtr)frame + sizeofW(StgUpdateFrame) - 2;
+               tso->sp[1] = (StgWord)bh;
+               tso->sp[0] = (W_)&stg_enter_info;
+
+               // And continue with threadPaused; there might be
+               // yet more computation to suspend.
+               threadPaused(cap,tso);
+               return;
+           }
+
+           if (bh->header.info != &stg_CAF_BLACKHOLE_info) {
+#if (!defined(LAZY_BLACKHOLING)) && defined(DEBUG)
+               debugBelch("Unexpected lazy BHing required at 0x%04lx\n",(long)bh);
+#endif
+               // zero out the slop so that the sanity checker can tell
+               // where the next closure is.
+               DEBUG_FILL_SLOP(bh);
+#ifdef PROFILING
+               // @LDV profiling
+               // We pretend that bh is now dead.
+               LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)bh);
+#endif
+               SET_INFO(bh,&stg_BLACKHOLE_info);
+
+               // We pretend that bh has just been created.
+               LDV_RECORD_CREATE(bh);
+           }
+           
+           frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
+           if (prev_was_update_frame) {
+               words_to_squeeze += sizeofW(StgUpdateFrame);
+               weight += weight_pending;
+               weight_pending = 0;
+           }
+           prev_was_update_frame = rtsTrue;
+           break;
+           
+       case STOP_FRAME:
+           goto end;
+           
+           // normal stack frames; do nothing except advance the pointer
+       default:
+       {
+           nat frame_size = stack_frame_sizeW(frame);
+           weight_pending += frame_size;
+           frame = (StgClosure *)((StgPtr)frame + frame_size);
+           prev_was_update_frame = rtsFalse;
+       }
+       }
+    }
+
+end:
+    IF_DEBUG(squeeze, 
+            debugBelch("words_to_squeeze: %d, weight: %d, squeeze: %s\n", 
+                       words_to_squeeze, weight, 
+                       weight < words_to_squeeze ? "YES" : "NO"));
+
+    // Should we squeeze or not?  Arbitrary heuristic: we squeeze if
+    // the number of words we have to shift down is less than the
+    // number of stack words we squeeze away by doing so.
+    if (1 /*RtsFlags.GcFlags.squeezeUpdFrames == rtsTrue &&
+           weight < words_to_squeeze*/) {
+       stackSqueeze(tso, (StgPtr)frame);
+    }
 }
 
 /* -----------------------------------------------------------------------------
ghc/rts/Interpreter.c
index f007c4a..b31ade0 100644
@@ -57,7 +57,7 @@
 #define RETURN_TO_SCHEDULER(todo,retcode)      \
    SAVE_STACK_POINTERS;                                \
    cap->r.rCurrentTSO->what_next = (todo);     \
-   threadPaused(cap->r.rCurrentTSO);           \
+   threadPaused(cap,cap->r.rCurrentTSO);               \
    cap->r.rRet = (retcode);                    \
    return cap;
 
ghc/rts/RtsFlags.c
index f086368..25d53da 100644
@@ -190,6 +190,7 @@ void initRtsFlagsDefaults(void)
     RtsFlags.DebugFlags.gran           = rtsFalse;
     RtsFlags.DebugFlags.par            = rtsFalse;
     RtsFlags.DebugFlags.linker         = rtsFalse;
+    RtsFlags.DebugFlags.squeeze                = rtsFalse;
 #endif
 
 #if defined(PROFILING) || defined(PAR)
@@ -431,6 +432,7 @@ usage_text[] = {
 "  -DP  DEBUG: par",
 "  -Dl  DEBUG: linker",
 "  -Dm  DEBUG: stm",
+"  -Dz  DEBUG: stack squezing",
 "",
 #endif /* DEBUG */
 #if defined(SMP)
@@ -726,6 +728,9 @@ error = rtsTrue;
                      case 'm':
                          RtsFlags.DebugFlags.stm = rtsTrue;
                          break;
+                     case 'z':
+                         RtsFlags.DebugFlags.squeeze = rtsTrue;
+                         break;
                      default:
                          bad_option( rts_argv[arg] );
                      }
ghc/rts/Schedule.c
index 299e132..4891bbf 100644
@@ -250,7 +250,7 @@ static void AllRoots(evac_fn evac);
 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
 
 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
-                       rtsBool stop_at_atomically);
+                       rtsBool stop_at_atomically, StgPtr stop_here);
 
 static void deleteThread (Capability *cap, StgTSO *tso);
 static void deleteRunQueue (Capability *cap);
@@ -396,7 +396,7 @@ schedule (Capability *initialCapability, Task *task)
       
 #ifdef SMP
       schedulePushWork(cap,task);
-#endif         
+#endif
 
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
@@ -415,6 +415,9 @@ schedule (Capability *initialCapability, Task *task)
     //
     if (interrupted) {
        deleteRunQueue(cap);
+#if defined(SMP)
+       discardSparksCap(cap);
+#endif
        if (shutting_down_scheduler) {
            IF_DEBUG(scheduler, sched_belch("shutting down"));
            // If we are a worker, just exit.  If we're a bound thread
@@ -428,23 +431,17 @@ schedule (Capability *initialCapability, Task *task)
        }
     }
 
-#if defined(not_yet) && defined(SMP)
-    //
-    // Top up the run queue from our spark pool.  We try to make the
-    // number of threads in the run queue equal to the number of
-    // free capabilities.
-    //
+#if defined(SMP)
+    // If the run queue is empty, take a spark and turn it into a thread.
     {
-       StgClosure *spark;
-       if (emptyRunQueue()) {
-           spark = findSpark(rtsFalse);
-           if (spark == NULL) {
-               break; /* no more sparks in the pool */
-           } else {
-               createSparkThread(spark);         
+       if (emptyRunQueue(cap)) {
+           StgClosure *spark;
+           spark = findSpark(cap);
+           if (spark != NULL) {
                IF_DEBUG(scheduler,
-                        sched_belch("==^^ turning spark of closure %p into a thread",
+                        sched_belch("turning spark of closure %p into a thread",
                                     (StgClosure *)spark));
+               createSparkThread(cap,spark);     
            }
        }
     }
@@ -739,9 +736,10 @@ schedulePushWork(Capability *cap USED_WHEN_SMP,
     Capability *free_caps[n_capabilities], *cap0;
     nat i, n_free_caps;
 
-    // Check whether we have more threads on our run queue that we
-    // could hand to another Capability.
-    if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
+    // Check whether we have more threads on our run queue, or sparks
+    // in our pool, that we could hand to another Capability.
+    if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
+       && sparkPoolSizeCap(cap) < 2) {
        return;
     }
 
@@ -772,31 +770,54 @@ schedulePushWork(Capability *cap USED_WHEN_SMP,
 
     if (n_free_caps > 0) {
        StgTSO *prev, *t, *next;
+       rtsBool pushed_to_all;
+
        IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
 
-       prev = cap->run_queue_hd;
-       t = prev->link;
-       prev->link = END_TSO_QUEUE;
        i = 0;
-       for (; t != END_TSO_QUEUE; t = next) {
-           next = t->link;
-           t->link = END_TSO_QUEUE;
-           if (t->what_next == ThreadRelocated
-               || t->bound == task) { // don't move my bound thread
-               prev->link = t;
-               prev = t;
-           } else if (i == n_free_caps) {
-               i = 0;
-               // keep one for us
-               prev->link = t;
-               prev = t;
-           } else {
-               appendToRunQueue(free_caps[i],t);
-               if (t->bound) { t->bound->cap = free_caps[i]; }
-               i++;
+       pushed_to_all = rtsFalse;
+
+       if (cap->run_queue_hd != END_TSO_QUEUE) {
+           prev = cap->run_queue_hd;
+           t = prev->link;
+           prev->link = END_TSO_QUEUE;
+           for (; t != END_TSO_QUEUE; t = next) {
+               next = t->link;
+               t->link = END_TSO_QUEUE;
+               if (t->what_next == ThreadRelocated
+                   || t->bound == task) { // don't move my bound thread
+                   prev->link = t;
+                   prev = t;
+               } else if (i == n_free_caps) {
+                   pushed_to_all = rtsTrue;
+                   i = 0;
+                   // keep one for us
+                   prev->link = t;
+                   prev = t;
+               } else {
+                   appendToRunQueue(free_caps[i],t);
+                   if (t->bound) { t->bound->cap = free_caps[i]; }
+                   i++;
+               }
+           }
+           cap->run_queue_tl = prev;
+       }
+
+       // If there are some free capabilities that we didn't push any
+       // threads to, then try to push a spark to each one.
+       if (!pushed_to_all) {
+           StgClosure *spark;
+           // i is the next free capability to push to
+           for (; i < n_free_caps; i++) {
+               if (emptySparkPoolCap(free_caps[i])) {
+                   spark = findSpark(cap);
+                   if (spark != NULL) {
+                       IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
+                       newSpark(&(free_caps[i]->r), spark);
+                   }
+               }
            }
        }
-       cap->run_queue_tl = prev;
 
        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
@@ -812,15 +833,20 @@ schedulePushWork(Capability *cap USED_WHEN_SMP,
  * Start any pending signal handlers
  * ------------------------------------------------------------------------- */
 
+#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
 static void
 scheduleStartSignalHandlers(Capability *cap)
 {
-#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
     if (signals_pending()) { // safe outside the lock
        startSignalHandlers(cap);
     }
-#endif
 }
+#else
+static void
+scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
+{
+}
+#endif
 
 /* ----------------------------------------------------------------------------
  * Check for blocked threads that can be woken up.
@@ -1926,7 +1952,7 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
                        // ATOMICALLY_FRAME, aborting the (nested)
                        // transaction, and saving the stack of any
                        // partially-evaluated thunks on the heap.
-                       raiseAsync_(cap, t, NULL, rtsTrue);
+                       raiseAsync_(cap, t, NULL, rtsTrue, NULL);
                        
 #ifdef REG_R1
                        ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
@@ -2165,7 +2191,7 @@ suspendThread (StgRegTable *reg)
   // XXX this might not be necessary --SDM
   tso->what_next = ThreadRunGHC;
 
-  threadPaused(tso);
+  threadPaused(cap,tso);
 
   if(tso->blocked_exceptions == NULL)  {
       tso->why_blocked = BlockedOnCCall;
@@ -2660,6 +2686,10 @@ initScheduler(void)
 
   initTaskManager();
 
+#if defined(SMP) || defined(PARALLEL_HASKELL)
+  initSparkPools();
+#endif
+
 #if defined(SMP)
   /*
    * Eagerly start one worker to run each Capability, except for
@@ -2679,10 +2709,6 @@ initScheduler(void)
   }
 #endif
 
-#if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
-  initSparkPools();
-#endif
-
   RELEASE_LOCK(&sched_mutex);
 }
 
@@ -2772,7 +2798,7 @@ GetRoots( evac_fn evac )
 
     evac((StgClosure **)&blackhole_queue);
 
-#if defined(PARALLEL_HASKELL) || defined(GRAN)
+#if defined(SMP) || defined(PARALLEL_HASKELL) || defined(GRAN)
     markSparkQueue(evac);
 #endif
     
@@ -3607,15 +3633,22 @@ checkBlackHoles (Capability *cap)
 void
 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
 {
-    raiseAsync_(cap, tso, exception, rtsFalse);
+    raiseAsync_(cap, tso, exception, rtsFalse, NULL);
+}
+
+void
+suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
+{
+    raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
 }
 
 static void
 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
-           rtsBool stop_at_atomically)
+           rtsBool stop_at_atomically, StgPtr stop_here)
 {
     StgRetInfoTable *info;
-    StgPtr sp;
+    StgPtr sp, frame;
+    nat i;
   
     // Thread already dead?
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
@@ -3640,8 +3673,8 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
        sp[0] = (W_)&stg_dummy_ret_closure;
     }
 
-    while (1) {
-       nat i;
+    frame = sp + 1;
+    while (stop_here == NULL || frame < stop_here) {
 
        // 1. Let the top of the stack be the "current closure"
        //
@@ -3661,95 +3694,10 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
         // transaction
        
-       
-       StgPtr frame;
-       
-       frame = sp + 1;
        info = get_ret_itbl((StgClosure *)frame);
-       
-       while (info->i.type != UPDATE_FRAME
-              && (info->i.type != CATCH_FRAME || exception == NULL)
-              && info->i.type != STOP_FRAME
-              && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
-       {
-            if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
-              // IF we find an ATOMICALLY_FRAME then we abort the
-              // current transaction and propagate the exception.  In
-              // this case (unlike ordinary exceptions) we do not care
-              // whether the transaction is valid or not because its
-              // possible validity cannot have caused the exception
-              // and will not be visible after the abort.
-              IF_DEBUG(stm,
-                       debugBelch("Found atomically block delivering async exception\n"));
-              stmAbortTransaction(tso -> trec);
-              tso -> trec = stmGetEnclosingTRec(tso -> trec);
-            }
-           frame += stack_frame_sizeW((StgClosure *)frame);
-           info = get_ret_itbl((StgClosure *)frame);
-       }
-       
+
        switch (info->i.type) {
-           
-       case ATOMICALLY_FRAME:
-           ASSERT(stop_at_atomically);
-           ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
-           stmCondemnTransaction(tso -> trec);
-#ifdef REG_R1
-           tso->sp = frame;
-#else
-           // R1 is not a register: the return convention for IO in
-           // this case puts the return value on the stack, so we
-           // need to set up the stack to return to the atomically
-           // frame properly...
-           tso->sp = frame - 2;
-           tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
-           tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
-#endif
-           tso->what_next = ThreadRunGHC;
-           return;
 
-       case CATCH_FRAME:
-           // If we find a CATCH_FRAME, and we've got an exception to raise,
-           // then build the THUNK raise(exception), and leave it on
-           // top of the CATCH_FRAME ready to enter.
-           //
-       {
-#ifdef PROFILING
-           StgCatchFrame *cf = (StgCatchFrame *)frame;
-#endif
-           StgThunk *raise;
-           
-           // we've got an exception to raise, so let's pass it to the
-           // handler in this frame.
-           //
-           raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
-           TICK_ALLOC_SE_THK(1,0);
-           SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
-           raise->payload[0] = exception;
-           
-           // throw away the stack from Sp up to the CATCH_FRAME.
-           //
-           sp = frame - 1;
-           
-           /* Ensure that async excpetions are blocked now, so we don't get
-            * a surprise exception before we get around to executing the
-            * handler.
-            */
-           if (tso->blocked_exceptions == NULL) {
-               tso->blocked_exceptions = END_TSO_QUEUE;
-           }
-           
-           /* Put the newly-built THUNK on top of the stack, ready to execute
-            * when the thread restarts.
-            */
-           sp[0] = (W_)raise;
-           sp[-1] = (W_)&stg_enter_info;
-           tso->sp = sp-1;
-           tso->what_next = ThreadRunGHC;
-           IF_DEBUG(sanity, checkTSO(tso));
-           return;
-       }
-       
        case UPDATE_FRAME:
        {
            StgAP_STACK * ap;
@@ -3780,9 +3728,7 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
                     printObj((StgClosure *)ap);
                );
 
-           // Replace the updatee with an indirection - happily
-           // this will also wake up any threads currently
-           // waiting on the result.
+           // Replace the updatee with an indirection
            //
            // Warning: if we're in a loop, more than one update frame on
            // the stack may point to the same object.  Be careful not to
@@ -3799,20 +3745,104 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
            }
            sp += sizeofW(StgUpdateFrame) - 1;
            sp[0] = (W_)ap; // push onto stack
+           frame = sp + 1;
            break;
        }
-       
+
        case STOP_FRAME:
            // We've stripped the entire stack, the thread is now dead.
            tso->what_next = ThreadKilled;
            tso->sp = frame + sizeofW(StgStopFrame);
            return;
+
+       case CATCH_FRAME:
+           // If we find a CATCH_FRAME, and we've got an exception to raise,
+           // then build the THUNK raise(exception), and leave it on
+           // top of the CATCH_FRAME ready to enter.
+           //
+       {
+#ifdef PROFILING
+           StgCatchFrame *cf = (StgCatchFrame *)frame;
+#endif
+           StgThunk *raise;
+           
+           if (exception == NULL) break;
+
+           // we've got an exception to raise, so let's pass it to the
+           // handler in this frame.
+           //
+           raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
+           TICK_ALLOC_SE_THK(1,0);
+           SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
+           raise->payload[0] = exception;
+           
+           // throw away the stack from Sp up to the CATCH_FRAME.
+           //
+           sp = frame - 1;
+           
+           /* Ensure that async excpetions are blocked now, so we don't get
+            * a surprise exception before we get around to executing the
+            * handler.
+            */
+           if (tso->blocked_exceptions == NULL) {
+               tso->blocked_exceptions = END_TSO_QUEUE;
+           }
+
+           /* Put the newly-built THUNK on top of the stack, ready to execute
+            * when the thread restarts.
+            */
+           sp[0] = (W_)raise;
+           sp[-1] = (W_)&stg_enter_info;
+           tso->sp = sp-1;
+           tso->what_next = ThreadRunGHC;
+           IF_DEBUG(sanity, checkTSO(tso));
+           return;
+       }
+           
+       case ATOMICALLY_FRAME:
+           if (stop_at_atomically) {
+               ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
+               stmCondemnTransaction(tso -> trec);
+#ifdef REG_R1
+               tso->sp = frame;
+#else
+               // R1 is not a register: the return convention for IO in
+               // this case puts the return value on the stack, so we
+               // need to set up the stack to return to the atomically
+               // frame properly...
+               tso->sp = frame - 2;
+               tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
+               tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
+#endif
+               tso->what_next = ThreadRunGHC;
+               return;
+           }
+           // Not stop_at_atomically... fall through and abort the
+           // transaction.
+           
+       case CATCH_RETRY_FRAME:
+           // IF we find an ATOMICALLY_FRAME then we abort the
+           // current transaction and propagate the exception.  In
+           // this case (unlike ordinary exceptions) we do not care
+           // whether the transaction is valid or not because its
+           // possible validity cannot have caused the exception
+           // and will not be visible after the abort.
+           IF_DEBUG(stm,
+                    debugBelch("Found atomically block delivering async exception\n"));
+           stmAbortTransaction(tso -> trec);
+           tso -> trec = stmGetEnclosingTRec(tso -> trec);
+           break;
            
        default:
-           barf("raiseAsync");
+           break;
        }
+
+       // move on to the next stack frame
+       frame += stack_frame_sizeW((StgClosure *)frame);
     }
-    barf("raiseAsync");
+
+    // if we got here, then we stopped at stop_here
+    ASSERT(stop_here != NULL);
 }
 
 /* -----------------------------------------------------------------------------
@@ -4156,6 +4186,7 @@ printAllThreads(void)
       }
   }
 
+  debugBelch("other threads:\n");
   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
       if (t->why_blocked != NotBlocked) {
          printThreadStatus(t);
ghc/rts/Schedule.h
index 9335e59..1626852 100644
@@ -55,6 +55,14 @@ StgTSO * unblockOne(Capability *cap, StgTSO *tso);
  */
 void raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception);
 
+/* suspendComputation()
+ *
+ * A variant of raiseAsync(), this strips the stack of the specified
+ * thread down to the stop_here point, leaving a current closure on
+ * top of the stack at [stop_here - 1].
+ */
+void suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here);
+
 /* raiseExceptionHelper */
 StgWord raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception);
 
ghc/rts/Sparks.c
index 6e638d4..12af296 100644
@@ -1,24 +1,11 @@
 /* ---------------------------------------------------------------------------
  *
- * (c) The GHC Team, 2000
+ * (c) The GHC Team, 2000-2005
  *
- * Sparking support for PAR and SMP versions of the RTS.
+ * Sparking support for PARALLEL_HASKELL and SMP versions of the RTS.
  *
  * -------------------------------------------------------------------------*/
 
-//@node Spark Management Routines, , ,
-//@section Spark Management Routines
-
-//@menu
-//* Includes::                 
-//* GUM code::                 
-//* GranSim code::             
-//@end menu
-//*/
-
-//@node Includes, GUM code, Spark Management Routines, Spark Management Routines
-//@subsection Includes
-
 #include "PosixSource.h"
 #include "Rts.h"
 #include "Schedule.h"
@@ -27,7 +14,7 @@
 #include "RtsFlags.h"
 #include "RtsUtils.h"
 #include "ParTicky.h"
-# if defined(PAR)
+# if defined(PARALLEL_HASKELL)
 # include "ParallelRts.h"
 # include "GranSimRts.h"   // for GR_...
 # elif defined(GRAN)
 # endif
 #include "Sparks.h"
 
-#if /*defined(SMP) ||*/ defined(PAR)
+#if defined(SMP) || defined(PARALLEL_HASKELL)
 
-//@node GUM code, GranSim code, Includes, Spark Management Routines
-//@subsection GUM code
+static INLINE_ME void bump_hd (StgSparkPool *p)
+{ p->hd++; if (p->hd == p->lim) p->hd = p->base; }
 
-static void slide_spark_pool( StgSparkPool *pool );
+static INLINE_ME void bump_tl (StgSparkPool *p)
+{ p->tl++; if (p->tl == p->lim) p->tl = p->base; }
 
-void
-initSparkPools( void )
-{
-  Capability *cap;
-  StgSparkPool *pool;
+/* -----------------------------------------------------------------------------
+ * 
+ * Initialising spark pools.
+ *
+ * -------------------------------------------------------------------------- */
 
-#ifdef SMP
-  /* walk over the capabilities, allocating a spark pool for each one */
-  for (cap = free_capabilities; cap != NULL; cap = cap->link) {
-#else
-  /* allocate a single spark pool */
-  cap = &MainRegTable;
-  {
-#endif
-    pool = &(cap->rSparks);
-    
+static void 
+initSparkPool(StgSparkPool *pool)
+{
     pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
-                                    * sizeof(StgClosure *),
-                                    "initSparkPools");
+                               * sizeof(StgClosure *),
+                               "initSparkPools");
     pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
     pool->hd  = pool->base;
     pool->tl  = pool->base;
-  }
 }
 
-/* 
-   We traverse the spark pool until we find the 2nd usable (i.e. non-NF)
-   spark. Rationale, we don't want to give away the only work a PE has.
-   ToDo: introduce low- and high-water-marks for load balancing.
-*/
-StgClosure *
-findSpark( rtsBool for_export )
+void
+initSparkPools( void )
 {
-  Capability *cap;
-  StgSparkPool *pool;
-  StgClosure *spark, *first=NULL;
-  rtsBool isIdlePE = EMPTY_RUN_QUEUE();
-
 #ifdef SMP
-  /* walk over the capabilities, allocating a spark pool for each one */
-  for (cap = free_capabilities; cap != NULL; cap = cap->link) {
+    /* walk over the capabilities, allocating a spark pool for each one */
+    nat i;
+    for (i = 0; i < n_capabilities; i++) {
+       initSparkPool(&capabilities[i].r.rSparks);
+    }
 #else
-  /* allocate a single spark pool */
-  cap = &MainRegTable;
-  {
+    /* allocate a single spark pool */
+    initSparkPool(&MainCapability.r.rSparks);
 #endif
-    pool = &(cap->rSparks);
-    while (pool->hd < pool->tl) {
-      spark = *pool->hd++;
-      if (closure_SHOULD_SPARK(spark)) {
-       if (for_export && isIdlePE) {
-         if (first==NULL) {
-           first = spark; // keep the first usable spark if PE is idle
-         } else {
-           pool->hd--;    // found a second spark; keep it in the pool 
-           ASSERT(*pool->hd==spark);
+}
+
+/* -----------------------------------------------------------------------------
+ * 
+ * findSpark: find a spark on the current Capability that we can fork
+ * into a thread.
+ *
+ * -------------------------------------------------------------------------- */
+
+StgClosure *
+findSpark (Capability *cap)
+{
+    StgSparkPool *pool;
+    StgClosure *spark;
+    
+    pool = &(cap->r.rSparks);
+    ASSERT_SPARK_POOL_INVARIANTS(pool);
+
+    while (pool->hd != pool->tl) {
+       spark = *pool->hd;
+       bump_hd(pool);
+       if (closure_SHOULD_SPARK(spark)) {
+#ifdef GRAN
            if (RtsFlags.ParFlags.ParStats.Sparks) 
-             DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
-                              GR_STEALING, ((StgTSO *)NULL), first, 
-                              0, 0 /* spark_queue_len(ADVISORY_POOL) */);
-           return first;  // and return the *first* spark found
-         }
-        } else {
-         if (RtsFlags.ParFlags.ParStats.Sparks && for_export) 
-           DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
-                            GR_STEALING, ((StgTSO *)NULL), spark, 
-                            0, 0 /* spark_queue_len(ADVISORY_POOL) */);
-         return spark;    // return first spark found
+               DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
+                                GR_STEALING, ((StgTSO *)NULL), spark, 
+                                0, 0 /* spark_queue_len(ADVISORY_POOL) */);
+#endif
+           return spark;
        }
-      }
     }
-    slide_spark_pool(pool);
-  }
-  return NULL;
+    // spark pool is now empty
+    return NULL;
 }
 
-/* 
-   activateSpark is defined in Schedule.c
-*/
+/* -----------------------------------------------------------------------------
+ * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
+ * implicit slide i.e. after marking all sparks are at the beginning of the
+ * spark pool and the spark pool only contains sparkable closures 
+ * -------------------------------------------------------------------------- */
+
+void
+markSparkQueue (evac_fn evac)
+{ 
+    StgClosure **sparkp, **to_sparkp;
+    nat i, n, pruned_sparks; // stats only
+    StgSparkPool *pool;
+    Capability *cap;
+    
+    PAR_TICKY_MARK_SPARK_QUEUE_START();
+    
+    n = 0;
+    pruned_sparks = 0;
+    for (i = 0; i < n_capabilities; i++) {
+       cap = &capabilities[i];
+       pool = &(cap->r.rSparks);
+       
+       ASSERT_SPARK_POOL_INVARIANTS(pool);
+
+#if defined(PARALLEL_HASKELL)
+       // stats only
+       n = 0;
+       pruned_sparks = 0;
+#endif
+       
+       sparkp = pool->hd;
+       to_sparkp = pool->hd;
+       while (sparkp != pool->tl) {
+           ASSERT(to_sparkp<=sparkp);
+           ASSERT(*sparkp!=NULL);
+           ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp)));
+           // ToDo?: statistics gathering here (also for GUM!)
+           if (closure_SHOULD_SPARK(*sparkp)) {
+               evac(sparkp);
+               *to_sparkp++ = *sparkp;
+               n++;
+           } else {
+               pruned_sparks++;
+           }
+           sparkp++;
+           if (sparkp == pool->lim) {
+               sparkp = pool->base;
+           }
+       }
+       pool->tl = to_sparkp;
+       
+       PAR_TICKY_MARK_SPARK_QUEUE_END(n);
+       
+#if defined(PARALLEL_HASKELL)
+       IF_DEBUG(scheduler,
+                debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
+                           n, pruned_sparks, mytid));
+#else
+       IF_DEBUG(scheduler,
+              debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks\n",
+                         n, pruned_sparks));
+#endif
+       
+       IF_DEBUG(scheduler,
+                debugBelch("markSparkQueue:   new spark queue len=%d; (hd=%p; tl=%p)\n",
+                           sparkPoolSize(pool), pool->hd, pool->tl));
+       
+    }
+}
+
+/* -----------------------------------------------------------------------------
+ * 
+ * Turn a spark into a real thread
+ *
+ * -------------------------------------------------------------------------- */
+
+void
+createSparkThread (Capability *cap, StgClosure *p)
+{
+    StgTSO *tso;
+
+    tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
+    appendToRunQueue(cap,tso);
+}
+
+/* -----------------------------------------------------------------------------
+ * 
+ * Create a new spark
+ *
+ * -------------------------------------------------------------------------- */
+
+#define DISCARD_NEW
+
+StgInt
+newSpark (StgRegTable *reg, StgClosure *p)
+{
+    StgSparkPool *pool = &(reg->rSparks);
+
+    ASSERT_SPARK_POOL_INVARIANTS(pool);
+
+    if (closure_SHOULD_SPARK(p)) {
+#ifdef DISCARD_NEW
+       StgClosure **new_tl;
+       new_tl = pool->tl + 1;
+       if (new_tl == pool->lim) { new_tl = pool->base; }
+       if (new_tl != pool->hd) {
+           *pool->tl = p;
+           pool->tl = new_tl;
+       } else if (!closure_SHOULD_SPARK(*pool->hd)) {
+           // if the old closure is not sparkable, discard it and
+           // keep the new one.  Otherwise, keep the old one.
+           *pool->tl = p;
+           bump_hd(pool);
+       }
+#else  /* DISCARD OLD */
+       *pool->tl = p;
+       bump_tl(pool);
+       if (pool->tl == pool->hd) { bump_hd(pool); }
+#endif
+    }  
+
+    ASSERT_SPARK_POOL_INVARIANTS(pool);
+    return 1;
+}
+
+#endif /* PARALLEL_HASKELL || SMP */
+
+/* -----------------------------------------------------------------------------
+ * 
+ * GRAN & PARALLEL_HASKELL stuff beyond here.
+ *
+ * -------------------------------------------------------------------------- */
+
+#if defined(PARALLEL_HASKELL) || defined(GRAN)
+
+static void slide_spark_pool( StgSparkPool *pool );
+
 rtsBool
 add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
 {
@@ -131,7 +242,7 @@ add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
       pool->tl < pool->lim) {
     *(pool->tl++) = closure;
 
-#if defined(PAR)
+#if defined(PARALLEL_HASKELL)
     // collect parallel global statistics (currently done together with GC stats)
     if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
@@ -141,7 +252,7 @@ add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
 #endif
     return rtsTrue;
   } else {
-#if defined(PAR)
+#if defined(PARALLEL_HASKELL)
     // collect parallel global statistics (currently done together with GC stats)
     if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
@@ -175,88 +286,6 @@ slide_spark_pool( StgSparkPool *pool )
   pool->tl = to_sparkp;
 }
 
-nat
-spark_queue_len( StgSparkPool *pool ) 
-{
-  return (nat) (pool->tl - pool->hd);
-}
-
-/* Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
-   implicit slide i.e. after marking all sparks are at the beginning of the
-   spark pool and the spark pool only contains sparkable closures 
-*/
-void
-markSparkQueue( void )
-{ 
-  StgClosure **sparkp, **to_sparkp;
-  nat n, pruned_sparks; // stats only
-  StgSparkPool *pool;
-  Capability *cap;
-
-  PAR_TICKY_MARK_SPARK_QUEUE_START();
-
-#ifdef SMP
-  /* walk over the capabilities, allocating a spark pool for each one */
-  for (cap = free_capabilities; cap != NULL; cap = cap->link) {
-#else
-  /* allocate a single spark pool */
-  cap = &MainRegTable;
-  {
-#endif
-    pool = &(cap->rSparks);
-
-#if defined(PAR)
-    // stats only
-    n = 0;
-    pruned_sparks = 0;
-#endif
-
-    sparkp = pool->hd;
-    to_sparkp = pool->base;
-    while (sparkp < pool->tl) {
-      ASSERT(to_sparkp<=sparkp);
-      ASSERT(*sparkp!=NULL);
-      ASSERT(LOOKS_LIKE_GHC_INFO(((StgClosure *)*sparkp)->header.info));
-      // ToDo?: statistics gathering here (also for GUM!)
-      if (closure_SHOULD_SPARK(*sparkp)) {
-       *to_sparkp = MarkRoot(*sparkp);
-       to_sparkp++;
-#ifdef PAR
-       n++;
-#endif
-      } else {
-#ifdef PAR
-       pruned_sparks++;
-#endif
-      }
-      sparkp++;
-    }
-    pool->hd = pool->base;
-    pool->tl = to_sparkp;
-
-    PAR_TICKY_MARK_SPARK_QUEUE_END(n);
-    
-#if defined(SMP)
-    IF_DEBUG(scheduler,
-            debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
-                  n, pruned_sparks, pthread_self()));
-#elif defined(PAR)
-    IF_DEBUG(scheduler,
-            debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
-                  n, pruned_sparks, mytid));
-#else
-    IF_DEBUG(scheduler,
-            debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks",
-                  n, pruned_sparks));
-#endif
-
-    IF_DEBUG(scheduler,
-            debugBelch("markSparkQueue:   new spark queue len=%d; (hd=%p; tl=%p)",
-                  spark_queue_len(pool), pool->hd, pool->tl));
-
-  }
-}
-
 void
 disposeSpark(spark)
 StgClosure *spark;
@@ -276,22 +305,11 @@ StgClosure *spark;
 
 #elif defined(GRAN)
 
-//@node GranSim code,  , GUM code, Spark Management Routines
-//@subsection GranSim code
-
-//@menu
-//* Basic interface to sparkq::         
-//* Aux fcts::                 
-//@end menu
-
-//@node Basic interface to sparkq, Aux fcts, GranSim code, GranSim code
-//@subsubsection Basic interface to sparkq
 /* 
    Search the spark queue of the proc in event for a spark that's worth
    turning into a thread 
    (was gimme_spark in the old RTS)
 */
-//@cindex findLocalSpark
 void
 findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
 {
@@ -423,7 +441,6 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
   node pointed to by the spark at some point in the future.
   (was munch_spark in the old RTS)
 */
-//@cindex activateSpark
 rtsBool
 activateSpark (rtsEvent *event, rtsSparkQ spark) 
 {
@@ -530,7 +547,6 @@ STATIC_INLINE nat  IGNORE(nat x) { return (0); };
 STATIC_INLINE nat  RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
 
 /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
-//@cindex newSpark
 rtsSpark *
 newSpark(node,name,gran_info,size_info,par_info,local)
 StgClosure *node;
@@ -567,7 +583,6 @@ nat name, gran_info, size_info, par_info, local;
   return(newspark);
 }
 
-//@cindex disposeSpark
 void
 disposeSpark(spark)
 rtsSpark *spark;
@@ -576,7 +591,6 @@ rtsSpark *spark;
   stgFree(spark);
 }
 
-//@cindex disposeSparkQ
 void 
 disposeSparkQ(spark)
 rtsSparkQ spark;
@@ -602,7 +616,6 @@ rtsSparkQ spark;
    the queue. 
 */
 
-//@cindex add_to_spark_queue
 void
 add_to_spark_queue(spark)
 rtsSpark *spark;
@@ -709,10 +722,6 @@ rtsSpark *spark;
 #  endif
 }
 
-//@node Aux fcts,  , Basic interface to sparkq, GranSim code
-//@subsubsection Aux fcts
-
-//@cindex spark_queue_len
 nat
 spark_queue_len(proc) 
 PEs proc;
@@ -740,7 +749,6 @@ PEs proc;
    hd and tl pointers of the spark queue. Returns a pointer to the next
    spark in the queue.
 */
-//@cindex delete_from_sparkq
 rtsSpark *
 delete_from_sparkq (spark, p, dispose_too)     /* unlink and dispose spark */
 rtsSpark *spark;
@@ -793,7 +801,6 @@ rtsBool dispose_too;
 }
 
 /* Mark all nodes pointed to by sparks in the spark queues (for GC) */
-//@cindex markSparkQueue
 void
 markSparkQueue(void)
 { 
@@ -813,7 +820,6 @@ markSparkQueue(void)
           print_sparkq_stats());
 }
 
-//@cindex print_spark
 void
 print_spark(spark)
 rtsSpark *spark;
@@ -835,7 +841,6 @@ rtsSpark *spark;
   }
 }
 
-//@cindex print_sparkq
 void
 print_sparkq(proc)
 PEs proc;
@@ -852,7 +857,6 @@ PEs proc;
 /* 
    Print a statistics of all spark queues.
 */
-//@cindex print_sparkq_stats
 void
 print_sparkq_stats(void)
 {
ghc/rts/Sparks.h
index 3d7687a..1cc92eb 100644
@@ -9,8 +9,31 @@
 #ifndef SPARKS_H
 #define SPARKS_H
 
-#if defined(GRAN)
+#if defined(PARALLEL_HASKELL) || defined(SMP)
+StgClosure * findSpark         (Capability *cap);
+void         initSparkPools    (void);
+void         markSparkQueue    (evac_fn evac);
+void         createSparkThread (Capability *cap, StgClosure *p);
+StgInt       newSpark          (StgRegTable *reg, StgClosure *p);
+
+INLINE_HEADER void     discardSparks  (StgSparkPool *pool);
+INLINE_HEADER nat      sparkPoolSize  (StgSparkPool *pool);
+INLINE_HEADER rtsBool  emptySparkPool (StgSparkPool *pool);
+
+INLINE_HEADER void     discardSparksCap  (Capability *cap);
+INLINE_HEADER nat      sparkPoolSizeCap  (Capability *cap);
+INLINE_HEADER rtsBool  emptySparkPoolCap (Capability *cap);
+#endif
+
+#if defined(PARALLEL_HASKELL)
+StgTSO      *activateSpark (rtsSpark spark) ;
+rtsBool      add_to_spark_queue( StgClosure *closure, StgSparkPool *pool );
+void         markSparkQueue( void );
+nat          spark_queue_len( StgSparkPool *pool );
+void         disposeSpark( StgClosure *spark );
+#endif
 
+#if defined(GRAN)
 void      findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res);
 rtsBool   activateSpark (rtsEvent *event, rtsSparkQ spark);
 rtsSpark *newSpark(StgClosure *node, nat name, nat gran_info, 
@@ -24,19 +47,48 @@ void      print_sparkq(PEs proc);
 void     print_sparkq_stats(void);
 nat      spark_queue_len(PEs proc);
 void      markSparkQueue(void);
+#endif
+
+/* -----------------------------------------------------------------------------
+ * PRIVATE below here
+ * -------------------------------------------------------------------------- */
 
-#elif defined(PAR) || defined(SMP)
+#if defined(PARALLEL_HASKELL) || defined(SMP)
+
+INLINE_HEADER rtsBool
+emptySparkPool (StgSparkPool *pool)
+{
+    return (pool->hd == pool->tl);
+}
+
+INLINE_HEADER rtsBool
+emptySparkPoolCap (Capability *cap) 
+{ return emptySparkPool(&cap->r.rSparks); }
+
+INLINE_HEADER nat
+sparkPoolSize (StgSparkPool *pool) 
+{
+    if (pool->hd <= pool->tl) {
+	return (pool->tl - pool->hd);
+    } else {
+       return (pool->lim - pool->hd + pool->tl - pool->base);
+    }
+}
+
+INLINE_HEADER nat
+sparkPoolSizeCap (Capability *cap) 
+{ return sparkPoolSize(&cap->r.rSparks); }
+
+INLINE_HEADER void
+discardSparks (StgSparkPool *pool)
+{
+    pool->hd = pool->tl;
+}
+
+INLINE_HEADER void
+discardSparksCap (Capability *cap) 
+{ return discardSparks(&cap->r.rSparks); }
 
-StgClosure  *findSpark( rtsBool );
-void         initSparkPools( void );
-void         markSparkQueue( void );
-#if defined(PAR)
-StgTSO      *activateSpark (rtsSpark spark) ;
-rtsBool      add_to_spark_queue( StgClosure *closure, StgSparkPool *pool );
-void         markSparkQueue( void );
-nat          spark_queue_len( StgSparkPool *pool );
-void         disposeSpark( StgClosure *spark );
-#endif
 
 #endif
 
ghc/rts/StgCRun.c
index fc08b50..54ebfe1 100644
@@ -203,7 +203,8 @@ StgRun(StgFunPtr f, StgRegTable *basereg) {
 
 extern StgRegTable * StgRun(StgFunPtr f, StgRegTable *basereg);
 
-static void StgRunIsImplementedInAssembler(void)
+void StgRunIsImplementedInAssembler(void);
+void StgRunIsImplementedInAssembler(void)
 {
     __asm__ volatile (
        /*
ghc/rts/StgStartup.cmm
index 3569a39..2f2a759 100644
@@ -118,7 +118,7 @@ stg_returnToStackTop
 stg_returnToSched
 {
   SAVE_THREAD_STATE();
-  foreign "C" threadPaused(CurrentTSO);
+  foreign "C" threadPaused(MyCapability() "ptr", CurrentTSO);
   jump StgReturn;
 }
 
@@ -139,7 +139,7 @@ stg_returnToSchedNotPaused
 stg_returnToSchedButFirst
 {
   SAVE_THREAD_STATE();
-  foreign "C" threadPaused(CurrentTSO);
+  foreign "C" threadPaused(MyCapability() "ptr", CurrentTSO);
   jump R2;
 }
 
ghc/rts/Updates.cmm
index 02d1827..1d2fc5f 100644
@@ -102,6 +102,20 @@ INFO_TABLE_RET( stg_upd_frame,
            )
 UPD_FRAME_ENTRY_TEMPLATE(,stg_IND_direct_info,%ENTRY_CODE(Sp(0)))
 
+
+INFO_TABLE_RET( stg_marked_upd_frame, 
+           UPD_FRAME_WORDS, UPD_FRAME_BITMAP, UPDATE_FRAME,
+           stg_upd_frame_0_ret,
+           stg_upd_frame_1_ret,
+           stg_upd_frame_2_ret,
+           stg_upd_frame_3_ret,
+           stg_upd_frame_4_ret,
+           stg_upd_frame_5_ret,
+           stg_upd_frame_6_ret,
+           stg_upd_frame_7_ret
+           )
+UPD_FRAME_ENTRY_TEMPLATE(,stg_IND_direct_info,%ENTRY_CODE(Sp(0)))
+
 /*-----------------------------------------------------------------------------
   Seq frames 
 
ghc/rts/Updates.h
index fd48fb8..7b0dc3a 100644
@@ -204,16 +204,14 @@ extern void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node);
   W_ sz;                                                               \
   W_ i;                                                                        \
   inf = %GET_STD_INFO(p);                                              \
-  if (%INFO_TYPE(inf) == HALF_W_(THUNK_SELECTOR)) {                    \
-       StgThunk_payload(p,0) = 0;                                      \
-  } else {                                                             \
+  if (%INFO_TYPE(inf) != HALF_W_(THUNK_SELECTOR)) {                    \
     if (%INFO_TYPE(inf) != HALF_W_(BLACKHOLE)) {                       \
       if (%INFO_TYPE(inf) == HALF_W_(AP_STACK)) {                      \
           sz = StgAP_STACK_size(p) + BYTES_TO_WDS(SIZEOF_StgAP_STACK_NoHdr); \
       } else {                                                         \
           sz = TO_W_(%INFO_PTRS(inf)) + TO_W_(%INFO_NPTRS(inf));       \
       }                                                                        \
-      i = 0;                                                           \
+      i = 1; /* skip over indirectee */                                        \
       for:                                                             \
         if (i < sz) {                                                  \
           StgThunk_payload(p,i) = 0;                                   \
@@ -232,20 +230,17 @@ DEBUG_FILL_SLOP(StgClosure *p)
 
     switch (inf->type) {
     case BLACKHOLE:
+    case THUNK_SELECTOR:
        return;
     case AP_STACK:
        sz = ((StgAP_STACK *)p)->size + sizeofW(StgAP_STACK) - sizeofW(StgHeader);
        break;
-    case THUNK_SELECTOR:
-#ifdef SMP
-       ((StgSelector *)p)->selectee = 0;
-#endif
-       return;
     default:
        sz = inf->layout.payload.ptrs + inf->layout.payload.nptrs;
         break;
     }
-    for (i = 0; i < sz; i++) {
+    // start at one to skip over the indirectee
+    for (i = 1; i < sz; i++) {
        ((StgThunk *)p)->payload[i] = 0;
     }
 }