New implementation of BLACKHOLEs
authorSimon Marlow <marlowsd@gmail.com>
Mon, 29 Mar 2010 14:44:56 +0000 (14:44 +0000)
committerSimon Marlow <marlowsd@gmail.com>
Mon, 29 Mar 2010 14:44:56 +0000 (14:44 +0000)
This replaces the global blackhole_queue with a clever scheme that
enables us to queue up blocked threads on the closure that they are
blocked on, while still avoiding atomic instructions in the common
case.

Advantages:

 - gets rid of a locked global data structure and some tricky GC code
   (replacing it with some per-thread data structures and different
   tricky GC code :)

 - wakeups are more prompt: parallel/concurrent performance should
   benefit.  I haven't seen anything dramatic in the parallel
   benchmarks so far, but a couple of threading benchmarks do improve
   a bit.

 - waking up a thread blocked on a blackhole is now O(1) (e.g. if
   it is the target of throwTo).

 - less sharing and better separation of Capabilities: communication
   is done with messages, the data structures are strictly owned by a
   Capability and cannot be modified except by sending messages.

 - this change will ultimately enable us to do more intelligent
   scheduling when threads block on each other.  This is what started
   off the whole thing, but it isn't done yet (#3838).

I'll be documenting all this on the wiki in due course.

47 files changed:
compiler/cmm/CLabel.hs
compiler/codeGen/CgCallConv.hs
compiler/codeGen/CgClosure.lhs
compiler/codeGen/CgMonad.lhs
compiler/codeGen/CgStackery.lhs
includes/Rts.h
includes/mkDerivedConstants.c
includes/rts/storage/ClosureMacros.h
includes/rts/storage/ClosureTypes.h
includes/rts/storage/Closures.h
includes/rts/storage/TSO.h
includes/stg/MiscClosures.h
rts/Capability.c
rts/Capability.h
rts/ClosureFlags.c
rts/FrontPanel.c
rts/HeapStackCheck.cmm
rts/Interpreter.c
rts/LdvProfile.c
rts/Linker.c
rts/Messages.c [new file with mode: 0644]
rts/Messages.h [new file with mode: 0644]
rts/PrimOps.cmm
rts/Printer.c
rts/ProfHeap.c
rts/RaiseAsync.c
rts/RetainerProfile.c
rts/STM.c
rts/Schedule.c
rts/Schedule.h
rts/StgMiscClosures.cmm
rts/ThreadPaused.c
rts/Threads.c
rts/Threads.h
rts/Timer.c
rts/Updates.cmm
rts/Updates.h
rts/posix/OSMem.c
rts/sm/Compact.c
rts/sm/Evac.c
rts/sm/GC.c
rts/sm/MarkWeak.c
rts/sm/MarkWeak.h
rts/sm/Sanity.c
rts/sm/Scav.c
rts/sm/Storage.c
utils/genapply/GenApply.hs

index 3ceb982..7954444 100644 (file)
@@ -58,6 +58,7 @@ module CLabel (
        mkSplitMarkerLabel,
        mkDirty_MUT_VAR_Label,
        mkUpdInfoLabel,
+       mkBHUpdInfoLabel,
        mkIndStaticInfoLabel,
         mkMainCapabilityLabel,
        mkMAP_FROZEN_infoLabel,
@@ -400,6 +401,7 @@ mkStaticConEntryLabel name  c     = IdLabel name c StaticConEntry
 mkSplitMarkerLabel             = CmmLabel rtsPackageId (fsLit "__stg_split_marker")    CmmCode
 mkDirty_MUT_VAR_Label          = CmmLabel rtsPackageId (fsLit "dirty_MUT_VAR")         CmmCode
 mkUpdInfoLabel                 = CmmLabel rtsPackageId (fsLit "stg_upd_frame")         CmmInfo
+mkBHUpdInfoLabel               = CmmLabel rtsPackageId (fsLit "stg_bh_upd_frame" )     CmmInfo
 mkIndStaticInfoLabel           = CmmLabel rtsPackageId (fsLit "stg_IND_STATIC")        CmmInfo
 mkMainCapabilityLabel          = CmmLabel rtsPackageId (fsLit "MainCapability")        CmmData
 mkMAP_FROZEN_infoLabel         = CmmLabel rtsPackageId (fsLit "stg_MUT_ARR_PTRS_FROZEN0") CmmInfo
index 8a1ae8b..b8294ea 100644 (file)
@@ -284,7 +284,6 @@ getSequelAmode
            OnStack -> do { sp_rel <- getSpRelOffset virt_sp
                          ; returnFC (CmmLoad sp_rel bWord) }
 
-           UpdateCode        -> returnFC (CmmLit (CmmLabel mkUpdInfoLabel))
            CaseAlts lbl _ _  -> returnFC (CmmLit (CmmLabel lbl))
        }
 
index f0fe3d1..60ba7f8 100644 (file)
@@ -474,7 +474,12 @@ emitBlackHoleCode is_single_entry = do
      then do
           tickyBlackHole (not is_single_entry)
           let bh_info = CmmReg (CmmGlobal EagerBlackholeInfo)
-         stmtC (CmmStore (CmmReg nodeReg) bh_info)
+         stmtsC [
+              CmmStore (cmmOffsetW (CmmReg nodeReg) fixedHdrSize)
+                       (CmmReg (CmmGlobal CurrentTSO)),
+              CmmCall (CmmPrim MO_WriteBarrier) [] [] CmmUnsafe CmmMayReturn,
+             CmmStore (CmmReg nodeReg) bh_info
+            ]
      else
           nopC
 \end{code}
@@ -489,17 +494,23 @@ setupUpdate closure_info code
   = code
 
   | not (isStaticClosure closure_info)
