check for ThreadRelocated in checkBlackHoles()
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 7c453d7..141c973 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -32,6 +32,8 @@
 #include "Proftimer.h"
 #include "ProfHeap.h"
 #include "GC.h"
+#include "Weak.h"
+#include "EventLog.h"
 
 /* PARALLEL_HASKELL includes go here */
 
@@ -281,6 +283,12 @@ schedule (Capability *initialCapability, Task *task)
              "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
              task, initialCapability);
 
+  if (running_finalizers) {
+      errorBelch("error: a C finalizer called back into Haskell.\n"
+                 "   use Foreign.Concurrent.newForeignPtr for Haskell finalizers.");
+      stg_exit(EXIT_FAILURE);
+  }
+
   schedulePreLoop();
 
   // -----------------------------------------------------------
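The guard above closes a misuse hole rather than adding a feature: C finalizers attached with Foreign.ForeignPtr run with the RTS in an unsafe state, so any attempt to re-enter the scheduler from one is now a fatal error instead of undefined behaviour. A minimal sketch of the pattern, with the driver loop invented for illustration (the real flag lives in the Weak machinery included above):

    /* Sketch only: a module-level flag brackets the window in which C
     * finalizers run; schedule() refuses entry while it is set. */
    int running_finalizers = 0;

    static void runCFinalizers (void (**fins)(void *), void **args, int n)
    {
        int i;
        running_finalizers = 1;
        for (i = 0; i < n; i++) {
            fins[i](args[i]);   /* must not call back into Haskell */
        }
        running_finalizers = 0;
    }
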
@@ -532,6 +540,8 @@ run_thread:
     }
 #endif
 
+    postEvent(cap, EVENT_RUN_THREAD, t->id, 0);
+
     switch (prev_what_next) {
        
     case ThreadKilled:
@@ -580,6 +590,8 @@ run_thread:
     t->saved_winerror = GetLastError();
 #endif
 
+    postEvent (cap, EVENT_STOP_THREAD, t->id, ret);
+
 #if defined(THREADED_RTS)
     // If ret is ThreadBlocked, and this Task is bound to the TSO that
     // blocked, we are in limbo - the TSO is now owned by whatever it
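The postEvent() calls threaded through this patch feed the new eventlog (hence the EventLog.h include above): each scheduler transition of interest (run, stop, migrate, GC request, GC start/end) is stamped into a per-capability buffer that tools can replay offline. The encoding is EventLog.c's business; conceptually each call records something like the following, with field names assumed for illustration:

    /* Illustrative shape of an eventlog record, not the real encoding: */
    typedef struct {
        StgWord16 tag;        /* EVENT_RUN_THREAD, EVENT_STOP_THREAD, ... */
        StgWord64 timestamp;  /* taken when the event is posted */
        StgWord32 thread;     /* TSO id, or 0 for capability-wide events */
        StgWord64 other;      /* e.g. the stop reason, or the target cap */
    } Event;
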
@@ -737,6 +749,7 @@ scheduleYield (Capability **pcap, Task *task)
     // if we have work, and we don't need to give up the Capability, continue.
     if (!shouldYieldCapability(cap,task) && 
         (!emptyRunQueue(cap) ||
+         !emptyWakeupQueue(cap) ||
          blackholes_need_checking ||
          sched_state >= SCHED_INTERRUPTING))
         return;
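With the new !emptyWakeupQueue(cap) disjunct, a capability with pending wakeups counts as having local work and keeps running instead of giving up the Capability. Pulled out as a predicate, the retained-work test reads roughly:

    /* Sketch: stay on this capability iff it still has local work or
     * the scheduler is shutting down (names as in the hunk above). */
    static rtsBool haveLocalWork (Capability *cap)
    {
        return !emptyRunQueue(cap)
            || !emptyWakeupQueue(cap)
            || blackholes_need_checking
            || sched_state >= SCHED_INTERRUPTING;
    }
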
@@ -844,6 +857,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                } else {
                    debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
                    appendToRunQueue(free_caps[i],t);
+
+                    postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no);
+
                    if (t->bound) { t->bound->cap = free_caps[i]; }
                    t->cap = free_caps[i];
                    i++;
@@ -1260,7 +1276,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
               "--<< thread %ld (%s) stopped: HeapOverflow",
               (long)t->id, whatNext_strs[t->what_next]);
 
-    if (cap->context_switch) {
+    if (cap->r.rHpLim == NULL || cap->context_switch) {
         // Sometimes we miss a context switch, e.g. when calling
         // primitives in a tight loop, MAYBE_GC() doesn't check the
         // context switch flag, and we end up waiting for a GC.
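The widened test is the point of this hunk: rHpLim == NULL is how an asynchronous context switch is signalled, because zeroing HpLim makes the running thread's next heap check fail and bounce it back into the scheduler with HeapOverflow even though the nursery has room. Treating that case like an explicit context_switch avoids starting a GC that nobody asked for. The mechanism, sketched with an assumed helper name:

    /* Sketch: every allocation compiles to roughly
     * "Hp += n; if (Hp > HpLim) goto gc;", so a NULL HpLim fails the
     * next check without the nursery actually being full. */
    void interruptRunningThread (Capability *cap)   /* hypothetical */
    {
        cap->context_switch = 1;
        cap->r.rHpLim = NULL;   /* fail the thread's next heap check */
    }
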
@@ -1403,10 +1419,9 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
               (unsigned long)t->id, whatNext_strs[t->what_next]);
 
     // blocked exceptions can now complete, even if the thread was in
-    // blocked mode (see #2910).  The thread is already marked
-    // ThreadComplete, so any further throwTos will complete
-    // immediately and we don't need to worry about synchronising with
-    // those.
+    // blocked mode (see #2910).  This unconditionally calls
+    // lockTSO(), which ensures that we don't miss any threads that
+    // are engaged in throwTo() with this thread as a target.
     awakenBlockedExceptionQueue (cap, t);
 
       //
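The rewritten comment documents why the call is unconditional here: awakenBlockedExceptionQueue() takes the TSO's lock, and since throwTo() takes the same lock before enqueuing, holding it while draining guarantees no racing throwTo() is missed. The shape of that synchronisation, with the drain loop abbreviated:

    /* Sketch, assuming lockTSO()/unlockTSO() guard the same lock that
     * throwTo() takes: once held, the blocked_exceptions queue cannot
     * grow behind our back, so the drain is complete. */
    lockTSO(t);
    /* ... wake every TSO queued on t->blocked_exceptions ... */
    t->blocked_exceptions = END_TSO_QUEUE;
    unlockTSO(t);
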
@@ -1553,6 +1568,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     
     if (gc_type == PENDING_GC_SEQ)
     {
+        postEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
         // single-threaded GC: grab all the capabilities
         for (i=0; i < n_capabilities; i++) {
             debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilities (%d/%d)", i, n_capabilities);
@@ -1575,6 +1591,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     {
         // multi-threaded GC: make sure all the Capabilities donate one
         // GC thread each.
+        postEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
         debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
 
         waitForGcThreads(cap);
@@ -1600,6 +1617,7 @@ delete_threads_and_gc:
     heap_census = scheduleNeedHeapProfile(rtsTrue);
 
 #if defined(THREADED_RTS)
+    postEvent(cap, EVENT_GC_START, 0, 0);
     debugTrace(DEBUG_sched, "doing GC");
     // reset waiting_for_gc *before* GC, so that when the GC threads
     // emerge they don't immediately re-enter the GC.
@@ -1608,6 +1626,31 @@ delete_threads_and_gc:
 #else
     GarbageCollect(force_major || heap_census, 0, cap);
 #endif
+    postEvent(cap, EVENT_GC_END, 0, 0);
+
+    if (recent_activity == ACTIVITY_INACTIVE && force_major)
+    {
+        // We are doing a GC because the system has been idle for a
+        // timeslice and we need to check for deadlock.  Record the
+        // fact that we've done a GC and turn off the timer signal;
+        // it will get re-enabled if we run any threads after the GC.
+        recent_activity = ACTIVITY_DONE_GC;
+        stopTimer();
+    }
+    else
+    {
+        // the GC might have taken long enough for the timer to set
+        // recent_activity = ACTIVITY_INACTIVE, but we aren't
+        // necessarily deadlocked:
+        recent_activity = ACTIVITY_YES;
+    }
+
+#if defined(THREADED_RTS)
+    if (gc_type == PENDING_GC_PAR)
+    {
+        releaseGCThreads(cap);
+    }
+#endif
 
     if (heap_census) {
         debugTrace(DEBUG_sched, "performing heap census");
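Together with the force_major block deleted below, the recent_activity logic becomes a small state machine shared between the timer and the scheduler: the tick handler demotes ACTIVITY_YES to ACTIVITY_INACTIVE when nothing ran, the idle deadlock-check GC above moves INACTIVE to DONE_GC and stops the timer, and running a thread again restarts the timer and returns to YES. As a sketch of who drives which transition:

    /* Illustrative only; the assignments live in the timer handler and
     * the scheduler, not in one switch statement. */
    switch (recent_activity) {
    case ACTIVITY_YES:       /* timer tick: nothing ran since last tick */
        recent_activity = ACTIVITY_INACTIVE;  /* idle GC will follow */
        break;
    case ACTIVITY_INACTIVE:  /* scheduler, after the deadlock-check GC */
        recent_activity = ACTIVITY_DONE_GC;
        stopTimer();         /* sleep quietly until real work arrives */
        break;
    case ACTIVITY_DONE_GC:   /* scheduler: a thread is about to run */
        recent_activity = ACTIVITY_YES;
        startTimer();
        break;
    }
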
@@ -1640,16 +1683,6 @@ delete_threads_and_gc:
     balanceSparkPoolsCaps(n_capabilities, capabilities);
 #endif
 
-    if (force_major)
-    {
-        // We've just done a major GC and we don't need the timer
-        // signal turned on any more (#1623).
-        // NB. do this *before* releasing the Capabilities, to avoid
-        // deadlocks!
-        recent_activity = ACTIVITY_DONE_GC;
-        stopTimer();
-    }
-
 #if defined(THREADED_RTS)
     if (gc_type == PENDING_GC_SEQ) {
         // release our stash of capabilities.
@@ -1909,6 +1942,7 @@ suspendThread (StgRegTable *reg)
   task = cap->running_task;
   tso = cap->r.rCurrentTSO;
 
+  postEvent(cap, EVENT_STOP_THREAD, tso->id, THREAD_SUSPENDED_FOREIGN_CALL);
   debugTrace(DEBUG_sched, 
             "thread %lu did a safe foreign call", 
             (unsigned long)cap->r.rCurrentTSO->id);
@@ -1980,10 +2014,15 @@ resumeThread (void *task_)
     tso = task->suspended_tso;
     task->suspended_tso = NULL;
     tso->_link = END_TSO_QUEUE; // no write barrier reqd
+
+    postEvent(cap, EVENT_RUN_THREAD, tso->id, 0);
     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
     
     if (tso->why_blocked == BlockedOnCCall) {
-       awakenBlockedExceptionQueue(cap,tso);
+        // avoid locking the TSO if we don't have to
+        if (tso->blocked_exceptions != END_TSO_QUEUE) {
+            awakenBlockedExceptionQueue(cap,tso);
+        }
        tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
     }
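The guard added around awakenBlockedExceptionQueue() is a plain avoid-the-lock fast path: the function locks the TSO unconditionally, and for the common case of a foreign call with no pending throwTo() that lock is pure overhead. The commit relies on the unlocked read being safe here, since the thread is being resumed under its own Capability:

    /* Peek without the lock; only lock (inside the call) when the
     * queue is visibly non-empty. */
    if (tso->blocked_exceptions != END_TSO_QUEUE) {  /* unlocked peek */
        awakenBlockedExceptionQueue(cap, tso);       /* locks the TSO */
    }
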
     
@@ -2033,6 +2072,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
     if (cpu == cap->no) {
        appendToRunQueue(cap,tso);
     } else {
+        postEvent (cap, EVENT_MIGRATE_THREAD, tso->id, capabilities[cpu].no);
        wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
     }
 #else
@@ -2085,6 +2125,10 @@ workerStart(Task *task)
     cap = task->cap;
     RELEASE_LOCK(&task->lock);
 
+    if (RtsFlags.ParFlags.setAffinity) {
+        setThreadAffinity(cap->no, n_capabilities);
+    }
+
     // set the thread-local pointer to the Task:
     taskEnter(task);
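RtsFlags.ParFlags.setAffinity is the new affinity RTS flag (surfaced as +RTS -qa): each worker pins itself to a CPU as it starts, so capabilities stop migrating between cores. setThreadAffinity() itself belongs to the OS-threads layer; on Linux it would come down to something like this sketch, assuming a simple modulo placement:

    /* Sketch of a Linux setThreadAffinity(cap_no, n_caps); the real
     * implementation would live in rts/posix/OSThreads.c. */
    #define _GNU_SOURCE
    #include <sched.h>
    #include <pthread.h>
    #include <unistd.h>

    void setThreadAffinity (int cap_no, int n_capabilities)
    {
        cpu_set_t mask;
        int ncpus = (int)sysconf(_SC_NPROCESSORS_ONLN);
        (void)n_capabilities;            /* a fancier policy would use it */
        CPU_ZERO(&mask);
        CPU_SET(cap_no % ncpus, &mask);  /* capability i -> CPU i mod ncpus */
        pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask);
    }
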
 
@@ -2172,8 +2216,6 @@ initScheduler(void)
   }
 #endif
 
-  trace(TRACE_sched, "start: %d capabilities", n_capabilities);
-
   RELEASE_LOCK(&sched_mutex);
 }
 
@@ -2188,22 +2230,16 @@ exitScheduler(
 {
     Task *task = NULL;
 
-#if defined(THREADED_RTS)
     ACQUIRE_LOCK(&sched_mutex);
     task = newBoundTask();
     RELEASE_LOCK(&sched_mutex);
-#endif
 
     // If we haven't killed all the threads yet, do it now.
     if (sched_state < SCHED_SHUTTING_DOWN) {
        sched_state = SCHED_INTERRUPTING;
-#if defined(THREADED_RTS)
         waitForReturnCapability(&task->cap,task);
        scheduleDoGC(task->cap,task,rtsFalse);    
         releaseCapability(task->cap);
-#else
-       scheduleDoGC(&MainCapability,task,rtsFalse);    
-#endif
     }
     sched_state = SCHED_SHUTTING_DOWN;
 
@@ -2508,6 +2544,10 @@ checkBlackHoles (Capability *cap)
     prev = &blackhole_queue;
     t = blackhole_queue;
     while (t != END_TSO_QUEUE) {
+        if (t->what_next == ThreadRelocated) {
+            t = t->_link;
+            continue;
+        }
        ASSERT(t->why_blocked == BlockedOnBlackHole);
        type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
        if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
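And the fix the commit is named for: a TSO whose stack has been moved (for example, enlarged) leaves its old copy behind as a forwarding stub with what_next == ThreadRelocated and _link pointing at the live TSO, so a walker of the blackhole queue must skip through stubs before asserting anything about the thread. The same forwarding idiom, as a standalone helper:

    /* Sketch: resolve a possibly-relocated TSO to its live copy by
     * following the forwarding links, as the loop above does inline. */
    static StgTSO *followRelocation (StgTSO *t)
    {
        while (t->what_next == ThreadRelocated) {
            t = t->_link;
        }
        return t;
    }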