2 Time-stamp: <Tue Mar 06 2001 00:17:42 Stardate: [-30]6285.06 hwloidl>
3 $Id: GranSim.c,v 1.5 2001/07/23 17:23:20 simonmar Exp $
5 Variables and functions specific to GranSim the parallelism simulator
9 //@node GranSim specific code, , ,
10 //@section GranSim specific code
13 Macros for dealing with the new and improved GA field for simulating
14 parallel execution. Based on @CONCURRENT@ package. The GA field now
15 contains a mask, where the n-th bit stands for the n-th processor, where
16 this data can be found. In case of multiple copies, several bits are
17 set. The total number of processors is bounded by @MAX_PROC@, which
18 should be <= the length of a word in bits. -- HWL
23 //* Prototypes and externs::
24 //* Constants and Variables::
26 //* Global Address Operations::
27 //* Global Event Queue::
28 //* Spark queue functions::
29 //* Scheduling functions::
30 //* Thread Queue routines::
31 //* GranSim functions::
32 //* GranSimLight routines::
33 //* Code for Fetching Nodes::
35 //* Routines directly called from Haskell world::
36 //* Emitting profiling info for GrAnSim::
37 //* Dumping routines::
41 //@node Includes, Prototypes and externs, GranSim specific code, GranSim specific code
42 //@subsection Includes
47 #include "StgMiscClosures.h"
50 #include "SchedAPI.h" // for pushClosure
51 #include "GranSimRts.h"
53 #include "ParallelRts.h"
54 #include "ParallelDebug.h"
56 #include "Storage.h" // for recordMutable
59 //@node Prototypes and externs, Constants and Variables, Includes, GranSim specific code
60 //@subsection Prototypes and externs
65 static inline PEs ga_to_proc(StgWord);
66 static inline rtsBool any_idle(void);
67 static inline nat idlers(void);
68 PEs where_is(StgClosure *node);
70 static rtsBool stealSomething(PEs proc, rtsBool steal_spark, rtsBool steal_thread);
71 static rtsBool stealSpark(PEs proc);
72 static rtsBool stealThread(PEs proc);
73 static rtsBool stealSparkMagic(PEs proc);
74 static rtsBool stealThreadMagic(PEs proc);
75 /* subsumed by stealSomething
76 static void stealThread(PEs proc);
77 static void stealSpark(PEs proc);
79 static rtsTime sparkStealTime(void);
80 static nat natRandom(nat from, nat to);
81 static PEs findRandomPE(PEs proc);
82 static void sortPEsByTime (PEs proc, PEs *pes_by_time,
83 nat *firstp, nat *np);
89 //@node Constants and Variables, Initialisation, Prototypes and externs, GranSim specific code
90 //@subsection Constants and Variables
92 #if defined(GRAN) || defined(PAR)
93 /* See GranSim.h for the definition of the enum gran_event_types */
94 char *gran_event_names[] = {
96 "STEALING", "STOLEN", "STOLEN(Q)",
97 "FETCH", "REPLY", "BLOCK", "RESUME", "RESUME(Q)",
98 "SCHEDULE", "DESCHEDULE",
100 "SPARK", "SPARKAT", "USED", "PRUNED", "EXPORTED", "ACQUIRED",
103 "SYSTEM_START", "SYSTEM_END", /* only for debugging */
108 #if defined(GRAN) /* whole file */
109 char *proc_status_names[] = {
110 "Idle", "Sparking", "Starting", "Fetching", "Fishing", "Busy",
114 /* For internal use (event statistics) only */
115 char *event_names[] =
116 { "ContinueThread", "StartThread", "ResumeThread",
117 "MoveSpark", "MoveThread", "FindWork",
118 "FetchNode", "FetchReply",
119 "GlobalBlock", "UnblockThread"
122 //@cindex CurrentProc
126 ToDo: Create a structure for the processor status and put all the
127 arrays below into it.
130 //@cindex CurrentTime
131 /* One clock for each PE */
132 rtsTime CurrentTime[MAX_PROC];
134 /* Useful to restrict communication; cf fishing model in GUM */
135 nat OutstandingFetches[MAX_PROC], OutstandingFishes[MAX_PROC];
137 /* Status of each PE (new since but independent of GranSim Light) */
138 rtsProcStatus procStatus[MAX_PROC];
140 # if defined(GRAN) && defined(GRAN_CHECK)
141 /* To check if the RTS ever tries to run a thread that should be blocked
142 because of fetching remote data */
143 StgTSO *BlockedOnFetch[MAX_PROC];
144 # define FETCH_MASK_TSO 0x08000000 /* only bits 0, 1, 2 should be used */
147 nat SparksAvail = 0; /* How many sparks are available */
148 nat SurplusThreads = 0; /* How many excess threads are there */
150 /* Do we need to reschedule following a fetch? */
151 rtsBool NeedToReSchedule = rtsFalse, IgnoreEvents = rtsFalse, IgnoreYields = rtsFalse;
152 rtsTime TimeOfNextEvent, TimeOfLastEvent, EndOfTimeSlice; /* checked from the threaded world! */
154 //@cindex spark queue
155 /* GranSim: a globally visible array of spark queues */
156 rtsSparkQ pending_sparks_hds[MAX_PROC];
157 rtsSparkQ pending_sparks_tls[MAX_PROC];
159 nat sparksIgnored = 0, sparksCreated = 0;
161 GlobalGranStats globalGranStats;
163 nat gran_arith_cost, gran_branch_cost, gran_load_cost,
164 gran_store_cost, gran_float_cost;
167 Old comment from 0.29. ToDo: Check and update -- HWL
169 The following variables control the behaviour of GrAnSim. In general, there
170 is one RTS option for enabling each of these features. In getting the
171 desired setup of GranSim the following questions have to be answered:
173 \item {\em Which scheduling algorithm} to use (@RtsFlags.GranFlags.DoFairSchedule@)?
174 Currently only unfair scheduling is supported.
175 \item What to do when remote data is fetched (@RtsFlags.GranFlags.DoAsyncFetch@)?
176 Either block and wait for the
177 data or reschedule and do some other work.
178 Thus, if this variable is true, asynchronous communication is
179 modelled. Block on fetch mainly makes sense for incremental fetching.
181 There is also a simplified fetch variant available
182 (@RtsFlags.GranFlags.SimplifiedFetch@). This variant does not use events to model
183 communication. It is faster but the results will be less accurate.
184 \item How aggressive to be in getting work after a reschedule on fetch
185 (@RtsFlags.GranFlags.FetchStrategy@)?
186 This is determined by the so-called {\em fetching
187 strategy\/}. Currently, there are four possibilities:
189 \item Only run a runnable thread.
190 \item Turn a spark into a thread, if necessary.
191 \item Steal a remote spark, if necessary.
192 \item Steal a runnable thread from another processor, if necessary.
194 The variable @RtsFlags.GranFlags.FetchStrategy@ determines how far to go in this list
195 when rescheduling on a fetch.
196 \item Should sparks or threads be stolen first when looking for work
197 (@RtsFlags.GranFlags.DoStealThreadsFirst@)?
198 The default is to steal sparks first (much cheaper).
199 \item Should the RTS use a lazy thread creation scheme
200 (@RtsFlags.GranFlags.DoAlwaysCreateThreads@)? By default yes i.e.\ sparks are only
201 turned into threads when work is needed. Also note, that sparks
202 can be discarded by the RTS (this is done in the case of an overflow
203 of the spark pool). Setting @RtsFlags.GranFlags.DoAlwaysCreateThreads@ to @True@ forces
204 the creation of threads at the next possibility (i.e.\ when new work
205 is demanded the next time).
206 \item Should data be fetched closure-by-closure or in packets
207 (@RtsFlags.GranFlags.DoBulkFetching@)? The default strategy is a GRIP-like incremental
208 (i.e.\ closure-by-closure) strategy. This makes sense in a
209 low-latency setting but is bad in a high-latency system. Setting
210 @RtsFlags.GranFlags.DoBulkFetching@ to @True@ enables bulk (packet) fetching. Other
211 parameters determine the size of the packets (@pack_buffer_size@) and the number of
212 thunks that should be put into one packet (@RtsFlags.GranFlags.ThunksToPack@).
213 \item If there is no other possibility to find work, should runnable threads
214 be moved to an idle processor (@RtsFlags.GranFlags.DoThreadMigration@)? In any case, the
215 RTS tried to get sparks (either local or remote ones) first. Thread
216 migration is very expensive, since a whole TSO has to be transferred
217 and probably data locality becomes worse in the process. Note, that
218 the closure, which will be evaluated next by that TSO is not
219 transferred together with the TSO (that might block another thread).
220 \item Should the RTS distinguish between sparks created by local nodes and
221 stolen sparks (@RtsFlags.GranFlags.PreferSparksOfLocalNodes@)? The idea is to improve
222 data locality by preferring sparks of local nodes (it is more likely
223 that the data for those sparks is already on the local processor).
224 However, such a distinction also imposes an overhead on the spark
225 queue management, and typically a large number of sparks are
226 generated during execution. By default this variable is set to @False@.
227 \item Should the RTS use granularity control mechanisms? The idea of a
228 granularity control mechanism is to make use of granularity
229 information provided via annotation of the @par@ construct in order
230 to prefer bigger threads when either turning a spark into a thread or
231 when choosing the next thread to schedule. Currently, three such
232 mechanisms are implemented:
234 \item Cut-off: The granularity information is interpreted as a
235 priority. If a threshold priority is given to the RTS, then
236 only those sparks with a higher priority than the threshold
237 are actually created. Other sparks are immediately discarded.
238 This is similar to a usual cut-off mechanism often used in
239 parallel programs, where parallelism is only created if the
240 input data is large enough. With this option, the choice is
241 hidden in the RTS and only the threshold value has to be
242 provided as a parameter to the runtime system.
243 \item Priority Sparking: This mechanism keeps priorities for sparks
244 and chooses the spark with the highest priority when turning
245 a spark into a thread. After that the priority information is
246 discarded. The overhead of this mechanism comes from
247 maintaining a sorted spark queue.
248 \item Priority Scheduling: This mechanism keeps the granularity
249 information for threads, too. Thus, on each reschedule the
250 largest thread is chosen. This mechanism has a higher
251 overhead, as the thread queue is sorted, too.
256 //@node Initialisation, Global Address Operations, Constants and Variables, GranSim specific code
257 //@subsection Initialisation
/* init_gr_stats: reset all global GranSim statistics counters at start-up.
   NB: the memset below already zero-fills the whole GlobalGranStats struct;
   the explicit field assignments that follow are redundant but document
   which counters this module maintains. */
260 init_gr_stats (void) {
261 memset(&globalGranStats, '\0', sizeof(GlobalGranStats));
/* event statistics */
264 globalGranStats.noOfEvents = 0;
265 for (i=0; i<MAX_EVENT; i++) globalGranStats.event_counts[i]=0;
267 /* communication stats */
268 globalGranStats.fetch_misses = 0;
269 globalGranStats.tot_low_pri_sparks = 0;
/* rescheduling/stealing counters -- presumably; confirm against the
   GlobalGranStats declaration in GranSimRts.h */
272 globalGranStats.rs_sp_count = 0;
273 globalGranStats.rs_t_count = 0;
/* NOTE(review): trailing ',' here (and at tot_sq_len below) is the comma
   operator -- harmless, but ';' was probably intended */
274 globalGranStats.ntimes_total = 0,
275 globalGranStats.fl_total = 0;
276 globalGranStats.no_of_steals = 0;
278 /* spark queue stats */
279 globalGranStats.tot_sq_len = 0,
280 globalGranStats.tot_sq_probes = 0;
281 globalGranStats.tot_sparks = 0;
282 globalGranStats.withered_sparks = 0;
283 globalGranStats.tot_add_threads = 0;
284 globalGranStats.tot_tq_len = 0;
285 globalGranStats.non_end_add_threads = 0;
/* thread creation stats, per PE */
288 globalGranStats.tot_threads_created = 0;
289 for (i=0; i<MAX_PROC; i++) globalGranStats.threads_created_on_PE[i]=0;
293 //@node Global Address Operations, Global Event Queue, Initialisation, GranSim specific code
294 //@subsection Global Address Operations
296 ----------------------------------------------------------------------
297 Global Address Operations
299 These functions perform operations on the global-address (ga) part of a
300 closure. The ga is the only new field (1 word) in a closure introduced by
301 GrAnSim. It serves as a bitmask, indicating on which processor the
302 closure is residing. Since threads are described by Thread State Object
303 (TSO), which is nothing but another kind of closure, this scheme
304 gives placement information about threads.
306 A ga is just a bitmask, so the operations on them are mainly bitmask
307 manipulating functions. Note, that there are important macros like PROCS,
308 IS_LOCAL_TO etc. They are defined in @GrAnSim.lh@.
310 NOTE: In GrAnSim-light we don't maintain placement information. This
311 allows to simulate an arbitrary number of processors. The price we have
312 to pay is the lack of costing any communication properly. In short,
313 GrAnSim-light is meant to reveal the maximal parallelism in a program.
314 From an implementation point of view the important thing is: {\em
315 GrAnSim-light does not maintain global-addresses}. */
317 /* ga_to_proc returns the first processor marked in the bitmask ga.
318 Normally only one bit in ga should be set. But for PLCs all bits
319 are set. That shouldn't hurt since we only need IS_LOCAL_TO for PLCs */
324 ga_to_proc(StgWord ga)
/* empty-body loop: advance i to the first PE whose bit is set in ga */
327 for (i = 0; i < RtsFlags.GranFlags.proc && !IS_LOCAL_TO(ga, i); i++);
/* at least one bit must be set in ga, otherwise i ran off the end */
328 ASSERT(i<RtsFlags.GranFlags.proc);
332 /* NB: This takes a *node* rather than just a ga as input */
/* where_is: PE on which (the first copy of) node resides, i.e. the first
   bit set in its PROCS bitmask (via ga_to_proc). Takes a node, not a ga. */
335 where_is(StgClosure *node)
336 { return (ga_to_proc(PROCS(node))); }
/* is_unique: rtsTrue iff exactly one copy of node exists, i.e. exactly one
   PE bit is set in its PROCS bitmask; bails out with rtsFalse as soon as a
   second copy is found. */
341 is_unique(StgClosure *node)
344 rtsBool unique = rtsFalse;
346 for (i = 0; i < RtsFlags.GranFlags.proc ; i++)
347 if (IS_LOCAL_TO(PROCS(node), i))
348 if (unique) // exactly 1 instance found so far
349 return rtsFalse; // found a 2nd instance => not unique
351 unique = rtsTrue; // found 1st instance
/* reaching here means at most one copy was seen; assert at least one */
352 ASSERT(unique); // otherwise returned from within loop
357 static inline rtsBool
358 any_idle(void) { /* any (map (\ i -> procStatus[i] == Idle)) [0,..,MAX_PROC] */
361 for(i=0, any_idle=rtsFalse;
362 !any_idle && i<RtsFlags.GranFlags.proc;
363 any_idle = any_idle || procStatus[i] == Idle, i++)
369 idlers(void) { /* number of idle PEs */
372 i<RtsFlags.GranFlags.proc;
373 j += (procStatus[i] == Idle) ? 1 : 0, i++)
378 //@node Global Event Queue, Spark queue functions, Global Address Operations, GranSim specific code
379 //@subsection Global Event Queue
381 The following routines implement an ADT of an event-queue (FIFO).
382 ToDo: Put that in an own file(?)
385 /* Pointer to the global event queue; events are currently malloc'ed */
386 rtsEventQ EventHd = NULL;
388 //@cindex get_next_event
392 static rtsEventQ entry = NULL;
394 if (EventHd == NULL) {
395 barf("No next event. This may be caused by a circular data dependency in the program.");
401 if (RtsFlags.GranFlags.GranSimStats.Global) { /* count events */
402 globalGranStats.noOfEvents++;
403 globalGranStats.event_counts[EventHd->evttype]++;
408 IF_GRAN_DEBUG(event_trace,
411 EventHd = EventHd->next;
415 /* When getting the time of the next event we ignore CONTINUETHREAD events:
416 we don't want to be interrupted before the end of the current time slice
417 unless there is something important to handle.
419 //@cindex get_time_of_next_event
421 get_time_of_next_event(void)
423 rtsEventQ event = EventHd;
425 while (event != NULL && event->evttype==ContinueThread) {
429 return ((rtsTime) 0);
431 return (event->time);
434 /* ToDo: replace malloc/free with a free list */
435 //@cindex insert_event
437 insert_event(newentry)
440 rtsEventType evttype = newentry->evttype;
441 rtsEvent *event, **prev;
443 /* if(evttype >= CONTINUETHREAD1) evttype = CONTINUETHREAD; */
445 /* Search the queue and insert at the right point:
446 FINDWORK before everything, CONTINUETHREAD after everything.
448 This ensures that we find any available work after all threads have
449 executed the current cycle. This level of detail would normally be
450 irrelevant, but matters for ridiculously low latencies...
453 /* Changed the ordering: Now FINDWORK comes after everything but
454 CONTINUETHREAD. This makes sure that a MOVESPARK comes before a
455 FINDWORK. This is important when a GranSimSparkAt happens and
456 DoAlwaysCreateThreads is turned on. Also important if a GC occurs
457 when trying to build a new thread (see much_spark) -- HWL 02/96 */
462 for (event = EventHd, prev=(rtsEvent**)&EventHd;
464 prev = (rtsEvent**)&(event->next), event = event->next) {
466 case FindWork: if ( event->time < newentry->time ||
467 ( (event->time == newentry->time) &&
468 (event->evttype != ContinueThread) ) )
472 case ContinueThread: if ( event->time <= newentry->time )
476 default: if ( event->time < newentry->time ||
477 ((event->time == newentry->time) &&
478 (event->evttype == newentry->evttype)) )
483 /* Insert newentry here (i.e. before event) */
485 newentry->next = event;
495 new_event(proc,creator,time,evttype,tso,node,spark)
498 rtsEventType evttype;
503 rtsEvent *newentry = (rtsEvent *) stgMallocBytes(sizeof(rtsEvent), "new_event");
505 newentry->proc = proc;
506 newentry->creator = creator;
507 newentry->time = time;
508 newentry->evttype = evttype;
510 newentry->node = node;
511 newentry->spark = spark;
512 newentry->gc_info = 0;
513 newentry->next = NULL;
515 insert_event(newentry);
518 fprintf(stderr, "GRAN: new_event: \n");
519 print_event(newentry));
522 //@cindex prepend_event
524 prepend_event(event) /* put event at beginning of EventQueue */
526 { /* only used for GC! */
527 event->next = EventHd;
533 grab_event(void) /* undo prepend_event i.e. get the event */
534 { /* at the head of EventQ but don't free anything */
535 rtsEventQ event = EventHd;
537 if (EventHd == NULL) {
538 barf("No next event (in grab_event). This may be caused by a circular data dependency in the program.");
541 EventHd = EventHd->next;
545 //@cindex traverse_eventq_for_gc
547 traverse_eventq_for_gc(void)
549 rtsEventQ event = EventHd;
551 StgClosure *closurep;
553 StgPtr buffer, bufptr;
556 /* Traverse eventq and replace every FETCHREPLY by a FETCHNODE for the
557 orig closure (root of packed graph). This means that a graph, which is
558 between processors at the time of GC is fetched again at the time when
559 it would have arrived, had there been no GC. Slightly inaccurate but
561 This is only needed for GUM style fetching. -- HWL */
562 if (!RtsFlags.GranFlags.DoBulkFetching)
565 for(event = EventHd; event!=NULL; event=event->next) {
566 if (event->evttype==FetchReply) {
567 buffer = stgCast(StgPtr,event->node);
568 ASSERT(buffer[PACK_FLAG_LOCN]==MAGIC_PACK_FLAG); /* It's a pack buffer */
569 bufsize = buffer[PACK_SIZE_LOCN];
570 closurep = stgCast(StgClosure*,buffer[PACK_HDR_SIZE]);
571 tsop = stgCast(StgTSO*,buffer[PACK_TSO_LOCN]);
573 creator = event->creator; /* similar to unpacking */
574 for (bufptr=buffer+PACK_HDR_SIZE;
575 bufptr<(buffer+bufsize);
577 // if ( (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_SPEC_RBH_TYPE) ||
578 // (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_GEN_RBH_TYPE) ) {
579 if ( GET_INFO(stgCast(StgClosure*,bufptr)) ) {
580 convertFromRBH(stgCast(StgClosure *,bufptr));
584 event->evttype = FetchNode;
585 event->proc = creator;
586 event->creator = proc;
587 event->node = closurep;
597 StgClosure *MarkRoot(StgClosure *root); // prototype
599 rtsEventQ event = EventHd;
602 /* iterate over eventq and register relevant fields in event as roots */
603 for(event = EventHd, len = 0; event!=NULL; event=event->next, len++) {
604 switch (event->evttype) {
606 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
609 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
610 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
613 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
614 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
617 event->spark->node = (StgClosure *)MarkRoot((StgClosure *)event->spark->node);
620 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
625 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
626 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
629 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
630 if (RtsFlags.GranFlags.DoBulkFetching)
631 // ToDo: traverse_eventw_for_gc if GUM-Fetching!!! HWL
632 belch("ghuH: packets in BulkFetching not marked as roots; mayb be fatal");
634 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
637 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
638 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
641 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
642 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
645 barf("markEventQueue: trying to mark unknown event @ %p", event);
648 belch("GC: markEventQueue: %d events in queue", len));
652 Prune all ContinueThread events related to tso or node in the eventq.
653 Currently used if a thread leaves STG land with ThreadBlocked status,
654 i.e. it blocked on a closure and has been put on its blocking queue. It
655 will be reawakened via a call to awakenBlockedQueue. Until then no
656 event affecting this tso should appear in the eventq. A bit of a hack,
657 because ideally we shouldn't generate such spurious ContinueThread events
660 //@cindex prune_eventq
662 prune_eventq(tso, node)
665 { rtsEventQ prev = (rtsEventQ)NULL, event = EventHd;
667 /* node unused for now */
669 /* tso must be valid, then */
670 ASSERT(tso!=END_TSO_QUEUE);
671 while (event != NULL) {
672 if (event->evttype==ContinueThread &&
674 IF_GRAN_DEBUG(event_trace, // ToDo: use another debug flag
675 belch("prune_eventq: pruning ContinueThread event for TSO %d (%p) on PE %d @ %lx (%p)",
676 event->tso->id, event->tso, event->proc, event->time, event));
677 if (prev==(rtsEventQ)NULL) { // beginning of eventq
678 EventHd = event->next;
682 prev->next = event->next;
686 } else { // no pruning necessary; go to next event
693 //@cindex print_event
698 char str_tso[16], str_node[16];
701 if (event->tso==END_TSO_QUEUE) {
702 strcpy(str_tso, "______");
705 sprintf(str_tso, "%p", event->tso);
706 tso_id = (event->tso==NULL) ? 0 : event->tso->id;
708 if (event->node==(StgClosure*)NULL) {
709 strcpy(str_node, "______");
711 sprintf(str_node, "%p", event->node);
713 // HWL: shouldn't be necessary; ToDo: nuke
718 fprintf(stderr,"Evt: NIL\n");
720 fprintf(stderr, "Evt: %s (%u), PE %u [%u], Time %lu, TSO %d (%s), Node %s\n", //"Evt: %s (%u), PE %u [%u], Time %u, TSO %s (%#l), Node %s\n",
721 event_names[event->evttype], event->evttype,
722 event->proc, event->creator, event->time,
723 tso_id, str_tso, str_node
724 /*, event->spark, event->next */ );
728 //@cindex print_eventq
735 fprintf(stderr,"Event Queue with root at %p:\n", hd);
736 for (x=hd; x!=NULL; x=x->next) {
742 Spark queue functions are now all in Sparks.c!!
744 //@node Scheduling functions, Thread Queue routines, Spark queue functions, GranSim specific code
745 //@subsection Scheduling functions
748 These functions are variants of thread initialisation and therefore
749 related to initThread and friends in Schedule.c. However, they are
750 specific to a GranSim setup in storing more info in the TSO's statistics
751 buffer and sorting the thread queues etc.
755 A large portion of startThread deals with maintaining a sorted thread
756 queue, which is needed for the Priority Sparking option. Without that
757 complication the code boils down to FIFO handling.
759 //@cindex insertThread
761 insertThread(tso, proc)
765 StgTSO *prev = NULL, *next = NULL;
767 rtsBool found = rtsFalse;
769 ASSERT(CurrentProc==proc);
770 ASSERT(!is_on_queue(tso,proc));
771 /* Idle proc: put the thread on the run queue
772 same for pri spark and basic version */
773 if (run_queue_hds[proc] == END_TSO_QUEUE)
776 ASSERT((CurrentProc==MainProc &&
777 CurrentTime[MainProc]==0 &&
778 procStatus[MainProc]==Idle) ||
779 procStatus[proc]==Starting);
781 run_queue_hds[proc] = run_queue_tls[proc] = tso;
783 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime;
785 /* new_event of ContinueThread has been moved to do_the_startthread */
788 ASSERT(procStatus[proc]==Idle ||
789 procStatus[proc]==Fishing ||
790 procStatus[proc]==Starting);
791 procStatus[proc] = Busy;
796 if (RtsFlags.GranFlags.Light)
797 GranSimLight_insertThread(tso, proc);
799 /* Only for Pri Scheduling: find place where to insert tso into queue */
800 if (RtsFlags.GranFlags.DoPriorityScheduling && tso->gran.pri!=0)
801 /* {add_to_spark_queue}vo' jInIHta'; Qu' wa'DIch yIleghQo' */
802 for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count=0;
803 (next != END_TSO_QUEUE) &&
804 !(found = tso->gran.pri >= next->gran.pri);
805 prev = next, next = next->link, count++)
807 ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
808 (prev==(StgTSO*)NULL || prev->link==next));
811 ASSERT(!found || next != END_TSO_QUEUE);
812 ASSERT(procStatus[proc]!=Idle);
815 /* found can only be rtsTrue if pri scheduling enabled */
816 ASSERT(RtsFlags.GranFlags.DoPriorityScheduling);
817 if (RtsFlags.GranFlags.GranSimStats.Global)
818 globalGranStats.non_end_add_threads++;
819 /* Add tso to ThreadQueue between prev and next */
821 if ( next == (StgTSO*)END_TSO_QUEUE ) {
824 /* no back link for TSO chain */
827 if ( prev == (StgTSO*)END_TSO_QUEUE ) {
828 /* Never add TSO as first elem of thread queue; the first */
829 /* element should be the one that is currently running -- HWL */
831 belch("GRAN: Qagh: NewThread (w/ PriorityScheduling): Trying to add TSO %p (PRI=%d) as first elem of threadQ (%p) on proc %u (@ %u)\n",
832 tso, tso->gran.pri, run_queue_hd, proc,
837 } else { /* !found */ /* or not pri sparking! */
838 /* Add TSO to the end of the thread queue on that processor */
839 run_queue_tls[proc]->link = tso;
840 run_queue_tls[proc] = tso;
842 ASSERT(RtsFlags.GranFlags.DoPriorityScheduling || count==0);
843 CurrentTime[proc] += count * RtsFlags.GranFlags.Costs.pri_sched_overhead +
844 RtsFlags.GranFlags.Costs.threadqueuetime;
846 /* ToDo: check if this is still needed -- HWL
847 if (RtsFlags.GranFlags.DoThreadMigration)
850 if (RtsFlags.GranFlags.GranSimStats.Full &&
851 !(( event_type == GR_START || event_type == GR_STARTQ) &&
852 RtsFlags.GranFlags.labelling) )
853 DumpRawGranEvent(proc, creator, event_type+1, tso, node,
854 tso->gran.sparkname, spark_queue_len(proc));
857 # if defined(GRAN_CHECK)
858 /* Check if thread queue is sorted. Only for testing, really! HWL */
859 if ( RtsFlags.GranFlags.DoPriorityScheduling &&
860 (RtsFlags.GranFlags.Debug.sortedQ) ) {
861 rtsBool sorted = rtsTrue;
864 if (run_queue_hds[proc]==END_TSO_QUEUE ||
865 run_queue_hds[proc]->link==END_TSO_QUEUE) {
866 /* just 1 elem => ok */
868 /* Qu' wa'DIch yIleghQo' (ignore first elem)! */
869 for (prev = run_queue_hds[proc]->link, next = prev->link;
870 (next != END_TSO_QUEUE) ;
871 prev = next, next = prev->link) {
872 ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
873 (prev==(StgTSO*)NULL || prev->link==next));
875 (prev->gran.pri >= next->gran.pri);
879 fprintf(stderr,"Qagh: THREADQ on PE %d is not sorted:\n",
881 G_THREADQ(run_queue_hd,0x1);
888 insertThread, which is only used for GranSim Light, is similar to
889 startThread in that it adds a TSO to a thread queue. However, it assumes
890 that the thread queue is sorted by local clocks and it inserts the TSO at
891 the right place in the queue. Don't create any event, just insert.
893 //@cindex GranSimLight_insertThread
895 GranSimLight_insertThread(tso, proc)
901 rtsBool found = rtsFalse;
903 ASSERT(RtsFlags.GranFlags.Light);
905 /* In GrAnSim-Light we always have an idle `virtual' proc.
906 The semantics of the one-and-only thread queue is different here:
907 all threads in the queue are running (each on its own virtual processor);
908 the queue is only needed internally in the simulator to interleave the
909 reductions of the different processors.
910 The one-and-only thread queue is sorted by the local clocks of the TSOs.
912 ASSERT(run_queue_hds[proc] != END_TSO_QUEUE);
913 ASSERT(tso->link == END_TSO_QUEUE);
915 /* If only one thread in queue so far we emit DESCHEDULE in debug mode */
916 if (RtsFlags.GranFlags.GranSimStats.Full &&
917 (RtsFlags.GranFlags.Debug.checkLight) &&
918 (run_queue_hd->link == END_TSO_QUEUE)) {
919 DumpRawGranEvent(proc, proc, GR_DESCHEDULE,
920 run_queue_hds[proc], (StgClosure*)NULL,
921 tso->gran.sparkname, spark_queue_len(proc)); // ToDo: check spar_queue_len
922 // resched = rtsTrue;
925 /* this routine should only be used in a GrAnSim Light setup */
926 /* && CurrentProc must be 0 in GrAnSim Light setup */
927 ASSERT(RtsFlags.GranFlags.Light && CurrentProc==0);
929 /* Idle proc; same for pri spark and basic version */
930 if (run_queue_hd==END_TSO_QUEUE)
932 run_queue_hd = run_queue_tl = tso;
933 /* MAKE_BUSY(CurrentProc); */
937 for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count = 0;
938 (next != END_TSO_QUEUE) &&
939 !(found = (tso->gran.clock < next->gran.clock));
940 prev = next, next = next->link, count++)
942 ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
943 (prev==(StgTSO*)NULL || prev->link==next));
946 /* found can only be rtsTrue if pri sparking enabled */
948 /* Add tso to ThreadQueue between prev and next */
950 if ( next == END_TSO_QUEUE ) {
951 run_queue_tls[proc] = tso;
953 /* no back link for TSO chain */
956 if ( prev == END_TSO_QUEUE ) {
957 run_queue_hds[proc] = tso;
961 } else { /* !found */ /* or not pri sparking! */
962 /* Add TSO to the end of the thread queue on that processor */
963 run_queue_tls[proc]->link = tso;
964 run_queue_tls[proc] = tso;
967 if ( prev == END_TSO_QUEUE ) { /* new head of queue */
968 new_event(proc, proc, CurrentTime[proc],
970 tso, (StgClosure*)NULL, (rtsSpark*)NULL);
973 if (RtsFlags.GranFlags.GranSimStats.Full &&
974 !(( event_type == GR_START || event_type == GR_STARTQ) &&
975 RtsFlags.GranFlags.labelling) )
976 DumpRawGranEvent(proc, creator, gr_evttype, tso, node,
977 tso->gran.sparkname, spark_queue_len(proc));
983 endThread is responsible for general clean-up after the thread tso has
984 finished. This includes emitting statistics into the profile etc.
/* endThread: general clean-up after tso has finished on proc -- emit the
   end-of-thread profiling event and mark the PE Idle if its run queue is
   now empty. */
987 endThread(StgTSO *tso, PEs proc)
989 ASSERT(procStatus[proc]==Busy); // coming straight out of STG land
990 ASSERT(tso->what_next==ThreadComplete);
991 // ToDo: prune ContinueThreads for this TSO from event queue
992 DumpEndEvent(proc, tso, rtsFalse /* not mandatory */);
994 /* if this was the last thread on this PE then make it Idle */
995 if (run_queue_hds[proc]==END_TSO_QUEUE) {
// NOTE(review): the queue is tested with 'proc' but the status is set via
// 'CurrentProc' -- confirm proc==CurrentProc is guaranteed on this path
// (cf. the ASSERT(CurrentProc==proc) in insertThread)
996 procStatus[CurrentProc] = Idle;
1000 //@node Thread Queue routines, GranSim functions, Scheduling functions, GranSim specific code
1001 //@subsection Thread Queue routines
1004 Check whether given tso resides on the run queue of the current processor.
1005 Only used for debugging.
1008 //@cindex is_on_queue
1010 is_on_queue (StgTSO *tso, PEs proc)
1015 for (t=run_queue_hds[proc], found=rtsFalse;
1016 t!=END_TSO_QUEUE && !(found = t==tso);
1023 /* This routine is only used for keeping a statistics of thread queue
1024 lengths to evaluate the impact of priority scheduling. -- HWL
1025 {spark_queue_len}vo' jInIHta'
1027 //@cindex thread_queue_len
1029 thread_queue_len(PEs proc)
1031 StgTSO *prev, *next;
1034 for (len = 0, prev = END_TSO_QUEUE, next = run_queue_hds[proc];
1035 next != END_TSO_QUEUE;
1036 len++, prev = next, next = prev->link)
1042 //@node GranSim functions, GranSimLight routines, Thread Queue routines, GranSim specific code
1043 //@subsection GranSim functions
1045 /* ----------------------------------------------------------------- */
1046 /* The main event handling functions; called from Schedule.c (schedule) */
1047 /* ----------------------------------------------------------------- */
1049 //@cindex do_the_globalblock
/* Event handler for GLOBALBLOCK events (called from the scheduler's main
   event loop): blocks the requesting TSO on a remote node via blockFetch,
   then either wakes the next thread on the run queue (synchronous fetch)
   or leaves the running thread alone (asynchronous fetch).
   Only valid with GUM-style bulk fetching; never in GranSim-Light.
   NOTE(review): several lines (incl. closing braces) are elided in this
   listing; code kept verbatim. */
1052 do_the_globalblock(rtsEvent* event)
1054 PEs proc = event->proc; /* proc that requested node */
1055 StgTSO *tso = event->tso; /* tso that requested node */
1056 StgClosure *node = event->node; /* requested, remote node */
1058 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the GlobalBlock\n"));
1059 /* There should be no GLOBALBLOCKs in GrAnSim Light setup */
1060 ASSERT(!RtsFlags.GranFlags.Light);
1061 /* GlobalBlock events only valid with GUM fetching */
1062 ASSERT(RtsFlags.GranFlags.DoBulkFetching);
1064 IF_GRAN_DEBUG(bq, // globalBlock,
1065 if (IS_LOCAL_TO(PROCS(node),proc)) {
1066 belch("## Qagh: GlobalBlock: Blocking TSO %d (%p) on LOCAL node %p (PE %d).\n",
1067 tso->id, tso, node, proc);
1070 /* CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.munpacktime; */
1071 if ( blockFetch(tso,proc,node) != 0 )
1072 return; /* node has become local by now */
1075 ToDo: check whether anything has to be done at all after blockFetch -- HWL
1077 if (!RtsFlags.GranFlags.DoAsyncFetch) { /* head of queue is next thread */
1078 StgTSO* tso = run_queue_hds[proc]; /* awaken next thread */
1079 if (tso != (StgTSO*)NULL) {
1080 new_event(proc, proc, CurrentTime[proc],
1082 tso, (StgClosure*)NULL, (rtsSpark*)NULL);
1083 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime;
1084 if (RtsFlags.GranFlags.GranSimStats.Full)
1085 DumpRawGranEvent(proc, CurrentProc, GR_SCHEDULE, tso,
1086 (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc)); // ToDo: check sparkname and spark_queue_len
1087 procStatus[proc] = Busy; /* might have been fetching */
1089 procStatus[proc] = Idle; /* no work on proc now */
1091 } else { /* RtsFlags.GranFlags.DoAsyncFetch i.e. block-on-fetch */
1092 /* other thread is already running */
1093 /* 'oH 'utbe' 'e' vIHar ; I think that's not needed -- HWL
1094 new_event(proc,proc,CurrentTime[proc],
1095 CONTINUETHREAD,EVENT_TSO(event),
1096 (RtsFlags.GranFlags.DoBulkFetching ? closure :
1097 EVENT_NODE(event)),NULL);
1103 //@cindex do_the_unblock
/* Event handler for UNBLOCK events: a FetchReply arrived or a blocking
   queue was awakened. For block-on-fetch the fetch wait is charged as
   normal block time; in both modes a ResumeThread event is scheduled so
   the TSO actually gets to run again.
   NOTE(review): lines are elided in this listing; code kept verbatim. */
1106 do_the_unblock(rtsEvent* event)
1108 PEs proc = event->proc, /* proc that requested node */
1109 creator = event->creator; /* proc that requested node */
1110 StgTSO* tso = event->tso; /* tso that requested node */
1111 StgClosure* node = event->node; /* requested, remote node */
1113 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the UnBlock\n"))
1114 /* There should be no UNBLOCKs in GrAnSim Light setup */
1115 ASSERT(!RtsFlags.GranFlags.Light);
1116 /* UnblockThread means either FetchReply has arrived or
1117 a blocking queue has been awakened;
1118 ToDo: check with assertions
1119 ASSERT(procStatus[proc]==Fetching || IS_BLACK_HOLE(event->node));
1121 if (!RtsFlags.GranFlags.DoAsyncFetch) { /* block-on-fetch */
1122 /* We count block-on-fetch as normal block time */
1123 tso->gran.blocktime += CurrentTime[proc] - tso->gran.blockedat;
1124 /* Dumping now done when processing the event
1125 No costs for contextswitch or thread queueing in this case
1126 if (RtsFlags.GranFlags.GranSimStats.Full)
1127 DumpRawGranEvent(proc, CurrentProc, GR_RESUME, tso,
1128 (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc));
1130 /* Maybe do this in FetchReply already
1131 if (procStatus[proc]==Fetching)
1132 procStatus[proc] = Busy;
1135 new_event(proc, proc, CurrentTime[proc],
1137 tso, node, (rtsSpark*)NULL);
1140 /* Asynchr comm causes additional costs here: */
1141 /* Bring the TSO from the blocked queue into the threadq */
1143 /* In all cases, the UnblockThread causes a ResumeThread to be scheduled */
1144 new_event(proc, proc,
1145 CurrentTime[proc]+RtsFlags.GranFlags.Costs.threadqueuetime,
1147 tso, node, (rtsSpark*)NULL);
1150 //@cindex do_the_fetchnode
/* Event handler for FETCHNODE events: charges message-unpack cost,
   optionally dumps a GR_FETCH profiling record, then delegates to
   handleFetchRequest. On OutOfHeap the intent is to re-queue the event
   and garbage collect, then retry (the barf() below shows the GC retry
   path is not finished). NOTE(review): elided listing; code verbatim. */
1153 do_the_fetchnode(rtsEvent* event)
1155 PEs proc = event->proc, /* proc that holds the requested node */
1156 creator = event->creator; /* proc that requested node */
1157 StgTSO* tso = event->tso;
1158 StgClosure* node = event->node; /* requested, remote node */
1159 rtsFetchReturnCode rc;
1161 ASSERT(CurrentProc==proc);
1162 /* There should be no FETCHNODEs in GrAnSim Light setup */
1163 ASSERT(!RtsFlags.GranFlags.Light);
1165 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchNode\n"));
1167 CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1169 /* ToDo: check whether this is the right place for dumping the event */
1170 if (RtsFlags.GranFlags.GranSimStats.Full)
1171 DumpRawGranEvent(creator, proc, GR_FETCH, tso, node, (StgInt)0, 0);
1174 rc = handleFetchRequest(node, proc, creator, tso);
1175 if (rc == OutOfHeap) { /* trigger GC */
1176 # if defined(GRAN_CHECK) && defined(GRAN)
1177 if (RtsFlags.GcFlags.giveStats)
1178 fprintf(RtsFlags.GcFlags.statsFile,"***** veQ boSwI' PackNearbyGraph(node %p, tso %p (%d))\n",
1179 node, tso, tso->id);
1181 barf("//// do_the_fetchnode: out of heap after handleFetchRequest; ToDo: call GarbageCollect()");
1182 prepend_event(event);
1183 GarbageCollect(GetRoots, rtsFalse);
1184 // HWL: ToDo: check whether a ContinueThread has to be issued
1185 // HWL old: ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
1186 # if 0 && defined(GRAN_CHECK) && defined(GRAN)
1187 if (RtsFlags.GcFlags.giveStats) {
1188 fprintf(RtsFlags.GcFlags.statsFile,"***** SAVE_Hp=%p, SAVE_HpLim=%p, PACK_HEAP_REQUIRED=%d\n",
1189 Hp, HpLim, 0) ; // PACK_HEAP_REQUIRED); ???
1190 fprintf(stderr,"***** No. of packets so far: %d (total size: %d)\n",
1191 globalGranStats.tot_packets, globalGranStats.tot_packet_size);
1194 event = grab_event();
1195 // Hp -= PACK_HEAP_REQUIRED; // ???
1197 /* GC knows that events are special and follows the pointer i.e. */
1198 /* events are valid even if they moved. An EXIT is triggered */
1199 /* if there is not enough heap after GC. */
1201 } while (rc == OutOfHeap);
1204 //@cindex do_the_fetchreply
/* Event handler for FETCHREPLY events: the requested node (or, with bulk
   fetching, a packed graph) has arrived at proc. Charges unpack costs,
   dumps a GR_REPLY record, then either unpacks the buffer (bulk) or
   copies/moves the single node (incremental). A "fetch miss" — the node
   moved to a third PE meanwhile — re-issues the fetch toward the new
   owner and leaves the TSO asleep.
   NOTE(review): elided listing; code kept verbatim. */
1206 do_the_fetchreply(rtsEvent* event)
1208 PEs proc = event->proc, /* proc that requested node */
1209 creator = event->creator; /* proc that holds the requested node */
1210 StgTSO* tso = event->tso;
1211 StgClosure* node = event->node; /* requested, remote node */
1212 StgClosure* closure=(StgClosure*)NULL;
1214 ASSERT(CurrentProc==proc);
1215 ASSERT(RtsFlags.GranFlags.DoAsyncFetch || procStatus[proc]==Fetching);
1217 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchReply\n"));
1218 /* There should be no FETCHREPLYs in GrAnSim Light setup */
1219 ASSERT(!RtsFlags.GranFlags.Light);
1221 /* assign message unpack costs *before* dumping the event */
1222 CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1224 /* ToDo: check whether this is the right place for dumping the event */
1225 if (RtsFlags.GranFlags.GranSimStats.Full)
1226 DumpRawGranEvent(proc, creator, GR_REPLY, tso, node,
1227 tso->gran.sparkname, spark_queue_len(proc));
1229 /* THIS SHOULD NEVER HAPPEN
1230 If tso is in the BQ of node this means that it actually entered the
1231 remote closure, due to a missing GranSimFetch at the beginning of the
1232 entry code; therefore, this is actually a faked fetch, triggered from
1233 within GranSimBlock;
1234 since tso is both in the EVQ and the BQ for node, we have to take it out
1235 of the BQ first before we can handle the FetchReply;
1236 ToDo: special cases in awakenBlockedQueue, since the BQ magically moved.
1238 if (tso->block_info.closure!=(StgClosure*)NULL) {
1240 belch("## ghuH: TSO %d (%p) in FetchReply is blocked on node %p (shouldn't happen AFAIK)",
1241 tso->id, tso, node));
1242 // unlink_from_bq(tso, node);
1245 if (RtsFlags.GranFlags.DoBulkFetching) { /* bulk (packet) fetching */
1246 rtsPackBuffer *buffer = (rtsPackBuffer*)node;
1247 nat size = buffer->size;
1249 /* NB: Fetch misses can't occur with GUM fetching, as */
1250 /* updatable closure are turned into RBHs and therefore locked */
1251 /* for other processors that try to grab them. */
1253 closure = UnpackGraph(buffer);
1254 CurrentTime[proc] += size * RtsFlags.GranFlags.Costs.munpacktime;
1255 } else // incremental fetching
1256 /* Copy or move node to CurrentProc */
1257 if (fetchNode(node, creator, proc)) {
1258 /* Fetch has failed i.e. node has been grabbed by another PE */
1259 PEs p = where_is(node);
1262 if (RtsFlags.GranFlags.GranSimStats.Global)
1263 globalGranStats.fetch_misses++;
1265 IF_GRAN_DEBUG(thunkStealing,
1266 belch("== Qu'vatlh! fetch miss @ %u: node %p is at proc %u (rather than proc %u)\n",
1267 CurrentTime[proc],node,p,creator));
1269 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
1271 /* Count fetch again !? */
1272 ++(tso->gran.fetchcount);
1273 tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime;
1275 fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) +
1276 RtsFlags.GranFlags.Costs.latency;
1278 /* Chase the grabbed node */
1279 new_event(p, proc, fetchtime,
1281 tso, node, (rtsSpark*)NULL);
1283 # if 0 && defined(GRAN_CHECK) && defined(GRAN) /* Just for testing */
1284 IF_GRAN_DEBUG(blockOnFetch,
1285 BlockedOnFetch[CurrentProc] = tso;) /*-rtsTrue;-*/
1287 IF_GRAN_DEBUG(blockOnFetch_sanity,
1288 tso->type |= FETCH_MASK_TSO;)
1291 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
1293 return; /* NB: no REPLy has been processed; tso still sleeping */
1296 /* -- Qapla'! Fetch has been successful; node is here, now */
1297 ++(event->tso->gran.fetchcount);
1298 event->tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime;
1300 /* this is now done at the beginning of this routine
1301 if (RtsFlags.GranFlags.GranSimStats.Full)
1302 DumpRawGranEvent(proc,event->creator, GR_REPLY, event->tso,
1303 (RtsFlags.GranFlags.DoBulkFetching ?
1306 tso->gran.sparkname, spark_queue_len(proc));
1309 ASSERT(OutstandingFetches[proc] > 0);
1310 --OutstandingFetches[proc];
1311 new_event(proc, proc, CurrentTime[proc],
1313 event->tso, (RtsFlags.GranFlags.DoBulkFetching ?
1319 //@cindex do_the_movethread
/* Event handler for MOVETHREAD events (thread migration, -bM only):
   a fished thread arrives at proc. Charges unpack cost, dumps a
   GR_STOLEN record, sets the TSO's location bitmask to this PE and
   inserts it into the run queue; a Fishing PE goes back to Idle.
   NOTE(review): elided listing; code kept verbatim. */
1322 do_the_movethread(rtsEvent* event) {
1323 PEs proc = event->proc, /* proc that requested node */
1324 creator = event->creator; /* proc that holds the requested node */
1325 StgTSO* tso = event->tso;
1327 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveThread\n"));
1329 ASSERT(CurrentProc==proc);
1330 /* There should be no MOVETHREADs in GrAnSim Light setup */
1331 ASSERT(!RtsFlags.GranFlags.Light);
1332 /* MOVETHREAD events should never occur without -bM */
1333 ASSERT(RtsFlags.GranFlags.DoThreadMigration);
1334 /* Bitmask of moved thread should be 0 */
1335 ASSERT(PROCS(tso)==0);
1336 ASSERT(procStatus[proc] == Fishing ||
1337 RtsFlags.GranFlags.DoAsyncFetch);
1338 ASSERT(OutstandingFishes[proc]>0);
1340 /* ToDo: exact costs for unpacking the whole TSO */
1341 CurrentTime[proc] += 5l * RtsFlags.GranFlags.Costs.munpacktime;
1343 /* ToDo: check whether this is the right place for dumping the event */
1344 if (RtsFlags.GranFlags.GranSimStats.Full)
1345 DumpRawGranEvent(proc, creator,
1346 GR_STOLEN, tso, (StgClosure*)NULL, (StgInt)0, 0);
1348 // ToDo: check cost functions
1349 --OutstandingFishes[proc];
1350 SET_GRAN_HDR(tso, ThisPE); // adjust the bitmask for the TSO
1351 insertThread(tso, proc);
1353 if (procStatus[proc]==Fishing)
1354 procStatus[proc] = Idle;
1356 if (RtsFlags.GranFlags.GranSimStats.Global)
1357 globalGranStats.tot_TSOs_migrated++;
1360 //@cindex do_the_movespark
/* Event handler for MOVESPARK events: a stolen spark arrives at proc.
   Charges unpack cost, optionally records the movement in the spark
   profile, adds the spark to the local spark queue and turns a Fishing
   PE back to Idle (handleIdlePEs will later create a FindWork for it).
   NOTE(review): elided listing; code kept verbatim. */
1363 do_the_movespark(rtsEvent* event) {
1364 PEs proc = event->proc, /* proc that requested spark */
1365 creator = event->creator; /* proc that holds the requested spark */
1366 StgTSO* tso = event->tso;
1367 rtsSparkQ spark = event->spark;
1369 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveSpark\n"))
1371 ASSERT(CurrentProc==proc);
1372 ASSERT(spark!=NULL);
1373 ASSERT(procStatus[proc] == Fishing ||
1374 RtsFlags.GranFlags.DoAsyncFetch);
1375 ASSERT(OutstandingFishes[proc]>0);
1377 CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1379 /* record movement of spark only if spark profiling is turned on */
1380 if (RtsFlags.GranFlags.GranSimStats.Sparks)
1381 DumpRawGranEvent(proc, creator,
1383 tso, spark->node, spark->name, spark_queue_len(proc));
1385 /* global statistics */
1386 if ( RtsFlags.GranFlags.GranSimStats.Global &&
1387 !closure_SHOULD_SPARK(spark->node))
1388 globalGranStats.withered_sparks++;
1389 /* Not adding the spark to the spark queue would be the right */
1390 /* thing here, but it also would be cheating, as this info can't be */
1391 /* available in a real system. -- HWL */
1393 --OutstandingFishes[proc];
1395 add_to_spark_queue(spark);
1397 IF_GRAN_DEBUG(randomSteal, // ToDo: spark-distribution flag
1398 print_sparkq_stats());
1400 /* Should we treat stolen sparks specially? Currently, we don't. */
1402 if (procStatus[proc]==Fishing)
1403 procStatus[proc] = Idle;
1405 /* add_to_spark_queue will increase the time of the current proc. */
1407 If proc was fishing, it is Idle now with the new spark in its spark
1408 pool. This means that the next time handleIdlePEs is called, a local
1409 FindWork will be created on this PE to turn the spark into a thread. Of
1410 course another PE might steal the spark in the meantime (that's why we
1411 are using events rather than inlining all the operations in the first
1416 In the Constellation class version of GranSim the semantics of StarThread
1417 events has changed. Now, StartThread has to perform 3 basic operations:
1418 - create a new thread (previously this was done in ActivateSpark);
1419 - insert the thread into the run queue of the current processor
1420 - generate a new event for actually running the new thread
1421 Note that the insertThread is called via createThread.
1424 //@cindex do_the_startthread
/* Event handler for StartThread and ResumeThread events.
   StartThread: creates a thread for the spark (createThread also inserts
   it into the run queue) and charges thread-creation cost.
   ResumeThread: re-inserts an existing TSO into the run queue.
   Both then mark the PE Busy and schedule the event that actually runs
   the thread; statistics are gathered at the end.
   NOTE(review): elided listing; code kept verbatim. */
1427 do_the_startthread(rtsEvent *event)
1429 PEs proc = event->proc; /* proc that requested node */
1430 StgTSO *tso = event->tso; /* tso that requested node */
1431 StgClosure *node = event->node; /* requested, remote node */
1432 rtsSpark *spark = event->spark;
1433 GranEventType gr_evttype;
1435 ASSERT(CurrentProc==proc);
1436 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1437 ASSERT(event->evttype == ResumeThread || event->evttype == StartThread);
1438 /* if this was called via StartThread: */
1439 ASSERT(event->evttype!=StartThread || tso == END_TSO_QUEUE); // not yet created
1440 // ToDo: check: ASSERT(event->evttype!=StartThread || procStatus[proc]==Starting);
1441 /* if this was called via ResumeThread: */
1442 ASSERT(event->evttype!=ResumeThread ||
1443 RtsFlags.GranFlags.DoAsyncFetch ||!is_on_queue(tso,proc));
1445 /* startThread may have been called from the main event handler upon
1446 finding either a ResumeThread or a StartThread event; set the
1447 gr_evttype (needed for writing to .gr file) accordingly */
1448 // gr_evttype = (event->evttype == ResumeThread) ? GR_RESUME : GR_START;
1450 if ( event->evttype == StartThread ) {
1451 GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ?
1452 GR_START : GR_STARTQ;
1454 tso = createThread(BLOCK_SIZE_W, spark->gran_info);// implicit insertThread!
1455 pushClosure(tso, node);
1457 // ToDo: fwd info on local/global spark to thread -- HWL
1458 // tso->gran.exported = spark->exported;
1459 // tso->gran.locked = !spark->global;
1460 tso->gran.sparkname = spark->name;
1462 ASSERT(CurrentProc==proc);
1463 if (RtsFlags.GranFlags.GranSimStats.Full)
1464 DumpGranEvent(gr_evttype,tso);
1466 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
1467 } else { // event->evttype == ResumeThread
1468 GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ?
1469 GR_RESUME : GR_RESUMEQ;
1471 insertThread(tso, proc);
1473 ASSERT(CurrentProc==proc);
1474 if (RtsFlags.GranFlags.GranSimStats.Full)
1475 DumpGranEvent(gr_evttype,tso);
1478 ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE); // non-empty run queue
1479 procStatus[proc] = Busy;
1480 /* make sure that this thread is actually run */
1481 new_event(proc, proc,
1484 tso, node, (rtsSpark*)NULL);
1486 /* A wee bit of statistics gathering */
1487 if (RtsFlags.GranFlags.GranSimStats.Global) {
1488 globalGranStats.tot_add_threads++;
1489 globalGranStats.tot_tq_len += thread_queue_len(CurrentProc);
1494 //@cindex do_the_findwork
/* Event handler for FINDWORK events: search the local spark pool for a
   runnable spark. If one is found it is activated (turned into a thread)
   and removed from the queue. If none is found, a local FindWork makes
   the PE Idle (so stealing is triggered next time handleIdlePEs runs),
   while a remote FindWork (a GUM-style fish, creator != proc) is
   forwarded by stealing on behalf of the creator.
   NOTE(review): elided listing (GC guard and several braces missing);
   code kept verbatim. */
1496 do_the_findwork(rtsEvent* event)
1498 PEs proc = event->proc, /* proc to search for work */
1499 creator = event->creator; /* proc that requested work */
1500 rtsSparkQ spark = event->spark;
1501 /* ToDo: check that this size is safe -- HWL */
1503 ToDo: check available heap
1505 nat req_heap = sizeofW(StgTSO) + MIN_STACK_WORDS;
1506 // add this? -- HWL:RtsFlags.ConcFlags.stkChunkSize;
1509 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the Findwork\n"));
1511 /* If GUM style fishing is enabled, the contents of the spark field says
1512 what to steal (spark(1) or thread(2)); */
1513 ASSERT(!(RtsFlags.GranFlags.Fishing && event->spark==(rtsSpark*)0));
1515 /* Make sure that we have enough heap for creating a new
1516 thread. This is a conservative estimate of the required heap.
1517 This eliminates special checks for GC around NewThread within
1521 ToDo: check available heap
1523 if (Hp + req_heap > HpLim ) {
1525 belch("GC: Doing GC from within Findwork handling (that's bloody dangerous if you ask me)");)
1526 GarbageCollect(GetRoots);
1527 // ReallyPerformThreadGC(req_heap, rtsFalse); old -- HWL
1529 if (procStatus[CurrentProc]==Sparking)
1530 procStatus[CurrentProc]=Idle;
1535 if ( RtsFlags.GranFlags.DoAlwaysCreateThreads ||
1536 RtsFlags.GranFlags.Fishing ||
1537 ((procStatus[proc]==Idle || procStatus[proc]==Sparking) &&
1538 (RtsFlags.GranFlags.FetchStrategy >= 2 ||
1539 OutstandingFetches[proc] == 0)) )
1542 rtsSparkQ prev, spark;
1545 ASSERT(procStatus[proc]==Sparking ||
1546 RtsFlags.GranFlags.DoAlwaysCreateThreads ||
1547 RtsFlags.GranFlags.Fishing);
1549 /* SImmoHwI' yInej! Search spark queue! */
1550 /* gimme_spark (event, &found, &spark); */
1551 findLocalSpark(event, &found, &spark);
1553 if (!found) { /* pagh vumwI' */
1555 If no spark has been found this can mean 2 things:
1556 1/ The FindWork was a fish (i.e. a message sent by another PE) and
1557 the spark pool of the receiver is empty
1558 --> the fish has to be forwarded to another PE
1559 2/ The FindWork was local to this PE (i.e. no communication; in this
1560 case creator==proc) and the spark pool of the PE is not empty
1561 contains only sparks of closures that should not be sparked
1562 (note: if the spark pool were empty, handleIdlePEs wouldn't have
1563 generated a FindWork in the first place)
1564 --> the PE has to be made idle to trigger stealing sparks the next
1565 time handleIdlePEs is performed
1568 ASSERT(pending_sparks_hds[proc]==(rtsSpark*)NULL);
1569 if (creator==proc) {
1570 /* local FindWork */
1571 if (procStatus[proc]==Busy) {
1572 belch("ghuH: PE %d in Busy state while processing local FindWork (spark pool is empty!) @ %lx",
1573 proc, CurrentTime[proc]);
1574 procStatus[proc] = Idle;
1577 /* global FindWork i.e. a Fish */
1578 ASSERT(RtsFlags.GranFlags.Fishing);
1579 /* actually this generates another request from the originating PE */
1580 ASSERT(OutstandingFishes[creator]>0);
1581 OutstandingFishes[creator]--;
1582 /* ToDo: assign costs for sending fish to proc not to creator */
1583 stealSpark(creator); /* might steal from same PE; ToDo: fix */
1584 ASSERT(RtsFlags.GranFlags.maxFishes!=1 || procStatus[creator] == Fishing);
1585 /* any assertions on state of proc possible here? */
1588 /* DaH chu' Qu' yIchen! Now create new work! */
1589 IF_GRAN_DEBUG(findWork,
1590 belch("+- munching spark %p; creating thread for node %p",
1591 spark, spark->node));
1592 activateSpark (event, spark);
1593 ASSERT(spark != (rtsSpark*)NULL);
1594 spark = delete_from_sparkq (spark, proc, rtsTrue);
1597 IF_GRAN_DEBUG(findWork,
1598 belch("+- Contents of spark queues at the end of FindWork @ %lx",
1600 print_sparkq_stats());
1602 /* ToDo: check ; not valid if GC occurs in ActivateSpark */
1604 /* forward fish or */
1606 /* local spark or */
1607 (proc==creator && procStatus[proc]==Starting)) ||
1608 //(!found && procStatus[proc]==Idle) ||
1609 RtsFlags.GranFlags.DoAlwaysCreateThreads);
1611 IF_GRAN_DEBUG(findWork,
1612 belch("+- RTS refuses to findWork on PE %d @ %lx",
1613 proc, CurrentTime[proc]);
1614 belch(" procStatus[%d]=%s, fetch strategy=%d, outstanding fetches[%d]=%d",
1615 proc, proc_status_names[procStatus[proc]],
1616 RtsFlags.GranFlags.FetchStrategy,
1617 proc, OutstandingFetches[proc]));
1621 //@node GranSimLight routines, Code for Fetching Nodes, GranSim functions, GranSim specific code
1622 //@subsection GranSimLight routines
1625 This code is called from the central scheduler after having grabbed a
1626 new event and is only needed for GranSim-Light. It mainly adjusts the
1627 ActiveTSO so that all costs that have to be assigned from within the
1628 scheduler are assigned to the right TSO. The choice of ActiveTSO depends
1629 on the type of event that has been found.
/* GranSim-Light only: called by the scheduler on entering the "system"
   (event-handling) area. Saves the clock of the previously active TSO,
   picks the TSO whose virtual processor the new event belongs to (so
   subsequent costs are charged to the right thread), and restores the
   global clock from that TSO. K&R-style definition; the first parameter
   (event) is declared on elided lines. NOTE(review): elided listing;
   code kept verbatim. */
1633 GranSimLight_enter_system(event, ActiveTSOp)
1635 StgTSO **ActiveTSOp;
1637 StgTSO *ActiveTSO = *ActiveTSOp;
1639 ASSERT (RtsFlags.GranFlags.Light);
1641 /* Restore local clock of the virtual processor attached to CurrentTSO.
1642 All costs will be associated to the `virt. proc' on which the tso
1644 if (ActiveTSO != NULL) { /* already in system area */
1645 ActiveTSO->gran.clock = CurrentTime[CurrentProc];
1646 if (RtsFlags.GranFlags.DoFairSchedule)
1648 if (RtsFlags.GranFlags.GranSimStats.Full &&
1649 RtsFlags.GranFlags.Debug.checkLight)
1650 DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
1653 switch (event->evttype)
1655 case ContinueThread:
1656 case FindWork: /* inaccurate this way */
1657 ActiveTSO = run_queue_hd;
1661 case MoveSpark: /* has tso of virt proc in tso field of event */
1662 ActiveTSO = event->tso;
1664 default: barf("Illegal event type %s (%d) in GrAnSim Light setup\n",
1665 event_names[event->evttype],event->evttype);
1667 CurrentTime[CurrentProc] = ActiveTSO->gran.clock;
1668 if (RtsFlags.GranFlags.DoFairSchedule) {
1669 if (RtsFlags.GranFlags.GranSimStats.Full &&
1670 RtsFlags.GranFlags.Debug.checkLight)
1671 DumpGranEvent(GR_SYSTEM_START,ActiveTSO);
/* GranSim-Light only: counterpart of enter_system, called when the
   scheduler leaves the event-handling area. Saves the active TSO's
   clock, clears ActiveTSO, and restores the global clock from
   CurrentTSO; under fair scheduling it may also schedule a forced
   ContinueThread to preserve round-robin behaviour.
   NOTE(review): elided listing; code kept verbatim. */
1676 GranSimLight_leave_system(event, ActiveTSOp)
1678 StgTSO **ActiveTSOp;
1680 StgTSO *ActiveTSO = *ActiveTSOp;
1682 ASSERT(RtsFlags.GranFlags.Light);
1684 /* Save time of `virt. proc' which was active since last getevent and
1685 restore time of `virt. proc' where CurrentTSO is living on. */
1686 if(RtsFlags.GranFlags.DoFairSchedule) {
1687 if (RtsFlags.GranFlags.GranSimStats.Full &&
1688 RtsFlags.GranFlags.Debug.checkLight) // ToDo: clean up flags
1689 DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
1691 ActiveTSO->gran.clock = CurrentTime[CurrentProc];
1692 ActiveTSO = (StgTSO*)NULL;
1693 CurrentTime[CurrentProc] = CurrentTSO->gran.clock;
1694 if (RtsFlags.GranFlags.DoFairSchedule /* && resched */ ) {
1695 // resched = rtsFalse;
1696 if (RtsFlags.GranFlags.GranSimStats.Full &&
1697 RtsFlags.GranFlags.Debug.checkLight)
1698 DumpGranEvent(GR_SCHEDULE,run_queue_hd);
1701 if (TSO_LINK(ThreadQueueHd)!=PrelBase_Z91Z93_closure &&
1702 (TimeOfNextEvent == 0 ||
1703 TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000<TimeOfNextEvent)) {
1704 new_event(CurrentProc,CurrentProc,TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000,
1705 CONTINUETHREAD,TSO_LINK(ThreadQueueHd),PrelBase_Z91Z93_closure,NULL);
1706 TimeOfNextEvent = get_time_of_next_event();
1711 //@node Code for Fetching Nodes, Idle PEs, GranSimLight routines, GranSim specific code
1712 //@subsection Code for Fetching Nodes
1715 The following GrAnSim routines simulate the fetching of nodes from a
1716 remote processor. We use a 1 word bitmask to indicate on which processor
1717 a node is lying. Thus, moving or copying a node from one processor to
1718 another just requires an appropriate change in this bitmask (using
1719 @SET_GA@). Additionally, the clocks have to be updated.
1721 A special case arises when the node that is needed by processor A has
1722 been moved from a processor B to a processor C between sending out a
1723 @FETCH@ (from A) and its arrival at B. In that case the @FETCH@ has to
1724 be forwarded to C. This is simulated by issuing another FetchNode event
1725 on processor C with A as creator.
1728 /* ngoqvam che' {GrAnSim}! */
1730 /* Fetch node "node" to processor "p" */
/* Incremental (single-closure) fetch: moves or copies `node` from PE
   `from` to PE `to` by adjusting the 1-word location bitmask. HNF nodes
   are copied (bit added); thunks are moved (bitmask replaced). Returns
   NodeHasMoved when the node is no longer on either PE. Never used with
   bulk fetching (UnpackGraph handles that case). K&R-style definition;
   parameter declarations are on elided lines.
   NOTE(review): elided listing; code kept verbatim. */
1735 fetchNode(node,from,to)
1739 /* In case of RtsFlags.GranFlags.DoBulkFetching this fct should never be
1740 entered! Instead, UnpackGraph is used in ReSchedule */
1741 StgClosure* closure;
1743 ASSERT(to==CurrentProc);
1744 /* Should never be entered in GrAnSim Light setup */
1745 ASSERT(!RtsFlags.GranFlags.Light);
1746 /* fetchNode should never be entered with DoBulkFetching */
1747 ASSERT(!RtsFlags.GranFlags.DoBulkFetching);
1749 /* Now fetch the node */
1750 if (!IS_LOCAL_TO(PROCS(node),from) &&
1751 !IS_LOCAL_TO(PROCS(node),to) )
1752 return NodeHasMoved;
1754 if (closure_HNF(node)) /* node already in head normal form? */
1755 node->header.gran.procs |= PE_NUMBER(to); /* Copy node */
1757 node->header.gran.procs = PE_NUMBER(to); /* Move node */
1763 Process a fetch request.
1765 Cost of sending a packet of size n = C + P*n
1766 where C = packet construction constant,
1767 P = cost of packing one word into a packet
1768 [Should also account for multiple packets].
1771 //@cindex handleFetchRequest
/* Services a fetch request for `node`, sent by PE `from` and processed
   on PE `to`, on behalf of `tso`. Three outcomes:
   - node already local to `from`: reply immediately (NodeIsLocal);
   - node still on `to`: pack it (bulk) or send it (incremental) via a
     FETCHREPLY; a black hole instead schedules a GLOBALBLOCK;
   - node moved elsewhere: forward the FETCHNODE to the new owner
     (NodeHasMoved). May return OutOfHeap if graph packing fails.
   K&R-style definition. NOTE(review): elided listing; code verbatim. */
1774 handleFetchRequest(node,to,from,tso)
1775 StgClosure* node; // the node which is requested
1776 PEs to, from; // fetch request: from -> to
1777 StgTSO* tso; // the tso which needs the node
1779 ASSERT(!RtsFlags.GranFlags.Light);
1780 /* ToDo: check assertion */
1781 ASSERT(OutstandingFetches[from]>0);
1783 /* probably wrong place; */
1784 ASSERT(CurrentProc==to);
1786 if (IS_LOCAL_TO(PROCS(node), from)) /* Somebody else moved node already => */
1788 IF_GRAN_DEBUG(thunkStealing,
1789 fprintf(stderr,"ghuH: handleFetchRequest entered with local node %p (%s) (PE %d)\n",
1790 node, info_type(node), from));
1792 if (RtsFlags.GranFlags.DoBulkFetching) {
1794 rtsPackBuffer *graph;
1796 /* Create a 1-node-buffer and schedule a FETCHREPLY now */
1797 graph = PackOneNode(node, tso, &size);
1798 new_event(from, to, CurrentTime[to],
1800 tso, (StgClosure *)graph, (rtsSpark*)NULL);
1802 new_event(from, to, CurrentTime[to],
1804 tso, node, (rtsSpark*)NULL);
1806 IF_GRAN_DEBUG(thunkStealing,
1807 belch("== majQa'! closure %p is local on PE %d already (this is a good thing)", node, from));
1808 return (NodeIsLocal);
1810 else if (IS_LOCAL_TO(PROCS(node), to) ) /* Is node still here? */
1812 if (RtsFlags.GranFlags.DoBulkFetching) { /* {GUM}vo' ngoqvam vInIHta' */
1813 nat size; /* (code from GUM) */
1816 if (IS_BLACK_HOLE(node)) { /* block on BH or RBH */
1817 new_event(from, to, CurrentTime[to],
1819 tso, node, (rtsSpark*)NULL);
1820 /* Note: blockFetch is done when handling GLOBALBLOCK event;
1821 make sure the TSO stays out of the run queue */
1822 /* When this thread is reawoken it does the usual: it tries to
1823 enter the updated node and issues a fetch if it's remote.
1824 It has forgotten that it has sent a fetch already (i.e. a
1825 FETCHNODE is swallowed by a BH, leaving the thread in a BQ) */
1826 --OutstandingFetches[from];
1828 IF_GRAN_DEBUG(thunkStealing,
1829 belch("== majQa'! closure %p on PE %d is a BH (demander=PE %d); faking a FMBQ",
1831 if (RtsFlags.GranFlags.GranSimStats.Global) {
1832 globalGranStats.tot_FMBQs++;
1837 /* The tso requesting the node is blocked and cannot be on a run queue */
1838 ASSERT(!is_on_queue(tso, from));
1840 // ToDo: check whether graph is ever used as an rtsPackBuffer!!
1841 if ((graph = (StgClosure *)PackNearbyGraph(node, tso, &size, 0)) == NULL)
1842 return (OutOfHeap); /* out of heap */
1844 /* Actual moving/copying of node is done on arrival; see FETCHREPLY */
1845 /* Send a reply to the originator */
1846 /* ToDo: Replace that by software costs for doing graph packing! */
1847 CurrentTime[to] += size * RtsFlags.GranFlags.Costs.mpacktime;
1850 CurrentTime[to]+RtsFlags.GranFlags.Costs.latency,
1852 tso, (StgClosure *)graph, (rtsSpark*)NULL);
1854 CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1856 } else { /* incremental (single closure) fetching */
1857 /* Actual moving/copying of node is done on arrival; see FETCHREPLY */
1858 /* Send a reply to the originator */
1859 CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime;
1862 CurrentTime[to]+RtsFlags.GranFlags.Costs.latency,
1864 tso, node, (rtsSpark*)NULL);
1866 CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1870 else /* Qu'vatlh! node has been grabbed by another proc => forward */
1872 PEs node_loc = where_is(node);
1875 IF_GRAN_DEBUG(thunkStealing,
1876 belch("== Qu'vatlh! node %p has been grabbed by PE %d from PE %d (demander=%d) @ %d\n",
1877 node,node_loc,to,from,CurrentTime[to]));
1878 if (RtsFlags.GranFlags.GranSimStats.Global) {
1879 globalGranStats.fetch_misses++;
1882 /* Prepare FORWARD message to proc p_new */
1883 CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime;
1885 fetchtime = stg_max(CurrentTime[to], CurrentTime[node_loc]) +
1886 RtsFlags.GranFlags.Costs.latency;
1888 new_event(node_loc, from, fetchtime,
1890 tso, node, (rtsSpark*)NULL);
1892 CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1894 return (NodeHasMoved);
1899 blockFetch blocks a BlockedFetch node on some kind of black hole.
1901 Taken from gum/HLComms.lc. [find a better place for that ?] -- HWL
1903 {\bf Note:} In GranSim we don't have @FETCHME@ nodes and therefore don't
1904 create @FMBQ@'s (FetchMe blocking queues) to cope with global
1905 blocking. Instead, non-local TSO are put into the BQ in the same way as
1906 local TSOs. However, we have to check if a TSO is local or global in
1907 order to account for the latencies involved and for keeping track of the
1908 number of fetches that are really going on.
1911 //@cindex blockFetch
/* Blocks `tso` (which was running on PE `proc`) on closure `bh`.
   If bh is no longer a black hole the TSO is immediately unblocked via
   a new event (returns NodeIsNoBH). Otherwise the TSO is appended to
   bh's blocking queue, turning a plain BLACKHOLE into a BLACKHOLE_BQ
   first; GranSimBlock records the blocking for statistics. Adapted from
   gum/HLComms.lc; K&R-style definition.
   NOTE(review): elided listing; code kept verbatim. */
1914 blockFetch(tso, proc, bh)
1915 StgTSO* tso; /* TSO which gets blocked */
1916 PEs proc; /* PE where that tso was running */
1917 StgClosure* bh; /* closure to block on (BH, RBH, BQ) */
1922 fprintf(stderr,"## blockFetch: blocking TSO %p (%d)[PE %d] on node %p (%s) [PE %d]. No graph is packed!\n",
1923 tso, tso->id, proc, bh, info_type(bh), where_is(bh)));
1925 if (!IS_BLACK_HOLE(bh)) { /* catches BHs and RBHs */
1927 fprintf(stderr,"## blockFetch: node %p (%s) is not a BH => awakening TSO %p (%d) [PE %u]\n",
1928 bh, info_type(bh), tso, tso->id, proc));
1930 /* No BH anymore => immediately unblock tso */
1931 new_event(proc, proc, CurrentTime[proc],
1933 tso, bh, (rtsSpark*)NULL);
1935 /* Is this always a REPLY to a FETCH in the profile ? */
1936 if (RtsFlags.GranFlags.GranSimStats.Full)
1937 DumpRawGranEvent(proc, proc, GR_REPLY, tso, bh, (StgInt)0, 0);
1938 return (NodeIsNoBH);
1941 /* DaH {BQ}Daq Qu' Suq 'e' wISov!
1942 Now we know that we have to put the tso into the BQ.
1943 2 cases: If block-on-fetch, tso is at head of threadq =>
1944 => take it out of threadq and into BQ
1945 If reschedule-on-fetch, tso is only pointed to be event
1946 => just put it into BQ
1949 if (!RtsFlags.GranFlags.DoAsyncFetch) {
1950 GranSimBlock(tso, proc, bh);
1952 if (RtsFlags.GranFlags.GranSimStats.Full)
1953 DumpRawGranEvent(proc, where_is(bh), GR_BLOCK, tso, bh, (StgInt)0, 0);
1954 ++(tso->gran.blockcount);
1955 tso->gran.blockedat = CurrentTime[proc];
1959 /* after scheduling the GlobalBlock event the TSO is not put into the
1960 run queue again; it is only pointed to via the event we are
1961 processing now; in GranSim 4.xx there is no difference between
1962 synchr and asynchr comm here */
1963 ASSERT(!is_on_queue(tso, proc));
1964 ASSERT(tso->link == END_TSO_QUEUE);
1966 GranSimBlock(tso, proc, bh); /* GranSim statistics gathering */
1968 /* Now, put tso into BQ (similar to blocking entry codes) */
1969 info = get_itbl(bh);
1970 switch (info -> type) {
1973 case CAF_BLACKHOLE: // ToDo: check whether this is a possibly ITBL here
1974 case SE_BLACKHOLE: // ToDo: check whether this is a possibly ITBL here
1975 case SE_CAF_BLACKHOLE:// ToDo: check whether this is a possibly ITBL here
1976 /* basically an inlined version of BLACKHOLE_entry -- HWL */
1977 /* Change the BLACKHOLE into a BLACKHOLE_BQ */
1978 ((StgBlockingQueue *)bh)->header.info = &BLACKHOLE_BQ_info;
1979 /* Put ourselves on the blocking queue for this black hole */
1980 // tso->link=END_TSO_QUEUE; not necessary; see assertion above
1981 ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)tso;
1982 tso->block_info.closure = bh;
1983 recordMutable((StgMutClosure *)bh);
1987 /* basically an inlined version of BLACKHOLE_BQ_entry -- HWL */
1988 tso->link = (StgTSO *) (((StgBlockingQueue*)bh)->blocking_queue);
1989 ((StgBlockingQueue*)bh)->blocking_queue = (StgBlockingQueueElement *)tso;
1990 recordMutable((StgMutClosure *)bh);
1992 # if 0 && defined(GC_MUT_REQUIRED)
1993 ToDo: check whether recordMutable is necessary -- HWL
1995 * If we modify a black hole in the old generation, we have to make
1996 * sure it goes on the mutables list
1999 if (bh <= StorageMgrInfo.OldLim) {
2000 MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
2001 StorageMgrInfo.OldMutables = bh;
2003 MUT_LINK(bh) = MUT_NOT_LINKED;
2008 barf("Qagh: FMBQ closure (%p) found in GrAnSim (TSO=%p (%d))\n",
2014 barf("Qagh: thought %p was a black hole (IP %p (%s))",
2015 bh, info, info_type(bh));
2022 //@node Idle PEs, Routines directly called from Haskell world, Code for Fetching Nodes, GranSim specific code
2023 //@subsection Idle PEs
2026 Export work to idle PEs. This function is called from @ReSchedule@
2027 before dispatching on the current event. @HandleIdlePEs@ iterates over
2028   all PEs, trying to get work for idle PEs. Note that this is a
2029 simplification compared to GUM's fishing model. We try to compensate for
2030 that by making the cost for stealing work dependent on the number of
2031 idle processors and thereby on the probability with which a randomly
2032 sent fish would find work.
2035 //@cindex handleIdlePEs
/* handleIdlePEs: called from ReSchedule before dispatching on the current
   event (see prose above).  For every Idle PE it first looks for local work
   in that PE's spark pool; failing that, it tries to obtain remote work,
   optionally preferring thread stealing (DoStealThreadsFirst), bounded by
   maxFishes outstanding steal attempts per PE.
   NOTE(review): this is an extract -- the function header and several body
   lines (e.g. the event-type argument of new_event and the actual steal
   calls) are missing here; comments below describe only the visible code. */
2042   IF_DEBUG(gran, fprintf(stderr, "GRAN: handling Idle PEs\n"))
2044   /* Should never be entered in GrAnSim Light setup */
2045   ASSERT(!RtsFlags.GranFlags.Light);
2047   /* Could check whether there are idle PEs if it's a cheap check */
2048   for (p = 0; p < RtsFlags.GranFlags.proc; p++)
2049     if (procStatus[p]==Idle) /* && IS_SPARKING(p) && IS_STARTING(p) */
2050       /* First look for local work i.e. examine local spark pool! */
2051       if (pending_sparks_hds[p]!=(rtsSpark *)NULL) {
 /* local sparks available: schedule an event on p itself and mark it Sparking */
2052 	new_event(p, p, CurrentTime[p],
2054 		  (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
2055 	procStatus[p] = Sparking;
 /* no local work: only fish if we are under the maxFishes limit (0 = unlimited) */
2056       } else if ((RtsFlags.GranFlags.maxFishes==0 ||
2057 		 OutstandingFishes[p]<RtsFlags.GranFlags.maxFishes) ) {
2059 	/* If no local work then try to get remote work!
2060 	   Qu' Hopbe' pagh tu'lu'pu'chugh Qu' Hop yISuq ! */
2061 	if (RtsFlags.GranFlags.DoStealThreadsFirst &&
2062 	    (RtsFlags.GranFlags.FetchStrategy >= 4 || OutstandingFetches[p] == 0))
2064 	    if (SurplusThreads > 0l)                    /* Steal a thread */
 /* a successful steal changes procStatus[p]; re-check before trying sparks */
2067 	    if (procStatus[p]!=Idle)
2071 	if (SparksAvail > 0 &&
2072 	    (RtsFlags.GranFlags.FetchStrategy >= 3 || OutstandingFetches[p] == 0)) /* Steal a spark */
2075 	if (SurplusThreads > 0 &&
2076 	    (RtsFlags.GranFlags.FetchStrategy >= 4 || OutstandingFetches[p] == 0)) /* Steal a thread */
2082 Steal a spark and schedule moving it to proc. We want to look at PEs in
2083   clock order -- the PE whose clock lags furthest behind first. Currently sparks are only stolen
2084 from the @ADVISORY_POOL@ never from the @REQUIRED_POOL@. Eventually,
2085 this should be changed to first steal from the former then from the
2088 We model a sort of fishing mechanism by counting the number of sparks
2089 and threads we are currently stealing. */
2092    Return a random nat value in the interval [from, to)
/* natRandom: uniform-ish random nat in [from, to); degenerate case from==to
   returns from.  Built on libc random().
   NOTE(review): extract -- the signature and surrounding lines are missing. */
2102   /* random returns a value in [0, RAND_MAX] */
2103   r = (nat) ((float)from + ((float)random()*(float)d)/(float)RAND_MAX);
 /* random() may hit RAND_MAX exactly, mapping to `to`; wrap that back to `from` */
2104   r = (r==to) ? from : r;
2105   ASSERT(from<=r && (r<to || from==to));
2110    Find any PE other than proc. Used for GUM style fishing only.
/* findRandomPE: pick a PE to send a fish to; only meaningful when the
   Fishing model is enabled (asserted below).
   NOTE(review): extract -- the non-RandomSteal branch and the return are
   missing from this view. */
2118   ASSERT(RtsFlags.GranFlags.Fishing);
2119   if (RtsFlags.GranFlags.RandomSteal) {
2120     p = natRandom(0,RtsFlags.GranFlags.proc);  /* full range of PEs */
2124   IF_GRAN_DEBUG(randomSteal,
2125 		belch("^^ RANDOM_STEAL (fishing): stealing from PE %d (current proc is %d)",
2132    Magic code for stealing sparks/threads makes use of global knowledge on
/* sortPEsByTime: build, in pes_by_time, the list of PEs (other than proc)
   that have a non-empty spark pool and whose clock is not ahead of the
   current PE, sorted by ascending CurrentTime; *np gets the count and
   *firstp the index of the first PE whose time exceeds CurrentTime[proc].
   Used by the "magic" (non-fishing) stealing routines.
   NOTE(review): extract -- the RandomSteal rotation code and the loop that
   computes `first` are partly missing from this view. */
2136 sortPEsByTime (proc, pes_by_time, firstp, np)
2141   PEs p, temp, n, i, j;
2142   nat first, upb, r=0, q=0;
2144   ASSERT(!RtsFlags.GranFlags.Fishing);
2147   upb = RtsFlags.GranFlags.proc;            /* full range of PEs */
2149   if (RtsFlags.GranFlags.RandomSteal) {
2150     r = natRandom(0,RtsFlags.GranFlags.proc);  /* full range of PEs */
2156   /* pes_by_time shall contain processors from which we may steal sparks */
2157   for(n=0, p=0; p < RtsFlags.GranFlags.proc; ++p)
2158     if ((proc != p) &&                       // not the current proc
2159         (pending_sparks_hds[p] != (rtsSpark *)NULL) && // non-empty spark pool
2160         (CurrentTime[p] <= CurrentTime[CurrentProc]))
2161       pes_by_time[n++] = p;
 /* simple O(n^2) selection-style sort; n is bounded by the PE count */
2163   /* sort pes_by_time */
2164   for(i=0; i < n; ++i)
2165     for(j=i+1; j < n; ++j)
2166       if (CurrentTime[pes_by_time[i]] > CurrentTime[pes_by_time[j]]) {
2167 	rtsTime temp = pes_by_time[i];
2168 	pes_by_time[i] = pes_by_time[j];
2169 	pes_by_time[j] = temp;
2172   /* Choose random processor to steal spark from; first look at processors */
2173   /* that are earlier than the current one (i.e. proc) */
2175        (first < n) && (CurrentTime[pes_by_time[first]] <= CurrentTime[proc]);
2179   /* if the assertion below is true we can get rid of first */
2180   /* ASSERT(first==n); */
2181   /* ToDo: check if first is really needed; find cleaner solution */
2188 Steal a spark (piece of work) from any processor and bring it to proc.
2190 //@cindex stealSpark
2192 stealSpark(PEs proc) { stealSomething(proc, rtsTrue, rtsFalse); }
2195 Steal a thread from any processor and bring it to proc i.e. thread migration
2197 //@cindex stealThread
2199 stealThread(PEs proc) { stealSomething(proc, rtsFalse, rtsTrue); }
2202 Steal a spark or a thread and schedule moving it to proc.
2204 //@cindex stealSomething
/* stealSomething: steal a spark and/or a thread and schedule moving it to
   proc.  In the non-Fishing ("magic") setup it delegates to
   stealSparkMagic/stealThreadMagic using global knowledge; otherwise it
   models GUM-style fishing by sending a Fish message (an event) to a random
   PE and counting it in OutstandingFishes.
   NOTE(review): extract -- several lines (e.g. the `if (steal_spark)`
   scaffolding around lines 2223-2233 and the final return) are missing. */
2206 stealSomething(proc, steal_spark, steal_thread)
2207      PEs proc;           // PE that needs work (stealer)
2208      rtsBool steal_spark, steal_thread; // should a spark and/or thread be stolen
2211   rtsTime fish_arrival_time;
2212   rtsSpark *spark, *prev, *next;
2213   rtsBool stolen = rtsFalse;
2215   ASSERT(steal_spark || steal_thread);
2217   /* Should never be entered in GrAnSim Light setup */
2218   ASSERT(!RtsFlags.GranFlags.Light);
2219   ASSERT(!steal_thread || RtsFlags.GranFlags.DoThreadMigration);
 /* "magic" path: use global information, no fishing messages */
2221   if (!RtsFlags.GranFlags.Fishing) {
2222     // ToDo: check if stealing threads is prefered over stealing sparks
2224       if (stealSparkMagic(proc))
2226       else // no spark found
2228 	  return stealThreadMagic(proc);
2229         else // no thread found
2231     } else { // ASSERT(steal_thread);
2232       return stealThreadMagic(proc);
2234     barf("stealSomething: never reached");
2237   /* The rest of this function does GUM style fishing */
2239   p = findRandomPE(proc); /* find a random PE other than proc */
2241   /* Message packing costs for sending a Fish; qeq jabbI'ID */
2242   CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
2244   /* use another GranEvent for requesting a thread? */
2245   if (steal_spark && RtsFlags.GranFlags.GranSimStats.Sparks)
2246     DumpRawGranEvent(p, proc, SP_REQUESTED,
2247 		     (StgTSO*)NULL, (StgClosure *)NULL, (StgInt)0, 0);
2249   /* time of the fish arrival on the remote PE */
2250   fish_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
2252   /* Phps use an own Fish event for that? */
2253   /* The contents of the spark component is a HACK:
2254       1 means give me a spark;
2255       2 means give me a thread
2256       0 means give me nothing (this should never happen)
2258   new_event(p, proc, fish_arrival_time,
2260 	    (StgTSO*)NULL, (StgClosure*)NULL,
2261 	    (steal_spark ? (rtsSpark*)1 : steal_thread ? (rtsSpark*)2 : (rtsSpark*)0));
 /* the fish is now in flight; the requester may go Fishing until it returns */
2263   ++OutstandingFishes[proc];
2264   /* only with Async fetching? */
2265   if (procStatus[proc]==Idle)
2266     procStatus[proc]=Fishing;
2268   /* time needed to clean up buffers etc after sending a message */
2269   CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
2271   /* If GUM style fishing stealing always succeeds because it only consists
2272      of sending out a fish; of course, when the fish may return
2278 This version of stealing a spark makes use of the global info on all
2279 spark pools etc which is not available in a real parallel system.
2280 This could be extended to test e.g. the impact of perfect load information.
2282 //@cindex stealSparkMagic
/* stealSparkMagic: steal a spark using global knowledge of all spark pools
   (not available in a real parallel system).  Candidate PEs are collected
   and time-sorted by sortPEsByTime; a random eligible PE is chosen, its
   spark queue is scanned for the first spark worth stealing, non-sparkable
   sparks are pruned on the way, and a MoveSpark-style event is scheduled.
   NOTE(review): extract -- the function header, the `stolen = rtsTrue`
   assignments, the stealtime computation tail (sparkStealTime) and several
   control-flow lines are missing; comments describe only visible code. */
2284 stealSparkMagic(proc)
2287   PEs p=0, i=0, j=0, n=0, first, upb;
2288   rtsSpark *spark=NULL, *next;
2289   PEs pes_by_time[MAX_PROC];
2290   rtsBool stolen = rtsFalse;
2293   /* Should never be entered in GrAnSim Light setup */
2294   ASSERT(!RtsFlags.GranFlags.Light);
2296   sortPEsByTime(proc, pes_by_time, &first, &n);
 /* outer loop: try PEs (random among the eligible ones) until a steal works
    or no candidates remain */
2298   while (!stolen && n>0) {
2299     upb = (first==0) ? n : first;
2300     i = natRandom(0,upb); /* choose a random eligible PE */
2303     IF_GRAN_DEBUG(randomSteal,
2304 		  belch("^^ stealSparkMagic (random_steal, not fishing): stealing spark from PE %d (current proc is %d)",
2307     ASSERT(pending_sparks_hds[p]!=(rtsSpark *)NULL); /* non-empty spark pool */
2309     /* Now go through rtsSparkQ and steal the first eligible spark */
2311     spark = pending_sparks_hds[p];
2312     while (!stolen && spark != (rtsSpark*)NULL)
2314 	/* NB: no prev pointer is needed here because all sparks that are not
2317 	if ((procStatus[p]==Idle || procStatus[p]==Sparking || procStatus[p] == Fishing) &&
2318 	    spark->next==(rtsSpark*)NULL)
2320 	    /* Be social! Don't steal the only spark of an idle processor
2321 	       not {spark} neH yInIH !! */
2322 	    break; /* next PE */
2324 	else if (closure_SHOULD_SPARK(spark->node))
2326 	    /* Don't Steal local sparks;
2327 	       ToDo: optionally prefer local over global sparks
2328 	    if (!spark->global) {
2330 	      continue;                  next spark
2333 	    /* found a spark! */
2335 	    /* Prepare message for sending spark */
2336 	    CurrentTime[p] += RtsFlags.GranFlags.Costs.mpacktime;
2338 	    if (RtsFlags.GranFlags.GranSimStats.Sparks)
2339 	      DumpRawGranEvent(p, (PEs)0, SP_EXPORTED,
2340 			       (StgTSO*)NULL, spark->node,
2341 			       spark->name, spark_queue_len(p));
 /* the steal cannot complete before the victim's clock; see also the
    (missing) sparkStealTime component added to this value */
2343 	    stealtime = (CurrentTime[p] > CurrentTime[proc] ?
2348 	    new_event(proc, p /* CurrentProc */, stealtime,
2350 		      (StgTSO*)NULL, spark->node, spark);
2353 	    ++OutstandingFishes[proc]; /* no. of sparks currently on the fly */
2354 	    if (procStatus[proc]==Idle)
2355 	      procStatus[proc] = Fishing;
2356 	    ++(spark->global); /* record that this is a global spark */
2357 	    ASSERT(SparksAvail>0);
2358 	    --SparksAvail;            /* on-the-fly sparks are not available */
2359 	    next = delete_from_sparkq(spark, p, rtsFalse); // don't dispose!
2360 	    CurrentTime[p] += RtsFlags.GranFlags.Costs.mtidytime;
2362 	else /* !(closure_SHOULD_SPARK(SPARK_NODE(spark))) */
2364 	    IF_GRAN_DEBUG(checkSparkQ,
2365 			  belch("^^ pruning spark %p (node %p) in stealSparkMagic",
2366 				spark, spark->node));
2368 	    /* if the spark points to a node that should not be sparked,
2369 	       prune the spark queue at this point */
2370 	    if (RtsFlags.GranFlags.GranSimStats.Sparks)
2371 	      DumpRawGranEvent(p, (PEs)0, SP_PRUNED,
2372 			       (StgTSO*)NULL, spark->node,
2373 			       spark->name, spark_queue_len(p));
2374 	    if (RtsFlags.GranFlags.GranSimStats.Global)
2375 	      globalGranStats.pruned_sparks++;
2377 	    ASSERT(SparksAvail>0);
 /* here the spark is disposed of (rtsTrue), unlike the steal case above */
2379 	    spark = delete_from_sparkq(spark, p, rtsTrue);
2381 	/* unlink spark (may have been freed!) from sparkq;
2382 	if (prev == NULL) // spark was head of spark queue
2383 	  pending_sparks_hds[p] = spark->next;
2385 	  prev->next = spark->next;
2386 	if (spark->next == NULL)
2387 	  pending_sparks_tls[p] = prev;
2391     } /* while ...    iterating over sparkq */
2393     /* ToDo: assert that PE p still has work left after stealing the spark */
2395     if (!stolen && (n>0)) {  /* nothing stealable from proc p :( */
2396       ASSERT(pes_by_time[i]==p);
 /* drop this victim from the candidate array and retry with the rest */
2398       /* remove p from the list (at pos i) */
2399       for (j=i; j+1<n; j++)
2400 	pes_by_time[j] = pes_by_time[j+1];
2403       /* update index to first proc which is later (or equal) than proc */
2406 	     (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]);
2410   } /* while ... iterating over PEs in pes_by_time */
2412   IF_GRAN_DEBUG(randomSteal,
2414 	  belch("^^ stealSparkMagic: spark %p (node=%p) stolen by PE %d from PE %d (SparksAvail=%d; idlers=%d)",
2415 		       spark, spark->node, proc, p,
2416 		       SparksAvail, idlers());
2418 	  belch("^^ stealSparkMagic: nothing stolen by PE %d (sparkq len after pruning=%d)(SparksAvail=%d; idlers=%d)",
2419 		       proc, SparksAvail, idlers()));
2421   if (RtsFlags.GranFlags.GranSimStats.Global &&
2422       stolen && (i!=0)) { /* only for statistics */
2423     globalGranStats.rs_sp_count++;
2424     globalGranStats.ntimes_total += n;
2425     globalGranStats.fl_total += first;
2426     globalGranStats.no_of_steals++;
2433 The old stealThread code, which makes use of global info and does not
2435 NB: most of this is the same as in stealSparkMagic;
2436 only the pieces specific to processing thread queues are different;
2437 long live polymorphism!
2440 //@cindex stealThreadMagic
/* stealThreadMagic: thread-migration analogue of stealSparkMagic, using
   global knowledge of all run queues.  Picks a random eligible PE (time
   sorted via sortPEsByTime), steals the *second* thread of its run queue
   (never the first, for "social" reasons) and schedules a MoveThread-style
   event to proc.
   NOTE(review): extract -- the function header, the stealtime head and
   several control-flow lines are missing; comments cover visible code only. */
2442 stealThreadMagic(proc)
2445   PEs p=0, i=0, j=0, n=0, first, upb;
2446   StgTSO *tso=END_TSO_QUEUE;
2447   PEs pes_by_time[MAX_PROC];
2448   rtsBool stolen = rtsFalse;
2451   /* Should never be entered in GrAnSim Light setup */
2452   ASSERT(!RtsFlags.GranFlags.Light);
2454   sortPEsByTime(proc, pes_by_time, &first, &n);
2456   while (!stolen && n>0) {
2457     upb = (first==0) ? n : first;
2458     i = natRandom(0,upb); /* choose a random eligible PE */
2461     IF_GRAN_DEBUG(randomSteal,
2462 		  belch("^^ stealThreadMagic (random_steal, not fishing): stealing thread from PE %d (current proc is %d)",
2465     /* Steal the first exportable thread in the runnable queue but
2466        never steal the first in the queue for social reasons;
2467        not Qu' wa'DIch yInIH !!
2469     /* Would be better to search through queue and have options which of
2470        the threads to pick when stealing */
2471     if (run_queue_hds[p] == END_TSO_QUEUE) {
2472       IF_GRAN_DEBUG(randomSteal,
2473 		    belch("^^ stealThreadMagic: No thread to steal from PE %d (stealer=PE %d)",
2476       tso = run_queue_hds[p]->link;  /* tso is *2nd* thread in thread queue */
2480       /* update links in queue */
2481       run_queue_hds[p]->link = tso->link;
2482       if (run_queue_tls[p] == tso)
2483 	run_queue_tls[p] = run_queue_hds[p];
2485       /* ToDo: Turn magic constants into params */
 /* thread migration is modelled as ~5x more expensive than a spark move */
2487       CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mpacktime;
2489       stealtime = (CurrentTime[p] > CurrentTime[proc] ?
2493 	+ 4l * RtsFlags.GranFlags.Costs.additional_latency
2494 	+ 5l * RtsFlags.GranFlags.Costs.munpacktime;
2496       /* Move the thread; set bitmask to 0 while TSO is `on-the-fly' */
2497       SET_GRAN_HDR(tso,Nowhere /* PE_NUMBER(proc) */);
2499       /* Move from one queue to another */
2500       new_event(proc, p, stealtime,
2502 		tso, (StgClosure*)NULL, (rtsSpark*)NULL);
2504       /* MAKE_BUSY(proc);  not yet; only when thread is in threadq */
2505       ++OutstandingFishes[proc];
 /* NOTE(review): stealSparkMagic and stealSomething test
    procStatus[proc]==Idle before switching to Fishing; this truthiness test
    also matches Sparking/Fetching etc. -- confirm whether that is intended */
2506       if (procStatus[proc])
2507 	procStatus[proc] = Fishing;
2510       if(RtsFlags.GranFlags.GranSimStats.Full)
2511 	DumpRawGranEvent(p, proc,
2513 			 tso, (StgClosure*)NULL, (StgInt)0, 0);
2515       /* costs for tidying up buffer after having sent it */
2516       CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mtidytime;
2519     /* ToDo: assert that PE p still has work left after stealing the spark */
2521     if (!stolen && (n>0)) {  /* nothing stealable from proc p :( */
2522       ASSERT(pes_by_time[i]==p);
2524       /* remove p from the list (at pos i) */
2525       for (j=i; j+1<n; j++)
2526 	pes_by_time[j] = pes_by_time[j+1];
2529       /* update index to first proc which is later (or equal) than proc */
2532 	     (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]);
2536   } /* while ... iterating over PEs in pes_by_time */
2538   IF_GRAN_DEBUG(randomSteal,
2540 	  belch("^^ stealThreadMagic: stolen TSO %d (%p) by PE %d from PE %d (SparksAvail=%d; idlers=%d)",
2541 		       tso->id, tso, proc, p,
2542 		       SparksAvail, idlers());
2544 	  belch("stealThreadMagic: nothing stolen by PE %d (SparksAvail=%d; idlers=%d)",
2545 		       proc, SparksAvail, idlers()));
2547   if (RtsFlags.GranFlags.GranSimStats.Global &&
2548       stolen && (i!=0)) { /* only for statistics */
2549     /* ToDo: more statistics on avg thread queue length etc */
2550     globalGranStats.rs_t_count++;
2551     globalGranStats.no_of_migrates++;
2557 //@cindex sparkStealTime
2559 sparkStealTime(void)
2561 double fishdelay, sparkdelay, latencydelay;
2562 fishdelay = (double)RtsFlags.GranFlags.proc/2;
2563 sparkdelay = fishdelay -
2564 ((fishdelay-1.0)/(double)(RtsFlags.GranFlags.proc-1))*((double)idlers());
2565 latencydelay = sparkdelay*((double)RtsFlags.GranFlags.Costs.latency);
2567 return((rtsTime)latencydelay);
2570 //@node Routines directly called from Haskell world, Emiting profiling info for GrAnSim, Idle PEs, GranSim specific code
2571 //@subsection Routines directly called from Haskell world
2573 The @GranSim...@ routines in here are directly called via macros from the
2576 First some auxiliary routines.
2579 /* Take the current thread off the thread queue and thereby activate the
2580 next thread. It's assumed that the next ReSchedule after this uses
2581 NEW_THREAD as param.
2582 This fct is called from GranSimBlock and GranSimFetch
2585 //@cindex ActivateNextThread
/* ActivateNextThread: take the current thread off PE proc's run queue and
   thereby activate the next one (see prose above).  Called from
   GranSimBlock and GranSimFetch; the caller's next ReSchedule is expected
   to use NEW_THREAD.  Charges a context-switch cost and, with full stats,
   dumps a GR_SCHEDULE event for the newly scheduled thread.
   NOTE(review): extract -- the function header/braces and the dequeue line
   itself are missing from this view. */
2588 ActivateNextThread (proc)
2593    This routine is entered either via GranSimFetch or via GranSimBlock.
2594    It has to prepare the CurrentTSO for being blocked and update the
2595    run queue and other statistics on PE proc. The actual enqueuing to the
2596    blocking queue (if coming from GranSimBlock) is done in the entry code
2597    of the BLACKHOLE and BLACKHOLE_BQ closures (see StgMiscClosures.hc).
2599   /* ToDo: add assertions here!! */
2600   //ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE);
2602   // Only necessary if the running thread is at front of the queue
2603   // run_queue_hds[proc] = run_queue_hds[proc]->link;
2604   ASSERT(CurrentProc==proc);
2605   ASSERT(!is_on_queue(CurrentTSO,proc));
2606   if (run_queue_hds[proc]==END_TSO_QUEUE) {
2607     /* NB: this routine is only entered with asynchr comm (see assertion) */
2608     procStatus[proc] = Idle;
2610     /* ToDo: check cost assignment */
2611     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime;
2612     if (RtsFlags.GranFlags.GranSimStats.Full &&
2613 	(!RtsFlags.GranFlags.Light || RtsFlags.GranFlags.Debug.checkLight))
2614 	                                      /* right flag !?? ^^^ */
2615       DumpRawGranEvent(proc, 0, GR_SCHEDULE, run_queue_hds[proc],
2616 		       (StgClosure*)NULL, (StgInt)0, 0);
2621 The following GranSim fcts are stg-called from the threaded world.
2624 /* Called from HP_CHK and friends (see StgMacros.h) */
2625 //@cindex GranSimAllocate
/* GranSimAllocate: account for a heap allocation of n (words, presumably --
   confirm against StgMacros.h callers) by the current thread.  Called from
   HP_CHK and friends; bumps the TSO's allocation counters, optionally dumps
   a GR_ALLOC event, and charges the configured heap-allocation cost to both
   the thread and the PE clock.
   NOTE(review): extract -- the signature and braces are missing here. */
2630   CurrentTSO->gran.allocs += n;
2631   ++(CurrentTSO->gran.basicblocks);
2633   if (RtsFlags.GranFlags.GranSimStats.Heap) {
2634     DumpRawGranEvent(CurrentProc, 0, GR_ALLOC, CurrentTSO,
2635 		     (StgClosure*)NULL, (StgInt)0, n);
2638   CurrentTSO->gran.exectime += RtsFlags.GranFlags.Costs.heapalloc_cost;
2639   CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.heapalloc_cost;
2643 Subtract the values added above, if a heap check fails and
2644 so has to be redone.
2646 //@cindex GranSimUnallocate
/* GranSimUnallocate: exact inverse of GranSimAllocate -- subtract the
   values added there when a heap check fails and the allocation has to be
   redone (see prose above).  NOTE(review): extract -- signature line for
   the parameter declaration and braces are missing here. */
2648 GranSimUnallocate(n)
2651   CurrentTSO->gran.allocs -= n;
2652   --(CurrentTSO->gran.basicblocks);
2654   CurrentTSO->gran.exectime -= RtsFlags.GranFlags.Costs.heapalloc_cost;
2655   CurrentTime[CurrentProc] -= RtsFlags.GranFlags.Costs.heapalloc_cost;
2658 /* NB: We now inline this code via GRAN_EXEC rather than calling this fct */
2659 //@cindex GranSimExec
/* GranSimExec: charge the simulated execution cost of a basic block --
   a weighted sum of its arithmetic ops, branches, loads, stores and
   floating-point ops (weights from RtsFlags.GranFlags.Costs) -- to the
   current thread and the current PE's clock.  Normally inlined via the
   GRAN_EXEC macro rather than called (see comment above). */
2661 GranSimExec(ariths,branches,loads,stores,floats)
2662 StgWord ariths,branches,loads,stores,floats;
2664   StgWord cost = RtsFlags.GranFlags.Costs.arith_cost*ariths +
2665             RtsFlags.GranFlags.Costs.branch_cost*branches +
2666             RtsFlags.GranFlags.Costs.load_cost * loads +
2667             RtsFlags.GranFlags.Costs.store_cost*stores +
2668             RtsFlags.GranFlags.Costs.float_cost*floats;
2670   CurrentTSO->gran.exectime += cost;
2671   CurrentTime[CurrentProc] += cost;
2675 Fetch the node if it isn't local
2676 -- result indicates whether fetch has been done.
2678 This is GRIP-style single item fetching.
2681 //@cindex GranSimFetch
/* GranSimFetch: GRIP-style single-item fetch of `node` if it is not local
   to the current PE (see prose above); the result indicates whether a fetch
   was initiated.  Handles: GrAnSim-Light (always reschedule), fake RBHs
   (bitmask == Nowhere: retry later), and the real remote case, where a
   FetchNode event is scheduled and the current TSO blocks; with async
   fetching the next thread is activated meanwhile.
   NOTE(review): extract -- the return type, several returns and some
   brace/scaffolding lines are missing; comments cover visible code only. */
2683 GranSimFetch(node /* , liveness_mask */ )
2685 /* StgInt liveness_mask; */
2687   /* reset the return value (to be checked within STG land) */
2688   NeedToReSchedule = rtsFalse;
2690   if (RtsFlags.GranFlags.Light) {
2691      /* Always reschedule in GrAnSim-Light to prevent one TSO from
2693      new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
2694 	       ContinueThread,CurrentTSO,node,NULL);
2699   /* Faking an RBH closure:
2700      If the bitmask of the closure is 0 then this node is a fake RBH;
2702   if (node->header.gran.procs == Nowhere) {
2704 	    belch("## Found fake RBH (node %p); delaying TSO %d (%p)",
2705 		  node, CurrentTSO->id, CurrentTSO));
 /* retry after a fixed 10000-tick delay (magic constant) */
2707     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+10000,
2708 	      ContinueThread, CurrentTSO, node, (rtsSpark*)NULL);
2710     /* Rescheduling (GranSim internal) is necessary */
2711     NeedToReSchedule = rtsTrue;
2716   /* Note: once a node has been fetched, this test will be passed */
2717   if (!IS_LOCAL_TO(PROCS(node),CurrentProc))
2719       PEs p = where_is(node);
2722       IF_GRAN_DEBUG(thunkStealing,
2724 		      belch("GranSimFetch: Trying to fetch from own processor%u\n", p););
2726       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
2727       /* NB: Fetch is counted on arrival (FetchReply) */
 /* fetch cannot start before both PEs' clocks allow it; add one latency */
2729       fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) +
2730 		  RtsFlags.GranFlags.Costs.latency;
2732       new_event(p, CurrentProc, fetchtime,
2733 		FetchNode, CurrentTSO, node, (rtsSpark*)NULL);
2735       if (fetchtime<TimeOfNextEvent)
2736 	TimeOfNextEvent = fetchtime;
2738       /* About to block */
2739       CurrentTSO->gran.blockedat = CurrentTime[CurrentProc];
2741       ++OutstandingFetches[CurrentProc];
2743       if (RtsFlags.GranFlags.DoAsyncFetch)
2744 	/* if asynchr comm is turned on, activate the next thread in the q */
2745 	ActivateNextThread(CurrentProc);
2747 	procStatus[CurrentProc] = Fetching;
2750       /* ToDo: nuke the entire if (anything special for fair schedule?) */
2751       if (RtsFlags.GranFlags.DoAsyncFetch)
2753 	  /* Remove CurrentTSO from the queue -- assumes head of queue == CurrentTSO */
2754 	  if(!RtsFlags.GranFlags.DoFairSchedule)
2756 	      /* now done in do_the_fetchnode
2757 	      if (RtsFlags.GranFlags.GranSimStats.Full)
2758 		DumpRawGranEvent(CurrentProc, p, GR_FETCH, CurrentTSO,
2759 				 node, (StgInt)0, 0);
2761 	      ActivateNextThread(CurrentProc);
2763 # if 0 && defined(GRAN_CHECK)
2764 	      if (RtsFlags.GranFlags.Debug.blockOnFetch_sanity) {
2765 		if (TSO_TYPE(CurrentTSO) & FETCH_MASK_TSO) {
2766 		  fprintf(stderr,"FetchNode: TSO 0x%x has fetch-mask set @ %d\n",
2767 			  CurrentTSO,CurrentTime[CurrentProc]);
2768 		  stg_exit(EXIT_FAILURE);
2770 		TSO_TYPE(CurrentTSO) |= FETCH_MASK_TSO;
2774 	      CurrentTSO->link = END_TSO_QUEUE;
2775 	      /* CurrentTSO = END_TSO_QUEUE; */
2777 	      /* CurrentTSO is pointed to by the FetchNode event; it is
2778 		 on no run queue any more */
2779 	  } else {  /* fair scheduling currently not supported -- HWL */
2780 	      barf("Asynchr communication is not yet compatible with fair scheduling\n");
2782       } else {                /* !RtsFlags.GranFlags.DoAsyncFetch */
2783 	  procStatus[CurrentProc] = Fetching; // ToDo: BlockedOnFetch;
2784 	  /* now done in do_the_fetchnode
2785 	  if (RtsFlags.GranFlags.GranSimStats.Full)
2786 	    DumpRawGranEvent(CurrentProc, p,
2787 			     GR_FETCH, CurrentTSO, node, (StgInt)0, 0);
2789 	  IF_GRAN_DEBUG(blockOnFetch,
2790 			BlockedOnFetch[CurrentProc] = CurrentTSO;); /*- rtsTrue; -*/
2794       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mtidytime;
2796       /* Rescheduling (GranSim internal) is necessary */
2797       NeedToReSchedule = rtsTrue;
2804 //@cindex GranSimSpark
/* GranSimSpark: record the creation of a spark for `node` on the current
   PE.  Optionally dumps an SP_SPARK event, forces the PE to notice the
   spark when DoAlwaysCreateThreads is on, and bumps the TSO's local/global
   spark counters (the local/global split depends on `local`).
   NOTE(review): extract -- the signature's parameter declarations, braces
   and the `if (local)` line are missing from this view. */
2806 GranSimSpark(local,node)
2810   /* ++SparksAvail;  Nope; do that in add_to_spark_queue */
2811   if (RtsFlags.GranFlags.GranSimStats.Sparks)
2812     DumpRawGranEvent(CurrentProc, (PEs)0, SP_SPARK,
2813 		     END_TSO_QUEUE, node, (StgInt)0, spark_queue_len(CurrentProc)-1);
2815   /* Force the PE to take notice of the spark */
2816   if(RtsFlags.GranFlags.DoAlwaysCreateThreads) {
2817     new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
2819 	      END_TSO_QUEUE, (StgClosure*)NULL, (rtsSpark*)NULL);
2820     if (CurrentTime[CurrentProc]<TimeOfNextEvent)
2821       TimeOfNextEvent = CurrentTime[CurrentProc];
2825     ++CurrentTSO->gran.localsparks;
2827     ++CurrentTSO->gran.globalsparks;
2830 //@cindex GranSimSparkAt
/* GranSimSparkAt: place a spark on the PE that currently holds `where`;
   resolves the location with where_is and delegates to GranSimSparkAtAbs. */
2832 GranSimSparkAt(spark,where,identifier)
2834 StgClosure *where;        /* This should be a node; alternatively could be a GA */
2837   PEs p = where_is(where);
2838   GranSimSparkAtAbs(spark,p,identifier);
2841 //@cindex GranSimSparkAtAbs
/* GranSimSparkAtAbs: place `spark` on an absolute PE number `proc`.
   A NULL spark (granularity control may have nulled it) is ignored.  For a
   remote PE the move is charged with packing cost plus latency and modelled
   as a MoveSpark event (counted like a stolen spark via OutstandingFishes);
   a local placement is immediate.
   NOTE(review): extract -- parameter declarations, braces and the Light-mode
   event-type argument are missing from this view. */
2843 GranSimSparkAtAbs(spark,proc,identifier)
2850   if (spark == (rtsSpark *)NULL) /* Note: Granularity control might have */
2851     return;                          /* turned a spark into a NULL. */
2853   /* ++SparksAvail;  Nope; do that in add_to_spark_queue */
2854   if(RtsFlags.GranFlags.GranSimStats.Sparks)
2855     DumpRawGranEvent(proc,0,SP_SPARKAT,
2856 		     END_TSO_QUEUE, spark->node, (StgInt)0, spark_queue_len(proc));
2858   if (proc!=CurrentProc) {
2859     CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
 /* spark cannot arrive before the later of the two PE clocks, plus latency */
2860     exporttime = (CurrentTime[proc] > CurrentTime[CurrentProc]?
2861                   CurrentTime[proc]: CurrentTime[CurrentProc])
2862                  + RtsFlags.GranFlags.Costs.latency;
2864     exporttime = CurrentTime[CurrentProc];
2867   if ( RtsFlags.GranFlags.Light )
2868     /* Need CurrentTSO in event field to associate costs with creating
2869        spark even in a GrAnSim Light setup */
2870     new_event(proc, CurrentProc, exporttime,
2872 	      CurrentTSO, spark->node, spark);
2874     new_event(proc, CurrentProc, exporttime,
2875 	      MoveSpark, (StgTSO*)NULL, spark->node, spark);
2876     /* Bit of a hack to treat placed sparks the same as stolen sparks */
2877     ++OutstandingFishes[proc];
2879   /* Force the PE to take notice of the spark (FINDWORK is put after a
2880      MoveSpark into the sparkq!) */
2881   if (RtsFlags.GranFlags.DoAlwaysCreateThreads) {
2882     new_event(CurrentProc,CurrentProc,exporttime+1,
2884 	      (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
2887   if (exporttime<TimeOfNextEvent)
2888     TimeOfNextEvent = exporttime;
2890   if (proc!=CurrentProc) {
2891     CurrentTime[CurrentProc] +=  RtsFlags.GranFlags.Costs.mtidytime;
2892     ++CurrentTSO->gran.globalsparks;
2894     ++CurrentTSO->gran.localsparks;
2899 This function handles local and global blocking. It's called either
2900 from threaded code (RBH_entry, BH_entry etc) or from blockFetch when
2901 trying to fetch an BH or RBH
2904 //@cindex GranSimBlock
/* GranSimBlock: handle local and global blocking of `tso` on `node` (see
   prose above); called from threaded code (RBH_entry, BH_entry etc.) or
   from blockFetch.  Asserts the TSO is local and off the run queue, works
   around a missed GranSimFetch by faking one when the node is remote,
   records block statistics, charges queueing cost and activates the next
   thread.  The actual enqueue onto the blocking queue happens in the
   BLACKHOLE(_BQ) entry code, not here.
   NOTE(review): extract -- parameter declarations, braces and some debug
   scaffolding lines are missing from this view. */
2906 GranSimBlock(tso, proc, node)
2911   PEs node_proc = where_is(node),
2912       tso_proc = where_is((StgClosure *)tso);
2914   ASSERT(tso_proc==CurrentProc);
2915   // ASSERT(node_proc==CurrentProc);
2917   if (node_proc!=CurrentProc)
2918 	    belch("## ghuH: TSO %d (%lx) [PE %d] blocks on non-local node %p [PE %d] (no simulation of FETCHMEs)",
2919 		  tso->id, tso, tso_proc, node, node_proc));
2920   ASSERT(tso->link==END_TSO_QUEUE);
2921   ASSERT(!is_on_queue(tso,proc)); // tso must not be on run queue already!
2922   //ASSERT(tso==run_queue_hds[proc]);
2925 	  belch("GRAN: TSO %d (%p) [PE %d] blocks on closure %p @ %lx",
2926 		tso->id, tso, proc, node, CurrentTime[proc]));
2929   /* THIS SHOULD NEVER HAPPEN!
2930      If tso tries to block on a remote node (i.e. node_proc!=CurrentProc)
2931      we have missed a GranSimFetch before entering this closure;
2932      we hack around it for now, faking a FetchNode;
2933      because GranSimBlock is entered via a BLACKHOLE(_BQ) closure,
2934      tso will be blocked on this closure until the FetchReply occurs.
2938   if (node_proc!=CurrentProc) {
2940     ret = GranSimFetch(node);
2943 	    belch(".. GranSimBlock: faking a FetchNode of node %p from %d to %d",
2944 		  node, node_proc, CurrentProc););
2949   if (RtsFlags.GranFlags.GranSimStats.Full)
2950     DumpRawGranEvent(proc,node_proc,GR_BLOCK,tso,node,(StgInt)0,0);
2952   ++(tso->gran.blockcount);
2953   /* Distinction between local and global block is made in blockFetch */
2954   tso->gran.blockedat = CurrentTime[proc];
2956   CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime;
2957   ActivateNextThread(proc);
2958   /* tso->link = END_TSO_QUEUE;   not really necessary; only for testing */
2963 //@node Index, , Dumping routines, GranSim specific code
2967 //* ActivateNextThread:: @cindex\s-+ActivateNextThread
2968 //* CurrentProc:: @cindex\s-+CurrentProc
2969 //* CurrentTime:: @cindex\s-+CurrentTime
2970 //* GranSimAllocate:: @cindex\s-+GranSimAllocate
2971 //* GranSimBlock:: @cindex\s-+GranSimBlock
2972 //* GranSimExec:: @cindex\s-+GranSimExec
2973 //* GranSimFetch:: @cindex\s-+GranSimFetch
2974 //* GranSimLight_insertThread:: @cindex\s-+GranSimLight_insertThread
2975 //* GranSimSpark:: @cindex\s-+GranSimSpark
2976 //* GranSimSparkAt:: @cindex\s-+GranSimSparkAt
2977 //* GranSimSparkAtAbs:: @cindex\s-+GranSimSparkAtAbs
2978 //* GranSimUnallocate:: @cindex\s-+GranSimUnallocate
2979 //* any_idle:: @cindex\s-+any_idle
2980 //* blockFetch:: @cindex\s-+blockFetch
2981 //* do_the_fetchnode:: @cindex\s-+do_the_fetchnode
2982 //* do_the_fetchreply:: @cindex\s-+do_the_fetchreply
2983 //* do_the_findwork:: @cindex\s-+do_the_findwork
2984 //* do_the_globalblock:: @cindex\s-+do_the_globalblock
2985 //* do_the_movespark:: @cindex\s-+do_the_movespark
2986 //* do_the_movethread:: @cindex\s-+do_the_movethread
2987 //* do_the_startthread:: @cindex\s-+do_the_startthread
2988 //* do_the_unblock:: @cindex\s-+do_the_unblock
2989 //* fetchNode:: @cindex\s-+fetchNode
2990 //* ga_to_proc:: @cindex\s-+ga_to_proc
2991 //* get_next_event:: @cindex\s-+get_next_event
2992 //* get_time_of_next_event:: @cindex\s-+get_time_of_next_event
2993 //* grab_event:: @cindex\s-+grab_event
2994 //* handleFetchRequest:: @cindex\s-+handleFetchRequest
2995 //* handleIdlePEs:: @cindex\s-+handleIdlePEs
2996 //* idlers:: @cindex\s-+idlers
2997 //* insertThread:: @cindex\s-+insertThread
2998 //* insert_event:: @cindex\s-+insert_event
2999 //* is_on_queue:: @cindex\s-+is_on_queue
3000 //* is_unique:: @cindex\s-+is_unique
3001 //* new_event:: @cindex\s-+new_event
3002 //* prepend_event:: @cindex\s-+prepend_event
3003 //* print_event:: @cindex\s-+print_event
3004 //* print_eventq:: @cindex\s-+print_eventq
3005 //* prune_eventq :: @cindex\s-+prune_eventq
3006 //* spark queue:: @cindex\s-+spark queue
3007 //* sparkStealTime:: @cindex\s-+sparkStealTime
3008 //* stealSomething:: @cindex\s-+stealSomething
3009 //* stealSpark:: @cindex\s-+stealSpark
3010 //* stealSparkMagic:: @cindex\s-+stealSparkMagic
3011 //* stealThread:: @cindex\s-+stealThread
3012 //* stealThreadMagic:: @cindex\s-+stealThreadMagic
3013 //* thread_queue_len:: @cindex\s-+thread_queue_len
3014 //* traverse_eventq_for_gc:: @cindex\s-+traverse_eventq_for_gc
3015 //* where_is:: @cindex\s-+where_is