-  = if closureUpdReqd closure_info
-    then do { tickyPushUpdateFrame;  pushUpdateFrame (CmmReg nodeReg) code }
-    else do { tickyUpdateFrameOmitted; code }
+  = do
+   if not (closureUpdReqd closure_info)
+      then do tickyUpdateFrameOmitted; code
+      else do
+          tickyPushUpdateFrame
+          dflags <- getDynFlags
+          if not opt_SccProfilingOn && dopt Opt_EagerBlackHoling dflags
+              then pushBHUpdateFrame (CmmReg nodeReg) code
+              else pushUpdateFrame   (CmmReg nodeReg) code
+  
   | otherwise  -- A static closure
   = do         { tickyUpdateBhCaf closure_info
 
        ; if closureUpdReqd closure_info
          then do       -- Blackhole the (updatable) CAF:
                { upd_closure <- link_caf closure_info True
-               ; pushUpdateFrame upd_closure code }
+               ; pushBHUpdateFrame upd_closure code }
          else do
                { -- krc: removed some ticky-related code here.
                ; tickyUpdateFrameOmitted
@@ -553,7 +564,8 @@ link_caf cl_info _is_upd = do
   {    -- Alloc black hole specifying CC_HDR(Node) as the cost centre
   ; let        use_cc   = costCentreFrom (CmmReg nodeReg)
         blame_cc = use_cc
-  ; hp_offset <- allocDynClosure bh_cl_info use_cc blame_cc []
+        tso      = CmmReg (CmmGlobal CurrentTSO)
+  ; hp_offset <- allocDynClosure bh_cl_info use_cc blame_cc [(tso,fixedHdrSize)]
   ; hp_rel    <- getHpRelOffset hp_offset
 
        -- Call the RTS function newCAF to add the CAF to the CafList
index 83d2b72..e5bca2a 100644 (file)
@@ -169,7 +169,6 @@ block.
 \begin{code}
 data Sequel
   = OnStack            -- Continuation is on the stack
-  | UpdateCode         -- Continuation is update
 
   | CaseAlts
          CLabel     -- Jump to this; if the continuation is for a vectored
index 6683de4..532127a 100644 (file)
@@ -17,7 +17,7 @@ module CgStackery (
        setStackFrame, getStackFrame,
        mkVirtStkOffsets, mkStkAmodes,
        freeStackSlots, 
-       pushUpdateFrame, emitPushUpdateFrame,
+       pushUpdateFrame, pushBHUpdateFrame, emitPushUpdateFrame,
     ) where
 
 #include "HsVersions.h"
@@ -265,6 +265,14 @@ to reflect the frame pushed.
 \begin{code}
 pushUpdateFrame :: CmmExpr -> Code -> Code
 pushUpdateFrame updatee code
+  = pushSpecUpdateFrame mkUpdInfoLabel updatee code
+
+pushBHUpdateFrame :: CmmExpr -> Code -> Code
+pushBHUpdateFrame updatee code
+  = pushSpecUpdateFrame mkBHUpdInfoLabel updatee code
+
+pushSpecUpdateFrame :: CLabel -> CmmExpr -> Code -> Code
+pushSpecUpdateFrame lbl updatee code
   = do {
       when debugIsOn $ do
        { EndOfBlockInfo _ sequel <- getEndOfBlockInfo ;
@@ -277,15 +285,25 @@ pushUpdateFrame updatee code
                -- The location of the lowest-address
                -- word of the update frame itself
 
-       ; setEndOfBlockInfo (EndOfBlockInfo vsp UpdateCode) $
-           do  { emitPushUpdateFrame frame_addr updatee
+                -- NB. we used to set the Sequel to 'UpdateCode' so
+                -- that we could jump directly to the update code if
+                -- we know that the next frame on the stack is an
+                -- update frame.  However, the RTS can sometimes
+                -- change an update frame into something else (see
+                -- e.g. Note [upd-black-hole] in rts/sm/Scav.c), so we
+                -- no longer make this assumption.
+       ; setEndOfBlockInfo (EndOfBlockInfo vsp OnStack) $
+           do  { emitSpecPushUpdateFrame lbl frame_addr updatee
                ; code }
        }
 
 emitPushUpdateFrame :: CmmExpr -> CmmExpr -> Code
-emitPushUpdateFrame frame_addr updatee = do
+emitPushUpdateFrame = emitSpecPushUpdateFrame mkUpdInfoLabel
+
+emitSpecPushUpdateFrame :: CLabel -> CmmExpr -> CmmExpr -> Code
+emitSpecPushUpdateFrame lbl frame_addr updatee = do
        stmtsC [  -- Set the info word
-                 CmmStore frame_addr (mkLblExpr mkUpdInfoLabel)
+                 CmmStore frame_addr (mkLblExpr lbl)
                , -- And the updatee
                  CmmStore (cmmOffsetB frame_addr off_updatee) updatee ]
        initUpdFrameProf frame_addr
index 3318402..d79e9ad 100644 (file)
@@ -106,10 +106,18 @@ void _assertFail(const char *filename, unsigned int linenum)
        else                                    \
            _assertFail(__FILE__, __LINE__)
 
+#define CHECKM(predicate, msg, ...)             \
+       if (predicate)                          \
+           /*null*/;                           \
+       else                                    \
+            barf(msg, ##__VA_ARGS__)
+
 #ifndef DEBUG
 #define ASSERT(predicate) /* nothing */
+#define ASSERTM(predicate,msg,...) /* nothing */
 #else
 #define ASSERT(predicate) CHECK(predicate)
+#define ASSERTM(predicate,msg,...) CHECKM(predicate,msg,##__VA_ARGS__)
 #endif /* DEBUG */
 
 /* 
index 94157f0..92685ca 100644 (file)
@@ -291,6 +291,7 @@ main(int argc, char *argv[])
     closure_field(StgTSO, trec);
     closure_field(StgTSO, flags);
     closure_field(StgTSO, dirty);
+    closure_field(StgTSO, bq);
     closure_field_("StgTSO_CCCS", StgTSO, prof.CCCS);
     tso_field(StgTSO, sp);
     tso_field_offset(StgTSO, stack);
@@ -382,6 +383,17 @@ main(int argc, char *argv[])
     closure_size(StgStableName);
     closure_field(StgStableName,sn);
 
+    closure_size(StgBlockingQueue);
+    closure_field(StgBlockingQueue, bh);
+    closure_field(StgBlockingQueue, owner);
+    closure_field(StgBlockingQueue, queue);
+    closure_field(StgBlockingQueue, link);
+
+    closure_size(MessageBlackHole);
+    closure_field(MessageBlackHole, link);
+    closure_field(MessageBlackHole, tso);
+    closure_field(MessageBlackHole, bh);
+
     struct_field_("RtsFlags_ProfFlags_showCCSOnException",
                  RTS_FLAGS, ProfFlags.showCCSOnException);
     struct_field_("RtsFlags_DebugFlags_apply",
index a115f6f..098c65d 100644 (file)
    SET_HDR(c,info,costCentreStack);                    \
    (c)->words = n_words;
 
+// Use when changing a closure from one kind to another
+#define OVERWRITE_INFO(c, new_info)                             \
+   LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC((StgClosure *)(c));       \
+   SET_INFO((c), (new_info));                                  \
+   LDV_RECORD_CREATE(c);
+
 /* -----------------------------------------------------------------------------
    How to get hold of the static link field for a static closure.
    -------------------------------------------------------------------------- */
@@ -249,7 +255,7 @@ INLINE_HEADER StgOffset THUNK_SELECTOR_sizeW ( void )
 { return sizeofW(StgSelector); }
 
 INLINE_HEADER StgOffset BLACKHOLE_sizeW ( void )
-{ return sizeofW(StgHeader)+MIN_PAYLOAD_SIZE; }
+{ return sizeofW(StgInd); } // a BLACKHOLE is a kind of indirection
 
 /* --------------------------------------------------------------------------
    Sizes of closures
index 6a76772..518d39b 100644 (file)
@@ -62,7 +62,7 @@
 #define UPDATE_FRAME           38
 #define CATCH_FRAME            39
 #define STOP_FRAME             40
-#define CAF_BLACKHOLE          41
+#define BLOCKING_QUEUE         41
 #define BLACKHOLE              42
 #define MVAR_CLEAN             43
 #define MVAR_DIRTY             44
index d7498e2..8027468 100644 (file)
@@ -127,6 +127,14 @@ typedef struct {
     StgInfoTable *saved_info;
 } StgIndStatic;
 
+typedef struct StgBlockingQueue_ {
+    StgHeader   header;
+    struct StgBlockingQueue_ *link; // here so it looks like an IND
+    StgClosure *bh;  // the BLACKHOLE
+    StgTSO     *owner;
+    struct MessageBlackHole_ *queue;
+} StgBlockingQueue;
+
 typedef struct {
     StgHeader  header;
     StgWord    words;
@@ -433,10 +441,17 @@ typedef struct MessageWakeup_ {
 
 typedef struct MessageThrowTo_ {
     StgHeader   header;
-    Message    *link;
+    struct MessageThrowTo_ *link;
     StgTSO     *source;
     StgTSO     *target;
     StgClosure *exception;
 } MessageThrowTo;
 
+typedef struct MessageBlackHole_ {
+    StgHeader   header;
+    struct MessageBlackHole_ *link;
+    StgTSO     *tso;
+    StgClosure *bh;
+} MessageBlackHole;
+
 #endif /* RTS_STORAGE_CLOSURES_H */
index e2015f2..e07be88 100644 (file)
@@ -46,6 +46,7 @@ typedef struct {
 /* Reason for thread being blocked. See comment above struct StgTso_. */
 typedef union {
   StgClosure *closure;
+  struct MessageBlackHole_ *bh;
   struct MessageThrowTo_ *throwto;
   struct MessageWakeup_  *wakeup;
   StgInt fd;   /* StgInt instead of int, so that it's the same size as the ptrs */
@@ -78,12 +79,17 @@ typedef struct StgTSO_ {
     */
     struct StgTSO_*         _link;
     /*
+      Currently used for linking TSOs on:
+      * cap->run_queue_{hd,tl}
+      * MVAR queue
+      * (non-THREADED_RTS); the blocked_queue
+      * and pointing to the relocated version of a ThreadRelocated
+
        NOTE!!!  do not modify _link directly, it is subject to
        a write barrier for generational GC.  Instead use the
        setTSOLink() function.  Exceptions to this rule are:
 
        * setting the link field to END_TSO_QUEUE
-       * putting a TSO on the blackhole_queue
        * setting the link field of the currently running TSO, as it
          will already be dirty.
     */
@@ -127,6 +133,12 @@ typedef struct StgTSO_ {
     */
     struct MessageThrowTo_ * blocked_exceptions;
 
+    /*
+      A list of StgBlockingQueue objects, representing threads blocked
+      on thunks that are under evaluation by this thread.
+    */
+    struct StgBlockingQueue_ *bq;
+
 #ifdef TICKY_TICKY
     /* TICKY-specific stuff would go here. */
 #endif
@@ -152,6 +164,18 @@ typedef struct StgTSO_ {
 void dirty_TSO  (Capability *cap, StgTSO *tso);
 void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
 
+// Apply to a TSO before looking at it if you are not sure whether it
+// might be ThreadRelocated or not (basically, that's most of the time
+// unless the TSO is the current TSO).
+//
+INLINE_HEADER StgTSO * deRefTSO(StgTSO *tso)
+{
+    while (tso->what_next == ThreadRelocated) {
+       tso = tso->_link;
+    }
+    return tso;
+}
+
 /* -----------------------------------------------------------------------------
    Invariants:
 
index 42e878f..9834c4b 100644 (file)
@@ -44,6 +44,7 @@
 
 /* Stack frames */
 RTS_RET_INFO(stg_upd_frame_info);
+RTS_RET_INFO(stg_bh_upd_frame_info);
 RTS_RET_INFO(stg_marked_upd_frame_info);
 RTS_RET_INFO(stg_noupd_frame_info);
 RTS_RET_INFO(stg_catch_frame_info);
@@ -54,6 +55,7 @@ RTS_RET_INFO(stg_catch_stm_frame_info);
 RTS_RET_INFO(stg_unblockAsyncExceptionszh_ret_info);
 
 RTS_ENTRY(stg_upd_frame_ret);
+RTS_ENTRY(stg_bh_upd_frame_ret);
 RTS_ENTRY(stg_marked_upd_frame_ret);
 
 // RTS_FUN(stg_interp_constr_entry);
@@ -90,12 +92,12 @@ RTS_INFO(stg_IND_STATIC_info);
 RTS_INFO(stg_IND_PERM_info);
 RTS_INFO(stg_IND_OLDGEN_info);
 RTS_INFO(stg_IND_OLDGEN_PERM_info);
-RTS_INFO(stg_CAF_UNENTERED_info);
-RTS_INFO(stg_CAF_ENTERED_info);
-RTS_INFO(stg_WHITEHOLE_info);
 RTS_INFO(stg_BLACKHOLE_info);
-RTS_INFO(__stg_EAGER_BLACKHOLE_info);
 RTS_INFO(stg_CAF_BLACKHOLE_info);
+RTS_INFO(__stg_EAGER_BLACKHOLE_info);
+RTS_INFO(stg_WHITEHOLE_info);
+RTS_INFO(stg_BLOCKING_QUEUE_CLEAN_info);
+RTS_INFO(stg_BLOCKING_QUEUE_DIRTY_info);
 
 RTS_FUN_INFO(stg_BCO_info);
 RTS_INFO(stg_EVACUATED_info);
@@ -115,7 +117,9 @@ RTS_INFO(stg_MUT_VAR_CLEAN_info);
 RTS_INFO(stg_MUT_VAR_DIRTY_info);
 RTS_INFO(stg_END_TSO_QUEUE_info);
 RTS_INFO(stg_MSG_WAKEUP_info);
+RTS_INFO(stg_MSG_TRY_WAKEUP_info);
 RTS_INFO(stg_MSG_THROWTO_info);
+RTS_INFO(stg_MSG_BLACKHOLE_info);
 RTS_INFO(stg_MUT_CONS_info);
 RTS_INFO(stg_catch_info);
 RTS_INFO(stg_PAP_info);
@@ -142,12 +146,10 @@ RTS_ENTRY(stg_IND_STATIC_entry);
 RTS_ENTRY(stg_IND_PERM_entry);
 RTS_ENTRY(stg_IND_OLDGEN_entry);
 RTS_ENTRY(stg_IND_OLDGEN_PERM_entry);
-RTS_ENTRY(stg_CAF_UNENTERED_entry);
-RTS_ENTRY(stg_CAF_ENTERED_entry);
 RTS_ENTRY(stg_WHITEHOLE_entry);
 RTS_ENTRY(stg_BLACKHOLE_entry);
-RTS_ENTRY(__stg_EAGER_BLACKHOLE_entry);
 RTS_ENTRY(stg_CAF_BLACKHOLE_entry);
+RTS_ENTRY(__stg_EAGER_BLACKHOLE_entry);
 RTS_ENTRY(stg_BCO_entry);
 RTS_ENTRY(stg_EVACUATED_entry);
 RTS_ENTRY(stg_WEAK_entry);
@@ -166,7 +168,9 @@ RTS_ENTRY(stg_MUT_VAR_CLEAN_entry);
 RTS_ENTRY(stg_MUT_VAR_DIRTY_entry);
 RTS_ENTRY(stg_END_TSO_QUEUE_entry);
 RTS_ENTRY(stg_MSG_WAKEUP_entry);
+RTS_ENTRY(stg_MSG_TRY_WAKEUP_entry);
 RTS_ENTRY(stg_MSG_THROWTO_entry);
+RTS_ENTRY(stg_MSG_BLACKHOLE_entry);
 RTS_ENTRY(stg_MUT_CONS_entry);
 RTS_ENTRY(stg_catch_entry);
 RTS_ENTRY(stg_PAP_entry);
@@ -404,6 +408,8 @@ RTS_FUN(stg_PAP_apply);
 
 RTS_RET_INFO(stg_enter_info);
 RTS_ENTRY(stg_enter_ret);
+RTS_RET_INFO(stg_enter_checkbh_info);
+RTS_ENTRY(stg_enter_checkbh_ret);
 
 RTS_RET_INFO(stg_gc_void_info);
 RTS_ENTRY(stg_gc_void_ret);
index 5f54eca..f5e77a9 100644 (file)
@@ -62,9 +62,8 @@ Capability * rts_unsafeGetMyCapability (void)
 STATIC_INLINE rtsBool
 globalWorkToDo (void)
 {
-    return blackholes_need_checking
-       || sched_state >= SCHED_INTERRUPTING
-       ;
+    return sched_state >= SCHED_INTERRUPTING
+        || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
 }
 #endif
 
@@ -637,43 +636,6 @@ yieldCapability (Capability** pCap, Task *task)
 }
 
 /* ----------------------------------------------------------------------------
- * Wake up a thread on a Capability.
- *
- * This is used when the current Task is running on a Capability and
- * wishes to wake up a thread on a different Capability.
- * ------------------------------------------------------------------------- */
-
-void
-wakeupThreadOnCapability (Capability *cap,
-                          Capability *other_cap, 
-                          StgTSO *tso)
-{
-    MessageWakeup *msg;
-
-    // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
-    if (tso->bound) {
-       ASSERT(tso->bound->task->cap == tso->cap);
-       tso->bound->task->cap = other_cap;
-    }
-    tso->cap = other_cap;
-
-    ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
-           tso->block_info.closure->header.info == &stg_IND_info);
-
-    ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
-
-    msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
-    msg->header.info = &stg_MSG_WAKEUP_info;
-    msg->tso = tso;
-    tso->block_info.closure = (StgClosure *)msg;
-    dirty_TSO(cap, tso);
-    write_barrier();
-    tso->why_blocked = BlockedOnMsgWakeup;
-
-    sendMessage(other_cap, (Message*)msg);
-}
-
-/* ----------------------------------------------------------------------------
  * prodCapability
  *
  * If a Capability is currently idle, wake up a Task on it.  Used to 
@@ -906,24 +868,3 @@ markCapabilities (evac_fn evac, void *user)
    Messages
    -------------------------------------------------------------------------- */
 
-#ifdef THREADED_RTS
-
-void sendMessage(Capability *cap, Message *msg)
-{
-    ACQUIRE_LOCK(&cap->lock);
-
-    msg->link = cap->inbox;
-    cap->inbox = msg;
-
-    if (cap->running_task == NULL) {
-       cap->running_task = myTask(); 
-            // precond for releaseCapability_()
-       releaseCapability_(cap,rtsFalse);
-    } else {
-        contextSwitchCapability(cap);
-    }
-
-    RELEASE_LOCK(&cap->lock);
-}
-
-#endif // THREADED_RTS
index 4030b5e..e12b8ce 100644 (file)
@@ -201,6 +201,8 @@ void waitForReturnCapability (Capability **cap/*in/out*/, Task *task);
 
 INLINE_HEADER void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
 
+INLINE_HEADER void recordClosureMutated (Capability *cap, StgClosure *p);
+
 #if defined(THREADED_RTS)
 
 // Gives up the current capability IFF there is a higher-priority
@@ -222,12 +224,6 @@ void yieldCapability (Capability** pCap, Task *task);
 //
 void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
 
-// Wakes up a thread on a Capability (probably a different Capability
-// from the one held by the current Task).
-//
-void wakeupThreadOnCapability (Capability *my_cap, Capability *other_cap,
-                               StgTSO *tso);
-
 // Wakes up a worker thread on just one Capability, used when we
 // need to service some global event.
 //
@@ -289,8 +285,6 @@ void traverseSparkQueues (evac_fn evac, void *user);
 
 INLINE_HEADER rtsBool emptyInbox(Capability *cap);;
 
-void sendMessage (Capability *cap, Message *msg);
-
 #endif // THREADED_RTS
 
 /* -----------------------------------------------------------------------------
@@ -316,6 +310,15 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen)
     *bd->free++ = (StgWord)p;
 }
 
+INLINE_HEADER void
+recordClosureMutated (Capability *cap, StgClosure *p)
+{
+    bdescr *bd;
+    bd = Bdescr((StgPtr)p);
+    if (bd->gen_no != 0) recordMutableCap(p,cap,bd->gen_no);
+}
+
+
 #if defined(THREADED_RTS)
 INLINE_HEADER rtsBool
 emptySparkPoolCap (Capability *cap) 
index 358cb40..cebd124 100644 (file)
@@ -62,8 +62,8 @@ StgWord16 closure_flags[] = {
  [UPDATE_FRAME]                =  (     _BTM                                  ),
  [CATCH_FRAME]         =  (     _BTM                                  ),
  [STOP_FRAME]          =  (     _BTM                                  ),
- [CAF_BLACKHOLE]       =  (     _BTM|_NS|              _UPT           ),
  [BLACKHOLE]           =  (          _NS|              _UPT           ),
+ [BLOCKING_QUEUE]      =  (          _NS|         _MUT|_UPT           ),
  [MVAR_CLEAN]          =  (_HNF|     _NS|         _MUT|_UPT           ),
  [MVAR_DIRTY]          =  (_HNF|     _NS|         _MUT|_UPT           ),
  [ARR_WORDS]           =  (_HNF|     _NS|              _UPT           ),
index ebba405..da42548 100644 (file)
@@ -662,8 +662,6 @@ residencyCensus( void )
                        type = Thunk;
                        break;
 
-                   case CAF_BLACKHOLE:
-                   case EAGER_BLACKHOLE:
                    case BLACKHOLE:
 /*                 case BLACKHOLE_BQ: FIXME: case does not exist */
                        size = sizeW_fromITBL(info);
index 5bdf600..f8bccc0 100644 (file)
@@ -159,6 +159,24 @@ __stg_gc_enter_1
 }
 
 /* -----------------------------------------------------------------------------
+   stg_enter_checkbh is just like stg_enter, except that we also call
+   checkBlockingQueues().  The point of this is that the GC can
+   replace an stg_marked_upd_frame with an stg_enter_checkbh if it
+   finds that the BLACKHOLE has already been updated by another
+   thread.  It would be unsafe to use stg_enter, because there might
+   be an orphaned BLOCKING_QUEUE now.
+   -------------------------------------------------------------------------- */
+
+INFO_TABLE_RET( stg_enter_checkbh, RET_SMALL, P_ unused)
+{
+    R1 = Sp(1);
+    Sp_adj(2);
+    foreign "C" checkBlockingQueues(MyCapability() "ptr",
+                                    CurrentTSO) [R1];
+    ENTER();
+}
+
+/* -----------------------------------------------------------------------------
    Heap checks in Primitive case alternatives
 
    A primitive case alternative is entered with a value either in 
@@ -593,11 +611,7 @@ INFO_TABLE_RET( stg_block_putmvar, RET_SMALL, P_ unused1, P_ unused2 )
 // code fragment executed just before we return to the scheduler
 stg_block_putmvar_finally
 {
-#ifdef THREADED_RTS
     unlockClosure(R3, stg_MVAR_DIRTY_info);
-#else
-    SET_INFO(R3, stg_MVAR_DIRTY_info);
-#endif
     jump StgReturn;
 }
 
@@ -611,24 +625,12 @@ stg_block_putmvar
     BLOCK_BUT_FIRST(stg_block_putmvar_finally);
 }
 
-// code fragment executed just before we return to the scheduler
-stg_block_blackhole_finally
-{
-#if defined(THREADED_RTS)
-    // The last thing we do is release sched_lock, which is
-    // preventing other threads from accessing blackhole_queue and
-    // picking up this thread before we are finished with it.
-    RELEASE_LOCK(sched_mutex "ptr");
-#endif
-    jump StgReturn;
-}
-
 stg_block_blackhole
 {
     Sp_adj(-2);
     Sp(1) = R1;
     Sp(0) = stg_enter_info;
-    BLOCK_BUT_FIRST(stg_block_blackhole_finally);
+    BLOCK_GENERIC;
 }
 
 INFO_TABLE_RET( stg_block_throwto, RET_SMALL, P_ unused, P_ unused )
index 9071912..16a8e24 100644 (file)
@@ -21,6 +21,7 @@
 #include "Disassembler.h"
 #include "Interpreter.h"
 #include "ThreadPaused.h"
+#include "Threads.h"
 
 #include <string.h>     /* for memcpy */
 #ifdef HAVE_ERRNO_H
@@ -443,7 +444,8 @@ do_return:
         // to a PAP by the GC, violating the invariant that PAPs
         // always contain a tagged pointer to the function.
        INTERP_TICK(it_retto_UPDATE);
-       UPD_IND(cap, ((StgUpdateFrame *)Sp)->updatee, tagged_obj); 
+        updateThunk(cap, cap->r.rCurrentTSO, 
+                    ((StgUpdateFrame *)Sp)->updatee, tagged_obj);
        Sp += sizeofW(StgUpdateFrame);
        goto do_return;
 
index 412fd05..799d418 100644 (file)
@@ -140,7 +140,7 @@ processHeapClosureForDead( StgClosure *c )
     case FUN_1_1:
     case FUN_0_2:
     case BLACKHOLE:
-    case CAF_BLACKHOLE:
+    case BLOCKING_QUEUE:
     case IND_PERM:
     case IND_OLDGEN_PERM:
        /*
index 4e8baff..0624081 100644 (file)
@@ -877,7 +877,10 @@ typedef struct _RtsSymbolVal {
       SymI_HasProto(stable_ptr_table)                  \
       SymI_HasProto(stackOverflow)                     \
       SymI_HasProto(stg_CAF_BLACKHOLE_info)            \
+      SymI_HasProto(stg_BLACKHOLE_info)                        \
       SymI_HasProto(__stg_EAGER_BLACKHOLE_info)                \
+      SymI_HasProto(stg_BLOCKING_QUEUE_CLEAN_info)      \
+      SymI_HasProto(stg_BLOCKING_QUEUE_DIRTY_info)     \
       SymI_HasProto(startTimer)                         \
       SymI_HasProto(stg_MVAR_CLEAN_info)               \
       SymI_HasProto(stg_MVAR_DIRTY_info)               \
@@ -941,6 +944,7 @@ typedef struct _RtsSymbolVal {
       SymI_HasProto(stg_sel_8_upd_info)                        \
       SymI_HasProto(stg_sel_9_upd_info)                        \
       SymI_HasProto(stg_upd_frame_info)                        \
+      SymI_HasProto(stg_bh_upd_frame_info)             \
       SymI_HasProto(suspendThread)                     \
       SymI_HasProto(stg_takeMVarzh)                    \
       SymI_HasProto(stg_threadStatuszh)                        \
diff --git a/rts/Messages.c b/rts/Messages.c
new file mode 100644 (file)
index 0000000..2b40f76
--- /dev/null
@@ -0,0 +1,296 @@
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2010
+ *
+ * Inter-Capability message passing
+ *
+ * --------------------------------------------------------------------------*/
+
+#include "Rts.h"
+#include "Messages.h"
+#include "Trace.h"
+#include "Capability.h"
+#include "Schedule.h"
+#include "Threads.h"
+#include "RaiseAsync.h"
+#include "sm/Storage.h"
+
+/* ----------------------------------------------------------------------------
+   Send a message to another Capability
+   ------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
+{
+    ACQUIRE_LOCK(&to_cap->lock);
+
+#ifdef DEBUG    
+    {
+        const StgInfoTable *i = msg->header.info;
+        if (i != &stg_MSG_WAKEUP_info &&
+            i != &stg_MSG_THROWTO_info &&
+            i != &stg_MSG_BLACKHOLE_info &&
+            i != &stg_MSG_TRY_WAKEUP_info &&
+            i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
+            i != &stg_WHITEHOLE_info) {
+            barf("sendMessage: %p", i);
+        }
+    }
+#endif
+
+    msg->link = to_cap->inbox;
+    to_cap->inbox = msg;
+
+    recordClosureMutated(from_cap,(StgClosure*)msg);
+
+    if (to_cap->running_task == NULL) {
+       to_cap->running_task = myTask(); 
+            // precond for releaseCapability_()
+       releaseCapability_(to_cap,rtsFalse);
+    } else {
+        contextSwitchCapability(to_cap);
+    }
+
+    RELEASE_LOCK(&to_cap->lock);
+}
+
+#endif /* THREADED_RTS */
+
+/* ----------------------------------------------------------------------------
+   Handle a message
+   ------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+void
+executeMessage (Capability *cap, Message *m)
+{
+    const StgInfoTable *i;
+
+loop:
+    write_barrier(); // allow m->header to be modified by another thread
+    i = m->header.info;
+    if (i == &stg_MSG_WAKEUP_info)
+    {
+        // the plan is to eventually get rid of these and use
+        // TRY_WAKEUP instead.
+        MessageWakeup *w = (MessageWakeup *)m;
+        StgTSO *tso = w->tso;
+        debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld", 
+                      (lnat)tso->id);
+        ASSERT(tso->cap == cap);
+        ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
+        ASSERT(tso->block_info.closure == (StgClosure *)m);
+        tso->why_blocked = NotBlocked;
+        appendToRunQueue(cap, tso);
+    }
+    else if (i == &stg_MSG_TRY_WAKEUP_info)
+    {
+        StgTSO *tso = ((MessageWakeup *)m)->tso;
+        debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld", 
+                      (lnat)tso->id);
+        tryWakeupThread(cap, tso);
+    }
+    else if (i == &stg_MSG_THROWTO_info)
+    {
+        MessageThrowTo *t = (MessageThrowTo *)m;
+        nat r;
+        const StgInfoTable *i;
+
+        i = lockClosure((StgClosure*)m);
+        if (i != &stg_MSG_THROWTO_info) {
+            unlockClosure((StgClosure*)m, i);
+            goto loop;
+        }
+
+        debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld", 
+                      (lnat)t->source->id, (lnat)t->target->id);
+
+        ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
+        ASSERT(t->source->block_info.closure == (StgClosure *)m);
+
+        r = throwToMsg(cap, t);
+
+        switch (r) {
+        case THROWTO_SUCCESS:
+            ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
+            t->source->sp += 3;
+            unblockOne(cap, t->source);
+            // this message is done
+            unlockClosure((StgClosure*)m, &stg_IND_info);
+            break;
+        case THROWTO_BLOCKED:
+            // unlock the message
+            unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
+            break;
+        }
+    }
+    else if (i == &stg_MSG_BLACKHOLE_info)
+    {
+        nat r;
+        MessageBlackHole *b = (MessageBlackHole*)m;
+
+        r = messageBlackHole(cap, b);
+        if (r == 0) {
+            tryWakeupThread(cap, b->tso);
+        }
+        return;
+    }
+    else if (i == &stg_IND_info)
+    {
+        // message was revoked
+        return;
+    }
+    else if (i == &stg_WHITEHOLE_info)
+    {
+        goto loop;
+    }
+    else
+    {
+        barf("executeMessage: %p", i);
+    }
+}
+
+#endif
+
+/* ----------------------------------------------------------------------------
+   Handle a MSG_BLACKHOLE message
+
+   This is called from two places: either we just entered a BLACKHOLE
+   (stg_BLACKHOLE_info), or we received a MSG_BLACKHOLE in our
+   cap->inbox.  
+
+   We need to establish whether the BLACKHOLE belongs to
+   this Capability, and 
+     - if so, arrange to block the current thread on it
+     - otherwise, forward the message to the right place
+
+   Returns:
+     - 0 if the blocked thread can be woken up by the caller
+     - 1 if the thread is still blocked, and we promise to send a MSG_TRY_WAKEUP
+       at some point in the future.
+
+   ------------------------------------------------------------------------- */
+
+nat messageBlackHole(Capability *cap, MessageBlackHole *msg)
+{
+    const StgInfoTable *info;
+    StgClosure *p;
+    StgBlockingQueue *bq;
+    StgClosure *bh = msg->bh;
+    StgTSO *owner;
+
+    debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on blackhole %p", 
+                  (lnat)msg->tso->id, msg->bh);
+
+    info = bh->header.info;
+
+    // If we got this message in our inbox, it might be that the
+    // BLACKHOLE has already been updated, and GC has shorted out the
+    // indirection, so the pointer no longer points to a BLACKHOLE at
+    // all.
+    if (info != &stg_BLACKHOLE_info && 
+        info != &stg_CAF_BLACKHOLE_info && 
+        info != &stg_WHITEHOLE_info) {
+        // if it is a WHITEHOLE, then a thread is in the process of
+        // trying to BLACKHOLE it.  But we know that it was once a
+        // BLACKHOLE, so there is at least a valid pointer in the
+        // payload, so we can carry on.
+        return 0;
+    }
+
+    // we know at this point that the closure is a BLACKHOLE, so its
+loop:
+    p = ((StgInd*)bh)->indirectee;
+    info = p->header.info;
+
+    if (info == &stg_IND_info)
+    {
+        // This could happen, if e.g. we got a BLOCKING_QUEUE that has
+        // just been replaced with an IND by another thread in
+        // updateThunk().  In which case, if we read the indirectee
+        // again we should get the value.
+        goto loop;
+    }
+
+    else if (info == &stg_TSO_info)
+    {
+        // indirectee is a TSO: no other thread has blocked here yet.
+        owner = deRefTSO((StgTSO *)p);
+
+#ifdef THREADED_RTS
+        if (owner->cap != cap) {
+            // the owner lives on another Capability: forward the
+            // message there rather than touching its data structures.
+            sendMessage(cap, owner->cap, (Message*)msg);
+            debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
+            return 1;
+        }
+#endif
+        // owner is the owner of the BLACKHOLE, and resides on this
+        // Capability.  msg->tso is the first thread to block on this
+        // BLACKHOLE, so we first create a BLOCKING_QUEUE object.
+
+        bq = (StgBlockingQueue*)allocate(cap, sizeofW(StgBlockingQueue));
+            
+        // initialise the BLOCKING_QUEUE object
+        SET_HDR(bq, &stg_BLOCKING_QUEUE_DIRTY_info, CCS_SYSTEM);
+        bq->bh = bh;
+        bq->queue = msg;
+        bq->owner = owner;
+        
+        msg->link = (MessageBlackHole*)END_TSO_QUEUE;
+        
+        // All BLOCKING_QUEUES are linked in a list on owner->bq, so
+        // that we can search through them in the event that there is
+        // a collision to update a BLACKHOLE and a BLOCKING_QUEUE
+        // becomes orphaned (see updateThunk()).
+        bq->link = owner->bq;
+        owner->bq = bq;
+        dirty_TSO(cap, owner); // we modified owner->bq
+        
+        // point to the BLOCKING_QUEUE from the BLACKHOLE
+        write_barrier(); // make the BQ visible
+        ((StgInd*)bh)->indirectee = (StgClosure *)bq;
+        recordClosureMutated(cap,bh); // bh was mutated
+
+        debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d", 
+                      (lnat)msg->tso->id, (lnat)owner->id);
+
+        return 1; // blocked
+    }
+    else if (info == &stg_BLOCKING_QUEUE_CLEAN_info || 
+             info == &stg_BLOCKING_QUEUE_DIRTY_info)
+    {
+        // a BLOCKING_QUEUE already exists: just add msg to its queue.
+        StgBlockingQueue *bq = (StgBlockingQueue *)p;
+
+        ASSERT(bq->bh == bh);
+
+        owner = deRefTSO(bq->owner);
+
+        ASSERT(owner != END_TSO_QUEUE);
+
+#ifdef THREADED_RTS
+        if (owner->cap != cap) {
+            // not our Capability: forward the message to the owner's.
+            sendMessage(cap, owner->cap, (Message*)msg);
+            debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d", owner->cap->no);
+            return 1;
+        }
+#endif
+
+        msg->link = bq->queue;
+        bq->queue = msg;
+        recordClosureMutated(cap,(StgClosure*)msg);
+
+        if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
+            // mark the queue dirty so the GC scavenges the new msg link
+            bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+            recordClosureMutated(cap,bq);
+        }
+
+        debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d", 
+                      (lnat)msg->tso->id, (lnat)owner->id);
+
+        return 1; // blocked
+    }
+    
+    // indirectee is a value: the BLACKHOLE was updated, nothing to do
+    return 0; // not blocked
+}
+
diff --git a/rts/Messages.h b/rts/Messages.h
new file mode 100644 (file)
index 0000000..15c0379
--- /dev/null
@@ -0,0 +1,18 @@
+/* ---------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2010
+ *
+ * Inter-Capability message passing
+ *
+ * --------------------------------------------------------------------------*/
+
+BEGIN_RTS_PRIVATE
+
+nat messageBlackHole(Capability *cap, MessageBlackHole *msg);
+
+#ifdef THREADED_RTS
+void executeMessage (Capability *cap, Message *m);
+void sendMessage    (Capability *from_cap, Capability *to_cap, Message *msg);
+#endif
+
+END_RTS_PRIVATE
index 4b5e106..5c575f6 100644 (file)
@@ -1204,11 +1204,7 @@ stg_takeMVarzh
          StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
       }
 
-#if defined(THREADED_RTS)
       unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-      SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
       RET_P(val);
   } 
   else
@@ -1216,11 +1212,7 @@ stg_takeMVarzh
       /* No further putMVars, MVar is now empty */
       StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
  
-#if defined(THREADED_RTS)
       unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-      SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
 
       RET_P(val);
   }
