Fix a race in the deadlock-detection code
[ghc-hetmet.git] / rts / Schedule.c
index f53687a..7dd0634 100644 (file)
@@ -92,13 +92,15 @@ rtsBool blackholes_need_checking = rtsFalse;
 /* flag that tracks whether we have done any execution in this time slice.
  * LOCK: currently none, perhaps we should lock (but needs to be
  * updated in the fast path of the scheduler).
+ *
+ * NB. must be StgWord, we do xchg() on it.
  */
-nat recent_activity = ACTIVITY_YES;
+volatile StgWord recent_activity = ACTIVITY_YES;
 
 /* if this flag is set as well, give up execution
- * LOCK: none (changes once, from false->true)
+ * LOCK: none (changes monotonically)
  */
-rtsBool sched_state = SCHED_RUNNING;
+volatile StgWord sched_state = SCHED_RUNNING;
 
 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
  *  exists - earlier gccs apparently didn't.
@@ -137,17 +139,21 @@ static Capability *schedule (Capability *initialCapability, Task *task);
 // scheduler clearer.
 //
 static void schedulePreLoop (void);
+static void scheduleFindWork (Capability *cap);
 #if defined(THREADED_RTS)
-static void schedulePushWork(Capability *cap, Task *task);
+static void scheduleYield (Capability **pcap, Task *task);
 #endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
+static void schedulePushWork(Capability *cap, Task *task);
 #if defined(PARALLEL_HASKELL)
 static rtsBool scheduleGetRemoteWork(Capability *cap);
 static void scheduleSendPendingMessages(void);
+#endif
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
 static void scheduleActivateSpark(Capability *cap);
 #endif
 static void schedulePostRunThread(Capability *cap, StgTSO *t);
@@ -281,23 +287,6 @@ schedule (Capability *initialCapability, Task *task)
 
   while (TERMINATION_CONDITION) {
 
-#if defined(THREADED_RTS)
-      if (first) {
-         // don't yield the first time, we want a chance to run this
-         // thread for a bit, even if there are others banging at the
-         // door.
-         first = rtsFalse;
-         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
-      } else {
-         // Yield the capability to higher-priority tasks if necessary.
-         yieldCapability(&cap, task);
-      }
-#endif
-      
-#if defined(THREADED_RTS)
-      schedulePushWork(cap,task);
-#endif
-
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
     // call).
@@ -351,7 +340,14 @@ schedule (Capability *initialCapability, Task *task)
 #endif
        /* scheduleDoGC() deletes all the threads */
        cap = scheduleDoGC(cap,task,rtsFalse);
-       break;
+
+        // after scheduleDoGC(), we must be shutting down.  Either some
+        // other Capability did the final GC, or we did it above,
+        // either way we can fall through to the SCHED_SHUTTING_DOWN
+        // case now.
+        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
+        // fall through
+
     case SCHED_SHUTTING_DOWN:
        debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
        // If we are a worker, just exit.  If we're a bound thread
@@ -365,59 +361,13 @@ schedule (Capability *initialCapability, Task *task)
        barf("sched_state: %d", sched_state);
     }
 
-#if defined(THREADED_RTS)
-    // If the run queue is empty, take a spark and turn it into a thread.
-    {
-       if (emptyRunQueue(cap)) {
-           StgClosure *spark;
-           spark = findSpark(cap);
-           if (spark != NULL) {
-               debugTrace(DEBUG_sched,
-                          "turning spark of closure %p into a thread",
-                          (StgClosure *)spark);
-               createSparkThread(cap,spark);     
-           }
-       }
-    }
-#endif // THREADED_RTS
+    scheduleFindWork(cap);
 
-    scheduleStartSignalHandlers(cap);
-
-    // Only check the black holes here if we've nothing else to do.
-    // During normal execution, the black hole list only gets checked
-    // at GC time, to avoid repeatedly traversing this possibly long
-    // list each time around the scheduler.
-    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
-
-    scheduleCheckWakeupThreads(cap);
-
-    scheduleCheckBlockedThreads(cap);
+    /* work pushing, currently relevant only for THREADED_RTS:
+       (pushes threads, wakes up idle capabilities for stealing) */
+    schedulePushWork(cap,task);
 
 #if defined(PARALLEL_HASKELL)
