Use message-passing to implement throwTo in the RTS
authorSimon Marlow <marlowsd@gmail.com>
Thu, 11 Mar 2010 09:57:44 +0000 (09:57 +0000)
committerSimon Marlow <marlowsd@gmail.com>
Thu, 11 Mar 2010 09:57:44 +0000 (09:57 +0000)
This replaces some complicated locking schemes with message-passing
in the implementation of throwTo. The benefits are

 - previously it was impossible to guarantee that a throwTo from
   a thread running on one CPU to a thread running on another CPU
   would be noticed, and we had to rely on the GC to pick up these
   forgotten exceptions. This no longer happens.

 - the locking regime is simpler (though the code is about the same
   size)

 - threads can be unblocked from a blocked_exceptions queue without
   having to traverse the whole queue now.  It's a rare case, but
   replaces an O(n) operation with an O(1).

 - generally we move in the direction of sharing less between
   Capabilities (aka HECs), which will become important with other
   changes we have planned.

Also in this patch I replaced several STM-specific closure types with
a generic MUT_PRIM closure type, which allowed a lot of code in the GC
and other places to go away, hence the line-count reduction.  The
message-passing changes resulted in about a net zero line-count
difference.

35 files changed:
includes/rts/Constants.h
includes/rts/storage/ClosureMacros.h
includes/rts/storage/ClosureTypes.h
includes/rts/storage/Closures.h
includes/rts/storage/SMPClosureOps.h
includes/rts/storage/TSO.h
includes/stg/MiscClosures.h
rts/Capability.c
rts/Capability.h
rts/ClosureFlags.c
rts/Exception.cmm
rts/FrontPanel.c
rts/HeapStackCheck.cmm
rts/LdvProfile.c
rts/PrimOps.cmm
rts/Printer.c
rts/ProfHeap.c
rts/RaiseAsync.c
rts/RaiseAsync.h
rts/RetainerProfile.c
rts/RtsMessages.c
rts/STM.c
rts/Schedule.c
rts/Schedule.h
rts/StgMiscClosures.cmm
rts/Threads.c
rts/Threads.h
rts/Trace.h
rts/eventlog/EventLog.h
rts/sm/Compact.c
rts/sm/Evac.c
rts/sm/GC.c
rts/sm/MarkWeak.c
rts/sm/Sanity.c
rts/sm/Scav.c

index 54a1ca7..bfc77fa 100644 (file)
 #define NotBlocked          0
 #define BlockedOnMVar       1
 #define BlockedOnBlackHole  2
 #define NotBlocked          0
 #define BlockedOnMVar       1
 #define BlockedOnBlackHole  2
-#define BlockedOnException  3
-#define BlockedOnRead       4
-#define BlockedOnWrite      5
-#define BlockedOnDelay      6
-#define BlockedOnSTM        7
+#define BlockedOnRead       3
+#define BlockedOnWrite      4
+#define BlockedOnDelay      5
+#define BlockedOnSTM        6
 
 /* Win32 only: */
 
 /* Win32 only: */
-#define BlockedOnDoProc     8
+#define BlockedOnDoProc     7
 
 /* Only relevant for PAR: */
   /* blocked on a remote closure represented by a Global Address: */
 
 /* Only relevant for PAR: */
   /* blocked on a remote closure represented by a Global Address: */
-#define BlockedOnGA         9
+#define BlockedOnGA         8
   /* same as above but without sending a Fetch message */
   /* same as above but without sending a Fetch message */
-#define BlockedOnGA_NoSend  10
+#define BlockedOnGA_NoSend  9
 /* Only relevant for THREADED_RTS: */
 /* Only relevant for THREADED_RTS: */
-#define BlockedOnCCall      11
-#define BlockedOnCCall_NoUnblockExc 12
+#define BlockedOnCCall      10
+#define BlockedOnCCall_NoUnblockExc 11
    /* same as above but don't unblock async exceptions in resumeThread() */
 
    /* same as above but don't unblock async exceptions in resumeThread() */
 
+/* Involved in a message sent to tso->msg_cap */
+#define BlockedOnMsgWakeup  12
+#define BlockedOnMsgThrowTo 13
 /*
  * These constants are returned to the scheduler by a thread that has
  * stopped for one reason or another.  See typedef StgThreadReturnCode
 /*
  * These constants are returned to the scheduler by a thread that has
  * stopped for one reason or another.  See typedef StgThreadReturnCode
index f73d2c5..a115f6f 100644 (file)
@@ -335,18 +335,8 @@ closure_sizeW_ (StgClosure *p, StgInfoTable *info)
        return tso_sizeW((StgTSO *)p);
     case BCO:
        return bco_sizeW((StgBCO *)p);
        return tso_sizeW((StgTSO *)p);
     case BCO:
        return bco_sizeW((StgBCO *)p);
-    case TVAR_WATCH_QUEUE:
-        return sizeofW(StgTVarWatchQueue);
-    case TVAR:
-        return sizeofW(StgTVar);
     case TREC_CHUNK:
         return sizeofW(StgTRecChunk);
     case TREC_CHUNK:
         return sizeofW(StgTRecChunk);
-    case TREC_HEADER:
-        return sizeofW(StgTRecHeader);
-    case ATOMIC_INVARIANT:
-        return sizeofW(StgAtomicInvariant);
-    case INVARIANT_CHECK_QUEUE:
-        return sizeofW(StgInvariantCheckQueue);
     default:
        return sizeW_fromITBL(info);
     }
     default:
        return sizeW_fromITBL(info);
     }
index 508dce2..6a76772 100644 (file)
 #define MUT_VAR_CLEAN          50
 #define MUT_VAR_DIRTY          51
 #define WEAK                   52
 #define MUT_VAR_CLEAN          50
 #define MUT_VAR_DIRTY          51
 #define WEAK                   52
-#define STABLE_NAME            53
-#define TSO                    54
-#define TVAR_WATCH_QUEUE        55
-#define INVARIANT_CHECK_QUEUE   56
-#define ATOMIC_INVARIANT        57
-#define TVAR                    58
-#define TREC_CHUNK              59
-#define TREC_HEADER             60
-#define ATOMICALLY_FRAME        61
-#define CATCH_RETRY_FRAME       62
-#define CATCH_STM_FRAME         63
-#define WHITEHOLE               64
-#define N_CLOSURE_TYPES         65
+#define PRIM                   53
+#define MUT_PRIM                54
+#define TSO                    55
+#define TREC_CHUNK              56
+#define ATOMICALLY_FRAME        57
+#define CATCH_RETRY_FRAME       58
+#define CATCH_STM_FRAME         59
+#define WHITEHOLE               60
+#define N_CLOSURE_TYPES         61
 
 #endif /* RTS_STORAGE_CLOSURETYPES_H */
 
 #endif /* RTS_STORAGE_CLOSURETYPES_H */
index 8928268..d7498e2 100644 (file)
@@ -390,10 +390,10 @@ typedef struct StgInvariantCheckQueue_ {
 
 struct StgTRecHeader_ {
   StgHeader                  header;
 
 struct StgTRecHeader_ {
   StgHeader                  header;
-  TRecState                  state;
   struct StgTRecHeader_     *enclosing_trec;
   StgTRecChunk              *current_chunk;
   StgInvariantCheckQueue    *invariants_to_check;
   struct StgTRecHeader_     *enclosing_trec;
   StgTRecChunk              *current_chunk;
   StgInvariantCheckQueue    *invariants_to_check;
+  TRecState                  state;
 };
 
 typedef struct {
 };
 
 typedef struct {
@@ -416,4 +416,27 @@ typedef struct {
   StgClosure    *alt_code;
 } StgCatchRetryFrame;
 
   StgClosure    *alt_code;
 } StgCatchRetryFrame;
 
+/* ----------------------------------------------------------------------------
+   Messages
+   ------------------------------------------------------------------------- */
+
+typedef struct Message_ {
+    StgHeader        header;
+    struct Message_ *link;
+} Message;
+
+typedef struct MessageWakeup_ {
+    StgHeader header;
+    Message  *link;
+    StgTSO   *tso;
+} MessageWakeup;
+
+typedef struct MessageThrowTo_ {
+    StgHeader   header;
+    Message    *link;
+    StgTSO     *source;
+    StgTSO     *target;
+    StgClosure *exception;
+} MessageThrowTo;
+
 #endif /* RTS_STORAGE_CLOSURES_H */
 #endif /* RTS_STORAGE_CLOSURES_H */
index 582ec0e..8dee7cb 100644 (file)
@@ -18,6 +18,7 @@
 #else
 
 EXTERN_INLINE StgInfoTable *lockClosure(StgClosure *p);
 #else
 
 EXTERN_INLINE StgInfoTable *lockClosure(StgClosure *p);
+EXTERN_INLINE StgInfoTable *tryLockClosure(StgClosure *p);
 EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info);
 
 #if defined(THREADED_RTS)
 EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info);
 
 #if defined(THREADED_RTS)
@@ -43,11 +44,15 @@ EXTERN_INLINE StgInfoTable *lockClosure(StgClosure *p)
     } while (1);
 }
 
     } while (1);
 }
 
-EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info)
+EXTERN_INLINE StgInfoTable *tryLockClosure(StgClosure *p)
 {
 {
-    // This is a strictly ordered write, so we need a write_barrier():
-    write_barrier();
-    p->header.info = info;
+    StgWord info;
+    info = xchg((P_)(void *)&p->header.info, (W_)&stg_WHITEHOLE_info);
+    if (info != (W_)&stg_WHITEHOLE_info) {
+        return (StgInfoTable *)info;
+    } else {
+        return NULL;
+    }
 }
 
 #else /* !THREADED_RTS */
 }
 
 #else /* !THREADED_RTS */
@@ -56,12 +61,19 @@ EXTERN_INLINE StgInfoTable *
 lockClosure(StgClosure *p)
 { return (StgInfoTable *)p->header.info; }
 
 lockClosure(StgClosure *p)
 { return (StgInfoTable *)p->header.info; }
 
-EXTERN_INLINE void
-unlockClosure(StgClosure *p STG_UNUSED, const StgInfoTable *info STG_UNUSED)
-{ /* nothing */ }
+EXTERN_INLINE StgInfoTable *
+tryLockClosure(StgClosure *p)
+{ return (StgInfoTable *)p->header.info; }
 
 #endif /* THREADED_RTS */
 
 
 #endif /* THREADED_RTS */
 
+EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info)
+{
+    // This is a strictly ordered write, so we need a write_barrier():
+    write_barrier();
+    p->header.info = info;
+}
+
 // Handy specialised versions of lockClosure()/unlockClosure()
 EXTERN_INLINE void lockTSO(StgTSO *tso);
 EXTERN_INLINE void lockTSO(StgTSO *tso)
 // Handy specialised versions of lockClosure()/unlockClosure()
 EXTERN_INLINE void lockTSO(StgTSO *tso);
 EXTERN_INLINE void lockTSO(StgTSO *tso)
