From 297b05a9c9a27175e25cb8ec7b60dde51bfafbf3 Mon Sep 17 00:00:00 2001
From: "berthold@mathematik.uni-marburg.de"
Date: Tue, 2 Sep 2008 16:13:13 +0000
Subject: [PATCH] Scheduler code cleanup

This patch removes old code from the Schedule.c file.

I removed the GRAN code for GranSim, a simulator for parallel Haskell
execution with the GpH model. This code has been inactive since ghc-4.x.

Code for PARALLEL_HASKELL has been partially removed. The remaining code
is valid, but may refer to nonexistent functionality in other files.
---
 rts/Schedule.c | 804 +++++++------------------------------------------------
 1 file changed, 96 insertions(+), 708 deletions(-)

diff --git a/rts/Schedule.c b/rts/Schedule.c
index 537cee0..94aac6c 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -31,15 +31,9 @@
 #include "Updates.h"
 #include "Proftimer.h"
 #include "ProfHeap.h"
-#if defined(GRAN) || defined(PARALLEL_HASKELL)
-# include "GranSimRts.h"
-# include "GranSim.h"
-# include "ParallelRts.h"
-# include "Parallel.h"
-# include "ParallelDebug.h"
-# include "FetchMe.h"
-# include "HLC.h"
-#endif
+
+/* PARALLEL_HASKELL includes go here */
+
 #include "Sparks.h"
 #include "Capability.h"
 #include "Task.h"
@@ -77,28 +71,6 @@
  * Global variables
  * -------------------------------------------------------------------------- */

-#if defined(GRAN)
-
-StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
-/* rtsTime TimeOfNextEvent, EndOfTimeSlice;  now in GranSim.c */
-
-/*
-  In GranSim we have a runnable and a blocked queue for each processor.
-  In order to minimise code changes new arrays run_queue_hds/tls
-  are created. run_queue_hd is then a short cut (macro) for
-  run_queue_hds[CurrentProc] (see GranSim.h).
-  -- HWL
-*/
-StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
-StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
-StgTSO *ccalling_threadss[MAX_PROC];
-/* We use the same global list of threads (all_threads) in GranSim as in
-   the std RTS (i.e. we are cheating). However, we don't use this list in
-   the GranSim specific code at the moment (so we are only potentially
-   cheating). */
-
-#else /* !GRAN */
-
 #if !defined(THREADED_RTS)
 // Blocked/sleeping thrads
 StgTSO *blocked_queue_hd = NULL;
@@ -110,7 +82,6 @@ StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table?
  * LOCK: sched_mutex+capability, or all capabilities
  */
 StgTSO *blackhole_queue = NULL;
-#endif

 /* The blackhole_queue should be checked for threads to wake up. See
  * Schedule.h for more thorough comment.
@@ -134,10 +105,6 @@ nat recent_activity = ACTIVITY_YES;
  */
 rtsBool sched_state = SCHED_RUNNING;

-#if defined(GRAN)
-StgTSO *CurrentTSO;
-#endif
-
 /* This is used in `TSO.h' and gcc 2.96 insists that this variable actually
  * exists - earlier gccs apparently didn't.
  * -= chak
@@ -159,12 +126,6 @@ rtsBool shutting_down_scheduler = rtsFalse;
 Mutex sched_mutex;
 #endif

-#if defined(PARALLEL_HASKELL)
-StgTSO *LastTSO;
-rtsTime TimeOfLastYield;
-rtsBool emitSchedule = rtsTrue;
-#endif
-
 #if !defined(mingw32_HOST_OS)
 #define FORKPROCESS_PRIMOP_SUPPORTED
 #endif
@@ -189,16 +150,10 @@ static void scheduleCheckBlockedThreads (Capability *cap);
 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
-#if defined(GRAN)
-static StgTSO *scheduleProcessEvent(rtsEvent *event);
-#endif
 #if defined(PARALLEL_HASKELL)
-static StgTSO *scheduleSendPendingMessages(void);
-static void scheduleActivateSpark(void);
-static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
-#endif
-#if defined(PAR) || defined(GRAN)
-static void scheduleGranParReport(void);
+static rtsBool scheduleGetRemoteWork(Capability *cap);
+static void scheduleSendPendingMessages(void);
+static void scheduleActivateSpark(Capability *cap);
 #endif
 static void schedulePostRunThread(StgTSO *t);
 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
@@ -225,11 +180,6 @@ static void deleteAllThreads (Capability *cap);
 static void deleteThread_(Capability *cap, StgTSO *tso);
 #endif

-#if defined(PARALLEL_HASKELL)
-StgTSO * createSparkThread(rtsSpark spark);
-StgTSO * activateSpark (rtsSpark spark);
-#endif
-
 #ifdef DEBUG
 static char *whatNext_strs[] = {
   "(unknown)",
@@ -279,6 +229,7 @@ addToRunQueue( Capability *cap, StgTSO *t )
    This revolves around the global event queue, which determines what
    to do next. Therefore, it's more complicated than either the
    concurrent or the parallel (GUM) setup.
+   This version has been entirely removed (JB 2008/08).

    GUM version:
      GUM iterates over incoming messages.
@@ -289,6 +240,12 @@ addToRunQueue( Capability *cap, StgTSO *t )
      (see PendingFetches).
      This is not the ugliest code you could imagine, but it's bloody close.

+   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
+   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
+   as well as future GUM versions. This file has been refurbished to
+   only contain valid code, which is however incomplete, refers to
+   invalid includes etc.
+   ------------------------------------------------------------------------ */

 static Capability *
 schedule (Capability *initialCapability, Task *task)
 {
   StgTSO *t;
   Capability *cap;
   StgThreadReturnCode ret;
-#if defined(GRAN)
-  rtsEvent *event;
-#elif defined(PARALLEL_HASKELL)
-  StgTSO *tso;
-  GlobalTaskId pe;
+#if defined(PARALLEL_HASKELL)
   rtsBool receivedFinish = rtsFalse;
-# if defined(DEBUG)
-  nat tp_size, sp_size; // stats only
-# endif
 #endif
   nat prev_what_next;
   rtsBool ready_to_gc;
@@ -330,20 +280,12 @@ schedule (Capability *initialCapability, Task *task)

 #if defined(PARALLEL_HASKELL)
 #define TERMINATION_CONDITION (!receivedFinish)
-#elif defined(GRAN)
-#define TERMINATION_CONDITION ((event = get_next_event()) != (rtsEvent*)NULL)
 #else
 #define TERMINATION_CONDITION rtsTrue
 #endif

   while (TERMINATION_CONDITION) {

-#if defined(GRAN)
-      /* Choose the processor with the next event */
-      CurrentProc = event->proc;
-      CurrentTSO = event->tso;
-#endif
-
 #if defined(THREADED_RTS)
       if (first) {
 	  // don't yield the first time, we want a chance to run this
@@ -456,6 +398,45 @@ schedule (Capability *initialCapability, Task *task)

     scheduleCheckBlockedThreads(cap);

+#if defined(PARALLEL_HASKELL)
+    /* message processing and work distribution goes here */
+
+    /* if messages have been buffered... a NOOP in THREADED_RTS */
+    scheduleSendPendingMessages();
+
+    /* If the run queue is empty,...*/
+    if (emptyRunQueue(cap)) {
+	/* ...take one of our own sparks and turn it into a thread */
+	scheduleActivateSpark(cap);
+
+	/* if this did not work, try to steal a spark from someone else */
+	if (emptyRunQueue(cap)) {
+	    receivedFinish = scheduleGetRemoteWork(cap);
+	    continue; //  a new round, (hopefully) with new work
+	    /*
+	       in GUM, this a) sends out a FISH and returns IF no fish is
+	                     out already
+	                  b) (blocking) awaits and receives messages
+
+	       in Eden, this is only the blocking receive, as b) in GUM.
+	    */
+	}
+    }
+
+    /* since we perform a blocking receive and continue otherwise,
+       either we never reach here or we definitely have work! */
+    // from here: non-empty run queue
+    ASSERT(!emptyRunQueue(cap));
+
+    if (PacketsWaiting()) {  /* now process incoming messages, if any
+				pending...
+
+				CAUTION: scheduleGetRemoteWork called
+				above, waits for messages as well! */
+	processMessages(cap, &receivedFinish);
+    }
+#endif // PARALLEL_HASKELL
+
     scheduleDetectDeadlock(cap,task);
 #if defined(THREADED_RTS)
     cap = task->cap;    // reload cap, it might have changed
@@ -476,47 +457,14 @@ schedule (Capability *initialCapability, Task *task)
 	    continue; // nothing to do
 	}

-#if defined(PARALLEL_HASKELL)
-	scheduleSendPendingMessages();
-	if (emptyRunQueue(cap) && scheduleActivateSpark())
-	    continue;
-
-#if defined(SPARKS)
-	ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
-#endif
-
-	/* If we still have no work we need to send a FISH to get a spark
	   from another PE */
-	if (emptyRunQueue(cap)) {
-	    if (!scheduleGetRemoteWork(&receivedFinish)) continue;
-	    ASSERT(rtsFalse); // should not happen at the moment
-	}
-	// from here: non-empty run queue.
-	//  TODO: merge above case with this, only one call processMessages() !
-	if (PacketsWaiting()) {  /* process incoming messages, if
				    any pending...  only in else
				    because getRemoteWork waits for
				    messages as well */
-	    receivedFinish = processMessages();
-	}
-#endif
-
-#if defined(GRAN)
-	scheduleProcessEvent(event);
-#endif
-
     //
     // Get a thread to run
     //
     t = popRunQueue(cap);

-#if defined(GRAN) || defined(PAR)
-    scheduleGranParReport(); // some kind of debuging output
-#else
     // Sanity check the thread we're about to run.  This can be
     // expensive if there is lots of thread switching going on...
     IF_DEBUG(sanity,checkTSO(t));
-#endif

 #if defined(THREADED_RTS)
     // Check whether we can run this thread in the current task.
@@ -727,23 +675,7 @@ run_thread:
 static void
 schedulePreLoop(void)
 {
-#if defined(GRAN)
-    /* set up first event to get things going */
-    /* ToDo: assign costs for system setup and init MainTSO ! */
-    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
-	      ContinueThread,
-	      CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
-
-    debugTrace (DEBUG_gran,
-		"GRAN: Init CurrentTSO (in schedule) = %p",
-		CurrentTSO);
-    IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
-
-    if (RtsFlags.GranFlags.Light) {
-	/* Save current time; GranSim Light only */
-	CurrentTSO->gran.clock = CurrentTime[CurrentProc];
-    }
-#endif
+    // initialisation for scheduler - what cannot go into initScheduler()
 }

 /* -----------------------------------------------------------------------------
@@ -1032,164 +964,15 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
     }
 }

-/* ----------------------------------------------------------------------------
- * Process an event (GRAN only)
- * ------------------------------------------------------------------------- */
-
-#if defined(GRAN)
-static StgTSO *
-scheduleProcessEvent(rtsEvent *event)
-{
-    StgTSO *t;
-
-    if (RtsFlags.GranFlags.Light)
-      GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
-
-    /* adjust time based on time-stamp */
-    if (event->time > CurrentTime[CurrentProc] &&
-        event->evttype != ContinueThread)
-      CurrentTime[CurrentProc] = event->time;
-
-    /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
-    if (!RtsFlags.GranFlags.Light)
-      handleIdlePEs();
-
-    IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
-
-    /* main event dispatcher in GranSim */
-    switch (event->evttype) {
-      /* Should just be continuing execution */
-    case ContinueThread:
-      IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
-      /* ToDo: check assertion
-      ASSERT(run_queue_hd != (StgTSO*)NULL &&
-	     run_queue_hd != END_TSO_QUEUE);
-      */
-      /* Ignore ContinueThreads for fetching threads (if synchr comm) */
-      if (!RtsFlags.GranFlags.DoAsyncFetch &&
-	  procStatus[CurrentProc]==Fetching) {
-	debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
-	      CurrentTSO->id, CurrentTSO, CurrentProc);
-	goto next_thread;
-      }
-      /* Ignore ContinueThreads for completed threads */
-      if (CurrentTSO->what_next == ThreadComplete) {
-	debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n",
-	      CurrentTSO->id, CurrentTSO, CurrentProc);
-	goto next_thread;
-      }
-      /* Ignore ContinueThreads for threads that are being migrated */
-      if (PROCS(CurrentTSO)==Nowhere) {
-	debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
-	      CurrentTSO->id, CurrentTSO, CurrentProc);
-	goto next_thread;
-      }
-      /* The thread should be at the beginning of the run queue */
-      if (CurrentTSO!=run_queue_hds[CurrentProc]) {
-	debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
-	      CurrentTSO->id, CurrentTSO, CurrentProc);
-	break; // run the thread anyway
-      }
-      /*
-      new_event(proc, proc, CurrentTime[proc],
-		FindWork,
-		(StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
-      goto next_thread;
-      */ /* Catches superfluous CONTINUEs -- should be unnecessary */
-      break; // now actually run the thread; DaH Qu'vam yImuHbej
-
-    case FetchNode:
-      do_the_fetchnode(event);
-      goto next_thread; /* handle next event in event queue  */
-
-    case GlobalBlock:
-      do_the_globalblock(event);
-      goto next_thread; /* handle next event in event queue  */
-
-    case FetchReply:
-      do_the_fetchreply(event);
-      goto next_thread; /* handle next event in event queue  */
-
-    case UnblockThread:   /* Move from the blocked queue to the tail of */
-      do_the_unblock(event);
-      goto next_thread; /* handle next event in event queue  */
-
-    case ResumeThread:  /* Move from the blocked queue to the tail of */
-      /* the runnable queue ( i.e. Qu' SImqa'lu') */
-      event->tso->gran.blocktime +=
-	CurrentTime[CurrentProc] - event->tso->gran.blockedat;
-      do_the_startthread(event);
-      goto next_thread; /* handle next event in event queue  */
-
-    case StartThread:
-      do_the_startthread(event);
-      goto next_thread; /* handle next event in event queue  */
-
-    case MoveThread:
-      do_the_movethread(event);
-      goto next_thread; /* handle next event in event queue  */
-
-    case MoveSpark:
-      do_the_movespark(event);
-      goto next_thread; /* handle next event in event queue  */
-
-    case FindWork:
-      do_the_findwork(event);
-      goto next_thread; /* handle next event in event queue  */
-
-    default:
-      barf("Illegal event type %u\n", event->evttype);
-    } /* switch */
-
-    /* This point was scheduler_loop in the old RTS */
-
-    IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
-
-    TimeOfLastEvent = CurrentTime[CurrentProc];
-    TimeOfNextEvent = get_time_of_next_event();
-    IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
-    // CurrentTSO = ThreadQueueHd;
-
-    IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n",
-			      TimeOfNextEvent));
-
-    if (RtsFlags.GranFlags.Light)
-      GranSimLight_leave_system(event, &ActiveTSO);
-
-    EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
-
-    IF_DEBUG(gran,
-	     debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
-
-    /* in a GranSim setup the TSO stays on the run queue */
-    t = CurrentTSO;
-    /* Take a thread from the run queue. */
-    POP_RUN_QUEUE(t); // take_off_run_queue(t);
-
-    IF_DEBUG(gran,
-	     debugBelch("GRAN: About to run current thread, which is\n");
-	     G_TSO(t,5));
-
-    context_switch = 0; // turned on via GranYield, checking events and time slice
-
-    IF_DEBUG(gran,
-	     DumpGranEvent(GR_SCHEDULE, t));
-
-    procStatus[CurrentProc] = Busy;
-}
-#endif // GRAN

 /* ----------------------------------------------------------------------------
  * Send pending messages (PARALLEL_HASKELL only)
  * ------------------------------------------------------------------------- */

-#if defined(PARALLEL_HASKELL)
 static StgTSO *
 scheduleSendPendingMessages(void)
 {
-    StgSparkPool *pool;
-    rtsSpark spark;
-    StgTSO *t;
+#if defined(PARALLEL_HASKELL)

 # if defined(PAR) // global Mem.Mgmt., omit for now
     if (PendingFetches != END_BF_QUEUE) {
 	processFetches();
     }
@@ -1202,8 +985,8 @@ scheduleSendPendingMessages(void)
     // packets which have become too old...
 	sendOldBuffers();
     }
-}
 #endif
+}

 /* ----------------------------------------------------------------------------
  * Activate spark threads (PARALLEL_HASKELL only)
@@ -1211,57 +994,28 @@ scheduleSendPendingMessages(void)

 #if defined(PARALLEL_HASKELL)
 static void
-scheduleActivateSpark(void)
+scheduleActivateSpark(Capability *cap)
 {
-#if defined(SPARKS)
-  ASSERT(emptyRunQueue());
-/* We get here if the run queue is empty and want some work.
-   We try to turn a spark into a thread, and add it to the run queue,
-   from where it will be picked up in the next iteration of the scheduler
-   loop.
-*/
-
-      /* :-[  no local threads => look out for local sparks */
-      /* the spark pool for the current PE */
-      pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
-      if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
-	  pool->hd < pool->tl) {
-	/*
-	 * ToDo: add GC code check that we really have enough heap afterwards!!
-	 * Old comment:
-	 * If we're here (no runnable threads) and we have pending
-	 * sparks, we must have a space problem.  Get enough space
-	 * to turn one of those pending sparks into a
-	 * thread...
-	 */
+    StgClosure *spark;

-	spark = findSpark(rtsFalse);            /* get a spark */
-	if (spark != (rtsSpark) NULL) {
-	  tso = createThreadFromSpark(spark);  /* turn the spark into a thread */
-	  IF_PAR_DEBUG(fish, // schedule,
-		       debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
-				  tso->id, tso, advisory_thread_count));
-
-	  if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
-	    IF_PAR_DEBUG(fish, // schedule,
-			 debugBelch("==^^ failed to create thread from spark @ %lx\n",
-				    spark));
-	    return rtsFalse; /* failed to generate a thread */
-	  }               /* otherwise fall through & pick-up new tso */
-	} else {
-	  IF_PAR_DEBUG(fish, // schedule,
-		       debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n",
-				  spark_queue_len(pool)));
-	  return rtsFalse;  /* failed to generate a thread */
-	}
-	return rtsTrue;  /* success in generating a thread */
-  } else { /* no more threads permitted or pool empty */
-    return rtsFalse;  /* failed to generateThread */
-  }
-#else
-  tso = NULL; // avoid compiler warning only
-  return rtsFalse; /* dummy in non-PAR setup */
-#endif // SPARKS
+/* We only want to stay here if the run queue is empty and we want some
+   work. We try to turn a spark into a thread, and add it to the run
+   queue, from where it will be picked up in the next iteration of the
+   scheduler loop.
+*/
+    if (!emptyRunQueue(cap))
+      /* In the threaded RTS, another task might have pushed a thread
	 on our run queue in the meantime ? But would need a lock.. */
+      return;
+
+    spark = findSpark(cap); // defined in Sparks.c
+
+    if (spark != NULL) {
+      debugTrace(DEBUG_sched,
+		 "turning spark of closure %p into a thread",
+		 (StgClosure *)spark);
+      createSparkThread(cap,spark); // defined in Sparks.c
+    }
 }
 #endif // PARALLEL_HASKELL

 /* ----------------------------------------------------------------------------
  *
@@ -1271,10 +1025,12 @@

 #if defined(PARALLEL_HASKELL)
 static rtsBool
-scheduleGetRemoteWork(rtsBool *receivedFinish)
+scheduleGetRemoteWork(Capability *cap)
 {
-  ASSERT(emptyRunQueue());
+#if defined(PARALLEL_HASKELL)
+  rtsBool receivedFinish = rtsFalse;

+  // idle() , i.e. send all buffers, wait for work
   if (RtsFlags.ParFlags.BufferTime) {
 	IF_PAR_DEBUG(verbose,
 	        debugBelch("...send all pending data,"));
 	{
 	  nat i;
 	  for (i=1; i<=nPEs; i++)
 	    sendImmediately(i); // send all messages away immediately
 	}
   }
-# ifndef SPARKS
-	//++EDEN++ idle() , i.e. send all buffers, wait for work
-	// suppress fishing in EDEN... just look for incoming messages
-	// (blocking receive)
-	IF_PAR_DEBUG(verbose,
-		     debugBelch("...wait for incoming messages...\n"));
-	*receivedFinish = processMessages(); // blocking receive...
-
-	// and reenter scheduling loop after having received something
-	// (return rtsFalse below)
-
-# else /* activate SPARKS machinery */
-/* We get here, if we have no work, tried to activate a local spark, but still
   have no work. We try to get a remote spark, by sending a FISH message.
   Thread migration should be added here, and triggered when a sequence of
   fishes returns without work. */
-	delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
-
-	/* =8-[  no local sparks => look for work on other PEs */
-	/*
-	 * We really have absolutely no work.  Send out a fish
-	 * (there may be some out there already), and wait for
-	 * something to arrive.  We clearly can't run any threads
-	 * until a SCHEDULE or RESUME arrives, and so that's what
-	 * we're hoping to see.  (Of course, we still have to
-	 * respond to other types of messages.)
-	 */
-	rtsTime now = msTime() /*CURRENT_TIME*/;
-	IF_PAR_DEBUG(verbose,
-		     debugBelch("--  now=%ld\n", now));
-	IF_PAR_DEBUG(fish, // verbose,
-		     if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
-			 (last_fish_arrived_at!=0 &&
-			  last_fish_arrived_at+delay > now)) {
-		       debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
-			     now, last_fish_arrived_at+delay,
-			     last_fish_arrived_at,
-			     delay);
-		     });
-
-	if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
-	    advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
-	  if (last_fish_arrived_at==0 ||
-	      (last_fish_arrived_at+delay <= now)) { // send FISH now!
-	    /* outstandingFishes is set in sendFish, processFish;
	       avoid flooding system with fishes via delay */
-	    next_fish_to_send_at = 0;
-	  } else {
-	    /* ToDo: this should be done in the main scheduling loop to avoid the
	       busy wait here; not so bad if fish delay is very small */
-	    int iq = 0; // DEBUGGING -- HWL
-	    next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send
-	    /* send a fish when ready, but process messages that arrive in the meantime */
-	    do {
-	      if (PacketsWaiting()) {
-		iq++; // DEBUGGING
-		*receivedFinish = processMessages();
-	      }
-	      now = msTime();
-	    } while (!*receivedFinish || now<next_fish_to_send_at);
-	    IF_PAR_DEBUG(verbose,
-			 debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",
-				    now,iq,run_queue_len(),advisory_thread_count));
-	  }
-
-	  // JB: IMHO, this should all be hidden inside sendFish(...)
-	  /* pe = choosePE();
	     sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY,
	              NEW_FISH_HUNGER);

	  // Global statistics: count no. of fishes
	  if (RtsFlags.ParFlags.ParStats.Global &&
	      RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	    globalParStats.tot_fish_mess++;
	  }
	  */
+  /* this would be the place for fishing in GUM...

-	  /* delayed fishes must have been sent by now! */
-	  next_fish_to_send_at = 0;
-	}
-
-	*receivedFinish = processMessages();
-# endif /* SPARKS */
-
-	return rtsFalse;
-	/* NB: this function always returns rtsFalse, meaning the scheduler
	   loop continues with the next iteration;
	   rationale:
	     return code means success in finding work; we enter this function
	     if there is no local work, thus have to send a fish which takes
	     time until it arrives with work; in the meantime we should process
	     messages in the main loop;
	*/
-}
-#endif // PARALLEL_HASKELL
+     if (no-earlier-fish-around)
+          sendFish(choosePe());
+   */

-/* ----------------------------------------------------------------------------
- * PAR/GRAN: Report stats & debugging info(?)
- * ------------------------------------------------------------------------- */
+  // Eden:just look for incoming messages (blocking receive)
+  IF_PAR_DEBUG(verbose,
+	       debugBelch("...wait for incoming messages...\n"));
+  processMessages(cap, &receivedFinish); // blocking receive...

-#if defined(PAR) || defined(GRAN)
-static void
-scheduleGranParReport(void)
-{
-  ASSERT(run_queue_hd != END_TSO_QUEUE);
-  /* Take a thread from the run queue, if we have work */
-  POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
+  return receivedFinish;
+  // reenter scheduling look after having received something

-  /* If this TSO has got its outport closed in the meantime,
-   *   it mustn't be run. Instead, we have to clean it up as if it was finished.
-   * It has to be marked as TH_DEAD for this purpose.
-   * If it is TH_TERM instead, it is supposed to have finished in the normal way.
+#else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
-JB: TODO: investigate wether state change field could be nuked
-     entirely and replaced by the normal tso state (whatnext
-     field). All we want to do is to kill tsos from outside.
-   */
+  return rtsFalse; /* return value unused in THREADED_RTS */
-    /* ToDo: write something to the log-file
-    if (RTSflags.ParFlags.granSimStats && !sameThread)
-        DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
-
-    CurrentTSO = t;
-    */
+#endif /* PARALLEL_HASKELL */
-  /* the spark pool for the current PE */
-  pool = &(cap.r.rSparks); //  cap = (old) MainCap
+}
+#endif // PARALLEL_HASKELL

-  IF_DEBUG(scheduler,
-	   debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
-		      run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
-
-  IF_PAR_DEBUG(fish,
-	   debugBelch("--=^ %d threads, %d sparks on [%#x]\n",
-		      run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
-
-  if (RtsFlags.ParFlags.ParStats.Full &&
-      (t->par.sparkname != (StgInt)0) && // only log spark generated threads
-      (emitSchedule || // forced emit
-       (t && LastTSO && t->id != LastTSO->id))) {
-    /*
-      we are running a different TSO, so write a schedule event to log file
-      NB: If we use fair scheduling we also have to write a deschedule
-          event for LastTSO; with unfair scheduling we know that the
-          previous tso has blocked whenever we switch to another tso, so
-          we don't need it in GUM for now
-    */
-    IF_PAR_DEBUG(fish, // schedule,
-		 debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
-
-    DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
-		     GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
-    emitSchedule = rtsFalse;
-  }
-}
-#endif

 /* ----------------------------------------------------------------------------
  * After running a thread...
@@ -1483,88 +1097,7 @@ schedulePostRunThread (StgTSO *t)
     }
 }

-#if defined(PAR)
-    /* HACK 675: if the last thread didn't yield, make sure to print a
-       SCHEDULE event to the log file when StgRunning the next thread, even
-       if it is the same one as before */
-    LastTSO = t;
-    TimeOfLastYield = CURRENT_TIME;
-#endif

-    /* some statistics gathering in the parallel case */
-
-#if defined(GRAN) || defined(PAR) || defined(EDEN)
-    switch (ret) {
-    case HeapOverflow:
-# if defined(GRAN)
-	IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
-	globalGranStats.tot_heapover++;
-# elif defined(PAR)
-	globalParStats.tot_heapover++;
-# endif
-	break;
-
-    case StackOverflow:
-# if defined(GRAN)
-	IF_DEBUG(gran,
-		 DumpGranEvent(GR_DESCHEDULE, t));
-	globalGranStats.tot_stackover++;
-# elif defined(PAR)
-	// IF_DEBUG(par,
-	// DumpGranEvent(GR_DESCHEDULE, t);
-	globalParStats.tot_stackover++;
-# endif
-	break;
-
-    case ThreadYielding:
-# if defined(GRAN)
-	IF_DEBUG(gran,
-		 DumpGranEvent(GR_DESCHEDULE, t));
-	globalGranStats.tot_yields++;
-# elif defined(PAR)
-	// IF_DEBUG(par,
-	// DumpGranEvent(GR_DESCHEDULE, t);
-	globalParStats.tot_yields++;
-# endif
-	break;
-
-    case ThreadBlocked:
-# if defined(GRAN)
-	debugTrace(DEBUG_sched,
-		   "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ",
-		   t->id, t, whatNext_strs[t->what_next], t->block_info.closure,
-		   (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
-	if (t->block_info.closure!=(StgClosure*)NULL)
-	    print_bq(t->block_info.closure);
-	debugBelch("\n"));
-
-	// ??? needed; should emit block before
-	IF_DEBUG(gran,
-		 DumpGranEvent(GR_DESCHEDULE, t));
-	prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
-	/*
	  ngoq Dogh!
	  ASSERT(procStatus[CurrentProc]==Busy ||
		 ((procStatus[CurrentProc]==Fetching) &&
		  (t->block_info.closure!=(StgClosure*)NULL)));
	  if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
	      !(!RtsFlags.GranFlags.DoAsyncFetch &&
		procStatus[CurrentProc]==Fetching))
	    procStatus[CurrentProc] = Idle;
	*/
-# elif defined(PAR)
-//++PAR++  blockThread() writes the event (change?)
-# endif
-	break;
-
-    case ThreadFinished:
-	break;
-
-    default:
-	barf("parGlobalStats: unknown return code");
-	break;
-    }
-#endif
 }

 /* -----------------------------------------------------------------------------
@@ -1646,19 +1179,6 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
 	       "--<< thread %ld (%s) stopped: HeapOverflow",
 	       (long)t->id, whatNext_strs[t->what_next]);

-#if defined(GRAN)
-    ASSERT(!is_on_queue(t,CurrentProc));
-#elif defined(PARALLEL_HASKELL)
-    /* Currently we emit a DESCHEDULE event before GC in GUM.
       ToDo: either add separate event to distinguish SYSTEM time from rest
       or just nuke this DESCHEDULE (and the following SCHEDULE) */
-    if (0 && RtsFlags.ParFlags.ParStats.Full) {
-	DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
-			 GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
-	emitSchedule = rtsTrue;
-    }
-#endif
-
     if (context_switch) {
 	// Sometimes we miss a context switch, e.g. when calling
 	// primitives in a tight loop, MAYBE_GC() doesn't check the
@@ -1744,28 +1264,9 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
     if (t->what_next != prev_what_next) {
 	return rtsTrue;
     }
-
-#if defined(GRAN)
-    ASSERT(!is_on_queue(t,CurrentProc));
-
-    IF_DEBUG(sanity,
-	     //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
-	     checkThreadQsSanity(rtsTrue));
-
-#endif

     addToRunQueue(cap,t);

-#if defined(GRAN)
-    /* add a ContinueThread event to actually process the thread */
-    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
-	      ContinueThread,
-	      t, (StgClosure*)NULL, (rtsSpark*)NULL);
-    IF_GRAN_DEBUG(bq,
-		  debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
-		  G_EVENTQ(0);
-		  G_CURR_THREADQ(0));
-#endif
-
     return rtsFalse;
 }

@@ -1780,42 +1281,6 @@ scheduleHandleThreadBlocked( StgTSO *t
 #endif
     )
 {
-#if defined(GRAN)
-    IF_DEBUG(scheduler,
-	     debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n",
-			t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
-	     if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
-
-    // ??? needed; should emit block before
-    IF_DEBUG(gran,
-	     DumpGranEvent(GR_DESCHEDULE, t));
-    prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
-    /*
      ngoq Dogh!
      ASSERT(procStatus[CurrentProc]==Busy ||
	     ((procStatus[CurrentProc]==Fetching) &&
	      (t->block_info.closure!=(StgClosure*)NULL)));
      if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
	  !(!RtsFlags.GranFlags.DoAsyncFetch &&
	    procStatus[CurrentProc]==Fetching))
	procStatus[CurrentProc] = Idle;
    */
-#elif defined(PAR)
-    IF_DEBUG(scheduler,
-	     debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n",
-			t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
-    IF_PAR_DEBUG(bq,
-
-		 if (t->block_info.closure!=(StgClosure*)NULL)
-		     print_bq(t->block_info.closure));
-
-    /* Send a fetch (if BlockedOnGA) and dump event to log file */
-    blockThread(t);
-
-    /* whatever we schedule next, we must log that schedule */
-    emitSchedule = rtsTrue;
-
-#else /* !GRAN */

     // We don't need to do anything.  The thread is blocked, and it
     // has tidied up its stack and placed itself on whatever queue
@@ -1838,12 +1303,6 @@ scheduleHandleThreadBlocked( StgTSO *t
 	debugTraceEnd();
     }
 #endif
-
-    /* Only for dumping event to log file
       ToDo: do I need this in GranSim, too?
       blockThread(t);
    */
-#endif
 }

 /* -----------------------------------------------------------------------------
@@ -1862,48 +1321,6 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished",
 	       (unsigned long)t->id, whatNext_strs[t->what_next]);

-#if defined(GRAN)
-    endThread(t, CurrentProc); // clean-up the thread
-#elif defined(PARALLEL_HASKELL)
-    /* For now all are advisory -- HWL */
-    //if(t->priority==AdvisoryPriority) ??
-    advisory_thread_count--; // JB: Caution with this counter, buggy!
-
-# if defined(DIST)
-    if(t->dist.priority==RevalPriority)
-	FinishReval(t);
-# endif
-
-# if defined(EDENOLD)
-    // the thread could still have an outport... (BUG)
-    if (t->eden.outport != -1) {
-	// delete the outport for the tso which has finished...
-	IF_PAR_DEBUG(eden_ports,
-		     debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
-				t->eden.outport, t->id));
-	deleteOPT(t);
-    }
-    // thread still in the process (HEAVY BUG! since outport has just been closed...)
-    if (t->eden.epid != -1) {
-	IF_PAR_DEBUG(eden_ports,
-		     debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
-				t->id, t->eden.epid));
-	removeTSOfromProcess(t);
-    }
-# endif
-
-# if defined(PAR)
-    if (RtsFlags.ParFlags.ParStats.Full &&
-	!RtsFlags.ParFlags.ParStats.Suppressed)
-	DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
-
-    //  t->par only contains statistics: left out for now...
-    IF_PAR_DEBUG(fish,
-		 debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
-			    t->id,t,t->par.sparkname));
-# endif
-#endif // PARALLEL_HASKELL
-
     //
     // Check whether the thread that just completed was a bound
     // thread, and if so return with the result.
@@ -2083,17 +1500,6 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     }
 #endif

-#if defined(GRAN)
-    /* add a ContinueThread event to continue execution of current thread */
-    new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
-	      ContinueThread,
-	      t, (StgClosure*)NULL, (rtsSpark*)NULL);
-    IF_GRAN_DEBUG(bq,
-		  debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
-		  G_EVENTQ(0);
-		  G_CURR_THREADQ(0));
-#endif /* GRAN */
-
     return cap;
 }

@@ -2488,13 +1894,6 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);

-#if defined(GRAN)
-    /* GranSim specific init */
-    CurrentTSO = m->tso; // the TSO to run
-    procStatus[MainProc] = Busy; // status of main PE
-    CurrentProc = MainProc; // PE to run it on
-#endif
-
     cap = schedule(cap,task);

     ASSERT(task->stat != NoStatus);
@@ -2543,18 +1942,7 @@ workerStart(Task *task)
 void
 initScheduler(void)
 {
-#if defined(GRAN)
-  nat i;
-  for (i=0; i<=MAX_PROC; i++) {
-    run_queue_hds[i]      = END_TSO_QUEUE;
-    run_queue_tls[i]      = END_TSO_QUEUE;
-    blocked_queue_hds[i]  = END_TSO_QUEUE;
-    blocked_queue_tls[i]  = END_TSO_QUEUE;
-    ccalling_threadss[i]  = END_TSO_QUEUE;
-    blackhole_queue[i]    = END_TSO_QUEUE;
-    sleeping_queue        = END_TSO_QUEUE;
-  }
-#elif !defined(THREADED_RTS)
+#if !defined(THREADED_RTS)
   blocked_queue_hd  = END_TSO_QUEUE;
   blocked_queue_tl  = END_TSO_QUEUE;
   sleeping_queue    = END_TSO_QUEUE;
-- 
1.7.10.4
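
After the cleanup, the PARALLEL_HASKELL path in schedule() reduces to three
steps per round: flush any buffered messages, try to turn a local spark into
a thread, and only if that fails block waiting for remote work (a FISH round
trip in GUM, a plain blocking receive in Eden). The C sketch below models
that control flow in isolation. It is illustrative only: Cap, activateSpark,
getRemoteWork and findWorkRound are stand-ins invented here, not the RTS's
Capability, findSpark/createSparkThread or scheduleGetRemoteWork.

#include <stdbool.h>
#include <stdio.h>

/* Stand-in for Capability: just the two counters the logic inspects. */
typedef struct {
    int runnable;   /* length of the run queue */
    int sparks;     /* sparks in the local pool */
} Cap;

static bool emptyRunQueue(Cap *cap) { return cap->runnable == 0; }

/* Stand-in for findSpark()/createSparkThread(): promote one local
   spark into a runnable thread, if any is available. */
static void activateSpark(Cap *cap)
{
    if (cap->sparks > 0) { cap->sparks--; cap->runnable++; }
}

/* Stand-in for scheduleGetRemoteWork(): in GUM this would first send
   a FISH, in Eden it is just the blocking receive. Returns true when
   a Finish message ends the scheduler loop. */
static bool getRemoteWork(Cap *cap)
{
    cap->runnable++;   /* pretend a SCHEDULE message delivered a thread */
    return false;      /* no Finish message in this model */
}

/* One round of the work-finding logic that the patch leaves in
   schedule(); scheduleSendPendingMessages() would run first, but is a
   no-op when nothing is buffered, so it is omitted here. */
static bool findWorkRound(Cap *cap)
{
    if (emptyRunQueue(cap)) {
        activateSpark(cap);              /* prefer a local spark */
        if (emptyRunQueue(cap))
            return getRemoteWork(cap);   /* otherwise block for work */
    }
    return false;                        /* not finished; run a thread */
}

int main(void)
{
    Cap cap = { 0, 1 };
    bool finished = findWorkRound(&cap);
    printf("runnable=%d finished=%d\n", cap.runnable, finished);
    return 0;
}

The property the sketch preserves is the one the new in-line comments in the
patch stress: getRemoteWork is the only blocking step, so each round either
leaves the run queue non-empty or ends the loop when a Finish arrives.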