@@ -1279,21 +1271,13 @@ stg_tryTakeMVarzh
        if (StgMVar_head(mvar) == stg_END_TSO_QUEUE_closure) {
            StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
        }
-#if defined(THREADED_RTS)
         unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-        SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
     }
     else 
     {
        /* No further putMVars, MVar is now empty */
        StgMVar_value(mvar) = stg_END_TSO_QUEUE_closure;
-#if defined(THREADED_RTS)
        unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-       SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
     }
     
     RET_NP(1, val);
@@ -1360,11 +1344,7 @@ stg_putMVarzh
            StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
        }
 
-#if defined(THREADED_RTS)
        unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-        SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
        jump %ENTRY_CODE(Sp(0));
     }
     else
@@ -1372,11 +1352,7 @@ stg_putMVarzh
        /* No further takes, the MVar is now full. */
        StgMVar_value(mvar) = val;
 
-#if defined(THREADED_RTS)
        unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-       SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
        jump %ENTRY_CODE(Sp(0));
     }
     
@@ -1429,22 +1405,14 @@ stg_tryPutMVarzh
            StgMVar_tail(mvar) = stg_END_TSO_QUEUE_closure;
        }
 
-#if defined(THREADED_RTS)
        unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-        SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
     }
     else
     {
        /* No further takes, the MVar is now full. */
        StgMVar_value(mvar) = R2;
 
-#if defined(THREADED_RTS)
        unlockClosure(mvar, stg_MVAR_DIRTY_info);
-#else
-       SET_INFO(mvar,stg_MVAR_DIRTY_info);
-#endif
     }
     
     RET_N(1);
index e981329..6eecfab 100644 (file)
@@ -257,6 +257,12 @@ printClosure( StgClosure *obj )
             debugBelch(")\n"); 
             break;
 
+    case BLACKHOLE:
+            debugBelch("BLACKHOLE("); 
+            printPtr((StgPtr)((StgInd*)obj)->indirectee);
+            debugBelch(")\n"); 
+            break;
+
     /* Cannot happen -- use default case.
     case RET_BCO:
     case RET_SMALL:
@@ -296,14 +302,6 @@ printClosure( StgClosure *obj )
             break;
         }
 
-    case CAF_BLACKHOLE:
-            debugBelch("CAF_BH"); 
-            break;
-
-    case BLACKHOLE:
-            debugBelch("BH\n"); 
-            break;
-
     case ARR_WORDS:
         {
             StgWord i;
@@ -1122,8 +1120,8 @@ char *closure_type_names[] = {
  [UPDATE_FRAME]          = "UPDATE_FRAME",
  [CATCH_FRAME]           = "CATCH_FRAME",
  [STOP_FRAME]            = "STOP_FRAME",
- [CAF_BLACKHOLE]         = "CAF_BLACKHOLE",
  [BLACKHOLE]             = "BLACKHOLE",
+ [BLOCKING_QUEUE]        = "BLOCKING_QUEUE",
  [MVAR_CLEAN]            = "MVAR_CLEAN",
  [MVAR_DIRTY]            = "MVAR_DIRTY",
  [ARR_WORDS]             = "ARR_WORDS",
index e90051c..4a2816c 100644 (file)
@@ -878,8 +878,8 @@ heapCensusChain( Census *census, bdescr *bd )
            case IND_PERM:
            case IND_OLDGEN:
            case IND_OLDGEN_PERM:
-           case CAF_BLACKHOLE:
            case BLACKHOLE:
+           case BLOCKING_QUEUE:
            case FUN_1_0:
            case FUN_0_1:
            case FUN_1_1:
index d02a256..d5a4918 100644 (file)
@@ -18,6 +18,7 @@
 #include "STM.h"
 #include "sm/Sanity.h"
 #include "Profiling.h"
+#include "Messages.h"
 #if defined(mingw32_HOST_OS)
 #include "win32/IOManager.h"
 #endif
@@ -66,13 +67,12 @@ void
 throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception, 
                       rtsBool stop_at_atomically)
 {
+    tso = deRefTSO(tso);
+
     // Thread already dead?
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
        return;
     }
-    while (tso->what_next == ThreadRelocated) {
-       tso = tso->_link;
-    }
 
     // Remove it from any blocking queues
     removeFromQueues(cap,tso);
@@ -83,13 +83,12 @@ throwToSingleThreaded_(Capability *cap, StgTSO *tso, StgClosure *exception,
 void
 suspendComputation(Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
 {
+    tso = deRefTSO(tso);
+
     // Thread already dead?
     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
        return;
     }
-    while (tso->what_next == ThreadRelocated) {
-       tso = tso->_link;
-    }
 
     // Remove it from any blocking queues
     removeFromQueues(cap,tso);
@@ -164,7 +163,7 @@ throwTo (Capability *cap,   // the Capability we hold
     msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
     // message starts locked; the caller has to unlock it when it is
     // ready.
-    msg->header.info = &stg_WHITEHOLE_info;
+    SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
     msg->source      = source;
     msg->target      = target;
     msg->exception   = exception;
@@ -185,14 +184,24 @@ throwToMsg (Capability *cap, MessageThrowTo *msg)
 {
     StgWord status;
     StgTSO *target = msg->target;
+    Capability *target_cap;
 
+    goto check_target;
+
+retry:
+    write_barrier();
+    debugTrace(DEBUG_sched, "throwTo: retrying...");
+
+check_target:
     ASSERT(target != END_TSO_QUEUE);
 
     // follow ThreadRelocated links in the target first
-    while (target->what_next == ThreadRelocated) {
-       target = target->_link;
-       // No, it might be a WHITEHOLE:
-       // ASSERT(get_itbl(target)->type == TSO);
+    target = deRefTSO(target);
+
+    // Thread already dead?
+    if (target->what_next == ThreadComplete 
+       || target->what_next == ThreadKilled) {
+       return THROWTO_SUCCESS;
     }
 
     debugTraceCap(DEBUG_sched, cap,
@@ -204,18 +213,10 @@ throwToMsg (Capability *cap, MessageThrowTo *msg)
     traceThreadStatus(DEBUG_sched, target);
 #endif
 
-    goto check_target;
-retry:
-    write_barrier();
-    debugTrace(DEBUG_sched, "throwTo: retrying...");
-
-check_target:
-    ASSERT(target != END_TSO_QUEUE);
-
-    // Thread already dead?
-    if (target->what_next == ThreadComplete 
-       || target->what_next == ThreadKilled) {
-       return THROWTO_SUCCESS;
+    target_cap = target->cap;
+    if (target->cap != cap) {
+        throwToSendMsg(cap, target_cap, msg);
+        return THROWTO_BLOCKED;
     }
 
     status = target->why_blocked;
@@ -282,28 +283,19 @@ check_target:
            have also seen the write to Q.
        */
     {
-       Capability *target_cap;
-
        write_barrier();
-       target_cap = target->cap;
-       if (target_cap != cap) {
-            throwToSendMsg(cap, target_cap, msg);
-            return THROWTO_BLOCKED;
+        if ((target->flags & TSO_BLOCKEX) == 0) {
+            // It's on our run queue and not blocking exceptions
+            raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+            return THROWTO_SUCCESS;
         } else {
-            if ((target->flags & TSO_BLOCKEX) == 0) {
-                // It's on our run queue and not blocking exceptions
-                raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
-                return THROWTO_SUCCESS;
-            } else {
-                blockedThrowTo(cap,target,msg);
-                return THROWTO_BLOCKED;
-            }
+            blockedThrowTo(cap,target,msg);
+            return THROWTO_BLOCKED;
         }
     }
 
     case BlockedOnMsgThrowTo:
     {
-        Capability *target_cap;
         const StgInfoTable *i;
         MessageThrowTo *m;
 
@@ -340,13 +332,6 @@ check_target:
             goto retry;
         }
 
-        target_cap = target->cap;
-        if (target_cap != cap) {
-            unlockClosure((StgClosure*)m, i);
-            throwToSendMsg(cap, target_cap, msg);
-            return THROWTO_BLOCKED;
-        }
-
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
             unlockClosure((StgClosure*)m, i);
@@ -358,7 +343,6 @@ check_target:
         unlockClosure((StgClosure*)m, &stg_IND_info);
 
         raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
-        unblockOne(cap, target);
         return THROWTO_SUCCESS;
     }
 