index e8d97c5..e2015f2 100644 (file)
@@ -46,7 +46,8 @@ typedef struct {
 /* Reason for thread being blocked. See comment above struct StgTso_. */
 typedef union {
   StgClosure *closure;
 /* Reason for thread being blocked. See comment above struct StgTso_. */
 typedef union {
   StgClosure *closure;
-  struct StgTSO_ *tso;
+  struct MessageThrowTo_ *throwto;
+  struct MessageWakeup_  *wakeup;
   StgInt fd;   /* StgInt instead of int, so that it's the same size as the ptrs */
 #if defined(mingw32_HOST_OS)
   StgAsyncIOResult *async_result;
   StgInt fd;   /* StgInt instead of int, so that it's the same size as the ptrs */
 #if defined(mingw32_HOST_OS)
   StgAsyncIOResult *async_result;
@@ -87,7 +88,8 @@ typedef struct StgTSO_ {
          will already be dirty.
     */
 
          will already be dirty.
     */
 
-    struct StgTSO_*         global_link;    /* Links all threads together */
+    struct StgTSO_*         global_link;    // Links threads on the
+                                            // generation->threads lists
     
     StgWord                 dirty;          /* non-zero => dirty */
     /*
     
     StgWord                 dirty;          /* non-zero => dirty */
     /*
@@ -108,9 +110,9 @@ typedef struct StgTSO_ {
      * setTSOLink().
      */
 
      * setTSOLink().
      */
 
-    StgWord16               what_next;      /* Values defined in Constants.h */
-    StgWord16               why_blocked;    /* Values defined in Constants.h */
-    StgWord32               flags;
+    StgWord16               what_next;      // Values defined in Constants.h
+    StgWord16               why_blocked;    // Values defined in Constants.h
+    StgWord32               flags;          // Values defined in Constants.h
     StgTSOBlockInfo         block_info;
     StgThreadID             id;
     int                     saved_errno;
     StgTSOBlockInfo         block_info;
     StgThreadID             id;
     int                     saved_errno;
@@ -123,7 +125,7 @@ typedef struct StgTSO_ {
        exceptions.  In order to access this field, the TSO must be
        locked using lockClosure/unlockClosure (see SMP.h).
     */
        exceptions.  In order to access this field, the TSO must be
        locked using lockClosure/unlockClosure (see SMP.h).
     */
-    struct StgTSO_ *        blocked_exceptions;
+    struct MessageThrowTo_ * blocked_exceptions;
 
 #ifdef TICKY_TICKY
     /* TICKY-specific stuff would go here. */
 
 #ifdef TICKY_TICKY
     /* TICKY-specific stuff would go here. */
@@ -167,7 +169,7 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
 
        tso->why_blocked       tso->block_info      location
         ----------------------------------------------------------------------
 
        tso->why_blocked       tso->block_info      location
         ----------------------------------------------------------------------
-       NotBlocked             NULL                 runnable_queue, or running
+       NotBlocked             END_TSO_QUEUE        runnable_queue, or running
        
         BlockedOnBlackHole     the BLACKHOLE        blackhole_queue
        
        
         BlockedOnBlackHole     the BLACKHOLE        blackhole_queue
        
@@ -175,7 +177,7 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
 
        BlockedOnSTM           END_TSO_QUEUE        STM wait queue(s)
        
 
        BlockedOnSTM           END_TSO_QUEUE        STM wait queue(s)
        
-        BlockedOnException     the TSO              TSO->blocked_exception
+        BlockedOnMsgThrowTo    MessageThrowTo *     TSO->blocked_exception
 
         BlockedOnRead          NULL                 blocked_queue
         BlockedOnWrite         NULL                blocked_queue
 
         BlockedOnRead          NULL                 blocked_queue
         BlockedOnWrite         NULL                blocked_queue
@@ -189,7 +191,6 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
       
       tso->what_next == ThreadComplete or ThreadKilled
       tso->link     ==  (could be on some queue somewhere)
       
       tso->what_next == ThreadComplete or ThreadKilled
       tso->link     ==  (could be on some queue somewhere)
-      tso->su       ==  tso->stack + tso->stack_size
       tso->sp       ==  tso->stack + tso->stack_size - 1 (i.e. top stack word)
       tso->sp[0]    ==  return value of thread, if what_next == ThreadComplete,
                         exception             , if what_next == ThreadKilled
       tso->sp       ==  tso->stack + tso->stack_size - 1 (i.e. top stack word)
       tso->sp[0]    ==  return value of thread, if what_next == ThreadComplete,
                         exception             , if what_next == ThreadKilled
index e68282e..42e878f 100644 (file)
@@ -114,6 +114,8 @@ RTS_INFO(stg_MUT_ARR_PTRS_FROZEN0_info);
 RTS_INFO(stg_MUT_VAR_CLEAN_info);
 RTS_INFO(stg_MUT_VAR_DIRTY_info);
 RTS_INFO(stg_END_TSO_QUEUE_info);
 RTS_INFO(stg_MUT_VAR_CLEAN_info);
 RTS_INFO(stg_MUT_VAR_DIRTY_info);
 RTS_INFO(stg_END_TSO_QUEUE_info);
+RTS_INFO(stg_MSG_WAKEUP_info);
+RTS_INFO(stg_MSG_THROWTO_info);
 RTS_INFO(stg_MUT_CONS_info);
 RTS_INFO(stg_catch_info);
 RTS_INFO(stg_PAP_info);
 RTS_INFO(stg_MUT_CONS_info);
 RTS_INFO(stg_catch_info);
 RTS_INFO(stg_PAP_info);
@@ -163,6 +165,8 @@ RTS_ENTRY(stg_MUT_ARR_PTRS_FROZEN0_entry);
 RTS_ENTRY(stg_MUT_VAR_CLEAN_entry);
 RTS_ENTRY(stg_MUT_VAR_DIRTY_entry);
 RTS_ENTRY(stg_END_TSO_QUEUE_entry);
 RTS_ENTRY(stg_MUT_VAR_CLEAN_entry);
 RTS_ENTRY(stg_MUT_VAR_DIRTY_entry);
 RTS_ENTRY(stg_END_TSO_QUEUE_entry);
+RTS_ENTRY(stg_MSG_WAKEUP_entry);
+RTS_ENTRY(stg_MSG_THROWTO_entry);
 RTS_ENTRY(stg_MUT_CONS_entry);
 RTS_ENTRY(stg_catch_entry);
 RTS_ENTRY(stg_PAP_entry);
 RTS_ENTRY(stg_MUT_CONS_entry);
 RTS_ENTRY(stg_catch_entry);
 RTS_ENTRY(stg_PAP_entry);
@@ -205,8 +209,6 @@ RTS_CLOSURE(stg_END_STM_CHUNK_LIST_closure);
 RTS_CLOSURE(stg_NO_TREC_closure);
 
 RTS_ENTRY(stg_NO_FINALIZER_entry);
 RTS_CLOSURE(stg_NO_TREC_closure);
 
 RTS_ENTRY(stg_NO_FINALIZER_entry);
-RTS_ENTRY(stg_END_EXCEPTION_LIST_entry);
-RTS_ENTRY(stg_EXCEPTION_CONS_entry);
 
 #if IN_STG_CODE
 extern DLL_IMPORT_RTS StgWordArray stg_CHARLIKE_closure;
 
 #if IN_STG_CODE
 extern DLL_IMPORT_RTS StgWordArray stg_CHARLIKE_closure;
index ce6eceb..5f54eca 100644 (file)
@@ -223,8 +223,7 @@ initCapability( Capability *cap, nat i )
     cap->suspended_ccalls  = NULL;
     cap->returning_tasks_hd = NULL;
     cap->returning_tasks_tl = NULL;
     cap->suspended_ccalls  = NULL;
     cap->returning_tasks_hd = NULL;
     cap->returning_tasks_tl = NULL;
-    cap->wakeup_queue_hd    = END_TSO_QUEUE;
-    cap->wakeup_queue_tl    = END_TSO_QUEUE;
+    cap->inbox              = (Message*)END_TSO_QUEUE;
     cap->sparks_created     = 0;
     cap->sparks_converted   = 0;
     cap->sparks_pruned      = 0;
     cap->sparks_created     = 0;
     cap->sparks_converted   = 0;
     cap->sparks_pruned      = 0;
@@ -419,7 +418,7 @@ releaseCapability_ (Capability* cap,
     // If we have an unbound thread on the run queue, or if there's
     // anything else to do, give the Capability to a worker thread.
     if (always_wakeup || 
     // If we have an unbound thread on the run queue, or if there's
     // anything else to do, give the Capability to a worker thread.
     if (always_wakeup || 
-        !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
+        !emptyRunQueue(cap) || !emptyInbox(cap) ||
         !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
         !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
@@ -645,11 +644,11 @@ yieldCapability (Capability** pCap, Task *task)
  * ------------------------------------------------------------------------- */
 
 void
  * ------------------------------------------------------------------------- */
 
 void
-wakeupThreadOnCapability (Capability *my_cap, 
+wakeupThreadOnCapability (Capability *cap,
                           Capability *other_cap, 
                           StgTSO *tso)
 {
                           Capability *other_cap, 
                           StgTSO *tso)
 {
-    ACQUIRE_LOCK(&other_cap->lock);
+    MessageWakeup *msg;
 
     // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
     if (tso->bound) {
 
     // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
     if (tso->bound) {
@@ -658,27 +657,20 @@ wakeupThreadOnCapability (Capability *my_cap,
     }
     tso->cap = other_cap;
 
     }
     tso->cap = other_cap;
 
-    ASSERT(tso->bound ? tso->bound->task->cap == other_cap : 1);
+    ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
+           tso->block_info.closure->header.info == &stg_IND_info);
 
 
-    if (other_cap->running_task == NULL) {
-       // nobody is running this Capability, we can add our thread
-       // directly onto the run queue and start up a Task to run it.
+    ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
 
 
-       other_cap->running_task = myTask(); 
-            // precond for releaseCapability_() and appendToRunQueue()
+    msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
+    msg->header.info = &stg_MSG_WAKEUP_info;
+    msg->tso = tso;
+    tso->block_info.closure = (StgClosure *)msg;
+    dirty_TSO(cap, tso);
+    write_barrier();
+    tso->why_blocked = BlockedOnMsgWakeup;
 
 
-       appendToRunQueue(other_cap,tso);
-
-       releaseCapability_(other_cap,rtsFalse);
-    } else {
-       appendToWakeupQueue(my_cap,other_cap,tso);
-        other_cap->context_switch = 1;
-       // someone is running on this Capability, so it cannot be
-       // freed without first checking the wakeup queue (see
-       // releaseCapability_).
-    }
-
-    RELEASE_LOCK(&other_cap->lock);
+    sendMessage(other_cap, (Message*)msg);
 }
 
 /* ----------------------------------------------------------------------------
 }
 
 /* ----------------------------------------------------------------------------
@@ -881,8 +873,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
        evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
        evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
 #if defined(THREADED_RTS)
        evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
        evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
 #if defined(THREADED_RTS)
-       evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
-       evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
+        evac(user, (StgClosure **)(void *)&cap->inbox);
 #endif
        for (incall = cap->suspended_ccalls; incall != NULL; 
             incall=incall->next) {
 #endif
        for (incall = cap->suspended_ccalls; incall != NULL; 
             incall=incall->next) {
@@ -910,3 +901,29 @@ markCapabilities (evac_fn evac, void *user)
 {
     markSomeCapabilities(evac, user, 0, 1, rtsFalse);
 }
 {
     markSomeCapabilities(evac, user, 0, 1, rtsFalse);
 }
+
+/* -----------------------------------------------------------------------------
+   Messages
+   -------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+void sendMessage(Capability *cap, Message *msg)
+{
+    ACQUIRE_LOCK(&cap->lock);
+
+    msg->link = cap->inbox;
+    cap->inbox = msg;
+
+    if (cap->running_task == NULL) {
+       cap->running_task = myTask(); 
+            // precond for releaseCapability_()
+       releaseCapability_(cap,rtsFalse);
+    } else {
+        contextSwitchCapability(cap);
+    }
+
+    RELEASE_LOCK(&cap->lock);
+}
+
+#endif // THREADED_RTS
index 41974dc..4030b5e 100644 (file)
@@ -88,11 +88,8 @@ struct Capability_ {
     Task *returning_tasks_hd; // Singly-linked, with head/tail
     Task *returning_tasks_tl;
 
     Task *returning_tasks_hd; // Singly-linked, with head/tail
     Task *returning_tasks_tl;
 
-    // A list of threads to append to this Capability's run queue at
-    // the earliest opportunity.  These are threads that have been
-    // woken up by another Capability.
-    StgTSO *wakeup_queue_hd;
-    StgTSO *wakeup_queue_tl;
+    // Messages, or END_TSO_QUEUE.
+    Message *inbox;
 
     SparkPool *sparks;
 
 
     SparkPool *sparks;
 
@@ -285,6 +282,18 @@ void markCapabilities (evac_fn evac, void *user);
 void traverseSparkQueues (evac_fn evac, void *user);
 
 /* -----------------------------------------------------------------------------
 void traverseSparkQueues (evac_fn evac, void *user);
 
 /* -----------------------------------------------------------------------------
+   Messages
+   -------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+INLINE_HEADER rtsBool emptyInbox(Capability *cap);;
+
+void sendMessage (Capability *cap, Message *msg);
+
+#endif // THREADED_RTS
+
+/* -----------------------------------------------------------------------------
  * INLINE functions... private below here
  * -------------------------------------------------------------------------- */
 
  * INLINE functions... private below here
  * -------------------------------------------------------------------------- */
 
@@ -333,6 +342,15 @@ contextSwitchCapability (Capability *cap)
     cap->context_switch = 1;
 }
 
     cap->context_switch = 1;
 }
 
+#ifdef THREADED_RTS
+
+INLINE_HEADER rtsBool emptyInbox(Capability *cap)
+{
+    return (cap->inbox == (Message*)END_TSO_QUEUE);
+}
+
+#endif
+
 END_RTS_PRIVATE
 
 #endif /* CAPABILITY_H */
 END_RTS_PRIVATE
 
 #endif /* CAPABILITY_H */
index 477a892..358cb40 100644 (file)
@@ -74,20 +74,16 @@ StgWord16 closure_flags[] = {
  [MUT_VAR_CLEAN]       =  (_HNF|     _NS|         _MUT|_UPT           ),
  [MUT_VAR_DIRTY]       =  (_HNF|     _NS|         _MUT|_UPT           ),
  [WEAK]                        =  (_HNF|     _NS|              _UPT           ),
  [MUT_VAR_CLEAN]       =  (_HNF|     _NS|         _MUT|_UPT           ),
  [MUT_VAR_DIRTY]       =  (_HNF|     _NS|         _MUT|_UPT           ),
  [WEAK]                        =  (_HNF|     _NS|              _UPT           ),
- [STABLE_NAME]         =  (_HNF|     _NS|              _UPT           ),
+ [PRIM]                =  (_HNF|     _NS|              _UPT           ),
+ [MUT_PRIM]            =  (_HNF|     _NS|         _MUT|_UPT           ),
  [TSO]                         =  (_HNF|     _NS|         _MUT|_UPT           ),
  [TSO]                         =  (_HNF|     _NS|         _MUT|_UPT           ),
- [TVAR_WATCH_QUEUE]     =  (          _NS|         _MUT|_UPT           ),
- [INVARIANT_CHECK_QUEUE]=  (          _NS|         _MUT|_UPT           ),
- [ATOMIC_INVARIANT]     =  (          _NS|         _MUT|_UPT           ),
- [TVAR]                 =  (_HNF|     _NS|         _MUT|_UPT           ), 
  [TREC_CHUNK]           =  (          _NS|         _MUT|_UPT           ),
  [TREC_CHUNK]           =  (          _NS|         _MUT|_UPT           ),
- [TREC_HEADER]          =  (          _NS|         _MUT|_UPT           ),
  [ATOMICALLY_FRAME]     =  (     _BTM                                  ),
  [CATCH_RETRY_FRAME]    =  (     _BTM                                  ),
  [CATCH_STM_FRAME]      =  (     _BTM                                  ),
  [WHITEHOLE]           =  ( 0                                         )
 };
 
  [ATOMICALLY_FRAME]     =  (     _BTM                                  ),
  [CATCH_RETRY_FRAME]    =  (     _BTM                                  ),
  [CATCH_STM_FRAME]      =  (     _BTM                                  ),
  [WHITEHOLE]           =  ( 0                                         )
 };
 
-#if N_CLOSURE_TYPES != 65
+#if N_CLOSURE_TYPES != 61
 #error Closure types changed: update ClosureFlags.c!
 #endif
 #error Closure types changed: update ClosureFlags.c!
 #endif
index 6c887c2..55c79ce 100644 (file)
@@ -56,7 +56,7 @@ INFO_TABLE_RET( stg_unblockAsyncExceptionszh_ret, RET_SMALL )
     CInt r;
 
     StgTSO_flags(CurrentTSO) = StgTSO_flags(CurrentTSO) & 
     CInt r;
 
     StgTSO_flags(CurrentTSO) = StgTSO_flags(CurrentTSO) & 
-       ~(TSO_BLOCKEX::I32|TSO_INTERRUPTIBLE::I32);
+       %lobits32(~(TSO_BLOCKEX|TSO_INTERRUPTIBLE));
 
     /* Eagerly raise a blocked exception, if there is one */
     if (StgTSO_blocked_exceptions(CurrentTSO) != END_TSO_QUEUE) {
 
     /* Eagerly raise a blocked exception, if there is one */
     if (StgTSO_blocked_exceptions(CurrentTSO) != END_TSO_QUEUE) {
@@ -99,8 +99,8 @@ INFO_TABLE_RET( stg_unblockAsyncExceptionszh_ret, RET_SMALL )
 
 INFO_TABLE_RET( stg_blockAsyncExceptionszh_ret, RET_SMALL )
 {
 
 INFO_TABLE_RET( stg_blockAsyncExceptionszh_ret, RET_SMALL )
 {
-    StgTSO_flags(CurrentTSO) = 
-       StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
+    StgTSO_flags(CurrentTSO) = %lobits32(
+       TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE);
 
     Sp_adj(1);
     jump %ENTRY_CODE(Sp(0));
 
     Sp_adj(1);
     jump %ENTRY_CODE(Sp(0));
@@ -113,8 +113,8 @@ stg_blockAsyncExceptionszh
 
     if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) == 0) {
        
 
     if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) == 0) {
        
-       StgTSO_flags(CurrentTSO) = 
-          StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
+       StgTSO_flags(CurrentTSO) = %lobits32(
+          TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE);
 
        /* avoid growing the stack unnecessarily */
        if (Sp(0) == stg_blockAsyncExceptionszh_ret_info) {
 
        /* avoid growing the stack unnecessarily */
        if (Sp(0) == stg_blockAsyncExceptionszh_ret_info) {
@@ -142,8 +142,8 @@ stg_unblockAsyncExceptionszh
     /* If exceptions are already unblocked, there's nothing to do */
     if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) != 0) {
 
     /* If exceptions are already unblocked, there's nothing to do */
     if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) != 0) {
 
-       StgTSO_flags(CurrentTSO) = StgTSO_flags(CurrentTSO) & 
-          ~(TSO_BLOCKEX::I32|TSO_INTERRUPTIBLE::I32);
+       StgTSO_flags(CurrentTSO) = %lobits32(
+           TO_W_(StgTSO_flags(CurrentTSO)) & ~(TSO_BLOCKEX|TSO_INTERRUPTIBLE));
 
        /* avoid growing the stack unnecessarily */
        if (Sp(0) == stg_unblockAsyncExceptionszh_ret_info) {
 
        /* avoid growing the stack unnecessarily */
        if (Sp(0) == stg_unblockAsyncExceptionszh_ret_info) {
@@ -252,27 +252,22 @@ stg_killThreadzh
        }
     } else {
        W_ out;
        }
     } else {
        W_ out;
-       W_ retcode;
+       W_ msg;
        out = Sp - WDS(1); /* ok to re-use stack space here */
 
        out = Sp - WDS(1); /* ok to re-use stack space here */
 
-       (retcode) = foreign "C" throwTo(MyCapability() "ptr",
-                                     CurrentTSO "ptr",
-                                     target "ptr",
-                                     exception "ptr",
-                                     out "ptr") [R1,R2];
+       (msg) = foreign "C" throwTo(MyCapability() "ptr",
+                                    CurrentTSO "ptr",
+                                    target "ptr",
+                                    exception "ptr") [R1,R2];
        
        
-       switch [THROWTO_SUCCESS .. THROWTO_BLOCKED] (retcode) {
-
-       case THROWTO_SUCCESS: {
+        if (msg == NULL) {
            jump %ENTRY_CODE(Sp(0));
            jump %ENTRY_CODE(Sp(0));
-       }
-
-       case THROWTO_BLOCKED: {
-           R3 = W_[out];
-           // we must block, and call throwToReleaseTarget() before returning
+       } else {
+            StgTSO_why_blocked(CurrentTSO) = BlockedOnMsgThrowTo;
+            StgTSO_block_info(CurrentTSO) = msg;
+           // we must block, and unlock the message before returning
            jump stg_block_throwto;
        }
            jump stg_block_throwto;
        }
-       }
     }
 }
 
     }
 }
 
@@ -507,8 +502,8 @@ retry_pop_stack:
 
     /* Ensure that async excpetions are blocked when running the handler.
     */
 
     /* Ensure that async excpetions are blocked when running the handler.
     */
-    StgTSO_flags(CurrentTSO) = 
-       StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
+    StgTSO_flags(CurrentTSO) = %lobits32(
+       TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE);
 
     /* Call the handler, passing the exception value and a realworld
      * token as arguments.
 
     /* Call the handler, passing the exception value and a realworld
      * token as arguments.
index 163a7c0..ebba405 100644 (file)
@@ -697,7 +697,7 @@ residencyCensus( void )
                        break;
                        
                    case WEAK:
                        break;
                        
                    case WEAK:
-                   case STABLE_NAME:
+                   case PRIM:
                    case MVAR:
                    case MUT_VAR:
 /*                 case MUT_CONS: FIXME: case does not exist */
                    case MVAR:
                    case MUT_VAR:
 /*                 case MUT_CONS: FIXME: case does not exist */
index b516ef2..a528a3f 100644 (file)
@@ -631,9 +631,8 @@ INFO_TABLE_RET( stg_block_throwto, RET_SMALL, P_ unused, P_ unused )
 
 stg_block_throwto_finally
 {
 
 stg_block_throwto_finally
 {
-#ifdef THREADED_RTS
-    foreign "C" throwToReleaseTarget (R3 "ptr");
-#endif
+    // unlock the throwto message
+    unlockClosure(StgTSO_block_info(CurrentTSO), stg_MSG_THROWTO_info);
     jump StgReturn;
 }
 
     jump StgReturn;
 }
 
index c2e7d7e..ccaf10c 100644 (file)
@@ -109,7 +109,7 @@ processHeapClosureForDead( StgClosure *c )
     case MUT_VAR_CLEAN:
     case MUT_VAR_DIRTY:
     case BCO:
     case MUT_VAR_CLEAN:
     case MUT_VAR_DIRTY:
     case BCO:
-    case STABLE_NAME:
+    case PRIM:
     case TVAR_WATCH_QUEUE:
     case TVAR:
     case TREC_HEADER:
     case TVAR_WATCH_QUEUE:
     case TVAR:
     case TREC_HEADER:
index 5325c85..bf81eee 100644 (file)
@@ -542,9 +542,9 @@ stg_forkzh
                                closure "ptr") [];
 
   /* start blocked if the current thread is blocked */
                                closure "ptr") [];
 
   /* start blocked if the current thread is blocked */
-  StgTSO_flags(threadid) = 
-     StgTSO_flags(threadid) |  (StgTSO_flags(CurrentTSO) & 
-                                (TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32));
+  StgTSO_flags(threadid) = %lobits16(
+     TO_W_(StgTSO_flags(threadid)) | 
+     TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE));
 
   foreign "C" scheduleThread(MyCapability() "ptr", threadid "ptr") [];
 
 
   foreign "C" scheduleThread(MyCapability() "ptr", threadid "ptr") [];
 
@@ -572,9 +572,9 @@ stg_forkOnzh
                                closure "ptr") [];
 
   /* start blocked if the current thread is blocked */
                                closure "ptr") [];
 
   /* start blocked if the current thread is blocked */
