Improvements to shutting down of the runtime
authorSimon Marlow <simonmar@microsoft.com>
Wed, 15 Mar 2006 14:50:41 +0000 (14:50 +0000)
committerSimon Marlow <simonmar@microsoft.com>
Wed, 15 Mar 2006 14:50:41 +0000 (14:50 +0000)
Yet another attempt at shutdown & interruption.  This one appears to
work better; ^C is more responsive in multi threaded / SMP, and I
fixed one case where the runtime wasn't responding to ^C at all.

ghc/rts/Capability.c
ghc/rts/Schedule.c
ghc/rts/Schedule.h
ghc/rts/Timer.c
ghc/rts/posix/Select.c
ghc/rts/posix/Signals.c

index 8c40b63..143eefe 100644 (file)
@@ -44,7 +44,7 @@ STATIC_INLINE rtsBool
 globalWorkToDo (void)
 {
     return blackholes_need_checking
-       || interrupted
+       || sched_state >= SCHED_INTERRUPTING
        ;
 }
 #endif
@@ -286,7 +286,7 @@ releaseCapability_ (Capability* cap)
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
-       if (!shutting_down_scheduler) {
+       if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
            IF_DEBUG(scheduler,
                     sched_belch("starting new worker on capability %d", cap->no));
            startWorkerTask(cap, workerStart);
@@ -575,7 +575,7 @@ shutdownCapability (Capability *cap, Task *task)
 {
     nat i;
 
-    ASSERT(interrupted && shutting_down_scheduler);
+    ASSERT(sched_state == SCHED_SHUTTING_DOWN);
 
     task->cap = cap;
 
index b0a8dc6..5760010 100644 (file)
@@ -137,7 +137,7 @@ nat recent_activity = ACTIVITY_YES;
 /* if this flag is set as well, give up execution
  * LOCK: none (changes once, from false->true)
  */
-rtsBool interrupted = rtsFalse;
+rtsBool sched_state = SCHED_RUNNING;
 
 /* Next thread ID to allocate.
  * LOCK: sched_mutex
@@ -227,8 +227,9 @@ static void scheduleHandleThreadBlocked( StgTSO *t );
 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                             StgTSO *t );
 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
-static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major,
-                        void (*get_roots)(evac_fn));
+static Capability *scheduleDoGC(Capability *cap, Task *task,
+                               rtsBool force_major, 
+                               void (*get_roots)(evac_fn));
 
 static void unblockThread(Capability *cap, StgTSO *tso);
 static rtsBool checkBlackHoles(Capability *cap);
@@ -240,7 +241,7 @@ static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
                        rtsBool stop_at_atomically, StgPtr stop_here);
 
 static void deleteThread (Capability *cap, StgTSO *tso);
-static void deleteRunQueue (Capability *cap);
+static void deleteAllThreads (Capability *cap);
 
 #ifdef DEBUG
 static void printThreadBlockage(StgTSO *tso);
@@ -394,28 +395,67 @@ schedule (Capability *initialCapability, Task *task)
          stg_exit(EXIT_FAILURE);
     }
 
+    // The interruption / shutdown sequence.
+    // 
+    // In order to cleanly shut down the runtime, we want to:
+    //   * make sure that all main threads return to their callers
+    //     with the state 'Interrupted'.
+    //   * clean up all OS threads assocated with the runtime
+    //   * free all memory etc.
+    //
+    // So the sequence for ^C goes like this:
+    //
+    //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
+    //     arranges for some Capability to wake up
+    //
+    //   * all threads in the system are halted, and the zombies are
+    //     placed on the run queue for cleaning up.  We acquire all
+    //     the capabilities in order to delete the threads, this is
+    //     done by scheduleDoGC() for convenience (because GC already
+    //     needs to acquire all the capabilities).  We can't kill
+    //     threads involved in foreign calls.
+    // 
+    //   * sched_state := SCHED_INTERRUPTED
+    //
+    //   * somebody calls shutdownHaskell(), which calls exitScheduler()
+    //
+    //   * sched_state := SCHED_SHUTTING_DOWN
     //
-    // Test for interruption.  If interrupted==rtsTrue, then either
-    // we received a keyboard interrupt (^C), or the scheduler is
-    // trying to shut down all the tasks (shutting_down_scheduler) in
-    // the threaded RTS.
+    //   * all workers exit when the run queue on their capability
+    //     drains.  All main threads will also exit when their TSO
+    //     reaches the head of the run queue and they can return.
     //
-    if (interrupted) {
-       deleteRunQueue(cap);
+    //   * eventually all Capabilities will shut down, and the RTS can
+    //     exit.
+    //
+    //   * We might be left with threads blocked in foreign calls, 
+    //     we should really attempt to kill these somehow (TODO);
+    
+    switch (sched_state) {
+    case SCHED_RUNNING:
+       break;
+    case SCHED_INTERRUPTING:
+       IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTING"));
 #if defined(THREADED_RTS)
        discardSparksCap(cap);
 #endif
-       if (shutting_down_scheduler) {
-           IF_DEBUG(scheduler, sched_belch("shutting down"));
-           // If we are a worker, just exit.  If we're a bound thread
-           // then we will exit below when we've removed our TSO from
-           // the run queue.
-           if (task->tso == NULL && emptyRunQueue(cap)) {
-               return cap;
-           }
-       } else {
-           IF_DEBUG(scheduler, sched_belch("interrupted"));
+       /* scheduleDoGC() deletes all the threads */
+       cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
+       break;
+    case SCHED_INTERRUPTED:
+       IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTED"));
+       break;
+    case SCHED_SHUTTING_DOWN:
+       IF_DEBUG(scheduler, sched_belch("SCHED_SHUTTING_DOWN"));
+       // If we are a worker, just exit.  If we're a bound thread
+       // then we will exit below when we've removed our TSO from
+       // the run queue.
+       if (task->tso == NULL && emptyRunQueue(cap)) {
+           return cap;
        }
+       break;
+    default:
+       barf("sched_state: %d", sched_state);
     }
 
 #if defined(THREADED_RTS)
@@ -459,7 +499,7 @@ schedule (Capability *initialCapability, Task *task)
     // as a result of a console event having been delivered.
     if ( emptyRunQueue(cap) ) {
 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
-       ASSERT(interrupted);
+       ASSERT(sched_state >= SCHED_INTERRUPTING);
 #endif
        continue; // nothing to do
     }
@@ -684,10 +724,7 @@ run_thread:
 
     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
     if (ready_to_gc) {
-      scheduleDoGC(cap,task,rtsFalse,GetRoots);
-#if defined(THREADED_RTS)
-      cap = task->cap;    // reload cap, it might have changed  
-#endif
+      cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
     }
   } /* end of while() */
 
