Use message-passing to implement throwTo in the RTS
authorSimon Marlow <marlowsd@gmail.com>
Thu, 11 Mar 2010 09:57:44 +0000 (09:57 +0000)
committerSimon Marlow <marlowsd@gmail.com>
Thu, 11 Mar 2010 09:57:44 +0000 (09:57 +0000)
This replaces some complicated locking schemes with message-passing
in the implementation of throwTo. The benefits are

 - previously it was impossible to guarantee that a throwTo from
   a thread running on one CPU to a thread running on another CPU
   would be noticed, and we had to rely on the GC to pick up these
   forgotten exceptions. This no longer happens.

 - the locking regime is simpler (though the code is about the same
   size)

 - threads can be unblocked from a blocked_exceptions queue without
   having to traverse the whole queue now.  It's a rare case, but
   replaces an O(n) operation with an O(1).

 - generally we move in the direction of sharing less between
   Capabilities (aka HECs), which will become important with other
   changes we have planned.

Also in this patch I replaced several STM-specific closure types with
a generic MUT_PRIM closure type, which allowed a lot of code in the GC
and other places to go away, hence the line-count reduction.  The
message-passing changes resulted in about a net zero line-count
difference.

35 files changed:
includes/rts/Constants.h
includes/rts/storage/ClosureMacros.h
includes/rts/storage/ClosureTypes.h
includes/rts/storage/Closures.h
includes/rts/storage/SMPClosureOps.h
includes/rts/storage/TSO.h
includes/stg/MiscClosures.h
rts/Capability.c
rts/Capability.h
rts/ClosureFlags.c
rts/Exception.cmm
rts/FrontPanel.c
rts/HeapStackCheck.cmm
rts/LdvProfile.c
rts/PrimOps.cmm
rts/Printer.c
rts/ProfHeap.c
rts/RaiseAsync.c
rts/RaiseAsync.h
rts/RetainerProfile.c
rts/RtsMessages.c
rts/STM.c
rts/Schedule.c
rts/Schedule.h
rts/StgMiscClosures.cmm
rts/Threads.c
rts/Threads.h
rts/Trace.h
rts/eventlog/EventLog.h
rts/sm/Compact.c
rts/sm/Evac.c
rts/sm/GC.c
rts/sm/MarkWeak.c
rts/sm/Sanity.c
rts/sm/Scav.c

index 54a1ca7..bfc77fa 100644 (file)
 #define NotBlocked          0
 #define BlockedOnMVar       1
 #define BlockedOnBlackHole  2
-#define BlockedOnException  3
-#define BlockedOnRead       4
-#define BlockedOnWrite      5
-#define BlockedOnDelay      6
-#define BlockedOnSTM        7
+#define BlockedOnRead       3
+#define BlockedOnWrite      4
+#define BlockedOnDelay      5
+#define BlockedOnSTM        6
 
 /* Win32 only: */
-#define BlockedOnDoProc     8
+#define BlockedOnDoProc     7
 
 /* Only relevant for PAR: */
   /* blocked on a remote closure represented by a Global Address: */
-#define BlockedOnGA         9
+#define BlockedOnGA         8
   /* same as above but without sending a Fetch message */
-#define BlockedOnGA_NoSend  10
+#define BlockedOnGA_NoSend  9
 /* Only relevant for THREADED_RTS: */
-#define BlockedOnCCall      11
-#define BlockedOnCCall_NoUnblockExc 12
+#define BlockedOnCCall      10
+#define BlockedOnCCall_NoUnblockExc 11
    /* same as above but don't unblock async exceptions in resumeThread() */
 
+/* Involved in a message sent to tso->msg_cap */
+#define BlockedOnMsgWakeup  12
+#define BlockedOnMsgThrowTo 13
 /*
  * These constants are returned to the scheduler by a thread that has
  * stopped for one reason or another.  See typedef StgThreadReturnCode
index f73d2c5..a115f6f 100644 (file)
@@ -335,18 +335,8 @@ closure_sizeW_ (StgClosure *p, StgInfoTable *info)
        return tso_sizeW((StgTSO *)p);
     case BCO:
        return bco_sizeW((StgBCO *)p);
-    case TVAR_WATCH_QUEUE:
-        return sizeofW(StgTVarWatchQueue);
-    case TVAR:
-        return sizeofW(StgTVar);
     case TREC_CHUNK:
         return sizeofW(StgTRecChunk);
-    case TREC_HEADER:
-        return sizeofW(StgTRecHeader);
-    case ATOMIC_INVARIANT:
-        return sizeofW(StgAtomicInvariant);
-    case INVARIANT_CHECK_QUEUE:
-        return sizeofW(StgInvariantCheckQueue);
     default:
        return sizeW_fromITBL(info);
     }
index 508dce2..6a76772 100644 (file)
 #define MUT_VAR_CLEAN          50
 #define MUT_VAR_DIRTY          51
 #define WEAK                   52
-#define STABLE_NAME            53
-#define TSO                    54
-#define TVAR_WATCH_QUEUE        55
-#define INVARIANT_CHECK_QUEUE   56
-#define ATOMIC_INVARIANT        57
-#define TVAR                    58
-#define TREC_CHUNK              59
-#define TREC_HEADER             60
-#define ATOMICALLY_FRAME        61
-#define CATCH_RETRY_FRAME       62
-#define CATCH_STM_FRAME         63
-#define WHITEHOLE               64
-#define N_CLOSURE_TYPES         65
+#define PRIM                   53
+#define MUT_PRIM                54
+#define TSO                    55
+#define TREC_CHUNK              56
+#define ATOMICALLY_FRAME        57
+#define CATCH_RETRY_FRAME       58
+#define CATCH_STM_FRAME         59
+#define WHITEHOLE               60
+#define N_CLOSURE_TYPES         61
 
 #endif /* RTS_STORAGE_CLOSURETYPES_H */
index 8928268..d7498e2 100644 (file)
@@ -390,10 +390,10 @@ typedef struct StgInvariantCheckQueue_ {
 
 struct StgTRecHeader_ {
   StgHeader                  header;
-  TRecState                  state;
   struct StgTRecHeader_     *enclosing_trec;
   StgTRecChunk              *current_chunk;
   StgInvariantCheckQueue    *invariants_to_check;
+  TRecState                  state;
 };
 
 typedef struct {
@@ -416,4 +416,27 @@ typedef struct {
   StgClosure    *alt_code;
 } StgCatchRetryFrame;
 
+/* ----------------------------------------------------------------------------
+   Messages
+   ------------------------------------------------------------------------- */
+
+typedef struct Message_ {
+    StgHeader        header;
+    struct Message_ *link;
+} Message;
+
+typedef struct MessageWakeup_ {
+    StgHeader header;
+    Message  *link;
+    StgTSO   *tso;
+} MessageWakeup;
+
+typedef struct MessageThrowTo_ {
+    StgHeader   header;
+    Message    *link;
+    StgTSO     *source;
+    StgTSO     *target;
+    StgClosure *exception;
+} MessageThrowTo;
+
 #endif /* RTS_STORAGE_CLOSURES_H */
index 582ec0e..8dee7cb 100644 (file)
@@ -18,6 +18,7 @@
 #else
 
 EXTERN_INLINE StgInfoTable *lockClosure(StgClosure *p);
+EXTERN_INLINE StgInfoTable *tryLockClosure(StgClosure *p);
 EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info);
 
 #if defined(THREADED_RTS)
@@ -43,11 +44,15 @@ EXTERN_INLINE StgInfoTable *lockClosure(StgClosure *p)
     } while (1);
 }
 
-EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info)
+EXTERN_INLINE StgInfoTable *tryLockClosure(StgClosure *p)
 {
-    // This is a strictly ordered write, so we need a write_barrier():
-    write_barrier();
-    p->header.info = info;
+    StgWord info;
+    info = xchg((P_)(void *)&p->header.info, (W_)&stg_WHITEHOLE_info);
+    if (info != (W_)&stg_WHITEHOLE_info) {
+        return (StgInfoTable *)info;
+    } else {
+        return NULL;
+    }
 }
 
 #else /* !THREADED_RTS */
@@ -56,12 +61,19 @@ EXTERN_INLINE StgInfoTable *
 lockClosure(StgClosure *p)
 { return (StgInfoTable *)p->header.info; }
 
-EXTERN_INLINE void
-unlockClosure(StgClosure *p STG_UNUSED, const StgInfoTable *info STG_UNUSED)
-{ /* nothing */ }
+EXTERN_INLINE StgInfoTable *
+tryLockClosure(StgClosure *p)
+{ return (StgInfoTable *)p->header.info; }
 
 #endif /* THREADED_RTS */
 
+EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info)
+{
+    // This is a strictly ordered write, so we need a write_barrier():
+    write_barrier();
+    p->header.info = info;
+}
+
 // Handy specialised versions of lockClosure()/unlockClosure()
 EXTERN_INLINE void lockTSO(StgTSO *tso);
 EXTERN_INLINE void lockTSO(StgTSO *tso)