-  StgTSO_flags(threadid) = 
-     StgTSO_flags(threadid) |  (StgTSO_flags(CurrentTSO) & 
-                                (TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32));
+  StgTSO_flags(threadid) = %lobits16(
+     TO_W_(StgTSO_flags(threadid)) | 
+     TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE));
 
   foreign "C" scheduleThreadOn(MyCapability() "ptr", cpu, threadid "ptr") [];
 
 
   foreign "C" scheduleThreadOn(MyCapability() "ptr", cpu, threadid "ptr") [];
 
index 1b8a6dd..e981329 100644 (file)
@@ -160,6 +160,12 @@ printClosure( StgClosure *obj )
        printStdObjPayload(obj);
        break;
 
        printStdObjPayload(obj);
        break;
 
+    case PRIM:
+       debugBelch("PRIM(");
+       printPtr((StgPtr)obj->header.info);
+       printStdObjPayload(obj);
+       break;
+
     case THUNK:
     case THUNK_1_0: case THUNK_0_1:
     case THUNK_1_1: case THUNK_0_2: case THUNK_2_0:
     case THUNK:
     case THUNK_1_0: case THUNK_0_1:
     case THUNK_1_1: case THUNK_0_2: case THUNK_2_0:
@@ -356,10 +362,6 @@ printClosure( StgClosure *obj )
            /* ToDo: chase 'link' ? */
             break;
 
            /* ToDo: chase 'link' ? */
             break;
 
-    case STABLE_NAME:
-            debugBelch("STABLE_NAME(%lu)\n", (lnat)((StgStableName*)obj)->sn); 
-            break;
-
     case TSO:
       debugBelch("TSO("); 
       debugBelch("%lu (%p)",(unsigned long)(((StgTSO*)obj)->id), (StgTSO*)obj);
     case TSO:
       debugBelch("TSO("); 
       debugBelch("%lu (%p)",(unsigned long)(((StgTSO*)obj)->id), (StgTSO*)obj);
@@ -1132,14 +1134,10 @@ char *closure_type_names[] = {
  [MUT_VAR_CLEAN]         = "MUT_VAR_CLEAN",
  [MUT_VAR_DIRTY]         = "MUT_VAR_DIRTY",
  [WEAK]                  = "WEAK",
  [MUT_VAR_CLEAN]         = "MUT_VAR_CLEAN",
  [MUT_VAR_DIRTY]         = "MUT_VAR_DIRTY",
  [WEAK]                  = "WEAK",
- [STABLE_NAME]           = "STABLE_NAME",
+ [PRIM]                         = "PRIM",
+ [MUT_PRIM]              = "MUT_PRIM",
  [TSO]                   = "TSO",
  [TSO]                   = "TSO",
- [TVAR_WATCH_QUEUE]      = "TVAR_WATCH_QUEUE",
- [INVARIANT_CHECK_QUEUE] = "INVARIANT_CHECK_QUEUE",
- [ATOMIC_INVARIANT]      = "ATOMIC_INVARIANT",
- [TVAR]                  = "TVAR",
  [TREC_CHUNK]            = "TREC_CHUNK",
  [TREC_CHUNK]            = "TREC_CHUNK",
- [TREC_HEADER]           = "TREC_HEADER",
  [ATOMICALLY_FRAME]      = "ATOMICALLY_FRAME",
  [CATCH_RETRY_FRAME]     = "CATCH_RETRY_FRAME",
  [CATCH_STM_FRAME]       = "CATCH_STM_FRAME",
  [ATOMICALLY_FRAME]      = "ATOMICALLY_FRAME",
  [CATCH_RETRY_FRAME]     = "CATCH_RETRY_FRAME",
  [CATCH_STM_FRAME]       = "CATCH_STM_FRAME",
index 15337d4..e90051c 100644 (file)
@@ -912,7 +912,8 @@ heapCensusChain( Census *census, bdescr *bd )
             case MVAR_CLEAN:
             case MVAR_DIRTY:
            case WEAK:
             case MVAR_CLEAN:
             case MVAR_DIRTY:
            case WEAK:
-           case STABLE_NAME:
+           case PRIM:
+           case MUT_PRIM:
            case MUT_VAR_CLEAN:
            case MUT_VAR_DIRTY:
                prim = rtsTrue;
            case MUT_VAR_CLEAN:
            case MUT_VAR_DIRTY:
                prim = rtsTrue;
@@ -960,31 +961,6 @@ heapCensusChain( Census *census, bdescr *bd )
                break;
 #endif
 
                break;
 #endif
 
-           case TREC_HEADER: 
-               prim = rtsTrue;
-               size = sizeofW(StgTRecHeader);
-               break;
-
-           case TVAR_WATCH_QUEUE:
-               prim = rtsTrue;
-               size = sizeofW(StgTVarWatchQueue);
-               break;
-               
-           case INVARIANT_CHECK_QUEUE:
-               prim = rtsTrue;
-               size = sizeofW(StgInvariantCheckQueue);
-               break;
-               
-           case ATOMIC_INVARIANT:
-               prim = rtsTrue;
-               size = sizeofW(StgAtomicInvariant);
-               break;
-               
-           case TVAR:
-               prim = rtsTrue;
-               size = sizeofW(StgTVar);
-               break;
-               
            case TREC_CHUNK:
                prim = rtsTrue;
                size = sizeofW(StgTRecChunk);
            case TREC_CHUNK:
                prim = rtsTrue;
                size = sizeofW(StgTRecChunk);
index ca5e5ea..d54f823 100644 (file)
@@ -30,10 +30,14 @@ static void raiseAsync (Capability *cap,
 
 static void removeFromQueues(Capability *cap, StgTSO *tso);
 
 
 static void removeFromQueues(Capability *cap, StgTSO *tso);
 
-static void blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target);
+static void blockedThrowTo (Capability *cap, 
+                            StgTSO *target, MessageThrowTo *msg);
 
 
-static void performBlockedException (Capability *cap, 
-                                    StgTSO *source, StgTSO *target);
+static void throwToSendMsg (Capability *cap USED_IF_THREADS,
+                            Capability *target_cap USED_IF_THREADS, 
+                            MessageThrowTo *msg USED_IF_THREADS);
+
+static void performBlockedException (Capability *cap, MessageThrowTo *msg);
 
 /* -----------------------------------------------------------------------------
    throwToSingleThreaded
 
 /* -----------------------------------------------------------------------------
    throwToSingleThreaded
@@ -96,59 +100,85 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
    may be blocked and could be woken up at any point by another CPU.
    We have some delicate synchronisation to do.
 
    may be blocked and could be woken up at any point by another CPU.
    We have some delicate synchronisation to do.
 
-   There is a completely safe fallback scheme: it is always possible
-   to just block the source TSO on the target TSO's blocked_exceptions
-   queue.  This queue is locked using lockTSO()/unlockTSO().  It is
-   checked at regular intervals: before and after running a thread
-   (schedule() and threadPaused() respectively), and just before GC
-   (scheduleDoGC()).  Activating a thread on this queue should be done
-   using maybePerformBlockedException(): this is done in the context
-   of the target thread, so the exception can be raised eagerly.
-
-   This fallback scheme works even if the target thread is complete or
-   killed: scheduleDoGC() will discover the blocked thread before the
-   target is GC'd.
-
-   Blocking the source thread on the target thread's blocked_exception
-   queue is also employed when the target thread is currently blocking
-   exceptions (ie. inside Control.Exception.block).
-
-   We could use the safe fallback scheme exclusively, but that
-   wouldn't be ideal: most calls to throwTo would block immediately,
-   possibly until the next GC, which might require the deadlock
-   detection mechanism to kick in.  So we try to provide promptness
-   wherever possible.
-
-   We can promptly deliver the exception if the target thread is:
-
-     - runnable, on the same Capability as the source thread (because
-       we own the run queue and therefore the target thread).
-   
-     - blocked, and we can obtain exclusive access to it.  Obtaining
-       exclusive access to the thread depends on how it is blocked.
-
-   We must also be careful to not trip over threadStackOverflow(),
-   which might be moving the TSO to enlarge its stack.
-   lockTSO()/unlockTSO() are used here too.
-
+   The underlying scheme when multiple Capabilities are in use is
+   message passing: when the target of a throwTo is on another
+   Capability, we send a message (a MessageThrowTo closure) to that
+   Capability.
+
+   If the throwTo needs to block because the target TSO is masking
+   exceptions (the TSO_BLOCKEX flag), then the message is placed on
+   the blocked_exceptions queue attached to the target TSO.  When the
+   target TSO enters the unmasked state again, it must check the
+   queue.  The blocked_exceptions queue is not locked; only the
+   Capability owning the TSO may modify it.
+
+   To make things simpler for throwTo, we always create the message
+   first before deciding what to do.  The message may get sent, or it
+   may get attached to a TSO's blocked_exceptions queue, or the
+   exception may get thrown immediately and the message dropped,
+   depending on the current state of the target.
+
+   Currently we send a message if the target belongs to another
+   Capability, and it is
+
+     - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo,
+       BlockedOnCCall
+
+     - or it is masking exceptions (TSO_BLOCKEX)
+
+   Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
+   BlockedOnBlackHole then we acquire ownership of the TSO by locking
+   its parent container (e.g. the MVar) and then raise the exception.
+   We might change these cases to be more message-passing-like in the
+   future.
+  
    Returns: 
 
    Returns: 
 
-   THROWTO_SUCCESS    exception was raised, ok to continue
+   NULL               exception was raised, ok to continue
 
 
-   THROWTO_BLOCKED    exception was not raised; block the source
-                      thread then call throwToReleaseTarget() when
-                     the source thread is properly tidied away.
+   MessageThrowTo *   exception was not raised; the source TSO
+                      should now put itself in the state 
+                      BlockedOnMsgThrowTo, and when it is ready
+                      it should unlock the mssage using
+                      unlockClosure(msg, &stg_MSG_THROWTO_info);
+                      If it decides not to raise the exception after
+                      all, it can revoke it safely with
+                      unlockClosure(msg, &stg_IND_info);
 
    -------------------------------------------------------------------------- */
 
 
    -------------------------------------------------------------------------- */
 
-nat
+MessageThrowTo *
 throwTo (Capability *cap,      // the Capability we hold 
         StgTSO *source,        // the TSO sending the exception (or NULL)
         StgTSO *target,        // the TSO receiving the exception
 throwTo (Capability *cap,      // the Capability we hold 
         StgTSO *source,        // the TSO sending the exception (or NULL)
         StgTSO *target,        // the TSO receiving the exception
-        StgClosure *exception, // the exception closure
-        /*[out]*/ void **out USED_IF_THREADS)
+        StgClosure *exception) // the exception closure
+{
+    MessageThrowTo *msg;
+
+    msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
+    // message starts locked; the caller has to unlock it when it is
+    // ready.
+    msg->header.info = &stg_WHITEHOLE_info;
+    msg->source      = source;
+    msg->target      = target;
+    msg->exception   = exception;
+
+    switch (throwToMsg(cap, msg))
+    {
+    case THROWTO_SUCCESS:
+        return NULL;
+    case THROWTO_BLOCKED:
+    default:
+        return msg;
+    }
+}
+    
+
+nat
+throwToMsg (Capability *cap, MessageThrowTo *msg)
 {
     StgWord status;
 {
     StgWord status;
+    StgTSO *target = msg->target;
 
     ASSERT(target != END_TSO_QUEUE);
 
 
     ASSERT(target != END_TSO_QUEUE);
 
@@ -159,13 +189,10 @@ throwTo (Capability *cap, // the Capability we hold
        // ASSERT(get_itbl(target)->type == TSO);
     }
 
        // ASSERT(get_itbl(target)->type == TSO);
     }
 
-    if (source != NULL) {
-        debugTrace(DEBUG_sched, "throwTo: from thread %lu to thread %lu",
-                   (unsigned long)source->id, (unsigned long)target->id);
-    } else {
-        debugTrace(DEBUG_sched, "throwTo: from RTS to thread %lu",
-                   (unsigned long)target->id);
-    }
+    debugTraceCap(DEBUG_sched, cap,
+                  "throwTo: from thread %lu to thread %lu",
+                  (unsigned long)msg->source->id, 
+                  (unsigned long)msg->target->id);
 
 #ifdef DEBUG
     traceThreadStatus(DEBUG_sched, target);
 
 #ifdef DEBUG
     traceThreadStatus(DEBUG_sched, target);
@@ -173,6 +200,7 @@ throwTo (Capability *cap,   // the Capability we hold
 
     goto check_target;
 retry:
 
     goto check_target;
 retry:
+    write_barrier();
     debugTrace(DEBUG_sched, "throwTo: retrying...");
 
 check_target:
     debugTrace(DEBUG_sched, "throwTo: retrying...");
 
 check_target:
@@ -188,6 +216,7 @@ check_target:
     
     switch (status) {
     case NotBlocked:
     
     switch (status) {
     case NotBlocked:
+    case BlockedOnMsgWakeup:
        /* if status==NotBlocked, and target->cap == cap, then
           we own this TSO and can raise the exception.
           
        /* if status==NotBlocked, and target->cap == cap, then
           we own this TSO and can raise the exception.
           
@@ -251,37 +280,80 @@ check_target:
 
        write_barrier();
        target_cap = target->cap;
 
        write_barrier();
        target_cap = target->cap;
-       if (target_cap == cap && (target->flags & TSO_BLOCKEX) == 0) {
-           // It's on our run queue and not blocking exceptions
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
-           return THROWTO_SUCCESS;
-       } else {
-           // Otherwise, just block on the blocked_exceptions queue
-           // of the target thread.  The queue will get looked at
-           // soon enough: it is checked before and after running a
-           // thread, and during GC.
-           lockTSO(target);
-
-           // Avoid race with threadStackOverflow, which may have
-           // just moved this TSO.
-           if (target->what_next == ThreadRelocated) {
-               unlockTSO(target);
-               target = target->_link;
-               goto retry;
-           }
-            // check again for ThreadComplete and ThreadKilled.  This
-            // cooperates with scheduleHandleThreadFinished to ensure
-            // that we never miss any threads that are throwing an
-            // exception to a thread in the process of terminating.
-            if (target->what_next == ThreadComplete
-                || target->what_next == ThreadKilled) {
-               unlockTSO(target);
+       if (target_cap != cap) {
+            throwToSendMsg(cap, target_cap, msg);
+            return THROWTO_BLOCKED;
+        } else {
+            if ((target->flags & TSO_BLOCKEX) == 0) {
+                // It's on our run queue and not blocking exceptions
+                raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
                 return THROWTO_SUCCESS;
                 return THROWTO_SUCCESS;
+            } else {
+                blockedThrowTo(cap,target,msg);
+                return THROWTO_BLOCKED;
             }
             }
-           blockedThrowTo(cap,source,target);
-           *out = target;
-           return THROWTO_BLOCKED;
-       }
+        }
+    }
+
+    case BlockedOnMsgThrowTo:
+    {
+        Capability *target_cap;
+        const StgInfoTable *i;
+        MessageThrowTo *m;
+
+        m = target->block_info.throwto;
+
+        // target is local to this cap, but has sent a throwto
+        // message to another cap.
+        //
+        // The source message is locked.  We need to revoke the
+        // target's message so that we can raise the exception, so
+        // we attempt to lock it.
+
+        // There's a possibility of a deadlock if two threads are both
+        // trying to throwTo each other (or more generally, a cycle of
+        // threads).  To break the symmetry we compare the addresses
+        // of the MessageThrowTo objects, and the one for which m <
+        // msg gets to spin, while the other can only try to lock
+        // once, but must then back off and unlock both before trying
+        // again.
+        if (m < msg) {
+            i = lockClosure((StgClosure *)m);
+        } else {
+            i = tryLockClosure((StgClosure *)m);
+            if (i == NULL) {
+//            debugBelch("collision\n");
+                throwToSendMsg(cap, target->cap, msg);
+                return THROWTO_BLOCKED;
+            }
+        }
+
+        if (i != &stg_MSG_THROWTO_info) {
+            // if it's an IND, this TSO has been woken up by another Cap
+            unlockClosure((StgClosure*)m, i);
+            goto retry;
+        }
+
+        target_cap = target->cap;
+        if (target_cap != cap) {
+            unlockClosure((StgClosure*)m, i);
+            throwToSendMsg(cap, target_cap, msg);
+            return THROWTO_BLOCKED;
+        }
+
+       if ((target->flags & TSO_BLOCKEX) &&
+           ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
+            unlockClosure((StgClosure*)m, i);
+            blockedThrowTo(cap,target,msg);
+            return THROWTO_BLOCKED;
+        }
+
+        // nobody else can wake up this TSO after we claim the message
+        unlockClosure((StgClosure*)m, &stg_IND_info);
+
+        raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+        unblockOne(cap, target);
+        return THROWTO_SUCCESS;
     }
 
     case BlockedOnMVar:
     }
 
     case BlockedOnMVar:
@@ -322,14 +394,17 @@ check_target:
 
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
 
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-           lockClosure((StgClosure *)target);
-           blockedThrowTo(cap,source,target);
+            Capability *target_cap = target->cap;
+            if (target->cap != cap) {
+                throwToSendMsg(cap,target_cap,msg);
+            } else {
+                blockedThrowTo(cap,target,msg);
+            }
            unlockClosure((StgClosure *)mvar, info);
            unlockClosure((StgClosure *)mvar, info);
-           *out = target;
-           return THROWTO_BLOCKED; // caller releases TSO
+           return THROWTO_BLOCKED;
        } else {
            removeThreadFromMVarQueue(cap, mvar, target);
        } else {
            removeThreadFromMVarQueue(cap, mvar, target);
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
+           raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
            unblockOne(cap, target);
            unlockClosure((StgClosure *)mvar, info);
            return THROWTO_SUCCESS;
            unblockOne(cap, target);
            unlockClosure((StgClosure *)mvar, info);
            return THROWTO_SUCCESS;
@@ -346,84 +421,23 @@ check_target:
        }
 
        if (target->flags & TSO_BLOCKEX) {
        }
 
        if (target->flags & TSO_BLOCKEX) {
-           lockTSO(target);
-           blockedThrowTo(cap,source,target);
+            Capability *target_cap = target->cap;
+            if (target->cap != cap) {
+                throwToSendMsg(cap,target_cap,msg);
+            } else {
+                blockedThrowTo(cap,target,msg);
+            }
            RELEASE_LOCK(&sched_mutex);
            RELEASE_LOCK(&sched_mutex);
-           *out = target;
-           return THROWTO_BLOCKED; // caller releases TSO
+           return THROWTO_BLOCKED; // caller releases lock
        } else {
            removeThreadFromQueue(cap, &blackhole_queue, target);
        } else {
            removeThreadFromQueue(cap, &blackhole_queue, target);
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
+           raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
            unblockOne(cap, target);
            RELEASE_LOCK(&sched_mutex);
            return THROWTO_SUCCESS;
        }
     }
 
            unblockOne(cap, target);
            RELEASE_LOCK(&sched_mutex);
            return THROWTO_SUCCESS;
        }
     }
 
-    case BlockedOnException:
-    {
-       StgTSO *target2;
-       StgInfoTable *info;
-
-       /*
-         To obtain exclusive access to a BlockedOnException thread,
-         we must call lockClosure() on the TSO on which it is blocked.
-         Since the TSO might change underneath our feet, after we
-         call lockClosure() we must check that 
-          
-             (a) the closure we locked is actually a TSO
-            (b) the original thread is still  BlockedOnException,
-            (c) the original thread is still blocked on the TSO we locked
-            and (d) the target thread has not been relocated.
-
-         We synchronise with threadStackOverflow() (which relocates
-         threads) using lockClosure()/unlockClosure().
-       */
-       target2 = target->block_info.tso;
-
-       info = lockClosure((StgClosure *)target2);
-       if (info != &stg_TSO_info) {
-           unlockClosure((StgClosure *)target2, info);
-           goto retry;
-       }
-       if (target->what_next == ThreadRelocated) {
-           target = target->_link;
-           unlockTSO(target2);
-           goto retry;
-       }
-       if (target2->what_next == ThreadRelocated) {
-           target->block_info.tso = target2->_link;
-           unlockTSO(target2);
-           goto retry;
-       }
-       if (target->why_blocked != BlockedOnException
-           || target->block_info.tso != target2) {
-           unlockTSO(target2);
-           goto retry;
-       }
-       
-       /* 
-          Now we have exclusive rights to the target TSO...
-
-          If it is blocking exceptions, add the source TSO to its
-          blocked_exceptions queue.  Otherwise, raise the exception.
-       */
-       if ((target->flags & TSO_BLOCKEX) &&
-           ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-           lockTSO(target);
-           blockedThrowTo(cap,source,target);
-           unlockTSO(target2);
-           *out = target;
-           return THROWTO_BLOCKED;
-       } else {
-           removeThreadFromQueue(cap, &target2->blocked_exceptions, target);
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
-           unblockOne(cap, target);
-           unlockTSO(target2);
-           return THROWTO_SUCCESS;
-       }
-    }  
-
     case BlockedOnSTM:
        lockTSO(target);
        // Unblocking BlockedOnSTM threads requires the TSO to be
     case BlockedOnSTM:
        lockTSO(target);
        // Unblocking BlockedOnSTM threads requires the TSO to be
@@ -434,11 +448,16 @@ check_target:
        }
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
        }
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-           blockedThrowTo(cap,source,target);
-           *out = target;
+            Capability *target_cap = target->cap;
+            if (target->cap != cap) {
+                throwToSendMsg(cap,target_cap,msg);
+            } else {
+                blockedThrowTo(cap,target,msg);
+            }
+           unlockTSO(target);
            return THROWTO_BLOCKED;
        } else {
            return THROWTO_BLOCKED;
        } else {
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
+           raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
            unblockOne(cap, target);
            unlockTSO(target);
            return THROWTO_SUCCESS;
            unblockOne(cap, target);
            unlockTSO(target);
            return THROWTO_SUCCESS;
@@ -446,19 +465,18 @@ check_target:
 
     case BlockedOnCCall:
     case BlockedOnCCall_NoUnblockExc:
 
     case BlockedOnCCall:
     case BlockedOnCCall_NoUnblockExc:
-       // I don't think it's possible to acquire ownership of a
-       // BlockedOnCCall thread.  We just assume that the target
-       // thread is blocking exceptions, and block on its
-       // blocked_exception queue.
-       lockTSO(target);
-       if (target->why_blocked != BlockedOnCCall &&
-           target->why_blocked != BlockedOnCCall_NoUnblockExc) {
-           unlockTSO(target);
-            goto retry;
-       }
-       blockedThrowTo(cap,source,target);
-       *out = target;
+    {
+        Capability *target_cap;
+
+        target_cap = target->cap;
+        if (target_cap != cap) {
+            throwToSendMsg(cap, target_cap, msg);
+            return THROWTO_BLOCKED;
+        }
+
+       blockedThrowTo(cap,target,msg);
        return THROWTO_BLOCKED;
        return THROWTO_BLOCKED;
+    }
 
 #ifndef THREADEDED_RTS
     case BlockedOnRead:
 
 #ifndef THREADEDED_RTS
     case BlockedOnRead:
@@ -469,11 +487,11 @@ check_target:
 #endif
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
 #endif
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-           blockedThrowTo(cap,source,target);
+           blockedThrowTo(cap,target,msg);
            return THROWTO_BLOCKED;
        } else {
            removeFromQueues(cap,target);
            return THROWTO_BLOCKED;
        } else {
            removeFromQueues(cap,target);
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
+           raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
            return THROWTO_SUCCESS;
        }
 #endif
            return THROWTO_SUCCESS;
        }
 #endif
