[project @ 2006-01-18 10:31:50 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
index 5cad5b2..21bd59b 100644
@@ -47,6 +47,9 @@
 #include "Capability.h"
 #include "Task.h"
 #include "AwaitEvent.h"
+#if defined(mingw32_HOST_OS)
+#include "win32/IOManager.h"
+#endif
 
 #ifdef HAVE_SYS_TYPES_H
 #include <sys/types.h>
 # define STATIC_INLINE static
 #endif
 
-#ifdef THREADED_RTS
-#define USED_WHEN_THREADED_RTS
-#define USED_WHEN_NON_THREADED_RTS STG_UNUSED
-#else
-#define USED_WHEN_THREADED_RTS     STG_UNUSED
-#define USED_WHEN_NON_THREADED_RTS
-#endif
-
-#ifdef SMP
-#define USED_WHEN_SMP
-#else
-#define USED_WHEN_SMP STG_UNUSED
-#endif
-
 /* -----------------------------------------------------------------------------
  * Global variables
  * -------------------------------------------------------------------------- */
@@ -189,7 +178,7 @@ rtsBool shutting_down_scheduler = rtsFalse;
  * the THREADED_RTS and (inc. SMP) runtime.
  */
 #if defined(THREADED_RTS)
-Mutex sched_mutex = INIT_MUTEX_VAR;
+Mutex sched_mutex;
 #endif
 
 #if defined(PARALLEL_HASKELL)
@@ -210,7 +199,10 @@ static Capability *schedule (Capability *initialCapability, Task *task);
 // scheduler clearer.
 //
 static void schedulePreLoop (void);
-static void scheduleStartSignalHandlers (void);
+#if defined(SMP)
+static void schedulePushWork(Capability *cap, Task *task);
+#endif
+static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
@@ -244,7 +236,7 @@ static void AllRoots(evac_fn evac);
 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
 
 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
-                       rtsBool stop_at_atomically);
+                       rtsBool stop_at_atomically, StgPtr stop_here);
 
 static void deleteThread (Capability *cap, StgTSO *tso);
 static void deleteRunQueue (Capability *cap);
@@ -339,7 +331,9 @@ schedule (Capability *initialCapability, Task *task)
 #endif
   nat prev_what_next;
   rtsBool ready_to_gc;
+#if defined(THREADED_RTS)
   rtsBool first = rtsTrue;
+#endif
   
   cap = initialCapability;
 
@@ -367,10 +361,6 @@ schedule (Capability *initialCapability, Task *task)
 
   while (TERMINATION_CONDITION) {
 
-      ASSERT(cap->running_task == task);
-      ASSERT(task->cap == cap);
-      ASSERT(myTask() == task);
-
 #if defined(GRAN)
       /* Choose the processor with the next event */
       CurrentProc = event->proc;
@@ -383,11 +373,16 @@ schedule (Capability *initialCapability, Task *task)
          // thread for a bit, even if there are others banging at the
          // door.
          first = rtsFalse;
+         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
       } else {
          // Yield the capability to higher-priority tasks if necessary.
          yieldCapability(&cap, task);
       }
 #endif
+      
+#ifdef SMP
+      schedulePushWork(cap,task);
+#endif
 
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
@@ -406,41 +401,39 @@ schedule (Capability *initialCapability, Task *task)
     //
     if (interrupted) {
        deleteRunQueue(cap);
+#if defined(SMP)
+       discardSparksCap(cap);
+#endif
        if (shutting_down_scheduler) {
            IF_DEBUG(scheduler, sched_belch("shutting down"));
-           if (task->tso) { // we are bound
-               task->stat = Interrupted;
-               task->ret  = NULL;
+           // If we are a worker, just exit.  If we're a bound thread
+           // then we will exit below when we've removed our TSO from
+           // the run queue.
+           if (task->tso == NULL && emptyRunQueue(cap)) {
+               return cap;
            }
-           return cap;
        } else {
            IF_DEBUG(scheduler, sched_belch("interrupted"));
        }
     }
 
-#if defined(not_yet) && defined(SMP)
-    //
-    // Top up the run queue from our spark pool.  We try to make the
-    // number of threads in the run queue equal to the number of
-    // free capabilities.
-    //
+#if defined(SMP)
+    // If the run queue is empty, take a spark and turn it into a thread.
     {
-       StgClosure *spark;
-       if (emptyRunQueue()) {
-           spark = findSpark(rtsFalse);
-           if (spark == NULL) {
-               break; /* no more sparks in the pool */
-           } else {
-               createSparkThread(spark);         
+       if (emptyRunQueue(cap)) {
+           StgClosure *spark;
+           spark = findSpark(cap);
+           if (spark != NULL) {
                IF_DEBUG(scheduler,
-                        sched_belch("==^^ turning spark of closure %p into a thread",
+                        sched_belch("turning spark of closure %p into a thread",
                                     (StgClosure *)spark));
+               createSparkThread(cap,spark);     
            }
        }
     }
 #endif // SMP
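
The SMP branch above drops the old "top up the run queue" idea and uses a simpler rule: only when this Capability's run queue is empty does it take one spark and promote it to a thread. A minimal stand-alone model of that policy is sketched below; Cap, find_spark and create_spark_thread are simplified stand-ins, not the RTS's Capability, findSpark or createSparkThread.

    #include <stdio.h>
    #include <stdlib.h>

    /* A "spark" is just an opaque closure pointer in this model. */
    typedef void *Spark;

    typedef struct Thread {
        Spark work;
        struct Thread *link;
    } Thread;

    typedef struct {
        Spark   sparks[16];         /* toy spark pool */
        int     n_sparks;
        Thread *run_queue_hd;       /* toy run queue  */
    } Cap;

    /* Pop one deferred spark, or NULL if the pool is empty. */
    static Spark find_spark(Cap *cap) {
        return cap->n_sparks > 0 ? cap->sparks[--cap->n_sparks] : NULL;
    }

    /* Promote a spark to a runnable thread on this capability. */
    static void create_spark_thread(Cap *cap, Spark s) {
        Thread *t = malloc(sizeof *t);
        if (t == NULL) return;
        t->work = s;
        t->link = cap->run_queue_hd;
        cap->run_queue_hd = t;
    }

    /* The policy in the hunk above: spend effort turning a spark into
     * a thread only when there is nothing else to run. */
    static void top_up_run_queue(Cap *cap) {
        if (cap->run_queue_hd == NULL) {
            Spark s = find_spark(cap);
            if (s != NULL) {
                printf("turning spark %p into a thread\n", s);
                create_spark_thread(cap, s);
            }
        }
    }
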
 
-    scheduleStartSignalHandlers();
+    scheduleStartSignalHandlers(cap);
 
     // Only check the black holes here if we've nothing else to do.
     // During normal execution, the black hole list only gets checked
@@ -574,39 +567,66 @@ run_thread:
     recent_activity = ACTIVITY_YES;
 
     switch (prev_what_next) {
-
+       
     case ThreadKilled:
     case ThreadComplete:
        /* Thread already finished, return to scheduler. */
        ret = ThreadFinished;
        break;
-
+       
     case ThreadRunGHC:
-       ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
+    {
+       StgRegTable *r;
+       r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
+       cap = regTableToCapability(r);
+       ret = r->rRet;
        break;
-
+    }
+    
     case ThreadInterpret:
-       ret = interpretBCO(cap);
+       cap = interpretBCO(cap);
+       ret = cap->r.rRet;
        break;
-
+       
     default:
-      barf("schedule: invalid what_next field");
+       barf("schedule: invalid what_next field");
     }
 
-    // in SMP mode, we might return with a different capability than
-    // we started with, if the Haskell thread made a foreign call.  So
-    // let's find out what our current Capability is:
-    cap = task->cap;
-
     cap->in_haskell = rtsFalse;
 
     // The TSO might have moved, eg. if it re-entered the RTS and a GC
     // happened.  So find the new location:
     t = cap->r.rCurrentTSO;
 
+    // We have run some Haskell code: there might be blackhole-blocked
+    // threads to wake up now.
+    // Lock-free test here should be ok, we're just setting a flag.
+    if ( blackhole_queue != END_TSO_QUEUE ) {
+       blackholes_need_checking = rtsTrue;
+    }
+    
     // And save the current errno in this thread.
+    // XXX: possibly bogus for SMP because this thread might already
+    // be running again, see code below.
     t->saved_errno = errno;
 
+#ifdef SMP
+    // If ret is ThreadBlocked, and this Task is bound to the TSO that
+    // blocked, we are in limbo - the TSO is now owned by whatever it
+    // is blocked on, and may in fact already have been woken up,
+    // perhaps even on a different Capability.  It may be the case
+    // that task->cap != cap.  We better yield this Capability
+    // immediately and return to normality.
+    if (ret == ThreadBlocked) {
+       IF_DEBUG(scheduler,
+                sched_belch("--<< thread %d (%s) stopped: blocked\n",
+                            t->id, whatNext_strs[t->what_next]));
+       continue;
+    }
+#endif
+
+    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+
     // ----------------------------------------------------------------------
     
     // Costs for the scheduler are assigned to CCS_SYSTEM
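
In the ThreadRunGHC case above, StgRun now hands back the register table and the scheduler recovers its current Capability from it via regTableToCapability, instead of re-reading task->cap afterwards; a safe foreign call may have left the thread on a different Capability. Assuming the register table is embedded as a field of the Capability struct, that recovery is plain pointer arithmetic (the container_of idiom). The field names in this sketch are illustrative, not the real RTS layout.

    #include <stddef.h>
    #include <stdio.h>

    /* Illustrative layouts only; the real StgRegTable and Capability
     * live in the RTS headers and carry many more fields. */
    typedef struct { int rRet; void *rCurrentTSO; } RegTable;

    typedef struct {
        int      no;    /* capability number                          */
        RegTable r;     /* embedded register table, as in Capability  */
    } Capability;

    /* Recover the enclosing Capability from a pointer to its embedded
     * register table by subtracting the member offset. */
    static Capability *reg_table_to_capability(RegTable *r) {
        return (Capability *)((char *)r - offsetof(Capability, r));
    }

    int main(void) {
        Capability cap = { .no = 3 };
        RegTable *r = &cap.r;              /* what StgRun would return */
        printf("running on capability %d\n",
               reg_table_to_capability(r)->no);
        return 0;
    }
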
@@ -615,13 +635,6 @@ run_thread:
     CCCS = CCS_SYSTEM;
 #endif
     
-    // We have run some Haskell code: there might be blackhole-blocked
-    // threads to wake up now.
-    // Lock-free test here should be ok, we're just setting a flag.
-    if ( blackhole_queue != END_TSO_QUEUE ) {
-       blackholes_need_checking = rtsTrue;
-    }
-    
 #if defined(THREADED_RTS)
     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
@@ -654,6 +667,7 @@ run_thread:
 
     case ThreadFinished:
        if (scheduleHandleThreadFinished(cap, task, t)) return cap;
+       ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        break;
 
     default:
@@ -694,26 +708,139 @@ schedulePreLoop(void)
 #endif
 }
 
+/* -----------------------------------------------------------------------------
+ * schedulePushWork()
+ *
+ * Push work to other Capabilities if we have some.
+ * -------------------------------------------------------------------------- */
+
+#ifdef SMP
+static void
+schedulePushWork(Capability *cap USED_IF_SMP, 
+                Task *task      USED_IF_SMP)
+{
+    Capability *free_caps[n_capabilities], *cap0;
+    nat i, n_free_caps;
+
+    // Check whether we have more threads on our run queue, or sparks
+    // in our pool, that we could hand to another Capability.
+    if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
+       && sparkPoolSizeCap(cap) < 2) {
+       return;
+    }
+
+    // First grab as many free Capabilities as we can.
+    for (i=0, n_free_caps=0; i < n_capabilities; i++) {
+       cap0 = &capabilities[i];
+       if (cap != cap0 && tryGrabCapability(cap0,task)) {
+           if (!emptyRunQueue(cap0) || cap0->returning_tasks_hd != NULL) {
+               // it already has some work, we just grabbed it at 
+               // the wrong moment.  Or maybe it's deadlocked!
+               releaseCapability(cap0);
+           } else {
+               free_caps[n_free_caps++] = cap0;
+           }
+       }
+    }
+
+    // we now have n_free_caps free capabilities stashed in
+    // free_caps[].  Share our run queue equally with them.  This is
+    // probably the simplest thing we could do; improvements we might
+    // want to do include:
+    //
+    //   - giving high priority to moving relatively new threads, on 
+    //     the grounds that they haven't had time to build up a
+    //     working set in the cache on this CPU/Capability.
+    //
+    //   - giving low priority to moving long-lived threads
+
+    if (n_free_caps > 0) {
+       StgTSO *prev, *t, *next;
+       rtsBool pushed_to_all;
+
+       IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
+
+       i = 0;
+       pushed_to_all = rtsFalse;
+
+       if (cap->run_queue_hd != END_TSO_QUEUE) {
+           prev = cap->run_queue_hd;
+           t = prev->link;
+           prev->link = END_TSO_QUEUE;
+           for (; t != END_TSO_QUEUE; t = next) {
+               next = t->link;
+               t->link = END_TSO_QUEUE;
+               if (t->what_next == ThreadRelocated
+                   || t->bound == task) { // don't move my bound thread
+                   prev->link = t;
+                   prev = t;
+               } else if (i == n_free_caps) {
+                   pushed_to_all = rtsTrue;
+                   i = 0;
+                   // keep one for us
+                   prev->link = t;
+                   prev = t;
+               } else {
+                   IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
+                   appendToRunQueue(free_caps[i],t);
+                   if (t->bound) { t->bound->cap = free_caps[i]; }
+                   i++;
+               }
+           }
+           cap->run_queue_tl = prev;
+       }
+
+       // If there are some free capabilities that we didn't push any
+       // threads to, then try to push a spark to each one.
+       if (!pushed_to_all) {
+           StgClosure *spark;
+           // i is the next free capability to push to
+           for (; i < n_free_caps; i++) {
+               if (emptySparkPoolCap(free_caps[i])) {
+                   spark = findSpark(cap);
+                   if (spark != NULL) {
+                       IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
+                       newSpark(&(free_caps[i]->r), spark);
+                   }
+               }
+           }
+       }
+
+       // release the capabilities
+       for (i = 0; i < n_free_caps; i++) {
+           task->cap = free_caps[i];
+           releaseCapability(free_caps[i]);
+       }
+    }
+    task->cap = cap; // reset to point to our Capability.
+}
+#endif
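
schedulePushWork does its redistribution in a single pass over the home run queue: bound or relocated threads stay put, every other thread is dealt out round-robin to the Capabilities that could be grabbed, and any grabbed Capability that received no thread gets a spark instead. The list surgery is the delicate part, so here is a self-contained sketch of the same single-pass split over an ordinary singly-linked queue; the pinned flag stands in for the bound/relocated tests, and the spark-pushing tail is omitted.

    #include <stdbool.h>
    #include <stddef.h>

    typedef struct Node { int id; bool pinned; struct Node *link; } Node;
    typedef struct { Node *hd, *tl; } Queue;

    static void push_back(Queue *q, Node *t) {
        t->link = NULL;
        if (q->hd == NULL) q->hd = t; else q->tl->link = t;
        q->tl = t;
    }

    /* Share home's threads with n_free other queues, round-robin,
     * keeping pinned threads (and one thread per full round) at home.
     * Mirrors the shape of the loop in schedulePushWork above. */
    static void push_work(Queue *home, Queue *free_qs, int n_free) {
        if (home->hd == NULL || home->hd->link == NULL || n_free == 0)
            return;                      /* nothing worth sharing */

        Node *prev = home->hd;           /* always keep the head */
        Node *t = prev->link, *next;
        prev->link = NULL;
        int i = 0;
        for (; t != NULL; t = next) {
            next = t->link;
            t->link = NULL;
            if (t->pinned || i == n_free) {     /* keep it local     */
                prev->link = t;
                prev = t;
                if (i == n_free) i = 0;         /* start a new round */
            } else {
                push_back(&free_qs[i++], t);    /* hand it away      */
            }
        }
        home->tl = prev;
    }
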
+
 /* ----------------------------------------------------------------------------
  * Start any pending signal handlers
  * ------------------------------------------------------------------------- */
 
+#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
 static void
-scheduleStartSignalHandlers(void)
+scheduleStartSignalHandlers(Capability *cap)
 {
-#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
     if (signals_pending()) { // safe outside the lock
-       startSignalHandlers();
+       startSignalHandlers(cap);
     }
-#endif
 }
+#else
+static void
+scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
+{
+}
+#endif
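
The point of the #else branch is that schedule() keeps one unconditional call to scheduleStartSignalHandlers; configurations that cannot run signal handlers get a no-op body with the same signature rather than an #ifdef at every call site. The same pattern in miniature, where HAVE_SIGNAL_HANDLERS is a made-up flag standing in for the RTS_USER_SIGNALS / THREADED_RTS / mingw32_HOST_OS test:

    /* Hypothetical configuration flag for the sake of the example. */
    #define HAVE_SIGNAL_HANDLERS 1

    typedef struct Capability Capability;   /* opaque here */

    #if HAVE_SIGNAL_HANDLERS
    static void start_pending_handlers(Capability *cap)
    {
        /* the real version checks signals_pending() and starts the
         * handler threads on this capability */
        (void)cap;
    }
    #else
    /* Same signature, empty body: callers never need an #ifdef. */
    static void start_pending_handlers(Capability *cap) { (void)cap; }
    #endif
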
 
 /* ----------------------------------------------------------------------------
  * Check for blocked threads that can be woken up.
  * ------------------------------------------------------------------------- */
 
 static void
-scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
+scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
 {
 #if !defined(THREADED_RTS)
     //
@@ -789,7 +916,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
        
        if ( !emptyRunQueue(cap) ) return;
 
-#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
+#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather then bombing out with a
         * deadlock.
@@ -801,7 +928,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
            awaitUserSignals();
 
            if (signals_pending()) {
-               startSignalHandlers();
+               startSignalHandlers(cap);
            }
 
            // either we have threads to run, or we were interrupted:
@@ -1451,11 +1578,10 @@ scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
        /* enlarge the stack */
        StgTSO *new_t = threadStackOverflow(cap, t);
        
-       /* This TSO has moved, so update any pointers to it from the
-        * main thread stack.  It better not be on any other queues...
-        * (it shouldn't be).
+       /* The TSO attached to this Task may have moved, so update the
+        * pointer to it.
         */
-       if (task->tso != NULL) {
+       if (task->tso == t) {
            task->tso = new_t;
        }
        pushOnRunQueue(cap,new_t);
@@ -1742,7 +1868,7 @@ scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
  * -------------------------------------------------------------------------- */
 
 static void
-scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
+scheduleDoGC( Capability *cap, Task *task USED_IF_SMP, rtsBool force_major )
 {
     StgTSO *t;
 #ifdef SMP
@@ -1764,7 +1890,13 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
     //
        
     was_waiting = cas(&waiting_for_gc, 0, 1);
-    if (was_waiting) return;
+    if (was_waiting) {
+       do {
+           IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
+           yieldCapability(&cap,task);
+       } while (waiting_for_gc);
+       return;
+    }
 
     for (i=0; i < n_capabilities; i++) {
        IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
@@ -1776,6 +1908,7 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
            // all the Capabilities, but even so it's a slightly
            // unsavoury invariant.
            task->cap = pcap;
+           context_switch = 1;
            waitForReturnCapability(&pcap, task);
            if (pcap != &capabilities[i]) {
                barf("scheduleDoGC: got the wrong capability");
@@ -1806,7 +1939,7 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
                        // ATOMICALLY_FRAME, aborting the (nested)
                        // transaction, and saving the stack of any
                        // partially-evaluated thunks on the heap.
-                       raiseAsync_(cap, t, NULL, rtsTrue);
+                       raiseAsync_(cap, t, NULL, rtsTrue, NULL);
                        
 #ifdef REG_R1
                        ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
@@ -1875,7 +2008,7 @@ rtsSupportsBoundThreads(void)
  * ------------------------------------------------------------------------- */
  
 StgBool
-isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
+isThreadBound(StgTSO* tso USED_IF_THREADS)
 {
 #if defined(THREADED_RTS)
   return (tso->bound != NULL);
@@ -1903,9 +2036,9 @@ forkProcess(HsStablePtr *entry
            )
 {
 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
+    Task *task;
     pid_t pid;
     StgTSO* t,*next;
-    Task *task;
     Capability *cap;
     
     IF_DEBUG(scheduler,sched_belch("forking!"));
@@ -1918,6 +2051,7 @@ forkProcess(HsStablePtr *entry
     if (pid) { // parent
        
        // just return the pid
+       rts_unlock(cap);
        return pid;
        
     } else { // child
@@ -1933,12 +2067,22 @@ forkProcess(HsStablePtr *entry
            deleteThreadImmediately(cap,t);
        }
        
-       // wipe the main thread list
-       while ((task = all_tasks) != NULL) {
-           all_tasks = task->all_link;
-           discardTask(task);
+       // wipe the task list
+       ACQUIRE_LOCK(&sched_mutex);
+       for (task = all_tasks; task != NULL; task=task->all_link) {
+           if (task != cap->running_task) discardTask(task);
        }
-       
+       RELEASE_LOCK(&sched_mutex);
+
+       cap->suspended_ccalling_tasks = NULL;
+
+#if defined(THREADED_RTS)
+       // wipe our spare workers list.
+       cap->spare_workers = NULL;
+       cap->returning_tasks_hd = NULL;
+       cap->returning_tasks_tl = NULL;
+#endif
+
        cap = rts_evalStableIO(cap, entry, NULL);  // run the action
        rts_checkSchedStatus("forkProcess",cap);
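
In the child of fork() only the calling OS thread survives, so the hunk above discards every Task except the one currently running on the capability and clears the suspended-ccall and spare-worker lists, instead of wiping the whole task list as before. The general fork-safety move looks like the stand-alone sketch below; Task, all_tasks and discard_task are simplified stand-ins for the RTS structures.

    #include <stdio.h>
    #include <sys/types.h>
    #include <unistd.h>

    typedef struct Task { int id; struct Task *all_link; } Task;

    static Task tasks[3] = { {0, &tasks[1]}, {1, &tasks[2]}, {2, NULL} };
    static Task *all_tasks = &tasks[0];

    /* The real discardTask() also detaches the task from its
     * Capability and marks its OS thread as gone. */
    static void discard_task(Task *t) { printf("discarding task %d\n", t->id); }

    int main(void) {
        Task *running_task = &tasks[1];    /* the task that called fork() */
        pid_t pid = fork();
        if (pid != 0) {
            return 0;                      /* parent (or error): done */
        }
        /* child: the OS threads behind all other tasks no longer
         * exist, so drop their bookkeeping */
        for (Task *t = all_tasks; t != NULL; t = t->all_link) {
            if (t != running_task) discard_task(t);
        }
        return 0;
    }
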
        
@@ -2038,7 +2182,7 @@ suspendThread (StgRegTable *reg)
   // XXX this might not be necessary --SDM
   tso->what_next = ThreadRunGHC;
 
-  threadPaused(tso);
+  threadPaused(cap,tso);
 
   if(tso->blocked_exceptions == NULL)  {
       tso->why_blocked = BlockedOnCCall;
@@ -2446,6 +2590,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
     cap = schedule(cap,task);
 
     ASSERT(task->stat != NoStatus);
+    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
 
     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
     return cap;
@@ -2532,6 +2677,10 @@ initScheduler(void)
 
   initTaskManager();
 
+#if defined(SMP) || defined(PARALLEL_HASKELL)
+  initSparkPools();
+#endif
+
 #if defined(SMP)
   /*
    * Eagerly start one worker to run each Capability, except for
@@ -2551,10 +2700,6 @@ initScheduler(void)
   }
 #endif
 
-#if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
-  initSparkPools();
-#endif
-
   RELEASE_LOCK(&sched_mutex);
 }
 
@@ -2644,7 +2789,7 @@ GetRoots( evac_fn evac )
 
     evac((StgClosure **)&blackhole_queue);
 
-#if defined(PARALLEL_HASKELL) || defined(GRAN)
+#if defined(SMP) || defined(PARALLEL_HASKELL) || defined(GRAN)
     markSparkQueue(evac);
 #endif
     
@@ -2751,7 +2896,7 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
 
-  IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", tso->stack_size, new_stack_size));
+  IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
 
   dest = (StgTSO *)allocate(new_tso_size);
   TICK_ALLOC_TSO(new_stack_size,0);
@@ -3479,15 +3624,22 @@ checkBlackHoles (Capability *cap)
 void
 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
 {
-    raiseAsync_(cap, tso, exception, rtsFalse);
+    raiseAsync_(cap, tso, exception, rtsFalse, NULL);
+}
+
+void
+suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
+{
+    raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
 }
 
 static void
 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
-           rtsBool stop_at_atomically)
+           rtsBool stop_at_atomically, StgPtr stop_here)
 {
     StgRetInfoTable *info;
-    StgPtr sp;
+    StgPtr sp, frame;
+    nat i;
   
     // Thread already dead?
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
@@ -3512,8 +3664,8 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
        sp[0] = (W_)&stg_dummy_ret_closure;
     }
 
-    while (1) {
-       nat i;
+    frame = sp + 1;
+    while (stop_here == NULL || frame < stop_here) {
 
        // 1. Let the top of the stack be the "current closure"
        //
@@ -3533,95 +3685,10 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
         // transaction
        
-       
-       StgPtr frame;
-       
-       frame = sp + 1;
        info = get_ret_itbl((StgClosure *)frame);
-       
-       while (info->i.type != UPDATE_FRAME
-              && (info->i.type != CATCH_FRAME || exception == NULL)
-              && info->i.type != STOP_FRAME
-              && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
-       {
-            if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
-              // IF we find an ATOMICALLY_FRAME then we abort the
-              // current transaction and propagate the exception.  In
-              // this case (unlike ordinary exceptions) we do not care
-              // whether the transaction is valid or not because its
-              // possible validity cannot have caused the exception
-              // and will not be visible after the abort.
-              IF_DEBUG(stm,
-                       debugBelch("Found atomically block delivering async exception\n"));
-              stmAbortTransaction(tso -> trec);
-              tso -> trec = stmGetEnclosingTRec(tso -> trec);
-            }
-           frame += stack_frame_sizeW((StgClosure *)frame);
-           info = get_ret_itbl((StgClosure *)frame);
-       }
-       
+
        switch (info->i.type) {
-           
-       case ATOMICALLY_FRAME:
-           ASSERT(stop_at_atomically);
-           ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
-           stmCondemnTransaction(tso -> trec);
-#ifdef REG_R1
-           tso->sp = frame;
-#else
-           // R1 is not a register: the return convention for IO in
-           // this case puts the return value on the stack, so we
-           // need to set up the stack to return to the atomically
-           // frame properly...
-           tso->sp = frame - 2;
-           tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
-           tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
-#endif
-           tso->what_next = ThreadRunGHC;
-           return;
 
-       case CATCH_FRAME:
-           // If we find a CATCH_FRAME, and we've got an exception to raise,
-           // then build the THUNK raise(exception), and leave it on
-           // top of the CATCH_FRAME ready to enter.
-           //
-       {
-#ifdef PROFILING
-           StgCatchFrame *cf = (StgCatchFrame *)frame;
-#endif
-           StgThunk *raise;
-           
-           // we've got an exception to raise, so let's pass it to the
-           // handler in this frame.
-           //
-           raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
-           TICK_ALLOC_SE_THK(1,0);
-           SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
-           raise->payload[0] = exception;
-           
-           // throw away the stack from Sp up to the CATCH_FRAME.
-           //
-           sp = frame - 1;
-           
-           /* Ensure that async excpetions are blocked now, so we don't get
-            * a surprise exception before we get around to executing the
-            * handler.
-            */
-           if (tso->blocked_exceptions == NULL) {
-               tso->blocked_exceptions = END_TSO_QUEUE;
-           }
-           
-           /* Put the newly-built THUNK on top of the stack, ready to execute
-            * when the thread restarts.
-            */
-           sp[0] = (W_)raise;
-           sp[-1] = (W_)&stg_enter_info;
-           tso->sp = sp-1;
-           tso->what_next = ThreadRunGHC;
-           IF_DEBUG(sanity, checkTSO(tso));
-           return;
-       }
-       
        case UPDATE_FRAME:
        {
            StgAP_STACK * ap;
@@ -3652,9 +3719,7 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
                     printObj((StgClosure *)ap);
                );
 
-           // Replace the updatee with an indirection - happily
-           // this will also wake up any threads currently
-           // waiting on the result.
+           // Replace the updatee with an indirection
            //
            // Warning: if we're in a loop, more than one update frame on
            // the stack may point to the same object.  Be careful not to
@@ -3671,21 +3736,106 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
            }
            sp += sizeofW(StgUpdateFrame) - 1;
            sp[0] = (W_)ap; // push onto stack
-           break;
+           frame = sp + 1;
+           continue; //no need to bump frame
        }
-       
+
        case STOP_FRAME:
            // We've stripped the entire stack, the thread is now dead.
-           sp += sizeofW(StgStopFrame);
            tso->what_next = ThreadKilled;
-           tso->sp = sp;
+           tso->sp = frame + sizeofW(StgStopFrame);
            return;
+
+       case CATCH_FRAME:
+           // If we find a CATCH_FRAME, and we've got an exception to raise,
+           // then build the THUNK raise(exception), and leave it on
+           // top of the CATCH_FRAME ready to enter.
+           //
+       {
+#ifdef PROFILING
+           StgCatchFrame *cf = (StgCatchFrame *)frame;
+#endif
+           StgThunk *raise;
+           
+           if (exception == NULL) break;
+
+           // we've got an exception to raise, so let's pass it to the
+           // handler in this frame.
+           //
+           raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
+           TICK_ALLOC_SE_THK(1,0);
+           SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
+           raise->payload[0] = exception;
+           
+           // throw away the stack from Sp up to the CATCH_FRAME.
+           //
+           sp = frame - 1;
+           
+           /* Ensure that async exceptions are blocked now, so we don't get
+            * a surprise exception before we get around to executing the
+            * handler.
+            */
+           if (tso->blocked_exceptions == NULL) {
+               tso->blocked_exceptions = END_TSO_QUEUE;
+           }
+
+           /* Put the newly-built THUNK on top of the stack, ready to execute
+            * when the thread restarts.
+            */
+           sp[0] = (W_)raise;
+           sp[-1] = (W_)&stg_enter_info;
+           tso->sp = sp-1;
+           tso->what_next = ThreadRunGHC;
+           IF_DEBUG(sanity, checkTSO(tso));
+           return;
+       }
+           
+       case ATOMICALLY_FRAME:
+           if (stop_at_atomically) {
+               ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
+               stmCondemnTransaction(cap, tso -> trec);
+#ifdef REG_R1
+               tso->sp = frame;
+#else
+               // R1 is not a register: the return convention for IO in
+               // this case puts the return value on the stack, so we
+               // need to set up the stack to return to the atomically
+               // frame properly...
+               tso->sp = frame - 2;
+               tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
+               tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
+#endif
+               tso->what_next = ThreadRunGHC;
+               return;
+           }
+           // Not stop_at_atomically... fall through and abort the
+           // transaction.
+           
+       case CATCH_RETRY_FRAME:
+           // If we find a CATCH_RETRY_FRAME (or fall through from an
+           // ATOMICALLY_FRAME above) then we abort the current
+           // transaction and propagate the exception.  In this case
+           // (unlike ordinary exceptions) we do not care whether the
+           // transaction is valid or not because its possible
+           // validity cannot have caused the exception and will not
+           // be visible after the abort.
+           IF_DEBUG(stm,
+                    debugBelch("Found atomically block delivering async exception\n"));
+           {
+               StgTRecHeader *trec = tso -> trec;
+               StgTRecHeader *outer = stmGetEnclosingTRec(trec);
+               stmAbortTransaction(cap, trec);
+               tso -> trec = outer;
+           }
+           break;
            
        default:
-           barf("raiseAsync");
+           break;
        }
+
+       // move on to the next stack frame
+       frame += stack_frame_sizeW((StgClosure *)frame);
     }
-    barf("raiseAsync");
+
+    // if we got here, then we stopped at stop_here
+    ASSERT(stop_here != NULL);
 }
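
raiseAsync_ now serves two callers: raiseAsync (deliver an exception, walking the whole stack) and suspendComputation (no exception, stop once the frame pointer reaches stop_here). The rewritten loop above advances one frame at a time by its size and checks the stop condition up front. A toy model of that dual-purpose walk, with an array of word counts standing in for the info-table-driven frame sizes of the real code:

    #include <stddef.h>
    #include <stdio.h>

    /* Each "frame" is just its size in words here; the real walk gets
     * the size from the frame's info table (stack_frame_sizeW). */
    static size_t frame_size_w(const unsigned *frame) { return *frame; }

    /* Walk frames upwards from sp.  With stop_here == NULL, keep going
     * until a terminating frame (size 0 in this toy model), like
     * raiseAsync; otherwise stop as soon as the frame pointer reaches
     * stop_here, like suspendComputation. */
    static void walk_frames(unsigned *sp, unsigned *stop_here) {
        unsigned *frame = sp;
        while (stop_here == NULL || frame < stop_here) {
            size_t sz = frame_size_w(frame);
            printf("frame at %p, %zu words\n", (void *)frame, sz);
            if (sz == 0) return;         /* stand-in for STOP_FRAME   */
            frame += sz;                 /* move on to the next frame */
        }
        /* if we got here, we stopped at stop_here */
    }

    int main(void) {
        unsigned stack[] = { 2, 9, 3, 9, 9, 1, 0 };  /* toy frame layout */
        walk_frames(stack, NULL);        /* raiseAsync-style walk */
        walk_frames(stack, stack + 2);   /* suspendComputation-style walk */
        return 0;
    }
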
 
 /* -----------------------------------------------------------------------------
@@ -3973,25 +4123,37 @@ printThreadBlockage(StgTSO *tso)
   }
 }
 
-static void
-printThreadStatus(StgTSO *tso)
+void
+printThreadStatus(StgTSO *t)
 {
-  switch (tso->what_next) {
-  case ThreadKilled:
-    debugBelch("has been killed");
-    break;
-  case ThreadComplete:
-    debugBelch("has completed");
-    break;
-  default:
-    printThreadBlockage(tso);
-  }
+    debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
+    {
+      void *label = lookupThreadLabel(t->id);
+      if (label) debugBelch("[\"%s\"] ",(char *)label);
+    }
+    if (t->what_next == ThreadRelocated) {
+       debugBelch("has been relocated...\n");
+    } else {
+       switch (t->what_next) {
+       case ThreadKilled:
+           debugBelch("has been killed");
+           break;
+       case ThreadComplete:
+           debugBelch("has completed");
+           break;
+       default:
+           printThreadBlockage(t);
+       }
+       debugBelch("\n");
+    }
 }
 
 void
 printAllThreads(void)
 {
-  StgTSO *t;
+  StgTSO *t, *next;
+  nat i;
+  Capability *cap;
 
 # if defined(GRAN)
   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
@@ -4009,20 +4171,24 @@ printAllThreads(void)
   debugBelch("all threads:\n");
 # endif
 
-  for (t = all_threads; t != END_TSO_QUEUE; ) {
-    debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
-    {
-      void *label = lookupThreadLabel(t->id);
-      if (label) debugBelch("[\"%s\"] ",(char *)label);
-    }
-    if (t->what_next == ThreadRelocated) {
-       debugBelch("has been relocated...\n");
-       t = t->link;
-    } else {
-       printThreadStatus(t);
-       debugBelch("\n");
-       t = t->global_link;
-    }
+  for (i = 0; i < n_capabilities; i++) {
+      cap = &capabilities[i];
+      debugBelch("threads on capability %d:\n", cap->no);
+      for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
+         printThreadStatus(t);
+      }
+  }
+
+  debugBelch("other threads:\n");
+  for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+      if (t->why_blocked != NotBlocked) {
+         printThreadStatus(t);
+      }
+      if (t->what_next == ThreadRelocated) {
+         next = t->link;
+      } else {
+         next = t->global_link;
+      }
   }
 }
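
printAllThreads now dumps each Capability's run queue first and then scans the global all_threads list for anything still blocked, choosing the successor per node: link for a relocated TSO, global_link otherwise. Computing the successor before acting on the node is the detail worth copying; a tiny stand-alone illustration follows (the struct is a simplified stand-in for StgTSO, and the why_blocked filter from the original is omitted).

    #include <stdbool.h>
    #include <stdio.h>

    typedef struct T {
        int id;
        bool relocated;
        struct T *link;          /* forwarding pointer when relocated */
        struct T *global_link;   /* membership in the global list     */
    } T;

    static void print_status(T *t) { printf("thread %d\n", t->id); }

    /* Walk the global list, following the forwarding pointer for
     * relocated nodes and only reporting the non-relocated ones. */
    static void print_all(T *all) {
        T *next;
        for (T *t = all; t != NULL; t = next) {
            next = t->relocated ? t->link : t->global_link;
            if (!t->relocated) print_status(t);
        }
    }
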
 
@@ -4032,13 +4198,7 @@ printThreadQueue(StgTSO *t)
 {
     nat i = 0;
     for (; t != END_TSO_QUEUE; t = t->link) {
-       debugBelch("\tthread %d @ %p ", t->id, (void *)t);
-       if (t->what_next == ThreadRelocated) {
-           debugBelch("has been relocated...\n");
-       } else {
-           printThreadStatus(t);
-           debugBelch("\n");
-       }
+       printThreadStatus(t);
        i++;
     }
     debugBelch("%d threads on queue\n", i);