/* if this flag is set as well, give up execution
* LOCK: none (changes once, from false->true)
*/
-rtsBool interrupted = rtsFalse;
+rtsBool sched_state = SCHED_RUNNING;
/* Next thread ID to allocate.
* LOCK: sched_mutex
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
-static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major,
- void (*get_roots)(evac_fn));
+static Capability *scheduleDoGC(Capability *cap, Task *task,
+ rtsBool force_major,
+ void (*get_roots)(evac_fn));
static void unblockThread(Capability *cap, StgTSO *tso);
static rtsBool checkBlackHoles(Capability *cap);
rtsBool stop_at_atomically, StgPtr stop_here);
static void deleteThread (Capability *cap, StgTSO *tso);
-static void deleteRunQueue (Capability *cap);
+static void deleteAllThreads (Capability *cap);
#ifdef DEBUG
static void printThreadBlockage(StgTSO *tso);
stg_exit(EXIT_FAILURE);
}
+ // The interruption / shutdown sequence.
+ //
+ // In order to cleanly shut down the runtime, we want to:
+ // * make sure that all main threads return to their callers
+ // with the state 'Interrupted'.
+ // * clean up all OS threads assocated with the runtime
+ // * free all memory etc.
+ //
+ // So the sequence for ^C goes like this:
+ //
+ // * ^C handler sets sched_state := SCHED_INTERRUPTING and
+ // arranges for some Capability to wake up
+ //
+ // * all threads in the system are halted, and the zombies are
+ // placed on the run queue for cleaning up. We acquire all
+ // the capabilities in order to delete the threads, this is
+ // done by scheduleDoGC() for convenience (because GC already
+ // needs to acquire all the capabilities). We can't kill
+ // threads involved in foreign calls.
+ //
+ // * sched_state := SCHED_INTERRUPTED
//
- // Test for interruption. If interrupted==rtsTrue, then either
- // we received a keyboard interrupt (^C), or the scheduler is
- // trying to shut down all the tasks (shutting_down_scheduler) in
- // the threaded RTS.
+ // * somebody calls shutdownHaskell(), which calls exitScheduler()
//
- if (interrupted) {
- deleteRunQueue(cap);
+ // * sched_state := SCHED_SHUTTING_DOWN
+ //
+ // * all workers exit when the run queue on their capability
+ // drains. All main threads will also exit when their TSO
+ // reaches the head of the run queue and they can return.
+ //
+ // * eventually all Capabilities will shut down, and the RTS can
+ // exit.
+ //
+ // * We might be left with threads blocked in foreign calls,
+ // we should really attempt to kill these somehow (TODO);
+
+ switch (sched_state) {
+ case SCHED_RUNNING:
+ break;
+ case SCHED_INTERRUPTING:
+ IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTING"));
#if defined(THREADED_RTS)
discardSparksCap(cap);
#endif
- if (shutting_down_scheduler) {
- IF_DEBUG(scheduler, sched_belch("shutting down"));
- // If we are a worker, just exit. If we're a bound thread
- // then we will exit below when we've removed our TSO from
- // the run queue.
- if (task->tso == NULL && emptyRunQueue(cap)) {
- return cap;
- }
- } else {
- IF_DEBUG(scheduler, sched_belch("interrupted"));
+ /* scheduleDoGC() deletes all the threads */
+ cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
+ break;
+ case SCHED_INTERRUPTED:
+ IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTED"));
+ break;
+ case SCHED_SHUTTING_DOWN:
+ IF_DEBUG(scheduler, sched_belch("SCHED_SHUTTING_DOWN"));
+ // If we are a worker, just exit. If we're a bound thread
+ // then we will exit below when we've removed our TSO from
+ // the run queue.
+ if (task->tso == NULL && emptyRunQueue(cap)) {
+ return cap;
}
+ break;
+ default:
+ barf("sched_state: %d", sched_state);
}
#if defined(THREADED_RTS)
// as a result of a console event having been delivered.
if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
- ASSERT(interrupted);
+ ASSERT(sched_state >= SCHED_INTERRUPTING);
#endif
continue; // nothing to do
}
// ----------------------------------------------------------------------
// Run the current thread
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+
prev_what_next = t->what_next;
errno = t->saved_errno;
if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
if (ready_to_gc) {
- scheduleDoGC(cap,task,rtsFalse,GetRoots);
-#if defined(THREADED_RTS)
- cap = task->cap; // reload cap, it might have changed
-#endif
+ cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
}
} /* end of while() */
// they are unreachable and will therefore be sent an
// exception. Any threads thus released will be immediately
// runnable.
- scheduleDoGC( cap, task, rtsTrue/*force major GC*/, GetRoots );
-#if defined(THREADED_RTS)
- cap = task->cap; // reload cap, it might have changed
-#endif
+ cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/, GetRoots);
recent_activity = ACTIVITY_DONE_GC;
}
// either we have threads to run, or we were interrupted:
- ASSERT(!emptyRunQueue(cap) || interrupted);
+ ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
}
#endif
if (task->ret) {
*(task->ret) = NULL;
}
- if (interrupted) {
+ if (sched_state >= SCHED_INTERRUPTING) {
task->stat = Interrupted;
} else {
task->stat = Killed;
* Perform a garbage collection if necessary
* -------------------------------------------------------------------------- */
-static void
+static Capability *
scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
rtsBool force_major, void (*get_roots)(evac_fn))
{
IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
if (cap) yieldCapability(&cap,task);
} while (waiting_for_gc);
- return; // NOTE: task->cap might have changed here
+ return cap; // NOTE: task->cap might have changed here
}
for (i=0; i < n_capabilities; i++) {
IF_DEBUG(scheduler, printAllThreads());
+ /*
+ * We now have all the capabilities; if we're in an interrupting
+ * state, then we should take the opportunity to delete all the
+ * threads in the system.
+ */
+ if (sched_state >= SCHED_INTERRUPTING) {
+ deleteAllThreads(&capabilities[0]);
+ sched_state = SCHED_INTERRUPTED;
+ }
+
/* everybody back, start the GC.
* Could do it in this thread, or signal a condition var
* to do it in another thread. Either way, we need to
G_EVENTQ(0);
G_CURR_THREADQ(0));
#endif /* GRAN */
+
+ return cap;
}
/* ---------------------------------------------------------------------------
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
-deleteThreadImmediately(Capability *cap, StgTSO *tso);
+deleteThread_(Capability *cap, StgTSO *tso);
#endif
StgInt
forkProcess(HsStablePtr *entry
} else { // child
- // delete all threads
- cap->run_queue_hd = END_TSO_QUEUE;
- cap->run_queue_tl = END_TSO_QUEUE;
-
+ // Now, all OS threads except the thread that forked are
+ // stopped. We need to stop all Haskell threads, including
+ // those involved in foreign calls. Also we need to delete
+ // all Tasks, because they correspond to OS threads that are
+ // now gone.
+
for (t = all_threads; t != END_TSO_QUEUE; t = next) {
- next = t->link;
-
- // don't allow threads to catch the ThreadKilled exception
- deleteThreadImmediately(cap,t);
+ next = t->global_link;
+ // don't allow threads to catch the ThreadKilled
+ // exception, but we do want to raiseAsync() because these
+ // threads may be evaluating thunks that we need later.
+ deleteThread_(cap,t);
}
- // wipe the task list
+ // Empty the run queue. It seems tempting to let all the
+ // killed threads stay on the run queue as zombies to be
+ // cleaned up later, but some of them correspond to bound
+ // threads for which the corresponding Task does not exist.
+ cap->run_queue_hd = END_TSO_QUEUE;
+ cap->run_queue_tl = END_TSO_QUEUE;
+
+ // Any suspended C-calling Tasks are no more, their OS threads
+ // don't exist now:
+ cap->suspended_ccalling_tasks = NULL;
+
+ // Empty the all_threads list. Otherwise, the garbage
+ // collector may attempt to resurrect some of these threads.
+ all_threads = END_TSO_QUEUE;
+
+ // Wipe the task list, except the current Task.
ACQUIRE_LOCK(&sched_mutex);
for (task = all_tasks; task != NULL; task=task->all_link) {
- if (task != cap->running_task) discardTask(task);
+ if (task != cap->running_task) {
+ discardTask(task);
+ }
}
RELEASE_LOCK(&sched_mutex);
- cap->suspended_ccalling_tasks = NULL;
-
#if defined(THREADED_RTS)
- // wipe our spare workers list.
+ // Wipe our spare workers list, they no longer exist. New
+ // workers will be created if necessary.
cap->spare_workers = NULL;
cap->returning_tasks_hd = NULL;
cap->returning_tasks_tl = NULL;
}
/* ---------------------------------------------------------------------------
- * Delete the threads on the run queue of the current capability.
+ * Delete all the threads in the system
* ------------------------------------------------------------------------- */
static void
-deleteRunQueue (Capability *cap)
+deleteAllThreads ( Capability *cap )
{
- StgTSO *t, *next;
- for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
- ASSERT(t->what_next != ThreadRelocated);
- next = t->link;
- deleteThread(cap, t);
- }
-}
+ StgTSO* t, *next;
+ IF_DEBUG(scheduler,sched_belch("deleting all threads"));
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ deleteThread(cap,t);
+ }
+ }
-/* startThread and insertThread are now in GranSim.c -- HWL */
+ // The run queue now contains a bunch of ThreadKilled threads. We
+ // must not throw these away: the main thread(s) will be in there
+ // somewhere, and the main scheduler loop has to deal with it.
+ // Also, the run queue is the only thing keeping these threads from
+ // being GC'd, and we don't want the "main thread has been GC'd" panic.
+#if !defined(THREADED_RTS)
+ ASSERT(blocked_queue_hd == END_TSO_QUEUE);
+ ASSERT(sleeping_queue == END_TSO_QUEUE);
+#endif
+}
/* -----------------------------------------------------------------------------
Managing the suspended_ccalling_tasks list.
/* We might have GC'd, mark the TSO dirty again */
dirtyTSO(tso);
+ IF_DEBUG(sanity, checkTSO(tso));
+
return &cap->r;
}
all_threads = END_TSO_QUEUE;
context_switch = 0;
- interrupted = 0;
+ sched_state = SCHED_RUNNING;
RtsFlags.ConcFlags.ctxtSwitchTicks =
RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
void
exitScheduler( void )
{
- interrupted = rtsTrue;
- shutting_down_scheduler = rtsTrue;
+ Task *task = NULL;
+
+#if defined(THREADED_RTS)
+ ACQUIRE_LOCK(&sched_mutex);
+ task = newBoundTask();
+ RELEASE_LOCK(&sched_mutex);
+#endif
+
+ // If we haven't killed all the threads yet, do it now.
+ if (sched_state < SCHED_INTERRUPTED) {
+ sched_state = SCHED_INTERRUPTING;
+ scheduleDoGC(NULL,task,rtsFalse,GetRoots);
+ }
+ sched_state = SCHED_SHUTTING_DOWN;
#if defined(THREADED_RTS)
{
- Task *task;
nat i;
- ACQUIRE_LOCK(&sched_mutex);
- task = newBoundTask();
- RELEASE_LOCK(&sched_mutex);
-
for (i = 0; i < n_capabilities; i++) {
shutdownCapability(&capabilities[i], task);
}
for (task = cap->suspended_ccalling_tasks; task != NULL;
task=task->next) {
+ IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
evac((StgClosure **)&task->suspended_tso);
}
}
void
interruptStgRts(void)
{
- interrupted = 1;
+ sched_state = SCHED_INTERRUPTING;
context_switch = 1;
#if defined(THREADED_RTS)
prodAllCapabilities();
tso->why_blocked = NotBlocked;
tso->block_info.closure = NULL;
appendToRunQueue(cap,tso);
+
+ // We might have just migrated this TSO to our Capability:
+ if (tso->bound) {
+ tso->bound->cap = cap;
+ }
}
#endif
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
-deleteThreadImmediately(Capability *cap, StgTSO *tso)
+deleteThread_(Capability *cap, StgTSO *tso)
{ // for forkProcess only:
- // delete thread without giving it a chance to catch the KillThread exception
-
- if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
- return;
- }
-
- if (tso->why_blocked != BlockedOnCCall &&
- tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
- unblockThread(cap,tso);
- }
+ // like deleteThread(), but we delete threads in foreign calls, too.
- tso->what_next = ThreadKilled;
+ if (tso->why_blocked == BlockedOnCCall ||
+ tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
+ unblockOne(cap,tso);
+ tso->what_next = ThreadKilled;
+ } else {
+ deleteThread(cap,tso);
+ }
}
#endif
va_list ap;
va_start(ap,s);
#ifdef THREADED_RTS
- debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
+ debugBelch("sched (task %p, pid %d): ", (void *)(unsigned long)(unsigned int)osThreadId(), getpid());
#elif defined(PARALLEL_HASKELL)
debugBelch("== ");
#else