X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=578328dbcbd328dc4d79ecbbc71bf36be884fd62;hb=0372ac231bd18e993a2533f784805046876d5527;hp=0de7679d1ae9ec9dc9cefdfc71a4d43cc4a8da86;hpb=9348149e3cfbda59690c6c6f16dd38008f0fa59d;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 0de7679..578328d 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -47,6 +47,9 @@ #include "Capability.h" #include "Task.h" #include "AwaitEvent.h" +#if defined(mingw32_HOST_OS) +#include "win32/IOManager.h" +#endif #ifdef HAVE_SYS_TYPES_H #include @@ -69,20 +72,6 @@ # define STATIC_INLINE static #endif -#ifdef THREADED_RTS -#define USED_WHEN_THREADED_RTS -#define USED_WHEN_NON_THREADED_RTS STG_UNUSED -#else -#define USED_WHEN_THREADED_RTS STG_UNUSED -#define USED_WHEN_NON_THREADED_RTS -#endif - -#ifdef SMP -#define USED_WHEN_SMP -#else -#define USED_WHEN_SMP STG_UNUSED -#endif - /* ----------------------------------------------------------------------------- * Global variables * -------------------------------------------------------------------------- */ @@ -186,10 +175,10 @@ rtsBool shutting_down_scheduler = rtsFalse; /* * This mutex protects most of the global scheduler data in - * the THREADED_RTS and (inc. SMP) runtime. + * the THREADED_RTS runtime. */ #if defined(THREADED_RTS) -Mutex sched_mutex = INIT_MUTEX_VAR; +Mutex sched_mutex; #endif #if defined(PARALLEL_HASKELL) @@ -210,7 +199,10 @@ static Capability *schedule (Capability *initialCapability, Task *task); // scheduler clearer. // static void schedulePreLoop (void); -static void scheduleStartSignalHandlers (void); +#if defined(THREADED_RTS) +static void schedulePushWork(Capability *cap, Task *task); +#endif +static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); static void scheduleCheckBlackHoles (Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); @@ -235,7 +227,8 @@ static void scheduleHandleThreadBlocked( StgTSO *t ); static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task, StgTSO *t ); static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc); -static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major); +static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major, + void (*get_roots)(evac_fn)); static void unblockThread(Capability *cap, StgTSO *tso); static rtsBool checkBlackHoles(Capability *cap); @@ -244,7 +237,7 @@ static void AllRoots(evac_fn evac); static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso); static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, - rtsBool stop_at_atomically); + rtsBool stop_at_atomically, StgPtr stop_here); static void deleteThread (Capability *cap, StgTSO *tso); static void deleteRunQueue (Capability *cap); @@ -339,7 +332,9 @@ schedule (Capability *initialCapability, Task *task) #endif nat prev_what_next; rtsBool ready_to_gc; +#if defined(THREADED_RTS) rtsBool first = rtsTrue; +#endif cap = initialCapability; @@ -379,13 +374,17 @@ schedule (Capability *initialCapability, Task *task) // thread for a bit, even if there are others banging at the // door. first = rtsFalse; - ASSERT_CAPABILITY_INVARIANTS(cap,task); + ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); } else { // Yield the capability to higher-priority tasks if necessary. 
yieldCapability(&cap, task); } #endif +#if defined(THREADED_RTS) + schedulePushWork(cap,task); +#endif + // Check whether we have re-entered the RTS from Haskell without // going via suspendThread()/resumeThread (i.e. a 'safe' foreign // call). @@ -403,12 +402,15 @@ schedule (Capability *initialCapability, Task *task) // if (interrupted) { deleteRunQueue(cap); +#if defined(THREADED_RTS) + discardSparksCap(cap); +#endif if (shutting_down_scheduler) { IF_DEBUG(scheduler, sched_belch("shutting down")); // If we are a worker, just exit. If we're a bound thread // then we will exit below when we've removed our TSO from // the run queue. - if (task->tso == NULL) { + if (task->tso == NULL && emptyRunQueue(cap)) { return cap; } } else { @@ -416,29 +418,23 @@ schedule (Capability *initialCapability, Task *task) } } -#if defined(not_yet) && defined(SMP) - // - // Top up the run queue from our spark pool. We try to make the - // number of threads in the run queue equal to the number of - // free capabilities. - // +#if defined(THREADED_RTS) + // If the run queue is empty, take a spark and turn it into a thread. { - StgClosure *spark; - if (emptyRunQueue()) { - spark = findSpark(rtsFalse); - if (spark == NULL) { - break; /* no more sparks in the pool */ - } else { - createSparkThread(spark); + if (emptyRunQueue(cap)) { + StgClosure *spark; + spark = findSpark(cap); + if (spark != NULL) { IF_DEBUG(scheduler, - sched_belch("==^^ turning spark of closure %p into a thread", + sched_belch("turning spark of closure %p into a thread", (StgClosure *)spark)); + createSparkThread(cap,spark); } } } -#endif // SMP +#endif // THREADED_RTS - scheduleStartSignalHandlers(); + scheduleStartSignalHandlers(cap); // Only check the black holes here if we've nothing else to do. // During normal execution, the black hole list only gets checked @@ -449,6 +445,9 @@ schedule (Capability *initialCapability, Task *task) scheduleCheckBlockedThreads(cap); scheduleDetectDeadlock(cap,task); +#if defined(THREADED_RTS) + cap = task->cap; // reload cap, it might have changed +#endif // Normally, the only way we can get here with no threads to // run is if a keyboard interrupt received during @@ -569,6 +568,8 @@ run_thread: errno = t->saved_errno; cap->in_haskell = rtsTrue; + dirtyTSO(t); + recent_activity = ACTIVITY_YES; switch (prev_what_next) { @@ -599,24 +600,38 @@ run_thread: cap->in_haskell = rtsFalse; -#ifdef SMP + // The TSO might have moved, eg. if it re-entered the RTS and a GC + // happened. So find the new location: + t = cap->r.rCurrentTSO; + + // We have run some Haskell code: there might be blackhole-blocked + // threads to wake up now. + // Lock-free test here should be ok, we're just setting a flag. + if ( blackhole_queue != END_TSO_QUEUE ) { + blackholes_need_checking = rtsTrue; + } + + // And save the current errno in this thread. + // XXX: possibly bogus for SMP because this thread might already + // be running again, see code below. + t->saved_errno = errno; + +#if defined(THREADED_RTS) // If ret is ThreadBlocked, and this Task is bound to the TSO that // blocked, we are in limbo - the TSO is now owned by whatever it // is blocked on, and may in fact already have been woken up, // perhaps even on a different Capability. It may be the case // that task->cap != cap. We better yield this Capability // immediately and return to normaility. 
- if (ret == ThreadBlocked) continue; + if (ret == ThreadBlocked) { + IF_DEBUG(scheduler, + sched_belch("--<< thread %d (%s) stopped: blocked\n", + t->id, whatNext_strs[t->what_next])); + continue; + } #endif - ASSERT_CAPABILITY_INVARIANTS(cap,task); - - // The TSO might have moved, eg. if it re-entered the RTS and a GC - // happened. So find the new location: - t = cap->r.rCurrentTSO; - - // And save the current errno in this thread. - t->saved_errno = errno; + ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); // ---------------------------------------------------------------------- @@ -626,13 +641,6 @@ run_thread: CCCS = CCS_SYSTEM; #endif - // We have run some Haskell code: there might be blackhole-blocked - // threads to wake up now. - // Lock-free test here should be ok, we're just setting a flag. - if ( blackhole_queue != END_TSO_QUEUE ) { - blackholes_need_checking = rtsTrue; - } - #if defined(THREADED_RTS) IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());); #elif !defined(GRAN) && !defined(PARALLEL_HASKELL) @@ -665,6 +673,7 @@ run_thread: case ThreadFinished: if (scheduleHandleThreadFinished(cap, task, t)) return cap; + ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); break; default: @@ -672,7 +681,12 @@ run_thread: } if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; } - if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); } + if (ready_to_gc) { + scheduleDoGC(cap,task,rtsFalse,GetRoots); +#if defined(THREADED_RTS) + cap = task->cap; // reload cap, it might have changed +#endif + } } /* end of while() */ IF_PAR_DEBUG(verbose, @@ -705,26 +719,139 @@ schedulePreLoop(void) #endif } +/* ----------------------------------------------------------------------------- + * schedulePushWork() + * + * Push work to other Capabilities if we have some. + * -------------------------------------------------------------------------- */ + +#if defined(THREADED_RTS) +static void +schedulePushWork(Capability *cap USED_IF_THREADS, + Task *task USED_IF_THREADS) +{ + Capability *free_caps[n_capabilities], *cap0; + nat i, n_free_caps; + + // Check whether we have more threads on our run queue, or sparks + // in our pool, that we could hand to another Capability. + if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) + && sparkPoolSizeCap(cap) < 2) { + return; + } + + // First grab as many free Capabilities as we can. + for (i=0, n_free_caps=0; i < n_capabilities; i++) { + cap0 = &capabilities[i]; + if (cap != cap0 && tryGrabCapability(cap0,task)) { + if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) { + // it already has some work, we just grabbed it at + // the wrong moment. Or maybe it's deadlocked! + releaseCapability(cap0); + } else { + free_caps[n_free_caps++] = cap0; + } + } + } + + // we now have n_free_caps free capabilities stashed in + // free_caps[]. Share our run queue equally with them. This is + // probably the simplest thing we could do; improvements we might + // want to do include: + // + // - giving high priority to moving relatively new threads, on + // the gournds that they haven't had time to build up a + // working set in the cache on this CPU/Capability. 
+ // + // - giving low priority to moving long-lived threads + + if (n_free_caps > 0) { + StgTSO *prev, *t, *next; + rtsBool pushed_to_all; + + IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps)); + + i = 0; + pushed_to_all = rtsFalse; + + if (cap->run_queue_hd != END_TSO_QUEUE) { + prev = cap->run_queue_hd; + t = prev->link; + prev->link = END_TSO_QUEUE; + for (; t != END_TSO_QUEUE; t = next) { + next = t->link; + t->link = END_TSO_QUEUE; + if (t->what_next == ThreadRelocated + || t->bound == task) { // don't move my bound thread + prev->link = t; + prev = t; + } else if (i == n_free_caps) { + pushed_to_all = rtsTrue; + i = 0; + // keep one for us + prev->link = t; + prev = t; + } else { + IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no)); + appendToRunQueue(free_caps[i],t); + if (t->bound) { t->bound->cap = free_caps[i]; } + i++; + } + } + cap->run_queue_tl = prev; + } + + // If there are some free capabilities that we didn't push any + // threads to, then try to push a spark to each one. + if (!pushed_to_all) { + StgClosure *spark; + // i is the next free capability to push to + for (; i < n_free_caps; i++) { + if (emptySparkPoolCap(free_caps[i])) { + spark = findSpark(cap); + if (spark != NULL) { + IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no)); + newSpark(&(free_caps[i]->r), spark); + } + } + } + } + + // release the capabilities + for (i = 0; i < n_free_caps; i++) { + task->cap = free_caps[i]; + releaseCapability(free_caps[i]); + } + } + task->cap = cap; // reset to point to our Capability. +} +#endif + /* ---------------------------------------------------------------------------- * Start any pending signal handlers * ------------------------------------------------------------------------- */ +#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS)) static void -scheduleStartSignalHandlers(void) +scheduleStartSignalHandlers(Capability *cap) { -#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS) if (signals_pending()) { // safe outside the lock - startSignalHandlers(); + startSignalHandlers(cap); } -#endif } +#else +static void +scheduleStartSignalHandlers(Capability *cap STG_UNUSED) +{ +} +#endif /* ---------------------------------------------------------------------------- * Check for blocked threads that can be woken up. * ------------------------------------------------------------------------- */ static void -scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS) +scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS) { #if !defined(THREADED_RTS) // @@ -766,7 +893,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task) { #if defined(PARALLEL_HASKELL) - // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL + // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL return; #endif @@ -795,12 +922,16 @@ scheduleDetectDeadlock (Capability *cap, Task *task) // they are unreachable and will therefore be sent an // exception. Any threads thus released will be immediately // runnable. 
- scheduleDoGC( cap, task, rtsTrue/*force major GC*/ ); + scheduleDoGC( cap, task, rtsTrue/*force major GC*/, GetRoots ); +#if defined(THREADED_RTS) + cap = task->cap; // reload cap, it might have changed +#endif + recent_activity = ACTIVITY_DONE_GC; if ( !emptyRunQueue(cap) ) return; -#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS) +#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS)) /* If we have user-installed signal handlers, then wait * for signals to arrive rather then bombing out with a * deadlock. @@ -812,7 +943,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task) awaitUserSignals(); if (signals_pending()) { - startSignalHandlers(); + startSignalHandlers(cap); } // either we have threads to run, or we were interrupted: @@ -1385,7 +1516,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) if (cap->r.rCurrentNursery->u.back != NULL) { cap->r.rCurrentNursery->u.back->link = bd; } else { -#if !defined(SMP) +#if !defined(THREADED_RTS) ASSERT(g0s0->blocks == cap->r.rCurrentNursery && g0s0 == cap->r.rNursery); #endif @@ -1462,11 +1593,10 @@ scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t) /* enlarge the stack */ StgTSO *new_t = threadStackOverflow(cap, t); - /* This TSO has moved, so update any pointers to it from the - * main thread stack. It better not be on any other queues... - * (it shouldn't be). + /* The TSO attached to this Task may have moved, so update the + * pointer to it. */ - if (task->tso != NULL) { + if (task->tso == t) { task->tso = new_t; } pushOnRunQueue(cap,new_t); @@ -1591,9 +1721,9 @@ scheduleHandleThreadBlocked( StgTSO *t // has tidied up its stack and placed itself on whatever queue // it needs to be on. -#if !defined(SMP) +#if !defined(THREADED_RTS) ASSERT(t->why_blocked != NotBlocked); - // This might not be true under SMP: we don't have + // This might not be true under THREADED_RTS: we don't have // exclusive access to this TSO, so someone might have // woken it up by now. This actually happens: try // conc023 +RTS -N2. @@ -1739,8 +1869,19 @@ scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED ) if (performHeapProfile || (RtsFlags.ProfFlags.profileInterval==0 && RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) { + + // checking black holes is necessary before GC, otherwise + // there may be threads that are unreachable except by the + // blackhole queue, which the GC will consider to be + // deadlocked. + scheduleCheckBlackHoles(&MainCapability); + + IF_DEBUG(scheduler, sched_belch("garbage collecting before heap census")); GarbageCollect(GetRoots, rtsTrue); + + IF_DEBUG(scheduler, sched_belch("performing heap census")); heapCensus(); + performHeapProfile = rtsFalse; return rtsTrue; // true <=> we already GC'd } @@ -1753,16 +1894,17 @@ scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED ) * -------------------------------------------------------------------------- */ static void -scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major ) +scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, + rtsBool force_major, void (*get_roots)(evac_fn)) { StgTSO *t; -#ifdef SMP +#ifdef THREADED_RTS static volatile StgWord waiting_for_gc; rtsBool was_waiting; nat i; #endif -#ifdef SMP +#ifdef THREADED_RTS // In order to GC, there must be no threads running Haskell code. // Therefore, the GC thread needs to hold *all* the capabilities, // and release them after the GC has completed. 
@@ -1775,7 +1917,13 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major ) // was_waiting = cas(&waiting_for_gc, 0, 1); - if (was_waiting) return; + if (was_waiting) { + do { + IF_DEBUG(scheduler, sched_belch("someone else is trying to GC...")); + if (cap) yieldCapability(&cap,task); + } while (waiting_for_gc); + return; // NOTE: task->cap might have changed here + } for (i=0; i < n_capabilities; i++) { IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities)); @@ -1787,6 +1935,7 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major ) // all the Capabilities, but even so it's a slightly // unsavoury invariant. task->cap = pcap; + context_switch = 1; waitForReturnCapability(&pcap, task); if (pcap != &capabilities[i]) { barf("scheduleDoGC: got the wrong capability"); @@ -1817,7 +1966,7 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major ) // ATOMICALLY_FRAME, aborting the (nested) // transaction, and saving the stack of any // partially-evaluated thunks on the heap. - raiseAsync_(cap, t, NULL, rtsTrue); + raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL); #ifdef REG_R1 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); @@ -1829,7 +1978,7 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major ) } // so this happens periodically: - scheduleCheckBlackHoles(cap); + if (cap) scheduleCheckBlackHoles(cap); IF_DEBUG(scheduler, printAllThreads()); @@ -1841,9 +1990,9 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major ) #if defined(THREADED_RTS) IF_DEBUG(scheduler,sched_belch("doing GC")); #endif - GarbageCollect(GetRoots, force_major); + GarbageCollect(get_roots, force_major); -#if defined(SMP) +#if defined(THREADED_RTS) // release our stash of capabilities. for (i = 0; i < n_capabilities; i++) { if (cap != &capabilities[i]) { @@ -1851,7 +2000,11 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major ) releaseCapability(&capabilities[i]); } } - task->cap = cap; + if (cap) { + task->cap = cap; + } else { + task->cap = NULL; + } #endif #if defined(GRAN) @@ -1886,7 +2039,7 @@ rtsSupportsBoundThreads(void) * ------------------------------------------------------------------------- */ StgBool -isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS) +isThreadBound(StgTSO* tso USED_IF_THREADS) { #if defined(THREADED_RTS) return (tso->bound != NULL); @@ -1898,7 +2051,7 @@ isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS) * Singleton fork(). Do not copy any running threads. 
* ------------------------------------------------------------------------- */ -#if !defined(mingw32_HOST_OS) && !defined(SMP) +#if !defined(mingw32_HOST_OS) #define FORKPROCESS_PRIMOP_SUPPORTED #endif @@ -1914,11 +2067,18 @@ forkProcess(HsStablePtr *entry ) { #ifdef FORKPROCESS_PRIMOP_SUPPORTED + Task *task; pid_t pid; StgTSO* t,*next; - Task *task; Capability *cap; +#if defined(THREADED_RTS) + if (RtsFlags.ParFlags.nNodes > 1) { + errorBelch("forking not supported with +RTS -N greater than 1"); + stg_exit(EXIT_FAILURE); + } +#endif + IF_DEBUG(scheduler,sched_belch("forking!")); // ToDo: for SMP, we should probably acquire *all* the capabilities @@ -1945,12 +2105,22 @@ forkProcess(HsStablePtr *entry deleteThreadImmediately(cap,t); } - // wipe the main thread list - while ((task = all_tasks) != NULL) { - all_tasks = task->all_link; - discardTask(task); + // wipe the task list + ACQUIRE_LOCK(&sched_mutex); + for (task = all_tasks; task != NULL; task=task->all_link) { + if (task != cap->running_task) discardTask(task); } - + RELEASE_LOCK(&sched_mutex); + + cap->suspended_ccalling_tasks = NULL; + +#if defined(THREADED_RTS) + // wipe our spare workers list. + cap->spare_workers = NULL; + cap->returning_tasks_hd = NULL; + cap->returning_tasks_tl = NULL; +#endif + cap = rts_evalStableIO(cap, entry, NULL); // run the action rts_checkSchedStatus("forkProcess",cap); @@ -2050,7 +2220,7 @@ suspendThread (StgRegTable *reg) // XXX this might not be necessary --SDM tso->what_next = ThreadRunGHC; - threadPaused(tso); + threadPaused(cap,tso); if(tso->blocked_exceptions == NULL) { tso->why_blocked = BlockedOnCCall; @@ -2116,6 +2286,9 @@ resumeThread (void *task_) cap->in_haskell = rtsTrue; errno = saved_errno; + /* We might have GC'd, mark the TSO dirty again */ + dirtyTSO(tso); + return &cap->r; } @@ -2229,6 +2402,7 @@ createThread(Capability *cap, nat size) tso->why_blocked = NotBlocked; tso->blocked_exceptions = NULL; + tso->flags = TSO_DIRTY; tso->saved_errno = 0; tso->bound = NULL; @@ -2458,7 +2632,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) cap = schedule(cap,task); ASSERT(task->stat != NoStatus); - ASSERT_CAPABILITY_INVARIANTS(cap,task); + ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id)); return cap; @@ -2539,13 +2713,17 @@ initScheduler(void) /* A capability holds the state a native thread needs in * order to execute STG code. At least one capability is - * floating around (only SMP builds have more than one). + * floating around (only THREADED_RTS builds have more than one). */ initCapabilities(); initTaskManager(); -#if defined(SMP) +#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) + initSparkPools(); +#endif + +#if defined(THREADED_RTS) /* * Eagerly start one worker to run each Capability, except for * Capability 0. 
The idea is that we're probably going to start a @@ -2564,10 +2742,6 @@ initScheduler(void) } #endif -#if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL) - initSparkPools(); -#endif - RELEASE_LOCK(&sched_mutex); } @@ -2649,15 +2823,15 @@ GetRoots( evac_fn evac ) } #if !defined(THREADED_RTS) - evac((StgClosure **)&blocked_queue_hd); - evac((StgClosure **)&blocked_queue_tl); - evac((StgClosure **)&sleeping_queue); + evac((StgClosure **)(void *)&blocked_queue_hd); + evac((StgClosure **)(void *)&blocked_queue_tl); + evac((StgClosure **)(void *)&sleeping_queue); #endif #endif - evac((StgClosure **)&blackhole_queue); + // evac((StgClosure **)&blackhole_queue); -#if defined(PARALLEL_HASKELL) || defined(GRAN) +#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN) markSparkQueue(evac); #endif @@ -2682,26 +2856,32 @@ GetRoots( evac_fn evac ) static void (*extra_roots)(evac_fn); +static void +performGC_(rtsBool force_major, void (*get_roots)(evac_fn)) +{ + Task *task = myTask(); + + if (task == NULL) { + ACQUIRE_LOCK(&sched_mutex); + task = newBoundTask(); + RELEASE_LOCK(&sched_mutex); + scheduleDoGC(NULL,task,force_major, get_roots); + boundTaskExiting(task); + } else { + scheduleDoGC(NULL,task,force_major, get_roots); + } +} + void performGC(void) { -#ifdef THREADED_RTS - // ToDo: we have to grab all the capabilities here. - errorBelch("performGC not supported in threaded RTS (yet)"); - stg_exit(EXIT_FAILURE); -#endif - /* Obligated to hold this lock upon entry */ - GarbageCollect(GetRoots,rtsFalse); + performGC_(rtsFalse, GetRoots); } void performMajorGC(void) { -#ifdef THREADED_RTS - errorBelch("performMayjorGC not supported in threaded RTS (yet)"); - stg_exit(EXIT_FAILURE); -#endif - GarbageCollect(GetRoots,rtsTrue); + performGC_(rtsTrue, GetRoots); } static void @@ -2714,12 +2894,8 @@ AllRoots(evac_fn evac) void performGCWithRoots(void (*get_roots)(evac_fn)) { -#ifdef THREADED_RTS - errorBelch("performGCWithRoots not supported in threaded RTS (yet)"); - stg_exit(EXIT_FAILURE); -#endif extra_roots = get_roots; - GarbageCollect(AllRoots,rtsFalse); + performGC_(rtsFalse, AllRoots); } /* ----------------------------------------------------------------------------- @@ -2764,7 +2940,7 @@ threadStackOverflow(Capability *cap, StgTSO *tso) new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */ new_stack_size = new_tso_size - TSO_STRUCT_SIZEW; - IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", tso->stack_size, new_stack_size)); + IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size)); dest = (StgTSO *)allocate(new_tso_size); TICK_ALLOC_TSO(new_stack_size,0); @@ -3483,24 +3659,32 @@ checkBlackHoles (Capability *cap) * CATCH_FRAME on the stack. In either case, we strip the entire * stack and replace the thread with a zombie. * - * ToDo: in SMP mode, this function is only safe if either (a) we hold - * all the Capabilities (eg. in GC), or (b) we own the Capability that - * the TSO is currently blocked on or on the run queue of. + * ToDo: in THREADED_RTS mode, this function is only safe if either + * (a) we hold all the Capabilities (eg. in GC, or if there is only + * one Capability), or (b) we own the Capability that the TSO is + * currently blocked on or on the run queue of. 
* * -------------------------------------------------------------------------- */ void raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception) { - raiseAsync_(cap, tso, exception, rtsFalse); + raiseAsync_(cap, tso, exception, rtsFalse, NULL); +} + +void +suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here) +{ + raiseAsync_(cap, tso, NULL, rtsFalse, stop_here); } static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, - rtsBool stop_at_atomically) + rtsBool stop_at_atomically, StgPtr stop_here) { StgRetInfoTable *info; - StgPtr sp; + StgPtr sp, frame; + nat i; // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { @@ -3513,6 +3697,9 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, // Remove it from any blocking queues unblockThread(cap,tso); + // mark it dirty; we're about to change its stack. + dirtyTSO(tso); + sp = tso->sp; // The stack freezing code assumes there's a closure pointer on @@ -3525,8 +3712,8 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, sp[0] = (W_)&stg_dummy_ret_closure; } - while (1) { - nat i; + frame = sp + 1; + while (stop_here == NULL || frame < stop_here) { // 1. Let the top of the stack be the "current closure" // @@ -3546,95 +3733,10 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, // NB: if we pass an ATOMICALLY_FRAME then abort the associated // transaction - - StgPtr frame; - - frame = sp + 1; info = get_ret_itbl((StgClosure *)frame); - - while (info->i.type != UPDATE_FRAME - && (info->i.type != CATCH_FRAME || exception == NULL) - && info->i.type != STOP_FRAME - && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse)) - { - if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) { - // IF we find an ATOMICALLY_FRAME then we abort the - // current transaction and propagate the exception. In - // this case (unlike ordinary exceptions) we do not care - // whether the transaction is valid or not because its - // possible validity cannot have caused the exception - // and will not be visible after the abort. - IF_DEBUG(stm, - debugBelch("Found atomically block delivering async exception\n")); - stmAbortTransaction(tso -> trec); - tso -> trec = stmGetEnclosingTRec(tso -> trec); - } - frame += stack_frame_sizeW((StgClosure *)frame); - info = get_ret_itbl((StgClosure *)frame); - } - + switch (info->i.type) { - - case ATOMICALLY_FRAME: - ASSERT(stop_at_atomically); - ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC); - stmCondemnTransaction(tso -> trec); -#ifdef REG_R1 - tso->sp = frame; -#else - // R1 is not a register: the return convention for IO in - // this case puts the return value on the stack, so we - // need to set up the stack to return to the atomically - // frame properly... - tso->sp = frame - 2; - tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not? - tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info; -#endif - tso->what_next = ThreadRunGHC; - return; - case CATCH_FRAME: - // If we find a CATCH_FRAME, and we've got an exception to raise, - // then build the THUNK raise(exception), and leave it on - // top of the CATCH_FRAME ready to enter. - // - { -#ifdef PROFILING - StgCatchFrame *cf = (StgCatchFrame *)frame; -#endif - StgThunk *raise; - - // we've got an exception to raise, so let's pass it to the - // handler in this frame. 
- // - raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE); - TICK_ALLOC_SE_THK(1,0); - SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); - raise->payload[0] = exception; - - // throw away the stack from Sp up to the CATCH_FRAME. - // - sp = frame - 1; - - /* Ensure that async excpetions are blocked now, so we don't get - * a surprise exception before we get around to executing the - * handler. - */ - if (tso->blocked_exceptions == NULL) { - tso->blocked_exceptions = END_TSO_QUEUE; - } - - /* Put the newly-built THUNK on top of the stack, ready to execute - * when the thread restarts. - */ - sp[0] = (W_)raise; - sp[-1] = (W_)&stg_enter_info; - tso->sp = sp-1; - tso->what_next = ThreadRunGHC; - IF_DEBUG(sanity, checkTSO(tso)); - return; - } - case UPDATE_FRAME: { StgAP_STACK * ap; @@ -3665,9 +3767,7 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, printObj((StgClosure *)ap); ); - // Replace the updatee with an indirection - happily - // this will also wake up any threads currently - // waiting on the result. + // Replace the updatee with an indirection // // Warning: if we're in a loop, more than one update frame on // the stack may point to the same object. Be careful not to @@ -3684,21 +3784,106 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, } sp += sizeofW(StgUpdateFrame) - 1; sp[0] = (W_)ap; // push onto stack - break; + frame = sp + 1; + continue; //no need to bump frame } - + case STOP_FRAME: // We've stripped the entire stack, the thread is now dead. - sp += sizeofW(StgStopFrame); tso->what_next = ThreadKilled; - tso->sp = sp; + tso->sp = frame + sizeofW(StgStopFrame); return; + + case CATCH_FRAME: + // If we find a CATCH_FRAME, and we've got an exception to raise, + // then build the THUNK raise(exception), and leave it on + // top of the CATCH_FRAME ready to enter. + // + { +#ifdef PROFILING + StgCatchFrame *cf = (StgCatchFrame *)frame; +#endif + StgThunk *raise; + + if (exception == NULL) break; + + // we've got an exception to raise, so let's pass it to the + // handler in this frame. + // + raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1); + TICK_ALLOC_SE_THK(1,0); + SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); + raise->payload[0] = exception; + + // throw away the stack from Sp up to the CATCH_FRAME. + // + sp = frame - 1; + + /* Ensure that async excpetions are blocked now, so we don't get + * a surprise exception before we get around to executing the + * handler. + */ + if (tso->blocked_exceptions == NULL) { + tso->blocked_exceptions = END_TSO_QUEUE; + } + + /* Put the newly-built THUNK on top of the stack, ready to execute + * when the thread restarts. + */ + sp[0] = (W_)raise; + sp[-1] = (W_)&stg_enter_info; + tso->sp = sp-1; + tso->what_next = ThreadRunGHC; + IF_DEBUG(sanity, checkTSO(tso)); + return; + } + + case ATOMICALLY_FRAME: + if (stop_at_atomically) { + ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC); + stmCondemnTransaction(cap, tso -> trec); +#ifdef REG_R1 + tso->sp = frame; +#else + // R1 is not a register: the return convention for IO in + // this case puts the return value on the stack, so we + // need to set up the stack to return to the atomically + // frame properly... + tso->sp = frame - 2; + tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not? + tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info; +#endif + tso->what_next = ThreadRunGHC; + return; + } + // Not stop_at_atomically... fall through and abort the + // transaction. 
+ + case CATCH_RETRY_FRAME: + // IF we find an ATOMICALLY_FRAME then we abort the + // current transaction and propagate the exception. In + // this case (unlike ordinary exceptions) we do not care + // whether the transaction is valid or not because its + // possible validity cannot have caused the exception + // and will not be visible after the abort. + IF_DEBUG(stm, + debugBelch("Found atomically block delivering async exception\n")); + StgTRecHeader *trec = tso -> trec; + StgTRecHeader *outer = stmGetEnclosingTRec(trec); + stmAbortTransaction(cap, trec); + tso -> trec = outer; + break; default: - barf("raiseAsync"); + break; } + + // move on to the next stack frame + frame += stack_frame_sizeW((StgClosure *)frame); } - barf("raiseAsync"); + + // if we got here, then we stopped at stop_here + ASSERT(stop_here != NULL); } /* ----------------------------------------------------------------------------- @@ -3758,7 +3943,7 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception) // thunks which are currently under evaluataion. // - // + // OLD COMMENT (we don't have MIN_UPD_SIZE now): // LDV profiling: stg_raise_info has THUNK as its closure // type. Since a THUNK takes at least MIN_UPD_SIZE words in its // payload, MIN_UPD_SIZE is more approprate than 1. It seems that @@ -3786,7 +3971,7 @@ raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception) // Only create raise_closure if we need to. if (raise_closure == NULL) { raise_closure = - (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE); + (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1); SET_HDR(raise_closure, &stg_raise_info, CCCS); raise_closure->payload[0] = exception; } @@ -3986,25 +4171,37 @@ printThreadBlockage(StgTSO *tso) } } -static void -printThreadStatus(StgTSO *tso) +void +printThreadStatus(StgTSO *t) { - switch (tso->what_next) { - case ThreadKilled: - debugBelch("has been killed"); - break; - case ThreadComplete: - debugBelch("has completed"); - break; - default: - printThreadBlockage(tso); - } + debugBelch("\tthread %4d @ %p ", t->id, (void *)t); + { + void *label = lookupThreadLabel(t->id); + if (label) debugBelch("[\"%s\"] ",(char *)label); + } + if (t->what_next == ThreadRelocated) { + debugBelch("has been relocated...\n"); + } else { + switch (t->what_next) { + case ThreadKilled: + debugBelch("has been killed"); + break; + case ThreadComplete: + debugBelch("has completed"); + break; + default: + printThreadBlockage(t); + } + debugBelch("\n"); + } } void printAllThreads(void) { - StgTSO *t; + StgTSO *t, *next; + nat i; + Capability *cap; # if defined(GRAN) char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; @@ -4022,20 +4219,24 @@ printAllThreads(void) debugBelch("all threads:\n"); # endif - for (t = all_threads; t != END_TSO_QUEUE; ) { - debugBelch("\tthread %4d @ %p ", t->id, (void *)t); - { - void *label = lookupThreadLabel(t->id); - if (label) debugBelch("[\"%s\"] ",(char *)label); - } - if (t->what_next == ThreadRelocated) { - debugBelch("has been relocated...\n"); - t = t->link; - } else { - printThreadStatus(t); - debugBelch("\n"); - t = t->global_link; - } + for (i = 0; i < n_capabilities; i++) { + cap = &capabilities[i]; + debugBelch("threads on capability %d:\n", cap->no); + for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) { + printThreadStatus(t); + } + } + + debugBelch("other threads:\n"); + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + if (t->why_blocked != NotBlocked) { + printThreadStatus(t); + } + if (t->what_next == 
ThreadRelocated) { + next = t->link; + } else { + next = t->global_link; + } } } @@ -4045,13 +4246,7 @@ printThreadQueue(StgTSO *t) { nat i = 0; for (; t != END_TSO_QUEUE; t = t->link) { - debugBelch("\tthread %d @ %p ", t->id, (void *)t); - if (t->what_next == ThreadRelocated) { - debugBelch("has been relocated...\n"); - } else { - printThreadStatus(t); - debugBelch("\n"); - } + printThreadStatus(t); i++; } debugBelch("%d threads on queue\n", i);
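
The largest addition in this patch is schedulePushWork(): a capability that holds more than one runnable thread (or a couple of sparks) deals the surplus out, round-robin, to any currently idle capabilities it can grab, keeping one thread per round for itself and never migrating bound threads. The following is a minimal standalone model of that dealing loop only, not the RTS code itself: the names (toy_cap, toy_tso, N_CAPS, push_work) are invented for illustration, and capability locking, spark pushing and ThreadRelocated handling are omitted.

/* A minimal, standalone model of the dealing loop in the new
 * schedulePushWork().  Everything here (toy_cap, toy_tso, N_CAPS, push_work)
 * is invented for illustration: there is no capability locking, no spark
 * pushing and no ThreadRelocated handling, just the round-robin sharing of
 * surplus threads with idle capabilities. */
#include <stdio.h>
#include <stdbool.h>
#include <stddef.h>

#define N_CAPS 4

typedef struct toy_tso {
    int id;
    bool bound;                  /* bound threads must stay where they are */
    struct toy_tso *link;
} toy_tso;

typedef struct {
    int no;
    bool idle;                   /* stands in for tryGrabCapability() succeeding */
    toy_tso *run_queue_hd;
    toy_tso *run_queue_tl;
} toy_cap;

static void append_to_run_queue(toy_cap *cap, toy_tso *t)
{
    t->link = NULL;
    if (cap->run_queue_hd == NULL) cap->run_queue_hd = t;
    else                           cap->run_queue_tl->link = t;
    cap->run_queue_tl = t;
}

static void push_work(toy_cap *cap, toy_cap caps[N_CAPS])
{
    toy_cap *free_caps[N_CAPS];
    int n_free_caps = 0;

    /* Nothing to share unless we have at least two runnable threads. */
    if (cap->run_queue_hd == NULL || cap->run_queue_hd->link == NULL) return;

    /* Collect the capabilities that are idle and have empty run queues. */
    for (int j = 0; j < N_CAPS; j++) {
        if (&caps[j] != cap && caps[j].idle && caps[j].run_queue_hd == NULL)
            free_caps[n_free_caps++] = &caps[j];
    }
    if (n_free_caps == 0) return;

    /* Keep the head of our own queue and deal the rest out round-robin. */
    toy_tso *t = cap->run_queue_hd->link;
    cap->run_queue_hd->link = NULL;
    cap->run_queue_tl = cap->run_queue_hd;

    int i = 0;
    while (t != NULL) {
        toy_tso *next = t->link;
        if (t->bound) {
            append_to_run_queue(cap, t);        /* never migrate a bound thread */
        } else if (i == n_free_caps) {
            append_to_run_queue(cap, t);        /* keep one for ourselves ...   */
            i = 0;                              /* ... then start dealing again */
        } else {
            printf("pushing thread %d to capability %d\n", t->id, free_caps[i]->no);
            append_to_run_queue(free_caps[i], t);
            i++;
        }
        t = next;
    }
}

int main(void)
{
    toy_cap caps[N_CAPS] = {
        { .no = 0, .idle = false },
        { .no = 1, .idle = true  },
        { .no = 2, .idle = true  },
        { .no = 3, .idle = false },
    };
    toy_tso ts[5] = {
        { .id = 1, .bound = false }, { .id = 2, .bound = false },
        { .id = 3, .bound = true  }, { .id = 4, .bound = false },
        { .id = 5, .bound = false },
    };
    for (int j = 0; j < 5; j++) append_to_run_queue(&caps[0], &ts[j]);

    push_work(&caps[0], caps);
    return 0;
}

One full round of dealing leaves each formerly idle capability with one thread; after that the pusher keeps every (n_free_caps + 1)-th thread for itself, which is what the pushed_to_all/i = 0 logic in the real schedulePushWork() achieves.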
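
Another recurring change is the gate at the top of scheduleDoGC() under THREADED_RTS: only the task that wins the cas() on waiting_for_gc goes on to grab every capability and run the collection, while a task that wanted a GC at the same moment yields its capability and waits for the flag to drop instead of competing. Below is a rough standalone sketch of that shape only, using C11 atomics in place of the RTS's cas(); yield_capability() and collect_garbage() are hypothetical stand-ins for yieldCapability() and the grab-all-capabilities-then-GarbageCollect() sequence shown in the diff.

/* A rough, standalone sketch of the waiting_for_gc gate in scheduleDoGC().
 * C11 atomics replace the RTS's cas(); yield_capability() and
 * collect_garbage() are invented stand-ins for the real routines. */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static atomic_bool waiting_for_gc;

static void yield_capability(void) { /* give our capability away and reschedule */ }
static void collect_garbage(void)  { puts("GC leader: grabbing all capabilities and collecting"); }

void request_gc(void)
{
    /* The first task to flip the flag becomes the GC leader. */
    bool was_waiting = atomic_exchange(&waiting_for_gc, true);

    if (was_waiting) {
        /* Someone else is already organising a GC: step aside and wait for
         * the flag to drop rather than fighting over capabilities. */
        while (atomic_load(&waiting_for_gc)) {
            yield_capability();
        }
        return;
    }

    collect_garbage();
    atomic_store(&waiting_for_gc, false);   /* let any waiters continue */
}

int main(void)
{
    request_gc();
    return 0;
}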