#include "Updates.h"
#include "Proftimer.h"
#include "ProfHeap.h"
+#include "GC.h"
+#include "Weak.h"
+#include "EventLog.h"
/* PARALLEL_HASKELL includes go here */
rtsBool blackholes_need_checking = rtsFalse;
+/* Set to true when the latest garbage collection failed to reclaim
+ * enough space, and the runtime should proceed to shut itself down in
+ * an orderly fashion (emitting profiling info etc.)
+ */
+rtsBool heap_overflow = rtsFalse;
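+/* heap_overflow is tested after GC in scheduleDoGC(), and again in
+ * schedule() when a bound thread exits, where it selects the
+ * HeapExhausted exit status instead of Interrupted.
+ */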
+
/* flag that tracks whether we have done any execution in this time slice.
* LOCK: currently none, perhaps we should lock (but needs to be
* updated in the fast path of the scheduler).
"### NEW SCHEDULER LOOP (task: %p, cap: %p)",
task, initialCapability);
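+ // running_finalizers is set while C finalizers are being run (see
+ // Weak.h, included above); if a finalizer has re-entered the
+ // scheduler we cannot safely continue, so fail loudly here rather
+ // than deadlock later.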
+ if (running_finalizers) {
+ errorBelch("error: a C finalizer called back into Haskell.\n"
+ " This was previously allowed, but is disallowed in GHC 6.10.2 and later.\n"
+ " To create finalizers that may call back into Haskll, use\n"
+ " Foreign.Concurrent.newForeignPtr instead of Foreign.newForeignPtr.");
+ stg_exit(EXIT_FAILURE);
+ }
+
schedulePreLoop();
// -----------------------------------------------------------
#endif
/* scheduleDoGC() deletes all the threads */
cap = scheduleDoGC(cap,task,rtsFalse);
- break;
+
+ // after scheduleDoGC(), we must be shutting down. Either some
+ // other Capability did the final GC, or we did it above,
+ // either way we can fall through to the SCHED_SHUTTING_DOWN
+ // case now.
+ ASSERT(sched_state == SCHED_SHUTTING_DOWN);
+ // fall through
+
case SCHED_SHUTTING_DOWN:
debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
// If we are a worker, just exit. If we're a bound thread
}
#endif
+ // If we're shutting down, and this thread has not yet been
+ // killed, kill it now. This sometimes happens when a finalizer
+ // thread is created by the final GC, or a thread previously
+ // in a foreign call returns.
+ if (sched_state >= SCHED_INTERRUPTING &&
+ !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
+ deleteThread(cap,t);
+ }
+
/* context switches are initiated by the timer signal, unless
* the user specified "context switch as often as possible", with
* +RTS -C0
}
#endif
+ postEvent(cap, EVENT_RUN_THREAD, t->id, 0);
+
switch (prev_what_next) {
case ThreadKilled:
t->saved_winerror = GetLastError();
#endif
+ postEvent (cap, EVENT_STOP_THREAD, t->id, ret);
+
#if defined(THREADED_RTS)
// If ret is ThreadBlocked, and this Task is bound to the TSO that
// blocked, we are in limbo - the TSO is now owned by whatever it
// if we have work, and we don't need to give up the Capability, continue.
if (!shouldYieldCapability(cap,task) &&
(!emptyRunQueue(cap) ||
+ !emptyWakeupQueue(cap) ||
blackholes_need_checking ||
sched_state >= SCHED_INTERRUPTING))
return;
// Check whether we have more threads on our run queue, or sparks
// in our pool, that we could hand to another Capability.
- if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
- && sparkPoolSizeCap(cap) < 2) {
- return;
+ if (cap->run_queue_hd == END_TSO_QUEUE) {
+ if (sparkPoolSizeCap(cap) < 2) return;
+ } else {
+ if (cap->run_queue_hd->_link == END_TSO_QUEUE &&
+ sparkPoolSizeCap(cap) < 1) return;
}
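+ // NB. the tests above always leave one unit of work (a thread or
+ // a spark) for this Capability to run itself; we push work only
+ // when there is something to spare beyond that.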
// First grab as many free Capabilities as we can.
} else {
debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
appendToRunQueue(free_caps[i],t);
+
+ postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no);
+
if (t->bound) { t->bound->cap = free_caps[i]; }
t->cap = free_caps[i];
i++;
spark = tryStealSpark(cap->sparks);
if (spark != NULL) {
debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
+
+ postEvent(free_caps[i], EVENT_STEAL_SPARK, t->id, cap->no);
+
newSpark(&(free_caps[i]->r), spark);
}
}
// they are unreachable and will therefore be sent an
// exception. Any threads thus released will be immediately
// runnable.
cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
+ // when force_major == rtsTrue, scheduleDoGC sets
+ // recent_activity to ACTIVITY_DONE_GC and turns off the timer
+ // signal.
- recent_activity = ACTIVITY_DONE_GC;
- // disable timer signals (see #1623)
- stopTimer();
-
if ( !emptyRunQueue(cap) ) return;
#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
// ATOMICALLY_FRAME, aborting the (nested)
// transaction, and saving the stack of any
// partially-evaluated thunks on the heap.
- throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
+ throwToSingleThreaded_(cap, t, NULL, rtsTrue);
ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
}
"--<< thread %ld (%s) stopped: HeapOverflow",
(long)t->id, whatNext_strs[t->what_next]);
- if (cap->context_switch) {
+ if (cap->r.rHpLim == NULL || cap->context_switch) {
// Sometimes we miss a context switch, e.g. when calling
// primitives in a tight loop, MAYBE_GC() doesn't check the
// context switch flag, and we end up waiting for a GC.
debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
(unsigned long)t->id, whatNext_strs[t->what_next]);
+ // blocked exceptions can now complete, even if the thread was in
+ // blocked mode (see #2910). This unconditionally calls
+ // lockTSO(), which ensures that we don't miss any threads that
+ // are engaged in throwTo() with this thread as a target.
+ awakenBlockedExceptionQueue (cap, t);
+
//
// Check whether the thread that just completed was a bound
// thread, and if so return with the result.
*(task->ret) = NULL;
}
if (sched_state >= SCHED_INTERRUPTING) {
- task->stat = Interrupted;
+ if (heap_overflow) {
+ task->stat = HeapExhausted;
+ } else {
+ task->stat = Interrupted;
+ }
} else {
task->stat = Killed;
}
#ifdef THREADED_RTS
/* extern static volatile StgWord waiting_for_gc;
lives inside capability.c */
- rtsBool was_waiting;
+ rtsBool gc_type, prev_pending_gc;
nat i;
#endif
+ if (sched_state == SCHED_SHUTTING_DOWN) {
+ // The final GC has already been done, and the system is
+ // shutting down. We'll probably deadlock if we try to GC
+ // now.
+ return cap;
+ }
+
#ifdef THREADED_RTS
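+ // Decide between a single-threaded and a multi-threaded GC. The
+ // parallel GC is used only when it is enabled in ParFlags, the
+ // generation being collected (N) is at least parGcGen, and the
+ // oldest generation is not being collected by mark/compact; during
+ // shutdown (SCHED_INTERRUPTING or later) we always GC
+ // single-threaded.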
+ if (sched_state < SCHED_INTERRUPTING
+ && RtsFlags.ParFlags.parGcEnabled
+ && N >= RtsFlags.ParFlags.parGcGen
+ && ! oldest_gen->steps[0].mark)
+ {
+ gc_type = PENDING_GC_PAR;
+ } else {
+ gc_type = PENDING_GC_SEQ;
+ }
+
// In order to GC, there must be no threads running Haskell code.
// Therefore, the GC thread needs to hold *all* the capabilities,
// and release them after the GC has completed.
// actually did the GC. But it's quite hard to arrange for all
// the other tasks to sleep and stay asleep.
//
-
+
/* Other capabilities are prevented from running yet more Haskell
threads if waiting_for_gc is set. Tested inside
yieldCapability() and releaseCapability() in Capability.c */
- was_waiting = cas(&waiting_for_gc, 0, 1);
- if (was_waiting) {
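+ // cas() returns the previous value of waiting_for_gc, so a nonzero
+ // prev_pending_gc means another task initiated a GC first; in that
+ // case we just yield until that GC has finished.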
+ prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
+ if (prev_pending_gc) {
do {
- debugTrace(DEBUG_sched, "someone else is trying to GC...");
- if (cap) yieldCapability(&cap,task);
+ debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...",
+ prev_pending_gc);
+ ASSERT(cap);
+ yieldCapability(&cap,task);
} while (waiting_for_gc);
return cap; // NOTE: task->cap might have changed here
}
setContextSwitches();
- for (i=0; i < n_capabilities; i++) {
- debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
- if (cap != &capabilities[i]) {
- Capability *pcap = &capabilities[i];
- // we better hope this task doesn't get migrated to
- // another Capability while we're waiting for this one.
- // It won't, because load balancing happens while we have
- // all the Capabilities, but even so it's a slightly
- // unsavoury invariant.
- task->cap = pcap;
- waitForReturnCapability(&pcap, task);
- if (pcap != &capabilities[i]) {
- barf("scheduleDoGC: got the wrong capability");
- }
- }
+
+ // The final shutdown GC is always single-threaded, because it's
+ // possible that some of the Capabilities have no worker threads.
+
+ if (gc_type == PENDING_GC_SEQ)
+ {
+ postEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0);
+ // single-threaded GC: grab all the capabilities
+ for (i=0; i < n_capabilities; i++) {
+ debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
+ if (cap != &capabilities[i]) {
+ Capability *pcap = &capabilities[i];
+ // we better hope this task doesn't get migrated to
+ // another Capability while we're waiting for this one.
+ // It won't, because load balancing happens while we have
+ // all the Capabilities, but even so it's a slightly
+ // unsavoury invariant.
+ task->cap = pcap;
+ waitForReturnCapability(&pcap, task);
+ if (pcap != &capabilities[i]) {
+ barf("scheduleDoGC: got the wrong capability");
+ }
+ }
+ }
}
- waiting_for_gc = rtsFalse;
+ else
+ {
+ // multi-threaded GC: make sure all the Capabilities donate one
+ // GC thread each.
+ postEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0);
+ debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
+ waitForGcThreads(cap);
+ }
#endif
// so this happens periodically:
IF_DEBUG(scheduler, printAllThreads());
+delete_threads_and_gc:
/*
* We now have all the capabilities; if we're in an interrupting
* state, then we should take the opportunity to delete all the
* threads in the system.
*/
- if (sched_state >= SCHED_INTERRUPTING) {
- deleteAllThreads(&capabilities[0]);
+ if (sched_state == SCHED_INTERRUPTING) {
+ deleteAllThreads(cap);
sched_state = SCHED_SHUTTING_DOWN;
}
heap_census = scheduleNeedHeapProfile(rtsTrue);
- /* everybody back, start the GC.
- * Could do it in this thread, or signal a condition var
- * to do it in another thread. Either way, we need to
- * broadcast on gc_pending_cond afterward.
- */
#if defined(THREADED_RTS)
+ postEvent(cap, EVENT_GC_START, 0, 0);
debugTrace(DEBUG_sched, "doing GC");
+ // reset waiting_for_gc *before* GC, so that when the GC threads
+ // emerge they don't immediately re-enter the GC.
+ waiting_for_gc = 0;
+ GarbageCollect(force_major || heap_census, gc_type, cap);
+#else
+ GarbageCollect(force_major || heap_census, 0, cap);
#endif
- GarbageCollect(force_major || heap_census);
-
+ postEvent(cap, EVENT_GC_END, 0, 0);
+
+ if (recent_activity == ACTIVITY_INACTIVE && force_major)
+ {
+ // We are doing a GC because the system has been idle for a
+ // timeslice and we need to check for deadlock. Record the
+ // fact that we've done a GC and turn off the timer signal;
+ // it will get re-enabled if we run any threads after the GC.
+ recent_activity = ACTIVITY_DONE_GC;
+ stopTimer();
+ }
+ else
+ {
+ // the GC might have taken long enough for the timer to set
+ // recent_activity = ACTIVITY_INACTIVE, but we aren't
+ // necessarily deadlocked:
+ recent_activity = ACTIVITY_YES;
+ }
+
+#if defined(THREADED_RTS)
+ if (gc_type == PENDING_GC_PAR)
+ {
+ releaseGCThreads(cap);
+ }
+#endif
+
if (heap_census) {
debugTrace(DEBUG_sched, "performing heap census");
heapCensus();
performHeapProfile = rtsFalse;
}
+ if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
+ // GC set the heap_overflow flag, so we should proceed with
+ // an orderly shutdown now. Ultimately we want the main
+ // thread to return to its caller with HeapExhausted, at which
+ // point the caller should call hs_exit(). The first step is
+ // to delete all the threads.
+ //
+ // Another way to do this would be to raise an exception in
+ // the main thread, which we really should do because it gives
+ // the program a chance to clean up. But how do we find the
+ // main thread? It should presumably be the same one that
+ // gets ^C exceptions, but that's all done on the Haskell side
+ // (GHC.TopHandler).
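+ //
+ // When the bound main thread does exit, schedule() checks
+ // heap_overflow and sets task->stat to HeapExhausted rather than
+ // Interrupted (see above).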
+ sched_state = SCHED_INTERRUPTING;
+ goto delete_threads_and_gc;
+ }
+
#ifdef SPARKBALANCE
/* JB
Once we are all together... this would be the place to balance all
#endif
#if defined(THREADED_RTS)
- // release our stash of capabilities.
- for (i = 0; i < n_capabilities; i++) {
- if (cap != &capabilities[i]) {
- task->cap = &capabilities[i];
- releaseCapability(&capabilities[i]);
- }
+ if (gc_type == PENDING_GC_SEQ) {
+ // release our stash of capabilities.
+ for (i = 0; i < n_capabilities; i++) {
+ if (cap != &capabilities[i]) {
+ task->cap = &capabilities[i];
+ releaseCapability(&capabilities[i]);
+ }
+ }
}
if (cap) {
task->cap = cap;
task = cap->running_task;
tso = cap->r.rCurrentTSO;
+ postEvent(cap, EVENT_STOP_THREAD, tso->id, THREAD_SUSPENDED_FOREIGN_CALL);
debugTrace(DEBUG_sched,
"thread %lu did a safe foreign call",
(unsigned long)cap->r.rCurrentTSO->id);
tso = task->suspended_tso;
task->suspended_tso = NULL;
tso->_link = END_TSO_QUEUE; // no write barrier reqd
+
+ postEvent(cap, EVENT_RUN_THREAD, tso->id, 0);
debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
if (tso->why_blocked == BlockedOnCCall) {
- awakenBlockedExceptionQueue(cap,tso);
+ // avoid locking the TSO if we don't have to
+ if (tso->blocked_exceptions != END_TSO_QUEUE) {
+ awakenBlockedExceptionQueue(cap,tso);
+ }
tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
}
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
+ postEvent (cap, EVENT_MIGRATE_THREAD, tso->id, capabilities[cpu].no);
wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
}
#else
cap = task->cap;
RELEASE_LOCK(&task->lock);
+ if (RtsFlags.ParFlags.setAffinity) {
+ setThreadAffinity(cap->no, n_capabilities);
+ }
+
// set the thread-local pointer to the Task:
taskEnter(task);
// schedule() runs without a lock.
cap = schedule(cap,task);
- // On exit from schedule(), we have a Capability.
- releaseCapability(cap);
+ // On exit from schedule(), we have a Capability, but possibly not
+ // the same one we started with.
+
+ // During shutdown, the requirement is that after all the
+ // Capabilities are shut down, all workers that are shutting down
+ // have finished workerTaskStop(). This is why we hold on to
+ // cap->lock until we've finished workerTaskStop() below.
+ //
+ // There may be workers still involved in foreign calls; those
+ // will just block in waitForReturnCapability() because the
+ // Capability has been shut down.
+ //
+ ACQUIRE_LOCK(&cap->lock);
+ releaseCapability_(cap,rtsFalse);
workerTaskStop(task);
+ RELEASE_LOCK(&cap->lock);
}
#endif
}
#endif
- trace(TRACE_sched, "start: %d capabilities", n_capabilities);
-
RELEASE_LOCK(&sched_mutex);
}
{
Task *task = NULL;
-#if defined(THREADED_RTS)
- ACQUIRE_LOCK(&sched_mutex);
task = newBoundTask();
- RELEASE_LOCK(&sched_mutex);
-#endif
// If we haven't killed all the threads yet, do it now.
if (sched_state < SCHED_SHUTTING_DOWN) {
sched_state = SCHED_INTERRUPTING;
- scheduleDoGC(NULL,task,rtsFalse);
+ waitForReturnCapability(&task->cap,task);
+ scheduleDoGC(task->cap,task,rtsFalse);
+ releaseCapability(task->cap);
}
sched_state = SCHED_SHUTTING_DOWN;
shutdownCapability(&capabilities[i], task, wait_foreign);
}
boundTaskExiting(task);
- stopTaskManager();
}
#endif
}
void
freeScheduler( void )
{
- freeCapabilities();
- freeTaskManager();
- if (n_capabilities != 1) {
- stgFree(capabilities);
+ nat still_running;
+
+ ACQUIRE_LOCK(&sched_mutex);
+ still_running = freeTaskManager();
+ // We can only free the Capabilities if there are no Tasks still
+ // running. We might have a Task about to return from a foreign
+ // call into waitForReturnCapability(), for example (actually,
+ // this should be the *only* thing that a still-running Task can
+ // do at this point, and it will block waiting for the
+ // Capability).
+ if (still_running == 0) {
+ freeCapabilities();
+ if (n_capabilities != 1) {
+ stgFree(capabilities);
+ }
}
+ RELEASE_LOCK(&sched_mutex);
#if defined(THREADED_RTS)
closeMutex(&sched_mutex);
#endif
performGC_(rtsBool force_major)
{
Task *task;
+
// We must grab a new Task here, because the existing Task may be
// associated with a particular Capability, and chained onto the
// suspended_ccalling_tasks queue.
- ACQUIRE_LOCK(&sched_mutex);
task = newBoundTask();
- RELEASE_LOCK(&sched_mutex);
- scheduleDoGC(NULL,task,force_major);
+
+ waitForReturnCapability(&task->cap,task);
+ scheduleDoGC(task->cap,task,force_major);
+ releaseCapability(task->cap);
boundTaskExiting(task);
}
prev = &blackhole_queue;
t = blackhole_queue;
while (t != END_TSO_QUEUE) {
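+ // A ThreadRelocated TSO has been moved by the GC; its _link field
+ // points to the new copy, so just follow the link.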
+ if (t->what_next == ThreadRelocated) {
+ t = t->_link;
+ continue;
+ }
ASSERT(t->why_blocked == BlockedOnBlackHole);
type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
if (type != BLACKHOLE && type != CAF_BLACKHOLE) {