From 5638488ba28ec84fbf64bf2742a040e3fa30bed4 Mon Sep 17 00:00:00 2001 From: Simon Marlow Date: Wed, 15 Mar 2006 14:50:41 +0000 Subject: [PATCH] Improvements to shutting down of the runtime Yet another attempt at shutdown & interruption. This one appears to work better; ^C is more responsive in multi threaded / SMP, and I fixed one case where the runtime wasn't responding to ^C at all. --- ghc/rts/Capability.c | 6 +- ghc/rts/Schedule.c | 174 +++++++++++++++++++++++++++++++++-------------- ghc/rts/Schedule.h | 15 ++-- ghc/rts/Timer.c | 60 ++++++++-------- ghc/rts/posix/Select.c | 5 +- ghc/rts/posix/Signals.c | 4 +- 6 files changed, 168 insertions(+), 96 deletions(-) diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index 8c40b63..143eefe 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -44,7 +44,7 @@ STATIC_INLINE rtsBool globalWorkToDo (void) { return blackholes_need_checking - || interrupted + || sched_state >= SCHED_INTERRUPTING ; } #endif @@ -286,7 +286,7 @@ releaseCapability_ (Capability* cap) // is interrupted, we only create a worker task if there // are threads that need to be completed. If the system is // shutting down, we never create a new worker. - if (!shutting_down_scheduler) { + if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) { IF_DEBUG(scheduler, sched_belch("starting new worker on capability %d", cap->no)); startWorkerTask(cap, workerStart); @@ -575,7 +575,7 @@ shutdownCapability (Capability *cap, Task *task) { nat i; - ASSERT(interrupted && shutting_down_scheduler); + ASSERT(sched_state == SCHED_SHUTTING_DOWN); task->cap = cap; diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index b0a8dc6..5760010 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -137,7 +137,7 @@ nat recent_activity = ACTIVITY_YES; /* if this flag is set as well, give up execution * LOCK: none (changes once, from false->true) */ -rtsBool interrupted = rtsFalse; +rtsBool sched_state = SCHED_RUNNING; /* Next thread ID to allocate. * LOCK: sched_mutex @@ -227,8 +227,9 @@ static void scheduleHandleThreadBlocked( StgTSO *t ); static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task, StgTSO *t ); static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc); -static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major, - void (*get_roots)(evac_fn)); +static Capability *scheduleDoGC(Capability *cap, Task *task, + rtsBool force_major, + void (*get_roots)(evac_fn)); static void unblockThread(Capability *cap, StgTSO *tso); static rtsBool checkBlackHoles(Capability *cap); @@ -240,7 +241,7 @@ static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically, StgPtr stop_here); static void deleteThread (Capability *cap, StgTSO *tso); -static void deleteRunQueue (Capability *cap); +static void deleteAllThreads (Capability *cap); #ifdef DEBUG static void printThreadBlockage(StgTSO *tso); @@ -394,28 +395,67 @@ schedule (Capability *initialCapability, Task *task) stg_exit(EXIT_FAILURE); } + // The interruption / shutdown sequence. + // + // In order to cleanly shut down the runtime, we want to: + // * make sure that all main threads return to their callers + // with the state 'Interrupted'. + // * clean up all OS threads assocated with the runtime + // * free all memory etc. + // + // So the sequence for ^C goes like this: + // + // * ^C handler sets sched_state := SCHED_INTERRUPTING and + // arranges for some Capability to wake up + // + // * all threads in the system are halted, and the zombies are + // placed on the run queue for cleaning up. We acquire all + // the capabilities in order to delete the threads, this is + // done by scheduleDoGC() for convenience (because GC already + // needs to acquire all the capabilities). We can't kill + // threads involved in foreign calls. + // + // * sched_state := SCHED_INTERRUPTED + // + // * somebody calls shutdownHaskell(), which calls exitScheduler() + // + // * sched_state := SCHED_SHUTTING_DOWN // - // Test for interruption. If interrupted==rtsTrue, then either - // we received a keyboard interrupt (^C), or the scheduler is - // trying to shut down all the tasks (shutting_down_scheduler) in - // the threaded RTS. + // * all workers exit when the run queue on their capability + // drains. All main threads will also exit when their TSO + // reaches the head of the run queue and they can return. // - if (interrupted) { - deleteRunQueue(cap); + // * eventually all Capabilities will shut down, and the RTS can + // exit. + // + // * We might be left with threads blocked in foreign calls, + // we should really attempt to kill these somehow (TODO); + + switch (sched_state) { + case SCHED_RUNNING: + break; + case SCHED_INTERRUPTING: + IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTING")); #if defined(THREADED_RTS) discardSparksCap(cap); #endif - if (shutting_down_scheduler) { - IF_DEBUG(scheduler, sched_belch("shutting down")); - // If we are a worker, just exit. If we're a bound thread - // then we will exit below when we've removed our TSO from - // the run queue. - if (task->tso == NULL && emptyRunQueue(cap)) { - return cap; - } - } else { - IF_DEBUG(scheduler, sched_belch("interrupted")); + /* scheduleDoGC() deletes all the threads */ + cap = scheduleDoGC(cap,task,rtsFalse,GetRoots); + break; + case SCHED_INTERRUPTED: + IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTED")); + break; + case SCHED_SHUTTING_DOWN: + IF_DEBUG(scheduler, sched_belch("SCHED_SHUTTING_DOWN")); + // If we are a worker, just exit. If we're a bound thread + // then we will exit below when we've removed our TSO from + // the run queue. + if (task->tso == NULL && emptyRunQueue(cap)) { + return cap; } + break; + default: + barf("sched_state: %d", sched_state); } #if defined(THREADED_RTS) @@ -459,7 +499,7 @@ schedule (Capability *initialCapability, Task *task) // as a result of a console event having been delivered. if ( emptyRunQueue(cap) ) { #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS) - ASSERT(interrupted); + ASSERT(sched_state >= SCHED_INTERRUPTING); #endif continue; // nothing to do } @@ -684,10 +724,7 @@ run_thread: if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; } if (ready_to_gc) { - scheduleDoGC(cap,task,rtsFalse,GetRoots); -#if defined(THREADED_RTS) - cap = task->cap; // reload cap, it might have changed -#endif + cap = scheduleDoGC(cap,task,rtsFalse,GetRoots); } } /* end of while() */ @@ -924,10 +961,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task) // they are unreachable and will therefore be sent an // exception. Any threads thus released will be immediately // runnable. - scheduleDoGC( cap, task, rtsTrue/*force major GC*/, GetRoots ); -#if defined(THREADED_RTS) - cap = task->cap; // reload cap, it might have changed -#endif + cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/, GetRoots); recent_activity = ACTIVITY_DONE_GC; @@ -949,7 +983,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task) } // either we have threads to run, or we were interrupted: - ASSERT(!emptyRunQueue(cap) || interrupted); + ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING); } #endif @@ -1843,7 +1877,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) if (task->ret) { *(task->ret) = NULL; } - if (interrupted) { + if (sched_state >= SCHED_INTERRUPTING) { task->stat = Interrupted; } else { task->stat = Killed; @@ -1895,7 +1929,7 @@ scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED ) * Perform a garbage collection if necessary * -------------------------------------------------------------------------- */ -static void +static Capability * scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major, void (*get_roots)(evac_fn)) { @@ -1924,7 +1958,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, IF_DEBUG(scheduler, sched_belch("someone else is trying to GC...")); if (cap) yieldCapability(&cap,task); } while (waiting_for_gc); - return; // NOTE: task->cap might have changed here + return cap; // NOTE: task->cap might have changed here } for (i=0; i < n_capabilities; i++) { @@ -1984,6 +2018,16 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, IF_DEBUG(scheduler, printAllThreads()); + /* + * We now have all the capabilities; if we're in an interrupting + * state, then we should take the opportunity to delete all the + * threads in the system. + */ + if (sched_state >= SCHED_INTERRUPTING) { + deleteAllThreads(&capabilities[0]); + sched_state = SCHED_INTERRUPTED; + } + /* everybody back, start the GC. * Could do it in this thread, or signal a condition var * to do it in another thread. Either way, we need to @@ -2019,6 +2063,8 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, G_EVENTQ(0); G_CURR_THREADQ(0)); #endif /* GRAN */ + + return cap; } /* --------------------------------------------------------------------------- @@ -2137,22 +2183,34 @@ forkProcess(HsStablePtr *entry } /* --------------------------------------------------------------------------- - * Delete the threads on the run queue of the current capability. + * Delete all the threads in the system * ------------------------------------------------------------------------- */ static void -deleteRunQueue (Capability *cap) +deleteAllThreads ( Capability *cap ) { - StgTSO *t, *next; - for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) { - ASSERT(t->what_next != ThreadRelocated); - next = t->link; - deleteThread(cap, t); - } -} + StgTSO* t, *next; + IF_DEBUG(scheduler,sched_belch("deleting all threads")); + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + if (t->what_next == ThreadRelocated) { + next = t->link; + } else { + next = t->global_link; + deleteThread(cap,t); + } + } -/* startThread and insertThread are now in GranSim.c -- HWL */ + // The run queue now contains a bunch of ThreadKilled threads. We + // must not throw these away: the main thread(s) will be in there + // somewhere, and the main scheduler loop has to deal with it. + // Also, the run queue is the only thing keeping these threads from + // being GC'd, and we don't want the "main thread has been GC'd" panic. +#if !defined(THREADED_RTS) + ASSERT(blocked_queue_hd == END_TSO_QUEUE); + ASSERT(sleeping_queue == END_TSO_QUEUE); +#endif +} /* ----------------------------------------------------------------------------- Managing the suspended_ccalling_tasks list. @@ -2702,7 +2760,7 @@ initScheduler(void) all_threads = END_TSO_QUEUE; context_switch = 0; - interrupted = 0; + sched_state = SCHED_RUNNING; RtsFlags.ConcFlags.ctxtSwitchTicks = RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS; @@ -2752,18 +2810,25 @@ initScheduler(void) void exitScheduler( void ) { - interrupted = rtsTrue; - shutting_down_scheduler = rtsTrue; + Task *task = NULL; + +#if defined(THREADED_RTS) + ACQUIRE_LOCK(&sched_mutex); + task = newBoundTask(); + RELEASE_LOCK(&sched_mutex); +#endif + + // If we haven't killed all the threads yet, do it now. + if (sched_state < SCHED_INTERRUPTED) { + sched_state = SCHED_INTERRUPTING; + scheduleDoGC(NULL,task,rtsFalse,GetRoots); + } + sched_state = SCHED_SHUTTING_DOWN; #if defined(THREADED_RTS) { - Task *task; nat i; - ACQUIRE_LOCK(&sched_mutex); - task = newBoundTask(); - RELEASE_LOCK(&sched_mutex); - for (i = 0; i < n_capabilities; i++) { shutdownCapability(&capabilities[i], task); } @@ -3273,7 +3338,7 @@ awakenBlockedQueue(Capability *cap, StgTSO *tso) void interruptStgRts(void) { - interrupted = 1; + sched_state = SCHED_INTERRUPTING; context_switch = 1; #if defined(THREADED_RTS) prodAllCapabilities(); @@ -3581,6 +3646,11 @@ unblockThread(Capability *cap, StgTSO *tso) tso->why_blocked = NotBlocked; tso->block_info.closure = NULL; appendToRunQueue(cap,tso); + + // We might have just migrated this TSO to our Capability: + if (tso->bound) { + tso->bound->cap = cap; + } } #endif diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h index 0e2e496..63dfeb7 100644 --- a/ghc/rts/Schedule.h +++ b/ghc/rts/Schedule.h @@ -106,15 +106,16 @@ void initThread(StgTSO *tso, nat stack_size); */ extern int RTS_VAR(context_switch); -/* Interrupted flag. - * Locks required : none (makes one transition from false->true) +/* The state of the scheduler. This is used to control the sequence + * of events during shutdown, and when the runtime is interrupted + * using ^C. */ -extern rtsBool RTS_VAR(interrupted); +#define SCHED_RUNNING 0 /* running as normal */ +#define SCHED_INTERRUPTING 1 /* ^C detected, before threads are deleted */ +#define SCHED_INTERRUPTED 2 /* ^C detected, after threads deleted */ +#define SCHED_SHUTTING_DOWN 3 /* final shutdown */ -/* Shutdown flag. - * Locks required : none (makes one transition from false->true) - */ -extern rtsBool shutting_down_scheduler; +extern rtsBool RTS_VAR(sched_state); /* * flag that tracks whether we have done any execution in this time slice. diff --git a/ghc/rts/Timer.c b/ghc/rts/Timer.c index b6414f8..0bfea2d 100644 --- a/ghc/rts/Timer.c +++ b/ghc/rts/Timer.c @@ -48,41 +48,41 @@ handle_tick(int unused STG_UNUSED) if (ticks_to_ctxt_switch <= 0) { ticks_to_ctxt_switch = RtsFlags.ConcFlags.ctxtSwitchTicks; context_switch = 1; /* schedule a context switch */ + } + } #if defined(THREADED_RTS) - /* - * If we've been inactive for idleGCDelayTicks (set by +RTS - * -I), tell the scheduler to wake up and do a GC, to check - * for threads that are deadlocked. + /* + * If we've been inactive for idleGCDelayTicks (set by +RTS + * -I), tell the scheduler to wake up and do a GC, to check + * for threads that are deadlocked. + */ + switch (recent_activity) { + case ACTIVITY_YES: + recent_activity = ACTIVITY_MAYBE_NO; + ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTicks; + break; + case ACTIVITY_MAYBE_NO: + if (ticks_to_gc == 0) break; /* 0 ==> no idle GC */ + ticks_to_gc--; + if (ticks_to_gc == 0) { + ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTicks; + recent_activity = ACTIVITY_INACTIVE; + blackholes_need_checking = rtsTrue; + /* hack: re-use the blackholes_need_checking flag */ + + /* ToDo: this doesn't work. Can't invoke + * pthread_cond_signal from a signal handler. + * Furthermore, we can't prod a capability that we + * might be holding. What can we do? */ - switch (recent_activity) { - case ACTIVITY_YES: - recent_activity = ACTIVITY_MAYBE_NO; - ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTicks; - break; - case ACTIVITY_MAYBE_NO: - if (ticks_to_gc == 0) break; /* 0 ==> no idle GC */ - ticks_to_gc--; - if (ticks_to_gc == 0) { - ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTicks; - recent_activity = ACTIVITY_INACTIVE; - blackholes_need_checking = rtsTrue; - /* hack: re-use the blackholes_need_checking flag */ - - /* ToDo: this doesn't work. Can't invoke - * pthread_cond_signal from a signal handler. - * Furthermore, we can't prod a capability that we - * might be holding. What can we do? - */ - prodOneCapability(); - } - break; - default: - break; - } -#endif + prodOneCapability(); } + break; + default: + break; } +#endif } int diff --git a/ghc/rts/posix/Select.c b/ghc/rts/posix/Select.c index effc7c4..e21ced0 100644 --- a/ghc/rts/posix/Select.c +++ b/ghc/rts/posix/Select.c @@ -215,7 +215,7 @@ awaitEvent(rtsBool wait) /* we were interrupted, return to the scheduler immediately. */ - if (interrupted) { + if (sched_state >= SCHED_INTERRUPTING) { return; /* still hold the lock */ } @@ -272,7 +272,8 @@ awaitEvent(rtsBool wait) } } - } while (wait && !interrupted && emptyRunQueue(&MainCapability)); + } while (wait && sched_state == SCHED_RUNNING + && emptyRunQueue(&MainCapability)); } #endif /* THREADED_RTS */ diff --git a/ghc/rts/posix/Signals.c b/ghc/rts/posix/Signals.c index b4cc2fd..5f5f77f 100644 --- a/ghc/rts/posix/Signals.c +++ b/ghc/rts/posix/Signals.c @@ -253,7 +253,7 @@ anyUserHandlers(void) void awaitUserSignals(void) { - while (!signals_pending() && !interrupted) { + while (!signals_pending() && sched_state == SCHED_RUNNING) { pause(); } } @@ -432,7 +432,7 @@ shutdown_handler(int sig STG_UNUSED) // If we're already trying to interrupt the RTS, terminate with // extreme prejudice. So the first ^C tries to exit the program // cleanly, and the second one just kills it. - if (interrupted) { + if (sched_state >= SCHED_INTERRUPTING) { stg_exit(EXIT_INTERRUPTED); } else { interruptStgRts(); -- 1.7.10.4