%
% (c) The GRASP/AQUA Project, Glasgow University, 1992-1995
%
%************************************************************************
%*                                                                      *
\section[Threads.lc]{Thread Control Routines}
%*                                                                      *
%************************************************************************

%************************************************************************
%
\subsection[thread-overview]{Overview of the Thread Management System}
%
%************************************************************************

%************************************************************************
%
\subsection[thread-decls]{Thread Declarations}
%
%************************************************************************

% I haven't checked if GRAN can work with QP profiling. But as we use our
% own profiling (GR profiling) that should be irrelevant. -- HWL

\begin{code}
#if defined(CONCURRENT) /* the whole module! */

# define NON_POSIX_SOURCE /* so says Solaris */

# include "rtsdefs.h"
# include <setjmp.h>

#include "LLC.h"
#include "HLC.h"

static void init_qp_profiling(STG_NO_ARGS); /* forward decl */
\end{code}

@AvailableStack@ is used to determine whether an existing stack can be
reused without fresh allocation, thereby reducing garbage collection and
stack set-up time. At present, it is only used for the first stack chunk
of a thread, the one that's got @RTSflags.ConcFlags.stkChunkSize@ words.

\begin{code}
P_ AvailableStack = Prelude_Z91Z93_closure;
P_ AvailableTSO = Prelude_Z91Z93_closure;
\end{code}

Macros for dealing with the new and improved GA field for simulating
parallel execution. Based on the @CONCURRENT@ package. The GA field now
contains a mask, where the n-th bit stands for the n-th processor on which
this data can be found. In case of multiple copies, several bits are set.
The total number of processors is bounded by @MAX_PROC@, which should be
<= the length of a word in bits.  -- HWL

{{GranSim.lc}Daq ngoq' roQlu'ta'}
(Code has been moved to GranSim.lc).

%****************************************************************
%*                                                              *
\subsection[thread-getthread]{The Thread Scheduler}
%*                                                              *
%****************************************************************

This is the heart of the thread scheduling code. Most of the changes for
GranSim are in this part of the RTS. Especially the @ReSchedule@ routine
has been blown up quite a lot. It now contains the top-level
event-handling loop.

Parts of the code that are not necessary for GranSim, but convenient to
have when developing it, are marked with a @GRAN_CHECK@ variable.
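Before diving in, it helps to fix the overall shape of that loop in mind.
The following is a schematic sketch only (it is not part of the compiled
code; the names are those of the real routines defined below):

\begin{verbatim}
/* Schematic of the GranSim scheduler -- for orientation only */
void ReSchedule(what_next)
{
    ... deal with the current thread, depending on what_next ...

    do {                              /* top-level event-handling loop */
        event = get_next_event();     /* earliest event over all PEs   */
        CurrentProc = EVENT_PROC(event);

        switch (EVENT_TYPE(event)) {
          case FETCHNODE: do_the_fetchnode(event);
                          continue;   /* i.e. handle the next event    */
          ...
          case CONTINUETHREAD: ...
                          break;      /* i.e. leave the switch and run
                                         a thread                      */
        }
        longjmp(scheduler_loop, 1);   /* resume the TSO at the head of
                                         the runnable queue            */
    } while(1);
}
\end{verbatim}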
\begin{code}
STGRegisterTable *CurrentRegTable = NULL;
P_ CurrentTSO = NULL;

#if defined(GRAN)

/* Only needed for GranSim Light; costs of operations during rescheduling
   are associated to the virtual processor on which ActiveTSO is living */
P_ ActiveTSO = NULL;
rtsBool resched = rtsFalse;  /* debugging only!! */

/* Pointers to the head and tail of the runnable queues for each PE */
/* In GranSim Light only the thread/spark-queues of proc 0 are used */
P_ RunnableThreadsHd[MAX_PROC];
P_ RunnableThreadsTl[MAX_PROC];

P_ WaitThreadsHd[MAX_PROC];
P_ WaitThreadsTl[MAX_PROC];

sparkq PendingSparksHd[MAX_PROC][SPARK_POOLS];
sparkq PendingSparksTl[MAX_PROC][SPARK_POOLS];

/* One clock for each PE */
W_ CurrentTime[MAX_PROC];

/* Useful to restrict communication; cf fishing model in GUM */
I_ OutstandingFetches[MAX_PROC], OutstandingFishes[MAX_PROC];

/* Status of each PE (new since, but independent of, GranSim Light) */
enum proc_status procStatus[MAX_PROC];

#if defined(GRAN) && defined(GRAN_CHECK)
/* To check if the RTS ever tries to run a thread that should be blocked
   because of fetching remote data */
P_ BlockedOnFetch[MAX_PROC];
#endif

W_ SparksAvail = 0;       /* How many sparks are available      */
W_ SurplusThreads = 0;    /* How many excess threads are there  */

TIME SparkStealTime();

# else                                                            /* !GRAN */

P_ RunnableThreadsHd = Prelude_Z91Z93_closure;
P_ RunnableThreadsTl = Prelude_Z91Z93_closure;

P_ WaitingThreadsHd = Prelude_Z91Z93_closure;
P_ WaitingThreadsTl = Prelude_Z91Z93_closure;

TYPE_OF_SPARK PendingSparksBase[SPARK_POOLS];
TYPE_OF_SPARK PendingSparksLim[SPARK_POOLS];

TYPE_OF_SPARK PendingSparksHd[SPARK_POOLS];
TYPE_OF_SPARK PendingSparksTl[SPARK_POOLS];

#endif                                                      /* GRAN ; HWL */

static jmp_buf scheduler_loop;

I_ required_thread_count = 0;
I_ advisory_thread_count = 0;

EXTFUN(resumeThread);

/* Misc prototypes */
#if defined(GRAN)
P_ NewThread PROTO((P_, W_, I_));
I_ blockFetch PROTO((P_, PROC, P_));
I_ HandleFetchRequest PROTO((P_, PROC, P_));
rtsBool InsertThread PROTO((P_ tso));
sparkq delete_from_spark_queue PROTO((sparkq, sparkq));
sparkq prev, spark;
#else
P_ NewThread PROTO((P_, W_));
#endif

I_ context_switch = 0;
I_ contextSwitchTime = 10000;

I_ threadId = 0;

/* NB: GRAN and GUM use different representations of spark pools.
       GRAN sparks are more flexible (containing e.g. granularity info)
       but slower than GUM sparks. There is no fixed upper bound on the
       number of GRAN sparks either.
       -- HWL */
\end{code}
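To make the contrast concrete, here is an illustrative sketch of the two
spark representations (field and accessor names follow their use in this
file; the real definitions live in the spark headers, so take this as a
mental model rather than the literal declarations):

\begin{verbatim}
/* GUM: a spark is just a pointer to a closure; a spark pool is the
   array slice between PendingSparksHd[pool] and PendingSparksTl[pool],
   carved out of the area from PendingSparksBase to PendingSparksLim. */

/* GRAN: a spark is a heap-allocated record, linked into a per-PE list
   and carrying extra bookkeeping, accessed via SPARK_NODE, SPARK_NEXT,
   SPARK_GRAN_INFO etc.:

   node       -- closure to be evaluated
   prev, next -- links in the spark queue
   name       -- identification for profiling
   gran_info  -- granularity/priority estimate (for priority sparking)
   global     -- may the spark be exported? bumped when it is stolen
*/
\end{verbatim}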
\begin{code}
#if !defined(GRAN)

I_ sparksIgnored = 0;

I_ SparkLimit[SPARK_POOLS];

rtsBool
initThreadPools(STG_NO_ARGS)
{
    I_ i, size = RTSflags.ConcFlags.maxLocalSparks;

    SparkLimit[ADVISORY_POOL] = SparkLimit[REQUIRED_POOL] = size;

    if ((PendingSparksBase[ADVISORY_POOL] =
	   (TYPE_OF_SPARK) malloc(size * SIZE_OF_SPARK)) == NULL)
	return rtsFalse;

    if ((PendingSparksBase[REQUIRED_POOL] =
	   (TYPE_OF_SPARK) malloc(size * SIZE_OF_SPARK)) == NULL)
	return rtsFalse;

    PendingSparksLim[ADVISORY_POOL] = PendingSparksBase[ADVISORY_POOL] + size;
    PendingSparksLim[REQUIRED_POOL] = PendingSparksBase[REQUIRED_POOL] + size;

    return rtsTrue;
}
#endif  /* !GRAN */

#ifdef PAR
rtsBool sameThread;
#endif

void
ScheduleThreads(topClosure)
P_ topClosure;
{
#ifdef GRAN
    I_ i;
#endif
    P_ tso;

#if defined(PROFILING) || defined(PAR)
    if (time_profiling || RTSflags.ConcFlags.ctxtSwitchTime > 0) {
	if (initialize_virtual_timer(RTSflags.CcFlags.msecsPerTick)) {
#else
    if (RTSflags.ConcFlags.ctxtSwitchTime > 0) {
	if (initialize_virtual_timer(RTSflags.ConcFlags.ctxtSwitchTime)) {
#endif
	    fflush(stdout);
	    fprintf(stderr, "Can't initialize virtual timer.\n");
	    EXIT(EXIT_FAILURE);
	}
    } else
	context_switch = 0 /* 1 HWL */;

# if defined(GRAN_CHECK) && defined(GRAN)                         /* HWL */
    if ( RTSflags.GranFlags.Light && RTSflags.GranFlags.proc!=1 ) {
      fprintf(stderr,"Qagh: In GrAnSim Light setup .proc must be 1\n");
      EXIT(EXIT_FAILURE);
    }

    if ( RTSflags.GranFlags.debug & 0x40 ) {
      fprintf(stderr,"Doing init in ScheduleThreads now ...\n");
    }
# endif

#if defined(GRAN)                                                  /* KH */
    /* Init thread and spark queues on all processors */
    for (i=0; i<RTSflags.GranFlags.proc; i++) {
      RunnableThreadsHd[i] = RunnableThreadsTl[i] = Prelude_Z91Z93_closure;
      WaitThreadsHd[i] = WaitThreadsTl[i] = Prelude_Z91Z93_closure;
      PendingSparksHd[i][REQUIRED_POOL] =
	PendingSparksTl[i][REQUIRED_POOL] = NULL;
      PendingSparksHd[i][ADVISORY_POOL] =
	PendingSparksTl[i][ADVISORY_POOL] = NULL;
      OutstandingFetches[i] = OutstandingFishes[i] = 0;
      procStatus[i] = Idle;
# if defined(GRAN_CHECK)
      BlockedOnFetch[i] = NULL;
# endif
      CurrentTime[i] = 0;
    }
#endif

    if (DO_QP_PROF)
	init_qp_profiling();

    /* Create the main thread and make it runnable */
    if ((tso = NewThread(topClosure, T_MAIN)) == NULL) {
	fflush(stdout);
	fprintf(stderr, "Not enough heap for main thread\n");
	EXIT(EXIT_FAILURE);
    }

#if defined(GRAN)
    CurrentTSO = ThreadQueueHd = ThreadQueueTl = tso;
#else
    CurrentTSO = RunnableThreadsHd = RunnableThreadsTl = tso;
#endif

    if (setjmp(scheduler_loop) < 0)
	return;

    /* if ( RTSflags.GranFlags.debug & 0x80 ) {
	 fprintf(stderr,"\n==> Event Queue is now:\n"); GEQ(); } */

#ifdef PAR
    if (PendingFetches != Prelude_Z91Z93_closure) {
	processFetches();
    }
#elif defined(GRAN)
    if (ThreadQueueHd == Prelude_Z91Z93_closure) {
	fprintf(stderr, "Qu'vatlh! No runnable threads!\n");
	EXIT(EXIT_FAILURE);
    }
    if (DO_QP_PROF > 1 && CurrentTSO != ThreadQueueHd) {
	QP_Event1("AG", ThreadQueueHd);
    }
#else
    while (RunnableThreadsHd == Prelude_Z91Z93_closure) {
	/* If we've no work */
	if (WaitingThreadsHd == Prelude_Z91Z93_closure) {
	    fflush(stdout);
	    fprintf(stderr, "No runnable threads!\n");
	    EXIT(EXIT_FAILURE);
	}
	/* Block indefinitely, waiting for I/O or a timer to expire */
	AwaitEvent(0);
    }
#endif

#ifdef PAR
    if (RunnableThreadsHd == Prelude_Z91Z93_closure) {
	if (advisory_thread_count < RTSflags.ConcFlags.maxThreads &&
	  (PendingSparksHd[REQUIRED_POOL] < PendingSparksTl[REQUIRED_POOL] ||
	  PendingSparksHd[ADVISORY_POOL] < PendingSparksTl[ADVISORY_POOL])) {
	    /*
	     * If we're here (no runnable threads) and we have pending
	     * sparks, we must have a space problem. Get enough space
	     * to turn one of those pending sparks into a
	     * thread... ReallyPerformGC doesn't return until the
	     * space is available, so it may force global GC. ToDo:
	     * Is this unnecessary here? Duplicated in ReSchedule()?
	     * --JSM
	     */
	    ReallyPerformThreadGC(THREAD_SPACE_REQUIRED, rtsTrue);
	    SAVE_Hp -= THREAD_SPACE_REQUIRED;
	} else {
	    /*
	     * We really have absolutely no work. Send out a fish
	     * (there may be some out there already), and wait for
	     * something to arrive. We clearly can't run any threads
	     * until a SCHEDULE or RESUME arrives, and so that's what
	     * we're hoping to see. (Of course, we still have to
	     * respond to other types of messages.)
	     */
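	    /* A fish is GUM's work-request message: an idle PE sends one
	       to a randomly chosen PE, and the fish hops from PE to PE
	       (ageing on every hop) until it either finds a spark to
	       bring home or grows too old and returns empty-handed. The
	       `fishing' flag below records that one of our fish is still
	       out, so that we don't flood the system with requests. */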
	    if (!fishing)
		sendFish(choosePE(), mytid, NEW_FISH_AGE,
			 NEW_FISH_HISTORY, NEW_FISH_HUNGER);

	    processMessages();
	}
	ReSchedule(0);
    } else if (PacketsWaiting()) {  /* Look for incoming messages */
	processMessages();
    }
#endif /* PAR */

#if !defined(GRAN)
    if (DO_QP_PROF > 1 && CurrentTSO != RunnableThreadsHd) {
	QP_Event1("AG", RunnableThreadsHd);
    }
#endif

#ifdef PAR
    if (RTSflags.ParFlags.granSimStats && !sameThread)
	DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
#endif

#if defined(GRAN)
    TimeOfNextEvent = get_time_of_next_event();
    CurrentTSO = ThreadQueueHd;
    if (RTSflags.GranFlags.Light) {
      /* Save time of `virt. proc' which was active since last getevent and
	 restore time of `virt. proc' where CurrentTSO is living on. */
      if(RTSflags.GranFlags.DoFairSchedule) {
	if (RTSflags.GranFlags.granSimStats &&
	    RTSflags.GranFlags.debug & 0x20000)
	  DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
      }
      TSO_CLOCK(ActiveTSO) = CurrentTime[CurrentProc];
      ActiveTSO = NULL;
      CurrentTime[CurrentProc] = TSO_CLOCK(CurrentTSO);
      if(RTSflags.GranFlags.DoFairSchedule && resched ) {
	resched = rtsFalse;
	if (RTSflags.GranFlags.granSimStats &&
	    RTSflags.GranFlags.debug & 0x20000)
	  DumpGranEvent(GR_SCHEDULE,ThreadQueueHd);
      }
      /* if (TSO_LINK(ThreadQueueHd)!=Prelude_Z91Z93_closure &&
	    (TimeOfNextEvent == 0 ||
	     TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000<TimeOfNextEvent)) ... */
    }
#endif

    if (RTSflags.ConcFlags.ctxtSwitchTime > 0)
	context_switch = 0;

#if defined(GRAN_CHECK) && defined(GRAN)                /* Just for testing */
    if (CurrentTSO == Prelude_Z91Z93_closure) {
      fprintf(stderr,"Qagh: Trying to execute Prelude_Z91Z93_closure on proc %d (@ %d)\n",
	      CurrentProc,CurrentTime[CurrentProc]);
      EXIT(EXIT_FAILURE);
    }

    if (RTSflags.GranFlags.debug & 0x04) {
      if (BlockedOnFetch[CurrentProc]) {
	fprintf(stderr,"Qagh: Trying to execute TSO 0x%x on proc %d (@ %d) which is blocked-on-fetch by TSO 0x%x\n",
		CurrentTSO,CurrentProc,CurrentTime[CurrentProc],
		BlockedOnFetch[CurrentProc]);
	EXIT(EXIT_FAILURE);
      }
    }

    if ( (RTSflags.GranFlags.debug & 0x10) &&
	 (TSO_TYPE(CurrentTSO) & FETCH_MASK_TSO) ) {
      fprintf(stderr,"Qagh: Trying to execute TSO 0x%x on proc %d (@ %d) which should be asleep!\n",
	      CurrentTSO,CurrentProc,CurrentTime[CurrentProc]);
      EXIT(EXIT_FAILURE);
    }
#endif

#if 0 && defined(CONCURRENT)
    fprintf(stderr, "ScheduleThreads: About to resume thread:%#x\n",
	    CurrentTSO);
#endif

    miniInterpret((StgFunPtr)resumeThread);
}
\end{code}

% Some remarks on GrAnSim -- HWL

The @ReSchedule@ function is the heart of GrAnSim. Based on its parameter
it either issues a @CONTINUETHREAD@ to carry on executing the current
thread in due course, or it watches out for new work (e.g. when called
from @EndThread@).

Then it picks the next event (@get_next_event@) and handles it
appropriately (see the switch construct). Note that a @continue@ in the
switch causes the next event to be handled, whereas a @break@ causes a
jump to the @scheduler_loop@, where the TSO at the head of the current
processor's runnable queue is executed.

@ReSchedule@ is mostly entered from @HpOverflow.lc:PerformReSchedule@,
which is itself called via the @GRAN_RESCHEDULE@ macro in the
compiler-generated code.
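As a reader's guide, these are the event types dispatched on below,
together with the handlers they end up in:

\begin{verbatim}
CONTINUETHREAD -- run the TSO at the head of the runnable queue
                  (break out of the switch into the scheduler_loop)
FETCHNODE      -- service a request for a remote node (do_the_fetchnode)
FETCHREPLY     -- a requested node/packet has arrived (do_the_fetchreply)
GLOBALBLOCK    -- block a TSO on a remote node        (do_the_globalblock)
UNBLOCKTHREAD  -- revive a TSO blocked on a fetch     (do_the_unblock)
RESUMETHREAD   -- blocked TSO becomes runnable again  (StartThread/GR_RESUME)
STARTTHREAD    -- enqueue a freshly created TSO       (StartThread/GR_START)
MOVETHREAD     -- a migrated TSO arrives here         (do_the_movethread)
MOVESPARK      -- a stolen spark arrives here         (do_the_movespark)
FINDWORK       -- turn a local spark into a thread    (gimme_spark +
                                                       munch_spark)
\end{verbatim}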
\begin{code}
/* GrAnSim rules here! Others stay out or you will be crashed.
   Concurrent and parallel guys: please use the next door (a few pages
   down; turn left at the !GRAN sign). */

#if defined(GRAN)

/* Prototypes of event handling functions; only needed in ReSchedule */
void do_the_globalblock PROTO((eventq event));
void do_the_unblock PROTO((eventq event));
void do_the_fetchnode PROTO((eventq event));
void do_the_fetchreply PROTO((eventq event));
void do_the_movethread PROTO((eventq event));
void do_the_movespark PROTO((eventq event));
void gimme_spark PROTO((rtsBool *found_res, sparkq *prev_res, sparkq *spark_res));
void munch_spark PROTO((rtsBool found, sparkq prev, sparkq spark));

void
ReSchedule(what_next)
int what_next;           /* Run the current thread again? */
{
  sparkq spark, nextspark;
  P_ tso;
  P_ node, closure;
  eventq event;
  int rc;

# if defined(GRAN_CHECK) && defined(GRAN)
  if ( RTSflags.GranFlags.debug & 0x80 ) {
    fprintf(stderr,"Entering ReSchedule with mode %u; tso is\n",what_next);
    G_TSO(ThreadQueueHd,1);
  }
# endif

# if defined(GRAN_CHECK) && defined(GRAN)
  if ( (RTSflags.GranFlags.debug & 0x80) ||
       (RTSflags.GranFlags.debug & 0x40 ) )
    if (what_next<FIND_THREAD || what_next>END_OF_WORLD)
      fprintf(stderr,"Qagh {ReSchedule}Daq: illegal parameter %u for what_next\n",
	      what_next);
# endif

  if (RTSflags.GranFlags.Light) {
    /* Save current time; GranSim Light only */
    TSO_CLOCK(CurrentTSO) = CurrentTime[CurrentProc];
  }

  /* Run the current thread again (if there is one) */
  if(what_next==SAME_THREAD && ThreadQueueHd != Prelude_Z91Z93_closure)
  {
    /* A bit of a hassle if the event queue is empty, but ... */
    CurrentTSO = ThreadQueueHd;

    resched = rtsFalse;
    if (RTSflags.GranFlags.Light &&
	TSO_LINK(ThreadQueueHd)!=Prelude_Z91Z93_closure &&
	TSO_CLOCK(ThreadQueueHd)>TSO_CLOCK(TSO_LINK(ThreadQueueHd))) {
      if(RTSflags.GranFlags.granSimStats &&
	 RTSflags.GranFlags.debug & 0x20000 )
	DumpGranEvent(GR_DESCHEDULE,ThreadQueueHd);
      resched = rtsTrue;
      ThreadQueueHd = TSO_LINK(CurrentTSO);
      if (ThreadQueueHd==Prelude_Z91Z93_closure)
	ThreadQueueTl=Prelude_Z91Z93_closure;
      TSO_LINK(CurrentTSO) = Prelude_Z91Z93_closure;
      InsertThread(CurrentTSO);
    }

    /* This code does round-robin scheduling, if preferred. */
    if(!RTSflags.GranFlags.Light &&
       RTSflags.GranFlags.DoFairSchedule &&
       TSO_LINK(CurrentTSO) != Prelude_Z91Z93_closure &&
       CurrentTime[CurrentProc]>=EndOfTimeSlice)
    {
      ThreadQueueHd = TSO_LINK(CurrentTSO);
      TSO_LINK(ThreadQueueTl) = CurrentTSO;
      ThreadQueueTl = CurrentTSO;
      TSO_LINK(CurrentTSO) = Prelude_Z91Z93_closure;
      CurrentTime[CurrentProc] +=
	RTSflags.GranFlags.gran_threadcontextswitchtime;
      if ( RTSflags.GranFlags.granSimStats )
	DumpGranEvent(GR_SCHEDULE,ThreadQueueHd);
      CurrentTSO = ThreadQueueHd;
    }

    new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
	      CONTINUETHREAD,CurrentTSO,Prelude_Z91Z93_closure,NULL);
  }
  /* Schedule the `next thread', which is at ThreadQueueHd now, i.e. the
     thread queue has already been updated before this point. */
  else if(what_next==NEW_THREAD && ThreadQueueHd != Prelude_Z91Z93_closure)
  {
# if defined(GRAN_CHECK) && defined(GRAN)
    fprintf(stderr,"Qagh: ReSchedule(NEW_THREAD) shouldn't be used with DoReScheduleOnFetch!!\n");
    EXIT(EXIT_FAILURE);
# endif

    if(RTSflags.GranFlags.granSimStats &&
       (!RTSflags.GranFlags.Light || RTSflags.GranFlags.debug & 0x20000) )
      DumpGranEvent(GR_SCHEDULE,ThreadQueueHd);

    CurrentTSO = ThreadQueueHd;
    new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
	      CONTINUETHREAD,CurrentTSO,Prelude_Z91Z93_closure,NULL);
    CurrentTime[CurrentProc] +=
      RTSflags.GranFlags.gran_threadcontextswitchtime;
  }

  /* We go in here if the current thread is blocked on fetch => don't CONT */
  else if(what_next==CHANGE_THREAD)
  {
    /* just fall into event handling loop for next event */
  }

  /* We go in here if we have no runnable threads or what_next==0 */
  else
  {
    procStatus[CurrentProc] = Idle;
    /* That's now done in HandleIdlePEs!
    new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
	      FINDWORK,Prelude_Z91Z93_closure,Prelude_Z91Z93_closure,NULL);
    */
    CurrentTSO = Prelude_Z91Z93_closure;
  }

  /* ----------------------------------------------------------------- */
  /* This part is the EVENT HANDLING LOOP                               */
  /* ----------------------------------------------------------------- */

  do {
    /* Choose the processor with the next event */
    event = get_next_event();
    CurrentProc = EVENT_PROC(event);
    CurrentTSO = EVENT_TSO(event);
    if (RTSflags.GranFlags.Light) {
      P_ tso;
      W_ tmp;
      /* Restore the local clock of the virtual processor attached to
	 CurrentTSO. All costs will be associated to the `virt. proc'
	 on which the tso is living. */
      if (ActiveTSO != NULL) {               /* already in system area */
	TSO_CLOCK(ActiveTSO) = CurrentTime[CurrentProc];
	if (RTSflags.GranFlags.DoFairSchedule) {
	  if (RTSflags.GranFlags.granSimStats &&
	      RTSflags.GranFlags.debug & 0x20000)
	    DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
	}
      }
      switch (EVENT_TYPE(event)) {
	case CONTINUETHREAD:
	case FINDWORK:        /* inaccurate this way */
	  ActiveTSO = ThreadQueueHd;
	  break;
	case RESUMETHREAD:
	case STARTTHREAD:
	case MOVESPARK:       /* has tso of virt proc in tso field of event */
	  ActiveTSO = EVENT_TSO(event);
	  break;
	default:
	  fprintf(stderr,"Illegal event type %s (%d) in GrAnSim Light setup\n",
		  event_names[EVENT_TYPE(event)],EVENT_TYPE(event));
	  EXIT(EXIT_FAILURE);
      }
      CurrentTime[CurrentProc] = TSO_CLOCK(ActiveTSO);
      if(RTSflags.GranFlags.DoFairSchedule) {
	if (RTSflags.GranFlags.granSimStats &&
	    RTSflags.GranFlags.debug & 0x20000)
	  DumpGranEvent(GR_SYSTEM_START,ActiveTSO);
      }
    }

    if(EVENT_TIME(event) > CurrentTime[CurrentProc] &&
       EVENT_TYPE(event)!=CONTINUETHREAD)
      CurrentTime[CurrentProc] = EVENT_TIME(event);

# if defined(GRAN_CHECK) && defined(GRAN)                         /* HWL */
    if ( RTSflags.GranFlags.Light && CurrentProc!=0 ) {
      fprintf(stderr,"Qagh {ReSchedule}Daq: CurrentProc must be 0 in GrAnSim Light setup\n");
      EXIT(EXIT_FAILURE);
    }
# endif

    /* MAKE_BUSY(CurrentProc); don't think that's right in all cases now */
    /*                                                           -- HWL  */

# if defined(GRAN_CHECK) && defined(GRAN)
    if (RTSflags.GranFlags.debug & 0x80)
      fprintf(stderr,"After get_next_event, before HandleIdlePEs\n");
# endif

    /* Deal with the idlers */
    if ( !RTSflags.GranFlags.Light )
      HandleIdlePEs();

# if defined(GRAN_CHECK) && defined(GRAN)
    if ( RTSflags.GranFlags.event_trace_all ||
	 ( RTSflags.GranFlags.event_trace &&
	   EVENT_TYPE(event) != CONTINUETHREAD) ||
	 (RTSflags.GranFlags.debug & 0x80) )
      print_event(event);
# endif

    switch (EVENT_TYPE(event))
    {
      /* Should just be continuing execution */
      case CONTINUETHREAD:
# if defined(GRAN_CHECK) && defined(GRAN)                   /* Just for
testing */ if ( (RTSflags.GranFlags.debug & 0x100) && (EVENT_TSO(event)!=RunnableThreadsHd[EVENT_PROC(event)]) ) { fprintf(stderr,"Warning: Wrong TSO in CONTINUETHREAD: %#lx (%x) (PE: %d Hd: 0x%lx)\n", EVENT_TSO(event), TSO_ID(EVENT_TSO(event)), EVENT_PROC(event), RunnableThreadsHd[EVENT_PROC(event)]); } if ( (RTSflags.GranFlags.debug & 0x04) && BlockedOnFetch[CurrentProc]) { fprintf(stderr,"Warning: Discarding CONTINUETHREAD on blocked proc %u @ %u\n", CurrentProc,CurrentTime[CurrentProc]); print_event(event); continue; } # endif if(ThreadQueueHd==Prelude_Z91Z93_closure) { new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc], FINDWORK,Prelude_Z91Z93_closure,Prelude_Z91Z93_closure,NULL); continue; /* Catches superfluous CONTINUEs -- should be unnecessary */ } else break; /* fall into scheduler loop */ case FETCHNODE: do_the_fetchnode(event); continue; /* handle next event in event queue */ case GLOBALBLOCK: do_the_globalblock(event); continue; /* handle next event in event queue */ case FETCHREPLY: do_the_fetchreply(event); continue; /* handle next event in event queue */ case UNBLOCKTHREAD: /* Move from the blocked queue to the tail of */ do_the_unblock(event); continue; /* handle next event in event queue */ case RESUMETHREAD: /* Move from the blocked queue to the tail of */ /* the runnable queue ( i.e. Qu' SImqa'lu') */ TSO_BLOCKTIME(EVENT_TSO(event)) += CurrentTime[CurrentProc] - TSO_BLOCKEDAT(EVENT_TSO(event)); StartThread(event,GR_RESUME); continue; case STARTTHREAD: StartThread(event,GR_START); continue; case MOVETHREAD: do_the_movethread(event); continue; /* handle next event in event queue */ case MOVESPARK: do_the_movespark(event); continue; /* handle next event in event queue */ case FINDWORK: { /* Make sure that we have enough heap for creating a new thread. This is a conservative estimate of the required heap. This eliminates special checks for GC around NewThread within munch_spark. */ I_ req_heap = TSO_HS + TSO_CTS_SIZE + STKO_HS + RTSflags.ConcFlags.stkChunkSize; if (SAVE_Hp + req_heap >= SAVE_HpLim ) { ReallyPerformThreadGC(req_heap, rtsFalse); SAVE_Hp -= req_heap; if (IS_SPARKING(CurrentProc)) MAKE_IDLE(CurrentProc); continue; } } if( RTSflags.GranFlags.DoAlwaysCreateThreads || (ThreadQueueHd == Prelude_Z91Z93_closure && (RTSflags.GranFlags.FetchStrategy >= 2 || OutstandingFetches[CurrentProc] == 0)) ) { rtsBool found; sparkq prev, spark; /* ToDo: check */ ASSERT(procStatus[CurrentProc]==Sparking || RTSflags.GranFlags.DoAlwaysCreateThreads); /* SImmoHwI' yInej! Search spark queue! */ gimme_spark (&found, &prev, &spark); /* DaH chu' Qu' yIchen! Now create new work! 
*/ munch_spark (found, prev, spark); /* ToDo: check ; not valid if GC occurs in munch_spark ASSERT(procStatus[CurrentProc]==Starting || procStatus[CurrentProc]==Idle || RTSflags.GranFlags.DoAlwaysCreateThreads); */ } continue; /* to the next event */ default: fprintf(stderr,"Illegal event type %u\n",EVENT_TYPE(event)); continue; } /* switch */ longjmp(scheduler_loop, 1); } while(1); } /* ----------------------------------------------------------------- */ /* The main event handling functions; called from ReSchedule (switch) */ /* ----------------------------------------------------------------- */ void do_the_globalblock(eventq event) { PROC proc = EVENT_PROC(event); /* proc that requested node */ P_ tso = EVENT_TSO(event), /* tso that requested node */ node = EVENT_NODE(event); /* requested, remote node */ # if defined(GRAN_CHECK) && defined(GRAN) if ( RTSflags.GranFlags.Light ) { fprintf(stderr,"Qagh: There should be no GLOBALBLOCKs in GrAnSim Light setup\n"); EXIT(EXIT_FAILURE); } if (!RTSflags.GranFlags.DoGUMMFetching) { fprintf(stderr,"Qagh: GLOBALBLOCK events only valid with GUMM fetching\n"); EXIT(EXIT_FAILURE); } if ( (RTSflags.GranFlags.debug & 0x100) && IS_LOCAL_TO(PROCS(node),proc) ) { fprintf(stderr,"Qagh: GLOBALBLOCK: Blocking on LOCAL node 0x %x (PE %d).\n", node,proc); } # endif /* CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_munpacktime; */ if ( blockFetch(tso,proc,node) != 0 ) return; /* node has become local by now */ if (!RTSflags.GranFlags.DoReScheduleOnFetch) { /* head of queue is next thread */ P_ tso = RunnableThreadsHd[proc]; /* awaken next thread */ if(tso != Prelude_Z91Z93_closure) { new_event(proc,proc,CurrentTime[proc], CONTINUETHREAD,tso,Prelude_Z91Z93_closure,NULL); CurrentTime[proc] += RTSflags.GranFlags.gran_threadcontextswitchtime; if(RTSflags.GranFlags.granSimStats) DumpRawGranEvent(proc,CurrentProc,GR_SCHEDULE,tso, Prelude_Z91Z93_closure,0); MAKE_BUSY(proc); /* might have been fetching */ } else { MAKE_IDLE(proc); /* no work on proc now */ } } else { /* RTSflags.GranFlags.DoReScheduleOnFetch i.e. block-on-fetch */ /* other thread is already running */ /* 'oH 'utbe' 'e' vIHar ; I think that's not needed -- HWL new_event(proc,proc,CurrentTime[proc], CONTINUETHREAD,EVENT_TSO(event), (RTSflags.GranFlags.DoGUMMFetching ? 
closure : EVENT_NODE(event)),NULL); */ } } void do_the_unblock(eventq event) { PROC proc = EVENT_PROC(event), /* proc that requested node */ creator = EVENT_CREATOR(event); /* proc that requested node */ P_ tso = EVENT_TSO(event), /* tso that requested node */ node = EVENT_NODE(event); /* requested, remote node */ # if defined(GRAN) && defined(GRAN_CHECK) if ( RTSflags.GranFlags.Light ) { fprintf(stderr,"Qagh: There should be no UNBLOCKs in GrAnSim Light setup\n"); EXIT(EXIT_FAILURE); } # endif if (!RTSflags.GranFlags.DoReScheduleOnFetch) { /* block-on-fetch */ /* We count block-on-fetch as normal block time */ TSO_BLOCKTIME(tso) += CurrentTime[proc] - TSO_BLOCKEDAT(tso); /* No costs for contextswitch or thread queueing in this case */ if(RTSflags.GranFlags.granSimStats) DumpRawGranEvent(proc,CurrentProc,GR_RESUME,tso, Prelude_Z91Z93_closure,0); new_event(proc,proc,CurrentTime[proc],CONTINUETHREAD,tso,node,NULL); } else { /* Reschedule on fetch causes additional costs here: */ /* Bring the TSO from the blocked queue into the threadq */ new_event(proc,proc,CurrentTime[proc]+RTSflags.GranFlags.gran_threadqueuetime, RESUMETHREAD,tso,node,NULL); } } void do_the_fetchnode(eventq event) { I_ rc; # if defined(GRAN_CHECK) && defined(GRAN) if ( RTSflags.GranFlags.Light ) { fprintf(stderr,"Qagh: There should be no FETCHNODEs in GrAnSim Light setup\n"); EXIT(EXIT_FAILURE); } if (RTSflags.GranFlags.SimplifiedFetch) { fprintf(stderr,"Qagh: FETCHNODE events not valid with simplified fetch\n"); EXIT(EXIT_FAILURE); } # endif CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_munpacktime; do { rc = HandleFetchRequest(EVENT_NODE(event), EVENT_CREATOR(event), EVENT_TSO(event)); if (rc == 4) { /* trigger GC */ # if defined(GRAN_CHECK) && defined(GRAN) if (RTSflags.GcFlags.giveStats) fprintf(RTSflags.GcFlags.statsFile,"***** veQ boSwI' PackNearbyGraph(node %#lx, tso %#lx (%x))\n", EVENT_NODE(event), EVENT_TSO(event), TSO_ID(EVENT_TSO(event))); # endif prepend_event(event); ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse); # if defined(GRAN_CHECK) && defined(GRAN) if (RTSflags.GcFlags.giveStats) { fprintf(RTSflags.GcFlags.statsFile,"***** SAVE_Hp=%#lx, SAVE_HpLim=%#lx, PACK_HEAP_REQUIRED=%#lx\n", SAVE_Hp, SAVE_HpLim, PACK_HEAP_REQUIRED); fprintf(stderr,"***** No. of packets so far: %d (total size: %d)\n", tot_packets,tot_packet_size); } # endif event = grab_event(); SAVE_Hp -= PACK_HEAP_REQUIRED; /* GC knows that events are special and follows the pointer i.e. */ /* events are valid even if they moved. An EXIT is triggered */ /* if there is not enough heap after GC. 
*/ } } while (rc == 4); } void do_the_fetchreply(eventq event) { P_ tso, closure; # if defined(GRAN_CHECK) && defined(GRAN) /* Just for testing */ if ( RTSflags.GranFlags.Light ) { fprintf(stderr,"Qagh: There should be no FETCHREPLYs in GrAnSim Light setup\n"); EXIT(EXIT_FAILURE); } if (RTSflags.GranFlags.SimplifiedFetch) { fprintf(stderr,"Qagh: FETCHREPLY events not valid with simplified fetch\n"); EXIT(EXIT_FAILURE); } if (RTSflags.GranFlags.debug & 0x10) { if (TSO_TYPE(EVENT_TSO(event)) & FETCH_MASK_TSO) { TSO_TYPE(EVENT_TSO(event)) &= ~FETCH_MASK_TSO; } else { fprintf(stderr,"Qagh: FETCHREPLY: TSO %#x (%x) has fetch mask not set @ %d\n", CurrentTSO,TSO_ID(CurrentTSO),CurrentTime[CurrentProc]); EXIT(EXIT_FAILURE); } } if (RTSflags.GranFlags.debug & 0x04) { if (BlockedOnFetch[CurrentProc]!=ThreadQueueHd) { fprintf(stderr,"Qagh: FETCHREPLY: Proc %d (with TSO %#x (%x)) not blocked-on-fetch by TSO %#lx (%x)\n", CurrentProc,CurrentTSO,TSO_ID(CurrentTSO), BlockedOnFetch[CurrentProc], TSO_ID(BlockedOnFetch[CurrentProc])); EXIT(EXIT_FAILURE); } else { BlockedOnFetch[CurrentProc] = 0; /*- rtsFalse; -*/ } } # endif CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_munpacktime; if (RTSflags.GranFlags.DoGUMMFetching) { /* bulk (packet) fetching */ P_ buffer = EVENT_NODE(event); PROC p = EVENT_PROC(event); I_ size = buffer[PACK_SIZE_LOCN]; tso = EVENT_TSO(event); /* NB: Fetch misses can't occur with GUMM fetching, as */ /* updatable closure are turned into RBHs and therefore locked */ /* for other processors that try to grab them. */ closure = UnpackGraph(buffer); CurrentTime[CurrentProc] += size * RTSflags.GranFlags.gran_munpacktime; } else /* Copy or move node to CurrentProc */ if (FetchNode(EVENT_NODE(event), EVENT_CREATOR(event), EVENT_PROC(event)) ) { /* Fetch has failed i.e. node has been grabbed by another PE */ P_ node = EVENT_NODE(event), tso = EVENT_TSO(event); PROC p = where_is(node); TIME fetchtime; # if defined(GRAN_CHECK) && defined(GRAN) if (RTSflags.GranFlags.PrintFetchMisses) { fprintf(stderr,"Fetch miss @ %lu: node %#lx is at proc %u (rather than proc %u)\n", CurrentTime[CurrentProc],node,p,EVENT_CREATOR(event)); fetch_misses++; } # endif /* GRAN_CHECK */ CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_mpacktime; /* Count fetch again !? */ ++TSO_FETCHCOUNT(tso); TSO_FETCHTIME(tso) += RTSflags.GranFlags.gran_fetchtime; fetchtime = STG_MAX(CurrentTime[CurrentProc],CurrentTime[p]) + RTSflags.GranFlags.gran_latency; /* Chase the grabbed node */ new_event(p,CurrentProc,fetchtime,FETCHNODE,tso,node,NULL); # if defined(GRAN_CHECK) && defined(GRAN) /* Just for testing */ if (RTSflags.GranFlags.debug & 0x04) BlockedOnFetch[CurrentProc] = tso; /*-rtsTrue;-*/ if (RTSflags.GranFlags.debug & 0x10) TSO_TYPE(tso) |= FETCH_MASK_TSO; # endif CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_mtidytime; return; /* NB: no REPLy has been processed; tso still sleeping */ } /* -- Qapla'! Fetch has been successful; node is here, now */ ++TSO_FETCHCOUNT(EVENT_TSO(event)); TSO_FETCHTIME(EVENT_TSO(event)) += RTSflags.GranFlags.gran_fetchtime; if (RTSflags.GranFlags.granSimStats) DumpRawGranEvent(CurrentProc,EVENT_CREATOR(event),GR_REPLY, EVENT_TSO(event), (RTSflags.GranFlags.DoGUMMFetching ? 
closure : EVENT_NODE(event)), 0); --OutstandingFetches[CurrentProc]; ASSERT(OutstandingFetches[CurrentProc] >= 0); # if 0 && defined(GRAN_CHECK) && defined(GRAN) /* Just for testing */ if (OutstandingFetches[CurrentProc] < 0) { fprintf(stderr,"Qagh: OutstandingFetches of proc %u has become negative\n",CurrentProc); EXIT(EXIT_FAILURE); } # endif new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc], UNBLOCKTHREAD,EVENT_TSO(event), (RTSflags.GranFlags.DoGUMMFetching ? closure : EVENT_NODE(event)), NULL); } void do_the_movethread(eventq event) { P_ tso = EVENT_TSO(event); # if defined(GRAN_CHECK) && defined(GRAN) /* Just for testing */ if ( RTSflags.GranFlags.Light && CurrentProc!=1 ) { fprintf(stderr,"Qagh: There should be no MOVETHREADs in GrAnSim Light setup\n"); EXIT(EXIT_FAILURE); } if (!RTSflags.GranFlags.DoThreadMigration) { fprintf(stderr,"Qagh: MOVETHREAD events should never occur without -bM\n"); EXIT(EXIT_FAILURE); } if (PROCS(tso)!=0) { fprintf(stderr,"Qagh: Moved thread has a bitmask of 0%o (proc %d); should be 0\n", PROCS(tso), where_is(tso)); EXIT(EXIT_FAILURE); } # endif --OutstandingFishes[CurrentProc]; ASSERT(OutstandingFishes[CurrentProc]>=0); SET_PROCS(tso,ThisPE); CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_munpacktime; StartThread(event,GR_STOLEN); } void do_the_movespark(eventq event){ sparkq spark = EVENT_SPARK(event); CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_munpacktime; if (RTSflags.GranFlags.granSimStats_Sparks) DumpRawGranEvent(CurrentProc,(PROC)0,SP_ACQUIRED,Prelude_Z91Z93_closure, SPARK_NODE(spark), spark_queue_len(CurrentProc,ADVISORY_POOL)); #if defined(GRAN) && defined(GRAN_CHECK) if (!SHOULD_SPARK(SPARK_NODE(spark))) withered_sparks++; /* Not adding the spark to the spark queue would be the right */ /* thing here, but it also would be cheating, as this info can't be */ /* available in a real system. -- HWL */ #endif --OutstandingFishes[CurrentProc]; ASSERT(OutstandingFishes[CurrentProc]>=0); add_to_spark_queue(spark); if (procStatus[CurrentProc]==Fishing) procStatus[CurrentProc] = Idle; /* add_to_spark_queue will increase the time of the current proc. */ /* Just falling into FINDWORK is wrong as we might have other */ /* events that are happening before that. Therefore, just create */ /* a FINDWORK event and go back to main event handling loop. */ /* Should we treat stolen sparks specially? Currently, we don't. 
*/ #if 0 /* Now FINDWORK is created in HandleIdlePEs */ new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc], FINDWORK,Prelude_Z91Z93_closure,Prelude_Z91Z93_closure,NULL); sparking[CurrentProc]=rtsTrue; #endif } /* Search the spark queue of the CurrentProc for a spark that's worth turning into a thread */ void gimme_spark (rtsBool *found_res, sparkq *prev_res, sparkq *spark_res) { P_ node; rtsBool found; sparkq spark_of_non_local_node = NULL, spark_of_non_local_node_prev = NULL, low_priority_spark = NULL, low_priority_spark_prev = NULL, spark = NULL, prev = NULL, tmp = NULL; /* Choose a spark from the local spark queue */ spark = SparkQueueHd; found = rtsFalse; while (spark != NULL && !found) { node = SPARK_NODE(spark); if (!SHOULD_SPARK(node)) { if(RTSflags.GranFlags.granSimStats_Sparks) DumpRawGranEvent(CurrentProc,(PROC)0,SP_PRUNED,Prelude_Z91Z93_closure, SPARK_NODE(spark), spark_queue_len(CurrentProc,ADVISORY_POOL)); ASSERT(spark != NULL); --SparksAvail; spark = delete_from_spark_queue (prev,spark); } /* -- node should eventually be sparked */ else if (RTSflags.GranFlags.PreferSparksOfLocalNodes && !IS_LOCAL_TO(PROCS(node),CurrentProc)) { /* Remember first low priority spark */ if (spark_of_non_local_node==NULL) { spark_of_non_local_node_prev = prev; spark_of_non_local_node = spark; } if (SPARK_NEXT(spark)==NULL) { ASSERT(spark==SparkQueueTl); /* just for testing */ prev = spark_of_non_local_node_prev; spark = spark_of_non_local_node; found = rtsTrue; break; } # if defined(GRAN) && defined(GRAN_CHECK) /* Should never happen; just for testing */ if (spark==SparkQueueTl) { fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n"); EXIT(EXIT_FAILURE); } # endif prev = spark; spark = SPARK_NEXT(spark); --SparksAvail; } else if ( RTSflags.GranFlags.DoPrioritySparking || (SPARK_GRAN_INFO(spark)>=RTSflags.GranFlags.SparkPriority2) ) { found = rtsTrue; } else /* only used if SparkPriority2 is defined */ { /* Remember first low priority spark */ if (low_priority_spark==NULL) { low_priority_spark_prev = prev; low_priority_spark = spark; } if (SPARK_NEXT(spark)==NULL) { ASSERT(spark==SparkQueueTl); /* just for testing */ prev = low_priority_spark_prev; spark = low_priority_spark; found = rtsTrue; /* take low pri spark => rc is 2 */ break; } /* Should never happen; just for testing */ if (spark==SparkQueueTl) { fprintf(stderr,"ReSchedule: Last spark != SparkQueueTl\n"); EXIT(EXIT_FAILURE); break; } prev = spark; spark = SPARK_NEXT(spark); # if defined(GRAN_CHECK) && defined(GRAN) if ( RTSflags.GranFlags.debug & 0x40 ) { fprintf(stderr,"Ignoring spark of priority %u (SparkPriority=%u); node=0x%lx; name=%u\n", SPARK_GRAN_INFO(spark), RTSflags.GranFlags.SparkPriority, SPARK_NODE(spark), SPARK_NAME(spark)); } # endif /* GRAN_CHECK */ } } /* while (spark!=NULL && !found) */ *spark_res = spark; *prev_res = prev; *found_res = found; } void munch_spark (rtsBool found, sparkq prev, sparkq spark) { P_ tso, node; /* We've found a node; now, create thread (DaH Qu' yIchen) */ if (found) { # if defined(GRAN_CHECK) && defined(GRAN) if ( SPARK_GRAN_INFO(spark) < RTSflags.GranFlags.SparkPriority2 ) { tot_low_pri_sparks++; if ( RTSflags.GranFlags.debug & 0x40 ) { fprintf(stderr,"GRAN_TNG: No high priority spark available; low priority (%u) spark chosen: node=0x%lx; name=%u\n", SPARK_GRAN_INFO(spark), SPARK_NODE(spark), SPARK_NAME(spark)); } } # endif CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_threadcreatetime; node = SPARK_NODE(spark); if((tso = NewThread(node, T_REQUIRED, 
SPARK_GRAN_INFO(spark)))==NULL) { /* Some kind of backoff needed here in case there's too little heap */ # if defined(GRAN_CHECK) && defined(GRAN) if (RTSflags.GcFlags.giveStats) fprintf(RTSflags.GcFlags.statsFile,"***** vIS Qu' chen veQ boSwI'; spark=%#x, node=%#x; name=%u\n", /* (found==2 ? "no hi pri spark" : "hi pri spark"), */ spark, node,SPARK_NAME(spark)); # endif new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc]+1, FINDWORK,Prelude_Z91Z93_closure,Prelude_Z91Z93_closure,NULL); ReallyPerformThreadGC(TSO_HS+TSO_CTS_SIZE,rtsFalse); SAVE_Hp -= TSO_HS+TSO_CTS_SIZE; spark = NULL; return; /* was: continue; */ /* to the next event, eventually */ } if(RTSflags.GranFlags.granSimStats_Sparks) DumpRawGranEvent(CurrentProc,(PROC)0,SP_USED,Prelude_Z91Z93_closure, SPARK_NODE(spark), spark_queue_len(CurrentProc,ADVISORY_POOL)); TSO_EXPORTED(tso) = SPARK_EXPORTED(spark); TSO_LOCKED(tso) = !SPARK_GLOBAL(spark); TSO_SPARKNAME(tso) = SPARK_NAME(spark); new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc], STARTTHREAD,tso,node,NULL); procStatus[CurrentProc] = Starting; ASSERT(spark != NULL); spark = delete_from_spark_queue (prev, spark); } else /* !found */ /* Make the PE idle if nothing sparked and we have no threads. */ { if(ThreadQueueHd == Prelude_Z91Z93_closure) { MAKE_IDLE(CurrentProc); # if defined(GRAN_CHECK) && defined(GRAN) if ( (RTSflags.GranFlags.debug & 0x80) ) fprintf(stderr,"Warning in FINDWORK handling: No work found for PROC %u\n",CurrentProc); # endif /* GRAN_CHECK */ } #if 0 else /* ut'lu'Qo' ; Don't think that's necessary any more -- HWL new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc], CONTINUETHREAD,ThreadQueueHd,Prelude_Z91Z93_closure,NULL); */ #endif } } \end{code} Here follows the non-GRAN @ReSchedule@. \begin{code} #else /* !GRAN */ /* If you are concurrent and maybe even parallel please use this door. */ void ReSchedule(again) int again; /* Run the current thread again? */ { P_ spark; PP_ sparkp; P_ tso; #ifdef PAR /* * In the parallel world, we do unfair scheduling for the moment. * Ultimately, this should all be merged with the more * sophisticated GrAnSim scheduling options. (Of course, some * provision should be made for *required* threads to make sure * that they don't starve, but for now we assume that no one is * running concurrent Haskell on a multi-processor platform.) */ sameThread = again; if (again) { if (RunnableThreadsHd == Prelude_Z91Z93_closure) RunnableThreadsTl = CurrentTSO; TSO_LINK(CurrentTSO) = RunnableThreadsHd; RunnableThreadsHd = CurrentTSO; } #else /* * In the sequential world, we assume that the whole point of running * the threaded build is for concurrent Haskell, so we provide round-robin * scheduling. */ if (again) { if(RunnableThreadsHd == Prelude_Z91Z93_closure) { RunnableThreadsHd = CurrentTSO; } else { TSO_LINK(RunnableThreadsTl) = CurrentTSO; if (DO_QP_PROF > 1) { QP_Event1("GA", CurrentTSO); } } RunnableThreadsTl = CurrentTSO; } #endif #if 1 /* * Debugging code, which is useful enough (and cheap enough) to compile * in all the time. This makes sure that we don't access saved registers, * etc. in threads which are supposed to be sleeping. 
*/ CurrentTSO = Prelude_Z91Z93_closure; CurrentRegTable = NULL; #endif /* First the required sparks */ for (sparkp = PendingSparksHd[REQUIRED_POOL]; sparkp < PendingSparksTl[REQUIRED_POOL]; sparkp++) { spark = *sparkp; if (SHOULD_SPARK(spark)) { if ((tso = NewThread(spark, T_REQUIRED)) == NULL) break; if (RunnableThreadsHd == Prelude_Z91Z93_closure) { RunnableThreadsHd = tso; #ifdef PAR if (RTSflags.ParFlags.granSimStats) { DumpGranEvent(GR_START, tso); sameThread = rtsTrue; } #endif } else { TSO_LINK(RunnableThreadsTl) = tso; #ifdef PAR if (RTSflags.ParFlags.granSimStats) DumpGranEvent(GR_STARTQ, tso); #endif } RunnableThreadsTl = tso; } else { if (DO_QP_PROF) QP_Event0(threadId++, spark); #if 0 /* ToDo: Fix log entries for pruned sparks in GUM -- HWL */ if(RTSflags.GranFlags.granSimStats_Sparks) DumpGranEvent(SP_PRUNED,threadId++); ^^^^^^^^ should be a TSO #endif } } PendingSparksHd[REQUIRED_POOL] = sparkp; /* Now, almost the same thing for advisory sparks */ for (sparkp = PendingSparksHd[ADVISORY_POOL]; sparkp < PendingSparksTl[ADVISORY_POOL]; sparkp++) { spark = *sparkp; if (SHOULD_SPARK(spark)) { if ( #ifdef PAR /* In the parallel world, don't create advisory threads if we are * about to rerun the same thread, or already have runnable threads, * or the main thread has terminated */ (RunnableThreadsHd != Prelude_Z91Z93_closure || (required_thread_count == 0 && IAmMainThread)) || #endif advisory_thread_count == RTSflags.ConcFlags.maxThreads || (tso = NewThread(spark, T_ADVISORY)) == NULL) break; advisory_thread_count++; if (RunnableThreadsHd == Prelude_Z91Z93_closure) { RunnableThreadsHd = tso; #ifdef PAR if (RTSflags.ParFlags.granSimStats) { DumpGranEvent(GR_START, tso); sameThread = rtsTrue; } #endif } else { TSO_LINK(RunnableThreadsTl) = tso; #ifdef PAR if (RTSflags.ParFlags.granSimStats) DumpGranEvent(GR_STARTQ, tso); #endif } RunnableThreadsTl = tso; } else { if (DO_QP_PROF) QP_Event0(threadId++, spark); #if 0 /* ToDo: Fix log entries for pruned sparks in GUM -- HWL */ if(RTSflags.GranFlags.granSimStats_Sparks) DumpGranEvent(SP_PRUNED,threadId++); ^^^^^^^^ should be a TSO #endif } } PendingSparksHd[ADVISORY_POOL] = sparkp; #ifndef PAR longjmp(scheduler_loop, required_thread_count == 0 ? -1 : 1); #else longjmp(scheduler_loop, required_thread_count == 0 && IAmMainThread ? -1 : 1); #endif } #endif /* !GRAN */ \end{code} %**************************************************************************** % \subsection[thread-gransim-execution]{Starting, Idling and Migrating Threads (GrAnSim only)} % %**************************************************************************** Thread start, idle and migration code for GrAnSim (i.e. simulating multiple processors). \begin{code} #if defined(GRAN) /* ngoqvam che' {GrAnSim}! */ # if defined(GRAN_CHECK) /* This routine is only used for keeping a statistics of thread queue lengths to evaluate the impact of priority scheduling. -- HWL {spark_queue_len}vo' jInIHta' */ I_ thread_queue_len(PROC proc) { P_ prev, next; I_ len; for (len = 0, prev = Prelude_Z91Z93_closure, next = RunnableThreadsHd[proc]; next != Prelude_Z91Z93_closure; len++, prev = next, next = TSO_LINK(prev)) {} return (len); } # endif /* GRAN_CHECK */ \end{code} A large portion of @StartThread@ deals with maintaining a sorted thread queue, which is needed for the Priority Sparking option. Without that complication the code boils down to FIFO handling. 
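In outline, the two cases look as follows (an illustrative sketch only:
the real code below also charges @gran_pri_sched_overhead@ for every
element of the walk, and is careful never to insert in front of the
currently running thread):

\begin{verbatim}
/* FIFO: append at the tail of the thread queue */
TSO_LINK(ThreadQueueTl) = tso;
ThreadQueueTl = tso;

/* Priority scheduling: walk past all TSOs of strictly higher priority
   (the queue is kept sorted, highest priority first), then splice in */
for (prev = ThreadQueueHd, next = TSO_LINK(ThreadQueueHd);
     next != Prelude_Z91Z93_closure && TSO_PRI(next) > TSO_PRI(tso);
     prev = next, next = TSO_LINK(next))
    ;
TSO_LINK(tso) = next;
TSO_LINK(prev) = tso;
if (next == Prelude_Z91Z93_closure)
    ThreadQueueTl = tso;
\end{verbatim}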
\begin{code} StartThread(event,event_type) eventq event; enum gran_event_types event_type; { P_ tso = EVENT_TSO(event), node = EVENT_NODE(event); PROC proc = EVENT_PROC(event), creator = EVENT_CREATOR(event); P_ prev, next; I_ count = 0; rtsBool found = rtsFalse; ASSERT(CurrentProc==proc); # if defined(GRAN_CHECK) if ( RTSflags.GranFlags.Light && CurrentProc!=0 ) { fprintf(stderr,"Qagh {StartThread}Daq: CurrentProc must be 0 in GrAnSim Light setup\n"); EXIT(EXIT_FAILURE); } /* A wee bit of statistics gathering */ ++tot_add_threads; tot_tq_len += thread_queue_len(CurrentProc); # endif ASSERT(TSO_LINK(CurrentTSO)==Prelude_Z91Z93_closure); /* Idle proc; same for pri spark and basic version */ if(ThreadQueueHd==Prelude_Z91Z93_closure) { CurrentTSO = ThreadQueueHd = ThreadQueueTl = tso; CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_threadqueuetime; new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc], CONTINUETHREAD,tso,Prelude_Z91Z93_closure,NULL); if(RTSflags.GranFlags.granSimStats && !( (event_type == GR_START || event_type == GR_STARTQ) && RTSflags.GranFlags.labelling) ) DumpRawGranEvent(CurrentProc,creator,event_type, tso,node, TSO_SPARKNAME(tso)); /* ^^^ SN (spark name) as optional info */ /* spark_queue_len(CurrentProc,ADVISORY_POOL)); */ /* ^^^ spark length as optional info */ ASSERT(IS_IDLE(CurrentProc) || event_type==GR_RESUME || (procStatus[CurrentProc]==Fishing && event_type==GR_STOLEN) || procStatus[CurrentProc]==Starting); MAKE_BUSY(CurrentProc); return; } /* In GrAnSim-Light we always have an idle `virtual' proc. The semantics of the one-and-only thread queue is different here: all threads in the queue are running (each on its own virtual processor); the queue is only needed internally in the simulator to interleave the reductions of the different processors. The one-and-only thread queue is sorted by the local clocks of the TSOs. 
*/ if(RTSflags.GranFlags.Light) { ASSERT(ThreadQueueHd!=Prelude_Z91Z93_closure); ASSERT(TSO_LINK(tso)==Prelude_Z91Z93_closure); /* If only one thread in queue so far we emit DESCHEDULE in debug mode */ if(RTSflags.GranFlags.granSimStats && (RTSflags.GranFlags.debug & 0x20000) && TSO_LINK(ThreadQueueHd)==Prelude_Z91Z93_closure) { DumpRawGranEvent(CurrentProc,CurrentProc,GR_DESCHEDULE, ThreadQueueHd,Prelude_Z91Z93_closure,0); resched = rtsTrue; } if ( InsertThread(tso) ) { /* new head of queue */ new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc], CONTINUETHREAD,tso,Prelude_Z91Z93_closure,NULL); } if(RTSflags.GranFlags.granSimStats && !(( event_type == GR_START || event_type == GR_STARTQ) && RTSflags.GranFlags.labelling) ) DumpRawGranEvent(CurrentProc,creator,event_type, tso,node, TSO_SPARKNAME(tso)); /* ^^^ SN (spark name) as optional info */ /* spark_queue_len(CurrentProc,ADVISORY_POOL)); */ /* ^^^ spark length as optional info */ /* MAKE_BUSY(CurrentProc); */ return; } /* Only for Pri Sparking */ if (RTSflags.GranFlags.DoPriorityScheduling && TSO_PRI(tso)!=0) /* {add_to_spark_queue}vo' jInIHta'; Qu' wa'DIch yIleghQo' */ for (prev = ThreadQueueHd, next = TSO_LINK(ThreadQueueHd), count=0; (next != Prelude_Z91Z93_closure) && !(found = (TSO_PRI(tso) >= TSO_PRI(next))); prev = next, next = TSO_LINK(next), count++) {} ASSERT(!IS_IDLE(CurrentProc)); /* found can only be rtsTrue if pri sparking enabled */ if (found) { # if defined(GRAN_CHECK) ++non_end_add_threads; # endif /* Add tso to ThreadQueue between prev and next */ TSO_LINK(tso) = next; if ( next == Prelude_Z91Z93_closure ) { ThreadQueueTl = tso; } else { /* no back link for TSO chain */ } if ( prev == Prelude_Z91Z93_closure ) { /* Never add TSO as first elem of thread queue; the first */ /* element should be the one that is currently running -- HWL */ # if defined(GRAN_CHECK) fprintf(stderr,"Qagh: NewThread (w/ PriorityScheduling): Trying to add TSO %#lx (PRI=%d) as first elem of threadQ (%#lx) on proc %u (@ %u)\n", tso, TSO_PRI(tso), ThreadQueueHd, CurrentProc, CurrentTime[CurrentProc]); # endif } else { TSO_LINK(prev) = tso; } } else { /* !found */ /* or not pri sparking! */ /* Add TSO to the end of the thread queue on that processor */ TSO_LINK(ThreadQueueTl) = EVENT_TSO(event); ThreadQueueTl = EVENT_TSO(event); } CurrentTime[CurrentProc] += count * RTSflags.GranFlags.gran_pri_sched_overhead + RTSflags.GranFlags.gran_threadqueuetime; if(RTSflags.GranFlags.DoThreadMigration) ++SurplusThreads; if(RTSflags.GranFlags.granSimStats && !(( event_type == GR_START || event_type == GR_STARTQ) && RTSflags.GranFlags.labelling) ) DumpRawGranEvent(CurrentProc,creator,event_type+1, tso,node, TSO_SPARKNAME(tso)); /* ^^^ SN (spark name) as optional info */ /* spark_queue_len(CurrentProc,ADVISORY_POOL)); */ /* ^^^ spark length as optional info */ # if defined(GRAN_CHECK) /* Check if thread queue is sorted. Only for testing, really! HWL */ if ( RTSflags.GranFlags.DoPriorityScheduling && (RTSflags.GranFlags.debug & 0x400) ) { rtsBool sorted = rtsTrue; P_ prev, next; if (ThreadQueueHd==Prelude_Z91Z93_closure || TSO_LINK(ThreadQueueHd)==Prelude_Z91Z93_closure) { /* just 1 elem => ok */ } else { /* Qu' wa'DIch yIleghQo' (ignore first elem)! 
	 */
	for (prev = TSO_LINK(ThreadQueueHd), next = TSO_LINK(prev);
	     (next != Prelude_Z91Z93_closure) ;
	     prev = next, next = TSO_LINK(prev)) {
	  sorted = sorted && (TSO_PRI(prev) >= TSO_PRI(next));
	}
      }
      if (!sorted) {
	fprintf(stderr,"Qagh: THREADQ on PE %d is not sorted:\n",
		CurrentProc);
	G_THREADQ(ThreadQueueHd,0x1);
      }
    }
# endif

    CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_threadqueuetime;
}
\end{code}

@InsertThread@, which is only used for GranSim Light, is similar to
@StartThread@ in that it adds a TSO to a thread queue. However, it assumes
that the thread queue is sorted by the local clocks, and it inserts the
TSO at the right place in the queue. Don't create any event, just insert.

\begin{code}
rtsBool
InsertThread(tso)
P_ tso;
{
  P_ prev, next;
  I_ count = 0;
  rtsBool found = rtsFalse;

# if defined(GRAN_CHECK)
  if ( !RTSflags.GranFlags.Light ) {
    fprintf(stderr,"Qagh {InsertThread}Daq: InsertThread should only be used in a GrAnSim Light setup\n");
    EXIT(EXIT_FAILURE);
  }

  if ( RTSflags.GranFlags.Light && CurrentProc!=0 ) {
    fprintf(stderr,"Qagh {StartThread}Daq: CurrentProc must be 0 in GrAnSim Light setup\n");
    EXIT(EXIT_FAILURE);
  }
# endif

  /* Idle proc; same for pri spark and basic version */
  if(ThreadQueueHd==Prelude_Z91Z93_closure)
  {
    ThreadQueueHd = ThreadQueueTl = tso;
    /* MAKE_BUSY(CurrentProc); */
    return (rtsTrue);
  }

  for (prev = ThreadQueueHd, next = TSO_LINK(ThreadQueueHd), count=0;
       (next != Prelude_Z91Z93_closure) &&
       !(found = (TSO_CLOCK(tso) < TSO_CLOCK(next)));
       prev = next, next = TSO_LINK(next), count++)
    {}

  /* found is rtsTrue iff we passed a TSO with a later local clock */
  if (found) {
    /* Add tso to ThreadQueue between prev and next */
    TSO_LINK(tso) = next;
    if ( next == Prelude_Z91Z93_closure ) {
      ThreadQueueTl = tso;
    } else {
      /* no back link for TSO chain */
    }

    if ( prev == Prelude_Z91Z93_closure ) {
      ThreadQueueHd = tso;
    } else {
      TSO_LINK(prev) = tso;
    }
  } else { /* !found */
    /* Add TSO to the end of the thread queue on that processor */
    TSO_LINK(ThreadQueueTl) = tso;
    ThreadQueueTl = tso;
  }
  return (prev == Prelude_Z91Z93_closure);
}
\end{code}

Export work to idle PEs. This function is called from @ReSchedule@ before
dispatching on the current event. @HandleIdlePEs@ iterates over all PEs,
trying to get work for idle PEs. Note that this is a simplification
compared to GUM's fishing model. We try to compensate for that by making
the cost of stealing work dependent on the number of idle processors, and
thereby on the probability with which a randomly sent fish would find
work.

\begin{code}
HandleIdlePEs()
{
  PROC proc;

# if defined(GRAN) && defined(GRAN_CHECK)
  if ( RTSflags.GranFlags.Light ) {
    fprintf(stderr,"Qagh {HandleIdlePEs}Daq: Should never be entered in GrAnSim Light setup\n");
    EXIT(EXIT_FAILURE);
  }
# endif

  if(ANY_IDLE)
    for(proc = 0; proc < RTSflags.GranFlags.proc; proc++)
      if(IS_IDLE(proc)) /* && IS_SPARKING(proc) && IS_STARTING(proc) */
	/* First look for local work! */
	if (PendingSparksHd[proc][ADVISORY_POOL]!=NULL)
	{
	  new_event(proc,proc,CurrentTime[proc],
		    FINDWORK,Prelude_Z91Z93_closure,Prelude_Z91Z93_closure,NULL);
	  MAKE_SPARKING(proc);
	}
	/* Then try to get remote work! */
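	/* The cost charged for stealing (see SparkStealTime) grows with
	   the number of idle PEs, mimicking GUM: with p PEs of which k
	   are idle, a randomly aimed fish finds work with probability
	   about (p-k)/(p-1). E.g. on 16 PEs with 12 of them idle that is
	   4/15, i.e. on average almost four fish (and their communication
	   costs) per successful steal. */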
	else if ((RTSflags.GranFlags.max_fishes==0 ||
		  OutstandingFishes[proc]<RTSflags.GranFlags.max_fishes))
	{
	  /* If so desired, steal a thread before stealing sparks */
	  if (RTSflags.GranFlags.DoStealThreadsFirst &&
	      (RTSflags.GranFlags.FetchStrategy >= 4 ||
	       OutstandingFetches[proc] == 0))
	  {
	    if (SurplusThreads > 0l)                   /* Steal a thread */
	      StealThread(proc);

	    if(!IS_IDLE(proc))
	      break;
	  }

	  if(SparksAvail > 0l &&
	     (RTSflags.GranFlags.FetchStrategy >= 3 ||
	      OutstandingFetches[proc] == 0))          /* Steal a spark */
	    StealSpark(proc);

	  if (SurplusThreads > 0l &&
	      (RTSflags.GranFlags.FetchStrategy >= 4 ||
	       OutstandingFetches[proc] == 0))         /* Steal a thread */
	    StealThread(proc);
	}
}
\end{code}

Steal a spark and schedule moving it to proc. We want to look at PEs in
clock order -- most retarded (i.e. furthest behind) first. Currently
sparks are only stolen from the @ADVISORY_POOL@, never from the
@REQUIRED_POOL@. Eventually, this should be changed to first steal from
the former, then from the latter. We model a sort of fishing mechanism by
counting the number of sparks and threads we are currently stealing.

\begin{code}
StealSpark(proc)
PROC proc;
{
  PROC p;
  sparkq spark, prev, next;
  rtsBool stolen = rtsFalse;
  TIME times[MAX_PROC], stealtime;
  unsigned ntimes=0, i, j;
  int first_later, upb, r;

# if defined(GRAN) && defined(GRAN_CHECK)
  if ( RTSflags.GranFlags.Light ) {
    fprintf(stderr,"Qagh {StealSpark}Daq: Should never be entered in GrAnSim Light setup\n");
    EXIT(EXIT_FAILURE);
  }
# endif

  /* times shall contain processors from which we may steal sparks */
  for(p=0; p < RTSflags.GranFlags.proc; ++p)
    if(proc != p &&
       PendingSparksHd[p][ADVISORY_POOL] != NULL &&
       CurrentTime[p] <= CurrentTime[CurrentProc])
      times[ntimes++] = p;

  /* sort times */
  for(i=0; i < ntimes; ++i)
    for(j=i+1; j < ntimes; ++j)
      if(CurrentTime[times[i]] > CurrentTime[times[j]]) {
	unsigned temp = times[i];
	times[i] = times[j];
	times[j] = temp;
      }

  /* Choose random processor to steal spark from; first look at processors */
  /* that are earlier than the current one (i.e. proc)                     */
  for(first_later=0;
      (first_later < ntimes) &&
      (CurrentTime[times[first_later]] < CurrentTime[proc]);
      ++first_later)
    /* nothing */ ;

  while (!stolen && (ntimes>0)) {
    long unsigned int r, q=0;

    upb = (first_later==0) ? ntimes : first_later;

    if (RTSflags.GranFlags.RandomSteal) {
      r = lrand48();                                /* [0, RAND_MAX] */
    } else {
      r = 0;
    }
    /* -- ASSERT(r<=RAND_MAX); */
    i = (unsigned int) (r % upb);                   /* [0, upb) */
    /* -- ASSERT((i>=0) && (i<=upb)); */
    p = times[i];
    /* -- ASSERT((p>=0) && (p<MAX_PROC)); */

    /* Take the first spark in the ADVISORY_POOL of p that is worth
       turning into a thread; prune worthless ones as we walk the queue */
    for (spark = PendingSparksHd[p][ADVISORY_POOL], prev = NULL;
	 spark != NULL && !stolen;
	 spark = next)
    {
      next = SPARK_NEXT(spark);

      if (SHOULD_SPARK(SPARK_NODE(spark)))
      {
	/* Schedule the arrival of the spark on proc */
	stealtime = (CurrentTime[p] > CurrentTime[proc] ?
		       CurrentTime[p] :
		       CurrentTime[proc])
		    + SparkStealTime();

	new_event(proc,p /* CurrentProc */,stealtime,
		  MOVESPARK,Prelude_Z91Z93_closure,Prelude_Z91Z93_closure,spark);

	/* MAKE_BUSY(proc);     not yet; busy when TSO in threadq */
	stolen = rtsTrue;
	++OutstandingFishes[proc];
	if (IS_IDLE(proc))
	  MAKE_FISHING(proc);
	++SPARK_GLOBAL(spark);
	--SparksAvail;

	CurrentTime[p] += RTSflags.GranFlags.gran_mtidytime;
      }
      else /* !(SHOULD_SPARK(SPARK_NODE(spark))) */
      {
	if(RTSflags.GranFlags.granSimStats_Sparks)
	  DumpRawGranEvent(p,(PROC)0,SP_PRUNED,Prelude_Z91Z93_closure,
			   SPARK_NODE(spark),
			   spark_queue_len(p,ADVISORY_POOL));
	--SparksAvail;
	DisposeSpark(spark);
      }

      /* In either case the spark has left p's queue; unlink it */
      if(spark == PendingSparksHd[p][ADVISORY_POOL])
	PendingSparksHd[p][ADVISORY_POOL] = next;

      if(prev!=NULL)
	SPARK_NEXT(prev) = next;
    } /* for (spark= ...) iterating over sparkq */
    if(PendingSparksHd[p][ADVISORY_POOL] == NULL)
      PendingSparksTl[p][ADVISORY_POOL] = NULL;

    if (!stolen && (ntimes>0)) {  /* nothing stealable from proc p :( */
      ASSERT(times[i]==p);

      /* remove p from the list (at pos i) */
      for (j=i; j+1<ntimes; ++j)
	times[j] = times[j+1];
      --ntimes;

      /* update first_later to match the shortened list */
      for ( ;
	   (first_later>0) &&
	   (CurrentTime[times[first_later-1]]>CurrentTime[proc]);
	   first_later--)
	/* nothing */ ;
    }
  } /* while */

# if defined(GRAN_CHECK)
  if (stolen && (i!=0)) { /* only for statistics */
    rs_sp_count++;
    ntimes_total += ntimes;
    fl_total += first_later;
    no_of_steals++;
  }
# endif
}
\end{code}

Steal a thread and schedule moving it to proc.

\begin{code}
StealThread(proc)
PROC proc;
{
  PROC p;
  rtsBool found = rtsFalse;
  P_ thread, prev;
  TIME times[MAX_PROC], stealtime;
  unsigned ntimes=0, i, j;
  int first_later, upb, r;

  /* Hunt for a thread */

# if defined(GRAN) && defined(GRAN_CHECK)
  if ( RTSflags.GranFlags.Light ) {
    fprintf(stderr,"Qagh {StealThread}: Should never be entered in GrAnSim Light setup\n");
    EXIT(EXIT_FAILURE);
  }
# endif

  /* times shall contain processors from which we may steal threads */
  for(p=0; p < RTSflags.GranFlags.proc; ++p)
    if(proc != p &&
       RunnableThreadsHd[p] != Prelude_Z91Z93_closure &&
       CurrentTime[p] <= CurrentTime[CurrentProc])
      times[ntimes++] = p;

  /* sort times */
  for(i=0; i < ntimes; ++i)
    for(j=i+1; j < ntimes; ++j)
      if(CurrentTime[times[i]] > CurrentTime[times[j]]) {
	unsigned temp = times[i];
	times[i] = times[j];
	times[j] = temp;
      }

  /* Choose random processor to steal thread from; first look at processors */
  /* that are earlier than the current one (i.e. proc)                      */
  for(first_later=0;
      (first_later < ntimes) &&
      (CurrentTime[times[first_later]] < CurrentTime[proc]);
      ++first_later)
    /* nothing */ ;

  while (!found && (ntimes>0)) {
    long unsigned int r, q=0;

    upb = (first_later==0) ? ntimes : first_later;

    if (RTSflags.GranFlags.RandomSteal) {
      r = lrand48();                                /* [0, RAND_MAX] */
    } else {
      r = 0;
    }
    /* -- ASSERT(r<=RAND_MAX); */

    if ( RTSflags.GranFlags.debug & 0x2000 )
      fprintf(stderr,"rand value: %lu  " , r);

    i = (unsigned int) (r % upb);                   /* [0, upb) */
    /* -- ASSERT((i>=0) && (i<=upb)); */
    p = times[i];
    /* -- ASSERT((p>=0) && (p<MAX_PROC)); */

    /* Steal the first thread behind the head of p's runnable queue;
       the head itself is the thread currently running on p */
    if (RunnableThreadsHd[p] != Prelude_Z91Z93_closure)
    {
      prev = RunnableThreadsHd[p];
      thread = TSO_LINK(prev);

      if (thread != Prelude_Z91Z93_closure)
      {
	/* Unlink the thread from p's runnable queue */
	TSO_LINK(prev) = TSO_LINK(thread);
	if (thread == RunnableThreadsTl[p])
	  RunnableThreadsTl[p] = prev;
	TSO_LINK(thread) = Prelude_Z91Z93_closure;

	stealtime = (CurrentTime[p] > CurrentTime[proc] ?
		       CurrentTime[p] :
		       CurrentTime[proc])
		    + SparkStealTime()
		    + 4l * RTSflags.GranFlags.gran_additional_latency
		    + 5l * RTSflags.GranFlags.gran_munpacktime;
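	/* The steal can complete no earlier than the later of the two
	   PEs' clocks (hence the max above); on top of that we charge
	   the fish round trip (SparkStealTime) plus extra latency and
	   unpacking time, since a whole TSO is moved rather than just
	   a spark. */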
	/* Move the thread; set bitmask to 0 while TSO is `on-the-fly' */
	SET_PROCS(thread,Nowhere /* PE_NUMBER(proc) */);

	/* Move from one queue to another */
	new_event(proc,p,stealtime,MOVETHREAD,thread,
		  Prelude_Z91Z93_closure,NULL);
	/* MAKE_BUSY(proc);  not yet; only when thread is in threadq */
	++OutstandingFishes[proc];
	if (IS_IDLE(proc))
	  MAKE_FISHING(proc);
	--SurplusThreads;

	if(RTSflags.GranFlags.granSimStats)
	  DumpRawGranEvent(p,proc,GR_STEALING,thread,
			   Prelude_Z91Z93_closure,0);

	CurrentTime[p] += 5l * RTSflags.GranFlags.gran_mtidytime;

	/* Found one */
	found = rtsTrue;
	/* break; */
      }
    }

    if (!found && (ntimes>0)) {  /* nothing stealable from proc p */
      ASSERT(times[i]==p);

      /* remove p from the list (at pos i) */
      for (j=i; j+1<ntimes; ++j)
	times[j] = times[j+1];
      --ntimes;
    }
  } /* while */
}
\end{code}

Add a spark to the spark queue (@ADVISORY_POOL@) of the current processor.
With priority sparking enabled, the queue is kept sorted by priority
(@SPARK_GRAN_INFO@), highest first; the length of the walk is charged as
@gran_pri_spark_overhead@ per element below.

\begin{code}
add_to_spark_queue(spark)
sparkq spark;
{
  sparkq prev = NULL, next = NULL;
  I_ count = 0;
  rtsBool found = rtsFalse;

  if ( RTSflags.GranFlags.DoPrioritySparking )
  {
    /* Find the first spark whose priority is not above the new one's */
    for (prev = NULL, next = PendingSparksHd[CurrentProc][ADVISORY_POOL],
	 count=0;
	 (next != NULL) &&
	 !(found = (SPARK_GRAN_INFO(spark) >= SPARK_GRAN_INFO(next)));
	 prev = next, next = SPARK_NEXT(next), count++)
      {}
  }
  else {   /* 'utQo' */
    found = rtsFalse;   /* to add it at the end */
  }

  if (found) {
    SPARK_NEXT(spark) = next;
    if ( next == NULL ) {
      PendingSparksTl[CurrentProc][ADVISORY_POOL] = spark;
    } else {
      SPARK_PREV(next) = spark;
    }
    SPARK_PREV(spark) = prev;
    if ( prev == NULL ) {
      PendingSparksHd[CurrentProc][ADVISORY_POOL] = spark;
    } else {
      SPARK_NEXT(prev) = spark;
    }
  } else { /* (RTSflags.GranFlags.DoPrioritySparking && !found) ||
	      !DoPrioritySparking */
    SPARK_NEXT(spark) = NULL;
    SPARK_PREV(spark) = PendingSparksTl[CurrentProc][ADVISORY_POOL];
    if (PendingSparksHd[CurrentProc][ADVISORY_POOL] == NULL)
      PendingSparksHd[CurrentProc][ADVISORY_POOL] = spark;
    else
      SPARK_NEXT(PendingSparksTl[CurrentProc][ADVISORY_POOL]) = spark;
    PendingSparksTl[CurrentProc][ADVISORY_POOL] = spark;
  }
  ++SparksAvail;

  if (RTSflags.GranFlags.DoPrioritySparking) {
    CurrentTime[CurrentProc] +=
      count * RTSflags.GranFlags.gran_pri_spark_overhead;
  }

# if defined(GRAN_CHECK)
  if ( RTSflags.GranFlags.debug & 0x1000 ) {
    for (prev = NULL, next = PendingSparksHd[CurrentProc][ADVISORY_POOL];
	 (next != NULL);
	 prev = next, next = SPARK_NEXT(next))
      {}
    if ( (prev!=NULL) && (prev!=PendingSparksTl[CurrentProc][ADVISORY_POOL]) )
      fprintf(stderr,"SparkQ inconsistency after adding spark %#lx: (PE %u, pool %u) PendingSparksTl (%#lx) not end of queue (%#lx)\n",
	      spark,CurrentProc,ADVISORY_POOL,
	      PendingSparksTl[CurrentProc][ADVISORY_POOL], prev);
  }
# endif

# if defined(GRAN_CHECK)
  /* Check if the sparkq is still sorted. Just for testing, really! */
  if ( RTSflags.GranFlags.debug & 0x400 ) {
    rtsBool sorted = rtsTrue;
    sparkq prev, next;

    if (PendingSparksHd[CurrentProc][ADVISORY_POOL] == NULL ||
	SPARK_NEXT(PendingSparksHd[CurrentProc][ADVISORY_POOL]) == NULL ) {
      /* just 1 elem => ok */
    } else {
      for (prev = PendingSparksHd[CurrentProc][ADVISORY_POOL],
	   next = SPARK_NEXT(PendingSparksHd[CurrentProc][ADVISORY_POOL]);
	   (next != NULL) ;
	   prev = next, next = SPARK_NEXT(next)) {
	sorted = sorted &&
		 (SPARK_GRAN_INFO(prev) >= SPARK_GRAN_INFO(next));
      }
    }
    if (!sorted) {
      fprintf(stderr,"Warning: SPARKQ on PE %d is not sorted:\n",
	      CurrentProc);
      G_SPARKQ(PendingSparksHd[CurrentProc][ADVISORY_POOL],1);
    }
  }
# endif
}

void
DisposeSpark(spark)
sparkq spark;
{
  /* A SP_PRUNED line should be dumped when this is called from pruning or */
  /* discarding a spark!
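     (The callers take care of that: cf. gimme_spark and the pruning
     branch in StealSpark, which dump an SP_PRUNED event before deleting
     a spark.)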
*/ if(spark!=NULL) free(spark); --SparksAvail; } void DisposeSparkQ(spark) sparkq spark; { if (spark==NULL) return; DisposeSparkQ(SPARK_NEXT(spark)); # ifdef GRAN_CHECK if (SparksAvail < 0) fprintf(stderr,"DisposeSparkQ: SparksAvail<0 after disposing sparkq @ 0x%lx\n", spark); # endif free(spark); } #endif /* GRAN */ \end{code} % {GrAnSim}vaD (Notes on GrAnSim) -- HWL: % Qu'vaD ngoq % NB: mayQo' wIvwI' \paragraph{Notes on GrAnSim:} The following routines are for handling threads. Currently, we use an unfair scheduling policy in GrAnSim. Thus there are no explicit functions for scheduling here. If other scheduling policies are added to the system that code should go in here. \begin{code} /* Create a new TSO, with the specified closure to enter and thread type */ #if defined(GRAN) P_ NewThread(topClosure, type, pri) P_ topClosure; W_ type; I_ pri; #else P_ NewThread(topClosure, type) P_ topClosure; W_ type; #endif /* GRAN */ { P_ stko, tso; # if defined(GRAN) && defined(GRAN_CHECK) if ( RTSflags.GranFlags.Light && CurrentProc!=0) { fprintf(stderr,"Qagh {NewThread}Daq: CurrentProc must be 0 in GrAnSim Light setup\n"); EXIT(EXIT_FAILURE); } # endif if (AvailableTSO != Prelude_Z91Z93_closure) { tso = AvailableTSO; #if defined(GRAN) SET_PROCS(tso,ThisPE); /* Allocate it locally! */ #endif AvailableTSO = TSO_LINK(tso); } else if (SAVE_Hp + TSO_HS + TSO_CTS_SIZE > SAVE_HpLim) { return(NULL); } else { ALLOC_TSO(TSO_HS,BYTES_TO_STGWORDS(sizeof(STGRegisterTable)), BYTES_TO_STGWORDS(sizeof(StgDouble))); tso = SAVE_Hp + 1; SAVE_Hp += TSO_HS + TSO_CTS_SIZE; SET_TSO_HDR(tso, TSO_info, CCC); } TSO_LINK(tso) = Prelude_Z91Z93_closure; #if defined(GRAN) TSO_PRI(tso) = pri; /* Priority of that TSO -- HWL */ #endif #ifdef PAR TSO_CCC(tso) = (CostCentre)STATIC_CC_REF(CC_MAIN); #endif TSO_NAME(tso) = (P_) INFO_PTR(topClosure); /* A string would be nicer -- JSM */ TSO_ID(tso) = threadId++; TSO_TYPE(tso) = type; TSO_PC1(tso) = TSO_PC2(tso) = EnterNodeCode; TSO_ARG1(tso) = /* TSO_ARG2(tso) = */ 0; TSO_SWITCH(tso) = NULL; #ifdef TICKY_TICKY TSO_AHWM(tso) = 0; TSO_BHWM(tso) = 0; #endif #if defined(GRAN) || defined(PAR) TSO_SPARKNAME(tso) = 0; # if defined(GRAN) TSO_STARTEDAT(tso) = CurrentTime[CurrentProc]; # else TSO_STARTEDAT(tso) = CURRENT_TIME; # endif TSO_EXPORTED(tso) = 0; TSO_BASICBLOCKS(tso) = 0; TSO_ALLOCS(tso) = 0; TSO_EXECTIME(tso) = 0; TSO_FETCHTIME(tso) = 0; TSO_FETCHCOUNT(tso) = 0; TSO_BLOCKTIME(tso) = 0; TSO_BLOCKCOUNT(tso) = 0; TSO_BLOCKEDAT(tso) = 0; TSO_GLOBALSPARKS(tso) = 0; TSO_LOCALSPARKS(tso) = 0; # if defined(GRAN) if (RTSflags.GranFlags.Light) TSO_CLOCK(tso) = TSO_STARTEDAT(tso); /* local clock */ else # endif TSO_CLOCK(tso) = 0; #endif /* * set pc, Node (R1), liveness */ CurrentRegTable = TSO_INTERNAL_PTR(tso); SAVE_Liveness = LIVENESS_R1; SAVE_R1.p = topClosure; # ifndef PAR if (type == T_MAIN) { stko = MainStkO; } else { # endif if (AvailableStack != Prelude_Z91Z93_closure) { stko = AvailableStack; #if defined(GRAN) SET_PROCS(stko,ThisPE); #endif AvailableStack = STKO_LINK(AvailableStack); } else if (SAVE_Hp + STKO_HS + RTSflags.ConcFlags.stkChunkSize > SAVE_HpLim) { return(NULL); } else { /* ALLOC_STK(STKO_HS,STKO_CHUNK_SIZE,0); use RTSflag now*/ ALLOC_STK(STKO_HS,RTSflags.ConcFlags.stkChunkSize,0); stko = SAVE_Hp + 1; SAVE_Hp += STKO_HS + RTSflags.ConcFlags.stkChunkSize; SET_STKO_HDR(stko, StkO_info, CCC); } STKO_SIZE(stko) = RTSflags.ConcFlags.stkChunkSize + STKO_VHS; STKO_SpB(stko) = STKO_SuB(stko) = STKO_BSTK_BOT(stko) + BREL(1); STKO_SpA(stko) = STKO_SuA(stko) = STKO_ASTK_BOT(stko) + 
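					   /* a fresh, i.e. empty, stack:
					      stack and update `pointers' of
					      both stacks sit just off the
					      respective stack bottoms;
					      AREL/BREL hide the direction
					      in which the A and B stacks
					      grow */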
AREL(1); STKO_LINK(stko) = Prelude_Z91Z93_closure; STKO_RETURN(stko) = NULL; # ifndef PAR } # endif #ifdef TICKY_TICKY STKO_ADEP(stko) = STKO_BDEP(stko) = 0; #endif if (type == T_MAIN) { STKO_SpA(stko) -= AREL(1); *STKO_SpA(stko) = (P_) WorldStateToken_closure; } SAVE_Ret = (StgRetAddr) UNVEC(stopThreadDirectReturn,vtbl_stopStgWorld); SAVE_StkO = stko; if (DO_QP_PROF) { QP_Event1(do_qp_prof > 1 ? "*A" : "*G", tso); } #if defined(GRAN_CHECK) tot_sq_len += spark_queue_len(CurrentProc,ADVISORY_POOL); tot_sq_probes++; #endif return tso; } \end{code} In GrAnSim the @EndThread@ function is the place where statistics about the simulation are printed. I guess, that could be moved into @main.lc@. \begin{code} void EndThread(STG_NO_ARGS) { P_ stko; #if defined(PAR) TIME now = CURRENT_TIME; #endif #ifdef TICKY_TICKY if (RTSflags.TickyFlags.showTickyStats) { fprintf(RTSflags.TickyFlags.tickyFile, "Thread %d (%lx)\n\tA stack max. depth: %ld words\n", TSO_ID(CurrentTSO), TSO_NAME(CurrentTSO), TSO_AHWM(CurrentTSO)); fprintf(RTSflags.TickyFlags.tickyFile, "\tB stack max. depth: %ld words\n", TSO_BHWM(CurrentTSO)); } #endif if (DO_QP_PROF) { QP_Event1("G*", CurrentTSO); } #if defined(GRAN) ASSERT(CurrentTSO == ThreadQueueHd); if (RTSflags.GranFlags.DoThreadMigration) --SurplusThreads; if(TSO_TYPE(CurrentTSO)==T_MAIN) { int i; rtsBool is_first; for(i=0; i < RTSflags.GranFlags.proc; ++i) { is_first = rtsTrue; while(RunnableThreadsHd[i] != Prelude_Z91Z93_closure) { /* We schedule runnable threads before killing them to */ /* make the job of bookkeeping the running, runnable, */ /* blocked threads easier for scripts like gr2ps -- HWL */ if (RTSflags.GranFlags.granSimStats && !is_first && (!RTSflags.GranFlags.Light || RTSflags.GranFlags.debug & 0x20000) ) DumpRawGranEvent(i,(PROC)0,GR_SCHEDULE, RunnableThreadsHd[i], Prelude_Z91Z93_closure,0); if (!RTSflags.GranFlags.granSimStats_suppressed && TSO_TYPE(RunnableThreadsHd[i])!=T_MAIN) DumpGranInfo(i,RunnableThreadsHd[i],rtsTrue); RunnableThreadsHd[i] = TSO_LINK(RunnableThreadsHd[i]); is_first = rtsFalse; } } ThreadQueueHd = Prelude_Z91Z93_closure; /* Printing of statistics has been moved into end_gr_simulation */ } /* ... 
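(by now every PE's runnable queue has been drained and, if profiling
   was enabled, each of those threads got its termination line, so
   post-processors like gr2ps see a consistent trace)  end of case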
T_MAIN */ if (RTSflags.GranFlags.labelling && RTSflags.GranFlags.granSimStats && !RTSflags.GranFlags.granSimStats_suppressed) DumpStartEventAt(TSO_STARTEDAT(CurrentTSO),where_is(CurrentTSO),0,GR_START, CurrentTSO,Prelude_Z91Z93_closure, TSO_SPARKNAME(CurrentTSO)); /* ^^^ SN (spark name) as optional info */ /* spark_queue_len(CurrentProc,ADVISORY_POOL)); */ /* ^^^ spark length as optional info */ if (RTSflags.GranFlags.granSimStats && !RTSflags.GranFlags.granSimStats_suppressed) DumpGranInfo(CurrentProc,CurrentTSO, TSO_TYPE(CurrentTSO) != T_ADVISORY); if (RTSflags.GranFlags.granSimStats_Binary && TSO_TYPE(CurrentTSO)==T_MAIN && !RTSflags.GranFlags.granSimStats_suppressed) grterminate(CurrentTime[CurrentProc]); if (TSO_TYPE(CurrentTSO)!=T_MAIN) ActivateNextThread(CurrentProc); /* Note ThreadQueueHd is Nil when the main thread terminates if(ThreadQueueHd != Prelude_Z91Z93_closure) { if (RTSflags.GranFlags.granSimStats && !RTSflags.GranFlags.granSimStats_suppressed && (!RTSflags.GranFlags.Light || RTSflags.GranFlags.debug & 0x20000) ) DumpGranEvent(GR_SCHEDULE,ThreadQueueHd); CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_threadscheduletime; } */ #endif /* GRAN */ #ifdef PAR if (RTSflags.ParFlags.granSimStats) { TSO_EXECTIME(CurrentTSO) += now - TSO_BLOCKEDAT(CurrentTSO); DumpGranInfo(thisPE, CurrentTSO, TSO_TYPE(CurrentTSO) != T_ADVISORY); } #endif switch (TSO_TYPE(CurrentTSO)) { case T_MAIN: required_thread_count--; #ifdef PAR if (GRANSIMSTATS_BINARY) grterminate(now); #endif #ifdef GRAN longjmp(scheduler_loop, -1); /* i.e. the world comes to an end NOW */ #else ReSchedule(0); /* i.e. the world will eventually come to an end */ #endif case T_REQUIRED: required_thread_count--; break; case T_ADVISORY: advisory_thread_count--; break; case T_FAIL: EXIT(EXIT_FAILURE); default: fflush(stdout); fprintf(stderr, "EndThread: %x unknown\n", TSO_TYPE(CurrentTSO)); EXIT(EXIT_FAILURE); } /* Reuse stack object space */ ASSERT(STKO_LINK(SAVE_StkO) == Prelude_Z91Z93_closure); STKO_LINK(SAVE_StkO) = AvailableStack; AvailableStack = SAVE_StkO; /* Reuse TSO */ TSO_LINK(CurrentTSO) = AvailableTSO; AvailableTSO = CurrentTSO; CurrentTSO = Prelude_Z91Z93_closure; CurrentRegTable = NULL; #if defined(GRAN) /* NB: Now ThreadQueueHd is either the next runnable thread on this */ /* proc or it's Prelude_Z91Z93_closure. In the latter case, a FINDWORK will be */ /* issued by ReSchedule. */ ReSchedule(SAME_THREAD); /* back for more! */ #else ReSchedule(0); /* back for more! */ #endif } \end{code} %**************************************************************************** % \subsection[thread-blocking]{Local Blocking} % %**************************************************************************** \begin{code} #if defined(GRAN_COUNT) /* Some non-essential maybe-useful statistics-gathering */ void CountnUPDs() { ++nUPDs; } void CountnUPDs_old() { ++nUPDs_old; } void CountnUPDs_new() { ++nUPDs_new; } void CountnPAPs() { ++nPAPs; } #endif EXTDATA_RO(BQ_info); #ifndef GRAN /* NB: non-GRAN version ToDo * * AwakenBlockingQueue awakens a list of TSOs and FBQs. */ P_ PendingFetches = Prelude_Z91Z93_closure; void AwakenBlockingQueue(bqe) P_ bqe; { P_ last_tso = NULL; # ifdef PAR P_ next; TIME now = CURRENT_TIME; # endif # ifndef PAR while (bqe != Prelude_Z91Z93_closure) { # else while (IS_MUTABLE(INFO_PTR(bqe))) { switch (INFO_TYPE(INFO_PTR(bqe))) { case INFO_TSO_TYPE: # endif if (DO_QP_PROF) { QP_Event2(do_qp_prof > 1 ? 
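			     /* NB: the blocking queue is threaded through
				the TSO_LINK fields, so only its first TSO
				has to be spliced onto the runnable queue;
				the rest follow automatically.  last_tso
				tracks the end of the walk so that
				RunnableThreadsTl (and, under PAR, the final
				link) can be fixed up afterwards. */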
"RA" : "RG", bqe, CurrentTSO); } # ifdef PAR if (RTSflags.ParFlags.granSimStats) { DumpGranEvent(GR_RESUMEQ, bqe); switch (TSO_QUEUE(bqe)) { case Q_BLOCKED: TSO_BLOCKTIME(bqe) += now - TSO_BLOCKEDAT(bqe); break; case Q_FETCHING: TSO_FETCHTIME(bqe) += now - TSO_BLOCKEDAT(bqe); break; default: fflush(stdout); fprintf(stderr, "ABQ: TSO_QUEUE invalid.\n"); EXIT(EXIT_FAILURE); } } # endif if (last_tso == NULL) { if (RunnableThreadsHd == Prelude_Z91Z93_closure) { RunnableThreadsHd = bqe; } else { TSO_LINK(RunnableThreadsTl) = bqe; } } last_tso = bqe; bqe = TSO_LINK(bqe); # ifdef PAR break; case INFO_BF_TYPE: next = BF_LINK(bqe); BF_LINK(bqe) = PendingFetches; PendingFetches = bqe; bqe = next; if (last_tso != NULL) TSO_LINK(last_tso) = next; break; default: fprintf(stderr, "Unexpected IP (%#lx) in blocking queue at %#lx\n", INFO_PTR(bqe), (W_) bqe); EXIT(EXIT_FAILURE); } } # else } # endif if (last_tso != NULL) { RunnableThreadsTl = last_tso; # ifdef PAR TSO_LINK(last_tso) = Prelude_Z91Z93_closure; # endif } } #endif /* !GRAN */ #ifdef GRAN # if defined(GRAN_CHECK) /* First some useful test functions */ EXTFUN(RBH_Save_0_info); EXTFUN(RBH_Save_1_info); EXTFUN(RBH_Save_2_info); void PRINT_BQ(bqe) P_ bqe; { W_ it; P_ last = NULL; char str[80], str0[80]; fprintf(stderr,"\n[PE %d] @ %lu BQ: ", CurrentProc,CurrentTime[CurrentProc]); if ( bqe == Prelude_Z91Z93_closure ) { fprintf(stderr," NIL.\n"); return; } if ( bqe == NULL ) { fprintf(stderr," NULL\n"); return; } while (IS_MUTABLE(INFO_PTR(bqe))) { /* This distinguishes TSOs from */ W_ proc; /* RBH_Save_? closures! */ /* Find where the tso lives */ proc = where_is(bqe); it = INFO_TYPE(INFO_PTR(bqe)); switch (it) { case INFO_TSO_TYPE: strcpy(str0,"TSO"); break; case INFO_BQ_TYPE: strcpy(str0,"BQ"); break; default: strcpy(str0,"???"); break; } if(proc == CurrentProc) fprintf(stderr," %#lx (%x) L %s,", bqe, TSO_ID(bqe), str0); else fprintf(stderr," %#lx (%x) G (PE %d) %s,", bqe, TSO_ID(bqe), proc, str0); last = bqe; switch (it) { case INFO_TSO_TYPE: bqe = TSO_LINK(bqe); break; case INFO_BQ_TYPE: bqe = TSO_LINK(bqe); break; default: bqe = Prelude_Z91Z93_closure; break; } /* TSO_LINK(last_tso) = Prelude_Z91Z93_closure; */ } if ( bqe == Prelude_Z91Z93_closure ) fprintf(stderr," NIL.\n"); else if ( (INFO_PTR(bqe) == (P_) RBH_Save_0_info) || (INFO_PTR(bqe) == (P_) RBH_Save_1_info) || (INFO_PTR(bqe) == (P_) RBH_Save_2_info) ) fprintf(stderr," RBH.\n"); /* fprintf(stderr,"\n%s\n",str); */ } rtsBool CHECK_BQ(node, tso, proc) P_ node, tso; PROC proc; { P_ bqe; W_ it; P_ last = NULL; PROC p = where_is(tso); rtsBool ok = rtsTrue; if ( p != proc) { fprintf(stderr,"ERROR in CHECK_BQ: CurrentTSO %#lx (%x) on proc %d but CurrentProc = %d\n", tso, TSO_ID(tso), proc); ok = rtsFalse; } switch (INFO_TYPE(INFO_PTR(node))) { case INFO_BH_TYPE: case INFO_BH_U_TYPE: bqe = (P_) BQ_ENTRIES(node); return (rtsTrue); /* BHs don't have BQs */ break; case INFO_BQ_TYPE: bqe = (P_) BQ_ENTRIES(node); break; case INFO_FMBQ_TYPE: fprintf(stderr,"CHECK_BQ: ERROR: FMBQ closure (%#lx) found in GrAnSim (TSO=%#lx (%x))\n", node, tso, TSO_ID(tso)); EXIT(EXIT_FAILURE); break; case INFO_SPEC_RBH_TYPE: bqe = (P_) SPEC_RBH_BQ(node); break; case INFO_GEN_RBH_TYPE: bqe = (P_) GEN_RBH_BQ(node); break; default: { P_ info_ptr; I_ size, ptrs, nonptrs, vhs; char info_hdr_ty[80]; fprintf(stderr, "CHECK_BQ: thought %#lx was a black hole (IP %#lx)", node, INFO_PTR(node)); info_ptr = get_closure_info(node, &size, &ptrs, &nonptrs, &vhs, info_hdr_ty); fprintf(stderr, " %s\n",info_hdr_ty); /* 
  while (IS_MUTABLE(INFO_PTR(bqe))) {  /* This distinguishes TSOs from */
    W_ proc;                           /* RBH_Save_? closures! */

    /* Find where the tso lives */
    proc = where_is(bqe);
    it = INFO_TYPE(INFO_PTR(bqe));

    if ( bqe == tso ) {
      fprintf(stderr,"ERROR in CHECK_BQ [Node = 0x%lx, PE %d]: TSO %#lx (%x) already in BQ: ",
	      node, proc, tso, TSO_ID(tso));
      PRINT_BQ(BQ_ENTRIES(node));
      ok = rtsFalse;
    }

    bqe = TSO_LINK(bqe);
  }
  return (ok);
}

/* End of test functions */
# endif /* GRAN_CHECK */

/* This version of AwakenBlockingQueue was originally taken from the GUM
   code. It is now assimilated into GrAnSim. */

/* Note: This version assumes a pointer to a blocking queue rather than a
   node with an attached blocking queue as input */

P_
AwakenBlockingQueue(bqe)
P_ bqe;
{
    /* P_ tso = (P_) BQ_ENTRIES(node); */
    P_ last = NULL;
    /* P_ prev; */
    W_ notifytime;

# if 0
    if(do_gr_sim)
# endif

    /* Compatibility mode with old libraries! 'oH jIvoQmoH */
    if (IS_BQ_CLOSURE(bqe))
      bqe = (P_)BQ_ENTRIES(bqe);
    else if ( INFO_TYPE(INFO_PTR(bqe)) == INFO_SPEC_RBH_TYPE )
      bqe = (P_)SPEC_RBH_BQ(bqe);
    else if ( INFO_TYPE(INFO_PTR(bqe)) == INFO_GEN_RBH_TYPE )
      bqe = (P_)GEN_RBH_BQ(bqe);

# if defined(GRAN_CHECK)
    if ( RTSflags.GranFlags.debug & 0x100 ) {
      PRINT_BQ(bqe);
    }
# endif

# if defined(GRAN_COUNT)
    ++nUPDs;
    if (bqe != Prelude_Z91Z93_closure)
      ++nUPDs_BQ;
# endif

# if defined(GRAN_CHECK)
    if (RTSflags.GranFlags.debug & 0x100)
      fprintf(stderr,"----- AwBQ: ");
# endif

    while (IS_MUTABLE(INFO_PTR(bqe))) {  /* This distinguishes TSOs from */
      W_ proc;                           /* RBH_Save_? closures! */

      ASSERT(INFO_TYPE(INFO_PTR(bqe)) == INFO_TSO_TYPE);

      if (DO_QP_PROF) {
	QP_Event2(do_qp_prof > 1 ? "RA" : "RG", bqe, CurrentTSO);
      }

# if defined(GRAN_COUNT)
      ++BQ_lens;
# endif

      /* Find where the tso lives */
      proc = where_is(bqe);

      if(proc == CurrentProc) {
	notifytime = CurrentTime[CurrentProc] +
		     RTSflags.GranFlags.gran_lunblocktime;
      } else {
	/* A better way of handling this would be to introduce a
	   GLOBALUNBLOCK event which is created here. -- HWL */
	CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_mpacktime;
	notifytime = STG_MAX(CurrentTime[CurrentProc],CurrentTime[proc]) +
		     RTSflags.GranFlags.gran_gunblocktime;
	CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_mtidytime;
	/* new_event(proc, CurrentProc, notifytime,
		     GLOBALUNBLOCK,bqe,Prelude_Z91Z93_closure,NULL); */
      }

      /* cost the walk over the queue */
      CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_lunblocktime;

      /* GrAnSim Light: make blocked TSO aware of the time that passed */
      if (RTSflags.GranFlags.Light)
	TSO_CLOCK(bqe) = notifytime;

      /* and create a resume message */
      new_event(proc, CurrentProc, notifytime,
		RESUMETHREAD,bqe,Prelude_Z91Z93_closure,NULL);

      if (notifytime<TimeOfNextEvent)
	TimeOfNextEvent = notifytime;

      bqe = TSO_LINK(bqe);
    } /* while */

# if 0
    /* dead code, kept for reference: the old GUM way of awakening the
       queue in place */
    {
      while (IS_MUTABLE(INFO_PTR(bqe))) {
	assert(TSO_INTERNAL_PTR(bqe)->rR[0].p == node);
#  if 0
	if (DO_QP_PROF) {
	  QP_Event2(do_qp_prof > 1 ? "RA" : "RG", bqe, CurrentTSO);
	}
#  endif
	bqe = TSO_LINK(bqe);
      }
      assert(TSO_INTERNAL_PTR(bqe)->rR[0].p == node);
#  if 0
      if (DO_QP_PROF) {
	QP_Event2(do_qp_prof > 1 ?
"RA" : "RG", bqe, CurrentTSO); } # endif } # endif /* 0 */ if (RTSflags.GranFlags.debug & 0x100) fprintf(stderr,".\n"); return (bqe); /* ngo' {GrAnSim}Qo' ngoq: RunnableThreadsTl = tso; */ } #endif /* GRAN */ EXTFUN(Continue); #if defined(GRAN) /* Different interface for GRAN */ void Yield(liveness) W_ liveness; { SAVE_Liveness = liveness; TSO_PC1(CurrentTSO) = Continue; if (DO_QP_PROF) { QP_Event1("GR", CurrentTSO); } ReSchedule(SAME_THREAD); } #else /* !GRAN */ void Yield(args) W_ args; { SAVE_Liveness = args >> 1; TSO_PC1(CurrentTSO) = Continue; if (DO_QP_PROF) { QP_Event1("GR", CurrentTSO); } #ifdef PAR if (RTSflags.ParFlags.granSimStats) { /* Note that CURRENT_TIME may perform an unsafe call */ TSO_EXECTIME(CurrentTSO) += CURRENT_TIME - TSO_BLOCKEDAT(CurrentTSO); } #endif ReSchedule(args & 1); } #endif /* GRAN */ \end{code} %**************************************************************************** % \subsection[gr-fetch]{Fetching Nodes (GrAnSim only)} % %**************************************************************************** The following GrAnSim routines simulate the fetching of nodes from a remote processor. We use a 1 word bitmask to indicate on which processor a node is lying. Thus, moving or copying a node from one processor to another just requires an appropriate change in this bitmask (using @SET_GA@). Additionally, the clocks have to be updated. A special case arises when the node that is needed by processor A has been moved from a processor B to a processor C between sending out a @FETCH@ (from A) and its arrival at B. In that case the @FETCH@ has to be forwarded to C. \begin{code} #if defined(GRAN) /* ngoqvam che' {GrAnSim}! */ /* Fetch node "node" to processor "p" */ int FetchNode(node,from,to) P_ node; PROC from, to; { /* In case of RTSflags.GranFlags.DoGUMMFetching this fct should never be entered! Instead, UnpackGraph is used in ReSchedule */ P_ closure; ASSERT(to==CurrentProc); # if defined(GRAN) && defined(GRAN_CHECK) if ( RTSflags.GranFlags.Light ) { fprintf(stderr,"Qagh {FetchNode}Daq: Should never be entered in GrAnSim Light setup\n"); EXIT(EXIT_FAILURE); } # endif if ( RTSflags.GranFlags.DoGUMMFetching ) { fprintf(stderr,"Qagh: FetchNode should never be entered with DoGUMMFetching\n"); EXIT(EXIT_FAILURE); } /* Now fetch the children */ if (!IS_LOCAL_TO(PROCS(node),from) && !IS_LOCAL_TO(PROCS(node),to) ) return 1; if(IS_NF(INFO_PTR(node))) /* Old: || IS_BQ(node) */ PROCS(node) |= PE_NUMBER(to); /* Copy node */ else PROCS(node) = PE_NUMBER(to); /* Move node */ return 0; } /* -------------------------------------------------- Cost of sending a packet of size n = C + P*n where C = packet construction constant, P = cost of packing one word into a packet [Should also account for multiple packets]. -------------------------------------------------- */ /* Return codes: 0 ... ok (FETCHREPLY event with a buffer containing addresses of the nearby graph has been scheduled) 1 ... node is already local (fetched by somebody else; no event is scheduled in here) 2 ... fetch request has been forwrded to the PE that now contains the node 3 ... node is a black hole (BH, BQ or RBH); no event is scheduled, and the current TSO is put into the blocking queue of that node 4 ... out of heap in PackNearbyGraph; GC should be triggered in calling function to guarantee that the tso and node inputs are valid (they may be moved during GC). ToDo: Symbolic return codes; clean up code (separate GUMMFetching from single node fetching. 
*/ I_ HandleFetchRequest(node,p,tso) P_ node, tso; PROC p; { ASSERT(!RTSflags.GranFlags.Light); if (IS_LOCAL_TO(PROCS(node),p) ) /* Somebody else moved node already => */ { /* start tso */ # if defined(GRAN_CHECK) if (RTSflags.GranFlags.debug & 0x100 ) { P_ info_ptr; I_ size, ptrs, nonptrs, vhs; char info_hdr_ty[80]; info_ptr = get_closure_info(node, &size, &ptrs, &nonptrs, &vhs, info_hdr_ty); fprintf(stderr,"Warning: HandleFetchRequest entered with local node %#lx (%s) (PE %d)\n", node,info_hdr_ty,p); } # endif if (RTSflags.GranFlags.DoGUMMFetching) { W_ size; P_ graph; /* Create a 1-node-buffer and schedule a FETCHREPLY now */ graph = PackOneNode(node, tso, &size); new_event(p,CurrentProc,CurrentTime[CurrentProc], FETCHREPLY,tso,graph,NULL); } else { new_event(p,CurrentProc,CurrentTime[CurrentProc], FETCHREPLY,tso,node,NULL); } return (1); } else if (IS_LOCAL_TO(PROCS(node),CurrentProc) ) /* Is node still here? */ { if(RTSflags.GranFlags.DoGUMMFetching) { /* {GUM}vo' ngoqvam vInIHta' (code from GUM) */ W_ size; P_ graph; if (IS_BLACK_HOLE(INFO_PTR(node))) { /* block on BH or RBH */ new_event(p,CurrentProc,CurrentTime[p], GLOBALBLOCK,tso,node,NULL); /* Note: blockFetch is done when handling GLOBALBLOCK event */ /* When this thread is reawoken it does the usual: it tries to enter the updated node and issues a fetch if it's remote. It has forgotten that it has sent a fetch already (i.e. a FETCHNODE is swallowed by a BH, leaving the thread in a BQ */ --OutstandingFetches[p]; return (3); } # if defined(GRAN_CHECK) if (!RTSflags.GranFlags.DoReScheduleOnFetch && (tso != RunnableThreadsHd[p])) { fprintf(stderr,"Qagh {HandleFetchRequest}Daq: tso 0x%lx (%x) not at head of proc %d (0x%lx)\n", tso, TSO_ID(tso), p, RunnableThreadsHd[p]); EXIT(EXIT_FAILURE); } # endif if ((graph = PackNearbyGraph(node, tso, &size)) == NULL) return (4); /* out of heap */ /* Actual moving/copying of node is done on arrival; see FETCHREPLY */ /* Send a reply to the originator */ /* ToDo: Replace that by software costs for doing graph packing! */ CurrentTime[CurrentProc] += size * RTSflags.GranFlags.gran_mpacktime; new_event(p,CurrentProc,CurrentTime[CurrentProc]+RTSflags.GranFlags.gran_latency, FETCHREPLY,tso,graph,NULL); CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_mtidytime; return (0); } else { /* incremental (single closure) fetching */ /* Actual moving/copying of node is done on arrival; see FETCHREPLY */ /* Send a reply to the originator */ CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_mpacktime; new_event(p,CurrentProc,CurrentTime[CurrentProc]+RTSflags.GranFlags.gran_latency, FETCHREPLY,tso,node,NULL); CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_mtidytime; return (0); } } else /* Qu'vatlh! node has been grabbed by another proc => forward */ { PROC p_new = where_is(node); TIME fetchtime; # if defined(GRAN_CHECK) if (RTSflags.GranFlags.debug & 0x2) fprintf(stderr,"Qu'vatlh! node %#lx has been grabbed by PE %d (current=%d; demander=%d) @ %d\n", node,p_new,CurrentProc,p,CurrentTime[CurrentProc]); # endif /* Prepare FORWARD message to proc p_new */ CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_mpacktime; fetchtime = STG_MAX(CurrentTime[CurrentProc],CurrentTime[p_new]) + RTSflags.GranFlags.gran_latency; new_event(p_new,p,fetchtime,FETCHNODE,tso,node,NULL); CurrentTime[CurrentProc] += RTSflags.GranFlags.gran_mtidytime; return (2); } } #endif \end{code} @blockFetch@ blocks a @BlockedFetch@ node on some kind of black hole. Taken from gum/HLComms.lc. [find a better place for that ?] 
-- HWL

{\bf Note:} In GranSim we don't have @FETCHME@ nodes and therefore don't
create @FMBQ@'s (FetchMe blocking queues) to cope with global blocking.
Instead, non-local TSOs are put into the BQ in the same way as local TSOs.
However, we have to check whether a TSO is local or global in order to
account for the latencies involved and to keep track of the number of
fetches that are really going on.

\begin{code}
#if defined(GRAN)

/* Return codes:
    0 ... ok; tso is now at beginning of BQ attached to the bh closure
    1 ... the bh closure is no BH any more; tso is immediately unblocked
*/

I_
blockFetch(tso, proc, bh)
P_ tso;                        /* TSO which gets blocked */
PROC proc;                     /* PE where that tso was running */
P_ bh;                         /* closure to block on (BH, RBH, BQ) */
{
# if defined(GRAN_CHECK)
    if ( RTSflags.GranFlags.debug & 0x100 ) {
	P_ info_ptr;
	I_ size, ptrs, nonptrs, vhs;
	char info_hdr_ty[80];

	info_ptr = get_closure_info(bh,
				    &size, &ptrs, &nonptrs, &vhs,
				    info_hdr_ty);
	fprintf(stderr,"Blocking TSO %#lx (%x)(PE %d) on node %#lx (%s) (PE %d). No graph is packed!\n",
		tso, TSO_ID(tso), proc, bh, info_hdr_ty, where_is(bh));
    }

    if ( !RTSflags.GranFlags.DoReScheduleOnFetch &&
	 (tso != RunnableThreadsHd[proc]) ) {
	fprintf(stderr,"Qagh {blockFetch}Daq: TSO 0x%lx (%x) is not first on runnable list of proc %d (first is 0x%lx)\n",
		tso,TSO_ID(tso),proc,RunnableThreadsHd[proc]);
	EXIT(EXIT_FAILURE);
    }
# endif

    if (!IS_BLACK_HOLE(INFO_PTR(bh))) {       /* catches BHs and RBHs */
# if defined(GRAN_CHECK)
	if ( RTSflags.GranFlags.debug & 0x100 ) {
	    P_ info;
	    W_ size, ptrs, nonptrs, vhs;
	    char str[80], junk_str[80];

	    info = get_closure_info(bh, &size, &ptrs, &nonptrs, &vhs, str);
	    fprintf(stderr,"blockFetch: node %#lx (%s) is not a BH => awakening TSO %#lx (%x) (PE %u)\n",
		    bh, str, tso, TSO_ID(tso), proc);
	    G_PRINT_NODE(bh);
	}
# endif
	/* No BH anymore => immediately unblock tso */
	new_event(proc,proc,CurrentTime[proc],
		  UNBLOCKTHREAD,tso,bh,NULL);

	/* Is this always a REPLY to a FETCH in the profile ? */
	if (RTSflags.GranFlags.granSimStats)
	    DumpRawGranEvent(proc,proc,GR_REPLY,tso,bh,0);
	return (1);
    }

    /* DaH {BQ}Daq Qu' Suq 'e' wISov!
       Now we know that we have to put the tso into the BQ.
       Two cases: If block-on-fetch, tso is at the head of the threadq =>
		  take it out of the threadq and put it into the BQ.
		  If reschedule-on-fetch, tso is only pointed to by the
		  event => just put it into the BQ.
    */
    if (!RTSflags.GranFlags.DoReScheduleOnFetch) {     /* block-on-fetch */
	GranSimBlock(tso, proc, bh);  /* get tso out of threadq & activate next
					 thread (same as in BQ_entry) */
    } else {                                       /* reschedule-on-fetch */
	if(RTSflags.GranFlags.granSimStats)
	    DumpRawGranEvent(proc,where_is(bh),GR_BLOCK,tso,bh,0);
	++TSO_BLOCKCOUNT(tso);
	TSO_BLOCKEDAT(tso) = CurrentTime[proc];
    }

    ASSERT(TSO_LINK(tso)==Prelude_Z91Z93_closure);

    /* Put tso into BQ */
    switch (INFO_TYPE(INFO_PTR(bh))) {
      case INFO_BH_TYPE:
      case INFO_BH_U_TYPE:
	TSO_LINK(tso) = Prelude_Z91Z93_closure;
	SET_INFO_PTR(bh, BQ_info);
	BQ_ENTRIES(bh) = (W_) tso;

#ifdef GC_MUT_REQUIRED
	/*
	 * If we modify a black hole in the old generation, we have to make
	 * sure it goes on the mutables list
	 */
	if (bh <= StorageMgrInfo.OldLim) {
	    MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
	    StorageMgrInfo.OldMutables = bh;
	} else
	    MUT_LINK(bh) = MUT_NOT_LINKED;
#endif
	break;
      case INFO_BQ_TYPE:
	/* BF_LINK(bf) = (P_) BQ_ENTRIES(bh); */
	TSO_LINK(tso) = (P_) BQ_ENTRIES(bh);
	BQ_ENTRIES(bh) = (W_) tso;
	break;
      case INFO_FMBQ_TYPE:
	fprintf(stderr,"ERROR: FMBQ closure (%#lx) found in GrAnSim (TSO=%#lx (%x))\n",
		bh, tso, TSO_ID(tso));
	EXIT(EXIT_FAILURE);
      case INFO_SPEC_RBH_TYPE:
	/* BF_LINK(bf) = (P_) BQ_ENTRIES(bh); */
	TSO_LINK(tso) = (P_) SPEC_RBH_BQ(bh);
	SPEC_RBH_BQ(bh) = (W_) tso;
	break;
      case INFO_GEN_RBH_TYPE:
	/* BF_LINK(bf) = (P_) BQ_ENTRIES(bh); */
	TSO_LINK(tso) = (P_) GEN_RBH_BQ(bh);
	GEN_RBH_BQ(bh) = (W_) tso;
	break;
      default:
	{
	  P_ info_ptr;
	  I_ size, ptrs, nonptrs, vhs;
	  char info_hdr_ty[80];

	  fprintf(stderr, "Panic: thought %#lx was a black hole (IP %#lx)",
		  bh, INFO_PTR(bh));
# if defined(GRAN_CHECK)
	  info_ptr = get_closure_info(bh,
				      &size, &ptrs, &nonptrs, &vhs,
				      info_hdr_ty);
	  fprintf(stderr, " %s\n",info_hdr_ty);
	  G_PRINT_NODE(bh);
# endif
	  EXIT(EXIT_FAILURE);
	}
    }
    return (0);
}
#endif /* GRAN */
\end{code}

%****************************************************************************
%
\subsection[qp-profile]{Quasi-Parallel Profiling}
%
%****************************************************************************

\begin{code}
/* ToDo: Check if this is really still used anywhere!?
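
   The qp_file written by QP_Event0/1/2 below is line-oriented; each line
   is one event, e.g. (made-up numbers):

       42 *G 7 0x123456
       57 RA 3 0x123456 7 0x654321

   i.e. virtual time, an event tag, and one or two (thread id, thread
   name) pairs.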
*/
I_ do_qp_prof;
FILE *qp_file;

/* *Virtual* Time in milliseconds */
#if !defined(GRAN)
long
qp_elapsed_time(STG_NO_ARGS)
{
    extern StgDouble usertime();

    return ((long) (usertime() * 1e3));
}
#else
long
qp_elapsed_time(STG_NO_ARGS)
{
    return ((long) CurrentTime[CurrentProc] );
}
#endif

static void
init_qp_profiling(STG_NO_ARGS)
{
    I_ i;
    char qp_filename[STATS_FILENAME_MAXLEN];

    sprintf(qp_filename, QP_FILENAME_FMT, prog_argv[0]);
    if ((qp_file = fopen(qp_filename,"w")) == NULL ) {
	fprintf(stderr, "Can't open quasi-parallel profile report file %s\n",
		qp_filename);
	do_qp_prof = 0;
    } else {
	fputs(prog_argv[0], qp_file);
	for(i = 1; prog_argv[i]; i++) {
	    fputc(' ', qp_file);
	    fputs(prog_argv[i], qp_file);
	}
	fprintf(qp_file, " +RTS -C%d -t%d\n",
		RTSflags.ConcFlags.ctxtSwitchTime,
		RTSflags.ConcFlags.maxThreads);

	fputs(time_str(), qp_file);
	fputc('\n', qp_file);
    }
}

void
QP_Event0(tid, node)
I_ tid;
P_ node;
{
    fprintf(qp_file, "%lu ** %lu 0x%lx\n",
	    qp_elapsed_time(), tid, INFO_PTR(node));
}

void
QP_Event1(event, tso)
char *event;
P_ tso;
{
    fprintf(qp_file, "%lu %s %lu 0x%lx\n",
	    qp_elapsed_time(), event, TSO_ID(tso), TSO_NAME(tso));
}

void
QP_Event2(event, tso1, tso2)
char *event;
P_ tso1, tso2;
{
    fprintf(qp_file, "%lu %s %lu 0x%lx %lu 0x%lx\n",
	    qp_elapsed_time(), event, TSO_ID(tso1), TSO_NAME(tso1),
	    TSO_ID(tso2), TSO_NAME(tso2));
}
\end{code}

%****************************************************************************
%
\subsection[gc-GrAnSim]{Garbage collection routines for GrAnSim objects}
%
%****************************************************************************

Garbage collection code for the event queue. We walk the event queue so
that if the only reference to a TSO is in some event (e.g. RESUME), the
TSO is still preserved.

The GC code now uses a breadth-first pruning strategy. This prevents the
GC from keeping all sparks of the low-numbered PEs while discarding all
sparks from high-numbered PEs. Such a depth-first pruning may have
disastrous effects for programs that generate a huge number of sparks!

\begin{code}
#if defined(GRAN)

extern smInfo StorageMgrInfo;

/* Auxiliary functions needed in Save/RestoreSparkRoots if breadth-first */
/* pruning is done. */

static W_
arr_and(W_ arr[], I_ max)
{
 I_ i;
 W_ res;

 /* Doesn't work with max==0; but then, many things don't work in this */
 /* special case. */
 for (i=1, res = arr[0]; i<max; i++)
   res &= arr[i];

 return (res);
}

static W_
arr_max(W_ arr[], I_ max)
{
 I_ i;
 W_ res;

 /* Doesn't work with max==0; but then, many things don't work in this */
 /* special case. */
 for (i=1, res = arr[0]; i<max; i++)
   res = (arr[i]>res) ? arr[i] : res;

 return (res);
}

/*
   Routines working on spark queues.
   It would be a good idea to make that an ADT!
*/

I_
spark_queue_len(PROC proc, I_ pool)
{
 sparkq prev, spark;                          /* prev only for testing !! */
 I_ len;

 for (len = 0, prev = NULL, spark = PendingSparksHd[proc][pool];
      spark != NULL;
      len++, prev = spark, spark = SPARK_NEXT(spark))
   {}

# if defined(GRAN_CHECK)
  if ( RTSflags.GranFlags.debug & 0x1000 )
    if ( (prev!=NULL) && (prev!=PendingSparksTl[proc][pool]) )
      fprintf(stderr,"ERROR in spark_queue_len: (PE %u, pool %u) PendingSparksTl (%#lx) not end of queue (%#lx)\n",
	      proc, pool, PendingSparksTl[proc][pool], prev);
# endif

 return (len);
}

sparkq
delete_from_spark_queue (prev,spark)          /* unlink and dispose spark */
sparkq prev, spark;
{                 /* Global Vars: CurrentProc, SparkQueueHd, SparkQueueTl */
  sparkq tmp;

# if defined(GRAN_CHECK)
  if ( RTSflags.GranFlags.debug & 0x10000 ) {
    fprintf(stderr,"** |%#x:%#x| prev=%#x->(%#x), (%#x)<-spark=%#x->(%#x) <-(%#x)\n",
	    SparkQueueHd, SparkQueueTl,
	    prev, (prev==NULL ? 0 : SPARK_NEXT(prev)),
	    SPARK_PREV(spark), spark, SPARK_NEXT(spark),
	    (SPARK_NEXT(spark)==NULL ?
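	     /* last field: the PREV pointer of spark's successor,
		or 0 if spark is the tail */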
0 : SPARK_PREV(SPARK_NEXT(spark)))); } # endif tmp = SPARK_NEXT(spark); if (prev==NULL) { SparkQueueHd = SPARK_NEXT(spark); } else { SPARK_NEXT(prev) = SPARK_NEXT(spark); } if (SPARK_NEXT(spark)==NULL) { SparkQueueTl = prev; } else { SPARK_PREV(SPARK_NEXT(spark)) = prev; } if(SparkQueueHd == NULL) SparkQueueTl = NULL; SPARK_NEXT(spark) = NULL; DisposeSpark(spark); spark = tmp; # if defined(GRAN_CHECK) if ( RTSflags.GranFlags.debug & 0x10000 ) { fprintf(stderr,"## prev=%#x->(%#x)\n", prev, (prev==NULL ? 0 : SPARK_NEXT(prev))); } # endif return (tmp); } #if 0 /* NB: These functions have been replaced by functions: EvacuateEvents, EvacuateSparks, (in ../storage/SMcopying.lc) LinkEvents, LinkSparks (in ../storage/SMcompacting.lc) Thus, GrAnSim does not need additional entries in the list of roots any more. */ I_ SaveEventRoots(num_ptr_roots) I_ num_ptr_roots; { eventq event = EventHd; while(event != NULL) { if(EVENT_TYPE(event) == RESUMETHREAD || EVENT_TYPE(event) == MOVETHREAD || EVENT_TYPE(event) == CONTINUETHREAD || /* EVENT_TYPE(event) >= CONTINUETHREAD1 || */ EVENT_TYPE(event) == STARTTHREAD ) StorageMgrInfo.roots[num_ptr_roots++] = EVENT_TSO(event); else if(EVENT_TYPE(event) == MOVESPARK) StorageMgrInfo.roots[num_ptr_roots++] = SPARK_NODE(EVENT_SPARK(event)); else if (EVENT_TYPE(event) == FETCHNODE || EVENT_TYPE(event) == FETCHREPLY ) { StorageMgrInfo.roots[num_ptr_roots++] = EVENT_TSO(event); /* In the case of packet fetching, EVENT_NODE(event) points to */ /* the packet (currently, malloced). The packet is just a list of */ /* closure addresses, with the length of the list at index 1 (the */ /* structure of the packet is defined in Pack.lc). */ if ( RTSflags.GranFlags.DoGUMMFetching && (EVENT_TYPE(event)==FETCHREPLY)) { P_ buffer = (P_) EVENT_NODE(event); int size = (int) buffer[PACK_SIZE_LOCN], i; for (i = PACK_HDR_SIZE; i <= size-1; i++) { StorageMgrInfo.roots[num_ptr_roots++] = (P_) buffer[i]; } } else StorageMgrInfo.roots[num_ptr_roots++] = EVENT_NODE(event); } else if (EVENT_TYPE(event) == GLOBALBLOCK) { StorageMgrInfo.roots[num_ptr_roots++] = EVENT_TSO(event); StorageMgrInfo.roots[num_ptr_roots++] = EVENT_NODE(event); } else if (EVENT_TYPE(event) == UNBLOCKTHREAD) { StorageMgrInfo.roots[num_ptr_roots++] = EVENT_TSO(event); } event = EVENT_NEXT(event); } return(num_ptr_roots); } #if defined(DEPTH_FIRST_PRUNING) /* Is it worthwhile keeping the depth-first pruning code !? -- HWL */ I_ SaveSparkRoots(num_ptr_roots) I_ num_ptr_roots; { sparkq spark, /* prev, */ disposeQ=NULL; PROC proc; I_ i, sparkroots=0, prunedSparks=0; I_ tot_sparks[MAX_PROC], tot = 0;; for(proc = 0; proc < RTSflags.GranFlags.proc; ++proc) { tot_sparks[proc] = 0; for(i = 0; i < SPARK_POOLS; ++i) { for(/* prev = &PendingSparksHd[proc][i],*/ spark = PendingSparksHd[proc][i]; spark != NULL; /* prev = &SPARK_NEXT(spark), */ spark = SPARK_NEXT(spark)) { if(++sparkroots <= MAX_SPARKS) { if ( RTSflags.GcFlags.giveStats ) if (i==ADVISORY_POOL) { tot_sparks[proc]++; tot++; } StorageMgrInfo.roots[num_ptr_roots++] = SPARK_NODE(spark); } else { SPARK_NODE(spark) = Prelude_Z91Z93_closure; if (prunedSparks==0) { disposeQ = spark; /* *prev = NULL; */ } prunedSparks++; } } /* forall spark ... 
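 in this pool; any spark beyond the first MAX_SPARKS (counted over all
	 queues) has had its node overwritten with Prelude_Z91Z93_closure
	 and sits on disposeQ, waiting to be freed just below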
	 */

      if ( (RTSflags.GcFlags.giveStats) && (prunedSparks>0) ) {
	fprintf(RTSflags.GcFlags.statsFile,
		"Pruning and disposing %lu excess sparks (> %lu) on proc %d for GC purposes\n",
		prunedSparks,MAX_SPARKS,proc);
	if (disposeQ == PendingSparksHd[proc][i])
	  PendingSparksHd[proc][i] = NULL;
	else
	  SPARK_NEXT(SPARK_PREV(disposeQ)) = NULL;
	DisposeSparkQ(disposeQ);
	prunedSparks = 0;
	disposeQ = NULL;
      }
    }  /* forall i ... */
  }    /* forall proc ... */

  if ( RTSflags.GcFlags.giveStats ) {
    fprintf(RTSflags.GcFlags.statsFile,
	    "Spark statistics (after pruning) (total sparks = %d):",tot);
    for (proc=0; proc<RTSflags.GranFlags.proc; proc++) {
      if ( (proc % 4) == 0 )
	fprintf(RTSflags.GcFlags.statsFile,"\n==> ");
      fprintf(RTSflags.GcFlags.statsFile,
	      "\tPE %d: %d ",proc,tot_sparks[proc]);
    }
    fprintf(RTSflags.GcFlags.statsFile,".\n");
  }

  return(num_ptr_roots);
}

#else /* !DEPTH_FIRST_PRUNING */

/* In case of an excessive number of sparks, depth first pruning is a Bad */
/* Idea as we might end up with all remaining sparks on processor 0 and   */
/* none on the other processors. So, this version uses breadth first      */
/* pruning. -- HWL */

I_
SaveSparkRoots(num_ptr_roots)
I_ num_ptr_roots;
{
  sparkq spark,
	 curr_spark[MAX_PROC][SPARK_POOLS];
  PROC proc;
  W_ allProcs = 0,
     endQueues[SPARK_POOLS], finishedQueues[SPARK_POOLS];
  I_ i, sparkroots=0,
     prunedSparks[MAX_PROC][SPARK_POOLS];
  I_ tot_sparks[MAX_PROC], tot = 0;

# if defined(GRAN_CHECK) && defined(GRAN)
  if ( RTSflags.GranFlags.debug & 0x40 )
    fprintf(stderr,"D> Saving spark roots for GC ...\n");
# endif

  /* Init */
  for(proc = 0; proc < RTSflags.GranFlags.proc; ++proc) {
    allProcs |= PE_NUMBER(proc);
    tot_sparks[proc] = 0;
    for(i = 0; i < SPARK_POOLS; ++i) {
      curr_spark[proc][i] = PendingSparksHd[proc][i];
      prunedSparks[proc][i] = 0;
      endQueues[i] = 0;
      finishedQueues[i] = 0;
    }
  }

  /* Breadth first pruning */
  do {
    for(proc = 0; proc < RTSflags.GranFlags.proc; ++proc) {
      for(i = 0; i < SPARK_POOLS; ++i) {
	spark = curr_spark[proc][i];
	if ( spark != NULL ) {

	  if(++sparkroots <= MAX_SPARKS) {
# if defined(GRAN_CHECK) && defined(GRAN)
	    if ( (RTSflags.GranFlags.debug & 0x1000) &&
		 (RTSflags.GcFlags.giveStats) )
	      fprintf(RTSflags.GcFlags.statsFile,
		      "Saving Spark Root %d(proc: %d; pool: %d): 0x%lx \t(info ptr=%#lx)\n",
		      num_ptr_roots,proc,i,SPARK_NODE(spark),
		      INFO_PTR(SPARK_NODE(spark)));
# endif
	    if ( RTSflags.GcFlags.giveStats )
	      if (i==ADVISORY_POOL) {
		tot_sparks[proc]++;
		tot++;
	      }
	    StorageMgrInfo.roots[num_ptr_roots++] = SPARK_NODE(spark);
	    curr_spark[proc][i] = spark = SPARK_NEXT(spark);
	  } else /* sparkroots > MAX_SPARKS */ {
	    if (curr_spark[proc][i] == PendingSparksHd[proc][i])
	      PendingSparksHd[proc][i] = NULL;
	    else
	      SPARK_NEXT(SPARK_PREV(curr_spark[proc][i])) = NULL;
	    PendingSparksTl[proc][i] = SPARK_PREV(curr_spark[proc][i]);
	    endQueues[i] |= PE_NUMBER(proc);
	  }
	} else { /* spark == NULL; actually, this only has to be done once */
	  endQueues[i] |= PE_NUMBER(proc);
	}
      }
    }
  } while (arr_and(endQueues,SPARK_POOLS) != allProcs);

  /* The buffer for spark roots in StorageMgrInfo.roots is full now.
     Prune all sparks on all processors, starting with curr_spark[proc][i].
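
     For example (illustrative numbers only): with 2 PEs, one pool each,
     queues PE0 = [a,b,c] and PE1 = [d,e], and MAX_SPARKS = 3, the
     round-robin walk above saves a, d, b and the loop below disposes of
     c and e; a depth-first walk would have saved a, b, c and thrown away
     every spark of PE1.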
  */
  do {
    for(proc = 0; proc < RTSflags.GranFlags.proc; ++proc) {
      for(i = 0; i < SPARK_POOLS; ++i) {
	spark = curr_spark[proc][i];
	if ( spark != NULL ) {
	  SPARK_NODE(spark) = Prelude_Z91Z93_closure;
	  curr_spark[proc][i] = SPARK_NEXT(spark);
	  prunedSparks[proc][i]++;
	  DisposeSpark(spark);
	} else {
	  finishedQueues[i] |= PE_NUMBER(proc);
	}
      }
    }
  } while (arr_and(finishedQueues,SPARK_POOLS) != allProcs);

# if defined(GRAN_CHECK) && defined(GRAN)
  if ( RTSflags.GranFlags.debug & 0x1000) {
    for(proc = 0; proc < RTSflags.GranFlags.proc; ++proc) {
      for(i = 0; i < SPARK_POOLS; ++i) {
	if ( (RTSflags.GcFlags.giveStats) && (prunedSparks[proc][i]>0)) {
	  fprintf(RTSflags.GcFlags.statsFile,
		  "Discarding %lu sparks on proc %d (pool %d) for GC purposes\n",
		  prunedSparks[proc][i],proc,i);
	}
      }
    }

    if ( RTSflags.GcFlags.giveStats ) {
      fprintf(RTSflags.GcFlags.statsFile,
	      "Spark statistics (after discarding) (total sparks = %d):",tot);
      for (proc=0; proc<RTSflags.GranFlags.proc; proc++) {
	if ( (proc % 4) == 0 )
	  fprintf(RTSflags.GcFlags.statsFile,"\n==> ");
	fprintf(RTSflags.GcFlags.statsFile,
		"\tPE %d: %d ",proc,tot_sparks[proc]);
      }
      fprintf(RTSflags.GcFlags.statsFile,".\n");
    }
  }
# endif

  return(num_ptr_roots);
}
#endif  /* DEPTH_FIRST_PRUNING */

/*
   GC roots must be restored in *reverse order*. The recursion is a
   little ugly, but is better than in-place pointer reversal.
*/

static I_
RestoreEvtRoots(event,num_ptr_roots)
eventq event;
I_ num_ptr_roots;
{
  if(event != NULL) {
    num_ptr_roots = RestoreEvtRoots(EVENT_NEXT(event),num_ptr_roots);

    if(EVENT_TYPE(event) == RESUMETHREAD ||
       EVENT_TYPE(event) == MOVETHREAD ||
       EVENT_TYPE(event) == CONTINUETHREAD ||
       /* EVENT_TYPE(event) >= CONTINUETHREAD1 || */
       EVENT_TYPE(event) == STARTTHREAD )
      EVENT_TSO(event) = StorageMgrInfo.roots[--num_ptr_roots];
    else if(EVENT_TYPE(event) == MOVESPARK )
      SPARK_NODE(EVENT_SPARK(event)) = StorageMgrInfo.roots[--num_ptr_roots];
    else if (EVENT_TYPE(event) == FETCHNODE ||
	     EVENT_TYPE(event) == FETCHREPLY )
      {
	if ( RTSflags.GranFlags.DoGUMMFetching &&
	     (EVENT_TYPE(event)==FETCHREPLY)) {
	  P_ buffer = (P_) EVENT_NODE(event);
	  int size = (int) buffer[PACK_SIZE_LOCN], i;

	  for (i = size-1; i >= PACK_HDR_SIZE; i--) {
	    buffer[i] = StorageMgrInfo.roots[--num_ptr_roots];
	  }
	} else
	  EVENT_NODE(event) = StorageMgrInfo.roots[--num_ptr_roots];

	EVENT_TSO(event) = StorageMgrInfo.roots[--num_ptr_roots];
      }
    else if (EVENT_TYPE(event) == GLOBALBLOCK) {
      EVENT_NODE(event) = StorageMgrInfo.roots[--num_ptr_roots];
      EVENT_TSO(event) = StorageMgrInfo.roots[--num_ptr_roots];
    }
    else if (EVENT_TYPE(event) == UNBLOCKTHREAD) {
      EVENT_TSO(event) = StorageMgrInfo.roots[--num_ptr_roots];
    }
  }
  return(num_ptr_roots);
}

I_
RestoreEventRoots(num_ptr_roots)
I_ num_ptr_roots;
{
  return(RestoreEvtRoots(EventHd,num_ptr_roots));
}

#if defined(DEPTH_FIRST_PRUNING)

static I_
RestoreSpkRoots(spark,num_ptr_roots,sparkroots)
sparkq spark;
I_ num_ptr_roots, sparkroots;
{
  if(spark != NULL) {
    num_ptr_roots = RestoreSpkRoots(SPARK_NEXT(spark),num_ptr_roots,++sparkroots);
    if(sparkroots <= MAX_SPARKS) {
      P_ n = SPARK_NODE(spark);

      SPARK_NODE(spark) = StorageMgrInfo.roots[--num_ptr_roots];
# if defined(GRAN_CHECK) && defined(GRAN)
      if ( RTSflags.GranFlags.debug & 0x40 )
	fprintf(RTSflags.GcFlags.statsFile,
		"Restoring Spark Root %d: 0x%lx \t(info ptr=%#lx)\n",
		num_ptr_roots,SPARK_NODE(spark),
		INFO_PTR(SPARK_NODE(spark)));
# endif
    }
# if defined(GRAN_CHECK) && defined(GRAN)
    else
      if ( RTSflags.GranFlags.debug & 0x40 )
	fprintf(RTSflags.GcFlags.statsFile,
		"Error in RestoreSpkRoots (%d; @ spark %#lx): More than MAX_SPARKS (%d) sparks\n",
		num_ptr_roots,SPARK_NODE(spark),MAX_SPARKS);
# endif
  }
  return(num_ptr_roots);
}
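
/* RestoreSparkRoots must mirror SaveSparkRoots: it walks PEs and pools
   backwards, so that each spark gets back exactly the root that was
   saved for it. */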
I_
RestoreSparkRoots(num_ptr_roots)
I_ num_ptr_roots;
{
  PROC proc;
  I_   i;

#if defined(GRAN_JSM_SPARKS)
  fprintf(stderr,"Error: RestoreSparkRoots should never be entered in a JSM style sparks set-up\n");
  EXIT(EXIT_FAILURE);
#endif

  /* NB: PROC is currently an unsigned datatype i.e. proc>=0 is always   */
  /* true ((PROC)-1 == (PROC)255). So we need a second clause in the head */
  /* of the for loop. For i that is currently not necessary. C is really */
  /* impressive in datatype abstraction!   -- HWL */

  for(proc = RTSflags.GranFlags.proc - 1;
      (proc >= 0) && (proc < RTSflags.GranFlags.proc);
      --proc) {
    for(i = SPARK_POOLS - 1;
	(i >= 0) && (i < SPARK_POOLS) ;
	--i) {
      num_ptr_roots = RestoreSpkRoots(PendingSparksHd[proc][i],num_ptr_roots,0);
    }
  }
  return(num_ptr_roots);
}

#else /* !DEPTH_FIRST_PRUNING */

I_
RestoreSparkRoots(num_ptr_roots)
I_ num_ptr_roots;
{
  sparkq spark,
	 curr_spark[MAX_PROC][SPARK_POOLS];
  PROC   proc;
  I_     i, max_len, len, pool, count=0,
	 queue_len[MAX_PROC][SPARK_POOLS];

  /* NB: PROC is currently an unsigned datatype i.e. proc>=0 is always   */
  /* true ((PROC)-1 == (PROC)255). So we need a second clause in the head */
  /* of the for loop. For i that is currently not necessary. C is really */
  /* impressive in datatype abstraction!   -- HWL */

  max_len=0;
  for (proc=0; proc < RTSflags.GranFlags.proc; proc++) {
    for (i=0; i<SPARK_POOLS; i++) {
      queue_len[proc][i] = spark_queue_len(proc,i);
      curr_spark[proc][i] = PendingSparksTl[proc][i];
      max_len = (queue_len[proc][i]>max_len) ? queue_len[proc][i] : max_len;
    }
  }

  for (len=max_len; len > 0; len--) {
    for(proc = RTSflags.GranFlags.proc - 1;
	(proc >= 0) && (proc < RTSflags.GranFlags.proc);
	--proc) {
      for(i = SPARK_POOLS - 1;
	  (i >= 0) && (i < SPARK_POOLS) ;
	  --i) {
	if (queue_len[proc][i]>=len) {
	  spark = curr_spark[proc][i];
	  SPARK_NODE(spark) = StorageMgrInfo.roots[--num_ptr_roots];
# if defined(GRAN_CHECK) && defined(GRAN)
	  count++;
	  if ( (RTSflags.GranFlags.debug & 0x1000) &&
	       (RTSflags.GcFlags.giveStats) )
	    fprintf(RTSflags.GcFlags.statsFile,
		    "Restoring Spark Root %d (PE %u, pool %u): 0x%lx \t(info ptr=%#lx)\n",
		    num_ptr_roots,proc,i,SPARK_NODE(spark),
		    INFO_PTR(SPARK_NODE(spark)));
# endif
	  curr_spark[proc][i] = SPARK_PREV(spark);
	  /*
	  num_ptr_roots = RestoreSpkRoots(PendingSparksHd[proc][i],
					  num_ptr_roots,0);
	  */
	}
      }
    }
  }

# if defined(GRAN_CHECK) && defined(GRAN)
  if ( (RTSflags.GranFlags.debug & 0x1000) && (RTSflags.GcFlags.giveStats) )
    fprintf(RTSflags.GcFlags.statsFile,
	    "Number of restored spark roots: %d\n",
	    count);
# endif
  return(num_ptr_roots);
}
#endif  /* DEPTH_FIRST_PRUNING */

#endif  /* 0 */

#endif  /* GRAN */

#endif  /* CONCURRENT */ /* the whole module! */
\end{code}