@@ -400,48 +384,30 @@ check_target:
 
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-            Capability *target_cap = target->cap;
-            if (target->cap != cap) {
-                throwToSendMsg(cap,target_cap,msg);
-            } else {
-                blockedThrowTo(cap,target,msg);
-            }
+            blockedThrowTo(cap,target,msg);
            unlockClosure((StgClosure *)mvar, info);
            return THROWTO_BLOCKED;
        } else {
            removeThreadFromMVarQueue(cap, mvar, target);
            raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
-           unblockOne(cap, target);
-           unlockClosure((StgClosure *)mvar, info);
+            if (info == &stg_MVAR_CLEAN_info) {
+                dirty_MVAR(&cap->r,(StgClosure*)mvar);
+            }
+           unlockClosure((StgClosure *)mvar, &stg_MVAR_DIRTY_info);
            return THROWTO_SUCCESS;
        }
     }
 
     case BlockedOnBlackHole:
     {
-       ACQUIRE_LOCK(&sched_mutex);
-       // double checking the status after the memory barrier:
-       if (target->why_blocked != BlockedOnBlackHole) {
-           RELEASE_LOCK(&sched_mutex);
-           goto retry;
-       }
-
-       if (target->flags & TSO_BLOCKEX) {
-            Capability *target_cap = target->cap;
-            if (target->cap != cap) {
-                throwToSendMsg(cap,target_cap,msg);
-            } else {
-                blockedThrowTo(cap,target,msg);
-            }
-           RELEASE_LOCK(&sched_mutex);
-           return THROWTO_BLOCKED; // caller releases lock
-       } else {
-           removeThreadFromQueue(cap, &blackhole_queue, target);
-           raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
-           unblockOne(cap, target);
-           RELEASE_LOCK(&sched_mutex);
-           return THROWTO_SUCCESS;
-       }
+        // Revoke the message by replacing it with IND. We're not
+        // locking anything here, so we might still get a TRY_WAKEUP
+        // message from the owner of the blackhole some time in the
+        // future, but that doesn't matter.
+        ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
+        OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
+        raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
+        return THROWTO_SUCCESS;
     }
 
     case BlockedOnSTM:
@@ -454,35 +420,19 @@ check_target:
        }
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
-            Capability *target_cap = target->cap;
-            if (target->cap != cap) {
-                throwToSendMsg(cap,target_cap,msg);
-            } else {
-                blockedThrowTo(cap,target,msg);
-            }
+            blockedThrowTo(cap,target,msg);
            unlockTSO(target);
            return THROWTO_BLOCKED;
        } else {
            raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
-           unblockOne(cap, target);
            unlockTSO(target);
            return THROWTO_SUCCESS;
        }
 
     case BlockedOnCCall:
     case BlockedOnCCall_NoUnblockExc:
-    {
-        Capability *target_cap;
-
-        target_cap = target->cap;
-        if (target_cap != cap) {
-            throwToSendMsg(cap, target_cap, msg);
-            return THROWTO_BLOCKED;
-        }
-
        blockedThrowTo(cap,target,msg);
        return THROWTO_BLOCKED;
-    }
 
 #ifndef THREADEDED_RTS
     case BlockedOnRead:
@@ -515,9 +465,9 @@ throwToSendMsg (Capability *cap STG_UNUSED,
             
 {
 #ifdef THREADED_RTS
-    debugTrace(DEBUG_sched, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
+    debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
 
-    sendMessage(target_cap, (Message*)msg);
+    sendMessage(cap, target_cap, (Message*)msg);
 #endif
 }
 
@@ -532,7 +482,7 @@ blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
 
     ASSERT(target->cap == cap);
 
-    msg->link = (Message*)target->blocked_exceptions;
+    msg->link = target->blocked_exceptions;
     target->blocked_exceptions = msg;
     dirty_TSO(cap,target); // we modified the blocked_exceptions queue
 }
@@ -571,7 +521,7 @@ maybePerformBlockedException (Capability *cap, StgTSO *tso)
 
     if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && 
         (tso->flags & TSO_BLOCKEX) != 0) {
-        debugTrace(DEBUG_sched, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
+        debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
     }
 
     if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
@@ -664,16 +614,18 @@ removeFromQueues(Capability *cap, StgTSO *tso)
 
   case BlockedOnMVar:
       removeThreadFromMVarQueue(cap, (StgMVar *)tso->block_info.closure, tso);
+      // we aren't doing a write barrier here: the MVar is supposed to
+      // be already locked, so replacing the info pointer would unlock it.
       goto done;
 
   case BlockedOnBlackHole:
-      removeThreadFromQueue(cap, &blackhole_queue, tso);
+      // nothing to do
       goto done;
 
   case BlockedOnMsgWakeup:
   {
       // kill the message, atomically:
-      tso->block_info.wakeup->header.info = &stg_IND_info;
+      OVERWRITE_INFO(tso->block_info.wakeup, &stg_IND_info);
       break;
   }
 
@@ -725,7 +677,8 @@ removeFromQueues(Capability *cap, StgTSO *tso)
  * asynchronous exception in an existing thread.
  *
  * We first remove the thread from any queue on which it might be
- * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
+ * blocked.  The possible blockages are MVARs, BLOCKING_QUEUESs, and
+ * TSO blocked_exception queues.
  *
  * We strip the stack down to the innermost CATCH_FRAME, building
  * thunks in the heap for all the active computations, so they can 
@@ -764,8 +717,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
     StgClosure *updatee;
     nat i;
 
-    debugTrace(DEBUG_sched,
-              "raising exception in thread %ld.", (long)tso->id);
+    debugTraceCap(DEBUG_sched, cap,
+                  "raising exception in thread %ld.", (long)tso->id);
     
 #if defined(PROFILING)
     /* 
@@ -784,6 +737,15 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
            tso->what_next != ThreadKilled && 
            tso->what_next != ThreadRelocated);
 
+    // only if we own this TSO (except that deleteThread() calls this for any TSO)
+    ASSERT(tso->cap == cap);
+
+    // wake it up
+    if (tso->why_blocked != NotBlocked && tso->why_blocked != BlockedOnMsgWakeup) {
+        tso->why_blocked = NotBlocked;
+        appendToRunQueue(cap,tso);
+    }        
+
     // mark it dirty; we're about to change its stack.
     dirty_TSO(cap, tso);
 
@@ -871,7 +833,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
                 // Perform the update
                 // TODO: this may waste some work, if the thunk has
                 // already been updated by another thread.
-                UPD_IND(cap, ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
+                updateThunk(cap, tso, 
+                            ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
             }
 
            sp += sizeofW(StgUpdateFrame) - 1;
@@ -963,8 +926,8 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
                {
             StgTRecHeader *trec = tso -> trec;
             StgTRecHeader *outer = trec -> enclosing_trec;
-           debugTrace(DEBUG_stm, 
-                      "found atomically block delivering async exception");
+           debugTraceCap(DEBUG_stm, cap,
+                          "found atomically block delivering async exception");
             stmAbortTransaction(cap, trec);
            stmFreeAbortedTRec(cap, trec);
             tso -> trec = outer;
index ba4d146..b5db15a 100644 (file)
@@ -453,8 +453,6 @@ push( StgClosure *c, retainer c_child_r, StgClosure **first_child )
        // no child, no SRT
     case CONSTR_0_1:
     case CONSTR_0_2:
-    case CAF_BLACKHOLE:
-    case BLACKHOLE:
     case ARR_WORDS:
        *first_child = NULL;
        return;
@@ -470,6 +468,7 @@ push( StgClosure *c, retainer c_child_r, StgClosure **first_child )
     case IND_PERM:
     case IND_OLDGEN_PERM:
     case IND_OLDGEN:
+    case BLACKHOLE:
        *first_child = ((StgInd *)c)->indirectee;
        return;
     case CONSTR_1_0:
@@ -916,8 +915,6 @@ pop( StgClosure **c, StgClosure **cp, retainer *r )
            // no child (fixed), no SRT
        case CONSTR_0_1:
        case CONSTR_0_2:
-       case CAF_BLACKHOLE:
-       case BLACKHOLE:
        case ARR_WORDS:
            // one child (fixed), no SRT
        case MUT_VAR_CLEAN:
@@ -1059,13 +1056,11 @@ isRetainer( StgClosure *c )
     case FUN_0_2:
        // partial applications
     case PAP:
-       // blackholes
-    case CAF_BLACKHOLE:
-    case BLACKHOLE:
        // indirection
     case IND_PERM:
     case IND_OLDGEN_PERM:
     case IND_OLDGEN:
+    case BLACKHOLE:
        // static objects
     case CONSTR_STATIC:
     case FUN_STATIC:
index be61538..f98d201 100644 (file)
--- a/rts/STM.c
+++ b/rts/STM.c
@@ -377,7 +377,7 @@ static void unpark_tso(Capability *cap, StgTSO *tso) {
     lockTSO(tso);
     if (tso -> why_blocked == BlockedOnSTM) {
        TRACE("unpark_tso on tso=%p", tso);
-       unblockOne(cap,tso);
+        tryWakeupThread(cap,tso);
     } else {
        TRACE("spurious unpark_tso on tso=%p", tso);
     }
index f3982b1..72f6d44 100644 (file)
@@ -39,6 +39,7 @@
 #include "Threads.h"
 #include "Timer.h"
 #include "ThreadPaused.h"
+#include "Messages.h"
 
 #ifdef HAVE_SYS_TYPES_H
 #include <sys/types.h>
@@ -66,17 +67,6 @@ StgTSO *blocked_queue_tl = NULL;
 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
 #endif
 
-/* Threads blocked on blackholes.
- * LOCK: sched_mutex+capability, or all capabilities
- */
-StgTSO *blackhole_queue = NULL;
-
-/* The blackhole_queue should be checked for threads to wake up.  See
- * Schedule.h for more thorough comment.
- * LOCK: none (doesn't matter if we miss an update)
- */
-rtsBool blackholes_need_checking = rtsFalse;
-
 /* Set to true when the latest garbage collection failed to reclaim
  * enough space, and the runtime should proceed to shut itself down in
  * an orderly fashion (emitting profiling info etc.)
@@ -140,7 +130,6 @@ static void scheduleYield (Capability **pcap, Task *task, rtsBool);
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
 static void scheduleProcessInbox(Capability *cap);
-static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
 static void schedulePushWork(Capability *cap, Task *task);
 #if defined(THREADED_RTS)
@@ -159,8 +148,6 @@ static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
 static Capability *scheduleDoGC(Capability *cap, Task *task,
                                rtsBool force_major);
 
-static rtsBool checkBlackHoles(Capability *cap);
-
 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
 static StgTSO *threadStackUnderflow(Capability *cap, Task *task, StgTSO *tso);
 
@@ -433,9 +420,6 @@ run_thread:
 
     startHeapProfTimer();
 
-    // Check for exceptions blocked on this thread
-    maybePerformBlockedException (cap, t);
-
     // ----------------------------------------------------------------------
     // Run the current thread 
 
@@ -506,13 +490,6 @@ run_thread:
     // happened.  So find the new location:
     t = cap->r.rCurrentTSO;
 
-    // We have run some Haskell code: there might be blackhole-blocked
-    // threads to wake up now.
-    // Lock-free test here should be ok, we're just setting a flag.
-    if ( blackhole_queue != END_TSO_QUEUE ) {
-       blackholes_need_checking = rtsTrue;
-    }
-    
     // And save the current errno in this thread.
     // XXX: possibly bogus for SMP because this thread might already
     // be running again, see code below.
@@ -612,12 +589,6 @@ scheduleFindWork (Capability *cap)
 {
     scheduleStartSignalHandlers(cap);
 
-    // Only check the black holes here if we've nothing else to do.
-    // During normal execution, the black hole list only gets checked
-    // at GC time, to avoid repeatedly traversing this possibly long
-    // list each time around the scheduler.
-    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
-
     scheduleProcessInbox(cap);
 
     scheduleCheckBlockedThreads(cap);
@@ -674,7 +645,6 @@ scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
         !shouldYieldCapability(cap,task) && 
         (!emptyRunQueue(cap) ||
          !emptyInbox(cap) ||
-         blackholes_need_checking ||
          sched_state >= SCHED_INTERRUPTING))
         return;
 
@@ -863,125 +833,11 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
     //
     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
     {
-       awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
+       awaitEvent (emptyRunQueue(cap));
     }
 #endif
 }
 
-
-/* ----------------------------------------------------------------------------
- * Check for threads woken up by other Capabilities
- * ------------------------------------------------------------------------- */
-
-#if defined(THREADED_RTS)
-static void
-executeMessage (Capability *cap, Message *m)
-{
-    const StgInfoTable *i;
-
-loop:
-    write_barrier(); // allow m->header to be modified by another thread
-    i = m->header.info;
-    if (i == &stg_MSG_WAKEUP_info)
-    {
-        MessageWakeup *w = (MessageWakeup *)m;
-        StgTSO *tso = w->tso;
-        debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld", 
-                      (lnat)tso->id);
-        ASSERT(tso->cap == cap);
-        ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
-        ASSERT(tso->block_info.closure == (StgClosure *)m);
-        tso->why_blocked = NotBlocked;
-        appendToRunQueue(cap, tso);
-    }
-    else if (i == &stg_MSG_THROWTO_info)
-    {
-        MessageThrowTo *t = (MessageThrowTo *)m;
-        nat r;
-        const StgInfoTable *i;
-
-        i = lockClosure((StgClosure*)m);
-        if (i != &stg_MSG_THROWTO_info) {
-            unlockClosure((StgClosure*)m, i);
-            goto loop;
-        }
-
-        debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld", 
-                      (lnat)t->source->id, (lnat)t->target->id);
-
-        ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
-        ASSERT(t->source->block_info.closure == (StgClosure *)m);
-
-        r = throwToMsg(cap, t);
-
-        switch (r) {
-        case THROWTO_SUCCESS:
-            ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
-            t->source->sp += 3;
-            unblockOne(cap, t->source);
-            // this message is done
-            unlockClosure((StgClosure*)m, &stg_IND_info);
-            break;
-        case THROWTO_BLOCKED:
-            // unlock the message
-            unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
-            break;
-        }
-    }
-    else if (i == &stg_IND_info)
-    {
-        // message was revoked
-        return;
-    }
-    else if (i == &stg_WHITEHOLE_info)
-    {
-        goto loop;
-    }
-    else
-    {
-        barf("executeMessage: %p", i);
-    }
-}
-#endif
-
-static void
-scheduleProcessInbox (Capability *cap USED_IF_THREADS)
-{
-#if defined(THREADED_RTS)
-    Message *m;
-
-    while (!emptyInbox(cap)) {
-        ACQUIRE_LOCK(&cap->lock);
-        m = cap->inbox;
-        cap->inbox = m->link;
-        RELEASE_LOCK(&cap->lock);
-        executeMessage(cap, (Message *)m);
-    }
-#endif
-}
-
-/* ----------------------------------------------------------------------------
- * Check for threads blocked on BLACKHOLEs that can be woken up
- * ------------------------------------------------------------------------- */
-static void
-scheduleCheckBlackHoles (Capability *cap)
-{
-    if ( blackholes_need_checking ) // check without the lock first
-    {
-       ACQUIRE_LOCK(&sched_mutex);
-       if ( blackholes_need_checking ) {
-           blackholes_need_checking = rtsFalse;
-            // important that we reset the flag *before* checking the
-            // blackhole queue, otherwise we could get deadlock.  This
-            // happens as follows: we wake up a thread that
-            // immediately runs on another Capability, blocks on a
-            // blackhole, and then we reset the blackholes_need_checking flag.
-           checkBlackHoles(cap);
-       }
-       RELEASE_LOCK(&sched_mutex);
-    }
-}
-
 /* ----------------------------------------------------------------------------
  * Detect deadlock conditions and attempt to resolve them.
  * ------------------------------------------------------------------------- */
