X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FSchedule.c;h=141c973f3aa09693f4e94c4a6f3253163469727d;hb=0506cb7ec75321eaacc6c279d01d82368d2ca125;hp=31a487515a369458a6e8fedee6f57234600863c3;hpb=3ebcd3deb769a03f4ded0fca2cf38201048c0214;p=ghc-hetmet.git diff --git a/rts/Schedule.c b/rts/Schedule.c index 31a4875..141c973 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -32,6 +32,8 @@ #include "Proftimer.h" #include "ProfHeap.h" #include "GC.h" +#include "Weak.h" +#include "EventLog.h" /* PARALLEL_HASKELL includes go here */ @@ -90,6 +92,12 @@ StgTSO *blackhole_queue = NULL; */ rtsBool blackholes_need_checking = rtsFalse; +/* Set to true when the latest garbage collection failed to reclaim + * enough space, and the runtime should proceed to shut itself down in + * an orderly fashion (emitting profiling info etc.) + */ +rtsBool heap_overflow = rtsFalse; + /* flag that tracks whether we have done any execution in this time slice. * LOCK: currently none, perhaps we should lock (but needs to be * updated in the fast path of the scheduler). @@ -275,6 +283,12 @@ schedule (Capability *initialCapability, Task *task) "### NEW SCHEDULER LOOP (task: %p, cap: %p)", task, initialCapability); + if (running_finalizers) { + errorBelch("error: a C finalizer called back into Haskell.\n" + " use Foreign.Concurrent.newForeignPtr for Haskell finalizers."); + stg_exit(EXIT_FAILURE); + } + schedulePreLoop(); // ----------------------------------------------------------- @@ -526,6 +540,8 @@ run_thread: } #endif + postEvent(cap, EVENT_RUN_THREAD, t->id, 0); + switch (prev_what_next) { case ThreadKilled: @@ -574,6 +590,8 @@ run_thread: t->saved_winerror = GetLastError(); #endif + postEvent (cap, EVENT_STOP_THREAD, t->id, ret); + #if defined(THREADED_RTS) // If ret is ThreadBlocked, and this Task is bound to the TSO that // blocked, we are in limbo - the TSO is now owned by whatever it @@ -731,6 +749,7 @@ scheduleYield (Capability **pcap, Task *task) // if we have work, and we don't need to give up the Capability, continue. if (!shouldYieldCapability(cap,task) && (!emptyRunQueue(cap) || + !emptyWakeupQueue(cap) || blackholes_need_checking || sched_state >= SCHED_INTERRUPTING)) return; @@ -771,9 +790,11 @@ schedulePushWork(Capability *cap USED_IF_THREADS, // Check whether we have more threads on our run queue, or sparks // in our pool, that we could hand to another Capability. - if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE) - && sparkPoolSizeCap(cap) < 2) { - return; + if (cap->run_queue_hd == END_TSO_QUEUE) { + if (sparkPoolSizeCap(cap) < 2) return; + } else { + if (cap->run_queue_hd->_link == END_TSO_QUEUE && + sparkPoolSizeCap(cap) < 1) return; } // First grab as many free Capabilities as we can. @@ -836,6 +857,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS, } else { debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no); appendToRunQueue(free_caps[i],t); + + postEvent (cap, EVENT_MIGRATE_THREAD, t->id, free_caps[i]->no); + if (t->bound) { t->bound->cap = free_caps[i]; } t->cap = free_caps[i]; i++; @@ -1252,7 +1276,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) "--<< thread %ld (%s) stopped: HeapOverflow", (long)t->id, whatNext_strs[t->what_next]); - if (cap->context_switch) { + if (cap->r.rHpLim == NULL || cap->context_switch) { // Sometimes we miss a context switch, e.g. when calling // primitives in a tight loop, MAYBE_GC() doesn't check the // context switch flag, and we end up waiting for a GC. @@ -1394,6 +1418,12 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", (unsigned long)t->id, whatNext_strs[t->what_next]); + // blocked exceptions can now complete, even if the thread was in + // blocked mode (see #2910). This unconditionally calls + // lockTSO(), which ensures that we don't miss any threads that + // are engaged in throwTo() with this thread as a target. + awakenBlockedExceptionQueue (cap, t); + // // Check whether the thread that just completed was a bound // thread, and if so return with the result. @@ -1436,7 +1466,11 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) *(task->ret) = NULL; } if (sched_state >= SCHED_INTERRUPTING) { - task->stat = Interrupted; + if (heap_overflow) { + task->stat = HeapExhausted; + } else { + task->stat = Interrupted; + } } else { task->stat = Killed; } @@ -1534,6 +1568,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) if (gc_type == PENDING_GC_SEQ) { + postEvent(cap, EVENT_REQUEST_SEQ_GC, 0, 0); // single-threaded GC: grab all the capabilities for (i=0; i < n_capabilities; i++) { debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities); @@ -1556,6 +1591,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) { // multi-threaded GC: make sure all the Capabilities donate one // GC thread each. + postEvent(cap, EVENT_REQUEST_PAR_GC, 0, 0); debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads"); waitForGcThreads(cap); @@ -1567,6 +1603,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) IF_DEBUG(scheduler, printAllThreads()); +delete_threads_and_gc: /* * We now have all the capabilities; if we're in an interrupting * state, then we should take the opportunity to delete all the @@ -1580,6 +1617,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) heap_census = scheduleNeedHeapProfile(rtsTrue); #if defined(THREADED_RTS) + postEvent(cap, EVENT_GC_START, 0, 0); debugTrace(DEBUG_sched, "doing GC"); // reset waiting_for_gc *before* GC, so that when the GC threads // emerge they don't immediately re-enter the GC. @@ -1588,6 +1626,31 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) #else GarbageCollect(force_major || heap_census, 0, cap); #endif + postEvent(cap, EVENT_GC_END, 0, 0); + + if (recent_activity == ACTIVITY_INACTIVE && force_major) + { + // We are doing a GC because the system has been idle for a + // timeslice and we need to check for deadlock. Record the + // fact that we've done a GC and turn off the timer signal; + // it will get re-enabled if we run any threads after the GC. + recent_activity = ACTIVITY_DONE_GC; + stopTimer(); + } + else + { + // the GC might have taken long enough for the timer to set + // recent_activity = ACTIVITY_INACTIVE, but we aren't + // necessarily deadlocked: + recent_activity = ACTIVITY_YES; + } + +#if defined(THREADED_RTS) + if (gc_type == PENDING_GC_PAR) + { + releaseGCThreads(cap); + } +#endif if (heap_census) { debugTrace(DEBUG_sched, "performing heap census"); @@ -1595,6 +1658,23 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) performHeapProfile = rtsFalse; } + if (heap_overflow && sched_state < SCHED_INTERRUPTING) { + // GC set the heap_overflow flag, so we should proceed with + // an orderly shutdown now. Ultimately we want the main + // thread to return to its caller with HeapExhausted, at which + // point the caller should call hs_exit(). The first step is + // to delete all the threads. + // + // Another way to do this would be to raise an exception in + // the main thread, which we really should do because it gives + // the program a chance to clean up. But how do we find the + // main thread? It should presumably be the same one that + // gets ^C exceptions, but that's all done on the Haskell side + // (GHC.TopHandler). + sched_state = SCHED_INTERRUPTING; + goto delete_threads_and_gc; + } + #ifdef SPARKBALANCE /* JB Once we are all together... this would be the place to balance all @@ -1603,16 +1683,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) balanceSparkPoolsCaps(n_capabilities, capabilities); #endif - if (force_major) - { - // We've just done a major GC and we don't need the timer - // signal turned on any more (#1623). - // NB. do this *before* releasing the Capabilities, to avoid - // deadlocks! - recent_activity = ACTIVITY_DONE_GC; - stopTimer(); - } - #if defined(THREADED_RTS) if (gc_type == PENDING_GC_SEQ) { // release our stash of capabilities. @@ -1872,6 +1942,7 @@ suspendThread (StgRegTable *reg) task = cap->running_task; tso = cap->r.rCurrentTSO; + postEvent(cap, EVENT_STOP_THREAD, tso->id, THREAD_SUSPENDED_FOREIGN_CALL); debugTrace(DEBUG_sched, "thread %lu did a safe foreign call", (unsigned long)cap->r.rCurrentTSO->id); @@ -1943,10 +2014,15 @@ resumeThread (void *task_) tso = task->suspended_tso; task->suspended_tso = NULL; tso->_link = END_TSO_QUEUE; // no write barrier reqd + + postEvent(cap, EVENT_RUN_THREAD, tso->id, 0); debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id); if (tso->why_blocked == BlockedOnCCall) { - awakenBlockedExceptionQueue(cap,tso); + // avoid locking the TSO if we don't have to + if (tso->blocked_exceptions != END_TSO_QUEUE) { + awakenBlockedExceptionQueue(cap,tso); + } tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE); } @@ -1996,6 +2072,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) if (cpu == cap->no) { appendToRunQueue(cap,tso); } else { + postEvent (cap, EVENT_MIGRATE_THREAD, tso->id, capabilities[cpu].no); wakeupThreadOnCapability(cap, &capabilities[cpu], tso); } #else @@ -2048,6 +2125,10 @@ workerStart(Task *task) cap = task->cap; RELEASE_LOCK(&task->lock); + if (RtsFlags.ParFlags.setAffinity) { + setThreadAffinity(cap->no, n_capabilities); + } + // set the thread-local pointer to the Task: taskEnter(task); @@ -2135,8 +2216,6 @@ initScheduler(void) } #endif - trace(TRACE_sched, "start: %d capabilities", n_capabilities); - RELEASE_LOCK(&sched_mutex); } @@ -2151,22 +2230,16 @@ exitScheduler( { Task *task = NULL; -#if defined(THREADED_RTS) ACQUIRE_LOCK(&sched_mutex); task = newBoundTask(); RELEASE_LOCK(&sched_mutex); -#endif // If we haven't killed all the threads yet, do it now. if (sched_state < SCHED_SHUTTING_DOWN) { sched_state = SCHED_INTERRUPTING; -#if defined(THREADED_RTS) waitForReturnCapability(&task->cap,task); scheduleDoGC(task->cap,task,rtsFalse); releaseCapability(task->cap); -#else - scheduleDoGC(&MainCapability,task,rtsFalse); -#endif } sched_state = SCHED_SHUTTING_DOWN; @@ -2471,6 +2544,10 @@ checkBlackHoles (Capability *cap) prev = &blackhole_queue; t = blackhole_queue; while (t != END_TSO_QUEUE) { + if (t->what_next == ThreadRelocated) { + t = t->_link; + continue; + } ASSERT(t->why_blocked == BlockedOnBlackHole); type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type; if (type != BLACKHOLE && type != CAF_BLACKHOLE) {