#include "Interpreter.h"
#include "Printer.h"
#include "RtsSignals.h"
-#include "Sanity.h"
+#include "sm/Sanity.h"
#include "Stats.h"
#include "STM.h"
#include "Prelude.h"
#include "Threads.h"
#include "Timer.h"
#include "ThreadPaused.h"
+#include "Messages.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#include <errno.h>
#endif
+#ifdef TRACING
+#include "eventlog/EventLog.h"
+#endif
/* -----------------------------------------------------------------------------
* Global variables
* -------------------------------------------------------------------------- */
StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
#endif
-/* Threads blocked on blackholes.
- * LOCK: sched_mutex+capability, or all capabilities
- */
-StgTSO *blackhole_queue = NULL;
-
-/* The blackhole_queue should be checked for threads to wake up. See
- * Schedule.h for more thorough comment.
- * LOCK: none (doesn't matter if we miss an update)
- */
-rtsBool blackholes_need_checking = rtsFalse;
-
/* Set to true when the latest garbage collection failed to reclaim
* enough space, and the runtime should proceed to shut itself down in
* an orderly fashion (emitting profiling info etc.)
static void schedulePreLoop (void);
static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
-static void scheduleYield (Capability **pcap, Task *task, rtsBool);
+static void scheduleYield (Capability **pcap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
-static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
-static void scheduleCheckBlackHoles (Capability *cap);
+static void scheduleProcessInbox(Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
-static void scheduleHandleStackOverflow( Capability *cap, Task *task,
- StgTSO *t);
-static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
+static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t,
nat prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
static Capability *scheduleDoGC(Capability *cap, Task *task,
rtsBool force_major);
-static rtsBool checkBlackHoles(Capability *cap);
-
-static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
-static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
-
static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif
-/* -----------------------------------------------------------------------------
- * Putting a thread on the run queue: different scheduling policies
- * -------------------------------------------------------------------------- */
-
-STATIC_INLINE void
-addToRunQueue( Capability *cap, StgTSO *t )
-{
- // this does round-robin scheduling; good for concurrency
- appendToRunQueue(cap,t);
-}
-
/* ---------------------------------------------------------------------------
Main scheduling loop.
rtsBool ready_to_gc;
#if defined(THREADED_RTS)
rtsBool first = rtsTrue;
- rtsBool force_yield = rtsFalse;
#endif
cap = initialCapability;
// If we are a worker, just exit. If we're a bound thread
// then we will exit below when we've removed our TSO from
// the run queue.
- if (task->tso == NULL && emptyRunQueue(cap)) {
+ if (!isBoundTask(task) && emptyRunQueue(cap)) {
return cap;
}
break;
// ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
}
- yield:
- scheduleYield(&cap,task,force_yield);
- force_yield = rtsFalse;
+ scheduleYield(&cap,task);
if (emptyRunQueue(cap)) continue; // look for work again
#endif
// Check whether we can run this thread in the current task.
// If not, we have to pass our capability to the right task.
{
- Task *bound = t->bound;
+ InCall *bound = t->bound;
if (bound) {
- if (bound == task) {
+ if (bound->task == task) {
// yes, the Haskell thread is bound to the current native thread
} else {
debugTrace(DEBUG_sched,
}
} else {
// The thread we want to run is unbound.
- if (task->tso) {
+ if (task->incall->tso) {
debugTrace(DEBUG_sched,
"this OS thread cannot run thread %lu",
(unsigned long)t->id);
startHeapProfTimer();
- // Check for exceptions blocked on this thread
- maybePerformBlockedException (cap, t);
-
// ----------------------------------------------------------------------
// Run the current thread
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
ASSERT(t->cap == cap);
- ASSERT(t->bound ? t->bound->cap == cap : 1);
+ ASSERT(t->bound ? t->bound->task->cap == cap : 1);
prev_what_next = t->what_next;
cap->in_haskell = rtsTrue;
dirty_TSO(cap,t);
+ dirty_STACK(cap,t->stackobj);
#if defined(THREADED_RTS)
if (recent_activity == ACTIVITY_DONE_GC) {
if (prev == ACTIVITY_DONE_GC) {
startTimer();
}
- } else {
+ } else if (recent_activity != ACTIVITY_INACTIVE) {
+ // If we reached ACTIVITY_INACTIVE, then don't reset it until
+ // we've done the GC. The thread running here might just be
+ // the IO manager thread that handle_tick() woke up via
+ // wakeUpRts().
recent_activity = ACTIVITY_YES;
}
#endif
- traceSchedEvent(cap, EVENT_RUN_THREAD, t, 0);
+ traceEventRunThread(cap, t);
switch (prev_what_next) {
// happened. So find the new location:
t = cap->r.rCurrentTSO;
- // We have run some Haskell code: there might be blackhole-blocked
- // threads to wake up now.
- // Lock-free test here should be ok, we're just setting a flag.
- if ( blackhole_queue != END_TSO_QUEUE ) {
- blackholes_need_checking = rtsTrue;
- }
-
// And save the current errno in this thread.
// XXX: possibly bogus for SMP because this thread might already
// be running again, see code below.
t->saved_winerror = GetLastError();
#endif
- traceSchedEvent (cap, EVENT_STOP_THREAD, t, ret);
-
-#if defined(THREADED_RTS)
- // If ret is ThreadBlocked, and this Task is bound to the TSO that
- // blocked, we are in limbo - the TSO is now owned by whatever it
- // is blocked on, and may in fact already have been woken up,
- // perhaps even on a different Capability. It may be the case
- // that task->cap != cap. We better yield this Capability
- // immediately and return to normaility.
if (ret == ThreadBlocked) {
- force_yield = rtsTrue;
- goto yield;
+ if (t->why_blocked == BlockedOnBlackHole) {
+ StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
+ traceEventStopThread(cap, t, t->why_blocked + 6,
+ owner != NULL ? owner->id : 0);
+ } else {
+ traceEventStopThread(cap, t, t->why_blocked + 6, 0);
+ }
+ } else {
+ traceEventStopThread(cap, t, ret, 0);
}
-#endif
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
ASSERT(t->cap == cap);
schedulePostRunThread(cap,t);
- if (ret != StackOverflow) {
- t = threadStackUnderflow(task,t);
- }
-
ready_to_gc = rtsFalse;
switch (ret) {
break;
case StackOverflow:
- scheduleHandleStackOverflow(cap,task,t);
- break;
+ // just adjust the stack for this thread, then pop it back
+ // on the run queue.
+ threadStackOverflow(cap, t);
+ pushOnRunQueue(cap,t);
+ break;
case ThreadYielding:
if (scheduleHandleYield(cap, t, prev_what_next)) {
} /* end of while() */
}
+/* -----------------------------------------------------------------------------
+ * Run queue operations
+ * -------------------------------------------------------------------------- */
+
+void
+removeFromRunQueue (Capability *cap, StgTSO *tso)
+{
+ if (tso->block_info.prev == END_TSO_QUEUE) {
+ ASSERT(cap->run_queue_hd == tso);
+ cap->run_queue_hd = tso->_link;
+ } else {
+ setTSOLink(cap, tso->block_info.prev, tso->_link);
+ }
+ if (tso->_link == END_TSO_QUEUE) {
+ ASSERT(cap->run_queue_tl == tso);
+ cap->run_queue_tl = tso->block_info.prev;
+ } else {
+ setTSOPrev(cap, tso->_link, tso->block_info.prev);
+ }
+ tso->_link = tso->block_info.prev = END_TSO_QUEUE;
+
+ IF_DEBUG(sanity, checkRunQueue(cap));
+}
+
/* ----------------------------------------------------------------------------
* Setting up the scheduler loop
* ------------------------------------------------------------------------- */
{
scheduleStartSignalHandlers(cap);
- // Only check the black holes here if we've nothing else to do.
- // During normal execution, the black hole list only gets checked
- // at GC time, to avoid repeatedly traversing this possibly long
- // list each time around the scheduler.
- if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
-
- scheduleCheckWakeupThreads(cap);
+ scheduleProcessInbox(cap);
scheduleCheckBlockedThreads(cap);
// and this task it bound).
return (waiting_for_gc ||
cap->returning_tasks_hd != NULL ||
- (!emptyRunQueue(cap) && (task->tso == NULL
+ (!emptyRunQueue(cap) && (task->incall->tso == NULL
? cap->run_queue_hd->bound != NULL
- : cap->run_queue_hd->bound != task)));
+ : cap->run_queue_hd->bound != task->incall)));
}
// This is the single place where a Task goes to sleep. There are
// and also check the benchmarks in nofib/parallel for regressions.
static void
-scheduleYield (Capability **pcap, Task *task, rtsBool force_yield)
+scheduleYield (Capability **pcap, Task *task)
{
Capability *cap = *pcap;
// if we have work, and we don't need to give up the Capability, continue.
//
- // The force_yield flag is used when a bound thread blocks. This
- // is a particularly tricky situation: the current Task does not
- // own the TSO any more, since it is on some queue somewhere, and
- // might be woken up or manipulated by another thread at any time.
- // The TSO and Task might be migrated to another Capability.
- // Certain invariants might be in doubt, such as task->bound->cap
- // == cap. We have to yield the current Capability immediately,
- // no messing around.
- //
- if (!force_yield &&
- !shouldYieldCapability(cap,task) &&
+ if (!shouldYieldCapability(cap,task) &&
(!emptyRunQueue(cap) ||
- !emptyWakeupQueue(cap) ||
- blackholes_need_checking ||
+ !emptyInbox(cap) ||
sched_state >= SCHED_INTERRUPTING))
return;
Capability *free_caps[n_capabilities], *cap0;
nat i, n_free_caps;
- // migration can be turned off with +RTS -qg
+ // migration can be turned off with +RTS -qm
if (!RtsFlags.ParFlags.migrate) return;
// Check whether we have more threads on our run queue, or sparks
for (i=0, n_free_caps=0; i < n_capabilities; i++) {
cap0 = &capabilities[i];
if (cap != cap0 && tryGrabCapability(cap0,task)) {
- if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
+ if (!emptyRunQueue(cap0)
+ || cap->returning_tasks_hd != NULL
+ || cap->inbox != (Message*)END_TSO_QUEUE) {
// it already has some work, we just grabbed it at
// the wrong moment. Or maybe it's deadlocked!
releaseCapability(cap0);
for (; t != END_TSO_QUEUE; t = next) {
next = t->_link;
t->_link = END_TSO_QUEUE;
- if (t->what_next == ThreadRelocated
- || t->bound == task // don't move my bound thread
+ if (t->bound == task->incall // don't move my bound thread
|| tsoLocked(t)) { // don't move a locked thread
setTSOLink(cap, prev, t);
+ setTSOPrev(cap, t, prev);
prev = t;
} else if (i == n_free_caps) {
pushed_to_all = rtsTrue;
i = 0;
// keep one for us
setTSOLink(cap, prev, t);
+ setTSOPrev(cap, t, prev);
prev = t;
} else {
- debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
appendToRunQueue(free_caps[i],t);
- traceSchedEvent (cap, EVENT_MIGRATE_THREAD, t, free_caps[i]->no);
+ traceEventMigrateThread (cap, t, free_caps[i]->no);
- if (t->bound) { t->bound->cap = free_caps[i]; }
+ if (t->bound) { t->bound->task->cap = free_caps[i]; }
t->cap = free_caps[i];
i++;
}
}
cap->run_queue_tl = prev;
+
+ IF_DEBUG(sanity, checkRunQueue(cap));
}
#ifdef SPARK_PUSHING
if (spark != NULL) {
debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
- traceSchedEvent(free_caps[i], EVENT_STEAL_SPARK, t, cap->no);
+ traceEventStealSpark(free_caps[i], t, cap->no);
newSpark(&(free_caps[i]->r), spark);
}
//
if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
{
- awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
- }
-#endif
-}
-
-
-/* ----------------------------------------------------------------------------
- * Check for threads woken up by other Capabilities
- * ------------------------------------------------------------------------- */
-
-static void
-scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
-{
-#if defined(THREADED_RTS)
- // Any threads that were woken up by other Capabilities get
- // appended to our run queue.
- if (!emptyWakeupQueue(cap)) {
- ACQUIRE_LOCK(&cap->lock);
- if (emptyRunQueue(cap)) {
- cap->run_queue_hd = cap->wakeup_queue_hd;
- cap->run_queue_tl = cap->wakeup_queue_tl;
- } else {
- setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
- cap->run_queue_tl = cap->wakeup_queue_tl;
- }
- cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
- RELEASE_LOCK(&cap->lock);
+ awaitEvent (emptyRunQueue(cap));
}
#endif
}
/* ----------------------------------------------------------------------------
- * Check for threads blocked on BLACKHOLEs that can be woken up
- * ------------------------------------------------------------------------- */
-static void
-scheduleCheckBlackHoles (Capability *cap)
-{
- if ( blackholes_need_checking ) // check without the lock first
- {
- ACQUIRE_LOCK(&sched_mutex);
- if ( blackholes_need_checking ) {
- blackholes_need_checking = rtsFalse;
- // important that we reset the flag *before* checking the
- // blackhole queue, otherwise we could get deadlock. This
- // happens as follows: we wake up a thread that
- // immediately runs on another Capability, blocks on a
- // blackhole, and then we reset the blackholes_need_checking flag.
- checkBlackHoles(cap);
- }
- RELEASE_LOCK(&sched_mutex);
- }
-}
-
-/* ----------------------------------------------------------------------------
* Detect deadlock conditions and attempt to resolve them.
* ------------------------------------------------------------------------- */
/* Probably a real deadlock. Send the current main thread the
* Deadlock exception.
*/
- if (task->tso) {
- switch (task->tso->why_blocked) {
+ if (task->incall->tso) {
+ switch (task->incall->tso->why_blocked) {
case BlockedOnSTM:
case BlockedOnBlackHole:
- case BlockedOnException:
+ case BlockedOnMsgThrowTo:
case BlockedOnMVar:
- throwToSingleThreaded(cap, task->tso,
+ throwToSingleThreaded(cap, task->incall->tso,
(StgClosure *)nonTermination_closure);
return;
default:
#endif
/* ----------------------------------------------------------------------------
+ * Process message in the current Capability's inbox
+ * ------------------------------------------------------------------------- */
+
+static void
+scheduleProcessInbox (Capability *cap USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+ Message *m, *next;
+ int r;
+
+ while (!emptyInbox(cap)) {
+ if (cap->r.rCurrentNursery->link == NULL ||
+ g0->n_new_large_words >= large_alloc_lim) {
+ scheduleDoGC(cap, cap->running_task, rtsFalse);
+ }
+
+ // don't use a blocking acquire; if the lock is held by
+ // another thread then just carry on. This seems to avoid
+ // getting stuck in a message ping-pong situation with other
+ // processors. We'll check the inbox again later anyway.
+ //
+ // We should really use a more efficient queue data structure
+ // here. The trickiness is that we must ensure a Capability
+ // never goes idle if the inbox is non-empty, which is why we
+ // use cap->lock (cap->lock is released as the last thing
+ // before going idle; see Capability.c:releaseCapability()).
+ r = TRY_ACQUIRE_LOCK(&cap->lock);
+ if (r != 0) return;
+
+ m = cap->inbox;
+ cap->inbox = (Message*)END_TSO_QUEUE;
+
+ RELEASE_LOCK(&cap->lock);
+
+ while (m != (Message*)END_TSO_QUEUE) {
+ next = m->link;
+ executeMessage(cap, m);
+ m = next;
+ }
+ }
+#endif
+}
+
+/* ----------------------------------------------------------------------------
* Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
* ------------------------------------------------------------------------- */
blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
+ if (blocks > BLOCKS_PER_MBLOCK) {
+ barf("allocation of %ld bytes too large (GHC should have complained at compile-time)", (long)cap->r.rHpAlloc);
+ }
+
debugTrace(DEBUG_sched,
"--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n",
(long)t->id, what_next_strs[t->what_next], blocks);
cap->r.rNursery->n_blocks == 1) { // paranoia to prevent infinite loop
// if the nursery has only one block.
- ACQUIRE_SM_LOCK
- bd = allocGroup( blocks );
- RELEASE_SM_LOCK
- cap->r.rNursery->n_blocks += blocks;
+ bd = allocGroup_lock(blocks);
+ cap->r.rNursery->n_blocks += blocks;
// link the new group into the list
bd->link = cap->r.rCurrentNursery;
{
bdescr *x;
for (x = bd; x < bd + blocks; x++) {
- x->step = cap->r.rNursery;
- x->gen_no = 0;
+ initBdescr(x,g0,g0);
+ x->free = x->start;
x->flags = 0;
}
}
// context switch flag, and we end up waiting for a GC.
// See #1984, and concurrent/should_run/1984
cap->context_switch = 0;
- addToRunQueue(cap,t);
+ appendToRunQueue(cap,t);
} else {
pushOnRunQueue(cap,t);
}
}
/* -----------------------------------------------------------------------------
- * Handle a thread that returned to the scheduler with ThreadStackOverflow
- * -------------------------------------------------------------------------- */
-
-static void
-scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
-{
- /* just adjust the stack for this thread, then pop it back
- * on the run queue.
- */
- {
- /* enlarge the stack */
- StgTSO *new_t = threadStackOverflow(cap, t);
-
- /* The TSO attached to this Task may have moved, so update the
- * pointer to it.
- */
- if (task->tso == t) {
- task->tso = new_t;
- }
- pushOnRunQueue(cap,new_t);
- }
-}
-
-/* -----------------------------------------------------------------------------
* Handle a thread that returned to the scheduler with ThreadYielding
* -------------------------------------------------------------------------- */
static rtsBool
scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
{
- // Reset the context switch flag. We don't do this just before
- // running the thread, because that would mean we would lose ticks
- // during GC, which can lead to unfair scheduling (a thread hogs
- // the CPU because the tick always arrives during GC). This way
- // penalises threads that do a lot of allocation, but that seems
- // better than the alternative.
- cap->context_switch = 0;
-
/* put the thread back on the run queue. Then, if we're ready to
* GC, check whether this is the last task to stop. If so, wake
* up the GC thread. getThread will block during a GC until the
* GC is finished.
*/
-#ifdef DEBUG
- if (t->what_next != prev_what_next) {
+
+ ASSERT(t->_link == END_TSO_QUEUE);
+
+ // Shortcut if we're just switching evaluators: don't bother
+ // doing stack squeezing (which can be expensive), just run the
+ // thread.
+ if (cap->context_switch == 0 && t->what_next != prev_what_next) {
debugTrace(DEBUG_sched,
"--<< thread %ld (%s) stopped to switch evaluators",
(long)t->id, what_next_strs[t->what_next]);
+ return rtsTrue;
}
-#endif
+
+ // Reset the context switch flag. We don't do this just before
+ // running the thread, because that would mean we would lose ticks
+ // during GC, which can lead to unfair scheduling (a thread hogs
+ // the CPU because the tick always arrives during GC). This way
+ // penalises threads that do a lot of allocation, but that seems
+ // better than the alternative.
+ cap->context_switch = 0;
IF_DEBUG(sanity,
//debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
checkTSO(t));
- ASSERT(t->_link == END_TSO_QUEUE);
-
- // Shortcut if we're just switching evaluators: don't bother
- // doing stack squeezing (which can be expensive), just run the
- // thread.
- if (t->what_next != prev_what_next) {
- return rtsTrue;
- }
- addToRunQueue(cap,t);
+ appendToRunQueue(cap,t);
return rtsFalse;
}
// ASSERT(t->why_blocked != NotBlocked);
// Not true: for example,
- // - in THREADED_RTS, the thread may already have been woken
- // up by another Capability. This actually happens: try
- // conc023 +RTS -N2.
// - the thread may have woken itself up already, because
// threadPaused() might have raised a blocked throwTo
// exception, see maybePerformBlockedException().
*/
// blocked exceptions can now complete, even if the thread was in
- // blocked mode (see #2910). This unconditionally calls
- // lockTSO(), which ensures that we don't miss any threads that
- // are engaged in throwTo() with this thread as a target.
+ // blocked mode (see #2910).
awakenBlockedExceptionQueue (cap, t);
//
if (t->bound) {
- if (t->bound != task) {
+ if (t->bound != task->incall) {
#if !defined(THREADED_RTS)
// Must be a bound thread that is not the topmost one. Leave
// it on the run queue until the stack has unwound to the
#endif
}
- ASSERT(task->tso == t);
+ ASSERT(task->incall->tso == t);
if (t->what_next == ThreadComplete) {
- if (task->ret) {
- // NOTE: return val is tso->sp[1] (see StgStartup.hc)
- *(task->ret) = (StgClosure *)task->tso->sp[1];
+ if (task->incall->ret) {
+ // NOTE: return val is stack->sp[1] (see StgStartup.hc)
+ *(task->incall->ret) = (StgClosure *)task->incall->tso->stackobj->sp[1];
}
- task->stat = Success;
+ task->incall->stat = Success;
} else {
- if (task->ret) {
- *(task->ret) = NULL;
+ if (task->incall->ret) {
+ *(task->incall->ret) = NULL;
}
if (sched_state >= SCHED_INTERRUPTING) {
if (heap_overflow) {
- task->stat = HeapExhausted;
+ task->incall->stat = HeapExhausted;
} else {
- task->stat = Interrupted;
+ task->incall->stat = Interrupted;
}
} else {
- task->stat = Killed;
+ task->incall->stat = Killed;
}
}
#ifdef DEBUG
- removeThreadLabel((StgWord)task->tso->id);
+ removeThreadLabel((StgWord)task->incall->tso->id);
#endif
+
+ // We no longer consider this thread and task to be bound to
+ // each other. The TSO lives on until it is GC'd, but the
+ // task is about to be released by the caller, and we don't
+ // want anyone following the pointer from the TSO to the
+ // defunct task (which might have already been
+ // re-used). This was a real bug: the GC updated
+ // tso->bound->tso which lead to a deadlock.
+ t->bound = NULL;
+ task->incall->tso = NULL;
+
return rtsTrue; // tells schedule() to return
}
if (sched_state < SCHED_INTERRUPTING
&& RtsFlags.ParFlags.parGcEnabled
&& N >= RtsFlags.ParFlags.parGcGen
- && ! oldest_gen->steps[0].mark)
+ && ! oldest_gen->mark)
{
gc_type = PENDING_GC_PAR;
} else {
if (gc_type == PENDING_GC_SEQ)
{
- traceSchedEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
+ traceEventRequestSeqGc(cap);
+ }
+ else
+ {
+ traceEventRequestParGc(cap);
+ debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
+ }
+
+ if (gc_type == PENDING_GC_SEQ)
+ {
// single-threaded GC: grab all the capabilities
for (i=0; i < n_capabilities; i++) {
debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
{
// multi-threaded GC: make sure all the Capabilities donate one
// GC thread each.
- traceSchedEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
- debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
-
waitForGcThreads(cap);
}
+
#endif
- // so this happens periodically:
- if (cap) scheduleCheckBlackHoles(cap);
-
IF_DEBUG(scheduler, printAllThreads());
delete_threads_and_gc:
heap_census = scheduleNeedHeapProfile(rtsTrue);
+ traceEventGcStart(cap);
#if defined(THREADED_RTS)
- traceSchedEvent(cap, EVENT_GC_START, 0, 0);
// reset waiting_for_gc *before* GC, so that when the GC threads
// emerge they don't immediately re-enter the GC.
waiting_for_gc = 0;
#else
GarbageCollect(force_major || heap_census, 0, cap);
#endif
- traceSchedEvent(cap, EVENT_GC_END, 0, 0);
+ traceEventGcEnd(cap);
if (recent_activity == ACTIVITY_INACTIVE && force_major)
{
recent_activity = ACTIVITY_YES;
}
+ if (heap_census) {
+ debugTrace(DEBUG_sched, "performing heap census");
+ heapCensus();
+ performHeapProfile = rtsFalse;
+ }
+
#if defined(THREADED_RTS)
if (gc_type == PENDING_GC_PAR)
{
}
#endif
- if (heap_census) {
- debugTrace(DEBUG_sched, "performing heap census");
- heapCensus();
- performHeapProfile = rtsFalse;
- }
-
if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
// GC set the heap_overflow flag, so we should proceed with
// an orderly shutdown now. Ultimately we want the main
)
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
- Task *task;
pid_t pid;
StgTSO* t,*next;
Capability *cap;
- nat s;
+ nat g;
#if defined(THREADED_RTS)
if (RtsFlags.ParFlags.nNodes > 1) {
ACQUIRE_LOCK(&cap->lock);
ACQUIRE_LOCK(&cap->running_task->lock);
+ stopTimer(); // See #4074
+
+#if defined(TRACING)
+ flushEventLog(); // so that child won't inherit dirty file buffers
+#endif
+
pid = fork();
if (pid) { // parent
+ startTimer(); // #4074
+
RELEASE_LOCK(&sched_mutex);
RELEASE_LOCK(&cap->lock);
RELEASE_LOCK(&cap->running_task->lock);
initMutex(&cap->running_task->lock);
#endif
- // Now, all OS threads except the thread that forked are
+#ifdef TRACING
+ resetTracing();
+#endif
+
+ // Now, all OS threads except the thread that forked are
// stopped. We need to stop all Haskell threads, including
// those involved in foreign calls. Also we need to delete
// all Tasks, because they correspond to OS threads that are
// now gone.
- for (s = 0; s < total_steps; s++) {
- for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
- if (t->what_next == ThreadRelocated) {
- next = t->_link;
- } else {
- next = t->global_link;
+ for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+ for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
+ next = t->global_link;
// don't allow threads to catch the ThreadKilled
// exception, but we do want to raiseAsync() because these
// threads may be evaluating thunks that we need later.
deleteThread_(cap,t);
- }
+
+ // stop the GC from updating the InCall to point to
+ // the TSO. This is only necessary because the
+ // OSThread bound to the TSO has been killed, and
+ // won't get a chance to exit in the usual way (see
+ // also scheduleHandleThreadFinished).
+ t->bound = NULL;
}
}
// Any suspended C-calling Tasks are no more, their OS threads
// don't exist now:
- cap->suspended_ccalling_tasks = NULL;
+ cap->suspended_ccalls = NULL;
// Empty the threads lists. Otherwise, the garbage
// collector may attempt to resurrect some of these threads.
- for (s = 0; s < total_steps; s++) {
- all_steps[s].threads = END_TSO_QUEUE;
+ for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+ generations[g].threads = END_TSO_QUEUE;
}
- // Wipe the task list, except the current Task.
- ACQUIRE_LOCK(&sched_mutex);
- for (task = all_tasks; task != NULL; task=task->all_link) {
- if (task != cap->running_task) {
-#if defined(THREADED_RTS)
- initMutex(&task->lock); // see #1391
-#endif
- discardTask(task);
- }
- }
- RELEASE_LOCK(&sched_mutex);
+ discardTasksExcept(cap->running_task);
#if defined(THREADED_RTS)
// Wipe our spare workers list, they no longer exist. New
// workers will be created if necessary.
cap->spare_workers = NULL;
- cap->returning_tasks_hd = NULL;
+ cap->n_spare_workers = 0;
+ cap->returning_tasks_hd = NULL;
cap->returning_tasks_tl = NULL;
#endif
initTimer();
startTimer();
+#if defined(THREADED_RTS)
+ cap = ioManagerStartCap(cap);
+#endif
+
cap = rts_evalStableIO(cap, entry, NULL); // run the action
rts_checkSchedStatus("forkProcess",cap);
// NOTE: only safe to call if we own all capabilities.
StgTSO* t, *next;
- nat s;
+ nat g;
debugTrace(DEBUG_sched,"deleting all threads");
- for (s = 0; s < total_steps; s++) {
- for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
- if (t->what_next == ThreadRelocated) {
- next = t->_link;
- } else {
- next = t->global_link;
- deleteThread(cap,t);
- }
- }
- }
+ for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+ for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
+ next = t->global_link;
+ deleteThread(cap,t);
+ }
+ }
// The run queue now contains a bunch of ThreadKilled threads. We
// must not throw these away: the main thread(s) will be in there
}
/* -----------------------------------------------------------------------------
- Managing the suspended_ccalling_tasks list.
+ Managing the suspended_ccalls list.
Locks required: sched_mutex
-------------------------------------------------------------------------- */
STATIC_INLINE void
suspendTask (Capability *cap, Task *task)
{
- ASSERT(task->next == NULL && task->prev == NULL);
- task->next = cap->suspended_ccalling_tasks;
- task->prev = NULL;
- if (cap->suspended_ccalling_tasks) {
- cap->suspended_ccalling_tasks->prev = task;
- }
- cap->suspended_ccalling_tasks = task;
+ InCall *incall;
+
+ incall = task->incall;
+ ASSERT(incall->next == NULL && incall->prev == NULL);
+ incall->next = cap->suspended_ccalls;
+ incall->prev = NULL;
+ if (cap->suspended_ccalls) {
+ cap->suspended_ccalls->prev = incall;
+ }
+ cap->suspended_ccalls = incall;
}
STATIC_INLINE void
recoverSuspendedTask (Capability *cap, Task *task)
{
- if (task->prev) {
- task->prev->next = task->next;
+ InCall *incall;
+
+ incall = task->incall;
+ if (incall->prev) {
+ incall->prev->next = incall->next;
} else {
- ASSERT(cap->suspended_ccalling_tasks == task);
- cap->suspended_ccalling_tasks = task->next;
+ ASSERT(cap->suspended_ccalls == incall);
+ cap->suspended_ccalls = incall->next;
}
- if (task->next) {
- task->next->prev = task->prev;
+ if (incall->next) {
+ incall->next->prev = incall->prev;
}
- task->next = task->prev = NULL;
+ incall->next = incall->prev = NULL;
}
/* ---------------------------------------------------------------------------
* the whole system.
*
* The Haskell thread making the C call is put to sleep for the
- * duration of the call, on the susepended_ccalling_threads queue. We
+ * duration of the call, on the suspended_ccalling_threads queue. We
* give out a token to the task, which it can use to resume the thread
* on return from the C function.
+ *
+ * If this is an interruptible C call, this means that the FFI call may be
+ * unceremoniously terminated and should be scheduled on an
+ * unbound worker thread.
* ------------------------------------------------------------------------- */
void *
-suspendThread (StgRegTable *reg)
+suspendThread (StgRegTable *reg, rtsBool interruptible)
{
Capability *cap;
int saved_errno;
task = cap->running_task;
tso = cap->r.rCurrentTSO;
- traceSchedEvent(cap, EVENT_STOP_THREAD, tso, THREAD_SUSPENDED_FOREIGN_CALL);
+ traceEventStopThread(cap, tso, THREAD_SUSPENDED_FOREIGN_CALL, 0);
// XXX this might not be necessary --SDM
tso->what_next = ThreadRunGHC;
threadPaused(cap,tso);
- if ((tso->flags & TSO_BLOCKEX) == 0) {
- tso->why_blocked = BlockedOnCCall;
- tso->flags |= TSO_BLOCKEX;
- tso->flags &= ~TSO_INTERRUPTIBLE;
+ if (interruptible) {
+ tso->why_blocked = BlockedOnCCall_Interruptible;
} else {
- tso->why_blocked = BlockedOnCCall_NoUnblockExc;
+ tso->why_blocked = BlockedOnCCall;
}
// Hand back capability
- task->suspended_tso = tso;
+ task->incall->suspended_tso = tso;
+ task->incall->suspended_cap = cap;
ACQUIRE_LOCK(&cap->lock);
resumeThread (void *task_)
{
StgTSO *tso;
+ InCall *incall;
Capability *cap;
Task *task = task_;
int saved_errno;
saved_winerror = GetLastError();
#endif
- cap = task->cap;
+ incall = task->incall;
+ cap = incall->suspended_cap;
+ task->cap = cap;
+
// Wait for permission to re-enter the RTS with the result.
waitForReturnCapability(&cap,task);
// we might be on a different capability now... but if so, our
- // entry on the suspended_ccalling_tasks list will also have been
+ // entry on the suspended_ccalls list will also have been
// migrated.
// Remove the thread from the suspended list
recoverSuspendedTask(cap,task);
- tso = task->suspended_tso;
- task->suspended_tso = NULL;
+ tso = incall->suspended_tso;
+ incall->suspended_tso = NULL;
+ incall->suspended_cap = NULL;
tso->_link = END_TSO_QUEUE; // no write barrier reqd
- traceSchedEvent(cap, EVENT_RUN_THREAD, tso, tso->what_next);
+ traceEventRunThread(cap, tso);
- if (tso->why_blocked == BlockedOnCCall) {
+ /* Reset blocking status */
+ tso->why_blocked = NotBlocked;
+
+ if ((tso->flags & TSO_BLOCKEX) == 0) {
// avoid locking the TSO if we don't have to
- if (tso->blocked_exceptions != END_TSO_QUEUE) {
- awakenBlockedExceptionQueue(cap,tso);
+ if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
+ maybePerformBlockedException(cap,tso);
}
- tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
}
- /* Reset blocking status */
- tso->why_blocked = NotBlocked;
-
cap->r.rCurrentTSO = tso;
cap->in_haskell = rtsTrue;
errno = saved_errno;
/* We might have GC'd, mark the TSO dirty again */
dirty_TSO(cap,tso);
+ dirty_STACK(cap,tso->stackobj);
IF_DEBUG(sanity, checkTSO(tso));
void
scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
{
-#if defined(THREADED_RTS)
tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
// move this thread from now on.
+#if defined(THREADED_RTS)
cpu %= RtsFlags.ParFlags.nNodes;
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
- traceSchedEvent (cap, EVENT_MIGRATE_THREAD, tso, capabilities[cpu].no);
- wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
+ migrateThread(cap, tso, &capabilities[cpu]);
}
#else
appendToRunQueue(cap,tso);
scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
{
Task *task;
+ StgThreadID id;
// We already created/initialised the Task
task = cap->running_task;
// This TSO is now a bound thread; make the Task and TSO
// point to each other.
- tso->bound = task;
+ tso->bound = task->incall;
tso->cap = cap;
- task->tso = tso;
- task->ret = ret;
- task->stat = NoStatus;
+ task->incall->tso = tso;
+ task->incall->ret = ret;
+ task->incall->stat = NoStatus;
appendToRunQueue(cap,tso);
- debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
+ id = tso->id;
+ debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id);
cap = schedule(cap,task);
- ASSERT(task->stat != NoStatus);
+ ASSERT(task->incall->stat != NoStatus);
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
- debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
+ debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
return cap;
}
* ------------------------------------------------------------------------- */
#if defined(THREADED_RTS)
-void OSThreadProcAttr
-workerStart(Task *task)
+void scheduleWorker (Capability *cap, Task *task)
{
- Capability *cap;
-
- // See startWorkerTask().
- ACQUIRE_LOCK(&task->lock);
- cap = task->cap;
- RELEASE_LOCK(&task->lock);
-
- if (RtsFlags.ParFlags.setAffinity) {
- setThreadAffinity(cap->no, n_capabilities);
- }
-
- // set the thread-local pointer to the Task:
- taskEnter(task);
-
// schedule() runs without a lock.
cap = schedule(cap,task);
sleeping_queue = END_TSO_QUEUE;
#endif
- blackhole_queue = END_TSO_QUEUE;
-
sched_state = SCHED_RUNNING;
recent_activity = ACTIVITY_YES;
initSparkPools();
#endif
+ RELEASE_LOCK(&sched_mutex);
+
#if defined(THREADED_RTS)
/*
* Eagerly start one worker to run each Capability, except for
for (i = 1; i < n_capabilities; i++) {
cap = &capabilities[i];
ACQUIRE_LOCK(&cap->lock);
- startWorkerTask(cap, workerStart);
+ startWorkerTask(cap);
RELEASE_LOCK(&cap->lock);
}
}
#endif
-
- RELEASE_LOCK(&sched_mutex);
}
void
-exitScheduler(
- rtsBool wait_foreign
-#if !defined(THREADED_RTS)
- __attribute__((unused))
-#endif
-)
+exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
/* see Capability.c, shutdownCapability() */
{
Task *task = NULL;
if (sched_state < SCHED_SHUTTING_DOWN) {
sched_state = SCHED_INTERRUPTING;
waitForReturnCapability(&task->cap,task);
- scheduleDoGC(task->cap,task,rtsFalse);
+ scheduleDoGC(task->cap,task,rtsFalse);
+ ASSERT(task->incall->tso == NULL);
releaseCapability(task->cap);
}
sched_state = SCHED_SHUTTING_DOWN;
+ nat i;
+
+ for (i = 0; i < n_capabilities; i++) {
#if defined(THREADED_RTS)
- {
- nat i;
-
- for (i = 0; i < n_capabilities; i++) {
- shutdownCapability(&capabilities[i], task, wait_foreign);
- }
- boundTaskExiting(task);
- }
+ ASSERT(task->incall->tso == NULL);
+ shutdownCapability(&capabilities[i], task, wait_foreign);
#endif
+ traceCapsetRemoveCap(CAPSET_OSPROCESS_DEFAULT, i);
+ }
+ traceCapsetDelete(CAPSET_OSPROCESS_DEFAULT);
+
+ boundTaskExiting(task);
}
void
#endif
}
+void markScheduler (evac_fn evac USED_IF_NOT_THREADS,
+ void *user USED_IF_NOT_THREADS)
+{
+#if !defined(THREADED_RTS)
+ evac(user, (StgClosure **)(void *)&blocked_queue_hd);
+ evac(user, (StgClosure **)(void *)&blocked_queue_tl);
+ evac(user, (StgClosure **)(void *)&sleeping_queue);
+#endif
+}
+
/* -----------------------------------------------------------------------------
performGC
// We must grab a new Task here, because the existing Task may be
// associated with a particular Capability, and chained onto the
- // suspended_ccalling_tasks queue.
+ // suspended_ccalls queue.
task = newBoundTask();
waitForReturnCapability(&task->cap,task);
performGC_(rtsTrue);
}
-/* -----------------------------------------------------------------------------
- Stack overflow
-
- If the thread has reached its maximum stack size, then raise the
- StackOverflow exception in the offending thread. Otherwise
- relocate the TSO into a larger chunk of memory and adjust its stack
- size appropriately.
- -------------------------------------------------------------------------- */
-
-static StgTSO *
-threadStackOverflow(Capability *cap, StgTSO *tso)
-{
- nat new_stack_size, stack_words;
- lnat new_tso_size;
- StgPtr new_sp;
- StgTSO *dest;
-
- IF_DEBUG(sanity,checkTSO(tso));
-
- // don't allow throwTo() to modify the blocked_exceptions queue
- // while we are moving the TSO:
- lockClosure((StgClosure *)tso);
-
- if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
- // NB. never raise a StackOverflow exception if the thread is
- // inside Control.Exceptino.block. It is impractical to protect
- // against stack overflow exceptions, since virtually anything
- // can raise one (even 'catch'), so this is the only sensible
- // thing to do here. See bug #767.
-
- debugTrace(DEBUG_gc,
- "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
- (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
- IF_DEBUG(gc,
- /* If we're debugging, just print out the top of the stack */
- printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size,
- tso->sp+64)));
-
- // Send this thread the StackOverflow exception
- unlockTSO(tso);
- throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
- return tso;
- }
-
- /* Try to double the current stack size. If that takes us over the
- * maximum stack size for this thread, then use the maximum instead
- * (that is, unless we're already at or over the max size and we
- * can't raise the StackOverflow exception (see above), in which
- * case just double the size). Finally round up so the TSO ends up as
- * a whole number of blocks.
- */
- if (tso->stack_size >= tso->max_stack_size) {
- new_stack_size = tso->stack_size * 2;
- } else {
- new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
- }
- new_tso_size = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
- TSO_STRUCT_SIZE)/sizeof(W_);
- new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */
- new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
-
- debugTrace(DEBUG_sched,
- "increasing stack size from %ld words to %d.",
- (long)tso->stack_size, new_stack_size);
-
- dest = (StgTSO *)allocateLocal(cap,new_tso_size);
- TICK_ALLOC_TSO(new_stack_size,0);
-
- /* copy the TSO block and the old stack into the new area */
- memcpy(dest,tso,TSO_STRUCT_SIZE);
- stack_words = tso->stack + tso->stack_size - tso->sp;
- new_sp = (P_)dest + new_tso_size - stack_words;
- memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
-
- /* relocate the stack pointers... */
- dest->sp = new_sp;
- dest->stack_size = new_stack_size;
-
- /* Mark the old TSO as relocated. We have to check for relocated
- * TSOs in the garbage collector and any primops that deal with TSOs.
- *
- * It's important to set the sp value to just beyond the end
- * of the stack, so we don't attempt to scavenge any part of the
- * dead TSO's stack.
- */
- tso->what_next = ThreadRelocated;
- setTSOLink(cap,tso,dest);
- tso->sp = (P_)&(tso->stack[tso->stack_size]);
- tso->why_blocked = NotBlocked;
-
- unlockTSO(dest);
- unlockTSO(tso);
-
- IF_DEBUG(sanity,checkTSO(dest));
-#if 0
- IF_DEBUG(scheduler,printTSO(dest));
-#endif
-
- return dest;
-}
-
-static StgTSO *
-threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
-{
- bdescr *bd, *new_bd;
- lnat free_w, tso_size_w;
- StgTSO *new_tso;
-
- tso_size_w = tso_sizeW(tso);
-
- if (tso_size_w < MBLOCK_SIZE_W ||
- // TSO is less than 2 mblocks (since the first mblock is
- // shorter than MBLOCK_SIZE_W)
- (tso_size_w - BLOCKS_PER_MBLOCK*BLOCK_SIZE_W) % MBLOCK_SIZE_W != 0 ||
- // or TSO is not a whole number of megablocks (ensuring
- // precondition of splitLargeBlock() below)
- (tso_size_w <= round_up_to_mblocks(RtsFlags.GcFlags.initialStkSize)) ||
- // or TSO is smaller than the minimum stack size (rounded up)
- (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4)
- // or stack is using more than 1/4 of the available space
- {
- // then do nothing
- return tso;
- }
-
- // don't allow throwTo() to modify the blocked_exceptions queue
- // while we are moving the TSO:
- lockClosure((StgClosure *)tso);
-
- // this is the number of words we'll free
- free_w = round_to_mblocks(tso_size_w/2);
-
- bd = Bdescr((StgPtr)tso);
- new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
- bd->free = bd->start + TSO_STRUCT_SIZEW;
-
- new_tso = (StgTSO *)new_bd->start;
- memcpy(new_tso,tso,TSO_STRUCT_SIZE);
- new_tso->stack_size = new_bd->free - new_tso->stack;
-
- debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
- (long)tso->id, tso_size_w, tso_sizeW(new_tso));
-
- tso->what_next = ThreadRelocated;
- tso->_link = new_tso; // no write barrier reqd: same generation
-
- // The TSO attached to this Task may have moved, so update the
- // pointer to it.
- if (task->tso == tso) {
- task->tso = new_tso;
- }
-
- unlockTSO(new_tso);
- unlockTSO(tso);
-
- IF_DEBUG(sanity,checkTSO(new_tso));
-
- return new_tso;
-}
-
/* ---------------------------------------------------------------------------
Interrupt execution
- usually called inside a signal handler so it mustn't do anything fancy.
#endif
/* -----------------------------------------------------------------------------
- * checkBlackHoles()
- *
- * Check the blackhole_queue for threads that can be woken up. We do
- * this periodically: before every GC, and whenever the run queue is
- * empty.
- *
- * An elegant solution might be to just wake up all the blocked
- * threads with awakenBlockedQueue occasionally: they'll go back to
- * sleep again if the object is still a BLACKHOLE. Unfortunately this
- * doesn't give us a way to tell whether we've actually managed to
- * wake up any threads, so we would be busy-waiting.
- *
- * -------------------------------------------------------------------------- */
-
-static rtsBool
-checkBlackHoles (Capability *cap)
-{
- StgTSO **prev, *t;
- rtsBool any_woke_up = rtsFalse;
- StgHalfWord type;
-
- // blackhole_queue is global:
- ASSERT_LOCK_HELD(&sched_mutex);
-
- debugTrace(DEBUG_sched, "checking threads blocked on black holes");
-
- // ASSUMES: sched_mutex
- prev = &blackhole_queue;
- t = blackhole_queue;
- while (t != END_TSO_QUEUE) {
- if (t->what_next == ThreadRelocated) {
- t = t->_link;
- continue;
- }
- ASSERT(t->why_blocked == BlockedOnBlackHole);
- type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
- if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
- IF_DEBUG(sanity,checkTSO(t));
- t = unblockOne(cap, t);
- *prev = t;
- any_woke_up = rtsTrue;
- } else {
- prev = &t->_link;
- t = t->_link;
- }
- }
-
- return any_woke_up;
-}
-
-/* -----------------------------------------------------------------------------
Deleting threads
This is used for interruption (^C) and forking, and corresponds to
exception.
-------------------------------------------------------------------------- */
-static void
-deleteThread (Capability *cap, StgTSO *tso)
+static void
+deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
{
// NOTE: must only be called on a TSO that we have exclusive
// access to, because we will call throwToSingleThreaded() below.
// we must own all Capabilities.
if (tso->why_blocked != BlockedOnCCall &&
- tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
- throwToSingleThreaded(cap,tso,NULL);
+ tso->why_blocked != BlockedOnCCall_Interruptible) {
+ throwToSingleThreaded(tso->cap,tso,NULL);
}
}
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
-static void
+static void
deleteThread_(Capability *cap, StgTSO *tso)
{ // for forkProcess only:
// like deleteThread(), but we delete threads in foreign calls, too.
if (tso->why_blocked == BlockedOnCCall ||
- tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
- unblockOne(cap,tso);
+ tso->why_blocked == BlockedOnCCall_Interruptible) {
tso->what_next = ThreadKilled;
+ appendToRunQueue(tso->cap, tso);
} else {
deleteThread(cap,tso);
}
// we update any closures pointed to from update frames with the
// raise closure that we just built.
//
- p = tso->sp;
+ p = tso->stackobj->sp;
while(1) {
info = get_ret_itbl((StgClosure *)p);
next = p + stack_frame_sizeW((StgClosure *)p);
// Only create raise_closure if we need to.
if (raise_closure == NULL) {
raise_closure =
- (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
+ (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
SET_HDR(raise_closure, &stg_raise_info, CCCS);
raise_closure->payload[0] = exception;
}
- UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
+ updateThunk(cap, tso, ((StgUpdateFrame *)p)->updatee,
+ (StgClosure *)raise_closure);
p = next;
continue;
case ATOMICALLY_FRAME:
debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
- tso->sp = p;
+ tso->stackobj->sp = p;
return ATOMICALLY_FRAME;
case CATCH_FRAME:
- tso->sp = p;
+ tso->stackobj->sp = p;
return CATCH_FRAME;
case CATCH_STM_FRAME:
debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
- tso->sp = p;
+ tso->stackobj->sp = p;
return CATCH_STM_FRAME;
- case STOP_FRAME:
- tso->sp = p;
+ case UNDERFLOW_FRAME:
+ tso->stackobj->sp = p;
+ threadStackUnderflow(cap,tso);
+ p = tso->stackobj->sp;
+ continue;
+
+ case STOP_FRAME:
+ tso->stackobj->sp = p;
return STOP_FRAME;
case CATCH_RETRY_FRAME:
-------------------------------------------------------------------------- */
StgWord
-findRetryFrameHelper (StgTSO *tso)
+findRetryFrameHelper (Capability *cap, StgTSO *tso)
{
StgPtr p, next;
StgRetInfoTable *info;
- p = tso -> sp;
+ p = tso->stackobj->sp;
while (1) {
info = get_ret_itbl((StgClosure *)p);
next = p + stack_frame_sizeW((StgClosure *)p);
case ATOMICALLY_FRAME:
debugTrace(DEBUG_stm,
"found ATOMICALLY_FRAME at %p during retry", p);
- tso->sp = p;
+ tso->stackobj->sp = p;
return ATOMICALLY_FRAME;
case CATCH_RETRY_FRAME:
debugTrace(DEBUG_stm,
"found CATCH_RETRY_FRAME at %p during retrry", p);
- tso->sp = p;
+ tso->stackobj->sp = p;
return CATCH_RETRY_FRAME;
case CATCH_STM_FRAME: {
debugTrace(DEBUG_stm,
"found CATCH_STM_FRAME at %p during retry", p);
debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
- stmAbortTransaction(tso -> cap, trec);
- stmFreeAbortedTRec(tso -> cap, trec);
+ stmAbortTransaction(cap, trec);
+ stmFreeAbortedTRec(cap, trec);
tso -> trec = outer;
p = next;
continue;
}
+ case UNDERFLOW_FRAME:
+ threadStackUnderflow(cap,tso);
+ p = tso->stackobj->sp;
+ continue;
default:
ASSERT(info->i.type != CATCH_FRAME);
{
StgTSO *tso, *next;
Capability *cap;
- step *step;
+ generation *gen;
for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
next = tso->global_link;
- step = Bdescr((P_)tso)->step;
- tso->global_link = step->threads;
- step->threads = tso;
+ gen = Bdescr((P_)tso)->gen;
+ tso->global_link = gen->threads;
+ gen->threads = tso;
debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
* can wake up threads, remember...).
*/
continue;
- case BlockedOnException:
- // throwTo should never block indefinitely: if the target
- // thread dies or completes, throwTo returns.
- barf("resurrectThreads: thread BlockedOnException");
- break;
+ case BlockedOnMsgThrowTo:
+ // This can happen if the target is masking, blocks on a
+ // black hole, and then is found to be unreachable. In
+ // this case, we want to let the target wake up and carry
+ // on, and do nothing to this thread.
+ continue;
default:
- barf("resurrectThreads: thread blocked in a strange way");
+ barf("resurrectThreads: thread blocked in a strange way: %d",
+ tso->why_blocked);
}
}
}
-
-/* -----------------------------------------------------------------------------
- performPendingThrowTos is called after garbage collection, and
- passed a list of threads that were found to have pending throwTos
- (tso->blocked_exceptions was not empty), and were blocked.
- Normally this doesn't happen, because we would deliver the
- exception directly if the target thread is blocked, but there are
- small windows where it might occur on a multiprocessor (see
- throwTo()).
-
- NB. we must be holding all the capabilities at this point, just
- like resurrectThreads().
- -------------------------------------------------------------------------- */
-
-void
-performPendingThrowTos (StgTSO *threads)
-{
- StgTSO *tso, *next;
- Capability *cap;
- Task *task, *saved_task;;
- step *step;
-
- task = myTask();
- cap = task->cap;
-
- for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
- next = tso->global_link;
-
- step = Bdescr((P_)tso)->step;
- tso->global_link = step->threads;
- step->threads = tso;
-
- debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
-
- // We must pretend this Capability belongs to the current Task
- // for the time being, as invariants will be broken otherwise.
- // In fact the current Task has exclusive access to the systme
- // at this point, so this is just bookkeeping:
- task->cap = tso->cap;
- saved_task = tso->cap->running_task;
- tso->cap->running_task = task;
- maybePerformBlockedException(tso->cap, tso);
- tso->cap->running_task = saved_task;
- }
-
- // Restore our original Capability:
- task->cap = cap;
-}