index e8d97c5..e2015f2 100644 (file)
@@ -46,7 +46,8 @@ typedef struct {
 /* Reason for thread being blocked. See comment above struct StgTso_. */
 typedef union {
   StgClosure *closure;
-  struct StgTSO_ *tso;
+  struct MessageThrowTo_ *throwto;
+  struct MessageWakeup_  *wakeup;
   StgInt fd;   /* StgInt instead of int, so that it's the same size as the ptrs */
 #if defined(mingw32_HOST_OS)
   StgAsyncIOResult *async_result;
@@ -87,7 +88,8 @@ typedef struct StgTSO_ {
          will already be dirty.
     */
 
-    struct StgTSO_*         global_link;    /* Links all threads together */
+    struct StgTSO_*         global_link;    // Links threads on the
+                                            // generation->threads lists
     
     StgWord                 dirty;          /* non-zero => dirty */
     /*
@@ -108,9 +110,9 @@ typedef struct StgTSO_ {
      * setTSOLink().
      */
 
-    StgWord16               what_next;      /* Values defined in Constants.h */
-    StgWord16               why_blocked;    /* Values defined in Constants.h */
-    StgWord32               flags;
+    StgWord16               what_next;      // Values defined in Constants.h
+    StgWord16               why_blocked;    // Values defined in Constants.h
+    StgWord32               flags;          // Values defined in Constants.h
     StgTSOBlockInfo         block_info;
     StgThreadID             id;
     int                     saved_errno;
@@ -123,7 +125,7 @@ typedef struct StgTSO_ {
        exceptions.  In order to access this field, the TSO must be
        locked using lockClosure/unlockClosure (see SMP.h).
     */
-    struct StgTSO_ *        blocked_exceptions;
+    struct MessageThrowTo_ * blocked_exceptions;
 
 #ifdef TICKY_TICKY
     /* TICKY-specific stuff would go here. */
@@ -167,7 +169,7 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
 
        tso->why_blocked       tso->block_info      location
         ----------------------------------------------------------------------
-       NotBlocked             NULL                 runnable_queue, or running
+       NotBlocked             END_TSO_QUEUE        runnable_queue, or running
        
         BlockedOnBlackHole     the BLACKHOLE        blackhole_queue
        
@@ -175,7 +177,7 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
 
        BlockedOnSTM           END_TSO_QUEUE        STM wait queue(s)
        
-        BlockedOnException     the TSO              TSO->blocked_exception
+        BlockedOnMsgThrowTo    MessageThrowTo *     TSO->blocked_exception
 
         BlockedOnRead          NULL                 blocked_queue
         BlockedOnWrite         NULL                blocked_queue
@@ -189,7 +191,6 @@ void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
       
       tso->what_next == ThreadComplete or ThreadKilled
       tso->link     ==  (could be on some queue somewhere)
-      tso->su       ==  tso->stack + tso->stack_size
       tso->sp       ==  tso->stack + tso->stack_size - 1 (i.e. top stack word)
       tso->sp[0]    ==  return value of thread, if what_next == ThreadComplete,
                         exception             , if what_next == ThreadKilled
index e68282e..42e878f 100644 (file)
@@ -114,6 +114,8 @@ RTS_INFO(stg_MUT_ARR_PTRS_FROZEN0_info);
 RTS_INFO(stg_MUT_VAR_CLEAN_info);
 RTS_INFO(stg_MUT_VAR_DIRTY_info);
 RTS_INFO(stg_END_TSO_QUEUE_info);
+RTS_INFO(stg_MSG_WAKEUP_info);
+RTS_INFO(stg_MSG_THROWTO_info);
 RTS_INFO(stg_MUT_CONS_info);
 RTS_INFO(stg_catch_info);
 RTS_INFO(stg_PAP_info);
@@ -163,6 +165,8 @@ RTS_ENTRY(stg_MUT_ARR_PTRS_FROZEN0_entry);
 RTS_ENTRY(stg_MUT_VAR_CLEAN_entry);
 RTS_ENTRY(stg_MUT_VAR_DIRTY_entry);
 RTS_ENTRY(stg_END_TSO_QUEUE_entry);
+RTS_ENTRY(stg_MSG_WAKEUP_entry);
+RTS_ENTRY(stg_MSG_THROWTO_entry);
 RTS_ENTRY(stg_MUT_CONS_entry);
 RTS_ENTRY(stg_catch_entry);
 RTS_ENTRY(stg_PAP_entry);
@@ -205,8 +209,6 @@ RTS_CLOSURE(stg_END_STM_CHUNK_LIST_closure);
 RTS_CLOSURE(stg_NO_TREC_closure);
 
 RTS_ENTRY(stg_NO_FINALIZER_entry);
-RTS_ENTRY(stg_END_EXCEPTION_LIST_entry);
-RTS_ENTRY(stg_EXCEPTION_CONS_entry);
 
 #if IN_STG_CODE
 extern DLL_IMPORT_RTS StgWordArray stg_CHARLIKE_closure;
index ce6eceb..5f54eca 100644 (file)
@@ -223,8 +223,7 @@ initCapability( Capability *cap, nat i )
     cap->suspended_ccalls  = NULL;
     cap->returning_tasks_hd = NULL;
     cap->returning_tasks_tl = NULL;
-    cap->wakeup_queue_hd    = END_TSO_QUEUE;
-    cap->wakeup_queue_tl    = END_TSO_QUEUE;
+    cap->inbox              = (Message*)END_TSO_QUEUE;
     cap->sparks_created     = 0;
     cap->sparks_converted   = 0;
     cap->sparks_pruned      = 0;
@@ -419,7 +418,7 @@ releaseCapability_ (Capability* cap,
     // If we have an unbound thread on the run queue, or if there's
     // anything else to do, give the Capability to a worker thread.
     if (always_wakeup || 
-        !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
+        !emptyRunQueue(cap) || !emptyInbox(cap) ||
         !emptySparkPoolCap(cap) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap,cap->spare_workers);
@@ -645,11 +644,11 @@ yieldCapability (Capability** pCap, Task *task)
  * ------------------------------------------------------------------------- */
 
 void
-wakeupThreadOnCapability (Capability *my_cap, 
+wakeupThreadOnCapability (Capability *cap,
                           Capability *other_cap, 
                           StgTSO *tso)
 {
-    ACQUIRE_LOCK(&other_cap->lock);
+    MessageWakeup *msg;
 
     // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
     if (tso->bound) {
@@ -658,27 +657,20 @@ wakeupThreadOnCapability (Capability *my_cap,
     }
     tso->cap = other_cap;
 
-    ASSERT(tso->bound ? tso->bound->task->cap == other_cap : 1);
+    ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
+           tso->block_info.closure->header.info == &stg_IND_info);
 
-    if (other_cap->running_task == NULL) {
-       // nobody is running this Capability, we can add our thread
-       // directly onto the run queue and start up a Task to run it.
+    ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
 
-       other_cap->running_task = myTask(); 
-            // precond for releaseCapability_() and appendToRunQueue()
+    msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
+    msg->header.info = &stg_MSG_WAKEUP_info;
+    msg->tso = tso;
+    tso->block_info.closure = (StgClosure *)msg;
+    dirty_TSO(cap, tso);
+    write_barrier();
+    tso->why_blocked = BlockedOnMsgWakeup;
 
-       appendToRunQueue(other_cap,tso);
-
-       releaseCapability_(other_cap,rtsFalse);
-    } else {
-       appendToWakeupQueue(my_cap,other_cap,tso);
-        other_cap->context_switch = 1;
-       // someone is running on this Capability, so it cannot be
-       // freed without first checking the wakeup queue (see
-       // releaseCapability_).
-    }
-
-    RELEASE_LOCK(&other_cap->lock);
+    sendMessage(other_cap, (Message*)msg);
 }
 
 /* ----------------------------------------------------------------------------
@@ -881,8 +873,7 @@ markSomeCapabilities (evac_fn evac, void *user, nat i0, nat delta,
        evac(user, (StgClosure **)(void *)&cap->run_queue_hd);
        evac(user, (StgClosure **)(void *)&cap->run_queue_tl);
 #if defined(THREADED_RTS)
-       evac(user, (StgClosure **)(void *)&cap->wakeup_queue_hd);
-       evac(user, (StgClosure **)(void *)&cap->wakeup_queue_tl);
+        evac(user, (StgClosure **)(void *)&cap->inbox);
 #endif
        for (incall = cap->suspended_ccalls; incall != NULL; 
             incall=incall->next) {
@@ -910,3 +901,29 @@ markCapabilities (evac_fn evac, void *user)
 {
     markSomeCapabilities(evac, user, 0, 1, rtsFalse);
 }
+
+/* -----------------------------------------------------------------------------
+   Messages
+   -------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+void sendMessage(Capability *cap, Message *msg)
+{
+    ACQUIRE_LOCK(&cap->lock);
+
+    msg->link = cap->inbox;
+    cap->inbox = msg;
+
+    if (cap->running_task == NULL) {
+       cap->running_task = myTask(); 
+            // precond for releaseCapability_()
+       releaseCapability_(cap,rtsFalse);
+    } else {
+        contextSwitchCapability(cap);
+    }
+
+    RELEASE_LOCK(&cap->lock);
+}
+
+#endif // THREADED_RTS
index 41974dc..4030b5e 100644 (file)
@@ -88,11 +88,8 @@ struct Capability_ {
     Task *returning_tasks_hd; // Singly-linked, with head/tail
     Task *returning_tasks_tl;
 
-    // A list of threads to append to this Capability's run queue at
-    // the earliest opportunity.  These are threads that have been
-    // woken up by another Capability.
-    StgTSO *wakeup_queue_hd;
-    StgTSO *wakeup_queue_tl;
+    // Messages, or END_TSO_QUEUE.
+    Message *inbox;
 
     SparkPool *sparks;
 
@@ -285,6 +282,18 @@ void markCapabilities (evac_fn evac, void *user);
 void traverseSparkQueues (evac_fn evac, void *user);
 
 /* -----------------------------------------------------------------------------
+   Messages
+   -------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+INLINE_HEADER rtsBool emptyInbox(Capability *cap);;
+
+void sendMessage (Capability *cap, Message *msg);
+
+#endif // THREADED_RTS
+
+/* -----------------------------------------------------------------------------
  * INLINE functions... private below here
  * -------------------------------------------------------------------------- */
 
@@ -333,6 +342,15 @@ contextSwitchCapability (Capability *cap)
     cap->context_switch = 1;
 }
 
+#ifdef THREADED_RTS
+
+INLINE_HEADER rtsBool emptyInbox(Capability *cap)
+{
+    return (cap->inbox == (Message*)END_TSO_QUEUE);
+}
+
+#endif
+
 END_RTS_PRIVATE
 
 #endif /* CAPABILITY_H */
index 477a892..358cb40 100644 (file)
@@ -74,20 +74,16 @@ StgWord16 closure_flags[] = {
  [MUT_VAR_CLEAN]       =  (_HNF|     _NS|         _MUT|_UPT           ),
  [MUT_VAR_DIRTY]       =  (_HNF|     _NS|         _MUT|_UPT           ),
  [WEAK]                        =  (_HNF|     _NS|              _UPT           ),
- [STABLE_NAME]         =  (_HNF|     _NS|              _UPT           ),
+ [PRIM]                =  (_HNF|     _NS|              _UPT           ),
+ [MUT_PRIM]            =  (_HNF|     _NS|         _MUT|_UPT           ),
  [TSO]                         =  (_HNF|     _NS|         _MUT|_UPT           ),
- [TVAR_WATCH_QUEUE]     =  (          _NS|         _MUT|_UPT           ),
- [INVARIANT_CHECK_QUEUE]=  (          _NS|         _MUT|_UPT           ),
- [ATOMIC_INVARIANT]     =  (          _NS|         _MUT|_UPT           ),
- [TVAR]                 =  (_HNF|     _NS|         _MUT|_UPT           ), 
  [TREC_CHUNK]           =  (          _NS|         _MUT|_UPT           ),
- [TREC_HEADER]          =  (          _NS|         _MUT|_UPT           ),
  [ATOMICALLY_FRAME]     =  (     _BTM                                  ),
  [CATCH_RETRY_FRAME]    =  (     _BTM                                  ),
  [CATCH_STM_FRAME]      =  (     _BTM                                  ),
  [WHITEHOLE]           =  ( 0                                         )
 };
 
-#if N_CLOSURE_TYPES != 65
+#if N_CLOSURE_TYPES != 61
 #error Closure types changed: update ClosureFlags.c!
 #endif
index 6c887c2..55c79ce 100644 (file)
@@ -56,7 +56,7 @@ INFO_TABLE_RET( stg_unblockAsyncExceptionszh_ret, RET_SMALL )
     CInt r;
 
     StgTSO_flags(CurrentTSO) = StgTSO_flags(CurrentTSO) & 
-       ~(TSO_BLOCKEX::I32|TSO_INTERRUPTIBLE::I32);
+       %lobits32(~(TSO_BLOCKEX|TSO_INTERRUPTIBLE));
 
     /* Eagerly raise a blocked exception, if there is one */
     if (StgTSO_blocked_exceptions(CurrentTSO) != END_TSO_QUEUE) {
@@ -99,8 +99,8 @@ INFO_TABLE_RET( stg_unblockAsyncExceptionszh_ret, RET_SMALL )
 
 INFO_TABLE_RET( stg_blockAsyncExceptionszh_ret, RET_SMALL )
 {
-    StgTSO_flags(CurrentTSO) = 
-       StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
+    StgTSO_flags(CurrentTSO) = %lobits32(
+       TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE);
 
     Sp_adj(1);
     jump %ENTRY_CODE(Sp(0));
@@ -113,8 +113,8 @@ stg_blockAsyncExceptionszh
 
     if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) == 0) {
        
-       StgTSO_flags(CurrentTSO) = 
-          StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
+       StgTSO_flags(CurrentTSO) = %lobits32(
+          TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE);
 
        /* avoid growing the stack unnecessarily */
        if (Sp(0) == stg_blockAsyncExceptionszh_ret_info) {
@@ -142,8 +142,8 @@ stg_unblockAsyncExceptionszh
     /* If exceptions are already unblocked, there's nothing to do */
     if ((TO_W_(StgTSO_flags(CurrentTSO)) & TSO_BLOCKEX) != 0) {
 
-       StgTSO_flags(CurrentTSO) = StgTSO_flags(CurrentTSO) & 
-          ~(TSO_BLOCKEX::I32|TSO_INTERRUPTIBLE::I32);
+       StgTSO_flags(CurrentTSO) = %lobits32(
+           TO_W_(StgTSO_flags(CurrentTSO)) & ~(TSO_BLOCKEX|TSO_INTERRUPTIBLE));
 
        /* avoid growing the stack unnecessarily */
        if (Sp(0) == stg_unblockAsyncExceptionszh_ret_info) {
@@ -252,27 +252,22 @@ stg_killThreadzh
        }
     } else {
        W_ out;
-       W_ retcode;
+       W_ msg;
        out = Sp - WDS(1); /* ok to re-use stack space here */
 
-       (retcode) = foreign "C" throwTo(MyCapability() "ptr",
-                                     CurrentTSO "ptr",
-                                     target "ptr",
-                                     exception "ptr",
-                                     out "ptr") [R1,R2];
+       (msg) = foreign "C" throwTo(MyCapability() "ptr",
+                                    CurrentTSO "ptr",
+                                    target "ptr",
+                                    exception "ptr") [R1,R2];
        
-       switch [THROWTO_SUCCESS .. THROWTO_BLOCKED] (retcode) {
-
-       case THROWTO_SUCCESS: {
+        if (msg == NULL) {
            jump %ENTRY_CODE(Sp(0));
-       }
-
-       case THROWTO_BLOCKED: {
-           R3 = W_[out];
-           // we must block, and call throwToReleaseTarget() before returning
+       } else {
+            StgTSO_why_blocked(CurrentTSO) = BlockedOnMsgThrowTo;
+            StgTSO_block_info(CurrentTSO) = msg;
+           // we must block, and unlock the message before returning
            jump stg_block_throwto;
        }
-       }
     }
 }
 
@@ -507,8 +502,8 @@ retry_pop_stack:
 
     /* Ensure that async excpetions are blocked when running the handler.
     */
-    StgTSO_flags(CurrentTSO) = 
-       StgTSO_flags(CurrentTSO) | TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32;
+    StgTSO_flags(CurrentTSO) = %lobits32(
+       TO_W_(StgTSO_flags(CurrentTSO)) | TSO_BLOCKEX | TSO_INTERRUPTIBLE);
 
     /* Call the handler, passing the exception value and a realworld
      * token as arguments.
index 163a7c0..ebba405 100644 (file)
@@ -697,7 +697,7 @@ residencyCensus( void )
                        break;
                        
                    case WEAK:
-                   case STABLE_NAME:
+                   case PRIM:
                    case MVAR:
                    case MUT_VAR:
 /*                 case MUT_CONS: FIXME: case does not exist */
index b516ef2..a528a3f 100644 (file)
@@ -631,9 +631,8 @@ INFO_TABLE_RET( stg_block_throwto, RET_SMALL, P_ unused, P_ unused )
 
 stg_block_throwto_finally
 {
-#ifdef THREADED_RTS
-    foreign "C" throwToReleaseTarget (R3 "ptr");
-#endif
+    // unlock the throwto message
+    unlockClosure(StgTSO_block_info(CurrentTSO), stg_MSG_THROWTO_info);
     jump StgReturn;
 }
 
index c2e7d7e..ccaf10c 100644 (file)
@@ -109,7 +109,7 @@ processHeapClosureForDead( StgClosure *c )
     case MUT_VAR_CLEAN:
     case MUT_VAR_DIRTY:
     case BCO:
-    case STABLE_NAME:
+    case PRIM:
     case TVAR_WATCH_QUEUE:
     case TVAR:
     case TREC_HEADER:
index 5325c85..bf81eee 100644 (file)
@@ -542,9 +542,9 @@ stg_forkzh
                                closure "ptr") [];
 
   /* start blocked if the current thread is blocked */
-  StgTSO_flags(threadid) = 
-     StgTSO_flags(threadid) |  (StgTSO_flags(CurrentTSO) & 
-                                (TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32));
+  StgTSO_flags(threadid) = %lobits16(
+     TO_W_(StgTSO_flags(threadid)) | 
+     TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE));
 
   foreign "C" scheduleThread(MyCapability() "ptr", threadid "ptr") [];
 
@@ -572,9 +572,9 @@ stg_forkOnzh
                                closure "ptr") [];
 
   /* start blocked if the current thread is blocked */
-  StgTSO_flags(threadid) = 
-     StgTSO_flags(threadid) |  (StgTSO_flags(CurrentTSO) & 
-                                (TSO_BLOCKEX::I32 | TSO_INTERRUPTIBLE::I32));
+  StgTSO_flags(threadid) = %lobits16(
+     TO_W_(StgTSO_flags(threadid)) | 
+     TO_W_(StgTSO_flags(CurrentTSO)) & (TSO_BLOCKEX | TSO_INTERRUPTIBLE));
 
   foreign "C" scheduleThreadOn(MyCapability() "ptr", cpu, threadid "ptr") [];
 
index 1b8a6dd..e981329 100644 (file)
@@ -160,6 +160,12 @@ printClosure( StgClosure *obj )
        printStdObjPayload(obj);
        break;
 
+    case PRIM:
+       debugBelch("PRIM(");
+       printPtr((StgPtr)obj->header.info);
+       printStdObjPayload(obj);
+       break;
+
     case THUNK:
     case THUNK_1_0: case THUNK_0_1:
     case THUNK_1_1: case THUNK_0_2: case THUNK_2_0:
@@ -356,10 +362,6 @@ printClosure( StgClosure *obj )
            /* ToDo: chase 'link' ? */
             break;
 
-    case STABLE_NAME:
-            debugBelch("STABLE_NAME(%lu)\n", (lnat)((StgStableName*)obj)->sn); 
-            break;
-
     case TSO:
       debugBelch("TSO("); 
       debugBelch("%lu (%p)",(unsigned long)(((StgTSO*)obj)->id), (StgTSO*)obj);
@@ -1132,14 +1134,10 @@ char *closure_type_names[] = {
  [MUT_VAR_CLEAN]         = "MUT_VAR_CLEAN",
  [MUT_VAR_DIRTY]         = "MUT_VAR_DIRTY",
  [WEAK]                  = "WEAK",
- [STABLE_NAME]           = "STABLE_NAME",
+ [PRIM]                         = "PRIM",
+ [MUT_PRIM]              = "MUT_PRIM",
  [TSO]                   = "TSO",
- [TVAR_WATCH_QUEUE]      = "TVAR_WATCH_QUEUE",
- [INVARIANT_CHECK_QUEUE] = "INVARIANT_CHECK_QUEUE",
- [ATOMIC_INVARIANT]      = "ATOMIC_INVARIANT",
- [TVAR]                  = "TVAR",
  [TREC_CHUNK]            = "TREC_CHUNK",
- [TREC_HEADER]           = "TREC_HEADER",
  [ATOMICALLY_FRAME]      = "ATOMICALLY_FRAME",
  [CATCH_RETRY_FRAME]     = "CATCH_RETRY_FRAME",
  [CATCH_STM_FRAME]       = "CATCH_STM_FRAME",
index 15337d4..e90051c 100644 (file)
@@ -912,7 +912,8 @@ heapCensusChain( Census *census, bdescr *bd )
             case MVAR_CLEAN:
             case MVAR_DIRTY:
            case WEAK:
-           case STABLE_NAME:
+           case PRIM:
+           case MUT_PRIM:
            case MUT_VAR_CLEAN:
            case MUT_VAR_DIRTY:
                prim = rtsTrue;
@@ -960,31 +961,6 @@ heapCensusChain( Census *census, bdescr *bd )
                break;
 #endif
 
-           case TREC_HEADER: 
-               prim = rtsTrue;
-               size = sizeofW(StgTRecHeader);
-               break;
-
-           case TVAR_WATCH_QUEUE:
-               prim = rtsTrue;
-               size = sizeofW(StgTVarWatchQueue);
-               break;
-               
-           case INVARIANT_CHECK_QUEUE:
-               prim = rtsTrue;
-               size = sizeofW(StgInvariantCheckQueue);
-               break;
-               
-           case ATOMIC_INVARIANT:
-               prim = rtsTrue;
-               size = sizeofW(StgAtomicInvariant);
-               break;
-               
-           case TVAR:
-               prim = rtsTrue;
-               size = sizeofW(StgTVar);
-               break;
-               
            case TREC_CHUNK:
                prim = rtsTrue;
                size = sizeofW(StgTRecChunk);
index ca5e5ea..d54f823 100644 (file)
@@ -30,10 +30,14 @@ static void raiseAsync (Capability *cap,
 
 static void removeFromQueues(Capability *cap, StgTSO *tso);
 
-static void blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target);
+static void blockedThrowTo (Capability *cap, 
+                            StgTSO *target, MessageThrowTo *msg);
 
-static void performBlockedException (Capability *cap, 
-                                    StgTSO *source, StgTSO *target);
+static void throwToSendMsg (Capability *cap USED_IF_THREADS,
+                            Capability *target_cap USED_IF_THREADS, 
+                            MessageThrowTo *msg USED_IF_THREADS);
+
+static void performBlockedException (Capability *cap, MessageThrowTo *msg);
 
 /* -----------------------------------------------------------------------------
    throwToSingleThreaded
@@ -96,59 +100,85 @@ suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
    may be blocked and could be woken up at any point by another CPU.
    We have some delicate synchronisation to do.
 
-   There is a completely safe fallback scheme: it is always possible
-   to just block the source TSO on the target TSO's blocked_exceptions
-   queue.  This queue is locked using lockTSO()/unlockTSO().  It is
-   checked at regular intervals: before and after running a thread
-   (schedule() and threadPaused() respectively), and just before GC
-   (scheduleDoGC()).  Activating a thread on this queue should be done
-   using maybePerformBlockedException(): this is done in the context
-   of the target thread, so the exception can be raised eagerly.
-
-   This fallback scheme works even if the target thread is complete or
-   killed: scheduleDoGC() will discover the blocked thread before the
-   target is GC'd.
-
-   Blocking the source thread on the target thread's blocked_exception
-   queue is also employed when the target thread is currently blocking
-   exceptions (ie. inside Control.Exception.block).
-
-   We could use the safe fallback scheme exclusively, but that
-   wouldn't be ideal: most calls to throwTo would block immediately,
-   possibly until the next GC, which might require the deadlock
-   detection mechanism to kick in.  So we try to provide promptness
-   wherever possible.
-
-   We can promptly deliver the exception if the target thread is:
-
-     - runnable, on the same Capability as the source thread (because
-       we own the run queue and therefore the target thread).
-   
-     - blocked, and we can obtain exclusive access to it.  Obtaining
-       exclusive access to the thread depends on how it is blocked.
-
-   We must also be careful to not trip over threadStackOverflow(),
-   which might be moving the TSO to enlarge its stack.
-   lockTSO()/unlockTSO() are used here too.
-
+   The underlying scheme when multiple Capabilities are in use is
+   message passing: when the target of a throwTo is on another
+   Capability, we send a message (a MessageThrowTo closure) to that
+   Capability.
+
+   If the throwTo needs to block because the target TSO is masking
+   exceptions (the TSO_BLOCKEX flag), then the message is placed on
+   the blocked_exceptions queue attached to the target TSO.  When the
+   target TSO enters the unmasked state again, it must check the
+   queue.  The blocked_exceptions queue is not locked; only the
+   Capability owning the TSO may modify it.
+
+   To make things simpler for throwTo, we always create the message
+   first before deciding what to do.  The message may get sent, or it
+   may get attached to a TSO's blocked_exceptions queue, or the
+   exception may get thrown immediately and the message dropped,
+   depending on the current state of the target.
+
+   Currently we send a message if the target belongs to another
+   Capability, and it is
+
+     - NotBlocked, BlockedOnMsgWakeup, BlockedOnMsgThrowTo,
+       BlockedOnCCall
+
+     - or it is masking exceptions (TSO_BLOCKEX)
+
+   Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
+   BlockedOnBlackHole then we acquire ownership of the TSO by locking
+   its parent container (e.g. the MVar) and then raise the exception.
+   We might change these cases to be more message-passing-like in the
+   future.
+  
    Returns: 
 
-   THROWTO_SUCCESS    exception was raised, ok to continue
+   NULL               exception was raised, ok to continue
 
-   THROWTO_BLOCKED    exception was not raised; block the source
-                      thread then call throwToReleaseTarget() when
-                     the source thread is properly tidied away.
+   MessageThrowTo *   exception was not raised; the source TSO
+                      should now put itself in the state 
+                      BlockedOnMsgThrowTo, and when it is ready
+                      it should unlock the mssage using
+                      unlockClosure(msg, &stg_MSG_THROWTO_info);
+                      If it decides not to raise the exception after
+                      all, it can revoke it safely with
+                      unlockClosure(msg, &stg_IND_info);
 
    -------------------------------------------------------------------------- */
 
-nat
+MessageThrowTo *
 throwTo (Capability *cap,      // the Capability we hold 
         StgTSO *source,        // the TSO sending the exception (or NULL)
         StgTSO *target,        // the TSO receiving the exception
-        StgClosure *exception, // the exception closure
-        /*[out]*/ void **out USED_IF_THREADS)
+        StgClosure *exception) // the exception closure
+{
+    MessageThrowTo *msg;
+
+    msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
+    // message starts locked; the caller has to unlock it when it is
+    // ready.
+    msg->header.info = &stg_WHITEHOLE_info;
+    msg->source      = source;
+    msg->target      = target;
+    msg->exception   = exception;
+
+    switch (throwToMsg(cap, msg))
+    {
+    case THROWTO_SUCCESS:
+        return NULL;
+    case THROWTO_BLOCKED:
+    default:
+        return msg;
+    }
+}
+    
+
+nat
+throwToMsg (Capability *cap, MessageThrowTo *msg)
 {
     StgWord status;
+    StgTSO *target = msg->target;
 
     ASSERT(target != END_TSO_QUEUE);
 
@@ -159,13 +189,10 @@ throwTo (Capability *cap, // the Capability we hold
        // ASSERT(get_itbl(target)->type == TSO);
     }
 
-    if (source != NULL) {
-        debugTrace(DEBUG_sched, "throwTo: from thread %lu to thread %lu",
-                   (unsigned long)source->id, (unsigned long)target->id);
-    } else {
-        debugTrace(DEBUG_sched, "throwTo: from RTS to thread %lu",
-                   (unsigned long)target->id);
-    }
+    debugTraceCap(DEBUG_sched, cap,
+                  "throwTo: from thread %lu to thread %lu",
+                  (unsigned long)msg->source->id, 
+                  (unsigned long)msg->target->id);
 
 #ifdef DEBUG
     traceThreadStatus(DEBUG_sched, target);
@@ -173,6 +200,7 @@ throwTo (Capability *cap,   // the Capability we hold
 
     goto check_target;
 retry:
+    write_barrier();
     debugTrace(DEBUG_sched, "throwTo: retrying...");
 
 check_target:
@@ -188,6 +216,7 @@ check_target:
     
     switch (status) {
     case NotBlocked:
+    case BlockedOnMsgWakeup:
        /* if status==NotBlocked, and target->cap == cap, then
           we own this TSO and can raise the exception.
           
@@ -251,37 +280,80 @@ check_target:
 
        write_barrier();
        target_cap = target->cap;
-       if (target_cap == cap && (target->flags & TSO_BLOCKEX) == 0) {
-           // It's on our run queue and not blocking exceptions
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
-           return THROWTO_SUCCESS;
-       } else {
-           // Otherwise, just block on the blocked_exceptions queue
-           // of the target thread.  The queue will get looked at
-           // soon enough: it is checked before and after running a
-           // thread, and during GC.
-           lockTSO(target);
-
-           // Avoid race with threadStackOverflow, which may have
-           // just moved this TSO.
-           if (target->what_next == ThreadRelocated) {
-               unlockTSO(target);
-               target = target->_link;
-               goto retry;
-           }
-            // check again for ThreadComplete and ThreadKilled.  This
-            // cooperates with scheduleHandleThreadFinished to ensure
-            // that we never miss any threads that are throwing an
-            // exception to a thread in the process of terminating.
-            if (target->what_next == ThreadComplete
-                || target->what_next == ThreadKilled) {
-               unlockTSO(target);
+       if (target_cap != cap) {
+            throwToSendMsg(cap, target_cap, msg);
+            return THROWTO_BLOCKED;
+        } else {
+            if ((target->flags & TSO_BLOCKEX) == 0) {
+                // It's on our run queue and not blocking exceptions
+                raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
                 return THROWTO_SUCCESS;
+            } else {
+                blockedThrowTo(cap,target,msg);
+                return THROWTO_BLOCKED;
             }
-           blockedThrowTo(cap,source,target);
-           *out = target;
-           return THROWTO_BLOCKED;
-       }
+        }
+    }
+
+    case BlockedOnMsgThrowTo:
+    {
+        Capability *target_cap;
+        const StgInfoTable *i;
+        MessageThrowTo *m;
+
+        m = target->block_info.throwto;
+
+        // target is local to this cap, but has sent a throwto
+        // message to another cap.
+        //
+        // The source message is locked.  We need to revoke the
+        // target's message so that we can raise the exception, so
+        // we attempt to lock it.
+
+        // There's a possibility of a deadlock if two threads are both
+        // trying to throwTo each other (or more generally, a cycle of
+        // threads).  To break the symmetry we compare the addresses
+        // of the MessageThrowTo objects, and the one for which m <
+        // msg gets to spin, while the other can only try to lock
+        // once, but must then back off and unlock both before trying
+        // again.
+        if (m < msg) {
+            i = lockClosure((StgClosure *)m);
+        } else {
+            i = tryLockClosure((StgClosure *)m);
+            if (i == NULL) {
+//            debugBelch("collision\n");
+                throwToSendMsg(cap, target->cap, msg);
+                return THROWTO_BLOCKED;
+            }
+        }
+
+        if (i != &stg_MSG_THROWTO_info) {
+            // if it's an IND, this TSO has been woken up by another Cap
+            unlockClosure((StgClosure*)m, i);
+            goto retry;
+        }
+
+        target_cap = target->cap;
+        if (target_cap != cap) {
+            unlockClosure((StgClosure*)m, i);
+            throwToSendMsg(cap, target_cap, msg);
+            return THROWTO_BLOCKED;
+        }
+
+       if ((target->flags & TSO_BLOCKEX) &&
+           ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
+            unlockClosure((StgClosure*)m, i);
+            blockedThrowTo(cap,target,msg);
+            return THROWTO_BLOCKED;
+        }
+
+        // nobody else can wake up this TSO after we claim the message
+        unlockClosure((StgClosure*)m, &stg_IND_info);
+
+        raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+        unblockOne(cap, target);
+        return THROWTO_SUCCESS;
     }
 
     case BlockedOnMVar:
@@ -322,14 +394,17 @@ check_target:
 
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-           lockClosure((StgClosure *)target);
-           blockedThrowTo(cap,source,target);
+            Capability *target_cap = target->cap;
+            if (target->cap != cap) {
+                throwToSendMsg(cap,target_cap,msg);
+            } else {
+                blockedThrowTo(cap,target,msg);
+            }
            unlockClosure((StgClosure *)mvar, info);
-           *out = target;
-           return THROWTO_BLOCKED; // caller releases TSO
+           return THROWTO_BLOCKED;
        } else {
            removeThreadFromMVarQueue(cap, mvar, target);
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
+           raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
            unblockOne(cap, target);
            unlockClosure((StgClosure *)mvar, info);
            return THROWTO_SUCCESS;
@@ -346,84 +421,23 @@ check_target:
        }
 
        if (target->flags & TSO_BLOCKEX) {
-           lockTSO(target);
-           blockedThrowTo(cap,source,target);
+            Capability *target_cap = target->cap;
+            if (target->cap != cap) {
+                throwToSendMsg(cap,target_cap,msg);
+            } else {
+                blockedThrowTo(cap,target,msg);
+            }
            RELEASE_LOCK(&sched_mutex);
-           *out = target;
-           return THROWTO_BLOCKED; // caller releases TSO
+           return THROWTO_BLOCKED; // caller releases lock
        } else {
            removeThreadFromQueue(cap, &blackhole_queue, target);
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
+           raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
            unblockOne(cap, target);
            RELEASE_LOCK(&sched_mutex);
            return THROWTO_SUCCESS;
        }
     }
 
-    case BlockedOnException:
-    {
-       StgTSO *target2;
-       StgInfoTable *info;
-
-       /*
-         To obtain exclusive access to a BlockedOnException thread,
-         we must call lockClosure() on the TSO on which it is blocked.
-         Since the TSO might change underneath our feet, after we
-         call lockClosure() we must check that 
-          
-             (a) the closure we locked is actually a TSO
-            (b) the original thread is still  BlockedOnException,
-            (c) the original thread is still blocked on the TSO we locked
-            and (d) the target thread has not been relocated.
-
-         We synchronise with threadStackOverflow() (which relocates
-         threads) using lockClosure()/unlockClosure().
-       */
-       target2 = target->block_info.tso;
-
-       info = lockClosure((StgClosure *)target2);
-       if (info != &stg_TSO_info) {
-           unlockClosure((StgClosure *)target2, info);
-           goto retry;
-       }
-       if (target->what_next == ThreadRelocated) {
-           target = target->_link;
-           unlockTSO(target2);
-           goto retry;
-       }
-       if (target2->what_next == ThreadRelocated) {
-           target->block_info.tso = target2->_link;
-           unlockTSO(target2);
-           goto retry;
-       }
-       if (target->why_blocked != BlockedOnException
-           || target->block_info.tso != target2) {
-           unlockTSO(target2);
-           goto retry;
-       }
-       
-       /* 
-          Now we have exclusive rights to the target TSO...
-
-          If it is blocking exceptions, add the source TSO to its
-          blocked_exceptions queue.  Otherwise, raise the exception.
-       */
-       if ((target->flags & TSO_BLOCKEX) &&
-           ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-           lockTSO(target);
-           blockedThrowTo(cap,source,target);
-           unlockTSO(target2);
-           *out = target;
-           return THROWTO_BLOCKED;
-       } else {
-           removeThreadFromQueue(cap, &target2->blocked_exceptions, target);
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
-           unblockOne(cap, target);
-           unlockTSO(target2);
-           return THROWTO_SUCCESS;
-       }
-    }  
-
     case BlockedOnSTM:
        lockTSO(target);
        // Unblocking BlockedOnSTM threads requires the TSO to be
@@ -434,11 +448,16 @@ check_target:
        }
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-           blockedThrowTo(cap,source,target);
-           *out = target;
+            Capability *target_cap = target->cap;
+            if (target->cap != cap) {
+                throwToSendMsg(cap,target_cap,msg);
+            } else {
+                blockedThrowTo(cap,target,msg);
+            }
+           unlockTSO(target);
            return THROWTO_BLOCKED;
        } else {
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
+           raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
            unblockOne(cap, target);
            unlockTSO(target);
            return THROWTO_SUCCESS;
@@ -446,19 +465,18 @@ check_target:
 
     case BlockedOnCCall:
     case BlockedOnCCall_NoUnblockExc:
-       // I don't think it's possible to acquire ownership of a
-       // BlockedOnCCall thread.  We just assume that the target
-       // thread is blocking exceptions, and block on its
-       // blocked_exception queue.
-       lockTSO(target);
-       if (target->why_blocked != BlockedOnCCall &&
-           target->why_blocked != BlockedOnCCall_NoUnblockExc) {
-           unlockTSO(target);
-            goto retry;
-       }
-       blockedThrowTo(cap,source,target);
-       *out = target;
+    {
+        Capability *target_cap;
+
+        target_cap = target->cap;
+        if (target_cap != cap) {
+            throwToSendMsg(cap, target_cap, msg);
+            return THROWTO_BLOCKED;
+        }
+
+       blockedThrowTo(cap,target,msg);
        return THROWTO_BLOCKED;
+    }
 
 #ifndef THREADEDED_RTS
     case BlockedOnRead:
@@ -469,11 +487,11 @@ check_target:
 #endif
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-           blockedThrowTo(cap,source,target);
+           blockedThrowTo(cap,target,msg);
            return THROWTO_BLOCKED;
        } else {
            removeFromQueues(cap,target);
-           raiseAsync(cap, target, exception, rtsFalse, NULL);
+           raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
            return THROWTO_SUCCESS;
        }
 #endif
@@ -484,33 +502,34 @@ check_target:
     barf("throwTo");
 }
 
-// Block a TSO on another TSO's blocked_exceptions queue.
-// Precondition: we hold an exclusive lock on the target TSO (this is
-// complex to achieve as there's no single lock on a TSO; see
-// throwTo()).
 static void
-blockedThrowTo (Capability *cap, StgTSO *source, StgTSO *target)
+throwToSendMsg (Capability *cap STG_UNUSED,
+                Capability *target_cap USED_IF_THREADS, 
+                MessageThrowTo *msg USED_IF_THREADS)
+            
 {
-    if (source != NULL) {
-        debugTrace(DEBUG_sched, "throwTo: blocking on thread %lu", (unsigned long)target->id);
-        setTSOLink(cap, source, target->blocked_exceptions);
-        target->blocked_exceptions = source;
-        dirty_TSO(cap,target); // we modified the blocked_exceptions queue
-        
-        source->block_info.tso = target;
-        write_barrier(); // throwTo_exception *must* be visible if BlockedOnException is.
-        source->why_blocked = BlockedOnException;
-    }
-}
+#ifdef THREADED_RTS
+    debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
 
+    sendMessage(target_cap, (Message*)msg);
+#endif
+}
 
-#ifdef THREADED_RTS
-void
-throwToReleaseTarget (void *tso)
+// Block a throwTo message on the target TSO's blocked_exceptions
+// queue.  The current Capability must own the target TSO in order to
+// modify the blocked_exceptions queue.
+static void
+blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
 {
-    unlockTSO((StgTSO *)tso);
+    debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu",
+                  (unsigned long)target->id);
+
+    ASSERT(target->cap == cap);
+
+    msg->link = (Message*)target->blocked_exceptions;
+    target->blocked_exceptions = msg;
+    dirty_TSO(cap,target); // we modified the blocked_exceptions queue
 }
-#endif
 
 /* -----------------------------------------------------------------------------
    Waking up threads blocked in throwTo
@@ -532,10 +551,11 @@ throwToReleaseTarget (void *tso)
 int
 maybePerformBlockedException (Capability *cap, StgTSO *tso)
 {
-    StgTSO *source;
+    MessageThrowTo *msg;
+    const StgInfoTable *i;
     
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
-        if (tso->blocked_exceptions != END_TSO_QUEUE) {
+        if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
             awakenBlockedExceptionQueue(cap,tso);
             return 1;
         } else {
@@ -543,32 +563,30 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
         }
     }
 
-    if (tso->blocked_exceptions != END_TSO_QUEUE && 
+    if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && 
         (tso->flags & TSO_BLOCKEX) != 0) {
         debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
     }
 
-    if (tso->blocked_exceptions != END_TSO_QUEUE
+    if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
        && ((tso->flags & TSO_BLOCKEX) == 0
            || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
 
-       // Lock the TSO, this gives us exclusive access to the queue
-       lockTSO(tso);
-
-       // Check the queue again; it might have changed before we
-       // locked it.
-       if (tso->blocked_exceptions == END_TSO_QUEUE) {
-           unlockTSO(tso);
-           return 0;
-       }
-
        // We unblock just the first thread on the queue, and perform
        // its throw immediately.
-       source = tso->blocked_exceptions;
-       performBlockedException(cap, source, tso);
-       tso->blocked_exceptions = unblockOne_(cap, source, 
-                                             rtsFalse/*no migrate*/);
-       unlockTSO(tso);
+    loop:
+        msg = tso->blocked_exceptions;
+        if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
+        i = lockClosure((StgClosure*)msg);
+        tso->blocked_exceptions = (MessageThrowTo*)msg->link;
+        if (i == &stg_IND_info) {
+            unlockClosure((StgClosure*)msg,i);
+            goto loop;
+        }
+
+        performBlockedException(cap, msg);
+        unblockOne_(cap, msg->source, rtsFalse/*no migrate*/);
+        unlockClosure((StgClosure*)msg,&stg_IND_info);
         return 1;
     }
     return 0;
