New implementation of BLACKHOLEs
[ghc-hetmet.git] / rts / Schedule.c
index f3982b1..72f6d44 100644 (file)
@@ -39,6 +39,7 @@
 #include "Threads.h"
 #include "Timer.h"
 #include "ThreadPaused.h"
+#include "Messages.h"
 
 #ifdef HAVE_SYS_TYPES_H
 #include <sys/types.h>
@@ -66,17 +67,6 @@ StgTSO *blocked_queue_tl = NULL;
 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
 #endif
 
-/* Threads blocked on blackholes.
- * LOCK: sched_mutex+capability, or all capabilities
- */
-StgTSO *blackhole_queue = NULL;
-
-/* The blackhole_queue should be checked for threads to wake up.  See
- * Schedule.h for more thorough comment.
- * LOCK: none (doesn't matter if we miss an update)
- */
-rtsBool blackholes_need_checking = rtsFalse;
-
 /* Set to true when the latest garbage collection failed to reclaim
  * enough space, and the runtime should proceed to shut itself down in
  * an orderly fashion (emitting profiling info etc.)
@@ -140,7 +130,6 @@ static void scheduleYield (Capability **pcap, Task *task, rtsBool);
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
 static void scheduleProcessInbox(Capability *cap);
-static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
 static void schedulePushWork(Capability *cap, Task *task);
 #if defined(THREADED_RTS)
@@ -159,8 +148,6 @@ static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
 static Capability *scheduleDoGC(Capability *cap, Task *task,
                                rtsBool force_major);
 
-static rtsBool checkBlackHoles(Capability *cap);
-
 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
 static StgTSO *threadStackUnderflow(Capability *cap, Task *task, StgTSO *tso);
 
@@ -433,9 +420,6 @@ run_thread:
 
     startHeapProfTimer();
 
-    // Check for exceptions blocked on this thread
-    maybePerformBlockedException (cap, t);
-
     // ----------------------------------------------------------------------
     // Run the current thread 
 
@@ -506,13 +490,6 @@ run_thread:
     // happened.  So find the new location:
     t = cap->r.rCurrentTSO;
 
-    // We have run some Haskell code: there might be blackhole-blocked
-    // threads to wake up now.
-    // Lock-free test here should be ok, we're just setting a flag.
-    if ( blackhole_queue != END_TSO_QUEUE ) {
-       blackholes_need_checking = rtsTrue;
-    }
-    
     // And save the current errno in this thread.
     // XXX: possibly bogus for SMP because this thread might already
     // be running again, see code below.
@@ -612,12 +589,6 @@ scheduleFindWork (Capability *cap)
 {
     scheduleStartSignalHandlers(cap);
 
-    // Only check the black holes here if we've nothing else to do.
-    // During normal execution, the black hole list only gets checked
-    // at GC time, to avoid repeatedly traversing this possibly long
-    // list each time around the scheduler.
-    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
-
     scheduleProcessInbox(cap);
 
     scheduleCheckBlockedThreads(cap);
@@ -674,7 +645,6 @@ scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
         !shouldYieldCapability(cap,task) && 
         (!emptyRunQueue(cap) ||
          !emptyInbox(cap) ||
-         blackholes_need_checking ||
          sched_state >= SCHED_INTERRUPTING))
         return;
 
@@ -863,125 +833,11 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
     //
     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
     {
-       awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
+       awaitEvent (emptyRunQueue(cap));
     }
 #endif
 }
 
-
-/* ----------------------------------------------------------------------------
- * Check for threads woken up by other Capabilities
- * ------------------------------------------------------------------------- */
-
-#if defined(THREADED_RTS)
-static void
-executeMessage (Capability *cap, Message *m)
-{
-    const StgInfoTable *i;
-
-loop:
-    write_barrier(); // allow m->header to be modified by another thread
-    i = m->header.info;
-    if (i == &stg_MSG_WAKEUP_info)
-    {
-        MessageWakeup *w = (MessageWakeup *)m;
-        StgTSO *tso = w->tso;
-        debugTraceCap(DEBUG_sched, cap, "message: wakeup thread %ld", 
-                      (lnat)tso->id);
-        ASSERT(tso->cap == cap);
-        ASSERT(tso->why_blocked == BlockedOnMsgWakeup);
-        ASSERT(tso->block_info.closure == (StgClosure *)m);
-        tso->why_blocked = NotBlocked;
-        appendToRunQueue(cap, tso);
-    }
-    else if (i == &stg_MSG_THROWTO_info)
-    {
-        MessageThrowTo *t = (MessageThrowTo *)m;
-        nat r;
-        const StgInfoTable *i;
-
-        i = lockClosure((StgClosure*)m);
-        if (i != &stg_MSG_THROWTO_info) {
-            unlockClosure((StgClosure*)m, i);
-            goto loop;
-        }
-
-        debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld", 
-                      (lnat)t->source->id, (lnat)t->target->id);
-
-        ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
-        ASSERT(t->source->block_info.closure == (StgClosure *)m);
-
-        r = throwToMsg(cap, t);
-
-        switch (r) {
-        case THROWTO_SUCCESS:
-            ASSERT(t->source->sp[0] == (StgWord)&stg_block_throwto_info);
-            t->source->sp += 3;
-            unblockOne(cap, t->source);
-            // this message is done
-            unlockClosure((StgClosure*)m, &stg_IND_info);
-            break;
-        case THROWTO_BLOCKED:
-            // unlock the message
-            unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
-            break;
-        }
-    }
-    else if (i == &stg_IND_info)
-    {
-        // message was revoked
-        return;
-    }
-    else if (i == &stg_WHITEHOLE_info)
-    {
-        goto loop;
-    }
-    else
-    {
-        barf("executeMessage: %p", i);
-    }
-}
-#endif
-
-static void
-scheduleProcessInbox (Capability *cap USED_IF_THREADS)
-{
-#if defined(THREADED_RTS)
-    Message *m;
-
-    while (!emptyInbox(cap)) {
-        ACQUIRE_LOCK(&cap->lock);
-        m = cap->inbox;
-        cap->inbox = m->link;
-        RELEASE_LOCK(&cap->lock);
-        executeMessage(cap, (Message *)m);
-    }
-#endif
-}
-
-/* ----------------------------------------------------------------------------
- * Check for threads blocked on BLACKHOLEs that can be woken up
- * ------------------------------------------------------------------------- */
-static void
-scheduleCheckBlackHoles (Capability *cap)
-{
-    if ( blackholes_need_checking ) // check without the lock first
-    {
-       ACQUIRE_LOCK(&sched_mutex);
-       if ( blackholes_need_checking ) {
-           blackholes_need_checking = rtsFalse;
-            // important that we reset the flag *before* checking the
-            // blackhole queue, otherwise we could get deadlock.  This
-            // happens as follows: we wake up a thread that
-            // immediately runs on another Capability, blocks on a
-            // blackhole, and then we reset the blackholes_need_checking flag.
-           checkBlackHoles(cap);
-       }
-       RELEASE_LOCK(&sched_mutex);
-    }
-}
-
 /* ----------------------------------------------------------------------------
  * Detect deadlock conditions and attempt to resolve them.
  * ------------------------------------------------------------------------- */