@@ -484,33 +502,34 @@ check_target:
     barf("throwTo");
 }
 
     barf("throwTo");
 }
 
-// Block a TSO on another TSO's blocked_exceptions queue.
-// Precondition: we hold an exclusive lock on the target TSO (this is
-// complex to achieve as there's no single lock on a TSO; see
-// throwTo()).
 static void
 static void
-blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target)
+throwToSendMsg (Capability *cap STG_UNUSED,
+                Capability *target_cap USED_IF_THREADS, 
+                MessageThrowTo *msg USED_IF_THREADS)
+            
 {
 {
-    if (source != NULL) {
-        debugTrace(DEBUG_sched, "throwTo: blocking on thread %lu", (unsigned long)target->id);
-        setTSOLink(cap, source, target->blocked_exceptions);
-        target->blocked_exceptions = source;
-        dirty_TSO(cap,target); // we modified the blocked_exceptions queue
-        
-        source->block_info.tso = target;
-        write_barrier(); // throwTo_exception *must* be visible if BlockedOnException is.
-        source->why_blocked = BlockedOnException;
-    }
-}
+#ifdef THREADED_RTS
+    debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
 
 
+    sendMessage(target_cap, (Message*)msg);
+#endif
+}
 
 
-#ifdef THREADED_RTS
-void
-throwToReleaseTarget (void *tso)
+// Block a throwTo message on the target TSO's blocked_exceptions
+// queue.  The current Capability must own the target TSO in order to
+// modify the blocked_exceptions queue.
+static void
+blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
 {
 {
-    unlockTSO((StgTSO *)tso);
+    debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu",
+                  (unsigned long)target->id);
+
+    ASSERT(target->cap == cap);
+
+    msg->link = (Message*)target->blocked_exceptions;
+    target->blocked_exceptions = msg;
+    dirty_TSO(cap,target); // we modified the blocked_exceptions queue
 }
 }
-#endif
 
 /* -----------------------------------------------------------------------------
    Waking up threads blocked in throwTo
 
 /* -----------------------------------------------------------------------------
    Waking up threads blocked in throwTo
@@ -532,10 +551,11 @@ throwToReleaseTarget (void *tso)
 int
 maybePerformBlockedException (Capability *cap, StgTSO *tso)
 {
 int
 maybePerformBlockedException (Capability *cap, StgTSO *tso)
 {
-    StgTSO *source;
+    MessageThrowTo *msg;
+    const StgInfoTable *i;
     
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
     
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
-        if (tso->blocked_exceptions != END_TSO_QUEUE) {
+        if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
             awakenBlockedExceptionQueue(cap,tso);
             return 1;
         } else {
             awakenBlockedExceptionQueue(cap,tso);
             return 1;
         } else {
@@ -543,32 +563,30 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
         }
     }
 
         }
     }
 
-    if (tso->blocked_exceptions != END_TSO_QUEUE && 
+    if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && 
         (tso->flags & TSO_BLOCKEX) != 0) {
         debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
     }
 
         (tso->flags & TSO_BLOCKEX) != 0) {
         debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
     }
 
-    if (tso->blocked_exceptions != END_TSO_QUEUE
+    if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
        && ((tso->flags & TSO_BLOCKEX) == 0
            || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
 
        && ((tso->flags & TSO_BLOCKEX) == 0
            || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
 
-       // Lock the TSO, this gives us exclusive access to the queue
-       lockTSO(tso);
-
-       // Check the queue again; it might have changed before we
-       // locked it.
-       if (tso->blocked_exceptions == END_TSO_QUEUE) {
-           unlockTSO(tso);
-           return 0;
-       }
-
        // We unblock just the first thread on the queue, and perform
        // its throw immediately.
        // We unblock just the first thread on the queue, and perform
        // its throw immediately.
-       source = tso->blocked_exceptions;
-       performBlockedException(cap, source, tso);
-       tso->blocked_exceptions = unblockOne_(cap, source, 
-                                             rtsFalse/*no migrate*/);
-       unlockTSO(tso);
+    loop:
+        msg = tso->blocked_exceptions;
+        if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
+        i = lockClosure((StgClosure*)msg);
+        tso->blocked_exceptions = (MessageThrowTo*)msg->link;
+        if (i == &stg_IND_info) {
+            unlockClosure((StgClosure*)msg,i);
+            goto loop;
+        }
+
+        performBlockedException(cap, msg);
+        unblockOne_(cap, msg->source, rtsFalse/*no migrate*/);
+        unlockClosure((StgClosure*)msg,&stg_IND_info);
         return 1;
     }
     return 0;
         return 1;
     }
     return 0;
@@ -580,25 +598,34 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
 void
 awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
 {
 void
 awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
 {
-    lockTSO(tso);
-    awakenBlockedQueue(cap, tso->blocked_exceptions);
-    tso->blocked_exceptions = END_TSO_QUEUE;
-    unlockTSO(tso);
+    MessageThrowTo *msg;
+    const StgInfoTable *i;
+
+    for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
+         msg = (MessageThrowTo*)msg->link) {
+        i = lockClosure((StgClosure *)msg);
+        if (i != &stg_IND_info) {
+            unblockOne_(cap, msg->source, rtsFalse/*no migrate*/);
+        }
+        unlockClosure((StgClosure *)msg,i);
+    }
+    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
 }    
 
 static void
 }    
 
 static void
-performBlockedException (Capability *cap, StgTSO *source, StgTSO *target)
+performBlockedException (Capability *cap, MessageThrowTo *msg)
 {
 {
-    StgClosure *exception;
+    StgTSO *source;
+
+    source = msg->source;
 
 
-    ASSERT(source->why_blocked == BlockedOnException);
-    ASSERT(source->block_info.tso->id == target->id);
+    ASSERT(source->why_blocked == BlockedOnMsgThrowTo);
+    ASSERT(source->block_info.closure == (StgClosure *)msg);
     ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info);
     ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info);
-    ASSERT(((StgTSO *)source->sp[1])->id == target->id);
+    ASSERT(((StgTSO *)source->sp[1])->id == msg->target->id);
     // check ids not pointers, because the thread might be relocated
 
     // check ids not pointers, because the thread might be relocated
 
-    exception = (StgClosure *)source->sp[2];
-    throwToSingleThreaded(cap, target, exception);
+    throwToSingleThreaded(cap, msg->target, msg->exception);
     source->sp += 3;
 }
 
     source->sp += 3;
 }
 
@@ -637,22 +664,25 @@ removeFromQueues(Capability *cap, StgTSO *tso)
       removeThreadFromQueue(cap, &blackhole_queue, tso);
       goto done;
 
       removeThreadFromQueue(cap, &blackhole_queue, tso);
       goto done;
 
-  case BlockedOnException:
-    {
-      StgTSO *target  = tso->block_info.tso;
-
-      // NO: when called by threadPaused(), we probably have this
-      // TSO already locked (WHITEHOLEd) because we just placed
-      // ourselves on its queue.
-      // ASSERT(get_itbl(target)->type == TSO);
-
-      while (target->what_next == ThreadRelocated) {
-         target = target->_link;
-      }
-      
-      removeThreadFromQueue(cap, &target->blocked_exceptions, tso);
-      goto done;
-    }
+  case BlockedOnMsgWakeup:
+  {
+      // kill the message, atomically:
+      tso->block_info.wakeup->header.info = &stg_IND_info;
+      break;
+  }
+
+  case BlockedOnMsgThrowTo:
+  {
+      MessageThrowTo *m = tso->block_info.throwto;
+      // The message is locked by us, unless we got here via
+      // deleteAllThreads(), in which case we own all the
+      // capabilities.
+      // ASSERT(m->header.info == &stg_WHITEHOLE_info);
+
+      // unlock and revoke it at the same time
+      unlockClosure((StgClosure*)m,&stg_IND_info);
+      break;
+  }
 
 #if !defined(THREADED_RTS)
   case BlockedOnRead:
 
 #if !defined(THREADED_RTS)
   case BlockedOnRead:
@@ -743,6 +773,10 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
     }
 #endif
 
     }
 #endif
 
+    while (tso->what_next == ThreadRelocated) {
+        tso = tso->_link;
+    }
+
     // mark it dirty; we're about to change its stack.
     dirty_TSO(cap, tso);
 
     // mark it dirty; we're about to change its stack.
     dirty_TSO(cap, tso);
 
index 96eb96e..5137d41 100644 (file)
@@ -29,16 +29,13 @@ void suspendComputation (Capability *cap,
                         StgTSO *tso, 
                         StgUpdateFrame *stop_here);
 
                         StgTSO *tso, 
                         StgUpdateFrame *stop_here);
 
-nat throwTo (Capability *cap,           // the Capability we hold 
-            StgTSO *source,             // the TSO sending the exception
-            StgTSO *target,             // the TSO receiving the exception
-            StgClosure *exception,      // the exception closure
-            /*[out]*/ void **out   // pass to throwToReleaseTarget()
-    );
+MessageThrowTo *throwTo (Capability *cap,      // the Capability we hold 
+                         StgTSO *source,
+                         StgTSO *target,
+                         StgClosure *exception); // the exception closure
 
 
-#ifdef THREADED_RTS
-void throwToReleaseTarget (void *tso);
-#endif
+nat throwToMsg (Capability *cap,
+                MessageThrowTo *msg);
 
 int  maybePerformBlockedException (Capability *cap, StgTSO *tso);
 void awakenBlockedExceptionQueue  (Capability *cap, StgTSO *tso);
 
 int  maybePerformBlockedException (Capability *cap, StgTSO *tso);
 void awakenBlockedExceptionQueue  (Capability *cap, StgTSO *tso);
@@ -52,7 +49,7 @@ interruptible(StgTSO *t)
 {
   switch (t->why_blocked) {
   case BlockedOnMVar:
 {
   switch (t->why_blocked) {
   case BlockedOnMVar:
-  case BlockedOnException:
+  case BlockedOnMsgThrowTo:
   case BlockedOnRead:
   case BlockedOnWrite:
 #if defined(mingw32_HOST_OS)
   case BlockedOnRead:
   case BlockedOnWrite:
 #if defined(mingw32_HOST_OS)
index 4fca19c..b7bc909 100644 (file)
@@ -509,7 +509,7 @@ push( StgClosure *c, retainer c_child_r, StgClosure **first_child )
 
        // layout.payload.ptrs, no SRT
     case CONSTR:
 
        // layout.payload.ptrs, no SRT
     case CONSTR:
-    case STABLE_NAME:
+    case PRIM:
     case BCO:
     case CONSTR_STATIC:
        init_ptrs(&se.info, get_itbl(c)->layout.payload.ptrs,
     case BCO:
     case CONSTR_STATIC:
        init_ptrs(&se.info, get_itbl(c)->layout.payload.ptrs,
@@ -883,7 +883,7 @@ pop( StgClosure **c, StgClosure **cp, retainer *r )
        }
 
        case CONSTR:
        }
 
        case CONSTR:
-       case STABLE_NAME:
+       case PRIM:
        case BCO:
        case CONSTR_STATIC:
            // StgMutArrPtr.ptrs, no SRT
        case BCO:
        case CONSTR_STATIC:
            // StgMutArrPtr.ptrs, no SRT
@@ -1108,7 +1108,7 @@ isRetainer( StgClosure *c )
     case CONSTR_STATIC:
     case FUN_STATIC:
        // misc
     case CONSTR_STATIC:
     case FUN_STATIC:
        // misc
-    case STABLE_NAME:
+    case PRIM:
     case BCO:
     case ARR_WORDS:
        // STM
     case BCO:
     case ARR_WORDS:
        // STM
index e2a30a6..6e75abc 100644 (file)
@@ -9,6 +9,8 @@
 #include "PosixSource.h"
 #include "Rts.h"
 
 #include "PosixSource.h"
 #include "Rts.h"
 
+#include "eventlog/EventLog.h"
+
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
@@ -161,6 +163,10 @@ rtsFatalInternalErrorFn(const char *s, va_list ap)
      fflush(stderr);
   }
 
      fflush(stderr);
   }
 
+#ifdef TRACING
+  if (RtsFlags.TraceFlags.tracing == TRACE_EVENTLOG) endEventLogging();
+#endif
+
   abort();
   // stg_exit(EXIT_INTERNAL_ERROR);
 }
   abort();
   // stg_exit(EXIT_INTERNAL_ERROR);
 }
index ed5a722..be61538 100644 (file)
--- a/rts/STM.c
+++ b/rts/STM.c
@@ -352,8 +352,7 @@ static StgBool watcher_is_tso(StgTVarWatchQueue *q) {
 
 static StgBool watcher_is_invariant(StgTVarWatchQueue *q) {
   StgClosure *c = q -> closure;
 
 static StgBool watcher_is_invariant(StgTVarWatchQueue *q) {
   StgClosure *c = q -> closure;
-  StgInfoTable *info = get_itbl(c);
-  return (info -> type) == ATOMIC_INVARIANT;
+  return (c->header.info == &stg_ATOMIC_INVARIANT_info);
 }
 
 /*......................................................................*/
 }
 
 /*......................................................................*/
index 4cca469..70e0246 100644 (file)
@@ -139,7 +139,7 @@ static void scheduleYield (Capability **pcap, Task *task, rtsBool);
 #endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
 #endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
-static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
+static void scheduleProcessInbox(Capability *cap);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
 static void schedulePushWork(Capability *cap, Task *task);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
 static void schedulePushWork(Capability *cap, Task *task);
@@ -618,7 +618,7 @@ scheduleFindWork (Capability *cap)
     // list each time around the scheduler.
     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
 
     // list each time around the scheduler.
     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
 
-    scheduleCheckWakeupThreads(cap);
+    scheduleProcessInbox(cap);
 
     scheduleCheckBlockedThreads(cap);
 
 
     scheduleCheckBlockedThreads(cap);
 
@@ -673,7 +673,7 @@ scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
     if (!force_yield &&
         !shouldYieldCapability(cap,task) && 
         (!emptyRunQueue(cap) ||
     if (!force_yield &&
         !shouldYieldCapability(cap,task) && 
         (!emptyRunQueue(cap) ||
-         !emptyWakeupQueue(cap) ||
+         !emptyInbox(cap) ||
          blackholes_need_checking ||
          sched_state >= SCHED_INTERRUPTING))
         return;
          blackholes_need_checking ||
          sched_state >= SCHED_INTERRUPTING))
         return;
@@ -725,7 +725,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
-           if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
+           if (!emptyRunQueue(cap0)
+                || cap->returning_tasks_hd != NULL
+                || cap->inbox != (Message*)END_TSO_QUEUE) {
                // it already has some work, we just grabbed it at 
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
                // it already has some work, we just grabbed it at 
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
@@ -871,23 +873,89 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
  * Check for threads woken up by other Capabilities
  * ------------------------------------------------------------------------- */
 
  * Check for threads woken up by other Capabilities
  * ------------------------------------------------------------------------- */
 
+#if defined(THREADED_RTS)
+static void
+executeMessage (Capability *cap, Message *m)
+{
+    const StgInfoTable *i;
+
+loop:
+    write_barrier(); // allow m->header to be modified by another thread
+    i = m->header.info;
+    if (i == &stg_MSG_WAKEUP_info)
+    {
+        MessageWakeup *w = (MessageWakeup *)m;
+        StgTSO *tso = w->tso;
+        debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld", 
+                      (lnat)tso->id);
+        ASSERT(tso->cap == cap);
+        ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
+        ASSERT(tso->block_info.closure == (StgClosure *)m);
+        tso->why_blocked = NotBlocked;
+        appendToRunQueue(cap, tso);
+    }
+    else if (i == &stg_MSG_THROWTO_info)
+    {
+        MessageThrowTo *t = (MessageThrowTo *)m;
+        nat r;
+        const StgInfoTable *i;
+
+        i = lockClosure((StgClosure*)m);
+        if (i != &stg_MSG_THROWTO_info) {
+            unlockClosure((StgClosure*)m, i);
+            goto loop;
+        }
+
+        debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld", 
+                      (lnat)t->source->id, (lnat)t->target->id);
+
+        ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
+        ASSERT(t->source->block_info.closure == (StgClosure *)m);
+
+        r = throwToMsg(cap, t);
+
+        switch (r) {
+        case THROWTO_SUCCESS:
+            ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
+            t->source->sp += 3;
+            unblockOne(cap, t->source);
+            // this message is done
+            unlockClosure((StgClosure*)m, &stg_IND_info);
+            break;
+        case THROWTO_BLOCKED:
+            // unlock the message
+            unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
+            break;
+        }
+    }
+    else if (i == &stg_IND_info)
+    {
+        // message was revoked
+        return;
+    }
+    else if (i == &stg_WHITEHOLE_info)
+    {
+        goto loop;
+    }
+    else
+    {
+        barf("executeMessage: %p", i);
+    }
+}
+#endif
+
 static void
 static void
-scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
+scheduleProcessInbox (Capability *cap USED_IF_THREADS)
 {
 #if defined(THREADED_RTS)
 {
 #if defined(THREADED_RTS)
-    // Any threads that were woken up by other Capabilities get
-    // appended to our run queue.
-    if (!emptyWakeupQueue(cap)) {
-       ACQUIRE_LOCK(&cap->lock);
-       if (emptyRunQueue(cap)) {
-           cap->run_queue_hd = cap->wakeup_queue_hd;
-           cap->run_queue_tl = cap->wakeup_queue_tl;
-       } else {
-           setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
-           cap->run_queue_tl = cap->wakeup_queue_tl;
-       }
-       cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
-       RELEASE_LOCK(&cap->lock);
+    Message *m;
+
+    while (!emptyInbox(cap)) {
+        ACQUIRE_LOCK(&cap->lock);
+        m = cap->inbox;
+        cap->inbox = m->link;
+        RELEASE_LOCK(&cap->lock);
+        executeMessage(cap, (Message *)m);
     }
 #endif
 }
     }
 #endif
 }
@@ -983,7 +1051,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
            switch (task->incall->tso->why_blocked) {
            case BlockedOnSTM:
            case BlockedOnBlackHole:
            switch (task->incall->tso->why_blocked) {
            case BlockedOnSTM:
            case BlockedOnBlackHole:
-           case BlockedOnException:
+           case BlockedOnMsgThrowTo:
            case BlockedOnMVar:
                throwToSingleThreaded(cap, task->incall->tso, 
                                      (StgClosure *)nonTermination_closure);
            case BlockedOnMVar:
                throwToSingleThreaded(cap, task->incall->tso, 
                                      (StgClosure *)nonTermination_closure);
@@ -1268,9 +1336,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
      */
 
     // blocked exceptions can now complete, even if the thread was in
      */
 
     // blocked exceptions can now complete, even if the thread was in
-    // blocked mode (see #2910).  This unconditionally calls
-    // lockTSO(), which ensures that we don't miss any threads that
-    // are engaged in throwTo() with this thread as a target.
+    // blocked mode (see #2910).
     awakenBlockedExceptionQueue (cap, t);
 
       //
     awakenBlockedExceptionQueue (cap, t);
 
       //
@@ -1884,7 +1950,7 @@ resumeThread (void *task_)
     
     if (tso->why_blocked == BlockedOnCCall) {
         // avoid locking the TSO if we don't have to
     
     if (tso->why_blocked == BlockedOnCCall) {
         // avoid locking the TSO if we don't have to
-        if (tso->blocked_exceptions != END_TSO_QUEUE) {
+        if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
             awakenBlockedExceptionQueue(cap,tso);
         }
        tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
             awakenBlockedExceptionQueue(cap,tso);
         }
        tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
@@ -2187,10 +2253,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
 
   IF_DEBUG(sanity,checkTSO(tso));
 
 
   IF_DEBUG(sanity,checkTSO(tso));
 
-  // don't allow throwTo() to modify the blocked_exceptions queue
-  // while we are moving the TSO:
-  lockClosure((StgClosure *)tso);
-
   if (tso->stack_size >= tso->max_stack_size
       && !(tso->flags & TSO_BLOCKEX)) {
       // NB. never raise a StackOverflow exception if the thread is
   if (tso->stack_size >= tso->max_stack_size
       && !(tso->flags & TSO_BLOCKEX)) {
       // NB. never raise a StackOverflow exception if the thread is
@@ -2201,7 +2263,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
       //
 
       if (tso->flags & TSO_SQUEEZED) {
       //
 
       if (tso->flags & TSO_SQUEEZED) {
-          unlockTSO(tso);
           return tso;
       }
       // #3677: In a stack overflow situation, stack squeezing may
           return tso;
       }
       // #3677: In a stack overflow situation, stack squeezing may
@@ -2223,7 +2284,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
                                                tso->sp+64)));
 
       // Send this thread the StackOverflow exception
                                                tso->sp+64)));
 
       // Send this thread the StackOverflow exception
-      unlockTSO(tso);
       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
       return tso;
   }
       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
       return tso;
   }
@@ -2239,7 +2299,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
   // the stack anyway.
   if ((tso->flags & TSO_SQUEEZED) && 
       ((W_)(tso->sp - tso->stack) >= BLOCK_SIZE_W)) {
   // the stack anyway.
   if ((tso->flags & TSO_SQUEEZED) && 
       ((W_)(tso->sp - tso->stack) >= BLOCK_SIZE_W)) {
-      unlockTSO(tso);
       return tso;
   }
 
       return tso;
   }
 
@@ -2289,9 +2348,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
   tso->sp = (P_)&(tso->stack[tso->stack_size]);
   tso->why_blocked = NotBlocked;
 
   tso->sp = (P_)&(tso->stack[tso->stack_size]);
   tso->why_blocked = NotBlocked;
 
-  unlockTSO(dest);
-  unlockTSO(tso);
-
   IF_DEBUG(sanity,checkTSO(dest));
 #if 0
   IF_DEBUG(scheduler,printTSO(dest));
   IF_DEBUG(sanity,checkTSO(dest));
 #if 0
   IF_DEBUG(scheduler,printTSO(dest));
@@ -2324,10 +2380,6 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso)
         return tso;
     }
 
         return tso;
     }
 
-    // don't allow throwTo() to modify the blocked_exceptions queue
-    // while we are moving the TSO:
-    lockClosure((StgClosure *)tso);
-
     // this is the number of words we'll free
     free_w = round_to_mblocks(tso_size_w/2);
 
     // this is the number of words we'll free
     free_w = round_to_mblocks(tso_size_w/2);
 
@@ -2358,9 +2410,6 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso)
         task->incall->tso = new_tso;
     }
 
         task->incall->tso = new_tso;
     }
 
-    unlockTSO(new_tso);
-    unlockTSO(tso);
-
     IF_DEBUG(sanity,checkTSO(new_tso));
 
     return new_tso;
     IF_DEBUG(sanity,checkTSO(new_tso));
 
     return new_tso;
@@ -2691,61 +2740,9 @@ resurrectThreads (StgTSO *threads)
             * can wake up threads, remember...).
             */
            continue;
             * can wake up threads, remember...).
             */
            continue;
-       case BlockedOnException:
-            // throwTo should never block indefinitely: if the target
-            // thread dies or completes, throwTo returns.
-           barf("resurrectThreads: thread BlockedOnException");
-            break;
        default:
        default:
-           barf("resurrectThreads: thread blocked in a strange way");
+           barf("resurrectThreads: thread blocked in a strange way: %d",
+                 tso->why_blocked);
        }
     }
 }
        }
     }
 }
-
-/* -----------------------------------------------------------------------------
-   performPendingThrowTos is called after garbage collection, and
-   passed a list of threads that were found to have pending throwTos
-   (tso->blocked_exceptions was not empty), and were blocked.
-   Normally this doesn't happen, because we would deliver the
-   exception directly if the target thread is blocked, but there are
-   small windows where it might occur on a multiprocessor (see
-   throwTo()).
-
-   NB. we must be holding all the capabilities at this point, just
-   like resurrectThreads().
-   -------------------------------------------------------------------------- */
-
-void
-performPendingThrowTos (StgTSO *threads)
-{
-    StgTSO *tso, *next;
-    Capability *cap;
-    Task *task, *saved_task;;
-    generation *gen;
-
-    task = myTask();
-    cap = task->cap;
-
-    for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
-       next = tso->global_link;
-
-        gen = Bdescr((P_)tso)->gen;
-       tso->global_link = gen->threads;
-       gen->threads = tso;
-
-       debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
-       
-        // We must pretend this Capability belongs to the current Task
-        // for the time being, as invariants will be broken otherwise.
-        // In fact the current Task has exclusive access to the systme
-        // at this point, so this is just bookkeeping:
-       task->cap = tso->cap;
-        saved_task = tso->cap->running_task;
-        tso->cap->running_task = task;
-        maybePerformBlockedException(tso->cap, tso);
-        tso->cap->running_task = saved_task;
-    }
-
-    // Restore our original Capability:
-    task->cap = cap;
-}
index af322d8..76138b6 100644 (file)
@@ -105,7 +105,6 @@ extern Mutex sched_mutex;
 void interruptStgRts (void);
 
 void resurrectThreads (StgTSO *);
 void interruptStgRts (void);
 
 void resurrectThreads (StgTSO *);