-    /* message processing and work distribution goes here */ 
-
-    /* if messages have been buffered... a NOOP in THREADED_RTS */
-    scheduleSendPendingMessages();
-
-    /* If the run queue is empty,...*/
-    if (emptyRunQueue(cap)) {
-      /* ...take one of our own sparks and turn it into a thread */
-      scheduleActivateSpark(cap);
-
-       /* if this did not work, try to steal a spark from someone else */
-      if (emptyRunQueue(cap)) {
-       receivedFinish = scheduleGetRemoteWork(cap);
-       continue; //  a new round, (hopefully) with new work
-       /* 
-          in GUM, this a) sends out a FISH and returns IF no fish is
-                          out already
-                       b) (blocking) awaits and receives messages
-          
-          in Eden, this is only the blocking receive, as b) in GUM.
-       */
-      }
-    } 
-
     /* since we perform a blocking receive and continue otherwise,
        either we never reach here or we definitely have work! */
     // from here: non-empty run queue
@@ -430,9 +380,10 @@ schedule (Capability *initialCapability, Task *task)
                                above, waits for messages as well! */
       processMessages(cap, &receivedFinish);
     }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL: non-empty run queue!
 
     scheduleDetectDeadlock(cap,task);
+
 #if defined(THREADED_RTS)
     cap = task->cap;    // reload cap, it might have changed
 #endif
@@ -445,12 +396,28 @@ schedule (Capability *initialCapability, Task *task)
     //
     // win32: might be here due to awaitEvent() being abandoned
     // as a result of a console event having been delivered.
