Use message-passing to implement throwTo in the RTS
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 3ae1fe0..70e0246 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -17,7 +17,7 @@
 #include "Interpreter.h"
 #include "Printer.h"
 #include "RtsSignals.h"
-#include "Sanity.h"
+#include "sm/Sanity.h"
 #include "Stats.h"
 #include "STM.h"
 #include "Prelude.h"
@@ -139,7 +139,7 @@ static void scheduleYield (Capability **pcap, Task *task, rtsBool);
 #endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
-static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
+static void scheduleProcessInbox(Capability *cap);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
 static void schedulePushWork(Capability *cap, Task *task);
@@ -162,7 +162,7 @@ static Capability *scheduleDoGC(Capability *cap, Task *task,
 static rtsBool checkBlackHoles(Capability *cap);
 
 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
-static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
+static StgTSO *threadStackUnderflow(Capability *cap, Task *task, StgTSO *tso);
 
 static void deleteThread (Capability *cap, StgTSO *tso);
 static void deleteAllThreads (Capability *cap);
@@ -312,7 +312,7 @@ schedule (Capability *initialCapability, Task *task)
        // If we are a worker, just exit.  If we're a bound thread
        // then we will exit below when we've removed our TSO from
        // the run queue.
-       if (task->tso == NULL && emptyRunQueue(cap)) {
+       if (!isBoundTask(task) && emptyRunQueue(cap)) {
            return cap;
        }
        break;
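
The old test task->tso == NULL relied on the Task pointing directly at
its bound TSO; that pointer now lives on the Task's current InCall. A
minimal sketch of the new predicate, assuming it is defined in Task.h
along these lines:

    INLINE_HEADER rtsBool
    isBoundTask (Task *task)
    {
        return (task->incall->tso != NULL);
    }
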
@@ -378,10 +378,10 @@ schedule (Capability *initialCapability, Task *task)
     // Check whether we can run this thread in the current task.
     // If not, we have to pass our capability to the right task.
     {
-       Task *bound = t->bound;
+        InCall *bound = t->bound;
       
        if (bound) {
-           if (bound == task) {
+           if (bound->task == task) {
                // yes, the Haskell thread is bound to the current native thread
            } else {
                debugTrace(DEBUG_sched,
@@ -393,7 +393,7 @@ schedule (Capability *initialCapability, Task *task)
            }
        } else {
            // The thread we want to run is unbound.
-           if (task->tso) { 
+           if (task->incall->tso) { 
                debugTrace(DEBUG_sched,
                           "this OS thread cannot run thread %lu",
                            (unsigned long)t->id);
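
Throughout this patch, per-call state (the bound TSO, the suspended
TSO, and so on) moves off the Task and onto an InCall record, and
tso->bound becomes an InCall* rather than a Task*. A sketch of the
record, reconstructed from the fields this file touches (the
authoritative definition is in Task.h and may carry more fields):

    typedef struct InCall_ {
        StgTSO       *tso;            // the bound TSO, or NULL
        StgTSO       *suspended_tso;  // where the TSO is parked during a
                                      // safe foreign call
        Capability   *suspended_cap;  // the Capability holding suspended_tso
        struct Task_ *task;           // back-pointer to the owning Task
        struct InCall_ *prev, *next;  // links on cap->suspended_ccalls
    } InCall;
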
@@ -441,7 +441,7 @@ run_thread:
 
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
     ASSERT(t->cap == cap);
-    ASSERT(t->bound ? t->bound->cap == cap : 1);
+    ASSERT(t->bound ? t->bound->task->cap == cap : 1);
 
     prev_what_next = t->what_next;
 
@@ -463,12 +463,16 @@ run_thread:
         if (prev == ACTIVITY_DONE_GC) {
             startTimer();
         }
-    } else {
+    } else if (recent_activity != ACTIVITY_INACTIVE) {
+        // If we reached ACTIVITY_INACTIVE, then don't reset it until
+        // we've done the GC.  The thread running here might just be
+        // the IO manager thread that handle_tick() woke up via
+        // wakeUpRts().
         recent_activity = ACTIVITY_YES;
     }
 #endif
 
-    traceSchedEvent(cap, EVENT_RUN_THREAD, t, 0);
+    traceEventRunThread(cap, t);
 
     switch (prev_what_next) {
        
@@ -518,7 +522,7 @@ run_thread:
     t->saved_winerror = GetLastError();
 #endif
 
-    traceSchedEvent (cap, EVENT_STOP_THREAD, t, ret);
+    traceEventStopThread(cap, t, ret);
 
 #if defined(THREADED_RTS)
     // If ret is ThreadBlocked, and this Task is bound to the TSO that
@@ -547,7 +551,7 @@ run_thread:
     schedulePostRunThread(cap,t);
 
     if (ret != StackOverflow) {
-        t = threadStackUnderflow(task,t);
+        t = threadStackUnderflow(cap,task,t);
     }
 
     ready_to_gc = rtsFalse;
@@ -614,7 +618,7 @@ scheduleFindWork (Capability *cap)
     // list each time around the scheduler.
     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
 
-    scheduleCheckWakeupThreads(cap);
+    scheduleProcessInbox(cap);
 
     scheduleCheckBlockedThreads(cap);
 
@@ -635,9 +639,9 @@ shouldYieldCapability (Capability *cap, Task *task)
     //     and this task is bound).
     return (waiting_for_gc || 
             cap->returning_tasks_hd != NULL ||
-            (!emptyRunQueue(cap) && (task->tso == NULL
+            (!emptyRunQueue(cap) && (task->incall->tso == NULL
                                      ? cap->run_queue_hd->bound != NULL
-                                     : cap->run_queue_hd->bound != task)));
+                                     : cap->run_queue_hd->bound != task->incall)));
 }
 
 // This is the single place where a Task goes to sleep.  There are
@@ -669,7 +673,7 @@ scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
     if (!force_yield &&
         !shouldYieldCapability(cap,task) && 
         (!emptyRunQueue(cap) ||
-         !emptyWakeupQueue(cap) ||
+         !emptyInbox(cap) ||
          blackholes_need_checking ||
          sched_state >= SCHED_INTERRUPTING))
         return;
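
emptyWakeupQueue() gives way to emptyInbox(). Judging by the explicit
test on the inbox field in schedulePushWork() below, it is presumably
just:

    INLINE_HEADER rtsBool
    emptyInbox (Capability *cap)
    {
        return (cap->inbox == (Message*)END_TSO_QUEUE);
    }
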
@@ -705,7 +709,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
     Capability *free_caps[n_capabilities], *cap0;
     nat i, n_free_caps;
 
-    // migration can be turned off with +RTS -qg
+    // migration can be turned off with +RTS -qm
     if (!RtsFlags.ParFlags.migrate) return;
 
     // Check whether we have more threads on our run queue, or sparks
@@ -721,7 +725,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
-           if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
+           if (!emptyRunQueue(cap0)
+                || cap0->returning_tasks_hd != NULL
+                || cap0->inbox != (Message*)END_TSO_QUEUE) {
                // it already has some work, we just grabbed it at 
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
@@ -764,7 +770,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                next = t->_link;
                t->_link = END_TSO_QUEUE;
                if (t->what_next == ThreadRelocated
-                   || t->bound == task // don't move my bound thread
+                   || t->bound == task->incall // don't move my bound thread
                    || tsoLocked(t)) {  // don't move a locked thread
                    setTSOLink(cap, prev, t);
                    prev = t;
@@ -775,12 +781,11 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                    setTSOLink(cap, prev, t);
                    prev = t;
                } else {
-                   debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
                    appendToRunQueue(free_caps[i],t);
 
-                    traceSchedEvent (cap, EVENT_MIGRATE_THREAD, t, free_caps[i]->no);
+                    traceEventMigrateThread (cap, t, free_caps[i]->no);
 
-                   if (t->bound) { t->bound->cap = free_caps[i]; }
+                   if (t->bound) { t->bound->task->cap = free_caps[i]; }
                    t->cap = free_caps[i];
                    i++;
                }
@@ -802,7 +807,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                    if (spark != NULL) {
                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
 
-      traceSchedEvent(free_caps[i], EVENT_STEAL_SPARK, t, cap->no);
+                        traceEventStealSpark(free_caps[i], t, cap->no);
 
                        newSpark(&(free_caps[i]->r), spark);
                    }
@@ -868,23 +873,89 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
  * Process messages sent to us by other Capabilities
  * ------------------------------------------------------------------------- */
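
The message types executeMessage() dispatches on are defined elsewhere;
a sketch consistent with the fields accessed below, where the common
header-plus-link prefix is what lets messages of different kinds chain
through cap->inbox (the exception field is an assumption, used by
throwToMsg() rather than by this file):

    typedef struct Message_ {
        StgHeader        header;       // info table tags the message kind
        struct Message_ *link;         // next message in the inbox
    } Message;

    typedef struct {
        StgHeader header;
        Message  *link;
        StgTSO   *tso;                 // thread to return to our run queue
    } MessageWakeup;

    typedef struct {
        StgHeader   header;
        Message    *link;
        StgTSO     *source;            // thread performing the throwTo
        StgTSO     *target;            // thread to receive the exception
        StgClosure *exception;         // (assumed) the exception closure
    } MessageThrowTo;
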
 
+#if defined(THREADED_RTS)
 static void
-scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
+executeMessage (Capability *cap, Message *m)
+{
+    const StgInfoTable *i;
+
+loop:
+    write_barrier(); // allow m->header to be modified by another thread
+    i = m->header.info;
+    if (i == &stg_MSG_WAKEUP_info)
+    {
+        MessageWakeup *w = (MessageWakeup *)m;
+        StgTSO *tso = w->tso;
+        debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld", 
+                      (lnat)tso->id);
+        ASSERT(tso->cap == cap);
+        ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
+        ASSERT(tso->block_info.closure == (StgClosure *)m);
+        tso->why_blocked = NotBlocked;
+        appendToRunQueue(cap, tso);
+    }
+    else if (i == &stg_MSG_THROWTO_info)
+    {
+        MessageThrowTo *t = (MessageThrowTo *)m;
+        nat r;
+        const StgInfoTable *i;
+
+        i = lockClosure((StgClosure*)m);
+        if (i != &stg_MSG_THROWTO_info) {
+            unlockClosure((StgClosure*)m, i);
+            goto loop;
+        }
+
+        debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld", 
+                      (lnat)t->source->id, (lnat)t->target->id);
+
+        ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
+        ASSERT(t->source->block_info.closure == (StgClosure *)m);
+
+        r = throwToMsg(cap, t);
+
+        switch (r) {
+        case THROWTO_SUCCESS:
+            ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
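+            // pop the stg_block_throwto frame, assumed here to be three
+            // words (its info pointer plus two payload words)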
+            t->source->sp += 3;
+            unblockOne(cap, t->source);
+            // this message is done
+            unlockClosure((StgClosure*)m, &stg_IND_info);
+            break;
+        case THROWTO_BLOCKED:
+            // unlock the message
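+            // (the throw stays pending: the message remains live and
+            // will be completed or revoked later)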
+            unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
+            break;
+        }
+    }
+    else if (i == &stg_IND_info)
+    {
+        // message was revoked
+        return;
+    }
+    else if (i == &stg_WHITEHOLE_info)
+    {
+        goto loop;
+    }
+    else
+    {
+        barf("executeMessage: %p", i);
+    }
+}
+#endif
+
+static void
+scheduleProcessInbox (Capability *cap USED_IF_THREADS)
 {
 #if defined(THREADED_RTS)
-    // Any threads that were woken up by other Capabilities get
-    // appended to our run queue.
-    if (!emptyWakeupQueue(cap)) {
-       ACQUIRE_LOCK(&cap->lock);
-       if (emptyRunQueue(cap)) {
-           cap->run_queue_hd = cap->wakeup_queue_hd;
-           cap->run_queue_tl = cap->wakeup_queue_tl;
-       } else {
-           setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
-           cap->run_queue_tl = cap->wakeup_queue_tl;
-       }
-       cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
-       RELEASE_LOCK(&cap->lock);
+    Message *m;
+
+    while (!emptyInbox(cap)) {
+        ACQUIRE_LOCK(&cap->lock);
+        m = cap->inbox;
+        cap->inbox = m->link;
+        RELEASE_LOCK(&cap->lock);
+        executeMessage(cap, m);
     }
 #endif
 }
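
The sending half is not in this file. Given the locking discipline
above, a plausible sketch of the enqueue side (the real helper lives in
Capability.c and must additionally wake or interrupt the target
Capability so the message is noticed promptly):

    // sketch only: post a message to another Capability's inbox
    void
    sendMessage (Capability *cap, Message *msg)
    {
        ACQUIRE_LOCK(&cap->lock);
        msg->link = cap->inbox;   // LIFO push; scheduleProcessInbox()
        cap->inbox = msg;         // pops one message per iteration
        RELEASE_LOCK(&cap->lock);
    }
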
@@ -976,13 +1047,13 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception.
         */
-       if (task->tso) {
-           switch (task->tso->why_blocked) {
+       if (task->incall->tso) {
+           switch (task->incall->tso->why_blocked) {
            case BlockedOnSTM:
            case BlockedOnBlackHole:
-           case BlockedOnException:
+           case BlockedOnMsgThrowTo:
            case BlockedOnMVar:
-               throwToSingleThreaded(cap, task->tso, 
+               throwToSingleThreaded(cap, task->incall->tso, 
                                      (StgClosure *)nonTermination_closure);
                return;
            default:
@@ -1118,7 +1189,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
            { 
                bdescr *x;
                for (x = bd; x < bd + blocks; x++) {
-                    initBdescr(x,cap->r.rNursery);
+                    initBdescr(x,g0,g0);
                     x->free = x->start;
                    x->flags = 0;
                }
@@ -1171,8 +1242,8 @@ scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
        /* The TSO attached to this Task may have moved, so update the
         * pointer to it.
         */
-       if (task->tso == t) {
-           task->tso = new_t;
+       if (task->incall->tso == t) {
+           task->incall->tso = new_t;
        }
        pushOnRunQueue(cap,new_t);
     }
@@ -1185,38 +1256,35 @@ scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
 static rtsBool
 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
 {
-    // Reset the context switch flag.  We don't do this just before
-    // running the thread, because that would mean we would lose ticks
-    // during GC, which can lead to unfair scheduling (a thread hogs
-    // the CPU because the tick always arrives during GC).  This way
-    // penalises threads that do a lot of allocation, but that seems
-    // better than the alternative.
-    cap->context_switch = 0;
-    
     /* put the thread back on the run queue.  Then, if we're ready to
      * GC, check whether this is the last task to stop.  If so, wake
      * up the GC thread.  getThread will block during a GC until the
      * GC is finished.
      */
-#ifdef DEBUG
-    if (t->what_next != prev_what_next) {
+
+    ASSERT(t->_link == END_TSO_QUEUE);
+    
+    // Shortcut if we're just switching evaluators: don't bother
+    // doing stack squeezing (which can be expensive), just run the
+    // thread.
+    if (cap->context_switch == 0 && t->what_next != prev_what_next) {
        debugTrace(DEBUG_sched,
                   "--<< thread %ld (%s) stopped to switch evaluators", 
                   (long)t->id, what_next_strs[t->what_next]);
+       return rtsTrue;
     }
-#endif
+
+    // Reset the context switch flag.  We don't do this just before
+    // running the thread, because that would mean we would lose ticks
+    // during GC, which can lead to unfair scheduling (a thread hogs
+    // the CPU because the tick always arrives during GC).  This way
+    // penalises threads that do a lot of allocation, but that seems
+    // better than the alternative.
+    cap->context_switch = 0;
     
     IF_DEBUG(sanity,
             //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
             checkTSO(t));
-    ASSERT(t->_link == END_TSO_QUEUE);
-    
-    // Shortcut if we're just switching evaluators: don't bother
-    // doing stack squeezing (which can be expensive), just run the
-    // thread.
-    if (t->what_next != prev_what_next) {
-       return rtsTrue;
-    }
 
     addToRunQueue(cap,t);
 
@@ -1268,9 +1336,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
      */
 
     // blocked exceptions can now complete, even if the thread was in
-    // blocked mode (see #2910).  This unconditionally calls
-    // lockTSO(), which ensures that we don't miss any threads that
-    // are engaged in throwTo() with this thread as a target.
+    // blocked mode (see #2910).
     awakenBlockedExceptionQueue (cap, t);
 
       //
@@ -1285,7 +1351,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
 
       if (t->bound) {
 
-         if (t->bound != task) {
+         if (t->bound != task->incall) {
 #if !defined(THREADED_RTS)
              // Must be a bound thread that is not the topmost one.  Leave
              // it on the run queue until the stack has unwound to the
@@ -1302,12 +1368,12 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
 #endif
          }
 
-         ASSERT(task->tso == t);
+         ASSERT(task->incall->tso == t);
 
          if (t->what_next == ThreadComplete) {
              if (task->ret) {
                  // NOTE: return val is tso->sp[1] (see StgStartup.hc)
-                 *(task->ret) = (StgClosure *)task->tso->sp[1]; 
+                 *(task->ret) = (StgClosure *)task->incall->tso->sp[1]; 
              }
              task->stat = Success;
          } else {
@@ -1325,8 +1391,19 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
              }
          }
 #ifdef DEBUG
-         removeThreadLabel((StgWord)task->tso->id);
+         removeThreadLabel((StgWord)task->incall->tso->id);
 #endif
+
+          // We no longer consider this thread and task to be bound to
+          // each other.  The TSO lives on until it is GC'd, but the
+          // task is about to be released by the caller, and we don't
+          // want anyone following the pointer from the TSO to the
+          // defunct task (which might have already been
+          // re-used). This was a real bug: the GC updated
+          // tso->bound->tso, which led to a deadlock.
+          t->bound = NULL;
+          task->incall->tso = NULL;
+
          return rtsTrue; // tells schedule() to return
       }
 
@@ -1377,7 +1454,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     if (sched_state < SCHED_INTERRUPTING
         && RtsFlags.ParFlags.parGcEnabled
         && N >= RtsFlags.ParFlags.parGcGen
-        && ! oldest_gen->steps[0].mark)
+        && ! oldest_gen->mark)
     {
         gc_type = PENDING_GC_PAR;
     } else {
@@ -1417,11 +1494,11 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     
     if (gc_type == PENDING_GC_SEQ)
     {
-        traceSchedEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
+        traceEventRequestSeqGc(cap);
     }
     else
     {
-        traceSchedEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
+        traceEventRequestParGc(cap);
         debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
     }
 
@@ -1477,8 +1554,8 @@ delete_threads_and_gc:
     
     heap_census = scheduleNeedHeapProfile(rtsTrue);
 
+    traceEventGcStart(cap);
 #if defined(THREADED_RTS)
-    traceSchedEvent(cap, EVENT_GC_START, 0, 0);
     // reset waiting_for_gc *before* GC, so that when the GC threads
     // emerge they don't immediately re-enter the GC.
     waiting_for_gc = 0;
@@ -1486,7 +1563,7 @@ delete_threads_and_gc:
 #else
     GarbageCollect(force_major || heap_census, 0, cap);
 #endif
-    traceSchedEvent(cap, EVENT_GC_END, 0, 0);
+    traceEventGcEnd(cap);
 
     if (recent_activity == ACTIVITY_INACTIVE && force_major)
     {
@@ -1575,11 +1652,10 @@ forkProcess(HsStablePtr *entry
            )
 {
 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
-    Task *task;
     pid_t pid;
     StgTSO* t,*next;
     Capability *cap;
-    nat s;
+    nat g;
     
 #if defined(THREADED_RTS)
     if (RtsFlags.ParFlags.nNodes > 1) {
@@ -1627,8 +1703,8 @@ forkProcess(HsStablePtr *entry
        // all Tasks, because they correspond to OS threads that are
        // now gone.
 
-        for (s = 0; s < total_steps; s++) {
-          for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
+        for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+          for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
            if (t->what_next == ThreadRelocated) {
                next = t->_link;
            } else {
@@ -1650,25 +1726,15 @@ forkProcess(HsStablePtr *entry
 
        // Any suspended C-calling Tasks are no more, their OS threads
        // don't exist now:
-       cap->suspended_ccalling_tasks = NULL;
+       cap->suspended_ccalls = NULL;
 
        // Empty the threads lists.  Otherwise, the garbage
        // collector may attempt to resurrect some of these threads.
-        for (s = 0; s < total_steps; s++) {
-            all_steps[s].threads = END_TSO_QUEUE;
+        for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+            generations[g].threads = END_TSO_QUEUE;
         }
 
-       // Wipe the task list, except the current Task.
-       ACQUIRE_LOCK(&sched_mutex);
-       for (task = all_tasks; task != NULL; task=task->all_link) {
-           if (task != cap->running_task) {
-#if defined(THREADED_RTS)
-                initMutex(&task->lock); // see #1391
-#endif
-               discardTask(task);
-           }
-       }
-       RELEASE_LOCK(&sched_mutex);
+        discardTasksExcept(cap->running_task);
 
 #if defined(THREADED_RTS)
        // Wipe our spare workers list, they no longer exist.  New
@@ -1709,19 +1775,19 @@ deleteAllThreads ( Capability *cap )
     // NOTE: only safe to call if we own all capabilities.
 
     StgTSO* t, *next;
-    nat s;
+    nat g;
 
     debugTrace(DEBUG_sched,"deleting all threads");
-    for (s = 0; s < total_steps; s++) {
-      for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
-       if (t->what_next == ThreadRelocated) {
-           next = t->_link;
-       } else {
-           next = t->global_link;
-           deleteThread(cap,t);
-       }
-      }
-    }      
+    for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+        for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
+            if (t->what_next == ThreadRelocated) {
+                next = t->_link;
+            } else {
+                next = t->global_link;
+                deleteThread(cap,t);
+            }
+        }
+    }
 
     // The run queue now contains a bunch of ThreadKilled threads.  We
     // must not throw these away: the main thread(s) will be in there
@@ -1736,35 +1802,41 @@ deleteAllThreads ( Capability *cap )
 }
 
 /* -----------------------------------------------------------------------------
-   Managing the suspended_ccalling_tasks list.
+   Managing the suspended_ccalls list.
    Locks required: sched_mutex
    -------------------------------------------------------------------------- */
 
 STATIC_INLINE void
 suspendTask (Capability *cap, Task *task)
 {
-    ASSERT(task->next == NULL && task->prev == NULL);
-    task->next = cap->suspended_ccalling_tasks;
-    task->prev = NULL;
-    if (cap->suspended_ccalling_tasks) {
-       cap->suspended_ccalling_tasks->prev = task;
-    }
-    cap->suspended_ccalling_tasks = task;
+    InCall *incall;
+    
+    incall = task->incall;
+    ASSERT(incall->next == NULL && incall->prev == NULL);
+    incall->next = cap->suspended_ccalls;
+    incall->prev = NULL;
+    if (cap->suspended_ccalls) {
+       cap->suspended_ccalls->prev = incall;
+    }
+    cap->suspended_ccalls = incall;
 }
 
 STATIC_INLINE void
 recoverSuspendedTask (Capability *cap, Task *task)
 {
-    if (task->prev) {
-       task->prev->next = task->next;
+    InCall *incall;
+
+    incall = task->incall;
+    if (incall->prev) {
+       incall->prev->next = incall->next;
     } else {
-       ASSERT(cap->suspended_ccalling_tasks == task);
-       cap->suspended_ccalling_tasks = task->next;
+       ASSERT(cap->suspended_ccalls == incall);
+       cap->suspended_ccalls = incall->next;
     }
-    if (task->next) {
-       task->next->prev = task->prev;
+    if (incall->next) {
+       incall->next->prev = incall->prev;
     }
-    task->next = task->prev = NULL;
+    incall->next = incall->prev = NULL;
 }
 
 /* ---------------------------------------------------------------------------
@@ -1805,7 +1877,7 @@ suspendThread (StgRegTable *reg)
   task = cap->running_task;
   tso = cap->r.rCurrentTSO;
 
-  traceSchedEvent(cap, EVENT_STOP_THREAD, tso, THREAD_SUSPENDED_FOREIGN_CALL);
+  traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL);
 
   // XXX this might not be necessary --SDM
   tso->what_next = ThreadRunGHC;
@@ -1821,7 +1893,8 @@ suspendThread (StgRegTable *reg)
   }
 
   // Hand back capability
-  task->suspended_tso = tso;
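+  // (the Capability is recorded too: resumeThread() starts from a bare
+  // Task and needs to know which Capability to return to)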
+  task->incall->suspended_tso = tso;
+  task->incall->suspended_cap = cap;
 
   ACQUIRE_LOCK(&cap->lock);
 
@@ -1842,6 +1915,7 @@ StgRegTable *
 resumeThread (void *task_)
 {
     StgTSO *tso;
+    InCall *incall;
     Capability *cap;
     Task *task = task_;
     int saved_errno;
@@ -1854,25 +1928,29 @@ resumeThread (void *task_)
     saved_winerror = GetLastError();
 #endif
 
-    cap = task->cap;
+    incall = task->incall;
+    cap = incall->suspended_cap;
+    task->cap = cap;
+
     // Wait for permission to re-enter the RTS with the result.
     waitForReturnCapability(&cap,task);
     // we might be on a different capability now... but if so, our
-    // entry on the suspended_ccalling_tasks list will also have been
+    // entry on the suspended_ccalls list will also have been
     // migrated.
 
     // Remove the thread from the suspended list
     recoverSuspendedTask(cap,task);
 
-    tso = task->suspended_tso;
-    task->suspended_tso = NULL;
+    tso = incall->suspended_tso;
+    incall->suspended_tso = NULL;
+    incall->suspended_cap = NULL;
     tso->_link = END_TSO_QUEUE; // no write barrier reqd
 
-    traceSchedEvent(cap, EVENT_RUN_THREAD, tso, tso->what_next);
+    traceEventRunThread(cap, tso);
     
     if (tso->why_blocked == BlockedOnCCall) {
         // avoid locking the TSO if we don't have to
-        if (tso->blocked_exceptions != END_TSO_QUEUE) {
+        if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
             awakenBlockedExceptionQueue(cap,tso);
         }
        tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
@@ -1924,7 +2002,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
     if (cpu == cap->no) {
        appendToRunQueue(cap,tso);
     } else {
-        traceSchedEvent (cap, EVENT_MIGRATE_THREAD, tso, capabilities[cpu].no);
+        traceEventMigrateThread (cap, tso, capabilities[cpu].no);
        wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
     }
 #else
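
wakeupThreadOnCapability() (in Capability.c) is the natural producer of
MSG_WAKEUP messages. A sketch of how such a wakeup could be posted,
shaped to satisfy the ASSERTs in executeMessage() above; the allocation
and header-setting details are assumptions:

    static void
    wakeupViaMessage (Capability *cap, Capability *other_cap, StgTSO *tso)
    {
        MessageWakeup *msg;

        msg = (MessageWakeup *)allocate(cap, sizeofW(MessageWakeup));
        SET_HDR((StgClosure *)msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
        msg->tso = tso;
        tso->why_blocked = BlockedOnMsgWakeup;        // executeMessage()
        tso->block_info.closure = (StgClosure *)msg;  // checks both fields
        sendMessage(other_cap, (Message *)msg);       // sketched earlier
    }
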
@@ -1936,29 +2014,31 @@ Capability *
 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
 {
     Task *task;
+    StgThreadID id;
 
     // We already created/initialised the Task
     task = cap->running_task;
 
     // This TSO is now a bound thread; make the Task and TSO
     // point to each other.
-    tso->bound = task;
+    tso->bound = task->incall;
     tso->cap = cap;
 
-    task->tso = tso;
+    task->incall->tso = tso;
     task->ret = ret;
     task->stat = NoStatus;
 
     appendToRunQueue(cap,tso);
 
-    debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
+    id = tso->id;
+    debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
 
     cap = schedule(cap,task);
 
     ASSERT(task->stat != NoStatus);
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
 
-    debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
+    debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
     return cap;
 }
 
@@ -1967,23 +2047,8 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
  * ------------------------------------------------------------------------- */
 
 #if defined(THREADED_RTS)
-void OSThreadProcAttr
-workerStart(Task *task)
+void scheduleWorker (Capability *cap, Task *task)
 {
-    Capability *cap;
-
-    // See startWorkerTask().
-    ACQUIRE_LOCK(&task->lock);
-    cap = task->cap;
-    RELEASE_LOCK(&task->lock);
-
-    if (RtsFlags.ParFlags.setAffinity) {
-        setThreadAffinity(cap->no, n_capabilities);
-    }
-
-    // set the thread-local pointer to the Task:
-    taskEnter(task);
-
     // schedule() runs without a lock.
     cap = schedule(cap,task);
 
@@ -2049,6 +2114,8 @@ initScheduler(void)
   initSparkPools();
 #endif
 
+  RELEASE_LOCK(&sched_mutex);
+
 #if defined(THREADED_RTS)
   /*
    * Eagerly start one worker to run each Capability, except for
@@ -2062,13 +2129,11 @@ initScheduler(void)
       for (i = 1; i < n_capabilities; i++) {
          cap = &capabilities[i];
          ACQUIRE_LOCK(&cap->lock);
-         startWorkerTask(cap, workerStart);
+         startWorkerTask(cap);
          RELEASE_LOCK(&cap->lock);
       }
   }
 #endif
-
-  RELEASE_LOCK(&sched_mutex);
 }
 
 void
@@ -2088,7 +2153,8 @@ exitScheduler(
     if (sched_state < SCHED_SHUTTING_DOWN) {
        sched_state = SCHED_INTERRUPTING;
         waitForReturnCapability(&task->cap,task);
-       scheduleDoGC(task->cap,task,rtsFalse);    
+       scheduleDoGC(task->cap,task,rtsFalse);
+        ASSERT(task->incall->tso == NULL);
         releaseCapability(task->cap);
     }
     sched_state = SCHED_SHUTTING_DOWN;
@@ -2098,6 +2164,7 @@ exitScheduler(
        nat i;
        
        for (i = 0; i < n_capabilities; i++) {
+            ASSERT(task->incall->tso == NULL);
            shutdownCapability(&capabilities[i], task, wait_foreign);
        }
     }
@@ -2146,7 +2213,7 @@ performGC_(rtsBool force_major)
 
     // We must grab a new Task here, because the existing Task may be
     // associated with a particular Capability, and chained onto the 
-    // suspended_ccalling_tasks queue.
+    // suspended_ccalls queue.
     task = newBoundTask();
 
     waitForReturnCapability(&task->cap,task);
@@ -2186,10 +2253,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
 
   IF_DEBUG(sanity,checkTSO(tso));
 
-  // don't allow throwTo() to modify the blocked_exceptions queue
-  // while we are moving the TSO:
-  lockClosure((StgClosure *)tso);
-
   if (tso->stack_size >= tso->max_stack_size
       && !(tso->flags & TSO_BLOCKEX)) {
       // NB. never raise a StackOverflow exception if the thread is
@@ -2221,7 +2284,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
                                                tso->sp+64)));
 
       // Send this thread the StackOverflow exception
-      unlockTSO(tso);
       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
       return tso;
   }
@@ -2237,7 +2299,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
   // the stack anyway.
   if ((tso->flags & TSO_SQUEEZED) && 
       ((W_)(tso->sp - tso->stack) >= BLOCK_SIZE_W)) {
-      unlockTSO(tso);
       return tso;
   }
 
@@ -2287,9 +2348,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
   tso->sp = (P_)&(tso->stack[tso->stack_size]);
   tso->why_blocked = NotBlocked;
 
-  unlockTSO(dest);
-  unlockTSO(tso);
-
   IF_DEBUG(sanity,checkTSO(dest));
 #if 0
   IF_DEBUG(scheduler,printTSO(dest));
@@ -2299,7 +2357,7 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
 }
 
 static StgTSO *
-threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
+threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso)
 {
     bdescr *bd, *new_bd;
     lnat free_w, tso_size_w;
@@ -2322,10 +2380,6 @@ threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
         return tso;
     }
 
-    // don't allow throwTo() to modify the blocked_exceptions queue
-    // while we are moving the TSO:
-    lockClosure((StgClosure *)tso);
-
     // this is the number of words we'll free
     free_w = round_to_mblocks(tso_size_w/2);
 
@@ -2337,6 +2391,13 @@ threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
     new_tso->stack_size = new_bd->free - new_tso->stack;
 
+    // The original TSO was dirty and probably on the mutable
+    // list. The new TSO is not yet on the mutable list, so we better
+    // put it there.
+    new_tso->dirty = 0;
+    new_tso->flags &= ~TSO_LINK_DIRTY;
+    dirty_TSO(cap, new_tso);
+
     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
 
@@ -2345,13 +2406,10 @@ threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
 
     // The TSO attached to this Task may have moved, so update the
     // pointer to it.
-    if (task->tso == tso) {
-        task->tso = new_tso;
+    if (task->incall->tso == tso) {
+        task->incall->tso = new_tso;
     }
 
-    unlockTSO(new_tso);
-    unlockTSO(tso);
-
     IF_DEBUG(sanity,checkTSO(new_tso));
 
     return new_tso;
@@ -2537,7 +2595,8 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
                SET_HDR(raise_closure, &stg_raise_info, CCCS);
                raise_closure->payload[0] = exception;
            }
-           UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
+           UPD_IND(cap, ((StgUpdateFrame *)p)->updatee,
+                    (StgClosure *)raise_closure);
            p = next;
            continue;
 
@@ -2647,14 +2706,14 @@ resurrectThreads (StgTSO *threads)
 {
     StgTSO *tso, *next;
     Capability *cap;
-    step *step;
+    generation *gen;
 
     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
        next = tso->global_link;
 
-        step = Bdescr((P_)tso)->step;
-       tso->global_link = step->threads;
-       step->threads = tso;
+        gen = Bdescr((P_)tso)->gen;
+       tso->global_link = gen->threads;
+       gen->threads = tso;
 
        debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
        
@@ -2681,61 +2740,9 @@ resurrectThreads (StgTSO *threads)
             * can wake up threads, remember...).
             */
            continue;
-       case BlockedOnException:
-            // throwTo should never block indefinitely: if the target
-            // thread dies or completes, throwTo returns.
-           barf("resurrectThreads: thread BlockedOnException");
-            break;
        default:
-           barf("resurrectThreads: thread blocked in a strange way");
+           barf("resurrectThreads: thread blocked in a strange way: %d",
+                 tso->why_blocked);
        }
     }
 }
-
-/* -----------------------------------------------------------------------------
-   performPendingThrowTos is called after garbage collection, and
-   passed a list of threads that were found to have pending throwTos
-   (tso->blocked_exceptions was not empty), and were blocked.
-   Normally this doesn't happen, because we would deliver the
-   exception directly if the target thread is blocked, but there are
-   small windows where it might occur on a multiprocessor (see
-   throwTo()).
-
-   NB. we must be holding all the capabilities at this point, just
-   like resurrectThreads().
-   -------------------------------------------------------------------------- */
-
-void
-performPendingThrowTos (StgTSO *threads)
-{
-    StgTSO *tso, *next;
-    Capability *cap;
-    Task *task, *saved_task;
-    step *step;
-
-    task = myTask();
-    cap = task->cap;
-
-    for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
-       next = tso->global_link;
-
-        step = Bdescr((P_)tso)->step;
-       tso->global_link = step->threads;
-       step->threads = tso;
-
-       debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
-       
-        // We must pretend this Capability belongs to the current Task
-        // for the time being, as invariants will be broken otherwise.
-        // In fact the current Task has exclusive access to the system
-        // at this point, so this is just bookkeeping:
-       task->cap = tso->cap;
-        saved_task = tso->cap->running_task;
-        tso->cap->running_task = task;
-        maybePerformBlockedException(tso->cap, tso);
-        tso->cap->running_task = saved_task;
-    }
-
-    // Restore our original Capability:
-    task->cap = cap;
-}