X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=rts%2FSchedule.c;h=8c254cc56a6d3011458d3502bee7ec0f530cf3e7;hb=66579ff945831c5fc9a17c58c722ff01f2268d76;hp=dee6a557a2f9feb1672892c9a737a594eb3c060e;hpb=b5d7276113c4eaf0dfbb8d12c80c1111d47033cd;p=ghc-hetmet.git diff --git a/rts/Schedule.c b/rts/Schedule.c index dee6a55..8c254cc 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -31,15 +31,9 @@ #include "Updates.h" #include "Proftimer.h" #include "ProfHeap.h" -#if defined(GRAN) || defined(PARALLEL_HASKELL) -# include "GranSimRts.h" -# include "GranSim.h" -# include "ParallelRts.h" -# include "Parallel.h" -# include "ParallelDebug.h" -# include "FetchMe.h" -# include "HLC.h" -#endif + +/* PARALLEL_HASKELL includes go here */ + #include "Sparks.h" #include "Capability.h" #include "Task.h" @@ -77,28 +71,6 @@ * Global variables * -------------------------------------------------------------------------- */ -#if defined(GRAN) - -StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */ -/* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */ - -/* - In GranSim we have a runnable and a blocked queue for each processor. - In order to minimise code changes new arrays run_queue_hds/tls - are created. run_queue_hd is then a short cut (macro) for - run_queue_hds[CurrentProc] (see GranSim.h). - -- HWL -*/ -StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC]; -StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC]; -StgTSO *ccalling_threadss[MAX_PROC]; -/* We use the same global list of threads (all_threads) in GranSim as in - the std RTS (i.e. we are cheating). However, we don't use this list in - the GranSim specific code at the moment (so we are only potentially - cheating). */ - -#else /* !GRAN */ - #if !defined(THREADED_RTS) // Blocked/sleeping thrads StgTSO *blocked_queue_hd = NULL; @@ -110,7 +82,6 @@ StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table? * LOCK: sched_mutex+capability, or all capabilities */ StgTSO *blackhole_queue = NULL; -#endif /* The blackhole_queue should be checked for threads to wake up. See * Schedule.h for more thorough comment. @@ -118,17 +89,6 @@ StgTSO *blackhole_queue = NULL; */ rtsBool blackholes_need_checking = rtsFalse; -/* Linked list of all threads. - * Used for detecting garbage collected threads. - * LOCK: sched_mutex+capability, or all capabilities - */ -StgTSO *all_threads = NULL; - -/* flag set by signal handler to precipitate a context switch - * LOCK: none (just an advisory flag) - */ -int context_switch = 0; - /* flag that tracks whether we have done any execution in this time slice. * LOCK: currently none, perhaps we should lock (but needs to be * updated in the fast path of the scheduler). @@ -140,10 +100,6 @@ nat recent_activity = ACTIVITY_YES; */ rtsBool sched_state = SCHED_RUNNING; -#if defined(GRAN) -StgTSO *CurrentTSO; -#endif - /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually * exists - earlier gccs apparently didn't. * -= chak @@ -165,12 +121,6 @@ rtsBool shutting_down_scheduler = rtsFalse; Mutex sched_mutex; #endif -#if defined(PARALLEL_HASKELL) -StgTSO *LastTSO; -rtsTime TimeOfLastYield; -rtsBool emitSchedule = rtsTrue; -#endif - #if !defined(mingw32_HOST_OS) #define FORKPROCESS_PRIMOP_SUPPORTED #endif @@ -195,18 +145,12 @@ static void scheduleCheckBlockedThreads (Capability *cap); static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS); static void scheduleCheckBlackHoles (Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); -#if defined(GRAN) -static StgTSO *scheduleProcessEvent(rtsEvent *event); -#endif #if defined(PARALLEL_HASKELL) -static StgTSO *scheduleSendPendingMessages(void); -static void scheduleActivateSpark(void); -static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish); -#endif -#if defined(PAR) || defined(GRAN) -static void scheduleGranParReport(void); +static rtsBool scheduleGetRemoteWork(Capability *cap); +static void scheduleSendPendingMessages(void); +static void scheduleActivateSpark(Capability *cap); #endif -static void schedulePostRunThread(void); +static void schedulePostRunThread(StgTSO *t); static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ); static void scheduleHandleStackOverflow( Capability *cap, Task *task, StgTSO *t); @@ -222,6 +166,7 @@ static Capability *scheduleDoGC(Capability *cap, Task *task, static rtsBool checkBlackHoles(Capability *cap); static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso); +static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso); static void deleteThread (Capability *cap, StgTSO *tso); static void deleteAllThreads (Capability *cap); @@ -230,11 +175,6 @@ static void deleteAllThreads (Capability *cap); static void deleteThread_(Capability *cap, StgTSO *tso); #endif -#if defined(PARALLEL_HASKELL) -StgTSO * createSparkThread(rtsSpark spark); -StgTSO * activateSpark (rtsSpark spark); -#endif - #ifdef DEBUG static char *whatNext_strs[] = { "(unknown)", @@ -284,6 +224,7 @@ addToRunQueue( Capability *cap, StgTSO *t ) This revolves around the global event queue, which determines what to do next. Therefore, it's more complicated than either the concurrent or the parallel (GUM) setup. + This version has been entirely removed (JB 2008/08). GUM version: GUM iterates over incoming messages. @@ -294,6 +235,12 @@ addToRunQueue( Capability *cap, StgTSO *t ) (see PendingFetches). This is not the ugliest code you could imagine, but it's bloody close. + (JB 2008/08) This version was formerly indicated by a PP-Flag PAR, + now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it, + as well as future GUM versions. This file has been refurbished to + only contain valid code, which is however incomplete, refers to + invalid includes etc. + ------------------------------------------------------------------------ */ static Capability * @@ -302,15 +249,8 @@ schedule (Capability *initialCapability, Task *task) StgTSO *t; Capability *cap; StgThreadReturnCode ret; -#if defined(GRAN) - rtsEvent *event; -#elif defined(PARALLEL_HASKELL) - StgTSO *tso; - GlobalTaskId pe; +#if defined(PARALLEL_HASKELL) rtsBool receivedFinish = rtsFalse; -# if defined(DEBUG) - nat tp_size, sp_size; // stats only -# endif #endif nat prev_what_next; rtsBool ready_to_gc; @@ -335,20 +275,12 @@ schedule (Capability *initialCapability, Task *task) #if defined(PARALLEL_HASKELL) #define TERMINATION_CONDITION (!receivedFinish) -#elif defined(GRAN) -#define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL) #else #define TERMINATION_CONDITION rtsTrue #endif while (TERMINATION_CONDITION) { -#if defined(GRAN) - /* Choose the processor with the next event */ - CurrentProc = event->proc; - CurrentTSO = event->tso; -#endif - #if defined(THREADED_RTS) if (first) { // don't yield the first time, we want a chance to run this @@ -461,6 +393,45 @@ schedule (Capability *initialCapability, Task *task) scheduleCheckBlockedThreads(cap); +#if defined(PARALLEL_HASKELL) + /* message processing and work distribution goes here */ + + /* if messages have been buffered... a NOOP in THREADED_RTS */ + scheduleSendPendingMessages(); + + /* If the run queue is empty,...*/ + if (emptyRunQueue(cap)) { + /* ...take one of our own sparks and turn it into a thread */ + scheduleActivateSpark(cap); + + /* if this did not work, try to steal a spark from someone else */ + if (emptyRunQueue(cap)) { + receivedFinish = scheduleGetRemoteWork(cap); + continue; // a new round, (hopefully) with new work + /* + in GUM, this a) sends out a FISH and returns IF no fish is + out already + b) (blocking) awaits and receives messages + + in Eden, this is only the blocking receive, as b) in GUM. + */ + } + } + + /* since we perform a blocking receive and continue otherwise, + either we never reach here or we definitely have work! */ + // from here: non-empty run queue + ASSERT(!emptyRunQueue(cap)); + + if (PacketsWaiting()) { /* now process incoming messages, if any + pending... + + CAUTION: scheduleGetRemoteWork called + above, waits for messages as well! */ + processMessages(cap, &receivedFinish); + } +#endif // PARALLEL_HASKELL + scheduleDetectDeadlock(cap,task); #if defined(THREADED_RTS) cap = task->cap; // reload cap, it might have changed @@ -481,47 +452,14 @@ schedule (Capability *initialCapability, Task *task) continue; // nothing to do } -#if defined(PARALLEL_HASKELL) - scheduleSendPendingMessages(); - if (emptyRunQueue(cap) && scheduleActivateSpark()) - continue; - -#if defined(SPARKS) - ASSERT(next_fish_to_send_at==0); // i.e. no delayed fishes left! -#endif - - /* If we still have no work we need to send a FISH to get a spark - from another PE */ - if (emptyRunQueue(cap)) { - if (!scheduleGetRemoteWork(&receivedFinish)) continue; - ASSERT(rtsFalse); // should not happen at the moment - } - // from here: non-empty run queue. - // TODO: merge above case with this, only one call processMessages() ! - if (PacketsWaiting()) { /* process incoming messages, if - any pending... only in else - because getRemoteWork waits for - messages as well */ - receivedFinish = processMessages(); - } -#endif - -#if defined(GRAN) - scheduleProcessEvent(event); -#endif - // // Get a thread to run // t = popRunQueue(cap); -#if defined(GRAN) || defined(PAR) - scheduleGranParReport(); // some kind of debuging output -#else // Sanity check the thread we're about to run. This can be // expensive if there is lots of thread switching going on... IF_DEBUG(sanity,checkTSO(t)); -#endif #if defined(THREADED_RTS) // Check whether we can run this thread in the current task. @@ -555,19 +493,22 @@ schedule (Capability *initialCapability, Task *task) } #endif - cap->r.rCurrentTSO = t; - /* context switches are initiated by the timer signal, unless * the user specified "context switch as often as possible", with * +RTS -C0 */ if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0 && !emptyThreadQueues(cap)) { - context_switch = 1; + cap->context_switch = 1; } run_thread: + // CurrentTSO is the thread to run. t might be different if we + // loop back to run_thread, so make sure to set CurrentTSO after + // that. + cap->r.rCurrentTSO = t; + debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", (long)t->id, whatNext_strs[t->what_next]); @@ -591,14 +532,14 @@ run_thread: cap->in_haskell = rtsTrue; - dirtyTSO(t); + dirty_TSO(cap,t); #if defined(THREADED_RTS) if (recent_activity == ACTIVITY_DONE_GC) { // ACTIVITY_DONE_GC means we turned off the timer signal to // conserve power (see #1623). Re-enable it here. nat prev; - prev = xchg(&recent_activity, ACTIVITY_YES); + prev = xchg((P_)&recent_activity, ACTIVITY_YES); if (prev == ACTIVITY_DONE_GC) { startTimer(); } @@ -681,7 +622,9 @@ run_thread: CCCS = CCS_SYSTEM; #endif - schedulePostRunThread(); + schedulePostRunThread(t); + + t = threadStackUnderflow(task,t); ready_to_gc = rtsFalse; @@ -718,9 +661,6 @@ run_thread: cap = scheduleDoGC(cap,task,rtsFalse); } } /* end of while() */ - - debugTrace(PAR_DEBUG_verbose, - "== Leaving schedule() after having received Finish"); } /* ---------------------------------------------------------------------------- @@ -730,23 +670,7 @@ run_thread: static void schedulePreLoop(void) { -#if defined(GRAN) - /* set up first event to get things going */ - /* ToDo: assign costs for system setup and init MainTSO ! */ - new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc], - ContinueThread, - CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL); - - debugTrace (DEBUG_gran, - "GRAN: Init CurrentTSO (in schedule) = %p", - CurrentTSO); - IF_DEBUG(gran, G_TSO(CurrentTSO, 5)); - - if (RtsFlags.GranFlags.Light) { - /* Save current time; GranSim Light only */ - CurrentTSO->gran.clock = CurrentTime[CurrentProc]; - } -#endif + // initialisation for scheduler - what cannot go into initScheduler() } /* ----------------------------------------------------------------------------- @@ -768,7 +692,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, // Check whether we have more threads on our run queue, or sparks // in our pool, that we could hand to another Capability. - if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) + if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE) && sparkPoolSizeCap(cap) < 2) { return; } @@ -809,21 +733,21 @@ schedulePushWork(Capability *cap USED_IF_THREADS, if (cap->run_queue_hd != END_TSO_QUEUE) { prev = cap->run_queue_hd; - t = prev->link; - prev->link = END_TSO_QUEUE; + t = prev->_link; + prev->_link = END_TSO_QUEUE; for (; t != END_TSO_QUEUE; t = next) { - next = t->link; - t->link = END_TSO_QUEUE; + next = t->_link; + t->_link = END_TSO_QUEUE; if (t->what_next == ThreadRelocated || t->bound == task // don't move my bound thread || tsoLocked(t)) { // don't move a locked thread - prev->link = t; + setTSOLink(cap, prev, t); prev = t; } else if (i == n_free_caps) { pushed_to_all = rtsTrue; i = 0; // keep one for us - prev->link = t; + setTSOLink(cap, prev, t); prev = t; } else { debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no); @@ -919,7 +843,7 @@ scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS) cap->run_queue_hd = cap->wakeup_queue_hd; cap->run_queue_tl = cap->wakeup_queue_tl; } else { - cap->run_queue_tl->link = cap->wakeup_queue_hd; + setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd); cap->run_queue_tl = cap->wakeup_queue_tl; } cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE; @@ -1008,6 +932,8 @@ scheduleDetectDeadlock (Capability *cap, Task *task) // either we have threads to run, or we were interrupted: ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING); + + return; } #endif @@ -1022,7 +948,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task) case BlockedOnException: case BlockedOnMVar: throwToSingleThreaded(cap, task->tso, - (StgClosure *)NonTermination_closure); + (StgClosure *)nonTermination_closure); return; default: barf("deadlock: main thread blocked in a strange way"); @@ -1033,152 +959,6 @@ scheduleDetectDeadlock (Capability *cap, Task *task) } } -/* ---------------------------------------------------------------------------- - * Process an event (GRAN only) - * ------------------------------------------------------------------------- */ - -#if defined(GRAN) -static StgTSO * -scheduleProcessEvent(rtsEvent *event) -{ - StgTSO *t; - - if (RtsFlags.GranFlags.Light) - GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc - - /* adjust time based on time-stamp */ - if (event->time > CurrentTime[CurrentProc] && - event->evttype != ContinueThread) - CurrentTime[CurrentProc] = event->time; - - /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */ - if (!RtsFlags.GranFlags.Light) - handleIdlePEs(); - - IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n")); - - /* main event dispatcher in GranSim */ - switch (event->evttype) { - /* Should just be continuing execution */ - case ContinueThread: - IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n")); - /* ToDo: check assertion - ASSERT(run_queue_hd != (StgTSO*)NULL && - run_queue_hd != END_TSO_QUEUE); - */ - /* Ignore ContinueThreads for fetching threads (if synchr comm) */ - if (!RtsFlags.GranFlags.DoAsyncFetch && - procStatus[CurrentProc]==Fetching) { - debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n", - CurrentTSO->id, CurrentTSO, CurrentProc); - goto next_thread; - } - /* Ignore ContinueThreads for completed threads */ - if (CurrentTSO->what_next == ThreadComplete) { - debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", - CurrentTSO->id, CurrentTSO, CurrentProc); - goto next_thread; - } - /* Ignore ContinueThreads for threads that are being migrated */ - if (PROCS(CurrentTSO)==Nowhere) { - debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n", - CurrentTSO->id, CurrentTSO, CurrentProc); - goto next_thread; - } - /* The thread should be at the beginning of the run queue */ - if (CurrentTSO!=run_queue_hds[CurrentProc]) { - debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n", - CurrentTSO->id, CurrentTSO, CurrentProc); - break; // run the thread anyway - } - /* - new_event(proc, proc, CurrentTime[proc], - FindWork, - (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL); - goto next_thread; - */ /* Catches superfluous CONTINUEs -- should be unnecessary */ - break; // now actually run the thread; DaH Qu'vam yImuHbej - - case FetchNode: - do_the_fetchnode(event); - goto next_thread; /* handle next event in event queue */ - - case GlobalBlock: - do_the_globalblock(event); - goto next_thread; /* handle next event in event queue */ - - case FetchReply: - do_the_fetchreply(event); - goto next_thread; /* handle next event in event queue */ - - case UnblockThread: /* Move from the blocked queue to the tail of */ - do_the_unblock(event); - goto next_thread; /* handle next event in event queue */ - - case ResumeThread: /* Move from the blocked queue to the tail of */ - /* the runnable queue ( i.e. Qu' SImqa'lu') */ - event->tso->gran.blocktime += - CurrentTime[CurrentProc] - event->tso->gran.blockedat; - do_the_startthread(event); - goto next_thread; /* handle next event in event queue */ - - case StartThread: - do_the_startthread(event); - goto next_thread; /* handle next event in event queue */ - - case MoveThread: - do_the_movethread(event); - goto next_thread; /* handle next event in event queue */ - - case MoveSpark: - do_the_movespark(event); - goto next_thread; /* handle next event in event queue */ - - case FindWork: - do_the_findwork(event); - goto next_thread; /* handle next event in event queue */ - - default: - barf("Illegal event type %u\n", event->evttype); - } /* switch */ - - /* This point was scheduler_loop in the old RTS */ - - IF_DEBUG(gran, debugBelch("GRAN: after main switch\n")); - - TimeOfLastEvent = CurrentTime[CurrentProc]; - TimeOfNextEvent = get_time_of_next_event(); - IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK - // CurrentTSO = ThreadQueueHd; - - IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", - TimeOfNextEvent)); - - if (RtsFlags.GranFlags.Light) - GranSimLight_leave_system(event, &ActiveTSO); - - EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice; - - IF_DEBUG(gran, - debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice)); - - /* in a GranSim setup the TSO stays on the run queue */ - t = CurrentTSO; - /* Take a thread from the run queue. */ - POP_RUN_QUEUE(t); // take_off_run_queue(t); - - IF_DEBUG(gran, - debugBelch("GRAN: About to run current thread, which is\n"); - G_TSO(t,5)); - - context_switch = 0; // turned on via GranYield, checking events and time slice - - IF_DEBUG(gran, - DumpGranEvent(GR_SCHEDULE, t)); - - procStatus[CurrentProc] = Busy; -} -#endif // GRAN /* ---------------------------------------------------------------------------- * Send pending messages (PARALLEL_HASKELL only) @@ -1188,9 +968,6 @@ scheduleProcessEvent(rtsEvent *event) static StgTSO * scheduleSendPendingMessages(void) { - StgSparkPool *pool; - rtsSpark spark; - StgTSO *t; # if defined(PAR) // global Mem.Mgmt., omit for now if (PendingFetches != END_BF_QUEUE) { @@ -1212,57 +989,28 @@ scheduleSendPendingMessages(void) #if defined(PARALLEL_HASKELL) static void -scheduleActivateSpark(void) +scheduleActivateSpark(Capability *cap) { -#if defined(SPARKS) - ASSERT(emptyRunQueue()); -/* We get here if the run queue is empty and want some work. - We try to turn a spark into a thread, and add it to the run queue, - from where it will be picked up in the next iteration of the scheduler - loop. -*/ - - /* :-[ no local threads => look out for local sparks */ - /* the spark pool for the current PE */ - pool = &(cap.r.rSparks); // JB: cap = (old) MainCap - if (advisory_thread_count < RtsFlags.ParFlags.maxThreads && - pool->hd < pool->tl) { - /* - * ToDo: add GC code check that we really have enough heap afterwards!! - * Old comment: - * If we're here (no runnable threads) and we have pending - * sparks, we must have a space problem. Get enough space - * to turn one of those pending sparks into a - * thread... - */ + StgClosure *spark; - spark = findSpark(rtsFalse); /* get a spark */ - if (spark != (rtsSpark) NULL) { - tso = createThreadFromSpark(spark); /* turn the spark into a thread */ - IF_PAR_DEBUG(fish, // schedule, - debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n", - tso->id, tso, advisory_thread_count)); - - if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */ - IF_PAR_DEBUG(fish, // schedule, - debugBelch("==^^ failed to create thread from spark @ %lx\n", - spark)); - return rtsFalse; /* failed to generate a thread */ - } /* otherwise fall through & pick-up new tso */ - } else { - IF_PAR_DEBUG(fish, // schedule, - debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", - spark_queue_len(pool))); - return rtsFalse; /* failed to generate a thread */ - } - return rtsTrue; /* success in generating a thread */ - } else { /* no more threads permitted or pool empty */ - return rtsFalse; /* failed to generateThread */ - } -#else - tso = NULL; // avoid compiler warning only - return rtsFalse; /* dummy in non-PAR setup */ -#endif // SPARKS +/* We only want to stay here if the run queue is empty and we want some + work. We try to turn a spark into a thread, and add it to the run + queue, from where it will be picked up in the next iteration of the + scheduler loop. +*/ + if (!emptyRunQueue(cap)) + /* In the threaded RTS, another task might have pushed a thread + on our run queue in the meantime ? But would need a lock.. */ + return; + + spark = findSpark(cap); // defined in Sparks.c + + if (spark != NULL) { + debugTrace(DEBUG_sched, + "turning spark of closure %p into a thread", + (StgClosure *)spark); + createSparkThread(cap,spark); // defined in Sparks.c + } } #endif // PARALLEL_HASKELL @@ -1272,10 +1020,12 @@ scheduleActivateSpark(void) #if defined(PARALLEL_HASKELL) static rtsBool -scheduleGetRemoteWork(rtsBool *receivedFinish) +scheduleGetRemoteWork(Capability *cap) { - ASSERT(emptyRunQueue()); +#if defined(PARALLEL_HASKELL) + rtsBool receivedFinish = rtsFalse; + // idle() , i.e. send all buffers, wait for work if (RtsFlags.ParFlags.BufferTime) { IF_PAR_DEBUG(verbose, debugBelch("...send all pending data,")); @@ -1285,261 +1035,64 @@ scheduleGetRemoteWork(rtsBool *receivedFinish) sendImmediately(i); // send all messages away immediately } } -# ifndef SPARKS - //++EDEN++ idle() , i.e. send all buffers, wait for work - // suppress fishing in EDEN... just look for incoming messages - // (blocking receive) - IF_PAR_DEBUG(verbose, - debugBelch("...wait for incoming messages...\n")); - *receivedFinish = processMessages(); // blocking receive... - - // and reenter scheduling loop after having received something - // (return rtsFalse below) - -# else /* activate SPARKS machinery */ -/* We get here, if we have no work, tried to activate a local spark, but still - have no work. We try to get a remote spark, by sending a FISH message. - Thread migration should be added here, and triggered when a sequence of - fishes returns without work. */ - delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll); - - /* =8-[ no local sparks => look for work on other PEs */ - /* - * We really have absolutely no work. Send out a fish - * (there may be some out there already), and wait for - * something to arrive. We clearly can't run any threads - * until a SCHEDULE or RESUME arrives, and so that's what - * we're hoping to see. (Of course, we still have to - * respond to other types of messages.) - */ - rtsTime now = msTime() /*CURRENT_TIME*/; - IF_PAR_DEBUG(verbose, - debugBelch("-- now=%ld\n", now)); - IF_PAR_DEBUG(fish, // verbose, - if (outstandingFishes < RtsFlags.ParFlags.maxFishes && - (last_fish_arrived_at!=0 && - last_fish_arrived_at+delay > now)) { - debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n", - now, last_fish_arrived_at+delay, - last_fish_arrived_at, - delay); - }); - - if (outstandingFishes < RtsFlags.ParFlags.maxFishes && - advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when? - if (last_fish_arrived_at==0 || - (last_fish_arrived_at+delay <= now)) { // send FISH now! - /* outstandingFishes is set in sendFish, processFish; - avoid flooding system with fishes via delay */ - next_fish_to_send_at = 0; - } else { - /* ToDo: this should be done in the main scheduling loop to avoid the - busy wait here; not so bad if fish delay is very small */ - int iq = 0; // DEBUGGING -- HWL - next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send - /* send a fish when ready, but process messages that arrive in the meantime */ - do { - if (PacketsWaiting()) { - iq++; // DEBUGGING - *receivedFinish = processMessages(); - } - now = msTime(); - } while (!*receivedFinish || now sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count)); - - } - - // JB: IMHO, this should all be hidden inside sendFish(...) - /* pe = choosePE(); - sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, - NEW_FISH_HUNGER); - // Global statistics: count no. of fishes - if (RtsFlags.ParFlags.ParStats.Global && - RtsFlags.GcFlags.giveStats > NO_GC_STATS) { - globalParStats.tot_fish_mess++; - } - */ + /* this would be the place for fishing in GUM... - /* delayed fishes must have been sent by now! */ - next_fish_to_send_at = 0; - } - - *receivedFinish = processMessages(); -# endif /* SPARKS */ - - return rtsFalse; - /* NB: this function always returns rtsFalse, meaning the scheduler - loop continues with the next iteration; - rationale: - return code means success in finding work; we enter this function - if there is no local work, thus have to send a fish which takes - time until it arrives with work; in the meantime we should process - messages in the main loop; - */ -} -#endif // PARALLEL_HASKELL + if (no-earlier-fish-around) + sendFish(choosePe()); + */ -/* ---------------------------------------------------------------------------- - * PAR/GRAN: Report stats & debugging info(?) - * ------------------------------------------------------------------------- */ + // Eden:just look for incoming messages (blocking receive) + IF_PAR_DEBUG(verbose, + debugBelch("...wait for incoming messages...\n")); + processMessages(cap, &receivedFinish); // blocking receive... -#if defined(PAR) || defined(GRAN) -static void -scheduleGranParReport(void) -{ - ASSERT(run_queue_hd != END_TSO_QUEUE); - /* Take a thread from the run queue, if we have work */ - POP_RUN_QUEUE(t); // take_off_run_queue(END_TSO_QUEUE); + return receivedFinish; + // reenter scheduling look after having received something - /* If this TSO has got its outport closed in the meantime, - * it mustn't be run. Instead, we have to clean it up as if it was finished. - * It has to be marked as TH_DEAD for this purpose. - * If it is TH_TERM instead, it is supposed to have finished in the normal way. +#else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */ -JB: TODO: investigate wether state change field could be nuked - entirely and replaced by the normal tso state (whatnext - field). All we want to do is to kill tsos from outside. - */ + return rtsFalse; /* return value unused in THREADED_RTS */ - /* ToDo: write something to the log-file - if (RTSflags.ParFlags.granSimStats && !sameThread) - DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd); - - CurrentTSO = t; - */ - /* the spark pool for the current PE */ - pool = &(cap.r.rSparks); // cap = (old) MainCap - - IF_DEBUG(scheduler, - debugBelch("--=^ %d threads, %d sparks on [%#x]\n", - run_queue_len(), spark_queue_len(pool), CURRENT_PROC)); - - IF_PAR_DEBUG(fish, - debugBelch("--=^ %d threads, %d sparks on [%#x]\n", - run_queue_len(), spark_queue_len(pool), CURRENT_PROC)); - - if (RtsFlags.ParFlags.ParStats.Full && - (t->par.sparkname != (StgInt)0) && // only log spark generated threads - (emitSchedule || // forced emit - (t && LastTSO && t->id != LastTSO->id))) { - /* - we are running a different TSO, so write a schedule event to log file - NB: If we use fair scheduling we also have to write a deschedule - event for LastTSO; with unfair scheduling we know that the - previous tso has blocked whenever we switch to another tso, so - we don't need it in GUM for now - */ - IF_PAR_DEBUG(fish, // schedule, - debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname)); - - DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, - GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0); - emitSchedule = rtsFalse; - } -} -#endif +#endif /* PARALLEL_HASKELL */ +} +#endif // PARALLEL_HASKELL /* ---------------------------------------------------------------------------- * After running a thread... * ------------------------------------------------------------------------- */ static void -schedulePostRunThread(void) +schedulePostRunThread (StgTSO *t) { -#if defined(PAR) - /* HACK 675: if the last thread didn't yield, make sure to print a - SCHEDULE event to the log file when StgRunning the next thread, even - if it is the same one as before */ - LastTSO = t; - TimeOfLastYield = CURRENT_TIME; -#endif + // We have to be able to catch transactions that are in an + // infinite loop as a result of seeing an inconsistent view of + // memory, e.g. + // + // atomically $ do + // [a,b] <- mapM readTVar [ta,tb] + // when (a == b) loop + // + // and a is never equal to b given a consistent view of memory. + // + if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) { + if (!stmValidateNestOfTransactions (t -> trec)) { + debugTrace(DEBUG_sched | DEBUG_stm, + "trec %p found wasting its time", t); + + // strip the stack back to the + // ATOMICALLY_FRAME, aborting the (nested) + // transaction, and saving the stack of any + // partially-evaluated thunks on the heap. + throwToSingleThreaded_(&capabilities[0], t, + NULL, rtsTrue, NULL); + + ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); + } + } /* some statistics gathering in the parallel case */ - -#if defined(GRAN) || defined(PAR) || defined(EDEN) - switch (ret) { - case HeapOverflow: -# if defined(GRAN) - IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t)); - globalGranStats.tot_heapover++; -# elif defined(PAR) - globalParStats.tot_heapover++; -# endif - break; - - case StackOverflow: -# if defined(GRAN) - IF_DEBUG(gran, - DumpGranEvent(GR_DESCHEDULE, t)); - globalGranStats.tot_stackover++; -# elif defined(PAR) - // IF_DEBUG(par, - // DumpGranEvent(GR_DESCHEDULE, t); - globalParStats.tot_stackover++; -# endif - break; - - case ThreadYielding: -# if defined(GRAN) - IF_DEBUG(gran, - DumpGranEvent(GR_DESCHEDULE, t)); - globalGranStats.tot_yields++; -# elif defined(PAR) - // IF_DEBUG(par, - // DumpGranEvent(GR_DESCHEDULE, t); - globalParStats.tot_yields++; -# endif - break; - - case ThreadBlocked: -# if defined(GRAN) - debugTrace(DEBUG_sched, - "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", - t->id, t, whatNext_strs[t->what_next], t->block_info.closure, - (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure))); - if (t->block_info.closure!=(StgClosure*)NULL) - print_bq(t->block_info.closure); - debugBelch("\n")); - - // ??? needed; should emit block before - IF_DEBUG(gran, - DumpGranEvent(GR_DESCHEDULE, t)); - prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t - /* - ngoq Dogh! - ASSERT(procStatus[CurrentProc]==Busy || - ((procStatus[CurrentProc]==Fetching) && - (t->block_info.closure!=(StgClosure*)NULL))); - if (run_queue_hds[CurrentProc] == END_TSO_QUEUE && - !(!RtsFlags.GranFlags.DoAsyncFetch && - procStatus[CurrentProc]==Fetching)) - procStatus[CurrentProc] = Idle; - */ -# elif defined(PAR) -//++PAR++ blockThread() writes the event (change?) -# endif - break; - - case ThreadFinished: - break; - - default: - barf("parGlobalStats: unknown return code"); - break; - } -#endif } /* ----------------------------------------------------------------------------- @@ -1618,23 +1171,19 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) } debugTrace(DEBUG_sched, - "--<< thread %ld (%s) stopped: HeapOverflow\n", + "--<< thread %ld (%s) stopped: HeapOverflow", (long)t->id, whatNext_strs[t->what_next]); -#if defined(GRAN) - ASSERT(!is_on_queue(t,CurrentProc)); -#elif defined(PARALLEL_HASKELL) - /* Currently we emit a DESCHEDULE event before GC in GUM. - ToDo: either add separate event to distinguish SYSTEM time from rest - or just nuke this DESCHEDULE (and the following SCHEDULE) */ - if (0 && RtsFlags.ParFlags.ParStats.Full) { - DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, - GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0); - emitSchedule = rtsTrue; + if (cap->context_switch) { + // Sometimes we miss a context switch, e.g. when calling + // primitives in a tight loop, MAYBE_GC() doesn't check the + // context switch flag, and we end up waiting for a GC. + // See #1984, and concurrent/should_run/1984 + cap->context_switch = 0; + addToRunQueue(cap,t); + } else { + pushOnRunQueue(cap,t); } -#endif - - pushOnRunQueue(cap,t); return rtsTrue; /* actual GC is done at the end of the while loop in schedule() */ } @@ -1680,7 +1229,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ) // the CPU because the tick always arrives during GC). This way // penalises threads that do a lot of allocation, but that seems // better than the alternative. - context_switch = 0; + cap->context_switch = 0; /* put the thread back on the run queue. Then, if we're ready to * GC, check whether this is the last task to stop. If so, wake @@ -1702,7 +1251,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ) IF_DEBUG(sanity, //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id); checkTSO(t)); - ASSERT(t->link == END_TSO_QUEUE); + ASSERT(t->_link == END_TSO_QUEUE); // Shortcut if we're just switching evaluators: don't bother // doing stack squeezing (which can be expensive), just run the @@ -1710,28 +1259,9 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ) if (t->what_next != prev_what_next) { return rtsTrue; } - -#if defined(GRAN) - ASSERT(!is_on_queue(t,CurrentProc)); - - IF_DEBUG(sanity, - //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs)."); - checkThreadQsSanity(rtsTrue)); - -#endif addToRunQueue(cap,t); -#if defined(GRAN) - /* add a ContinueThread event to actually process the thread */ - new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc], - ContinueThread, - t, (StgClosure*)NULL, (rtsSpark*)NULL); - IF_GRAN_DEBUG(bq, - debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n"); - G_EVENTQ(0); - G_CURR_THREADQ(0)); -#endif return rtsFalse; } @@ -1746,42 +1276,6 @@ scheduleHandleThreadBlocked( StgTSO *t #endif ) { -#if defined(GRAN) - IF_DEBUG(scheduler, - debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", - t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure))); - if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure)); - - // ??? needed; should emit block before - IF_DEBUG(gran, - DumpGranEvent(GR_DESCHEDULE, t)); - prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t - /* - ngoq Dogh! - ASSERT(procStatus[CurrentProc]==Busy || - ((procStatus[CurrentProc]==Fetching) && - (t->block_info.closure!=(StgClosure*)NULL))); - if (run_queue_hds[CurrentProc] == END_TSO_QUEUE && - !(!RtsFlags.GranFlags.DoAsyncFetch && - procStatus[CurrentProc]==Fetching)) - procStatus[CurrentProc] = Idle; - */ -#elif defined(PAR) - IF_DEBUG(scheduler, - debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", - t->id, t, whatNext_strs[t->what_next], t->block_info.closure)); - IF_PAR_DEBUG(bq, - - if (t->block_info.closure!=(StgClosure*)NULL) - print_bq(t->block_info.closure)); - - /* Send a fetch (if BlockedOnGA) and dump event to log file */ - blockThread(t); - - /* whatever we schedule next, we must log that schedule */ - emitSchedule = rtsTrue; - -#else /* !GRAN */ // We don't need to do anything. The thread is blocked, and it // has tidied up its stack and placed itself on whatever queue @@ -1804,12 +1298,6 @@ scheduleHandleThreadBlocked( StgTSO *t debugTraceEnd(); } #endif - - /* Only for dumping event to log file - ToDo: do I need this in GranSim, too? - blockThread(t); - */ -#endif } /* ----------------------------------------------------------------------------- @@ -1828,48 +1316,6 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", (unsigned long)t->id, whatNext_strs[t->what_next]); -#if defined(GRAN) - endThread(t, CurrentProc); // clean-up the thread -#elif defined(PARALLEL_HASKELL) - /* For now all are advisory -- HWL */ - //if(t->priority==AdvisoryPriority) ?? - advisory_thread_count--; // JB: Caution with this counter, buggy! - -# if defined(DIST) - if(t->dist.priority==RevalPriority) - FinishReval(t); -# endif - -# if defined(EDENOLD) - // the thread could still have an outport... (BUG) - if (t->eden.outport != -1) { - // delete the outport for the tso which has finished... - IF_PAR_DEBUG(eden_ports, - debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n", - t->eden.outport, t->id)); - deleteOPT(t); - } - // thread still in the process (HEAVY BUG! since outport has just been closed...) - if (t->eden.epid != -1) { - IF_PAR_DEBUG(eden_ports, - debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n", - t->id, t->eden.epid)); - removeTSOfromProcess(t); - } -# endif - -# if defined(PAR) - if (RtsFlags.ParFlags.ParStats.Full && - !RtsFlags.ParFlags.ParStats.Suppressed) - DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */); - - // t->par only contains statistics: left out for now... - IF_PAR_DEBUG(fish, - debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n", - t->id,t,t->par.sparkname)); -# endif -#endif // PARALLEL_HASKELL - // // Check whether the thread that just completed was a bound // thread, and if so return with the result. @@ -1889,7 +1335,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) // point where we can deal with this. Leaving it on the run // queue also ensures that the garbage collector knows about // this thread and its return value (it gets dropped from the - // all_threads list so there's no other way to find it). + // step->threads list so there's no other way to find it). appendToRunQueue(cap,t); return rtsFalse; #else @@ -1951,10 +1397,10 @@ scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED ) static Capability * scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) { - StgTSO *t; rtsBool heap_census; #ifdef THREADED_RTS - static volatile StgWord waiting_for_gc; + /* extern static volatile StgWord waiting_for_gc; + lives inside capability.c */ rtsBool was_waiting; nat i; #endif @@ -1971,6 +1417,10 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) // the other tasks to sleep and stay asleep. // + /* Other capabilities are prevented from running yet more Haskell + threads if waiting_for_gc is set. Tested inside + yieldCapability() and releaseCapability() in Capability.c */ + was_waiting = cas(&waiting_for_gc, 0, 1); if (was_waiting) { do { @@ -1980,6 +1430,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) return cap; // NOTE: task->cap might have changed here } + setContextSwitches(); for (i=0; i < n_capabilities; i++) { debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities); if (cap != &capabilities[i]) { @@ -1990,7 +1441,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) // all the Capabilities, but even so it's a slightly // unsavoury invariant. task->cap = pcap; - context_switch = 1; waitForReturnCapability(&pcap, task); if (pcap != &capabilities[i]) { barf("scheduleDoGC: got the wrong capability"); @@ -2001,51 +1451,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) waiting_for_gc = rtsFalse; #endif - /* Kick any transactions which are invalid back to their - * atomically frames. When next scheduled they will try to - * commit, this commit will fail and they will retry. - */ - { - StgTSO *next; - - for (t = all_threads; t != END_TSO_QUEUE; t = next) { - if (t->what_next == ThreadRelocated) { - next = t->link; - } else { - next = t->global_link; - - // This is a good place to check for blocked - // exceptions. It might be the case that a thread is - // blocked on delivering an exception to a thread that - // is also blocked - we try to ensure that this - // doesn't happen in throwTo(), but it's too hard (or - // impossible) to close all the race holes, so we - // accept that some might get through and deal with - // them here. A GC will always happen at some point, - // even if the system is otherwise deadlocked. - maybePerformBlockedException (&capabilities[0], t); - - if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) { - if (!stmValidateNestOfTransactions (t -> trec)) { - debugTrace(DEBUG_sched | DEBUG_stm, - "trec %p found wasting its time", t); - - // strip the stack back to the - // ATOMICALLY_FRAME, aborting the (nested) - // transaction, and saving the stack of any - // partially-evaluated thunks on the heap. - throwToSingleThreaded_(&capabilities[0], t, - NULL, rtsTrue, NULL); - -#ifdef REG_R1 - ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); -#endif - } - } - } - } - } - // so this happens periodically: if (cap) scheduleCheckBlackHoles(cap); @@ -2094,17 +1499,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) } #endif -#if defined(GRAN) - /* add a ContinueThread event to continue execution of current thread */ - new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc], - ContinueThread, - t, (StgClosure*)NULL, (rtsSpark*)NULL); - IF_GRAN_DEBUG(bq, - debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n"); - G_EVENTQ(0); - G_CURR_THREADQ(0)); -#endif /* GRAN */ - return cap; } @@ -2124,6 +1518,7 @@ forkProcess(HsStablePtr *entry pid_t pid; StgTSO* t,*next; Capability *cap; + nat s; #if defined(THREADED_RTS) if (RtsFlags.ParFlags.nNodes > 1) { @@ -2171,9 +1566,10 @@ forkProcess(HsStablePtr *entry // all Tasks, because they correspond to OS threads that are // now gone. - for (t = all_threads; t != END_TSO_QUEUE; t = next) { + for (s = 0; s < total_steps; s++) { + for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) { if (t->what_next == ThreadRelocated) { - next = t->link; + next = t->_link; } else { next = t->global_link; // don't allow threads to catch the ThreadKilled @@ -2181,6 +1577,7 @@ forkProcess(HsStablePtr *entry // threads may be evaluating thunks that we need later. deleteThread_(cap,t); } + } } // Empty the run queue. It seems tempting to let all the @@ -2194,14 +1591,19 @@ forkProcess(HsStablePtr *entry // don't exist now: cap->suspended_ccalling_tasks = NULL; - // Empty the all_threads list. Otherwise, the garbage + // Empty the threads lists. Otherwise, the garbage // collector may attempt to resurrect some of these threads. - all_threads = END_TSO_QUEUE; + for (s = 0; s < total_steps; s++) { + all_steps[s].threads = END_TSO_QUEUE; + } // Wipe the task list, except the current Task. ACQUIRE_LOCK(&sched_mutex); for (task = all_tasks; task != NULL; task=task->all_link) { if (task != cap->running_task) { +#if defined(THREADED_RTS) + initMutex(&task->lock); // see #1391 +#endif discardTask(task); } } @@ -2243,14 +1645,18 @@ deleteAllThreads ( Capability *cap ) // NOTE: only safe to call if we own all capabilities. StgTSO* t, *next; + nat s; + debugTrace(DEBUG_sched,"deleting all threads"); - for (t = all_threads; t != END_TSO_QUEUE; t = next) { + for (s = 0; s < total_steps; s++) { + for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) { if (t->what_next == ThreadRelocated) { - next = t->link; + next = t->_link; } else { next = t->global_link; deleteThread(cap,t); } + } } // The run queue now contains a bunch of ThreadKilled threads. We @@ -2405,7 +1811,7 @@ resumeThread (void *task_) tso = task->suspended_tso; task->suspended_tso = NULL; - tso->link = END_TSO_QUEUE; + tso->_link = END_TSO_QUEUE; // no write barrier reqd debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id); if (tso->why_blocked == BlockedOnCCall) { @@ -2424,7 +1830,7 @@ resumeThread (void *task_) #endif /* We might have GC'd, mark the TSO dirty again */ - dirtyTSO(tso); + dirty_TSO(cap,tso); IF_DEBUG(sanity, checkTSO(tso)); @@ -2459,7 +1865,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) if (cpu == cap->no) { appendToRunQueue(cap,tso); } else { - migrateThreadToCapability_lock(&capabilities[cpu],tso); + wakeupThreadOnCapability(cap, &capabilities[cpu], tso); } #else appendToRunQueue(cap,tso); @@ -2487,13 +1893,6 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id); -#if defined(GRAN) - /* GranSim specific init */ - CurrentTSO = m->tso; // the TSO to run - procStatus[MainProc] = Busy; // status of main PE - CurrentProc = MainProc; // PE to run it on -#endif - cap = schedule(cap,task); ASSERT(task->stat != NoStatus); @@ -2508,7 +1907,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) * ------------------------------------------------------------------------- */ #if defined(THREADED_RTS) -void +void OSThreadProcAttr workerStart(Task *task) { Capability *cap; @@ -2542,27 +1941,14 @@ workerStart(Task *task) void initScheduler(void) { -#if defined(GRAN) - nat i; - for (i=0; i<=MAX_PROC; i++) { - run_queue_hds[i] = END_TSO_QUEUE; - run_queue_tls[i] = END_TSO_QUEUE; - blocked_queue_hds[i] = END_TSO_QUEUE; - blocked_queue_tls[i] = END_TSO_QUEUE; - ccalling_threadss[i] = END_TSO_QUEUE; - blackhole_queue[i] = END_TSO_QUEUE; - sleeping_queue = END_TSO_QUEUE; - } -#elif !defined(THREADED_RTS) +#if !defined(THREADED_RTS) blocked_queue_hd = END_TSO_QUEUE; blocked_queue_tl = END_TSO_QUEUE; sleeping_queue = END_TSO_QUEUE; #endif blackhole_queue = END_TSO_QUEUE; - all_threads = END_TSO_QUEUE; - context_switch = 0; sched_state = SCHED_RUNNING; recent_activity = ACTIVITY_YES; @@ -2661,87 +2047,6 @@ freeScheduler( void ) #endif } -/* --------------------------------------------------------------------------- - Where are the roots that we know about? - - - all the threads on the runnable queue - - all the threads on the blocked queue - - all the threads on the sleeping queue - - all the thread currently executing a _ccall_GC - - all the "main threads" - - ------------------------------------------------------------------------ */ - -/* This has to be protected either by the scheduler monitor, or by the - garbage collection monitor (probably the latter). - KH @ 25/10/99 -*/ - -void -GetRoots( evac_fn evac ) -{ - nat i; - Capability *cap; - Task *task; - -#if defined(GRAN) - for (i=0; i<=RtsFlags.GranFlags.proc; i++) { - if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL))) - evac((StgClosure **)&run_queue_hds[i]); - if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL))) - evac((StgClosure **)&run_queue_tls[i]); - - if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL))) - evac((StgClosure **)&blocked_queue_hds[i]); - if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL))) - evac((StgClosure **)&blocked_queue_tls[i]); - if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL))) - evac((StgClosure **)&ccalling_threads[i]); - } - - markEventQueue(); - -#else /* !GRAN */ - - for (i = 0; i < n_capabilities; i++) { - cap = &capabilities[i]; - evac((StgClosure **)(void *)&cap->run_queue_hd); - evac((StgClosure **)(void *)&cap->run_queue_tl); -#if defined(THREADED_RTS) - evac((StgClosure **)(void *)&cap->wakeup_queue_hd); - evac((StgClosure **)(void *)&cap->wakeup_queue_tl); -#endif - for (task = cap->suspended_ccalling_tasks; task != NULL; - task=task->next) { - debugTrace(DEBUG_sched, - "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id); - evac((StgClosure **)(void *)&task->suspended_tso); - } - - } - - -#if !defined(THREADED_RTS) - evac((StgClosure **)(void *)&blocked_queue_hd); - evac((StgClosure **)(void *)&blocked_queue_tl); - evac((StgClosure **)(void *)&sleeping_queue); -#endif -#endif - - // evac((StgClosure **)&blackhole_queue); - -#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN) - markSparkQueue(evac); -#endif - -#if defined(RTS_USER_SIGNALS) - // mark the signal handlers (signals should be already blocked) - if (RtsFlags.MiscFlags.install_signal_handlers) { - markSignalHandlers(evac); - } -#endif -} - /* ----------------------------------------------------------------------------- performGC @@ -2834,7 +2139,7 @@ threadStackOverflow(Capability *cap, StgTSO *tso) "increasing stack size from %ld words to %d.", (long)tso->stack_size, new_stack_size); - dest = (StgTSO *)allocate(new_tso_size); + dest = (StgTSO *)allocateLocal(cap,new_tso_size); TICK_ALLOC_TSO(new_stack_size,0); /* copy the TSO block and the old stack into the new area */ @@ -2855,7 +2160,7 @@ threadStackOverflow(Capability *cap, StgTSO *tso) * dead TSO's stack. */ tso->what_next = ThreadRelocated; - tso->link = dest; + setTSOLink(cap,tso,dest); tso->sp = (P_)&(tso->stack[tso->stack_size]); tso->why_blocked = NotBlocked; @@ -2877,6 +2182,56 @@ threadStackOverflow(Capability *cap, StgTSO *tso) return dest; } +static StgTSO * +threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso) +{ + bdescr *bd, *new_bd; + lnat free_w, tso_size_w; + StgTSO *new_tso; + + tso_size_w = tso_sizeW(tso); + + if (tso_size_w < MBLOCK_SIZE_W || + (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) + { + return tso; + } + + // don't allow throwTo() to modify the blocked_exceptions queue + // while we are moving the TSO: + lockClosure((StgClosure *)tso); + + // this is the number of words we'll free + free_w = round_to_mblocks(tso_size_w/2); + + bd = Bdescr((StgPtr)tso); + new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W); + bd->free = bd->start + TSO_STRUCT_SIZEW; + + new_tso = (StgTSO *)new_bd->start; + memcpy(new_tso,tso,TSO_STRUCT_SIZE); + new_tso->stack_size = new_bd->free - new_tso->stack; + + debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu", + (long)tso->id, tso_size_w, tso_sizeW(new_tso)); + + tso->what_next = ThreadRelocated; + tso->_link = new_tso; // no write barrier reqd: same generation + + // The TSO attached to this Task may have moved, so update the + // pointer to it. + if (task->tso == tso) { + task->tso = new_tso; + } + + unlockTSO(new_tso); + unlockTSO(tso); + + IF_DEBUG(sanity,checkTSO(new_tso)); + + return new_tso; +} + /* --------------------------------------------------------------------------- Interrupt execution - usually called inside a signal handler so it mustn't do anything fancy. @@ -2886,7 +2241,7 @@ void interruptStgRts(void) { sched_state = SCHED_INTERRUPTING; - context_switch = 1; + setContextSwitches(); wakeUpRts(); } @@ -2946,17 +2301,15 @@ checkBlackHoles (Capability *cap) t = blackhole_queue; while (t != END_TSO_QUEUE) { ASSERT(t->why_blocked == BlockedOnBlackHole); - type = get_itbl(t->block_info.closure)->type; + type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type; if (type != BLACKHOLE && type != CAF_BLACKHOLE) { IF_DEBUG(sanity,checkTSO(t)); t = unblockOne(cap, t); - // urk, the threads migrate to the current capability - // here, but we'd like to keep them on the original one. *prev = t; any_woke_up = rtsTrue; } else { - prev = &t->link; - t = t->link; + prev = &t->_link; + t = t->_link; } } @@ -3164,11 +2517,15 @@ resurrectThreads (StgTSO *threads) { StgTSO *tso, *next; Capability *cap; + step *step; for (tso = threads; tso != END_TSO_QUEUE; tso = next) { next = tso->global_link; - tso->global_link = all_threads; - all_threads = tso; + + step = Bdescr((P_)tso)->step; + tso->global_link = step->threads; + step->threads = tso; + debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id); // Wake up the thread on the Capability it was last on @@ -3179,15 +2536,15 @@ resurrectThreads (StgTSO *threads) case BlockedOnException: /* Called by GC - sched_mutex lock is currently held. */ throwToSingleThreaded(cap, tso, - (StgClosure *)BlockedOnDeadMVar_closure); + (StgClosure *)blockedOnDeadMVar_closure); break; case BlockedOnBlackHole: throwToSingleThreaded(cap, tso, - (StgClosure *)NonTermination_closure); + (StgClosure *)nonTermination_closure); break; case BlockedOnSTM: throwToSingleThreaded(cap, tso, - (StgClosure *)BlockedIndefinitely_closure); + (StgClosure *)blockedIndefinitely_closure); break; case NotBlocked: /* This might happen if the thread was blocked on a black hole @@ -3200,3 +2557,37 @@ resurrectThreads (StgTSO *threads) } } } + +/* ----------------------------------------------------------------------------- + performPendingThrowTos is called after garbage collection, and + passed a list of threads that were found to have pending throwTos + (tso->blocked_exceptions was not empty), and were blocked. + Normally this doesn't happen, because we would deliver the + exception directly if the target thread is blocked, but there are + small windows where it might occur on a multiprocessor (see + throwTo()). + + NB. we must be holding all the capabilities at this point, just + like resurrectThreads(). + -------------------------------------------------------------------------- */ + +void +performPendingThrowTos (StgTSO *threads) +{ + StgTSO *tso, *next; + Capability *cap; + step *step; + + for (tso = threads; tso != END_TSO_QUEUE; tso = next) { + next = tso->global_link; + + step = Bdescr((P_)tso)->step; + tso->global_link = step->threads; + step->threads = tso; + + debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id); + + cap = tso->cap; + maybePerformBlockedException(cap, tso); + } +}