[project @ 2005-05-09 10:10:33 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
index 206531d..ad27b74 100644 (file)
@@ -145,10 +145,16 @@ StgTSO *run_queue_hd = NULL;
 StgTSO *run_queue_tl = NULL;
 StgTSO *blocked_queue_hd = NULL;
 StgTSO *blocked_queue_tl = NULL;
+StgTSO *blackhole_queue = NULL;
 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
 
 #endif
 
+/* The blackhole_queue should be checked for threads to wake up.  See
+ * Schedule.h for more thorough comment.
+ */
+rtsBool blackholes_need_checking = rtsFalse;
+
 /* Linked list of all threads.
  * Used for detecting garbage collected threads.
  */
@@ -168,14 +174,12 @@ static StgTSO *suspended_ccalling_threads;
 /* flag set by signal handler to precipitate a context switch */
 int context_switch = 0;
 
+/* flag that tracks whether we have done any execution in this time slice. */
+nat recent_activity = ACTIVITY_YES;
+
 /* if this flag is set as well, give up execution */
 rtsBool interrupted = rtsFalse;
 
-/* If this flag is set, we are running Haskell code.  Used to detect
- * uses of 'foreign import unsafe' that should be 'safe'.
- */
-static rtsBool in_haskell = rtsFalse;
-
 /* Next thread ID to allocate.
  * Locks required: thread_id_mutex
  */
@@ -211,12 +215,6 @@ StgTSO *CurrentTSO;
  */
 StgTSO dummy_tso;
 
-# if defined(SMP)
-static Condition gc_pending_cond = INIT_COND_VAR;
-# endif
-
-static rtsBool ready_to_gc;
-
 /*
  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
  * in an MT setting, needed to signal that a worker thread shouldn't hang around
@@ -267,9 +265,9 @@ static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
 // scheduler clearer.
 //
 static void schedulePreLoop(void);
-static void scheduleHandleInterrupt(void);
 static void scheduleStartSignalHandlers(void);
 static void scheduleCheckBlockedThreads(void);
+static void scheduleCheckBlackHoles(void);
 static void scheduleDetectDeadlock(void);
 #if defined(GRAN)
 static StgTSO *scheduleProcessEvent(rtsEvent *event);
@@ -289,10 +287,11 @@ static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next );
 static void scheduleHandleThreadBlocked( StgTSO *t );
 static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, 
                                             Capability *cap, StgTSO *t );
-static void scheduleDoHeapProfile(void);
-static void scheduleDoGC(void);
+static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
+static void scheduleDoGC(Capability *cap);
 
 static void unblockThread(StgTSO *tso);
+static rtsBool checkBlackHoles(void);
 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
                                   Capability *initialCapability
                                   );
@@ -317,34 +316,33 @@ StgTSO * activateSpark (rtsSpark spark);
  * ------------------------------------------------------------------------- */
 
 #if defined(RTS_SUPPORTS_THREADS)
-static rtsBool startingWorkerThread = rtsFalse;
+static nat startingWorkerThread = 0;
 
 static void
 taskStart(void)
 {
   ACQUIRE_LOCK(&sched_mutex);
-  startingWorkerThread = rtsFalse;
+  startingWorkerThread--;
   schedule(NULL,NULL);
+  taskStop();
   RELEASE_LOCK(&sched_mutex);
 }
 
 void
 startSchedulerTaskIfNecessary(void)
 {
-  if(run_queue_hd != END_TSO_QUEUE
-    || blocked_queue_hd != END_TSO_QUEUE
-    || sleeping_queue != END_TSO_QUEUE)
-  {
-    if(!startingWorkerThread)
-    { // we don't want to start another worker thread
-      // just because the last one hasn't yet reached the
-      // "waiting for capability" state
-      startingWorkerThread = rtsTrue;
-      if (!startTask(taskStart)) {
-         startingWorkerThread = rtsFalse;
-      }
+    if ( !EMPTY_RUN_QUEUE()
+        && !shutting_down_scheduler // not if we're shutting down
+        && startingWorkerThread==0)
+    {
+       // we don't want to start another worker thread
+       // just because the last one hasn't yet reached the
+       // "waiting for capability" state
+       startingWorkerThread++;
+       if (!maybeStartNewWorker(taskStart)) {
+           startingWorkerThread--;
+       }
     }
-  }
 }
 #endif
 
@@ -423,6 +421,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
 # endif
 #endif
   nat prev_what_next;
+  rtsBool ready_to_gc;
   
   // Pre-condition: sched_mutex is held.
   // We might have a capability, passed in as initialCapability.
@@ -461,19 +460,6 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
 
       IF_DEBUG(scheduler, printAllThreads());
 
-#if defined(SMP)
-      // 
-      // Wait until GC has completed, if necessary.
-      //
-      if (ready_to_gc) {
-         if (cap != NULL) {
-             releaseCapability(cap);
-             IF_DEBUG(scheduler,sched_belch("waiting for GC"));
-             waitCondition( &gc_pending_cond, &sched_mutex );
-         }
-      }
-#endif
-
 #if defined(RTS_SUPPORTS_THREADS)
       // Yield the capability to higher-priority tasks if necessary.
       //
@@ -490,17 +476,45 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
 
       // We now have a capability...
 #endif
+      
+#if 0 /* extra sanity checking */
+      { 
+         StgMainThread *m;
+         for (m = main_threads; m != NULL; m = m->link) {
+             ASSERT(get_itbl(m->tso)->type == TSO);
+         }
+      }
+#endif
 
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
     // call).
-    if (in_haskell) {
+    if (cap->r.rInHaskell) {
          errorBelch("schedule: re-entered unsafely.\n"
                     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
          stg_exit(1);
     }
 
-    scheduleHandleInterrupt();
+    //
+    // Test for interruption.  If interrupted==rtsTrue, then either
+    // we received a keyboard interrupt (^C), or the scheduler is
+    // trying to shut down all the tasks (shutting_down_scheduler) in
+    // the threaded RTS.
+    //
+    if (interrupted) {
+       if (shutting_down_scheduler) {
+           IF_DEBUG(scheduler, sched_belch("shutting down"));
+           releaseCapability(cap);
+           if (mainThread) {
+               mainThread->stat = Interrupted;
+               mainThread->ret  = NULL;
+           }
+           return;
+       } else {
+           IF_DEBUG(scheduler, sched_belch("interrupted"));
+           deleteAllThreads();
+       }
+    }
 
 #if defined(not_yet) && defined(SMP)
     //
@@ -526,6 +540,12 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
 
     scheduleStartSignalHandlers();
 
+    // Only check the black holes here if we've nothing else to do.
+    // During normal execution, the black hole list only gets checked
+    // at GC time, to avoid repeatedly traversing this possibly long
+    // list each time around the scheduler.
+    if (EMPTY_RUN_QUEUE()) { scheduleCheckBlackHoles(); }
+
     scheduleCheckBlockedThreads();
 
     scheduleDetectDeadlock();
@@ -652,13 +672,15 @@ run_thread:
     startHeapProfTimer();
 #endif
 
-    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
-    /* Run the current thread 
-     */
+    // ----------------------------------------------------------------------
+    // Run the current thread 
+
     prev_what_next = t->what_next;
 
     errno = t->saved_errno;
-    in_haskell = rtsTrue;
+    cap->r.rInHaskell = rtsTrue;
+
+    recent_activity = ACTIVITY_YES;
 
     switch (prev_what_next) {
 
@@ -680,7 +702,13 @@ run_thread:
       barf("schedule: invalid what_next field");
     }
 
-    in_haskell = rtsFalse;
+    // We have run some Haskell code: there might be blackhole-blocked
+    // threads to wake up now.
+    if ( blackhole_queue != END_TSO_QUEUE ) {
+       blackholes_need_checking = rtsTrue;
+    }
+
+    cap->r.rInHaskell = rtsFalse;
 
     // The TSO might have moved, eg. if it re-entered the RTS and a GC
     // happened.  So find the new location:
@@ -689,7 +717,7 @@ run_thread:
     // And save the current errno in this thread.
     t->saved_errno = errno;
 
-    /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
+    // ----------------------------------------------------------------------
     
     /* Costs for the scheduler are assigned to CCS_SYSTEM */
 #if defined(PROFILING)
@@ -707,6 +735,8 @@ run_thread:
     
     schedulePostRunThread();
 
+    ready_to_gc = rtsFalse;
+
     switch (ret) {
     case HeapOverflow:
        ready_to_gc = scheduleHandleHeapOverflow(cap,t);
@@ -736,8 +766,8 @@ run_thread:
       barf("schedule: invalid thread return code %d", (int)ret);
     }
 
-    scheduleDoHeapProfile();
-    scheduleDoGC();
+    if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
+    if (ready_to_gc) { scheduleDoGC(cap); }
   } /* end of while() */
 
   IF_PAR_DEBUG(verbose,
@@ -772,33 +802,6 @@ schedulePreLoop(void)
 }
 
 /* ----------------------------------------------------------------------------
- * Deal with the interrupt flag
- * ASSUMES: sched_mutex
- * ------------------------------------------------------------------------- */
-
-static
-void scheduleHandleInterrupt(void)
-{
-    //
-    // Test for interruption.  If interrupted==rtsTrue, then either
-    // we received a keyboard interrupt (^C), or the scheduler is
-    // trying to shut down all the tasks (shutting_down_scheduler) in
-    // the threaded RTS.
-    //
-    if (interrupted) {
-       if (shutting_down_scheduler) {
-           IF_DEBUG(scheduler, sched_belch("shutting down"));
-#if defined(RTS_SUPPORTS_THREADS)
-           shutdownThread();
-#endif
-       } else {
-           IF_DEBUG(scheduler, sched_belch("interrupted"));
-           deleteAllThreads();
-       }
-    }
-}
-
-/* ----------------------------------------------------------------------------
  * Start any pending signal handlers
  * ASSUMES: sched_mutex
  * ------------------------------------------------------------------------- */
@@ -806,7 +809,7 @@ void scheduleHandleInterrupt(void)
 static void
 scheduleStartSignalHandlers(void)
 {
-#if defined(RTS_USER_SIGNALS)
+#if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
     if (signals_pending()) {
       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
       startSignalHandlers();
@@ -834,7 +837,22 @@ scheduleCheckBlockedThreads(void)
        // We shouldn't be here...
        barf("schedule: awaitEvent() in threaded RTS");
 #endif
-       awaitEvent( EMPTY_RUN_QUEUE() );
+       awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
+    }
+}
+
+
+/* ----------------------------------------------------------------------------
+ * Check for threads blocked on BLACKHOLEs that can be woken up
+ * ASSUMES: sched_mutex
+ * ------------------------------------------------------------------------- */
+static void
+scheduleCheckBlackHoles( void )
+{
+    if ( blackholes_need_checking )
+    {
+       checkBlackHoles();
+       blackholes_need_checking = rtsFalse;
     }
 }
 
@@ -846,20 +864,30 @@ scheduleCheckBlockedThreads(void)
 static void
 scheduleDetectDeadlock(void)
 {
+
+#if defined(PARALLEL_HASKELL)
+    // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
+    return;
+#endif
+
     /* 
      * Detect deadlock: when we have no threads to run, there are no
-     * threads waiting on I/O or sleeping, and all the other tasks are
-     * waiting for work, we must have a deadlock of some description.
-     *
-     * We first try to find threads blocked on themselves (ie. black
-     * holes), and generate NonTermination exceptions where necessary.
-     *
-     * If no threads are black holed, we have a deadlock situation, so
-     * inform all the main threads.
+     * threads blocked, waiting for I/O, or sleeping, and all the
+     * other tasks are waiting for work, we must have a deadlock of
+     * some description.
      */
-#if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS)
     if ( EMPTY_THREAD_QUEUES() )
     {
+#if defined(RTS_SUPPORTS_THREADS)
+       /* 
+        * In the threaded RTS, we only check for deadlock if there
+        * has been no activity in a complete timeslice.  This means
+        * we won't eagerly start a full GC just because we don't have
+        * any threads to run currently.
+        */
+       if (recent_activity != ACTIVITY_INACTIVE) return;
+#endif
+
        IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
 
        // Garbage collection can release some new threads due to
@@ -868,9 +896,10 @@ scheduleDetectDeadlock(void)
        // exception.  Any threads thus released will be immediately
        // runnable.
        GarbageCollect(GetRoots,rtsTrue);
+       recent_activity = ACTIVITY_DONE_GC;
        if ( !EMPTY_RUN_QUEUE() ) return;
 
-#if defined(RTS_USER_SIGNALS)
+#if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
        /* If we have user-installed signal handlers, then wait
         * for signals to arrive rather then bombing out with a
         * deadlock.
@@ -892,6 +921,7 @@ scheduleDetectDeadlock(void)
        }
 #endif
 
+#if !defined(RTS_SUPPORTS_THREADS)
        /* Probably a real deadlock.  Send the current main thread the
         * Deadlock exception (or in the SMP build, send *all* main
         * threads the deadlock exception, since none of them can make
@@ -901,6 +931,7 @@ scheduleDetectDeadlock(void)
            StgMainThread *m;
            m = main_threads;
            switch (m->tso->why_blocked) {
+           case BlockedOnSTM:
            case BlockedOnBlackHole:
            case BlockedOnException:
            case BlockedOnMVar:
@@ -910,13 +941,8 @@ scheduleDetectDeadlock(void)
                barf("deadlock: main thread blocked in a strange way");
            }
        }
-    }
-
-#elif defined(RTS_SUPPORTS_THREADS)
-    // ToDo: add deadlock detection in threaded RTS
-#elif defined(PARALLEL_HASKELL)
-    // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
 #endif
+    }
 }
 
 /* ----------------------------------------------------------------------------
@@ -1441,12 +1467,12 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
     if (cap->r.rHpAlloc > BLOCK_SIZE) {
        // if so, get one and push it on the front of the nursery.
        bdescr *bd;
-       nat blocks;
+       lnat blocks;
        
-       blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
+       blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
        
        IF_DEBUG(scheduler,
-                debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n", 
+                debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
                            (long)t->id, whatNext_strs[t->what_next], blocks));
        
        // don't do this if it would push us over the
@@ -1462,9 +1488,12 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
            if (cap->r.rCurrentNursery->u.back != NULL) {
                cap->r.rCurrentNursery->u.back->link = bd;
            } else {
+#if !defined(SMP)
                ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
-                      g0s0->blocks == cap->r.rNursery);
-               cap->r.rNursery = g0s0->blocks = bd;
+                      g0s0 == cap->r.rNursery);
+               g0s0->blocks = bd;
+#endif
+               cap->r.rNursery->blocks = bd;
            }             
            cap->r.rCurrentNursery->u.back = bd;
            
@@ -1484,11 +1513,14 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
                }
            }
            
+#if !defined(SMP)
            // don't forget to update the block count in g0s0.
            g0s0->n_blocks += blocks;
+
            // This assert can be a killer if the app is doing lots
            // of large block allocations.
            ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
+#endif
            
            // now update the nursery to point to the new block
            cap->r.rCurrentNursery = bd;
@@ -1796,12 +1828,13 @@ scheduleHandleThreadFinished( StgMainThread *mainThread
          removeThreadLabel((StgWord)mainThread->tso->id);
 #endif
          if (mainThread->prev == NULL) {
+             ASSERT(mainThread == main_threads);
              main_threads = mainThread->link;
          } else {
              mainThread->prev->link = mainThread->link;
          }
          if (mainThread->link != NULL) {
-             mainThread->link->prev = NULL;
+             mainThread->link->prev = mainThread->prev;
          }
          releaseCapability(cap);
          return rtsTrue; // tells schedule() to return
@@ -1827,10 +1860,10 @@ scheduleHandleThreadFinished( StgMainThread *mainThread
  * Perform a heap census, if PROFILING
  * -------------------------------------------------------------------------- */
 
-static void
-scheduleDoHeapProfile(void)
+static rtsBool
+scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
 {
-#ifdef PROFILING
+#if defined(PROFILING)
     // When we have +RTS -i0 and we're heap profiling, do a census at
     // every GC.  This lets us get repeatable runs for debugging.
     if (performHeapProfile ||
@@ -1839,9 +1872,10 @@ scheduleDoHeapProfile(void)
        GarbageCollect(GetRoots, rtsTrue);
        heapCensus();
        performHeapProfile = rtsFalse;
-       ready_to_gc = rtsFalse; // we already GC'd
+       return rtsTrue;  // true <=> we already GC'd
     }
 #endif
+    return rtsFalse;
 }
 
 /* -----------------------------------------------------------------------------
@@ -1850,63 +1884,97 @@ scheduleDoHeapProfile(void)
  * -------------------------------------------------------------------------- */
 
 static void
-scheduleDoGC(void)
+scheduleDoGC( Capability *cap STG_UNUSED )
 {
     StgTSO *t;
+#ifdef SMP
+    static rtsBool waiting_for_gc;
+    int n_capabilities = RtsFlags.ParFlags.nNodes - 1; 
+           // subtract one because we're already holding one.
+    Capability *caps[n_capabilities];
+#endif
 
 #ifdef SMP
-    // The last task to stop actually gets to do the GC.  The rest
-    // of the tasks release their capabilities and wait gc_pending_cond.
-    if (ready_to_gc && allFreeCapabilities())
-#else
-    if (ready_to_gc)
+    // In order to GC, there must be no threads running Haskell code.
+    // Therefore, the GC thread needs to hold *all* the capabilities,
+    // and release them after the GC has completed.  
+    //
+    // This seems to be the simplest way: previous attempts involved
+    // making all the threads with capabilities give up their
+    // capabilities and sleep except for the *last* one, which
+    // actually did the GC.  But it's quite hard to arrange for all
+    // the other tasks to sleep and stay asleep.
+    //
+       
+    // Someone else is already trying to GC
+    if (waiting_for_gc) return;
+    waiting_for_gc = rtsTrue;
+
+    caps[n_capabilities] = cap;
+    while (n_capabilities > 0) {
+       IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
+       waitForReturnCapability(&sched_mutex, &cap);
+       n_capabilities--;
+       caps[n_capabilities] = cap;
+    }
+
+    waiting_for_gc = rtsFalse;
 #endif
-    {
-       /* Kick any transactions which are invalid back to their
-        * atomically frames.  When next scheduled they will try to
-        * commit, this commit will fail and they will retry.
-        */
-       for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
-           if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
-               if (!stmValidateTransaction (t -> trec)) {
-                   IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
-                   
-                   // strip the stack back to the ATOMICALLY_FRAME, aborting
-                   // the (nested) transaction, and saving the stack of any
-                   // partially-evaluated thunks on the heap.
-                   raiseAsync_(t, NULL, rtsTrue);
-                   
+
+    /* Kick any transactions which are invalid back to their
+     * atomically frames.  When next scheduled they will try to
+     * commit, this commit will fail and they will retry.
+     */
+    for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
+       if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+           if (!stmValidateTransaction (t -> trec)) {
+               IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+               
+               // strip the stack back to the ATOMICALLY_FRAME, aborting
+               // the (nested) transaction, and saving the stack of any
+               // partially-evaluated thunks on the heap.
+               raiseAsync_(t, NULL, rtsTrue);
+               
 #ifdef REG_R1
-                   ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+               ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
 #endif
-               }
            }
        }
-
-       /* everybody back, start the GC.
-        * Could do it in this thread, or signal a condition var
-        * to do it in another thread.  Either way, we need to
-        * broadcast on gc_pending_cond afterward.
-        */
+    }
+    
+    // so this happens periodically:
+    scheduleCheckBlackHoles();
+    
+    /* everybody back, start the GC.
+     * Could do it in this thread, or signal a condition var
+     * to do it in another thread.  Either way, we need to
+     * broadcast on gc_pending_cond afterward.
+     */
 #if defined(RTS_SUPPORTS_THREADS)
-       IF_DEBUG(scheduler,sched_belch("doing GC"));
+    IF_DEBUG(scheduler,sched_belch("doing GC"));
 #endif
-       GarbageCollect(GetRoots,rtsFalse);
-       ready_to_gc = rtsFalse;
+    GarbageCollect(GetRoots,rtsFalse);
+    
 #if defined(SMP)
-       broadcastCondition(&gc_pending_cond);
+    {
+       // release our stash of capabilities.
+       nat i;
+       for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
+           releaseCapability(caps[i]);
+       }
+    }
 #endif
+
 #if defined(GRAN)
-       /* add a ContinueThread event to continue execution of current thread */
-       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
-                 ContinueThread,
-                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
-       IF_GRAN_DEBUG(bq, 
-                     debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
-                     G_EVENTQ(0);
-                     G_CURR_THREADQ(0));
+    /* add a ContinueThread event to continue execution of current thread */
+    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
+             ContinueThread,
+             t, (StgClosure*)NULL, (rtsSpark*)NULL);
+    IF_GRAN_DEBUG(bq, 
+                 debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
+                 G_EVENTQ(0);
+                 G_CURR_THREADQ(0));
 #endif /* GRAN */
-    }
 }
 
 /* ---------------------------------------------------------------------------
@@ -1917,7 +1985,7 @@ scheduleDoGC(void)
 StgBool
 rtsSupportsBoundThreads(void)
 {
-#ifdef THREADED_RTS
+#if defined(RTS_SUPPORTS_THREADS)
   return rtsTrue;
 #else
   return rtsFalse;
@@ -1931,7 +1999,7 @@ rtsSupportsBoundThreads(void)
 StgBool
 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
 {
-#ifdef THREADED_RTS
+#if defined(RTS_SUPPORTS_THREADS)
   return (tso->main != NULL);
 #endif
   return rtsFalse;
@@ -2025,8 +2093,12 @@ deleteAllThreads ( void )
   StgTSO* t, *next;
   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
-      next = t->global_link;
-      deleteThread(t);
+      if (t->what_next == ThreadRelocated) {
+         next = t->link;
+      } else {
+         next = t->global_link;
+         deleteThread(t);
+      }
   }      
 
   // The run queue now contains a bunch of ThreadKilled threads.  We
@@ -2036,6 +2108,7 @@ deleteAllThreads ( void )
   // being GC'd, and we don't want the "main thread has been GC'd" panic.
 
   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
+  ASSERT(blackhole_queue == END_TSO_QUEUE);
   ASSERT(sleeping_queue == END_TSO_QUEUE);
 }
 
@@ -2092,6 +2165,7 @@ suspendThread( StgRegTable *reg )
   tok = cap->r.rCurrentTSO->id;
 
   /* Hand back capability */
+  cap->r.rInHaskell = rtsFalse;
   releaseCapability(cap);
   
 #if defined(RTS_SUPPORTS_THREADS)
@@ -2101,7 +2175,6 @@ suspendThread( StgRegTable *reg )
   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
 #endif
 
-  in_haskell = rtsFalse;
   RELEASE_LOCK(&sched_mutex);
   
   errno = saved_errno;
@@ -2149,7 +2222,7 @@ resumeThread( StgInt tok )
   tso->why_blocked  = NotBlocked;
 
   cap->r.rCurrentTSO = tso;
-  in_haskell = rtsTrue;
+  cap->r.rInHaskell = rtsTrue;
   RELEASE_LOCK(&sched_mutex);
   errno = saved_errno;
   return &cap->r;
@@ -2547,6 +2620,7 @@ initScheduler(void)
     blocked_queue_hds[i]  = END_TSO_QUEUE;
     blocked_queue_tls[i]  = END_TSO_QUEUE;
     ccalling_threadss[i]  = END_TSO_QUEUE;
+    blackhole_queue[i]    = END_TSO_QUEUE;
     sleeping_queue        = END_TSO_QUEUE;
   }
 #else
