2 Time-stamp: <Sat Dec 11 1999 17:25:27 Stardate: [-30]4033.42 software>
3 $Id: GranSim.c,v 1.2 2000/01/13 14:34:06 hwloidl Exp $
5 Variables and functions specific to GranSim, the parallelism simulator
9 //@node GranSim specific code, , ,
10 //@section GranSim specific code
13 Macros for dealing with the new and improved GA field for simulating
14 parallel execution. Based on @CONCURRENT@ package. The GA field now
15 contains a mask, where the n-th bit stands for the n-th processor, where
16 this data can be found. In case of multiple copies, several bits are
17 set. The total number of processors is bounded by @MAX_PROC@, which
18 should be <= the length of a word in bits. -- HWL
23 //* Prototypes and externs::
24 //* Constants and Variables::
26 //* Global Address Operations::
27 //* Global Event Queue::
28 //* Spark queue functions::
29 //* Scheduling functions::
30 //* Thread Queue routines::
31 //* GranSim functions::
32 //* GranSimLight routines::
33 //* Code for Fetching Nodes::
35 //* Routines directly called from Haskell world::
36 //* Emiting profiling info for GrAnSim::
37 //* Dumping routines::
41 //@node Includes, Prototypes and externs, GranSim specific code, GranSim specific code
42 //@subsection Includes
47 #include "StgMiscClosures.h"
50 #include "SchedAPI.h" // for pushClosure
52 #include "GranSimRts.h"
54 #include "ParallelRts.h"
55 #include "ParallelDebug.h"
56 #include "Storage.h" // for recordMutable
59 //@node Prototypes and externs, Constants and Variables, Includes, GranSim specific code
60 //@subsection Prototypes and externs
65 static inline PEs ga_to_proc(StgWord);
66 static inline rtsBool any_idle(void);
67 static inline nat idlers(void);
68 PEs where_is(StgClosure *node);
70 static rtsBool stealSomething(PEs proc, rtsBool steal_spark, rtsBool steal_thread);
71 static inline rtsBool stealSpark(PEs proc);
72 static inline rtsBool stealThread(PEs proc);
73 static rtsBool stealSparkMagic(PEs proc);
74 static rtsBool stealThreadMagic(PEs proc);
75 /* subsumed by stealSomething
76 static void stealThread(PEs proc);
77 static void stealSpark(PEs proc);
79 static rtsTime sparkStealTime(void);
80 static nat natRandom(nat from, nat to);
81 static PEs findRandomPE(PEs proc);
82 static void sortPEsByTime (PEs proc, PEs *pes_by_time,
83 nat *firstp, nat *np);
89 //@node Constants and Variables, Initialisation, Prototypes and externs, GranSim specific code
90 //@subsection Constants and Variables
92 #if defined(GRAN) || defined(PAR)
93 /* See GranSim.h for the definition of the enum gran_event_types */
94 char *gran_event_names[] = {
96 "STEALING", "STOLEN", "STOLEN(Q)",
97 "FETCH", "REPLY", "BLOCK", "RESUME", "RESUME(Q)",
98 "SCHEDULE", "DESCHEDULE",
100 "SPARK", "SPARKAT", "USED", "PRUNED", "EXPORTED", "ACQUIRED",
103 "SYSTEM_START", "SYSTEM_END", /* only for debugging */
108 #if defined(GRAN) /* whole file */
109 char *proc_status_names[] = {
110 "Idle", "Sparking", "Starting", "Fetching", "Fishing", "Busy",
114 /* For internal use (event statistics) only */
115 char *event_names[] =
116 { "ContinueThread", "StartThread", "ResumeThread",
117 "MoveSpark", "MoveThread", "FindWork",
118 "FetchNode", "FetchReply",
119 "GlobalBlock", "UnblockThread"
122 //@cindex CurrentProc
126 ToDo: Create a structure for the processor status and put all the
127 arrays below into it.
130 //@cindex CurrentTime
131 /* One clock for each PE */
132 rtsTime CurrentTime[MAX_PROC];
134 /* Useful to restrict communication; cf fishing model in GUM */
135 nat OutstandingFetches[MAX_PROC], OutstandingFishes[MAX_PROC];
137 /* Status of each PE (new since but independent of GranSim Light) */
138 rtsProcStatus procStatus[MAX_PROC];
140 # if defined(GRAN) && defined(GRAN_CHECK)
141 /* To check if the RTS ever tries to run a thread that should be blocked
142 because of fetching remote data */
143 StgTSO *BlockedOnFetch[MAX_PROC];
144 # define FETCH_MASK_TSO 0x08000000 /* only bits 0, 1, 2 should be used */
147 nat SparksAvail = 0; /* How many sparks are available */
148 nat SurplusThreads = 0; /* How many excess threads are there */
150 /* Do we need to reschedule following a fetch? */
151 rtsBool NeedToReSchedule = rtsFalse, IgnoreEvents = rtsFalse, IgnoreYields = rtsFalse;
152 rtsTime TimeOfNextEvent, TimeOfLastEvent, EndOfTimeSlice; /* checked from the threaded world! */
154 //@cindex spark queue
155 /* GranSim: a globally visible array of spark queues */
156 rtsSparkQ pending_sparks_hds[MAX_PROC];
157 rtsSparkQ pending_sparks_tls[MAX_PROC];
159 nat sparksIgnored = 0, sparksCreated = 0;
161 GlobalGranStats globalGranStats;
163 nat gran_arith_cost, gran_branch_cost, gran_load_cost,
164 gran_store_cost, gran_float_cost;
167 Old comment from 0.29. ToDo: Check and update -- HWL
169 The following variables control the behaviour of GrAnSim. In general, there
170 is one RTS option for enabling each of these features. In getting the
171 desired setup of GranSim the following questions have to be answered:
173 \item {\em Which scheduling algorithm} to use (@RtsFlags.GranFlags.DoFairSchedule@)?
174 Currently only unfair scheduling is supported.
175 \item What to do when remote data is fetched (@RtsFlags.GranFlags.DoAsyncFetch@)?
176 Either block and wait for the
177 data or reschedule and do some other work.
178 Thus, if this variable is true, asynchronous communication is
179 modelled. Block on fetch mainly makes sense for incremental fetching.
181 There is also a simplified fetch variant available
182 (@RtsFlags.GranFlags.SimplifiedFetch@). This variant does not use events to model
183 communication. It is faster but the results will be less accurate.
184 \item How aggressive to be in getting work after a reschedule on fetch
185 (@RtsFlags.GranFlags.FetchStrategy@)?
186 This is determined by the so-called {\em fetching
187 strategy\/}. Currently, there are four possibilities:
189 \item Only run a runnable thread.
190 \item Turn a spark into a thread, if necessary.
191 \item Steal a remote spark, if necessary.
192 \item Steal a runnable thread from another processor, if necessary.
194 The variable @RtsFlags.GranFlags.FetchStrategy@ determines how far to go in this list
195 when rescheduling on a fetch.
196 \item Should sparks or threads be stolen first when looking for work
197 (@RtsFlags.GranFlags.DoStealThreadsFirst@)?
198 The default is to steal sparks first (much cheaper).
199 \item Should the RTS use a lazy thread creation scheme
200 (@RtsFlags.GranFlags.DoAlwaysCreateThreads@)? By default yes i.e.\ sparks are only
201 turned into threads when work is needed. Also note, that sparks
202 can be discarded by the RTS (this is done in the case of an overflow
203 of the spark pool). Setting @RtsFlags.GranFlags.DoAlwaysCreateThreads@ to @True@ forces
204 the creation of threads at the next possibility (i.e.\ when new work
205 is demanded the next time).
206 \item Should data be fetched closure-by-closure or in packets
207 (@RtsFlags.GranFlags.DoBulkFetching@)? The default strategy is a GRIP-like incremental
208 (i.e.\ closure-by-closure) strategy. This makes sense in a
209 low-latency setting but is bad in a high-latency system. Setting
210 @RtsFlags.GranFlags.DoBulkFetching@ to @True@ enables bulk (packet) fetching. Other
211 parameters determine the size of the packets (@pack_buffer_size@) and the number of
212 thunks that should be put into one packet (@RtsFlags.GranFlags.ThunksToPack@).
213 \item If there is no other possibility to find work, should runnable threads
214 be moved to an idle processor (@RtsFlags.GranFlags.DoThreadMigration@)? In any case, the
215 RTS tries to get sparks (either local or remote ones) first. Thread
216 migration is very expensive, since a whole TSO has to be transferred
217 and probably data locality becomes worse in the process. Note, that
218 the closure, which will be evaluated next by that TSO is not
219 transferred together with the TSO (that might block another thread).
220 \item Should the RTS distinguish between sparks created by local nodes and
221 stolen sparks (@RtsFlags.GranFlags.PreferSparksOfLocalNodes@)? The idea is to improve
222 data locality by preferring sparks of local nodes (it is more likely
223 that the data for those sparks is already on the local processor).
224 However, such a distinction also imposes an overhead on the spark
225 queue management, and typically a large number of sparks are
226 generated during execution. By default this variable is set to @False@.
227 \item Should the RTS use granularity control mechanisms? The idea of a
228 granularity control mechanism is to make use of granularity
229 information provided via annotation of the @par@ construct in order
230 to prefer bigger threads when either turning a spark into a thread or
231 when choosing the next thread to schedule. Currently, three such
232 mechanisms are implemented:
234 \item Cut-off: The granularity information is interpreted as a
235 priority. If a threshold priority is given to the RTS, then
236 only those sparks with a higher priority than the threshold
237 are actually created. Other sparks are immediately discarded.
238 This is similar to a usual cut-off mechanism often used in
239 parallel programs, where parallelism is only created if the
240 input data is large enough. With this option, the choice is
241 hidden in the RTS and only the threshold value has to be
242 provided as a parameter to the runtime system.
243 \item Priority Sparking: This mechanism keeps priorities for sparks
244 and chooses the spark with the highest priority when turning
245 a spark into a thread. After that the priority information is
246 discarded. The overhead of this mechanism comes from
247 maintaining a sorted spark queue.
248 \item Priority Scheduling: This mechanism keeps the granularity
249 information for threads, too. Thus, on each reschedule the
250 largest thread is chosen. This mechanism has a higher
251 overhead, as the thread queue is sorted, too.
256 //@node Initialisation, Global Address Operations, Constants and Variables, GranSim specific code
257 //@subsection Initialisation
/* init_gr_stats: zero out the global GranSim statistics record
   (globalGranStats) at start-up.  The memset below already clears the
   whole struct; the field-by-field assignments that follow are
   redundant with it but enumerate the tracked statistics.
   NOTE(review): interior lines of this definition (e.g. the declaration
   of the loop variable i and the closing brace) are not visible in this
   chunk of the file. */
260 init_gr_stats (void) {
261 memset(&globalGranStats, '\0', sizeof(GlobalGranStats));
/* event statistics */
264 globalGranStats.noOfEvents = 0;
265 for (i=0; i<MAX_EVENT; i++) globalGranStats.event_counts[i]=0;
267 /* communication stats */
268 globalGranStats.fetch_misses = 0;
269 globalGranStats.tot_low_pri_sparks = 0;
/* NOTE(review): the ntimes_total and tot_sq_len lines below end in a
   comma (comma operator) rather than a semicolon -- harmless as
   written, but presumably typos worth fixing in the full source. */
272 globalGranStats.rs_sp_count = 0;
273 globalGranStats.rs_t_count = 0;
274 globalGranStats.ntimes_total = 0,
275 globalGranStats.fl_total = 0;
276 globalGranStats.no_of_steals = 0;
278 /* spark queue stats */
279 globalGranStats.tot_sq_len = 0,
280 globalGranStats.tot_sq_probes = 0;
281 globalGranStats.tot_sparks = 0;
282 globalGranStats.withered_sparks = 0;
283 globalGranStats.tot_add_threads = 0;
284 globalGranStats.tot_tq_len = 0;
285 globalGranStats.non_end_add_threads = 0;
/* thread stats */
288 globalGranStats.tot_threads_created = 0;
289 for (i=0; i<MAX_PROC; i++) globalGranStats.threads_created_on_PE[i]=0;
293 //@node Global Address Operations, Global Event Queue, Initialisation, GranSim specific code
294 //@subsection Global Address Operations
296 ----------------------------------------------------------------------
297 Global Address Operations
299 These functions perform operations on the global-address (ga) part of a
300 closure. The ga is the only new field (1 word) in a closure introduced by
301 GrAnSim. It serves as a bitmask, indicating on which processor the
302 closure is residing. Since threads are described by Thread State Object
303 (TSO), which is nothing but another kind of closure, this scheme allows
304 us to give placement information about threads.
306 A ga is just a bitmask, so the operations on them are mainly bitmask
307 manipulating functions. Note, that there are important macros like PROCS,
308 IS_LOCAL_TO etc. They are defined in @GrAnSim.lh@.
310 NOTE: In GrAnSim-light we don't maintain placement information. This
311 allows to simulate an arbitrary number of processors. The price we have
312 to pay is the lack of costing any communication properly. In short,
313 GrAnSim-light is meant to reveal the maximal parallelism in a program.
314 From an implementation point of view the important thing is: {\em
315 GrAnSim-light does not maintain global-addresses}. */
317 /* ga_to_proc returns the first processor marked in the bitmask ga.
318 Normally only one bit in ga should be set. But for PLCs all bits
319 are set. That shouldn't hurt since we only need IS_LOCAL_TO for PLCs */
/* Scans PE indices 0..RtsFlags.GranFlags.proc-1 for the first bit set
   in ga; the ASSERT guarantees some bit was found.
   NOTE(review): the return-type line (the prototype above declares this
   `static inline PEs'), the braces and the return statement are not
   visible in this chunk. */
324 ga_to_proc(StgWord ga)
327 for (i = 0; i < RtsFlags.GranFlags.proc && !IS_LOCAL_TO(ga, i); i++);
328 ASSERT(0<=i && i<RtsFlags.GranFlags.proc);
332 /* NB: This takes a *node* rather than just a ga as input */
/* where_is: the PE on which (a copy of) the given closure resides,
   i.e. the first processor marked in the closure's global-address
   bitmask (PROCS(node)).  The prototype above declares this as
   `PEs where_is(StgClosure *node)'; the return-type line itself is not
   visible in this chunk. */
335 where_is(StgClosure *node)
336 { return (ga_to_proc(PROCS(node))); }
/* is_unique: rtsTrue iff exactly one PE holds a copy of node, judged by
   the bits set in its PROCS bitmask; returns rtsFalse from inside the
   loop as soon as a second copy is found.
   NOTE(review): interior lines (return type, braces, the `else' between
   the rtsFalse return and `unique = rtsTrue', and the final return) are
   missing from this chunk -- read alongside the full source. */
341 is_unique(StgClosure *node)
344 rtsBool unique = rtsFalse;
346 for (i = 0; i < RtsFlags.GranFlags.proc ; i++)
347 if (IS_LOCAL_TO(PROCS(node), i))
348 if (unique) // exactly 1 instance found so far
349 return rtsFalse; // found a 2nd instance => not unique
351 unique = rtsTrue; // found 1st instance
352 ASSERT(unique); // otherwise returned from within loop
357 static inline rtsBool
/* any_idle: rtsTrue iff at least one PE currently has procStatus Idle.
   NOTE(review): the local accumulator is also named any_idle, shadowing
   the function name inside the body; its declaration and the return
   statement are not visible in this chunk. */
358 any_idle(void) { /* any (map (\ i -> procStatus[i] == Idle)) [0,..,MAX_PROC] */
361 for(i=0, any_idle=rtsFalse;
362 !any_idle && i<RtsFlags.GranFlags.proc;
363 any_idle = any_idle || procStatus[i] == Idle, i++)
/* idlers: number of PEs whose procStatus is Idle, accumulated in j.
   NOTE(review): the return-type line (the prototype above declares
   `static inline nat'), the initialisation of i and j, and the return
   statement are not visible in this chunk. */
369 idlers(void) { /* number of idle PEs */
372 i<RtsFlags.GranFlags.proc;
373 j += (procStatus[i] == Idle) ? 1 : 0, i++)
378 //@node Global Event Queue, Spark queue functions, Global Address Operations, GranSim specific code
379 //@subsection Global Event Queue
381 The following routines implement an ADT of an event-queue (FIFO).
382 ToDo: Put that in an own file(?)
385 /* Pointer to the global event queue; events are currently malloc'ed */
386 rtsEventQ EventHd = NULL;
388 //@cindex get_next_event
/* get_next_event: pop and return the head of the global event queue
   (EventHd), barf-ing if the queue is empty; when global stats are
   enabled the event is counted in globalGranStats first.
   NOTE(review): this definition is heavily elided in this chunk (the
   signature, the role of the static `entry', the debug-trace body and
   the return are missing) -- consult the full source before editing. */
392 static rtsEventQ entry = NULL;
394 if (EventHd == NULL) {
395 barf("No next event. This may be caused by a circular data dependency in the program.");
401 if (RtsFlags.GranFlags.GranSimStats.Global) { /* count events */
402 globalGranStats.noOfEvents++;
403 globalGranStats.event_counts[EventHd->evttype]++;
408 IF_GRAN_DEBUG(event_trace,
411 EventHd = EventHd->next;
415 /* When getting the time of the next event we ignore CONTINUETHREAD events:
416 we don't want to be interrupted before the end of the current time slice
417 unless there is something important to handle.
419 //@cindex get_time_of_next_event
/* Returns the timestamp of the first non-ContinueThread event in the
   event queue, or (rtsTime)0 when no such event exists.
   NOTE(review): the loop body (advancing `event') and the NULL test
   guarding the two returns are not visible in this chunk. */
421 get_time_of_next_event(void)
423 rtsEventQ event = EventHd;
425 while (event != NULL && event->evttype==ContinueThread) {
429 return ((rtsTime) 0);
431 return (event->time);
434 /* ToDo: replace malloc/free with a free list */
435 //@cindex insert_event
437 insert_event(newentry)
440 rtsEventType evttype = newentry->evttype;
441 rtsEvent *event, **prev;
443 /* if(evttype >= CONTINUETHREAD1) evttype = CONTINUETHREAD; */
445 /* Search the queue and insert at the right point:
446 FINDWORK before everything, CONTINUETHREAD after everything.
448 This ensures that we find any available work after all threads have
449 executed the current cycle. This level of detail would normally be
450 irrelevant, but matters for ridiculously low latencies...
453 /* Changed the ordering: Now FINDWORK comes after everything but
454 CONTINUETHREAD. This makes sure that a MOVESPARK comes before a
455 FINDWORK. This is important when a GranSimSparkAt happens and
456 DoAlwaysCreateThreads is turned on. Also important if a GC occurs
457 when trying to build a new thread (see much_spark) -- HWL 02/96 */
462 for (event = EventHd, prev=(rtsEvent**)&EventHd;
464 prev = (rtsEvent**)&(event->next), event = event->next) {
466 case FindWork: if ( event->time < newentry->time ||
467 ( (event->time == newentry->time) &&
468 (event->evttype != ContinueThread) ) )
472 case ContinueThread: if ( event->time <= newentry->time )
476 default: if ( event->time < newentry->time ||
477 ((event->time == newentry->time) &&
478 (event->evttype == newentry->evttype)) )
483 /* Insert newentry here (i.e. before event) */
485 newentry->next = event;
495 new_event(proc,creator,time,evttype,tso,node,spark)
498 rtsEventType evttype;
503 rtsEvent *newentry = (rtsEvent *) stgMallocBytes(sizeof(rtsEvent), "new_event");
505 newentry->proc = proc;
506 newentry->creator = creator;
507 newentry->time = time;
508 newentry->evttype = evttype;
510 newentry->node = node;
511 newentry->spark = spark;
512 newentry->gc_info = 0;
513 newentry->next = NULL;
515 insert_event(newentry);
518 fprintf(stderr, "GRAN: new_event: \n");
519 print_event(newentry))
522 //@cindex prepend_event
/* prepend_event: push an event back onto the front of the global event
   queue; only used during GC.
   NOTE(review): the line that assigns EventHd = event and the closing
   brace are not visible in this chunk. */
524 prepend_event(event) /* put event at beginning of EventQueue */
526 { /* only used for GC! */
527 event->next = EventHd;
/* grab_event: remove and return the event at the head of the queue
   without freeing it (the inverse of prepend_event); barfs if the
   queue is empty.
   NOTE(review): the return statement and closing braces are not
   visible in this chunk. */
533 grab_event(void) /* undo prepend_event i.e. get the event */
534 { /* at the head of EventQ but don't free anything */
535 rtsEventQ event = EventHd;
537 if (EventHd == NULL) {
538 barf("No next event (in grab_event). This may be caused by a circular data dependency in the program.");
541 EventHd = EventHd->next;
545 //@cindex traverse_eventq_for_gc
547 traverse_eventq_for_gc(void)
549 rtsEventQ event = EventHd;
551 StgClosure *closurep;
553 StgPtr buffer, bufptr;
556 /* Traverse eventq and replace every FETCHREPLY by a FETCHNODE for the
557 orig closure (root of packed graph). This means that a graph, which is
558 between processors at the time of GC is fetched again at the time when
559 it would have arrived, had there been no GC. Slightly inaccurate but
561 This is only needed for GUM style fetching. -- HWL */
562 if (!RtsFlags.GranFlags.DoBulkFetching)
565 for(event = EventHd; event!=NULL; event=event->next) {
566 if (event->evttype==FetchReply) {
567 buffer = stgCast(StgPtr,event->node);
568 ASSERT(buffer[PACK_FLAG_LOCN]==MAGIC_PACK_FLAG); /* It's a pack buffer */
569 bufsize = buffer[PACK_SIZE_LOCN];
570 closurep = stgCast(StgClosure*,buffer[PACK_HDR_SIZE]);
571 tsop = stgCast(StgTSO*,buffer[PACK_TSO_LOCN]);
573 creator = event->creator; /* similar to unpacking */
574 for (bufptr=buffer+PACK_HDR_SIZE;
575 bufptr<(buffer+bufsize);
577 // if ( (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_SPEC_RBH_TYPE) ||
578 // (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_GEN_RBH_TYPE) ) {
579 if ( GET_INFO(stgCast(StgClosure*,bufptr)) ) {
580 convertFromRBH(stgCast(StgClosure *,bufptr));
584 event->evttype = FetchNode;
585 event->proc = creator;
586 event->creator = proc;
587 event->node = closurep;
597 StgClosure *MarkRoot(StgClosure *root); // prototype
599 rtsEventQ event = EventHd;
602 /* iterate over eventq and register relevant fields in event as roots */
603 for(event = EventHd, len = 0; event!=NULL; event=event->next, len++) {
604 switch (event->evttype) {
606 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
609 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
610 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
613 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
614 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
617 event->spark->node = (StgClosure *)MarkRoot((StgClosure *)event->spark->node);
620 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
625 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
626 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
629 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
630 if (RtsFlags.GranFlags.DoBulkFetching)
631 // ToDo: traverse_eventw_for_gc if GUM-Fetching!!! HWL
632 belch("ghuH: packets in BulkFetching not marked as roots; mayb be fatal");
634 event->node = (StgTSO *)MarkRoot((StgClosure *)event->node);
637 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
638 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
641 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
642 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
645 barf("markEventQueue: trying to mark unknown event @ %p", event);
648 belch("GC: markEventQueue: %d events in queue", len));
652 Prune all ContinueThread events related to tso or node in the eventq.
653 Currently used if a thread leaves STG land with ThreadBlocked status,
654 i.e. it blocked on a closure and has been put on its blocking queue. It
655 will be reawakened via a call to awaken_blocked_queue. Until then no
656 event affecting this tso should appear in the eventq. A bit of a hack,
657 because ideally we shouldn't generate such spurious ContinueThread events
660 //@cindex prune_eventq
662 prune_eventq(tso, node)
665 { rtsEventQ prev = (rtsEventQ)NULL, event = EventHd;
667 /* node unused for now */
669 /* tso must be valid, then */
670 ASSERT(tso!=END_TSO_QUEUE);
671 while (event != NULL) {
672 if (event->evttype==ContinueThread &&
674 IF_GRAN_DEBUG(event_trace, // ToDo: use another debug flag
675 belch("prune_eventq: pruning ContinueThread event for TSO %d (%p) on PE %d @ %lx (%p)",
676 event->tso->id, event->tso, event->proc, event->time, event));
677 if (prev==(rtsEventQ)NULL) { // beginning of eventq
678 EventHd = event->next;
682 prev->next = event->next;
686 } else { // no pruning necessary; go to next event
693 //@cindex print_event
698 char str_tso[16], str_node[16];
701 if (event->tso==END_TSO_QUEUE) {
702 strcpy(str_tso, "______");
705 sprintf(str_tso, "%p", event->tso);
706 tso_id = (event->tso==NULL) ? 0 : event->tso->id;
708 if (event->node==(StgClosure*)NULL) {
709 strcpy(str_node, "______");
711 sprintf(str_node, "%p", event->node);
713 // HWL: shouldn't be necessary; ToDo: nuke
718 fprintf(stderr,"Evt: NIL\n");
720 fprintf(stderr, "Evt: %s (%u), PE %u [%u], Time %lu, TSO %d (%s), Node %s\n", //"Evt: %s (%u), PE %u [%u], Time %u, TSO %s (%#l), Node %s\n",
721 event_names[event->evttype], event->evttype,
722 event->proc, event->creator, event->time,
723 tso_id, str_tso, str_node
724 /*, event->spark, event->next */ );
728 //@cindex print_eventq
735 fprintf(stderr,"Event Queue with root at %p:\n", hd);
736 for (x=hd; x!=NULL; x=x->next) {
742 Spark queue functions are now all in Sparks.c!!
744 //@node Scheduling functions, Thread Queue routines, Spark queue functions, GranSim specific code
745 //@subsection Scheduling functions
748 These functions are variants of thread initialisation and therefore
749 related to initThread and friends in Schedule.c. However, they are
750 specific to a GranSim setup in storing more info in the TSO's statistics
751 buffer and sorting the thread queues etc.
755 A large portion of startThread deals with maintaining a sorted thread
756 queue, which is needed for the Priority Sparking option. Without that
757 complication the code boils down to FIFO handling.
759 //@cindex insertThread
761 insertThread(tso, proc)
765 StgTSO *prev = NULL, *next = NULL;
767 rtsBool found = rtsFalse;
769 ASSERT(CurrentProc==proc);
770 ASSERT(!is_on_queue(tso,proc));
771 /* Idle proc: put the thread on the run queue
772 same for pri spark and basic version */
773 if (run_queue_hds[proc] == END_TSO_QUEUE)
776 ASSERT((CurrentProc==MainProc &&
777 CurrentTime[MainProc]==0 &&
778 procStatus[MainProc]==Idle) ||
779 procStatus[proc]==Starting);
781 run_queue_hds[proc] = run_queue_tls[proc] = tso;
783 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime;
785 /* new_event of ContinueThread has been moved to do_the_startthread */
788 ASSERT(procStatus[proc]==Idle ||
789 procStatus[proc]==Fishing ||
790 procStatus[proc]==Starting);
791 procStatus[proc] = Busy;
796 if (RtsFlags.GranFlags.Light)
797 GranSimLight_insertThread(tso, proc);
799 /* Only for Pri Scheduling: find place where to insert tso into queue */
800 if (RtsFlags.GranFlags.DoPriorityScheduling && tso->gran.pri!=0)
801 /* {add_to_spark_queue}vo' jInIHta'; Qu' wa'DIch yIleghQo' */
802 for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count=0;
803 (next != END_TSO_QUEUE) &&
804 !(found = tso->gran.pri >= next->gran.pri);
805 prev = next, next = next->link, count++)
807 ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
808 (prev==(StgTSO*)NULL || prev->link==next));
811 ASSERT(!found || next != END_TSO_QUEUE);
812 ASSERT(procStatus[proc]!=Idle);
815 /* found can only be rtsTrue if pri scheduling enabled */
816 ASSERT(RtsFlags.GranFlags.DoPriorityScheduling);
817 if (RtsFlags.GranFlags.GranSimStats.Global)
818 globalGranStats.non_end_add_threads++;
819 /* Add tso to ThreadQueue between prev and next */
821 if ( next == (StgTSO*)END_TSO_QUEUE ) {
824 /* no back link for TSO chain */
827 if ( prev == (StgTSO*)END_TSO_QUEUE ) {
828 /* Never add TSO as first elem of thread queue; the first */
829 /* element should be the one that is currently running -- HWL */
831 belch("GRAN: Qagh: NewThread (w/ PriorityScheduling): Trying to add TSO %p (PRI=%d) as first elem of threadQ (%p) on proc %u (@ %u)\n",
832 tso, tso->gran.pri, run_queue_hd, proc,
837 } else { /* !found */ /* or not pri sparking! */
838 /* Add TSO to the end of the thread queue on that processor */
839 run_queue_tls[proc]->link = tso;
840 run_queue_tls[proc] = tso;
842 ASSERT(RtsFlags.GranFlags.DoPriorityScheduling || count==0);
843 CurrentTime[proc] += count * RtsFlags.GranFlags.Costs.pri_sched_overhead +
844 RtsFlags.GranFlags.Costs.threadqueuetime;
846 /* ToDo: check if this is still needed -- HWL
847 if (RtsFlags.GranFlags.DoThreadMigration)
850 if (RtsFlags.GranFlags.GranSimStats.Full &&
851 !(( event_type == GR_START || event_type == GR_STARTQ) &&
852 RtsFlags.GranFlags.labelling) )
853 DumpRawGranEvent(proc, creator, event_type+1, tso, node,
854 tso->gran.sparkname, spark_queue_len(proc));
857 # if defined(GRAN_CHECK)
858 /* Check if thread queue is sorted. Only for testing, really! HWL */
859 if ( RtsFlags.GranFlags.DoPriorityScheduling &&
860 (RtsFlags.GranFlags.Debug.sortedQ) ) {
861 rtsBool sorted = rtsTrue;
864 if (run_queue_hds[proc]==END_TSO_QUEUE ||
865 run_queue_hds[proc]->link==END_TSO_QUEUE) {
866 /* just 1 elem => ok */
868 /* Qu' wa'DIch yIleghQo' (ignore first elem)! */
869 for (prev = run_queue_hds[proc]->link, next = prev->link;
870 (next != END_TSO_QUEUE) ;
871 prev = next, next = prev->link) {
872 ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
873 (prev==(StgTSO*)NULL || prev->link==next));
875 (prev->gran.pri >= next->gran.pri);
879 fprintf(stderr,"Qagh: THREADQ on PE %d is not sorted:\n",
881 G_THREADQ(run_queue_hd,0x1);
888 insertThread, which is only used for GranSim Light, is similar to
889 startThread in that it adds a TSO to a thread queue. However, it assumes
890 that the thread queue is sorted by local clocks and it inserts the TSO at
891 the right place in the queue. Don't create any event, just insert.
893 //@cindex GranSimLight_insertThread
895 GranSimLight_insertThread(tso, proc)
901 rtsBool found = rtsFalse;
903 ASSERT(RtsFlags.GranFlags.Light);
905 /* In GrAnSim-Light we always have an idle `virtual' proc.
906 The semantics of the one-and-only thread queue is different here:
907 all threads in the queue are running (each on its own virtual processor);
908 the queue is only needed internally in the simulator to interleave the
909 reductions of the different processors.
910 The one-and-only thread queue is sorted by the local clocks of the TSOs.
912 ASSERT(run_queue_hds[proc] != END_TSO_QUEUE);
913 ASSERT(tso->link == END_TSO_QUEUE);
915 /* If only one thread in queue so far we emit DESCHEDULE in debug mode */
916 if (RtsFlags.GranFlags.GranSimStats.Full &&
917 (RtsFlags.GranFlags.Debug.checkLight) &&
918 (run_queue_hd->link == END_TSO_QUEUE)) {
919 DumpRawGranEvent(proc, proc, GR_DESCHEDULE,
920 run_queue_hds[proc], (StgClosure*)NULL,
921 tso->gran.sparkname, spark_queue_len(proc)); // ToDo: check spar_queue_len
922 // resched = rtsTrue;
925 /* this routine should only be used in a GrAnSim Light setup */
926 /* && CurrentProc must be 0 in GrAnSim Light setup */
927 ASSERT(RtsFlags.GranFlags.Light && CurrentProc==0);
929 /* Idle proc; same for pri spark and basic version */
930 if (run_queue_hd==END_TSO_QUEUE)
932 run_queue_hd = run_queue_tl = tso;
933 /* MAKE_BUSY(CurrentProc); */
937 for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count = 0;
938 (next != END_TSO_QUEUE) &&
939 !(found = (tso->gran.clock < next->gran.clock));
940 prev = next, next = next->link, count++)
942 ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
943 (prev==(StgTSO*)NULL || prev->link==next));
946 /* found can only be rtsTrue if pri sparking enabled */
948 /* Add tso to ThreadQueue between prev and next */
950 if ( next == END_TSO_QUEUE ) {
951 run_queue_tls[proc] = tso;
953 /* no back link for TSO chain */
956 if ( prev == END_TSO_QUEUE ) {
957 run_queue_hds[proc] = tso;
961 } else { /* !found */ /* or not pri sparking! */
962 /* Add TSO to the end of the thread queue on that processor */
963 run_queue_tls[proc]->link = tso;
964 run_queue_tls[proc] = tso;
967 if ( prev == END_TSO_QUEUE ) { /* new head of queue */
968 new_event(proc, proc, CurrentTime[proc],
970 tso, (StgClosure*)NULL, (rtsSpark*)NULL);
973 if (RtsFlags.GranFlags.GranSimStats.Full &&
974 !(( event_type == GR_START || event_type == GR_STARTQ) &&
975 RtsFlags.GranFlags.labelling) )
976 DumpRawGranEvent(proc, creator, gr_evttype, tso, node,
977 tso->gran.sparkname, spark_queue_len(proc));
983 endThread is responsible for general clean-up after the thread tso has
984 finished. This includes emitting statistics into the profile etc.
/* endThread: clean up after tso has completed on PE proc: emit the end
   event into the profile and, if the PE's run queue is now empty, mark
   the PE Idle.
   NOTE(review): braces and trailing lines of this definition are not
   visible in this chunk.  Also note the Idle assignment below indexes
   procStatus with CurrentProc while the emptiness test uses proc --
   verify these are guaranteed equal at this point (cf.
   ASSERT(CurrentProc==proc) in insertThread). */
987 endThread(StgTSO *tso, PEs proc)
989 ASSERT(procStatus[proc]==Busy); // coming straight out of STG land
990 ASSERT(tso->whatNext==ThreadComplete);
991 // ToDo: prune ContinueThreads for this TSO from event queue
992 DumpEndEvent(proc, tso, rtsFalse /* not mandatory */);
994 /* if this was the last thread on this PE then make it Idle */
995 if (run_queue_hds[proc]==END_TSO_QUEUE) {
996 procStatus[CurrentProc] = Idle;
1000 //@node Thread Queue routines, GranSim functions, Scheduling functions, GranSim specific code
1001 //@subsection Thread Queue routines
1004 Check whether given tso resides on the run queue of the current processor.
1005 Only used for debugging.
1008 //@cindex is_on_queue
/* is_on_queue: debugging helper -- rtsTrue iff tso is linked into the
   run queue of PE proc.
   NOTE(review): the return type, the declarations of t and found, the
   loop step and the return statement are not visible in this chunk. */
1010 is_on_queue (StgTSO *tso, PEs proc)
1015 for (t=run_queue_hds[proc], found=rtsFalse;
1016 t!=END_TSO_QUEUE && !(found = t==tso);
1023 /* This routine is only used for keeping a statistics of thread queue
1024 lengths to evaluate the impact of priority scheduling. -- HWL
1025 {spark_queue_len}vo' jInIHta'
1027 //@cindex thread_queue_len
/* thread_queue_len: length of PE proc's run queue; used only for
   gathering statistics on thread-queue lengths under priority
   scheduling (see the comment above).
   NOTE(review): the return type, the declaration of len, and the
   return statement are not visible in this chunk. */
1029 thread_queue_len(PEs proc)
1031 StgTSO *prev, *next;
1034 for (len = 0, prev = END_TSO_QUEUE, next = run_queue_hds[proc];
1035 next != END_TSO_QUEUE;
1036 len++, prev = next, next = prev->link)
1042 //@node GranSim functions, GranSimLight routines, Thread Queue routines, GranSim specific code
1043 //@subsection GranSim functions
1045 /* ----------------------------------------------------------------- */
1046 /* The main event handling functions; called from Schedule.c (schedule) */
1047 /* ----------------------------------------------------------------- */
1049 //@cindex do_the_globalblock
/* do_the_globalblock: handler for a GlobalBlock event, i.e. a TSO blocking
   on a remote node.  Per the ASSERTs below this never happens in GranSim
   Light and only with bulk (GUM-style) fetching enabled.  Delegates the
   actual blocking to blockFetch(); a non-zero return from blockFetch means
   the node became local in the meantime and nothing further is done.
   NOTE(review): several original lines are missing from this sample, so the
   control-flow description below is best-effort. */
1052 do_the_globalblock(rtsEvent* event)
1054 PEs proc = event->proc; /* proc that requested node */
1055 StgTSO *tso = event->tso; /* tso that requested node */
1056 StgClosure *node = event->node; /* requested, remote node */
1058 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the GlobalBlock\n"));
1059 /* There should be no GLOBALBLOCKs in GrAnSim Light setup */
1060 ASSERT(!RtsFlags.GranFlags.Light);
1061 /* GlobalBlock events only valid with GUM fetching */
1062 ASSERT(RtsFlags.GranFlags.DoBulkFetching);
1064 IF_GRAN_DEBUG(bq, // globalBlock,
1065 if (IS_LOCAL_TO(PROCS(node),proc)) {
1066 belch("## Qagh: GlobalBlock: Blocking TSO %d (%p) on LOCAL node %p (PE %d).\n",
1067 tso->id, tso, node, proc);
1070 /* CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.munpacktime; */
1071 if ( blockFetch(tso,proc,node) != 0 )
1072 return; /* node has become local by now */
1075 ToDo: check whether anything has to be done at all after blockFetch -- HWL
/* Synchronous fetching: the blocked TSO has left the run queue, so wake its
   successor (head of run queue) or mark the PE Idle if there is none. */
1077 if (!RtsFlags.GranFlags.DoAsyncFetch) { /* head of queue is next thread */
1078 StgTSO* tso = run_queue_hds[proc]; /* awaken next thread */
1079 if (tso != (StgTSO*)NULL) {
1080 new_event(proc, proc, CurrentTime[proc],
1082 tso, (StgClosure*)NULL, (rtsSpark*)NULL);
1083 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime;
1084 if (RtsFlags.GranFlags.GranSimStats.Full)
1085 DumpRawGranEvent(proc, CurrentProc, GR_SCHEDULE, tso,
1086 (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc)); // ToDo: check sparkname and spar_queue_len
1087 procStatus[proc] = Busy; /* might have been fetching */
1089 procStatus[proc] = Idle; /* no work on proc now */
1091 } else { /* RtsFlags.GranFlags.DoAsyncFetch i.e. block-on-fetch */
1092 /* other thread is already running */
1093 /* 'oH 'utbe' 'e' vIHar ; I think that's not needed -- HWL
1094 new_event(proc,proc,CurrentTime[proc],
1095 CONTINUETHREAD,EVENT_TSO(event),
1096 (RtsFlags.GranFlags.DoBulkFetching ? closure :
1097 EVENT_NODE(event)),NULL);
1103 //@cindex do_the_unblock
/* do_the_unblock: handler for an UnblockThread event -- either a FetchReply
   has arrived or a blocking queue has been awakened.  With synchronous
   fetching (block-on-fetch) the fetch latency is accounted as ordinary block
   time; in both branches a ResumeThread event is ultimately scheduled for
   the TSO.  NOTE(review): some original lines are missing from this sample. */
1106 do_the_unblock(rtsEvent* event)
1108 PEs proc = event->proc, /* proc that requested node */
1109 creator = event->creator; /* proc that requested node */
1110 StgTSO* tso = event->tso; /* tso that requested node */
1111 StgClosure* node = event->node; /* requested, remote node */
1113 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the UnBlock\n"))
1114 /* There should be no UNBLOCKs in GrAnSim Light setup */
1115 ASSERT(!RtsFlags.GranFlags.Light);
1116 /* UnblockThread means either FetchReply has arrived or
1117 a blocking queue has been awakened;
1118 ToDo: check with assertions
1119 ASSERT(procStatus[proc]==Fetching || IS_BLACK_HOLE(event->node));
1121 if (!RtsFlags.GranFlags.DoAsyncFetch) { /* block-on-fetch */
1122 /* We count block-on-fetch as normal block time */
1123 tso->gran.blocktime += CurrentTime[proc] - tso->gran.blockedat;
1124 /* Dumping now done when processing the event
1125 No costs for contextswitch or thread queueing in this case
1126 if (RtsFlags.GranFlags.GranSimStats.Full)
1127 DumpRawGranEvent(proc, CurrentProc, GR_RESUME, tso,
1128 (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc));
1130 /* Maybe do this in FetchReply already
1131 if (procStatus[proc]==Fetching)
1132 procStatus[proc] = Busy;
1135 new_event(proc, proc, CurrentTime[proc],
1137 tso, node, (rtsSpark*)NULL);
1140 /* Asynchr comm causes additional costs here: */
1141 /* Bring the TSO from the blocked queue into the threadq */
/* Asynchronous case: charge thread-queueing cost before the resume. */
1143 /* In all cases, the UnblockThread causes a ResumeThread to be scheduled */
1144 new_event(proc, proc,
1145 CurrentTime[proc]+RtsFlags.GranFlags.Costs.threadqueuetime,
1147 tso, node, (rtsSpark*)NULL);
1150 //@cindex do_the_fetchnode
/* do_the_fetchnode: handler for a FetchNode event on the PE that holds the
   requested node.  Charges message-unpack cost, optionally dumps a GR_FETCH
   record, then services the request via handleFetchRequest().  If that runs
   out of heap, the event is pushed back, a GC is performed and the request
   retried (the enclosing do/while on rc==OutOfHeap; its opening is among
   the lines missing from this sample). */
1153 do_the_fetchnode(rtsEvent* event)
1155 PEs proc = event->proc, /* proc that holds the requested node */
1156 creator = event->creator; /* proc that requested node */
1157 StgTSO* tso = event->tso;
1158 StgClosure* node = event->node; /* requested, remote node */
1159 rtsFetchReturnCode rc;
1161 ASSERT(CurrentProc==proc);
1162 /* There should be no FETCHNODEs in GrAnSim Light setup */
1163 ASSERT(!RtsFlags.GranFlags.Light);
1165 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchNode\n"));
1167 CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1169 /* ToDo: check whether this is the right place for dumping the event */
1170 if (RtsFlags.GranFlags.GranSimStats.Full)
1171 DumpRawGranEvent(creator, proc, GR_FETCH, tso, node, (StgInt)0, 0);
1174 rc = handleFetchRequest(node, proc, creator, tso);
1175 if (rc == OutOfHeap) { /* trigger GC */
1176 # if defined(GRAN_CHECK) && defined(GRAN)
1177 if (RtsFlags.GcFlags.giveStats)
1178 fprintf(RtsFlags.GcFlags.statsFile,"***** veQ boSwI' PackNearbyGraph(node %p, tso %p (%d))\n",
1179 node, tso, tso->id);
/* Re-queue the event so it is retried after garbage collection. */
1181 prepend_event(event);
1182 GarbageCollect(GetRoots);
1183 // HWL: ToDo: check whether a ContinueThread has to be issued
1184 // HWL old: ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
1185 # if defined(GRAN_CHECK) && defined(GRAN)
1186 if (RtsFlags.GcFlags.giveStats) {
1187 fprintf(RtsFlags.GcFlags.statsFile,"***** SAVE_Hp=%p, SAVE_HpLim=%p, PACK_HEAP_REQUIRED=%d\n",
1188 Hp, HpLim, 0) ; // PACK_HEAP_REQUIRED); ???
1189 fprintf(stderr,"***** No. of packets so far: %d (total size: %d)\n",
1190 globalGranStats.tot_packets, globalGranStats.tot_packet_size);
1193 event = grab_event();
1194 // Hp -= PACK_HEAP_REQUIRED; // ???
1196 /* GC knows that events are special and follows the pointer i.e. */
1197 /* events are valid even if they moved. An EXIT is triggered */
1198 /* if there is not enough heap after GC. */
1200 } while (rc == OutOfHeap);
1203 //@cindex do_the_fetchreply
/* do_the_fetchreply: handler for a FetchReply event on the requesting PE.
   With bulk fetching the reply carries a packed graph (rtsPackBuffer) that
   is unpacked here; with incremental fetching a single node is moved/copied
   via fetchNode().  A failed incremental fetch (node grabbed by a third PE)
   is counted as a fetch miss and the fetch is re-issued ("chased") to the
   node's new location; in that case the function returns early and the TSO
   stays asleep.  On success the fetch statistics are updated,
   OutstandingFetches is decremented and a follow-up event is scheduled.
   NOTE(review): lines are missing from this sample; e.g. the declaration of
   'fetchtime' and some event-type arguments are not visible. */
1205 do_the_fetchreply(rtsEvent* event)
1207 PEs proc = event->proc, /* proc that requested node */
1208 creator = event->creator; /* proc that holds the requested node */
1209 StgTSO* tso = event->tso;
1210 StgClosure* node = event->node; /* requested, remote node */
1211 StgClosure* closure=(StgClosure*)NULL;
1213 ASSERT(CurrentProc==proc);
1214 ASSERT(RtsFlags.GranFlags.DoAsyncFetch || procStatus[proc]==Fetching);
1216 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchReply\n"));
1217 /* There should be no FETCHREPLYs in GrAnSim Light setup */
1218 ASSERT(!RtsFlags.GranFlags.Light);
1220 /* assign message unpack costs *before* dumping the event */
1221 CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1223 /* ToDo: check whether this is the right place for dumping the event */
1224 if (RtsFlags.GranFlags.GranSimStats.Full)
1225 DumpRawGranEvent(proc, creator, GR_REPLY, tso, node,
1226 tso->gran.sparkname, spark_queue_len(proc));
1228 /* THIS SHOULD NEVER HAPPEN
1229 If tso is in the BQ of node this means that it actually entered the
1230 remote closure, due to a missing GranSimFetch at the beginning of the
1231 entry code; therefore, this is actually a faked fetch, triggered from
1232 within GranSimBlock;
1233 since tso is both in the EVQ and the BQ for node, we have to take it out
1234 of the BQ first before we can handle the FetchReply;
1235 ToDo: special cases in awaken_blocked_queue, since the BQ magically moved.
1237 if (tso->blocked_on!=(StgClosure*)NULL) {
1239 belch("## ghuH: TSO %d (%p) in FetchReply is blocked on node %p (shouldn't happen AFAIK)",
1240 tso->id, tso, node));
1241 // unlink_from_bq(tso, node);
1244 if (RtsFlags.GranFlags.DoBulkFetching) { /* bulk (packet) fetching */
1245 rtsPackBuffer *buffer = (rtsPackBuffer*)node;
1246 nat size = buffer->size;
1248 /* NB: Fetch misses can't occur with GUM fetching, as */
1249 /* updatable closure are turned into RBHs and therefore locked */
1250 /* for other processors that try to grab them. */
1252 closure = UnpackGraph(buffer);
1253 CurrentTime[proc] += size * RtsFlags.GranFlags.Costs.munpacktime;
1254 } else // incremental fetching
1255 /* Copy or move node to CurrentProc */
1256 if (fetchNode(node, creator, proc)) {
1257 /* Fetch has failed i.e. node has been grabbed by another PE */
1258 PEs p = where_is(node);
1261 if (RtsFlags.GranFlags.GranSimStats.Global)
1262 globalGranStats.fetch_misses++;
1264 IF_GRAN_DEBUG(thunkStealing,
1265 belch("== Qu'vatlh! fetch miss @ %u: node %p is at proc %u (rather than proc %u)\n",
1266 CurrentTime[proc],node,p,creator));
1268 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
1270 /* Count fetch again !? */
1271 ++(tso->gran.fetchcount);
1272 tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime;
1274 fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) +
1275 RtsFlags.GranFlags.Costs.latency;
1277 /* Chase the grabbed node */
1278 new_event(p, proc, fetchtime,
1280 tso, node, (rtsSpark*)NULL);
1282 # if 0 && defined(GRAN_CHECK) && defined(GRAN) /* Just for testing */
1283 IF_GRAN_DEBUG(blockOnFetch,
1284 BlockedOnFetch[CurrentProc] = tso;) /*-rtsTrue;-*/
1286 IF_GRAN_DEBUG(blockOnFetch_sanity,
1287 tso->type |= FETCH_MASK_TSO;)
1290 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
1292 return; /* NB: no REPLy has been processed; tso still sleeping */
1295 /* -- Qapla'! Fetch has been successful; node is here, now */
1296 ++(event->tso->gran.fetchcount);
1297 event->tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime;
1299 /* this is now done at the beginning of this routine
1300 if (RtsFlags.GranFlags.GranSimStats.Full)
1301 DumpRawGranEvent(proc,event->creator, GR_REPLY, event->tso,
1302 (RtsFlags.GranFlags.DoBulkFetching ?
1305 tso->gran.sparkname, spark_queue_len(proc));
1308 --OutstandingFetches[proc];
1309 ASSERT(OutstandingFetches[proc] >= 0);
1310 new_event(proc, proc, CurrentTime[proc],
1312 event->tso, (RtsFlags.GranFlags.DoBulkFetching ?
1318 //@cindex do_the_movethread
/* do_the_movethread: handler for a MoveThread event -- a migrated TSO
   arrives at 'proc' (which must be fishing or using async fetch).  Charges
   unpack costs, optionally dumps a GR_STOLEN record, adjusts the TSO's
   location bitmask to this PE and inserts it into the local run queue.
   Requires thread migration (-bM) to be enabled; never valid in GranSim
   Light. */
1321 do_the_movethread(rtsEvent* event) {
1322 PEs proc = event->proc, /* proc that requested node */
1323 creator = event->creator; /* proc that holds the requested node */
1324 StgTSO* tso = event->tso;
1326 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveThread\n"));
1328 ASSERT(CurrentProc==proc);
1329 /* There should be no MOVETHREADs in GrAnSim Light setup */
1330 ASSERT(!RtsFlags.GranFlags.Light);
1331 /* MOVETHREAD events should never occur without -bM */
1332 ASSERT(RtsFlags.GranFlags.DoThreadMigration);
1333 /* Bitmask of moved thread should be 0 */
1334 ASSERT(PROCS(tso)==0);
1335 ASSERT(procStatus[proc] == Fishing ||
1336 RtsFlags.GranFlags.DoAsyncFetch);
1337 ASSERT(OutstandingFishes[proc]>0);
1339 /* ToDo: exact costs for unpacking the whole TSO */
1340 CurrentTime[proc] += 5l * RtsFlags.GranFlags.Costs.munpacktime;
1342 /* ToDo: check whether this is the right place for dumping the event */
1343 if (RtsFlags.GranFlags.GranSimStats.Full)
1344 DumpRawGranEvent(proc, creator,
1345 GR_STOLEN, tso, (StgClosure*)NULL, (StgInt)0, 0);
1347 // ToDo: check cost functions
1348 --OutstandingFishes[proc];
1349 SET_GRAN_HDR(tso, ThisPE); // adjust the bitmask for the TSO
1350 insertThread(tso, proc);
1352 if (procStatus[proc]==Fishing)
1353 procStatus[proc] = Idle;
1355 if (RtsFlags.GranFlags.GranSimStats.Global)
1356 globalGranStats.tot_TSOs_migrated++;
1359 //@cindex do_the_movespark
/* do_the_movespark: handler for a MoveSpark event -- a stolen spark arrives
   at 'proc'.  Charges unpack cost, optionally dumps a spark-profiling
   record, counts "withered" sparks (whose node no longer deserves sparking)
   and adds the spark to the local spark queue.  The PE leaves the Fishing
   state so handleIdlePEs can later turn the spark into a thread. */
1362 do_the_movespark(rtsEvent* event) {
1363 PEs proc = event->proc, /* proc that requested spark */
1364 creator = event->creator; /* proc that holds the requested spark */
1365 StgTSO* tso = event->tso;
1366 rtsSparkQ spark = event->spark;
1368 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveSpark\n"))
1370 ASSERT(CurrentProc==proc);
1371 ASSERT(spark!=NULL);
1372 ASSERT(procStatus[proc] == Fishing ||
1373 RtsFlags.GranFlags.DoAsyncFetch);
1374 ASSERT(OutstandingFishes[proc]>0);
1376 CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1378 /* record movement of spark only if spark profiling is turned on */
1379 if (RtsFlags.GranFlags.GranSimStats.Sparks)
1380 DumpRawGranEvent(proc, creator,
1382 tso, spark->node, spark->name, spark_queue_len(proc));
1384 /* global statistics */
1385 if ( RtsFlags.GranFlags.GranSimStats.Global &&
1386 !closure_SHOULD_SPARK(spark->node))
1387 globalGranStats.withered_sparks++;
1388 /* Not adding the spark to the spark queue would be the right */
1389 /* thing here, but it also would be cheating, as this info can't be */
1390 /* available in a real system. -- HWL */
1392 --OutstandingFishes[proc];
1394 add_to_spark_queue(spark);
1396 IF_GRAN_DEBUG(randomSteal, // ToDo: spark-distribution flag
1397 print_sparkq_stats());
1399 /* Should we treat stolen sparks specially? Currently, we don't. */
1401 if (procStatus[proc]==Fishing)
1402 procStatus[proc] = Idle;
1404 /* add_to_spark_queue will increase the time of the current proc. */
1406 If proc was fishing, it is Idle now with the new spark in its spark
1407 pool. This means that the next time handleIdlePEs is called, a local
1408 FindWork will be created on this PE to turn the spark into a thread. Of
1409 course another PE might steal the spark in the meantime (that's why we
1410 are using events rather than inlining all the operations in the first
1415 In the Constellation class version of GranSim the semantics of StartThread
1416 events has changed. Now, StartThread has to perform 3 basic operations:
1417 - create a new thread (previously this was done in ActivateSpark);
1418 - insert the thread into the run queue of the current processor
1419 - generate a new event for actually running the new thread
1420 Note that the insertThread is called via createThread.
1423 //@cindex do_the_startthread
/* do_the_startthread: shared handler for StartThread (create a fresh TSO
   from a spark) and ResumeThread (re-insert an existing TSO) events.  The
   gr_evttype chosen distinguishes queueing vs. non-queueing variants
   (GR_START/GR_STARTQ, GR_RESUME/GR_RESUMEQ) based on whether the run queue
   was empty.  Afterwards the PE is marked Busy and a follow-up event makes
   sure the thread actually runs.
   NOTE(review): some lines (event type argument of the final new_event,
   closing braces) are missing from this sample. */
1426 do_the_startthread(rtsEvent *event)
1428 PEs proc = event->proc; /* proc that requested node */
1429 StgTSO *tso = event->tso; /* tso that requested node */
1430 StgClosure *node = event->node; /* requested, remote node */
1431 rtsSpark *spark = event->spark;
1432 GranEventType gr_evttype;
1434 ASSERT(CurrentProc==proc);
1435 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1436 ASSERT(event->evttype == ResumeThread || event->evttype == StartThread);
1437 /* if this was called via StartThread: */
1438 ASSERT(event->evttype!=StartThread || tso == END_TSO_QUEUE); // not yet created
1439 // ToDo: check: ASSERT(event->evttype!=StartThread || procStatus[proc]==Starting);
1440 /* if this was called via ResumeThread: */
1441 ASSERT(event->evttype!=ResumeThread ||
1442 RtsFlags.GranFlags.DoAsyncFetch ||!is_on_queue(tso,proc));
1444 /* startThread may have been called from the main event handler upon
1445 finding either a ResumeThread or a StartThread event; set the
1446 gr_evttype (needed for writing to .gr file) accordingly */
1447 // gr_evttype = (event->evttype == ResumeThread) ? GR_RESUME : GR_START;
1449 if ( event->evttype == StartThread ) {
1450 GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ?
1451 GR_START : GR_STARTQ;
1453 tso = createThread(BLOCK_SIZE_W, spark->gran_info);// implicit insertThread!
1454 pushClosure(tso, node);
1456 // ToDo: fwd info on local/global spark to thread -- HWL
1457 // tso->gran.exported = spark->exported;
1458 // tso->gran.locked = !spark->global;
1459 tso->gran.sparkname = spark->name;
1461 ASSERT(CurrentProc==proc);
1462 if (RtsFlags.GranFlags.GranSimStats.Full)
1463 DumpGranEvent(gr_evttype,tso);
1465 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
1466 } else { // event->evttype == ResumeThread
1467 GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ?
1468 GR_RESUME : GR_RESUMEQ;
1470 insertThread(tso, proc);
1472 ASSERT(CurrentProc==proc);
1473 if (RtsFlags.GranFlags.GranSimStats.Full)
1474 DumpGranEvent(gr_evttype,tso);
1477 ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE); // non-empty run queue
1478 procStatus[proc] = Busy;
1479 /* make sure that this thread is actually run */
1480 new_event(proc, proc,
1483 tso, node, (rtsSpark*)NULL);
1485 /* A wee bit of statistics gathering */
1486 if (RtsFlags.GranFlags.GranSimStats.Global) {
1487 globalGranStats.tot_add_threads++;
1488 globalGranStats.tot_tq_len += thread_queue_len(CurrentProc);
1493 //@cindex do_the_findwork
/* do_the_findwork: handler for a FindWork event, both local (creator==proc)
   and GUM-style remote fishing (creator!=proc).  First guarantees enough
   heap for creating a thread (conservative estimate; triggers GC if not),
   then searches the local spark queue via findLocalSpark().  If no usable
   spark is found, a local FindWork makes the PE Idle, while a fish is
   forwarded by stealing on behalf of the originating PE.  If a spark is
   found it is activated and removed from the spark queue.
   NOTE(review): several lines (e.g. the 'found' declaration, closing braces
   and parts of the final assertion) are missing from this sample. */
1495 do_the_findwork(rtsEvent* event)
1497 PEs proc = event->proc, /* proc to search for work */
1498 creator = event->creator; /* proc that requested work */
1499 rtsSparkQ spark = event->spark;
1500 /* ToDo: check that this size is safe -- HWL */
1501 nat req_heap = sizeofW(StgTSO) + MIN_STACK_WORDS;
1502 // add this? -- HWL:RtsFlags.ConcFlags.stkChunkSize;
1504 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the Findwork\n"));
1506 /* If GUM style fishing is enabled, the contents of the spark field says
1507 what to steal (spark(1) or thread(2)); */
1508 ASSERT(!(RtsFlags.GranFlags.Fishing && event->spark==(rtsSpark*)0));
1510 /* Make sure that we have enough heap for creating a new
1511 thread. This is a conservative estimate of the required heap.
1512 This eliminates special checks for GC around NewThread within
1515 if (Hp + req_heap > HpLim ) {
1517 belch("GC: Doing GC from within Findwork handling (that's bloody dangerous if you ask me)");)
1518 GarbageCollect(GetRoots);
1519 // ReallyPerformThreadGC(req_heap, rtsFalse); old -- HWL
1521 if (procStatus[CurrentProc]==Sparking)
1522 procStatus[CurrentProc]=Idle;
/* Only search for work if the PE is actually allowed to: always-create mode,
   fishing, or Idle/Sparking with an acceptable fetch strategy. */
1526 if ( RtsFlags.GranFlags.DoAlwaysCreateThreads ||
1527 RtsFlags.GranFlags.Fishing ||
1528 ((procStatus[proc]==Idle || procStatus[proc]==Sparking) &&
1529 (RtsFlags.GranFlags.FetchStrategy >= 2 ||
1530 OutstandingFetches[proc] == 0)) )
1533 rtsSparkQ prev, spark;
1536 ASSERT(procStatus[proc]==Sparking ||
1537 RtsFlags.GranFlags.DoAlwaysCreateThreads ||
1538 RtsFlags.GranFlags.Fishing);
1540 /* SImmoHwI' yInej! Search spark queue! */
1541 /* gimme_spark (event, &found, &spark); */
1542 findLocalSpark(event, &found, &spark);
1544 if (!found) { /* pagh vumwI' */
1546 If no spark has been found this can mean 2 things:
1547 1/ The FindWork was a fish (i.e. a message sent by another PE) and
1548 the spark pool of the receiver is empty
1549 --> the fish has to be forwarded to another PE
1550 2/ The FindWork was local to this PE (i.e. no communication; in this
1551 case creator==proc) and the spark pool of the PE is not empty
1552 contains only sparks of closures that should not be sparked
1553 (note: if the spark pool were empty, handleIdlePEs wouldn't have
1554 generated a FindWork in the first place)
1555 --> the PE has to be made idle to trigger stealing sparks the next
1556 time handleIdlePEs is performed
1559 ASSERT(pending_sparks_hds[proc]==(rtsSpark*)NULL);
1560 if (creator==proc) {
1561 /* local FindWork */
1562 if (procStatus[proc]==Busy) {
1563 belch("ghuH: PE %d in Busy state while processing local FindWork (spark pool is empty!) @ %lx",
1564 proc, CurrentTime[proc]);
1565 procStatus[proc] = Idle;
1568 /* global FindWork i.e. a Fish */
1569 ASSERT(RtsFlags.GranFlags.Fishing);
1570 /* actually this generates another request from the originating PE */
1571 ASSERT(OutstandingFishes[creator]>0);
1572 OutstandingFishes[creator]--;
1573 /* ToDo: assign costs for sending fish to proc not to creator */
1574 stealSpark(creator); /* might steal from same PE; ToDo: fix */
1575 ASSERT(RtsFlags.GranFlags.maxFishes!=1 || procStatus[creator] == Fishing);
1576 /* any assertions on state of proc possible here? */
1579 /* DaH chu' Qu' yIchen! Now create new work! */
1580 IF_GRAN_DEBUG(findWork,
1581 belch("+- munching spark %p; creating thread for node %p",
1582 spark, spark->node));
1583 activateSpark (event, spark);
1584 ASSERT(spark != (rtsSpark*)NULL);
1585 spark = delete_from_sparkq (spark, proc, rtsTrue);
1588 IF_GRAN_DEBUG(findWork,
1589 belch("+- Contents of spark queues at the end of FindWork @ %lx",
1591 print_sparkq_stats());
1593 /* ToDo: check ; not valid if GC occurs in ActivateSpark */
1595 /* forward fish or */
1597 /* local spark or */
1598 (proc==creator && procStatus[proc]==Starting)) ||
1599 //(!found && procStatus[proc]==Idle) ||
1600 RtsFlags.GranFlags.DoAlwaysCreateThreads);
1602 IF_GRAN_DEBUG(findWork,
1603 belch("+- RTS refuses to findWork on PE %d @ %lx",
1604 proc, CurrentTime[proc]);
1605 belch(" procStatus[%d]=%s, fetch strategy=%d, outstanding fetches[%d]=%d",
1606 proc, proc_status_names[procStatus[proc]],
1607 RtsFlags.GranFlags.FetchStrategy,
1608 proc, OutstandingFetches[proc]));
1612 //@node GranSimLight routines, Code for Fetching Nodes, GranSim functions, GranSim specific code
1613 //@subsection GranSimLight routines
1616 This code is called from the central scheduler after having grabbed a
1617 new event and is only needed for GranSim-Light. It mainly adjusts the
1618 ActiveTSO so that all costs that have to be assigned from within the
1619 scheduler are assigned to the right TSO. The choice of ActiveTSO depends
1620 on the type of event that has been found.
/* GranSimLight_enter_system: on entering the "system area" of the
   scheduler, save the previously active TSO's virtual clock, then pick the
   new ActiveTSO according to the event type and restore its clock into
   CurrentTime[CurrentProc].  K&R-style definition; the 'event' parameter's
   declaration is outside the visible lines.  NOTE(review): lines are
   missing from this sample (e.g. case labels between ContinueThread and
   MoveSpark, and the write-back of *ActiveTSOp). */
1624 GranSimLight_enter_system(event, ActiveTSOp)
1626 StgTSO **ActiveTSOp;
1628 StgTSO *ActiveTSO = *ActiveTSOp;
1630 ASSERT (RtsFlags.GranFlags.Light);
1632 /* Restore local clock of the virtual processor attached to CurrentTSO.
1633 All costs will be associated to the `virt. proc' on which the tso
1635 if (ActiveTSO != NULL) { /* already in system area */
1636 ActiveTSO->gran.clock = CurrentTime[CurrentProc];
1637 if (RtsFlags.GranFlags.DoFairSchedule)
1639 if (RtsFlags.GranFlags.GranSimStats.Full &&
1640 RtsFlags.GranFlags.Debug.checkLight)
1641 DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
1644 switch (event->evttype)
1646 case ContinueThread:
1647 case FindWork: /* inaccurate this way */
1648 ActiveTSO = run_queue_hd;
1652 case MoveSpark: /* has tso of virt proc in tso field of event */
1653 ActiveTSO = event->tso;
1655 default: barf("Illegal event type %s (%d) in GrAnSim Light setup\n",
1656 event_names[event->evttype],event->evttype);
1658 CurrentTime[CurrentProc] = ActiveTSO->gran.clock;
1659 if (RtsFlags.GranFlags.DoFairSchedule) {
1660 if (RtsFlags.GranFlags.GranSimStats.Full &&
1661 RtsFlags.GranFlags.Debug.checkLight)
1662 DumpGranEvent(GR_SYSTEM_START,ActiveTSO);
/* GranSimLight_leave_system: mirror of enter_system.  On leaving the system
   area, checkpoint the active TSO's virtual clock, clear ActiveTSO and
   restore CurrentTSO's clock.  With fair scheduling, additionally dump
   GR_SYSTEM_END/GR_SCHEDULE records and (in the visible tail) schedule a
   ContinueThread for the next queued thread if its clock would otherwise
   run far ahead of the next event.  NOTE(review): lines are missing from
   this sample, and the tail uses older-style TSO_LINK/ThreadQueueHd macros
   -- presumably legacy code; confirm against the full source. */
1667 GranSimLight_leave_system(event, ActiveTSOp)
1669 StgTSO **ActiveTSOp;
1671 StgTSO *ActiveTSO = *ActiveTSOp;
1673 ASSERT(RtsFlags.GranFlags.Light);
1675 /* Save time of `virt. proc' which was active since last getevent and
1676 restore time of `virt. proc' where CurrentTSO is living on. */
1677 if(RtsFlags.GranFlags.DoFairSchedule) {
1678 if (RtsFlags.GranFlags.GranSimStats.Full &&
1679 RtsFlags.GranFlags.Debug.checkLight) // ToDo: clean up flags
1680 DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
1682 ActiveTSO->gran.clock = CurrentTime[CurrentProc];
1683 ActiveTSO = (StgTSO*)NULL;
1684 CurrentTime[CurrentProc] = CurrentTSO->gran.clock;
1685 if (RtsFlags.GranFlags.DoFairSchedule /* && resched */ ) {
1686 // resched = rtsFalse;
1687 if (RtsFlags.GranFlags.GranSimStats.Full &&
1688 RtsFlags.GranFlags.Debug.checkLight)
1689 DumpGranEvent(GR_SCHEDULE,run_queue_hd);
1692 if (TSO_LINK(ThreadQueueHd)!=PrelBase_Z91Z93_closure &&
1693 (TimeOfNextEvent == 0 ||
1694 TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000<TimeOfNextEvent)) {
1695 new_event(CurrentProc,CurrentProc,TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000,
1696 CONTINUETHREAD,TSO_LINK(ThreadQueueHd),PrelBase_Z91Z93_closure,NULL);
1697 TimeOfNextEvent = get_time_of_next_event();
1702 //@node Code for Fetching Nodes, Idle PEs, GranSimLight routines, GranSim specific code
1703 //@subsection Code for Fetching Nodes
1706 The following GrAnSim routines simulate the fetching of nodes from a
1707 remote processor. We use a 1 word bitmask to indicate on which processor
1708 a node is lying. Thus, moving or copying a node from one processor to
1709 another just requires an appropriate change in this bitmask (using
1710 @SET_GA@). Additionally, the clocks have to be updated.
1712 A special case arises when the node that is needed by processor A has
1713 been moved from a processor B to a processor C between sending out a
1714 @FETCH@ (from A) and its arrival at B. In that case the @FETCH@ has to
1715 be forwarded to C. This is simulated by issuing another FetchNode event
1716 on processor C with A as creator.
1719 /* ngoqvam che' {GrAnSim}! */
1721 /* Fetch node "node" to processor "p" */
/* fetchNode: simulate moving/copying 'node' from PE 'from' to PE 'to' by
   manipulating the node's processor bitmask.  Returns NodeHasMoved when the
   node is no longer on either PE; otherwise copies (node in HNF -- set the
   target bit) or moves (otherwise -- replace the mask) the node.  Never used
   with bulk fetching or in GranSim Light (see ASSERTs).  K&R-style
   definition; the parameter declarations and return of the success code are
   outside the visible lines. */
1726 fetchNode(node,from,to)
1730 /* In case of RtsFlags.GranFlags.DoBulkFetching this fct should never be
1731 entered! Instead, UnpackGraph is used in ReSchedule */
1732 StgClosure* closure;
1734 ASSERT(to==CurrentProc);
1735 /* Should never be entered in GrAnSim Light setup */
1736 ASSERT(!RtsFlags.GranFlags.Light);
1737 /* fetchNode should never be entered with DoBulkFetching */
1738 ASSERT(!RtsFlags.GranFlags.DoBulkFetching);
1740 /* Now fetch the node */
1741 if (!IS_LOCAL_TO(PROCS(node),from) &&
1742 !IS_LOCAL_TO(PROCS(node),to) )
1743 return NodeHasMoved;
1745 if (closure_HNF(node)) /* node already in head normal form? */
1746 node->header.gran.procs |= PE_NUMBER(to); /* Copy node */
1748 node->header.gran.procs = PE_NUMBER(to); /* Move node */
1754 Process a fetch request.
1756 Cost of sending a packet of size n = C + P*n
1757 where C = packet construction constant,
1758 P = cost of packing one word into a packet
1759 [Should also account for multiple packets].
1762 //@cindex handleFetchRequest
/* handleFetchRequest: service a fetch of 'node' by PE 'from', handled on
   PE 'to' (the holder).  Three cases:
   1. node already local to 'from' (somebody moved it) -> schedule an
      immediate reply / continue, return NodeIsLocal;
   2. node still on 'to' -> with bulk fetching pack a nearby graph (may
      return OutOfHeap, caller retries after GC; a BH/RBH instead fakes an
      FMBQ-style block), otherwise send an incremental single-node reply;
   3. node moved to a third PE -> forward the fetch there, return
      NodeHasMoved.
   NOTE(review): lines are missing from this sample (event-type arguments of
   several new_event calls, 'graph'/'fetchtime' declarations, some returns). */
1765 handleFetchRequest(node,to,from,tso)
1766 StgClosure* node; // the node which is requested
1767 PEs to, from; // fetch request: from -> to
1768 StgTSO* tso; // the tso which needs the node
1770 ASSERT(!RtsFlags.GranFlags.Light);
1771 /* ToDo: check assertion */
1772 ASSERT(OutstandingFetches[from]>0);
1774 /* probably wrong place; */
1775 ASSERT(CurrentProc==to);
1777 if (IS_LOCAL_TO(PROCS(node), from)) /* Somebody else moved node already => */
1779 IF_GRAN_DEBUG(thunkStealing,
1780 fprintf(stderr,"ghuH: handleFetchRequest entered with local node %p (%s) (PE %d)\n",
1781 node, info_type(node), from));
1783 if (RtsFlags.GranFlags.DoBulkFetching) {
1785 rtsPackBuffer *graph;
1787 /* Create a 1-node-buffer and schedule a FETCHREPLY now */
1788 graph = PackOneNode(node, tso, &size);
1789 new_event(from, to, CurrentTime[to],
1791 tso, graph, (rtsSpark*)NULL);
1793 new_event(from, to, CurrentTime[to],
1795 tso, node, (rtsSpark*)NULL);
1797 IF_GRAN_DEBUG(thunkStealing,
1798 belch("== majQa'! closure %p is local on PE %d already (this is a good thing)", node, from));
1799 return (NodeIsLocal);
1801 else if (IS_LOCAL_TO(PROCS(node), to) ) /* Is node still here? */
1803 if (RtsFlags.GranFlags.DoBulkFetching) { /* {GUM}vo' ngoqvam vInIHta' */
1804 nat size; /* (code from GUM) */
1807 if (IS_BLACK_HOLE(node)) { /* block on BH or RBH */
1808 new_event(from, to, CurrentTime[to],
1810 tso, node, (rtsSpark*)NULL);
1811 /* Note: blockFetch is done when handling GLOBALBLOCK event;
1812 make sure the TSO stays out of the run queue */
1813 /* When this thread is reawoken it does the usual: it tries to
1814 enter the updated node and issues a fetch if it's remote.
1815 It has forgotten that it has sent a fetch already (i.e. a
1816 FETCHNODE is swallowed by a BH, leaving the thread in a BQ) */
1817 --OutstandingFetches[from];
1819 IF_GRAN_DEBUG(thunkStealing,
1820 belch("== majQa'! closure %p on PE %d is a BH (demander=PE %d); faking a FMBQ",
1822 if (RtsFlags.GranFlags.GranSimStats.Global) {
1823 globalGranStats.tot_FMBQs++;
1828 /* The tso requesting the node is blocked and cannot be on a run queue */
1829 ASSERT(!is_on_queue(tso, from));
1831 if ((graph = PackNearbyGraph(node, tso, &size)) == NULL)
1832 return (OutOfHeap); /* out of heap */
1834 /* Actual moving/copying of node is done on arrival; see FETCHREPLY */
1835 /* Send a reply to the originator */
1836 /* ToDo: Replace that by software costs for doing graph packing! */
1837 CurrentTime[to] += size * RtsFlags.GranFlags.Costs.mpacktime;
1840 CurrentTime[to]+RtsFlags.GranFlags.Costs.latency,
1842 tso, (StgClosure *)graph, (rtsSpark*)NULL);
1844 CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1846 } else { /* incremental (single closure) fetching */
1847 /* Actual moving/copying of node is done on arrival; see FETCHREPLY */
1848 /* Send a reply to the originator */
1849 CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime;
1852 CurrentTime[to]+RtsFlags.GranFlags.Costs.latency,
1854 tso, node, (rtsSpark*)NULL);
1856 CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1860 else /* Qu'vatlh! node has been grabbed by another proc => forward */
1862 PEs node_loc = where_is(node);
1865 IF_GRAN_DEBUG(thunkStealing,
1866 belch("== Qu'vatlh! node %p has been grabbed by PE %d from PE %d (demander=%d) @ %d\n",
1867 node,node_loc,to,from,CurrentTime[to]));
1868 if (RtsFlags.GranFlags.GranSimStats.Global) {
1869 globalGranStats.fetch_misses++;
1872 /* Prepare FORWARD message to proc p_new */
1873 CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime;
1875 fetchtime = stg_max(CurrentTime[to], CurrentTime[node_loc]) +
1876 RtsFlags.GranFlags.Costs.latency;
1878 new_event(node_loc, from, fetchtime,
1880 tso, node, (rtsSpark*)NULL);
1882 CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1884 return (NodeHasMoved);
1889 blockFetch blocks a BlockedFetch node on some kind of black hole.
1891 Taken from gum/HLComms.lc. [find a better place for that ?] -- HWL
1893 {\bf Note:} In GranSim we don't have @FETCHME@ nodes and therefore don't
1894 create @FMBQ@'s (FetchMe blocking queues) to cope with global
1895 blocking. Instead, non-local TSO are put into the BQ in the same way as
1896 local TSOs. However, we have to check if a TSO is local or global in
1897 order to account for the latencies involved and for keeping track of the
1898 number of fetches that are really going on.
1901 //@cindex blockFetch
/* blockFetch: put 'tso' (running on PE 'proc') on the blocking queue of
   closure 'bh'.  If 'bh' is no longer a black hole the TSO is immediately
   unblocked via a new event and NodeIsNoBH is returned.  Otherwise the TSO
   is queued: a plain BLACKHOLE variant is upgraded to a BLACKHOLE_BQ with
   tso as its queue; a BLACKHOLE_BQ gets tso prepended.  GranSimBlock() does
   the statistics/accounting.  K&R-style definition; the return type, local
   'info' declaration and some closing braces are outside the visible lines. */
1904 blockFetch(tso, proc, bh)
1905 StgTSO* tso; /* TSO which gets blocked */
1906 PEs proc; /* PE where that tso was running */
1907 StgClosure* bh; /* closure to block on (BH, RBH, BQ) */
1912 fprintf(stderr,"## blockFetch: blocking TSO %p (%d)[PE %d] on node %p (%s) [PE %d]. No graph is packed!\n",
1913 tso, tso->id, proc, bh, info_type(bh), where_is(bh)));
1915 if (!IS_BLACK_HOLE(bh)) { /* catches BHs and RBHs */
1917 fprintf(stderr,"## blockFetch: node %p (%s) is not a BH => awakening TSO %p (%d) [PE %u]\n",
1918 bh, info_type(bh), tso, tso->id, proc));
1920 /* No BH anymore => immediately unblock tso */
1921 new_event(proc, proc, CurrentTime[proc],
1923 tso, bh, (rtsSpark*)NULL);
1925 /* Is this always a REPLY to a FETCH in the profile ? */
1926 if (RtsFlags.GranFlags.GranSimStats.Full)
1927 DumpRawGranEvent(proc, proc, GR_REPLY, tso, bh, (StgInt)0, 0);
1928 return (NodeIsNoBH);
1931 /* DaH {BQ}Daq Qu' Suq 'e' wISov!
1932 Now we know that we have to put the tso into the BQ.
1933 2 cases: If block-on-fetch, tso is at head of threadq =>
1934 => take it out of threadq and into BQ
1935 If reschedule-on-fetch, tso is only pointed to be event
1936 => just put it into BQ
1939 if (!RtsFlags.GranFlags.DoAsyncFetch) {
1940 GranSimBlock(tso, proc, bh);
1942 if (RtsFlags.GranFlags.GranSimStats.Full)
1943 DumpRawGranEvent(proc, where_is(bh), GR_BLOCK, tso, bh, (StgInt)0, 0);
1944 ++(tso->gran.blockcount);
1945 tso->gran.blockedat = CurrentTime[proc];
1949 /* after scheduling the GlobalBlock event the TSO is not put into the
1950 run queue again; it is only pointed to via the event we are
1951 processing now; in GranSim 4.xx there is no difference between
1952 synchr and asynchr comm here */
1953 ASSERT(!is_on_queue(tso, proc));
1954 ASSERT(tso->link == END_TSO_QUEUE);
1956 GranSimBlock(tso, proc, bh); /* GranSim statistics gathering */
1958 /* Now, put tso into BQ (similar to blocking entry codes) */
1959 info = get_itbl(bh);
1960 switch (info -> type) {
1963 case CAF_BLACKHOLE: // ToDo: check whether this is a possibly ITBL here
1964 case SE_BLACKHOLE: // ToDo: check whether this is a possibly ITBL here
1965 case SE_CAF_BLACKHOLE:// ToDo: check whether this is a possibly ITBL here
1966 /* basically an inlined version of BLACKHOLE_entry -- HWL */
1967 /* Change the BLACKHOLE into a BLACKHOLE_BQ */
1968 ((StgBlockingQueue *)bh)->header.info = &BLACKHOLE_BQ_info;
1969 /* Put ourselves on the blocking queue for this black hole */
1970 // tso->link=END_TSO_QUEUE; not necessary; see assertion above
1971 ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)tso;
1972 tso->blocked_on = bh;
1973 recordMutable((StgMutClosure *)bh);
1977 /* basically an inlined version of BLACKHOLE_BQ_entry -- HWL */
1978 tso->link = (StgTSO *) (((StgBlockingQueue*)bh)->blocking_queue);
1979 ((StgBlockingQueue*)bh)->blocking_queue = (StgBlockingQueueElement *)tso;
1980 recordMutable((StgMutClosure *)bh);
1982 # if 0 && defined(GC_MUT_REQUIRED)
1983 ToDo: check whether recordMutable is necessary -- HWL
1985 * If we modify a black hole in the old generation, we have to make
1986 * sure it goes on the mutables list
1989 if (bh <= StorageMgrInfo.OldLim) {
1990 MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
1991 StorageMgrInfo.OldMutables = bh;
1993 MUT_LINK(bh) = MUT_NOT_LINKED;
1998 barf("Qagh: FMBQ closure (%p) found in GrAnSim (TSO=%p (%d))\n",
2004 barf("Qagh: thought %p was a black hole (IP %p (%s))",
2005 bh, info, info_type(get_itbl(bh)));
2012 //@node Idle PEs, Routines directly called from Haskell world, Code for Fetching Nodes, GranSim specific code
2013 //@subsection Idle PEs
2016 Export work to idle PEs. This function is called from @ReSchedule@
2017 before dispatching on the current event. @HandleIdlePEs@ iterates over
2018 all PEs, trying to get work for idle PEs. Note, that this is a
2019 simplification compared to GUM's fishing model. We try to compensate for
2020 that by making the cost for stealing work dependent on the number of
2021 idle processors and thereby on the probability with which a randomly
2022 sent fish would find work.
2025 //@cindex handleIdlePEs
/* handleIdlePEs (FRAGMENT -- the function header and several branch bodies
   were elided from this excerpt; intermediate original lines are missing):
   iterate over all PEs; for every Idle PE first look for local work in its
   own spark pool (schedule a find-work event and mark it Sparking),
   otherwise -- if the fish limit permits -- try to steal remote work,
   threads before sparks when DoStealThreadsFirst is set.
   NOTE(review): the bodies of the stealing branches are elided here;
   do not edit control flow without consulting the full file. */
2032 IF_DEBUG(gran, fprintf(stderr, "GRAN: handling Idle PEs\n"))
2034 /* Should never be entered in GrAnSim Light setup */
2035 ASSERT(!RtsFlags.GranFlags.Light);
2037 /* Could check whether there are idle PEs if it's a cheap check */
2038 for (p = 0; p < RtsFlags.GranFlags.proc; p++)
2039 if (procStatus[p]==Idle) /* && IS_SPARKING(p) && IS_STARTING(p) */
2040 /* First look for local work i.e. examine local spark pool! */
2041 if (pending_sparks_hds[p]!=(rtsSpark *)NULL) {
2042 new_event(p, p, CurrentTime[p],
2044 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
2045 procStatus[p] = Sparking;
2046 } else if ((RtsFlags.GranFlags.maxFishes==0 ||
2047 OutstandingFishes[p]<RtsFlags.GranFlags.maxFishes) ) {
2049 /* If no local work then try to get remote work!
2050 Qu' Hopbe' pagh tu'lu'pu'chugh Qu' Hop yISuq ! */
2051 if (RtsFlags.GranFlags.DoStealThreadsFirst &&
2052 (RtsFlags.GranFlags.FetchStrategy >= 4 || OutstandingFetches[p] == 0))
2054 if (SurplusThreads > 0l) /* Steal a thread */
2057 if (procStatus[p]!=Idle)
2061 if (SparksAvail > 0 &&
2062 (RtsFlags.GranFlags.FetchStrategy >= 3 || OutstandingFetches[p] == 0)) /* Steal a spark */
2065 if (SurplusThreads > 0 &&
2066 (RtsFlags.GranFlags.FetchStrategy >= 4 || OutstandingFetches[p] == 0)) /* Steal a thread */
2072 Steal a spark and schedule moving it to proc. We want to look at PEs in
2073 clock order -- most retarded first. Currently sparks are only stolen
2074 from the @ADVISORY_POOL@ never from the @REQUIRED_POOL@. Eventually,
2075 this should be changed to first steal from the former then from the
2078 We model a sort of fishing mechanism by counting the number of sparks
2079 and threads we are currently stealing. */
/* natRandom (FRAGMENT -- header and surrounding lines elided):
   maps random() from [0, RAND_MAX] into [from, to) via float scaling;
   the second line clamps the degenerate r==to case back to from. */
2082 Return a random nat value in the interval [from, to)
2092 /* random returns a value in [0, RAND_MAX] */
2093 r = (nat) ((float)from + ((float)random()*(float)d)/(float)RAND_MAX);
2094 r = (r==to) ? from : r;
2095 ASSERT(from<=r && (r<to || from==to));
/* findRandomPE (FRAGMENT): pick a PE other than proc for GUM-style
   fishing; only legal when Fishing mode is on (see assertion).
   NOTE(review): the non-RandomSteal branch is elided from this excerpt. */
2100 Find any PE other than proc. Used for GUM style fishing only.
2108 ASSERT(RtsFlags.GranFlags.Fishing);
2109 if (RtsFlags.GranFlags.RandomSteal) {
2110 p = natRandom(0,RtsFlags.GranFlags.proc); /* full range of PEs */
2114 IF_GRAN_DEBUG(randomSteal,
2115 belch("^^ RANDOM_STEAL (fishing): stealing from PE %d (current proc is %d)",
/* sortPEsByTime (FRAGMENT -- return type, braces and some lines elided):
   fill pes_by_time with all PEs other than proc that have a non-empty
   spark pool and are no later than CurrentProc on the simulated clock,
   sort them by CurrentTime (insertion-style O(n^2) -- n is tiny, <= MAX_PROC),
   and compute `first', the index of the first PE later than proc.
   Results are returned through the firstp/np out-parameters. */
2122 Magic code for stealing sparks/threads makes use of global knowledge on
2126 sortPEsByTime (proc, pes_by_time, firstp, np)
2131 PEs p, temp, n, i, j;
2132 nat first, upb, r=0, q=0;
2134 ASSERT(!RtsFlags.GranFlags.Fishing);
2137 upb = RtsFlags.GranFlags.proc; /* full range of PEs */
2139 if (RtsFlags.GranFlags.RandomSteal) {
2140 r = natRandom(0,RtsFlags.GranFlags.proc); /* full range of PEs */
2146 /* pes_by_time shall contain processors from which we may steal sparks */
2147 for(n=0, p=0; p < RtsFlags.GranFlags.proc; ++p)
2148 if ((proc != p) && // not the current proc
2149 (pending_sparks_hds[p] != (rtsSpark *)NULL) && // non-empty spark pool
2150 (CurrentTime[p] <= CurrentTime[CurrentProc]))
2151 pes_by_time[n++] = p;
2153 /* sort pes_by_time */
2154 for(i=0; i < n; ++i)
2155 for(j=i+1; j < n; ++j)
2156 if (CurrentTime[pes_by_time[i]] > CurrentTime[pes_by_time[j]]) {
/* NOTE(review): this `rtsTime temp' shadows the `PEs temp' declared above
   and holds a PE index, not a time -- works if rtsTime is wide enough,
   but the type looks wrong; confirm against the full file. */
2157 rtsTime temp = pes_by_time[i];
2158 pes_by_time[i] = pes_by_time[j];
2159 pes_by_time[j] = temp;
2162 /* Choose random processor to steal spark from; first look at processors */
2163 /* that are earlier than the current one (i.e. proc) */
2165 (first < n) && (CurrentTime[pes_by_time[first]] <= CurrentTime[proc]);
2169 /* if the assertion below is true we can get rid of first */
2170 /* ASSERT(first==n); */
2171 /* ToDo: check if first is really needed; find cleaner solution */
2178 Steal a spark (piece of work) from any processor and bring it to proc.
2180 //@cindex stealSpark
2181 static inline rtsBool
2182 stealSpark(PEs proc) { stealSomething(proc, rtsTrue, rtsFalse); }
2185 Steal a thread from any processor and bring it to proc i.e. thread migration
2187 //@cindex stealThread
2188 static inline rtsBool
2189 stealThread(PEs proc) { stealSomething(proc, rtsFalse, rtsTrue); }
/* stealSomething (FRAGMENT -- return type, braces and several lines elided):
   workhorse behind stealSpark/stealThread. Two regimes:
   - !Fishing: dispatch to the "magic" stealers that use global knowledge
     (stealSparkMagic / stealThreadMagic);
   - Fishing: model GUM-style fishing -- pay packing cost, send a Fish
     event to a random other PE, bump OutstandingFishes and mark the
     stealer Fishing. In this regime sending the fish always "succeeds". */
2192 Steal a spark or a thread and schedule moving it to proc.
2194 //@cindex stealSomething
2196 stealSomething(proc, steal_spark, steal_thread)
2197 PEs proc; // PE that needs work (stealer)
2198 rtsBool steal_spark, steal_thread; // should a spark and/or thread be stolen
2201 rtsTime fish_arrival_time;
2202 rtsSpark *spark, *prev, *next;
2203 rtsBool stolen = rtsFalse;
2205 ASSERT(steal_spark || steal_thread);
2207 /* Should never be entered in GrAnSim Light setup */
2208 ASSERT(!RtsFlags.GranFlags.Light);
2209 ASSERT(!steal_thread || RtsFlags.GranFlags.DoThreadMigration);
2211 if (!RtsFlags.GranFlags.Fishing) {
2212 // ToDo: check if stealing threads is preferred over stealing sparks
2214 if (stealSparkMagic(proc))
2216 else // no spark found
2218 return stealThreadMagic(proc);
2219 else // no thread found
2221 } else { // ASSERT(steal_thread);
2222 return stealThreadMagic(proc);
2224 barf("stealSomething: never reached");
2227 /* The rest of this function does GUM style fishing */
2229 p = findRandomPE(proc); /* find a random PE other than proc */
2231 /* Message packing costs for sending a Fish; qeq jabbI'ID */
2232 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
2234 /* use another GranEvent for requesting a thread? */
2235 if (steal_spark && RtsFlags.GranFlags.GranSimStats.Sparks)
2236 DumpRawGranEvent(p, proc, SP_REQUESTED,
2237 (StgTSO*)NULL, (StgClosure *)NULL, (StgInt)0, 0);
2239 /* time of the fish arrival on the remote PE */
2240 fish_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
2242 /* Phps use an own Fish event for that? */
2243 /* The contents of the spark component is a HACK:
2244 1 means give me a spark;
2245 2 means give me a thread
2246 0 means give me nothing (this should never happen)
2248 new_event(p, proc, fish_arrival_time,
2250 (StgTSO*)NULL, (StgClosure*)NULL,
2251 (steal_spark ? (rtsSpark*)1 : steal_thread ? (rtsSpark*)2 : (rtsSpark*)0));
2253 ++OutstandingFishes[proc];
2254 /* only with Async fetching? */
2255 if (procStatus[proc]==Idle)
2256 procStatus[proc]=Fishing;
2258 /* time needed to clean up buffers etc after sending a message */
2259 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
2261 /* If GUM style fishing stealing always succeeds because it only consists
2262 of sending out a fish; of course, when the fish may return
/* stealSparkMagic (FRAGMENT -- return type, braces and several lines elided):
   steal a spark using global (simulator-only) knowledge of all spark pools.
   Algorithm: sort candidate PEs by simulated time (sortPEsByTime), pick a
   random eligible PE, walk its spark queue stealing the first spark whose
   node still SHOULD_SPARK, pruning dead sparks on the way; if nothing is
   stealable, drop that PE from the candidate list and retry.
   NOTE(review): `stolen' must be set rtsTrue in one of the elided lines
   around original line 2341 -- confirm in the full file. */
2268 This version of stealing a spark makes use of the global info on all
2269 spark pools etc which is not available in a real parallel system.
2270 This could be extended to test e.g. the impact of perfect load information.
2272 //@cindex stealSparkMagic
2274 stealSparkMagic(proc)
2277 PEs p, i, j, n, first, upb;
2278 rtsSpark *spark, *next;
2279 PEs pes_by_time[MAX_PROC];
2280 rtsBool stolen = rtsFalse;
2283 /* Should never be entered in GrAnSim Light setup */
2284 ASSERT(!RtsFlags.GranFlags.Light);
2286 sortPEsByTime(proc, pes_by_time, &first, &n);
2288 while (!stolen && n>0) {
2289 upb = (first==0) ? n : first;
2290 i = natRandom(0,upb); /* choose a random eligible PE */
2293 IF_GRAN_DEBUG(randomSteal,
2294 belch("^^ stealSparkMagic (random_steal, not fishing): stealing spark from PE %d (current proc is %d)",
2297 ASSERT(pending_sparks_hds[p]!=(rtsSpark *)NULL); /* non-empty spark pool */
2299 /* Now go through rtsSparkQ and steal the first eligible spark */
2301 spark = pending_sparks_hds[p];
2302 while (!stolen && spark != (rtsSpark*)NULL)
2304 /* NB: no prev pointer is needed here because all sparks that are not
2307 if ((procStatus[p]==Idle || procStatus[p]==Sparking || procStatus[p] == Fishing) &&
2308 spark->next==(rtsSpark*)NULL)
2310 /* Be social! Don't steal the only spark of an idle processor
2311 not {spark} neH yInIH !! */
2312 break; /* next PE */
2314 else if (closure_SHOULD_SPARK(spark->node))
2316 /* Don't Steal local sparks;
2317 ToDo: optionally prefer local over global sparks
2318 if (!spark->global) {
2320 continue; next spark
2323 /* found a spark! */
2325 /* Prepare message for sending spark */
2326 CurrentTime[p] += RtsFlags.GranFlags.Costs.mpacktime;
2328 if (RtsFlags.GranFlags.GranSimStats.Sparks)
2329 DumpRawGranEvent(p, (PEs)0, SP_EXPORTED,
2330 (StgTSO*)NULL, spark->node,
2331 spark->name, spark_queue_len(p));
2333 stealtime = (CurrentTime[p] > CurrentTime[proc] ?
2338 new_event(proc, p /* CurrentProc */, stealtime,
2340 (StgTSO*)NULL, spark->node, spark);
2343 ++OutstandingFishes[proc]; /* no. of sparks currently on the fly */
2344 if (procStatus[proc]==Idle)
2345 procStatus[proc] = Fishing;
2346 ++(spark->global); /* record that this is a global spark */
2347 ASSERT(SparksAvail>0);
2348 --SparksAvail; /* on-the-fly sparks are not available */
2349 next = delete_from_sparkq(spark, p, rtsFalse); // don't dispose!
2350 CurrentTime[p] += RtsFlags.GranFlags.Costs.mtidytime;
2352 else /* !(closure_SHOULD_SPARK(SPARK_NODE(spark))) */
2354 IF_GRAN_DEBUG(checkSparkQ,
2355 belch("^^ pruning spark %p (node %p) in stealSparkMagic",
2356 spark, spark->node));
2358 /* if the spark points to a node that should not be sparked,
2359 prune the spark queue at this point */
2360 if (RtsFlags.GranFlags.GranSimStats.Sparks)
2361 DumpRawGranEvent(p, (PEs)0, SP_PRUNED,
2362 (StgTSO*)NULL, spark->node,
2363 spark->name, spark_queue_len(p));
2364 if (RtsFlags.GranFlags.GranSimStats.Global)
2365 globalGranStats.pruned_sparks++;
2367 ASSERT(SparksAvail>0);
2369 spark = delete_from_sparkq(spark, p, rtsTrue);
2371 /* unlink spark (may have been freed!) from sparkq;
2372 if (prev == NULL) // spark was head of spark queue
2373 pending_sparks_hds[p] = spark->next;
2375 prev->next = spark->next;
2376 if (spark->next == NULL)
2377 pending_sparks_tls[p] = prev;
2381 } /* while ... iterating over sparkq */
2383 /* ToDo: assert that PE p still has work left after stealing the spark */
2385 if (!stolen && (n>0)) { /* nothing stealable from proc p :( */
2386 ASSERT(pes_by_time[i]==p);
2388 /* remove p from the list (at pos i) */
2389 for (j=i; j+1<n; j++)
2390 pes_by_time[j] = pes_by_time[j+1];
2393 /* update index to first proc which is later (or equal) than proc */
2396 (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]);
2400 } /* while ... iterating over PEs in pes_by_time */
2402 IF_GRAN_DEBUG(randomSteal,
2404 belch("^^ stealSparkMagic: spark %p (node=%p) stolen by PE %d from PE %d (SparksAvail=%d; idlers=%d)",
2405 spark, spark->node, proc, p,
2406 SparksAvail, idlers());
2408 belch("^^ stealSparkMagic: nothing stolen by PE %d (sparkq len after pruning=%d)(SparksAvail=%d; idlers=%d)",
2409 proc, SparksAvail, idlers()));
2411 if (RtsFlags.GranFlags.GranSimStats.Global &&
2412 stolen && (i!=0)) { /* only for statistics */
2413 globalGranStats.rs_sp_count++;
2414 globalGranStats.ntimes_total += n;
2415 globalGranStats.fl_total += first;
2416 globalGranStats.no_of_steals++;
/* stealThreadMagic (FRAGMENT -- return type, braces and several lines
   elided): thread migration using global (simulator-only) knowledge.
   Same PE-selection loop as stealSparkMagic; from the chosen PE it takes
   the *second* thread of the run queue (never the head, "for social
   reasons"), relinks the queue, and schedules a move-thread event. */
2423 The old stealThread code, which makes use of global info and does not
2425 NB: most of this is the same as in stealSparkMagic;
2426 only the pieces specific to processing thread queues are different;
2427 long live polymorphism!
2430 //@cindex stealThreadMagic
2432 stealThreadMagic(proc)
2435 PEs p, i, j, n, first, upb;
2437 PEs pes_by_time[MAX_PROC];
2438 rtsBool stolen = rtsFalse;
2441 /* Should never be entered in GrAnSim Light setup */
2442 ASSERT(!RtsFlags.GranFlags.Light);
2444 sortPEsByTime(proc, pes_by_time, &first, &n);
2446 while (!stolen && n>0) {
2447 upb = (first==0) ? n : first;
2448 i = natRandom(0,upb); /* choose a random eligible PE */
2451 IF_GRAN_DEBUG(randomSteal,
2452 belch("^^ stealThreadMagic (random_steal, not fishing): stealing thread from PE %d (current proc is %d)",
2455 /* Steal the first exportable thread in the runnable queue but
2456 never steal the first in the queue for social reasons;
2457 not Qu' wa'DIch yInIH !!
2459 /* Would be better to search through queue and have options which of
2460 the threads to pick when stealing */
2461 if (run_queue_hds[p] == END_TSO_QUEUE) {
2462 IF_GRAN_DEBUG(randomSteal,
2463 belch("^^ stealThreadMagic: No thread to steal from PE %d (stealer=PE %d)",
2466 tso = run_queue_hds[p]->link; /* tso is *2nd* thread in thread queue */
2470 /* update links in queue */
2471 run_queue_hds[p]->link = tso->link;
2472 if (run_queue_tls[p] == tso)
2473 run_queue_tls[p] = run_queue_hds[p];
2475 /* ToDo: Turn magic constants into params */
2477 CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mpacktime;
2479 stealtime = (CurrentTime[p] > CurrentTime[proc] ?
2483 + 4l * RtsFlags.GranFlags.Costs.additional_latency
2484 + 5l * RtsFlags.GranFlags.Costs.munpacktime;
2486 /* Move the thread; set bitmask to 0 while TSO is `on-the-fly' */
2487 SET_GRAN_HDR(tso,Nowhere /* PE_NUMBER(proc) */);
2489 /* Move from one queue to another */
2490 new_event(proc, p, stealtime,
2492 tso, (StgClosure*)NULL, (rtsSpark*)NULL);
2494 /* MAKE_BUSY(proc); not yet; only when thread is in threadq */
2495 ++OutstandingFishes[proc];
/* NOTE(review): stealSparkMagic tests procStatus[proc]==Idle before
   switching to Fishing; this bare truth-test fires for *any* non-zero
   status and looks like a typo -- confirm against the full file. */
2496 if (procStatus[proc])
2497 procStatus[proc] = Fishing;
2500 if(RtsFlags.GranFlags.GranSimStats.Full)
2501 DumpRawGranEvent(p, proc,
2503 tso, (StgClosure*)NULL, (StgInt)0, 0);
2505 /* costs for tidying up buffer after having sent it */
2506 CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mtidytime;
2509 /* ToDo: assert that PE p still has work left after stealing the spark */
2511 if (!stolen && (n>0)) { /* nothing stealable from proc p :( */
2512 ASSERT(pes_by_time[i]==p);
2514 /* remove p from the list (at pos i) */
2515 for (j=i; j+1<n; j++)
2516 pes_by_time[j] = pes_by_time[j+1];
2519 /* update index to first proc which is later (or equal) than proc */
2522 (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]);
2526 } /* while ... iterating over PEs in pes_by_time */
2528 IF_GRAN_DEBUG(randomSteal,
2530 belch("^^ stealThreadMagic: stolen TSO %d (%p) by PE %d from PE %d (SparksAvail=%d; idlers=%d)",
2531 tso->id, tso, proc, p,
2532 SparksAvail, idlers());
2534 belch("stealThreadMagic: nothing stolen by PE %d (SparksAvail=%d; idlers=%d)",
2535 proc, SparksAvail, idlers()));
2537 if (RtsFlags.GranFlags.GranSimStats.Global &&
2538 stolen && (i!=0)) { /* only for statistics */
2539 /* ToDo: more statistics on avg thread queue length etc */
2540 globalGranStats.rs_t_count++;
2541 globalGranStats.no_of_migrates++;
/* sparkStealTime (FRAGMENT -- return type and braces elided):
   heuristic cost of a steal, scaled down by the fraction of idle PEs
   (more idlers => a random fish finds work sooner => cheaper steal).
   NOTE(review): with a single PE, proc-1 == 0 and the division yields
   inf/NaN before the rtsTime cast -- presumably unreachable (stealing
   needs >= 2 PEs), but confirm callers guarantee proc > 1. */
2547 //@cindex sparkStealTime
2549 sparkStealTime(void)
2551 double fishdelay, sparkdelay, latencydelay;
2552 fishdelay = (double)RtsFlags.GranFlags.proc/2;
2553 sparkdelay = fishdelay -
2554 ((fishdelay-1)/(double)(RtsFlags.GranFlags.proc-1))*(double)idlers();
2555 latencydelay = sparkdelay*((double)RtsFlags.GranFlags.Costs.latency);
2557 return((rtsTime)latencydelay);
2560 //@node Routines directly called from Haskell world, Emiting profiling info for GrAnSim, Idle PEs, GranSim specific code
2561 //@subsection Routines directly called from Haskell world
2563 The @GranSim...@ routines in here are directly called via macros from the
2566 First some auxiliary routines.
2569 /* Take the current thread off the thread queue and thereby activate the
2570 next thread. It's assumed that the next ReSchedule after this uses
2571 NEW_THREAD as param.
2572 This fct is called from GranSimBlock and GranSimFetch
/* ActivateNextThread (FRAGMENT -- return type, braces and some lines
   elided): take the current thread off proc's run queue and account for
   the context switch to the next thread (dumping a GR_SCHEDULE event when
   full stats are on); mark the PE Idle when the run queue drains.
   Called from GranSimBlock and GranSimFetch; the next ReSchedule is
   expected to use NEW_THREAD (per the header comment elided above). */
2575 //@cindex ActivateNextThread
2578 ActivateNextThread (proc)
2583 This routine is entered either via GranSimFetch or via GranSimBlock.
2584 It has to prepare the CurrentTSO for being blocked and update the
2585 run queue and other statistics on PE proc. The actual enqueuing to the
2586 blocking queue (if coming from GranSimBlock) is done in the entry code
2587 of the BLACKHOLE and BLACKHOLE_BQ closures (see StgMiscClosures.hc).
2589 /* ToDo: add assertions here!! */
2590 //ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE);
2592 // Only necessary if the running thread is at front of the queue
2593 // run_queue_hds[proc] = run_queue_hds[proc]->link;
2594 ASSERT(CurrentProc==proc);
2595 ASSERT(!is_on_queue(CurrentTSO,proc));
2596 if (run_queue_hds[proc]==END_TSO_QUEUE) {
2597 /* NB: this routine is only entered with asynchr comm (see assertion) */
2598 procStatus[proc] = Idle;
2600 /* ToDo: check cost assignment */
2601 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime;
2602 if (RtsFlags.GranFlags.GranSimStats.Full &&
2603 (!RtsFlags.GranFlags.Light || RtsFlags.GranFlags.Debug.checkLight))
2604 /* right flag !?? ^^^ */
2605 DumpRawGranEvent(proc, 0, GR_SCHEDULE, run_queue_hds[proc],
2606 (StgClosure*)NULL, (StgInt)0, 0);
2611 The following GranSim fcts are stg-called from the threaded world.
/* GranSimAllocate (FRAGMENT -- header with parameter `n' elided):
   charge n words of heap allocation to the current TSO and PE clock;
   optionally dump a GR_ALLOC stats event. Paired with GranSimUnallocate
   below, which reverses these updates after a failed heap check. */
2614 /* Called from HP_CHK and friends (see StgMacros.h) */
2615 //@cindex GranSimAllocate
2620 CurrentTSO->gran.allocs += n;
2621 ++(CurrentTSO->gran.basicblocks);
2623 if (RtsFlags.GranFlags.GranSimStats.Heap) {
2624 DumpRawGranEvent(CurrentProc, 0, GR_ALLOC, CurrentTSO,
2625 (StgClosure*)NULL, (StgInt)0, n);
2628 CurrentTSO->gran.exectime += RtsFlags.GranFlags.Costs.heapalloc_cost;
2629 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.heapalloc_cost;
/* GranSimUnallocate (FRAGMENT -- return type and param decl elided):
   exact inverse of GranSimAllocate: subtract the allocation count,
   basic-block count and heap-allocation cost charged for n words. */
2633 Subtract the values added above, if a heap check fails and
2634 so has to be redone.
2636 //@cindex GranSimUnallocate
2638 GranSimUnallocate(n)
2641 CurrentTSO->gran.allocs -= n;
2642 --(CurrentTSO->gran.basicblocks);
2644 CurrentTSO->gran.exectime -= RtsFlags.GranFlags.Costs.heapalloc_cost;
2645 CurrentTime[CurrentProc] -= RtsFlags.GranFlags.Costs.heapalloc_cost;
/* GranSimExec (FRAGMENT -- return type and braces elided):
   charge the weighted instruction-mix cost (arith/branch/load/store/float
   counts times their per-instruction costs from RtsFlags) to the current
   TSO's exectime and the current PE's simulated clock.
   Normally inlined via the GRAN_EXEC macro rather than called. */
2648 /* NB: We now inline this code via GRAN_EXEC rather than calling this fct */
2649 //@cindex GranSimExec
2651 GranSimExec(ariths,branches,loads,stores,floats)
2652 StgWord ariths,branches,loads,stores,floats;
2654 StgWord cost = RtsFlags.GranFlags.Costs.arith_cost*ariths +
2655 RtsFlags.GranFlags.Costs.branch_cost*branches +
2656 RtsFlags.GranFlags.Costs.load_cost * loads +
2657 RtsFlags.GranFlags.Costs.store_cost*stores +
2658 RtsFlags.GranFlags.Costs.float_cost*floats;
2660 CurrentTSO->gran.exectime += cost;
2661 CurrentTime[CurrentProc] += cost;
/* GranSimFetch (FRAGMENT -- return type, braces and several lines elided):
   GRIP-style single-item fetch of `node' if it is not local to
   CurrentProc. Handles: GrAnSim-Light (always reschedule), fake-RBH
   closures (bitmask == Nowhere: retry after a fixed delay), and the real
   remote case -- schedule a FetchNode event, charge comm costs, bump
   OutstandingFetches, and either activate the next thread (async fetch)
   or mark the PE Fetching. Sets NeedToReSchedule as the result flag
   checked from STG land. */
2665 Fetch the node if it isn't local
2666 -- result indicates whether fetch has been done.
2668 This is GRIP-style single item fetching.
2671 //@cindex GranSimFetch
2673 GranSimFetch(node /* , liveness_mask */ )
2675 /* StgInt liveness_mask; */
2677 /* reset the return value (to be checked within STG land) */
2678 NeedToReSchedule = rtsFalse;
2680 if (RtsFlags.GranFlags.Light) {
2681 /* Always reschedule in GrAnSim-Light to prevent one TSO from
2683 new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
2684 ContinueThread,CurrentTSO,node,NULL);
2689 /* Faking an RBH closure:
2690 If the bitmask of the closure is 0 then this node is a fake RBH;
2692 if (node->header.gran.procs == Nowhere) {
2694 belch("## Found fake RBH (node %p); delaying TSO %d (%p)",
2695 node, CurrentTSO->id, CurrentTSO));
2697 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+10000,
2698 ContinueThread, CurrentTSO, node, (rtsSpark*)NULL);
2700 /* Rescheduling (GranSim internal) is necessary */
2701 NeedToReSchedule = rtsTrue;
2706 /* Note: once a node has been fetched, this test will be passed */
2707 if (!IS_LOCAL_TO(PROCS(node),CurrentProc))
2709 PEs p = where_is(node);
2712 IF_GRAN_DEBUG(thunkStealing,
2714 belch("GranSimFetch: Trying to fetch from own processor%u\n", p););
2716 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
2717 /* NB: Fetch is counted on arrival (FetchReply) */
2719 fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) +
2720 RtsFlags.GranFlags.Costs.latency;
2722 new_event(p, CurrentProc, fetchtime,
2723 FetchNode, CurrentTSO, node, (rtsSpark*)NULL);
2725 if (fetchtime<TimeOfNextEvent)
2726 TimeOfNextEvent = fetchtime;
2728 /* About to block */
2729 CurrentTSO->gran.blockedat = CurrentTime[CurrentProc];
2731 ++OutstandingFetches[CurrentProc];
2733 if (RtsFlags.GranFlags.DoAsyncFetch)
2734 /* if asynchr comm is turned on, activate the next thread in the q */
2735 ActivateNextThread(CurrentProc);
2737 procStatus[CurrentProc] = Fetching;
2740 /* ToDo: nuke the entire if (anything special for fair schedule?) */
2741 if (RtsFlags.GranFlags.DoAsyncFetch)
2743 /* Remove CurrentTSO from the queue -- assumes head of queue == CurrentTSO */
2744 if(!RtsFlags.GranFlags.DoFairSchedule)
2746 /* now done in do_the_fetchnode
2747 if (RtsFlags.GranFlags.GranSimStats.Full)
2748 DumpRawGranEvent(CurrentProc, p, GR_FETCH, CurrentTSO,
2749 node, (StgInt)0, 0);
2751 ActivateNextThread(CurrentProc);
2753 # if 0 && defined(GRAN_CHECK)
2754 if (RtsFlags.GranFlags.Debug.blockOnFetch_sanity) {
2755 if (TSO_TYPE(CurrentTSO) & FETCH_MASK_TSO) {
2756 fprintf(stderr,"FetchNode: TSO 0x%x has fetch-mask set @ %d\n",
2757 CurrentTSO,CurrentTime[CurrentProc]);
2758 stg_exit(EXIT_FAILURE);
2760 TSO_TYPE(CurrentTSO) |= FETCH_MASK_TSO;
2764 CurrentTSO->link = END_TSO_QUEUE;
2765 /* CurrentTSO = END_TSO_QUEUE; */
2767 /* CurrentTSO is pointed to by the FetchNode event; it is
2768 on no run queue any more */
2769 } else { /* fair scheduling currently not supported -- HWL */
2770 barf("Asynchr communication is not yet compatible with fair scheduling\n");
2772 } else { /* !RtsFlags.GranFlags.DoAsyncFetch */
2773 procStatus[CurrentProc] = Fetching; // ToDo: BlockedOnFetch;
2774 /* now done in do_the_fetchnode
2775 if (RtsFlags.GranFlags.GranSimStats.Full)
2776 DumpRawGranEvent(CurrentProc, p,
2777 GR_FETCH, CurrentTSO, node, (StgInt)0, 0);
2779 IF_GRAN_DEBUG(blockOnFetch,
2780 BlockedOnFetch[CurrentProc] = CurrentTSO;); /*- rtsTrue; -*/
2784 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mtidytime;
2786 /* Rescheduling (GranSim internal) is necessary */
2787 NeedToReSchedule = rtsTrue;
/* GranSimSpark (FRAGMENT -- return type, param decls and braces elided):
   record the creation of a spark on CurrentProc: dump an SP_SPARK stats
   event, optionally force a find-work event (DoAlwaysCreateThreads), and
   bump the TSO's local/global spark counters according to `local'.
   SparksAvail is deliberately NOT bumped here (add_to_spark_queue does). */
2794 //@cindex GranSimSpark
2796 GranSimSpark(local,node)
2800 /* ++SparksAvail; Nope; do that in add_to_spark_queue */
2801 if (RtsFlags.GranFlags.GranSimStats.Sparks)
2802 DumpRawGranEvent(CurrentProc, (PEs)0, SP_SPARK,
2803 END_TSO_QUEUE, node, (StgInt)0, spark_queue_len(CurrentProc)-1);
2805 /* Force the PE to take notice of the spark */
2806 if(RtsFlags.GranFlags.DoAlwaysCreateThreads) {
2807 new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
2809 END_TSO_QUEUE, (StgClosure*)NULL, (rtsSpark*)NULL);
2810 if (CurrentTime[CurrentProc]<TimeOfNextEvent)
2811 TimeOfNextEvent = CurrentTime[CurrentProc];
2815 ++CurrentTSO->gran.localsparks;
2817 ++CurrentTSO->gran.globalsparks;
/* GranSimSparkAt (FRAGMENT -- return type and remaining decls elided):
   resolve `where' to a concrete PE via where_is, then delegate to
   GranSimSparkAtAbs to place `spark' on that PE. */
2820 //@cindex GranSimSparkAt
2822 GranSimSparkAt(spark,where,identifier)
2824 StgClosure *where; /* This should be a node; alternatively could be a GA */
2827 PEs p = where_is(where);
2828 GranSimSparkAtAbs(spark,p,identifier);
/* GranSimSparkAtAbs (FRAGMENT -- return type, param decls, braces and
   some lines elided): place `spark' on an absolute PE `proc'. Dumps an
   SP_SPARKAT stats event, charges packing/latency/tidy costs for a
   remote placement, schedules a MoveSpark event (carrying CurrentTSO in
   Light mode so creation costs are attributed), and treats placed
   sparks like stolen ones by bumping OutstandingFishes[proc].
   A NULL spark (from granularity control) is a silent no-op. */
2831 //@cindex GranSimSparkAtAbs
2833 GranSimSparkAtAbs(spark,proc,identifier)
2840 if (spark == (rtsSpark *)NULL) /* Note: Granularity control might have */
2841 return; /* turned a spark into a NULL. */
2843 /* ++SparksAvail; Nope; do that in add_to_spark_queue */
2844 if(RtsFlags.GranFlags.GranSimStats.Sparks)
2845 DumpRawGranEvent(proc,0,SP_SPARKAT,
2846 END_TSO_QUEUE, spark->node, (StgInt)0, spark_queue_len(proc));
2848 if (proc!=CurrentProc) {
2849 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
2850 exporttime = (CurrentTime[proc] > CurrentTime[CurrentProc]?
2851 CurrentTime[proc]: CurrentTime[CurrentProc])
2852 + RtsFlags.GranFlags.Costs.latency;
2854 exporttime = CurrentTime[CurrentProc];
2857 if ( RtsFlags.GranFlags.Light )
2858 /* Need CurrentTSO in event field to associate costs with creating
2859 spark even in a GrAnSim Light setup */
2860 new_event(proc, CurrentProc, exporttime,
2862 CurrentTSO, spark->node, spark);
2864 new_event(proc, CurrentProc, exporttime,
2865 MoveSpark, (StgTSO*)NULL, spark->node, spark);
2866 /* Bit of a hack to treat placed sparks the same as stolen sparks */
2867 ++OutstandingFishes[proc];
2869 /* Force the PE to take notice of the spark (FINDWORK is put after a
2870 MoveSpark into the sparkq!) */
2871 if (RtsFlags.GranFlags.DoAlwaysCreateThreads) {
2872 new_event(CurrentProc,CurrentProc,exporttime+1,
2874 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
2877 if (exporttime<TimeOfNextEvent)
2878 TimeOfNextEvent = exporttime;
2880 if (proc!=CurrentProc) {
2881 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mtidytime;
2882 ++CurrentTSO->gran.globalsparks;
2884 ++CurrentTSO->gran.localsparks;
/* GranSimBlock (FRAGMENT -- return type, param decls, braces and some
   lines elided): account for tso blocking on `node' on PE proc. Asserts
   the TSO is local and not already queued, warns (and fakes a FetchNode
   via GranSimFetch) if the node turns out to be remote, dumps a GR_BLOCK
   stats event, updates block statistics, charges the queueing cost, and
   hands the PE to the next runnable thread via ActivateNextThread.
   The actual enqueueing onto the blocking queue happens in the
   BLACKHOLE(_BQ) entry code, not here. */
2889 This function handles local and global blocking. It's called either
2890 from threaded code (RBH_entry, BH_entry etc) or from blockFetch when
2891 trying to fetch an BH or RBH
2894 //@cindex GranSimBlock
2896 GranSimBlock(tso, proc, node)
2901 PEs node_proc = where_is(node), tso_proc = where_is(tso);
2903 ASSERT(tso_proc==CurrentProc);
2904 // ASSERT(node_proc==CurrentProc);
2906 if (node_proc!=CurrentProc)
2907 belch("## ghuH: TSO %d (%lx) [PE %d] blocks on non-local node %p [PE %d] (no simulation of FETCHMEs)",
2908 tso->id, tso, tso_proc, node, node_proc));
2909 ASSERT(tso->link==END_TSO_QUEUE);
2910 ASSERT(!is_on_queue(tso,proc)); // tso must not be on run queue already!
2911 //ASSERT(tso==run_queue_hds[proc]);
2914 belch("GRAN: TSO %d (%p) [PE %d] blocks on closure %p @ %lx",
2915 tso->id, tso, proc, node, CurrentTime[proc]);)
2918 /* THIS SHOULD NEVER HAPPEN!
2919 If tso tries to block on a remote node (i.e. node_proc!=CurrentProc)
2920 we have missed a GranSimFetch before entering this closure;
2921 we hack around it for now, faking a FetchNode;
2922 because GranSimBlock is entered via a BLACKHOLE(_BQ) closure,
2923 tso will be blocked on this closure until the FetchReply occurs.
2927 if (node_proc!=CurrentProc) {
2929 ret = GranSimFetch(node);
2932 belch(".. GranSimBlock: faking a FetchNode of node %p from %d to %d",
2933 node, node_proc, CurrentProc););
2938 if (RtsFlags.GranFlags.GranSimStats.Full)
2939 DumpRawGranEvent(proc,node_proc,GR_BLOCK,tso,node,(StgInt)0,0);
2941 ++(tso->gran.blockcount);
2942 /* Distinction between local and global block is made in blockFetch */
2943 tso->gran.blockedat = CurrentTime[proc];
2945 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime;
2946 ActivateNextThread(proc);
2947 /* tso->link = END_TSO_QUEUE; not really necessary; only for testing */
2952 //@node Index, , Dumping routines, GranSim specific code
2956 //* ActivateNextThread:: @cindex\s-+ActivateNextThread
2957 //* CurrentProc:: @cindex\s-+CurrentProc
2958 //* CurrentTime:: @cindex\s-+CurrentTime
2959 //* GranSimAllocate:: @cindex\s-+GranSimAllocate
2960 //* GranSimBlock:: @cindex\s-+GranSimBlock
2961 //* GranSimExec:: @cindex\s-+GranSimExec
2962 //* GranSimFetch:: @cindex\s-+GranSimFetch
2963 //* GranSimLight_insertThread:: @cindex\s-+GranSimLight_insertThread
2964 //* GranSimSpark:: @cindex\s-+GranSimSpark
2965 //* GranSimSparkAt:: @cindex\s-+GranSimSparkAt
2966 //* GranSimSparkAtAbs:: @cindex\s-+GranSimSparkAtAbs
2967 //* GranSimUnallocate:: @cindex\s-+GranSimUnallocate
2968 //* any_idle:: @cindex\s-+any_idle
2969 //* blockFetch:: @cindex\s-+blockFetch
2970 //* do_the_fetchnode:: @cindex\s-+do_the_fetchnode
2971 //* do_the_fetchreply:: @cindex\s-+do_the_fetchreply
2972 //* do_the_findwork:: @cindex\s-+do_the_findwork
2973 //* do_the_globalblock:: @cindex\s-+do_the_globalblock
2974 //* do_the_movespark:: @cindex\s-+do_the_movespark
2975 //* do_the_movethread:: @cindex\s-+do_the_movethread
2976 //* do_the_startthread:: @cindex\s-+do_the_startthread
2977 //* do_the_unblock:: @cindex\s-+do_the_unblock
2978 //* fetchNode:: @cindex\s-+fetchNode
2979 //* ga_to_proc:: @cindex\s-+ga_to_proc
2980 //* get_next_event:: @cindex\s-+get_next_event
2981 //* get_time_of_next_event:: @cindex\s-+get_time_of_next_event
2982 //* grab_event:: @cindex\s-+grab_event
2983 //* handleFetchRequest:: @cindex\s-+handleFetchRequest
2984 //* handleIdlePEs:: @cindex\s-+handleIdlePEs
2985 //* idlers:: @cindex\s-+idlers
2986 //* insertThread:: @cindex\s-+insertThread
2987 //* insert_event:: @cindex\s-+insert_event
2988 //* is_on_queue:: @cindex\s-+is_on_queue
2989 //* is_unique:: @cindex\s-+is_unique
2990 //* new_event:: @cindex\s-+new_event
2991 //* prepend_event:: @cindex\s-+prepend_event
2992 //* print_event:: @cindex\s-+print_event
2993 //* print_eventq:: @cindex\s-+print_eventq
2994 //* prune_eventq :: @cindex\s-+prune_eventq
2995 //* spark queue:: @cindex\s-+spark queue
2996 //* sparkStealTime:: @cindex\s-+sparkStealTime
2997 //* stealSomething:: @cindex\s-+stealSomething
2998 //* stealSpark:: @cindex\s-+stealSpark
2999 //* stealSparkMagic:: @cindex\s-+stealSparkMagic
3000 //* stealThread:: @cindex\s-+stealThread
3001 //* stealThreadMagic:: @cindex\s-+stealThreadMagic
3002 //* thread_queue_len:: @cindex\s-+thread_queue_len
3003 //* traverse_eventq_for_gc:: @cindex\s-+traverse_eventq_for_gc
3004 //* where_is:: @cindex\s-+where_is