From 2942189980ad5c202c0d473bea80b387755497d1 Mon Sep 17 00:00:00 2001 From: simonmar Date: Fri, 14 Jan 2000 13:39:59 +0000 Subject: [PATCH] [project @ 2000-01-14 13:39:59 by simonmar] cleanup --- ghc/rts/Schedule.c | 376 ++++++---------------------------------------------- 1 file changed, 42 insertions(+), 334 deletions(-) diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 9fa7f6a..37eeda9 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,5 +1,5 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.43 2000/01/14 13:17:16 hwloidl Exp $ + * $Id: Schedule.c,v 1.44 2000/01/14 13:39:59 simonmar Exp $ * * (c) The GHC Team, 1998-1999 * @@ -121,28 +121,10 @@ static StgMainThread *main_threads; * Locks required: sched_mutex. */ -#if DEBUG -char *whatNext_strs[] = { - "ThreadEnterGHC", - "ThreadRunGHC", - "ThreadEnterHugs", - "ThreadKilled", - "ThreadComplete" -}; - -char *threadReturnCode_strs[] = { - "HeapOverflow", /* might also be StackOverflow */ - "StackOverflow", - "ThreadYielding", - "ThreadBlocked", - "ThreadFinished" -}; -#endif - #if defined(GRAN) StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */ -// rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c +/* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */ /* In GranSim we have a runable and a blocked queue for each processor. @@ -276,16 +258,6 @@ StgTSO *MainTSO; //@node Prototypes, Main scheduling loop, Variables and Data structures, Main scheduling code //@subsection Prototypes -#if 0 && defined(GRAN) -// ToDo: replace these with macros -static /* inline */ void add_to_run_queue(StgTSO* tso); -static /* inline */ void push_on_run_queue(StgTSO* tso); -static /* inline */ StgTSO *take_off_run_queue(StgTSO *tso); - -/* Thread management */ -void initScheduler(void); -#endif - //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code //@subsection Main scheduling loop @@ -425,7 +397,9 @@ schedule( void ) if (spark == NULL) { break; /* no more sparks in the pool */ } else { - // I'd prefer this to be done in activateSpark -- HWL + /* I'd prefer this to be done in activateSpark -- HWL */ + /* tricky - it needs to hold the scheduler lock and + * not try to re-acquire it -- SDM */ StgTSO *tso; tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue); pushClosure(tso,spark); @@ -521,7 +495,7 @@ schedule( void ) #if defined(GRAN) # error ToDo: implement GranSim scheduler #elif defined(PAR) - // ToDo: phps merge with spark activation above + /* ToDo: phps merge with spark activation above */ /* check whether we have local work and send requests if we have none */ if (run_queue_hd == END_TSO_QUEUE) { /* no runnable threads */ /* :-[ no local threads => look out for local sparks */ @@ -544,10 +518,10 @@ schedule( void ) belch("== [%x] schedule: Created TSO %p (%d); %d threads active", mytid, tso, tso->id, advisory_thread_count)); - if (tso==END_TSO_QUEUE) { // failed to activate spark -> back to loop + if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */ belch("^^ failed to activate spark"); goto next_thread; - } // otherwise fall through & pick-up new tso + } /* otherwise fall through & pick-up new tso */ } else { IF_PAR_DEBUG(verbose, belch("^^ no local sparks (spark pool contains only NFs: %d)", @@ -1509,21 +1483,6 @@ take_off_run_queue(StgTSO *tso) { #endif /* 0 */ -nat -run_queue_len(void) -{ - nat i; - StgTSO *tso; - - for (i=0, tso=run_queue_hd; - tso != END_TSO_QUEUE; - i++, tso=tso->link) - /* nothing */ - - return i; -} - - //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code //@subsection Garbage Collextion Routines @@ -1547,29 +1506,26 @@ static void GetRoots(void) StgMainThread *m; #if defined(GRAN) - nat i; - - for (i=0; i<=RtsFlags.GranFlags.proc; i++) { - if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL))) - run_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]); - if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL))) - run_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]); - - if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL))) - blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]); - if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL))) - blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]); - if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL))) - ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]); + { + nat i; + for (i=0; i<=RtsFlags.GranFlags.proc; i++) { + if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL))) + run_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]); + if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL))) + run_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]); + + if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL))) + blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]); + if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL))) + blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]); + if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL))) + ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]); + } } markEventQueue(); -#elif defined(PAR) - run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd); - run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl); - blocked_queue_hd = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd); - blocked_queue_tl = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl); -#else + +#else /* !GRAN */ run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd); run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl); @@ -1720,7 +1676,7 @@ threadStackOverflow(StgTSO *tso) Wake up a queue that was blocked on some resource. ------------------------------------------------------------------------ */ -// ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE +/* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */ #if defined(GRAN) static inline void @@ -2010,266 +1966,6 @@ awakenBlockedQueue(StgTSO *tso) } #endif -#if 0 -// ngoq ngo' - -#if defined(GRAN) -/* - Awakening a blocking queue in GranSim means checking for each of the - TSOs in the queue whether they are local or not, issuing a ResumeThread - or an UnblockThread event, respectively. The basic iteration over the - blocking queue is the same as in the standard setup. -*/ -void -awaken_blocked_queue(StgBlockingQueueElement *q, StgClosure *node) -{ - StgBlockingQueueElement *bqe, *next; - StgTSO *tso; - PEs node_loc, tso_loc; - rtsTime bq_processing_time = 0; - nat len = 0, len_local = 0; - - IF_GRAN_DEBUG(bq, - belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \ - node, CurrentProc, CurrentTime[CurrentProc], - CurrentTSO->id, CurrentTSO)); - - node_loc = where_is(node); - - ASSERT(get_itbl(q)->type == TSO || // q is either a TSO or an RBHSave - get_itbl(q)->type == CONSTR); // closure (type constructor) - ASSERT(is_unique(node)); - - /* FAKE FETCH: magically copy the node to the tso's proc; - no Fetch necessary because in reality the node should not have been - moved to the other PE in the first place - */ - if (CurrentProc!=node_loc) { - IF_GRAN_DEBUG(bq, - belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)", - node, node_loc, CurrentProc, CurrentTSO->id, - // CurrentTSO, where_is(CurrentTSO), - node->header.gran.procs)); - node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc); - IF_GRAN_DEBUG(bq, - belch("## new bitmask of node %p is %#x", - node, node->header.gran.procs)); - if (RtsFlags.GranFlags.GranSimStats.Global) { - globalGranStats.tot_fake_fetches++; - } - } - - next = q; - // ToDo: check: ASSERT(CurrentProc==node_loc); - while (get_itbl(next)->type==TSO) { // q != END_TSO_QUEUE) { - bqe = next; - next = bqe->link; - /* - bqe points to the current element in the queue - next points to the next element in the queue - */ - tso = (StgTSO *)bqe; // wastes an assignment to get the type right - tso_loc = where_is(tso); - if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local - /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */ - ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc); - bq_processing_time += RtsFlags.GranFlags.Costs.lunblocktime; - // insertThread(tso, node_loc); - new_event(tso_loc, tso_loc, - CurrentTime[CurrentProc]+bq_processing_time, - ResumeThread, - tso, node, (rtsSpark*)NULL); - tso->link = END_TSO_QUEUE; // overwrite link just to be sure - len_local++; - len++; - } else { // TSO is remote (actually should be FMBQ) - bq_processing_time += RtsFlags.GranFlags.Costs.mpacktime; - bq_processing_time += RtsFlags.GranFlags.Costs.gunblocktime; - new_event(tso_loc, CurrentProc, - CurrentTime[CurrentProc]+bq_processing_time+ - RtsFlags.GranFlags.Costs.latency, - UnblockThread, - tso, node, (rtsSpark*)NULL); - tso->link = END_TSO_QUEUE; // overwrite link just to be sure - bq_processing_time += RtsFlags.GranFlags.Costs.mtidytime; - len++; - } - /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */ - IF_GRAN_DEBUG(bq, - fprintf(stderr," %s TSO %d (%p) [PE %d] (blocked_on=%p) (next=%p) ,", - (node_loc==tso_loc ? "Local" : "Global"), - tso->id, tso, CurrentProc, tso->block_info.closure, tso->link)) - tso->block_info.closure = NULL; - IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", - tso->id, tso)); - } - - /* if this is the BQ of an RBH, we have to put back the info ripped out of - the closure to make room for the anchor of the BQ */ - if (next!=END_BQ_QUEUE) { - ASSERT(get_itbl(node)->type == RBH && get_itbl(next)->type == CONSTR); - /* - ASSERT((info_ptr==&RBH_Save_0_info) || - (info_ptr==&RBH_Save_1_info) || - (info_ptr==&RBH_Save_2_info)); - */ - /* cf. convertToRBH in RBH.c for writing the RBHSave closure */ - ((StgRBH *)node)->blocking_queue = ((StgRBHSave *)next)->payload[0]; - ((StgRBH *)node)->mut_link = ((StgRBHSave *)next)->payload[1]; - - IF_GRAN_DEBUG(bq, - belch("## Filled in RBH_Save for %p (%s) at end of AwBQ", - node, info_type(node))); - } - - /* statistics gathering */ - if (RtsFlags.GranFlags.GranSimStats.Global) { - globalGranStats.tot_bq_processing_time += bq_processing_time; - globalGranStats.tot_bq_len += len; // total length of all bqs awakened - globalGranStats.tot_bq_len_local += len_local; // same for local TSOs only - globalGranStats.tot_awbq++; // total no. of bqs awakened - } - IF_GRAN_DEBUG(bq, - fprintf(stderr,"## BQ Stats of %p: [%d entries, %d local] %s\n", - node, len, len_local, (next!=END_TSO_QUEUE) ? "RBH" : "")); -} - -#elif defined(PAR) - -/* - Awakening a blocking queue in GUM has to check whether an entry in the - queue is a normal TSO or a BLOCKED_FETCH. The later indicates that a TSO is - waiting for the result of this computation on another PE. Thus, when - finding a BLOCKED_FETCH we have to send off a message to that PE. - Actually, we defer sending off a message, by just putting the BLOCKED_FETCH - onto the PendingFetches queue, which will be later traversed by - processFetches, sending off a RESUME message for each BLOCKED_FETCH. - - NB: There is no check for an RBHSave closure (type CONSTR) in the code - below. The reason is, if we awaken the BQ of an RBH closure (RBHSaves - only exist at the end of such BQs) we know that the closure has been - unpacked successfully on the other PE, and we can discard the info - contained in the RBHSave closure. The current closure will be turned - into a FetchMe closure anyway. -*/ -void -awaken_blocked_queue(StgBlockingQueueElement *q, StgClosure *node) -{ - StgBlockingQueueElement *bqe, *next; - - IF_PAR_DEBUG(verbose, - belch("## AwBQ for node %p on [%x]: ", - node, mytid)); - - ASSERT(get_itbl(q)->type == TSO || - get_itbl(q)->type == BLOCKED_FETCH || - get_itbl(q)->type == CONSTR); - - next = q; - while (get_itbl(next)->type==TSO || - get_itbl(next)->type==BLOCKED_FETCH) { - bqe = next; - switch (get_itbl(bqe)->type) { - case TSO: - /* if it's a TSO just push it onto the run_queue */ - next = bqe->link; -#if defined(DEBUG) - ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging only -#endif - push_on_run_queue((StgTSO *)bqe); // HWL: was: PUSH_ON_RUN_QUEUE(tso); - - /* write RESUME events to log file and - update blocked and fetch time (depending on type of the orig closure) */ - if (RtsFlags.ParFlags.ParStats.Full) { - DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, - GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure, - 0, spark_queue_len(ADVISORY_POOL)); - - switch (get_itbl(node)->type) { - case FETCH_ME_BQ: - ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat; - break; - case RBH: - case FETCH_ME: - case BLACKHOLE_BQ: - ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat; - break; - default: - barf("{awaken_blocked_queue}Daq Qagh: unexpected closure %p (%s) with blocking queue", - node, info_type(node)); - } - } - /* reset block_info.closure field after dumping event */ - ((StgTSO *)bqe)->block_info.closure = NULL; - - /* rest of this branch is debugging only */ - IF_PAR_DEBUG(verbose, - fprintf(stderr," TSO %d (%p) [PE %lx] (block_info.closure=%p) (next=%p) ,", - ((StgTSO *)bqe)->id, (StgTSO *)bqe, - mytid, ((StgTSO *)bqe)->block_info.closure, ((StgTSO *)bqe)->link)); - - IF_DEBUG(scheduler, - if (!RtsFlags.ParFlags.Debug.verbose) - belch("-- Waking up thread %ld (%p)", - ((StgTSO *)bqe)->id, (StgTSO *)bqe)); - break; - - case BLOCKED_FETCH: - /* if it's a BLOCKED_FETCH put it on the PendingFetches list */ - next = bqe->link; - bqe->link = PendingFetches; - PendingFetches = bqe; - // bqe.tso->block_info.closure = NULL; - - /* rest of this branch is debugging only */ - IF_PAR_DEBUG(verbose, - fprintf(stderr," BLOCKED_FETCH (%p) on node %p [PE %lx] (next=%p) ,", - ((StgBlockedFetch *)bqe), - ((StgBlockedFetch *)bqe)->node, - mytid, ((StgBlockedFetch *)bqe)->link)); - break; - -# if defined(DEBUG) - /* can ignore this case in a non-debugging setup; - see comments on RBHSave closures above */ - case CONSTR: - /* check that the closure is an RBHSave closure */ - ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info || - get_itbl((StgClosure *)bqe) == &RBH_Save_1_info || - get_itbl((StgClosure *)bqe) == &RBH_Save_2_info); - break; - - default: - barf("{awaken_blocked_queue}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n", - get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), - (StgClosure *)bqe); -# endif - } - } -} - -#else /* !GRAN && !PAR */ - -void -awaken_blocked_queue(StgTSO *q) { awakenBlockedQueue(q); } - -/* -{ - StgTSO *tso; - - while (q != END_TSO_QUEUE) { - ASSERT(get_itbl(q)->type == TSO); - tso = q; - q = tso->link; - push_on_run_queue(tso); // HWL: was: PUSH_ON_RUN_QUEUE(tso); - //tso->block_info.closure = NULL; - IF_DEBUG(scheduler, belch("-- Waking up thread %ld (%p)", tso->id, tso)); - } -} -*/ -#endif /* GRAN */ -#endif /* 0 */ - //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code //@subsection Exception Handling Routines @@ -2797,7 +2493,7 @@ print_bq (StgClosure *node) for (tso = ((StgBlockingQueue*)node)->blocking_queue; tso != END_TSO_QUEUE; tso=tso->link) { - ASSERT(tso!=(StgTSO*)NULL && tso!=END_TSO_QUEUE); // sanity check + ASSERT(tso!=NULL && tso!=END_TSO_QUEUE); // sanity check ASSERT(get_itbl(tso)->type == TSO); // guess what, sanity check fprintf(stderr," TSO %d (%p),", tso->id, tso); } @@ -2805,9 +2501,21 @@ print_bq (StgClosure *node) } # endif -/* A debugging function used all over the place in GranSim and GUM. - Dummy function in other setups. -*/ +#if defined(PAR) +static nat +run_queue_len(void) +{ + nat i; + StgTSO *tso; + + for (i=0, tso=run_queue_hd; + tso != END_TSO_QUEUE; + i++, tso=tso->link) + /* nothing */ + + return i; +} +#endif static void sched_belch(char *s, ...) -- 1.7.10.4