Asynchronous exception support for SMP
author    Simon Marlow <simonmar@microsoft.com>
          Fri, 16 Jun 2006 10:33:42 +0000 (10:33 +0000)
committer Simon Marlow <simonmar@microsoft.com>
          Fri, 16 Jun 2006 10:33:42 +0000 (10:33 +0000)

This patch makes throwTo work with -threaded, and also refactors large
parts of the concurrency support in the RTS to clean things up.  We
have some new files:

  RaiseAsync.{c,h}      asynchronous exception support
  Threads.{c,h}         general threading-related utils

Some of the contents of these new files used to be in Schedule.c,
which is smaller and cleaner as a result of the split.

Asynchronous exception support in the presence of multiple running
Haskell threads is rather tricky.  In fact, to my annoyance there are
still one or two bugs to track down, but the majority of the tests run
now.

23 files changed:
includes/Constants.h
includes/SMP.h
includes/StgMiscClosures.h
includes/TSO.h
includes/mkDerivedConstants.c
rts/Capability.c
rts/Capability.h
rts/Exception.cmm
rts/Exception.h [deleted file]
rts/GC.c
rts/GCCompact.c
rts/HCIncludes.h [new file with mode: 0644]
rts/HeapStackCheck.cmm
rts/Makefile
rts/RaiseAsync.c [new file with mode: 0644]
rts/RaiseAsync.h [new file with mode: 0644]
rts/Schedule.c
rts/Schedule.h
rts/ThreadLabels.c
rts/ThreadLabels.h
rts/Threads.c [new file with mode: 0644]
rts/Threads.h [new file with mode: 0644]
rts/Trace.h

diff --git a/includes/Constants.h b/includes/Constants.h
index 4f3c35b..ef2a486 100644
 #define ThreadBlocked  4
 #define ThreadFinished 5
 
+/* 
+ * Flags for the tso->flags field.
+ *
+ * The TSO_DIRTY flag indicates that this TSO's stack should be
+ * scanned during garbage collection.  The link field of a TSO is
+ * always scanned, so we don't have to dirty a TSO just for linking
+ * it on a different list.
+ *
+ * TSO_DIRTY is set by 
+ *    - schedule(), just before running a thread,
+ *    - raiseAsync(), because it modifies a thread's stack
+ *    - resumeThread(), just before running the thread again
+ * and unset by the garbage collector (only).
+ */
+#define TSO_DIRTY   1
+
+/*
+ * TSO_LOCKED is set when a TSO is locked to a particular Capability.
+ */
+#define TSO_LOCKED  2
+
+/*
+ * TSO_BLOCKEX: the TSO is blocking exceptions
+ *
+ * TSO_INTERRUPTIBLE: the TSO can be interrupted if it blocks
+ * interruptibly (eg. with BlockedOnMVar).
+ */
+#define TSO_BLOCKEX       4
+#define TSO_INTERRUPTIBLE 8
+
 /* -----------------------------------------------------------------------------
    RET_DYN stack frames
    -------------------------------------------------------------------------- */
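
These flags combine into the two tests that matter in the new throwTo()
path.  A minimal sketch of those predicates, assuming only the
definitions above (the helper names are illustrative, not part of the
patch):

    /* Sketch only: the patch open-codes these tests. */
    #include "Rts.h"   /* StgTSO and the TSO_* flags */

    /* The target is inside Control.Exception.block. */
    static int tsoBlocksExceptions (StgTSO *tso)
    {
        return (tso->flags & TSO_BLOCKEX) != 0;
    }

    /* throwTo must queue on blocked_exceptions: the target blocks
     * exceptions and is not blocked in an interruptible state. */
    static int throwToMustBlock (StgTSO *tso)
    {
        return (tso->flags & TSO_BLOCKEX) != 0
            && (tso->flags & TSO_INTERRUPTIBLE) == 0;
    }
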
diff --git a/includes/SMP.h b/includes/SMP.h
index 5974c96..d985576 100644
@@ -155,6 +155,21 @@ xchg(StgPtr p, StgWord w)
     return old;
 }
 
+INLINE_HEADER StgInfoTable *
+lockClosure(StgClosure *p)
+{ return (StgInfoTable *)p->header.info; }
+
+INLINE_HEADER void
+unlockClosure(StgClosure *p STG_UNUSED, StgInfoTable *info STG_UNUSED)
+{ /* nothing */ }
+
 #endif /* !THREADED_RTS */
 
+// Handy specialised versions of lockClosure()/unlockClosure()
+INLINE_HEADER void lockTSO(StgTSO *tso)
+{ lockClosure((StgClosure *)tso); }
+
+INLINE_HEADER void unlockTSO(StgTSO *tso)
+{ unlockClosure((StgClosure*)tso, (StgInfoTable*)&stg_TSO_info); }
+
 #endif /* SMP_H */
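
lockTSO()/unlockTSO() are how the rest of the patch gets exclusive
access to a TSO's blocked_exceptions queue (see TSO.h and
rts/RaiseAsync.c below).  A minimal sketch of the intended discipline,
assuming the THREADED_RTS versions of lockClosure()/unlockClosure()
spin on the closure's info pointer; with the no-op stubs above it
degenerates harmlessly in the single-threaded RTS:

    /* Sketch only: cf. blockedThrowTo() in rts/RaiseAsync.c. */
    static void queueThrowTo (StgTSO *source, StgTSO *target)
    {
        lockTSO(target);    /* exclusive access to blocked_exceptions */
        source->link = target->blocked_exceptions;
        target->blocked_exceptions = source;
        unlockTSO(target);  /* writes stg_TSO_info back, releasing it */
    }
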
diff --git a/includes/StgMiscClosures.h b/includes/StgMiscClosures.h
index 4a6a7c4..fcc9736 100644
@@ -490,6 +490,8 @@ RTS_FUN(stg_block_async_void);
 RTS_ENTRY(stg_block_async_void_ret);
 #endif
 RTS_FUN(stg_block_stmwait);
+RTS_FUN(stg_block_throwto);
+RTS_RET_INFO(stg_block_throwto_info);
 
 /* Entry/exit points from StgStartup.cmm */
 
diff --git a/includes/TSO.h b/includes/TSO.h
index d096d40..0c3e4ee 100644
@@ -77,27 +77,6 @@ typedef StgTSOStatBuf StgTSOGranInfo;
  */
 typedef StgWord32 StgThreadID;
 
-/* 
- * Flags for the tso->flags field.
- *
- * The TSO_DIRTY flag indicates that this TSO's stack should be
- * scanned during garbage collection.  The link field of a TSO is
- * always scanned, so we don't have to dirty a TSO just for linking
- * it on a different list.
- *
- * TSO_DIRTY is set by 
- *    - schedule(), just before running a thread,
- *    - raiseAsync(), because it modifies a thread's stack
- *    - resumeThread(), just before running the thread again
- * and unset by the garbage collector (only).
- */
-#define TSO_DIRTY   1
-
-/*
- * TSO_LOCKED is set when a TSO is locked to a particular Capability.
- */
-#define TSO_LOCKED  2
-
 #define tsoDirty(tso)  ((tso)->flags & TSO_DIRTY)
 #define tsoLocked(tso) ((tso)->flags & TSO_LOCKED)
 
@@ -127,6 +106,7 @@ typedef union {
   StgWord target;
 } StgTSOBlockInfo;
 
+
 /*
  * TSOs live on the heap, and therefore look just like heap objects.
  * Large TSOs will live in their own "block group" allocated by the
@@ -151,13 +131,19 @@ typedef struct StgTSO_ {
     StgWord16               why_blocked;    /* Values defined in Constants.h */
     StgWord32               flags;
     StgTSOBlockInfo         block_info;
-    struct StgTSO_*         blocked_exceptions;
     StgThreadID             id;
     int                     saved_errno;
     struct Task_*           bound;
     struct Capability_*     cap;
     struct StgTRecHeader_ * trec;       /* STM transaction record */
 
+    /* 
+       A list of threads blocked on this TSO waiting to throw
+       exceptions.  In order to access this field, the TSO must be
+       locked using lockClosure/unlockClosure (see SMP.h).
+    */
+    struct StgTSO_ *        blocked_exceptions;
+
 #ifdef TICKY_TICKY
     /* TICKY-specific stuff would go here. */
 #endif
diff --git a/includes/mkDerivedConstants.c b/includes/mkDerivedConstants.c
index 27d4fa9..62d9490 100644
@@ -18,6 +18,7 @@
  * doesn't affect the offsets of anything else.
  */
 #define PROFILING
+#define THREADED_RTS
 
 #include "Rts.h"
 #include "RtsFlags.h"
@@ -227,6 +228,7 @@ main(int argc, char *argv[])
     def_offset("stgGCFun", FUN_OFFSET(stgGCFun));
 
     field_offset(Capability, r);
+    field_offset(Capability, lock);
 
     struct_field(bdescr, start);
     struct_field(bdescr, free);
@@ -276,8 +278,10 @@ main(int argc, char *argv[])
     closure_field(StgTSO, block_info);
     closure_field(StgTSO, blocked_exceptions);
     closure_field(StgTSO, id);
+    closure_field(StgTSO, cap);
     closure_field(StgTSO, saved_errno);
     closure_field(StgTSO, trec);
+    closure_field(StgTSO, flags);
     closure_field_("StgTSO_CCCS", StgTSO, prof.CCCS);
     tso_field(StgTSO, sp);
     tso_field_offset(StgTSO, stack);
diff --git a/rts/Capability.c b/rts/Capability.c
index 0415092..2384262 100644
@@ -518,8 +518,10 @@ wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
 {
     ASSERT(tso->cap == cap);
     ASSERT(tso->bound ? tso->bound->cap == cap : 1);
+    ASSERT_LOCK_HELD(&cap->lock);
+
+    tso->cap = cap;
 
-    ACQUIRE_LOCK(&cap->lock);
     if (cap->running_task == NULL) {
        // nobody is running this Capability, we can add our thread
        // directly onto the run queue and start up a Task to run it.
@@ -535,6 +537,33 @@ wakeupThreadOnCapability (Capability *cap, StgTSO *tso)
        // freed without first checking the wakeup queue (see
        // releaseCapability_).
     }
+}
+
+void
+wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso)
+{
+    ACQUIRE_LOCK(&cap->lock);
+    migrateThreadToCapability (cap, tso);
+    RELEASE_LOCK(&cap->lock);
+}
+
+void
+migrateThreadToCapability (Capability *cap, StgTSO *tso)
+{
+    // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
+    if (tso->bound) {
+       ASSERT(tso->bound->cap == tso->cap);
+       tso->bound->cap = cap;
+    }
+    tso->cap = cap;
+    wakeupThreadOnCapability(cap,tso);
+}
+
+void
+migrateThreadToCapability_lock (Capability *cap, StgTSO *tso)
+{
+    ACQUIRE_LOCK(&cap->lock);
+    migrateThreadToCapability (cap, tso);
     RELEASE_LOCK(&cap->lock);
 }
 
diff --git a/rts/Capability.h b/rts/Capability.h
index a2551d0..641f37d 100644
@@ -199,6 +199,10 @@ void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
 // from the one held by the current Task).
 //
 void wakeupThreadOnCapability (Capability *cap, StgTSO *tso);
+void wakeupThreadOnCapability_lock (Capability *cap, StgTSO *tso);
+
+void migrateThreadToCapability (Capability *cap, StgTSO *tso);
+void migrateThreadToCapability_lock (Capability *cap, StgTSO *tso);
 
 // Wakes up a worker thread on just one Capability, used when we
 // need to service some global event.
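
The plain functions assume the caller already holds the target
Capability's lock (now asserted in wakeupThreadOnCapability); the
_lock variants take and release it themselves.  A hedged sketch of a
call site, with illustrative names:

    /* Sketch only: hand tso over to some other Capability. */
    static void handOver (Capability *this_cap, Capability *other_cap,
                          StgTSO *tso)
    {
        if (other_cap == this_cap && tso->cap == this_cap) {
            appendToRunQueue(this_cap, tso);   /* nothing to migrate */
        } else {
            /* takes other_cap->lock, re-binds tso->cap, then wakes it */
            migrateThreadToCapability_lock(other_cap, tso);
        }
    }
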
diff --git a/rts/Exception.cmm b/rts/Exception.cmm
index b5c2962..f4327b9 100644
@@ -11,6 +11,7 @@
  * ---------------------------------------------------------------------------*/
 
 #include "Cmm.h"
+#include "RaiseAsync.h"
 
 /* -----------------------------------------------------------------------------
    Exception Primitives
@@ -54,13 +55,13 @@ INFO_TABLE_RET( stg_unblockAsyncExceptionszh_ret,
 {
     // Not true: see comments above
     // ASSERT(StgTSO_blocked_exceptions(CurrentTSO) != NULL);
-#if defined(GRAN) || defined(PAR)
-    foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr", 
-                                  NULL "ptr"); 
-#else
-    foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr");
-#endif
-    StgTSO_blocked_exceptions(CurrentTSO) = NULL;
+
+    foreign "C" awakenBlockedExceptionQueue(MyCapability() "ptr", 
+                                           CurrentTSO "ptr") [R1];
+
+    StgTSO_flags(CurrentTSO) = StgTSO_flags(CurrentTSO) & 
+       ~(TSO_BLOCKEX::I32|TSO_INTERRUPTIBLE::I32);
+
 #ifdef REG_R1
     Sp_adj(1);
     jump %ENTRY_CODE(Sp(0));
@@ -76,7 +77,10 @@ INFO_TABLE_RET( stg_blockAsyncExceptionszh_ret,
 {
     // Not true: see comments above
     // ASSERT(StgTSO_blocked_exceptions(CurrentTSO) == NULL);
-    StgTSO_blocked_exceptions(CurrentTSO) = END_TSO_QUEUE;
+
+    StgTSO_flags(CurrentTSO) = 
+       StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
+
 #ifdef REG_R1
     Sp_adj(1);
     jump %ENTRY_CODE(Sp(0));
@@ -92,15 +96,18 @@ blockAsyncExceptionszh_fast
     /* Args: R1 :: IO a */
     STK_CHK_GEN( WDS(2)/* worst case */, R1_PTR, blockAsyncExceptionszh_fast);
 
-    if (StgTSO_blocked_exceptions(CurrentTSO) == NULL) {
-      StgTSO_blocked_exceptions(CurrentTSO) = END_TSO_QUEUE;
-      /* avoid growing the stack unnecessarily */
-      if (Sp(0) == stg_blockAsyncExceptionszh_ret_info) {
-       Sp_adj(1);
-      } else {
-       Sp_adj(-1);
-       Sp(0) = stg_unblockAsyncExceptionszh_ret_info;
-      }
+    if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) == 0) {
+       
+       StgTSO_flags(CurrentTSO) = 
+          StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
+
+       /* avoid growing the stack unnecessarily */
+       if (Sp(0) == stg_blockAsyncExceptionszh_ret_info) {
+           Sp_adj(1);
+       } else {
+           Sp_adj(-1);
+           Sp(0) = stg_unblockAsyncExceptionszh_ret_info;
+       }
     }
     TICK_UNKNOWN_CALL();
     TICK_SLOW_CALL_v();
@@ -112,22 +119,17 @@ unblockAsyncExceptionszh_fast
     /* Args: R1 :: IO a */
     STK_CHK_GEN( WDS(2), R1_PTR, unblockAsyncExceptionszh_fast);
 
-    if (StgTSO_blocked_exceptions(CurrentTSO) != NULL) {
-#if defined(GRAN) || defined(PAR)
-      foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr", 
-                                    StgTSO_block_info(CurrentTSO) "ptr");
-#else
-      foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr");
-#endif
-      StgTSO_blocked_exceptions(CurrentTSO) = NULL;
+    if (TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) {
+       foreign "C" awakenBlockedExceptionQueue(MyCapability() "ptr", 
+                                               CurrentTSO "ptr") [R1];
 
-      /* avoid growing the stack unnecessarily */
-      if (Sp(0) == stg_unblockAsyncExceptionszh_ret_info) {
-       Sp_adj(1);
-      } else {
-       Sp_adj(-1);
-       Sp(0) = stg_blockAsyncExceptionszh_ret_info;
-      }
+       /* avoid growing the stack unnecessarily */
+       if (Sp(0) == stg_unblockAsyncExceptionszh_ret_info) {
+           Sp_adj(1);
+       } else {
+           Sp_adj(-1);
+           Sp(0) = stg_blockAsyncExceptionszh_ret_info;
+       }
     }
     TICK_UNKNOWN_CALL();
     TICK_SLOW_CALL_v();
@@ -135,74 +137,62 @@ unblockAsyncExceptionszh_fast
 }
 
 
-#define interruptible(what_next)               \
-        (   what_next == BlockedOnMVar         \
-         || what_next == BlockedOnException    \
-         || what_next == BlockedOnRead         \
-         || what_next == BlockedOnWrite                \
-         || what_next == BlockedOnDelay                \
-         || what_next == BlockedOnDoProc)
-
 killThreadzh_fast
 {
-  /* args: R1 = TSO to kill, R2 = Exception */
-
-  W_ why_blocked;
-
-  /* This thread may have been relocated.
-   * (see Schedule.c:threadStackOverflow)
-   */
- while:
-  if (StgTSO_what_next(R1) == ThreadRelocated::I16) {
-    R1 = StgTSO_link(R1);
-    goto while;
-  }
-
-  /* Determine whether this thread is interruptible or not */
-
-  /* If the target thread is currently blocking async exceptions,
-   * we'll have to block until it's ready to accept them.  The
-   * exception is interruptible threads - ie. those that are blocked
-   * on some resource.
-   */
-  why_blocked = TO_W_(StgTSO_why_blocked(R1));
-  if (StgTSO_blocked_exceptions(R1) != NULL && !interruptible(why_blocked))
-  {
-      StgTSO_link(CurrentTSO) = StgTSO_blocked_exceptions(R1);
-      StgTSO_blocked_exceptions(R1) = CurrentTSO;
-      
-      StgTSO_why_blocked(CurrentTSO) = BlockedOnException::I16;
-      StgTSO_block_info(CurrentTSO) = R1;
-      
-      BLOCK( R1_PTR & R2_PTR, killThreadzh_fast );
-  }
-
-  /* Killed threads turn into zombies, which might be garbage
-   * collected at a later date.  That's why we don't have to
-   * explicitly remove them from any queues they might be on.
-   */
-
-  /* We might have killed ourselves.  In which case, better be *very*
-   * careful.  If the exception killed us, then return to the scheduler.
-   * If the exception went to a catch frame, we'll just continue from
-   * the handler.
-   */
-  if (R1 == CurrentTSO) {
+    /* args: R1 = TSO to kill, R2 = Exception */
+
+    W_ why_blocked;
+    W_ target;
+    W_ exception;
+    
+    target = R1;
+    exception = R2;
+    
+    STK_CHK_GEN( WDS(3), R1_PTR | R2_PTR, killThreadzh_fast);
+
+    /* 
+     * We might have killed ourselves.  In which case, better be *very*
+     * careful.  If the exception killed us, then return to the scheduler.
+     * If the exception went to a catch frame, we'll just continue from
+     * the handler.
+     */
+    if (target == CurrentTSO) {
        SAVE_THREAD_STATE();
-       foreign "C" raiseAsync(MyCapability() "ptr", R1 "ptr", R2 "ptr");
+       /* ToDo: what if the current thread is blocking exceptions? */
+       foreign "C" throwToSingleThreaded(MyCapability() "ptr", 
+                                         target "ptr", exception "ptr")[R1,R2];
        if (StgTSO_what_next(CurrentTSO) == ThreadKilled::I16) {
-               R1 = ThreadFinished;
-               jump StgReturn;
+           R1 = ThreadFinished;
+           jump StgReturn;
        } else {
-               LOAD_THREAD_STATE();
-               ASSERT(StgTSO_what_next(CurrentTSO) == ThreadRunGHC::I16);
-               jump %ENTRY_CODE(Sp(0));
+           LOAD_THREAD_STATE();
+           ASSERT(StgTSO_what_next(CurrentTSO) == ThreadRunGHC::I16);
+           jump %ENTRY_CODE(Sp(0));
+       }
+    } else {
+       W_ out;
+       W_ retcode;
+       out = BaseReg + OFFSET_StgRegTable_rmp_tmp_w;
+       
+       retcode = foreign "C" throwTo(MyCapability() "ptr",
+                                     CurrentTSO "ptr",
+                                     target "ptr",
+                                     exception "ptr",
+                                     out "ptr") [R1,R2];
+       
+       switch [THROWTO_SUCCESS .. THROWTO_BLOCKED] (retcode) {
+
+       case THROWTO_SUCCESS: {
+           jump %ENTRY_CODE(Sp(0));
        }
-  } else {
-       foreign "C" raiseAsync(MyCapability() "ptr", R1 "ptr", R2 "ptr");
-  }
 
-  jump %ENTRY_CODE(Sp(0));
+       case THROWTO_BLOCKED: {
+           R3 = W_[out];
+           // we must block, and call throwToReleaseTarget() before returning
+           jump stg_block_throwto;
+       }
+       }
+    }
 }
 
 /* -----------------------------------------------------------------------------
@@ -300,15 +290,14 @@ catchzh_fast
     SET_HDR(Sp,stg_catch_frame_info,W_[CCCS]);
     
     StgCatchFrame_handler(Sp) = R2;
-    StgCatchFrame_exceptions_blocked(Sp) = 
-       (StgTSO_blocked_exceptions(CurrentTSO) != NULL);
+    StgCatchFrame_exceptions_blocked(Sp) = TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX;
     TICK_CATCHF_PUSHED();
 
     /* Apply R1 to the realworld token */
     TICK_UNKNOWN_CALL();
     TICK_SLOW_CALL_v();
     jump stg_ap_v_fast;
-}      
+}
 
 /* -----------------------------------------------------------------------------
  * The raise infotable
@@ -423,9 +412,8 @@ retry_pop_stack:
 
     /* Ensure that async exceptions are blocked when running the handler.
     */
-    if (StgTSO_blocked_exceptions(CurrentTSO) == NULL) {
-      StgTSO_blocked_exceptions(CurrentTSO) = END_TSO_QUEUE;
-    }
+    StgTSO_flags(CurrentTSO) = 
+       StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
 
     /* Call the handler, passing the exception value and a realworld
      * token as arguments.
diff --git a/rts/Exception.h b/rts/Exception.h
deleted file mode 100644
index f7832f4..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-/* -----------------------------------------------------------------------------
- *
- * (c) The GHC Team, 1998-2005
- *
- * Exception support
- *
- * ---------------------------------------------------------------------------*/
-
-#ifndef EXCEPTION_H
-#define EXCEPTION_H
-
-extern const StgRetInfoTable stg_blockAsyncExceptionszh_ret_info;
-extern const StgRetInfoTable stg_unblockAsyncExceptionszh_ret_info;
-
-/* Determine whether a thread is interruptible (ie. blocked
- * indefinitely).  Interruptible threads can be sent an exception with
- * killThread# even if they have async exceptions blocked.
- */
-STATIC_INLINE int
-interruptible(StgTSO *t)
-{
-  switch (t->why_blocked) {
-  case BlockedOnMVar:
-  case BlockedOnException:
-  case BlockedOnRead:
-  case BlockedOnWrite:
-#if defined(mingw32_HOST_OS)
-  case BlockedOnDoProc:
-#endif
-  case BlockedOnDelay:
-    return 1;
-  // NB. Threaded blocked on foreign calls (BlockedOnCCall) are
-  // *not* interruptible.  We can't send these threads an exception.
-  default:
-    return 0;
-  }
-}
-
-#endif /* EXCEPTION_H */
-
diff --git a/rts/GC.c b/rts/GC.c
index 727027d..10f6a36 100644
--- a/rts/GC.c
+++ b/rts/GC.c
@@ -44,6 +44,7 @@
 #endif
 #include "Trace.h"
 #include "RetainerProfile.h"
+#include "RaiseAsync.h"
 
 #include <string.h>
 
@@ -2631,10 +2632,8 @@ scavengeTSO (StgTSO *tso)
        ) {
        tso->block_info.closure = evacuate(tso->block_info.closure);
     }
-    if ( tso->blocked_exceptions != NULL ) {
-       tso->blocked_exceptions = 
-           (StgTSO *)evacuate((StgClosure *)tso->blocked_exceptions);
-    }
+    tso->blocked_exceptions = 
+       (StgTSO *)evacuate((StgClosure *)tso->blocked_exceptions);
     
     // We don't always chase the link field: TSOs on the blackhole
     // queue are not automatically alive, so the link field is a
@@ -4620,6 +4619,14 @@ threadPaused(Capability *cap, StgTSO *tso)
     nat weight_pending   = 0;
     rtsBool prev_was_update_frame;
     
+    // Check to see whether we have threads waiting to raise
+    // exceptions, and we're not blocking exceptions, or are blocked
+    // interruptibly.  This is important; if a thread is running with
+    // TSO_BLOCKEX and becomes blocked interruptibly, this is the only
+    // place we ensure that the blocked_exceptions get a chance.
+    maybePerformBlockedException (cap, tso);
+    if (tso->what_next == ThreadKilled) { return; }
+
     stack_end = &tso->stack[tso->stack_size];
     
     frame = (StgClosure *)tso->sp;
diff --git a/rts/GCCompact.c b/rts/GCCompact.c
index 45222c3..7f91501 100644
@@ -403,9 +403,7 @@ thread_TSO (StgTSO *tso)
        ) {
        thread_(&tso->block_info.closure);
     }
-    if ( tso->blocked_exceptions != NULL ) {
-       thread_(&tso->blocked_exceptions);
-    }
+    thread_(&tso->blocked_exceptions);
     
     thread_(&tso->trec);
 
diff --git a/rts/HCIncludes.h b/rts/HCIncludes.h
new file mode 100644
index 0000000..06cc61a
--- /dev/null
@@ -0,0 +1,22 @@
+/* includes for compiling .cmm files via-C */
+#include "Rts.h"
+#include "RtsFlags.h"
+#include "RtsUtils.h"
+#include "StgRun.h"
+#include "Schedule.h"
+#include "Printer.h"
+#include "Sanity.h"
+#include "STM.h"
+#include "Storage.h"
+#include "SchedAPI.h"
+#include "Timer.h"
+#include "ProfHeap.h"
+#include "LdvProfile.h"
+#include "Profiling.h"
+#include "OSThreads.h"
+#include "Apply.h"
+#include "SMP.h"
+#include "RaiseAsync.h"
+#include "ThreadLabels.h"
+#include "Threads.h"
+#include "Prelude.h"
diff --git a/rts/HeapStackCheck.cmm b/rts/HeapStackCheck.cmm
index 4e5dd24..aae28cb 100644
@@ -902,6 +902,31 @@ stg_block_blackhole
     BLOCK_BUT_FIRST(stg_block_blackhole_finally);
 }
 
+INFO_TABLE_RET( stg_block_throwto, 2/*framesize*/, 0/*bitmap*/, RET_SMALL )
+{
+    R2 = Sp(2);
+    R1 = Sp(1);
+    Sp_adj(3);
+    jump killThreadzh_fast;
+}
+
+stg_block_throwto_finally
+{
+#ifdef THREADED_RTS
+    foreign "C" throwToReleaseTarget (R3 "ptr");
+#endif
+    jump StgReturn;
+}
+
+stg_block_throwto
+{
+    Sp_adj(-3);
+    Sp(2) = R2;
+    Sp(1) = R1;
+    Sp(0) = stg_block_throwto_info;
+    BLOCK_BUT_FIRST(stg_block_throwto_finally);
+}
+
 #ifdef mingw32_HOST_OS
 INFO_TABLE_RET( stg_block_async, 0/*framesize*/, 0/*bitmap*/, RET_SMALL )
 {
diff --git a/rts/Makefile b/rts/Makefile
index 67201cd..b1111a0 100644
@@ -301,26 +301,7 @@ endif
 # Compiling the cmm files
 
 # ToDo: should we really include Rts.h here?  Required for GNU_ATTRIBUTE().
-SRC_HC_OPTS += \
-  -I. \
-  -\#include Prelude.h \
-  -\#include Rts.h \
-  -\#include RtsFlags.h \
-  -\#include RtsUtils.h \
-  -\#include StgRun.h \
-  -\#include Schedule.h \
-  -\#include Printer.h \
-  -\#include Sanity.h \
-  -\#include STM.h \
-  -\#include Storage.h \
-  -\#include SchedAPI.h \
-  -\#include Timer.h \
-  -\#include ProfHeap.h \
-  -\#include LdvProfile.h \
-  -\#include Profiling.h \
-  -\#include OSThreads.h \
-  -\#include Apply.h \
-  -\#include SMP.h
+SRC_HC_OPTS += -I. -\#include HCIncludes.h
 
 ifeq "$(Windows)" "YES"
 PrimOps_HC_OPTS += -\#include '<windows.h>' -\#include win32/AsyncIO.h
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
new file mode 100644
index 0000000..9041c06
--- /dev/null
@@ -0,0 +1,1015 @@
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 1998-2006
+ *
+ * Asynchronous exceptions
+ *
+ * --------------------------------------------------------------------------*/
+
+#include "PosixSource.h"
+#include "Rts.h"
+#include "Threads.h"
+#include "Trace.h"
+#include "RaiseAsync.h"
+#include "SMP.h"
+#include "Schedule.h"
+#include "Storage.h"
+#include "Updates.h"
+#include "STM.h"
+#include "Sanity.h"
+
+static void raiseAsync (Capability *cap,
+                       StgTSO *tso,
+                       StgClosure *exception, 
+                       rtsBool stop_at_atomically,
+                       StgPtr stop_here);
+
+static void removeFromQueues(Capability *cap, StgTSO *tso);
+
+static void blockedThrowTo (StgTSO *source, StgTSO *target);
+
+static void performBlockedException (Capability *cap, 
+                                    StgTSO *source, StgTSO *target);
+
+/* -----------------------------------------------------------------------------
+   throwToSingleThreaded
+
+   This version of throwTo is safe to use if and only if one of the
+   following holds:
+   
+     - !THREADED_RTS
+
+     - all the other threads in the system are stopped (eg. during GC).
+
+     - we surely own the target TSO (eg. we just took it from the
+       run queue of the current capability, or we are running it).
+
+   It doesn't cater for blocking the source thread until the exception
+   has been raised.
+   -------------------------------------------------------------------------- */
+
+void
+throwToSingleThreaded(Capability *cap, StgTSO *tso, StgClosure *exception)
+{
+    throwToSingleThreaded_(cap, tso, exception, rtsFalse, NULL);
+}
+
+void
+throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, 
+                      rtsBool stop_at_atomically, StgPtr stop_here)
+{
+    // Thread already dead?
+    if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
+       return;
+    }
+
+    // Remove it from any blocking queues
+    removeFromQueues(cap,tso);
+
+    raiseAsync(cap, tso, exception, stop_at_atomically, stop_here);
+}
+
+void
+suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
+{
+    // Thread already dead?
+    if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
+       return;
+    }
+
+    // Remove it from any blocking queues
+    removeFromQueues(cap,tso);
+
+    raiseAsync(cap, tso, NULL, rtsFalse, stop_here);
+}
+
+/* -----------------------------------------------------------------------------
+   throwTo
+
+   This function may be used to throw an exception from one thread to
+   another, during the course of normal execution.  This is a tricky
+   task: the target thread might be running on another CPU, or it
+   may be blocked and could be woken up at any point by another CPU.
+   We have some delicate synchronisation to do.
+
+   There is a completely safe fallback scheme: it is always possible
+   to just block the source TSO on the target TSO's blocked_exceptions
+   queue.  This queue is locked using lockTSO()/unlockTSO().  It is
+   checked at regular intervals: before and after running a thread
+   (schedule() and threadPaused() respectively), and just before GC
+   (scheduleDoGC()).  Activating a thread on this queue should be done
+   using maybePerformBlockedException(): this is done in the context
+   of the target thread, so the exception can be raised eagerly.
+
+   This fallback scheme works even if the target thread is complete or
+   killed: scheduleDoGC() will discover the blocked thread before the
+   target is GC'd.
+
+   Blocking the source thread on the target thread's blocked_exception
+   queue is also employed when the target thread is currently blocking
+   exceptions (ie. inside Control.Exception.block).
+
+   We could use the safe fallback scheme exclusively, but that
+   wouldn't be ideal: most calls to throwTo would block immediately,
+   possibly until the next GC, which might require the deadlock
+   detection mechanism to kick in.  So we try to provide promptness
+   wherever possible.
+
+   We can promptly deliver the exception if the target thread is:
+
+     - runnable, on the same Capability as the source thread (because
+       we own the run queue and therefore the target thread).
+   
+     - blocked, and we can obtain exclusive access to it.  Obtaining
+       exclusive access to the thread depends on how it is blocked.
+
+   We must also be careful to not trip over threadStackOverflow(),
+   which might be moving the TSO to enlarge its stack.
+   lockTSO()/unlockTSO() are used here too.
+
+   Returns: 
+
+   THROWTO_SUCCESS    exception was raised, ok to continue
+
+   THROWTO_BLOCKED    exception was not raised; block the source
+                      thread then call throwToReleaseTarget() when
+                     the source thread is properly tidied away.
+
+   -------------------------------------------------------------------------- */
+
+nat
+throwTo (Capability *cap,      // the Capability we hold 
+        StgTSO *source,        // the TSO sending the exception
+        StgTSO *target,        // the TSO receiving the exception
+        StgClosure *exception, // the exception closure
+        /*[out]*/ void **out USED_IF_THREADS)
+{
+    StgWord status;
+
+    // follow ThreadRelocated links in the target first
+    while (target->what_next == ThreadRelocated) {
+       target = target->link;
+       // No, it might be a WHITEHOLE:
+       // ASSERT(get_itbl(target)->type == TSO);
+    }
+
+    debugTrace(DEBUG_sched, "throwTo: from thread %d to thread %d",
+              source->id, target->id);
+
+#ifdef DEBUG
+    if (traceClass(DEBUG_sched)) {
+       debugTraceBegin("throwTo: target");
+       printThreadStatus(target);
+       debugTraceEnd();
+    }
+#endif
+
+    goto check_target;
+retry:
+    debugTrace(DEBUG_sched, "throwTo: retrying...");
+
+check_target:
+    // Thread already dead?
+    if (target->what_next == ThreadComplete 
+       || target->what_next == ThreadKilled) {
+       return THROWTO_SUCCESS;
+    }
+
+    status = target->why_blocked;
+    
+    switch (status) {
+    case NotBlocked:
+       /* if status==NotBlocked, and target->cap == cap, then
+          we own this TSO and can raise the exception.
+          
+          How do we establish this condition?  Very carefully.
+
+          Let 
+              P = (status == NotBlocked)
+              Q = (tso->cap == cap)
+              
+          Now, if P & Q are true, then the TSO is locked and owned by
+          this capability.  No other OS thread can steal it.
+
+          If P==0 and Q==1: the TSO is blocked, but attached to this
+          capability, and it can be stolen by another capability.
+          
+          If P==1 and Q==0: the TSO is runnable on another
+          capability.  At any time, the TSO may change from runnable
+          to blocked and vice versa, while it remains owned by
+          another capability.
+
+          Suppose we test like this:
+
+             p = P
+             q = Q
+             if (p && q) ...
+
+           this is defeated by another capability stealing a blocked
+           TSO from us to wake it up (Schedule.c:unblockOne()).  The
+           other thread is doing
+
+             Q = 0
+             P = 1
+
+           assuming arbitrary reordering, we could see this
+           interleaving:
+
+             start: P==0 && Q==1 
+             P = 1
+             p = P
+             q = Q
+             Q = 0
+             if (p && q) ...
+              
+           so we need a memory barrier:
+
+             p = P
+             mb()
+             q = Q
+             if (p && q) ...
+
+           this avoids the problematic case.  There are other cases
+           to consider, but this is the tricky one.
+
+           Note that we must be sure that unblockOne() does the
+           writes in the correct order: Q before P.  The memory
+           barrier ensures that if we have seen the write to P, we
+           have also seen the write to Q.
+       */
+    {
+       Capability *target_cap;
+
+       wb();
+       target_cap = target->cap;
+       if (target_cap == cap && (target->flags & TSO_BLOCKEX) == 0) {
+           // It's on our run queue and not blocking exceptions
+           raiseAsync(cap, target, exception, rtsFalse, NULL);
+           return THROWTO_SUCCESS;
+       } else {
+           // Otherwise, just block on the blocked_exceptions queue
+           // of the target thread.  The queue will get looked at
+           // soon enough: it is checked before and after running a
+           // thread, and during GC.
+           lockTSO(target);
+
+           // Avoid race with threadStackOverflow, which may have
+           // just moved this TSO.
+           if (target->what_next == ThreadRelocated) {
+               unlockTSO(target);
+               target = target->link;
+               goto retry;
+           }
+           blockedThrowTo(source,target);
+           *out = target;
+           return THROWTO_BLOCKED;
+       }
+    }
+
+    case BlockedOnMVar:
+    {
+       /*
+         To establish ownership of this TSO, we need to acquire a
+         lock on the MVar that it is blocked on.
+       */
+       StgMVar *mvar;
+       StgInfoTable *info USED_IF_THREADS;
+       
+       mvar = (StgMVar *)target->block_info.closure;
+
+       // ASSUMPTION: tso->block_info must always point to a
+       // closure.  In the threaded RTS it does.
+       if (get_itbl(mvar)->type != MVAR) goto retry;
+
+       info = lockClosure((StgClosure *)mvar);
+
+       if (target->what_next == ThreadRelocated) {
+           target = target->link;
+           unlockClosure((StgClosure *)mvar,info);
+           goto retry;
+       }
+       // we have the MVar, let's check whether the thread
+       // is still blocked on the same MVar.
+       if (target->why_blocked != BlockedOnMVar
+           || (StgMVar *)target->block_info.closure != mvar) {
+           unlockClosure((StgClosure *)mvar, info);
+           goto retry;
+       }
+
+       if ((target->flags & TSO_BLOCKEX) &&
+           ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
+           lockClosure((StgClosure *)target);
+           blockedThrowTo(source,target);
+           unlockClosure((StgClosure *)mvar, info);
+           *out = target;
+           return THROWTO_BLOCKED; // caller releases TSO
+       } else {
+           removeThreadFromMVarQueue(mvar, target);
+           raiseAsync(cap, target, exception, rtsFalse, NULL);
+           unblockOne(cap, target);
+           unlockClosure((StgClosure *)mvar, info);
+           return THROWTO_SUCCESS;
+       }
+    }
+
+    case BlockedOnBlackHole:
+    {
+       ACQUIRE_LOCK(&sched_mutex);
+       // double checking the status after the memory barrier:
+       if (target->why_blocked != BlockedOnBlackHole) {
+           RELEASE_LOCK(&sched_mutex);
+           goto retry;
+       }
+
+       if (target->flags & TSO_BLOCKEX) {
+           lockTSO(target);
+           blockedThrowTo(source,target);
+           RELEASE_LOCK(&sched_mutex);
+           *out = target;
+           return THROWTO_BLOCKED; // caller releases TSO
+       } else {
+           removeThreadFromQueue(&blackhole_queue, target);
+           raiseAsync(cap, target, exception, rtsFalse, NULL);
+           unblockOne(cap, target);
+           RELEASE_LOCK(&sched_mutex);
+           return THROWTO_SUCCESS;
+       }
+    }
+
+    case BlockedOnException:
+    {
+       StgTSO *target2;
+       StgInfoTable *info;
+
+       /*
+         To obtain exclusive access to a BlockedOnException thread,
+         we must call lockClosure() on the TSO on which it is blocked.
+         Since the TSO might change underneath our feet, after we
+         call lockClosure() we must check that 
+          
+            (a) the closure we locked is actually a TSO,
+            (b) the original thread is still BlockedOnException,
+            (c) the original thread is still blocked on the TSO we locked,
+            and (d) the target thread has not been relocated.
+
+         We synchronise with threadStackOverflow() (which relocates
+         threads) using lockClosure()/unlockClosure().
+       */
+       target2 = target->block_info.tso;
+
+       info = lockClosure((StgClosure *)target2);
+       if (info != &stg_TSO_info) {
+           unlockClosure((StgClosure *)target2, info);
+           goto retry;
+       }
+       if (target->what_next == ThreadRelocated) {
+           target = target->link;
+           unlockTSO(target2);
+           goto retry;
+       }
+       if (target2->what_next == ThreadRelocated) {
+           target->block_info.tso = target2->link;
+           unlockTSO(target2);
+           goto retry;
+       }
+       if (target->why_blocked != BlockedOnException
+           || target->block_info.tso != target2) {
+           unlockTSO(target2);
+           goto retry;
+       }
+       
+       /* 
+          Now we have exclusive rights to the target TSO...
+
+          If it is blocking exceptions, add the source TSO to its
+          blocked_exceptions queue.  Otherwise, raise the exception.
+       */
+       if ((target->flags & TSO_BLOCKEX) &&
+           ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
+           lockTSO(target);
+           blockedThrowTo(source,target);
+           unlockTSO(target2);
+           *out = target;
+           return THROWTO_BLOCKED;
+       } else {
+           removeThreadFromQueue(&target2->blocked_exceptions, target);
+           raiseAsync(cap, target, exception, rtsFalse, NULL);
+           unblockOne(cap, target);
+           unlockTSO(target2);
+           return THROWTO_SUCCESS;
+       }
+    }  
+
+    case BlockedOnSTM:
+       barf("ToDo");
+
+    case BlockedOnCCall:
+    case BlockedOnCCall_NoUnblockExc:
+       // I don't think it's possible to acquire ownership of a
+       // BlockedOnCCall thread.  We just assume that the target
+       // thread is blocking exceptions, and block on its
+       // blocked_exception queue.
+       lockTSO(target);
+       blockedThrowTo(source,target);
+       *out = target;
+       return THROWTO_BLOCKED;
+
+#ifndef THREADED_RTS
+    case BlockedOnRead:
+    case BlockedOnWrite:
+    case BlockedOnDelay:
+       if ((target->flags & TSO_BLOCKEX) &&
+           ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
+           blockedThrowTo(source,target);
+           return THROWTO_BLOCKED;
+       } else {
+           removeFromQueues(cap,target);
+           raiseAsync(cap, target, exception, rtsFalse, NULL);
+           return THROWTO_SUCCESS;
+       }
+#endif
+
+    default:
+       barf("throwTo: unrecognised why_blocked value");
+    }
+    barf("throwTo");
+}
+
+// Block a TSO on another TSO's blocked_exceptions queue.
+// Precondition: we hold an exclusive lock on the target TSO (this is
+// complex to achieve as there's no single lock on a TSO; see
+// throwTo()).
+static void
+blockedThrowTo (StgTSO *source, StgTSO *target)
+{
+    debugTrace(DEBUG_sched, "throwTo: blocking on thread %d", target->id);
+    source->link = target->blocked_exceptions;
+    target->blocked_exceptions = source;
+    dirtyTSO(target); // we modified the blocked_exceptions queue
+    
+    source->block_info.tso = target;
+    wb(); // throwTo_exception *must* be visible if BlockedOnException is.
+    source->why_blocked = BlockedOnException;
+}
+
+
+#ifdef THREADED_RTS
+void
+throwToReleaseTarget (void *tso)
+{
+    unlockTSO((StgTSO *)tso);
+}
+#endif
+
+/* -----------------------------------------------------------------------------
+   Waking up threads blocked in throwTo
+
+   There are two ways to do this: maybePerformBlockedException() will
+   perform the throwTo() for the thread at the head of the queue
+   immediately, and leave the other threads on the queue.
+   maybePerformBlockedException() also checks the TSO_BLOCKEX flag
+   before raising an exception.
+
+   awakenBlockedExceptionQueue() will wake up all the threads in the
+   queue, but not perform any throwTo() immediately.  This might be
+   more appropriate when the target thread is the one actually running
+   (see Exception.cmm).
+   -------------------------------------------------------------------------- */
+
+void
+maybePerformBlockedException (Capability *cap, StgTSO *tso)
+{
+    StgTSO *source;
+    
+    if (tso->blocked_exceptions != END_TSO_QUEUE
+       && ((tso->flags & TSO_BLOCKEX) == 0
+           || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
+
+       // Lock the TSO, this gives us exclusive access to the queue
+       lockTSO(tso);
+
+       // Check the queue again; it might have changed before we
+       // locked it.
+       if (tso->blocked_exceptions == END_TSO_QUEUE) {
+           unlockTSO(tso);
+           return;
+       }
+
+       // We unblock just the first thread on the queue, and perform
+       // its throw immediately.
+       source = tso->blocked_exceptions;
+       performBlockedException(cap, source, tso);
+       tso->blocked_exceptions = unblockOne_(cap, source, 
+                                             rtsFalse/*no migrate*/);
+       unlockTSO(tso);
+    }
+}
+
+void
+awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
+{
+    if (tso->blocked_exceptions != END_TSO_QUEUE) {
+       lockTSO(tso);
+       awakenBlockedQueue(cap, tso->blocked_exceptions);
+       tso->blocked_exceptions = END_TSO_QUEUE;
+       unlockTSO(tso);
+    }
+}    
+
+static void
+performBlockedException (Capability *cap, StgTSO *source, StgTSO *target)
+{
+    StgClosure *exception;
+
+    ASSERT(source->why_blocked == BlockedOnException);
+    ASSERT(source->block_info.tso->id == target->id);
+    ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info);
+    ASSERT(((StgTSO *)source->sp[1])->id == target->id);
+    // check ids not pointers, because the thread might be relocated
+
+    exception = (StgClosure *)source->sp[2];
+    throwToSingleThreaded(cap, target, exception);
+    source->sp += 3;
+}
+
+/* -----------------------------------------------------------------------------
+   Remove a thread from blocking queues.
+
+   This is for use when we raise an exception in another thread, which
+   may be blocked.
+   This has nothing to do with the UnblockThread event in GranSim. -- HWL
+   -------------------------------------------------------------------------- */
+
+#if defined(GRAN) || defined(PARALLEL_HASKELL)
+/*
+  NB: only the type of the blocking queue is different in GranSim and GUM
+      the operations on the queue-elements are the same
+      long live polymorphism!
+
+  Locks: sched_mutex is held upon entry and exit.
+
+*/
+static void
+removeFromQueues(Capability *cap, StgTSO *tso)
+{
+  StgBlockingQueueElement *t, **last;
+
+  switch (tso->why_blocked) {
+
+  case NotBlocked:
+    return;  /* not blocked */
+
+  case BlockedOnSTM:
+    // Be careful: nothing to do here!  We tell the scheduler that the thread
+    // is runnable and we leave it to the stack-walking code to abort the 
+    // transaction while unwinding the stack.  We should perhaps have a debugging
+    // test to make sure that this really happens and that the 'zombie' transaction
+    // does not get committed.
+    goto done;
+
+  case BlockedOnMVar:
+    ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
+    {
+      StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
+      StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
+
+      last = (StgBlockingQueueElement **)&mvar->head;
+      for (t = (StgBlockingQueueElement *)mvar->head; 
+          t != END_BQ_QUEUE; 
+          last = &t->link, last_tso = t, t = t->link) {
+       if (t == (StgBlockingQueueElement *)tso) {
+         *last = (StgBlockingQueueElement *)tso->link;
+         if (mvar->tail == tso) {
+           mvar->tail = (StgTSO *)last_tso;
+         }
+         goto done;
+       }
+      }
+      barf("removeFromQueues (MVAR): TSO not found");
+    }
+
+  case BlockedOnBlackHole:
+    ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
+    {
+      StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
+
+      last = &bq->blocking_queue;
+      for (t = bq->blocking_queue; 
+          t != END_BQ_QUEUE; 
+          last = &t->link, t = t->link) {
+       if (t == (StgBlockingQueueElement *)tso) {
+         *last = (StgBlockingQueueElement *)tso->link;
+         goto done;
+       }
+      }
+      barf("removeFromQueues (BLACKHOLE): TSO not found");
+    }
+
+  case BlockedOnException:
+    {
+      StgTSO *target  = tso->block_info.tso;
+
+      ASSERT(get_itbl(target)->type == TSO);
+
+      while (target->what_next == ThreadRelocated) {
+         target = target->link;
+         ASSERT(get_itbl(target)->type == TSO);
+      }
+
+      last = (StgBlockingQueueElement **)&target->blocked_exceptions;
+      for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
+          t != END_BQ_QUEUE; 
+          last = &t->link, t = t->link) {
+       ASSERT(get_itbl(t)->type == TSO);
+       if (t == (StgBlockingQueueElement *)tso) {
+         *last = (StgBlockingQueueElement *)tso->link;
+         goto done;
+       }
+      }
+      barf("removeFromQueues (Exception): TSO not found");
+    }
+
+  case BlockedOnRead:
+  case BlockedOnWrite:
+#if defined(mingw32_HOST_OS)
+  case BlockedOnDoProc:
+#endif
+    {
+      /* take TSO off blocked_queue */
+      StgBlockingQueueElement *prev = NULL;
+      for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
+          prev = t, t = t->link) {
+       if (t == (StgBlockingQueueElement *)tso) {
+         if (prev == NULL) {
+           blocked_queue_hd = (StgTSO *)t->link;
+           if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
+             blocked_queue_tl = END_TSO_QUEUE;
+           }
+         } else {
+           prev->link = t->link;
+           if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
+             blocked_queue_tl = (StgTSO *)prev;
+           }
+         }
+#if defined(mingw32_HOST_OS)
+         /* (Cooperatively) signal that the worker thread should abort
+          * the request.
+          */
+         abandonWorkRequest(tso->block_info.async_result->reqID);
+#endif
+         goto done;
+       }
+      }
+      barf("removeFromQueues (I/O): TSO not found");
+    }
+
+  case BlockedOnDelay:
+    {
+      /* take TSO off sleeping_queue */
+      StgBlockingQueueElement *prev = NULL;
+      for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
+          prev = t, t = t->link) {
+       if (t == (StgBlockingQueueElement *)tso) {
+         if (prev == NULL) {
+           sleeping_queue = (StgTSO *)t->link;
+         } else {
+           prev->link = t->link;
+         }
+         goto done;
+       }
+      }
+      barf("removeFromQueues (delay): TSO not found");
+    }
+
+  default:
+    barf("removeFromQueues");
+  }
+
+ done:
+  tso->link = END_TSO_QUEUE;
+  tso->why_blocked = NotBlocked;
+  tso->block_info.closure = NULL;
+  pushOnRunQueue(cap,tso);
+}
+#else
+static void
+removeFromQueues(Capability *cap, StgTSO *tso)
+{
+  switch (tso->why_blocked) {
+
+  case NotBlocked:
+      return;
+
+  case BlockedOnSTM:
+    // Be careful: nothing to do here!  We tell the scheduler that the
+    // thread is runnable and we leave it to the stack-walking code to
+    // abort the transaction while unwinding the stack.  We should
+    // perhaps have a debugging test to make sure that this really
+    // happens and that the 'zombie' transaction does not get
+    // committed.
+    goto done;
+
+  case BlockedOnMVar:
+      removeThreadFromMVarQueue((StgMVar *)tso->block_info.closure, tso);
+      goto done;
+
+  case BlockedOnBlackHole:
+      removeThreadFromQueue(&blackhole_queue, tso);
+      goto done;
+
+  case BlockedOnException:
+    {
+      StgTSO *target  = tso->block_info.tso;
+
+      // NO: when called by threadPaused(), we probably have this
+      // TSO already locked (WHITEHOLEd) because we just placed
+      // ourselves on its queue.
+      // ASSERT(get_itbl(target)->type == TSO);
+
+      while (target->what_next == ThreadRelocated) {
+         target = target->link;
+      }
+      
+      removeThreadFromQueue(&target->blocked_exceptions, tso);
+      goto done;
+    }
+
+#if !defined(THREADED_RTS)
+  case BlockedOnRead:
+  case BlockedOnWrite:
+#if defined(mingw32_HOST_OS)
+  case BlockedOnDoProc:
+#endif
+      removeThreadFromDeQueue(&blocked_queue_hd, &blocked_queue_tl, tso);
+#if defined(mingw32_HOST_OS)
+      /* (Cooperatively) signal that the worker thread should abort
+       * the request.
+       */
+      abandonWorkRequest(tso->block_info.async_result->reqID);
+#endif
+      goto done;
+
+  case BlockedOnDelay:
+       removeThreadFromQueue(&sleeping_queue, tso);
+       goto done;
+#endif
+
+  default:
+      barf("removeFromQueues");
+  }
+
+ done:
+  tso->link = END_TSO_QUEUE;
+  tso->why_blocked = NotBlocked;
+  tso->block_info.closure = NULL;
+  appendToRunQueue(cap,tso);
+
+  // We might have just migrated this TSO to our Capability:
+  if (tso->bound) {
+      tso->bound->cap = cap;
+  }
+  tso->cap = cap;
+}
+#endif
+
+/* -----------------------------------------------------------------------------
+ * raiseAsync()
+ *
+ * The following function implements the magic for raising an
+ * asynchronous exception in an existing thread.
+ *
+ * We first remove the thread from any queue on which it might be
+ * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
+ *
+ * We strip the stack down to the innermost CATCH_FRAME, building
+ * thunks in the heap for all the active computations, so they can 
+ * be restarted if necessary.  When we reach a CATCH_FRAME, we build
+ * an application of the handler to the exception, and push it on
+ * the top of the stack.
+ * 
+ * How exactly do we save all the active computations?  We create an
+ * AP_STACK for every UpdateFrame on the stack.  Entering one of these
+ * AP_STACKs pushes everything from the corresponding update frame
+ * upwards onto the stack.  (Actually, it pushes everything up to the
+ * next update frame plus a pointer to the next AP_STACK object.
+ * Entering the next AP_STACK object pushes more onto the stack until we
+ * reach the last AP_STACK object - at which point the stack should look
+ * exactly as it did when we killed the TSO and we can continue
+ * execution by entering the closure on top of the stack.
+ *
+ * We can also kill a thread entirely - this happens if either (a) the 
+ * exception passed to raiseAsync is NULL, or (b) there's no
+ * CATCH_FRAME on the stack.  In either case, we strip the entire
+ * stack and replace the thread with a zombie.
+ *
+ * ToDo: in THREADED_RTS mode, this function is only safe if either
+ * (a) we hold all the Capabilities (eg. in GC, or if there is only
+ * one Capability), or (b) we own the Capability that the TSO is
+ * currently blocked on or on the run queue of.
+ *
+ * -------------------------------------------------------------------------- */
+
+static void
+raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, 
+          rtsBool stop_at_atomically, StgPtr stop_here)
+{
+    StgRetInfoTable *info;
+    StgPtr sp, frame;
+    nat i;
+
+    debugTrace(DEBUG_sched,
+              "raising exception in thread %ld.", (long)tso->id);
+    
+    // mark it dirty; we're about to change its stack.
+    dirtyTSO(tso);
+
+    sp = tso->sp;
+    
+    // ASSUMES: the thread is not already complete or dead.  Upper
+    // layers should deal with that.
+    ASSERT(tso->what_next != ThreadComplete && tso->what_next != ThreadKilled);
+
+    // The stack freezing code assumes there's a closure pointer on
+    // the top of the stack, so we have to arrange that this is the case...
+    //
+    if (sp[0] == (W_)&stg_enter_info) {
+       sp++;
+    } else {
+       sp--;
+       sp[0] = (W_)&stg_dummy_ret_closure;
+    }
+
+    frame = sp + 1;
+    while (stop_here == NULL || frame < stop_here) {
+
+       // 1. Let the top of the stack be the "current closure"
+       //
+       // 2. Walk up the stack until we find either an UPDATE_FRAME or a
+       // CATCH_FRAME.
+       //
+       // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
+       // current closure applied to the chunk of stack up to (but not
+       // including) the update frame.  This closure becomes the "current
+       // closure".  Go back to step 2.
+       //
+       // 4. If it's a CATCH_FRAME, then leave the exception handler on
+       // top of the stack applied to the exception.
+       // 
+       // 5. If it's a STOP_FRAME, then kill the thread.
+        // 
+        // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
+        // transaction
+       
+       info = get_ret_itbl((StgClosure *)frame);
+
+       switch (info->i.type) {
+
+       case UPDATE_FRAME:
+       {
+           StgAP_STACK * ap;
+           nat words;
+           
+           // First build an AP_STACK consisting of the stack chunk above the
+           // current update frame, with the top word on the stack as the
+           // fun field.
+           //
+           words = frame - sp - 1;
+           ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
+           
+           ap->size = words;
+           ap->fun  = (StgClosure *)sp[0];
+           sp++;
+           for(i=0; i < (nat)words; ++i) {
+               ap->payload[i] = (StgClosure *)*sp++;
+           }
+           
+           SET_HDR(ap,&stg_AP_STACK_info,
+                   ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
+           TICK_ALLOC_UP_THK(words+1,0);
+           
+           //IF_DEBUG(scheduler,
+           //       debugBelch("sched: Updating ");
+           //       printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
+           //       debugBelch(" with ");
+           //       printObj((StgClosure *)ap);
+           //  );
+
+           // Replace the updatee with an indirection
+           //
+           // Warning: if we're in a loop, more than one update frame on
+           // the stack may point to the same object.  Be careful not to
+           // overwrite an IND_OLDGEN in this case, because we'll screw
+           // up the mutable lists.  To be on the safe side, don't
+           // overwrite any kind of indirection at all.  See also
+           // threadSqueezeStack in GC.c, where we have to make a similar
+           // check.
+           //
+           if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
+               // revert the black hole
+               UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
+                              (StgClosure *)ap);
+           }
+           sp += sizeofW(StgUpdateFrame) - 1;
+           sp[0] = (W_)ap; // push onto stack
+           frame = sp + 1;
+           continue; //no need to bump frame
+       }
+
+       case STOP_FRAME:
+           // We've stripped the entire stack, the thread is now dead.
+           tso->what_next = ThreadKilled;
+           tso->sp = frame + sizeofW(StgStopFrame);
+           return;
+
+       case CATCH_FRAME:
+           // If we find a CATCH_FRAME, and we've got an exception to raise,
+           // then build the THUNK raise(exception), and leave it on
+           // top of the CATCH_FRAME ready to enter.
+           //
+       {
+#ifdef PROFILING
+           StgCatchFrame *cf = (StgCatchFrame *)frame;
+#endif
+           StgThunk *raise;
+           
+           if (exception == NULL) break;
+
+           // we've got an exception to raise, so let's pass it to the
+           // handler in this frame.
+           //
+           raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
+           TICK_ALLOC_SE_THK(1,0);
+           SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
+           raise->payload[0] = exception;
+           
+           // throw away the stack from Sp up to the CATCH_FRAME.
+           //
+           sp = frame - 1;
+           
+           /* Ensure that async exceptions are blocked now, so we don't get
+            * a surprise exception before we get around to executing the
+            * handler.
+            */
+           tso->flags |= TSO_BLOCKEX | TSO_INTERRUPTIBLE;
+
+           /* Put the newly-built THUNK on top of the stack, ready to execute
+            * when the thread restarts.
+            */
+           sp[0] = (W_)raise;
+           sp[-1] = (W_)&stg_enter_info;
+           tso->sp = sp-1;
+           tso->what_next = ThreadRunGHC;
+           IF_DEBUG(sanity, checkTSO(tso));
+           return;
+       }
+           
+       case ATOMICALLY_FRAME:
+           if (stop_at_atomically) {
+               ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
+               stmCondemnTransaction(cap, tso -> trec);
+#ifdef REG_R1
+               tso->sp = frame;
+#else
+               // R1 is not a register: the return convention for IO in
+               // this case puts the return value on the stack, so we
+               // need to set up the stack to return to the atomically
+               // frame properly...
+               tso->sp = frame - 2;
+               tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
+               tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
+#endif
+               tso->what_next = ThreadRunGHC;
+               return;
+           }
+           // Not stop_at_atomically... fall through and abort the
+           // transaction.
+           
+       case CATCH_RETRY_FRAME:
+           // If we find an ATOMICALLY_FRAME then we abort the
+           // current transaction and propagate the exception.  In
+           // this case (unlike ordinary exceptions) we do not care
+           // whether the transaction is valid or not because its
+           // possible validity cannot have caused the exception
+           // and will not be visible after the abort.
+           debugTrace(DEBUG_stm, 
+                      "found atomically block delivering async exception");
+
+            StgTRecHeader *trec = tso -> trec;
+            StgTRecHeader *outer = stmGetEnclosingTRec(trec);
+            stmAbortTransaction(cap, trec);
+            tso -> trec = outer;
+           break;
+           
+       default:
+           break;
+       }
+
+       // move on to the next stack frame
+       frame += stack_frame_sizeW((StgClosure *)frame);
+    }
+
+    // if we got here, then we stopped at stop_here
+    ASSERT(stop_here != NULL);
+}
+
+
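
To make the stack-stripping loop above concrete, here is a toy, runnable
model (not RTS code) of the frame dispatch it performs: an update frame
freezes the stack chunk above it into an AP_STACK-style thunk and keeps
walking; a catch frame receives raise(exception) on top of its handler and
stops the walk; a stop frame means the whole stack was stripped and the
thread dies.  The enum names and printfs are illustrative only.

    #include <stdio.h>

    enum frame { UPDATE_FRAME_T, CATCH_FRAME_T, STOP_FRAME_T };

    int main(void)
    {
        /* innermost frame first, as the loop walks from tso->sp upward */
        enum frame stack[] = { UPDATE_FRAME_T, UPDATE_FRAME_T,
                               CATCH_FRAME_T, STOP_FRAME_T };
        int have_exception = 1;   /* 0 models exception == NULL */
        unsigned i;

        for (i = 0; i < sizeof stack / sizeof stack[0]; i++) {
            switch (stack[i]) {
            case UPDATE_FRAME_T:
                /* freeze the chunk above this frame into a thunk and
                 * overwrite the updatee with it, then keep walking */
                printf("update frame: freeze chunk, continue\n");
                break;
            case CATCH_FRAME_T:
                if (!have_exception) break;   /* keep stripping */
                /* push raise(exception) above the handler and block
                 * further async exceptions (TSO_BLOCKEX) */
                printf("catch frame: push raise(exn), resume thread\n");
                return 0;
            case STOP_FRAME_T:
                /* entire stack stripped: thread becomes ThreadKilled */
                printf("stop frame: thread killed\n");
                return 0;
            }
        }
        return 0;
    }
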
diff --git a/rts/RaiseAsync.h b/rts/RaiseAsync.h
new file mode 100644 (file)
index 0000000..8e59d51
--- /dev/null
@@ -0,0 +1,71 @@
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 1998-2006
+ *
+ * Asynchronous exceptions
+ *
+ * --------------------------------------------------------------------------*/
+
+#ifndef RAISEASYNC_H
+#define RAISEASYNC_H
+
+#define THROWTO_SUCCESS   0
+#define THROWTO_BLOCKED   1
+
+#ifndef CMINUSMINUS
+void throwToSingleThreaded (Capability *cap,
+                           StgTSO *tso,
+                           StgClosure *exception);
+
+void throwToSingleThreaded_ (Capability *cap, 
+                            StgTSO *tso, 
+                            StgClosure *exception, 
+                            rtsBool stop_at_atomically,
+                            StgPtr stop_here);
+
+void suspendComputation (Capability *cap, 
+                        StgTSO *tso, 
+                        StgPtr stop_here);
+
+nat throwTo (Capability *cap,           // the Capability we hold 
+            StgTSO *source,             // the TSO sending the exception
+            StgTSO *target,             // the TSO receiving the exception
+            StgClosure *exception,      // the exception closure
+            /*[out]*/ void **out   // pass to throwToReleaseTarget()
+    );
+
+#ifdef THREADED_RTS
+void throwToReleaseTarget (void *tso);
+#endif
+
+void maybePerformBlockedException (Capability *cap, StgTSO *tso);
+void awakenBlockedExceptionQueue  (Capability *cap, StgTSO *tso);
+
+/* Determine whether a thread is interruptible (i.e. blocked on an
+ * operation that may wait indefinitely).  Interruptible threads can be
+ * sent an exception with killThread# even if they have async
+ * exceptions blocked.
+ */
+STATIC_INLINE int
+interruptible(StgTSO *t)
+{
+  switch (t->why_blocked) {
+  case BlockedOnMVar:
+  case BlockedOnException:
+  case BlockedOnRead:
+  case BlockedOnWrite:
+#if defined(mingw32_HOST_OS)
+  case BlockedOnDoProc:
+#endif
+  case BlockedOnDelay:
+    return 1;
+  // NB. Threads blocked on foreign calls (BlockedOnCCall) are
+  // *not* interruptible.  We can't send these threads an exception.
+  default:
+    return 0;
+  }
+}
+
+#endif /* CMINUSMINUS */
+
+#endif /* RAISEASYNC_H */
+
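
To illustrate the interface above: a hedged sketch (not code from this
patch) of how an RTS-internal caller might use throwTo().  It assumes only
the declarations in this header; deliverException and its policy for the
blocked case are hypothetical stand-ins for the patch's real call sites.

    /* sketch: deliver an exception, handling the THROWTO_BLOCKED case */
    static void
    deliverException (Capability *cap, StgTSO *source, StgTSO *target,
                      StgClosure *exception)
    {
        void *tok;   /* opaque token, passed to throwToReleaseTarget() */

        switch (throwTo(cap, source, target, exception, &tok)) {
        case THROWTO_SUCCESS:
            /* delivered: the target's stack has been reworked */
            break;
        case THROWTO_BLOCKED:
            /* the target is blocking exceptions; the exception stays
             * queued until maybePerformBlockedException() or
             * awakenBlockedExceptionQueue() runs on the target */
    #ifdef THREADED_RTS
            throwToReleaseTarget(tok);   /* release the target again */
    #endif
            break;
        }
    }
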
index d9adda4..11b9f87 100644 (file)
@@ -1,6 +1,6 @@
 /* ---------------------------------------------------------------------------
  *
- * (c) The GHC Team, 1998-2005
+ * (c) The GHC Team, 1998-2006
  *
  * The scheduler and thread-related functionality
  *
@@ -19,7 +19,6 @@
 #include "Schedule.h"
 #include "StgMiscClosures.h"
 #include "Interpreter.h"
-#include "Exception.h"
 #include "Printer.h"
 #include "RtsSignals.h"
 #include "Sanity.h"
@@ -51,6 +50,8 @@
 #include "win32/IOManager.h"
 #endif
 #include "Trace.h"
+#include "RaiseAsync.h"
+#include "Threads.h"
 
 #ifdef HAVE_SYS_TYPES_H
 #include <sys/types.h>
@@ -140,23 +141,6 @@ nat recent_activity = ACTIVITY_YES;
  */
 rtsBool sched_state = SCHED_RUNNING;
 
-/* Next thread ID to allocate.
- * LOCK: sched_mutex
- */
-static StgThreadID next_thread_id = 1;
-
-/* The smallest stack size that makes any sense is:
- *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
- *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
- *  + 1                       (the closure to enter)
- *  + 1                              (stg_ap_v_ret)
- *  + 1                              (spare slot req'd by stg_ap_v_ret)
- *
- * A thread with this stack will bomb immediately with a stack
- * overflow, which will increase its stack size.  
- */
-#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
-
 #if defined(GRAN)
 StgTSO *CurrentTSO;
 #endif
@@ -188,6 +172,10 @@ rtsTime TimeOfLastYield;
 rtsBool emitSchedule = rtsTrue;
 #endif
 
+#if !defined(mingw32_HOST_OS)
+#define FORKPROCESS_PRIMOP_SUPPORTED
+#endif
+
 /* -----------------------------------------------------------------------------
  * static function prototypes
  * -------------------------------------------------------------------------- */
@@ -233,22 +221,16 @@ static Capability *scheduleDoGC(Capability *cap, Task *task,
                                rtsBool force_major, 
                                void (*get_roots)(evac_fn));
 
-static void unblockThread(Capability *cap, StgTSO *tso);
 static rtsBool checkBlackHoles(Capability *cap);
 static void AllRoots(evac_fn evac);
 
 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
 
-static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
-                       rtsBool stop_at_atomically, StgPtr stop_here);
-
 static void deleteThread (Capability *cap, StgTSO *tso);
 static void deleteAllThreads (Capability *cap);
 
-#ifdef DEBUG
-static void printThreadBlockage(StgTSO *tso);
-static void printThreadStatus(StgTSO *tso);
-void printThreadQueue(StgTSO *tso);
+#ifdef FORKPROCESS_PRIMOP_SUPPORTED
+static void deleteThread_(Capability *cap, StgTSO *tso);
 #endif
 
 #if defined(PARALLEL_HASKELL)
@@ -596,6 +578,9 @@ run_thread:
     startHeapProfTimer();
 #endif
 
+    // Check for exceptions blocked on this thread
+    maybePerformBlockedException (cap, t);
+
     // ----------------------------------------------------------------------
     // Run the current thread 
 
@@ -1019,7 +1004,8 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
            case BlockedOnBlackHole:
            case BlockedOnException:
            case BlockedOnMVar:
-               raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
+               throwToSingleThreaded(cap, task->tso, 
+                                     (StgClosure *)NonTermination_closure);
                return;
            default:
                barf("deadlock: main thread blocked in a strange way");
@@ -1644,7 +1630,7 @@ static void
 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
 {
     debugTrace (DEBUG_sched,
-               "--<< thread %ld (%s) stopped, StackOverflow\n", 
+               "--<< thread %ld (%s) stopped, StackOverflow", 
                (long)t->id, whatNext_strs[t->what_next]);
 
     /* just adjust the stack for this thread, then pop it back
@@ -1687,11 +1673,11 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
 #ifdef DEBUG
     if (t->what_next != prev_what_next) {
        debugTrace(DEBUG_sched,
-                  "--<< thread %ld (%s) stopped to switch evaluators\n", 
+                  "--<< thread %ld (%s) stopped to switch evaluators", 
                   (long)t->id, whatNext_strs[t->what_next]);
     } else {
        debugTrace(DEBUG_sched,
-                  "--<< thread %ld (%s) stopped, yielding\n",
+                  "--<< thread %ld (%s) stopped, yielding",
                   (long)t->id, whatNext_strs[t->what_next]);
     }
 #endif
@@ -2024,6 +2010,18 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
                next = t->link;
            } else {
                next = t->global_link;
+               
+               // This is a good place to check for blocked
+               // exceptions.  It might be the case that a thread is
+               // blocked on delivering an exception to a thread that
+               // is also blocked - we try to ensure that this
+               // doesn't happen in throwTo(), but it's too hard (or
+               // impossible) to close all the race holes, so we
+               // accept that some might get through and deal with
+               // them here.  A GC will always happen at some point,
+               // even if the system is otherwise deadlocked.
+               maybePerformBlockedException (&capabilities[0], t);
+
                if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
                    if (!stmValidateNestOfTransactions (t -> trec)) {
                        debugTrace(DEBUG_sched | DEBUG_stm,
@@ -2033,7 +2031,8 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
                        // ATOMICALLY_FRAME, aborting the (nested)
                        // transaction, and saving the stack of any
                        // partially-evaluated thunks on the heap.
-                       raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
+                       throwToSingleThreaded_(&capabilities[0], t, 
+                                              NULL, rtsTrue, NULL);
                        
 #ifdef REG_R1
                        ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
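
The maybePerformBlockedException() calls added above (once before running a
thread, and once during GC as a deadlock backstop) are defined in
RaiseAsync.c, which is not shown in this excerpt.  A plausible shape, stated
as an assumption rather than the patch's actual code: if the target is not
currently blocking async exceptions and some thrower is queued on it,
deliver one pending exception now.

    /* sketch only -- the real definition lives in RaiseAsync.c */
    void
    maybePerformBlockedException (Capability *cap, StgTSO *tso)
    {
        if ((tso->flags & TSO_BLOCKEX) == 0
            && tso->blocked_exceptions != END_TSO_QUEUE) {
            /* dequeue one waiting thrower, raise its exception in tso,
             * and wake the thrower; locking details omitted */
        }
    }
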
@@ -2099,45 +2098,9 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
 }
 
 /* ---------------------------------------------------------------------------
- * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
- * used by Control.Concurrent for error checking.
- * ------------------------------------------------------------------------- */
-StgBool
-rtsSupportsBoundThreads(void)
-{
-#if defined(THREADED_RTS)
-  return rtsTrue;
-#else
-  return rtsFalse;
-#endif
-}
-
-/* ---------------------------------------------------------------------------
- * isThreadBound(tso): check whether tso is bound to an OS thread.
- * ------------------------------------------------------------------------- */
-StgBool
-isThreadBound(StgTSO* tso USED_IF_THREADS)
-{
-#if defined(THREADED_RTS)
-  return (tso->bound != NULL);
-#endif
-  return rtsFalse;
-}
-
-/* ---------------------------------------------------------------------------
  * Singleton fork(). Do not copy any running threads.
  * ------------------------------------------------------------------------- */
 
-#if !defined(mingw32_HOST_OS)
-#define FORKPROCESS_PRIMOP_SUPPORTED
-#endif
-
-#ifdef FORKPROCESS_PRIMOP_SUPPORTED
-static void 
-deleteThread_(Capability *cap, StgTSO *tso);
-#endif
 StgInt
 forkProcess(HsStablePtr *entry
 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
@@ -2243,26 +2206,28 @@ forkProcess(HsStablePtr *entry
 static void
 deleteAllThreads ( Capability *cap )
 {
-  StgTSO* t, *next;
-  debugTrace(DEBUG_sched,"deleting all threads");
-  for (t = all_threads; t != END_TSO_QUEUE; t = next) {
-      if (t->what_next == ThreadRelocated) {
-         next = t->link;
-      } else {
-         next = t->global_link;
-         deleteThread(cap,t);
-      }
-  }      
+    // NOTE: only safe to call if we own all capabilities.
+
+    StgTSO* t, *next;
+    debugTrace(DEBUG_sched,"deleting all threads");
+    for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+       if (t->what_next == ThreadRelocated) {
+           next = t->link;
+       } else {
+           next = t->global_link;
+           deleteThread(cap,t);
+       }
+    }      
 
-  // The run queue now contains a bunch of ThreadKilled threads.  We
-  // must not throw these away: the main thread(s) will be in there
-  // somewhere, and the main scheduler loop has to deal with it.
-  // Also, the run queue is the only thing keeping these threads from
-  // being GC'd, and we don't want the "main thread has been GC'd" panic.
+    // The run queue now contains a bunch of ThreadKilled threads.  We
+    // must not throw these away: the main thread(s) will be in there
+    // somewhere, and the main scheduler loop has to deal with it.
+    // Also, the run queue is the only thing keeping these threads from
+    // being GC'd, and we don't want the "main thread has been GC'd" panic.
 
 #if !defined(THREADED_RTS)
-  ASSERT(blocked_queue_hd == END_TSO_QUEUE);
-  ASSERT(sleeping_queue == END_TSO_QUEUE);
+    ASSERT(blocked_queue_hd == END_TSO_QUEUE);
+    ASSERT(sleeping_queue == END_TSO_QUEUE);
 #endif
 }
 
@@ -2337,9 +2302,10 @@ suspendThread (StgRegTable *reg)
 
   threadPaused(cap,tso);
 
-  if(tso->blocked_exceptions == NULL)  {
+  if ((tso->flags & TSO_BLOCKEX) == 0)  {
       tso->why_blocked = BlockedOnCCall;
-      tso->blocked_exceptions = END_TSO_QUEUE;
+      tso->flags |= TSO_BLOCKEX;
+      tso->flags &= ~TSO_INTERRUPTIBLE;
   } else {
       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
   }
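
The hunk above replaces the old encoding, in which blocked_exceptions ==
NULL meant "not blocking exceptions", with the TSO_BLOCKEX and
TSO_INTERRUPTIBLE flag bits.  A small illustration of the new encoding; the
helper names are hypothetical, and only the flag manipulation mirrors the
code above.

    /* hypothetical helpers illustrating the flag encoding */
    #define BLOCKING_EXCEPTIONS(tso)  (((tso)->flags & TSO_BLOCKEX) != 0)

    static void blockExceptions (StgTSO *tso)
    {
        /* block async exceptions, allowing interruptible blocking */
        tso->flags |= TSO_BLOCKEX | TSO_INTERRUPTIBLE;
    }

    static void unblockExceptions (StgTSO *tso)
    {
        tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
    }
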
@@ -2390,8 +2356,8 @@ resumeThread (void *task_)
     debugTrace(DEBUG_sched, "thread %d: re-entering RTS", tso->id);
     
     if (tso->why_blocked == BlockedOnCCall) {
-       awakenBlockedQueue(cap,tso->blocked_exceptions);
-       tso->blocked_exceptions = NULL;
+       awakenBlockedExceptionQueue(cap,tso);
+       tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
     }
     
     /* Reset blocking status */
@@ -2410,300 +2376,6 @@ resumeThread (void *task_)
 }
 
 /* ---------------------------------------------------------------------------
- * Comparing Thread ids.
- *
- * This is used from STG land in the implementation of the
- * instances of Eq/Ord for ThreadIds.
- * ------------------------------------------------------------------------ */
-
-int
-cmp_thread(StgPtr tso1, StgPtr tso2) 
-{ 
-  StgThreadID id1 = ((StgTSO *)tso1)->id; 
-  StgThreadID id2 = ((StgTSO *)tso2)->id;
-  if (id1 < id2) return (-1);
-  if (id1 > id2) return 1;
-  return 0;
-}
-
-/* ---------------------------------------------------------------------------
- * Fetching the ThreadID from an StgTSO.
- *
- * This is used in the implementation of Show for ThreadIds.
- * ------------------------------------------------------------------------ */
-int
-rts_getThreadId(StgPtr tso) 
-{
-  return ((StgTSO *)tso)->id;
-}
-
-#ifdef DEBUG
-void
-labelThread(StgPtr tso, char *label)
-{
-  int len;
-  void *buf;
-
-  /* Caveat: Once set, you can only set the thread name to "" */
-  len = strlen(label)+1;
-  buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
-  strncpy(buf,label,len);
-  /* Update will free the old memory for us */
-  updateThreadLabel(((StgTSO *)tso)->id,buf);
-}
-#endif /* DEBUG */
-
-/* ---------------------------------------------------------------------------
-   Create a new thread.
-
-   The new thread starts with the given stack size.  Before the
-   scheduler can run, however, this thread needs to have a closure
-   (and possibly some arguments) pushed on its stack.  See
-   pushClosure() in Schedule.h.
-
-   createGenThread() and createIOThread() (in SchedAPI.h) are
-   convenient packaged versions of this function.
-
-   currently pri (priority) is only used in a GRAN setup -- HWL
-   ------------------------------------------------------------------------ */
-#if defined(GRAN)
-/*   currently pri (priority) is only used in a GRAN setup -- HWL */
-StgTSO *
-createThread(nat size, StgInt pri)
-#else
-StgTSO *
-createThread(Capability *cap, nat size)
-#endif
-{
-    StgTSO *tso;
-    nat stack_size;
-
-    /* sched_mutex is *not* required */
-
-    /* First check whether we should create a thread at all */
-#if defined(PARALLEL_HASKELL)
-    /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
-    if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
-       threadsIgnored++;
-       debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
-                  RtsFlags.ParFlags.maxThreads, advisory_thread_count);
-       return END_TSO_QUEUE;
-    }
-    threadsCreated++;
-#endif
-
-#if defined(GRAN)
-    ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
-#endif
-
-    // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
-
-    /* catch ridiculously small stack sizes */
-    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
-       size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
-    }
-
-    stack_size = size - TSO_STRUCT_SIZEW;
-    
-    tso = (StgTSO *)allocateLocal(cap, size);
-    TICK_ALLOC_TSO(stack_size, 0);
-
-    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
-#if defined(GRAN)
-    SET_GRAN_HDR(tso, ThisPE);
-#endif
-
-    // Always start with the compiled code evaluator
-    tso->what_next = ThreadRunGHC;
-
-    tso->why_blocked  = NotBlocked;
-    tso->blocked_exceptions = NULL;
-    tso->flags = TSO_DIRTY;
-    
-    tso->saved_errno = 0;
-    tso->bound = NULL;
-    tso->cap = cap;
-    
-    tso->stack_size     = stack_size;
-    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
-                         - TSO_STRUCT_SIZEW;
-    tso->sp             = (P_)&(tso->stack) + stack_size;
-
-    tso->trec = NO_TREC;
-    
-#ifdef PROFILING
-    tso->prof.CCCS = CCS_MAIN;
-#endif
-    
-  /* put a stop frame on the stack */
-    tso->sp -= sizeofW(StgStopFrame);
-    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
-    tso->link = END_TSO_QUEUE;
-    
-  // ToDo: check this
-#if defined(GRAN)
-    /* uses more flexible routine in GranSim */
-    insertThread(tso, CurrentProc);
-#else
-    /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
-     * from its creation
-     */
-#endif
-    
-#if defined(GRAN) 
-    if (RtsFlags.GranFlags.GranSimStats.Full) 
-       DumpGranEvent(GR_START,tso);
-#elif defined(PARALLEL_HASKELL)
-    if (RtsFlags.ParFlags.ParStats.Full) 
-       DumpGranEvent(GR_STARTQ,tso);
-    /* HACk to avoid SCHEDULE 
-       LastTSO = tso; */
-#endif
-    
-    /* Link the new thread on the global thread list.
-     */
-    ACQUIRE_LOCK(&sched_mutex);
-    tso->id = next_thread_id++;  // while we have the mutex
-    tso->global_link = all_threads;
-    all_threads = tso;
-    RELEASE_LOCK(&sched_mutex);
-    
-#if defined(DIST)
-    tso->dist.priority = MandatoryPriority; //by default that is...
-#endif
-    
-#if defined(GRAN)
-    tso->gran.pri = pri;
-# if defined(DEBUG)
-    tso->gran.magic = TSO_MAGIC; // debugging only
-# endif
-    tso->gran.sparkname   = 0;
-    tso->gran.startedat   = CURRENT_TIME; 
-    tso->gran.exported    = 0;
-    tso->gran.basicblocks = 0;
-    tso->gran.allocs      = 0;
-    tso->gran.exectime    = 0;
-    tso->gran.fetchtime   = 0;
-    tso->gran.fetchcount  = 0;
-    tso->gran.blocktime   = 0;
-    tso->gran.blockcount  = 0;
-    tso->gran.blockedat   = 0;
-    tso->gran.globalsparks = 0;
-    tso->gran.localsparks  = 0;
-    if (RtsFlags.GranFlags.Light)
-       tso->gran.clock  = Now; /* local clock */
-    else
-       tso->gran.clock  = 0;
-    
-    IF_DEBUG(gran,printTSO(tso));
-#elif defined(PARALLEL_HASKELL)
-# if defined(DEBUG)
-    tso->par.magic = TSO_MAGIC; // debugging only
-# endif
-    tso->par.sparkname   = 0;
-    tso->par.startedat   = CURRENT_TIME; 
-    tso->par.exported    = 0;
-    tso->par.basicblocks = 0;
-    tso->par.allocs      = 0;
-    tso->par.exectime    = 0;
-    tso->par.fetchtime   = 0;
-    tso->par.fetchcount  = 0;
-    tso->par.blocktime   = 0;
-    tso->par.blockcount  = 0;
-    tso->par.blockedat   = 0;
-    tso->par.globalsparks = 0;
-    tso->par.localsparks  = 0;
-#endif
-    
-#if defined(GRAN)
-    globalGranStats.tot_threads_created++;
-    globalGranStats.threads_created_on_PE[CurrentProc]++;
-    globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
-    globalGranStats.tot_sq_probes++;
-#elif defined(PARALLEL_HASKELL)
-    // collect parallel global statistics (currently done together with GC stats)
-    if (RtsFlags.ParFlags.ParStats.Global &&
-       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
-       //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
-       globalParStats.tot_threads_created++;
-    }
-#endif 
-    
-#if defined(GRAN)
-    debugTrace(GRAN_DEBUG_pri,
-              "==__ schedule: Created TSO %d (%p);",
-              CurrentProc, tso, tso->id);
-#elif defined(PARALLEL_HASKELL)
-    debugTrace(PAR_DEBUG_verbose,
-              "==__ schedule: Created TSO %d (%p); %d threads active",
-              (long)tso->id, tso, advisory_thread_count);
-#else
-    debugTrace(DEBUG_sched,
-              "created thread %ld, stack size = %lx words", 
-              (long)tso->id, (long)tso->stack_size);
-#endif    
-    return tso;
-}
-
-#if defined(PAR)
-/* RFP:
-   all parallel thread creation calls should fall through the following routine.
-*/
-StgTSO *
-createThreadFromSpark(rtsSpark spark) 
-{ StgTSO *tso;
-  ASSERT(spark != (rtsSpark)NULL);
-// JB: TAKE CARE OF THIS COUNTER! BUGGY
-  if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
-  { threadsIgnored++;
-    barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
-         RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
-    return END_TSO_QUEUE;
-  }
-  else
-  { threadsCreated++;
-    tso = createThread(RtsFlags.GcFlags.initialStkSize);
-    if (tso==END_TSO_QUEUE)    
-      barf("createSparkThread: Cannot create TSO");
-#if defined(DIST)
-    tso->priority = AdvisoryPriority;
-#endif
-    pushClosure(tso,spark);
-    addToRunQueue(tso);
-    advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
-  }
-  return tso;
-}
-#endif
-
-/*
-  Turn a spark into a thread.
-  ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
-*/
-#if 0
-StgTSO *
-activateSpark (rtsSpark spark) 
-{
-  StgTSO *tso;
-
-  tso = createSparkThread(spark);
-  if (RtsFlags.ParFlags.ParStats.Full) {   
-    //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
-      IF_PAR_DEBUG(verbose,
-                  debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
-                             (StgClosure *)spark, info_type((StgClosure *)spark)));
-  }
-  // ToDo: fwd info on local/global spark to thread -- HWL
-  // tso->gran.exported =  spark->exported;
-  // tso->gran.locked =   !spark->global;
-  // tso->gran.sparkname = spark->name;
-
-  return tso;
-}
-#endif
-
-/* ---------------------------------------------------------------------------
  * scheduleThread()
  *
  * scheduleThread puts a thread on the end  of the runnable queue.
@@ -2731,12 +2403,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
     if (cpu == cap->no) {
        appendToRunQueue(cap,tso);
     } else {
-       Capability *target_cap = &capabilities[cpu];
-       if (tso->bound) {
-           tso->bound->cap = target_cap;
-       }
-       tso->cap = target_cap;
-       wakeupThreadOnCapability(target_cap,tso);
+       migrateThreadToCapability_lock(&capabilities[cpu],tso);
     }
 #else
     appendToRunQueue(cap,tso);
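
The removed lines show what migrateThreadToCapability_lock() presumably
bundles (its definition is in the new Threads.c, not shown here): move the
TSO, and its bound Task if any, to the target Capability and wake it there,
under the target Capability's lock.  A sketch under that assumption:

    /* sketch of the factored-out migration logic (assumed, not shown) */
    static void
    migrateThreadToCapability_sketch (Capability *target_cap, StgTSO *tso)
    {
        if (tso->bound) {
            tso->bound->cap = target_cap;  /* keep the bound Task in step */
        }
        tso->cap = target_cap;
        wakeupThreadOnCapability(target_cap, tso);
    }
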
@@ -3072,19 +2739,25 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
   StgTSO *dest;
 
   IF_DEBUG(sanity,checkTSO(tso));
+
+  // don't allow throwTo() to modify the blocked_exceptions queue
+  // while we are moving the TSO:
+  lockClosure((StgClosure *)tso);
+
   if (tso->stack_size >= tso->max_stack_size) {
 
       debugTrace(DEBUG_gc,
-                "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
+                "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
                 (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
       IF_DEBUG(gc,
               /* If we're debugging, just print out the top of the stack */
               printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
                                                tso->sp+64)));
 
-    /* Send this thread the StackOverflow exception */
-    raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
-    return tso;
+      // Send this thread the StackOverflow exception
+      unlockTSO(tso);
+      throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
+      return tso;
   }
 
   /* Try to double the current stack size.  If that takes us over the
@@ -3098,7 +2771,7 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
 
   debugTrace(DEBUG_sched, 
-            "increasing stack size from %ld words to %d.\n",
+            "increasing stack size from %ld words to %d.",
             (long)tso->stack_size, new_stack_size);
 
   dest = (StgTSO *)allocate(new_tso_size);
@@ -3133,7 +2806,10 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
               printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
                                                tso->sp+64)));
   
-  IF_DEBUG(sanity,checkTSO(tso));
+  unlockTSO(dest);
+  unlockTSO(tso);
+
+  IF_DEBUG(sanity,checkTSO(dest));
 #if 0
   IF_DEBUG(scheduler,printTSO(dest));
 #endif
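
The lockClosure()/unlockTSO() pairing added to threadStackOverflow() above
deserves a note: holding the TSO's closure lock excludes a concurrent
throwTo() while the stack is being relocated.  A sketch of the pattern,
using a hypothetical wrapper (withTSOLocked is not in the patch):

    /* hypothetical wrapper showing the lock-around-relocation pattern */
    static StgTSO *
    withTSOLocked (StgTSO *tso, StgTSO *(*relocate)(StgTSO *))
    {
        StgTSO *dest;
        lockClosure((StgClosure *)tso);  /* excludes throwTo() */
        dest = relocate(tso);  /* copy presumably inherits the locked header */
        unlockTSO(dest);
        unlockTSO(tso);
        return dest;
    }
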
@@ -3142,301 +2818,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
 }
 
 /* ---------------------------------------------------------------------------
-   Wake up a queue that was blocked on some resource.
-   ------------------------------------------------------------------------ */
-
-#if defined(GRAN)
-STATIC_INLINE void
-unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
-{
-}
-#elif defined(PARALLEL_HASKELL)
-STATIC_INLINE void
-unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
-{
-  /* write RESUME events to log file and
-     update blocked and fetch time (depending on type of the orig closure) */
-  if (RtsFlags.ParFlags.ParStats.Full) {
-    DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
-                    GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
-                    0, 0 /* spark_queue_len(ADVISORY_POOL) */);
-    if (emptyRunQueue())
-      emitSchedule = rtsTrue;
-
-    switch (get_itbl(node)->type) {
-       case FETCH_ME_BQ:
-         ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
-         break;
-       case RBH:
-       case FETCH_ME:
-       case BLACKHOLE_BQ:
-         ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
-         break;
-#ifdef DIST
-        case MVAR:
-          break;
-#endif   
-       default:
-         barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
-       }
-      }
-}
-#endif
-
-#if defined(GRAN)
-StgBlockingQueueElement *
-unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
-{
-    StgTSO *tso;
-    PEs node_loc, tso_loc;
-
-    node_loc = where_is(node); // should be lifted out of loop
-    tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
-    tso_loc = where_is((StgClosure *)tso);
-    if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
-      /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
-      ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
-      CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
-      // insertThread(tso, node_loc);
-      new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
-               ResumeThread,
-               tso, node, (rtsSpark*)NULL);
-      tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
-      // len_local++;
-      // len++;
-    } else { // TSO is remote (actually should be FMBQ)
-      CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
-                                  RtsFlags.GranFlags.Costs.gunblocktime +
-                                 RtsFlags.GranFlags.Costs.latency;
-      new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
-               UnblockThread,
-               tso, node, (rtsSpark*)NULL);
-      tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
-      // len++;
-    }
-    /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
-    IF_GRAN_DEBUG(bq,
-                 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
-                         (node_loc==tso_loc ? "Local" : "Global"), 
-                         tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
-    tso->block_info.closure = NULL;
-    debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)\n", 
-              tso->id, tso));
-}
-#elif defined(PARALLEL_HASKELL)
-StgBlockingQueueElement *
-unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
-{
-    StgBlockingQueueElement *next;
-
-    switch (get_itbl(bqe)->type) {
-    case TSO:
-      ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
-      /* if it's a TSO just push it onto the run_queue */
-      next = bqe->link;
-      ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
-      APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
-      threadRunnable();
-      unblockCount(bqe, node);
-      /* reset blocking status after dumping event */
-      ((StgTSO *)bqe)->why_blocked = NotBlocked;
-      break;
-
-    case BLOCKED_FETCH:
-      /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
-      next = bqe->link;
-      bqe->link = (StgBlockingQueueElement *)PendingFetches;
-      PendingFetches = (StgBlockedFetch *)bqe;
-      break;
-
-# if defined(DEBUG)
-      /* can ignore this case in a non-debugging setup; 
-        see comments on RBHSave closures above */
-    case CONSTR:
-      /* check that the closure is an RBHSave closure */
-      ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
-            get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
-            get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
-      break;
-
-    default:
-      barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
-          get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
-          (StgClosure *)bqe);
-# endif
-    }
-  IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
-  return next;
-}
-#endif
-
-StgTSO *
-unblockOne(Capability *cap, StgTSO *tso)
-{
-  StgTSO *next;
-
-  ASSERT(get_itbl(tso)->type == TSO);
-  ASSERT(tso->why_blocked != NotBlocked);
-
-  tso->why_blocked = NotBlocked;
-  next = tso->link;
-  tso->link = END_TSO_QUEUE;
-
-#if defined(THREADED_RTS)
-  if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) {
-      // We are waking up this thread on the current Capability, which
-      // might involve migrating it from the Capability it was last on.
-      if (tso->bound) {
-         ASSERT(tso->bound->cap == tso->cap);
-         tso->bound->cap = cap;
-      }
-      tso->cap = cap;
-      appendToRunQueue(cap,tso);
-      // we're holding a newly woken thread, make sure we context switch
-      // quickly so we can migrate it if necessary.
-      context_switch = 1;
-  } else {
-      // we'll try to wake it up on the Capability it was last on.
-      wakeupThreadOnCapability(tso->cap, tso);
-  }
-#else
-  appendToRunQueue(cap,tso);
-  context_switch = 1;
-#endif
-
-  debugTrace(DEBUG_sched,
-            "waking up thread %ld on cap %d",
-            (long)tso->id, tso->cap->no);
-
-  return next;
-}
-
-
-#if defined(GRAN)
-void 
-awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
-{
-  StgBlockingQueueElement *bqe;
-  PEs node_loc;
-  nat len = 0; 
-
-  IF_GRAN_DEBUG(bq, 
-               debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
-                     node, CurrentProc, CurrentTime[CurrentProc], 
-                     CurrentTSO->id, CurrentTSO));
-
-  node_loc = where_is(node);
-
-  ASSERT(q == END_BQ_QUEUE ||
-        get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
-        get_itbl(q)->type == CONSTR); // closure (type constructor)
-  ASSERT(is_unique(node));
-
-  /* FAKE FETCH: magically copy the node to the tso's proc;
-     no Fetch necessary because in reality the node should not have been 
-     moved to the other PE in the first place
-  */
-  if (CurrentProc!=node_loc) {
-    IF_GRAN_DEBUG(bq, 
-                 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
-                       node, node_loc, CurrentProc, CurrentTSO->id, 
-                       // CurrentTSO, where_is(CurrentTSO),
-                       node->header.gran.procs));
-    node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
-    IF_GRAN_DEBUG(bq, 
-                 debugBelch("## new bitmask of node %p is %#x\n",
-                       node, node->header.gran.procs));
-    if (RtsFlags.GranFlags.GranSimStats.Global) {
-      globalGranStats.tot_fake_fetches++;
-    }
-  }
-
-  bqe = q;
-  // ToDo: check: ASSERT(CurrentProc==node_loc);
-  while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
-    //next = bqe->link;
-    /* 
-       bqe points to the current element in the queue
-       next points to the next element in the queue
-    */
-    //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
-    //tso_loc = where_is(tso);
-    len++;
-    bqe = unblockOne(bqe, node);
-  }
-
-  /* if this is the BQ of an RBH, we have to put back the info ripped out of
-     the closure to make room for the anchor of the BQ */
-  if (bqe!=END_BQ_QUEUE) {
-    ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
-    /*
-    ASSERT((info_ptr==&RBH_Save_0_info) ||
-          (info_ptr==&RBH_Save_1_info) ||
-          (info_ptr==&RBH_Save_2_info));
-    */
-    /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
-    ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
-    ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
-
-    IF_GRAN_DEBUG(bq,
-                 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
-                       node, info_type(node)));
-  }
-
-  /* statistics gathering */
-  if (RtsFlags.GranFlags.GranSimStats.Global) {
-    // globalGranStats.tot_bq_processing_time += bq_processing_time;
-    globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
-    // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
-    globalGranStats.tot_awbq++;             // total no. of bqs awakened
-  }
-  IF_GRAN_DEBUG(bq,
-               debugBelch("## BQ Stats of %p: [%d entries] %s\n",
-                       node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
-}
-#elif defined(PARALLEL_HASKELL)
-void 
-awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
-{
-  StgBlockingQueueElement *bqe;
-
-  IF_PAR_DEBUG(verbose, 
-              debugBelch("##-_ AwBQ for node %p on [%x]: \n",
-                    node, mytid));
-#ifdef DIST  
-  //RFP
-  if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
-    IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
-    return;
-  }
-#endif
-  
-  ASSERT(q == END_BQ_QUEUE ||
-        get_itbl(q)->type == TSO ||           
-        get_itbl(q)->type == BLOCKED_FETCH || 
-        get_itbl(q)->type == CONSTR); 
-
-  bqe = q;
-  while (get_itbl(bqe)->type==TSO || 
-        get_itbl(bqe)->type==BLOCKED_FETCH) {
-    bqe = unblockOne(bqe, node);
-  }
-}
-
-#else   /* !GRAN && !PARALLEL_HASKELL */
-
-void
-awakenBlockedQueue(Capability *cap, StgTSO *tso)
-{
-    if (tso == NULL) return; // hack; see bug #1235728, and comments in
-                            // Exception.cmm
-    while (tso != END_TSO_QUEUE) {
-       tso = unblockOne(cap,tso);
-    }
-}
-#endif
-
-/* ---------------------------------------------------------------------------
    Interrupt execution
    - usually called inside a signal handler so it mustn't do anything fancy.   
    ------------------------------------------------------------------------ */
@@ -3481,316 +2862,6 @@ wakeUpRts(void)
 }
 
 /* -----------------------------------------------------------------------------
-   Unblock a thread
-
-   This is for use when we raise an exception in another thread, which
-   may be blocked.
-   This has nothing to do with the UnblockThread event in GranSim. -- HWL
-   -------------------------------------------------------------------------- */
-
-#if defined(GRAN) || defined(PARALLEL_HASKELL)
-/*
-  NB: only the type of the blocking queue is different in GranSim and GUM
-      the operations on the queue-elements are the same
-      long live polymorphism!
-
-  Locks: sched_mutex is held upon entry and exit.
-
-*/
-static void
-unblockThread(Capability *cap, StgTSO *tso)
-{
-  StgBlockingQueueElement *t, **last;
-
-  switch (tso->why_blocked) {
-
-  case NotBlocked:
-    return;  /* not blocked */
-
-  case BlockedOnSTM:
-    // Be careful: nothing to do here!  We tell the scheduler that the thread
-    // is runnable and we leave it to the stack-walking code to abort the 
-    // transaction while unwinding the stack.  We should perhaps have a debugging
-    // test to make sure that this really happens and that the 'zombie' transaction
-    // does not get committed.
-    goto done;
-
-  case BlockedOnMVar:
-    ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
-    {
-      StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
-      StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
-
-      last = (StgBlockingQueueElement **)&mvar->head;
-      for (t = (StgBlockingQueueElement *)mvar->head; 
-          t != END_BQ_QUEUE; 
-          last = &t->link, last_tso = t, t = t->link) {
-       if (t == (StgBlockingQueueElement *)tso) {
-         *last = (StgBlockingQueueElement *)tso->link;
-         if (mvar->tail == tso) {
-           mvar->tail = (StgTSO *)last_tso;
-         }
-         goto done;
-       }
-      }
-      barf("unblockThread (MVAR): TSO not found");
-    }
-
-  case BlockedOnBlackHole:
-    ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
-    {
-      StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
-
-      last = &bq->blocking_queue;
-      for (t = bq->blocking_queue; 
-          t != END_BQ_QUEUE; 
-          last = &t->link, t = t->link) {
-       if (t == (StgBlockingQueueElement *)tso) {
-         *last = (StgBlockingQueueElement *)tso->link;
-         goto done;
-       }
-      }
-      barf("unblockThread (BLACKHOLE): TSO not found");
-    }
-
-  case BlockedOnException:
-    {
-      StgTSO *target  = tso->block_info.tso;
-
-      ASSERT(get_itbl(target)->type == TSO);
-
-      if (target->what_next == ThreadRelocated) {
-         target = target->link;
-         ASSERT(get_itbl(target)->type == TSO);
-      }
-
-      ASSERT(target->blocked_exceptions != NULL);
-
-      last = (StgBlockingQueueElement **)&target->blocked_exceptions;
-      for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
-          t != END_BQ_QUEUE; 
-          last = &t->link, t = t->link) {
-       ASSERT(get_itbl(t)->type == TSO);
-       if (t == (StgBlockingQueueElement *)tso) {
-         *last = (StgBlockingQueueElement *)tso->link;
-         goto done;
-       }
-      }
-      barf("unblockThread (Exception): TSO not found");
-    }
-
-  case BlockedOnRead:
-  case BlockedOnWrite:
-#if defined(mingw32_HOST_OS)
-  case BlockedOnDoProc:
-#endif
-    {
-      /* take TSO off blocked_queue */
-      StgBlockingQueueElement *prev = NULL;
-      for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
-          prev = t, t = t->link) {
-       if (t == (StgBlockingQueueElement *)tso) {
-         if (prev == NULL) {
-           blocked_queue_hd = (StgTSO *)t->link;
-           if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
-             blocked_queue_tl = END_TSO_QUEUE;
-           }
-         } else {
-           prev->link = t->link;
-           if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
-             blocked_queue_tl = (StgTSO *)prev;
-           }
-         }
-#if defined(mingw32_HOST_OS)
-         /* (Cooperatively) signal that the worker thread should abort
-          * the request.
-          */
-         abandonWorkRequest(tso->block_info.async_result->reqID);
-#endif
-         goto done;
-       }
-      }
-      barf("unblockThread (I/O): TSO not found");
-    }
-
-  case BlockedOnDelay:
-    {
-      /* take TSO off sleeping_queue */
-      StgBlockingQueueElement *prev = NULL;
-      for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
-          prev = t, t = t->link) {
-       if (t == (StgBlockingQueueElement *)tso) {
-         if (prev == NULL) {
-           sleeping_queue = (StgTSO *)t->link;
-         } else {
-           prev->link = t->link;
-         }
-         goto done;
-       }
-      }
-      barf("unblockThread (delay): TSO not found");
-    }
-
-  default:
-    barf("unblockThread");
-  }
-
- done:
-  tso->link = END_TSO_QUEUE;
-  tso->why_blocked = NotBlocked;
-  tso->block_info.closure = NULL;
-  pushOnRunQueue(cap,tso);
-}
-#else
-static void
-unblockThread(Capability *cap, StgTSO *tso)
-{
-  StgTSO *t, **last;
-  
-  /* To avoid locking unnecessarily. */
-  if (tso->why_blocked == NotBlocked) {
-    return;
-  }
-
-  switch (tso->why_blocked) {
-
-  case BlockedOnSTM:
-    // Be careful: nothing to do here!  We tell the scheduler that the thread
-    // is runnable and we leave it to the stack-walking code to abort the 
-    // transaction while unwinding the stack.  We should perhaps have a debugging
-    // test to make sure that this really happens and that the 'zombie' transaction
-    // does not get committed.
-    goto done;
-
-  case BlockedOnMVar:
-    ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
-    {
-      StgTSO *last_tso = END_TSO_QUEUE;
-      StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
-
-      last = &mvar->head;
-      for (t = mvar->head; t != END_TSO_QUEUE; 
-          last = &t->link, last_tso = t, t = t->link) {
-       if (t == tso) {
-         *last = tso->link;
-         if (mvar->tail == tso) {
-           mvar->tail = last_tso;
-         }
-         goto done;
-       }
-      }
-      barf("unblockThread (MVAR): TSO not found");
-    }
-
-  case BlockedOnBlackHole:
-    {
-      last = &blackhole_queue;
-      for (t = blackhole_queue; t != END_TSO_QUEUE; 
-          last = &t->link, t = t->link) {
-       if (t == tso) {
-         *last = tso->link;
-         goto done;
-       }
-      }
-      barf("unblockThread (BLACKHOLE): TSO not found");
-    }
-
-  case BlockedOnException:
-    {
-      StgTSO *target  = tso->block_info.tso;
-
-      ASSERT(get_itbl(target)->type == TSO);
-
-      while (target->what_next == ThreadRelocated) {
-         target = target->link;
-         ASSERT(get_itbl(target)->type == TSO);
-      }
-      
-      ASSERT(target->blocked_exceptions != NULL);
-
-      last = &target->blocked_exceptions;
-      for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
-          last = &t->link, t = t->link) {
-       ASSERT(get_itbl(t)->type == TSO);
-       if (t == tso) {
-         *last = tso->link;
-         goto done;
-       }
-      }
-      barf("unblockThread (Exception): TSO not found");
-    }
-
-#if !defined(THREADED_RTS)
-  case BlockedOnRead:
-  case BlockedOnWrite:
-#if defined(mingw32_HOST_OS)
-  case BlockedOnDoProc:
-#endif
-    {
-      StgTSO *prev = NULL;
-      for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
-          prev = t, t = t->link) {
-       if (t == tso) {
-         if (prev == NULL) {
-           blocked_queue_hd = t->link;
-           if (blocked_queue_tl == t) {
-             blocked_queue_tl = END_TSO_QUEUE;
-           }
-         } else {
-           prev->link = t->link;
-           if (blocked_queue_tl == t) {
-             blocked_queue_tl = prev;
-           }
-         }
-#if defined(mingw32_HOST_OS)
-         /* (Cooperatively) signal that the worker thread should abort
-          * the request.
-          */
-         abandonWorkRequest(tso->block_info.async_result->reqID);
-#endif
-         goto done;
-       }
-      }
-      barf("unblockThread (I/O): TSO not found");
-    }
-
-  case BlockedOnDelay:
-    {
-      StgTSO *prev = NULL;
-      for (t = sleeping_queue; t != END_TSO_QUEUE; 
-          prev = t, t = t->link) {
-       if (t == tso) {
-         if (prev == NULL) {
-           sleeping_queue = t->link;
-         } else {
-           prev->link = t->link;
-         }
-         goto done;
-       }
-      }
-      barf("unblockThread (delay): TSO not found");
-    }
-#endif
-
-  default:
-    barf("unblockThread");
-  }
-
- done:
-  tso->link = END_TSO_QUEUE;
-  tso->why_blocked = NotBlocked;
-  tso->block_info.closure = NULL;
-  appendToRunQueue(cap,tso);
-
-  // We might have just migrated this TSO to our Capability:
-  if (tso->bound) {
-      tso->bound->cap = cap;
-  }
-  tso->cap = cap;
-}
-#endif
-
-/* -----------------------------------------------------------------------------
  * checkBlackHoles()
  *
  * Check the blackhole_queue for threads that can be woken up.  We do
@@ -3840,264 +2911,6 @@ checkBlackHoles (Capability *cap)
 }
 
 /* -----------------------------------------------------------------------------
- * raiseAsync()
- *
- * The following function implements the magic for raising an
- * asynchronous exception in an existing thread.
- *
- * We first remove the thread from any queue on which it might be
- * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
- *
- * We strip the stack down to the innermost CATCH_FRAME, building
- * thunks in the heap for all the active computations, so they can 
- * be restarted if necessary.  When we reach a CATCH_FRAME, we build
- * an application of the handler to the exception, and push it on
- * the top of the stack.
- * 
- * How exactly do we save all the active computations?  We create an
- * AP_STACK for every UpdateFrame on the stack.  Entering one of these
- * AP_STACKs pushes everything from the corresponding update frame
- * upwards onto the stack.  (Actually, it pushes everything up to the
- * next update frame plus a pointer to the next AP_STACK object.
- * Entering the next AP_STACK object pushes more onto the stack until we
- * reach the last AP_STACK object - at which point the stack should look
- * exactly as it did when we killed the TSO and we can continue
- * execution by entering the closure on top of the stack.
- *
- * We can also kill a thread entirely - this happens if either (a) the 
- * exception passed to raiseAsync is NULL, or (b) there's no
- * CATCH_FRAME on the stack.  In either case, we strip the entire
- * stack and replace the thread with a zombie.
- *
- * ToDo: in THREADED_RTS mode, this function is only safe if either
- * (a) we hold all the Capabilities (eg. in GC, or if there is only
- * one Capability), or (b) we own the Capability that the TSO is
- * currently blocked on or on the run queue of.
- *
- * -------------------------------------------------------------------------- */
-void
-raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
-{
-    raiseAsync_(cap, tso, exception, rtsFalse, NULL);
-}
-
-void
-suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
-{
-    raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
-}
-
-static void
-raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
-           rtsBool stop_at_atomically, StgPtr stop_here)
-{
-    StgRetInfoTable *info;
-    StgPtr sp, frame;
-    nat i;
-  
-    // Thread already dead?
-    if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
-       return;
-    }
-
-    debugTrace(DEBUG_sched,
-              "raising exception in thread %ld.", (long)tso->id);
-    
-    // Remove it from any blocking queues
-    unblockThread(cap,tso);
-
-    // mark it dirty; we're about to change its stack.
-    dirtyTSO(tso);
-
-    sp = tso->sp;
-    
-    // The stack freezing code assumes there's a closure pointer on
-    // the top of the stack, so we have to arrange that this is the case...
-    //
-    if (sp[0] == (W_)&stg_enter_info) {
-       sp++;
-    } else {
-       sp--;
-       sp[0] = (W_)&stg_dummy_ret_closure;
-    }
-
-    frame = sp + 1;
-    while (stop_here == NULL || frame < stop_here) {
-
-       // 1. Let the top of the stack be the "current closure"
-       //
-       // 2. Walk up the stack until we find either an UPDATE_FRAME or a
-       // CATCH_FRAME.
-       //
-       // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
-       // current closure applied to the chunk of stack up to (but not
-       // including) the update frame.  This closure becomes the "current
-       // closure".  Go back to step 2.
-       //
-       // 4. If it's a CATCH_FRAME, then leave the exception handler on
-       // top of the stack applied to the exception.
-       // 
-       // 5. If it's a STOP_FRAME, then kill the thread.
-        // 
-        // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
-        // transaction
-       
-       info = get_ret_itbl((StgClosure *)frame);
-
-       switch (info->i.type) {
-
-       case UPDATE_FRAME:
-       {
-           StgAP_STACK * ap;
-           nat words;
-           
-           // First build an AP_STACK consisting of the stack chunk above the
-           // current update frame, with the top word on the stack as the
-           // fun field.
-           //
-           words = frame - sp - 1;
-           ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
-           
-           ap->size = words;
-           ap->fun  = (StgClosure *)sp[0];
-           sp++;
-           for(i=0; i < (nat)words; ++i) {
-               ap->payload[i] = (StgClosure *)*sp++;
-           }
-           
-           SET_HDR(ap,&stg_AP_STACK_info,
-                   ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
-           TICK_ALLOC_UP_THK(words+1,0);
-           
-           //IF_DEBUG(scheduler,
-           //       debugBelch("sched: Updating ");
-           //       printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
-           //       debugBelch(" with ");
-           //       printObj((StgClosure *)ap);
-           //  );
-
-           // Replace the updatee with an indirection
-           //
-           // Warning: if we're in a loop, more than one update frame on
-           // the stack may point to the same object.  Be careful not to
-           // overwrite an IND_OLDGEN in this case, because we'll screw
-           // up the mutable lists.  To be on the safe side, don't
-           // overwrite any kind of indirection at all.  See also
-           // threadSqueezeStack in GC.c, where we have to make a similar
-           // check.
-           //
-           if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
-               // revert the black hole
-               UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
-                              (StgClosure *)ap);
-           }
-           sp += sizeofW(StgUpdateFrame) - 1;
-           sp[0] = (W_)ap; // push onto stack
-           frame = sp + 1;
-           continue; //no need to bump frame
-       }
-
-       case STOP_FRAME:
-           // We've stripped the entire stack, the thread is now dead.
-           tso->what_next = ThreadKilled;
-           tso->sp = frame + sizeofW(StgStopFrame);
-           return;
-
-       case CATCH_FRAME:
-           // If we find a CATCH_FRAME, and we've got an exception to raise,
-           // then build the THUNK raise(exception), and leave it on
-           // top of the CATCH_FRAME ready to enter.
-           //
-       {
-#ifdef PROFILING
-           StgCatchFrame *cf = (StgCatchFrame *)frame;
-#endif
-           StgThunk *raise;
-           
-           if (exception == NULL) break;
-
-           // we've got an exception to raise, so let's pass it to the
-           // handler in this frame.
-           //
-           raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
-           TICK_ALLOC_SE_THK(1,0);
-           SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
-           raise->payload[0] = exception;
-           
-           // throw away the stack from Sp up to the CATCH_FRAME.
-           //
-           sp = frame - 1;
-           
-           /* Ensure that async excpetions are blocked now, so we don't get
-            * a surprise exception before we get around to executing the
-            * handler.
-            */
-           if (tso->blocked_exceptions == NULL) {
-               tso->blocked_exceptions = END_TSO_QUEUE;
-           }
-
-           /* Put the newly-built THUNK on top of the stack, ready to execute
-            * when the thread restarts.
-            */
-           sp[0] = (W_)raise;
-           sp[-1] = (W_)&stg_enter_info;
-           tso->sp = sp-1;
-           tso->what_next = ThreadRunGHC;
-           IF_DEBUG(sanity, checkTSO(tso));
-           return;
-       }
-           
-       case ATOMICALLY_FRAME:
-           if (stop_at_atomically) {
-               ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
-               stmCondemnTransaction(cap, tso -> trec);
-#ifdef REG_R1
-               tso->sp = frame;
-#else
-               // R1 is not a register: the return convention for IO in
-               // this case puts the return value on the stack, so we
-               // need to set up the stack to return to the atomically
-               // frame properly...
-               tso->sp = frame - 2;
-               tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
-               tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
-#endif
-               tso->what_next = ThreadRunGHC;
-               return;
-           }
-           // Not stop_at_atomically... fall through and abort the
-           // transaction.
-           
-       case CATCH_RETRY_FRAME:
-           // IF we find an ATOMICALLY_FRAME then we abort the
-           // current transaction and propagate the exception.  In
-           // this case (unlike ordinary exceptions) we do not care
-           // whether the transaction is valid or not because its
-           // possible validity cannot have caused the exception
-           // and will not be visible after the abort.
-           debugTrace(DEBUG_stm, 
-                      "found atomically block delivering async exception");
-
-            StgTRecHeader *trec = tso -> trec;
-            StgTRecHeader *outer = stmGetEnclosingTRec(trec);
-            stmAbortTransaction(cap, trec);
-            tso -> trec = outer;
-           break;
-           
-       default:
-           break;
-       }
-
-       // move on to the next stack frame
-       frame += stack_frame_sizeW((StgClosure *)frame);
-    }
-
-    // if we got here, then we stopped at stop_here
-    ASSERT(stop_here != NULL);
-}
-
-/* -----------------------------------------------------------------------------
    Deleting threads
 
    This is used for interruption (^C) and forking, and corresponds to
@@ -4108,10 +2921,15 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
 static void 
 deleteThread (Capability *cap, StgTSO *tso)
 {
-  if (tso->why_blocked != BlockedOnCCall &&
-      tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
-      raiseAsync(cap,tso,NULL);
-  }
+    // NOTE: must only be called on a TSO that we have exclusive
+    // access to, because we will call throwToSingleThreaded() below.
+    // The TSO must be on the run queue of the Capability we own, or 
+    // we must own all Capabilities.
+
+    if (tso->why_blocked != BlockedOnCCall &&
+       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
+       throwToSingleThreaded(cap,tso,NULL);
+    }
 }
 
 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
@@ -4293,13 +3111,16 @@ resurrectThreads (StgTSO *threads)
        case BlockedOnMVar:
        case BlockedOnException:
            /* Called by GC - sched_mutex lock is currently held. */
-           raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
+           throwToSingleThreaded(cap, tso,
+                                 (StgClosure *)BlockedOnDeadMVar_closure);
            break;
        case BlockedOnBlackHole:
-           raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
+           throwToSingleThreaded(cap, tso,
+                                 (StgClosure *)NonTermination_closure);
            break;
        case BlockedOnSTM:
-           raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
+           throwToSingleThreaded(cap, tso,
+                                 (StgClosure *)BlockedIndefinitely_closure);
            break;
        case NotBlocked:
            /* This might happen if the thread was blocked on a black hole
@@ -4312,298 +3133,3 @@ resurrectThreads (StgTSO *threads)
        }
     }
 }
-
-/* ----------------------------------------------------------------------------
- * Debugging: why is a thread blocked
- * [Also provides useful information when debugging threaded programs
- *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
-   ------------------------------------------------------------------------- */
-
-#if DEBUG
-static void
-printThreadBlockage(StgTSO *tso)
-{
-  switch (tso->why_blocked) {
-  case BlockedOnRead:
-    debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
-    break;
-  case BlockedOnWrite:
-    debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
-    break;
-#if defined(mingw32_HOST_OS)
-    case BlockedOnDoProc:
-    debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
-    break;
-#endif
-  case BlockedOnDelay:
-    debugBelch("is blocked until %ld", (long)(tso->block_info.target));
-    break;
-  case BlockedOnMVar:
-    debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
-    break;
-  case BlockedOnException:
-    debugBelch("is blocked on delivering an exception to thread %d",
-           tso->block_info.tso->id);
-    break;
-  case BlockedOnBlackHole:
-    debugBelch("is blocked on a black hole");
-    break;
-  case NotBlocked:
-    debugBelch("is not blocked");
-    break;
-#if defined(PARALLEL_HASKELL)
-  case BlockedOnGA:
-    debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
-           tso->block_info.closure, info_type(tso->block_info.closure));
-    break;
-  case BlockedOnGA_NoSend:
-    debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
-           tso->block_info.closure, info_type(tso->block_info.closure));
-    break;
-#endif
-  case BlockedOnCCall:
-    debugBelch("is blocked on an external call");
-    break;
-  case BlockedOnCCall_NoUnblockExc:
-    debugBelch("is blocked on an external call (exceptions were already blocked)");
-    break;
-  case BlockedOnSTM:
-    debugBelch("is blocked on an STM operation");
-    break;
-  default:
-    barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
-        tso->why_blocked, tso->id, tso);
-  }
-}
-
-void
-printThreadStatus(StgTSO *t)
-{
-    debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
-    {
-      void *label = lookupThreadLabel(t->id);
-      if (label) debugBelch("[\"%s\"] ",(char *)label);
-    }
-    if (t->what_next == ThreadRelocated) {
-       debugBelch("has been relocated...\n");
-    } else {
-       switch (t->what_next) {
-       case ThreadKilled:
-           debugBelch("has been killed");
-           break;
-       case ThreadComplete:
-           debugBelch("has completed");
-           break;
-       default:
-           printThreadBlockage(t);
-       }
-       debugBelch("\n");
-    }
-}
-
-void
-printAllThreads(void)
-{
-  StgTSO *t, *next;
-  nat i;
-  Capability *cap;
-
-# if defined(GRAN)
-  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
-  ullong_format_string(TIME_ON_PROC(CurrentProc), 
-                      time_string, rtsFalse/*no commas!*/);
-
-  debugBelch("all threads at [%s]:\n", time_string);
-# elif defined(PARALLEL_HASKELL)
-  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
-  ullong_format_string(CURRENT_TIME,
-                      time_string, rtsFalse/*no commas!*/);
-
-  debugBelch("all threads at [%s]:\n", time_string);
-# else
-  debugBelch("all threads:\n");
-# endif
-
-  for (i = 0; i < n_capabilities; i++) {
-      cap = &capabilities[i];
-      debugBelch("threads on capability %d:\n", cap->no);
-      for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
-         printThreadStatus(t);
-      }
-  }
-
-  debugBelch("other threads:\n");
-  for (t = all_threads; t != END_TSO_QUEUE; t = next) {
-      if (t->why_blocked != NotBlocked) {
-         printThreadStatus(t);
-      }
-      if (t->what_next == ThreadRelocated) {
-         next = t->link;
-      } else {
-         next = t->global_link;
-      }
-  }
-}
-
-// useful from gdb
-void 
-printThreadQueue(StgTSO *t)
-{
-    nat i = 0;
-    for (; t != END_TSO_QUEUE; t = t->link) {
-       printThreadStatus(t);
-       i++;
-    }
-    debugBelch("%d threads on queue\n", i);
-}
-
-/* 
-   Print a whole blocking queue attached to node (debugging only).
-*/
-# if defined(PARALLEL_HASKELL)
-void 
-print_bq (StgClosure *node)
-{
-  StgBlockingQueueElement *bqe;
-  StgTSO *tso;
-  rtsBool end;
-
-  debugBelch("## BQ of closure %p (%s): ",
-         node, info_type(node));
-
-  /* should cover all closures that may have a blocking queue */
-  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
-        get_itbl(node)->type == FETCH_ME_BQ ||
-        get_itbl(node)->type == RBH ||
-        get_itbl(node)->type == MVAR);
-    
-  ASSERT(node!=(StgClosure*)NULL);         // sanity check
-
-  print_bqe(((StgBlockingQueue*)node)->blocking_queue);
-}
-
-/* 
-   Print a whole blocking queue starting with the element bqe.
-*/
-void 
-print_bqe (StgBlockingQueueElement *bqe)
-{
-  rtsBool end;
-
-  /* 
-     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
-  */
-  for (end = (bqe==END_BQ_QUEUE);
-       !end; // iterate until bqe points to a CONSTR
-       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
-       bqe = end ? END_BQ_QUEUE : bqe->link) {
-    ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
-    ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
-    /* types of closures that may appear in a blocking queue */
-    ASSERT(get_itbl(bqe)->type == TSO ||           
-          get_itbl(bqe)->type == BLOCKED_FETCH || 
-          get_itbl(bqe)->type == CONSTR); 
-    /* only BQs of an RBH end with an RBH_Save closure */
-    //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
-
-    switch (get_itbl(bqe)->type) {
-    case TSO:
-      debugBelch(" TSO %u (%x),",
-             ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
-      break;
-    case BLOCKED_FETCH:
-      debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
-             ((StgBlockedFetch *)bqe)->node, 
-             ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
-             ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
-             ((StgBlockedFetch *)bqe)->ga.weight);
-      break;
-    case CONSTR:
-      debugBelch(" %s (IP %p),",
-             (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
-              get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
-              get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
-              "RBH_Save_?"), get_itbl(bqe));
-      break;
-    default:
-      barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
-          info_type((StgClosure *)bqe)); // , node, info_type(node));
-      break;
-    }
-  } /* for */
-  debugBelch("\n");
-}
-# elif defined(GRAN)
-void 
-print_bq (StgClosure *node)
-{
-  StgBlockingQueueElement *bqe;
-  PEs node_loc, tso_loc;
-  rtsBool end;
-
-  /* should cover all closures that may have a blocking queue */
-  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
-        get_itbl(node)->type == FETCH_ME_BQ ||
-        get_itbl(node)->type == RBH);
-    
-  ASSERT(node!=(StgClosure*)NULL);         // sanity check
-  node_loc = where_is(node);
-
-  debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
-         node, info_type(node), node_loc);
-
-  /* 
-     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
-  */
-  for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
-       !end; // iterate until bqe points to a CONSTR
-       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
-    ASSERT(bqe != END_BQ_QUEUE);             // sanity check
-    ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
-    /* types of closures that may appear in a blocking queue */
-    ASSERT(get_itbl(bqe)->type == TSO ||           
-          get_itbl(bqe)->type == CONSTR); 
-    /* only BQs of an RBH end with an RBH_Save closure */
-    ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
-
-    tso_loc = where_is((StgClosure *)bqe);
-    switch (get_itbl(bqe)->type) {
-    case TSO:
-      debugBelch(" TSO %d (%p) on [PE %d],",
-             ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
-      break;
-    case CONSTR:
-      debugBelch(" %s (IP %p),",
-             (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
-              get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
-              get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
-              "RBH_Save_?"), get_itbl(bqe));
-      break;
-    default:
-      barf("Unexpected closure type %s in blocking queue of %p (%s)",
-          info_type((StgClosure *)bqe), node, info_type(node));
-      break;
-    }
-  } /* for */
-  debugBelch("\n");
-}
-# endif
-
-#if defined(PARALLEL_HASKELL)
-static nat
-run_queue_len(void)
-{
-    nat i;
-    StgTSO *tso;
-    
-    for (i=0, tso=run_queue_hd; 
-        tso != END_TSO_QUEUE;
-        i++, tso=tso->link) {
-       /* nothing */
-    }
-       
-    return i;
-}
-#endif
-
-#endif /* DEBUG */
diff --git a/rts/Schedule.h b/rts/Schedule.h
index e30e911..f82946e 100644
--- a/rts/Schedule.h
+++ b/rts/Schedule.h
@@ -55,24 +55,7 @@ void wakeUpRts(void);
  * Called from STG :  yes
  * Locks assumed   :  we own the Capability.
  */
-StgTSO * unblockOne(Capability *cap, StgTSO *tso);
-
-/* raiseAsync()
- *
- * Raises an exception asynchronously in the specified thread.
- *
- * Called from STG :  yes
- * Locks assumed   :  none
- */
-void raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception);
-
-/* suspendComputation()
- *
- * A variant of raiseAsync(), this strips the stack of the specified
- * thread down to the stop_here point, leaving a current closure on
- * top of the stack at [stop_here - 1].
- */
-void suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here);
+StgTSO * unblockOne (Capability *cap, StgTSO *tso);
 
 /* raiseExceptionHelper */
 StgWord raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception);
@@ -176,8 +159,6 @@ extern rtsBool blackholes_need_checking;
 extern Mutex RTS_VAR(sched_mutex);
 #endif
 
-StgBool isThreadBound(StgTSO *tso);
-
 SchedulerStatus rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret);
 
 /* Called by shutdown_handler(). */
@@ -198,8 +179,6 @@ void print_bq (StgClosure *node);
 void print_bqe (StgBlockingQueueElement *bqe);
 #endif
 
-void labelThread(StgPtr tso, char *label);
-
 /* -----------------------------------------------------------------------------
  * Some convenient macros/inline functions...
  */
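
The raiseAsync() and suspendComputation() declarations removed above
move to the new rts/RaiseAsync.h, whose hunk is not shown here.  A
sketch of what it presumably declares -- the throwTo() signature is
inferred from the commit description, not copied from the patch:

    /* rts/RaiseAsync.h (sketch) */
    void throwToSingleThreaded (Capability *cap, StgTSO *tso,
                                StgClosure *exception);
    void suspendComputation    (Capability *cap, StgTSO *tso,
                                StgPtr stop_here);
    /* throwTo may block the source thread until the target
       can receive the exception */
    nat  throwTo               (Capability *cap, StgTSO *source,
                                StgTSO *target, StgClosure *exception,
                                /*out*/ void **out);
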
diff --git a/rts/ThreadLabels.c b/rts/ThreadLabels.c
index 9b9f172..72cd5d3 100644
--- a/rts/ThreadLabels.c
+++ b/rts/ThreadLabels.c
@@ -8,8 +8,10 @@
  * ---------------------------------------------------------------------------*/
 
 #include "PosixSource.h"
+#include "Rts.h"
 #include "ThreadLabels.h"
 #include "RtsUtils.h"
+#include "Hash.h"
 
 #include <stdlib.h>
 
@@ -47,4 +49,19 @@ removeThreadLabel(StgWord key)
     stgFree(old);
   }  
 }
+
+void
+labelThread(StgPtr tso, char *label)
+{
+  int len;
+  void *buf;
+
+  /* Caveat: Once set, you can only set the thread name to "" */
+  len = strlen(label)+1;
+  buf = stgMallocBytes(len * sizeof(char), "ThreadLabels.c:labelThread()");
+  strncpy(buf,label,len);
+  /* Update will free the old memory for us */
+  updateThreadLabel(((StgTSO *)tso)->id,buf);
+}
+
 #endif /* DEBUG */
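
A hedged usage sketch for the relocated labelThread() -- the TSO
pointer and label are illustrative.  The label is copied into a
malloc'd buffer, so a temporary string is fine, and it shows up in
printThreadStatus() in a DEBUG build:

    #if defined(DEBUG)
    labelThread((StgPtr)tso, "IO manager");
    #endif
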
diff --git a/rts/ThreadLabels.h b/rts/ThreadLabels.h
index 97d3d0d..eaed22d 100644
--- a/rts/ThreadLabels.h
+++ b/rts/ThreadLabels.h
@@ -1,27 +1,21 @@
 /* -----------------------------------------------------------------------------
  * ThreadLabels.h
  *
- * (c) The GHC Team 2002-2003
+ * (c) The GHC Team 2002-2006
  *
  * Table of thread labels.
  *
  * ---------------------------------------------------------------------------*/
+
 #ifndef __THREADLABELS_H__
 #define __THREADLABELS_H__
 
-#include "Rts.h"
-#include "Hash.h"
-
-void
-initThreadLabelTable(void);
-
-void
-updateThreadLabel(StgWord key, void *data);
-
-void *
-lookupThreadLabel(StgWord key);
-
-void
-removeThreadLabel(StgWord key);
+#if defined(DEBUG)
+void    initThreadLabelTable (void);
+void    updateThreadLabel    (StgWord key, void *data);
+void *  lookupThreadLabel    (StgWord key);
+void    removeThreadLabel    (StgWord key);
+void    labelThread          (StgPtr tso, char *label);
+#endif
 
 #endif /* __THREADLABELS_H__ */
diff --git a/rts/Threads.c b/rts/Threads.c
new file mode 100644
index 0000000..b550cc6
--- /dev/null
+++ b/rts/Threads.c
@@ -0,0 +1,974 @@
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2006
+ *
+ * Thread-related functionality
+ *
+ * --------------------------------------------------------------------------*/
+
+#include "PosixSource.h"
+#include "Rts.h"
+#include "SchedAPI.h"
+#include "Storage.h"
+#include "Threads.h"
+#include "RtsFlags.h"
+#include "STM.h"
+#include "Schedule.h"
+#include "Trace.h"
+#include "ThreadLabels.h"
+
+/* Next thread ID to allocate.
+ * LOCK: sched_mutex
+ */
+static StgThreadID next_thread_id = 1;
+
+/* The smallest stack size that makes any sense is:
+ *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
+ *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
+ *  + 1                       (the closure to enter)
+ *  + 1                       (stg_ap_v_ret)
+ *  + 1                       (spare slot req'd by stg_ap_v_ret)
+ *
+ * A thread with this stack will bomb immediately with a stack
+ * overflow, which will increase its stack size.  
+ */
+#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
+
+/* ---------------------------------------------------------------------------
+   Create a new thread.
+
+   The new thread starts with the given stack size.  Before the
+   scheduler can run, however, this thread needs to have a closure
+   (and possibly some arguments) pushed on its stack.  See
+   pushClosure() in Schedule.h.
+
+   createGenThread() and createIOThread() (in SchedAPI.h) are
+   convenient packaged versions of this function.
+
+   currently pri (priority) is only used in a GRAN setup -- HWL
+   ------------------------------------------------------------------------ */
+#if defined(GRAN)
+/*   currently pri (priority) is only used in a GRAN setup -- HWL */
+StgTSO *
+createThread(nat size, StgInt pri)
+#else
+StgTSO *
+createThread(Capability *cap, nat size)
+#endif
+{
+    StgTSO *tso;
+    nat stack_size;
+
+    /* sched_mutex is *not* required */
+
+    /* First check whether we should create a thread at all */
+#if defined(PARALLEL_HASKELL)
+    /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
+    if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
+       threadsIgnored++;
+       debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
+                  RtsFlags.ParFlags.maxThreads, advisory_thread_count);
+       return END_TSO_QUEUE;
+    }
+    threadsCreated++;
+#endif
+
+#if defined(GRAN)
+    ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
+#endif
+
+    // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
+
+    /* catch ridiculously small stack sizes */
+    if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
+       size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
+    }
+
+    stack_size = size - TSO_STRUCT_SIZEW;
+    
+    tso = (StgTSO *)allocateLocal(cap, size);
+    TICK_ALLOC_TSO(stack_size, 0);
+
+    SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
+#if defined(GRAN)
+    SET_GRAN_HDR(tso, ThisPE);
+#endif
+
+    // Always start with the compiled code evaluator
+    tso->what_next = ThreadRunGHC;
+
+    tso->why_blocked  = NotBlocked;
+    tso->blocked_exceptions = END_TSO_QUEUE;
+    tso->flags = TSO_DIRTY;
+    
+    tso->saved_errno = 0;
+    tso->bound = NULL;
+    tso->cap = cap;
+    
+    tso->stack_size     = stack_size;
+    tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
+                         - TSO_STRUCT_SIZEW;
+    tso->sp             = (P_)&(tso->stack) + stack_size;
+
+    tso->trec = NO_TREC;
+    
+#ifdef PROFILING
+    tso->prof.CCCS = CCS_MAIN;
+#endif
+    
+  /* put a stop frame on the stack */
+    tso->sp -= sizeofW(StgStopFrame);
+    SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
+    tso->link = END_TSO_QUEUE;
+    
+  // ToDo: check this
+#if defined(GRAN)
+    /* uses more flexible routine in GranSim */
+    insertThread(tso, CurrentProc);
+#else
+    /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
+     * from its creation
+     */
+#endif
+    
+#if defined(GRAN) 
+    if (RtsFlags.GranFlags.GranSimStats.Full) 
+       DumpGranEvent(GR_START,tso);
+#elif defined(PARALLEL_HASKELL)
+    if (RtsFlags.ParFlags.ParStats.Full) 
+       DumpGranEvent(GR_STARTQ,tso);
+    /* HACk to avoid SCHEDULE 
+       LastTSO = tso; */
+#endif
+    
+    /* Link the new thread on the global thread list.
+     */
+    ACQUIRE_LOCK(&sched_mutex);
+    tso->id = next_thread_id++;  // while we have the mutex
+    tso->global_link = all_threads;
+    all_threads = tso;
+    RELEASE_LOCK(&sched_mutex);
+    
+#if defined(DIST)
+    tso->dist.priority = MandatoryPriority; //by default that is...
+#endif
+    
+#if defined(GRAN)
+    tso->gran.pri = pri;
+# if defined(DEBUG)
+    tso->gran.magic = TSO_MAGIC; // debugging only
+# endif
+    tso->gran.sparkname   = 0;
+    tso->gran.startedat   = CURRENT_TIME; 
+    tso->gran.exported    = 0;
+    tso->gran.basicblocks = 0;
+    tso->gran.allocs      = 0;
+    tso->gran.exectime    = 0;
+    tso->gran.fetchtime   = 0;
+    tso->gran.fetchcount  = 0;
+    tso->gran.blocktime   = 0;
+    tso->gran.blockcount  = 0;
+    tso->gran.blockedat   = 0;
+    tso->gran.globalsparks = 0;
+    tso->gran.localsparks  = 0;
+    if (RtsFlags.GranFlags.Light)
+       tso->gran.clock  = Now; /* local clock */
+    else
+       tso->gran.clock  = 0;
+    
+    IF_DEBUG(gran,printTSO(tso));
+#elif defined(PARALLEL_HASKELL)
+# if defined(DEBUG)
+    tso->par.magic = TSO_MAGIC; // debugging only
+# endif
+    tso->par.sparkname   = 0;
+    tso->par.startedat   = CURRENT_TIME; 
+    tso->par.exported    = 0;
+    tso->par.basicblocks = 0;
+    tso->par.allocs      = 0;
+    tso->par.exectime    = 0;
+    tso->par.fetchtime   = 0;
+    tso->par.fetchcount  = 0;
+    tso->par.blocktime   = 0;
+    tso->par.blockcount  = 0;
+    tso->par.blockedat   = 0;
+    tso->par.globalsparks = 0;
+    tso->par.localsparks  = 0;
+#endif
+    
+#if defined(GRAN)
+    globalGranStats.tot_threads_created++;
+    globalGranStats.threads_created_on_PE[CurrentProc]++;
+    globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
+    globalGranStats.tot_sq_probes++;
+#elif defined(PARALLEL_HASKELL)
+    // collect parallel global statistics (currently done together with GC stats)
+    if (RtsFlags.ParFlags.ParStats.Global &&
+       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
+       //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
+       globalParStats.tot_threads_created++;
+    }
+#endif 
+    
+#if defined(GRAN)
+    debugTrace(GRAN_DEBUG_pri,
+              "==__ schedule: Created TSO %d (%p) on PE %d;",
+              tso->id, tso, CurrentProc);
+#elif defined(PARALLEL_HASKELL)
+    debugTrace(PAR_DEBUG_verbose,
+              "==__ schedule: Created TSO %d (%p); %d threads active",
+              (long)tso->id, tso, advisory_thread_count);
+#else
+    debugTrace(DEBUG_sched,
+              "created thread %ld, stack size = %lx words", 
+              (long)tso->id, (long)tso->stack_size);
+#endif    
+    return tso;
+}
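
As the comment above notes, the new TSO is inert until a closure is
pushed on its stack.  This is roughly what createIOThread() in
SchedAPI.h does -- a sketch of the existing API from memory, not part
of this patch:

    StgTSO *
    createIOThread (Capability *cap, nat stack_size, StgClosure *closure)
    {
        StgTSO *t = createThread(cap, stack_size);
        pushClosure(t, (W_)&stg_ap_v_info);   // apply to a void argument
        pushClosure(t, (W_)closure);          // the IO action itself
        pushClosure(t, (W_)&stg_enter_info);  // enter the closure
        return t;
    }
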
+
+#if defined(PAR)
+/* RFP:
+   all parallel thread creation calls should fall through the following routine.
+*/
+StgTSO *
+createThreadFromSpark(rtsSpark spark) 
+{ StgTSO *tso;
+  ASSERT(spark != (rtsSpark)NULL);
+// JB: TAKE CARE OF THIS COUNTER! BUGGY
+  if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
+  { threadsIgnored++;
+    barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
+         RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
+    return END_TSO_QUEUE;
+  }
+  else
+  { threadsCreated++;
+    tso = createThread(RtsFlags.GcFlags.initialStkSize);
+    if (tso==END_TSO_QUEUE)    
+      barf("createSparkThread: Cannot create TSO");
+#if defined(DIST)
+    tso->priority = AdvisoryPriority;
+#endif
+    pushClosure(tso,spark);
+    addToRunQueue(tso);
+    advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
+  }
+  return tso;
+}
+#endif
+
+/* ---------------------------------------------------------------------------
+ * Comparing Thread ids.
+ *
+ * This is used from STG land in the implementation of the
+ * instances of Eq/Ord for ThreadIds.
+ * ------------------------------------------------------------------------ */
+
+int
+cmp_thread(StgPtr tso1, StgPtr tso2) 
+{ 
+  StgThreadID id1 = ((StgTSO *)tso1)->id; 
+  StgThreadID id2 = ((StgTSO *)tso2)->id;
+  if (id1 < id2) return (-1);
+  if (id1 > id2) return 1;
+  return 0;
+}
+
+/* ---------------------------------------------------------------------------
+ * Fetching the ThreadID from an StgTSO.
+ *
+ * This is used in the implementation of Show for ThreadIds.
+ * ------------------------------------------------------------------------ */
+int
+rts_getThreadId(StgPtr tso) 
+{
+  return ((StgTSO *)tso)->id;
+}
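
Both functions are reached from Haskell via foreign imports in GHC.Conc
(backing the Eq/Ord/Show instances for ThreadId).  Their contract, as a
hedged sketch with illustrative TSO pointers t1 and t2:

    /* cmp_thread is a total, memcmp-style ordering on thread ids */
    ASSERT(cmp_thread(t1, t1) == 0);
    ASSERT(cmp_thread(t1, t2) == -cmp_thread(t2, t1));
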
+
+/* -----------------------------------------------------------------------------
+   Remove a thread from a queue.
+   Fails fatally if the TSO is not on the queue.
+   -------------------------------------------------------------------------- */
+
+void
+removeThreadFromQueue (StgTSO **queue, StgTSO *tso)
+{
+    StgTSO *t, *prev;
+
+    prev = NULL;
+    for (t = *queue; t != END_TSO_QUEUE; prev = t, t = t->link) {
+       if (t == tso) {
+           if (prev) {
+               prev->link = t->link;
+           } else {
+               *queue = t->link;
+           }
+           return;
+       }
+    }
+    barf("removeThreadFromQueue: not found");
+}
+
+void
+removeThreadFromDeQueue (StgTSO **head, StgTSO **tail, StgTSO *tso)
+{
+    StgTSO *t, *prev;
+
+    prev = NULL;
+    for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->link) {
+       if (t == tso) {
+           if (prev) {
+               prev->link = t->link;
+           } else {
+               *head = t->link;
+           }
+           if (*tail == tso) {
+               if (prev) {
+                   *tail = prev;
+               } else {
+                   *tail = END_TSO_QUEUE;
+               }
+           }
+           return;
+       }
+    }
+    barf("removeThreadFromMVarQueue: not found");
+}
+
+void
+removeThreadFromMVarQueue (StgMVar *mvar, StgTSO *tso)
+{
+    removeThreadFromDeQueue (&mvar->head, &mvar->tail, tso);
+}
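
The DeQueue variant exists because some queues -- notably a
Capability's run queue -- keep both head and tail pointers.  A hedged
example of a call site, with field names as in Capability.h:

    /* removing a TSO from our own run queue must also fix up the tail */
    removeThreadFromDeQueue(&cap->run_queue_hd, &cap->run_queue_tl, tso);
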
+
+/* ----------------------------------------------------------------------------
+   unblockOne()
+
+   unblock a single thread.
+   ------------------------------------------------------------------------- */
+
+#if defined(GRAN)
+STATIC_INLINE void
+unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
+{
+}
+#elif defined(PARALLEL_HASKELL)
+STATIC_INLINE void
+unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
+{
+  /* write RESUME events to log file and
+     update blocked and fetch time (depending on type of the orig closure) */
+  if (RtsFlags.ParFlags.ParStats.Full) {
+    DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
+                    GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
+                    0, 0 /* spark_queue_len(ADVISORY_POOL) */);
+    if (emptyRunQueue())
+      emitSchedule = rtsTrue;
+
+    switch (get_itbl(node)->type) {
+       case FETCH_ME_BQ:
+         ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
+         break;
+       case RBH:
+       case FETCH_ME:
+       case BLACKHOLE_BQ:
+         ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
+         break;
+#ifdef DIST
+        case MVAR:
+          break;
+#endif   
+       default:
+         barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
+       }
+      }
+}
+#endif
+
+#if defined(GRAN)
+StgBlockingQueueElement *
+unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
+{
+    StgTSO *tso;
+    PEs node_loc, tso_loc;
+
+    node_loc = where_is(node); // should be lifted out of loop
+    tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
+    tso_loc = where_is((StgClosure *)tso);
+    if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
+      /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
+      ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
+      CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
+      // insertThread(tso, node_loc);
+      new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
+               ResumeThread,
+               tso, node, (rtsSpark*)NULL);
+      tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
+      // len_local++;
+      // len++;
+    } else { // TSO is remote (actually should be FMBQ)
+      CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
+                                  RtsFlags.GranFlags.Costs.gunblocktime +
+                                 RtsFlags.GranFlags.Costs.latency;
+      new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
+               UnblockThread,
+               tso, node, (rtsSpark*)NULL);
+      tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
+      // len++;
+    }
+    /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
+    IF_GRAN_DEBUG(bq,
+                 debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
+                         (node_loc==tso_loc ? "Local" : "Global"), 
+                         tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
+    tso->block_info.closure = NULL;
+    debugTrace(DEBUG_sched, "-- waking up thread %ld (%p)", 
+              (long)tso->id, tso);
+}
+#elif defined(PARALLEL_HASKELL)
+StgBlockingQueueElement *
+unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
+{
+    StgBlockingQueueElement *next;
+
+    switch (get_itbl(bqe)->type) {
+    case TSO:
+      ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
+      /* if it's a TSO just push it onto the run_queue */
+      next = bqe->link;
+      ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
+      APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
+      threadRunnable();
+      unblockCount(bqe, node);
+      /* reset blocking status after dumping event */
+      ((StgTSO *)bqe)->why_blocked = NotBlocked;
+      break;
+
+    case BLOCKED_FETCH:
+      /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
+      next = bqe->link;
+      bqe->link = (StgBlockingQueueElement *)PendingFetches;
+      PendingFetches = (StgBlockedFetch *)bqe;
+      break;
+
+# if defined(DEBUG)
+      /* can ignore this case in a non-debugging setup; 
+        see comments on RBHSave closures above */
+    case CONSTR:
+      /* check that the closure is an RBHSave closure */
+      ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
+            get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
+            get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
+      break;
+
+    default:
+      barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
+          get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
+          (StgClosure *)bqe);
+# endif
+    }
+  IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
+  return next;
+}
+#endif
+
+StgTSO *
+unblockOne (Capability *cap, StgTSO *tso)
+{
+    return unblockOne_(cap,tso,rtsTrue); // allow migration
+}
+
+StgTSO *
+unblockOne_ (Capability *cap, StgTSO *tso, 
+            rtsBool allow_migrate USED_IF_THREADS)
+{
+  StgTSO *next;
+
+  ASSERT(get_itbl(tso)->type == TSO);
+  ASSERT(tso->why_blocked != NotBlocked);
+
+  tso->why_blocked = NotBlocked;
+  next = tso->link;
+  tso->link = END_TSO_QUEUE;
+
+#if defined(THREADED_RTS)
+  if (tso->cap == cap || (!tsoLocked(tso) && 
+                         allow_migrate && 
+                         RtsFlags.ParFlags.wakeupMigrate)) {
+      // We are waking up this thread on the current Capability, which
+      // might involve migrating it from the Capability it was last on.
+      if (tso->bound) {
+         ASSERT(tso->bound->cap == tso->cap);
+         tso->bound->cap = cap;
+      }
+      tso->cap = cap;
+      appendToRunQueue(cap,tso);
+      // we're holding a newly woken thread, make sure we context switch
+      // quickly so we can migrate it if necessary.
+      context_switch = 1;
+  } else {
+      // we'll try to wake it up on the Capability it was last on.
+      wakeupThreadOnCapability_lock(tso->cap, tso);
+  }
+#else
+  appendToRunQueue(cap,tso);
+  context_switch = 1;
+#endif
+
+  debugTrace(DEBUG_sched,
+            "waking up thread %ld on cap %d",
+            (long)tso->id, tso->cap->no);
+
+  return next;
+}
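
The allow_migrate flag lets callers that must not move a thread between
Capabilities opt out of the wakeup-migration heuristic.  A hedged
example:

    /* wake tso, but leave it on the Capability it was last on */
    next = unblockOne_(cap, tso, rtsFalse);
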
+
+/* ----------------------------------------------------------------------------
+   awakenBlockedQueue
+
+   wakes up all the threads on the specified queue.
+   ------------------------------------------------------------------------- */
+
+#if defined(GRAN)
+void
+awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
+{
+  StgBlockingQueueElement *bqe;
+  PEs node_loc;
+  nat len = 0; 
+
+  IF_GRAN_DEBUG(bq, 
+               debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
+                     node, CurrentProc, CurrentTime[CurrentProc], 
+                     CurrentTSO->id, CurrentTSO));
+
+  node_loc = where_is(node);
+
+  ASSERT(q == END_BQ_QUEUE ||
+        get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
+        get_itbl(q)->type == CONSTR); // closure (type constructor)
+  ASSERT(is_unique(node));
+
+  /* FAKE FETCH: magically copy the node to the tso's proc;
+     no Fetch necessary because in reality the node should not have been 
+     moved to the other PE in the first place
+  */
+  if (CurrentProc!=node_loc) {
+    IF_GRAN_DEBUG(bq, 
+                 debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
+                       node, node_loc, CurrentProc, CurrentTSO->id, 
+                       // CurrentTSO, where_is(CurrentTSO),
+                       node->header.gran.procs));
+    node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
+    IF_GRAN_DEBUG(bq, 
+                 debugBelch("## new bitmask of node %p is %#x\n",
+                       node, node->header.gran.procs));
+    if (RtsFlags.GranFlags.GranSimStats.Global) {
+      globalGranStats.tot_fake_fetches++;
+    }
+  }
+
+  bqe = q;
+  // ToDo: check: ASSERT(CurrentProc==node_loc);
+  while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
+    //next = bqe->link;
+    /* 
+       bqe points to the current element in the queue
+       next points to the next element in the queue
+    */
+    //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
+    //tso_loc = where_is(tso);
+    len++;
+    bqe = unblockOne(bqe, node);
+  }
+
+  /* if this is the BQ of an RBH, we have to put back the info ripped out of
+     the closure to make room for the anchor of the BQ */
+  if (bqe!=END_BQ_QUEUE) {
+    ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
+    /*
+    ASSERT((info_ptr==&RBH_Save_0_info) ||
+          (info_ptr==&RBH_Save_1_info) ||
+          (info_ptr==&RBH_Save_2_info));
+    */
+    /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
+    ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
+    ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
+
+    IF_GRAN_DEBUG(bq,
+                 debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
+                       node, info_type(node)));
+  }
+
+  /* statistics gathering */
+  if (RtsFlags.GranFlags.GranSimStats.Global) {
+    // globalGranStats.tot_bq_processing_time += bq_processing_time;
+    globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
+    // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
+    globalGranStats.tot_awbq++;             // total no. of bqs awakened
+  }
+  IF_GRAN_DEBUG(bq,
+               debugBelch("## BQ Stats of %p: [%d entries] %s\n",
+                       node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
+}
+#elif defined(PARALLEL_HASKELL)
+void 
+awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
+{
+  StgBlockingQueueElement *bqe;
+
+  IF_PAR_DEBUG(verbose, 
+              debugBelch("##-_ AwBQ for node %p on [%x]: \n",
+                    node, mytid));
+#ifdef DIST  
+  //RFP
+  if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
+    IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
+    return;
+  }
+#endif
+  
+  ASSERT(q == END_BQ_QUEUE ||
+        get_itbl(q)->type == TSO ||           
+        get_itbl(q)->type == BLOCKED_FETCH || 
+        get_itbl(q)->type == CONSTR); 
+
+  bqe = q;
+  while (get_itbl(bqe)->type==TSO || 
+        get_itbl(bqe)->type==BLOCKED_FETCH) {
+    bqe = unblockOne(bqe, node);
+  }
+}
+
+#else   /* !GRAN && !PARALLEL_HASKELL */
+
+void
+awakenBlockedQueue(Capability *cap, StgTSO *tso)
+{
+    while (tso != END_TSO_QUEUE) {
+       tso = unblockOne(cap,tso);
+    }
+}
+#endif
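
The non-parallel version is just a loop over unblockOne().  One real
consumer is the async-exception machinery itself: when a thread leaves
a blocked-exceptions region, everything queued on its
blocked_exceptions list is woken so each pending throwTo can retry.  A
hedged sketch of that call site (the actual code is Cmm, in
Exception.cmm):

    /* wake all throwTo'ers that queued while we blocked exceptions */
    awakenBlockedQueue(cap, tso->blocked_exceptions);
    tso->blocked_exceptions = END_TSO_QUEUE;
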
+
+
+/* ---------------------------------------------------------------------------
+ * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
+ * used by Control.Concurrent for error checking.
+ * ------------------------------------------------------------------------- */
+StgBool
+rtsSupportsBoundThreads(void)
+{
+#if defined(THREADED_RTS)
+  return rtsTrue;
+#else
+  return rtsFalse;
+#endif
+}
+
+/* ---------------------------------------------------------------------------
+ * isThreadBound(tso): check whether tso is bound to an OS thread.
+ * ------------------------------------------------------------------------- */
+StgBool
+isThreadBound(StgTSO* tso USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+  return (tso->bound != NULL);
+#endif
+  return rtsFalse;
+}
+
+/* ----------------------------------------------------------------------------
+ * Debugging: why is a thread blocked
+ * ------------------------------------------------------------------------- */
+
+#if DEBUG
+void
+printThreadBlockage(StgTSO *tso)
+{
+  switch (tso->why_blocked) {
+  case BlockedOnRead:
+    debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
+    break;
+  case BlockedOnWrite:
+    debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
+    break;
+#if defined(mingw32_HOST_OS)
+    case BlockedOnDoProc:
+    debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
+    break;
+#endif
+  case BlockedOnDelay:
+    debugBelch("is blocked until %ld", (long)(tso->block_info.target));
+    break;
+  case BlockedOnMVar:
+    debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
+    break;
+  case BlockedOnException:
+    debugBelch("is blocked on delivering an exception to thread %d",
+           tso->block_info.tso->id);
+    break;
+  case BlockedOnBlackHole:
+    debugBelch("is blocked on a black hole");
+    break;
+  case NotBlocked:
+    debugBelch("is not blocked");
+    break;
+#if defined(PARALLEL_HASKELL)
+  case BlockedOnGA:
+    debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
+           tso->block_info.closure, info_type(tso->block_info.closure));
+    break;
+  case BlockedOnGA_NoSend:
+    debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
+           tso->block_info.closure, info_type(tso->block_info.closure));
+    break;
+#endif
+  case BlockedOnCCall:
+    debugBelch("is blocked on an external call");
+    break;
+  case BlockedOnCCall_NoUnblockExc:
+    debugBelch("is blocked on an external call (exceptions were already blocked)");
+    break;
+  case BlockedOnSTM:
+    debugBelch("is blocked on an STM operation");
+    break;
+  default:
+    barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
+        tso->why_blocked, tso->id, tso);
+  }
+}
+
+void
+printThreadStatus(StgTSO *t)
+{
+    debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
+    {
+      void *label = lookupThreadLabel(t->id);
+      if (label) debugBelch("[\"%s\"] ",(char *)label);
+    }
+    if (t->what_next == ThreadRelocated) {
+       debugBelch("has been relocated...\n");
+    } else {
+       switch (t->what_next) {
+       case ThreadKilled:
+           debugBelch("has been killed");
+           break;
+       case ThreadComplete:
+           debugBelch("has completed");
+           break;
+       default:
+           printThreadBlockage(t);
+       }
+       debugBelch("\n");
+    }
+}
+
+void
+printAllThreads(void)
+{
+  StgTSO *t, *next;
+  nat i;
+  Capability *cap;
+
+# if defined(GRAN)
+  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
+  ullong_format_string(TIME_ON_PROC(CurrentProc), 
+                      time_string, rtsFalse/*no commas!*/);
+
+  debugBelch("all threads at [%s]:\n", time_string);
+# elif defined(PARALLEL_HASKELL)
+  char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
+  ullong_format_string(CURRENT_TIME,
+                      time_string, rtsFalse/*no commas!*/);
+
+  debugBelch("all threads at [%s]:\n", time_string);
+# else
+  debugBelch("all threads:\n");
+# endif
+
+  for (i = 0; i < n_capabilities; i++) {
+      cap = &capabilities[i];
+      debugBelch("threads on capability %d:\n", cap->no);
+      for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
+         printThreadStatus(t);
+      }
+  }
+
+  debugBelch("other threads:\n");
+  for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+      if (t->why_blocked != NotBlocked) {
+         printThreadStatus(t);
+      }
+      if (t->what_next == ThreadRelocated) {
+         next = t->link;
+      } else {
+         next = t->global_link;
+      }
+  }
+}
+
+// useful from gdb
+void 
+printThreadQueue(StgTSO *t)
+{
+    nat i = 0;
+    for (; t != END_TSO_QUEUE; t = t->link) {
+       printThreadStatus(t);
+       i++;
+    }
+    debugBelch("%d threads on queue\n", i);
+}
+
+/* 
+   Print a whole blocking queue attached to node (debugging only).
+*/
+# if defined(PARALLEL_HASKELL)
+void 
+print_bq (StgClosure *node)
+{
+  StgBlockingQueueElement *bqe;
+  StgTSO *tso;
+  rtsBool end;
+
+  debugBelch("## BQ of closure %p (%s): ",
+         node, info_type(node));
+
+  /* should cover all closures that may have a blocking queue */
+  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
+        get_itbl(node)->type == FETCH_ME_BQ ||
+        get_itbl(node)->type == RBH ||
+        get_itbl(node)->type == MVAR);
+    
+  ASSERT(node!=(StgClosure*)NULL);         // sanity check
+
+  print_bqe(((StgBlockingQueue*)node)->blocking_queue);
+}
+
+/* 
+   Print a whole blocking queue starting with the element bqe.
+*/
+void 
+print_bqe (StgBlockingQueueElement *bqe)
+{
+  rtsBool end;
+
+  /* 
+     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
+  */
+  for (end = (bqe==END_BQ_QUEUE);
+       !end; // iterate until bqe points to a CONSTR
+       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
+       bqe = end ? END_BQ_QUEUE : bqe->link) {
+    ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
+    ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
+    /* types of closures that may appear in a blocking queue */
+    ASSERT(get_itbl(bqe)->type == TSO ||           
+          get_itbl(bqe)->type == BLOCKED_FETCH || 
+          get_itbl(bqe)->type == CONSTR); 
+    /* only BQs of an RBH end with an RBH_Save closure */
+    //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
+
+    switch (get_itbl(bqe)->type) {
+    case TSO:
+      debugBelch(" TSO %u (%x),",
+             ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
+      break;
+    case BLOCKED_FETCH:
+      debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
+             ((StgBlockedFetch *)bqe)->node, 
+             ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
+             ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
+             ((StgBlockedFetch *)bqe)->ga.weight);
+      break;
+    case CONSTR:
+      debugBelch(" %s (IP %p),",
+             (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
+              get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
+              get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
+              "RBH_Save_?"), get_itbl(bqe));
+      break;
+    default:
+      barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
+          info_type((StgClosure *)bqe)); // , node, info_type(node));
+      break;
+    }
+  } /* for */
+  debugBelch("\n");
+}
+# elif defined(GRAN)
+void 
+print_bq (StgClosure *node)
+{
+  StgBlockingQueueElement *bqe;
+  PEs node_loc, tso_loc;
+  rtsBool end;
+
+  /* should cover all closures that may have a blocking queue */
+  ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
+        get_itbl(node)->type == FETCH_ME_BQ ||
+        get_itbl(node)->type == RBH);
+    
+  ASSERT(node!=(StgClosure*)NULL);         // sanity check
+  node_loc = where_is(node);
+
+  debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
+         node, info_type(node), node_loc);
+
+  /* 
+     NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
+  */
+  for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
+       !end; // iterate until bqe points to a CONSTR
+       end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
+    ASSERT(bqe != END_BQ_QUEUE);             // sanity check
+    ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
+    /* types of closures that may appear in a blocking queue */
+    ASSERT(get_itbl(bqe)->type == TSO ||           
+          get_itbl(bqe)->type == CONSTR); 
+    /* only BQs of an RBH end with an RBH_Save closure */
+    ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
+
+    tso_loc = where_is((StgClosure *)bqe);
+    switch (get_itbl(bqe)->type) {
+    case TSO:
+      debugBelch(" TSO %d (%p) on [PE %d],",
+             ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
+      break;
+    case CONSTR:
+      debugBelch(" %s (IP %p),",
+             (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
+              get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
+              get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
+              "RBH_Save_?"), get_itbl(bqe));
+      break;
+    default:
+      barf("Unexpected closure type %s in blocking queue of %p (%s)",
+          info_type((StgClosure *)bqe), node, info_type(node));
+      break;
+    }
+  } /* for */
+  debugBelch("\n");
+}
+# endif
+
+#if defined(PARALLEL_HASKELL)
+nat
+run_queue_len(void)
+{
+    nat i;
+    StgTSO *tso;
+    
+    for (i=0, tso=run_queue_hd; 
+        tso != END_TSO_QUEUE;
+        i++, tso=tso->link) {
+       /* nothing */
+    }
+       
+    return i;
+}
+#endif
+
+#endif /* DEBUG */
diff --git a/rts/Threads.h b/rts/Threads.h
new file mode 100644
index 0000000..e331c50
--- /dev/null
+++ b/rts/Threads.h
@@ -0,0 +1,46 @@
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2006
+ *
+ * Thread-related functionality
+ *
+ * --------------------------------------------------------------------------*/
+
+#ifndef THREADS_H
+#define THREADS_H
+
+#if defined(GRAN) || defined(PARALLEL_HASKELL)
+StgBlockingQueueElement * unblockOne (StgBlockingQueueElement *bqe, 
+                                     StgClosure *node);
+#else
+StgTSO * unblockOne (Capability *cap, StgTSO *tso);
+StgTSO * unblockOne_ (Capability *cap, StgTSO *tso, rtsBool allow_migrate);
+#endif
+
+#if defined(GRAN) || defined(PARALLEL_HASKELL)
+void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node);
+#else
+void awakenBlockedQueue (Capability *cap, StgTSO *tso);
+#endif
+
+void removeThreadFromMVarQueue (StgMVar *mvar, StgTSO *tso);
+void removeThreadFromQueue     (StgTSO **queue, StgTSO *tso);
+void removeThreadFromDeQueue   (StgTSO **head, StgTSO **tail, StgTSO *tso);
+
+StgBool isThreadBound (StgTSO* tso);
+
+#ifdef DEBUG
+void printThreadBlockage (StgTSO *tso);
+void printThreadStatus (StgTSO *t);
+void printAllThreads (void);
+void printThreadQueue (StgTSO *t);
+# if defined(PARALLEL_HASKELL)
+void print_bq (StgClosure *node);
+void print_bqe (StgBlockingQueueElement *bqe);
+nat  run_queue_len (void);
+# elif defined(GRAN)
+void print_bq (StgClosure *node);
+# endif
+#endif
+
+#endif /* THREADS_H */
diff --git a/rts/Trace.h b/rts/Trace.h
index 19e492c..cf6c141 100644
--- a/rts/Trace.h
+++ b/rts/Trace.h
@@ -45,11 +45,11 @@ void traceEnd (void);
 #ifdef DEBUG
 #define debugTrace(class, str, ...) trace(class,str, ## __VA_ARGS__)
 // variable arg macros are C99, and supported by gcc.
-#define debugTraceBegin(class, str, ...) traceBegin(class,str, ## __VA_ARGS__)
+#define debugTraceBegin(str, ...) traceBegin(str, ## __VA_ARGS__)
 #define debugTraceEnd() traceEnd()
 #else
 #define debugTrace(class, str, ...) /* nothing */
-#define debugTraceBegin(class, str, ...) /* nothing */
+#define debugTraceBegin(str, ...) /* nothing */
 #define debugTraceEnd() /* nothing */
 #endif
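
For reference, a hedged sketch of the tracing idiom after this change
(the begin/end form no longer takes a trace class; DEBUG_sched is one
of the existing classes, and printThreadQueue() comes from the new
Threads.c):

    debugTrace(DEBUG_sched, "created thread %ld", (long)tso->id);

    debugTraceBegin("run queue:");
    printThreadQueue(cap->run_queue_hd);
    debugTraceEnd();
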