@@ -924,10 +961,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
        // they are unreachable and will therefore be sent an
        // exception.  Any threads thus released will be immediately
        // runnable.
-       scheduleDoGC( cap, task, rtsTrue/*force  major GC*/, GetRoots );
-#if defined(THREADED_RTS)
-       cap = task->cap;    // reload cap, it might have changed
-#endif
+       cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/, GetRoots);
 
        recent_activity = ACTIVITY_DONE_GC;
        
@@ -949,7 +983,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
            }
 
            // either we have threads to run, or we were interrupted:
-           ASSERT(!emptyRunQueue(cap) || interrupted);
+           ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
        }
 #endif
 
@@ -1843,7 +1877,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
              if (task->ret) {
                  *(task->ret) = NULL;
              }
-             if (interrupted) {
+             if (sched_state >= SCHED_INTERRUPTING) {
                  task->stat = Interrupted;
              } else {
                  task->stat = Killed;
@@ -1895,7 +1929,7 @@ scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
  * Perform a garbage collection if necessary
  * -------------------------------------------------------------------------- */
 
-static void
+static Capability *
 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
              rtsBool force_major, void (*get_roots)(evac_fn))
 {
@@ -1924,7 +1958,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
            IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
            if (cap) yieldCapability(&cap,task);
        } while (waiting_for_gc);
-       return;  // NOTE: task->cap might have changed here
+       return cap;  // NOTE: task->cap might have changed here
     }
 
     for (i=0; i < n_capabilities; i++) {
@@ -1984,6 +2018,16 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
     
     IF_DEBUG(scheduler, printAllThreads());
 
+    /*
+     * We now have all the capabilities; if we're in an interrupting
+     * state, then we should take the opportunity to delete all the
+     * threads in the system.
+     */
+    if (sched_state >= SCHED_INTERRUPTING) {
+       deleteAllThreads(&capabilities[0]);
+       sched_state = SCHED_INTERRUPTED;
+    }
+
     /* everybody back, start the GC.
      * Could do it in this thread, or signal a condition var
      * to do it in another thread.  Either way, we need to
@@ -2019,6 +2063,8 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
                  G_EVENTQ(0);
                  G_CURR_THREADQ(0));
 #endif /* GRAN */
+
+    return cap;
 }
 
 /* ---------------------------------------------------------------------------
@@ -2137,22 +2183,34 @@ forkProcess(HsStablePtr *entry
 }
 
 /* ---------------------------------------------------------------------------
- * Delete the threads on the run queue of the current capability.
+ * Delete all the threads in the system
  * ------------------------------------------------------------------------- */
    
 static void