@@ -580,25 +598,34 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
 void
 awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
 {
-    lockTSO(tso);
-    awakenBlockedQueue(cap, tso->blocked_exceptions);
-    tso->blocked_exceptions = END_TSO_QUEUE;
-    unlockTSO(tso);
+    MessageThrowTo *msg;
+    const StgInfoTable *i;
+
+    for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
+         msg = (MessageThrowTo*)msg->link) {
+        i = lockClosure((StgClosure *)msg);
+        if (i != &stg_IND_info) {
+            unblockOne_(cap, msg->source, rtsFalse/*no migrate*/);
+        }
+        unlockClosure((StgClosure *)msg,i);
+    }
+    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
 }    
 
 static void
-performBlockedException (Capability *cap, StgTSO *source, StgTSO *target)
+performBlockedException (Capability *cap, MessageThrowTo *msg)
 {
-    StgClosure *exception;
+    StgTSO *source;
+
+    source = msg->source;
 
-    ASSERT(source->why_blocked == BlockedOnException);
-    ASSERT(source->block_info.tso->id == target->id);
+    ASSERT(source->why_blocked == BlockedOnMsgThrowTo);
+    ASSERT(source->block_info.closure == (StgClosure *)msg);
     ASSERT(source->sp[0] == (StgWord)&stg_block_throwto_info);
-    ASSERT(((StgTSO *)source->sp[1])->id == target->id);
+    ASSERT(((StgTSO *)source->sp[1])->id == msg->target->id);
     // check ids not pointers, because the thread might be relocated
 
-    exception = (StgClosure *)source->sp[2];
-    throwToSingleThreaded(cap, target, exception);
+    throwToSingleThreaded(cap, msg->target, msg->exception);
     source->sp += 3;
 }
 