-    if ( emptyRunQueue(cap) ) {
+    
+#if defined(THREADED_RTS)
+    if (first) 
+    {
+    // XXX: ToDo
+    //     // don't yield the first time, we want a chance to run this
+    //     // thread for a bit, even if there are others banging at the
+    //     // door.
+    //     first = rtsFalse;
+    //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+    }
+
+  yield:
+    scheduleYield(&cap,task);
+    if (emptyRunQueue(cap)) continue; // look for work again
+#endif
+
 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
+    if ( emptyRunQueue(cap) ) {
        ASSERT(sched_state >= SCHED_INTERRUPTING);
-#endif
-       continue; // nothing to do
     }
+#endif
 
     // 
     // Get a thread to run
@@ -493,6 +460,15 @@ schedule (Capability *initialCapability, Task *task)
     }
 #endif
 
+    // If we're shutting down, and this thread has not yet been
+    // killed, kill it now.  This sometimes happens when a finalizer
+    // thread is created by the final GC, or a thread previously
+    // in a foreign call returns.
+    if (sched_state >= SCHED_INTERRUPTING &&
+        !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
+        deleteThread(cap,t);
+    }
+
     /* context switches are initiated by the timer signal, unless
      * the user specified "context switch as often as possible", with
      * +RTS -C0
@@ -522,6 +498,7 @@ run_thread:
 
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
     ASSERT(t->cap == cap);
+    ASSERT(t->bound ? t->bound->cap == cap : 1);
 
     prev_what_next = t->what_next;
 
@@ -607,7 +584,7 @@ run_thread:
        debugTrace(DEBUG_sched,
                   "--<< thread %lu (%s) stopped: blocked",
                   (unsigned long)t->id, whatNext_strs[t->what_next]);
-       continue;
+        goto yield;
     }
 #endif
 
@@ -674,16 +651,117 @@ schedulePreLoop(void)
 }
 
 /* -----------------------------------------------------------------------------
+ * scheduleFindWork()
+ *
+ * Search for work to do, and handle messages from elsewhere.
+ * -------------------------------------------------------------------------- */
+
+static void
+scheduleFindWork (Capability *cap)
+{
+    scheduleStartSignalHandlers(cap);
+
+    // Only check the black holes here if we've nothing else to do.
+    // During normal execution, the black hole list only gets checked
+    // at GC time, to avoid repeatedly traversing this possibly long
+    // list each time around the scheduler.
+    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
+
+    scheduleCheckWakeupThreads(cap);
+
+    scheduleCheckBlockedThreads(cap);
+
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+    if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
+#endif
+
+#if defined(PARALLEL_HASKELL)
+    // if messages have been buffered...
+    scheduleSendPendingMessages();
+#endif
+
+#if defined(PARALLEL_HASKELL)
+    if (emptyRunQueue(cap)) {
+       receivedFinish = scheduleGetRemoteWork(cap);
+       continue; //  a new round, (hopefully) with new work
+       /* 
+          in GUM, this a) sends out a FISH and returns IF no fish is
+                          out already
+                       b) (blocking) awaits and receives messages
+          
+          in Eden, this is only the blocking receive, as b) in GUM.
+       */
+    }
+#endif
+}
+
+#if defined(THREADED_RTS)
+STATIC_INLINE rtsBool
+shouldYieldCapability (Capability *cap, Task *task)
+{
+    // we need to yield this capability to someone else if..
+    //   - another thread is initiating a GC
+    //   - another Task is returning from a foreign call
+    //   - the thread at the head of the run queue cannot be run
+    //     by this Task (it is bound to another Task, or it is unbound
+    //     and this task it bound).
+    return (waiting_for_gc || 
+            cap->returning_tasks_hd != NULL ||
+            (!emptyRunQueue(cap) && (task->tso == NULL
+                                     ? cap->run_queue_hd->bound != NULL
+                                     : cap->run_queue_hd->bound != task)));
+}
+
+// This is the single place where a Task goes to sleep.  There are
+// two reasons it might need to sleep:
+//    - there are no threads to run
+//    - we need to yield this Capability to someone else 
+//      (see shouldYieldCapability())
+//
+// Careful: the scheduler loop is quite delicate.  Make sure you run
+// the tests in testsuite/concurrent (all ways) after modifying this,
+// and also check the benchmarks in nofib/parallel for regressions.
+
+static void
+scheduleYield (Capability **pcap, Task *task)
+{
+    Capability *cap = *pcap;
+
+    // if we have work, and we don't need to give up the Capability, continue.
+    if (!shouldYieldCapability(cap,task) && 
+        (!emptyRunQueue(cap) ||
+         blackholes_need_checking ||
+         sched_state >= SCHED_INTERRUPTING))
+        return;
+
+    // otherwise yield (sleep), and keep yielding if necessary.
+    do {
+        yieldCapability(&cap,task);
+    } 
+    while (shouldYieldCapability(cap,task));
+
+    // note there may still be no threads on the run queue at this
+    // point, the caller has to check.
+
+    *pcap = cap;
+    return;
+}
+#endif
+    
+/* -----------------------------------------------------------------------------
  * schedulePushWork()
  *
  * Push work to other Capabilities if we have some.
  * -------------------------------------------------------------------------- */
 
-#if defined(THREADED_RTS)
 static void
 schedulePushWork(Capability *cap USED_IF_THREADS, 
                 Task *task      USED_IF_THREADS)
 {
+  /* following code not for PARALLEL_HASKELL. I kept the call general,
+     future GUM versions might use pushing in a distributed setup */
+#if defined(THREADED_RTS)
+
     Capability *free_caps[n_capabilities], *cap0;
     nat i, n_free_caps;
 
@@ -726,7 +804,12 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
        StgTSO *prev, *t, *next;
        rtsBool pushed_to_all;
 
-       debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
+       debugTrace(DEBUG_sched, 
+                  "cap %d: %s and %d free capabilities, sharing...", 
+                  cap->no, 
+                  (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
+                  "excess threads on run queue":"sparks to share (>=2)",
+                  n_free_caps);
 
        i = 0;
        pushed_to_all = rtsFalse;
@@ -760,6 +843,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
            cap->run_queue_tl = prev;
        }
 
+#ifdef SPARK_PUSHING
+       /* JB I left this code in place, it would work but is not necessary */
+
        // If there are some free capabilities that we didn't push any
        // threads to, then try to push a spark to each one.
        if (!pushed_to_all) {
@@ -767,7 +853,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
            // i is the next free capability to push to
            for (; i < n_free_caps; i++) {
                if (emptySparkPoolCap(free_caps[i])) {
-                   spark = findSpark(cap);
+                   spark = tryStealSpark(cap->sparks);
                    if (spark != NULL) {
                        debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
                        newSpark(&(free_caps[i]->r), spark);
@@ -775,16 +861,19 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                }
            }
        }
+#endif /* SPARK_PUSHING */
 
        // release the capabilities
        for (i = 0; i < n_free_caps; i++) {
            task->cap = free_caps[i];
-           releaseCapability(free_caps[i]);
+           releaseAndWakeupCapability(free_caps[i]);
        }
     }
     task->cap = cap; // reset to point to our Capability.
+
+#endif /* THREADED_RTS */
+
 }
-#endif
 
 /* ----------------------------------------------------------------------------
  * Start any pending signal handlers
@@ -862,8 +951,13 @@ scheduleCheckBlackHoles (Capability *cap)
     {
        ACQUIRE_LOCK(&sched_mutex);
        if ( blackholes_need_checking ) {
-           checkBlackHoles(cap);
            blackholes_need_checking = rtsFalse;
+            // important that we reset the flag *before* checking the
+            // blackhole queue, otherwise we could get deadlock.  This
+            // happens as follows: we wake up a thread that
+            // immediately runs on another Capability, blocks on a
+            // blackhole, and then we reset the blackholes_need_checking flag.
+           checkBlackHoles(cap);
        }
        RELEASE_LOCK(&sched_mutex);
     }
@@ -907,12 +1001,11 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
-       cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
+       cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
+        // when force_major == rtsTrue. scheduleDoGC sets
+        // recent_activity to ACTIVITY_DONE_GC and turns off the timer
+        // signal.
 
-       recent_activity = ACTIVITY_DONE_GC;
-        // disable timer signals (see #1623)
-        stopTimer();
-       
        if ( !emptyRunQueue(cap) ) return;
 
 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
@@ -965,7 +1058,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
  * ------------------------------------------------------------------------- */
 
 #if defined(PARALLEL_HASKELL)
-static StgTSO *
+static void
 scheduleSendPendingMessages(void)
 {
 
@@ -984,43 +1077,28 @@ scheduleSendPendingMessages(void)
 #endif
 
 /* ----------------------------------------------------------------------------
- * Activate spark threads (PARALLEL_HASKELL only)
+ * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
  * ------------------------------------------------------------------------- */
 
-#if defined(PARALLEL_HASKELL)
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
 static void
 scheduleActivateSpark(Capability *cap)
 {
-    StgClosure *spark;
-
-/* We only want to stay here if the run queue is empty and we want some
-   work. We try to turn a spark into a thread, and add it to the run
-   queue, from where it will be picked up in the next iteration of the
-   scheduler loop.  
-*/
-    if (!emptyRunQueue(cap)) 
-      /* In the threaded RTS, another task might have pushed a thread
-        on our run queue in the meantime ? But would need a lock.. */
-      return;
-
-    spark = findSpark(cap); // defined in Sparks.c
-
-    if (spark != NULL) {
-      debugTrace(DEBUG_sched,
-                "turning spark of closure %p into a thread",
-                (StgClosure *)spark);
-      createSparkThread(cap,spark); // defined in Sparks.c
+    if (anySparks())
+    {
+        createSparkThread(cap);
+        debugTrace(DEBUG_sched, "creating a spark thread");
     }
 }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL || THREADED_RTS
 
 /* ----------------------------------------------------------------------------
  * Get work from a remote node (PARALLEL_HASKELL only)
  * ------------------------------------------------------------------------- */
     
 #if defined(PARALLEL_HASKELL)
-static rtsBool
-scheduleGetRemoteWork(Capability *cap)
+static rtsBool /* return value used in PARALLEL_HASKELL only */
+scheduleGetRemoteWork (Capability *cap STG_UNUSED)
 {
 #if defined(PARALLEL_HASKELL)
   rtsBool receivedFinish = rtsFalse;
@@ -1057,7 +1135,7 @@ scheduleGetRemoteWork(Capability *cap)
 
 #endif /* PARALLEL_HASKELL */
 }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL || THREADED_RTS
 
 /* ----------------------------------------------------------------------------
  * After running a thread...
@@ -1085,7 +1163,7 @@ schedulePostRunThread (Capability *cap, StgTSO *t)
             // ATOMICALLY_FRAME, aborting the (nested)
             // transaction, and saving the stack of any
             // partially-evaluated thunks on the heap.
-            throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
+            throwToSingleThreaded_(cap, t, NULL, rtsTrue);
             
             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
         }
@@ -1404,6 +1482,13 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     nat i;
 #endif
 
+    if (sched_state == SCHED_SHUTTING_DOWN) {
+        // The final GC has already been done, and the system is
+        // shutting down.  We'll probably deadlock if we try to GC
+        // now.
+        return cap;
+    }
+
 #ifdef THREADED_RTS
     // In order to GC, there must be no threads running Haskell code.
     // Therefore, the GC thread needs to hold *all* the capabilities,
@@ -1483,6 +1568,24 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
        performHeapProfile = rtsFalse;
     }
 
+#ifdef SPARKBALANCE
+    /* JB 
+       Once we are all together... this would be the place to balance all
+       spark pools. No concurrent stealing or adding of new sparks can
+       occur. Should be defined in Sparks.c. */
+    balanceSparkPoolsCaps(n_capabilities, capabilities);
+#endif
+
+    if (force_major)
+    {
+        // We've just done a major GC and we don't need the timer
+        // signal turned on any more (#1623).
+        // NB. do this *before* releasing the Capabilities, to avoid
+        // deadlocks!
+        recent_activity = ACTIVITY_DONE_GC;
+        stopTimer();
+    }
+
 #if defined(THREADED_RTS)
     // release our stash of capabilities.
     for (i = 0; i < n_capabilities; i++) {
@@ -1764,7 +1867,7 @@ suspendThread (StgRegTable *reg)
 
   suspendTask(cap,task);
   cap->in_haskell = rtsFalse;
-  releaseCapability_(cap);
+  releaseCapability_(cap,rtsFalse);
   
   RELEASE_LOCK(&cap->lock);
 
@@ -1922,9 +2025,22 @@ workerStart(Task *task)
     // schedule() runs without a lock.
     cap = schedule(cap,task);
 
-    // On exit from schedule(), we have a Capability.
-    releaseCapability(cap);
+    // On exit from schedule(), we have a Capability, but possibly not
+    // the same one we started with.
+
+    // During shutdown, the requirement is that after all the
+    // Capabilities are shut down, all workers that are shutting down
+    // have finished workerTaskStop().  This is why we hold on to
+    // cap->lock until we've finished workerTaskStop() below.
+    //
+    // There may be workers still involved in foreign calls; those
+    // will just block in waitForReturnCapability() because the
+    // Capability has been shut down.
+    //
+    ACQUIRE_LOCK(&cap->lock);
+    releaseCapability_(cap,rtsFalse);
     workerTaskStop(task);
+    RELEASE_LOCK(&cap->lock);
 }
 #endif
 
@@ -2027,20 +2143,30 @@ exitScheduler(
            shutdownCapability(&capabilities[i], task, wait_foreign);
        }
        boundTaskExiting(task);
-       stopTaskManager();
     }
-#else
-    freeCapability(&MainCapability);
 #endif
 }
 
 void
 freeScheduler( void )
 {
-    freeTaskManager();
-    if (n_capabilities != 1) {
-        stgFree(capabilities);
+    nat still_running;
+
+    ACQUIRE_LOCK(&sched_mutex);
+    still_running = freeTaskManager();
+    // We can only free the Capabilities if there are no Tasks still
+    // running.  We might have a Task about to return from a foreign
+    // call into waitForReturnCapability(), for example (actually,
+    // this should be the *only* thing that a still-running Task can
+    // do at this point, and it will block waiting for the
+    // Capability).
+    if (still_running == 0) {
+        freeCapabilities();
+        if (n_capabilities != 1) {
+            stgFree(capabilities);
+        }
     }
+    RELEASE_LOCK(&sched_mutex);
 #if defined(THREADED_RTS)
     closeMutex(&sched_mutex);
 #endif
@@ -2125,10 +2251,17 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
   }
 
   /* Try to double the current stack size.  If that takes us over the
-   * maximum stack size for this thread, then use the maximum instead.
-   * Finally round up so the TSO ends up as a whole number of blocks.
+   * maximum stack size for this thread, then use the maximum instead
+   * (that is, unless we're already at or over the max size and we
+   * can't raise the StackOverflow exception (see above), in which
+   * case just double the size). Finally round up so the TSO ends up as
+   * a whole number of blocks.
    */
-  new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
+  if (tso->stack_size >= tso->max_stack_size) {
+      new_stack_size = tso->stack_size * 2;
+  } else { 
+      new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
+  }
   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
                                       TSO_STRUCT_SIZE)/sizeof(W_);
   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */