X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FSchedule.c;h=7dd063423fe3e123770291254b2efba3ed874b55;hb=117dc73b8a8f6a4184a825a96c372480377f4680;hp=f53687a4c32c3b73503a07b158400c265c57e86d;hpb=8ec22a59d5dca42053a7b4fdc7897925b9cb2eec;p=ghc-hetmet.git

diff --git a/rts/Schedule.c b/rts/Schedule.c
index f53687a..7dd0634 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -92,13 +92,15 @@ rtsBool blackholes_need_checking = rtsFalse;
 /* flag that tracks whether we have done any execution in this time slice.
  * LOCK: currently none, perhaps we should lock (but needs to be
  * updated in the fast path of the scheduler).
+ *
+ * NB. must be StgWord, we do xchg() on it.
  */
-nat recent_activity = ACTIVITY_YES;
+volatile StgWord recent_activity = ACTIVITY_YES;
 
 /* if this flag is set as well, give up execution
- * LOCK: none (changes once, from false->true)
+ * LOCK: none (changes monotonically)
  */
-rtsBool sched_state = SCHED_RUNNING;
+volatile StgWord sched_state = SCHED_RUNNING;
 
 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
  * exists - earlier gccs apparently didn't.
@@ -137,17 +139,21 @@ static Capability *schedule (Capability *initialCapability, Task *task);
 // scheduler clearer.
 //
 static void schedulePreLoop (void);
+static void scheduleFindWork (Capability *cap);
 #if defined(THREADED_RTS)
-static void schedulePushWork(Capability *cap, Task *task);
+static void scheduleYield (Capability **pcap, Task *task);
 #endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
+static void schedulePushWork(Capability *cap, Task *task);
 #if defined(PARALLEL_HASKELL)
 static rtsBool scheduleGetRemoteWork(Capability *cap);
 static void scheduleSendPendingMessages(void);
+#endif
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
 static void scheduleActivateSpark(Capability *cap);
 #endif
 static void schedulePostRunThread(Capability *cap, StgTSO *t);
@@ -281,23 +287,6 @@ schedule (Capability *initialCapability, Task *task)
 
   while (TERMINATION_CONDITION) {
 
-#if defined(THREADED_RTS)
-      if (first) {
-          // don't yield the first time, we want a chance to run this
-          // thread for a bit, even if there are others banging at the
-          // door.
-          first = rtsFalse;
-          ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
-      } else {
-          // Yield the capability to higher-priority tasks if necessary.
-          yieldCapability(&cap, task);
-      }
-#endif
-
-#if defined(THREADED_RTS)
-      schedulePushWork(cap,task);
-#endif
-
     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
     // call).
@@ -351,7 +340,14 @@ schedule (Capability *initialCapability, Task *task)
 #endif
         /* scheduleDoGC() deletes all the threads */
         cap = scheduleDoGC(cap,task,rtsFalse);
-        break;
+
+        // after scheduleDoGC(), we must be shutting down.  Either some
+        // other Capability did the final GC, or we did it above,
+        // either way we can fall through to the SCHED_SHUTTING_DOWN
+        // case now.
+        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
+        // fall through
+
     case SCHED_SHUTTING_DOWN:
         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
         // If we are a worker, just exit.  If we're a bound thread
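
The NB added in the first hunk explains both type changes: these flags are now updated with the RTS's atomic xchg(), which operates on a full, naturally aligned machine word, so a plain nat or rtsBool is no longer adequate. A standalone sketch of the same pattern, written with C11 atomics instead of the RTS's own xchg() primitive (the two-state simplification and all names below are illustrative, not RTS code):

    #include <stdatomic.h>
    #include <stdio.h>

    /* Two activity states, reduced to a toy model. */
    enum { ACTIVITY_YES, ACTIVITY_INACTIVE };

    /* Like recent_activity above: a whole machine word, because the
       update below is an atomic exchange, not a plain store. */
    static atomic_size_t recent_activity_model = ACTIVITY_YES;

    /* A timer tick swaps in INACTIVE and inspects the previous value
       in one indivisible step, so a concurrent "I did work" store
       cannot be overwritten unseen. */
    static void timer_tick(void)
    {
        size_t prev = atomic_exchange(&recent_activity_model, ACTIVITY_INACTIVE);
        if (prev == ACTIVITY_INACTIVE) {
            printf("no activity for a whole slice: suspect deadlock\n");
        }
    }

    int main(void)
    {
        timer_tick();   /* was YES: just marks the slice as quiet */
        timer_tick();   /* still INACTIVE: triggers the suspicion */
        return 0;
    }
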
@@ -365,59 +361,13 @@ schedule (Capability *initialCapability, Task *task)
         barf("sched_state: %d", sched_state);
     }
 
-#if defined(THREADED_RTS)
-    // If the run queue is empty, take a spark and turn it into a thread.
-    {
-        if (emptyRunQueue(cap)) {
-            StgClosure *spark;
-            spark = findSpark(cap);
-            if (spark != NULL) {
-                debugTrace(DEBUG_sched,
-                           "turning spark of closure %p into a thread",
-                           (StgClosure *)spark);
-                createSparkThread(cap,spark);
-            }
-        }
-    }
-#endif // THREADED_RTS
+    scheduleFindWork(cap);
 
-    scheduleStartSignalHandlers(cap);
-
-    // Only check the black holes here if we've nothing else to do.
-    // During normal execution, the black hole list only gets checked
-    // at GC time, to avoid repeatedly traversing this possibly long
-    // list each time around the scheduler.
-    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
-
-    scheduleCheckWakeupThreads(cap);
-
-    scheduleCheckBlockedThreads(cap);
+    /* work pushing, currently relevant only for THREADED_RTS:
+       (pushes threads, wakes up idle capabilities for stealing) */
+    schedulePushWork(cap,task);
 
 #if defined(PARALLEL_HASKELL)
-    /* message processing and work distribution goes here */
-
-    /* if messages have been buffered... a NOOP in THREADED_RTS */
-    scheduleSendPendingMessages();
-
-    /* If the run queue is empty,... */
-    if (emptyRunQueue(cap)) {
-        /* ...take one of our own sparks and turn it into a thread */
-        scheduleActivateSpark(cap);
-
-        /* if this did not work, try to steal a spark from someone else */
-        if (emptyRunQueue(cap)) {
-            receivedFinish = scheduleGetRemoteWork(cap);
-            continue; // a new round, (hopefully) with new work
-            /*
-              in GUM, this a) sends out a FISH and returns IF no fish is
-                            out already
-                        b) (blocking) awaits and receives messages
-
-              in Eden, this is only the blocking receive, as b) in GUM.
-            */
-        }
-    }
-
     /* since we perform a blocking receive and continue otherwise,
        either we never reach here or we definitely have work! */
     // from here: non-empty run queue
@@ -430,9 +380,10 @@ schedule (Capability *initialCapability, Task *task)
                    above, waits for messages as well! */
             processMessages(cap, &receivedFinish);
         }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL: non-empty run queue!
 
     scheduleDetectDeadlock(cap,task);
+
 #if defined(THREADED_RTS)
     cap = task->cap;    // reload cap, it might have changed
 #endif
@@ -445,12 +396,28 @@ schedule (Capability *initialCapability, Task *task)
     //
     // win32: might be here due to awaitEvent() being abandoned
     // as a result of a console event having been delivered.
-    if ( emptyRunQueue(cap) ) {
+
+#if defined(THREADED_RTS)
+    if (first)
+    {
+        // XXX: ToDo
+        //     // don't yield the first time, we want a chance to run this
+        //     // thread for a bit, even if there are others banging at the
+        //     // door.
+        //     first = rtsFalse;
+        //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+    }
+
+  yield:
+    scheduleYield(&cap,task);
+    if (emptyRunQueue(cap)) continue; // look for work again
+#endif
+
 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
+    if ( emptyRunQueue(cap) ) {
         ASSERT(sched_state >= SCHED_INTERRUPTING);
-#endif
-        continue; // nothing to do
     }
+#endif
 
     //
     // Get a thread to run
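
With the hunks above, one pass of the loop now has a fixed shape: find work, push surplus work, check for deadlock, then yield only if the run queue is still empty. A toy skeleton of that shape, with stub types and empty bodies standing in for the real Capability/Task machinery (nothing below is actual RTS code):

    #include <stdbool.h>

    /* Stand-in types; the real Capability/Task live in the RTS headers. */
    typedef struct { int run_queue_len; } Cap;
    typedef struct { int dummy; } Task;

    static bool emptyRunQueue(Cap *c)           { return c->run_queue_len == 0; }
    static void findWork(Cap *c)                { /* signals, blackholes, sparks */ }
    static void pushWork(Cap *c, Task *t)       { /* share surplus threads */ }
    static void detectDeadlock(Cap *c, Task *t) { /* force a GC if truly idle */ }
    static void yieldCap(Cap **pc, Task *t)     { /* the single sleep point */ }

    /* One iteration of the reshaped loop: the order matters, and the
       only place the Task can go to sleep is the final yield. */
    static void schedule_once(Cap **pcap, Task *task)
    {
        findWork(*pcap);
        pushWork(*pcap, task);
        detectDeadlock(*pcap, task);
        if (emptyRunQueue(*pcap)) {
            yieldCap(pcap, task);   /* may come back with a different Cap */
        }
    }

    int main(void)
    {
        Cap cap = { 0 };
        Cap *pcap = &cap;
        Task task = { 0 };
        schedule_once(&pcap, &task);
        return 0;
    }
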
@@ -493,6 +460,15 @@ schedule (Capability *initialCapability, Task *task)
     }
 #endif
 
+    // If we're shutting down, and this thread has not yet been
+    // killed, kill it now.  This sometimes happens when a finalizer
+    // thread is created by the final GC, or a thread previously
+    // in a foreign call returns.
+    if (sched_state >= SCHED_INTERRUPTING &&
+        !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
+        deleteThread(cap,t);
+    }
+
     /* context switches are initiated by the timer signal, unless
      * the user specified "context switch as often as possible", with
      * +RTS -C0
@@ -522,6 +498,7 @@ run_thread:
 
     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
     ASSERT(t->cap == cap);
+    ASSERT(t->bound ? t->bound->cap == cap : 1);
 
     prev_what_next = t->what_next;
 
@@ -607,7 +584,7 @@ run_thread:
             debugTrace(DEBUG_sched,
                        "--<< thread %lu (%s) stopped: blocked",
                        (unsigned long)t->id, whatNext_strs[t->what_next]);
-            continue;
+            goto yield;
         }
 #endif
 
@@ -674,16 +651,117 @@ schedulePreLoop(void)
 }
 
 /* -----------------------------------------------------------------------------
+ * scheduleFindWork()
+ *
+ * Search for work to do, and handle messages from elsewhere.
+ * -------------------------------------------------------------------------- */
+
+static void
+scheduleFindWork (Capability *cap)
+{
+    scheduleStartSignalHandlers(cap);
+
+    // Only check the black holes here if we've nothing else to do.
+    // During normal execution, the black hole list only gets checked
+    // at GC time, to avoid repeatedly traversing this possibly long
+    // list each time around the scheduler.
+    if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
+
+    scheduleCheckWakeupThreads(cap);
+
+    scheduleCheckBlockedThreads(cap);
+
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+    if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
+#endif
+
+#if defined(PARALLEL_HASKELL)
+    // if messages have been buffered...
+    scheduleSendPendingMessages();
+#endif
+
+#if defined(PARALLEL_HASKELL)
+    if (emptyRunQueue(cap)) {
+        receivedFinish = scheduleGetRemoteWork(cap);
+        continue; // a new round, (hopefully) with new work
+        /*
+          in GUM, this a) sends out a FISH and returns IF no fish is
+                        out already
+                    b) (blocking) awaits and receives messages
+
+          in Eden, this is only the blocking receive, as b) in GUM.
+        */
+    }
+#endif
+}
+
+#if defined(THREADED_RTS)
+STATIC_INLINE rtsBool
+shouldYieldCapability (Capability *cap, Task *task)
+{
+    // we need to yield this capability to someone else if..
+    //   - another thread is initiating a GC
+    //   - another Task is returning from a foreign call
+    //   - the thread at the head of the run queue cannot be run
+    //     by this Task (it is bound to another Task, or it is unbound
+    //     and this task is bound).
+    return (waiting_for_gc ||
+            cap->returning_tasks_hd != NULL ||
+            (!emptyRunQueue(cap) && (task->tso == NULL
                                      ? cap->run_queue_hd->bound != NULL
                                      : cap->run_queue_hd->bound != task)));
+}
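
The subtle clause in shouldYieldCapability() is the last one: a bound thread at the head of the run queue may only be run by the Task it is bound to, and an unbound thread must not be run by a bound Task. A self-contained toy model of just that predicate, using simplified stand-in types rather than the RTS definitions:

    #include <assert.h>
    #include <stdbool.h>
    #include <stddef.h>

    typedef struct Task Task;
    typedef struct { Task *bound; } TSO;  /* thread; maybe bound to a Task */
    struct Task { TSO *tso; };            /* task; tso != NULL => bound    */
    typedef struct { TSO *run_queue_hd; bool returning_tasks; } Cap;

    static bool waiting_for_gc = false;

    static bool should_yield(Cap *cap, Task *task)
    {
        return waiting_for_gc
            || cap->returning_tasks
            || (cap->run_queue_hd != NULL &&
                (task->tso == NULL ? cap->run_queue_hd->bound != NULL
                                   : cap->run_queue_hd->bound != task));
    }

    int main(void)
    {
        Task worker = { NULL };              /* unbound worker task       */
        Task bound_task;
        TSO bound_tso = { &bound_task };     /* thread bound to that task */
        bound_task.tso = &bound_tso;

        Cap cap = { &bound_tso, false };
        assert(should_yield(&cap, &worker));      /* worker can't run it  */
        assert(!should_yield(&cap, &bound_task)); /* its own task can     */
        return 0;
    }
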
+
+// This is the single place where a Task goes to sleep.  There are
+// two reasons it might need to sleep:
+//    - there are no threads to run
+//    - we need to yield this Capability to someone else
+//      (see shouldYieldCapability())
+//
+// Careful: the scheduler loop is quite delicate.  Make sure you run
+// the tests in testsuite/concurrent (all ways) after modifying this,
+// and also check the benchmarks in nofib/parallel for regressions.
+
+static void
+scheduleYield (Capability **pcap, Task *task)
+{
+    Capability *cap = *pcap;
+
+    // if we have work, and we don't need to give up the Capability, continue.
+    if (!shouldYieldCapability(cap,task) &&
+        (!emptyRunQueue(cap) ||
+         blackholes_need_checking ||
+         sched_state >= SCHED_INTERRUPTING))
+        return;
+
+    // otherwise yield (sleep), and keep yielding if necessary.
+    do {
+        yieldCapability(&cap,task);
+    }
+    while (shouldYieldCapability(cap,task));
+
+    // note there may still be no threads on the run queue at this
+    // point, the caller has to check.
+
+    *pcap = cap;
+    return;
+}
+#endif
+
+/* -----------------------------------------------------------------------------
  * schedulePushWork()
  *
  * Push work to other Capabilities if we have some.
  * -------------------------------------------------------------------------- */
 
-#if defined(THREADED_RTS)
 static void
 schedulePushWork(Capability *cap USED_IF_THREADS,
                  Task *task      USED_IF_THREADS)
 {
+  /* following code not for PARALLEL_HASKELL. I kept the call general,
+     future GUM versions might use pushing in a distributed setup */
+#if defined(THREADED_RTS)
+
     Capability *free_caps[n_capabilities], *cap0;
     nat i, n_free_caps;
 
@@ -726,7 +804,12 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
         StgTSO *prev, *t, *next;
         rtsBool pushed_to_all;
 
-        debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
+        debugTrace(DEBUG_sched,
+                   "cap %d: %s and %d free capabilities, sharing...",
+                   cap->no,
+                   (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
+                   "excess threads on run queue":"sparks to share (>=2)",
+                   n_free_caps);
 
         i = 0;
         pushed_to_all = rtsFalse;
@@ -760,6 +843,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
             cap->run_queue_tl = prev;
         }
 
+#ifdef SPARK_PUSHING
+        /* JB: I left this code in place; it would work but is not necessary */
+
         // If there are some free capabilities that we didn't push any
         // threads to, then try to push a spark to each one.
         if (!pushed_to_all) {
@@ -767,7 +853,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
             // i is the next free capability to push to
             for (; i < n_free_caps; i++) {
                 if (emptySparkPoolCap(free_caps[i])) {
-                    spark = findSpark(cap);
+                    spark = tryStealSpark(cap->sparks);
                     if (spark != NULL) {
                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
                         newSpark(&(free_caps[i]->r), spark);
@@ -775,16 +861,19 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                 }
             }
         }
+#endif /* SPARK_PUSHING */
 
         // release the capabilities
         for (i = 0; i < n_free_caps; i++) {
             task->cap = free_caps[i];
-            releaseCapability(free_caps[i]);
+            releaseAndWakeupCapability(free_caps[i]);
         }
     }
     task->cap = cap; // reset to point to our Capability.
+
+#endif /* THREADED_RTS */
+
 }
-#endif
 
 /* ----------------------------------------------------------------------------
  * Start any pending signal handlers
@@ -862,8 +951,13 @@ scheduleCheckBlackHoles (Capability *cap)
     {
         ACQUIRE_LOCK(&sched_mutex);
         if ( blackholes_need_checking ) {
-            checkBlackHoles(cap);
             blackholes_need_checking = rtsFalse;
+            // important that we reset the flag *before* checking the
+            // blackhole queue, otherwise we could get deadlock.  This
+            // happens as follows: we wake up a thread that
+            // immediately runs on another Capability, blocks on a
+            // blackhole, and then we reset the blackholes_need_checking flag.
+            checkBlackHoles(cap);
         }
         RELEASE_LOCK(&sched_mutex);
     }
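
The new comment in scheduleCheckBlackHoles() describes a lost-wakeup hazard: the flag must be cleared before the queue is scanned, so that a request raised while the scan is in progress survives into the next pass. A minimal sketch of the same reset-before-scan pattern using a C11 atomic flag (illustrative only; the RTS itself relies on sched_mutex rather than C11 atomics here):

    #include <stdatomic.h>
    #include <stdbool.h>

    static atomic_bool need_checking;   /* plays blackholes_need_checking */

    /* Waker side: enqueue the blocked thread, then raise the flag. */
    static void block_on_blackhole(void)
    {
        /* ... add the thread to the blackhole queue ... */
        atomic_store(&need_checking, true);
    }

    /* Checker side: clear the flag *before* scanning.  Anything queued
       while we scan re-raises the flag, so the next pass sees it; if we
       cleared afterwards, that late request would be wiped out. */
    static void check_blackholes(void)
    {
        if (atomic_exchange(&need_checking, false)) {
            /* ... walk the queue, waking threads ... */
        }
    }

    int main(void)
    {
        block_on_blackhole();
        check_blackholes();
        return 0;
    }
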
@@ -907,12 +1001,11 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
         // they are unreachable and will therefore be sent an
         // exception.  Any threads thus released will be immediately
         // runnable.
-        cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
+        cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
+        // when force_major == rtsTrue, scheduleDoGC sets
+        // recent_activity to ACTIVITY_DONE_GC and turns off the timer
+        // signal.
 
-        recent_activity = ACTIVITY_DONE_GC;
-        // disable timer signals (see #1623)
-        stopTimer();
-
         if ( !emptyRunQueue(cap) ) return;
 
 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
@@ -965,7 +1058,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
  * ------------------------------------------------------------------------- */
 
 #if defined(PARALLEL_HASKELL)
-static StgTSO *
+static void
 scheduleSendPendingMessages(void)
 {
@@ -984,43 +1077,28 @@ scheduleSendPendingMessages(void)
 #endif
 
 /* ----------------------------------------------------------------------------
- * Activate spark threads (PARALLEL_HASKELL only)
+ * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
  * ------------------------------------------------------------------------- */
 
-#if defined(PARALLEL_HASKELL)
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
 static void
 scheduleActivateSpark(Capability *cap)
 {
-    StgClosure *spark;
-
-/* We only want to stay here if the run queue is empty and we want some
-   work. We try to turn a spark into a thread, and add it to the run
-   queue, from where it will be picked up in the next iteration of the
-   scheduler loop.
-*/
-    if (!emptyRunQueue(cap))
-        /* In the threaded RTS, another task might have pushed a thread
-           on our run queue in the meantime ? But would need a lock.. */
-        return;
-
-    spark = findSpark(cap); // defined in Sparks.c
-
-    if (spark != NULL) {
-        debugTrace(DEBUG_sched,
-                   "turning spark of closure %p into a thread",
-                   (StgClosure *)spark);
-        createSparkThread(cap,spark); // defined in Sparks.c
+    if (anySparks())
+    {
+        createSparkThread(cap);
+        debugTrace(DEBUG_sched, "creating a spark thread");
     }
 }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL || THREADED_RTS
 
 /* ----------------------------------------------------------------------------
  * Get work from a remote node (PARALLEL_HASKELL only)
  * ------------------------------------------------------------------------- */
 
 #if defined(PARALLEL_HASKELL)
-static rtsBool
-scheduleGetRemoteWork(Capability *cap)
+static rtsBool /* return value used in PARALLEL_HASKELL only */
+scheduleGetRemoteWork (Capability *cap STG_UNUSED)
 {
 #if defined(PARALLEL_HASKELL)
     rtsBool receivedFinish = rtsFalse;
@@ -1057,7 +1135,7 @@ scheduleGetRemoteWork(Capability *cap)
 #endif /* PARALLEL_HASKELL */
 }
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL || THREADED_RTS
 
 /* ----------------------------------------------------------------------------
  * After running a thread...
  * ------------------------------------------------------------------------- */
@@ -1085,7 +1163,7 @@ schedulePostRunThread (Capability *cap, StgTSO *t)
             // ATOMICALLY_FRAME, aborting the (nested)
             // transaction, and saving the stack of any
             // partially-evaluated thunks on the heap.
-            throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
+            throwToSingleThreaded_(cap, t, NULL, rtsTrue);
 
             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
         }
@@ -1404,6 +1482,13 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     nat i;
 #endif
 
+    if (sched_state == SCHED_SHUTTING_DOWN) {
+        // The final GC has already been done, and the system is
+        // shutting down.  We'll probably deadlock if we try to GC
+        // now.
+        return cap;
+    }
+
 #ifdef THREADED_RTS
     // In order to GC, there must be no threads running Haskell code.
     // Therefore, the GC thread needs to hold *all* the capabilities,
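
The deadlock path above works together with the recent_activity flag from the first hunk: the interval timer demotes the flag one step per quiet tick, and a forced major GC is the recovery action once the system has been inactive for a whole slice. A single-threaded sketch of that state machine, assuming the four ACTIVITY_* states behave as the comments in this patch describe (a toy re-implementation, not RTS code):

    #include <stdio.h>

    enum { ACT_YES, ACT_MAYBE_NO, ACT_INACTIVE, ACT_DONE_GC };
    static int activity = ACT_YES;
    static int timer_on = 1;

    /* Interval timer: demote the flag one step per quiet tick. */
    static void tick(void)
    {
        if (activity == ACT_YES)           activity = ACT_MAYBE_NO;
        else if (activity == ACT_MAYBE_NO) activity = ACT_INACTIVE;
    }

    /* Scheduler, with an empty run queue: two quiet ticks in a row
       mean "probably deadlocked", so force a major GC; unreachable
       blocked threads then get an exception and become runnable. */
    static void detect_deadlock(void)
    {
        if (activity == ACT_INACTIVE) {
            /* a force_major GC would run here */
            activity = ACT_DONE_GC;
            timer_on = 0;           /* see #1623: stop the timer too */
        }
    }

    int main(void)
    {
        tick(); tick();             /* two ticks with no Haskell work */
        detect_deadlock();
        printf("timer_on = %d, state = %d\n", timer_on, activity);
        return 0;
    }
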
@@ -1483,6 +1568,24 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
         performHeapProfile = rtsFalse;
     }
 
+#ifdef SPARKBALANCE
+    /* JB
+       Once we are all together... this would be the place to balance all
+       spark pools. No concurrent stealing or adding of new sparks can
+       occur. Should be defined in Sparks.c. */
+    balanceSparkPoolsCaps(n_capabilities, capabilities);
+#endif
+
+    if (force_major)
+    {
+        // We've just done a major GC and we don't need the timer
+        // signal turned on any more (#1623).
+        // NB. do this *before* releasing the Capabilities, to avoid
+        // deadlocks!
+        recent_activity = ACTIVITY_DONE_GC;
+        stopTimer();
+    }
+
 #if defined(THREADED_RTS)
     // release our stash of capabilities.
     for (i = 0; i < n_capabilities; i++) {
@@ -1764,7 +1867,7 @@ suspendThread (StgRegTable *reg)
   suspendTask(cap,task);
   cap->in_haskell = rtsFalse;
 
-  releaseCapability_(cap);
+  releaseCapability_(cap,rtsFalse);
 
   RELEASE_LOCK(&cap->lock);
 
@@ -1922,9 +2025,22 @@ workerStart(Task *task)
     // schedule() runs without a lock.
     cap = schedule(cap,task);
 
-    // On exit from schedule(), we have a Capability.
-    releaseCapability(cap);
+    // On exit from schedule(), we have a Capability, but possibly not
+    // the same one we started with.
+
+    // During shutdown, the requirement is that after all the
+    // Capabilities are shut down, all workers that are shutting down
+    // have finished workerTaskStop().  This is why we hold on to
+    // cap->lock until we've finished workerTaskStop() below.
+    //
+    // There may be workers still involved in foreign calls; those
+    // will just block in waitForReturnCapability() because the
+    // Capability has been shut down.
+    //
+    ACQUIRE_LOCK(&cap->lock);
+    releaseCapability_(cap,rtsFalse);
     workerTaskStop(task);
+    RELEASE_LOCK(&cap->lock);
 }
 #endif
 
@@ -2027,20 +2143,30 @@ exitScheduler(
             shutdownCapability(&capabilities[i], task, wait_foreign);
         }
         boundTaskExiting(task);
-        stopTaskManager();
     }
-#else
-    freeCapability(&MainCapability);
 #endif
 }
 
 void
 freeScheduler( void )
 {
-    freeTaskManager();
-    if (n_capabilities != 1) {
-        stgFree(capabilities);
+    nat still_running;
+
+    ACQUIRE_LOCK(&sched_mutex);
+    still_running = freeTaskManager();
+    // We can only free the Capabilities if there are no Tasks still
+    // running.  We might have a Task about to return from a foreign
+    // call into waitForReturnCapability(), for example (actually,
+    // this should be the *only* thing that a still-running Task can
+    // do at this point, and it will block waiting for the
+    // Capability).
+    if (still_running == 0) {
+        freeCapabilities();
+        if (n_capabilities != 1) {
+            stgFree(capabilities);
+        }
     }
+    RELEASE_LOCK(&sched_mutex);
 #if defined(THREADED_RTS)
     closeMutex(&sched_mutex);
 #endif
@@ -2125,10 +2251,17 @@ threadStackOverflow(Capability *cap, StgTSO *tso)
     }
 
     /* Try to double the current stack size.  If that takes us over the
-     * maximum stack size for this thread, then use the maximum instead.
-     * Finally round up so the TSO ends up as a whole number of blocks.
+     * maximum stack size for this thread, then use the maximum instead
+     * (that is, unless we're already at or over the max size and we
+     * can't raise the StackOverflow exception (see above), in which
+     * case just double the size). Finally round up so the TSO ends up as
+     * a whole number of blocks.
      */
-    new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
+    if (tso->stack_size >= tso->max_stack_size) {
+        new_stack_size = tso->stack_size * 2;
+    } else {
+        new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
+    }
     new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) +
                                        TSO_STRUCT_SIZE)/sizeof(W_);
     new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
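
The new stack-growth rule in threadStackOverflow() is easy to check by hand: double the stack, clamp to the per-thread maximum, but keep doubling once at or over the maximum, which is the case where the StackOverflow exception could not be raised. A small worked example of just that arithmetic (sizes in words, numbers invented):

    #include <stdio.h>

    static unsigned grow_stack(unsigned stack_size, unsigned max_stack_size)
    {
        unsigned new_size;
        if (stack_size >= max_stack_size) {
            /* already at/over the limit and the overflow exception could
               not be raised: keep doubling so execution can continue */
            new_size = stack_size * 2;
        } else {
            new_size = stack_size * 2;
            if (new_size > max_stack_size)
                new_size = max_stack_size;   /* the stg_min() clamp */
        }
        return new_size;
    }

    int main(void)
    {
        printf("%u\n", grow_stack(1024, 8192));   /* 2048: plain doubling   */
        printf("%u\n", grow_stack(6000, 8192));   /* 8192: clamped to max   */
        printf("%u\n", grow_stack(8192, 8192));   /* 16384: forced doubling */
        return 0;
    }
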