X-Git-Url: http://git.megacz.com/?a=blobdiff_plain;f=ghc%2Frts%2FSchedule.c;h=b0271153a63fd53b328f8c66ea23e94412966761;hb=dbef766ce79e37a74468a07a93b15ba1f06fe8f8;hp=68f22101e726504f81b1653b5252c8d949b7039e;hpb=03b0ad1099f0d17bd8ac26fef9dff82d2dfbdf85;p=ghc-hetmet.git diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 68f2210..b027115 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,20 +1,26 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.70 2000/05/08 15:57:01 simonmar Exp $ + * $Id: Schedule.c,v 1.108 2001/11/26 16:54:22 simonmar Exp $ * * (c) The GHC Team, 1998-2000 * * Scheduler * - * The main scheduling code in GranSim is quite different from that in std - * (concurrent) Haskell: while concurrent Haskell just iterates over the - * threads in the runnable queue, GranSim is event driven, i.e. it iterates - * over the events in the global event queue. -- HWL + * Different GHC ways use this scheduler quite differently (see comments below) + * Here is the global picture: + * + * WAY Name CPP flag What's it for + * -------------------------------------- + * mp GUM PAR Parallel execution on a distributed memory machine + * s SMP SMP Parallel execution on a shared memory machine + * mg GranSim GRAN Simulation of parallel execution + * md GUM/GdH DIST Distributed execution (based on GUM) * --------------------------------------------------------------------------*/ //@node Main scheduling code, , , //@section Main scheduling code -/* Version with scheduler monitor support for SMPs. +/* + * Version with scheduler monitor support for SMPs (WAY=s): This design provides a high-level API to create and schedule threads etc. as documented in the SMP design document. @@ -32,6 +38,24 @@ In a non-SMP build, there is one global capability, namely MainRegTable. SDM & KH, 10/99 + + * Version with support for distributed memory parallelism aka GUM (WAY=mp): + + The main scheduling loop in GUM iterates until a finish message is received. + In that case a global flag @receivedFinish@ is set and this instance of + the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages() + for the handling of incoming messages, such as PP_FINISH. + Note that in the parallel case we have a system manager that coordinates + different PEs, each of which are running one instance of the RTS. + See ghc/rts/parallel/SysMan.c for the main routine of the parallel program. + From this routine processes executing ghc/rts/Main.c are spawned. -- HWL + + * Version with support for simulating parallel execution aka GranSim (WAY=mg): + + The main scheduling code in GranSim is quite different from that in std + (concurrent) Haskell: while concurrent Haskell just iterates over the + threads in the runnable queue, GranSim is event driven, i.e. it iterates + over the events in the global event queue. -- HWL */ //@menu @@ -50,6 +74,7 @@ //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code //@subsection Includes +#include "PosixSource.h" #include "Rts.h" #include "SchedAPI.h" #include "RtsUtils.h" @@ -57,12 +82,11 @@ #include "Storage.h" #include "StgRun.h" #include "StgStartup.h" -#include "GC.h" #include "Hooks.h" #include "Schedule.h" #include "StgMiscClosures.h" #include "Storage.h" -#include "Evaluator.h" +#include "Interpreter.h" #include "Exception.h" #include "Printer.h" #include "Main.h" @@ -71,6 +95,11 @@ #include "Stats.h" #include "Itimer.h" #include "Prelude.h" +#ifdef PROFILING +#include "Proftimer.h" +#include "ProfHeap.h" +#include "RetainerProfile.h" +#endif #if defined(GRAN) || defined(PAR) # include "GranSimRts.h" # include "GranSim.h" @@ -144,6 +173,7 @@ StgTSO *ccalling_threadss[MAX_PROC]; StgTSO *run_queue_hd, *run_queue_tl; StgTSO *blocked_queue_hd, *blocked_queue_tl; +StgTSO *sleeping_queue; /* perhaps replace with a hash table? */ #endif @@ -156,7 +186,6 @@ StgTSO *all_threads; */ static StgTSO *suspended_ccalling_threads; -static void GetRoots(void); static StgTSO *threadStackOverflow(StgTSO *tso); /* KH: The following two flags are shared memory locations. There is no need @@ -200,19 +229,22 @@ StgThreadID next_thread_id = 1; * Locks required: sched_mutex. */ #ifdef SMP -//@cindex free_capabilities -//@cindex n_free_capabilities Capability *free_capabilities; /* Available capabilities for running threads */ nat n_free_capabilities; /* total number of available capabilities */ #else -//@cindex MainRegTable -Capability MainRegTable; /* for non-SMP, we have one global capability */ +Capability MainCapability; /* for non-SMP, we have one global capability */ #endif #if defined(GRAN) StgTSO *CurrentTSO; #endif +/* This is used in `TSO.h' and gcc 2.96 insists that this variable actually + * exists - earlier gccs apparently didn't. + * -= chak + */ +StgTSO dummy_tso; + rtsBool ready_to_gc; /* All our current task ids, saved in case we need to kill them later. @@ -254,13 +286,14 @@ nat await_death; #if defined(PAR) StgTSO *LastTSO; rtsTime TimeOfLastYield; +rtsBool emitSchedule = rtsTrue; #endif #if DEBUG char *whatNext_strs[] = { "ThreadEnterGHC", "ThreadRunGHC", - "ThreadEnterHugs", + "ThreadEnterInterp", "ThreadKilled", "ThreadComplete" }; @@ -274,6 +307,11 @@ char *threadReturnCode_strs[] = { }; #endif +#ifdef PAR +StgTSO * createSparkThread(rtsSpark spark); +StgTSO * activateSpark (rtsSpark spark); +#endif + /* * The thread state for the main thread. // ToDo: check whether not needed any more @@ -332,6 +370,10 @@ schedule( void ) rtsSpark spark; StgTSO *tso; GlobalTaskId pe; + rtsBool receivedFinish = rtsFalse; +# if defined(DEBUG) + nat tp_size, sp_size; // stats only +# endif #endif rtsBool was_interrupted = rtsFalse; @@ -363,8 +405,8 @@ schedule( void ) #elif defined(PAR) - while (!GlobalStopPending) { /* GlobalStopPending set in par_exit */ - + while (!receivedFinish) { /* set by processMessages */ + /* when receiving PP_FINISH message */ #else while (1) { @@ -379,14 +421,7 @@ schedule( void ) */ if (interrupted) { IF_DEBUG(scheduler, sched_belch("interrupted")); - for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) { - deleteThread(t); - } - for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) { - deleteThread(t); - } - run_queue_hd = run_queue_tl = END_TSO_QUEUE; - blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE; + deleteAllThreads(); interrupted = rtsFalse; was_interrupted = rtsTrue; } @@ -411,6 +446,7 @@ schedule( void ) pthread_cond_broadcast(&m->wakeup); break; case ThreadKilled: + if (m->ret) *(m->ret) = NULL; *prev = m->link; if (was_interrupted) { m->stat = Interrupted; @@ -425,7 +461,8 @@ schedule( void ) } } -#else +#else // not SMP + # if defined(PAR) /* in GUM do this only on the Main PE */ if (IAmMainThread) @@ -443,6 +480,7 @@ schedule( void ) m->stat = Success; return; } else { + if (m->ret) { *(m->ret) = NULL; }; if (was_interrupted) { m->stat = Interrupted; } else { @@ -471,23 +509,16 @@ schedule( void ) for (; n > 0; n--) { StgClosure *spark; - spark = findSpark(); + spark = findSpark(rtsFalse); if (spark == NULL) { break; /* no more sparks in the pool */ } else { /* I'd prefer this to be done in activateSpark -- HWL */ /* tricky - it needs to hold the scheduler lock and * not try to re-acquire it -- SDM */ - StgTSO *tso; - tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue); - pushClosure(tso,spark); - PUSH_ON_RUN_QUEUE(tso); -#ifdef PAR - advisory_thread_count++; -#endif - + createSparkThread(spark); IF_DEBUG(scheduler, - sched_belch("turning spark of closure %p into a thread", + sched_belch("==^^ turning spark of closure %p into a thread", (StgClosure *)spark)); } } @@ -498,7 +529,14 @@ schedule( void ) pthread_cond_signal(&thread_ready_cond); } } -#endif /* SMP */ +#endif // SMP + + /* check for signals each time around the scheduler */ +#ifndef mingw32_TARGET_OS + if (signals_pending()) { + startSignalHandlers(); + } +#endif /* Check whether any waiting threads need to be woken up. If the * run queue is empty, and there are no other tasks running, we @@ -506,7 +544,7 @@ schedule( void ) * ToDo: what if another client comes along & requests another * main thread? */ - if (blocked_queue_hd != END_TSO_QUEUE) { + if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) { awaitEvent( (run_queue_hd == END_TSO_QUEUE) #ifdef SMP @@ -514,13 +552,8 @@ schedule( void ) #endif ); } - - /* check for signals each time around the scheduler */ -#ifndef __MINGW32__ - if (signals_pending()) { - start_signal_handlers(); - } -#endif + /* we can be interrupted while waiting for I/O... */ + if (interrupted) continue; /* * Detect deadlock: when we have no threads to run, there are no @@ -533,37 +566,44 @@ schedule( void ) * If no threads are black holed, we have a deadlock situation, so * inform all the main threads. */ -#ifdef SMP +#ifndef PAR if (blocked_queue_hd == END_TSO_QUEUE && run_queue_hd == END_TSO_QUEUE - && (n_free_capabilities == RtsFlags.ParFlags.nNodes)) + && sleeping_queue == END_TSO_QUEUE +#ifdef SMP + && (n_free_capabilities == RtsFlags.ParFlags.nNodes) +#endif + ) { - IF_DEBUG(scheduler, sched_belch("deadlocked, checking for black holes...")); - detectBlackHoles(); - if (run_queue_hd == END_TSO_QUEUE) { - StgMainThread *m; - for (m = main_threads; m != NULL; m = m->link) { + IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC...")); + GarbageCollect(GetRoots,rtsTrue); + if (blocked_queue_hd == END_TSO_QUEUE + && run_queue_hd == END_TSO_QUEUE + && sleeping_queue == END_TSO_QUEUE) { + IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes...")); + detectBlackHoles(); + if (run_queue_hd == END_TSO_QUEUE) { + StgMainThread *m = main_threads; +#ifdef SMP + for (; m != NULL; m = m->link) { + deleteThread(m->tso); + m->ret = NULL; + m->stat = Deadlock; + pthread_cond_broadcast(&m->wakeup); + } + main_threads = NULL; +#else + deleteThread(m->tso); m->ret = NULL; m->stat = Deadlock; - pthread_cond_broadcast(&m->wakeup); + main_threads = m->link; + return; +#endif } - main_threads = NULL; - } - } -#else /* ! SMP */ - if (blocked_queue_hd == END_TSO_QUEUE - && run_queue_hd == END_TSO_QUEUE) - { - IF_DEBUG(scheduler, sched_belch("deadlocked, checking for black holes...")); - detectBlackHoles(); - if (run_queue_hd == END_TSO_QUEUE) { - StgMainThread *m = main_threads; - m->ret = NULL; - m->stat = Deadlock; - main_threads = m->link; - return; } } +#elif defined(PAR) + /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */ #endif #ifdef SMP @@ -599,7 +639,7 @@ schedule( void ) if (!RtsFlags.GranFlags.Light) handleIdlePEs(); - IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n")) + IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n")); /* main event dispatcher in GranSim */ switch (event->evttype) { @@ -713,7 +753,7 @@ schedule( void ) IF_DEBUG(gran, fprintf(stderr, "GRAN: About to run current thread, which is\n"); - G_TSO(t,5)) + G_TSO(t,5)); context_switch = 0; // turned on via GranYield, checking events and time slice @@ -723,14 +763,13 @@ schedule( void ) procStatus[CurrentProc] = Busy; #elif defined(PAR) - if (PendingFetches != END_BF_QUEUE) { processFetches(); } /* ToDo: phps merge with spark activation above */ /* check whether we have local work and send requests if we have none */ - if (run_queue_hd == END_TSO_QUEUE) { /* no runnable threads */ + if (EMPTY_RUN_QUEUE()) { /* no runnable threads */ /* :-[ no local threads => look out for local sparks */ /* the spark pool for the current PE */ pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable @@ -744,8 +783,8 @@ schedule( void ) * to turn one of those pending sparks into a * thread... */ - - spark = findSpark(); /* get a spark */ + + spark = findSpark(rtsFalse); /* get a spark */ if (spark != (rtsSpark) NULL) { tso = activateSpark(spark); /* turn the spark into a thread */ IF_PAR_DEBUG(schedule, @@ -762,9 +801,13 @@ schedule( void ) spark_queue_len(pool))); goto next_thread; } - } else + } + + /* If we still have no work we need to send a FISH to get a spark + from another PE + */ + if (EMPTY_RUN_QUEUE()) { /* =8-[ no local sparks => look for work on other PEs */ - { /* * We really have absolutely no work. Send out a fish * (there may be some out there already), and wait for @@ -773,28 +816,48 @@ schedule( void ) * we're hoping to see. (Of course, we still have to * respond to other types of messages.) */ - if (//!fishing && - outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // && - // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) { - /* fishing set in sendFish, processFish; + TIME now = msTime() /*CURRENT_TIME*/; + IF_PAR_DEBUG(verbose, + belch("-- now=%ld", now)); + IF_PAR_DEBUG(verbose, + if (outstandingFishes < RtsFlags.ParFlags.maxFishes && + (last_fish_arrived_at!=0 && + last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) { + belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)", + last_fish_arrived_at+RtsFlags.ParFlags.fishDelay, + last_fish_arrived_at, + RtsFlags.ParFlags.fishDelay, now); + }); + + if (outstandingFishes < RtsFlags.ParFlags.maxFishes && + (last_fish_arrived_at==0 || + (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) { + /* outstandingFishes is set in sendFish, processFish; avoid flooding system with fishes via delay */ pe = choosePE(); sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, NEW_FISH_HUNGER); + + // Global statistics: count no. of fishes + if (RtsFlags.ParFlags.ParStats.Global && + RtsFlags.GcFlags.giveStats > NO_GC_STATS) { + globalParStats.tot_fish_mess++; + } } - - processMessages(); + + receivedFinish = processMessages(); goto next_thread; - // ReSchedule(0); } } else if (PacketsWaiting()) { /* Look for incoming messages */ - processMessages(); + receivedFinish = processMessages(); } /* Now we are sure that we have some work available */ ASSERT(run_queue_hd != END_TSO_QUEUE); + /* Take a thread from the run queue, if we have work */ t = POP_RUN_QUEUE(); // take_off_run_queue(END_TSO_QUEUE); + IF_DEBUG(sanity,checkTSO(t)); /* ToDo: write something to the log-file if (RTSflags.ParFlags.granSimStats && !sameThread) @@ -805,17 +868,23 @@ schedule( void ) /* the spark pool for the current PE */ pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable - IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)", - spark_queue_len(pool), - CURRENT_PROC, - pool->hd, pool->tl, pool->base, pool->lim)); - - IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)", - run_queue_len(), CURRENT_PROC, - run_queue_hd, run_queue_tl)); + IF_DEBUG(scheduler, + belch("--=^ %d threads, %d sparks on [%#x]", + run_queue_len(), spark_queue_len(pool), CURRENT_PROC)); + +#if 1 + if (0 && RtsFlags.ParFlags.ParStats.Full && + t && LastTSO && t->id != LastTSO->id && + LastTSO->why_blocked == NotBlocked && + LastTSO->what_next != ThreadComplete) { + // if previously scheduled TSO not blocked we have to record the context switch + DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC, + GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0); + } -#if 0 - if (t != LastTSO) { + if (RtsFlags.ParFlags.ParStats.Full && + (emitSchedule /* forced emit */ || + (t && LastTSO && t->id != LastTSO->id))) { /* we are running a different TSO, so write a schedule event to log file NB: If we use fair scheduling we also have to write a deschedule @@ -825,8 +894,9 @@ schedule( void ) */ DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0); - + emitSchedule = rtsFalse; } + #endif #else /* !GRAN && !PAR */ @@ -834,6 +904,9 @@ schedule( void ) */ ASSERT(run_queue_hd != END_TSO_QUEUE); t = POP_RUN_QUEUE(); + + // Sanity check the thread we're about to run. This can be + // expensive if there is lots of thread switching going on... IF_DEBUG(sanity,checkTSO(t)); #endif @@ -845,51 +918,54 @@ schedule( void ) free_capabilities = cap->link; n_free_capabilities--; #else - cap = &MainRegTable; + cap = &MainCapability; #endif + + cap->r.rCurrentTSO = t; - cap->rCurrentTSO = t; - - /* set the context_switch flag + /* context switches are now initiated by the timer signal, unless + * the user specified "context switch as often as possible", with + * +RTS -C0 */ - if (run_queue_hd == END_TSO_QUEUE) - context_switch = 0; + if ( +#ifdef PROFILING + RtsFlags.ProfFlags.profileInterval == 0 || +#endif + (RtsFlags.ConcFlags.ctxtSwitchTicks == 0 + && (run_queue_hd != END_TSO_QUEUE + || blocked_queue_hd != END_TSO_QUEUE + || sleeping_queue != END_TSO_QUEUE))) + context_switch = 1; else - context_switch = 1; + context_switch = 0; RELEASE_LOCK(&sched_mutex); IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", t->id, t, whatNext_strs[t->what_next])); +#ifdef PROFILING + startHeapProfTimer(); +#endif + /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */ /* Run the current thread */ - switch (cap->rCurrentTSO->what_next) { + switch (cap->r.rCurrentTSO->what_next) { case ThreadKilled: case ThreadComplete: - /* Thread already finished, return to scheduler. */ - ret = ThreadFinished; - break; + /* Thread already finished, return to scheduler. */ + ret = ThreadFinished; + break; case ThreadEnterGHC: - ret = StgRun((StgFunPtr) stg_enterStackTop, cap); - break; + ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r); + break; case ThreadRunGHC: - ret = StgRun((StgFunPtr) stg_returnToStackTop, cap); - break; - case ThreadEnterHugs: -#ifdef INTERPRETER - { - StgClosure* c; - IF_DEBUG(scheduler,sched_belch("entering Hugs")); - c = (StgClosure *)(cap->rCurrentTSO->sp[0]); - cap->rCurrentTSO->sp += 1; - ret = enter(cap,c); - break; - } -#else - barf("Panic: entered a BCO but no bytecode interpreter in this build"); -#endif + ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r); + break; + case ThreadEnterInterp: + ret = interpretBCO(cap); + break; default: barf("schedule: invalid what_next field"); } @@ -897,6 +973,7 @@ schedule( void ) /* Costs for the scheduler are assigned to CCS_SYSTEM */ #ifdef PROFILING + stopHeapProfTimer(); CCCS = CCS_SYSTEM; #endif @@ -907,18 +984,78 @@ schedule( void ) #elif !defined(GRAN) && !defined(PAR) IF_DEBUG(scheduler,fprintf(stderr,"scheduler: ");); #endif - t = cap->rCurrentTSO; + t = cap->r.rCurrentTSO; #if defined(PAR) /* HACK 675: if the last thread didn't yield, make sure to print a SCHEDULE event to the log file when StgRunning the next thread, even if it is the same one as before */ - LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t; + LastTSO = t; TimeOfLastYield = CURRENT_TIME; #endif switch (ret) { case HeapOverflow: +#if defined(GRAN) + IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t)); + globalGranStats.tot_heapover++; +#elif defined(PAR) + globalParStats.tot_heapover++; +#endif + + // did the task ask for a large block? + if (cap->r.rHpAlloc > BLOCK_SIZE_W) { + // if so, get one and push it on the front of the nursery. + bdescr *bd; + nat blocks; + + blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE; + + IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", + t->id, t, + whatNext_strs[t->what_next], blocks)); + + // don't do this if it would push us over the + // alloc_blocks_lim limit; we'll GC first. + if (alloc_blocks + blocks < alloc_blocks_lim) { + + alloc_blocks += blocks; + bd = allocGroup( blocks ); + + // link the new group into the list + bd->link = cap->r.rCurrentNursery; + bd->u.back = cap->r.rCurrentNursery->u.back; + if (cap->r.rCurrentNursery->u.back != NULL) { + cap->r.rCurrentNursery->u.back->link = bd; + } else { + ASSERT(g0s0->blocks == cap->r.rCurrentNursery && + g0s0->blocks == cap->r.rNursery); + cap->r.rNursery = g0s0->blocks = bd; + } + cap->r.rCurrentNursery->u.back = bd; + + // initialise it as a nursery block + bd->step = g0s0; + bd->gen_no = 0; + bd->flags = 0; + bd->free = bd->start; + + // don't forget to update the block count in g0s0. + g0s0->n_blocks += blocks; + ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks); + + // now update the nursery to point to the new block + cap->r.rCurrentNursery = bd; + + // we might be unlucky and have another thread get on the + // run queue before us and steal the large block, but in that + // case the thread will just end up requesting another large + // block. + PUSH_ON_RUN_QUEUE(t); + break; + } + } + /* make all the running tasks block on a condition variable, * maybe set context_switch and wait till they all pile in, * then have them wait on a GC condition variable. @@ -928,6 +1065,15 @@ schedule( void ) threadPaused(t); #if defined(GRAN) ASSERT(!is_on_queue(t,CurrentProc)); +#elif defined(PAR) + /* Currently we emit a DESCHEDULE event before GC in GUM. + ToDo: either add separate event to distinguish SYSTEM time from rest + or just nuke this DESCHEDULE (and the following SCHEDULE) */ + if (0 && RtsFlags.ParFlags.ParStats.Full) { + DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, + GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0); + emitSchedule = rtsTrue; + } #endif ready_to_gc = rtsTrue; @@ -937,6 +1083,15 @@ schedule( void ) break; case StackOverflow: +#if defined(GRAN) + IF_DEBUG(gran, + DumpGranEvent(GR_DESCHEDULE, t)); + globalGranStats.tot_stackover++; +#elif defined(PAR) + // IF_DEBUG(par, + // DumpGranEvent(GR_DESCHEDULE, t); + globalParStats.tot_stackover++; +#endif IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", t->id, t, whatNext_strs[t->what_next])); /* just adjust the stack for this thread, then pop it back @@ -968,8 +1123,9 @@ schedule( void ) DumpGranEvent(GR_DESCHEDULE, t)); globalGranStats.tot_yields++; #elif defined(PAR) - IF_DEBUG(par, - DumpGranEvent(GR_DESCHEDULE, t)); + // IF_DEBUG(par, + // DumpGranEvent(GR_DESCHEDULE, t); + globalParStats.tot_yields++; #endif /* put the thread back on the run queue. Then, if we're ready to * GC, check whether this is the last task to stop. If so, wake @@ -977,7 +1133,7 @@ schedule( void ) * GC is finished. */ IF_DEBUG(scheduler, - if (t->what_next == ThreadEnterHugs) { + if (t->what_next == ThreadEnterInterp) { /* ToDo: or maybe a timer expired when we were in Hugs? * or maybe someone hit ctrl-C */ @@ -1002,7 +1158,18 @@ schedule( void ) //belch("&& Doing sanity check on all ThreadQueues (and their TSOs)."); checkThreadQsSanity(rtsTrue)); #endif +#if defined(PAR) + if (RtsFlags.ParFlags.doFairScheduling) { + /* this does round-robin scheduling; good for concurrency */ + APPEND_TO_RUN_QUEUE(t); + } else { + /* this does unfair scheduling; good for parallelism */ + PUSH_ON_RUN_QUEUE(t); + } +#else + /* this does round-robin scheduling; good for concurrency */ APPEND_TO_RUN_QUEUE(t); +#endif #if defined(GRAN) /* add a ContinueThread event to actually process the thread */ new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc], @@ -1011,7 +1178,7 @@ schedule( void ) IF_GRAN_DEBUG(bq, belch("GRAN: eventq and runnableq after adding yielded thread to queue again:"); G_EVENTQ(0); - G_CURR_THREADQ(0)) + G_CURR_THREADQ(0)); #endif /* GRAN */ break; @@ -1037,16 +1204,19 @@ schedule( void ) procStatus[CurrentProc] = Idle; */ #elif defined(PAR) - IF_DEBUG(par, - DumpGranEvent(GR_DESCHEDULE, t)); + IF_DEBUG(scheduler, + belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", + t->id, t, whatNext_strs[t->what_next], t->block_info.closure)); + IF_PAR_DEBUG(bq, + + if (t->block_info.closure!=(StgClosure*)NULL) + print_bq(t->block_info.closure)); /* Send a fetch (if BlockedOnGA) and dump event to log file */ blockThread(t); - IF_DEBUG(scheduler, - belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", - t->id, t, whatNext_strs[t->what_next], t->block_info.closure); - if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure)); + /* whatever we schedule next, we must log that schedule */ + emitSchedule = rtsTrue; #else /* !GRAN */ /* don't need to do anything. Either the thread is blocked on @@ -1080,8 +1250,17 @@ schedule( void ) #if defined(GRAN) endThread(t, CurrentProc); // clean-up the thread #elif defined(PAR) + /* For now all are advisory -- HWL */ + //if(t->priority==AdvisoryPriority) ?? advisory_thread_count--; - if (RtsFlags.ParFlags.ParStats.Full) + +# ifdef DIST + if(t->dist.priority==RevalPriority) + FinishReval(t); +# endif + + if (RtsFlags.ParFlags.ParStats.Full && + !RtsFlags.ParFlags.ParStats.Suppressed) DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */); #endif break; @@ -1096,6 +1275,15 @@ schedule( void ) n_free_capabilities++; #endif +#ifdef PROFILING + if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) { + GarbageCollect(GetRoots, rtsTrue); + heapCensus(); + performHeapProfile = rtsFalse; + ready_to_gc = rtsFalse; // we already GC'd + } +#endif + #ifdef SMP if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) #else @@ -1123,42 +1311,50 @@ schedule( void ) IF_GRAN_DEBUG(bq, fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n"); G_EVENTQ(0); - G_CURR_THREADQ(0)) + G_CURR_THREADQ(0)); #endif /* GRAN */ } + #if defined(GRAN) next_thread: IF_GRAN_DEBUG(unused, print_eventq(EventHd)); event = get_next_event(); - #elif defined(PAR) next_thread: /* ToDo: wait for next message to arrive rather than busy wait */ - -#else /* GRAN */ - /* not any more - next_thread: - t = take_off_run_queue(END_TSO_QUEUE); - */ #endif /* GRAN */ + } /* end of while(1) */ + + IF_PAR_DEBUG(verbose, + belch("== Leaving schedule() after having received Finish")); } -/* A hack for Hugs concurrency support. Needs sanitisation (?) */ +/* --------------------------------------------------------------------------- + * deleteAllThreads(): kill all the live threads. + * + * This is used when we catch a user interrupt (^C), before performing + * any necessary cleanups and running finalizers. + * ------------------------------------------------------------------------- */ + void deleteAllThreads ( void ) { StgTSO* t; - IF_DEBUG(scheduler,sched_belch("deleteAllThreads()")); + IF_DEBUG(scheduler,sched_belch("deleting all threads")); for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) { - deleteThread(t); + deleteThread(t); } for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) { - deleteThread(t); + deleteThread(t); + } + for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) { + deleteThread(t); } run_queue_hd = run_queue_tl = END_TSO_QUEUE; blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE; + sleeping_queue = END_TSO_QUEUE; } /* startThread and insertThread are now in GranSim.c -- HWL */ @@ -1182,21 +1378,25 @@ void deleteAllThreads ( void ) * ------------------------------------------------------------------------- */ StgInt -suspendThread( Capability *cap ) +suspendThread( StgRegTable *reg ) { nat tok; + Capability *cap; + + // assume that *reg is a pointer to the StgRegTable part of a Capability + cap = (Capability *)((void *)reg - sizeof(StgFunTable)); ACQUIRE_LOCK(&sched_mutex); IF_DEBUG(scheduler, - sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id)); + sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id)); - threadPaused(cap->rCurrentTSO); - cap->rCurrentTSO->link = suspended_ccalling_threads; - suspended_ccalling_threads = cap->rCurrentTSO; + threadPaused(cap->r.rCurrentTSO); + cap->r.rCurrentTSO->link = suspended_ccalling_threads; + suspended_ccalling_threads = cap->r.rCurrentTSO; /* Use the thread ID as the token; it should be unique */ - tok = cap->rCurrentTSO->id; + tok = cap->r.rCurrentTSO->id; #ifdef SMP cap->link = free_capabilities; @@ -1208,7 +1408,7 @@ suspendThread( Capability *cap ) return tok; } -Capability * +StgRegTable * resumeThread( StgInt tok ) { StgTSO *tso, **prev; @@ -1228,6 +1428,7 @@ resumeThread( StgInt tok ) if (tso == END_TSO_QUEUE) { barf("resumeThread: thread not found"); } + tso->link = END_TSO_QUEUE; #ifdef SMP while (free_capabilities == NULL) { @@ -1239,13 +1440,13 @@ resumeThread( StgInt tok ) free_capabilities = cap->link; n_free_capabilities--; #else - cap = &MainRegTable; + cap = &MainCapability; #endif - cap->rCurrentTSO = tso; + cap->r.rCurrentTSO = tso; RELEASE_LOCK(&sched_mutex); - return cap; + return &cap->r; } @@ -1272,6 +1473,16 @@ int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) } /* --------------------------------------------------------------------------- + * Fetching the ThreadID from an StgTSO. + * + * This is used in the implementation of Show for ThreadIds. + * ------------------------------------------------------------------------ */ +int rts_getThreadId(const StgTSO *tso) +{ + return tso->id; +} + +/* --------------------------------------------------------------------------- Create a new thread. The new thread starts with the given stack size. Before the @@ -1339,7 +1550,7 @@ createThread_(nat size, rtsBool have_lock) tso = (StgTSO *)allocate(size); TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0); - SET_HDR(tso, &TSO_info, CCS_SYSTEM); + SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM); #if defined(GRAN) SET_GRAN_HDR(tso, ThisPE); #endif @@ -1356,7 +1567,6 @@ createThread_(nat size, rtsBool have_lock) tso->why_blocked = NotBlocked; tso->blocked_exceptions = NULL; - tso->splim = (P_)&(tso->stack) + RESERVED_STACK_WORDS; tso->stack_size = stack_size; tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) - TSO_STRUCT_SIZEW; @@ -1382,8 +1592,14 @@ createThread_(nat size, rtsBool have_lock) */ #endif -#if defined(GRAN) || defined(PAR) - DumpGranEvent(GR_START,tso); +#if defined(GRAN) + if (RtsFlags.GranFlags.GranSimStats.Full) + DumpGranEvent(GR_START,tso); +#elif defined(PAR) + if (RtsFlags.ParFlags.ParStats.Full) + DumpGranEvent(GR_STARTQ,tso); + /* HACk to avoid SCHEDULE + LastTSO = tso; */ #endif /* Link the new thread on the global thread list. @@ -1391,6 +1607,10 @@ createThread_(nat size, rtsBool have_lock) tso->global_link = all_threads; all_threads = tso; +#if defined(DIST) + tso->dist.priority = MandatoryPriority; //by default that is... +#endif + #if defined(GRAN) tso->gran.pri = pri; # if defined(DEBUG) @@ -1439,6 +1659,13 @@ createThread_(nat size, rtsBool have_lock) globalGranStats.threads_created_on_PE[CurrentProc]++; globalGranStats.tot_sq_len += spark_queue_len(CurrentProc); globalGranStats.tot_sq_probes++; +#elif defined(PAR) + // collect parallel global statistics (currently done together with GC stats) + if (RtsFlags.ParFlags.ParStats.Global && + RtsFlags.GcFlags.giveStats > NO_GC_STATS) { + //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); + globalParStats.tot_threads_created++; + } #endif #if defined(GRAN) @@ -1456,6 +1683,36 @@ createThread_(nat size, rtsBool have_lock) return tso; } +#if defined(PAR) +/* RFP: + all parallel thread creation calls should fall through the following routine. +*/ +StgTSO * +createSparkThread(rtsSpark spark) +{ StgTSO *tso; + ASSERT(spark != (rtsSpark)NULL); + if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) + { threadsIgnored++; + barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)", + RtsFlags.ParFlags.maxThreads, advisory_thread_count); + return END_TSO_QUEUE; + } + else + { threadsCreated++; + tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue); + if (tso==END_TSO_QUEUE) + barf("createSparkThread: Cannot create TSO"); +#if defined(DIST) + tso->priority = AdvisoryPriority; +#endif + pushClosure(tso,spark); + PUSH_ON_RUN_QUEUE(tso); + advisory_thread_count++; + } + return tso; +} +#endif + /* Turn a spark into a thread. ToDo: fix for SMP (needs to acquire SCHED_MUTEX!) @@ -1466,22 +1723,13 @@ StgTSO * activateSpark (rtsSpark spark) { StgTSO *tso; - - ASSERT(spark != (rtsSpark)NULL); - tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue); - if (tso!=END_TSO_QUEUE) { - pushClosure(tso,spark); - PUSH_ON_RUN_QUEUE(tso); - advisory_thread_count++; - if (RtsFlags.ParFlags.ParStats.Full) { - //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ... - IF_PAR_DEBUG(verbose, - belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread", - (StgClosure *)spark, info_type((StgClosure *)spark))); - } - } else { - barf("activateSpark: Cannot create TSO"); + tso = createSparkThread(spark); + if (RtsFlags.ParFlags.ParStats.Full) { + //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ... + IF_PAR_DEBUG(verbose, + belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread", + (StgClosure *)spark, info_type((StgClosure *)spark))); } // ToDo: fwd info on local/global spark to thread -- HWL // tso->gran.exported = spark->exported; @@ -1535,10 +1783,10 @@ scheduleThread(StgTSO *tso) * ------------------------------------------------------------------------ */ #if defined(PAR) || defined(SMP) -void * -taskStart( void *arg STG_UNUSED ) +void +taskStart(void) /* ( void *arg STG_UNUSED) */ { - rts_evalNothing(NULL); + scheduleThread(END_TSO_QUEUE); } #endif @@ -1564,7 +1812,15 @@ term_handler(int sig STG_UNUSED) } #endif -//@cindex initScheduler +static void +initCapability( Capability *cap ) +{ + cap->f.stgChk0 = (F_)__stg_chk_0; + cap->f.stgChk1 = (F_)__stg_chk_1; + cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1; + cap->f.stgUpdatePAP = (F_)__stg_update_PAP; +} + void initScheduler(void) { @@ -1577,12 +1833,14 @@ initScheduler(void) blocked_queue_hds[i] = END_TSO_QUEUE; blocked_queue_tls[i] = END_TSO_QUEUE; ccalling_threadss[i] = END_TSO_QUEUE; + sleeping_queue = END_TSO_QUEUE; } #else run_queue_hd = END_TSO_QUEUE; run_queue_tl = END_TSO_QUEUE; blocked_queue_hd = END_TSO_QUEUE; blocked_queue_tl = END_TSO_QUEUE; + sleeping_queue = END_TSO_QUEUE; #endif suspended_ccalling_threads = END_TSO_QUEUE; @@ -1593,10 +1851,8 @@ initScheduler(void) context_switch = 0; interrupted = 0; -#ifdef INTERPRETER - ecafList = END_ECAF_LIST; - clearECafTable(); -#endif + RtsFlags.ConcFlags.ctxtSwitchTicks = + RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS; /* Install the SIGHUP handler */ #ifdef SMP @@ -1621,6 +1877,7 @@ initScheduler(void) prev = NULL; for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities"); + initCapability(cap); cap->link = prev; prev = cap; } @@ -1629,6 +1886,8 @@ initScheduler(void) } IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n", n_free_capabilities);); +#else + initCapability(&MainCapability); #endif #if defined(SMP) || defined(PAR) @@ -1735,6 +1994,8 @@ howManyThreadsAvail ( void ) i++; for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link) i++; + for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link) + i++; return i; } @@ -1748,9 +2009,13 @@ finishAllThreads ( void ) while (blocked_queue_hd != END_TSO_QUEUE) { waitThread ( blocked_queue_hd, NULL ); } + while (sleeping_queue != END_TSO_QUEUE) { + waitThread ( blocked_queue_hd, NULL ); + } } while (blocked_queue_hd != END_TSO_QUEUE || - run_queue_hd != END_TSO_QUEUE); + run_queue_hd != END_TSO_QUEUE || + sleeping_queue != END_TSO_QUEUE); } SchedulerStatus @@ -1773,7 +2038,7 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) m->link = main_threads; main_threads = m; - IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n", + IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", m->tso->id)); #ifdef SMP @@ -1798,7 +2063,7 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) pthread_cond_destroy(&m->wakeup); #endif - IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n", + IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", m->tso->id)); free(m); @@ -1916,6 +2181,7 @@ take_off_run_queue(StgTSO *tso) { - all the threads on the runnable queue - all the threads on the blocked queue + - all the threads on the sleeping queue - all the thread currently executing a _ccall_GC - all the "main threads" @@ -1926,7 +2192,8 @@ take_off_run_queue(StgTSO *tso) { KH @ 25/10/99 */ -static void GetRoots(void) +void +GetRoots(evac_fn evac) { StgMainThread *m; @@ -1935,16 +2202,16 @@ static void GetRoots(void) nat i; for (i=0; i<=RtsFlags.GranFlags.proc; i++) { if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL))) - run_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]); + evac((StgClosure **)&run_queue_hds[i]); if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL))) - run_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]); + evac((StgClosure **)&run_queue_tls[i]); if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL))) - blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]); + evac((StgClosure **)&blocked_queue_hds[i]); if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL))) - blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]); + evac((StgClosure **)&blocked_queue_tls[i]); if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL))) - ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]); + evac((StgClosure **)&ccalling_threads[i]); } } @@ -1952,27 +2219,31 @@ static void GetRoots(void) #else /* !GRAN */ if (run_queue_hd != END_TSO_QUEUE) { - ASSERT(run_queue_tl != END_TSO_QUEUE); - run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd); - run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl); + ASSERT(run_queue_tl != END_TSO_QUEUE); + evac((StgClosure **)&run_queue_hd); + evac((StgClosure **)&run_queue_tl); } - + if (blocked_queue_hd != END_TSO_QUEUE) { - ASSERT(blocked_queue_tl != END_TSO_QUEUE); - blocked_queue_hd = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd); - blocked_queue_tl = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl); + ASSERT(blocked_queue_tl != END_TSO_QUEUE); + evac((StgClosure **)&blocked_queue_hd); + evac((StgClosure **)&blocked_queue_tl); + } + + if (sleeping_queue != END_TSO_QUEUE) { + evac((StgClosure **)&sleeping_queue); } #endif for (m = main_threads; m != NULL; m = m->link) { - m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso); + evac((StgClosure **)&m->tso); + } + if (suspended_ccalling_threads != END_TSO_QUEUE) { + evac((StgClosure **)&suspended_ccalling_threads); } - if (suspended_ccalling_threads != END_TSO_QUEUE) - suspended_ccalling_threads = - (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads); #if defined(SMP) || defined(PAR) || defined(GRAN) - markSparkQueue(); + markSparkQueue(evac); #endif } @@ -1989,7 +2260,7 @@ static void GetRoots(void) This needs to be protected by the GC condition variable above. KH. -------------------------------------------------------------------------- */ -void (*extra_roots)(void); +void (*extra_roots)(evac_fn); void performGC(void) @@ -2004,17 +2275,16 @@ performMajorGC(void) } static void -AllRoots(void) +AllRoots(evac_fn evac) { - GetRoots(); /* the scheduler's roots */ - extra_roots(); /* the user's roots */ + GetRoots(evac); // the scheduler's roots + extra_roots(evac); // the user's roots } void -performGCWithRoots(void (*get_roots)(void)) +performGCWithRoots(void (*get_roots)(evac_fn)) { extra_roots = get_roots; - GarbageCollect(AllRoots,rtsFalse); } @@ -2044,13 +2314,8 @@ threadStackOverflow(StgTSO *tso) printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, tso->sp+64))); -#ifdef INTERPRETER - fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" ); - exit(1); -#else /* Send this thread the StackOverflow exception */ raiseAsync(tso, (StgClosure *)stackOverflow_closure); -#endif return tso; } @@ -2064,7 +2329,7 @@ threadStackOverflow(StgTSO *tso) new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */ new_stack_size = new_tso_size - TSO_STRUCT_SIZEW; - IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size)); + IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size)); dest = (StgTSO *)allocate(new_tso_size); TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0); @@ -2079,11 +2344,10 @@ threadStackOverflow(StgTSO *tso) diff = (P_)new_sp - (P_)tso->sp; /* In *words* */ dest->su = (StgUpdateFrame *) ((P_)dest->su + diff); dest->sp = new_sp; - dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso); dest->stack_size = new_stack_size; /* and relocate the update frame list */ - relocate_TSO(tso, dest); + relocate_stack(dest, diff); /* Mark the old TSO as relocated. We have to check for relocated * TSOs in the garbage collector and any primops that deal with TSOs. @@ -2121,8 +2385,6 @@ threadStackOverflow(StgTSO *tso) Wake up a queue that was blocked on some resource. ------------------------------------------------------------------------ */ -/* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */ - #if defined(GRAN) static inline void unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node ) @@ -2136,8 +2398,10 @@ unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node ) update blocked and fetch time (depending on type of the orig closure) */ if (RtsFlags.ParFlags.ParStats.Full) { DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, - GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure, + GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure, 0, 0 /* spark_queue_len(ADVISORY_POOL) */); + if (EMPTY_RUN_QUEUE()) + emitSchedule = rtsTrue; switch (get_itbl(node)->type) { case FETCH_ME_BQ: @@ -2148,6 +2412,10 @@ unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node ) case BLACKHOLE_BQ: ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat; break; +#ifdef DIST + case MVAR: + break; +#endif default: barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue"); } @@ -2217,8 +2485,8 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node) case BLOCKED_FETCH: /* if it's a BLOCKED_FETCH put it on the PendingFetches list */ next = bqe->link; - bqe->link = PendingFetches; - PendingFetches = bqe; + bqe->link = (StgBlockingQueueElement *)PendingFetches; + PendingFetches = (StgBlockedFetch *)bqe; break; # if defined(DEBUG) @@ -2226,9 +2494,9 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node) see comments on RBHSave closures above */ case CONSTR: /* check that the closure is an RBHSave closure */ - ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info || - get_itbl((StgClosure *)bqe) == &RBH_Save_1_info || - get_itbl((StgClosure *)bqe) == &RBH_Save_2_info); + ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info || + get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info || + get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info); break; default: @@ -2237,7 +2505,7 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node) (StgClosure *)bqe); # endif } - // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id)); + IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe))); return next; } @@ -2287,13 +2555,14 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node) nat len = 0; IF_GRAN_DEBUG(bq, - belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \ + belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \ node, CurrentProc, CurrentTime[CurrentProc], CurrentTSO->id, CurrentTSO)); node_loc = where_is(node); - ASSERT(get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave + ASSERT(q == END_BQ_QUEUE || + get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave get_itbl(q)->type == CONSTR); // closure (type constructor) ASSERT(is_unique(node)); @@ -2363,15 +2632,23 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node) void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node) { - StgBlockingQueueElement *bqe, *next; + StgBlockingQueueElement *bqe; ACQUIRE_LOCK(&sched_mutex); IF_PAR_DEBUG(verbose, - belch("## AwBQ for node %p on [%x]: ", + belch("##-_ AwBQ for node %p on [%x]: ", node, mytid)); - - ASSERT(get_itbl(q)->type == TSO || +#ifdef DIST + //RFP + if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) { + IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)")); + return; + } +#endif + + ASSERT(q == END_BQ_QUEUE || + get_itbl(q)->type == TSO || get_itbl(q)->type == BLOCKED_FETCH || get_itbl(q)->type == CONSTR); @@ -2478,6 +2755,12 @@ unblockThread(StgTSO *tso) StgTSO *target = tso->block_info.tso; ASSERT(get_itbl(target)->type == TSO); + + if (target->what_next == ThreadRelocated) { + target = target->link; + ASSERT(get_itbl(target)->type == TSO); + } + ASSERT(target->blocked_exceptions != NULL); last = (StgBlockingQueueElement **)&target->blocked_exceptions; @@ -2493,10 +2776,10 @@ unblockThread(StgTSO *tso) barf("unblockThread (Exception): TSO not found"); } - case BlockedOnDelay: case BlockedOnRead: case BlockedOnWrite: { + /* take TSO off blocked_queue */ StgBlockingQueueElement *prev = NULL; for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; prev = t, t = t->link) { @@ -2518,6 +2801,24 @@ unblockThread(StgTSO *tso) barf("unblockThread (I/O): TSO not found"); } + case BlockedOnDelay: + { + /* take TSO off sleeping_queue */ + StgBlockingQueueElement *prev = NULL; + for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; + prev = t, t = t->link) { + if (t == (StgBlockingQueueElement *)tso) { + if (prev == NULL) { + sleeping_queue = (StgTSO *)t->link; + } else { + prev->link = t->link; + } + goto done; + } + } + barf("unblockThread (I/O): TSO not found"); + } + default: barf("unblockThread"); } @@ -2582,6 +2883,12 @@ unblockThread(StgTSO *tso) StgTSO *target = tso->block_info.tso; ASSERT(get_itbl(target)->type == TSO); + + while (target->what_next == ThreadRelocated) { + target = target->link; + ASSERT(get_itbl(target)->type == TSO); + } + ASSERT(target->blocked_exceptions != NULL); last = &target->blocked_exceptions; @@ -2596,7 +2903,6 @@ unblockThread(StgTSO *tso) barf("unblockThread (Exception): TSO not found"); } - case BlockedOnDelay: case BlockedOnRead: case BlockedOnWrite: { @@ -2621,6 +2927,23 @@ unblockThread(StgTSO *tso) barf("unblockThread (I/O): TSO not found"); } + case BlockedOnDelay: + { + StgTSO *prev = NULL; + for (t = sleeping_queue; t != END_TSO_QUEUE; + prev = t, t = t->link) { + if (t == tso) { + if (prev == NULL) { + sleeping_queue = t->link; + } else { + prev->link = t->link; + } + goto done; + } + } + barf("unblockThread (I/O): TSO not found"); + } + default: barf("unblockThread"); } @@ -2694,11 +3017,11 @@ raiseAsync(StgTSO *tso, StgClosure *exception) * returns to the next return address on the stack. */ if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) { - *(--sp) = (W_)&dummy_ret_closure; + *(--sp) = (W_)&stg_dummy_ret_closure; } while (1) { - int words = ((P_)su - (P_)sp) - 1; + nat words = ((P_)su - (P_)sp) - 1; nat i; StgAP_UPD * ap; @@ -2713,11 +3036,11 @@ raiseAsync(StgTSO *tso, StgClosure *exception) */ ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2); TICK_ALLOC_UPD_PAP(3,0); - SET_HDR(ap,&PAP_info,cf->header.prof.ccs); + SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs); ap->n_args = 2; ap->fun = cf->handler; /* :: Exception -> IO a */ - ap->payload[0] = (P_)exception; + ap->payload[0] = exception; ap->payload[1] = ARG_TAG(0); /* realworld token */ /* throw away the stack from Sp up to and including the @@ -2734,7 +3057,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) * unblockAsyncExceptions_ret stack frame. */ if (!cf->exceptions_blocked) { - *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info; + *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info; } /* Ensure that async exceptions are blocked when running the handler. @@ -2765,14 +3088,14 @@ raiseAsync(StgTSO *tso, StgClosure *exception) ap->fun = (StgClosure *)sp[0]; sp++; for(i=0; i < (nat)words; ++i) { - ap->payload[i] = (P_)*sp++; + ap->payload[i] = (StgClosure *)*sp++; } switch (get_itbl(su)->type) { case UPDATE_FRAME: { - SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */); + SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); TICK_ALLOC_UP_THK(words+1,0); IF_DEBUG(scheduler, @@ -2785,14 +3108,24 @@ raiseAsync(StgTSO *tso, StgClosure *exception) /* Replace the updatee with an indirection - happily * this will also wake up any threads currently * waiting on the result. + * + * Warning: if we're in a loop, more than one update frame on + * the stack may point to the same object. Be careful not to + * overwrite an IND_OLDGEN in this case, because we'll screw + * up the mutable lists. To be on the safe side, don't + * overwrite any kind of indirection at all. See also + * threadSqueezeStack in GC.c, where we have to make a similar + * check. */ - UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */ + if (!closure_IND(su->updatee)) { + UPD_IND_NOLOCK(su->updatee,ap); /* revert the black hole */ + } su = su->link; sp += sizeofW(StgUpdateFrame) -1; sp[0] = (W_)ap; /* push onto stack */ break; } - + case CATCH_FRAME: { StgCatchFrame *cf = (StgCatchFrame *)su; @@ -2801,13 +3134,13 @@ raiseAsync(StgTSO *tso, StgClosure *exception) /* We want a PAP, not an AP_UPD. Fortunately, the * layout's the same. */ - SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */); + SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */); TICK_ALLOC_UPD_PAP(words+1,0); /* now build o = FUN(catch,ap,handler) */ o = (StgClosure *)allocate(sizeofW(StgClosure)+2); TICK_ALLOC_FUN(2,0); - SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */); + SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */); o->payload[0] = (StgClosure *)ap; o->payload[1] = cf->handler; @@ -2828,13 +3161,13 @@ raiseAsync(StgTSO *tso, StgClosure *exception) StgSeqFrame *sf = (StgSeqFrame *)su; StgClosure* o; - SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */); + SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */); TICK_ALLOC_UPD_PAP(words+1,0); /* now build o = FUN(seq,ap) */ o = (StgClosure *)allocate(sizeofW(StgClosure)+1); TICK_ALLOC_SE_THK(1,0); - SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */); + SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */); o->payload[0] = (StgClosure *)ap; IF_DEBUG(scheduler, @@ -2857,7 +3190,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) tso->su = (StgUpdateFrame *)(sp+1); tso->sp = sp; return; - + default: barf("raiseAsync"); } @@ -2922,6 +3255,11 @@ detectBlackHoles( void ) for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) { + while (t->what_next == ThreadRelocated) { + t = t->link; + ASSERT(get_itbl(t)->type == TSO); + } + if (t->why_blocked != BlockedOnBlackHole) { continue; } @@ -2955,7 +3293,7 @@ detectBlackHoles( void ) break; } - done: + done: ; } } @@ -2973,39 +3311,34 @@ printThreadBlockage(StgTSO *tso) { switch (tso->why_blocked) { case BlockedOnRead: - fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd); + fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd); break; case BlockedOnWrite: - fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd); + fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd); break; case BlockedOnDelay: -#if defined(HAVE_SETITIMER) || defined(mingw32_TARGET_OS) - fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay); -#else - fprintf(stderr,"blocked on delay of %d ms", - tso->block_info.target - getourtimeofday()); -#endif + fprintf(stderr,"is blocked until %d", tso->block_info.target); break; case BlockedOnMVar: - fprintf(stderr,"blocked on an MVar"); + fprintf(stderr,"is blocked on an MVar"); break; case BlockedOnException: - fprintf(stderr,"blocked on delivering an exception to thread %d", + fprintf(stderr,"is blocked on delivering an exception to thread %d", tso->block_info.tso->id); break; case BlockedOnBlackHole: - fprintf(stderr,"blocked on a black hole"); + fprintf(stderr,"is blocked on a black hole"); break; case NotBlocked: - fprintf(stderr,"not blocked"); + fprintf(stderr,"is not blocked"); break; #if defined(PAR) case BlockedOnGA: - fprintf(stderr,"blocked on global address; local FM_BQ is %p (%s)", + fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)", tso->block_info.closure, info_type(tso->block_info.closure)); break; case BlockedOnGA_NoSend: - fprintf(stderr,"blocked on global address (no send); local FM_BQ is %p (%s)", + fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)", tso->block_info.closure, info_type(tso->block_info.closure)); break; #endif @@ -3035,9 +3368,24 @@ printAllThreads(void) { StgTSO *t; +# if defined(GRAN) + char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; + ullong_format_string(TIME_ON_PROC(CurrentProc), + time_string, rtsFalse/*no commas!*/); + + sched_belch("all threads at [%s]:", time_string); +# elif defined(PAR) + char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN]; + ullong_format_string(CURRENT_TIME, + time_string, rtsFalse/*no commas!*/); + + sched_belch("all threads at [%s]:", time_string); +# else sched_belch("all threads:"); +# endif + for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) { - fprintf(stderr, "\tthread %d is ", t->id); + fprintf(stderr, "\tthread %d ", t->id); printThreadStatus(t); fprintf(stderr,"\n"); } @@ -3061,27 +3409,41 @@ print_bq (StgClosure *node) /* should cover all closures that may have a blocking queue */ ASSERT(get_itbl(node)->type == BLACKHOLE_BQ || get_itbl(node)->type == FETCH_ME_BQ || - get_itbl(node)->type == RBH); + get_itbl(node)->type == RBH || + get_itbl(node)->type == MVAR); ASSERT(node!=(StgClosure*)NULL); // sanity check + + print_bqe(((StgBlockingQueue*)node)->blocking_queue); +} + +/* + Print a whole blocking queue starting with the element bqe. +*/ +void +print_bqe (StgBlockingQueueElement *bqe) +{ + rtsBool end; + /* NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure; */ - for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE); + for (end = (bqe==END_BQ_QUEUE); !end; // iterate until bqe points to a CONSTR - end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) { - ASSERT(bqe != END_BQ_QUEUE); // sanity check - ASSERT(bqe != (StgTSO*)NULL); // sanity check + end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), + bqe = end ? END_BQ_QUEUE : bqe->link) { + ASSERT(bqe != END_BQ_QUEUE); // sanity check + ASSERT(bqe != (StgBlockingQueueElement *)NULL); // sanity check /* types of closures that may appear in a blocking queue */ ASSERT(get_itbl(bqe)->type == TSO || get_itbl(bqe)->type == BLOCKED_FETCH || get_itbl(bqe)->type == CONSTR); /* only BQs of an RBH end with an RBH_Save closure */ - ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH); + //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH); switch (get_itbl(bqe)->type) { case TSO: - fprintf(stderr," TSO %d (%x),", + fprintf(stderr," TSO %u (%x),", ((StgTSO *)bqe)->id, ((StgTSO *)bqe)); break; case BLOCKED_FETCH: @@ -3093,14 +3455,14 @@ print_bq (StgClosure *node) break; case CONSTR: fprintf(stderr," %s (IP %p),", - (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" : - get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" : - get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" : + (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" : + get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" : + get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" : "RBH_Save_?"), get_itbl(bqe)); break; default: - barf("Unexpected closure type %s in blocking queue of %p (%s)", - info_type(bqe), node, info_type(node)); + barf("Unexpected closure type %s in blocking queue", // of %p (%s)", + info_type((StgClosure *)bqe)); // , node, info_type(node)); break; } } /* for */ @@ -3147,9 +3509,9 @@ print_bq (StgClosure *node) break; case CONSTR: fprintf(stderr," %s (IP %p),", - (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" : - get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" : - get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" : + (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" : + get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" : + get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" : "RBH_Save_?"), get_itbl(bqe)); break; default: @@ -3204,6 +3566,8 @@ sched_belch(char *s, ...) va_start(ap,s); #ifdef SMP fprintf(stderr, "scheduler (task %ld): ", pthread_self()); +#elif defined(PAR) + fprintf(stderr, "== "); #else fprintf(stderr, "scheduler: "); #endif