@@ -2554,6 +2628,7 @@ initScheduler(void)
   run_queue_tl      = END_TSO_QUEUE;
   blocked_queue_hd  = END_TSO_QUEUE;
   blocked_queue_tl  = END_TSO_QUEUE;
+  blackhole_queue   = END_TSO_QUEUE;
   sleeping_queue    = END_TSO_QUEUE;
 #endif 
 
@@ -2584,8 +2659,13 @@ initScheduler(void)
   initCapabilities();
   
 #if defined(RTS_SUPPORTS_THREADS)
-    /* start our haskell execution tasks */
-    startTaskManager(0,taskStart);
+  initTaskManager();
+#endif
+
+#if defined(SMP)
+  /* eagerly start some extra workers */
+  startingWorkerThread = RtsFlags.ParFlags.nNodes;
+  startTasks(RtsFlags.ParFlags.nNodes, taskStart);
 #endif
 
 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
@@ -2598,11 +2678,12 @@ initScheduler(void)
 void
 exitScheduler( void )
 {
+    interrupted = rtsTrue;
+    shutting_down_scheduler = rtsTrue;
 #if defined(RTS_SUPPORTS_THREADS)
-  stopTaskManager();
+    if (threadIsTask(osThreadId())) { taskStop(); }
+    stopTaskManager();
 #endif
-  interrupted = rtsTrue;
-  shutting_down_scheduler = rtsTrue;
 }
 
 /* ----------------------------------------------------------------------------
@@ -2709,6 +2790,10 @@ GetRoots( evac_fn evac )
   }
 #endif 
 
+  if (blackhole_queue != END_TSO_QUEUE) {
+      evac((StgClosure **)&blackhole_queue);
+  }
+
   if (suspended_ccalling_threads != END_TSO_QUEUE) {
       evac((StgClosure **)&suspended_ccalling_threads);
   }
@@ -2783,7 +2868,8 @@ performGCWithRoots(void (*get_roots)(evac_fn))
 static StgTSO *
 threadStackOverflow(StgTSO *tso)
 {
-  nat new_stack_size, new_tso_size, stack_words;
+  nat new_stack_size, stack_words;
+  lnat new_tso_size;
   StgPtr new_sp;
   StgTSO *dest;
 
@@ -2807,7 +2893,7 @@ threadStackOverflow(StgTSO *tso)
    * Finally round up so the TSO ends up as a whole number of blocks.
    */
   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