@@ -1090,6 +946,26 @@ scheduleSendPendingMessages(void)
 #endif
 
 /* ----------------------------------------------------------------------------
+ * Process message in the current Capability's inbox
+ * ------------------------------------------------------------------------- */
+
+static void
+scheduleProcessInbox (Capability *cap USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+    Message *m;
+
+    while (!emptyInbox(cap)) {
+        ACQUIRE_LOCK(&cap->lock);
+        m = cap->inbox;
+        cap->inbox = m->link;
+        RELEASE_LOCK(&cap->lock);
+        executeMessage(cap, (Message *)m);
+    }
+#endif
+}
+
+/* ----------------------------------------------------------------------------
  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
  * ------------------------------------------------------------------------- */
 
@@ -1499,9 +1375,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
         debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
     }
 
-    // do this while the other Capabilities stop:
-    if (cap) scheduleCheckBlackHoles(cap);
-
     if (gc_type == PENDING_GC_SEQ)
     {
         // single-threaded GC: grab all the capabilities
@@ -1529,11 +1402,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
         waitForGcThreads(cap);
     }
 
-#else /* !THREADED_RTS */
-
-    // do this while the other Capabilities stop:
-    if (cap) scheduleCheckBlackHoles(cap);
-
 #endif
 
     IF_DEBUG(scheduler, printAllThreads());