@@ -1090,6 +946,26 @@ scheduleSendPendingMessages(void)
 #endif
 
 /* ----------------------------------------------------------------------------
+ * Process message in the current Capability's inbox
+ * ------------------------------------------------------------------------- */
+
+static void
+scheduleProcessInbox (Capability *cap USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+    Message *m;
+
+    while (!emptyInbox(cap)) {
+        ACQUIRE_LOCK(&cap->lock);
+        m = cap->inbox;
+        cap->inbox = m->link;
+        RELEASE_LOCK(&cap->lock);
+        executeMessage(cap, (Message *)m);
+    }
+#endif
+}
+
+/* ----------------------------------------------------------------------------
  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
  * ------------------------------------------------------------------------- */
 
@@ -1499,9 +1375,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
         debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
     }
 
-    // do this while the other Capabilities stop:
-    if (cap) scheduleCheckBlackHoles(cap);
-
     if (gc_type == PENDING_GC_SEQ)
     {
         // single-threaded GC: grab all the capabilities
@@ -1529,11 +1402,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
         waitForGcThreads(cap);
     }
 
-#else /* !THREADED_RTS */
-
-    // do this while the other Capabilities stop:
-    if (cap) scheduleCheckBlackHoles(cap);
-
 #endif
 
     IF_DEBUG(scheduler, printAllThreads());
@@ -2093,8 +1961,6 @@ initScheduler(void)
   sleeping_queue    = END_TSO_QUEUE;
 #endif
 
-  blackhole_queue   = END_TSO_QUEUE;
-
   sched_state    = SCHED_RUNNING;
   recent_activity = ACTIVITY_YES;
 
@@ -2347,8 +2213,9 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
    * of the stack, so we don't attempt to scavenge any part of the
    * dead TSO's stack.
    */
-  tso->what_next = ThreadRelocated;
   setTSOLink(cap,tso,dest);
+  write_barrier(); // other threads seeing ThreadRelocated will look at _link
+  tso->what_next = ThreadRelocated;
   tso->sp = (P_)&(tso->stack[tso->stack_size]);
   tso->why_blocked = NotBlocked;
 
@@ -2405,8 +2272,9 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso)
     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
 
-    tso->what_next = ThreadRelocated;
     tso->_link = new_tso; // no write barrier reqd: same generation
+    write_barrier(); // other threads seeing ThreadRelocated will look at _link
+    tso->what_next = ThreadRelocated;
 
     // The TSO attached to this Task may have moved, so update the
     // pointer to it.
@@ -2458,57 +2326,6 @@ void wakeUpRts(void)
 #endif
 
 /* -----------------------------------------------------------------------------
- * checkBlackHoles()
- *
- * Check the blackhole_queue for threads that can be woken up.  We do
- * this periodically: before every GC, and whenever the run queue is
- * empty.
- *
- * An elegant solution might be to just wake up all the blocked
- * threads with awakenBlockedQueue occasionally: they'll go back to
- * sleep again if the object is still a BLACKHOLE.  Unfortunately this
- * doesn't give us a way to tell whether we've actually managed to
- * wake up any threads, so we would be busy-waiting.
- *
- * -------------------------------------------------------------------------- */
-
-static rtsBool
-checkBlackHoles (Capability *cap)
-{
-    StgTSO **prev, *t;
-    rtsBool any_woke_up = rtsFalse;
-    StgHalfWord type;
-
-    // blackhole_queue is global:
-    ASSERT_LOCK_HELD(&sched_mutex);
-
-    debugTrace(DEBUG_sched, "checking threads blocked on black holes");
-
-    // ASSUMES: sched_mutex
-    prev = &blackhole_queue;
-    t = blackhole_queue;
-    while (t != END_TSO_QUEUE) {
-        if (t->what_next == ThreadRelocated) {
-            t = t->_link;
-            continue;
-        }
-       ASSERT(t->why_blocked == BlockedOnBlackHole);
-       type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
-       if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
-           IF_DEBUG(sanity,checkTSO(t));
-           t = unblockOne(cap, t);
-           *prev = t;
-           any_woke_up = rtsTrue;
-       } else {
-           prev = &t->_link;
-           t = t->_link;
-       }
-    }
-
-    return any_woke_up;
-}
-
-/* -----------------------------------------------------------------------------
    Deleting threads
 
    This is used for interruption (^C) and forking, and corresponds to
@@ -2517,7 +2334,7 @@ checkBlackHoles (Capability *cap)
    -------------------------------------------------------------------------- */
 
 static void 
-deleteThread (Capability *cap, StgTSO *tso)
+deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
 {
     // NOTE: must only be called on a TSO that we have exclusive
     // access to, because we will call throwToSingleThreaded() below.
@@ -2526,7 +2343,7 @@ deleteThread (Capability *cap, StgTSO *tso)
 
     if (tso->why_blocked != BlockedOnCCall &&
        tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
-       throwToSingleThreaded(cap,tso,NULL);
+       throwToSingleThreaded(tso->cap,tso,NULL);
     }
 }
 
@@ -2599,8 +2416,8 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
                SET_HDR(raise_closure, &stg_raise_info, CCCS);
                raise_closure->payload[0] = exception;
            }
-           UPD_IND(cap, ((StgUpdateFrame *)p)->updatee,
-                    (StgClosure *)raise_closure);
+            updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
+                        (StgClosure *)raise_closure);
            p = next;
            continue;
 
index 2412285..0db2b1e 100644 (file)
@@ -86,15 +86,6 @@ extern  StgTSO *blocked_queue_hd, *blocked_queue_tl;
 extern  StgTSO *sleeping_queue;
 #endif
 
-/* Set to rtsTrue if there are threads on the blackhole_queue, and
- * it is possible that one or more of them may be available to run.
- * This flag is set to rtsFalse after we've checked the queue, and
- * set to rtsTrue just before we run some Haskell code.  It is used
- * to decide whether we should yield the Capability or not.
- * Locks required  : none (see scheduleCheckBlackHoles()).
- */
-extern rtsBool blackholes_need_checking;
-
 extern rtsBool heap_overflow;
 
 #if defined(THREADED_RTS)
index f111875..830bde5 100644 (file)
@@ -283,96 +283,105 @@ INFO_TABLE(stg_IND_OLDGEN_PERM,1,0,IND_OLDGEN_PERM,"IND_OLDGEN_PERM","IND_OLDGEN
    waiting for the evaluation of the closure to finish.
    ------------------------------------------------------------------------- */
 
-/* Note: a BLACKHOLE must be big enough to be
- * overwritten with an indirection/evacuee/catch.  Thus we claim it
- * has 1 non-pointer word of payload. 
- */
-INFO_TABLE(stg_BLACKHOLE,0,1,BLACKHOLE,"BLACKHOLE","BLACKHOLE")
+INFO_TABLE(stg_BLACKHOLE,1,0,BLACKHOLE,"BLACKHOLE","BLACKHOLE")
 {
-    TICK_ENT_BH();
-
-#ifdef THREADED_RTS
-    // foreign "C" debugBelch("BLACKHOLE entry\n");
-#endif
-
-    /* Actually this is not necessary because R1 is about to be destroyed. */
-    LDV_ENTER(R1);
+    W_ r, p, info, bq, msg, owner, bd;
 
-#if defined(THREADED_RTS)
-    ACQUIRE_LOCK(sched_mutex "ptr");
-    // released in stg_block_blackhole_finally
-#endif
-
-    /* Put ourselves on the blackhole queue */
-    StgTSO__link(CurrentTSO) = W_[blackhole_queue];
-    W_[blackhole_queue] = CurrentTSO;
-
-    /* jot down why and on what closure we are blocked */
-    StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
-    StgTSO_block_info(CurrentTSO) = R1;
+    TICK_ENT_DYN_IND();        /* tick */
 
-    jump stg_block_blackhole;
+retry:
+    p = StgInd_indirectee(R1);
+    if (GETTAG(p) != 0) {
+        R1 = p;
+        jump %ENTRY_CODE(Sp(0));
+    }
+
+    info = StgHeader_info(p);
+    if (info == stg_IND_info) {
+        // This could happen, if e.g. we got a BLOCKING_QUEUE that has
+        // just been replaced with an IND by another thread in
+        // wakeBlockingQueue().
+        goto retry;
+    }
+
+    if (info == stg_TSO_info ||
+        info == stg_BLOCKING_QUEUE_CLEAN_info ||
+        info == stg_BLOCKING_QUEUE_DIRTY_info)
+    {
+        ("ptr" msg) = foreign "C" allocate(MyCapability() "ptr", 
+                                           BYTES_TO_WDS(SIZEOF_MessageBlackHole)) [R1];
+        
+        StgHeader_info(msg) = stg_MSG_BLACKHOLE_info;
+        MessageBlackHole_tso(msg) = CurrentTSO;
+        MessageBlackHole_bh(msg) = R1;
+               
+        (r) = foreign "C" messageBlackHole(MyCapability() "ptr", msg "ptr") [R1];
+
+        if (r == 0) {
+            goto retry;
+        } else {
+            StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
+            StgTSO_block_info(CurrentTSO) = msg;
+            jump stg_block_blackhole;
+        }
+    }
+    else
+    {
+        R1 = p;
+        ENTER();
+    }
 }
 
-/* identical to BLACKHOLEs except for the infotag */
-INFO_TABLE(stg_CAF_BLACKHOLE,0,1,CAF_BLACKHOLE,"CAF_BLACKHOLE","CAF_BLACKHOLE")
+INFO_TABLE(__stg_EAGER_BLACKHOLE,1,0,BLACKHOLE,"BLACKHOLE","BLACKHOLE")
 {
-    TICK_ENT_BH();
-    LDV_ENTER(R1);
-
-#if defined(THREADED_RTS)
-    // foreign "C" debugBelch("BLACKHOLE entry\n");
-#endif
-
-#if defined(THREADED_RTS)
-    ACQUIRE_LOCK(sched_mutex "ptr");
-    // released in stg_block_blackhole_finally
-#endif
-
-    /* Put ourselves on the blackhole queue */
-    StgTSO__link(CurrentTSO) = W_[blackhole_queue];
-    W_[blackhole_queue] = CurrentTSO;
-
-    /* jot down why and on what closure we are blocked */
-    StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
-    StgTSO_block_info(CurrentTSO) = R1;
-
-    jump stg_block_blackhole;
+    jump ENTRY_LBL(stg_BLACKHOLE);
 }
 
-INFO_TABLE(__stg_EAGER_BLACKHOLE,0,1,BLACKHOLE,"EAGER_BLACKHOLE","EAGER_BLACKHOLE")
+// CAF_BLACKHOLE is allocated when entering a CAF.  The reason it is
+// distinct from BLACKHOLE is so that we can tell the difference
+// between an update frame on the stack that points to a CAF under
+// evaluation, and one that points to a closure that is under
+// evaluation by another thread (a BLACKHOLE).  See threadPaused().
+//
+INFO_TABLE(stg_CAF_BLACKHOLE,1,0,BLACKHOLE,"BLACKHOLE","BLACKHOLE")
 {
-    TICK_ENT_BH();
-
-#ifdef THREADED_RTS
-    // foreign "C" debugBelch("BLACKHOLE entry\n");
-#endif
-
-    /* Actually this is not necessary because R1 is about to be destroyed. */
-    LDV_ENTER(R1);
-
-#if defined(THREADED_RTS)
-    ACQUIRE_LOCK(sched_mutex "ptr");
-    // released in stg_block_blackhole_finally
-#endif
-
-    /* Put ourselves on the blackhole queue */
-    StgTSO__link(CurrentTSO) = W_[blackhole_queue];
-    W_[blackhole_queue] = CurrentTSO;
+    jump ENTRY_LBL(stg_BLACKHOLE);
+}
 
-    /* jot down why and on what closure we are blocked */
-    StgTSO_why_blocked(CurrentTSO) = BlockedOnBlackHole::I16;
-    StgTSO_block_info(CurrentTSO) = R1;
+INFO_TABLE(stg_BLOCKING_QUEUE_CLEAN,4,0,BLOCKING_QUEUE,"BLOCKING_QUEUE","BLOCKING_QUEUE")
+{ foreign "C" barf("BLOCKING_QUEUE_CLEAN object entered!") never returns; }
+    
 
-    jump stg_block_blackhole;
-}
+INFO_TABLE(stg_BLOCKING_QUEUE_DIRTY,4,0,BLOCKING_QUEUE,"BLOCKING_QUEUE","BLOCKING_QUEUE")
+{ foreign "C" barf("BLOCKING_QUEUE_DIRTY object entered!") never returns; }
+    
 
 /* ----------------------------------------------------------------------------
    Whiteholes are used for the "locked" state of a closure (see lockClosure())
    ------------------------------------------------------------------------- */
 
 INFO_TABLE(stg_WHITEHOLE, 0,0, WHITEHOLE, "WHITEHOLE", "WHITEHOLE")
-{ foreign "C" barf("WHITEHOLE object entered!") never returns; }
+{ 
+#if defined(THREADED_RTS)
+    W_ info, i;
+
+    i = 0;
+loop:
+    // spin until the WHITEHOLE is updated
+    info = StgHeader_info(R1);
+    if (info == stg_WHITEHOLE_info) {
+        i = i + 1;
+        if (i == SPIN_COUNT) {
+            i = 0;
+            foreign "C" yieldThread() [R1];
+        }
+        goto loop;
+    }
+    jump %ENTRY_CODE(info);
+#else
+    foreign "C" barf("WHITEHOLE object entered!") never returns;
+#endif
+}
 
 /* ----------------------------------------------------------------------------
    Some static info tables for things that don't get entered, and
@@ -485,9 +494,15 @@ CLOSURE(stg_NO_TREC_closure,stg_NO_TREC);
 INFO_TABLE_CONSTR(stg_MSG_WAKEUP,2,0,0,PRIM,"MSG_WAKEUP","MSG_WAKEUP")
 { foreign "C" barf("MSG_WAKEUP object entered!") never returns; }
 
+INFO_TABLE_CONSTR(stg_MSG_TRY_WAKEUP,2,0,0,PRIM,"MSG_TRY_WAKEUP","MSG_TRY_WAKEUP")
+{ foreign "C" barf("MSG_TRY_WAKEUP object entered!") never returns; }
+
 INFO_TABLE_CONSTR(stg_MSG_THROWTO,4,0,0,PRIM,"MSG_THROWTO","MSG_THROWTO")
 { foreign "C" barf("MSG_THROWTO object entered!") never returns; }
 
+INFO_TABLE_CONSTR(stg_MSG_BLACKHOLE,3,0,0,PRIM,"MSG_BLACKHOLE","MSG_BLACKHOLE")
+{ foreign "C" barf("MSG_BLACKHOLE object entered!") never returns; }
+
 /* ----------------------------------------------------------------------------
    END_TSO_QUEUE
 
index 75712b0..7aee59d 100644 (file)
@@ -14,6 +14,7 @@
 #include "Updates.h"
 #include "RaiseAsync.h"
 #include "Trace.h"
+#include "Threads.h"
 
 #include <string.h> // for memmove()
 
@@ -75,7 +76,7 @@ stackSqueeze(Capability *cap, StgTSO *tso, StgPtr bottom)
                 * screw us up if we don't check.
                 */
                if (upd->updatee != updatee && !closure_IND(upd->updatee)) {
-                   UPD_IND(cap, upd->updatee, updatee);
+                    updateThunk(cap, tso, upd->updatee, updatee);
                }
 
                // now mark this update frame as a stack gap.  The gap
@@ -196,7 +197,7 @@ threadPaused(Capability *cap, StgTSO *tso)
     maybePerformBlockedException (cap, tso);
     if (tso->what_next == ThreadKilled) { return; }
 
-    // NB. Blackholing is *not* optional, we must either do lazy
+    // NB. Blackholing is *compulsory*, we must either do lazy
     // blackholing, or eager blackholing consistently.  See Note
     // [upd-black-hole] in sm/Scav.c.
 
@@ -229,8 +230,9 @@ threadPaused(Capability *cap, StgTSO *tso)
 #ifdef THREADED_RTS
         retry:
 #endif