@@ -637,22 +664,25 @@ removeFromQueues(Capability *cap, StgTSO *tso)
       removeThreadFromQueue(cap, &blackhole_queue, tso);
       goto done;
 
-  case BlockedOnException:
-    {
-      StgTSO *target  = tso->block_info.tso;
-
-      // NO: when called by threadPaused(), we probably have this
-      // TSO already locked (WHITEHOLEd) because we just placed
-      // ourselves on its queue.
-      // ASSERT(get_itbl(target)->type == TSO);
-
-      while (target->what_next == ThreadRelocated) {
-         target = target->_link;
-      }
-      
-      removeThreadFromQueue(cap, &target->blocked_exceptions, tso);
-      goto done;
-    }
+  case BlockedOnMsgWakeup:
+  {
+      // kill the message, atomically:
+      tso->block_info.wakeup->header.info = &stg_IND_info;
+      break;
+  }
+
+  case BlockedOnMsgThrowTo:
+  {
+      MessageThrowTo *m = tso->block_info.throwto;
+      // The message is locked by us, unless we got here via
+      // deleteAllThreads(), in which case we own all the
+      // capabilities.
+      // ASSERT(m->header.info == &stg_WHITEHOLE_info);
+
+      // unlock and revoke it at the same time
+      unlockClosure((StgClosure*)m,&stg_IND_info);
+      break;
+  }
 
 #if !defined(THREADED_RTS)
   case BlockedOnRead:
@@ -743,6 +773,10 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
     }
 #endif
 
+    while (tso->what_next == ThreadRelocated) {
+        tso = tso->_link;
+    }
+
     // mark it dirty; we're about to change its stack.
     dirty_TSO(cap, tso);
 
index 96eb96e..5137d41 100644 (file)
@@ -29,16 +29,13 @@ void suspendComputation (Capability *cap,
                         StgTSO *tso, 
                         StgUpdateFrame *stop_here);
 
-nat throwTo (Capability *cap,           // the Capability we hold 
-            StgTSO *source,             // the TSO sending the exception
-            StgTSO *target,             // the TSO receiving the exception
-            StgClosure *exception,      // the exception closure
-            /*[out]*/ void **out   // pass to throwToReleaseTarget()
-    );
+MessageThrowTo *throwTo (Capability *cap,      // the Capability we hold 
+                         StgTSO *source,
+                         StgTSO *target,
+                         StgClosure *exception); // the exception closure
 
-#ifdef THREADED_RTS
-void throwToReleaseTarget (void *tso);
-#endif
+nat throwToMsg (Capability *cap,
+                MessageThrowTo *msg);
 
 int  maybePerformBlockedException (Capability *cap, StgTSO *tso);
 void awakenBlockedExceptionQueue  (Capability *cap, StgTSO *tso);
@@ -52,7 +49,7 @@ interruptible(StgTSO *t)
 {
   switch (t->why_blocked) {
   case BlockedOnMVar:
-  case BlockedOnException:
+  case BlockedOnMsgThrowTo:
   case BlockedOnRead:
   case BlockedOnWrite:
 #if defined(mingw32_HOST_OS)
index 4fca19c..b7bc909 100644 (file)
@@ -509,7 +509,7 @@ push( StgClosure *c, retainer c_child_r, StgClosure **first_child )
 
        // layout.payload.ptrs, no SRT
     case CONSTR:
-    case STABLE_NAME:
+    case PRIM:
     case BCO:
     case CONSTR_STATIC:
        init_ptrs(&se.info, get_itbl(c)->layout.payload.ptrs,
@@ -883,7 +883,7 @@ pop( StgClosure **c, StgClosure **cp, retainer *r )
        }
 
        case CONSTR:
-       case STABLE_NAME:
+       case PRIM:
        case BCO:
        case CONSTR_STATIC:
            // StgMutArrPtr.ptrs, no SRT
@@ -1108,7 +1108,7 @@ isRetainer( StgClosure *c )
     case CONSTR_STATIC:
     case FUN_STATIC:
        // misc
-    case STABLE_NAME:
+    case PRIM:
     case BCO:
     case ARR_WORDS:
        // STM
index e2a30a6..6e75abc 100644 (file)
@@ -9,6 +9,8 @@
 #include "PosixSource.h"
 #include "Rts.h"
 
+#include "eventlog/EventLog.h"
+
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
@@ -161,6 +163,10 @@ rtsFatalInternalErrorFn(const char *s, va_list ap)
      fflush(stderr);
   }
 
+#ifdef TRACING
+  if (RtsFlags.TraceFlags.tracing == TRACE_EVENTLOG) endEventLogging();
+#endif
+
   abort();
   // stg_exit(EXIT_INTERNAL_ERROR);
 }
index ed5a722..be61538 100644 (file)
--- a/rts/STM.c
+++ b/rts/STM.c
@@ -352,8 +352,7 @@ static StgBool watcher_is_tso(StgTVarWatchQueue *q) {
 
 static StgBool watcher_is_invariant(StgTVarWatchQueue *q) {
   StgClosure *c = q -> closure;
-  StgInfoTable *info = get_itbl(c);
-  return (info -> type) == ATOMIC_INVARIANT;
+  return (c->header.info == &stg_ATOMIC_INVARIANT_info);
 }
 
 /*......................................................................*/
index 4cca469..70e0246 100644 (file)
@@ -139,7 +139,7 @@ static void scheduleYield (Capability **pcap, Task *task, rtsBool);
 #endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
-static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
+static void scheduleProcessInbox(Capability *cap);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
 static void schedulePushWork(Capability *cap, Task *task);
@@ -618,7 +618,7 @@ scheduleFindWork (Capability *cap)
     // list each time around the scheduler.
     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
 
-    scheduleCheckWakeupThreads(cap);
+    scheduleProcessInbox(cap);
 
     scheduleCheckBlockedThreads(cap);
 
@@ -673,7 +673,7 @@ scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
     if (!force_yield &&
         !shouldYieldCapability(cap,task) && 
         (!emptyRunQueue(cap) ||
-         !emptyWakeupQueue(cap) ||
+         !emptyInbox(cap) ||
          blackholes_need_checking ||
          sched_state >= SCHED_INTERRUPTING))
         return;
@@ -725,7 +725,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
        cap0 = &capabilities[i];
        if (cap != cap0 && tryGrabCapability(cap0,task)) {
-           if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
+           if (!emptyRunQueue(cap0)
+                || cap->returning_tasks_hd != NULL
+                || cap->inbox != (Message*)END_TSO_QUEUE) {
                // it already has some work, we just grabbed it at 
                // the wrong moment.  Or maybe it's deadlocked!
                releaseCapability(cap0);
@@ -871,23 +873,89 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
  * Check for threads woken up by other Capabilities
  * ------------------------------------------------------------------------- */
 
+#if defined(THREADED_RTS)
+static void
+executeMessage (Capability *cap, Message *m)
+{
+    const StgInfoTable *i;
+
+loop:
+    write_barrier(); // allow m->header to be modified by another thread
+    i = m->header.info;
+    if (i == &stg_MSG_WAKEUP_info)
+    {
+        MessageWakeup *w = (MessageWakeup *)m;
+        StgTSO *tso = w->tso;
+        debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld", 
+                      (lnat)tso->id);
+        ASSERT(tso->cap == cap);
+        ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
+        ASSERT(tso->block_info.closure == (StgClosure *)m);
+        tso->why_blocked = NotBlocked;
+        appendToRunQueue(cap, tso);
+    }
+    else if (i == &stg_MSG_THROWTO_info)
+    {
+        MessageThrowTo *t = (MessageThrowTo *)m;
+        nat r;
+        const StgInfoTable *i;
+
+        i = lockClosure((StgClosure*)m);
+        if (i != &stg_MSG_THROWTO_info) {
+            unlockClosure((StgClosure*)m, i);
+            goto loop;
+        }
+
+        debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld", 
+                      (lnat)t->source->id, (lnat)t->target->id);
+
+        ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
+        ASSERT(t->source->block_info.closure == (StgClosure *)m);
+
+        r = throwToMsg(cap, t);
+
+        switch (r) {
+        case THROWTO_SUCCESS:
+            ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
+            t->source->sp += 3;
+            unblockOne(cap, t->source);
+            // this message is done
+            unlockClosure((StgClosure*)m, &stg_IND_info);
+            break;
+        case THROWTO_BLOCKED:
+            // unlock the message
+            unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
+            break;
+        }
+    }
+    else if (i == &stg_IND_info)
+    {
+        // message was revoked
+        return;
+    }
+    else if (i == &stg_WHITEHOLE_info)
+    {
+        goto loop;
+    }
+    else
+    {
+        barf("executeMessage: %p", i);
+    }
+}
+#endif
+
 static void
-scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
+scheduleProcessInbox (Capability *cap USED_IF_THREADS)
 {
 #if defined(THREADED_RTS)
-    // Any threads that were woken up by other Capabilities get
-    // appended to our run queue.
-    if (!emptyWakeupQueue(cap)) {
-       ACQUIRE_LOCK(&cap->lock);
-       if (emptyRunQueue(cap)) {
-           cap->run_queue_hd = cap->wakeup_queue_hd;
-           cap->run_queue_tl = cap->wakeup_queue_tl;
-       } else {
-           setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
-           cap->run_queue_tl = cap->wakeup_queue_tl;
-       }
-       cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
-       RELEASE_LOCK(&cap->lock);
+    Message *m;
+
+    while (!emptyInbox(cap)) {
+        ACQUIRE_LOCK(&cap->lock);
+        m = cap->inbox;
+        cap->inbox = m->link;
+        RELEASE_LOCK(&cap->lock);
+        executeMessage(cap, (Message *)m);
     }
 #endif
 }
@@ -983,7 +1051,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
            switch (task->incall->tso->why_blocked) {
            case BlockedOnSTM:
            case BlockedOnBlackHole:
-           case BlockedOnException:
+           case BlockedOnMsgThrowTo:
            case BlockedOnMVar:
                throwToSingleThreaded(cap, task->incall->tso, 
                                      (StgClosure *)nonTermination_closure);
@@ -1268,9 +1336,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
      */
 
     // blocked exceptions can now complete, even if the thread was in
-    // blocked mode (see #2910).  This unconditionally calls
-    // lockTSO(), which ensures that we don't miss any threads that
-    // are engaged in throwTo() with this thread as a target.
+    // blocked mode (see #2910).
     awakenBlockedExceptionQueue (cap, t);
 
       //
@@ -1884,7 +1950,7 @@ resumeThread (void *task_)
     
     if (tso->why_blocked == BlockedOnCCall) {
         // avoid locking the TSO if we don't have to
-        if (tso->blocked_exceptions != END_TSO_QUEUE) {
+        if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
             awakenBlockedExceptionQueue(cap,tso);
         }
        tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
@@ -2187,10 +2253,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
 
   IF_DEBUG(sanity,checkTSO(tso));
 
-  // don't allow throwTo() to modify the blocked_exceptions queue
-  // while we are moving the TSO:
-  lockClosure((StgClosure *)tso);
-
   if (tso->stack_size >= tso->max_stack_size
       && !(tso->flags & TSO_BLOCKEX)) {
       // NB. never raise a StackOverflow exception if the thread is
@@ -2201,7 +2263,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
       //
 
       if (tso->flags & TSO_SQUEEZED) {
-          unlockTSO(tso);
           return tso;
       }
       // #3677: In a stack overflow situation, stack squeezing may
@@ -2223,7 +2284,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
                                                tso->sp+64)));
 
       // Send this thread the StackOverflow exception
-      unlockTSO(tso);
       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
       return tso;
   }
@@ -2239,7 +2299,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
   // the stack anyway.
   if ((tso->flags & TSO_SQUEEZED) && 
       ((W_)(tso->sp - tso->stack) >= BLOCK_SIZE_W)) {
-      unlockTSO(tso);
       return tso;
   }
 
@@ -2289,9 +2348,6 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
   tso->sp = (P_)&(tso->stack[tso->stack_size]);
   tso->why_blocked = NotBlocked;
 
-  unlockTSO(dest);
-  unlockTSO(tso);
-
   IF_DEBUG(sanity,checkTSO(dest));
 #if 0
   IF_DEBUG(scheduler,printTSO(dest));
@@ -2324,10 +2380,6 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso)
         return tso;
     }
 
-    // don't allow throwTo() to modify the blocked_exceptions queue
-    // while we are moving the TSO:
-    lockClosure((StgClosure *)tso);
-
     // this is the number of words we'll free
     free_w = round_to_mblocks(tso_size_w/2);
 
@@ -2358,9 +2410,6 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso)
         task->incall->tso = new_tso;
     }
 
-    unlockTSO(new_tso);
-    unlockTSO(tso);
-
     IF_DEBUG(sanity,checkTSO(new_tso));
 
     return new_tso;
@@ -2691,61 +2740,9 @@ resurrectThreads (StgTSO *threads)
             * can wake up threads, remember...).
             */
            continue;
-       case BlockedOnException:
-            // throwTo should never block indefinitely: if the target
-            // thread dies or completes, throwTo returns.
-           barf("resurrectThreads: thread BlockedOnException");
-            break;
        default:
-           barf("resurrectThreads: thread blocked in a strange way");
+           barf("resurrectThreads: thread blocked in a strange way: %d",
+                 tso->why_blocked);
        }
     }
 }