-  new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
+  new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
                                       TSO_STRUCT_SIZE)/sizeof(W_);
   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
@@ -3165,6 +3251,11 @@ interruptStgRts(void)
 {
     interrupted    = 1;
     context_switch = 1;
+    threadRunnable();
+    /* ToDo: if invoked from a signal handler, this threadRunnable
+     * only works if there's another thread (not this one) waiting to
+     * be woken up.
+     */
 }
 
 /* -----------------------------------------------------------------------------
@@ -3288,6 +3379,12 @@ unblockThread(StgTSO *tso)
              blocked_queue_tl = (StgTSO *)prev;
            }
          }
+#if defined(mingw32_HOST_OS)
+         /* (Cooperatively) signal that the worker thread should abort
+          * the request.
+          */
+         abandonWorkRequest(tso->block_info.async_result->reqID);
+#endif
          goto done;
        }
       }
@@ -3364,12 +3461,9 @@ unblockThread(StgTSO *tso)
     }
 
   case BlockedOnBlackHole:
-    ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
     {
-      StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
-
-      last = &bq->blocking_queue;
-      for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
+      last = &blackhole_queue;
+      for (t = blackhole_queue; t != END_TSO_QUEUE; 
           last = &t->link, t = t->link) {
        if (t == tso) {
          *last = tso->link;
@@ -3425,6 +3519,12 @@ unblockThread(StgTSO *tso)
              blocked_queue_tl = prev;
            }
          }
+#if defined(mingw32_HOST_OS)
+         /* (Cooperatively) signal that the worker thread should abort
+          * the request.
+          */
+         abandonWorkRequest(tso->block_info.async_result->reqID);
+#endif
          goto done;
        }
       }