-           if (closure_flags[INFO_PTR_TO_STRUCT(bh_info)->type] & _IND
-                || bh_info == &stg_BLACKHOLE_info) {
+           if (bh_info == &stg_BLACKHOLE_info ||
+                bh_info == &stg_WHITEHOLE_info)
+            {
                debugTrace(DEBUG_squeeze,
                           "suspending duplicate work: %ld words of stack",
                           (long)((StgPtr)frame - tso->sp));
@@ -245,6 +247,7 @@ threadPaused(Capability *cap, StgTSO *tso)
                // the value to the frame underneath:
                tso->sp = (StgPtr)frame + sizeofW(StgUpdateFrame) - 2;
                tso->sp[1] = (StgWord)bh;
+                ASSERT(bh->header.info != &stg_TSO_info);
                tso->sp[0] = (W_)&stg_enter_info;
 
                // And continue with threadPaused; there might be
@@ -254,33 +257,40 @@ threadPaused(Capability *cap, StgTSO *tso)
                 continue;
            }
 
-           if (bh->header.info != &stg_CAF_BLACKHOLE_info) {
-               // zero out the slop so that the sanity checker can tell
-               // where the next closure is.
-               DEBUG_FILL_SLOP(bh);
-#ifdef PROFILING
-               // @LDV profiling
-               // We pretend that bh is now dead.
-               LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)bh);
-#endif
-                // an EAGER_BLACKHOLE gets turned into a BLACKHOLE here.
+            // zero out the slop so that the sanity checker can tell
+            // where the next closure is.
+            DEBUG_FILL_SLOP(bh);
+
+            // @LDV profiling
+            // We pretend that bh is now dead.
+            LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC((StgClosure *)bh);
+
+            // an EAGER_BLACKHOLE or CAF_BLACKHOLE gets turned into a
+            // BLACKHOLE here.
 #ifdef THREADED_RTS
-                cur_bh_info = (const StgInfoTable *)
-                    cas((StgVolatilePtr)&bh->header.info, 
-                        (StgWord)bh_info, 
-                        (StgWord)&stg_BLACKHOLE_info);
-
-                if (cur_bh_info != bh_info) {
-                    bh_info = cur_bh_info;
-                    goto retry;
-                }
-#else
-               SET_INFO(bh,&stg_BLACKHOLE_info);
+            // first we turn it into a WHITEHOLE to claim it, and if
+            // successful we write our TSO and then the BLACKHOLE info pointer.
+            cur_bh_info = (const StgInfoTable *)
+                cas((StgVolatilePtr)&bh->header.info, 
+                    (StgWord)bh_info, 
+                    (StgWord)&stg_WHITEHOLE_info);
+            
+            if (cur_bh_info != bh_info) {
+                bh_info = cur_bh_info;
+                goto retry;
+            }
 #endif
 
-               // We pretend that bh has just been created.
-               LDV_RECORD_CREATE(bh);
-           }
+            // The payload of the BLACKHOLE points to the TSO
+            ((StgInd *)bh)->indirectee = (StgClosure *)tso;
+            write_barrier();
+            SET_INFO(bh,&stg_BLACKHOLE_info);
+
+            // .. and we need a write barrier, since we just mutated the closure:
+            recordClosureMutated(cap,bh);
+
+            // We pretend that bh has just been created.
+            LDV_RECORD_CREATE(bh);
            
            frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
            if (prev_was_update_frame) {
index f824d02..0c3e591 100644 (file)
@@ -9,11 +9,16 @@
 #include "PosixSource.h"
 #include "Rts.h"
 
+#include "Capability.h"
+#include "Updates.h"
 #include "Threads.h"
 #include "STM.h"
 #include "Schedule.h"
 #include "Trace.h"
 #include "ThreadLabels.h"
+#include "Updates.h"
+#include "Messages.h"
+#include "sm/Storage.h"
 
 /* Next thread ID to allocate.
  * LOCK: sched_mutex
@@ -74,7 +79,9 @@ createThread(Capability *cap, nat size)
     tso->what_next = ThreadRunGHC;
 
     tso->why_blocked  = NotBlocked;
+    tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
     tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
+    tso->bq = (StgBlockingQueue *)END_TSO_QUEUE;
     tso->flags = 0;
     tso->dirty = 1;
     
@@ -146,7 +153,7 @@ rts_getThreadId(StgPtr tso)
    Fails fatally if the TSO is not on the queue.
    -------------------------------------------------------------------------- */
 
-void
+rtsBool // returns True if we modified queue
 removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
 {
     StgTSO *t, *prev;
@@ -156,28 +163,32 @@ removeThreadFromQueue (Capability *cap, StgTSO **queue, StgTSO *tso)
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
+                return rtsFalse;
            } else {
                *queue = t->_link;
+                return rtsTrue;
            }
-           return;
        }
     }
     barf("removeThreadFromQueue: not found");
 }
 
-void
+rtsBool // returns True if we modified head or tail
 removeThreadFromDeQueue (Capability *cap, 
                          StgTSO **head, StgTSO **tail, StgTSO *tso)
 {
     StgTSO *t, *prev;
+    rtsBool flag = rtsFalse;
 
     prev = NULL;
     for (t = *head; t != END_TSO_QUEUE; prev = t, t = t->_link) {
        if (t == tso) {
            if (prev) {
                setTSOLink(cap,prev,t->_link);
+                flag = rtsFalse;
            } else {
                *head = t->_link;
+                flag = rtsTrue;
            }
            if (*tail == tso) {
                if (prev) {
@@ -185,8 +196,10 @@ removeThreadFromDeQueue (Capability *cap,
                } else {
                    *tail = END_TSO_QUEUE;
                }
-           }
-           return;
+                return rtsTrue;
+           } else {
+                return flag;
+            }
        }
     }
     barf("removeThreadFromMVarQueue: not found");
@@ -195,7 +208,10 @@ removeThreadFromDeQueue (Capability *cap,
 void
 removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso)
 {
+    // caller must do the write barrier, because replacing the info
+    // pointer will unlock the MVar.
     removeThreadFromDeQueue (cap, &mvar->head, &mvar->tail, tso);
+    tso->_link = END_TSO_QUEUE;
 }
 
 /* ----------------------------------------------------------------------------
@@ -263,6 +279,38 @@ unblockOne_ (Capability *cap, StgTSO *tso,
   return next;
 }
 
+void
+tryWakeupThread (Capability *cap, StgTSO *tso)
+{
+#ifdef THREADED_RTS
+    if (tso->cap != cap)
+    {
+        MessageWakeup *msg;
+        msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
+        SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
+        msg->tso = tso;
+        sendMessage(cap, tso->cap, (Message*)msg);
+        return;
+    }
+#endif
+
+    switch (tso->why_blocked)
+    {
+    case BlockedOnBlackHole:
+    case BlockedOnSTM:
+    {
+        // just run the thread now, if the BH is not really available,
+        // we'll block again.
+        tso->why_blocked = NotBlocked;
+        appendToRunQueue(cap,tso);
+        break;
+    }
+    default:
+        // otherwise, do nothing
+        break;
+    }
+}
+
 /* ----------------------------------------------------------------------------
    awakenBlockedQueue
 
@@ -270,13 +318,160 @@ unblockOne_ (Capability *cap, StgTSO *tso,
    ------------------------------------------------------------------------- */
 
 void
-awakenBlockedQueue(Capability *cap, StgTSO *tso)
+wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
 {
-    while (tso != END_TSO_QUEUE) {
-       tso = unblockOne(cap,tso);
+    MessageBlackHole *msg;
+    const StgInfoTable *i;
+
+    ASSERT(bq->header.info == &stg_BLOCKING_QUEUE_DIRTY_info  ||
+           bq->header.info == &stg_BLOCKING_QUEUE_CLEAN_info  );
+
+    for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE; 
+         msg = msg->link) {
+        i = msg->header.info;
+        if (i != &stg_IND_info) {
+            ASSERT(i == &stg_MSG_BLACKHOLE_info);
+            tryWakeupThread(cap,msg->tso);
+        }
+    }
+
+    // overwrite the BQ with an indirection so it will be
+    // collected at the next GC.
+#if defined(DEBUG) && !defined(THREADED_RTS)
+    // XXX FILL_SLOP, but not if THREADED_RTS because in that case
+    // another thread might be looking at this BLOCKING_QUEUE and
+    // checking the owner field at the same time.
+    bq->bh = 0; bq->queue = 0; bq->owner = 0;
+#endif
+    OVERWRITE_INFO(bq, &stg_IND_info);
+}
+
+// If we update a closure that we know we BLACKHOLE'd, and the closure
+// no longer points to the current TSO as its owner, then there may be
+// an orphaned BLOCKING_QUEUE closure with blocked threads attached to
+// it.  We therefore traverse the BLOCKING_QUEUEs attached to the
+// current TSO to see if any can now be woken up.
+void
+checkBlockingQueues (Capability *cap, StgTSO *tso)
+{
+    StgBlockingQueue *bq, *next;
+    StgClosure *p;
+
+    debugTraceCap(DEBUG_sched, cap,
+                  "collision occurred; checking blocking queues for thread %ld",
+                  (lnat)tso->id);
+    
+    for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
+        next = bq->link;
+
+        if (bq->header.info == &stg_IND_info) {
+            // ToDo: could short it out right here, to avoid
+            // traversing this IND multiple times.
+            continue;
+        }
+        
+        p = bq->bh;
+
+        if (p->header.info != &stg_BLACKHOLE_info ||
+            ((StgInd *)p)->indirectee != (StgClosure*)bq)
+        {
+            wakeBlockingQueue(cap,bq);
+        }   
     }
 }
 
+/* ----------------------------------------------------------------------------
+   updateThunk
+
+   Update a thunk with a value.  In order to do this, we need to know
+   which TSO owns (or is evaluating) the thunk, in case we need to
+   awaken any threads that are blocked on it.
+   ------------------------------------------------------------------------- */
+
+void
+updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
+{
+    StgClosure *v;
+    StgTSO *owner;
+    const StgInfoTable *i;
+
+    i = thunk->header.info;
+    if (i != &stg_BLACKHOLE_info &&
+        i != &stg_CAF_BLACKHOLE_info &&
+        i != &stg_WHITEHOLE_info) {
+        updateWithIndirection(cap, thunk, val);
+        return;
+    }
+    
+    v = ((StgInd*)thunk)->indirectee;
+
+    updateWithIndirection(cap, thunk, val);
+
+    i = v->header.info;
+    if (i == &stg_TSO_info) {
+        owner = deRefTSO((StgTSO*)v);
+        if (owner != tso) {
+            checkBlockingQueues(cap, tso);
+        }
+        return;
+    }
+
+    if (i != &stg_BLOCKING_QUEUE_CLEAN_info &&
+        i != &stg_BLOCKING_QUEUE_DIRTY_info) {
+        checkBlockingQueues(cap, tso);
+        return;
+    }
+
+    owner = deRefTSO(((StgBlockingQueue*)v)->owner);
+
+    if (owner != tso) {
+        checkBlockingQueues(cap, tso);
+    } else {
+        wakeBlockingQueue(cap, (StgBlockingQueue*)v);
+    }
+}
+
+/* ----------------------------------------------------------------------------
+ * Wake up a thread on a Capability.
+ *
+ * This is used when the current Task is running on a Capability and
+ * wishes to wake up a thread on a different Capability.
+ * ------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
+
+void
+wakeupThreadOnCapability (Capability *cap,
+                          Capability *other_cap, 
+                          StgTSO *tso)
+{
+    MessageWakeup *msg;
+
+    // ASSUMES: cap->lock is held (asserted in wakeupThreadOnCapability)
+    if (tso->bound) {
+       ASSERT(tso->bound->task->cap == tso->cap);
+       tso->bound->task->cap = other_cap;
+    }
+    tso->cap = other_cap;
+
+    ASSERT(tso->why_blocked != BlockedOnMsgWakeup || 
+           tso->block_info.closure->header.info == &stg_IND_info);
+
+    ASSERT(tso->block_info.closure->header.info != &stg_MSG_WAKEUP_info);
+
+    msg = (MessageWakeup*) allocate(cap, sizeofW(MessageWakeup));
+    SET_HDR(msg, &stg_MSG_WAKEUP_info, CCS_SYSTEM);
+    msg->tso = tso;
+    tso->block_info.closure = (StgClosure *)msg;
+    dirty_TSO(cap, tso);
+    write_barrier();
+    tso->why_blocked = BlockedOnMsgWakeup;
+
+    sendMessage(cap, other_cap, (Message*)msg);
+}
+
+#endif /* THREADED_RTS */
+
 /* ---------------------------------------------------------------------------
  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
  * used by Control.Concurrent for error checking.
@@ -332,7 +527,8 @@ printThreadBlockage(StgTSO *tso)
     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
     break;
   case BlockedOnBlackHole:
-    debugBelch("is blocked on a black hole");
+      debugBelch("is blocked on a black hole %p", 
+                 ((StgBlockingQueue*)tso->block_info.bh->bh));
     break;
   case BlockedOnMsgWakeup:
     debugBelch("is blocked on a wakeup message");
index dfe879e..000cf1b 100644 (file)
@@ -16,11 +16,26 @@ BEGIN_RTS_PRIVATE
 StgTSO * unblockOne (Capability *cap, StgTSO *tso);
 StgTSO * unblockOne_ (Capability *cap, StgTSO *tso, rtsBool allow_migrate);
 
-void awakenBlockedQueue (Capability *cap, StgTSO *tso);
+void checkBlockingQueues (Capability *cap, StgTSO *tso);
+void wakeBlockingQueue   (Capability *cap, StgBlockingQueue *bq);
+void tryWakeupThread     (Capability *cap, StgTSO *tso);
+
+// Wakes up a thread on a Capability (probably a different Capability
+// from the one held by the current Task).
+//
+#ifdef THREADED_RTS
+void wakeupThreadOnCapability (Capability *cap,
+                               Capability *other_cap, 
+                               StgTSO *tso);
+#endif
+
+void updateThunk         (Capability *cap, StgTSO *tso,
+                          StgClosure *thunk, StgClosure *val);
 
 void removeThreadFromMVarQueue (Capability *cap, StgMVar *mvar, StgTSO *tso);
-void removeThreadFromQueue     (Capability *cap, StgTSO **queue, StgTSO *tso);
-void removeThreadFromDeQueue   (Capability *cap, StgTSO **head, StgTSO **tail, StgTSO *tso);
+
+rtsBool removeThreadFromQueue     (Capability *cap, StgTSO **queue, StgTSO *tso);
+rtsBool removeThreadFromDeQueue   (Capability *cap, StgTSO **head, StgTSO **tail, StgTSO *tso);
 
 StgBool isThreadBound (StgTSO* tso);
 
index a5d42fb..dddc754 100644 (file)
@@ -76,8 +76,6 @@ handle_tick(int unused STG_UNUSED)
               ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTime /
                   RtsFlags.MiscFlags.tickInterval;
               recent_activity = ACTIVITY_INACTIVE;
-              blackholes_need_checking = rtsTrue;
-              /* hack: re-use the blackholes_need_checking flag */
               wakeUpRts();
           }
       }
index e0fd7c3..7af5965 100644 (file)
 
 #include "Updates.h"
 
-/* on entry to the update code
-   (1) R1 points to the closure being returned
-   (2) Sp points to the update frame
-*/
+#if defined(PROFILING)
+#define UPD_FRAME_PARAMS W_ unused1, W_ unused2, P_ unused3
+#else
+#define UPD_FRAME_PARAMS P_ unused1
+#endif
 
 /* The update fragment has been tuned so as to generate good
    code with gcc, which accounts for some of the strangeness in the
    code), since we don't mind duplicating this jump.
 */
 
-#define UPD_FRAME_ENTRY_TEMPLATE                                       \
-       {                                                               \
-          W_ updatee;                                                  \
-                                                                       \
-          updatee = StgUpdateFrame_updatee(Sp);                                \
-                                                                       \
-         /* remove the update frame from the stack */                  \
-         Sp = Sp + SIZEOF_StgUpdateFrame;                              \
-                                                                       \
-         /* ToDo: it might be a PAP, so we should check... */          \
-         TICK_UPD_CON_IN_NEW(sizeW_fromITBL(%GET_STD_INFO(updatee)));  \
-                                                                       \
-          updateWithIndirection(stg_IND_direct_info,                    \
-                               updatee,                                \
-                               R1,                                     \
-                               jump %ENTRY_CODE(Sp(0)));               \
-       }
-
-#if defined(PROFILING)
-#define UPD_FRAME_PARAMS W_ unused1, W_ unused2, P_ unused3
-#else
-#define UPD_FRAME_PARAMS P_ unused1
-#endif
-
-/* this bitmap indicates that the first word of an update frame is a
- * non-pointer - this is the update frame link.  (for profiling,
- * there's a cost-centre-stack in there too).
- */
+/* on entry to the update code
+   (1) R1 points to the closure being returned
+   (2) Sp points to the update frame
+*/
 
 INFO_TABLE_RET( stg_upd_frame, UPDATE_FRAME, UPD_FRAME_PARAMS)
-UPD_FRAME_ENTRY_TEMPLATE
+{
+    W_ updatee;
+    
+    updatee = StgUpdateFrame_updatee(Sp);
+    
+    /* remove the update frame from the stack */
+    Sp = Sp + SIZEOF_StgUpdateFrame;
+    
+    /* ToDo: it might be a PAP, so we should check... */
+    TICK_UPD_CON_IN_NEW(sizeW_fromITBL(%GET_STD_INFO(updatee)));
+    
+    updateWithIndirection(updatee,
+                          R1,
+                          jump %ENTRY_CODE(Sp(0)));
+}
 
 
 INFO_TABLE_RET( stg_marked_upd_frame, UPDATE_FRAME, UPD_FRAME_PARAMS)