@@ -2093,8 +1961,6 @@ initScheduler(void)
   sleeping_queue    = END_TSO_QUEUE;
 #endif
 
-  blackhole_queue   = END_TSO_QUEUE;
-
   sched_state    = SCHED_RUNNING;
   recent_activity = ACTIVITY_YES;
 
@@ -2347,8 +2213,9 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
    * of the stack, so we don't attempt to scavenge any part of the
    * dead TSO's stack.
    */
-  tso->what_next = ThreadRelocated;
   setTSOLink(cap,tso,dest);
+  write_barrier(); // other threads seeing ThreadRelocated will look at _link
+  tso->what_next = ThreadRelocated;
   tso->sp = (P_)&(tso->stack[tso->stack_size]);
   tso->why_blocked = NotBlocked;
 
@@ -2405,8 +2272,9 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso)
     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
 
-    tso->what_next = ThreadRelocated;
     tso->_link = new_tso; // no write barrier reqd: same generation
+    write_barrier(); // other threads seeing ThreadRelocated will look at _link
+    tso->what_next = ThreadRelocated;
 
     // The TSO attached to this Task may have moved, so update the
     // pointer to it.
@@ -2458,57 +2326,6 @@ void wakeUpRts(void)
 #endif
 
 /* -----------------------------------------------------------------------------
- * checkBlackHoles()
- *
- * Check the blackhole_queue for threads that can be woken up.  We do
- * this periodically: before every GC, and whenever the run queue is
- * empty.
- *
- * An elegant solution might be to just wake up all the blocked
- * threads with awakenBlockedQueue occasionally: they'll go back to
- * sleep again if the object is still a BLACKHOLE.  Unfortunately this
- * doesn't give us a way to tell whether we've actually managed to
- * wake up any threads, so we would be busy-waiting.
- *
- * -------------------------------------------------------------------------- */
-
-static rtsBool
-checkBlackHoles (Capability *cap)
-{
-    StgTSO **prev, *t;
-    rtsBool any_woke_up = rtsFalse;
-    StgHalfWord type;
-
-    // blackhole_queue is global:
-    ASSERT_LOCK_HELD(&sched_mutex);
-
-    debugTrace(DEBUG_sched, "checking threads blocked on black holes");
-
-    // ASSUMES: sched_mutex
-    prev = &blackhole_queue;
-    t = blackhole_queue;
-    while (t != END_TSO_QUEUE) {
-        if (t->what_next == ThreadRelocated) {
-            t = t->_link;
-            continue;
-        }
-       ASSERT(t->why_blocked == BlockedOnBlackHole);
-       type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
-       if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
-           IF_DEBUG(sanity,checkTSO(t));
-           t = unblockOne(cap, t);
-           *prev = t;
-           any_woke_up = rtsTrue;
-       } else {
-           prev = &t->_link;
-           t = t->_link;
-       }
-    }
-
-    return any_woke_up;
-}
-
-/* -----------------------------------------------------------------------------
    Deleting threads
 
    This is used for interruption (^C) and forking, and corresponds to
@@ -2517,7 +2334,7 @@ checkBlackHoles (Capability *cap)
    -------------------------------------------------------------------------- */
 
 static void 
-deleteThread (Capability *cap, StgTSO *tso)
+deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
 {
     // NOTE: must only be called on a TSO that we have exclusive
     // access to, because we will call throwToSingleThreaded() below.
@@ -2526,7 +2343,7 @@ deleteThread (Capability *cap, StgTSO *tso)
 
     if (tso->why_blocked != BlockedOnCCall &&
        tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
-       throwToSingleThreaded(cap,tso,NULL);
+       throwToSingleThreaded(tso->cap,tso,NULL);
     }
 }
 
@@ -2599,8 +2416,8 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
                SET_HDR(raise_closure, &stg_raise_info, CCCS);
                raise_closure->payload[0] = exception;
            }
-           UPD_IND(cap, ((StgUpdateFrame *)p)->updatee,
-                    (StgClosure *)raise_closure);
+            updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
+                        (StgClosure *)raise_closure);
            p = next;
            continue;