scheduleYield(): check the wakeup queue before yielding
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 552c2c9..6c46c09 100644
@@ -31,6 +31,8 @@
 #include "Updates.h"
 #include "Proftimer.h"
 #include "ProfHeap.h"
+#include "GC.h"
+#include "Weak.h"
 
 /* PARALLEL_HASKELL includes go here */
 
@@ -89,6 +91,12 @@ StgTSO *blackhole_queue = NULL;
  */
 rtsBool blackholes_need_checking = rtsFalse;
 
+/* Set to true when the latest garbage collection failed to reclaim
+ * enough space, and the runtime should proceed to shut itself down in
+ * an orderly fashion (emitting profiling info etc.)
+ */
+rtsBool heap_overflow = rtsFalse;
+
 /* flag that tracks whether we have done any execution in this time slice.
  * LOCK: currently none, perhaps we should lock (but needs to be
  * updated in the fast path of the scheduler).
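
The heap_overflow flag added above is ultimately reported to the caller through the status a bound task returns (see the task->stat changes further down in this patch). For orientation, a hedged sketch of the status enumeration this relies on; the exact members live in the RTS API headers, and HeapExhausted is assumed to be the member this change reports through:

    typedef enum {
        NoStatus,       /* not finished yet */
        Success,        /* completed successfully */
        Killed,         /* killed by an uncaught exception */
        Interrupted,    /* stopped, e.g. by ^C or interruptStgRts() */
        HeapExhausted   /* the GC could not reclaim enough space */
    } SchedulerStatus;
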
@@ -274,6 +282,12 @@ schedule (Capability *initialCapability, Task *task)
              "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
              task, initialCapability);
 
+  if (running_finalizers) {
+      errorBelch("error: a C finalizer called back into Haskell.\n"
+                 "   use Foreign.Concurrent.newForeignPtr for Haskell finalizers.");
+      stg_exit(EXIT_FAILURE);
+  }
+
   schedulePreLoop();
 
   // -----------------------------------------------------------
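
The check above only makes sense together with the flag it reads; running_finalizers presumably comes from Weak.h, which is newly included at the top of this patch. A minimal sketch of the guard pattern assumed on the finalizer side (the helper name and placement are illustrative, only the flag is taken from this patch):

    rtsBool running_finalizers = rtsFalse;

    static void
    runCFinalizer (void (*fn)(void *), void *ptr)
    {
        running_finalizers = rtsTrue;
        fn(ptr);                  // if this C finalizer re-enters Haskell,
                                  // schedule() sees the flag and aborts cleanly
        running_finalizers = rtsFalse;
    }
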
@@ -730,6 +744,7 @@ scheduleYield (Capability **pcap, Task *task)
     // if we have work, and we don't need to give up the Capability, continue.
     if (!shouldYieldCapability(cap,task) && 
         (!emptyRunQueue(cap) ||
+         !emptyWakeupQueue(cap) ||
          blackholes_need_checking ||
          sched_state >= SCHED_INTERRUPTING))
         return;
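
The wakeup queue holds threads that other Capabilities have made runnable on this one; if it is non-empty we have work and should not yield. A minimal sketch of the predicate assumed here, in the style of the other INLINE_HEADER queue tests in Schedule.h:

    INLINE_HEADER rtsBool
    emptyWakeupQueue (Capability *cap)
    {
        // threads woken remotely are linked onto cap->wakeup_queue_hd
        return (cap->wakeup_queue_hd == END_TSO_QUEUE);
    }
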
@@ -770,9 +785,11 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
 
     // Check whether we have more threads on our run queue, or sparks
     // in our pool, that we could hand to another Capability.
-    if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
-       && sparkPoolSizeCap(cap) < 2) {
-       return;
+    if (cap->run_queue_hd == END_TSO_QUEUE) {
+        if (sparkPoolSizeCap(cap) < 2) return;
+    } else {
+        if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
+            sparkPoolSizeCap(cap) < 1) return;
     }
 
     // First grab as many free Capabilities as we can.
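
Restated, the rewritten early-return above only bothers pushing when there is genuine surplus work. A hedged restatement of the same test as a standalone predicate (the helper name is illustrative, not part of the patch):

    static rtsBool
    worthPushingWork (Capability *cap)
    {
        if (cap->run_queue_hd == END_TSO_QUEUE) {
            // no local threads: only sparks can be given away, and it is
            // only worth the trouble if we have at least two of them
            return sparkPoolSizeCap(cap) >= 2;
        } else if (cap->run_queue_hd->_link == END_TSO_QUEUE) {
            // exactly one thread: keep it here, but a single spark can travel
            return sparkPoolSizeCap(cap) >= 1;
        } else {
            // two or more threads: there is always a surplus thread to push
            return rtsTrue;
        }
    }
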
@@ -1001,12 +1018,11 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
-       cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
+       cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
+        // when force_major == rtsTrue, scheduleDoGC sets
+        // recent_activity to ACTIVITY_DONE_GC and turns off the timer
+        // signal.
 
-       recent_activity = ACTIVITY_DONE_GC;
-        // disable timer signals (see #1623)
-        stopTimer();
-       
        if ( !emptyRunQueue(cap) ) return;
 
 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
@@ -1394,6 +1410,12 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
               (unsigned long)t->id, whatNext_strs[t->what_next]);
 
+    // blocked exceptions can now complete, even if the thread was in
+    // blocked mode (see #2910).  This unconditionally calls
+    // lockTSO(), which ensures that we don't miss any threads that
+    // are engaged in throwTo() with this thread as a target.
+    awakenBlockedExceptionQueue (cap, t);
+
       //
       // Check whether the thread that just completed was a bound
       // thread, and if so return with the result.  
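
Illustrative only: the behaviour the new call relies on, not the actual RaiseAsync.c implementation. Threads engaged in throwTo() against t sit on t->blocked_exceptions, and waking them under lockTSO() guarantees none are missed:

    static void
    awakenBlockedExceptionQueue_sketch (Capability *cap, StgTSO *tso)
    {
        StgTSO *thrower;
        lockTSO(tso);                                // the unconditional lock
        while ((thrower = tso->blocked_exceptions) != END_TSO_QUEUE) {
            tso->blocked_exceptions = thrower->_link;
            unblockOne(cap, thrower);                // thrower becomes runnable
        }
        unlockTSO(tso);
    }
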
@@ -1436,7 +1458,11 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
                  *(task->ret) = NULL;
              }
              if (sched_state >= SCHED_INTERRUPTING) {
-                 task->stat = Interrupted;
+                  if (heap_overflow) {
+                      task->stat = HeapExhausted;
+                  } else {
+                      task->stat = Interrupted;
+                  }
              } else {
                  task->stat = Killed;
              }
@@ -1479,7 +1505,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
 #ifdef THREADED_RTS
     /* extern static volatile StgWord waiting_for_gc; 
        lives inside capability.c */
-    rtsBool was_waiting;
+    rtsBool gc_type, prev_pending_gc;
     nat i;
 #endif
 
@@ -1491,6 +1517,16 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     }
 
 #ifdef THREADED_RTS
+    if (sched_state < SCHED_INTERRUPTING
+        && RtsFlags.ParFlags.parGcEnabled
+        && N >= RtsFlags.ParFlags.parGcGen
+        && ! oldest_gen->steps[0].mark)
+    {
+        gc_type = PENDING_GC_PAR;
+    } else {
+        gc_type = PENDING_GC_SEQ;
+    }
+
     // In order to GC, there must be no threads running Haskell code.
     // Therefore, the GC thread needs to hold *all* the capabilities,
     // and release them after the GC has completed.  
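
gc_type distinguishes the two ways the collection can be run. A sketch of the flag values assumed here (they live alongside waiting_for_gc in Capability.c/Capability.h; the numeric values are illustrative, the only requirement is that both are non-zero so the cas() below behaves):

    #define PENDING_GC_SEQ  1   // stop-the-world, one thread does all the work
    #define PENDING_GC_PAR  2   // every Capability donates a GC thread
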
@@ -1501,39 +1537,55 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     // actually did the GC.  But it's quite hard to arrange for all
     // the other tasks to sleep and stay asleep.
     //
-       
+
     /*  Other capabilities are prevented from running yet more Haskell
        threads if waiting_for_gc is set. Tested inside
        yieldCapability() and releaseCapability() in Capability.c */
 
-    was_waiting = cas(&waiting_for_gc, 0, 1);
-    if (was_waiting) {
+    prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
+    if (prev_pending_gc) {
        do {
-           debugTrace(DEBUG_sched, "someone else is trying to GC...");
-           if (cap) yieldCapability(&cap,task);
+           debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...", 
+                       prev_pending_gc);
+            ASSERT(cap);
+            yieldCapability(&cap,task);
        } while (waiting_for_gc);
        return cap;  // NOTE: task->cap might have changed here
     }
 
     setContextSwitches();
-    for (i=0; i < n_capabilities; i++) {
-       debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
-       if (cap != &capabilities[i]) {
-           Capability *pcap = &capabilities[i];
-           // we better hope this task doesn't get migrated to
-           // another Capability while we're waiting for this one.
-           // It won't, because load balancing happens while we have
-           // all the Capabilities, but even so it's a slightly
-           // unsavoury invariant.
-           task->cap = pcap;
-           waitForReturnCapability(&pcap, task);
-           if (pcap != &capabilities[i]) {
-               barf("scheduleDoGC: got the wrong capability");
-           }
-       }
+
+    // The final shutdown GC is always single-threaded, because it's
+    // possible that some of the Capabilities have no worker threads.
+    
+    if (gc_type == PENDING_GC_SEQ)
+    {
+        // single-threaded GC: grab all the capabilities
+        for (i=0; i < n_capabilities; i++) {
+            debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
+            if (cap != &capabilities[i]) {
+                Capability *pcap = &capabilities[i];
+                // we better hope this task doesn't get migrated to
+                // another Capability while we're waiting for this one.
+                // It won't, because load balancing happens while we have
+                // all the Capabilities, but even so it's a slightly
+                // unsavoury invariant.
+                task->cap = pcap;
+                waitForReturnCapability(&pcap, task);
+                if (pcap != &capabilities[i]) {
+                    barf("scheduleDoGC: got the wrong capability");
+                }
+            }
+        }
     }
+    else
+    {
+        // multi-threaded GC: make sure all the Capabilities donate one
+        // GC thread each.
+        debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
 
-    waiting_for_gc = rtsFalse;
+        waitForGcThreads(cap);
+    }
 #endif
 
     // so this happens periodically:
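
The claim on waiting_for_gc earlier in this hunk hinges on one property of the RTS cas() primitive: it returns the previous contents of the word. A minimal sketch of that contract (illustrative; the real cas() in SMP.h does this atomically):

    static StgWord
    cas_sketch (volatile StgWord *p, StgWord expected, StgWord new_value)
    {
        StgWord old = *p;
        if (old == expected) { *p = new_value; }
        return old;   // 0  => this Task claimed the GC and proceeds;
                      // !0 => another Task's gc_type; we yield until it is done
    }
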
@@ -1541,34 +1593,67 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     
     IF_DEBUG(scheduler, printAllThreads());
 
+delete_threads_and_gc:
     /*
      * We now have all the capabilities; if we're in an interrupting
      * state, then we should take the opportunity to delete all the
      * threads in the system.
      */
-    if (sched_state >= SCHED_INTERRUPTING) {
-       deleteAllThreads(&capabilities[0]);
+    if (sched_state == SCHED_INTERRUPTING) {
+       deleteAllThreads(cap);
        sched_state = SCHED_SHUTTING_DOWN;
     }
     
     heap_census = scheduleNeedHeapProfile(rtsTrue);
 
-    /* everybody back, start the GC.
-     * Could do it in this thread, or signal a condition var
-     * to do it in another thread.  Either way, we need to
-     * broadcast on gc_pending_cond afterward.
-     */
+    if (recent_activity == ACTIVITY_INACTIVE && force_major)
+    {
+        // We are doing a GC because the system has been idle for a
+        // timeslice and we need to check for deadlock.  Record the
+        // fact that we've done a GC and turn off the timer signal;
+        // it will get re-enabled if we run any threads after the GC.
+        //
+        // Note: this is done before GC, because after GC there might
+        // be threads already running (GarbageCollect() releases the
+        // GC threads when it completes), so we risk turning off the
+        // timer signal when it should really be on.
+        recent_activity = ACTIVITY_DONE_GC;
+        stopTimer();
+    }
+
 #if defined(THREADED_RTS)
     debugTrace(DEBUG_sched, "doing GC");
+    // reset waiting_for_gc *before* GC, so that when the GC threads
+    // emerge they don't immediately re-enter the GC.
+    waiting_for_gc = 0;
+    GarbageCollect(force_major || heap_census, gc_type, cap);
+#else
+    GarbageCollect(force_major || heap_census, 0, cap);
 #endif
-    GarbageCollect(force_major || heap_census);
-    
+
     if (heap_census) {
         debugTrace(DEBUG_sched, "performing heap census");
         heapCensus();
        performHeapProfile = rtsFalse;
     }
 
+    if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
+        // GC set the heap_overflow flag, so we should proceed with
+        // an orderly shutdown now.  Ultimately we want the main
+        // thread to return to its caller with HeapExhausted, at which
+        // point the caller should call hs_exit().  The first step is
+        // to delete all the threads.
+        //
+        // Another way to do this would be to raise an exception in
+        // the main thread, which we really should do because it gives
+        // the program a chance to clean up.  But how do we find the
+        // main thread?  It should presumably be the same one that
+        // gets ^C exceptions, but that's all done on the Haskell side
+        // (GHC.TopHandler).
+       sched_state = SCHED_INTERRUPTING;
+        goto delete_threads_and_gc;
+    }
+
 #ifdef SPARKBALANCE
     /* JB 
        Once we are all together... this would be the place to balance all
@@ -1578,12 +1663,14 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
 #endif
 
 #if defined(THREADED_RTS)
-    // release our stash of capabilities.
-    for (i = 0; i < n_capabilities; i++) {
-       if (cap != &capabilities[i]) {
-           task->cap = &capabilities[i];
-           releaseCapability(&capabilities[i]);
-       }
+    if (gc_type == PENDING_GC_SEQ) {
+        // release our stash of capabilities.
+        for (i = 0; i < n_capabilities; i++) {
+            if (cap != &capabilities[i]) {
+                task->cap = &capabilities[i];
+                releaseCapability(&capabilities[i]);
+            }
+        }
     }
     if (cap) {
        task->cap = cap;
@@ -1908,7 +1995,10 @@ resumeThread (void *task_)
     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
     
     if (tso->why_blocked == BlockedOnCCall) {
-       awakenBlockedExceptionQueue(cap,tso);
+        // avoid locking the TSO if we don't have to
+        if (tso->blocked_exceptions != END_TSO_QUEUE) {
+            awakenBlockedExceptionQueue(cap,tso);
+        }
        tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
     }
     
@@ -2016,9 +2106,22 @@ workerStart(Task *task)
     // schedule() runs without a lock.
     cap = schedule(cap,task);
 
-    // On exit from schedule(), we have a Capability.
-    releaseCapability(cap);
+    // On exit from schedule(), we have a Capability, but possibly not
+    // the same one we started with.
+
+    // During shutdown, the requirement is that after all the
+    // Capabilities are shut down, all workers that are shutting down
+    // have finished workerTaskStop().  This is why we hold on to
+    // cap->lock until we've finished workerTaskStop() below.
+    //
+    // There may be workers still involved in foreign calls; those
+    // will just block in waitForReturnCapability() because the
+    // Capability has been shut down.
+    //
+    ACQUIRE_LOCK(&cap->lock);
+    releaseCapability_(cap,rtsFalse);
     workerTaskStop(task);
+    RELEASE_LOCK(&cap->lock);
 }
 #endif
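
For context on why taking cap->lock by hand is safe here: a hedged sketch of the relationship assumed between the locked and unlocked entry points in Capability.c, where releaseCapability() is simply the underscore variant wrapped in the lock that the code above now holds across workerTaskStop():

    void
    releaseCapability (Capability *cap)
    {
        ACQUIRE_LOCK(&cap->lock);
        releaseCapability_(cap, rtsFalse);
        RELEASE_LOCK(&cap->lock);
    }
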
 
@@ -2109,7 +2212,13 @@ exitScheduler(
     // If we haven't killed all the threads yet, do it now.
     if (sched_state < SCHED_SHUTTING_DOWN) {
        sched_state = SCHED_INTERRUPTING;
-       scheduleDoGC(NULL,task,rtsFalse);    
+#if defined(THREADED_RTS)
+        waitForReturnCapability(&task->cap,task);
+       scheduleDoGC(task->cap,task,rtsFalse);    
+        releaseCapability(task->cap);
+#else
+       scheduleDoGC(&MainCapability,task,rtsFalse);    
+#endif
     }
     sched_state = SCHED_SHUTTING_DOWN;
 
@@ -2121,7 +2230,6 @@ exitScheduler(
            shutdownCapability(&capabilities[i], task, wait_foreign);
        }
        boundTaskExiting(task);
-       stopTaskManager();
     }
 #endif
 }
@@ -2129,11 +2237,23 @@ exitScheduler(
 void
 freeScheduler( void )
 {
-    freeCapabilities();
-    freeTaskManager();
-    if (n_capabilities != 1) {
-        stgFree(capabilities);
+    nat still_running;
+
+    ACQUIRE_LOCK(&sched_mutex);
+    still_running = freeTaskManager();
+    // We can only free the Capabilities if there are no Tasks still
+    // running.  We might have a Task about to return from a foreign
+    // call into waitForReturnCapability(), for example (actually,
+    // this should be the *only* thing that a still-running Task can
+    // do at this point, and it will block waiting for the
+    // Capability).
+    if (still_running == 0) {
+        freeCapabilities();
+        if (n_capabilities != 1) {
+            stgFree(capabilities);
+        }
     }
+    RELEASE_LOCK(&sched_mutex);
 #if defined(THREADED_RTS)
     closeMutex(&sched_mutex);
 #endif
@@ -2151,13 +2271,17 @@ static void
 performGC_(rtsBool force_major)
 {
     Task *task;
+
     // We must grab a new Task here, because the existing Task may be
     // associated with a particular Capability, and chained onto the 
     // suspended_ccalling_tasks queue.
     ACQUIRE_LOCK(&sched_mutex);
     task = newBoundTask();
     RELEASE_LOCK(&sched_mutex);
-    scheduleDoGC(NULL,task,force_major);
+
+    waitForReturnCapability(&task->cap,task);
+    scheduleDoGC(task->cap,task,force_major);
+    releaseCapability(task->cap);
     boundTaskExiting(task);
 }