-UPD_FRAME_ENTRY_TEMPLATE
+{
+    W_ updatee, v, i, tso, link;
+
+    // we know the closure is a BLACKHOLE
+    updatee = StgUpdateFrame_updatee(Sp);
+    v = StgInd_indirectee(updatee);
+
+    // remove the update frame from the stack
+    Sp = Sp + SIZEOF_StgUpdateFrame;
+    
+    if (GETTAG(v) != 0) {
+        // updated by someone else: discard our value and use the
+        // other one to increase sharing, but check the blocking
+        // queues to see if any threads were waiting on this BLACKHOLE.
+        R1 = v;
+        foreign "C" checkBlockingQueues(MyCapability() "ptr",
+                                        CurrentTSO "ptr") [R1];
+        jump %ENTRY_CODE(Sp(0));
+    }
+
+    // common case: it is still our BLACKHOLE
+    if (v == CurrentTSO) {
+        updateWithIndirection(updatee,
+                              R1,
+                              jump %ENTRY_CODE(Sp(0)));
+    }
+
+    // The other cases are all handled by the generic code
+    foreign "C" updateThunk (MyCapability() "ptr", CurrentTSO "ptr", 
+                             updatee "ptr", R1 "ptr") [R1];
+
+    jump %ENTRY_CODE(Sp(0));
+}
+
+// Special update frame code for CAFs and eager-blackholed thunks: it
+// knows how to update blackholes, but is distinct from
+// stg_marked_upd_frame so that lazy blackholing won't treat it as the
+// high watermark.
+INFO_TABLE_RET (stg_bh_upd_frame, UPDATE_FRAME, UPD_FRAME_PARAMS)
+{
+    jump stg_marked_upd_frame_info;
+}
index 2b3c35d..de9276c 100644 (file)
@@ -48,8 +48,7 @@ BEGIN_RTS_PRIVATE
   W_ sz;                                                               \
   W_ i;                                                                        \
   inf = %GET_STD_INFO(p);                                              \