-deleteRunQueue (Capability *cap)
+deleteAllThreads ( Capability *cap )
 {
-    StgTSO *t, *next;
-    for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
-       ASSERT(t->what_next != ThreadRelocated);
-       next = t->link;
-       deleteThread(cap, t);
-    }
-}
+  StgTSO* t, *next;
+  IF_DEBUG(scheduler,sched_belch("deleting all threads"));
+  for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+      if (t->what_next == ThreadRelocated) {
+         next = t->link;
+      } else {
+         next = t->global_link;
+         deleteThread(cap,t);
+      }
+  }      
 
-/* startThread and  insertThread are now in GranSim.c -- HWL */
+  // The run queue now contains a bunch of ThreadKilled threads.  We
+  // must not throw these away: the main thread(s) will be in there
+  // somewhere, and the main scheduler loop has to deal with it.
+  // Also, the run queue is the only thing keeping these threads from
+  // being GC'd, and we don't want the "main thread has been GC'd" panic.
 
+#if !defined(THREADED_RTS)
+  ASSERT(blocked_queue_hd == END_TSO_QUEUE);
+  ASSERT(sleeping_queue == END_TSO_QUEUE);
+#endif
+}
 
 /* -----------------------------------------------------------------------------
    Managing the suspended_ccalling_tasks list.
@@ -2702,7 +2760,7 @@ initScheduler(void)
   all_threads       = END_TSO_QUEUE;
 
   context_switch = 0;
-  interrupted    = 0;
+  sched_state    = SCHED_RUNNING;
 
   RtsFlags.ConcFlags.ctxtSwitchTicks =
       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
@@ -2752,18 +2810,25 @@ initScheduler(void)
 void
 exitScheduler( void )
 {
-    interrupted = rtsTrue;
-    shutting_down_scheduler = rtsTrue;
+    Task *task = NULL;
+
+#if defined(THREADED_RTS)
+    ACQUIRE_LOCK(&sched_mutex);
+    task = newBoundTask();
+    RELEASE_LOCK(&sched_mutex);
+#endif
+
+    // If we haven't killed all the threads yet, do it now.
+    if (sched_state < SCHED_INTERRUPTED) {
+       sched_state = SCHED_INTERRUPTING;
+       scheduleDoGC(NULL,task,rtsFalse,GetRoots);    
+    }
+    sched_state = SCHED_SHUTTING_DOWN;
 
 #if defined(THREADED_RTS)
     { 
-       Task *task;
        nat i;
        
-       ACQUIRE_LOCK(&sched_mutex);
-       task = newBoundTask();
-       RELEASE_LOCK(&sched_mutex);
-
        for (i = 0; i < n_capabilities; i++) {
            shutdownCapability(&capabilities[i], task);
        }
@@ -3273,7 +3338,7 @@ awakenBlockedQueue(Capability *cap, StgTSO *tso)
 void
 interruptStgRts(void)
 {
-    interrupted    = 1;
+    sched_state = SCHED_INTERRUPTING;
     context_switch = 1;
 #if defined(THREADED_RTS)
     prodAllCapabilities();
@@ -3581,6 +3646,11 @@ unblockThread(Capability *cap, StgTSO *tso)
   tso->why_blocked = NotBlocked;
   tso->block_info.closure = NULL;
   appendToRunQueue(cap,tso);
+
+  // We might have just migrated this TSO to our Capability:
+  if (tso->bound) {
+      tso->bound->cap = cap;
+  }
 }
 #endif
 
index 0e2e496..63dfeb7 100644 (file)
@@ -106,15 +106,16 @@ void    initThread(StgTSO *tso, nat stack_size);
  */
 extern int RTS_VAR(context_switch);
 
-/* Interrupted flag.
- * Locks required  : none (makes one transition from false->true)
+/* The state of the scheduler.  This is used to control the sequence
+ * of events during shutdown, and when the runtime is interrupted
+ * using ^C.
  */
-extern rtsBool RTS_VAR(interrupted);
+#define SCHED_RUNNING       0  /* running as normal */
+#define SCHED_INTERRUPTING  1  /* ^C detected, before threads are deleted */
+#define SCHED_INTERRUPTED   2  /* ^C detected, after threads deleted */
+#define SCHED_SHUTTING_DOWN 3  /* final shutdown */
 
