/* flag set by signal handler to precipitate a context switch */
int context_switch = 0;
+/* flag that tracks whether we have done any execution in this time slice. */
+nat recent_activity = ACTIVITY_YES;
+
/* if this flag is set as well, give up execution */
rtsBool interrupted = rtsFalse;
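/* A sketch (not part of this patch) of how the timer interrupt is assumed
 * to drive these flags: each tick requests a context switch and downgrades
 * recent_activity, so that scheduleDetectDeadlock() below only sees
 * ACTIVITY_INACTIVE when no Haskell code has run for a whole time slice.
 * The handler shape and the one-step downgrade are assumptions; a real
 * handler might go through an intermediate state to guarantee a full
 * slice of inactivity.
 */
#if 0 /* illustrative only */
static void
tick_handler( int unused STG_UNUSED )
{
    context_switch = 1;                /* precipitate a context switch */
    if (recent_activity == ACTIVITY_YES) {
	recent_activity = ACTIVITY_INACTIVE;
    }
}
#endif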
-/* If this flag is set, we are running Haskell code. Used to detect
- * uses of 'foreign import unsafe' that should be 'safe'.
- */
-static rtsBool in_haskell = rtsFalse;
-
/* Next thread ID to allocate.
* Locks required: thread_id_mutex
*/
StgTSO dummy_tso;
-# if defined(SMP)
-static Condition gc_pending_cond = INIT_COND_VAR;
-# endif
-
-static rtsBool ready_to_gc;
-
/*
* Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
* in an MT setting, needed to signal that a worker thread shouldn't hang around
// scheduler clearer.
//
static void schedulePreLoop(void);
-static void scheduleHandleInterrupt(void);
static void scheduleStartSignalHandlers(void);
static void scheduleCheckBlockedThreads(void);
static void scheduleCheckBlackHoles(void);
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread,
Capability *cap, StgTSO *t );
-static void scheduleDoHeapProfile(void);
-static void scheduleDoGC(void);
+static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
+static void scheduleDoGC(rtsBool force_major);
static void unblockThread(StgTSO *tso);
static rtsBool checkBlackHoles(void);
static void printThreadBlockage(StgTSO *tso);
static void printThreadStatus(StgTSO *tso);
+void printThreadQueue(StgTSO *tso);
#if defined(PARALLEL_HASKELL)
StgTSO * createSparkThread(rtsSpark spark);
* ------------------------------------------------------------------------- */
#if defined(RTS_SUPPORTS_THREADS)
-static rtsBool startingWorkerThread = rtsFalse;
+static nat startingWorkerThread = 0;
static void
taskStart(void)
{
ACQUIRE_LOCK(&sched_mutex);
- startingWorkerThread = rtsFalse;
+ startingWorkerThread--;
schedule(NULL,NULL);
+ taskStop();
RELEASE_LOCK(&sched_mutex);
}
void
startSchedulerTaskIfNecessary(void)
{
- if(run_queue_hd != END_TSO_QUEUE
- || blocked_queue_hd != END_TSO_QUEUE
- || sleeping_queue != END_TSO_QUEUE)
- {
- if(!startingWorkerThread)
- { // we don't want to start another worker thread
- // just because the last one hasn't yet reached the
- // "waiting for capability" state
- startingWorkerThread = rtsTrue;
- if (!startTask(taskStart)) {
- startingWorkerThread = rtsFalse;
- }
+ if ( !EMPTY_RUN_QUEUE()
+ && !shutting_down_scheduler // not if we're shutting down
+ && startingWorkerThread==0)
+ {
+ // we don't want to start another worker thread
+ // just because the last one hasn't yet reached the
+ // "waiting for capability" state
+ startingWorkerThread++;
+ if (!maybeStartNewWorker(taskStart)) {
+ startingWorkerThread--;
+ }
}
- }
}
#endif
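/* startingWorkerThread is a counter rather than a flag so that the SMP
 * initialisation code (see below, where it is set to
 * RtsFlags.ParFlags.nNodes before the workers are started eagerly) can
 * account for several workers at once; each worker decrements the counter
 * again in taskStart().
 */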
# endif
#endif
nat prev_what_next;
+ rtsBool ready_to_gc;
// Pre-condition: sched_mutex is held.
// We might have a capability, passed in as initialCapability.
CurrentTSO = event->tso;
#endif
- IF_DEBUG(scheduler, printAllThreads());
-
-#if defined(SMP)
- //
- // Wait until GC has completed, if necessary.
- //
- if (ready_to_gc) {
- if (cap != NULL) {
- releaseCapability(cap);
- IF_DEBUG(scheduler,sched_belch("waiting for GC"));
- waitCondition( &gc_pending_cond, &sched_mutex );
- }
- }
-#endif
-
#if defined(RTS_SUPPORTS_THREADS)
// Yield the capability to higher-priority tasks if necessary.
//
if (cap != NULL) {
- yieldCapability(&cap);
+ yieldCapability(&cap,
+ mainThread ? &mainThread->bound_thread_cond : NULL );
}
// If we do not currently hold a capability, we wait for one
// We now have a capability...
#endif
+#if 0 /* extra sanity checking */
+ {
+ StgMainThread *m;
+ for (m = main_threads; m != NULL; m = m->link) {
+ ASSERT(get_itbl(m->tso)->type == TSO);
+ }
+ }
+#endif
+
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
// call).
- if (in_haskell) {
+ if (cap->r.rInHaskell) {
errorBelch("schedule: re-entered unsafely.\n"
" Perhaps a 'foreign import unsafe' should be 'safe'?");
stg_exit(1);
}
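/* For illustration, a sketch of the failure mode this check catches (the
 * function names are hypothetical): a C function called via
 * 'foreign import unsafe' does not go through suspendThread()/
 * resumeThread(), so rInHaskell stays set; if it then calls back into
 * Haskell, we re-enter schedule() and hit the error above.
 */
#if 0 /* illustrative only */
extern void haskellCallback(void);  /* hypothetical 'foreign export' */

void
c_entry_point(void)   /* imported as 'foreign import unsafe' -- wrong */
{
    haskellCallback(); /* re-enters the RTS; schedule() calls stg_exit(1) */
}
#endif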
- scheduleHandleInterrupt();
+ //
+ // Test for interruption. If interrupted==rtsTrue, then either
+ // we received a keyboard interrupt (^C), or the scheduler is
+ // trying to shut down all the tasks (shutting_down_scheduler) in
+ // the threaded RTS.
+ //
+ if (interrupted) {
+ if (shutting_down_scheduler) {
+ IF_DEBUG(scheduler, sched_belch("shutting down"));
+ releaseCapability(cap);
+ if (mainThread) {
+ mainThread->stat = Interrupted;
+ mainThread->ret = NULL;
+ }
+ return;
+ } else {
+ IF_DEBUG(scheduler, sched_belch("interrupted"));
+ deleteAllThreads();
+ }
+ }
#if defined(not_yet) && defined(SMP)
//
sched_belch("### thread %d bound to another OS thread", t->id));
// no, bound to a different Haskell thread: pass to that thread
PUSH_ON_RUN_QUEUE(t);
- passCapability(&m->bound_thread_cond);
continue;
}
}
else
{
if(mainThread != NULL)
- // The thread we want to run is bound.
+ // The thread we want to run is unbound.
{
IF_DEBUG(scheduler,
sched_belch("### this OS thread cannot run thread %d", t->id));
// no, the current native thread is bound to a different
// Haskell thread, so pass it to any worker thread
PUSH_ON_RUN_QUEUE(t);
- passCapabilityToWorker();
continue;
}
}
prev_what_next = t->what_next;
errno = t->saved_errno;
- in_haskell = rtsTrue;
+ cap->r.rInHaskell = rtsTrue;
+
+ recent_activity = ACTIVITY_YES;
switch (prev_what_next) {
barf("schedule: invalid what_next field");
}
+#if defined(SMP)
+ // in SMP mode, we might return with a different capability than
+ // we started with, if the Haskell thread made a foreign call. So
+ // let's find out what our current Capability is:
+ cap = myCapability();
+#endif
+
// We have run some Haskell code: there might be blackhole-blocked
// threads to wake up now.
if ( blackhole_queue != END_TSO_QUEUE ) {
blackholes_need_checking = rtsTrue;
}
- in_haskell = rtsFalse;
+ cap->r.rInHaskell = rtsFalse;
    // The TSO might have moved, e.g. if it re-entered the RTS and a GC
// happened. So find the new location:
schedulePostRunThread();
+ ready_to_gc = rtsFalse;
+
switch (ret) {
case HeapOverflow:
ready_to_gc = scheduleHandleHeapOverflow(cap,t);
case ThreadBlocked:
scheduleHandleThreadBlocked(t);
- threadPaused(t);
break;
case ThreadFinished:
barf("schedule: invalid thread return code %d", (int)ret);
}
- scheduleDoHeapProfile();
- scheduleDoGC();
+ if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
+ if (ready_to_gc) { scheduleDoGC(rtsFalse); }
} /* end of while() */
IF_PAR_DEBUG(verbose,
}
/* ----------------------------------------------------------------------------
- * Deal with the interrupt flag
- * ASSUMES: sched_mutex
- * ------------------------------------------------------------------------- */
-
-static
-void scheduleHandleInterrupt(void)
-{
- //
- // Test for interruption. If interrupted==rtsTrue, then either
- // we received a keyboard interrupt (^C), or the scheduler is
- // trying to shut down all the tasks (shutting_down_scheduler) in
- // the threaded RTS.
- //
- if (interrupted) {
- if (shutting_down_scheduler) {
- IF_DEBUG(scheduler, sched_belch("shutting down"));
-#if defined(RTS_SUPPORTS_THREADS)
- shutdownThread();
-#endif
- } else {
- IF_DEBUG(scheduler, sched_belch("interrupted"));
- deleteAllThreads();
- }
- }
-}
-
-/* ----------------------------------------------------------------------------
* Start any pending signal handlers
* ASSUMES: sched_mutex
* ------------------------------------------------------------------------- */
static void
scheduleStartSignalHandlers(void)
{
-#if defined(RTS_USER_SIGNALS)
+#if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
if (signals_pending()) {
RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
startSignalHandlers();
#if defined(RTS_SUPPORTS_THREADS)
// We shouldn't be here...
barf("schedule: awaitEvent() in threaded RTS");
-#endif
+#else
awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
+#endif
}
}
* ------------------------------------------------------------------------- */
static void
scheduleDetectDeadlock(void)
{
+
+#if defined(PARALLEL_HASKELL)
+ // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
+ return;
+#endif
+
/*
* Detect deadlock: when we have no threads to run, there are no
* threads blocked, waiting for I/O, or sleeping, and all the
*/
if ( EMPTY_THREAD_QUEUES() )
{
-#if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS)
+#if defined(RTS_SUPPORTS_THREADS)
+ /*
+ * In the threaded RTS, we only check for deadlock if there
+ * has been no activity in a complete timeslice. This means
+ * we won't eagerly start a full GC just because we don't have
+ * any threads to run currently.
+ */
+ if (recent_activity != ACTIVITY_INACTIVE) return;
+#endif
+
IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
	// Garbage collection can release some new threads, e.g.
	// threads that are unreachable and will therefore be sent an
// exception. Any threads thus released will be immediately
// runnable.
- GarbageCollect(GetRoots,rtsTrue);
+
+ scheduleDoGC( rtsTrue/*force major GC*/ );
+ recent_activity = ACTIVITY_DONE_GC;
if ( !EMPTY_RUN_QUEUE() ) return;
-#if defined(RTS_USER_SIGNALS)
+#if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
/* If we have user-installed signal handlers, then wait
     * for signals to arrive rather than bombing out with a
* deadlock.
}
#endif
+#if !defined(RTS_SUPPORTS_THREADS)
/* Probably a real deadlock. Send the current main thread the
* Deadlock exception (or in the SMP build, send *all* main
* threads the deadlock exception, since none of them can make
StgMainThread *m;
m = main_threads;
switch (m->tso->why_blocked) {
+ case BlockedOnSTM:
case BlockedOnBlackHole:
case BlockedOnException:
case BlockedOnMVar:
barf("deadlock: main thread blocked in a strange way");
}
}
-
-#elif defined(RTS_SUPPORTS_THREADS)
- // ToDo: add deadlock detection in threaded RTS
-#elif defined(PARALLEL_HASKELL)
- // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
#endif
}
}
blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
IF_DEBUG(scheduler,
- debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n",
+ debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
(long)t->id, whatNext_strs[t->what_next], blocks));
- // don't do this if it would push us over the
- // alloc_blocks_lim limit; we'll GC first.
- if (alloc_blocks + blocks < alloc_blocks_lim) {
+    // don't do this if the nursery is (nearly) full; we'll GC first.
+ if (cap->r.rCurrentNursery->link != NULL ||
+ cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
+ // if the nursery has only one block.
- alloc_blocks += blocks;
bd = allocGroup( blocks );
+ cap->r.rNursery->n_blocks += blocks;
// link the new group into the list
bd->link = cap->r.rCurrentNursery;
if (cap->r.rCurrentNursery->u.back != NULL) {
cap->r.rCurrentNursery->u.back->link = bd;
} else {
+#if !defined(SMP)
ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
- g0s0->blocks == cap->r.rNursery);
- cap->r.rNursery = g0s0->blocks = bd;
+ g0s0 == cap->r.rNursery);
+#endif
+ cap->r.rNursery->blocks = bd;
}
cap->r.rCurrentNursery->u.back = bd;
{
bdescr *x;
for (x = bd; x < bd + blocks; x++) {
- x->step = g0s0;
+ x->step = cap->r.rNursery;
x->gen_no = 0;
x->flags = 0;
}
}
- // don't forget to update the block count in g0s0.
- g0s0->n_blocks += blocks;
// This assert can be a killer if the app is doing lots
// of large block allocations.
- ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
+ IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
// now update the nursery to point to the new block
cap->r.rCurrentNursery = bd;
}
}
- /* make all the running tasks block on a condition variable,
- * maybe set context_switch and wait till they all pile in,
- * then have them wait on a GC condition variable.
- */
IF_DEBUG(scheduler,
debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n",
(long)t->id, whatNext_strs[t->what_next]));
- threadPaused(t);
#if defined(GRAN)
ASSERT(!is_on_queue(t,CurrentProc));
#elif defined(PARALLEL_HASKELL)
/* just adjust the stack for this thread, then pop it back
* on the run queue.
*/
- threadPaused(t);
{
/* enlarge the stack */
StgTSO *new_t = threadStackOverflow(t);
return rtsTrue;
}
- threadPaused(t);
-
#if defined(GRAN)
ASSERT(!is_on_queue(t,CurrentProc));
emitSchedule = rtsTrue;
#else /* !GRAN */
- /* don't need to do anything. Either the thread is blocked on
- * I/O, in which case we'll have called addToBlockedQueue
- * previously, or it's blocked on an MVar or Blackhole, in which
- * case it'll be on the relevant queue already.
- */
+
+ // We don't need to do anything. The thread is blocked, and it
+ // has tidied up its stack and placed itself on whatever queue
+ // it needs to be on.
+
+#if !defined(SMP)
ASSERT(t->why_blocked != NotBlocked);
+ // This might not be true under SMP: we don't have
+ // exclusive access to this TSO, so someone might have
+ // woken it up by now. This actually happens: try
+ // conc023 +RTS -N2.
+#endif
+
IF_DEBUG(scheduler,
debugBelch("--<< thread %d (%s) stopped: ",
t->id, whatNext_strs[t->what_next]);
removeThreadLabel((StgWord)mainThread->tso->id);
#endif
if (mainThread->prev == NULL) {
+ ASSERT(mainThread == main_threads);
main_threads = mainThread->link;
} else {
mainThread->prev->link = mainThread->link;
}
if (mainThread->link != NULL) {
- mainThread->link->prev = NULL;
+ mainThread->link->prev = mainThread->prev;
}
releaseCapability(cap);
return rtsTrue; // tells schedule() to return
* Perform a heap census, if PROFILING
* -------------------------------------------------------------------------- */
-static void
-scheduleDoHeapProfile(void)
+static rtsBool
+scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
{
-#ifdef PROFILING
+#if defined(PROFILING)
// When we have +RTS -i0 and we're heap profiling, do a census at
// every GC. This lets us get repeatable runs for debugging.
if (performHeapProfile ||
GarbageCollect(GetRoots, rtsTrue);
heapCensus();
performHeapProfile = rtsFalse;
- ready_to_gc = rtsFalse; // we already GC'd
+ return rtsTrue; // true <=> we already GC'd
}
#endif
+ return rtsFalse;
}
/* -----------------------------------------------------------------------------
* -------------------------------------------------------------------------- */
static void
-scheduleDoGC(void)
+scheduleDoGC( rtsBool force_major )
{
StgTSO *t;
+#ifdef SMP
+ Capability *cap;
+ static rtsBool waiting_for_gc;
+ int n_capabilities = RtsFlags.ParFlags.nNodes - 1;
+ // subtract one because we're already holding one.
+ Capability *caps[n_capabilities];
+#endif
#ifdef SMP
- // The last task to stop actually gets to do the GC. The rest
- // of the tasks release their capabilities and wait gc_pending_cond.
- if (ready_to_gc && allFreeCapabilities())
-#else
- if (ready_to_gc)
+ // In order to GC, there must be no threads running Haskell code.
+ // Therefore, the GC thread needs to hold *all* the capabilities,
+ // and release them after the GC has completed.
+ //
+ // This seems to be the simplest way: previous attempts involved
+ // making all the threads with capabilities give up their
+ // capabilities and sleep except for the *last* one, which
+ // actually did the GC. But it's quite hard to arrange for all
+ // the other tasks to sleep and stay asleep.
+ //
+ // This does mean that there will be multiple entries in the
+ // thread->capability hash table for the current thread, but
+ // they will be removed as normal when the capabilities are
+ // released again.
+ //
+
+ // Someone else is already trying to GC
+ if (waiting_for_gc) return;
+ waiting_for_gc = rtsTrue;
+
+ while (n_capabilities > 0) {
+	IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilities (%d left)", n_capabilities));
+ waitForReturnCapability(&sched_mutex, &cap);
+ n_capabilities--;
+ caps[n_capabilities] = cap;
+ }
+
+ waiting_for_gc = rtsFalse;
#endif
- {
- /* Kick any transactions which are invalid back to their
- * atomically frames. When next scheduled they will try to
- * commit, this commit will fail and they will retry.
- */
- for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
- if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
- if (!stmValidateTransaction (t -> trec)) {
- IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
-
- // strip the stack back to the ATOMICALLY_FRAME, aborting
- // the (nested) transaction, and saving the stack of any
- // partially-evaluated thunks on the heap.
- raiseAsync_(t, NULL, rtsTrue);
-
+
+ /* Kick any transactions which are invalid back to their
+ * atomically frames. When next scheduled they will try to
+     * commit; this commit will fail and they will retry.
+ */
+ {
+ StgTSO *next;
+
+ for (t = all_threads; t != END_TSO_QUEUE; t = next) {
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
+ if (!stmValidateNestOfTransactions (t -> trec)) {
+ IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
+
+ // strip the stack back to the ATOMICALLY_FRAME, aborting
+ // the (nested) transaction, and saving the stack of any
+ // partially-evaluated thunks on the heap.
+ raiseAsync_(t, NULL, rtsTrue);
+
#ifdef REG_R1
- ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
+ ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
#endif
+ }
}
}
}
+ }
+
+ // so this happens periodically:
+ scheduleCheckBlackHoles();
+
+ IF_DEBUG(scheduler, printAllThreads());
- // so this happens periodically:
- scheduleCheckBlackHoles();
-
- /* everybody back, start the GC.
- * Could do it in this thread, or signal a condition var
- * to do it in another thread. Either way, we need to
- * broadcast on gc_pending_cond afterward.
- */
+ /* everybody back, start the GC.
+ * Could do it in this thread, or signal a condition var
+ * to do it in another thread. Either way, we need to
+     * release the capabilities we are holding afterward.
+ */
#if defined(RTS_SUPPORTS_THREADS)
- IF_DEBUG(scheduler,sched_belch("doing GC"));
+ IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
- GarbageCollect(GetRoots,rtsFalse);
- ready_to_gc = rtsFalse;
+ GarbageCollect(GetRoots, force_major);
+
#if defined(SMP)
- broadcastCondition(&gc_pending_cond);
+ {
+ // release our stash of capabilities.
+ nat i;
+ for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
+ releaseCapability(caps[i]);
+ }
+ }
#endif
+
#if defined(GRAN)
- /* add a ContinueThread event to continue execution of current thread */
- new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
- ContinueThread,
- t, (StgClosure*)NULL, (rtsSpark*)NULL);
- IF_GRAN_DEBUG(bq,
- debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
- G_EVENTQ(0);
- G_CURR_THREADQ(0));
+ /* add a ContinueThread event to continue execution of current thread */
+ new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
+ ContinueThread,
+ t, (StgClosure*)NULL, (rtsSpark*)NULL);
+ IF_GRAN_DEBUG(bq,
+ debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
+ G_EVENTQ(0);
+ G_CURR_THREADQ(0));
#endif /* GRAN */
- }
}
/* ---------------------------------------------------------------------------
StgBool
rtsSupportsBoundThreads(void)
{
-#ifdef THREADED_RTS
+#if defined(RTS_SUPPORTS_THREADS)
return rtsTrue;
#else
return rtsFalse;
StgBool
isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
{
-#ifdef THREADED_RTS
+#if defined(RTS_SUPPORTS_THREADS)
return (tso->main != NULL);
#endif
return rtsFalse;
StgTSO* t, *next;
IF_DEBUG(scheduler,sched_belch("deleting all threads"));
for (t = all_threads; t != END_TSO_QUEUE; t = next) {
- next = t->global_link;
- deleteThread(t);
+ if (t->what_next == ThreadRelocated) {
+ next = t->link;
+ } else {
+ next = t->global_link;
+ deleteThread(t);
+ }
}
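/* The ThreadRelocated case above uses the same idiom as printAllThreads()
 * below: when threadStackOverflow() enlarges a stack, the old TSO is left
 * behind as a relocation record whose 'link' field points at the new TSO,
 * so traversals follow 'link' for relocated TSOs and 'global_link'
 * otherwise.  A sketch of the idiom (the helper name is hypothetical):
 */
#if 0 /* illustrative only */
static StgTSO *
derefTSO( StgTSO *t )
{
    while (t->what_next == ThreadRelocated) {
	t = t->link;
    }
    return t;
}
#endif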
// The run queue now contains a bunch of ThreadKilled threads. We
tok = cap->r.rCurrentTSO->id;
/* Hand back capability */
+ cap->r.rInHaskell = rtsFalse;
releaseCapability(cap);
#if defined(RTS_SUPPORTS_THREADS)
IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
#endif
- in_haskell = rtsFalse;
RELEASE_LOCK(&sched_mutex);
errno = saved_errno;
tso->why_blocked = NotBlocked;
cap->r.rCurrentTSO = tso;
- in_haskell = rtsTrue;
+ cap->r.rInHaskell = rtsTrue;
RELEASE_LOCK(&sched_mutex);
errno = saved_errno;
return &cap->r;
initCapabilities();
#if defined(RTS_SUPPORTS_THREADS)
- /* start our haskell execution tasks */
- startTaskManager(0,taskStart);
+ initTaskManager();
+#endif
+
+#if defined(SMP)
+ /* eagerly start some extra workers */
+ startingWorkerThread = RtsFlags.ParFlags.nNodes;
+ startTasks(RtsFlags.ParFlags.nNodes, taskStart);
#endif
#if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
void
exitScheduler( void )
{
+ interrupted = rtsTrue;
+ shutting_down_scheduler = rtsTrue;
#if defined(RTS_SUPPORTS_THREADS)
- stopTaskManager();
+ if (threadIsTask(osThreadId())) { taskStop(); }
+ stopTaskManager();
#endif
- interrupted = rtsTrue;
- shutting_down_scheduler = rtsTrue;
}
/* ----------------------------------------------------------------------------
#endif
#if defined(GRAN)
-static StgBlockingQueueElement *
+StgBlockingQueueElement *
unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
{
StgTSO *tso;
tso->id, tso));
}
#elif defined(PARALLEL_HASKELL)
-static StgBlockingQueueElement *
+StgBlockingQueueElement *
unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
{
StgBlockingQueueElement *next;
}
#else /* !GRAN && !PARALLEL_HASKELL */
-static StgTSO *
+StgTSO *
unblockOneLocked(StgTSO *tso)
{
StgTSO *next;
void
awakenBlockedQueueNoLock(StgTSO *tso)
{
+ if (tso == NULL) return; // hack; see bug #1235728, and comments in
+ // Exception.cmm
while (tso != END_TSO_QUEUE) {
tso = unblockOneLocked(tso);
}
void
awakenBlockedQueue(StgTSO *tso)
{
+ if (tso == NULL) return; // hack; see bug #1235728, and comments in
+ // Exception.cmm
ACQUIRE_LOCK(&sched_mutex);
while (tso != END_TSO_QUEUE) {
tso = unblockOneLocked(tso);
{
interrupted = 1;
context_switch = 1;
+ threadRunnable();
+ /* ToDo: if invoked from a signal handler, this threadRunnable
+ * only works if there's another thread (not this one) waiting to
+ * be woken up.
+ */
}
/* -----------------------------------------------------------------------------
blocked_queue_tl = (StgTSO *)prev;
}
}
+#if defined(mingw32_HOST_OS)
+ /* (Cooperatively) signal that the worker thread should abort
+ * the request.
+ */
+ abandonWorkRequest(tso->block_info.async_result->reqID);
+#endif
goto done;
}
}
blocked_queue_tl = prev;
}
}
+#if defined(mingw32_HOST_OS)
+ /* (Cooperatively) signal that the worker thread should abort
+ * the request.
+ */
+ abandonWorkRequest(tso->block_info.async_result->reqID);
+#endif
goto done;
}
}
#ifdef PROFILING
StgCatchFrame *cf = (StgCatchFrame *)frame;
#endif
- StgClosure *raise;
+ StgThunk *raise;
// we've got an exception to raise, so let's pass it to the
// handler in this frame.
//
- raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
+ raise = (StgThunk *)allocate(sizeofW(StgThunk)+1);
TICK_ALLOC_SE_THK(1,0);
SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
raise->payload[0] = exception;
// fun field.
//
words = frame - sp - 1;
- ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
+ ap = (StgAP_STACK *)allocate(AP_STACK_sizeW(words));
ap->size = words;
ap->fun = (StgClosure *)sp[0];
StgWord
raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
{
- StgClosure *raise_closure = NULL;
+ StgThunk *raise_closure = NULL;
StgPtr p, next;
StgRetInfoTable *info;
//
// Only create raise_closure if we need to.
if (raise_closure == NULL) {
raise_closure =
- (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
+ (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE);
SET_HDR(raise_closure, &stg_raise_info, CCCS);
raise_closure->payload[0] = exception;
}
- UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
+ UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
p = next;
continue;
{
switch (tso->why_blocked) {
case BlockedOnRead:
- debugBelch("is blocked on read from fd %ld", tso->block_info.fd);
+ debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
break;
case BlockedOnWrite:
- debugBelch("is blocked on write to fd %ld", tso->block_info.fd);
+ debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
break;
#if defined(mingw32_HOST_OS)
case BlockedOnDoProc:
break;
#endif
case BlockedOnDelay:
- debugBelch("is blocked until %ld", tso->block_info.target);
+ debugBelch("is blocked until %ld", (long)(tso->block_info.target));
break;
case BlockedOnMVar:
- debugBelch("is blocked on an MVar");
+ debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
break;
case BlockedOnException:
debugBelch("is blocked on delivering an exception to thread %d",
debugBelch("all threads:\n");
# endif
- for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
- debugBelch("\tthread %d @ %p ", t->id, (void *)t);
+ for (t = all_threads; t != END_TSO_QUEUE; ) {
+ debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
#if defined(DEBUG)
{
void *label = lookupThreadLabel(t->id);
if (label) debugBelch("[\"%s\"] ",(char *)label);
}
#endif
- printThreadStatus(t);
- debugBelch("\n");
+ if (t->what_next == ThreadRelocated) {
+ debugBelch("has been relocated...\n");
+ t = t->link;
+ } else {
+ printThreadStatus(t);
+ debugBelch("\n");
+ t = t->global_link;
+ }
}
}
-
+
#ifdef DEBUG
+// useful from gdb
+void
+printThreadQueue(StgTSO *t)
+{
+ nat i = 0;
+ for (; t != END_TSO_QUEUE; t = t->link) {
+ debugBelch("\tthread %d @ %p ", t->id, (void *)t);
+ if (t->what_next == ThreadRelocated) {
+ debugBelch("has been relocated...\n");
+ } else {
+ printThreadStatus(t);
+ debugBelch("\n");
+ }
+ i++;
+ }
+ debugBelch("%d threads on queue\n", i);
+}
+
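+/* Usage sketch: printThreadQueue() is non-static so it can be called from
+ * a debugger on any queue of TSOs linked through their 'link' fields,
+ * e.g. (assuming gdb and a DEBUG build of the RTS):
+ *
+ *   (gdb) call printThreadQueue(run_queue_hd)
+ *   (gdb) call printThreadQueue(blocked_queue_hd)
+ */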
/*
Print a whole blocking queue attached to node (debugging only).
*/