#include "Capability.h"
#include "Task.h"
#include "AwaitEvent.h"
+#if defined(mingw32_HOST_OS)
+#include "win32/IOManager.h"
+#endif
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
# define STATIC_INLINE static
#endif
-#ifdef THREADED_RTS
-#define USED_WHEN_THREADED_RTS
-#define USED_WHEN_NON_THREADED_RTS STG_UNUSED
-#else
-#define USED_WHEN_THREADED_RTS STG_UNUSED
-#define USED_WHEN_NON_THREADED_RTS
-#endif
-
-#ifdef SMP
-#define USED_WHEN_SMP
-#else
-#define USED_WHEN_SMP STG_UNUSED
-#endif
-
/* -----------------------------------------------------------------------------
* Global variables
* -------------------------------------------------------------------------- */
/* if this flag is set as well, give up execution
* LOCK: none (changes once, from false->true)
*/
-rtsBool interrupted = rtsFalse;
+rtsBool sched_state = SCHED_RUNNING;
/* Next thread ID to allocate.
* LOCK: sched_mutex
/*
* This mutex protects most of the global scheduler data in
- * the THREADED_RTS and (inc. SMP) runtime.
+ * the THREADED_RTS runtime.
*/
#if defined(THREADED_RTS)
-Mutex sched_mutex = INIT_MUTEX_VAR;
+Mutex sched_mutex;
#endif
#if defined(PARALLEL_HASKELL)
// scheduler clearer.
//
static void schedulePreLoop (void);
-static void scheduleStartSignalHandlers (void);
+#if defined(THREADED_RTS)
+static void schedulePushWork(Capability *cap, Task *task);
+#endif
+static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
StgTSO *t );
static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
-static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
+static Capability *scheduleDoGC(Capability *cap, Task *task,
+ rtsBool force_major,
+ void (*get_roots)(evac_fn));
static void unblockThread(Capability *cap, StgTSO *tso);
static rtsBool checkBlackHoles(Capability *cap);
static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
- rtsBool stop_at_atomically);
+ rtsBool stop_at_atomically, StgPtr stop_here);
static void deleteThread (Capability *cap, StgTSO *tso);
-static void deleteRunQueue (Capability *cap);
+static void deleteAllThreads (Capability *cap);
#ifdef DEBUG
static void printThreadBlockage(StgTSO *tso);
#endif
nat prev_what_next;
rtsBool ready_to_gc;
+#if defined(THREADED_RTS)
rtsBool first = rtsTrue;
+#endif
cap = initialCapability;
// thread for a bit, even if there are others banging at the
// door.
first = rtsFalse;
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
} else {
// Yield the capability to higher-priority tasks if necessary.
yieldCapability(&cap, task);
}
#endif
- ASSERT(cap->running_task == task);
- ASSERT(task->cap == cap);
- ASSERT(myTask() == task);
+#if defined(THREADED_RTS)
+ schedulePushWork(cap,task);
+#endif
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
stg_exit(EXIT_FAILURE);
}
+ // The interruption / shutdown sequence.
+ //
+ // In order to cleanly shut down the runtime, we want to:
+ // * make sure that all main threads return to their callers
+ // with the state 'Interrupted'.
+ // * clean up all OS threads assocated with the runtime
+ // * free all memory etc.
//
- // Test for interruption. If interrupted==rtsTrue, then either
- // we received a keyboard interrupt (^C), or the scheduler is
- // trying to shut down all the tasks (shutting_down_scheduler) in
- // the threaded RTS.
+ // So the sequence for ^C goes like this:
//
- if (interrupted) {
- deleteRunQueue(cap);
- if (shutting_down_scheduler) {
- IF_DEBUG(scheduler, sched_belch("shutting down"));
- // If we are a worker, just exit. If we're a bound thread
- // then we will exit below when we've removed our TSO from
- // the run queue.
- if (task->tso == NULL) {
- return cap;
- }
- } else {
- IF_DEBUG(scheduler, sched_belch("interrupted"));
+ // * ^C handler sets sched_state := SCHED_INTERRUPTING and
+ // arranges for some Capability to wake up
+ //
+ // * all threads in the system are halted, and the zombies are
+ // placed on the run queue for cleaning up. We acquire all
+ // the capabilities in order to delete the threads, this is
+ // done by scheduleDoGC() for convenience (because GC already
+ // needs to acquire all the capabilities). We can't kill
+ // threads involved in foreign calls.
+ //
+ // * sched_state := SCHED_INTERRUPTED
+ //
+ // * somebody calls shutdownHaskell(), which calls exitScheduler()
+ //
+ // * sched_state := SCHED_SHUTTING_DOWN
+ //
+ // * all workers exit when the run queue on their capability
+ // drains. All main threads will also exit when their TSO
+ // reaches the head of the run queue and they can return.
+ //
+ // * eventually all Capabilities will shut down, and the RTS can
+ // exit.
+ //
+ // * We might be left with threads blocked in foreign calls,
+ // we should really attempt to kill these somehow (TODO);
+
+ switch (sched_state) {
+ case SCHED_RUNNING:
+ break;
+ case SCHED_INTERRUPTING:
+ IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTING"));
+#if defined(THREADED_RTS)
+ discardSparksCap(cap);
+#endif
+ /* scheduleDoGC() deletes all the threads */
+ cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
+ break;
+ case SCHED_INTERRUPTED:
+ IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTED"));
+ break;
+ case SCHED_SHUTTING_DOWN:
+ IF_DEBUG(scheduler, sched_belch("SCHED_SHUTTING_DOWN"));
+ // If we are a worker, just exit. If we're a bound thread
+ // then we will exit below when we've removed our TSO from
+ // the run queue.
+ if (task->tso == NULL && emptyRunQueue(cap)) {
+ return cap;
}
+ break;
+ default:
+ barf("sched_state: %d", sched_state);
}
-#if defined(not_yet) && defined(SMP)
- //
- // Top up the run queue from our spark pool. We try to make the
- // number of threads in the run queue equal to the number of
- // free capabilities.
- //
+#if defined(THREADED_RTS)
+ // If the run queue is empty, take a spark and turn it into a thread.
{
- StgClosure *spark;
- if (emptyRunQueue()) {
- spark = findSpark(rtsFalse);
- if (spark == NULL) {
- break; /* no more sparks in the pool */
- } else {
- createSparkThread(spark);
+ if (emptyRunQueue(cap)) {
+ StgClosure *spark;
+ spark = findSpark(cap);
+ if (spark != NULL) {
IF_DEBUG(scheduler,
- sched_belch("==^^ turning spark of closure %p into a thread",
+ sched_belch("turning spark of closure %p into a thread",
(StgClosure *)spark));
+ createSparkThread(cap,spark);
}
}
}
-#endif // SMP
+#endif // THREADED_RTS
- scheduleStartSignalHandlers();
+ scheduleStartSignalHandlers(cap);
// Only check the black holes here if we've nothing else to do.
// During normal execution, the black hole list only gets checked
// list each time around the scheduler.
if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
+ // Any threads that were woken up by other Capabilities get
+ // appended to our run queue.
+ if (!emptyWakeupQueue(cap)) {
+ ACQUIRE_LOCK(&cap->lock);
+ if (emptyRunQueue(cap)) {
+ cap->run_queue_hd = cap->wakeup_queue_hd;
+ cap->run_queue_tl = cap->wakeup_queue_tl;
+ } else {
+ cap->run_queue_tl->link = cap->wakeup_queue_hd;
+ cap->run_queue_tl = cap->wakeup_queue_tl;
+ }
+ cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
+ RELEASE_LOCK(&cap->lock);
+ }
+
scheduleCheckBlockedThreads(cap);
scheduleDetectDeadlock(cap,task);
+#if defined(THREADED_RTS)
+ cap = task->cap; // reload cap, it might have changed
+#endif
// Normally, the only way we can get here with no threads to
// run is if a keyboard interrupt received during
// as a result of a console event having been delivered.
if ( emptyRunQueue(cap) ) {
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
- ASSERT(interrupted);
+ ASSERT(sched_state >= SCHED_INTERRUPTING);
#endif
continue; // nothing to do
}
// ----------------------------------------------------------------------
// Run the current thread
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+ ASSERT(t->cap == cap);
+
prev_what_next = t->what_next;
errno = t->saved_errno;
cap->in_haskell = rtsTrue;
+ dirtyTSO(t);
+
recent_activity = ACTIVITY_YES;
switch (prev_what_next) {
cap->in_haskell = rtsFalse;
-#ifdef SMP
+ // The TSO might have moved, eg. if it re-entered the RTS and a GC
+ // happened. So find the new location:
+ t = cap->r.rCurrentTSO;
+
+ // We have run some Haskell code: there might be blackhole-blocked
+ // threads to wake up now.
+ // Lock-free test here should be ok, we're just setting a flag.
+ if ( blackhole_queue != END_TSO_QUEUE ) {
+ blackholes_need_checking = rtsTrue;
+ }
+
+ // And save the current errno in this thread.
+ // XXX: possibly bogus for SMP because this thread might already
+ // be running again, see code below.
+ t->saved_errno = errno;
+
+#if defined(THREADED_RTS)
// If ret is ThreadBlocked, and this Task is bound to the TSO that
// blocked, we are in limbo - the TSO is now owned by whatever it
// is blocked on, and may in fact already have been woken up,
// perhaps even on a different Capability. It may be the case
// that task->cap != cap. We better yield this Capability
// immediately and return to normaility.
- if (ret == ThreadBlocked) continue;
+ if (ret == ThreadBlocked) {
+ IF_DEBUG(scheduler,
+ sched_belch("--<< thread %d (%s) stopped: blocked\n",
+ t->id, whatNext_strs[t->what_next]));
+ continue;
+ }
#endif
- ASSERT(cap->running_task == task);
- ASSERT(task->cap == cap);
- ASSERT(myTask() == task);
-
- // The TSO might have moved, eg. if it re-entered the RTS and a GC
- // happened. So find the new location:
- t = cap->r.rCurrentTSO;
-
- // And save the current errno in this thread.
- t->saved_errno = errno;
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+ ASSERT(t->cap == cap);
// ----------------------------------------------------------------------
CCCS = CCS_SYSTEM;
#endif
- // We have run some Haskell code: there might be blackhole-blocked
- // threads to wake up now.
- // Lock-free test here should be ok, we're just setting a flag.
- if ( blackhole_queue != END_TSO_QUEUE ) {
- blackholes_need_checking = rtsTrue;
- }
-
#if defined(THREADED_RTS)
IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
#elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
case ThreadFinished:
if (scheduleHandleThreadFinished(cap, task, t)) return cap;
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
break;
default:
}
if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
- if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
+ if (ready_to_gc) {
+ cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
+ }
} /* end of while() */
IF_PAR_DEBUG(verbose,
#endif
}
+/* -----------------------------------------------------------------------------
+ * schedulePushWork()
+ *
+ * Push work to other Capabilities if we have some.
+ * -------------------------------------------------------------------------- */
+
+#if defined(THREADED_RTS)
+static void
+schedulePushWork(Capability *cap USED_IF_THREADS,
+ Task *task USED_IF_THREADS)
+{
+ Capability *free_caps[n_capabilities], *cap0;
+ nat i, n_free_caps;
+
+ // migration can be turned off with +RTS -qg
+ if (!RtsFlags.ParFlags.migrate) return;
+
+ // Check whether we have more threads on our run queue, or sparks
+ // in our pool, that we could hand to another Capability.
+ if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
+ && sparkPoolSizeCap(cap) < 2) {
+ return;
+ }
+
+ // First grab as many free Capabilities as we can.
+ for (i=0, n_free_caps=0; i < n_capabilities; i++) {
+ cap0 = &capabilities[i];
+ if (cap != cap0 && tryGrabCapability(cap0,task)) {
+ if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
+ // it already has some work, we just grabbed it at
+ // the wrong moment. Or maybe it's deadlocked!
+ releaseCapability(cap0);
+ } else {
+ free_caps[n_free_caps++] = cap0;
+ }
+ }
+ }
+
+ // we now have n_free_caps free capabilities stashed in
+ // free_caps[]. Share our run queue equally with them. This is
+ // probably the simplest thing we could do; improvements we might
+ // want to do include:
+ //
+ // - giving high priority to moving relatively new threads, on
+ // the gournds that they haven't had time to build up a
+ // working set in the cache on this CPU/Capability.
+ //
+ // - giving low priority to moving long-lived threads
+
+ if (n_free_caps > 0) {
+ StgTSO *prev, *t, *next;
+ rtsBool pushed_to_all;
+
+ IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
+
+ i = 0;
+ pushed_to_all = rtsFalse;
+
+ if (cap->run_queue_hd != END_TSO_QUEUE) {
+ prev = cap->run_queue_hd;
+ t = prev->link;
+ prev->link = END_TSO_QUEUE;
+ for (; t != END_TSO_QUEUE; t = next) {
+ next = t->link;
+ t->link = END_TSO_QUEUE;
+ if (t->what_next == ThreadRelocated
+ || t->bound == task) { // don't move my bound thread
+ prev->link = t;
+ prev = t;
+ } else if (i == n_free_caps) {
+ pushed_to_all = rtsTrue;
+ i = 0;
+ // keep one for us
+ prev->link = t;
+ prev = t;
+ } else {
+ IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
+ appendToRunQueue(free_caps[i],t);
+ if (t->bound) { t->bound->cap = free_caps[i]; }
+ t->cap = free_caps[i];
+ i++;
+ }
+ }
+ cap->run_queue_tl = prev;
+ }
+
+ // If there are some free capabilities that we didn't push any
+ // threads to, then try to push a spark to each one.
+ if (!pushed_to_all) {
+ StgClosure *spark;
+ // i is the next free capability to push to
+ for (; i < n_free_caps; i++) {
+ if (emptySparkPoolCap(free_caps[i])) {
+ spark = findSpark(cap);
+ if (spark != NULL) {
+ IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
+ newSpark(&(free_caps[i]->r), spark);
+ }
+ }
+ }
+ }
+
+ // release the capabilities
+ for (i = 0; i < n_free_caps; i++) {
+ task->cap = free_caps[i];
+ releaseCapability(free_caps[i]);
+ }
+ }
+ task->cap = cap; // reset to point to our Capability.
+}
+#endif
+
/* ----------------------------------------------------------------------------
* Start any pending signal handlers
* ------------------------------------------------------------------------- */
+#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
static void
-scheduleStartSignalHandlers(void)
+scheduleStartSignalHandlers(Capability *cap)
{
-#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
if (signals_pending()) { // safe outside the lock
- startSignalHandlers();
+ startSignalHandlers(cap);
}
-#endif
}
+#else
+static void
+scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
+{
+}
+#endif
/* ----------------------------------------------------------------------------
* Check for blocked threads that can be woken up.
* ------------------------------------------------------------------------- */
static void
-scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
+scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
{
#if !defined(THREADED_RTS)
//
{
#if defined(PARALLEL_HASKELL)
- // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
+ // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
return;
#endif
// they are unreachable and will therefore be sent an
// exception. Any threads thus released will be immediately
// runnable.
- scheduleDoGC( cap, task, rtsTrue/*force major GC*/ );
+ cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/, GetRoots);
+
recent_activity = ACTIVITY_DONE_GC;
if ( !emptyRunQueue(cap) ) return;
-#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
+#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
/* If we have user-installed signal handlers, then wait
* for signals to arrive rather then bombing out with a
* deadlock.
awaitUserSignals();
if (signals_pending()) {
- startSignalHandlers();
+ startSignalHandlers(cap);
}
// either we have threads to run, or we were interrupted:
- ASSERT(!emptyRunQueue(cap) || interrupted);
+ ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
}
#endif
if (cap->r.rCurrentNursery->u.back != NULL) {
cap->r.rCurrentNursery->u.back->link = bd;
} else {
-#if !defined(SMP)
+#if !defined(THREADED_RTS)
ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
g0s0 == cap->r.rNursery);
#endif
/* enlarge the stack */
StgTSO *new_t = threadStackOverflow(cap, t);
- /* This TSO has moved, so update any pointers to it from the
- * main thread stack. It better not be on any other queues...
- * (it shouldn't be).
+ /* The TSO attached to this Task may have moved, so update the
+ * pointer to it.
*/
- if (task->tso != NULL) {
+ if (task->tso == t) {
task->tso = new_t;
}
pushOnRunQueue(cap,new_t);
// has tidied up its stack and placed itself on whatever queue
// it needs to be on.
-#if !defined(SMP)
+#if !defined(THREADED_RTS)
ASSERT(t->why_blocked != NotBlocked);
- // This might not be true under SMP: we don't have
+ // This might not be true under THREADED_RTS: we don't have
// exclusive access to this TSO, so someone might have
// woken it up by now. This actually happens: try
// conc023 +RTS -N2.
if (task->ret) {
*(task->ret) = NULL;
}
- if (interrupted) {
+ if (sched_state >= SCHED_INTERRUPTING) {
task->stat = Interrupted;
} else {
task->stat = Killed;
if (performHeapProfile ||
(RtsFlags.ProfFlags.profileInterval==0 &&
RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
+
+ // checking black holes is necessary before GC, otherwise
+ // there may be threads that are unreachable except by the
+ // blackhole queue, which the GC will consider to be
+ // deadlocked.
+ scheduleCheckBlackHoles(&MainCapability);
+
+ IF_DEBUG(scheduler, sched_belch("garbage collecting before heap census"));
GarbageCollect(GetRoots, rtsTrue);
+
+ IF_DEBUG(scheduler, sched_belch("performing heap census"));
heapCensus();
+
performHeapProfile = rtsFalse;
return rtsTrue; // true <=> we already GC'd
}
* Perform a garbage collection if necessary
* -------------------------------------------------------------------------- */
-static void
-scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
+static Capability *
+scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
+ rtsBool force_major, void (*get_roots)(evac_fn))
{
StgTSO *t;
-#ifdef SMP
+#ifdef THREADED_RTS
static volatile StgWord waiting_for_gc;
rtsBool was_waiting;
nat i;
#endif
-#ifdef SMP
+#ifdef THREADED_RTS
// In order to GC, there must be no threads running Haskell code.
// Therefore, the GC thread needs to hold *all* the capabilities,
// and release them after the GC has completed.
//
was_waiting = cas(&waiting_for_gc, 0, 1);
- if (was_waiting) return;
+ if (was_waiting) {
+ do {
+ IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
+ if (cap) yieldCapability(&cap,task);
+ } while (waiting_for_gc);
+ return cap; // NOTE: task->cap might have changed here
+ }
for (i=0; i < n_capabilities; i++) {
IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
// all the Capabilities, but even so it's a slightly
// unsavoury invariant.
task->cap = pcap;
+ context_switch = 1;
waitForReturnCapability(&pcap, task);
if (pcap != &capabilities[i]) {
barf("scheduleDoGC: got the wrong capability");
// ATOMICALLY_FRAME, aborting the (nested)
// transaction, and saving the stack of any
// partially-evaluated thunks on the heap.
- raiseAsync_(cap, t, NULL, rtsTrue);
+ raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
#ifdef REG_R1
ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
}
// so this happens periodically:
- scheduleCheckBlackHoles(cap);
+ if (cap) scheduleCheckBlackHoles(cap);
IF_DEBUG(scheduler, printAllThreads());
+ /*
+ * We now have all the capabilities; if we're in an interrupting
+ * state, then we should take the opportunity to delete all the
+ * threads in the system.
+ */
+ if (sched_state >= SCHED_INTERRUPTING) {
+ deleteAllThreads(&capabilities[0]);
+ sched_state = SCHED_INTERRUPTED;
+ }
+
/* everybody back, start the GC.
* Could do it in this thread, or signal a condition var
* to do it in another thread. Either way, we need to
#if defined(THREADED_RTS)
IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
- GarbageCollect(GetRoots, force_major);
+ GarbageCollect(get_roots, force_major);
-#if defined(SMP)
+#if defined(THREADED_RTS)
// release our stash of capabilities.
for (i = 0; i < n_capabilities; i++) {
if (cap != &capabilities[i]) {
releaseCapability(&capabilities[i]);
}
}
- task->cap = cap;
+ if (cap) {
+ task->cap = cap;
+ } else {
+ task->cap = NULL;
+ }
#endif
#if defined(GRAN)
G_EVENTQ(0);
G_CURR_THREADQ(0));
#endif /* GRAN */
+
+ return cap;
}
/* ---------------------------------------------------------------------------
* ------------------------------------------------------------------------- */
StgBool
-isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
+isThreadBound(StgTSO* tso USED_IF_THREADS)
{
#if defined(THREADED_RTS)
return (tso->bound != NULL);
* Singleton fork(). Do not copy any running threads.
* ------------------------------------------------------------------------- */
-#if !defined(mingw32_HOST_OS) && !defined(SMP)
+#if !defined(mingw32_HOST_OS)
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
-deleteThreadImmediately(Capability *cap, StgTSO *tso);
+deleteThread_(Capability *cap, StgTSO *tso);
#endif
StgInt
forkProcess(HsStablePtr *entry
)
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
+ Task *task;
pid_t pid;
StgTSO* t,*next;
- Task *task;
Capability *cap;
+#if defined(THREADED_RTS)
+ if (RtsFlags.ParFlags.nNodes > 1) {
+ errorBelch("forking not supported with +RTS -N<n> greater than 1");
+ stg_exit(EXIT_FAILURE);
+ }
+#endif
+
IF_DEBUG(scheduler,sched_belch("forking!"));
// ToDo: for SMP, we should probably acquire *all* the capabilities
} else { // child
- // delete all threads
- cap->run_queue_hd = END_TSO_QUEUE;
- cap->run_queue_tl = END_TSO_QUEUE;
-
+ // Now, all OS threads except the thread that forked are
+ // stopped. We need to stop all Haskell threads, including
+ // those involved in foreign calls. Also we need to delete
+ // all Tasks, because they correspond to OS threads that are
+ // now gone.
+
for (t = all_threads; t != END_TSO_QUEUE; t = next) {
- next = t->link;
-
- // don't allow threads to catch the ThreadKilled exception
- deleteThreadImmediately(cap,t);
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ // don't allow threads to catch the ThreadKilled
+ // exception, but we do want to raiseAsync() because these
+ // threads may be evaluating thunks that we need later.
+ deleteThread_(cap,t);
+ }
}
- // wipe the main thread list
- while ((task = all_tasks) != NULL) {
- all_tasks = task->all_link;
- discardTask(task);
+ // Empty the run queue. It seems tempting to let all the
+ // killed threads stay on the run queue as zombies to be
+ // cleaned up later, but some of them correspond to bound
+ // threads for which the corresponding Task does not exist.
+ cap->run_queue_hd = END_TSO_QUEUE;
+ cap->run_queue_tl = END_TSO_QUEUE;
+
+ // Any suspended C-calling Tasks are no more, their OS threads
+ // don't exist now:
+ cap->suspended_ccalling_tasks = NULL;
+
+ // Empty the all_threads list. Otherwise, the garbage
+ // collector may attempt to resurrect some of these threads.
+ all_threads = END_TSO_QUEUE;
+
+ // Wipe the task list, except the current Task.
+ ACQUIRE_LOCK(&sched_mutex);
+ for (task = all_tasks; task != NULL; task=task->all_link) {
+ if (task != cap->running_task) {
+ discardTask(task);
+ }
}
-
+ RELEASE_LOCK(&sched_mutex);
+
+#if defined(THREADED_RTS)
+ // Wipe our spare workers list, they no longer exist. New
+ // workers will be created if necessary.
+ cap->spare_workers = NULL;
+ cap->returning_tasks_hd = NULL;
+ cap->returning_tasks_tl = NULL;
+#endif
+
cap = rts_evalStableIO(cap, entry, NULL); // run the action
rts_checkSchedStatus("forkProcess",cap);
}
/* ---------------------------------------------------------------------------
- * Delete the threads on the run queue of the current capability.
+ * Delete all the threads in the system
* ------------------------------------------------------------------------- */
static void
-deleteRunQueue (Capability *cap)
+deleteAllThreads ( Capability *cap )
{
- StgTSO *t, *next;
- for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
- ASSERT(t->what_next != ThreadRelocated);
- next = t->link;
- deleteThread(cap, t);
- }
-}
+ StgTSO* t, *next;
+ IF_DEBUG(scheduler,sched_belch("deleting all threads"));
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ deleteThread(cap,t);
+ }
+ }
-/* startThread and insertThread are now in GranSim.c -- HWL */
+ // The run queue now contains a bunch of ThreadKilled threads. We
+ // must not throw these away: the main thread(s) will be in there
+ // somewhere, and the main scheduler loop has to deal with it.
+ // Also, the run queue is the only thing keeping these threads from
+ // being GC'd, and we don't want the "main thread has been GC'd" panic.
+#if !defined(THREADED_RTS)
+ ASSERT(blocked_queue_hd == END_TSO_QUEUE);
+ ASSERT(sleeping_queue == END_TSO_QUEUE);
+#endif
+}
/* -----------------------------------------------------------------------------
Managing the suspended_ccalling_tasks list.
// XXX this might not be necessary --SDM
tso->what_next = ThreadRunGHC;
- threadPaused(tso);
+ threadPaused(cap,tso);
if(tso->blocked_exceptions == NULL) {
tso->why_blocked = BlockedOnCCall;
cap->in_haskell = rtsTrue;
errno = saved_errno;
+ /* We might have GC'd, mark the TSO dirty again */
+ dirtyTSO(tso);
+
+ IF_DEBUG(sanity, checkTSO(tso));
+
return &cap->r;
}
tso->why_blocked = NotBlocked;
tso->blocked_exceptions = NULL;
+ tso->flags = TSO_DIRTY;
tso->saved_errno = 0;
tso->bound = NULL;
+ tso->cap = cap;
tso->stack_size = stack_size;
tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize)
// This TSO is now a bound thread; make the Task and TSO
// point to each other.
tso->bound = task;
+ tso->cap = cap;
task->tso = tso;
task->ret = ret;
cap = schedule(cap,task);
ASSERT(task->stat != NoStatus);
- ASSERT(cap->running_task == task);
- ASSERT(task->cap == cap);
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
return cap;
all_threads = END_TSO_QUEUE;
context_switch = 0;
- interrupted = 0;
+ sched_state = SCHED_RUNNING;
RtsFlags.ConcFlags.ctxtSwitchTicks =
RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
/* A capability holds the state a native thread needs in
* order to execute STG code. At least one capability is
- * floating around (only SMP builds have more than one).
+ * floating around (only THREADED_RTS builds have more than one).
*/
initCapabilities();
initTaskManager();
-#if defined(SMP)
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+ initSparkPools();
+#endif
+
+#if defined(THREADED_RTS)
/*
* Eagerly start one worker to run each Capability, except for
* Capability 0. The idea is that we're probably going to start a
}
#endif
-#if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
- initSparkPools();
-#endif
-
RELEASE_LOCK(&sched_mutex);
}
void
exitScheduler( void )
{
- interrupted = rtsTrue;
- shutting_down_scheduler = rtsTrue;
+ Task *task = NULL;
+
+#if defined(THREADED_RTS)
+ ACQUIRE_LOCK(&sched_mutex);
+ task = newBoundTask();
+ RELEASE_LOCK(&sched_mutex);
+#endif
+
+ // If we haven't killed all the threads yet, do it now.
+ if (sched_state < SCHED_INTERRUPTED) {
+ sched_state = SCHED_INTERRUPTING;
+ scheduleDoGC(NULL,task,rtsFalse,GetRoots);
+ }
+ sched_state = SCHED_SHUTTING_DOWN;
#if defined(THREADED_RTS)
{
- Task *task;
nat i;
- ACQUIRE_LOCK(&sched_mutex);
- task = newBoundTask();
- RELEASE_LOCK(&sched_mutex);
-
for (i = 0; i < n_capabilities; i++) {
shutdownCapability(&capabilities[i], task);
}
for (i = 0; i < n_capabilities; i++) {
cap = &capabilities[i];
- evac((StgClosure **)&cap->run_queue_hd);
- evac((StgClosure **)&cap->run_queue_tl);
-
+ evac((StgClosure **)(void *)&cap->run_queue_hd);
+ evac((StgClosure **)(void *)&cap->run_queue_tl);
+#if defined(THREADED_RTS)
+ evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
+ evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
+#endif
for (task = cap->suspended_ccalling_tasks; task != NULL;
task=task->next) {
- evac((StgClosure **)&task->suspended_tso);
+ IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
+ evac((StgClosure **)(void *)&task->suspended_tso);
}
+
}
+
#if !defined(THREADED_RTS)
- evac((StgClosure **)&blocked_queue_hd);
- evac((StgClosure **)&blocked_queue_tl);
- evac((StgClosure **)&sleeping_queue);
+ evac((StgClosure **)(void *)&blocked_queue_hd);
+ evac((StgClosure **)(void *)&blocked_queue_tl);
+ evac((StgClosure **)(void *)&sleeping_queue);
#endif
#endif
- evac((StgClosure **)&blackhole_queue);
+ // evac((StgClosure **)&blackhole_queue);
-#if defined(PARALLEL_HASKELL) || defined(GRAN)
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
markSparkQueue(evac);
#endif
static void (*extra_roots)(evac_fn);
+static void
+performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
+{
+ Task *task = myTask();
+
+ if (task == NULL) {
+ ACQUIRE_LOCK(&sched_mutex);
+ task = newBoundTask();
+ RELEASE_LOCK(&sched_mutex);
+ scheduleDoGC(NULL,task,force_major, get_roots);
+ boundTaskExiting(task);
+ } else {
+ scheduleDoGC(NULL,task,force_major, get_roots);
+ }
+}
+
void
performGC(void)
{
-#ifdef THREADED_RTS
- // ToDo: we have to grab all the capabilities here.
- errorBelch("performGC not supported in threaded RTS (yet)");
- stg_exit(EXIT_FAILURE);
-#endif
- /* Obligated to hold this lock upon entry */
- GarbageCollect(GetRoots,rtsFalse);
+ performGC_(rtsFalse, GetRoots);
}
void
performMajorGC(void)
{
-#ifdef THREADED_RTS
- errorBelch("performMayjorGC not supported in threaded RTS (yet)");
- stg_exit(EXIT_FAILURE);
-#endif
- GarbageCollect(GetRoots,rtsTrue);
+ performGC_(rtsTrue, GetRoots);
}
static void
void
performGCWithRoots(void (*get_roots)(evac_fn))
{
-#ifdef THREADED_RTS
- errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
- stg_exit(EXIT_FAILURE);
-#endif
extra_roots = get_roots;
- GarbageCollect(AllRoots,rtsFalse);
+ performGC_(rtsFalse, AllRoots);
}
/* -----------------------------------------------------------------------------
new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
- IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", tso->stack_size, new_stack_size));
+ IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
dest = (StgTSO *)allocate(new_tso_size);
TICK_ALLOC_TSO(new_stack_size,0);
ASSERT(get_itbl(tso)->type == TSO);
ASSERT(tso->why_blocked != NotBlocked);
+
tso->why_blocked = NotBlocked;
next = tso->link;
tso->link = END_TSO_QUEUE;
- // We might have just migrated this TSO to our Capability:
- if (tso->bound) {
- tso->bound->cap = cap;
+ if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
+ // We are waking up this thread on the current Capability, which
+ // might involve migrating it from the Capability it was last on.
+ if (tso->bound) {
+ ASSERT(tso->bound->cap == tso->cap);
+ tso->bound->cap = cap;
+ }
+ tso->cap = cap;
+ appendToRunQueue(cap,tso);
+ // we're holding a newly woken thread, make sure we context switch
+ // quickly so we can migrate it if necessary.
+ context_switch = 1;
+ } else {
+ // we'll try to wake it up on the Capability it was last on.
+ wakeupThreadOnCapability(tso->cap, tso);
}
- appendToRunQueue(cap,tso);
-
- // we're holding a newly woken thread, make sure we context switch
- // quickly so we can migrate it if necessary.
- context_switch = 1;
- IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
+ IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
return next;
}
void
interruptStgRts(void)
{
- interrupted = 1;
+ sched_state = SCHED_INTERRUPTING;
context_switch = 1;
#if defined(THREADED_RTS)
prodAllCapabilities();
tso->why_blocked = NotBlocked;
tso->block_info.closure = NULL;
appendToRunQueue(cap,tso);
+
+ // We might have just migrated this TSO to our Capability:
+ if (tso->bound) {
+ tso->bound->cap = cap;
+ }
+ tso->cap = cap;
}
#endif
* CATCH_FRAME on the stack. In either case, we strip the entire
* stack and replace the thread with a zombie.
*
- * ToDo: in SMP mode, this function is only safe if either (a) we hold
- * all the Capabilities (eg. in GC), or (b) we own the Capability that
- * the TSO is currently blocked on or on the run queue of.
+ * ToDo: in THREADED_RTS mode, this function is only safe if either
+ * (a) we hold all the Capabilities (eg. in GC, or if there is only
+ * one Capability), or (b) we own the Capability that the TSO is
+ * currently blocked on or on the run queue of.
*
* -------------------------------------------------------------------------- */
void
raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
{
- raiseAsync_(cap, tso, exception, rtsFalse);
+ raiseAsync_(cap, tso, exception, rtsFalse, NULL);
+}
+
+void
+suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
+{
+ raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
}
static void
raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
- rtsBool stop_at_atomically)
+ rtsBool stop_at_atomically, StgPtr stop_here)
{
StgRetInfoTable *info;
- StgPtr sp;
+ StgPtr sp, frame;
+ nat i;
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
// Remove it from any blocking queues
unblockThread(cap,tso);
+ // mark it dirty; we're about to change its stack.
+ dirtyTSO(tso);
+
sp = tso->sp;
// The stack freezing code assumes there's a closure pointer on
sp[0] = (W_)&stg_dummy_ret_closure;
}
- while (1) {
- nat i;
+ frame = sp + 1;
+ while (stop_here == NULL || frame < stop_here) {
// 1. Let the top of the stack be the "current closure"
//
// NB: if we pass an ATOMICALLY_FRAME then abort the associated
// transaction
-
- StgPtr frame;
-
- frame = sp + 1;
info = get_ret_itbl((StgClosure *)frame);
-
- while (info->i.type != UPDATE_FRAME
- && (info->i.type != CATCH_FRAME || exception == NULL)
- && info->i.type != STOP_FRAME
- && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
- {
- if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
- // IF we find an ATOMICALLY_FRAME then we abort the
- // current transaction and propagate the exception. In
- // this case (unlike ordinary exceptions) we do not care
- // whether the transaction is valid or not because its
- // possible validity cannot have caused the exception
- // and will not be visible after the abort.
- IF_DEBUG(stm,
- debugBelch("Found atomically block delivering async exception\n"));
- stmAbortTransaction(tso -> trec);
- tso -> trec = stmGetEnclosingTRec(tso -> trec);
- }
- frame += stack_frame_sizeW((StgClosure *)frame);
- info = get_ret_itbl((StgClosure *)frame);
- }
-
+
switch (info->i.type) {
-
- case ATOMICALLY_FRAME:
- ASSERT(stop_at_atomically);
- ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
- stmCondemnTransaction(tso -> trec);
-#ifdef REG_R1
- tso->sp = frame;
-#else
- // R1 is not a register: the return convention for IO in
- // this case puts the return value on the stack, so we
- // need to set up the stack to return to the atomically
- // frame properly...
- tso->sp = frame - 2;
- tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
- tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
-#endif
- tso->what_next = ThreadRunGHC;
- return;
- case CATCH_FRAME:
- // If we find a CATCH_FRAME, and we've got an exception to raise,
- // then build the THUNK raise(exception), and leave it on
- // top of the CATCH_FRAME ready to enter.
- //
- {
-#ifdef PROFILING
- StgCatchFrame *cf = (StgCatchFrame *)frame;
-#endif
- StgThunk *raise;
-
- // we've got an exception to raise, so let's pass it to the
- // handler in this frame.
- //
- raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
- TICK_ALLOC_SE_THK(1,0);
- SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
- raise->payload[0] = exception;
-
- // throw away the stack from Sp up to the CATCH_FRAME.
- //
- sp = frame - 1;
-
- /* Ensure that async excpetions are blocked now, so we don't get
- * a surprise exception before we get around to executing the
- * handler.
- */
- if (tso->blocked_exceptions == NULL) {
- tso->blocked_exceptions = END_TSO_QUEUE;
- }
-
- /* Put the newly-built THUNK on top of the stack, ready to execute
- * when the thread restarts.
- */
- sp[0] = (W_)raise;
- sp[-1] = (W_)&stg_enter_info;
- tso->sp = sp-1;
- tso->what_next = ThreadRunGHC;
- IF_DEBUG(sanity, checkTSO(tso));
- return;
- }
-
case UPDATE_FRAME:
{
StgAP_STACK * ap;
printObj((StgClosure *)ap);
);
- // Replace the updatee with an indirection - happily
- // this will also wake up any threads currently
- // waiting on the result.
+ // Replace the updatee with an indirection
//
// Warning: if we're in a loop, more than one update frame on
// the stack may point to the same object. Be careful not to
}
sp += sizeofW(StgUpdateFrame) - 1;
sp[0] = (W_)ap; // push onto stack
- break;
+ frame = sp + 1;
+ continue; //no need to bump frame
}
-
+
case STOP_FRAME:
// We've stripped the entire stack, the thread is now dead.
- sp += sizeofW(StgStopFrame);
tso->what_next = ThreadKilled;
- tso->sp = sp;
+ tso->sp = frame + sizeofW(StgStopFrame);
return;
+
+ case CATCH_FRAME:
+ // If we find a CATCH_FRAME, and we've got an exception to raise,
+ // then build the THUNK raise(exception), and leave it on
+ // top of the CATCH_FRAME ready to enter.
+ //
+ {
+#ifdef PROFILING
+ StgCatchFrame *cf = (StgCatchFrame *)frame;
+#endif
+ StgThunk *raise;
+
+ if (exception == NULL) break;
+
+ // we've got an exception to raise, so let's pass it to the
+ // handler in this frame.
+ //
+ raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
+ TICK_ALLOC_SE_THK(1,0);
+ SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
+ raise->payload[0] = exception;
+
+ // throw away the stack from Sp up to the CATCH_FRAME.
+ //
+ sp = frame - 1;
+
+ /* Ensure that async excpetions are blocked now, so we don't get
+ * a surprise exception before we get around to executing the
+ * handler.
+ */
+ if (tso->blocked_exceptions == NULL) {
+ tso->blocked_exceptions = END_TSO_QUEUE;
+ }
+
+ /* Put the newly-built THUNK on top of the stack, ready to execute
+ * when the thread restarts.
+ */
+ sp[0] = (W_)raise;
+ sp[-1] = (W_)&stg_enter_info;
+ tso->sp = sp-1;
+ tso->what_next = ThreadRunGHC;
+ IF_DEBUG(sanity, checkTSO(tso));
+ return;
+ }
+
+ case ATOMICALLY_FRAME:
+ if (stop_at_atomically) {
+ ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
+ stmCondemnTransaction(cap, tso -> trec);
+#ifdef REG_R1
+ tso->sp = frame;
+#else
+ // R1 is not a register: the return convention for IO in
+ // this case puts the return value on the stack, so we
+ // need to set up the stack to return to the atomically
+ // frame properly...
+ tso->sp = frame - 2;
+ tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
+ tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
+#endif
+ tso->what_next = ThreadRunGHC;
+ return;
+ }
+ // Not stop_at_atomically... fall through and abort the
+ // transaction.
+
+ case CATCH_RETRY_FRAME:
+ // IF we find an ATOMICALLY_FRAME then we abort the
+ // current transaction and propagate the exception. In
+ // this case (unlike ordinary exceptions) we do not care
+ // whether the transaction is valid or not because its
+ // possible validity cannot have caused the exception
+ // and will not be visible after the abort.
+ IF_DEBUG(stm,
+ debugBelch("Found atomically block delivering async exception\n"));
+ StgTRecHeader *trec = tso -> trec;
+ StgTRecHeader *outer = stmGetEnclosingTRec(trec);
+ stmAbortTransaction(cap, trec);
+ tso -> trec = outer;
+ break;
default:
- barf("raiseAsync");
+ break;
}
+
+ // move on to the next stack frame
+ frame += stack_frame_sizeW((StgClosure *)frame);
}
- barf("raiseAsync");
+
+ // if we got here, then we stopped at stop_here
+ ASSERT(stop_here != NULL);
}
/* -----------------------------------------------------------------------------
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
static void
-deleteThreadImmediately(Capability *cap, StgTSO *tso)
+deleteThread_(Capability *cap, StgTSO *tso)
{ // for forkProcess only:
- // delete thread without giving it a chance to catch the KillThread exception
+ // like deleteThread(), but we delete threads in foreign calls, too.
- if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
- return;
- }
-
- if (tso->why_blocked != BlockedOnCCall &&
- tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
- unblockThread(cap,tso);
- }
-
- tso->what_next = ThreadKilled;
+ if (tso->why_blocked == BlockedOnCCall ||
+ tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
+ unblockOne(cap,tso);
+ tso->what_next = ThreadKilled;
+ } else {
+ deleteThread(cap,tso);
+ }
}
#endif
// thunks which are currently under evaluataion.
//
- //
+ // OLD COMMENT (we don't have MIN_UPD_SIZE now):
// LDV profiling: stg_raise_info has THUNK as its closure
// type. Since a THUNK takes at least MIN_UPD_SIZE words in its
// payload, MIN_UPD_SIZE is more approprate than 1. It seems that
// Only create raise_closure if we need to.
if (raise_closure == NULL) {
raise_closure =
- (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
+ (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
SET_HDR(raise_closure, &stg_raise_info, CCCS);
raise_closure->payload[0] = exception;
}
all_threads = tso;
IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
- // Wake up the thread on the Capability it was last on for a
- // bound thread, or last_free_capability otherwise.
- if (tso->bound) {
- cap = tso->bound->cap;
- } else {
- cap = last_free_capability;
- }
+ // Wake up the thread on the Capability it was last on
+ cap = tso->cap;
switch (tso->why_blocked) {
case BlockedOnMVar:
}
}
-static void
-printThreadStatus(StgTSO *tso)
+void
+printThreadStatus(StgTSO *t)
{
- switch (tso->what_next) {
- case ThreadKilled:
- debugBelch("has been killed");
- break;
- case ThreadComplete:
- debugBelch("has completed");
- break;
- default:
- printThreadBlockage(tso);
- }
+ debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
+ {
+ void *label = lookupThreadLabel(t->id);
+ if (label) debugBelch("[\"%s\"] ",(char *)label);
+ }
+ if (t->what_next == ThreadRelocated) {
+ debugBelch("has been relocated...\n");
+ } else {
+ switch (t->what_next) {
+ case ThreadKilled:
+ debugBelch("has been killed");
+ break;
+ case ThreadComplete:
+ debugBelch("has completed");
+ break;
+ default:
+ printThreadBlockage(t);
+ }
+ debugBelch("\n");
+ }
}
void
printAllThreads(void)
{
- StgTSO *t;
+ StgTSO *t, *next;
+ nat i;
+ Capability *cap;
# if defined(GRAN)
char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
debugBelch("all threads:\n");
# endif
- for (t = all_threads; t != END_TSO_QUEUE; ) {
- debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
- {
- void *label = lookupThreadLabel(t->id);
- if (label) debugBelch("[\"%s\"] ",(char *)label);
- }
- if (t->what_next == ThreadRelocated) {
- debugBelch("has been relocated...\n");
- t = t->link;
- } else {
- printThreadStatus(t);
- debugBelch("\n");
- t = t->global_link;
- }
+ for (i = 0; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+ debugBelch("threads on capability %d:\n", cap->no);
+ for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
+ printThreadStatus(t);
+ }
+ }
+
+ debugBelch("other threads:\n");
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ if (t->why_blocked != NotBlocked) {
+ printThreadStatus(t);
+ }
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ }
}
}
{
nat i = 0;
for (; t != END_TSO_QUEUE; t = t->link) {
- debugBelch("\tthread %d @ %p ", t->id, (void *)t);
- if (t->what_next == ThreadRelocated) {
- debugBelch("has been relocated...\n");
- } else {
- printThreadStatus(t);
- debugBelch("\n");
- }
+ printThreadStatus(t);
i++;
}
debugBelch("%d threads on queue\n", i);