@@ -3461,6 +3561,49 @@ unblockThread(StgTSO *tso)
 #endif
 
 /* -----------------------------------------------------------------------------
+ * checkBlackHoles()
+ *
+ * Check the blackhole_queue for threads that can be woken up.  We do
+ * this periodically: before every GC, and whenever the run queue is
+ * empty.
+ *
+ * An elegant solution might be to just wake up all the blocked
+ * threads with awakenBlockedQueue occasionally: they'll go back to
+ * sleep again if the object is still a BLACKHOLE.  Unfortunately this
+ * doesn't give us a way to tell whether we've actually managed to
+ * wake up any threads, so we would be busy-waiting.
+ *
+ * -------------------------------------------------------------------------- */
+
+static rtsBool
+checkBlackHoles( void )
+{
+    StgTSO **prev, *t;
+    rtsBool any_woke_up = rtsFalse;
+    StgHalfWord type;
+
+    IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
+
+    // ASSUMES: sched_mutex
+    prev = &blackhole_queue;
+    t = blackhole_queue;
+    while (t != END_TSO_QUEUE) {
+       ASSERT(t->why_blocked == BlockedOnBlackHole);
+       type = get_itbl(t->block_info.closure)->type;
+       if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
+           t = unblockOneLocked(t);
+           *prev = t;
+           any_woke_up = rtsTrue;
+       } else {
+           prev = &t->link;
+           t = t->link;
+       }
+    }
+
+    return any_woke_up;
+}
+
+/* -----------------------------------------------------------------------------
  * raiseAsync()
  *
  * The following function implements the magic for raising an
@@ -3644,12 +3787,12 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
 #ifdef PROFILING
            StgCatchFrame *cf = (StgCatchFrame *)frame;
 #endif
-           StgClosure *raise;
+           StgThunk *raise;
            
            // we've got an exception to raise, so let's pass it to the
            // handler in this frame.
            //
-           raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
+           raise = (StgThunk *)allocate(sizeofW(StgThunk)+1);
            TICK_ALLOC_SE_THK(1,0);
            SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
            raise->payload[0] = exception;
@@ -3687,7 +3830,7 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
            // fun field.
            //
            words = frame - sp - 1;
-           ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
+           ap = (StgAP_STACK *)allocate(AP_STACK_sizeW(words));
            
            ap->size = words;
            ap->fun  = (StgClosure *)sp[0];
@@ -3754,7 +3897,7 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
 StgWord
 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
 {
-    StgClosure *raise_closure = NULL;
+    StgThunk *raise_closure = NULL;
     StgPtr p, next;
     StgRetInfoTable *info;
     //
@@ -3791,11 +3934,11 @@ raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
            // Only create raise_closure if we need to.
            if (raise_closure == NULL) {
                raise_closure = 
-                   (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
+                   (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE);
                SET_HDR(raise_closure, &stg_raise_info, CCCS);
                raise_closure->payload[0] = exception;
            }
-           UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
+           UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
            p = next;
            continue;
 
@@ -3930,18 +4073,18 @@ printThreadBlockage(StgTSO *tso)
 {
   switch (tso->why_blocked) {
   case BlockedOnRead:
-    debugBelch("is blocked on read from fd %d", tso->block_info.fd);
+    debugBelch("is blocked on read from fd %ld", tso->block_info.fd);
     break;
   case BlockedOnWrite:
-    debugBelch("is blocked on write to fd %d", tso->block_info.fd);
+    debugBelch("is blocked on write to fd %ld", tso->block_info.fd);
     break;
 #if defined(mingw32_HOST_OS)
     case BlockedOnDoProc:
-    debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
+    debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
     break;
 #endif
   case BlockedOnDelay:
-    debugBelch("is blocked until %d", tso->block_info.target);
+    debugBelch("is blocked until %ld", tso->block_info.target);
     break;
   case BlockedOnMVar:
     debugBelch("is blocked on an MVar");
@@ -4162,25 +4305,6 @@ print_bq (StgClosure *node)
   } /* for */
   debugBelch("\n");
 }
-#else
-/* 
-   Nice and easy: only TSOs on the blocking queue
-*/
-void 
-print_bq (StgClosure *node)
-{
-  StgTSO *tso;
-
-  ASSERT(node!=(StgClosure*)NULL);         // sanity check
-  for (tso = ((StgBlockingQueue*)node)->blocking_queue;
-       tso != END_TSO_QUEUE; 
-       tso=tso->link) {
-    ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
-    ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
-    debugBelch(" TSO %d (%p),", tso->id, tso);
-  }
-  debugBelch("\n");
-}
 # endif
 
 #if defined(PARALLEL_HASKELL)