-void performPendingThrowTos (StgTSO *);
 
 /* -----------------------------------------------------------------------------
  * Some convenient macros/inline functions...
 
 /* -----------------------------------------------------------------------------
  * Some convenient macros/inline functions...
@@ -179,25 +178,6 @@ appendToBlockedQueue(StgTSO *tso)
 }
 #endif
 
 }
 #endif
 
-#if defined(THREADED_RTS)
-// Assumes: my_cap is owned by the current Task.  We hold
-// other_cap->lock, but we do not necessarily own other_cap; another
-// Task may be running on it.
-INLINE_HEADER void
-appendToWakeupQueue (Capability *my_cap, Capability *other_cap, StgTSO *tso)
-{
-    ASSERT(tso->_link == END_TSO_QUEUE);
-    if (other_cap->wakeup_queue_hd == END_TSO_QUEUE) {
-       other_cap->wakeup_queue_hd = tso;
-    } else {
-        // my_cap is passed to setTSOLink() because it may need to
-        // write to the mutable list.
-       setTSOLink(my_cap, other_cap->wakeup_queue_tl, tso);
-    }
-    other_cap->wakeup_queue_tl = tso;
-}
-#endif
-
 /* Check whether various thread queues are empty
  */
 INLINE_HEADER rtsBool
 /* Check whether various thread queues are empty
  */
 INLINE_HEADER rtsBool
@@ -212,14 +192,6 @@ emptyRunQueue(Capability *cap)
     return emptyQueue(cap->run_queue_hd);
 }
 
     return emptyQueue(cap->run_queue_hd);
 }
 
-#if defined(THREADED_RTS)
-INLINE_HEADER rtsBool
-emptyWakeupQueue(Capability *cap)
-{
-    return emptyQueue(cap->wakeup_queue_hd);
-}
-#endif
-
 #if !defined(THREADED_RTS)
 #define EMPTY_BLOCKED_QUEUE()  (emptyQueue(blocked_queue_hd))
 #define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue))
 #if !defined(THREADED_RTS)
 #define EMPTY_BLOCKED_QUEUE()  (emptyQueue(blocked_queue_hd))
 #define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue))
index 8fd96c1..f111875 100644 (file)
@@ -419,7 +419,7 @@ CLOSURE(stg_NO_FINALIZER_closure,stg_NO_FINALIZER);
    Stable Names are unlifted too.
    ------------------------------------------------------------------------- */
 
    Stable Names are unlifted too.
    ------------------------------------------------------------------------- */
 
-INFO_TABLE(stg_STABLE_NAME,0,1,STABLE_NAME,"STABLE_NAME","STABLE_NAME")
+INFO_TABLE(stg_STABLE_NAME,0,1,PRIM,"STABLE_NAME","STABLE_NAME")
 { foreign "C" barf("STABLE_NAME object entered!") never returns; }
 
 /* ----------------------------------------------------------------------------
 { foreign "C" barf("STABLE_NAME object entered!") never returns; }
 
 /* ----------------------------------------------------------------------------
@@ -439,22 +439,22 @@ INFO_TABLE(stg_MVAR_DIRTY,3,0,MVAR_DIRTY,"MVAR","MVAR")
    STM
    -------------------------------------------------------------------------- */
 
    STM
    -------------------------------------------------------------------------- */
 
-INFO_TABLE(stg_TVAR, 0, 0, TVAR, "TVAR", "TVAR")
+INFO_TABLE(stg_TVAR, 2, 1, MUT_PRIM, "TVAR", "TVAR")
 { foreign "C" barf("TVAR object entered!") never returns; }
 
 { foreign "C" barf("TVAR object entered!") never returns; }
 
-INFO_TABLE(stg_TVAR_WATCH_QUEUE, 0, 0, TVAR_WATCH_QUEUE, "TVAR_WATCH_QUEUE", "TVAR_WATCH_QUEUE")
+INFO_TABLE(stg_TVAR_WATCH_QUEUE, 3, 0, MUT_PRIM, "TVAR_WATCH_QUEUE", "TVAR_WATCH_QUEUE")
 { foreign "C" barf("TVAR_WATCH_QUEUE object entered!") never returns; }
 
 { foreign "C" barf("TVAR_WATCH_QUEUE object entered!") never returns; }
 
-INFO_TABLE(stg_ATOMIC_INVARIANT, 0, 0, ATOMIC_INVARIANT, "ATOMIC_INVARIANT", "ATOMIC_INVARIANT")
+INFO_TABLE(stg_ATOMIC_INVARIANT, 2, 1, MUT_PRIM, "ATOMIC_INVARIANT", "ATOMIC_INVARIANT")
 { foreign "C" barf("ATOMIC_INVARIANT object entered!") never returns; }
 
 { foreign "C" barf("ATOMIC_INVARIANT object entered!") never returns; }
 
-INFO_TABLE(stg_INVARIANT_CHECK_QUEUE, 0, 0, INVARIANT_CHECK_QUEUE, "INVARIANT_CHECK_QUEUE", "INVARIANT_CHECK_QUEUE")
+INFO_TABLE(stg_INVARIANT_CHECK_QUEUE, 3, 0, MUT_PRIM, "INVARIANT_CHECK_QUEUE", "INVARIANT_CHECK_QUEUE")
 { foreign "C" barf("INVARIANT_CHECK_QUEUE object entered!") never returns; }
 
 INFO_TABLE(stg_TREC_CHUNK, 0, 0, TREC_CHUNK, "TREC_CHUNK", "TREC_CHUNK")
 { foreign "C" barf("TREC_CHUNK object entered!") never returns; }
 
 { foreign "C" barf("INVARIANT_CHECK_QUEUE object entered!") never returns; }
 
 INFO_TABLE(stg_TREC_CHUNK, 0, 0, TREC_CHUNK, "TREC_CHUNK", "TREC_CHUNK")
 { foreign "C" barf("TREC_CHUNK object entered!") never returns; }
 
-INFO_TABLE(stg_TREC_HEADER, 0, 0, TREC_HEADER, "TREC_HEADER", "TREC_HEADER")
+INFO_TABLE(stg_TREC_HEADER, 3, 1, MUT_PRIM, "TREC_HEADER", "TREC_HEADER")
 { foreign "C" barf("TREC_HEADER object entered!") never returns; }
 
 INFO_TABLE_CONSTR(stg_END_STM_WATCH_QUEUE,0,0,0,CONSTR_NOCAF_STATIC,"END_STM_WATCH_QUEUE","END_STM_WATCH_QUEUE")
 { foreign "C" barf("TREC_HEADER object entered!") never returns; }
 
 INFO_TABLE_CONSTR(stg_END_STM_WATCH_QUEUE,0,0,0,CONSTR_NOCAF_STATIC,"END_STM_WATCH_QUEUE","END_STM_WATCH_QUEUE")
@@ -478,6 +478,17 @@ CLOSURE(stg_END_STM_CHUNK_LIST_closure,stg_END_STM_CHUNK_LIST);
 CLOSURE(stg_NO_TREC_closure,stg_NO_TREC);
 
 /* ----------------------------------------------------------------------------
 CLOSURE(stg_NO_TREC_closure,stg_NO_TREC);
 
 /* ----------------------------------------------------------------------------
+   Messages
+   ------------------------------------------------------------------------- */
+
+// PRIM rather than CONSTR, because PRIM objects cannot be duplicated by the GC.
+INFO_TABLE_CONSTR(stg_MSG_WAKEUP,2,0,0,PRIM,"MSG_WAKEUP","MSG_WAKEUP")
+{ foreign "C" barf("MSG_WAKEUP object entered!") never returns; }
+
+INFO_TABLE_CONSTR(stg_MSG_THROWTO,4,0,0,PRIM,"MSG_THROWTO","MSG_THROWTO")
+{ foreign "C" barf("MSG_THROWTO object entered!") never returns; }
+
+/* ----------------------------------------------------------------------------
    END_TSO_QUEUE
 
    This is a static nullary constructor (like []) that we use to mark the
    END_TSO_QUEUE
 
    This is a static nullary constructor (like []) that we use to mark the
@@ -490,18 +501,6 @@ INFO_TABLE_CONSTR(stg_END_TSO_QUEUE,0,0,0,CONSTR_NOCAF_STATIC,"END_TSO_QUEUE","E
 CLOSURE(stg_END_TSO_QUEUE_closure,stg_END_TSO_QUEUE);
 
 /* ----------------------------------------------------------------------------
 CLOSURE(stg_END_TSO_QUEUE_closure,stg_END_TSO_QUEUE);
 
 /* ----------------------------------------------------------------------------
-   Exception lists
-   ------------------------------------------------------------------------- */
-
-INFO_TABLE_CONSTR(stg_END_EXCEPTION_LIST,0,0,0,CONSTR_NOCAF_STATIC,"END_EXCEPTION_LIST","END_EXCEPTION_LIST")
-{ foreign "C" barf("END_EXCEPTION_LIST object entered!") never returns; }
-
-CLOSURE(stg_END_EXCEPTION_LIST_closure,stg_END_EXCEPTION_LIST);
-
-INFO_TABLE(stg_EXCEPTION_CONS,1,1,CONSTR,"EXCEPTION_CONS","EXCEPTION_CONS")
-{ foreign "C" barf("EXCEPTION_CONS object entered!") never returns; }
-
-/* ----------------------------------------------------------------------------
    Arrays
 
    These come in two basic flavours: arrays of data (StgArrWords) and arrays of
    Arrays
 
    These come in two basic flavours: arrays of data (StgArrWords) and arrays of
index 08b7aab..f824d02 100644 (file)
@@ -74,7 +74,7 @@ createThread(Capability *cap, nat size)
     tso->what_next = ThreadRunGHC;
 
     tso->why_blocked  = NotBlocked;
     tso->what_next = ThreadRunGHC;
 
     tso->why_blocked  = NotBlocked;
-    tso->blocked_exceptions = END_TSO_QUEUE;
+    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
     tso->flags = 0;
     tso->dirty = 1;
     
     tso->flags = 0;
     tso->dirty = 1;
     
@@ -218,8 +218,9 @@ unblockOne_ (Capability *cap, StgTSO *tso,
 
   // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
   ASSERT(tso->why_blocked != NotBlocked);
 
   // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
   ASSERT(tso->why_blocked != NotBlocked);
+  ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
+         tso->block_info.closure->header.info == &stg_IND_info);
 
 
-  tso->why_blocked = NotBlocked;
   next = tso->_link;
   tso->_link = END_TSO_QUEUE;
 
   next = tso->_link;
   tso->_link = END_TSO_QUEUE;
 
@@ -235,6 +236,8 @@ unblockOne_ (Capability *cap, StgTSO *tso,
       }
 
       tso->cap = cap;
       }
 
       tso->cap = cap;
+      write_barrier();
+      tso->why_blocked = NotBlocked;
       appendToRunQueue(cap,tso);
 
       // context-switch soonish so we can migrate the new thread if
       appendToRunQueue(cap,tso);
 
       // context-switch soonish so we can migrate the new thread if
@@ -246,6 +249,7 @@ unblockOne_ (Capability *cap, StgTSO *tso,
       wakeupThreadOnCapability(cap, tso->cap, tso);
   }
 #else
       wakeupThreadOnCapability(cap, tso->cap, tso);
   }
 #else
+  tso->why_blocked = NotBlocked;
   appendToRunQueue(cap,tso);
 
   // context-switch soonish so we can migrate the new thread if
   appendToRunQueue(cap,tso);
 
   // context-switch soonish so we can migrate the new thread if
@@ -327,13 +331,15 @@ printThreadBlockage(StgTSO *tso)
   case BlockedOnMVar:
     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
     break;
   case BlockedOnMVar:
     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
     break;
-  case BlockedOnException:
-    debugBelch("is blocked on delivering an exception to thread %lu",
-              (unsigned long)tso->block_info.tso->id);
-    break;
   case BlockedOnBlackHole:
     debugBelch("is blocked on a black hole");
     break;
   case BlockedOnBlackHole:
     debugBelch("is blocked on a black hole");
     break;
+  case BlockedOnMsgWakeup:
+    debugBelch("is blocked on a wakeup message");
+    break;
+  case BlockedOnMsgThrowTo:
+    debugBelch("is blocked on a throwto message");
+    break;
   case NotBlocked:
     debugBelch("is not blocked");
     break;
   case NotBlocked:
     debugBelch("is not blocked");
     break;
index 8e0ee26..dfe879e 100644 (file)
@@ -11,6 +11,8 @@
 
 BEGIN_RTS_PRIVATE
 
 
 BEGIN_RTS_PRIVATE
 
+#define END_BLOCKED_EXCEPTIONS_QUEUE ((MessageThrowTo*)END_TSO_QUEUE)
+
 StgTSO * unblockOne (Capability *cap, StgTSO *tso);
 StgTSO * unblockOne_ (Capability *cap, StgTSO *tso, rtsBool allow_migrate);
 
 StgTSO * unblockOne (Capability *cap, StgTSO *tso);
 StgTSO * unblockOne_ (Capability *cap, StgTSO *tso, rtsBool allow_migrate);
 
index f8b6ad4..69ea3d3 100644 (file)
@@ -135,6 +135,15 @@ void traceUserMsg(Capability *cap, char *msg);
 #define debugTrace(class, str, ...) /* nothing */
 #endif
 
 #define debugTrace(class, str, ...) /* nothing */
 #endif
 
+#ifdef DEBUG
+#define debugTraceCap(class, cap, msg, ...)      \
+    if (RTS_UNLIKELY(class)) {                  \
+        traceCap_(cap, msg, ##__VA_ARGS__);     \
+    }
+#else
+#define debugTraceCap(class, cap, str, ...) /* nothing */
+#endif
+
 /* 
  * Emit a message/event describing the state of a thread
  */
 /* 
  * Emit a message/event describing the state of a thread
  */
@@ -152,6 +161,7 @@ void traceThreadStatus_ (StgTSO *tso);
 #define traceCap(class, cap, msg, ...) /* nothing */
 #define trace(class, msg, ...) /* nothing */
 #define debugTrace(class, str, ...) /* nothing */
 #define traceCap(class, cap, msg, ...) /* nothing */
 #define trace(class, msg, ...) /* nothing */
 #define debugTrace(class, str, ...) /* nothing */
+#define debugTraceCap(class, cap, str, ...) /* nothing */
 #define traceThreadStatus(class, tso) /* nothing */
 
 #endif /* TRACING */
 #define traceThreadStatus(class, tso) /* nothing */
 
 #endif /* TRACING */
index fd87820..6ebf33d 100644 (file)
@@ -59,7 +59,7 @@ INLINE_HEADER void postMsg (char *msg STG_UNUSED,
                             va_list ap STG_UNUSED)
 { /* nothing */ }
 
                             va_list ap STG_UNUSED)
 { /* nothing */ }
 
-INLINE_HEADER void postCapMsg (Capability *cap,
+INLINE_HEADER void postCapMsg (Capability *cap STG_UNUSED,
                                char *msg STG_UNUSED, 
                                va_list ap STG_UNUSED)
 { /* nothing */ }
                                char *msg STG_UNUSED, 
                                va_list ap STG_UNUSED)
 { /* nothing */ }
index e55ae2b..39284f9 100644 (file)
@@ -471,7 +471,8 @@ thread_TSO (StgTSO *tso)
 
     if (   tso->why_blocked == BlockedOnMVar
        || tso->why_blocked == BlockedOnBlackHole
 
     if (   tso->why_blocked == BlockedOnMVar
        || tso->why_blocked == BlockedOnBlackHole
-       || tso->why_blocked == BlockedOnException
+       || tso->why_blocked == BlockedOnMsgThrowTo
+       || tso->why_blocked == BlockedOnMsgWakeup
        ) {
        thread_(&tso->block_info.closure);
     }
        ) {
        thread_(&tso->block_info.closure);
     }
@@ -622,7 +623,8 @@ thread_obj (StgInfoTable *info, StgPtr p)
 
     case FUN:
     case CONSTR:
 
     case FUN:
     case CONSTR:
-    case STABLE_NAME:
+    case PRIM:
+    case MUT_PRIM:
     case IND_PERM:
     case MUT_VAR_CLEAN:
     case MUT_VAR_DIRTY:
     case IND_PERM:
     case MUT_VAR_CLEAN:
     case MUT_VAR_DIRTY:
@@ -705,32 +707,6 @@ thread_obj (StgInfoTable *info, StgPtr p)
     case TSO:
        return thread_TSO((StgTSO *)p);
     
     case TSO:
        return thread_TSO((StgTSO *)p);
     
-    case TVAR_WATCH_QUEUE:
-    {
-        StgTVarWatchQueue *wq = (StgTVarWatchQueue *)p;
-       thread_(&wq->closure);
-       thread_(&wq->next_queue_entry);
-       thread_(&wq->prev_queue_entry);
-       return p + sizeofW(StgTVarWatchQueue);
-    }
-    
-    case TVAR:
-    {
-        StgTVar *tvar = (StgTVar *)p;
-       thread((void *)&tvar->current_value);
-       thread((void *)&tvar->first_watch_queue_entry);
-       return p + sizeofW(StgTVar);
-    }
-    
-    case TREC_HEADER:
-    {
-        StgTRecHeader *trec = (StgTRecHeader *)p;
-       thread_(&trec->enclosing_trec);
-       thread_(&trec->current_chunk);
-       thread_(&trec->invariants_to_check);
-       return p + sizeofW(StgTRecHeader);
-    }
-
     case TREC_CHUNK:
     {
         StgWord i;
     case TREC_CHUNK:
     {
         StgWord i;
@@ -745,23 +721,6 @@ thread_obj (StgInfoTable *info, StgPtr p)
        return p + sizeofW(StgTRecChunk);
     }
 
        return p + sizeofW(StgTRecChunk);
     }
 
-    case ATOMIC_INVARIANT:
-    {
-        StgAtomicInvariant *invariant = (StgAtomicInvariant *)p;
-       thread_(&invariant->code);
-       thread_(&invariant->last_execution);
-       return p + sizeofW(StgAtomicInvariant);
-    }
-
-    case INVARIANT_CHECK_QUEUE:
-    {
-        StgInvariantCheckQueue *queue = (StgInvariantCheckQueue *)p;
-       thread_(&queue->invariant);
-       thread_(&queue->my_execution);
-       thread_(&queue->next_queue_entry);
-       return p + sizeofW(StgInvariantCheckQueue);
-    }
-
     default:
        barf("update_fwd: unknown/strange object  %d", (int)(info->type));
        return NULL;
     default:
        barf("update_fwd: unknown/strange object  %d", (int)(info->type));
        return NULL;
index 76026b0..21017a6 100644 (file)
@@ -626,7 +626,8 @@ loop:
       return;
 
   case WEAK:
       return;
 
   case WEAK:
-  case STABLE_NAME:
+  case PRIM:
+  case MUT_PRIM:
       copy_tag(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen,tag);
       return;
 
       copy_tag(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen,tag);
       return;
 
@@ -721,30 +722,10 @@ loop:
       }
     }
 
       }
     }
 
-  case TREC_HEADER: 
-      copy(p,info,q,sizeofW(StgTRecHeader),gen);
-      return;
-
-  case TVAR_WATCH_QUEUE:
-      copy(p,info,q,sizeofW(StgTVarWatchQueue),gen);
-      return;
-
-  case TVAR:
-      copy(p,info,q,sizeofW(StgTVar),gen);
-      return;
-    
   case TREC_CHUNK:
       copy(p,info,q,sizeofW(StgTRecChunk),gen);
       return;
 
   case TREC_CHUNK:
       copy(p,info,q,sizeofW(StgTRecChunk),gen);
       return;
 
-  case ATOMIC_INVARIANT:
-      copy(p,info,q,sizeofW(StgAtomicInvariant),gen);
-      return;
-
-  case INVARIANT_CHECK_QUEUE:
-      copy(p,info,q,sizeofW(StgInvariantCheckQueue),gen);
-      return;
-
   default:
     barf("evacuate: strange closure type %d", (int)(INFO_PTR_TO_STRUCT(info)->type));
   }
   default:
     barf("evacuate: strange closure type %d", (int)(INFO_PTR_TO_STRUCT(info)->type));
   }
index 2eabdab..ae6fc99 100644 (file)
@@ -732,7 +732,6 @@ SET_GCT(gc_threads[0]);
   // send exceptions to any threads which were about to die 
   RELEASE_SM_LOCK;
   resurrectThreads(resurrected_threads);
   // send exceptions to any threads which were about to die 
   RELEASE_SM_LOCK;
   resurrectThreads(resurrected_threads);
-  performPendingThrowTos(exception_threads);
   ACQUIRE_SM_LOCK;
 
   // Update the stable pointer hash table.
   ACQUIRE_SM_LOCK;
 
   // Update the stable pointer hash table.
index 7b7187c..9df39b9 100644 (file)
@@ -22,6 +22,7 @@
 #include "Schedule.h"
 #include "Weak.h"
 #include "Storage.h"
 #include "Schedule.h"
 #include "Weak.h"
 #include "Storage.h"
+#include "Threads.h"
 
 /* -----------------------------------------------------------------------------
    Weak Pointers
 
 /* -----------------------------------------------------------------------------
    Weak Pointers
@@ -80,9 +81,6 @@ StgWeak *old_weak_ptr_list; // also pending finaliser list
 // List of threads found to be unreachable
 StgTSO *resurrected_threads;
 
 // List of threads found to be unreachable
 StgTSO *resurrected_threads;
 
-// List of blocked threads found to have pending throwTos
-StgTSO *exception_threads;
-
 static void resurrectUnreachableThreads (generation *gen);
 static rtsBool tidyThreadList (generation *gen);
 
 static void resurrectUnreachableThreads (generation *gen);
 static rtsBool tidyThreadList (generation *gen);
 
@@ -93,7 +91,6 @@ initWeakForGC(void)
     weak_ptr_list = NULL;
     weak_stage = WeakPtrs;
     resurrected_threads = END_TSO_QUEUE;
     weak_ptr_list = NULL;
     weak_stage = WeakPtrs;
     resurrected_threads = END_TSO_QUEUE;
-    exception_threads = END_TSO_QUEUE;
 }
 
 rtsBool 
 }
 
 rtsBool 
@@ -286,35 +283,11 @@ static rtsBool tidyThreadList (generation *gen)
         
         next = t->global_link;
         
         
         next = t->global_link;
         
-        // This is a good place to check for blocked
-        // exceptions.  It might be the case that a thread is
-        // blocked on delivering an exception to a thread that
-        // is also blocked - we try to ensure that this
-        // doesn't happen in throwTo(), but it's too hard (or
-        // impossible) to close all the race holes, so we
-        // accept that some might get through and deal with
-        // them here.  A GC will always happen at some point,
-        // even if the system is otherwise deadlocked.
-        //
-        // If an unreachable thread has blocked
-        // exceptions, we really want to perform the
-        // blocked exceptions rather than throwing
-        // BlockedIndefinitely exceptions.  This is the
-        // only place we can discover such threads.
-        // The target thread might even be
-        // ThreadFinished or ThreadKilled.  Bugs here
-        // will only be seen when running on a
-        // multiprocessor.
-        if (t->blocked_exceptions != END_TSO_QUEUE) {
-            if (tmp == NULL) {
-                evacuate((StgClosure **)&t);
-                flag = rtsTrue;
-            }
-            t->global_link = exception_threads;
-            exception_threads = t;
-            *prev = next;
-            continue;
-        }
+        // if the thread is not masking exceptions but there are
+        // pending exceptions on its queue, then something has gone
+        // wrong:
+        ASSERT(t->blocked_exceptions == END_BLOCKED_EXCEPTIONS_QUEUE
+               || (t->flags & TSO_BLOCKEX));
         
         if (tmp == NULL) {
             // not alive (yet): leave this thread on the
         
         if (tmp == NULL) {
             // not alive (yet): leave this thread on the
index 442fee1..11d5424 100644 (file)
@@ -307,7 +307,8 @@ checkClosure( StgClosure* p )
     case IND_OLDGEN_PERM:
     case BLACKHOLE:
     case CAF_BLACKHOLE:
     case IND_OLDGEN_PERM:
     case BLACKHOLE:
     case CAF_BLACKHOLE:
-    case STABLE_NAME:
+    case PRIM:
+    case MUT_PRIM:
     case MUT_VAR_CLEAN:
     case MUT_VAR_DIRTY:
     case CONSTR_STATIC:
     case MUT_VAR_CLEAN:
     case MUT_VAR_DIRTY:
     case CONSTR_STATIC:
@@ -416,39 +417,6 @@ checkClosure( StgClosure* p )
         checkTSO((StgTSO *)p);
         return tso_sizeW((StgTSO *)p);
 
         checkTSO((StgTSO *)p);
         return tso_sizeW((StgTSO *)p);
 
-    case TVAR_WATCH_QUEUE:
-      {
-        StgTVarWatchQueue *wq = (StgTVarWatchQueue *)p;
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(wq->next_queue_entry));
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(wq->prev_queue_entry));
-        return sizeofW(StgTVarWatchQueue);
-      }
-
-    case INVARIANT_CHECK_QUEUE:
-      {
-        StgInvariantCheckQueue *q = (StgInvariantCheckQueue *)p;
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->invariant));
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->my_execution));
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->next_queue_entry));
-        return sizeofW(StgInvariantCheckQueue);
-      }
-
-    case ATOMIC_INVARIANT:
-      {
-        StgAtomicInvariant *invariant = (StgAtomicInvariant *)p;
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(invariant->code));
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(invariant->last_execution));
-        return sizeofW(StgAtomicInvariant);
-      }
-
-    case TVAR:
-      {
-        StgTVar *tv = (StgTVar *)p;
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(tv->current_value));
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(tv->first_watch_queue_entry));
-        return sizeofW(StgTVar);
-      }
-
     case TREC_CHUNK:
       {
         nat i;
     case TREC_CHUNK:
       {
         nat i;
@@ -461,14 +429,6 @@ checkClosure( StgClosure* p )
         }
         return sizeofW(StgTRecChunk);
       }
         }
         return sizeofW(StgTRecChunk);
       }
-
-    case TREC_HEADER:
-      {
-        StgTRecHeader *trec = (StgTRecHeader *)p;
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(trec -> enclosing_trec));
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(trec -> current_chunk));
-        return sizeofW(StgTRecHeader);
-      }
       
     default:
            barf("checkClosure (closure type %d)", info->type);
       
     default:
            barf("checkClosure (closure type %d)", info->type);
index 466b9b4..1b671a0 100644 (file)
@@ -82,7 +82,8 @@ scavengeTSO (StgTSO *tso)
 
     if (   tso->why_blocked == BlockedOnMVar
        || tso->why_blocked == BlockedOnBlackHole
 
     if (   tso->why_blocked == BlockedOnMVar
        || tso->why_blocked == BlockedOnBlackHole
-       || tso->why_blocked == BlockedOnException
+       || tso->why_blocked == BlockedOnMsgWakeup
+       || tso->why_blocked == BlockedOnMsgThrowTo
        ) {
        evacuate(&tso->block_info.closure);
     }
        ) {
        evacuate(&tso->block_info.closure);
     }
@@ -394,7 +395,6 @@ scavenge_block (bdescr *bd)
 {
   StgPtr p, q;
   StgInfoTable *info;
 {
   StgPtr p, q;
   StgInfoTable *info;
-  generation *saved_evac_gen;
   rtsBool saved_eager_promotion;
   gen_workspace *ws;
 
   rtsBool saved_eager_promotion;
   gen_workspace *ws;
 
@@ -403,7 +403,6 @@ scavenge_block (bdescr *bd)
 
   gct->scan_bd = bd;
   gct->evac_gen = bd->gen;
 
   gct->scan_bd = bd;
   gct->evac_gen = bd->gen;
-  saved_evac_gen = gct->evac_gen;
   saved_eager_promotion = gct->eager_promotion;
   gct->failed_to_evac = rtsFalse;
 
   saved_eager_promotion = gct->eager_promotion;
   gct->failed_to_evac = rtsFalse;
 
@@ -532,7 +531,7 @@ scavenge_block (bdescr *bd)
     gen_obj:
     case CONSTR:
     case WEAK:
     gen_obj:
     case CONSTR:
     case WEAK:
-    case STABLE_NAME:
+    case PRIM:
     {
        StgPtr end;
 
     {
        StgPtr end;
 
@@ -672,42 +671,21 @@ scavenge_block (bdescr *bd)
        break;
     }
 
        break;
     }
 
-    case TVAR_WATCH_QUEUE:
+    case MUT_PRIM:
       {
       {
-       StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p);
-       gct->evac_gen = 0;
-       evacuate((StgClosure **)&wq->closure);
-       evacuate((StgClosure **)&wq->next_queue_entry);
-       evacuate((StgClosure **)&wq->prev_queue_entry);
-       gct->evac_gen = saved_evac_gen;
-       gct->failed_to_evac = rtsTrue; // mutable
-       p += sizeofW(StgTVarWatchQueue);
-       break;
-      }
+       StgPtr end;
 
 
-    case TVAR:
-      {
-       StgTVar *tvar = ((StgTVar *) p);
-       gct->evac_gen = 0;
-       evacuate((StgClosure **)&tvar->current_value);
-       evacuate((StgClosure **)&tvar->first_watch_queue_entry);
-       gct->evac_gen = saved_evac_gen;
-       gct->failed_to_evac = rtsTrue; // mutable
-       p += sizeofW(StgTVar);
-       break;
-      }
+       gct->eager_promotion = rtsFalse;
 
 
-    case TREC_HEADER:
-      {
-        StgTRecHeader *trec = ((StgTRecHeader *) p);
-        gct->evac_gen = 0;
-       evacuate((StgClosure **)&trec->enclosing_trec);
-       evacuate((StgClosure **)&trec->current_chunk);
-       evacuate((StgClosure **)&trec->invariants_to_check);
-       gct->evac_gen = saved_evac_gen;
+       end = (P_)((StgClosure *)p)->payload + info->layout.payload.ptrs;
+       for (p = (P_)((StgClosure *)p)->payload; p < end; p++) {
+           evacuate((StgClosure **)p);
+       }
+       p += info->layout.payload.nptrs;
+
+       gct->eager_promotion = saved_eager_promotion;
        gct->failed_to_evac = rtsTrue; // mutable
        gct->failed_to_evac = rtsTrue; // mutable
-       p += sizeofW(StgTRecHeader);
-        break;
+       break;
       }
 
     case TREC_CHUNK:
       }
 
     case TREC_CHUNK:
@@ -715,44 +693,19 @@ scavenge_block (bdescr *bd)
        StgWord i;
        StgTRecChunk *tc = ((StgTRecChunk *) p);
        TRecEntry *e = &(tc -> entries[0]);
        StgWord i;
        StgTRecChunk *tc = ((StgTRecChunk *) p);
        TRecEntry *e = &(tc -> entries[0]);
-       gct->evac_gen = 0;
+       gct->eager_promotion = rtsFalse;
        evacuate((StgClosure **)&tc->prev_chunk);
        for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) {
          evacuate((StgClosure **)&e->tvar);
          evacuate((StgClosure **)&e->expected_value);
          evacuate((StgClosure **)&e->new_value);
        }
        evacuate((StgClosure **)&tc->prev_chunk);
        for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) {
          evacuate((StgClosure **)&e->tvar);
          evacuate((StgClosure **)&e->expected_value);
          evacuate((StgClosure **)&e->new_value);
        }
-       gct->evac_gen = saved_evac_gen;
+       gct->eager_promotion = saved_eager_promotion;
        gct->failed_to_evac = rtsTrue; // mutable
        p += sizeofW(StgTRecChunk);
        break;
       }
 
        gct->failed_to_evac = rtsTrue; // mutable
        p += sizeofW(StgTRecChunk);
        break;
       }
 
-    case ATOMIC_INVARIANT:
-      {
-        StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p);
-        gct->evac_gen = 0;
-       evacuate(&invariant->code);
-       evacuate((StgClosure **)&invariant->last_execution);
-       gct->evac_gen = saved_evac_gen;
-       gct->failed_to_evac = rtsTrue; // mutable
-       p += sizeofW(StgAtomicInvariant);
-        break;
-      }
-
-    case INVARIANT_CHECK_QUEUE:
-      {
-        StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p);
-        gct->evac_gen = 0;
-       evacuate((StgClosure **)&queue->invariant);
-       evacuate((StgClosure **)&queue->my_execution);
-       evacuate((StgClosure **)&queue->next_queue_entry);
-       gct->evac_gen = saved_evac_gen;
-       gct->failed_to_evac = rtsTrue; // mutable
-       p += sizeofW(StgInvariantCheckQueue);
-        break;
-      }
-
     default:
        barf("scavenge: unimplemented/strange closure type %d @ %p", 
             info->type, p);
     default:
        barf("scavenge: unimplemented/strange closure type %d @ %p", 
             info->type, p);
@@ -806,10 +759,10 @@ scavenge_mark_stack(void)
 {
     StgPtr p, q;
     StgInfoTable *info;
 {
     StgPtr p, q;
     StgInfoTable *info;
-    generation *saved_evac_gen;
+    rtsBool saved_eager_promotion;
 
     gct->evac_gen = oldest_gen;
 
     gct->evac_gen = oldest_gen;
-    saved_evac_gen = gct->evac_gen;
+    saved_eager_promotion = gct->eager_promotion;
 
     while ((p = pop_mark_stack())) {
 
 
     while ((p = pop_mark_stack())) {
 
@@ -822,8 +775,6 @@ scavenge_mark_stack(void)
         case MVAR_CLEAN:
         case MVAR_DIRTY:
         { 
         case MVAR_CLEAN:
         case MVAR_DIRTY:
         { 
-            rtsBool saved_eager_promotion = gct->eager_promotion;
-            
             StgMVar *mvar = ((StgMVar *)p);
             gct->eager_promotion = rtsFalse;
             evacuate((StgClosure **)&mvar->head);
             StgMVar *mvar = ((StgMVar *)p);
             gct->eager_promotion = rtsFalse;
             evacuate((StgClosure **)&mvar->head);
@@ -906,7 +857,7 @@ scavenge_mark_stack(void)
        gen_obj:
        case CONSTR:
        case WEAK:
        gen_obj:
        case CONSTR:
        case WEAK:
-       case STABLE_NAME:
+       case PRIM:
        {
            StgPtr end;
            
        {
            StgPtr end;
            
@@ -938,8 +889,6 @@ scavenge_mark_stack(void)
 
        case MUT_VAR_CLEAN:
        case MUT_VAR_DIRTY: {
 
        case MUT_VAR_CLEAN:
        case MUT_VAR_DIRTY: {
-           rtsBool saved_eager_promotion = gct->eager_promotion;
-           
            gct->eager_promotion = rtsFalse;
            evacuate(&((StgMutVar *)p)->var);
            gct->eager_promotion = saved_eager_promotion;
            gct->eager_promotion = rtsFalse;
            evacuate(&((StgMutVar *)p)->var);
            gct->eager_promotion = saved_eager_promotion;
@@ -986,13 +935,10 @@ scavenge_mark_stack(void)
        case MUT_ARR_PTRS_DIRTY:
            // follow everything 
        {
        case MUT_ARR_PTRS_DIRTY:
            // follow everything 
        {
-           rtsBool saved_eager;
-
            // We don't eagerly promote objects pointed to by a mutable
            // array, but if we find the array only points to objects in
            // the same or an older generation, we mark it "clean" and
            // avoid traversing it during minor GCs.
            // We don't eagerly promote objects pointed to by a mutable
            // array, but if we find the array only points to objects in
            // the same or an older generation, we mark it "clean" and
            // avoid traversing it during minor GCs.
-           saved_eager = gct->eager_promotion;
            gct->eager_promotion = rtsFalse;
 
             scavenge_mut_arr_ptrs((StgMutArrPtrs *)p);
            gct->eager_promotion = rtsFalse;
 
             scavenge_mut_arr_ptrs((StgMutArrPtrs *)p);
@@ -1003,7 +949,7 @@ scavenge_mark_stack(void)
                 ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info;
             }
 
                 ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info;
             }
 
-           gct->eager_promotion = saved_eager;
+           gct->eager_promotion = saved_eager_promotion;
            gct->failed_to_evac = rtsTrue; // mutable anyhow.
            break;
        }
            gct->failed_to_evac = rtsTrue; // mutable anyhow.
            break;
        }
@@ -1032,81 +978,39 @@ scavenge_mark_stack(void)
            break;
        }
 
            break;
        }
 
-       case TVAR_WATCH_QUEUE:
-         {
-           StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p);
-           gct->evac_gen = 0;
-            evacuate((StgClosure **)&wq->closure);
-           evacuate((StgClosure **)&wq->next_queue_entry);
-           evacuate((StgClosure **)&wq->prev_queue_entry);
-           gct->evac_gen = saved_evac_gen;
-           gct->failed_to_evac = rtsTrue; // mutable
-           break;
-         }
-         
-       case TVAR:
-         {
-           StgTVar *tvar = ((StgTVar *) p);
-           gct->evac_gen = 0;
-           evacuate((StgClosure **)&tvar->current_value);
-           evacuate((StgClosure **)&tvar->first_watch_queue_entry);
-           gct->evac_gen = saved_evac_gen;
-           gct->failed_to_evac = rtsTrue; // mutable
-           break;
-         }
-         
+        case MUT_PRIM:
+        {
+            StgPtr end;
+            
+            gct->eager_promotion = rtsFalse;
+            
+            end = (P_)((StgClosure *)p)->payload + info->layout.payload.ptrs;
+            for (p = (P_)((StgClosure *)p)->payload; p < end; p++) {
+                evacuate((StgClosure **)p);
+            }
+            
+            gct->eager_promotion = saved_eager_promotion;
+            gct->failed_to_evac = rtsTrue; // mutable
+            break;
+        }
+
        case TREC_CHUNK:
          {
            StgWord i;
            StgTRecChunk *tc = ((StgTRecChunk *) p);
            TRecEntry *e = &(tc -> entries[0]);
        case TREC_CHUNK:
          {
            StgWord i;
            StgTRecChunk *tc = ((StgTRecChunk *) p);
            TRecEntry *e = &(tc -> entries[0]);
-           gct->evac_gen = 0;
+           gct->eager_promotion = rtsFalse;
            evacuate((StgClosure **)&tc->prev_chunk);
            for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) {
              evacuate((StgClosure **)&e->tvar);
              evacuate((StgClosure **)&e->expected_value);
              evacuate((StgClosure **)&e->new_value);
            }
            evacuate((StgClosure **)&tc->prev_chunk);
            for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) {
              evacuate((StgClosure **)&e->tvar);
              evacuate((StgClosure **)&e->expected_value);
              evacuate((StgClosure **)&e->new_value);
            }
-           gct->evac_gen = saved_evac_gen;
-           gct->failed_to_evac = rtsTrue; // mutable
-           break;
-         }
-
-       case TREC_HEADER:
-         {
-           StgTRecHeader *trec = ((StgTRecHeader *) p);
-           gct->evac_gen = 0;
-           evacuate((StgClosure **)&trec->enclosing_trec);
-           evacuate((StgClosure **)&trec->current_chunk);
-           evacuate((StgClosure **)&trec->invariants_to_check);
-           gct->evac_gen = saved_evac_gen;
+           gct->eager_promotion = saved_eager_promotion;
            gct->failed_to_evac = rtsTrue; // mutable
            break;
          }
 
            gct->failed_to_evac = rtsTrue; // mutable
            break;
          }
 
-        case ATOMIC_INVARIANT:
-          {
-            StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p);
-            gct->evac_gen = 0;
-           evacuate(&invariant->code);
-           evacuate((StgClosure **)&invariant->last_execution);
-           gct->evac_gen = saved_evac_gen;
-           gct->failed_to_evac = rtsTrue; // mutable
-            break;
-          }
-
-        case INVARIANT_CHECK_QUEUE:
-          {
-            StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p);
-            gct->evac_gen = 0;
-           evacuate((StgClosure **)&queue->invariant);
-           evacuate((StgClosure **)&queue->my_execution);
-            evacuate((StgClosure **)&queue->next_queue_entry);
-           gct->evac_gen = saved_evac_gen;
-           gct->failed_to_evac = rtsTrue; // mutable
-            break;
-          }
-
        default:
            barf("scavenge_mark_stack: unimplemented/strange closure type %d @ %p", 
                 info->type, p);
        default:
            barf("scavenge_mark_stack: unimplemented/strange closure type %d @ %p", 
                 info->type, p);
@@ -1133,9 +1037,11 @@ static rtsBool
 scavenge_one(StgPtr p)
 {
     const StgInfoTable *info;
 scavenge_one(StgPtr p)
 {
     const StgInfoTable *info;
-    generation *saved_evac_gen = gct->evac_gen;
     rtsBool no_luck;
     rtsBool no_luck;
+    rtsBool saved_eager_promotion;
     
     
+    saved_eager_promotion = gct->eager_promotion;
+
     ASSERT(LOOKS_LIKE_CLOSURE_PTR(p));
     info = get_itbl((StgClosure *)p);
     
     ASSERT(LOOKS_LIKE_CLOSURE_PTR(p));
     info = get_itbl((StgClosure *)p);
     
@@ -1144,8 +1050,6 @@ scavenge_one(StgPtr p)
     case MVAR_CLEAN:
     case MVAR_DIRTY:
     { 
     case MVAR_CLEAN:
     case MVAR_DIRTY:
     { 
-       rtsBool saved_eager_promotion = gct->eager_promotion;
-
        StgMVar *mvar = ((StgMVar *)p);
        gct->eager_promotion = rtsFalse;
        evacuate((StgClosure **)&mvar->head);
        StgMVar *mvar = ((StgMVar *)p);
        gct->eager_promotion = rtsFalse;
        evacuate((StgClosure **)&mvar->head);
@@ -1190,6 +1094,7 @@ scavenge_one(StgPtr p)
     case CONSTR_0_2:
     case CONSTR_2_0:
     case WEAK:
     case CONSTR_0_2:
     case CONSTR_2_0:
     case WEAK:
+    case PRIM:
     case IND_PERM:
     {
        StgPtr q, end;
     case IND_PERM:
     {
        StgPtr q, end;
@@ -1204,7 +1109,6 @@ scavenge_one(StgPtr p)
     case MUT_VAR_CLEAN:
     case MUT_VAR_DIRTY: {
        StgPtr q = p;
     case MUT_VAR_CLEAN:
     case MUT_VAR_DIRTY: {
        StgPtr q = p;
-       rtsBool saved_eager_promotion = gct->eager_promotion;
 
        gct->eager_promotion = rtsFalse;
        evacuate(&((StgMutVar *)p)->var);
 
        gct->eager_promotion = rtsFalse;
        evacuate(&((StgMutVar *)p)->var);
@@ -1254,13 +1158,10 @@ scavenge_one(StgPtr p)
     case MUT_ARR_PTRS_CLEAN:
     case MUT_ARR_PTRS_DIRTY:
     {
     case MUT_ARR_PTRS_CLEAN:
     case MUT_ARR_PTRS_DIRTY:
     {
-       rtsBool saved_eager;
-
        // We don't eagerly promote objects pointed to by a mutable
        // array, but if we find the array only points to objects in
        // the same or an older generation, we mark it "clean" and
        // avoid traversing it during minor GCs.
        // We don't eagerly promote objects pointed to by a mutable
        // array, but if we find the array only points to objects in
        // the same or an older generation, we mark it "clean" and
        // avoid traversing it during minor GCs.
-       saved_eager = gct->eager_promotion;
        gct->eager_promotion = rtsFalse;
 
         scavenge_mut_arr_ptrs((StgMutArrPtrs *)p);
        gct->eager_promotion = rtsFalse;
 
         scavenge_mut_arr_ptrs((StgMutArrPtrs *)p);
@@ -1271,7 +1172,7 @@ scavenge_one(StgPtr p)
            ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info;
        }
 
            ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info;
        }
 
-       gct->eager_promotion = saved_eager;
+       gct->eager_promotion = saved_eager_promotion;
        gct->failed_to_evac = rtsTrue;
        break;
     }
        gct->failed_to_evac = rtsTrue;
        break;
     }
@@ -1298,81 +1199,40 @@ scavenge_one(StgPtr p)
        break;
     }
   
        break;
     }
   
-    case TVAR_WATCH_QUEUE:
-      {
-       StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p);
-       gct->evac_gen = 0;
-        evacuate((StgClosure **)&wq->closure);
-        evacuate((StgClosure **)&wq->next_queue_entry);
-        evacuate((StgClosure **)&wq->prev_queue_entry);
-       gct->evac_gen = saved_evac_gen;
-       gct->failed_to_evac = rtsTrue; // mutable
-       break;
-      }
+    case MUT_PRIM:
+    {
+       StgPtr end;
+        
+       gct->eager_promotion = rtsFalse;
+        
+       end = (P_)((StgClosure *)p)->payload + info->layout.payload.ptrs;
+       for (p = (P_)((StgClosure *)p)->payload; p < end; p++) {
+           evacuate((StgClosure **)p);
+       }
 
 
-    case TVAR:
-      {
-       StgTVar *tvar = ((StgTVar *) p);
-       gct->evac_gen = 0;
-       evacuate((StgClosure **)&tvar->current_value);
-        evacuate((StgClosure **)&tvar->first_watch_queue_entry);
-       gct->evac_gen = saved_evac_gen;
+       gct->eager_promotion = saved_eager_promotion;
        gct->failed_to_evac = rtsTrue; // mutable
        break;
        gct->failed_to_evac = rtsTrue; // mutable
        break;
-      }
 
 
-    case TREC_HEADER:
-      {
-        StgTRecHeader *trec = ((StgTRecHeader *) p);
-        gct->evac_gen = 0;
-       evacuate((StgClosure **)&trec->enclosing_trec);
-       evacuate((StgClosure **)&trec->current_chunk);
-        evacuate((StgClosure **)&trec->invariants_to_check);
-       gct->evac_gen = saved_evac_gen;
-       gct->failed_to_evac = rtsTrue; // mutable
-        break;
-      }
+    }
 
     case TREC_CHUNK:
       {
        StgWord i;
        StgTRecChunk *tc = ((StgTRecChunk *) p);
        TRecEntry *e = &(tc -> entries[0]);
 
     case TREC_CHUNK:
       {
        StgWord i;
        StgTRecChunk *tc = ((StgTRecChunk *) p);
        TRecEntry *e = &(tc -> entries[0]);
-       gct->evac_gen = 0;
+       gct->eager_promotion = rtsFalse;
        evacuate((StgClosure **)&tc->prev_chunk);
        for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) {
          evacuate((StgClosure **)&e->tvar);
          evacuate((StgClosure **)&e->expected_value);
          evacuate((StgClosure **)&e->new_value);
        }
        evacuate((StgClosure **)&tc->prev_chunk);
        for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) {
          evacuate((StgClosure **)&e->tvar);
          evacuate((StgClosure **)&e->expected_value);
          evacuate((StgClosure **)&e->new_value);
        }
-       gct->evac_gen = saved_evac_gen;
+       gct->eager_promotion = saved_eager_promotion;
        gct->failed_to_evac = rtsTrue; // mutable
        break;
       }
 
        gct->failed_to_evac = rtsTrue; // mutable
        break;
       }
 
-    case ATOMIC_INVARIANT:
-    {
-      StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p);
-      gct->evac_gen = 0;
-      evacuate(&invariant->code);
-      evacuate((StgClosure **)&invariant->last_execution);
-      gct->evac_gen = saved_evac_gen;
-      gct->failed_to_evac = rtsTrue; // mutable
-      break;
-    }
-
-    case INVARIANT_CHECK_QUEUE:
-    {
-      StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p);
-      gct->evac_gen = 0;
-      evacuate((StgClosure **)&queue->invariant);
-      evacuate((StgClosure **)&queue->my_execution);
-      evacuate((StgClosure **)&queue->next_queue_entry);
-      gct->evac_gen = saved_evac_gen;
-      gct->failed_to_evac = rtsTrue; // mutable
-      break;
-    }
-
     case IND:
         // IND can happen, for example, when the interpreter allocates
         // a gigantic AP closure (more than one block), which ends up
     case IND:
         // IND can happen, for example, when the interpreter allocates
         // a gigantic AP closure (more than one block), which ends up
@@ -1470,8 +1330,8 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
                continue;
            case MUT_ARR_PTRS_DIRTY:
             {
                continue;
            case MUT_ARR_PTRS_DIRTY:
             {
-                rtsBool saved_eager;
-                saved_eager = gct->eager_promotion;
+                rtsBool saved_eager_promotion;
+                saved_eager_promotion = gct->eager_promotion;
                 gct->eager_promotion = rtsFalse;
 
                 scavenge_mut_arr_ptrs_marked((StgMutArrPtrs *)p);
                 gct->eager_promotion = rtsFalse;
 
                 scavenge_mut_arr_ptrs_marked((StgMutArrPtrs *)p);
@@ -1482,7 +1342,7 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
                     ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info;
                 }
 
                     ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info;
                 }
 
-                gct->eager_promotion = saved_eager;
+                gct->eager_promotion = saved_eager_promotion;
                 gct->failed_to_evac = rtsFalse;
                recordMutableGen_GC((StgClosure *)p,gen->no);
                continue;
                 gct->failed_to_evac = rtsFalse;
                recordMutableGen_GC((StgClosure *)p,gen->no);
                continue;