-  if (%INFO_TYPE(inf) != HALF_W_(BLACKHOLE)                            \
-       && %INFO_TYPE(inf) != HALF_W_(CAF_BLACKHOLE)) {                 \
+  if (%INFO_TYPE(inf) != HALF_W_(BLACKHOLE)) {                         \
       if (%INFO_TYPE(inf) == HALF_W_(THUNK_SELECTOR)) {                        \
          sz = BYTES_TO_WDS(SIZEOF_StgSelector_NoThunkHdr);             \
      } else {                                                          \
@@ -82,7 +81,6 @@ FILL_SLOP(StgClosure *p)
 
     switch (inf->type) {
     case BLACKHOLE:
-    case CAF_BLACKHOLE:
        goto no_slop;
        // we already filled in the slop when we overwrote the thunk
        // with BLACKHOLE, and also an evacuated BLACKHOLE is only the
@@ -127,23 +125,21 @@ no_slop:
  */
 #ifdef CMINUSMINUS
 
-#define updateWithIndirection(ind_info, p1, p2, and_then)      \
+#define updateWithIndirection(p1, p2, and_then)        \
     W_ bd;                                                     \
                                                                \
     DEBUG_FILL_SLOP(p1);                                       \
     LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC(p1);                     \
     StgInd_indirectee(p1) = p2;                                        \
     prim %write_barrier() [];                                  \
+    SET_INFO(p1, stg_BLACKHOLE_info);                           \
+    LDV_RECORD_CREATE(p1);                                      \
     bd = Bdescr(p1);                                           \
     if (bdescr_gen_no(bd) != 0 :: bits16) {                    \
       recordMutableCap(p1, TO_W_(bdescr_gen_no(bd)), R1);      \
-      SET_INFO(p1, stg_IND_OLDGEN_info);                       \
-      LDV_RECORD_CREATE(p1);                                   \
       TICK_UPD_OLD_IND();                                      \
       and_then;                                                        \
     } else {                                                   \
-      SET_INFO(p1, ind_info);                                  \
-      LDV_RECORD_CREATE(p1);                                   \
       TICK_UPD_NEW_IND();                                      \
       and_then;                                                        \
   }
@@ -151,7 +147,6 @@ no_slop:
 #else /* !CMINUSMINUS */
 
 INLINE_HEADER void updateWithIndirection (Capability *cap, 
-                                          const StgInfoTable *ind_info, 
                                           StgClosure *p1, 
                                           StgClosure *p2)
 {
@@ -164,25 +159,19 @@ INLINE_HEADER void updateWithIndirection (Capability *cap,
     LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC(p1);
     ((StgInd *)p1)->indirectee = p2;
     write_barrier();
+    SET_INFO(p1, &stg_BLACKHOLE_info);
+    LDV_RECORD_CREATE(p1);
     bd = Bdescr((StgPtr)p1);
     if (bd->gen_no != 0) {
         recordMutableCap(p1, cap, bd->gen_no);
-        SET_INFO(p1, &stg_IND_OLDGEN_info);
         TICK_UPD_OLD_IND();
     } else {
-        SET_INFO(p1, ind_info);
-        LDV_RECORD_CREATE(p1);
         TICK_UPD_NEW_IND();
     }
 }
 
 #endif /* CMINUSMINUS */
 
-#define UPD_IND(cap, updclosure, heapptr)        \
-    updateWithIndirection(cap, &stg_IND_info,    \
-                          updclosure,            \
-                          heapptr)
-
 #ifndef CMINUSMINUS
 END_RTS_PRIVATE
 #endif
index 79c7fbf..608345b 100644 (file)
@@ -131,8 +131,9 @@ my_mmap (void *addr, lnat size)
            (errno == EINVAL && sizeof(void*)==4 && size >= 0xc0000000)) {
            // If we request more than 3Gig, then we get EINVAL
            // instead of ENOMEM (at least on Linux).
-           errorBelch("out of memory (requested %lu bytes)", size);
-           stg_exit(EXIT_FAILURE);
+           barf("out of memory (requested %lu bytes)", size);
+//            abort();
+//         stg_exit(EXIT_FAILURE);
        } else {
            barf("getMBlock: mmap: %s", strerror(errno));
        }
index 39284f9..6de42ef 100644 (file)
@@ -628,8 +628,8 @@ thread_obj (StgInfoTable *info, StgPtr p)
     case IND_PERM:
     case MUT_VAR_CLEAN:
     case MUT_VAR_DIRTY:
-    case CAF_BLACKHOLE:
     case BLACKHOLE:
+    case BLOCKING_QUEUE:
     {
        StgPtr end;
        
@@ -967,9 +967,6 @@ compact(StgClosure *static_objects)
     // any threads resurrected during this GC
     thread((void *)&resurrected_threads);
 
-    // the blackhole queue
-    thread((void *)&blackhole_queue);
-
     // the task list
     {
        Task *task;
index d5c9b8a..37cbee5 100644 (file)
@@ -139,7 +139,6 @@ copy_tag_nolock(StgClosure **p, const StgInfoTable *info,
     nat i;
 
     to = alloc_for_copy(size,gen);
-    *p = TAG_CLOSURE(tag,(StgClosure*)to);
 
     from = (StgPtr)src;
     to[0] = (W_)info;
@@ -150,6 +149,7 @@ copy_tag_nolock(StgClosure **p, const StgInfoTable *info,
     // if somebody else reads the forwarding pointer, we better make
     // sure there's a closure at the end of it.
     write_barrier();
+    *p = TAG_CLOSURE(tag,(StgClosure*)to);
     src->header.info = (const StgInfoTable *)MK_FORWARDING_PTR(to);
 
 //  if (to+size+2 < bd->start + BLOCK_SIZE_W) {
@@ -166,7 +166,7 @@ copy_tag_nolock(StgClosure **p, const StgInfoTable *info,
 
 /* Special version of copy() for when we only want to copy the info
  * pointer of an object, but reserve some padding after it.  This is
- * used to optimise evacuation of BLACKHOLEs.
+ * used to optimise evacuation of TSOs.
  */
 static rtsBool
 copyPart(StgClosure **p, StgClosure *src, nat size_to_reserve, 
@@ -195,7 +195,6 @@ spin:
 #endif
 
     to = alloc_for_copy(size_to_reserve, gen);
-    *p = (StgClosure *)to;
 
     from = (StgPtr)src;
     to[0] = info;
@@ -203,10 +202,9 @@ spin:
        to[i] = from[i];
     }
     
-#if defined(PARALLEL_GC)
     write_barrier();
-#endif
     src->header.info = (const StgInfoTable*)MK_FORWARDING_PTR(to);
+    *p = (StgClosure *)to;
     
 #ifdef PROFILING
     // We store the size of the just evacuated object in the LDV word so that
@@ -366,7 +364,7 @@ loop:
   tag = GET_CLOSURE_TAG(q);
   q = UNTAG_CLOSURE(q);
 
-  ASSERT(LOOKS_LIKE_CLOSURE_PTR(q));
+  ASSERTM(LOOKS_LIKE_CLOSURE_PTR(q), "invalid closure, info=%p", q->header.info);
 
   if (!HEAP_ALLOCED_GC(q)) {
 
@@ -629,21 +627,42 @@ loop:
       copy_tag_nolock(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen,tag);
       return;
 
+  case BLACKHOLE:
+  {
+      StgClosure *r;
+      const StgInfoTable *i;
+      r = ((StgInd*)q)->indirectee;
+      if (GET_CLOSURE_TAG(r) == 0) {
+          i = r->header.info;
+          if (IS_FORWARDING_PTR(i)) {
+              r = (StgClosure *)UN_FORWARDING_PTR(i);
+              i = r->header.info;
+          }
+          if (i == &stg_TSO_info
+              || i == &stg_WHITEHOLE_info 
+              || i == &stg_BLOCKING_QUEUE_CLEAN_info
+              || i == &stg_BLOCKING_QUEUE_DIRTY_info) {
+              copy(p,info,q,sizeofW(StgInd),gen);
+              return;
+          }
+          ASSERT(i != &stg_IND_info);
+      }
+      q = r;
+      *p = r;
+      goto loop;
+  }
+
+  case BLOCKING_QUEUE:
   case WEAK:
   case PRIM:
   case MUT_PRIM:
-      copy_tag(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen,tag);
+      copy(p,info,q,sizeW_fromITBL(INFO_PTR_TO_STRUCT(info)),gen);
       return;
 
   case BCO:
       copy(p,info,q,bco_sizeW((StgBCO *)q),gen);
       return;
 
-  case CAF_BLACKHOLE:
-  case BLACKHOLE:
-      copyPart(p,q,BLACKHOLE_sizeW(),sizeofW(StgHeader),gen);
-      return;
-
   case THUNK_SELECTOR:
       eval_thunk_selector(p, (StgSelector *)q, rtsTrue);
       return;
@@ -756,11 +775,7 @@ unchain_thunk_selectors(StgSelector *p, StgClosure *val)
     prev = NULL;
     while (p)
     {
-#ifdef THREADED_RTS
         ASSERT(p->header.info == &stg_WHITEHOLE_info);
-#else
-        ASSERT(p->header.info == &stg_BLACKHOLE_info);
-#endif
         // val must be in to-space.  Not always: when we recursively
         // invoke eval_thunk_selector(), the recursive calls will not 
         // evacuate the value (because we want to select on the value,
@@ -783,7 +798,13 @@ unchain_thunk_selectors(StgSelector *p, StgClosure *val)
             // indirection pointing to itself, and we want the program
             // to deadlock if it ever enters this closure, so
             // BLACKHOLE is correct.
-            SET_INFO(p, &stg_BLACKHOLE_info);
+
+            // XXX we do not have BLACKHOLEs any more; replace with
+            // a THUNK_SELECTOR again.  This will go into a loop if it is
+            // entered, and should result in a NonTermination exception.
+            ((StgThunk *)p)->payload[0] = val;
+            write_barrier();
+            SET_INFO(p, &stg_sel_0_upd_info);
         } else {
             ((StgInd *)p)->indirectee = val;
             write_barrier();
@@ -813,7 +834,7 @@ eval_thunk_selector (StgClosure **q, StgSelector * p, rtsBool evac)
     prev_thunk_selector = NULL;
     // this is a chain of THUNK_SELECTORs that we are going to update
     // to point to the value of the current THUNK_SELECTOR.  Each
-    // closure on the chain is a BLACKHOLE, and points to the next in the
+    // closure on the chain is a WHITEHOLE, and points to the next in the
     // chain with payload[0].
 
 selector_chain:
@@ -851,7 +872,7 @@ selector_chain:
     }
 
 
-    // BLACKHOLE the selector thunk, since it is now under evaluation.
+    // WHITEHOLE the selector thunk, since it is now under evaluation.
     // This is important to stop us going into an infinite loop if
     // this selector thunk eventually refers to itself.
 #if defined(THREADED_RTS)
@@ -884,7 +905,7 @@ selector_chain:
 #else
     // Save the real info pointer (NOTE: not the same as get_itbl()).
     info_ptr = (StgWord)p->header.info;
-    SET_INFO(p,&stg_BLACKHOLE_info);
+    SET_INFO(p,&stg_WHITEHOLE_info);
 #endif
 
     field = INFO_PTR_TO_STRUCT(info_ptr)->layout.selector_offset;
@@ -936,11 +957,7 @@ selector_loop:
               // the original selector thunk, p.
               SET_INFO(p, (StgInfoTable *)info_ptr);
               LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC((StgClosure *)p);
-#if defined(THREADED_RTS)
               SET_INFO(p, &stg_WHITEHOLE_info);
-#else
-              SET_INFO(p, &stg_BLACKHOLE_info);
-#endif
 #endif
 
               // the closure in val is now the "value" of the
@@ -998,6 +1015,33 @@ selector_loop:
           selectee = UNTAG_CLOSURE( ((StgInd *)selectee)->indirectee );
          goto selector_loop;
 
+      case BLACKHOLE:
+      {
+          StgClosure *r;
+          const StgInfoTable *i;
+          r = ((StgInd*)selectee)->indirectee;
+
+          // establish whether this BH has been updated, and is now an
+          // indirection, as in evacuate().
+          if (GET_CLOSURE_TAG(r) == 0) {
+              i = r->header.info;
+              if (IS_FORWARDING_PTR(i)) {
+                  r = (StgClosure *)UN_FORWARDING_PTR(i);
+                  i = r->header.info;
+              }
+              if (i == &stg_TSO_info
+                  || i == &stg_WHITEHOLE_info 
+                  || i == &stg_BLOCKING_QUEUE_CLEAN_info
+                  || i == &stg_BLOCKING_QUEUE_DIRTY_info) {
+                  goto bale_out;
+              }
+              ASSERT(i != &stg_IND_info);
+          }
+
+          selectee = UNTAG_CLOSURE( ((StgInd *)selectee)->indirectee );
+          goto selector_loop;
+      }
+
       case THUNK_SELECTOR:
       {
          StgClosure *val;
@@ -1032,8 +1076,6 @@ selector_loop:
       case THUNK_1_1:
       case THUNK_0_2:
       case THUNK_STATIC:
-      case CAF_BLACKHOLE:
-      case BLACKHOLE:
          // not evaluated yet 
          goto bale_out;
     
index ae6fc99..4d63724 100644 (file)
@@ -395,13 +395,6 @@ SET_GCT(gc_threads[0]);
       // The other threads are now stopped.  We might recurse back to
       // here, but from now on this is the only thread.
       
-      // if any blackholes are alive, make the threads that wait on
-      // them alive too.
-      if (traverseBlackholeQueue()) {
-         inc_running(); 
-         continue;
-      }
-  
       // must be last...  invariant is that everything is fully
       // scavenged at this point.
       if (traverseWeakPtrList()) { // returns rtsTrue if evaced something 
index 0ac807f..e65c176 100644 (file)
@@ -210,21 +210,6 @@ traverseWeakPtrList(void)
           }
       }
         
-      /* Finally, we can update the blackhole_queue.  This queue
-       * simply strings together TSOs blocked on black holes, it is
-       * not intended to keep anything alive.  Hence, we do not follow
-       * pointers on the blackhole_queue until now, when we have
-       * determined which TSOs are otherwise reachable.  We know at
-       * this point that all TSOs have been evacuated, however.
-       */
-      { 
-         StgTSO **pt;
-         for (pt = &blackhole_queue; *pt != END_TSO_QUEUE; pt = &((*pt)->_link)) {
-             *pt = (StgTSO *)isAlive((StgClosure *)*pt);
-             ASSERT(*pt != NULL);
-         }
-      }
-      
       weak_stage = WeakDone;  // *now* we're done,
       return rtsTrue;         // but one more round of scavenging, please
   }
@@ -310,49 +295,6 @@ static rtsBool tidyThreadList (generation *gen)
 }
 
 /* -----------------------------------------------------------------------------
-   The blackhole queue
-   
-   Threads on this list behave like weak pointers during the normal
-   phase of garbage collection: if the blackhole is reachable, then
-   the thread is reachable too.
-   -------------------------------------------------------------------------- */
-rtsBool
-traverseBlackholeQueue (void)
-{
-    StgTSO *prev, *t, *tmp;
-    rtsBool flag;
-    nat type;
-
-    flag = rtsFalse;
-    prev = NULL;
-
-    for (t = blackhole_queue; t != END_TSO_QUEUE; prev=t, t = t->_link) {
-        // if the thread is not yet alive...
-       if (! (tmp = (StgTSO *)isAlive((StgClosure*)t))) {
-            // if the closure it is blocked on is either (a) a
-            // reachable BLAKCHOLE or (b) not a BLACKHOLE, then we
-            // make the thread alive.
-           if (!isAlive(t->block_info.closure)) {
-                type = get_itbl(t->block_info.closure)->type;
-                if (type == BLACKHOLE || type == CAF_BLACKHOLE) {
-                    continue;
-                }
-            }
-            evacuate((StgClosure **)&t);
-            if (prev) {
-                prev->_link = t;
-            } else {
-                blackhole_queue = t;
-            }
-                 // no write barrier when on the blackhole queue,
-                 // because we traverse the whole queue on every GC.
-            flag = rtsTrue;
-       }
-    }
-    return flag;
-}
-
-/* -----------------------------------------------------------------------------
    Evacuate every weak pointer object on the weak_ptr_list, and update
    the link fields.
 
index 018dd6c..5c05ab2 100644 (file)
@@ -23,7 +23,6 @@ extern StgTSO *exception_threads;
 void    initWeakForGC          ( void );
 rtsBool traverseWeakPtrList    ( void );
 void    markWeakPtrList        ( void );
-rtsBool traverseBlackholeQueue ( void );
 
 END_RTS_PRIVATE
 
index 11d5424..1423077 100644 (file)
@@ -306,7 +306,6 @@ checkClosure( StgClosure* p )
     case IND_OLDGEN:
     case IND_OLDGEN_PERM:
     case BLACKHOLE:
-    case CAF_BLACKHOLE:
     case PRIM:
     case MUT_PRIM:
     case MUT_VAR_CLEAN:
@@ -323,6 +322,23 @@ checkClosure( StgClosure* p )
            return sizeW_fromITBL(info);
        }
 
+    case BLOCKING_QUEUE:
+    {
+        StgBlockingQueue *bq = (StgBlockingQueue *)p;
+
+        // NO: the BH might have been updated now
+        // ASSERT(get_itbl(bq->bh)->type == BLACKHOLE);
+        ASSERT(LOOKS_LIKE_CLOSURE_PTR(bq->bh));
+
+        ASSERT(get_itbl(bq->owner)->type == TSO);
+        ASSERT(bq->queue == END_TSO_QUEUE || get_itbl(bq->queue)->type == TSO);
+        ASSERT(bq->link == (StgBlockingQueue*)END_TSO_QUEUE || 
+               get_itbl(bq->link)->type == IND ||
+               get_itbl(bq->link)->type == BLOCKING_QUEUE);
+
+        return sizeofW(StgBlockingQueue);
+    }
+
     case BCO: {
        StgBCO *bco = (StgBCO *)p;
        ASSERT(LOOKS_LIKE_CLOSURE_PTR(bco->instrs));
@@ -516,6 +532,11 @@ checkTSO(StgTSO *tso)
       return;
     }
 
+    ASSERT(tso->_link == END_TSO_QUEUE || get_itbl(tso->_link)->type == TSO);
+    ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso->block_info.closure));
+    ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso->bq));
+    ASSERT(LOOKS_LIKE_CLOSURE_PTR(tso->blocked_exceptions));
+
     ASSERT(stack <= sp && sp < stack_end);
 
     checkStackChunk(sp, stack_end);
@@ -539,9 +560,7 @@ checkGlobalTSOList (rtsBool checkTSOs)
           if (checkTSOs)
               checkTSO(tso);
 
-          while (tso->what_next == ThreadRelocated) {
-              tso = tso->_link;
-          }
+          tso = deRefTSO(tso);
 
           // If this TSO is dirty and in an old generation, it better
           // be on the mutable list.
index 1b671a0..75c186c 100644 (file)
@@ -46,17 +46,6 @@ static void scavenge_large_bitmap (StgPtr p,
    Scavenge a TSO.
    -------------------------------------------------------------------------- */
 
-STATIC_INLINE void
-scavenge_TSO_link (StgTSO *tso)
-{
-    // We don't always chase the link field: TSOs on the blackhole
-    // queue are not automatically alive, so the link field is a
-    // "weak" pointer in that case.
-    if (tso->why_blocked != BlockedOnBlackHole) {
-        evacuate((StgClosure **)&tso->_link);
-    }
-}
-
 static void
 scavengeTSO (StgTSO *tso)
 {
@@ -87,7 +76,18 @@ scavengeTSO (StgTSO *tso)
        ) {
        evacuate(&tso->block_info.closure);
     }
+#ifdef THREADED_RTS
+    // in the THREADED_RTS, block_info.closure must always point to a
+    // valid closure, because we assume this in throwTo().  In the
+    // non-threaded RTS it might be a FD (for
+    // BlockedOnRead/BlockedOnWrite) or a time value (BlockedOnDelay)
+    else {
+        tso->block_info.closure = (StgClosure *)END_TSO_QUEUE;
+    }
+#endif
+
     evacuate((StgClosure **)&tso->blocked_exceptions);
+    evacuate((StgClosure **)&tso->bq);
     
     // scavange current transaction record
     evacuate((StgClosure **)&tso->trec);
@@ -97,10 +97,10 @@ scavengeTSO (StgTSO *tso)
 
     if (gct->failed_to_evac) {
         tso->dirty = 1;
-        scavenge_TSO_link(tso);
+        evacuate((StgClosure **)&tso->_link);
     } else {
         tso->dirty = 0;
-        scavenge_TSO_link(tso);
+        evacuate((StgClosure **)&tso->_link);
         if (gct->failed_to_evac) {
             tso->flags |= TSO_LINK_DIRTY;
         } else {
@@ -570,6 +570,7 @@ scavenge_block (bdescr *bd)
       }
        // fall through 
     case IND_OLDGEN_PERM:
+    case BLACKHOLE:
        evacuate(&((StgInd *)p)->indirectee);
        p += sizeofW(StgInd);
        break;
@@ -588,10 +589,25 @@ scavenge_block (bdescr *bd)
        p += sizeofW(StgMutVar);
        break;
 
-    case CAF_BLACKHOLE:
-    case BLACKHOLE:
-       p += BLACKHOLE_sizeW();
-       break;
+    case BLOCKING_QUEUE:
+    {
+        StgBlockingQueue *bq = (StgBlockingQueue *)p;
+        
+       gct->eager_promotion = rtsFalse;
+        evacuate(&bq->bh);
+        evacuate((StgClosure**)&bq->owner);
+        evacuate((StgClosure**)&bq->queue);
+        evacuate((StgClosure**)&bq->link);
+       gct->eager_promotion = saved_eager_promotion;
+
+       if (gct->failed_to_evac) {
+           bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+       } else {
+           bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info;
+       }
+        p += sizeofW(StgBlockingQueue);
+        break;
+    }
 
     case THUNK_SELECTOR:
     { 
@@ -884,6 +900,7 @@ scavenge_mark_stack(void)
 
        case IND_OLDGEN:
        case IND_OLDGEN_PERM:
+        case BLACKHOLE:
            evacuate(&((StgInd *)p)->indirectee);
            break;
 
@@ -901,8 +918,25 @@ scavenge_mark_stack(void)
            break;
        }
 
-       case CAF_BLACKHOLE:
-       case BLACKHOLE:
+        case BLOCKING_QUEUE:
+        {
+            StgBlockingQueue *bq = (StgBlockingQueue *)p;
+            
+            gct->eager_promotion = rtsFalse;
+            evacuate(&bq->bh);
+            evacuate((StgClosure**)&bq->owner);
+            evacuate((StgClosure**)&bq->queue);
+            evacuate((StgClosure**)&bq->link);
+            gct->eager_promotion = saved_eager_promotion;
+            
+            if (gct->failed_to_evac) {
+                bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+            } else {
+                bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info;
+            }
+            break;
+        }
+
        case ARR_WORDS:
            break;
 
@@ -1122,10 +1156,25 @@ scavenge_one(StgPtr p)
        break;
     }
 
-    case CAF_BLACKHOLE:
-    case BLACKHOLE:
-       break;
-       
+    case BLOCKING_QUEUE:
+    {
+        StgBlockingQueue *bq = (StgBlockingQueue *)p;
+        
+        gct->eager_promotion = rtsFalse;
+        evacuate(&bq->bh);
+        evacuate((StgClosure**)&bq->owner);
+        evacuate((StgClosure**)&bq->queue);
+        evacuate((StgClosure**)&bq->link);
+        gct->eager_promotion = saved_eager_promotion;
+        
+        if (gct->failed_to_evac) {
+            bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+        } else {
+            bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info;
+        }
+        break;
+    }
+
     case THUNK_SELECTOR:
     { 
        StgSelector *s = (StgSelector *)p;
@@ -1239,6 +1288,7 @@ scavenge_one(StgPtr p)
         // on the large-object list and then gets updated.  See #3424.
     case IND_OLDGEN:
     case IND_OLDGEN_PERM:
+    case BLACKHOLE:
     case IND_STATIC:
        evacuate(&((StgInd *)p)->indirectee);
 
@@ -1300,7 +1350,7 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
 #ifdef DEBUG       
            switch (get_itbl((StgClosure *)p)->type) {
            case MUT_VAR_CLEAN:
-               barf("MUT_VAR_CLEAN on mutable list");
+                // can happen due to concurrent writeMutVars
            case MUT_VAR_DIRTY:
                mutlist_MUTVARS++; break;
            case MUT_ARR_PTRS_CLEAN:
@@ -1356,7 +1406,7 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
                     // this assertion would be invalid:
                     // ASSERT(tso->flags & TSO_LINK_DIRTY);
 
-                    scavenge_TSO_link(tso);
+                    evacuate((StgClosure **)&tso->_link);
                     if (gct->failed_to_evac) {
                         recordMutableGen_GC((StgClosure *)p,gen->no);
                         gct->failed_to_evac = rtsFalse;
@@ -1576,10 +1626,12 @@ scavenge_stack(StgPtr p, StgPtr stack_end)
        // before GC, but that seems like overkill.
        //
        // Scavenging this update frame as normal would be disastrous;
-       // the updatee would end up pointing to the value.  So we turn
-       // the indirection into an IND_PERM, so that evacuate will
-       // copy the indirection into the old generation instead of
-       // discarding it.
+       // the updatee would end up pointing to the value.  So we
+       // check whether the value after evacuation is a BLACKHOLE,
+       // and if not, we change the update frame to an stg_enter
+       // frame that simply returns the value.  Hence, blackholing is
+        // compulsory (otherwise we would have to check for thunks
+        // too).
         //
         // Note [upd-black-hole]
         // One slight hiccup is that the THUNK_SELECTOR machinery can
@@ -1590,22 +1642,17 @@ scavenge_stack(StgPtr p, StgPtr stack_end)
         // the updatee is never a THUNK_SELECTOR and we're ok.
         // NB. this is a new invariant: blackholing is not optional.
     {
-        nat type;
-        const StgInfoTable *i;
-        StgClosure *updatee;
-
-        updatee = ((StgUpdateFrame *)p)->updatee;
-        i = updatee->header.info;
-        if (!IS_FORWARDING_PTR(i)) {
-            type = get_itbl(updatee)->type;
-            if (type == IND) {
-                updatee->header.info = &stg_IND_PERM_info;
-            } else if (type == IND_OLDGEN) {
-                updatee->header.info = &stg_IND_OLDGEN_PERM_info;
-            }            
+        StgUpdateFrame *frame = (StgUpdateFrame *)p;
+        StgClosure *v;
+
+        evacuate(&frame->updatee);
+        v = frame->updatee;
+        if (GET_CLOSURE_TAG(v) != 0 ||
+            (get_itbl(v)->type != BLACKHOLE)) {
+            // blackholing is compulsory, see above.
+            frame->header.info = (const StgInfoTable*)&stg_enter_checkbh_info;
         }
-        evacuate(&((StgUpdateFrame *)p)->updatee);
-        ASSERT(GET_CLOSURE_TAG(((StgUpdateFrame *)p)->updatee) == 0);
+        ASSERT(v->header.info != &stg_TSO_info);
         p += sizeofW(StgUpdateFrame);
         continue;
     }
index 6aedb96..0234400 100644 (file)
@@ -107,7 +107,7 @@ initStorage( void )
    * doing something reasonable.
    */
   /* We use the NOT_NULL variant or gcc warns that the test is always true */
-  ASSERT(LOOKS_LIKE_INFO_PTR_NOT_NULL((StgWord)&stg_BLACKHOLE_info));
+  ASSERT(LOOKS_LIKE_INFO_PTR_NOT_NULL((StgWord)&stg_BLOCKING_QUEUE_CLEAN_info));
   ASSERT(LOOKS_LIKE_CLOSURE_PTR(&stg_dummy_ret_closure));
   ASSERT(!HEAP_ALLOCED(&stg_dummy_ret_closure));
   
@@ -229,13 +229,13 @@ freeStorage (void)
 
    The entry code for every CAF does the following:
      
-      - builds a CAF_BLACKHOLE in the heap
-      - pushes an update frame pointing to the CAF_BLACKHOLE
+      - builds a BLACKHOLE in the heap
+      - pushes an update frame pointing to the BLACKHOLE
       - invokes UPD_CAF(), which:
           - calls newCaf, below
-         - updates the CAF with a static indirection to the CAF_BLACKHOLE
+         - updates the CAF with a static indirection to the BLACKHOLE
       
-   Why do we build a BLACKHOLE in the heap rather than just updating
+   Why do we build a BLACKHOLE in the heap rather than just updating
    the thunk directly?  It's so that we only need one kind of update
    frame - otherwise we'd need a static version of the update frame too.
 
@@ -699,11 +699,9 @@ void
 dirty_MUT_VAR(StgRegTable *reg, StgClosure *p)
 {
     Capability *cap = regTableToCapability(reg);
-    bdescr *bd;
     if (p->header.info == &stg_MUT_VAR_CLEAN_info) {
        p->header.info = &stg_MUT_VAR_DIRTY_info;
-       bd = Bdescr((StgPtr)p);
-       if (bd->gen_no > 0) recordMutableCap(p,cap,bd->gen_no);
+        recordClosureMutated(cap,p);
     }
 }
 
@@ -716,11 +714,9 @@ dirty_MUT_VAR(StgRegTable *reg, StgClosure *p)
 void
 setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target)
 {
-    bdescr *bd;
     if (tso->dirty == 0 && (tso->flags & TSO_LINK_DIRTY) == 0) {
         tso->flags |= TSO_LINK_DIRTY;
-       bd = Bdescr((StgPtr)tso);
-       if (bd->gen_no > 0) recordMutableCap((StgClosure*)tso,cap,bd->gen_no);
+        recordClosureMutated(cap,(StgClosure*)tso);
     }
     tso->_link = target;
 }
@@ -728,10 +724,8 @@ setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target)
 void
 dirty_TSO (Capability *cap, StgTSO *tso)
 {
-    bdescr *bd;
     if (tso->dirty == 0 && (tso->flags & TSO_LINK_DIRTY) == 0) {
-       bd = Bdescr((StgPtr)tso);
-       if (bd->gen_no > 0) recordMutableCap((StgClosure*)tso,cap,bd->gen_no);
+        recordClosureMutated(cap,(StgClosure*)tso);
     }
     tso->dirty = 1;
 }
@@ -747,10 +741,7 @@ dirty_TSO (Capability *cap, StgTSO *tso)
 void
 dirty_MVAR(StgRegTable *reg, StgClosure *p)
 {
-    Capability *cap = regTableToCapability(reg);
-    bdescr *bd;
-    bd = Bdescr((StgPtr)p);
-    if (bd->gen_no > 0) recordMutableCap(p,cap,bd->gen_no);
+    recordClosureMutated(regTableToCapability(reg),p);
 }
 
 /* -----------------------------------------------------------------------------
index 765bfb3..16d3394 100644 (file)
@@ -564,8 +564,8 @@ genApply regstatus args =
 --    else:
        text "case AP,",
        text "     AP_STACK,",
-       text "     CAF_BLACKHOLE,",
        text "     BLACKHOLE,",
+       text "     WHITEHOLE,",
         text "     THUNK,",
         text "     THUNK_1_0,",
         text "     THUNK_0_1,",