-
-/* -----------------------------------------------------------------------------
-   performPendingThrowTos is called after garbage collection, and
-   passed a list of threads that were found to have pending throwTos
-   (tso->blocked_exceptions was not empty), and were blocked.
-   Normally this doesn't happen, because we would deliver the
-   exception directly if the target thread is blocked, but there are
-   small windows where it might occur on a multiprocessor (see
-   throwTo()).
-
-   NB. we must be holding all the capabilities at this point, just
-   like resurrectThreads().
-   -------------------------------------------------------------------------- */
-
-void
-performPendingThrowTos (StgTSO *threads)
-{
-    StgTSO *tso, *next;
-    Capability *cap;
-    Task *task, *saved_task;;
-    generation *gen;
-
-    task = myTask();
-    cap = task->cap;
-
-    for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
-       next = tso->global_link;
-
-        gen = Bdescr((P_)tso)->gen;
-       tso->global_link = gen->threads;
-       gen->threads = tso;
-
-       debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
-       
-        // We must pretend this Capability belongs to the current Task
-        // for the time being, as invariants will be broken otherwise.
-        // In fact the current Task has exclusive access to the systme
-        // at this point, so this is just bookkeeping:
-       task->cap = tso->cap;
-        saved_task = tso->cap->running_task;
-        tso->cap->running_task = task;
-        maybePerformBlockedException(tso->cap, tso);
-        tso->cap->running_task = saved_task;
-    }
-
-    // Restore our original Capability:
-    task->cap = cap;
-}
index af322d8..76138b6 100644 (file)
@@ -105,7 +105,6 @@ extern Mutex sched_mutex;
 void interruptStgRts (void);
 
 void resurrectThreads (StgTSO *);
-void performPendingThrowTos (StgTSO *);
 
 /* -----------------------------------------------------------------------------
  * Some convenient macros/inline functions...
@@ -179,25 +178,6 @@ appendToBlockedQueue(StgTSO *tso)
 }
 #endif
 
-#if defined(THREADED_RTS)
-// Assumes: my_cap is owned by the current Task.  We hold
-// other_cap->lock, but we do not necessarily own other_cap; another
-// Task may be running on it.
-INLINE_HEADER void
-appendToWakeupQueue (Capability *my_cap, Capability *other_cap, StgTSO *tso)
-{
-    ASSERT(tso->_link == END_TSO_QUEUE);
-    if (other_cap->wakeup_queue_hd == END_TSO_QUEUE) {
-       other_cap->wakeup_queue_hd = tso;
-    } else {
-        // my_cap is passed to setTSOLink() because it may need to
-        // write to the mutable list.
-       setTSOLink(my_cap, other_cap->wakeup_queue_tl, tso);
-    }
-    other_cap->wakeup_queue_tl = tso;
-}
-#endif
-
 /* Check whether various thread queues are empty
  */
 INLINE_HEADER rtsBool
@@ -212,14 +192,6 @@ emptyRunQueue(Capability *cap)
     return emptyQueue(cap->run_queue_hd);
 }
 
-#if defined(THREADED_RTS)
-INLINE_HEADER rtsBool
-emptyWakeupQueue(Capability *cap)
-{
-    return emptyQueue(cap->wakeup_queue_hd);
-}
-#endif
-
 #if !defined(THREADED_RTS)
 #define EMPTY_BLOCKED_QUEUE()  (emptyQueue(blocked_queue_hd))
 #define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue))
index 8fd96c1..f111875 100644 (file)
@@ -419,7 +419,7 @@ CLOSURE(stg_NO_FINALIZER_closure,stg_NO_FINALIZER);
    Stable Names are unlifted too.
    ------------------------------------------------------------------------- */
 
-INFO_TABLE(stg_STABLE_NAME,0,1,STABLE_NAME,"STABLE_NAME","STABLE_NAME")
+INFO_TABLE(stg_STABLE_NAME,0,1,PRIM,"STABLE_NAME","STABLE_NAME")
 { foreign "C" barf("STABLE_NAME object entered!") never returns; }
 
 /* ----------------------------------------------------------------------------
@@ -439,22 +439,22 @@ INFO_TABLE(stg_MVAR_DIRTY,3,0,MVAR_DIRTY,"MVAR","MVAR")
    STM
    -------------------------------------------------------------------------- */
 
-INFO_TABLE(stg_TVAR, 0, 0, TVAR, "TVAR", "TVAR")
+INFO_TABLE(stg_TVAR, 2, 1, MUT_PRIM, "TVAR", "TVAR")
 { foreign "C" barf("TVAR object entered!") never returns; }
 
-INFO_TABLE(stg_TVAR_WATCH_QUEUE, 0, 0, TVAR_WATCH_QUEUE, "TVAR_WATCH_QUEUE", "TVAR_WATCH_QUEUE")
+INFO_TABLE(stg_TVAR_WATCH_QUEUE, 3, 0, MUT_PRIM, "TVAR_WATCH_QUEUE", "TVAR_WATCH_QUEUE")
 { foreign "C" barf("TVAR_WATCH_QUEUE object entered!") never returns; }
 
-INFO_TABLE(stg_ATOMIC_INVARIANT, 0, 0, ATOMIC_INVARIANT, "ATOMIC_INVARIANT", "ATOMIC_INVARIANT")
+INFO_TABLE(stg_ATOMIC_INVARIANT, 2, 1, MUT_PRIM, "ATOMIC_INVARIANT", "ATOMIC_INVARIANT")
 { foreign "C" barf("ATOMIC_INVARIANT object entered!") never returns; }
 
-INFO_TABLE(stg_INVARIANT_CHECK_QUEUE, 0, 0, INVARIANT_CHECK_QUEUE, "INVARIANT_CHECK_QUEUE", "INVARIANT_CHECK_QUEUE")
+INFO_TABLE(stg_INVARIANT_CHECK_QUEUE, 3, 0, MUT_PRIM, "INVARIANT_CHECK_QUEUE", "INVARIANT_CHECK_QUEUE")
 { foreign "C" barf("INVARIANT_CHECK_QUEUE object entered!") never returns; }
 
 INFO_TABLE(stg_TREC_CHUNK, 0, 0, TREC_CHUNK, "TREC_CHUNK", "TREC_CHUNK")
 { foreign "C" barf("TREC_CHUNK object entered!") never returns; }
 
-INFO_TABLE(stg_TREC_HEADER, 0, 0, TREC_HEADER, "TREC_HEADER", "TREC_HEADER")
+INFO_TABLE(stg_TREC_HEADER, 3, 1, MUT_PRIM, "TREC_HEADER", "TREC_HEADER")
 { foreign "C" barf("TREC_HEADER object entered!") never returns; }
 
 INFO_TABLE_CONSTR(stg_END_STM_WATCH_QUEUE,0,0,0,CONSTR_NOCAF_STATIC,"END_STM_WATCH_QUEUE","END_STM_WATCH_QUEUE")
@@ -478,6 +478,17 @@ CLOSURE(stg_END_STM_CHUNK_LIST_closure,stg_END_STM_CHUNK_LIST);
 CLOSURE(stg_NO_TREC_closure,stg_NO_TREC);
 
 /* ----------------------------------------------------------------------------
+   Messages
+   ------------------------------------------------------------------------- */
+
+// PRIM rather than CONSTR, because PRIM objects cannot be duplicated by the GC.
+INFO_TABLE_CONSTR(stg_MSG_WAKEUP,2,0,0,PRIM,"MSG_WAKEUP","MSG_WAKEUP")
+{ foreign "C" barf("MSG_WAKEUP object entered!") never returns; }
+
+INFO_TABLE_CONSTR(stg_MSG_THROWTO,4,0,0,PRIM,"MSG_THROWTO","MSG_THROWTO")
+{ foreign "C" barf("MSG_THROWTO object entered!") never returns; }
+
+/* ----------------------------------------------------------------------------
    END_TSO_QUEUE
 
    This is a static nullary constructor (like []) that we use to mark the
@@ -490,18 +501,6 @@ INFO_TABLE_CONSTR(stg_END_TSO_QUEUE,0,0,0,CONSTR_NOCAF_STATIC,"END_TSO_QUEUE","E
 CLOSURE(stg_END_TSO_QUEUE_closure,stg_END_TSO_QUEUE);
 
 /* ----------------------------------------------------------------------------
-   Exception lists
-   ------------------------------------------------------------------------- */
-
-INFO_TABLE_CONSTR(stg_END_EXCEPTION_LIST,0,0,0,CONSTR_NOCAF_STATIC,"END_EXCEPTION_LIST","END_EXCEPTION_LIST")
-{ foreign "C" barf("END_EXCEPTION_LIST object entered!") never returns; }
-
-CLOSURE(stg_END_EXCEPTION_LIST_closure,stg_END_EXCEPTION_LIST);
-
-INFO_TABLE(stg_EXCEPTION_CONS,1,1,CONSTR,"EXCEPTION_CONS","EXCEPTION_CONS")
-{ foreign "C" barf("EXCEPTION_CONS object entered!") never returns; }
-
-/* ----------------------------------------------------------------------------
    Arrays
 
    These come in two basic flavours: arrays of data (StgArrWords) and arrays of
index 08b7aab..f824d02 100644 (file)
@@ -74,7 +74,7 @@ createThread(Capability *cap, nat size)
     tso->what_next = ThreadRunGHC;
 
     tso->why_blocked  = NotBlocked;
-    tso->blocked_exceptions = END_TSO_QUEUE;
+    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
     tso->flags = 0;
     tso->dirty = 1;
     
@@ -218,8 +218,9 @@ unblockOne_ (Capability *cap, StgTSO *tso,
 
   // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
   ASSERT(tso->why_blocked != NotBlocked);
+  ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
+         tso->block_info.closure->header.info == &stg_IND_info);
 
-  tso->why_blocked = NotBlocked;
   next = tso->_link;
   tso->_link = END_TSO_QUEUE;
 
@@ -235,6 +236,8 @@ unblockOne_ (Capability *cap, StgTSO *tso,
       }
 
       tso->cap = cap;
+      write_barrier();
+      tso->why_blocked = NotBlocked;
       appendToRunQueue(cap,tso);
 
       // context-switch soonish so we can migrate the new thread if
@@ -246,6 +249,7 @@ unblockOne_ (Capability *cap, StgTSO *tso,
       wakeupThreadOnCapability(cap, tso->cap, tso);
   }
 #else
+  tso->why_blocked = NotBlocked;
   appendToRunQueue(cap,tso);
 
   // context-switch soonish so we can migrate the new thread if
@@ -327,13 +331,15 @@ printThreadBlockage(StgTSO *tso)
   case BlockedOnMVar:
     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
     break;
-  case BlockedOnException:
-    debugBelch("is blocked on delivering an exception to thread %lu",
-              (unsigned long)tso->block_info.tso->id);
-    break;
   case BlockedOnBlackHole:
     debugBelch("is blocked on a black hole");
     break;
+  case BlockedOnMsgWakeup:
+    debugBelch("is blocked on a wakeup message");
+    break;
+  case BlockedOnMsgThrowTo:
+    debugBelch("is blocked on a throwto message");
+    break;
   case NotBlocked:
     debugBelch("is not blocked");
     break;
index 8e0ee26..dfe879e 100644 (file)
@@ -11,6 +11,8 @@
 
 BEGIN_RTS_PRIVATE
 
+#define END_BLOCKED_EXCEPTIONS_QUEUE ((MessageThrowTo*)END_TSO_QUEUE)
+
 StgTSO * unblockOne (Capability *cap, StgTSO *tso);
 StgTSO * unblockOne_ (Capability *cap, StgTSO *tso, rtsBool allow_migrate);
 
index f8b6ad4..69ea3d3 100644 (file)
@@ -135,6 +135,15 @@ void traceUserMsg(Capability *cap, char *msg);
 #define debugTrace(class, str, ...) /* nothing */
 #endif
 
+#ifdef DEBUG
+#define debugTraceCap(class, cap, msg, ...)      \
+    if (RTS_UNLIKELY(class)) {                  \
+        traceCap_(cap, msg, ##__VA_ARGS__);     \
+    }
+#else
+#define debugTraceCap(class, cap, str, ...) /* nothing */
+#endif
+
 /* 
  * Emit a message/event describing the state of a thread
  */
@@ -152,6 +161,7 @@ void traceThreadStatus_ (StgTSO *tso);
 #define traceCap(class, cap, msg, ...) /* nothing */
 #define trace(class, msg, ...) /* nothing */
 #define debugTrace(class, str, ...) /* nothing */
+#define debugTraceCap(class, cap, str, ...) /* nothing */
 #define traceThreadStatus(class, tso) /* nothing */
 
 #endif /* TRACING */
index fd87820..6ebf33d 100644 (file)
@@ -59,7 +59,7 @@ INLINE_HEADER void postMsg (char *msg STG_UNUSED,
                             va_list ap STG_UNUSED)
 { /* nothing */ }
 
-INLINE_HEADER void postCapMsg (Capability *cap,
+INLINE_HEADER void postCapMsg (Capability *cap STG_UNUSED,
                                char *msg STG_UNUSED, 
                                va_list ap STG_UNUSED)
 { /* nothing */ }
index e55ae2b..39284f9 100644 (file)
@@ -471,7 +471,8 @@ thread_TSO (StgTSO *tso)
 
     if (   tso->why_blocked == BlockedOnMVar
        || tso->why_blocked == BlockedOnBlackHole
-       || tso->why_blocked == BlockedOnException
+       || tso->why_blocked == BlockedOnMsgThrowTo
+       || tso->why_blocked == BlockedOnMsgWakeup
        ) {
        thread_(&tso->block_info.closure);
     }
@@ -622,7 +623,8 @@ thread_obj (StgInfoTable *info, StgPtr p)
 
     case FUN:
     case CONSTR:
-    case STABLE_NAME:
+    case PRIM:
+    case MUT_PRIM:
     case IND_PERM:
     case MUT_VAR_CLEAN:
     case MUT_VAR_DIRTY:
@@ -705,32 +707,6 @@ thread_obj (StgInfoTable *info, StgPtr p)
     case TSO:
        return thread_TSO((StgTSO *)p);
     
-    case TVAR_WATCH_QUEUE:
-    {
-        StgTVarWatchQueue *wq = (StgTVarWatchQueue *)p;
-       thread_(&wq->closure);
-       thread_(&wq->next_queue_entry);
-       thread_(&wq->prev_queue_entry);
-       return p + sizeofW(StgTVarWatchQueue);
-    }
-    
-    case TVAR:
-    {
-        StgTVar *tvar = (StgTVar *)p;
-       thread((void *)&tvar->current_value);
-       thread((void *)&tvar->first_watch_queue_entry);
-       return p + sizeofW(StgTVar);
-    }
-    
-    case TREC_HEADER:
-    {
-        StgTRecHeader *trec = (StgTRecHeader *)p;
-       thread_(&trec->enclosing_trec);
-       thread_(&trec->current_chunk);
-       thread_(&trec->invariants_to_check);
-       return p + sizeofW(StgTRecHeader);
-    }
-
     case TREC_CHUNK:
     {
         StgWord i;
@@ -745,23 +721,6 @@ thread_obj (StgInfoTable *info, StgPtr p)
        return p + sizeofW(StgTRecChunk);
     }
 
-    case ATOMIC_INVARIANT:
-    {
-        StgAtomicInvariant *invariant = (StgAtomicInvariant *)p;
-       thread_(&invariant->code);
-       thread_(&invariant->last_execution);
-       return p + sizeofW(StgAtomicInvariant);
-    }
-
-    case INVARIANT_CHECK_QUEUE:
-    {
-        StgInvariantCheckQueue *queue = (StgInvariantCheckQueue *)p;
-       thread_(&queue->invariant);
-       thread_(&queue->my_execution);
-       thread_(&queue->next_queue_entry);
-       return p + sizeofW(StgInvariantCheckQueue);
-    }
-
     default:
        barf("update_fwd: unknown/strange object  %d", (int)(info->type));
        return NULL;
index 76026b0..21017a6 100644 (file)
@@ -626,7 +626,8 @@ loop:
       return;
 
   case WEAK:
-  case STABLE_NAME:
+  case PRIM:
+  case MUT_PRIM:
       copy_tag(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen,tag);
       return;
 
@@ -721,30 +722,10 @@ loop:
       }
     }
 
-  case TREC_HEADER: 
-      copy(p,info,q,sizeofW(StgTRecHeader),gen);
-      return;
-
-  case TVAR_WATCH_QUEUE:
-      copy(p,info,q,sizeofW(StgTVarWatchQueue),gen);
-      return;
-
-  case TVAR:
-      copy(p,info,q,sizeofW(StgTVar),gen);
-      return;
-    
   case TREC_CHUNK:
       copy(p,info,q,sizeofW(StgTRecChunk),gen);
       return;
 
-  case ATOMIC_INVARIANT:
-      copy(p,info,q,sizeofW(StgAtomicInvariant),gen);
-      return;
-
-  case INVARIANT_CHECK_QUEUE:
-      copy(p,info,q,sizeofW(StgInvariantCheckQueue),gen);
-      return;
-
   default:
     barf("evacuate: strange closure type %d", (int)(INFO_PTR_TO_STRUCT(info)->type));
   }
index 2eabdab..ae6fc99 100644 (file)
@@ -732,7 +732,6 @@ SET_GCT(gc_threads[0]);
   // send exceptions to any threads which were about to die 
   RELEASE_SM_LOCK;
   resurrectThreads(resurrected_threads);
-  performPendingThrowTos(exception_threads);
   ACQUIRE_SM_LOCK;
 
   // Update the stable pointer hash table.
index 7b7187c..9df39b9 100644 (file)
@@ -22,6 +22,7 @@
 #include "Schedule.h"
 #include "Weak.h"
 #include "Storage.h"