-/* Shutdown flag.
- * Locks required  : none (makes one transition from false->true)
- */
-extern rtsBool shutting_down_scheduler;
+extern rtsBool RTS_VAR(sched_state);
 
 /* 
  * flag that tracks whether we have done any execution in this time slice.
index b6414f8..0bfea2d 100644 (file)
@@ -48,41 +48,41 @@ handle_tick(int unused STG_UNUSED)
       if (ticks_to_ctxt_switch <= 0) {
          ticks_to_ctxt_switch = RtsFlags.ConcFlags.ctxtSwitchTicks;
          context_switch = 1;   /* schedule a context switch */
+      }
+  }
 
 #if defined(THREADED_RTS)
-         /* 
-          * If we've been inactive for idleGCDelayTicks (set by +RTS
-          * -I), tell the scheduler to wake up and do a GC, to check
-          * for threads that are deadlocked.
+  /* 
+   * If we've been inactive for idleGCDelayTicks (set by +RTS
+   * -I), tell the scheduler to wake up and do a GC, to check
+   * for threads that are deadlocked.
+   */
+  switch (recent_activity) {
+  case ACTIVITY_YES:
+      recent_activity = ACTIVITY_MAYBE_NO;
+      ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTicks;
+      break;
+  case ACTIVITY_MAYBE_NO:
+      if (ticks_to_gc == 0) break; /* 0 ==> no idle GC */
+      ticks_to_gc--;
+      if (ticks_to_gc == 0) {
+         ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTicks;
+         recent_activity = ACTIVITY_INACTIVE;
+         blackholes_need_checking = rtsTrue;
+         /* hack: re-use the blackholes_need_checking flag */
+         
+         /* ToDo: this doesn't work.  Can't invoke
+          * pthread_cond_signal from a signal handler.
+          * Furthermore, we can't prod a capability that we
+          * might be holding.  What can we do?
           */
-         switch (recent_activity) {
-         case ACTIVITY_YES:
-             recent_activity = ACTIVITY_MAYBE_NO;
-             ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTicks;
-             break;
-         case ACTIVITY_MAYBE_NO:
-             if (ticks_to_gc == 0) break; /* 0 ==> no idle GC */
-             ticks_to_gc--;
-             if (ticks_to_gc == 0) {
-                 ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTicks;
-                 recent_activity = ACTIVITY_INACTIVE;
-                 blackholes_need_checking = rtsTrue;
-                 /* hack: re-use the blackholes_need_checking flag */
-
-                 /* ToDo: this doesn't work.  Can't invoke
-                  * pthread_cond_signal from a signal handler.
-                  * Furthermore, we can't prod a capability that we
-                  * might be holding.  What can we do?
-                  */
-                 prodOneCapability();
-             }
-             break;
-         default:
-             break;
-         }
-#endif
+         prodOneCapability();
       }
+      break;
+  default:
+      break;
   }
+#endif
 }
 
 int
index effc7c4..e21ced0 100644 (file)
@@ -215,7 +215,7 @@ awaitEvent(rtsBool wait)
 
          /* we were interrupted, return to the scheduler immediately.
           */
-         if (interrupted) {
+         if (sched_state >= SCHED_INTERRUPTING) {
              return; /* still hold the lock */
          }
          
@@ -272,7 +272,8 @@ awaitEvent(rtsBool wait)
          }
       }
       
-    } while (wait && !interrupted && emptyRunQueue(&MainCapability));
+    } while (wait && sched_state == SCHED_RUNNING
+            && emptyRunQueue(&MainCapability));
 }
 
 #endif /* THREADED_RTS */
index b4cc2fd..5f5f77f 100644 (file)
@@ -253,7 +253,7 @@ anyUserHandlers(void)
 void
 awaitUserSignals(void)
 {
-    while (!signals_pending() && !interrupted) {
+    while (!signals_pending() && sched_state == SCHED_RUNNING) {
        pause();
     }
 }
@@ -432,7 +432,7 @@ shutdown_handler(int sig STG_UNUSED)
     // If we're already trying to interrupt the RTS, terminate with
     // extreme prejudice.  So the first ^C tries to exit the program
     // cleanly, and the second one just kills it.
-    if (interrupted) {
+    if (sched_state >= SCHED_INTERRUPTING) {
        stg_exit(EXIT_INTERRUPTED);
     } else {
        interruptStgRts();