+#include "Threads.h"
 
 /* -----------------------------------------------------------------------------
    Weak Pointers
@@ -80,9 +81,6 @@ StgWeak *old_weak_ptr_list; // also pending finaliser list
 // List of threads found to be unreachable
 StgTSO *resurrected_threads;
 
-// List of blocked threads found to have pending throwTos
-StgTSO *exception_threads;
-
 static void resurrectUnreachableThreads (generation *gen);
 static rtsBool tidyThreadList (generation *gen);
 
@@ -93,7 +91,6 @@ initWeakForGC(void)
     weak_ptr_list = NULL;
     weak_stage = WeakPtrs;
     resurrected_threads = END_TSO_QUEUE;
-    exception_threads = END_TSO_QUEUE;
 }
 
 rtsBool 
@@ -286,35 +283,11 @@ static rtsBool tidyThreadList (generation *gen)
         
         next = t->global_link;
         
-        // This is a good place to check for blocked
-        // exceptions.  It might be the case that a thread is
-        // blocked on delivering an exception to a thread that
-        // is also blocked - we try to ensure that this
-        // doesn't happen in throwTo(), but it's too hard (or
-        // impossible) to close all the race holes, so we
-        // accept that some might get through and deal with
-        // them here.  A GC will always happen at some point,
-        // even if the system is otherwise deadlocked.
-        //
-        // If an unreachable thread has blocked
-        // exceptions, we really want to perform the
-        // blocked exceptions rather than throwing
-        // BlockedIndefinitely exceptions.  This is the
-        // only place we can discover such threads.
-        // The target thread might even be
-        // ThreadFinished or ThreadKilled.  Bugs here
-        // will only be seen when running on a
-        // multiprocessor.
-        if (t->blocked_exceptions != END_TSO_QUEUE) {
-            if (tmp == NULL) {
-                evacuate((StgClosure **)&t);
-                flag = rtsTrue;
-            }
-            t->global_link = exception_threads;
-            exception_threads = t;
-            *prev = next;
-            continue;
-        }
+        // if the thread is not masking exceptions but there are
+        // pending exceptions on its queue, then something has gone
+        // wrong:
+        ASSERT(t->blocked_exceptions == END_BLOCKED_EXCEPTIONS_QUEUE
+               || (t->flags & TSO_BLOCKEX));
         
         if (tmp == NULL) {
             // not alive (yet): leave this thread on the
index 442fee1..11d5424 100644 (file)
@@ -307,7 +307,8 @@ checkClosure( StgClosure* p )
     case IND_OLDGEN_PERM:
     case BLACKHOLE:
     case CAF_BLACKHOLE:
-    case STABLE_NAME:
+    case PRIM:
+    case MUT_PRIM:
     case MUT_VAR_CLEAN:
     case MUT_VAR_DIRTY:
     case CONSTR_STATIC:
@@ -416,39 +417,6 @@ checkClosure( StgClosure* p )
         checkTSO((StgTSO *)p);
         return tso_sizeW((StgTSO *)p);
 
-    case TVAR_WATCH_QUEUE:
-      {
-        StgTVarWatchQueue *wq = (StgTVarWatchQueue *)p;
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(wq->next_queue_entry));
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(wq->prev_queue_entry));
-        return sizeofW(StgTVarWatchQueue);
-      }
-
-    case INVARIANT_CHECK_QUEUE:
-      {
-        StgInvariantCheckQueue *q = (StgInvariantCheckQueue *)p;
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->invariant));
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->my_execution));
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(q->next_queue_entry));
-        return sizeofW(StgInvariantCheckQueue);
-      }
-
-    case ATOMIC_INVARIANT:
-      {
-        StgAtomicInvariant *invariant = (StgAtomicInvariant *)p;
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(invariant->code));
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(invariant->last_execution));
-        return sizeofW(StgAtomicInvariant);
-      }
-
-    case TVAR:
-      {
-        StgTVar *tv = (StgTVar *)p;
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(tv->current_value));
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(tv->first_watch_queue_entry));
-        return sizeofW(StgTVar);
-      }
-
     case TREC_CHUNK:
       {
         nat i;
@@ -461,14 +429,6 @@ checkClosure( StgClosure* p )
         }
         return sizeofW(StgTRecChunk);
       }
-
-    case TREC_HEADER:
-      {
-        StgTRecHeader *trec = (StgTRecHeader *)p;
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(trec -> enclosing_trec));
-        ASSERT(LOOKS_LIKE_CLOSURE_PTR(trec -> current_chunk));
-        return sizeofW(StgTRecHeader);
-      }
       
     default:
            barf("checkClosure (closure type %d)", info->type);
index 466b9b4..1b671a0 100644 (file)
@@ -82,7 +82,8 @@ scavengeTSO (StgTSO *tso)
 
     if (   tso->why_blocked == BlockedOnMVar
        || tso->why_blocked == BlockedOnBlackHole
-       || tso->why_blocked == BlockedOnException
+       || tso->why_blocked == BlockedOnMsgWakeup
+       || tso->why_blocked == BlockedOnMsgThrowTo
        ) {
        evacuate(&tso->block_info.closure);
     }
@@ -394,7 +395,6 @@ scavenge_block (bdescr *bd)
 {
   StgPtr p, q;
   StgInfoTable *info;
-  generation *saved_evac_gen;
   rtsBool saved_eager_promotion;
   gen_workspace *ws;
 
@@ -403,7 +403,6 @@ scavenge_block (bdescr *bd)
 
   gct->scan_bd = bd;
   gct->evac_gen = bd->gen;
-  saved_evac_gen = gct->evac_gen;
   saved_eager_promotion = gct->eager_promotion;
   gct->failed_to_evac = rtsFalse;
 
@@ -532,7 +531,7 @@ scavenge_block (bdescr *bd)
     gen_obj:
     case CONSTR:
     case WEAK:
-    case STABLE_NAME:
+    case PRIM:
     {
        StgPtr end;
 
@@ -672,42 +671,21 @@ scavenge_block (bdescr *bd)
        break;
     }
 
-    case TVAR_WATCH_QUEUE:
+    case MUT_PRIM:
       {
-       StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p);
-       gct->evac_gen = 0;
-       evacuate((StgClosure **)&wq->closure);
-       evacuate((StgClosure **)&wq->next_queue_entry);
-       evacuate((StgClosure **)&wq->prev_queue_entry);
-       gct->evac_gen = saved_evac_gen;
-       gct->failed_to_evac = rtsTrue; // mutable
-       p += sizeofW(StgTVarWatchQueue);
-       break;
-      }
+       StgPtr end;
 
-    case TVAR:
-      {
-       StgTVar *tvar = ((StgTVar *) p);
-       gct->evac_gen = 0;
-       evacuate((StgClosure **)&tvar->current_value);
-       evacuate((StgClosure **)&tvar->first_watch_queue_entry);
-       gct->evac_gen = saved_evac_gen;
-       gct->failed_to_evac = rtsTrue; // mutable
-       p += sizeofW(StgTVar);
-       break;
-      }
+       gct->eager_promotion = rtsFalse;
 
-    case TREC_HEADER:
-      {
-        StgTRecHeader *trec = ((StgTRecHeader *) p);
-        gct->evac_gen = 0;
-       evacuate((StgClosure **)&trec->enclosing_trec);
-       evacuate((StgClosure **)&trec->current_chunk);
-       evacuate((StgClosure **)&trec->invariants_to_check);
-       gct->evac_gen = saved_evac_gen;
+       end = (P_)((StgClosure *)p)->payload + info->layout.payload.ptrs;
+       for (p = (P_)((StgClosure *)p)->payload; p < end; p++) {
+           evacuate((StgClosure **)p);
+       }
+       p += info->layout.payload.nptrs;
+
+       gct->eager_promotion = saved_eager_promotion;
        gct->failed_to_evac = rtsTrue; // mutable
-       p += sizeofW(StgTRecHeader);
-        break;
+       break;
       }
 
     case TREC_CHUNK:
@@ -715,44 +693,19 @@ scavenge_block (bdescr *bd)
        StgWord i;
        StgTRecChunk *tc = ((StgTRecChunk *) p);
        TRecEntry *e = &(tc -> entries[0]);
-       gct->evac_gen = 0;
+       gct->eager_promotion = rtsFalse;
        evacuate((StgClosure **)&tc->prev_chunk);
        for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) {
          evacuate((StgClosure **)&e->tvar);
          evacuate((StgClosure **)&e->expected_value);
          evacuate((StgClosure **)&e->new_value);
        }
-       gct->evac_gen = saved_evac_gen;
+       gct->eager_promotion = saved_eager_promotion;
        gct->failed_to_evac = rtsTrue; // mutable
        p += sizeofW(StgTRecChunk);
        break;
       }
 
-    case ATOMIC_INVARIANT:
-      {
-        StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p);
-        gct->evac_gen = 0;
-       evacuate(&invariant->code);
-       evacuate((StgClosure **)&invariant->last_execution);
-       gct->evac_gen = saved_evac_gen;
-       gct->failed_to_evac = rtsTrue; // mutable
-       p += sizeofW(StgAtomicInvariant);
-        break;
-      }
-
-    case INVARIANT_CHECK_QUEUE:
-      {
-        StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p);
-        gct->evac_gen = 0;
-       evacuate((StgClosure **)&queue->invariant);
-       evacuate((StgClosure **)&queue->my_execution);
-       evacuate((StgClosure **)&queue->next_queue_entry);
-       gct->evac_gen = saved_evac_gen;
-       gct->failed_to_evac = rtsTrue; // mutable
-       p += sizeofW(StgInvariantCheckQueue);
-        break;
-      }
-
     default:
        barf("scavenge: unimplemented/strange closure type %d @ %p", 
             info->type, p);
@@ -806,10 +759,10 @@ scavenge_mark_stack(void)
 {
     StgPtr p, q;
     StgInfoTable *info;
-    generation *saved_evac_gen;
+    rtsBool saved_eager_promotion;
 
     gct->evac_gen = oldest_gen;
-    saved_evac_gen = gct->evac_gen;
+    saved_eager_promotion = gct->eager_promotion;
 
     while ((p = pop_mark_stack())) {
 
@@ -822,8 +775,6 @@ scavenge_mark_stack(void)
         case MVAR_CLEAN:
         case MVAR_DIRTY:
         { 
-            rtsBool saved_eager_promotion = gct->eager_promotion;
-            
             StgMVar *mvar = ((StgMVar *)p);
             gct->eager_promotion = rtsFalse;
             evacuate((StgClosure **)&mvar->head);
@@ -906,7 +857,7 @@ scavenge_mark_stack(void)
        gen_obj:
        case CONSTR:
        case WEAK:
-       case STABLE_NAME:
+       case PRIM:
        {
            StgPtr end;
            
@@ -938,8 +889,6 @@ scavenge_mark_stack(void)
 
        case MUT_VAR_CLEAN:
        case MUT_VAR_DIRTY: {
-           rtsBool saved_eager_promotion = gct->eager_promotion;
-           
            gct->eager_promotion = rtsFalse;
            evacuate(&((StgMutVar *)p)->var);
            gct->eager_promotion = saved_eager_promotion;
@@ -986,13 +935,10 @@ scavenge_mark_stack(void)
        case MUT_ARR_PTRS_DIRTY:
            // follow everything 
        {
-           rtsBool saved_eager;
-
            // We don't eagerly promote objects pointed to by a mutable
            // array, but if we find the array only points to objects in
            // the same or an older generation, we mark it "clean" and
            // avoid traversing it during minor GCs.
-           saved_eager = gct->eager_promotion;
            gct->eager_promotion = rtsFalse;
 
             scavenge_mut_arr_ptrs((StgMutArrPtrs *)p);
@@ -1003,7 +949,7 @@ scavenge_mark_stack(void)
                 ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info;
             }
 
-           gct->eager_promotion = saved_eager;
+           gct->eager_promotion = saved_eager_promotion;
            gct->failed_to_evac = rtsTrue; // mutable anyhow.
            break;
        }
@@ -1032,81 +978,39 @@ scavenge_mark_stack(void)
            break;
        }
 
-       case TVAR_WATCH_QUEUE:
-         {
-           StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p);
-           gct->evac_gen = 0;
-            evacuate((StgClosure **)&wq->closure);
-           evacuate((StgClosure **)&wq->next_queue_entry);
-           evacuate((StgClosure **)&wq->prev_queue_entry);
-           gct->evac_gen = saved_evac_gen;
-           gct->failed_to_evac = rtsTrue; // mutable
-           break;
-         }
-         
-       case TVAR:
-         {
-           StgTVar *tvar = ((StgTVar *) p);
-           gct->evac_gen = 0;
-           evacuate((StgClosure **)&tvar->current_value);
-           evacuate((StgClosure **)&tvar->first_watch_queue_entry);
-           gct->evac_gen = saved_evac_gen;
-           gct->failed_to_evac = rtsTrue; // mutable
-           break;
-         }
-         
+        case MUT_PRIM:
+        {
+            StgPtr end;
+            
+            gct->eager_promotion = rtsFalse;
+            
+            end = (P_)((StgClosure *)p)->payload + info->layout.payload.ptrs;
+            for (p = (P_)((StgClosure *)p)->payload; p < end; p++) {
+                evacuate((StgClosure **)p);
+            }
+            
+            gct->eager_promotion = saved_eager_promotion;
+            gct->failed_to_evac = rtsTrue; // mutable
+            break;
+        }
+
        case TREC_CHUNK:
          {
            StgWord i;
            StgTRecChunk *tc = ((StgTRecChunk *) p);
            TRecEntry *e = &(tc -> entries[0]);
-           gct->evac_gen = 0;
+           gct->eager_promotion = rtsFalse;
            evacuate((StgClosure **)&tc->prev_chunk);
            for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) {
              evacuate((StgClosure **)&e->tvar);
              evacuate((StgClosure **)&e->expected_value);
              evacuate((StgClosure **)&e->new_value);
            }
-           gct->evac_gen = saved_evac_gen;
-           gct->failed_to_evac = rtsTrue; // mutable
-           break;
-         }
-
-       case TREC_HEADER:
-         {
-           StgTRecHeader *trec = ((StgTRecHeader *) p);
-           gct->evac_gen = 0;
-           evacuate((StgClosure **)&trec->enclosing_trec);
-           evacuate((StgClosure **)&trec->current_chunk);
-           evacuate((StgClosure **)&trec->invariants_to_check);
-           gct->evac_gen = saved_evac_gen;
+           gct->eager_promotion = saved_eager_promotion;
            gct->failed_to_evac = rtsTrue; // mutable
            break;
          }
 
-        case ATOMIC_INVARIANT:
-          {
-            StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p);
-            gct->evac_gen = 0;
-           evacuate(&invariant->code);
-           evacuate((StgClosure **)&invariant->last_execution);
-           gct->evac_gen = saved_evac_gen;
-           gct->failed_to_evac = rtsTrue; // mutable
-            break;
-          }
-
-        case INVARIANT_CHECK_QUEUE:
-          {
-            StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p);
-            gct->evac_gen = 0;
-           evacuate((StgClosure **)&queue->invariant);
-           evacuate((StgClosure **)&queue->my_execution);
-            evacuate((StgClosure **)&queue->next_queue_entry);
-           gct->evac_gen = saved_evac_gen;
-           gct->failed_to_evac = rtsTrue; // mutable
-            break;
-          }
-
        default:
            barf("scavenge_mark_stack: unimplemented/strange closure type %d @ %p", 
                 info->type, p);
@@ -1133,9 +1037,11 @@ static rtsBool
 scavenge_one(StgPtr p)
 {
     const StgInfoTable *info;
-    generation *saved_evac_gen = gct->evac_gen;
     rtsBool no_luck;
+    rtsBool saved_eager_promotion;
     
+    saved_eager_promotion = gct->eager_promotion;
+
     ASSERT(LOOKS_LIKE_CLOSURE_PTR(p));
     info = get_itbl((StgClosure *)p);
     
@@ -1144,8 +1050,6 @@ scavenge_one(StgPtr p)
     case MVAR_CLEAN:
     case MVAR_DIRTY:
     { 
-       rtsBool saved_eager_promotion = gct->eager_promotion;
-
        StgMVar *mvar = ((StgMVar *)p);
        gct->eager_promotion = rtsFalse;
        evacuate((StgClosure **)&mvar->head);
@@ -1190,6 +1094,7 @@ scavenge_one(StgPtr p)
     case CONSTR_0_2:
     case CONSTR_2_0:
     case WEAK:
+    case PRIM:
     case IND_PERM:
     {
        StgPtr q, end;
@@ -1204,7 +1109,6 @@ scavenge_one(StgPtr p)
     case MUT_VAR_CLEAN:
     case MUT_VAR_DIRTY: {
        StgPtr q = p;
-       rtsBool saved_eager_promotion = gct->eager_promotion;
 
        gct->eager_promotion = rtsFalse;
        evacuate(&((StgMutVar *)p)->var);
@@ -1254,13 +1158,10 @@ scavenge_one(StgPtr p)
     case MUT_ARR_PTRS_CLEAN:
     case MUT_ARR_PTRS_DIRTY:
     {
-       rtsBool saved_eager;
-
        // We don't eagerly promote objects pointed to by a mutable
        // array, but if we find the array only points to objects in
        // the same or an older generation, we mark it "clean" and
        // avoid traversing it during minor GCs.
-       saved_eager = gct->eager_promotion;
        gct->eager_promotion = rtsFalse;
 
         scavenge_mut_arr_ptrs((StgMutArrPtrs *)p);
@@ -1271,7 +1172,7 @@ scavenge_one(StgPtr p)
            ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info;
        }
 
-       gct->eager_promotion = saved_eager;
+       gct->eager_promotion = saved_eager_promotion;
        gct->failed_to_evac = rtsTrue;
        break;
     }
@@ -1298,81 +1199,40 @@ scavenge_one(StgPtr p)
        break;
     }
   
-    case TVAR_WATCH_QUEUE:
-      {
-       StgTVarWatchQueue *wq = ((StgTVarWatchQueue *) p);
-       gct->evac_gen = 0;
-        evacuate((StgClosure **)&wq->closure);
-        evacuate((StgClosure **)&wq->next_queue_entry);
-        evacuate((StgClosure **)&wq->prev_queue_entry);
-       gct->evac_gen = saved_evac_gen;
-       gct->failed_to_evac = rtsTrue; // mutable
-       break;
-      }
+    case MUT_PRIM:
+    {
+       StgPtr end;
+        
+       gct->eager_promotion = rtsFalse;
+        
+       end = (P_)((StgClosure *)p)->payload + info->layout.payload.ptrs;
+       for (p = (P_)((StgClosure *)p)->payload; p < end; p++) {
+           evacuate((StgClosure **)p);
+       }
 
-    case TVAR:
-      {
-       StgTVar *tvar = ((StgTVar *) p);
-       gct->evac_gen = 0;
-       evacuate((StgClosure **)&tvar->current_value);
-        evacuate((StgClosure **)&tvar->first_watch_queue_entry);
-       gct->evac_gen = saved_evac_gen;
+       gct->eager_promotion = saved_eager_promotion;
        gct->failed_to_evac = rtsTrue; // mutable
        break;
-      }
 
-    case TREC_HEADER:
-      {
-        StgTRecHeader *trec = ((StgTRecHeader *) p);
-        gct->evac_gen = 0;
-       evacuate((StgClosure **)&trec->enclosing_trec);
-       evacuate((StgClosure **)&trec->current_chunk);
-        evacuate((StgClosure **)&trec->invariants_to_check);
-       gct->evac_gen = saved_evac_gen;
-       gct->failed_to_evac = rtsTrue; // mutable
-        break;
-      }
+    }
 
     case TREC_CHUNK:
       {
        StgWord i;
        StgTRecChunk *tc = ((StgTRecChunk *) p);
        TRecEntry *e = &(tc -> entries[0]);
-       gct->evac_gen = 0;
+       gct->eager_promotion = rtsFalse;
        evacuate((StgClosure **)&tc->prev_chunk);
        for (i = 0; i < tc -> next_entry_idx; i ++, e++ ) {
          evacuate((StgClosure **)&e->tvar);
          evacuate((StgClosure **)&e->expected_value);
          evacuate((StgClosure **)&e->new_value);
        }
-       gct->evac_gen = saved_evac_gen;
+       gct->eager_promotion = saved_eager_promotion;
        gct->failed_to_evac = rtsTrue; // mutable
        break;
       }
 
-    case ATOMIC_INVARIANT:
-    {
-      StgAtomicInvariant *invariant = ((StgAtomicInvariant *) p);
-      gct->evac_gen = 0;
-      evacuate(&invariant->code);
-      evacuate((StgClosure **)&invariant->last_execution);
-      gct->evac_gen = saved_evac_gen;
-      gct->failed_to_evac = rtsTrue; // mutable
-      break;
-    }
-
-    case INVARIANT_CHECK_QUEUE:
-    {
-      StgInvariantCheckQueue *queue = ((StgInvariantCheckQueue *) p);
-      gct->evac_gen = 0;
-      evacuate((StgClosure **)&queue->invariant);
-      evacuate((StgClosure **)&queue->my_execution);
-      evacuate((StgClosure **)&queue->next_queue_entry);
-      gct->evac_gen = saved_evac_gen;
-      gct->failed_to_evac = rtsTrue; // mutable
-      break;
-    }
-
     case IND:
         // IND can happen, for example, when the interpreter allocates
         // a gigantic AP closure (more than one block), which ends up
@@ -1470,8 +1330,8 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
                continue;
            case MUT_ARR_PTRS_DIRTY:
             {
-                rtsBool saved_eager;
-                saved_eager = gct->eager_promotion;
+                rtsBool saved_eager_promotion;
+                saved_eager_promotion = gct->eager_promotion;
                 gct->eager_promotion = rtsFalse;
 
                 scavenge_mut_arr_ptrs_marked((StgMutArrPtrs *)p);
@@ -1482,7 +1342,7 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
                     ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info;
                 }
 
-                gct->eager_promotion = saved_eager;
+                gct->eager_promotion = saved_eager_promotion;
                 gct->failed_to_evac = rtsFalse;
                recordMutableGen_GC((StgClosure *)p,gen->no);
                continue;