2 Time-stamp: <2006-10-19 15:12:58 simonmar>
4 Variables and functions specific to GranSim the parallelism simulator
8 //@node GranSim specific code, , ,
9 //@section GranSim specific code
12 Macros for dealing with the new and improved GA field for simulating
13 parallel execution. Based on @CONCURRENT@ package. The GA field now
14 contains a mask, where the n-th bit stands for the n-th processor, where
15 this data can be found. In case of multiple copies, several bits are
16 set. The total number of processors is bounded by @MAX_PROC@, which
17 should be <= the length of a word in bits. -- HWL
22 //* Prototypes and externs::
23 //* Constants and Variables::
25 //* Global Address Operations::
26 //* Global Event Queue::
27 //* Spark queue functions::
28 //* Scheduling functions::
29 //* Thread Queue routines::
30 //* GranSim functions::
31 //* GranSimLight routines::
32 //* Code for Fetching Nodes::
34 //* Routines directly called from Haskell world::
35 //* Emitting profiling info for GrAnSim::
36 //* Dumping routines::
40 //@node Includes, Prototypes and externs, GranSim specific code, GranSim specific code
41 //@subsection Includes
46 #include "StgMiscClosures.h"
48 #include "Storage.h" // for recordMutable
50 #include "SchedAPI.h" // for pushClosure
51 #include "GranSimRts.h"
53 #include "ParallelRts.h"
54 #include "ParallelDebug.h"
58 //@node Prototypes and externs, Constants and Variables, Includes, GranSim specific code
59 //@subsection Prototypes and externs
64 static inline PEs ga_to_proc(StgWord);
65 static inline rtsBool any_idle(void);
66 static inline nat idlers(void);
67 PEs where_is(StgClosure *node);
69 static rtsBool stealSomething(PEs proc, rtsBool steal_spark, rtsBool steal_thread);
70 static rtsBool stealSpark(PEs proc);
71 static rtsBool stealThread(PEs proc);
72 static rtsBool stealSparkMagic(PEs proc);
73 static rtsBool stealThreadMagic(PEs proc);
74 /* subsumed by stealSomething
75 static void stealThread(PEs proc);
76 static void stealSpark(PEs proc);
78 static rtsTime sparkStealTime(void);
79 static nat natRandom(nat from, nat to);
80 static PEs findRandomPE(PEs proc);
81 static void sortPEsByTime (PEs proc, PEs *pes_by_time,
82 nat *firstp, nat *np);
88 //@node Constants and Variables, Initialisation, Prototypes and externs, GranSim specific code
89 //@subsection Constants and Variables
91 #if defined(GRAN) || defined(PAR)
92 /* See GranSim.h for the definition of the enum gran_event_types */
93 char *gran_event_names[] = {
95 "STEALING", "STOLEN", "STOLEN(Q)",
96 "FETCH", "REPLY", "BLOCK", "RESUME", "RESUME(Q)",
97 "SCHEDULE", "DESCHEDULE",
99 "SPARK", "SPARKAT", "USED", "PRUNED", "EXPORTED", "ACQUIRED",
102 "SYSTEM_START", "SYSTEM_END", /* only for debugging */
107 #if defined(GRAN) /* whole file */
108 char *proc_status_names[] = {
109 "Idle", "Sparking", "Starting", "Fetching", "Fishing", "Busy",
113 /* For internal use (event statistics) only */
114 char *event_names[] =
115 { "ContinueThread", "StartThread", "ResumeThread",
116 "MoveSpark", "MoveThread", "FindWork",
117 "FetchNode", "FetchReply",
118 "GlobalBlock", "UnblockThread"
121 //@cindex CurrentProc
125 ToDo: Create a structure for the processor status and put all the
126 arrays below into it.
129 //@cindex CurrentTime
130 /* One clock for each PE */
131 rtsTime CurrentTime[MAX_PROC];
133 /* Useful to restrict communication; cf fishing model in GUM */
134 nat OutstandingFetches[MAX_PROC], OutstandingFishes[MAX_PROC];
136 /* Status of each PE (new since but independent of GranSim Light) */
137 rtsProcStatus procStatus[MAX_PROC];
139 # if defined(GRAN) && defined(GRAN_CHECK)
140 /* To check if the RTS ever tries to run a thread that should be blocked
141 because of fetching remote data */
142 StgTSO *BlockedOnFetch[MAX_PROC];
143 # define FETCH_MASK_TSO 0x08000000 /* only bits 0, 1, 2 should be used */
146 nat SparksAvail = 0; /* How many sparks are available */
147 nat SurplusThreads = 0; /* How many excess threads are there */
149 /* Do we need to reschedule following a fetch? */
150 rtsBool NeedToReSchedule = rtsFalse, IgnoreEvents = rtsFalse, IgnoreYields = rtsFalse;
151 rtsTime TimeOfNextEvent, TimeOfLastEvent, EndOfTimeSlice; /* checked from the threaded world! */
153 //@cindex spark queue
154 /* GranSim: a globally visible array of spark queues */
155 rtsSparkQ pending_sparks_hds[MAX_PROC];
156 rtsSparkQ pending_sparks_tls[MAX_PROC];
158 nat sparksIgnored = 0, sparksCreated = 0;
160 GlobalGranStats globalGranStats;
162 nat gran_arith_cost, gran_branch_cost, gran_load_cost,
163 gran_store_cost, gran_float_cost;
166 Old comment from 0.29. ToDo: Check and update -- HWL
168 The following variables control the behaviour of GrAnSim. In general, there
169 is one RTS option for enabling each of these features. In getting the
170 desired setup of GranSim the following questions have to be answered:
172 \item {\em Which scheduling algorithm} to use (@RtsFlags.GranFlags.DoFairSchedule@)?
173 Currently only unfair scheduling is supported.
174 \item What to do when remote data is fetched (@RtsFlags.GranFlags.DoAsyncFetch@)?
175 Either block and wait for the
176 data or reschedule and do some other work.
177 Thus, if this variable is true, asynchronous communication is
178 modelled. Block on fetch mainly makes sense for incremental fetching.
180 There is also a simplified fetch variant available
181 (@RtsFlags.GranFlags.SimplifiedFetch@). This variant does not use events to model
182 communication. It is faster but the results will be less accurate.
183 \item How aggressive to be in getting work after a reschedule on fetch
184 (@RtsFlags.GranFlags.FetchStrategy@)?
185 This is determined by the so-called {\em fetching
186 strategy\/}. Currently, there are four possibilities:
188 \item Only run a runnable thread.
189 \item Turn a spark into a thread, if necessary.
190 \item Steal a remote spark, if necessary.
191 \item Steal a runnable thread from another processor, if necessary.
193 The variable @RtsFlags.GranFlags.FetchStrategy@ determines how far to go in this list
194 when rescheduling on a fetch.
195 \item Should sparks or threads be stolen first when looking for work
196 (@RtsFlags.GranFlags.DoStealThreadsFirst@)?
197 The default is to steal sparks first (much cheaper).
198 \item Should the RTS use a lazy thread creation scheme
199 (@RtsFlags.GranFlags.DoAlwaysCreateThreads@)? By default yes i.e.\ sparks are only
200 turned into threads when work is needed. Also note, that sparks
201 can be discarded by the RTS (this is done in the case of an overflow
202 of the spark pool). Setting @RtsFlags.GranFlags.DoAlwaysCreateThreads@ to @True@ forces
203 the creation of threads at the next possibility (i.e.\ when new work
204 is demanded the next time).
205 \item Should data be fetched closure-by-closure or in packets
206 (@RtsFlags.GranFlags.DoBulkFetching@)? The default strategy is a GRIP-like incremental
207 (i.e.\ closure-by-closure) strategy. This makes sense in a
208 low-latency setting but is bad in a high-latency system. Setting
209 @RtsFlags.GranFlags.DoBulkFetching@ to @True@ enables bulk (packet) fetching. Other
210 parameters determine the size of the packets (@pack_buffer_size@) and the number of
211 thunks that should be put into one packet (@RtsFlags.GranFlags.ThunksToPack@).
212 \item If there is no other possibility to find work, should runnable threads
213 be moved to an idle processor (@RtsFlags.GranFlags.DoThreadMigration@)? In any case, the
214 RTS tried to get sparks (either local or remote ones) first. Thread
215 migration is very expensive, since a whole TSO has to be transferred
216 and probably data locality becomes worse in the process. Note, that
217 the closure, which will be evaluated next by that TSO is not
218 transferred together with the TSO (that might block another thread).
219 \item Should the RTS distinguish between sparks created by local nodes and
220 stolen sparks (@RtsFlags.GranFlags.PreferSparksOfLocalNodes@)? The idea is to improve
221 data locality by preferring sparks of local nodes (it is more likely
222 that the data for those sparks is already on the local processor).
223 However, such a distinction also imposes an overhead on the spark
224 queue management, and typically a large number of sparks are
225 generated during execution. By default this variable is set to @False@.
226 \item Should the RTS use granularity control mechanisms? The idea of a
227 granularity control mechanism is to make use of granularity
228 information provided via annotation of the @par@ construct in order
229 to prefer bigger threads when either turning a spark into a thread or
230 when choosing the next thread to schedule. Currently, three such
231 mechanisms are implemented:
233 \item Cut-off: The granularity information is interpreted as a
234 priority. If a threshold priority is given to the RTS, then
235 only those sparks with a higher priority than the threshold
236 are actually created. Other sparks are immediately discarded.
237 This is similar to a usual cut-off mechanism often used in
238 parallel programs, where parallelism is only created if the
239 input data is large enough. With this option, the choice is
240 hidden in the RTS and only the threshold value has to be
241 provided as a parameter to the runtime system.
242 \item Priority Sparking: This mechanism keeps priorities for sparks
243 and chooses the spark with the highest priority when turning
244 a spark into a thread. After that the priority information is
245 discarded. The overhead of this mechanism comes from
246 maintaining a sorted spark queue.
247 \item Priority Scheduling: This mechanism keeps the granularity
248 information for threads, too. Thus, on each reschedule the
249 largest thread is chosen. This mechanism has a higher
250 overhead, as the thread queue is sorted, too.
255 //@node Initialisation, Global Address Operations, Constants and Variables, GranSim specific code
256 //@subsection Initialisation
/* init_gr_stats: reset the global GranSim statistics record at startup.
   NOTE(review): the leading memset already zeroes the whole struct; the
   field-by-field assignments below are redundant but serve as an inventory
   of the fields being tracked. (Fragmented view: some lines of the original
   body are not visible here.) */
259 init_gr_stats (void) {
260 memset(&globalGranStats, '\0', sizeof(GlobalGranStats));
/* event statistics: one counter per event type */
263 globalGranStats.noOfEvents = 0;
264 for (i=0; i<MAX_EVENT; i++) globalGranStats.event_counts[i]=0;
266 /* communication stats */
267 globalGranStats.fetch_misses = 0;
268 globalGranStats.tot_low_pri_sparks = 0;
/* work-stealing stats (spark steals, thread steals, fishing) */
271 globalGranStats.rs_sp_count = 0;
272 globalGranStats.rs_t_count = 0;
273 globalGranStats.ntimes_total = 0,
274 globalGranStats.fl_total = 0;
275 globalGranStats.no_of_steals = 0;
277 /* spark queue stats */
278 globalGranStats.tot_sq_len = 0,
279 globalGranStats.tot_sq_probes = 0;
280 globalGranStats.tot_sparks = 0;
281 globalGranStats.withered_sparks = 0;
282 globalGranStats.tot_add_threads = 0;
283 globalGranStats.tot_tq_len = 0;
284 globalGranStats.non_end_add_threads = 0;
/* thread stats: total plus a per-PE creation count */
287 globalGranStats.tot_threads_created = 0;
288 for (i=0; i<MAX_PROC; i++) globalGranStats.threads_created_on_PE[i]=0;
292 //@node Global Address Operations, Global Event Queue, Initialisation, GranSim specific code
293 //@subsection Global Address Operations
295 ----------------------------------------------------------------------
296 Global Address Operations
298 These functions perform operations on the global-address (ga) part of a
299 closure. The ga is the only new field (1 word) in a closure introduced by
300 GrAnSim. It serves as a bitmask, indicating on which processor the
301 closure is residing. Since threads are described by Thread State Object
302 (TSO), which is nothing but another kind of closure, this scheme also
303 gives placement information about threads.
305 A ga is just a bitmask, so the operations on them are mainly bitmask
306 manipulating functions. Note, that there are important macros like PROCS,
307 IS_LOCAL_TO etc. They are defined in @GrAnSim.lh@.
309 NOTE: In GrAnSim-light we don't maintain placement information. This
310 allows to simulate an arbitrary number of processors. The price we have
311 to pay is the lack of costing any communication properly. In short,
312 GrAnSim-light is meant to reveal the maximal parallelism in a program.
313 From an implementation point of view the important thing is: {\em
314 GrAnSim-light does not maintain global-addresses}. */
316 /* ga_to_proc returns the first processor marked in the bitmask ga.
317 Normally only one bit in ga should be set. But for PLCs all bits
318 are set. That shouldn't hurt since we only need IS_LOCAL_TO for PLCs */
/* Scans PE indices 0..proc-1 and stops at the first bit set in ga;
   the ASSERT guarantees at least one bit was set within range.
   (Fragmented view: the return statement is not visible here.) */
323 ga_to_proc(StgWord ga)
326 for (i = 0; i < RtsFlags.GranFlags.proc && !IS_LOCAL_TO(ga, i); i++);
327 ASSERT(i<RtsFlags.GranFlags.proc);
331 /* NB: This takes a *node* rather than just a ga as input */
/* where_is: PE on which the given closure resides, i.e. the first
   processor bit set in the closure's PROCS bitmask (see ga_to_proc). */
334 where_is(StgClosure *node)
335 { return (ga_to_proc(PROCS(node))); }
/* is_unique: rtsTrue iff exactly one PE holds a copy of node.
   Returns rtsFalse from inside the loop as soon as a second copy is seen;
   the trailing ASSERT documents that falling out of the loop implies
   exactly one copy was found. (Fragmented view: the final return is not
   visible here.) */
340 is_unique(StgClosure *node)
343 rtsBool unique = rtsFalse;
345 for (i = 0; i < RtsFlags.GranFlags.proc ; i++)
346 if (IS_LOCAL_TO(PROCS(node), i))
347 if (unique) // exactly 1 instance found so far
348 return rtsFalse; // found a 2nd instance => not unique
350 unique = rtsTrue; // found 1st instance
351 ASSERT(unique); // otherwise returned from within loop
/* any_idle: rtsTrue iff at least one PE currently has procStatus Idle.
   All work happens in the for-header (comma expressions); the loop exits
   early once an idle PE is found. */
356 static inline rtsBool
357 any_idle(void) { /* any (map (\ i -> procStatus[i] == Idle)) [0,..,MAX_PROC] */
360 for(i=0, any_idle=rtsFalse;
361 !any_idle && i<RtsFlags.GranFlags.proc;
362 any_idle = any_idle || procStatus[i] == Idle, i++)
/* idlers: count of PEs whose procStatus is Idle (accumulated in j).
   (Fragmented view: the for-init and return are not visible here.) */
368 idlers(void) { /* number of idle PEs */
371 i<RtsFlags.GranFlags.proc;
372 j += (procStatus[i] == Idle) ? 1 : 0, i++)
377 //@node Global Event Queue, Spark queue functions, Global Address Operations, GranSim specific code
378 //@subsection Global Event Queue
380 The following routines implement an ADT of an event-queue (FIFO).
381 ToDo: Put that in its own file(?)
384 /* Pointer to the global event queue; events are currently malloc'ed */
385 rtsEventQ EventHd = NULL;
387 //@cindex get_next_event
/* get_next_event: pop and return the head of the global event queue.
   barfs on an empty queue (no runnable events means the simulated
   computation is deadlocked). Optionally bumps global event statistics.
   (Fragmented view: signature and return are not visible here.) */
391 static rtsEventQ entry = NULL;
393 if (EventHd == NULL) {
394 barf("No next event. This may be caused by a circular data dependency in the program.");
400 if (RtsFlags.GranFlags.GranSimStats.Global) { /* count events */
401 globalGranStats.noOfEvents++;
402 globalGranStats.event_counts[EventHd->evttype]++;
407 IF_GRAN_DEBUG(event_trace,
/* advance the queue head past the event being returned */
410 EventHd = EventHd->next;
414 /* When getting the time of the next event we ignore CONTINUETHREAD events:
415 we don't want to be interrupted before the end of the current time slice
416 unless there is something important to handle.
418 //@cindex get_time_of_next_event
/* Returns the timestamp of the first non-ContinueThread event in the
   queue, or 0 if only ContinueThread events (or nothing) remain. */
420 get_time_of_next_event(void)
422 rtsEventQ event = EventHd;
424 while (event != NULL && event->evttype==ContinueThread) {
428 return ((rtsTime) 0);
430 return (event->time);
433 /* ToDo: replace malloc/free with a free list */
434 //@cindex insert_event
/* insert_event: splice newentry into the time-ordered global event queue.
   Ordering at equal timestamps is event-type-sensitive: FindWork sorts
   after everything except ContinueThread; ContinueThread sorts last.
   Insertion is via a pointer-to-pointer walk (prev tracks the link to
   patch). (Fragmented view: the break statements and the final splice of
   *prev are not visible here.) */
436 insert_event(newentry)
439 rtsEventType evttype = newentry->evttype;
440 rtsEvent *event, **prev;
442 /* if(evttype >= CONTINUETHREAD1) evttype = CONTINUETHREAD; */
444 /* Search the queue and insert at the right point:
445 FINDWORK before everything, CONTINUETHREAD after everything.
447 This ensures that we find any available work after all threads have
448 executed the current cycle. This level of detail would normally be
449 irrelevant, but matters for ridiculously low latencies...
452 /* Changed the ordering: Now FINDWORK comes after everything but
453 CONTINUETHREAD. This makes sure that a MOVESPARK comes before a
454 FINDWORK. This is important when a GranSimSparkAt happens and
455 DoAlwaysCreateThreads is turned on. Also important if a GC occurs
456 when trying to build a new thread (see much_spark) -- HWL 02/96 */
461 for (event = EventHd, prev=(rtsEvent**)&EventHd;
463 prev = (rtsEvent**)&(event->next), event = event->next) {
465 case FindWork: if ( event->time < newentry->time ||
466 ( (event->time == newentry->time) &&
467 (event->evttype != ContinueThread) ) )
471 case ContinueThread: if ( event->time <= newentry->time )
475 default: if ( event->time < newentry->time ||
476 ((event->time == newentry->time) &&
477 (event->evttype == newentry->evttype)) )
482 /* Insert newentry here (i.e. before event) */
484 newentry->next = event;
/* new_event: allocate (via stgMallocBytes) and initialise an rtsEvent,
   then insert it into the global event queue via insert_event.
   Ownership: events are malloc'ed here and (per the ToDo above
   insert_event) freed when consumed — no free list yet.
   (Fragmented view: K&R parameter declarations are partly missing.) */
494 new_event(proc,creator,time,evttype,tso,node,spark)
497 rtsEventType evttype;
502 rtsEvent *newentry = (rtsEvent *) stgMallocBytes(sizeof(rtsEvent), "new_event");
504 newentry->proc = proc;
505 newentry->creator = creator;
506 newentry->time = time;
507 newentry->evttype = evttype;
509 newentry->node = node;
510 newentry->spark = spark;
511 newentry->gc_info = 0;
512 newentry->next = NULL;
514 insert_event(newentry);
/* debug trace of the freshly inserted event */
517 fprintf(stderr, "GRAN: new_event: \n");
518 print_event(newentry));
521 //@cindex prepend_event
/* prepend_event: push an event onto the FRONT of the queue, bypassing
   the usual time-ordered insertion; only legitimate during GC. */
523 prepend_event(event) /* put event at beginning of EventQueue */
525 { /* only used for GC! */
526 event->next = EventHd;
/* grab_event: inverse of prepend_event — detach and return the queue
   head WITHOUT freeing it (unlike get_next_event it keeps no stats).
   barfs if the queue is empty. */
532 grab_event(void) /* undo prepend_event i.e. get the event */
533 { /* at the head of EventQ but don't free anything */
534 rtsEventQ event = EventHd;
536 if (EventHd == NULL) {
537 barf("No next event (in grab_event). This may be caused by a circular data dependency in the program.");
540 EventHd = EventHd->next;
544 //@cindex traverse_eventq_for_gc
/* traverse_eventq_for_gc: pre-GC fixup of in-flight FetchReply events.
   Each FetchReply carries a pack buffer; since GC invalidates the packed
   graph, the event is rewritten into a FetchNode for the original root
   closure, i.e. the fetch is simply re-issued after GC. Only relevant
   with bulk (GUM-style) fetching; no-op otherwise. */
546 traverse_eventq_for_gc(void)
548 rtsEventQ event = EventHd;
550 StgClosure *closurep;
552 StgPtr buffer, bufptr;
555 /* Traverse eventq and replace every FETCHREPLY by a FETCHNODE for the
556 orig closure (root of packed graph). This means that a graph, which is
557 between processors at the time of GC is fetched again at the time when
558 it would have arrived, had there been no GC. Slightly inaccurate but
560 This is only needed for GUM style fetching. -- HWL */
561 if (!RtsFlags.GranFlags.DoBulkFetching)
564 for(event = EventHd; event!=NULL; event=event->next) {
565 if (event->evttype==FetchReply) {
566 buffer = stgCast(StgPtr,event->node);
567 ASSERT(buffer[PACK_FLAG_LOCN]==MAGIC_PACK_FLAG); /* It's a pack buffer */
568 bufsize = buffer[PACK_SIZE_LOCN];
569 closurep = stgCast(StgClosure*,buffer[PACK_HDR_SIZE]);
570 tsop = stgCast(StgTSO*,buffer[PACK_TSO_LOCN]);
572 creator = event->creator; /* similar to unpacking */
/* walk the packed closures, turning revertible black holes back
   into their original closures before the buffer is dropped */
573 for (bufptr=buffer+PACK_HDR_SIZE;
574 bufptr<(buffer+bufsize);
576 // if ( (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_SPEC_RBH_TYPE) ||
577 // (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_GEN_RBH_TYPE) ) {
578 if ( GET_INFO(stgCast(StgClosure*,bufptr)) ) {
579 convertFromRBH(stgCast(StgClosure *,bufptr));
/* turn the reply into a fresh fetch request for the root closure,
   swapping requester/replier roles */
583 event->evttype = FetchNode;
584 event->proc = creator;
585 event->creator = proc;
586 event->node = closurep;
596 StgClosure *MarkRoot(StgClosure *root); // prototype
598 rtsEventQ event = EventHd;
601 /* iterate over eventq and register relevant fields in event as roots */
602 for(event = EventHd, len = 0; event!=NULL; event=event->next, len++) {
603 switch (event->evttype) {
605 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
608 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
609 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
612 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
613 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
616 event->spark->node = (StgClosure *)MarkRoot((StgClosure *)event->spark->node);
619 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
624 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
625 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
628 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
629 if (RtsFlags.GranFlags.DoBulkFetching)
630 // ToDo: traverse_eventw_for_gc if GUM-Fetching!!! HWL
631 belch("ghuH: packets in BulkFetching not marked as roots; mayb be fatal");
633 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
636 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
637 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
640 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
641 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
644 barf("markEventQueue: trying to mark unknown event @ %p", event);
647 belch("GC: markEventQueue: %d events in queue", len));
651 Prune all ContinueThread events related to tso or node in the eventq.
652 Currently used if a thread leaves STG land with ThreadBlocked status,
653 i.e. it blocked on a closure and has been put on its blocking queue. It
654 will be reawakened via a call to awakenBlockedQueue. Until then no
655 event affecting this tso should appear in the eventq. A bit of a hack,
656 because ideally we shouldn't generate such spurious ContinueThread events
659 //@cindex prune_eventq
/* prune_eventq: remove all ContinueThread events for the given tso from
   the global event queue (used when the thread blocks; see the comment
   above). The node parameter is currently unused. Standard singly-linked
   deletion with a prev pointer; deleting the head updates EventHd.
   (Fragmented view: the tso-match condition and the free of the pruned
   event are not visible here.) */
661 prune_eventq(tso, node)
664 { rtsEventQ prev = (rtsEventQ)NULL, event = EventHd;
666 /* node unused for now */
668 /* tso must be valid, then */
669 ASSERT(tso!=END_TSO_QUEUE);
670 while (event != NULL) {
671 if (event->evttype==ContinueThread &&
673 IF_GRAN_DEBUG(event_trace, // ToDo: use another debug flag
674 belch("prune_eventq: pruning ContinueThread event for TSO %d (%p) on PE %d @ %lx (%p)",
675 event->tso->id, event->tso, event->proc, event->time, event));
676 if (prev==(rtsEventQ)NULL) { // beginning of eventq
677 EventHd = event->next;
681 prev->next = event->next;
685 } else { // no pruning necessary; go to next event
692 //@cindex print_event
/* print_event: dump a single event to stderr in one line; NULL fields
   for tso/node are rendered as "______".
   NOTE(review): sprintf(str, "%p", ...) into char[16] can need up to
   18+NUL bytes on 64-bit platforms ("0x" + 16 hex digits) — potential
   stack-buffer overflow; snprintf would be safer. Verify target word
   size before relying on this. */
697 char str_tso[16], str_node[16];
700 if (event->tso==END_TSO_QUEUE) {
701 strcpy(str_tso, "______");
704 sprintf(str_tso, "%p", event->tso);
705 tso_id = (event->tso==NULL) ? 0 : event->tso->id;
707 if (event->node==(StgClosure*)NULL) {
708 strcpy(str_node, "______");
710 sprintf(str_node, "%p", event->node);
712 // HWL: shouldn't be necessary; ToDo: nuke
717 fprintf(stderr,"Evt: NIL\n");
719 fprintf(stderr, "Evt: %s (%u), PE %u [%u], Time %lu, TSO %d (%s), Node %s\n", //"Evt: %s (%u), PE %u [%u], Time %u, TSO %s (%#l), Node %s\n",
720 event_names[event->evttype], event->evttype,
721 event->proc, event->creator, event->time,
722 tso_id, str_tso, str_node
723 /*, event->spark, event->next */ );
727 //@cindex print_eventq
/* print_eventq: dump every event reachable from hd via print_event. */
734 fprintf(stderr,"Event Queue with root at %p:\n", hd);
735 for (x=hd; x!=NULL; x=x->next) {
741 Spark queue functions are now all in Sparks.c!!
743 //@node Scheduling functions, Thread Queue routines, Spark queue functions, GranSim specific code
744 //@subsection Scheduling functions
747 These functions are variants of thread initialisation and therefore
748 related to initThread and friends in Schedule.c. However, they are
749 specific to a GranSim setup in storing more info in the TSO's statistics
750 buffer and sorting the thread queues etc.
754 A large portion of startThread deals with maintaining a sorted thread
755 queue, which is needed for the Priority Sparking option. Without that
756 complication the code boils down to FIFO handling.
758 //@cindex insertThread
/* insertThread: add tso to the run queue of PE proc (which must be
   CurrentProc). Three cases:
     1. empty queue        -> tso becomes head and tail; PE becomes Busy;
     2. priority scheduling -> insert after the first entry whose priority
        is not greater than tso's (never before the running head);
     3. otherwise          -> append at the tail (FIFO).
   Charges threadqueuetime (plus per-hop pri_sched_overhead) to the PE's
   clock. In GranSim-Light mode delegates to GranSimLight_insertThread.
   (Fragmented view: several brace/else lines of the original body are
   not visible here.) */
760 insertThread(tso, proc)
764 StgTSO *prev = NULL, *next = NULL;
766 rtsBool found = rtsFalse;
768 ASSERT(CurrentProc==proc);
769 ASSERT(!is_on_queue(tso,proc));
770 /* Idle proc: put the thread on the run queue
771 same for pri spark and basic version */
772 if (run_queue_hds[proc] == END_TSO_QUEUE)
775 ASSERT((CurrentProc==MainProc &&
776 CurrentTime[MainProc]==0 &&
777 procStatus[MainProc]==Idle) ||
778 procStatus[proc]==Starting);
780 run_queue_hds[proc] = run_queue_tls[proc] = tso;
782 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime;
784 /* new_event of ContinueThread has been moved to do_the_startthread */
787 ASSERT(procStatus[proc]==Idle ||
788 procStatus[proc]==Fishing ||
789 procStatus[proc]==Starting);
790 procStatus[proc] = Busy;
795 if (RtsFlags.GranFlags.Light)
796 GranSimLight_insertThread(tso, proc);
798 /* Only for Pri Scheduling: find place where to insert tso into queue */
799 if (RtsFlags.GranFlags.DoPriorityScheduling && tso->gran.pri!=0)
800 /* {add_to_spark_queue}vo' jInIHta'; Qu' wa'DIch yIleghQo' */
/* (Klingon, roughly: "stolen from add_to_spark_queue; skip the
   first element") */
801 for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count=0;
802 (next != END_TSO_QUEUE) &&
803 !(found = tso->gran.pri >= next->gran.pri);
804 prev = next, next = next->link, count++)
806 ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
807 (prev==(StgTSO*)NULL || prev->link==next));
810 ASSERT(!found || next != END_TSO_QUEUE);
811 ASSERT(procStatus[proc]!=Idle);
814 /* found can only be rtsTrue if pri scheduling enabled */
815 ASSERT(RtsFlags.GranFlags.DoPriorityScheduling);
816 if (RtsFlags.GranFlags.GranSimStats.Global)
817 globalGranStats.non_end_add_threads++;
818 /* Add tso to ThreadQueue between prev and next */
820 if ( next == (StgTSO*)END_TSO_QUEUE ) {
823 /* no back link for TSO chain */
826 if ( prev == (StgTSO*)END_TSO_QUEUE ) {
827 /* Never add TSO as first elem of thread queue; the first */
828 /* element should be the one that is currently running -- HWL */
830 belch("GRAN: Qagh: NewThread (w/ PriorityScheduling): Trying to add TSO %p (PRI=%d) as first elem of threadQ (%p) on proc %u (@ %u)\n",
831 tso, tso->gran.pri, run_queue_hd, proc,
836 } else { /* !found */ /* or not pri sparking! */
837 /* Add TSO to the end of the thread queue on that processor */
838 run_queue_tls[proc]->link = tso;
839 run_queue_tls[proc] = tso;
841 ASSERT(RtsFlags.GranFlags.DoPriorityScheduling || count==0);
/* cost model: each comparison hop costs pri_sched_overhead */
842 CurrentTime[proc] += count * RtsFlags.GranFlags.Costs.pri_sched_overhead +
843 RtsFlags.GranFlags.Costs.threadqueuetime;
845 /* ToDo: check if this is still needed -- HWL
846 if (RtsFlags.GranFlags.DoThreadMigration)
849 if (RtsFlags.GranFlags.GranSimStats.Full &&
850 !(( event_type == GR_START || event_type == GR_STARTQ) &&
851 RtsFlags.GranFlags.labelling) )
852 DumpRawGranEvent(proc, creator, event_type+1, tso, node,
853 tso->gran.sparkname, spark_queue_len(proc));
856 # if defined(GRAN_CHECK)
857 /* Check if thread queue is sorted. Only for testing, really! HWL */
858 if ( RtsFlags.GranFlags.DoPriorityScheduling &&
859 (RtsFlags.GranFlags.Debug.sortedQ) ) {
860 rtsBool sorted = rtsTrue;
863 if (run_queue_hds[proc]==END_TSO_QUEUE ||
864 run_queue_hds[proc]->link==END_TSO_QUEUE) {
865 /* just 1 elem => ok */
867 /* Qu' wa'DIch yIleghQo' (ignore first elem)! */
868 for (prev = run_queue_hds[proc]->link, next = prev->link;
869 (next != END_TSO_QUEUE) ;
870 prev = next, next = prev->link) {
871 ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
872 (prev==(StgTSO*)NULL || prev->link==next));
874 (prev->gran.pri >= next->gran.pri);
878 fprintf(stderr,"Qagh: THREADQ on PE %d is not sorted:\n",
880 G_THREADQ(run_queue_hd,0x1);
887 insertThread, which is only used for GranSim Light, is similar to
888 startThread in that it adds a TSO to a thread queue. However, it assumes
889 that the thread queue is sorted by local clocks and it inserts the TSO at
890 the right place in the queue. Don't create any event, just insert.
892 //@cindex GranSimLight_insertThread
/* GranSimLight_insertThread: insert tso into the single GranSim-Light
   thread queue, kept sorted by the TSOs' local clocks (tso->gran.clock).
   Every queued thread is conceptually running on its own virtual PE; the
   queue only interleaves their reductions. If tso becomes the new head,
   a ContinueThread-style event is scheduled for it. No event is created
   otherwise. (Fragmented view: several brace/else lines of the original
   body are not visible here.) */
894 GranSimLight_insertThread(tso, proc)
900 rtsBool found = rtsFalse;
902 ASSERT(RtsFlags.GranFlags.Light);
904 /* In GrAnSim-Light we always have an idle `virtual' proc.
905 The semantics of the one-and-only thread queue is different here:
906 all threads in the queue are running (each on its own virtual processor);
907 the queue is only needed internally in the simulator to interleave the
908 reductions of the different processors.
909 The one-and-only thread queue is sorted by the local clocks of the TSOs.
911 ASSERT(run_queue_hds[proc] != END_TSO_QUEUE);
912 ASSERT(tso->link == END_TSO_QUEUE);
914 /* If only one thread in queue so far we emit DESCHEDULE in debug mode */
915 if (RtsFlags.GranFlags.GranSimStats.Full &&
916 (RtsFlags.GranFlags.Debug.checkLight) &&
917 (run_queue_hd->link == END_TSO_QUEUE)) {
918 DumpRawGranEvent(proc, proc, GR_DESCHEDULE,
919 run_queue_hds[proc], (StgClosure*)NULL,
920 tso->gran.sparkname, spark_queue_len(proc)); // ToDo: check spar_queue_len
921 // resched = rtsTrue;
924 /* this routine should only be used in a GrAnSim Light setup */
925 /* && CurrentProc must be 0 in GrAnSim Light setup */
926 ASSERT(RtsFlags.GranFlags.Light && CurrentProc==0);
928 /* Idle proc; same for pri spark and basic version */
929 if (run_queue_hd==END_TSO_QUEUE)
931 run_queue_hd = run_queue_tl = tso;
932 /* MAKE_BUSY(CurrentProc); */
/* find insertion point: first entry whose local clock is later than
   tso's (stable w.r.t. equal clocks) */
936 for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count = 0;
937 (next != END_TSO_QUEUE) &&
938 !(found = (tso->gran.clock < next->gran.clock));
939 prev = next, next = next->link, count++)
941 ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
942 (prev==(StgTSO*)NULL || prev->link==next));
945 /* found can only be rtsTrue if pri sparking enabled */
947 /* Add tso to ThreadQueue between prev and next */
949 if ( next == END_TSO_QUEUE ) {
950 run_queue_tls[proc] = tso;
952 /* no back link for TSO chain */
955 if ( prev == END_TSO_QUEUE ) {
956 run_queue_hds[proc] = tso;
960 } else { /* !found */ /* or not pri sparking! */
961 /* Add TSO to the end of the thread queue on that processor */
962 run_queue_tls[proc]->link = tso;
963 run_queue_tls[proc] = tso;
966 if ( prev == END_TSO_QUEUE ) { /* new head of queue */
967 new_event(proc, proc, CurrentTime[proc],
969 tso, (StgClosure*)NULL, (rtsSpark*)NULL);
972 if (RtsFlags.GranFlags.GranSimStats.Full &&
973 !(( event_type == GR_START || event_type == GR_STARTQ) &&
974 RtsFlags.GranFlags.labelling) )
975 DumpRawGranEvent(proc, creator, gr_evttype, tso, node,
976 tso->gran.sparkname, spark_queue_len(proc));
982 endThread is responsible for general clean-up after the thread tso has
983 finished. This includes emitting statistics into the profile etc.
/* Preconditions: the PE was Busy and the TSO has ThreadComplete status.
   NOTE(review): the emptiness test reads run_queue_hds[proc] but the
   status write targets procStatus[CurrentProc] — this only matches when
   proc==CurrentProc; verify that invariant at the call sites. */
986 endThread(StgTSO *tso, PEs proc)
988 ASSERT(procStatus[proc]==Busy); // coming straight out of STG land
989 ASSERT(tso->what_next==ThreadComplete);
990 // ToDo: prune ContinueThreads for this TSO from event queue
991 DumpEndEvent(proc, tso, rtsFalse /* not mandatory */);
993 /* if this was the last thread on this PE then make it Idle */
994 if (run_queue_hds[proc]==END_TSO_QUEUE) {
995 procStatus[CurrentProc] = Idle;
999 //@node Thread Queue routines, GranSim functions, Scheduling functions, GranSim specific code
1000 //@subsection Thread Queue routines
1003 Check whether given tso resides on the run queue of the current processor.
1004 Only used for debugging.
1007 //@cindex is_on_queue
/* is_on_queue: debugging helper — rtsTrue iff tso occurs (by pointer
   identity) in the run queue of PE proc. (Fragmented view: the loop
   increment and return are not visible here.) */
1009 is_on_queue (StgTSO *tso, PEs proc)
1014 for (t=run_queue_hds[proc], found=rtsFalse;
1015 t!=END_TSO_QUEUE && !(found = t==tso);
1022 /* This routine is only used for keeping a statistics of thread queue
1023 lengths to evaluate the impact of priority scheduling. -- HWL
1024 {spark_queue_len}vo' jInIHta'
1026 //@cindex thread_queue_len
/* thread_queue_len: length of PE proc's run queue; used only for
   statistics on the impact of priority scheduling (see comment above).
   (Fragmented view: the return of len is not visible here.) */
1028 thread_queue_len(PEs proc)
1030 StgTSO *prev, *next;
1033 for (len = 0, prev = END_TSO_QUEUE, next = run_queue_hds[proc];
1034 next != END_TSO_QUEUE;
1035 len++, prev = next, next = prev->link)
1041 //@node GranSim functions, GranSimLight routines, Thread Queue routines, GranSim specific code
1042 //@subsection GranSim functions
1044 /* ----------------------------------------------------------------- */
1045 /* The main event handling functions; called from Schedule.c (schedule) */
1046 /* ----------------------------------------------------------------- */
1048 //@cindex do_the_globalblock
/* do_the_globalblock: handle a GlobalBlock event — the TSO on PE proc
   blocked on a remote closure. Only valid with bulk (GUM-style) fetching
   and never in GranSim-Light. Registers the blocked fetch via blockFetch;
   then, in synchronous (block-on-fetch) mode, schedules the next runnable
   thread on the PE (or marks the PE Idle if none). In asynchronous mode
   another thread is already running, so nothing more is done.
   (Fragmented view: some closing braces of the original are not visible
   here.) */
1051 do_the_globalblock(rtsEvent* event)
1053 PEs proc = event->proc; /* proc that requested node */
1054 StgTSO *tso = event->tso; /* tso that requested node */
1055 StgClosure *node = event->node; /* requested, remote node */
1057 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the GlobalBlock\n"));
1058 /* There should be no GLOBALBLOCKs in GrAnSim Light setup */
1059 ASSERT(!RtsFlags.GranFlags.Light);
1060 /* GlobalBlock events only valid with GUM fetching */
1061 ASSERT(RtsFlags.GranFlags.DoBulkFetching);
1063 IF_GRAN_DEBUG(bq, // globalBlock,
1064 if (IS_LOCAL_TO(PROCS(node),proc)) {
1065 belch("## Qagh: GlobalBlock: Blocking TSO %d (%p) on LOCAL node %p (PE %d).\n",
1066 tso->id, tso, node, proc);
1069 /* CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.munpacktime; */
1070 if ( blockFetch(tso,proc,node) != 0 )
1071 return; /* node has become local by now */
1074 ToDo: check whether anything has to be done at all after blockFetch -- HWL
1076 if (!RtsFlags.GranFlags.DoAsyncFetch) { /* head of queue is next thread */
1077 StgTSO* tso = run_queue_hds[proc]; /* awaken next thread */
1078 if (tso != (StgTSO*)NULL) {
1079 new_event(proc, proc, CurrentTime[proc],
1081 tso, (StgClosure*)NULL, (rtsSpark*)NULL);
1082 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime;
1083 if (RtsFlags.GranFlags.GranSimStats.Full)
1084 DumpRawGranEvent(proc, CurrentProc, GR_SCHEDULE, tso,
1085 (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc)); // ToDo: check sparkname and spar_queue_len
1086 procStatus[proc] = Busy; /* might have been fetching */
1088 procStatus[proc] = Idle; /* no work on proc now */
1090 } else { /* RtsFlags.GranFlags.DoAsyncFetch i.e. block-on-fetch */
1091 /* other thread is already running */
1092 /* 'oH 'utbe' 'e' vIHar ; I think that's not needed -- HWL
1093 new_event(proc,proc,CurrentTime[proc],
1094 CONTINUETHREAD,EVENT_TSO(event),
1095 (RtsFlags.GranFlags.DoBulkFetching ? closure :
1096 EVENT_NODE(event)),NULL);
1102 //@cindex do_the_unblock
/* do_the_unblock: event handler for an UnblockThread event, generated
   either when a FetchReply arrives or when a blocking queue is awakened.
   For synchronous (block-on-fetch) communication the block time is
   accounted to the TSO; in all cases a ResumeThread event is scheduled
   for the TSO on its PE.
   NOTE(review): fragmentary extraction -- braces, the event-type argument
   of new_event and the else-branch structure are partly missing here. */
1105 do_the_unblock(rtsEvent* event)
1107 PEs proc = event->proc, /* proc that requested node */
1108 creator = event->creator; /* proc that requested node */
1109 StgTSO* tso = event->tso; /* tso that requested node */
1110 StgClosure* node = event->node; /* requested, remote node */
1112 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the UnBlock\n"))
1113 /* There should be no UNBLOCKs in GrAnSim Light setup */
1114 ASSERT(!RtsFlags.GranFlags.Light);
1115 /* UnblockThread means either FetchReply has arrived or
1116 a blocking queue has been awakened;
1117 ToDo: check with assertions
1118 ASSERT(procStatus[proc]==Fetching || IS_BLACK_HOLE(event->node));
1120 if (!RtsFlags.GranFlags.DoAsyncFetch) { /* block-on-fetch */
1121 /* We count block-on-fetch as normal block time */
1122 tso->gran.blocktime += CurrentTime[proc] - tso->gran.blockedat;
1123 /* Dumping now done when processing the event
1124 No costs for contextswitch or thread queueing in this case
1125 if (RtsFlags.GranFlags.GranSimStats.Full)
1126 DumpRawGranEvent(proc, CurrentProc, GR_RESUME, tso,
1127 (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc));
1129 /* Maybe do this in FetchReply already
1130 if (procStatus[proc]==Fetching)
1131 procStatus[proc] = Busy;
1134 new_event(proc, proc, CurrentTime[proc],
1136 tso, node, (rtsSpark*)NULL);
1139 /* Asynchr comm causes additional costs here: */
1140 /* Bring the TSO from the blocked queue into the threadq */
1142 /* In all cases, the UnblockThread causes a ResumeThread to be scheduled */
1143 new_event(proc, proc,
1144 CurrentTime[proc]+RtsFlags.GranFlags.Costs.threadqueuetime,
1146 tso, node, (rtsSpark*)NULL);
1149 //@cindex do_the_fetchnode
/* do_the_fetchnode: event handler for a FetchNode event: PE `creator'
   has requested `node', which lives on PE `proc' (== CurrentProc).
   Charges message-unpack costs, optionally dumps a GR_FETCH profiling
   event, and calls handleFetchRequest; if that runs out of heap the
   event is re-queued and a GC is triggered (currently barf()s instead --
   see the ToDo below), repeating until the request succeeds.
   NOTE(review): fragmentary extraction -- the do { ... } opening, braces
   and #endif lines are missing from this view. */
1152 do_the_fetchnode(rtsEvent* event)
1154 PEs proc = event->proc, /* proc that holds the requested node */
1155 creator = event->creator; /* proc that requested node */
1156 StgTSO* tso = event->tso;
1157 StgClosure* node = event->node; /* requested, remote node */
1158 rtsFetchReturnCode rc;
1160 ASSERT(CurrentProc==proc);
1161 /* There should be no FETCHNODEs in GrAnSim Light setup */
1162 ASSERT(!RtsFlags.GranFlags.Light);
1164 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchNode\n"));
1166 CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1168 /* ToDo: check whether this is the right place for dumping the event */
1169 if (RtsFlags.GranFlags.GranSimStats.Full)
1170 DumpRawGranEvent(creator, proc, GR_FETCH, tso, node, (StgInt)0, 0);
1173 rc = handleFetchRequest(node, proc, creator, tso);
1174 if (rc == OutOfHeap) { /* trigger GC */
1175 # if defined(GRAN_CHECK) && defined(GRAN)
1176 if (RtsFlags.GcFlags.giveStats)
1177 fprintf(RtsFlags.GcFlags.statsFile,"***** veQ boSwI' PackNearbyGraph(node %p, tso %p (%d))\n",
1178 node, tso, tso->id);
1180 barf("//// do_the_fetchnode: out of heap after handleFetchRequest; ToDo: call GarbageCollect()");
1181 prepend_event(event);
1182 GarbageCollect(GetRoots, rtsFalse);
1183 // HWL: ToDo: check whether a ContinueThread has to be issued
1184 // HWL old: ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
1185 # if 0 && defined(GRAN_CHECK) && defined(GRAN)
1186 if (RtsFlags.GcFlags.giveStats) {
1187 fprintf(RtsFlags.GcFlags.statsFile,"***** SAVE_Hp=%p, SAVE_HpLim=%p, PACK_HEAP_REQUIRED=%d\n",
1188 Hp, HpLim, 0) ; // PACK_HEAP_REQUIRED); ???
1189 fprintf(stderr,"***** No. of packets so far: %d (total size: %d)\n",
1190 globalGranStats.tot_packets, globalGranStats.tot_packet_size);
1193 event = grab_event();
1194 // Hp -= PACK_HEAP_REQUIRED; // ???
1196 /* GC knows that events are special and follows the pointer i.e. */
1197 /* events are valid even if they moved. An EXIT is triggered */
1198 /* if there is not enough heap after GC. */
1200 } while (rc == OutOfHeap);
1203 //@cindex do_the_fetchreply
/* do_the_fetchreply: event handler for a FetchReply event: the node (or,
   with bulk fetching, a pack buffer containing a graph) requested by TSO
   `tso' on PE `proc' has arrived from PE `creator'.  With bulk fetching
   the graph is unpacked; with incremental fetching fetchNode moves/copies
   the single closure -- if the node has meanwhile been grabbed by another
   PE the fetch is re-issued there (a "fetch miss") and the TSO keeps
   sleeping.  On success the outstanding-fetch count is decremented and a
   follow-up event (ResumeThread, presumably) is scheduled.
   NOTE(review): fragmentary extraction -- braces, the event-type
   arguments of new_event and the tail of the function are missing. */
1205 do_the_fetchreply(rtsEvent* event)
1207 PEs proc = event->proc, /* proc that requested node */
1208 creator = event->creator; /* proc that holds the requested node */
1209 StgTSO* tso = event->tso;
1210 StgClosure* node = event->node; /* requested, remote node */
1211 StgClosure* closure=(StgClosure*)NULL;
1213 ASSERT(CurrentProc==proc);
1214 ASSERT(RtsFlags.GranFlags.DoAsyncFetch || procStatus[proc]==Fetching);
1216 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchReply\n"));
1217 /* There should be no FETCHREPLYs in GrAnSim Light setup */
1218 ASSERT(!RtsFlags.GranFlags.Light);
1220 /* assign message unpack costs *before* dumping the event */
1221 CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1223 /* ToDo: check whether this is the right place for dumping the event */
1224 if (RtsFlags.GranFlags.GranSimStats.Full)
1225 DumpRawGranEvent(proc, creator, GR_REPLY, tso, node,
1226 tso->gran.sparkname, spark_queue_len(proc));
1228 /* THIS SHOULD NEVER HAPPEN
1229 If tso is in the BQ of node this means that it actually entered the
1230 remote closure, due to a missing GranSimFetch at the beginning of the
1231 entry code; therefore, this is actually a faked fetch, triggered from
1232 within GranSimBlock;
1233 since tso is both in the EVQ and the BQ for node, we have to take it out
1234 of the BQ first before we can handle the FetchReply;
1235 ToDo: special cases in awakenBlockedQueue, since the BQ magically moved.
1237 if (tso->block_info.closure!=(StgClosure*)NULL) {
1239 belch("## ghuH: TSO %d (%p) in FetchReply is blocked on node %p (shouldn't happen AFAIK)",
1240 tso->id, tso, node));
1241 // unlink_from_bq(tso, node);
1244 if (RtsFlags.GranFlags.DoBulkFetching) { /* bulk (packet) fetching */
1245 rtsPackBuffer *buffer = (rtsPackBuffer*)node;
1246 nat size = buffer->size;
1248 /* NB: Fetch misses can't occur with GUM fetching, as */
1249 /* updatable closure are turned into RBHs and therefore locked */
1250 /* for other processors that try to grab them. */
1252 closure = UnpackGraph(buffer);
1253 CurrentTime[proc] += size * RtsFlags.GranFlags.Costs.munpacktime;
1254 } else // incremental fetching
1255 /* Copy or move node to CurrentProc */
1256 if (fetchNode(node, creator, proc)) {
1257 /* Fetch has failed i.e. node has been grabbed by another PE */
1258 PEs p = where_is(node);
1261 if (RtsFlags.GranFlags.GranSimStats.Global)
1262 globalGranStats.fetch_misses++;
1264 IF_GRAN_DEBUG(thunkStealing,
1265 belch("== Qu'vatlh! fetch miss @ %u: node %p is at proc %u (rather than proc %u)\n",
1266 CurrentTime[proc],node,p,creator));
1268 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
1270 /* Count fetch again !? */
1271 ++(tso->gran.fetchcount);
1272 tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime;
1274 fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) +
1275 RtsFlags.GranFlags.Costs.latency;
1277 /* Chase the grabbed node */
1278 new_event(p, proc, fetchtime,
1280 tso, node, (rtsSpark*)NULL);
1282 # if 0 && defined(GRAN_CHECK) && defined(GRAN) /* Just for testing */
1283 IF_GRAN_DEBUG(blockOnFetch,
1284 BlockedOnFetch[CurrentProc] = tso;) /*-rtsTrue;-*/
1286 IF_GRAN_DEBUG(blockOnFetch_sanity,
1287 tso->type |= FETCH_MASK_TSO;)
1290 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
1292 return; /* NB: no REPLy has been processed; tso still sleeping */
1295 /* -- Qapla'! Fetch has been successful; node is here, now */
1296 ++(event->tso->gran.fetchcount);
1297 event->tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime;
1299 /* this is now done at the beginning of this routine
1300 if (RtsFlags.GranFlags.GranSimStats.Full)
1301 DumpRawGranEvent(proc,event->creator, GR_REPLY, event->tso,
1302 (RtsFlags.GranFlags.DoBulkFetching ?
1305 tso->gran.sparkname, spark_queue_len(proc));
1308 ASSERT(OutstandingFetches[proc] > 0);
1309 --OutstandingFetches[proc];
1310 new_event(proc, proc, CurrentTime[proc],
1312 event->tso, (RtsFlags.GranFlags.DoBulkFetching ?
1318 //@cindex do_the_movethread
/* do_the_movethread: event handler for a MoveThread event: a thread
   stolen from PE `creator' arrives on PE `proc' (== CurrentProc).  Only
   valid with thread migration enabled (-bM).  Charges unpack costs,
   dumps a GR_STOLEN profiling event, adjusts the TSO's PE bitmask,
   inserts the thread into the local run queue, clears the Fishing state
   and updates global migration statistics. */
1321 do_the_movethread(rtsEvent* event) {
1322 PEs proc = event->proc, /* proc that requested node */
1323 creator = event->creator; /* proc that holds the requested node */
1324 StgTSO* tso = event->tso;
1326 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveThread\n"));
1328 ASSERT(CurrentProc==proc);
1329 /* There should be no MOVETHREADs in GrAnSim Light setup */
1330 ASSERT(!RtsFlags.GranFlags.Light);
1331 /* MOVETHREAD events should never occur without -bM */
1332 ASSERT(RtsFlags.GranFlags.DoThreadMigration);
1333 /* Bitmask of moved thread should be 0 */
1334 ASSERT(PROCS(tso)==0);
1335 ASSERT(procStatus[proc] == Fishing ||
1336 RtsFlags.GranFlags.DoAsyncFetch);
1337 ASSERT(OutstandingFishes[proc]>0);
1339 /* ToDo: exact costs for unpacking the whole TSO */
1340 CurrentTime[proc] += 5l * RtsFlags.GranFlags.Costs.munpacktime;
1342 /* ToDo: check whether this is the right place for dumping the event */
1343 if (RtsFlags.GranFlags.GranSimStats.Full)
1344 DumpRawGranEvent(proc, creator,
1345 GR_STOLEN, tso, (StgClosure*)NULL, (StgInt)0, 0);
1347 // ToDo: check cost functions
1348 --OutstandingFishes[proc];
1349 SET_GRAN_HDR(tso, ThisPE); // adjust the bitmask for the TSO
1350 insertThread(tso, proc);
1352 if (procStatus[proc]==Fishing)
1353 procStatus[proc] = Idle;
1355 if (RtsFlags.GranFlags.GranSimStats.Global)
1356 globalGranStats.tot_TSOs_migrated++;
1359 //@cindex do_the_movespark
/* do_the_movespark: event handler for a MoveSpark event: a spark stolen
   from PE `creator' arrives on PE `proc' (== CurrentProc).  Charges
   unpack costs, optionally records the movement in the spark profile,
   counts "withered" sparks (whose node should no longer be sparked),
   adds the spark to the local spark queue and clears the Fishing state. */
1362 do_the_movespark(rtsEvent* event) {
1363 PEs proc = event->proc, /* proc that requested spark */
1364 creator = event->creator; /* proc that holds the requested spark */
1365 StgTSO* tso = event->tso;
1366 rtsSparkQ spark = event->spark;
1368 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveSpark\n"))
1370 ASSERT(CurrentProc==proc);
1371 ASSERT(spark!=NULL);
1372 ASSERT(procStatus[proc] == Fishing ||
1373 RtsFlags.GranFlags.DoAsyncFetch);
1374 ASSERT(OutstandingFishes[proc]>0);
1376 CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1378 /* record movement of spark only if spark profiling is turned on */
1379 if (RtsFlags.GranFlags.GranSimStats.Sparks)
1380 DumpRawGranEvent(proc, creator,
1382 tso, spark->node, spark->name, spark_queue_len(proc));
1384 /* global statistics */
1385 if ( RtsFlags.GranFlags.GranSimStats.Global &&
1386 !closure_SHOULD_SPARK(spark->node))
1387 globalGranStats.withered_sparks++;
1388 /* Not adding the spark to the spark queue would be the right */
1389 /* thing here, but it also would be cheating, as this info can't be */
1390 /* available in a real system. -- HWL */
1392 --OutstandingFishes[proc];
1394 add_to_spark_queue(spark);
1396 IF_GRAN_DEBUG(randomSteal, // ToDo: spark-distribution flag
1397 print_sparkq_stats());
1399 /* Should we treat stolen sparks specially? Currently, we don't. */
1401 if (procStatus[proc]==Fishing)
1402 procStatus[proc] = Idle;
1404 /* add_to_spark_queue will increase the time of the current proc. */
1406 If proc was fishing, it is Idle now with the new spark in its spark
1407 pool. This means that the next time handleIdlePEs is called, a local
1408 FindWork will be created on this PE to turn the spark into a thread. Of
1409 course another PE might steal the spark in the meantime (that's why we
1410 are using events rather than inlining all the operations in the first
1415 In the Constellation class version of GranSim the semantics of StartThread
1416 events has changed. Now, StartThread has to perform 3 basic operations:
1417 - create a new thread (previously this was done in ActivateSpark);
1418 - insert the thread into the run queue of the current processor
1419 - generate a new event for actually running the new thread
1420 Note that the insertThread is called via createThread.
1423 //@cindex do_the_startthread
/* do_the_startthread: event handler for both StartThread and
   ResumeThread events on PE `proc' (== CurrentProc).  For StartThread a
   fresh TSO is created from the spark (createThread also inserts it into
   the run queue) and thread-creation costs are charged; for ResumeThread
   the existing TSO is re-inserted into the run queue.  In both cases a
   GR_START(Q)/GR_RESUME(Q) profiling event may be dumped, procStatus
   becomes Busy, and a follow-up event (ContinueThread, presumably) is
   scheduled so the thread actually runs.
   NOTE(review): fragmentary extraction -- braces and the event-type
   argument of the final new_event are missing from this view.  The inner
   `GranEventType gr_evttype' declarations shadow the outer one declared
   at line 1432, which therefore looks unused in the visible code --
   TODO confirm against the full source. */
1426 do_the_startthread(rtsEvent *event)
1428 PEs proc = event->proc; /* proc that requested node */
1429 StgTSO *tso = event->tso; /* tso that requested node */
1430 StgClosure *node = event->node; /* requested, remote node */
1431 rtsSpark *spark = event->spark;
1432 GranEventType gr_evttype;
1434 ASSERT(CurrentProc==proc);
1435 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1436 ASSERT(event->evttype == ResumeThread || event->evttype == StartThread);
1437 /* if this was called via StartThread: */
1438 ASSERT(event->evttype!=StartThread || tso == END_TSO_QUEUE); // not yet created
1439 // ToDo: check: ASSERT(event->evttype!=StartThread || procStatus[proc]==Starting);
1440 /* if this was called via ResumeThread: */
1441 ASSERT(event->evttype!=ResumeThread ||
1442 RtsFlags.GranFlags.DoAsyncFetch ||!is_on_queue(tso,proc));
1444 /* startThread may have been called from the main event handler upon
1445 finding either a ResumeThread or a StartThread event; set the
1446 gr_evttype (needed for writing to .gr file) accordingly */
1447 // gr_evttype = (event->evttype == ResumeThread) ? GR_RESUME : GR_START;
1449 if ( event->evttype == StartThread ) {
1450 GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ?
1451 GR_START : GR_STARTQ;
1453 tso = createThread(BLOCK_SIZE_W, spark->gran_info);// implicit insertThread!
1454 pushClosure(tso, node);
1456 // ToDo: fwd info on local/global spark to thread -- HWL
1457 // tso->gran.exported = spark->exported;
1458 // tso->gran.locked = !spark->global;
1459 tso->gran.sparkname = spark->name;
1461 ASSERT(CurrentProc==proc);
1462 if (RtsFlags.GranFlags.GranSimStats.Full)
1463 DumpGranEvent(gr_evttype,tso);
1465 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
1466 } else { // event->evttype == ResumeThread
1467 GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ?
1468 GR_RESUME : GR_RESUMEQ;
1470 insertThread(tso, proc);
1472 ASSERT(CurrentProc==proc);
1473 if (RtsFlags.GranFlags.GranSimStats.Full)
1474 DumpGranEvent(gr_evttype,tso);
1477 ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE); // non-empty run queue
1478 procStatus[proc] = Busy;
1479 /* make sure that this thread is actually run */
1480 new_event(proc, proc,
1483 tso, node, (rtsSpark*)NULL);
1485 /* A wee bit of statistics gathering */
1486 if (RtsFlags.GranFlags.GranSimStats.Global) {
1487 globalGranStats.tot_add_threads++;
1488 globalGranStats.tot_tq_len += thread_queue_len(CurrentProc);
1493 //@cindex do_the_findwork
/* do_the_findwork: event handler for a FindWork event on PE `proc'.
   A FindWork is either local (creator==proc: turn a spark from the local
   pool into a thread) or, with GUM-style Fishing, a fish sent by PE
   `creator'.  Searches the local spark pool via findLocalSpark; if a
   spark is found it is activated (activateSpark) and removed from the
   queue; if not, a local FindWork just resets the PE to Idle, while a
   fish decrements the creator's outstanding-fish count and forwards the
   steal attempt via stealSpark.
   NOTE(review): fragmentary extraction -- braces, #if 0 markers around
   the GC code, and several closing lines are missing from this view;
   the GarbageCollect(GetRoots) call at line 1525 appears to be inside
   commented-out/disabled code (cf. the two "ToDo: check available heap"
   markers) -- TODO confirm against the full source. */
1495 do_the_findwork(rtsEvent* event)
1497 PEs proc = event->proc, /* proc to search for work */
1498 creator = event->creator; /* proc that requested work */
1499 rtsSparkQ spark = event->spark;
1500 /* ToDo: check that this size is safe -- HWL */
1502 ToDo: check available heap
1504 nat req_heap = sizeofW(StgTSO) + MIN_STACK_WORDS;
1505 // add this? -- HWL:RtsFlags.ConcFlags.stkChunkSize;
1508 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the Findwork\n"));
1510 /* If GUM style fishing is enabled, the contents of the spark field says
1511 what to steal (spark(1) or thread(2)); */
1512 ASSERT(!(RtsFlags.GranFlags.Fishing && event->spark==(rtsSpark*)0));
1514 /* Make sure that we have enough heap for creating a new
1515 thread. This is a conservative estimate of the required heap.
1516 This eliminates special checks for GC around NewThread within
1520 ToDo: check available heap
1522 if (Hp + req_heap > HpLim ) {
1524 belch("GC: Doing GC from within Findwork handling (that's bloody dangerous if you ask me)");)
1525 GarbageCollect(GetRoots);
1526 // ReallyPerformThreadGC(req_heap, rtsFalse); old -- HWL
1528 if (procStatus[CurrentProc]==Sparking)
1529 procStatus[CurrentProc]=Idle;
1534 if ( RtsFlags.GranFlags.DoAlwaysCreateThreads ||
1535 RtsFlags.GranFlags.Fishing ||
1536 ((procStatus[proc]==Idle || procStatus[proc]==Sparking) &&
1537 (RtsFlags.GranFlags.FetchStrategy >= 2 ||
1538 OutstandingFetches[proc] == 0)) )
1541 rtsSparkQ prev, spark;
1544 ASSERT(procStatus[proc]==Sparking ||
1545 RtsFlags.GranFlags.DoAlwaysCreateThreads ||
1546 RtsFlags.GranFlags.Fishing);
1548 /* SImmoHwI' yInej! Search spark queue! */
1549 /* gimme_spark (event, &found, &spark); */
1550 findLocalSpark(event, &found, &spark);
1552 if (!found) { /* pagh vumwI' */
1554 If no spark has been found this can mean 2 things:
1555 1/ The FindWork was a fish (i.e. a message sent by another PE) and
1556 the spark pool of the receiver is empty
1557 --> the fish has to be forwarded to another PE
1558 2/ The FindWork was local to this PE (i.e. no communication; in this
1559 case creator==proc) and the spark pool of the PE is not empty
1560 contains only sparks of closures that should not be sparked
1561 (note: if the spark pool were empty, handleIdlePEs wouldn't have
1562 generated a FindWork in the first place)
1563 --> the PE has to be made idle to trigger stealing sparks the next
1564 time handleIdlePEs is performed
1567 ASSERT(pending_sparks_hds[proc]==(rtsSpark*)NULL);
1568 if (creator==proc) {
1569 /* local FindWork */
1570 if (procStatus[proc]==Busy) {
1571 belch("ghuH: PE %d in Busy state while processing local FindWork (spark pool is empty!) @ %lx",
1572 proc, CurrentTime[proc]);
1573 procStatus[proc] = Idle;
1576 /* global FindWork i.e. a Fish */
1577 ASSERT(RtsFlags.GranFlags.Fishing);
1578 /* actually this generates another request from the originating PE */
1579 ASSERT(OutstandingFishes[creator]>0);
1580 OutstandingFishes[creator]--;
1581 /* ToDo: assign costs for sending fish to proc not to creator */
1582 stealSpark(creator); /* might steal from same PE; ToDo: fix */
1583 ASSERT(RtsFlags.GranFlags.maxFishes!=1 || procStatus[creator] == Fishing);
1584 /* any assertions on state of proc possible here? */
1587 /* DaH chu' Qu' yIchen! Now create new work! */
1588 IF_GRAN_DEBUG(findWork,
1589 belch("+- munching spark %p; creating thread for node %p",
1590 spark, spark->node));
1591 activateSpark (event, spark);
1592 ASSERT(spark != (rtsSpark*)NULL);
1593 spark = delete_from_sparkq (spark, proc, rtsTrue);
1596 IF_GRAN_DEBUG(findWork,
1597 belch("+- Contents of spark queues at the end of FindWork @ %lx",
1599 print_sparkq_stats());
1601 /* ToDo: check ; not valid if GC occurs in ActivateSpark */
1603 /* forward fish or */
1605 /* local spark or */
1606 (proc==creator && procStatus[proc]==Starting)) ||
1607 //(!found && procStatus[proc]==Idle) ||
1608 RtsFlags.GranFlags.DoAlwaysCreateThreads);
1610 IF_GRAN_DEBUG(findWork,
1611 belch("+- RTS refuses to findWork on PE %d @ %lx",
1612 proc, CurrentTime[proc]);
1613 belch(" procStatus[%d]=%s, fetch strategy=%d, outstanding fetches[%d]=%d",
1614 proc, proc_status_names[procStatus[proc]],
1615 RtsFlags.GranFlags.FetchStrategy,
1616 proc, OutstandingFetches[proc]));
1620 //@node GranSimLight routines, Code for Fetching Nodes, GranSim functions, GranSim specific code
1621 //@subsection GranSimLight routines
1624 This code is called from the central scheduler after having grabbed a
1625 new event and is only needed for GranSim-Light. It mainly adjusts the
1626 ActiveTSO so that all costs that have to be assigned from within the
1627 scheduler are assigned to the right TSO. The choice of ActiveTSO depends
1628 on the type of event that has been found.
/* GranSimLight_enter_system: called by the scheduler (GranSim-Light
   only) after grabbing a new event.  Saves the clock of the previously
   active TSO, picks the TSO to charge system costs to -- depending on
   the event type -- and restores that TSO's clock into
   CurrentTime[CurrentProc].  With fair scheduling and the checkLight
   debug flag, GR_SYSTEM_END/GR_SYSTEM_START events are dumped around the
   switch.
   NOTE(review): fragmentary extraction -- K&R parameter declaration of
   `event', braces, `break's and further switch cases are missing here. */
1632 GranSimLight_enter_system(event, ActiveTSOp)
1634 StgTSO **ActiveTSOp;
1636 StgTSO *ActiveTSO = *ActiveTSOp;
1638 ASSERT (RtsFlags.GranFlags.Light);
1640 /* Restore local clock of the virtual processor attached to CurrentTSO.
1641 All costs will be associated to the `virt. proc' on which the tso
1643 if (ActiveTSO != NULL) { /* already in system area */
1644 ActiveTSO->gran.clock = CurrentTime[CurrentProc];
1645 if (RtsFlags.GranFlags.DoFairSchedule)
1647 if (RtsFlags.GranFlags.GranSimStats.Full &&
1648 RtsFlags.GranFlags.Debug.checkLight)
1649 DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
1652 switch (event->evttype)
1654 case ContinueThread:
1655 case FindWork: /* inaccurate this way */
1656 ActiveTSO = run_queue_hd;
1660 case MoveSpark: /* has tso of virt proc in tso field of event */
1661 ActiveTSO = event->tso;
1663 default: barf("Illegal event type %s (%d) in GrAnSim Light setup\n",
1664 event_names[event->evttype],event->evttype);
1666 CurrentTime[CurrentProc] = ActiveTSO->gran.clock;
1667 if (RtsFlags.GranFlags.DoFairSchedule) {
1668 if (RtsFlags.GranFlags.GranSimStats.Full &&
1669 RtsFlags.GranFlags.Debug.checkLight)
1670 DumpGranEvent(GR_SYSTEM_START,ActiveTSO);
/* GranSimLight_leave_system: counterpart of GranSimLight_enter_system
   (GranSim-Light only).  Saves the clock of the `virtual processor' that
   was active during system-area processing, clears ActiveTSO, and
   restores CurrentTSO's clock.  May also pre-schedule a ContinueThread
   event for the next TSO in the queue (fair-scheduling support).
   NOTE(review): fragmentary extraction -- K&R parameter declaration of
   `event', braces and closing lines are missing; the trailing block
   using TSO_LINK/ThreadQueueHd/PrelBase_Z91Z93_closure appears to be
   older commented-out code (GranSim 3.x-era macros) -- TODO confirm. */
1675 GranSimLight_leave_system(event, ActiveTSOp)
1677 StgTSO **ActiveTSOp;
1679 StgTSO *ActiveTSO = *ActiveTSOp;
1681 ASSERT(RtsFlags.GranFlags.Light);
1683 /* Save time of `virt. proc' which was active since last getevent and
1684 restore time of `virt. proc' where CurrentTSO is living on. */
1685 if(RtsFlags.GranFlags.DoFairSchedule) {
1686 if (RtsFlags.GranFlags.GranSimStats.Full &&
1687 RtsFlags.GranFlags.Debug.checkLight) // ToDo: clean up flags
1688 DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
1690 ActiveTSO->gran.clock = CurrentTime[CurrentProc];
1691 ActiveTSO = (StgTSO*)NULL;
1692 CurrentTime[CurrentProc] = CurrentTSO->gran.clock;
1693 if (RtsFlags.GranFlags.DoFairSchedule /* && resched */ ) {
1694 // resched = rtsFalse;
1695 if (RtsFlags.GranFlags.GranSimStats.Full &&
1696 RtsFlags.GranFlags.Debug.checkLight)
1697 DumpGranEvent(GR_SCHEDULE,run_queue_hd);
1700 if (TSO_LINK(ThreadQueueHd)!=PrelBase_Z91Z93_closure &&
1701 (TimeOfNextEvent == 0 ||
1702 TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000<TimeOfNextEvent)) {
1703 new_event(CurrentProc,CurrentProc,TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000,
1704 CONTINUETHREAD,TSO_LINK(ThreadQueueHd),PrelBase_Z91Z93_closure,NULL);
1705 TimeOfNextEvent = get_time_of_next_event();
1710 //@node Code for Fetching Nodes, Idle PEs, GranSimLight routines, GranSim specific code
1711 //@subsection Code for Fetching Nodes
1714 The following GrAnSim routines simulate the fetching of nodes from a
1715 remote processor. We use a 1 word bitmask to indicate on which processor
1716 a node is lying. Thus, moving or copying a node from one processor to
1717 another just requires an appropriate change in this bitmask (using
1718 @SET_GA@). Additionally, the clocks have to be updated.
1720 A special case arises when the node that is needed by processor A has
1721 been moved from a processor B to a processor C between sending out a
1722 @FETCH@ (from A) and its arrival at B. In that case the @FETCH@ has to
1723 be forwarded to C. This is simulated by issuing another FetchNode event
1724 on processor C with A as creator.
1727 /* ngoqvam che' {GrAnSim}! */
1729 /* Fetch node "node" to processor "p" */
/* fetchNode: simulate fetching a single closure from PE `from' to PE
   `to' (incremental fetching only; bulk fetching goes through
   UnpackGraph instead).  Returns NodeHasMoved when the node is no longer
   on either PE; otherwise it updates the node's PE bitmask: closures in
   HNF are copied (bit OR-ed in), others are moved (bitmask replaced).
   NOTE(review): fragmentary extraction -- the K&R parameter declarations
   for node/from/to and the success return value are not visible here. */
1734 fetchNode(node,from,to)
1738 /* In case of RtsFlags.GranFlags.DoBulkFetching this fct should never be
1739 entered! Instead, UnpackGraph is used in ReSchedule */
1740 StgClosure* closure;
1742 ASSERT(to==CurrentProc);
1743 /* Should never be entered in GrAnSim Light setup */
1744 ASSERT(!RtsFlags.GranFlags.Light);
1745 /* fetchNode should never be entered with DoBulkFetching */
1746 ASSERT(!RtsFlags.GranFlags.DoBulkFetching);
1748 /* Now fetch the node */
1749 if (!IS_LOCAL_TO(PROCS(node),from) &&
1750 !IS_LOCAL_TO(PROCS(node),to) )
1751 return NodeHasMoved;
1753 if (closure_HNF(node)) /* node already in head normal form? */
1754 node->header.gran.procs |= PE_NUMBER(to); /* Copy node */
1756 node->header.gran.procs = PE_NUMBER(to); /* Move node */
1762 Process a fetch request.
1764 Cost of sending a packet of size n = C + P*n
1765 where C = packet construction constant,
1766 P = cost of packing one word into a packet
1767 [Should also account for multiple packets].
1770 //@cindex handleFetchRequest
/* handleFetchRequest: process a fetch request for `node', sent by TSO
   `tso' on PE `from' and handled on PE `to' (== CurrentProc).  Three
   cases:
     1. node is already local on `from' (somebody moved it): send a
        FetchReply straight back (packing a 1-node buffer with bulk
        fetching);
     2. node is still on `to': if it is a black hole the requester is
        blocked (GlobalBlock, faking GUM's FMBQ behaviour); otherwise
        pack/send a FetchReply -- may return OutOfHeap if packing fails;
     3. node has moved to a third PE: forward the request there
        (FetchNode, presumably) and return NodeHasMoved.
   Communication costs (mpacktime, latency, mtidytime) are charged along
   the way.
   NOTE(review): fragmentary extraction -- braces, the event-type
   arguments of new_event, `return' statements for cases 1 and 2, and
   several declarations (e.g. `size', `graph', `fetchtime') are missing
   from this view. */
1773 handleFetchRequest(node,to,from,tso)
1774 StgClosure* node; // the node which is requested
1775 PEs to, from; // fetch request: from -> to
1776 StgTSO* tso; // the tso which needs the node
1778 ASSERT(!RtsFlags.GranFlags.Light);
1779 /* ToDo: check assertion */
1780 ASSERT(OutstandingFetches[from]>0);
1782 /* probably wrong place; */
1783 ASSERT(CurrentProc==to);
1785 if (IS_LOCAL_TO(PROCS(node), from)) /* Somebody else moved node already => */
1787 IF_GRAN_DEBUG(thunkStealing,
1788 fprintf(stderr,"ghuH: handleFetchRequest entered with local node %p (%s) (PE %d)\n",
1789 node, info_type(node), from));
1791 if (RtsFlags.GranFlags.DoBulkFetching) {
1793 rtsPackBuffer *graph;
1795 /* Create a 1-node-buffer and schedule a FETCHREPLY now */
1796 graph = PackOneNode(node, tso, &size);
1797 new_event(from, to, CurrentTime[to],
1799 tso, (StgClosure *)graph, (rtsSpark*)NULL);
1801 new_event(from, to, CurrentTime[to],
1803 tso, node, (rtsSpark*)NULL);
1805 IF_GRAN_DEBUG(thunkStealing,
1806 belch("== majQa'! closure %p is local on PE %d already (this is a good thing)", node, from));
1807 return (NodeIsLocal);
1809 else if (IS_LOCAL_TO(PROCS(node), to) ) /* Is node still here? */
1811 if (RtsFlags.GranFlags.DoBulkFetching) { /* {GUM}vo' ngoqvam vInIHta' */
1812 nat size; /* (code from GUM) */
1815 if (IS_BLACK_HOLE(node)) { /* block on BH or RBH */
1816 new_event(from, to, CurrentTime[to],
1818 tso, node, (rtsSpark*)NULL);
1819 /* Note: blockFetch is done when handling GLOBALBLOCK event;
1820 make sure the TSO stays out of the run queue */
1821 /* When this thread is reawoken it does the usual: it tries to
1822 enter the updated node and issues a fetch if it's remote.
1823 It has forgotten that it has sent a fetch already (i.e. a
1824 FETCHNODE is swallowed by a BH, leaving the thread in a BQ) */
1825 --OutstandingFetches[from];
1827 IF_GRAN_DEBUG(thunkStealing,
1828 belch("== majQa'! closure %p on PE %d is a BH (demander=PE %d); faking a FMBQ",
1830 if (RtsFlags.GranFlags.GranSimStats.Global) {
1831 globalGranStats.tot_FMBQs++;
1836 /* The tso requesting the node is blocked and cannot be on a run queue */
1837 ASSERT(!is_on_queue(tso, from));
1839 // ToDo: check whether graph is ever used as an rtsPackBuffer!!
1840 if ((graph = (StgClosure *)PackNearbyGraph(node, tso, &size, 0)) == NULL)
1841 return (OutOfHeap); /* out of heap */
1843 /* Actual moving/copying of node is done on arrival; see FETCHREPLY */
1844 /* Send a reply to the originator */
1845 /* ToDo: Replace that by software costs for doing graph packing! */
1846 CurrentTime[to] += size * RtsFlags.GranFlags.Costs.mpacktime;
1849 CurrentTime[to]+RtsFlags.GranFlags.Costs.latency,
1851 tso, (StgClosure *)graph, (rtsSpark*)NULL);
1853 CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1855 } else { /* incremental (single closure) fetching */
1856 /* Actual moving/copying of node is done on arrival; see FETCHREPLY */
1857 /* Send a reply to the originator */
1858 CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime;
1861 CurrentTime[to]+RtsFlags.GranFlags.Costs.latency,
1863 tso, node, (rtsSpark*)NULL);
1865 CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1869 else /* Qu'vatlh! node has been grabbed by another proc => forward */
1871 PEs node_loc = where_is(node);
1874 IF_GRAN_DEBUG(thunkStealing,
1875 belch("== Qu'vatlh! node %p has been grabbed by PE %d from PE %d (demander=%d) @ %d\n",
1876 node,node_loc,to,from,CurrentTime[to]));
1877 if (RtsFlags.GranFlags.GranSimStats.Global) {
1878 globalGranStats.fetch_misses++;
1881 /* Prepare FORWARD message to proc p_new */
1882 CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime;
1884 fetchtime = stg_max(CurrentTime[to], CurrentTime[node_loc]) +
1885 RtsFlags.GranFlags.Costs.latency;
1887 new_event(node_loc, from, fetchtime,
1889 tso, node, (rtsSpark*)NULL);
1891 CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1893 return (NodeHasMoved);
1898 blockFetch blocks a BlockedFetch node on some kind of black hole.
1900 Taken from gum/HLComms.lc. [find a better place for that ?] -- HWL
1902 {\bf Note:} In GranSim we don't have @FETCHME@ nodes and therefore don't
1903 create @FMBQ@'s (FetchMe blocking queues) to cope with global
1904 blocking. Instead, non-local TSO are put into the BQ in the same way as
1905 local TSOs. However, we have to check if a TSO is local or global in
1906 order to account for the latencies involved and for keeping track of the
1907 number of fetches that are really going on.
1910 //@cindex blockFetch
/* blockFetch: block TSO `tso' (which was running on PE `proc') on the
   closure `bh' (BH, RBH or BQ).  Adapted from gum/HLComms.lc; GranSim
   has no FETCHME/FMBQ closures, so remote TSOs go into the ordinary
   blocking queue like local ones.  If `bh' is no longer a black hole the
   TSO is immediately unblocked via a new event (ResumeThread,
   presumably) and NodeIsNoBH is returned.  Otherwise GranSimBlock does
   the statistics accounting and the TSO is linked into the closure's
   blocking queue, upgrading a plain BLACKHOLE to a BLACKHOLE_BQ first.
   NOTE(review): fragmentary extraction -- braces, `break's, local
   declarations (e.g. `info'), the BLACKHOLE_BQ case label, `return'
   statements and #endif lines are missing from this view. */
1913 blockFetch(tso, proc, bh)
1914 StgTSO* tso; /* TSO which gets blocked */
1915 PEs proc; /* PE where that tso was running */
1916 StgClosure* bh; /* closure to block on (BH, RBH, BQ) */
1921 fprintf(stderr,"## blockFetch: blocking TSO %p (%d)[PE %d] on node %p (%s) [PE %d]. No graph is packed!\n",
1922 tso, tso->id, proc, bh, info_type(bh), where_is(bh)));
1924 if (!IS_BLACK_HOLE(bh)) { /* catches BHs and RBHs */
1926 fprintf(stderr,"## blockFetch: node %p (%s) is not a BH => awakening TSO %p (%d) [PE %u]\n",
1927 bh, info_type(bh), tso, tso->id, proc));
1929 /* No BH anymore => immediately unblock tso */
1930 new_event(proc, proc, CurrentTime[proc],
1932 tso, bh, (rtsSpark*)NULL);
1934 /* Is this always a REPLY to a FETCH in the profile ? */
1935 if (RtsFlags.GranFlags.GranSimStats.Full)
1936 DumpRawGranEvent(proc, proc, GR_REPLY, tso, bh, (StgInt)0, 0);
1937 return (NodeIsNoBH);
1940 /* DaH {BQ}Daq Qu' Suq 'e' wISov!
1941 Now we know that we have to put the tso into the BQ.
1942 2 cases: If block-on-fetch, tso is at head of threadq =>
1943 => take it out of threadq and into BQ
1944 If reschedule-on-fetch, tso is only pointed to be event
1945 => just put it into BQ
1948 if (!RtsFlags.GranFlags.DoAsyncFetch) {
1949 GranSimBlock(tso, proc, bh);
1951 if (RtsFlags.GranFlags.GranSimStats.Full)
1952 DumpRawGranEvent(proc, where_is(bh), GR_BLOCK, tso, bh, (StgInt)0, 0);
1953 ++(tso->gran.blockcount);
1954 tso->gran.blockedat = CurrentTime[proc];
1958 /* after scheduling the GlobalBlock event the TSO is not put into the
1959 run queue again; it is only pointed to via the event we are
1960 processing now; in GranSim 4.xx there is no difference between
1961 synchr and asynchr comm here */
1962 ASSERT(!is_on_queue(tso, proc));
1963 ASSERT(tso->link == END_TSO_QUEUE);
1965 GranSimBlock(tso, proc, bh); /* GranSim statistics gathering */
1967 /* Now, put tso into BQ (similar to blocking entry codes) */
1968 info = get_itbl(bh);
1969 switch (info -> type) {
1972 case CAF_BLACKHOLE: // ToDo: check whether this is a possibly ITBL here
1973 case SE_BLACKHOLE: // ToDo: check whether this is a possibly ITBL here
1974 case SE_CAF_BLACKHOLE:// ToDo: check whether this is a possibly ITBL here
1975 /* basically an inlined version of BLACKHOLE_entry -- HWL */
1976 /* Change the BLACKHOLE into a BLACKHOLE_BQ */
1977 ((StgBlockingQueue *)bh)->header.info = &BLACKHOLE_BQ_info;
1978 /* Put ourselves on the blocking queue for this black hole */
1979 // tso->link=END_TSO_QUEUE; not necessary; see assertion above
1980 ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)tso;
1981 tso->block_info.closure = bh;
1982 recordMutable((StgMutClosure *)bh);
1986 /* basically an inlined version of BLACKHOLE_BQ_entry -- HWL */
1987 tso->link = (StgTSO *) (((StgBlockingQueue*)bh)->blocking_queue);
1988 ((StgBlockingQueue*)bh)->blocking_queue = (StgBlockingQueueElement *)tso;
1989 recordMutable((StgMutClosure *)bh);
1991 # if 0 && defined(GC_MUT_REQUIRED)
1992 ToDo: check whether recordMutable is necessary -- HWL
1994 * If we modify a black hole in the old generation, we have to make
1995 * sure it goes on the mutables list
1998 if (bh <= StorageMgrInfo.OldLim) {
1999 MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
2000 StorageMgrInfo.OldMutables = bh;
2002 MUT_LINK(bh) = MUT_NOT_LINKED;
2007 barf("Qagh: FMBQ closure (%p) found in GrAnSim (TSO=%p (%d))\n",
2013 barf("Qagh: thought %p was a black hole (IP %p (%s))",
2014 bh, info, info_type(bh));
2021 //@node Idle PEs, Routines directly called from Haskell world, Code for Fetching Nodes, GranSim specific code
2022 //@subsection Idle PEs
2025 Export work to idle PEs. This function is called from @ReSchedule@
2026 before dispatching on the current event. @HandleIdlePEs@ iterates over
2027 all PEs, trying to get work for idle PEs. Note, that this is a
2028 simplification compared to GUM's fishing model. We try to compensate for
2029 that by making the cost for stealing work dependent on the number of
2030 idle processors and thereby on the probability with which a randomly
2031 sent fish would find work.
2034 //@cindex handleIdlePEs
// handleIdlePEs (fragment; several source lines elided between numbered lines):
// called from ReSchedule before dispatching the current event. Scans all PEs;
// for each Idle PE it first looks for local work (non-empty spark pool ->
// schedule a FindWork-style event, mark PE Sparking), otherwise tries to
// obtain remote work by stealing, bounded by maxFishes outstanding fishes.
2041 IF_DEBUG(gran, fprintf(stderr, "GRAN: handling Idle PEs\n"))
2043 /* Should never be entered in GrAnSim Light setup */
2044 ASSERT(!RtsFlags.GranFlags.Light);
2046 /* Could check whether there are idle PEs if it's a cheap check */
2047 for (p = 0; p < RtsFlags.GranFlags.proc; p++)
2048 if (procStatus[p]==Idle) /* && IS_SPARKING(p) && IS_STARTING(p) */
2049 /* First look for local work i.e. examine local spark pool! */
2050 if (pending_sparks_hds[p]!=(rtsSpark *)NULL) {
2051 new_event(p, p, CurrentTime[p],
2053 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
2054 procStatus[p] = Sparking;
// maxFishes==0 means "unlimited outstanding fishes" (no cap on stealing).
2055 } else if ((RtsFlags.GranFlags.maxFishes==0 ||
2056 OutstandingFishes[p]<RtsFlags.GranFlags.maxFishes) ) {
2058 /* If no local work then try to get remote work!
2059 Qu' Hopbe' pagh tu'lu'pu'chugh Qu' Hop yISuq ! */
// Optionally prefer migrating whole threads over stealing sparks; the
// FetchStrategy/OutstandingFetches guards gate stealing while fetches
// are still in flight (strategies >= 3/4 allow overlap).
2060 if (RtsFlags.GranFlags.DoStealThreadsFirst &&
2061 (RtsFlags.GranFlags.FetchStrategy >= 4 || OutstandingFetches[p] == 0))
2063 if (SurplusThreads > 0l) /* Steal a thread */
2066 if (procStatus[p]!=Idle)
2070 if (SparksAvail > 0 &&
2071 (RtsFlags.GranFlags.FetchStrategy >= 3 || OutstandingFetches[p] == 0)) /* Steal a spark */
2074 if (SurplusThreads > 0 &&
2075 (RtsFlags.GranFlags.FetchStrategy >= 4 || OutstandingFetches[p] == 0)) /* Steal a thread */
2081 Steal a spark and schedule moving it to proc. We want to look at PEs in
2082 clock order -- most retarded first. Currently sparks are only stolen
2083 from the @ADVISORY_POOL@ never from the @REQUIRED_POOL@. Eventually,
2084 this should be changed to first steal from the former then from the
2087 We model a sort of fishing mechanism by counting the number of sparks
2088 and threads we are currently stealing. */
2091 Return a random nat value in the interval [from, to)
// natRandom (fragment): scales random() into [from, to) via floats; the
// clamp below guards against float rounding producing exactly r==to.
2101 /* random returns a value in [0, RAND_MAX] */
2102 r = (nat) ((float)from + ((float)random()*(float)d)/(float)RAND_MAX);
2103 r = (r==to) ? from : r;
2104 ASSERT(from<=r && (r<to || from==to));
2109 Find any PE other than proc. Used for GUM style fishing only.
// findRandomPE (fragment): chooses the PE a fish message is sent to.
// NOTE(review): lines are elided here; presumably the missing code ensures
// p != proc (the comment above promises "any PE other than proc") -- confirm.
2117 ASSERT(RtsFlags.GranFlags.Fishing);
2118 if (RtsFlags.GranFlags.RandomSteal) {
2119 p = natRandom(0,RtsFlags.GranFlags.proc); /* full range of PEs */
2123 IF_GRAN_DEBUG(randomSteal,
2124 belch("^^ RANDOM_STEAL (fishing): stealing from PE %d (current proc is %d)",
2131 Magic code for stealing sparks/threads makes use of global knowledge on
// sortPEsByTime (fragment): builds in pes_by_time[] the list of candidate
// victim PEs (non-empty spark pool, not 'proc', not ahead of the current
// clock), sorts them by ascending CurrentTime (most retarded first), and
// computes *firstp = index of the first PE later than proc. Outputs are
// returned through the firstp/np pointer parameters.
2135 sortPEsByTime (proc, pes_by_time, firstp, np)
2140 PEs p, temp, n, i, j;
2141 nat first, upb, r=0, q=0;
2143 ASSERT(!RtsFlags.GranFlags.Fishing);
2146 upb = RtsFlags.GranFlags.proc; /* full range of PEs */
2148 if (RtsFlags.GranFlags.RandomSteal) {
2149 r = natRandom(0,RtsFlags.GranFlags.proc); /* full range of PEs */
2155 /* pes_by_time shall contain processors from which we may steal sparks */
2156 for(n=0, p=0; p < RtsFlags.GranFlags.proc; ++p)
2157 if ((proc != p) && // not the current proc
2158 (pending_sparks_hds[p] != (rtsSpark *)NULL) && // non-empty spark pool
2159 (CurrentTime[p] <= CurrentTime[CurrentProc]))
2160 pes_by_time[n++] = p;
2162 /* sort pes_by_time */
// Simple O(n^2) selection-style sort; n <= MAX_PROC so this is cheap.
2163 for(i=0; i < n; ++i)
2164 for(j=i+1; j < n; ++j)
2165 if (CurrentTime[pes_by_time[i]] > CurrentTime[pes_by_time[j]]) {
// NOTE(review): this inner 'temp' is declared rtsTime but holds a PEs
// value, and it shadows the 'PEs temp' declared at line 2140 above;
// harmless if rtsTime is wide enough, but it should be of type PEs.
2166 rtsTime temp = pes_by_time[i];
2167 pes_by_time[i] = pes_by_time[j];
2168 pes_by_time[j] = temp;
2171 /* Choose random processor to steal spark from; first look at processors */
2172 /* that are earlier than the current one (i.e. proc) */
2174 (first < n) && (CurrentTime[pes_by_time[first]] <= CurrentTime[proc]);
2178 /* if the assertion below is true we can get rid of first */
2179 /* ASSERT(first==n); */
2180 /* ToDo: check if first is really needed; find cleaner solution */
2187 Steal a spark (piece of work) from any processor and bring it to proc.
2189 //@cindex stealSpark
// Fix: the prototype declares rtsBool (see the externs at the top of the
// file), but the body discarded stealSomething's result and fell off the
// end -- using such a return value is undefined behaviour in C. Propagate
// the result so callers see whether the steal succeeded.
2191 stealSpark(PEs proc) { return stealSomething(proc, rtsTrue, rtsFalse); }
2194 Steal a thread from any processor and bring it to proc i.e. thread migration
2196 //@cindex stealThread
// Fix: declared rtsBool in the prototypes but the body dropped the result
// of stealSomething and returned nothing (UB when the caller uses the
// value). Return the result, mirroring the stealSpark fix.
2198 stealThread(PEs proc) { return stealSomething(proc, rtsFalse, rtsTrue); }
2201 Steal a spark or a thread and schedule moving it to proc.
2203 //@cindex stealSomething
// stealSomething (fragment): dispatcher for work stealing. In the non-Fishing
// (GranSim "magic" global-knowledge) setup it delegates to stealSparkMagic /
// stealThreadMagic; in the Fishing (GUM-style) setup it sends a Fish message
// to a random PE and always reports success (result arrives later).
2205 stealSomething(proc, steal_spark, steal_thread)
2206 PEs proc; // PE that needs work (stealer)
2207 rtsBool steal_spark, steal_thread; // should a spark and/or thread be stolen
2210 rtsTime fish_arrival_time;
2211 rtsSpark *spark, *prev, *next;
2212 rtsBool stolen = rtsFalse;
2214 ASSERT(steal_spark || steal_thread);
2216 /* Should never be entered in GrAnSim Light setup */
2217 ASSERT(!RtsFlags.GranFlags.Light);
2218 ASSERT(!steal_thread || RtsFlags.GranFlags.DoThreadMigration);
2220 if (!RtsFlags.GranFlags.Fishing) {
2221 // ToDo: check if stealing threads is prefered over stealing sparks
2223 if (stealSparkMagic(proc))
2225 else // no spark found
2227 return stealThreadMagic(proc);
2228 else // no thread found
2230 } else { // ASSERT(steal_thread);
2231 return stealThreadMagic(proc);
2233 barf("stealSomething: never reached");
2236 /* The rest of this function does GUM style fishing */
2238 p = findRandomPE(proc); /* find a random PE other than proc */
2240 /* Message packing costs for sending a Fish; qeq jabbI'ID */
2241 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
2243 /* use another GranEvent for requesting a thread? */
2244 if (steal_spark && RtsFlags.GranFlags.GranSimStats.Sparks)
2245 DumpRawGranEvent(p, proc, SP_REQUESTED,
2246 (StgTSO*)NULL, (StgClosure *)NULL, (StgInt)0, 0);
2248 /* time of the fish arrival on the remote PE */
2249 fish_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
2251 /* Phps use an own Fish event for that? */
2252 /* The contents of the spark component is a HACK:
2253 1 means give me a spark;
2254 2 means give me a thread
2255 0 means give me nothing (this should never happen)
// The rtsSpark* event field is abused as a small integer tag (see hack
// comment above); the receiver decodes it rather than dereferencing it.
2257 new_event(p, proc, fish_arrival_time,
2259 (StgTSO*)NULL, (StgClosure*)NULL,
2260 (steal_spark ? (rtsSpark*)1 : steal_thread ? (rtsSpark*)2 : (rtsSpark*)0));
2262 ++OutstandingFishes[proc];
2263 /* only with Async fetching? */
2264 if (procStatus[proc]==Idle)
2265 procStatus[proc]=Fishing;
2267 /* time needed to clean up buffers etc after sending a message */
2268 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
2270 /* If GUM style fishing stealing always succeeds because it only consists
2271 of sending out a fish; of course, when the fish may return
2277 This version of stealing a spark makes use of the global info on all
2278 spark pools etc which is not available in a real parallel system.
2279 This could be extended to test e.g. the impact of perfect load information.
2281 //@cindex stealSparkMagic
// stealSparkMagic (fragment): simulator-only ("magic") spark stealing using
// global knowledge of every PE's spark pool. Victims are taken from
// sortPEsByTime's candidate list; within a victim's spark queue the first
// eligible spark is moved to 'proc' via a MoveSpark-style event, and sparks
// whose node no longer SHOULD_SPARK are pruned along the way.
2283 stealSparkMagic(proc)
2286 PEs p=0, i=0, j=0, n=0, first, upb;
2287 rtsSpark *spark=NULL, *next;
2288 PEs pes_by_time[MAX_PROC];
2289 rtsBool stolen = rtsFalse;
2292 /* Should never be entered in GrAnSim Light setup */
2293 ASSERT(!RtsFlags.GranFlags.Light);
2295 sortPEsByTime(proc, pes_by_time, &first, &n);
2297 while (!stolen && n>0) {
// Prefer PEs earlier than proc (indices < first) when any exist.
2298 upb = (first==0) ? n : first;
2299 i = natRandom(0,upb); /* choose a random eligible PE */
2302 IF_GRAN_DEBUG(randomSteal,
2303 belch("^^ stealSparkMagic (random_steal, not fishing): stealing spark from PE %d (current proc is %d)",
2306 ASSERT(pending_sparks_hds[p]!=(rtsSpark *)NULL); /* non-empty spark pool */
2308 /* Now go through rtsSparkQ and steal the first eligible spark */
2310 spark = pending_sparks_hds[p];
2311 while (!stolen && spark != (rtsSpark*)NULL)
2313 /* NB: no prev pointer is needed here because all sparks that are not
2316 if ((procStatus[p]==Idle || procStatus[p]==Sparking || procStatus[p] == Fishing) &&
2317 spark->next==(rtsSpark*)NULL)
2319 /* Be social! Don't steal the only spark of an idle processor
2320 not {spark} neH yInIH !! */
2321 break; /* next PE */
2323 else if (closure_SHOULD_SPARK(spark->node))
2325 /* Don't Steal local sparks;
2326 ToDo: optionally prefer local over global sparks
2327 if (!spark->global) {
2329 continue; next spark
2332 /* found a spark! */
2334 /* Prepare message for sending spark */
2335 CurrentTime[p] += RtsFlags.GranFlags.Costs.mpacktime;
2337 if (RtsFlags.GranFlags.GranSimStats.Sparks)
2338 DumpRawGranEvent(p, (PEs)0, SP_EXPORTED,
2339 (StgTSO*)NULL, spark->node,
2340 spark->name, spark_queue_len(p));
// Steal arrives at max(victim clock, stealer clock) plus comm costs.
2342 stealtime = (CurrentTime[p] > CurrentTime[proc] ?
2347 new_event(proc, p /* CurrentProc */, stealtime,
2349 (StgTSO*)NULL, spark->node, spark);
2352 ++OutstandingFishes[proc]; /* no. of sparks currently on the fly */
2353 if (procStatus[proc]==Idle)
2354 procStatus[proc] = Fishing;
2355 ++(spark->global); /* record that this is a global spark */
2356 ASSERT(SparksAvail>0);
2357 --SparksAvail; /* on-the-fly sparks are not available */
2358 next = delete_from_sparkq(spark, p, rtsFalse); // don't dispose!
2359 CurrentTime[p] += RtsFlags.GranFlags.Costs.mtidytime;
2361 else /* !(closure_SHOULD_SPARK(SPARK_NODE(spark))) */
2363 IF_GRAN_DEBUG(checkSparkQ,
2364 belch("^^ pruning spark %p (node %p) in stealSparkMagic",
2365 spark, spark->node));
2367 /* if the spark points to a node that should not be sparked,
2368 prune the spark queue at this point */
2369 if (RtsFlags.GranFlags.GranSimStats.Sparks)
2370 DumpRawGranEvent(p, (PEs)0, SP_PRUNED,
2371 (StgTSO*)NULL, spark->node,
2372 spark->name, spark_queue_len(p));
2373 if (RtsFlags.GranFlags.GranSimStats.Global)
2374 globalGranStats.pruned_sparks++;
2376 ASSERT(SparksAvail>0);
// delete_from_sparkq with dispose==rtsTrue frees the spark and
// returns its successor, which continues the scan.
2378 spark = delete_from_sparkq(spark, p, rtsTrue);
2380 /* unlink spark (may have been freed!) from sparkq;
2381 if (prev == NULL) // spark was head of spark queue
2382 pending_sparks_hds[p] = spark->next;
2384 prev->next = spark->next;
2385 if (spark->next == NULL)
2386 pending_sparks_tls[p] = prev;
2390 } /* while ... iterating over sparkq */
2392 /* ToDo: assert that PE p still has work left after stealing the spark */
2394 if (!stolen && (n>0)) { /* nothing stealable from proc p :( */
2395 ASSERT(pes_by_time[i]==p);
2397 /* remove p from the list (at pos i) */
2398 for (j=i; j+1<n; j++)
2399 pes_by_time[j] = pes_by_time[j+1];
2402 /* update index to first proc which is later (or equal) than proc */
2405 (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]);
2409 } /* while ... iterating over PEs in pes_by_time */
2411 IF_GRAN_DEBUG(randomSteal,
2413 belch("^^ stealSparkMagic: spark %p (node=%p) stolen by PE %d from PE %d (SparksAvail=%d; idlers=%d)",
2414 spark, spark->node, proc, p,
2415 SparksAvail, idlers());
2417 belch("^^ stealSparkMagic: nothing stolen by PE %d (sparkq len after pruning=%d)(SparksAvail=%d; idlers=%d)",
2418 proc, SparksAvail, idlers()));
2420 if (RtsFlags.GranFlags.GranSimStats.Global &&
2421 stolen && (i!=0)) { /* only for statistics */
2422 globalGranStats.rs_sp_count++;
2423 globalGranStats.ntimes_total += n;
2424 globalGranStats.fl_total += first;
2425 globalGranStats.no_of_steals++;
2432 The old stealThread code, which makes use of global info and does not
2434 NB: most of this is the same as in stealSparkMagic;
2435 only the pieces specific to processing thread queues are different;
2436 long live polymorphism!
2439 //@cindex stealThreadMagic
// stealThreadMagic (fragment): simulator-only thread migration. Same victim
// selection as stealSparkMagic, but steals the *second* TSO of the victim's
// run queue (never the head, "for social reasons") and schedules a
// MoveThread-style event carrying the unlinked TSO to 'proc'.
2441 stealThreadMagic(proc)
2444 PEs p=0, i=0, j=0, n=0, first, upb;
2445 StgTSO *tso=END_TSO_QUEUE;
2446 PEs pes_by_time[MAX_PROC];
2447 rtsBool stolen = rtsFalse;
2450 /* Should never be entered in GrAnSim Light setup */
2451 ASSERT(!RtsFlags.GranFlags.Light);
2453 sortPEsByTime(proc, pes_by_time, &first, &n);
2455 while (!stolen && n>0) {
2456 upb = (first==0) ? n : first;
2457 i = natRandom(0,upb); /* choose a random eligible PE */
2460 IF_GRAN_DEBUG(randomSteal,
2461 belch("^^ stealThreadMagic (random_steal, not fishing): stealing thread from PE %d (current proc is %d)",
2464 /* Steal the first exportable thread in the runnable queue but
2465 never steal the first in the queue for social reasons;
2466 not Qu' wa'DIch yInIH !!
2468 /* Would be better to search through queue and have options which of
2469 the threads to pick when stealing */
2470 if (run_queue_hds[p] == END_TSO_QUEUE) {
2471 IF_GRAN_DEBUG(randomSteal,
2472 belch("^^ stealThreadMagic: No thread to steal from PE %d (stealer=PE %d)",
2475 tso = run_queue_hds[p]->link; /* tso is *2nd* thread in thread queue */
2479 /* update links in queue */
2480 run_queue_hds[p]->link = tso->link;
2481 if (run_queue_tls[p] == tso)
2482 run_queue_tls[p] = run_queue_hds[p];
2484 /* ToDo: Turn magic constants into params */
// Thread migration is costed as 5x a plain message (pack/tidy/unpack)
// plus 4x additional latency -- see the magic-constants ToDo above.
2486 CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mpacktime;
2488 stealtime = (CurrentTime[p] > CurrentTime[proc] ?
2492 + 4l * RtsFlags.GranFlags.Costs.additional_latency
2493 + 5l * RtsFlags.GranFlags.Costs.munpacktime;
2495 /* Move the thread; set bitmask to 0 while TSO is `on-the-fly' */
2496 SET_GRAN_HDR(tso,Nowhere /* PE_NUMBER(proc) */);
2498 /* Move from one queue to another */
2499 new_event(proc, p, stealtime,
2501 tso, (StgClosure*)NULL, (rtsSpark*)NULL);
2503 /* MAKE_BUSY(proc); not yet; only when thread is in threadq */
2504 ++OutstandingFishes[proc];
// NOTE(review): stealSparkMagic uses `procStatus[proc]==Idle` here;
// this truthiness test looks inconsistent (wrong unless Idle is the
// only falsy status and the intent really is "any non-zero status").
// Confirm against the ProcStatus enum before changing.
2505 if (procStatus[proc])
2506 procStatus[proc] = Fishing;
2509 if(RtsFlags.GranFlags.GranSimStats.Full)
2510 DumpRawGranEvent(p, proc,
2512 tso, (StgClosure*)NULL, (StgInt)0, 0);
2514 /* costs for tidying up buffer after having sent it */
2515 CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mtidytime;
2518 /* ToDo: assert that PE p still has work left after stealing the spark */
2520 if (!stolen && (n>0)) { /* nothing stealable from proc p :( */
2521 ASSERT(pes_by_time[i]==p);
2523 /* remove p from the list (at pos i) */
2524 for (j=i; j+1<n; j++)
2525 pes_by_time[j] = pes_by_time[j+1];
2528 /* update index to first proc which is later (or equal) than proc */
2531 (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]);
2535 } /* while ... iterating over PEs in pes_by_time */
2537 IF_GRAN_DEBUG(randomSteal,
2539 belch("^^ stealThreadMagic: stolen TSO %d (%p) by PE %d from PE %d (SparksAvail=%d; idlers=%d)",
2540 tso->id, tso, proc, p,
2541 SparksAvail, idlers());
2543 belch("stealThreadMagic: nothing stolen by PE %d (SparksAvail=%d; idlers=%d)",
2544 proc, SparksAvail, idlers()));
2546 if (RtsFlags.GranFlags.GranSimStats.Global &&
2547 stolen && (i!=0)) { /* only for statistics */
2548 /* ToDo: more statistics on avg thread queue length etc */
2549 globalGranStats.rs_t_count++;
2550 globalGranStats.no_of_migrates++;
2556 //@cindex sparkStealTime
// sparkStealTime (fragment): models the latency of obtaining a spark as a
// multiple of the message latency, interpolating between proc/2 fish hops
// (no idlers) and 1 hop (all other PEs idle).
2558 sparkStealTime(void)
2560 double fishdelay, sparkdelay, latencydelay;
2561 fishdelay = (double)RtsFlags.GranFlags.proc/2;
// NOTE(review): divides by proc-1 -- division by zero if the simulation is
// ever run with a single PE; presumably callers guarantee proc > 1. Confirm.
2562 sparkdelay = fishdelay -
2563 ((fishdelay-1.0)/(double)(RtsFlags.GranFlags.proc-1))*((double)idlers());
2564 latencydelay = sparkdelay*((double)RtsFlags.GranFlags.Costs.latency);
2566 return((rtsTime)latencydelay);
2569 //@node Routines directly called from Haskell world, Emiting profiling info for GrAnSim, Idle PEs, GranSim specific code
2570 //@subsection Routines directly called from Haskell world
2572 The @GranSim...@ routines in here are directly called via macros from the
2575 First some auxiliary routines.
2578 /* Take the current thread off the thread queue and thereby activate the
2579 next thread. It's assumed that the next ReSchedule after this uses
2580 NEW_THREAD as param.
2581 This fct is called from GranSimBlock and GranSimFetch
2584 //@cindex ActivateNextThread
// ActivateNextThread (fragment): CurrentTSO has already been unlinked from
// the run queue (see the assertion below); charge a context-switch cost,
// mark the PE Idle if the queue is now empty, and optionally emit a
// GR_SCHEDULE event for the newly activated head thread.
2587 ActivateNextThread (proc)
2592 This routine is entered either via GranSimFetch or via GranSimBlock.
2593 It has to prepare the CurrentTSO for being blocked and update the
2594 run queue and other statistics on PE proc. The actual enqueuing to the
2595 blocking queue (if coming from GranSimBlock) is done in the entry code
2596 of the BLACKHOLE and BLACKHOLE_BQ closures (see StgMiscClosures.hc).
2598 /* ToDo: add assertions here!! */
2599 //ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE);
2601 // Only necessary if the running thread is at front of the queue
2602 // run_queue_hds[proc] = run_queue_hds[proc]->link;
2603 ASSERT(CurrentProc==proc);
2604 ASSERT(!is_on_queue(CurrentTSO,proc));
2605 if (run_queue_hds[proc]==END_TSO_QUEUE) {
2606 /* NB: this routine is only entered with asynchr comm (see assertion) */
2607 procStatus[proc] = Idle;
2609 /* ToDo: check cost assignment */
2610 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime;
2611 if (RtsFlags.GranFlags.GranSimStats.Full &&
2612 (!RtsFlags.GranFlags.Light || RtsFlags.GranFlags.Debug.checkLight))
2613 /* right flag !?? ^^^ */
2614 DumpRawGranEvent(proc, 0, GR_SCHEDULE, run_queue_hds[proc],
2615 (StgClosure*)NULL, (StgInt)0, 0);
2620 The following GranSim fcts are stg-called from the threaded world.
2623 /* Called from HP_CHK and friends (see StgMacros.h) */
2624 //@cindex GranSimAllocate
// GranSimAllocate (fragment): account n words of heap allocation to the
// current TSO and PE (per-thread stats plus simulated clock/exec time),
// optionally dumping a GR_ALLOC profiling event.
2629 CurrentTSO->gran.allocs += n;
2630 ++(CurrentTSO->gran.basicblocks);
2632 if (RtsFlags.GranFlags.GranSimStats.Heap) {
2633 DumpRawGranEvent(CurrentProc, 0, GR_ALLOC, CurrentTSO,
2634 (StgClosure*)NULL, (StgInt)0, n);
2637 CurrentTSO->gran.exectime += RtsFlags.GranFlags.Costs.heapalloc_cost;
2638 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.heapalloc_cost;
2642 Subtract the values added above, if a heap check fails and
2643 so has to be redone.
2645 //@cindex GranSimUnallocate
// GranSimUnallocate (fragment): exact inverse of GranSimAllocate, used when
// a failed heap check means the allocation accounting must be undone.
2647 GranSimUnallocate(n)
2650 CurrentTSO->gran.allocs -= n;
2651 --(CurrentTSO->gran.basicblocks);
2653 CurrentTSO->gran.exectime -= RtsFlags.GranFlags.Costs.heapalloc_cost;
2654 CurrentTime[CurrentProc] -= RtsFlags.GranFlags.Costs.heapalloc_cost;
2657 /* NB: We now inline this code via GRAN_EXEC rather than calling this fct */
2658 //@cindex GranSimExec
// GranSimExec (fragment): charge the weighted cost of the given instruction
// counts (arithmetic, branches, loads, stores, float ops) to the current
// TSO's exec time and the current PE's simulated clock.
2660 GranSimExec(ariths,branches,loads,stores,floats)
2661 StgWord ariths,branches,loads,stores,floats;
2663 StgWord cost = RtsFlags.GranFlags.Costs.arith_cost*ariths +
2664 RtsFlags.GranFlags.Costs.branch_cost*branches +
2665 RtsFlags.GranFlags.Costs.load_cost * loads +
2666 RtsFlags.GranFlags.Costs.store_cost*stores +
2667 RtsFlags.GranFlags.Costs.float_cost*floats;
2669 CurrentTSO->gran.exectime += cost;
2670 CurrentTime[CurrentProc] += cost;
2674 Fetch the node if it isn't local
2675 -- result indicates whether fetch has been done.
2677 This is GRIP-style single item fetching.
2680 //@cindex GranSimFetch
// GranSimFetch (fragment): if 'node' is not local to CurrentProc, schedule a
// FetchNode event at the owning PE, charge communication costs, block the
// current TSO and (with async fetching) activate the next runnable thread.
// Sets NeedToReSchedule when GranSim-internal rescheduling is required.
2682 GranSimFetch(node /* , liveness_mask */ )
2684 /* StgInt liveness_mask; */
2686 /* reset the return value (to be checked within STG land) */
2687 NeedToReSchedule = rtsFalse;
2689 if (RtsFlags.GranFlags.Light) {
2690 /* Always reschedule in GrAnSim-Light to prevent one TSO from
2692 new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
2693 ContinueThread,CurrentTSO,node,NULL);
2698 /* Faking an RBH closure:
2699 If the bitmask of the closure is 0 then this node is a fake RBH;
// A Nowhere bitmask marks a node currently "on the fly"; retry later by
// re-scheduling the TSO with a fixed 10000-cycle delay.
2701 if (node->header.gran.procs == Nowhere) {
2703 belch("## Found fake RBH (node %p); delaying TSO %d (%p)",
2704 node, CurrentTSO->id, CurrentTSO));
2706 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+10000,
2707 ContinueThread, CurrentTSO, node, (rtsSpark*)NULL);
2709 /* Rescheduling (GranSim internal) is necessary */
2710 NeedToReSchedule = rtsTrue;
2715 /* Note: once a node has been fetched, this test will be passed */
2716 if (!IS_LOCAL_TO(PROCS(node),CurrentProc))
2718 PEs p = where_is(node);
2721 IF_GRAN_DEBUG(thunkStealing,
// NOTE(review): format string lacks a space before %u ("processor%u");
// left unchanged here since it is a runtime string.
2723 belch("GranSimFetch: Trying to fetch from own processor%u\n", p););
2725 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
2726 /* NB: Fetch is counted on arrival (FetchReply) */
2728 fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) +
2729 RtsFlags.GranFlags.Costs.latency;
2731 new_event(p, CurrentProc, fetchtime,
2732 FetchNode, CurrentTSO, node, (rtsSpark*)NULL);
2734 if (fetchtime<TimeOfNextEvent)
2735 TimeOfNextEvent = fetchtime;
2737 /* About to block */
2738 CurrentTSO->gran.blockedat = CurrentTime[CurrentProc];
2740 ++OutstandingFetches[CurrentProc];
2742 if (RtsFlags.GranFlags.DoAsyncFetch)
2743 /* if asynchr comm is turned on, activate the next thread in the q */
2744 ActivateNextThread(CurrentProc);
2746 procStatus[CurrentProc] = Fetching;
2749 /* ToDo: nuke the entire if (anything special for fair schedule?) */
2750 if (RtsFlags.GranFlags.DoAsyncFetch)
2752 /* Remove CurrentTSO from the queue -- assumes head of queue == CurrentTSO */
2753 if(!RtsFlags.GranFlags.DoFairSchedule)
2755 /* now done in do_the_fetchnode
2756 if (RtsFlags.GranFlags.GranSimStats.Full)
2757 DumpRawGranEvent(CurrentProc, p, GR_FETCH, CurrentTSO,
2758 node, (StgInt)0, 0);
2760 ActivateNextThread(CurrentProc);
2762 # if 0 && defined(GRAN_CHECK)
2763 if (RtsFlags.GranFlags.Debug.blockOnFetch_sanity) {
2764 if (TSO_TYPE(CurrentTSO) & FETCH_MASK_TSO) {
2765 fprintf(stderr,"FetchNode: TSO 0x%x has fetch-mask set @ %d\n",
2766 CurrentTSO,CurrentTime[CurrentProc]);
2767 stg_exit(EXIT_FAILURE);
2769 TSO_TYPE(CurrentTSO) |= FETCH_MASK_TSO;
2773 CurrentTSO->link = END_TSO_QUEUE;
2774 /* CurrentTSO = END_TSO_QUEUE; */
2776 /* CurrentTSO is pointed to by the FetchNode event; it is
2777 on no run queue any more */
2778 } else { /* fair scheduling currently not supported -- HWL */
2779 barf("Asynchr communication is not yet compatible with fair scheduling\n");
2781 } else { /* !RtsFlags.GranFlags.DoAsyncFetch */
2782 procStatus[CurrentProc] = Fetching; // ToDo: BlockedOnFetch;
2783 /* now done in do_the_fetchnode
2784 if (RtsFlags.GranFlags.GranSimStats.Full)
2785 DumpRawGranEvent(CurrentProc, p,
2786 GR_FETCH, CurrentTSO, node, (StgInt)0, 0);
2788 IF_GRAN_DEBUG(blockOnFetch,
2789 BlockedOnFetch[CurrentProc] = CurrentTSO;); /*- rtsTrue; -*/
2793 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mtidytime;
2795 /* Rescheduling (GranSim internal) is necessary */
2796 NeedToReSchedule = rtsTrue;
2803 //@cindex GranSimSpark
// GranSimSpark (fragment): record creation of a spark on the current PE
// (profiling event, local/global spark counters) and, with
// DoAlwaysCreateThreads, force the PE to act on it immediately.
2805 GranSimSpark(local,node)
2809 /* ++SparksAvail; Nope; do that in add_to_spark_queue */
2810 if (RtsFlags.GranFlags.GranSimStats.Sparks)
2811 DumpRawGranEvent(CurrentProc, (PEs)0, SP_SPARK,
2812 END_TSO_QUEUE, node, (StgInt)0, spark_queue_len(CurrentProc)-1);
2814 /* Force the PE to take notice of the spark */
2815 if(RtsFlags.GranFlags.DoAlwaysCreateThreads) {
2816 new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
2818 END_TSO_QUEUE, (StgClosure*)NULL, (rtsSpark*)NULL);
2819 if (CurrentTime[CurrentProc]<TimeOfNextEvent)
2820 TimeOfNextEvent = CurrentTime[CurrentProc];
2824 ++CurrentTSO->gran.localsparks;
2826 ++CurrentTSO->gran.globalsparks;
2829 //@cindex GranSimSparkAt
// GranSimSparkAt (fragment): place a spark on the PE that owns 'where';
// resolves the PE via where_is and delegates to GranSimSparkAtAbs (which
// also handles a NULL spark from granularity control).
2831 GranSimSparkAt(spark,where,identifier)
2833 StgClosure *where; /* This should be a node; alternatively could be a GA */
2836 PEs p = where_is(where);
2837 GranSimSparkAtAbs(spark,p,identifier);
2840 //@cindex GranSimSparkAtAbs
// GranSimSparkAtAbs (fragment): place a spark on an explicitly given PE.
// Charges message costs when proc differs from CurrentProc, schedules a
// MoveSpark event at 'exporttime', and counts the spark as local or global
// on the creating TSO accordingly.
2842 GranSimSparkAtAbs(spark,proc,identifier)
2849 if (spark == (rtsSpark *)NULL) /* Note: Granularity control might have */
2850 return; /* turned a spark into a NULL. */
2852 /* ++SparksAvail; Nope; do that in add_to_spark_queue */
2853 if(RtsFlags.GranFlags.GranSimStats.Sparks)
2854 DumpRawGranEvent(proc,0,SP_SPARKAT,
2855 END_TSO_QUEUE, spark->node, (StgInt)0, spark_queue_len(proc));
2857 if (proc!=CurrentProc) {
2858 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
// Spark arrives at max(remote clock, local clock) plus latency.
2859 exporttime = (CurrentTime[proc] > CurrentTime[CurrentProc]?
2860 CurrentTime[proc]: CurrentTime[CurrentProc])
2861 + RtsFlags.GranFlags.Costs.latency;
2863 exporttime = CurrentTime[CurrentProc];
2866 if ( RtsFlags.GranFlags.Light )
2867 /* Need CurrentTSO in event field to associate costs with creating
2868 spark even in a GrAnSim Light setup */
2869 new_event(proc, CurrentProc, exporttime,
2871 CurrentTSO, spark->node, spark);
2873 new_event(proc, CurrentProc, exporttime,
2874 MoveSpark, (StgTSO*)NULL, spark->node, spark);
2875 /* Bit of a hack to treat placed sparks the same as stolen sparks */
2876 ++OutstandingFishes[proc];
2878 /* Force the PE to take notice of the spark (FINDWORK is put after a
2879 MoveSpark into the sparkq!) */
2880 if (RtsFlags.GranFlags.DoAlwaysCreateThreads) {
2881 new_event(CurrentProc,CurrentProc,exporttime+1,
2883 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
2886 if (exporttime<TimeOfNextEvent)
2887 TimeOfNextEvent = exporttime;
2889 if (proc!=CurrentProc) {
2890 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mtidytime;
2891 ++CurrentTSO->gran.globalsparks;
2893 ++CurrentTSO->gran.localsparks;
2898 This function handles local and global blocking. It's called either
2899 from threaded code (RBH_entry, BH_entry etc) or from blockFetch when
2900 trying to fetch an BH or RBH
2903 //@cindex GranSimBlock
// GranSimBlock (fragment): statistics and scheduling side of blocking 'tso'
// on closure 'node' at PE 'proc'. The actual enqueueing onto the node's
// blocking queue is done by the BLACKHOLE(_BQ) entry code; here we emit a
// GR_BLOCK event, bump blocking stats, charge queueing cost, and activate
// the next runnable thread. Blocking on a remote node is papered over by
// faking a FetchNode (see the SHOULD NEVER HAPPEN comment below).
2905 GranSimBlock(tso, proc, node)
2910 PEs node_proc = where_is(node),
2911 tso_proc = where_is((StgClosure *)tso);
2913 ASSERT(tso_proc==CurrentProc);
2914 // ASSERT(node_proc==CurrentProc);
2916 if (node_proc!=CurrentProc)
2917 belch("## ghuH: TSO %d (%lx) [PE %d] blocks on non-local node %p [PE %d] (no simulation of FETCHMEs)",
2918 tso->id, tso, tso_proc, node, node_proc));
2919 ASSERT(tso->link==END_TSO_QUEUE);
2920 ASSERT(!is_on_queue(tso,proc)); // tso must not be on run queue already!
2921 //ASSERT(tso==run_queue_hds[proc]);
2924 belch("GRAN: TSO %d (%p) [PE %d] blocks on closure %p @ %lx",
2925 tso->id, tso, proc, node, CurrentTime[proc]));
2928 /* THIS SHOULD NEVER HAPPEN!
2929 If tso tries to block on a remote node (i.e. node_proc!=CurrentProc)
2930 we have missed a GranSimFetch before entering this closure;
2931 we hack around it for now, faking a FetchNode;
2932 because GranSimBlock is entered via a BLACKHOLE(_BQ) closure,
2933 tso will be blocked on this closure until the FetchReply occurs.
2937 if (node_proc!=CurrentProc) {
2939 ret = GranSimFetch(node);
2942 belch(".. GranSimBlock: faking a FetchNode of node %p from %d to %d",
2943 node, node_proc, CurrentProc););
2948 if (RtsFlags.GranFlags.GranSimStats.Full)
2949 DumpRawGranEvent(proc,node_proc,GR_BLOCK,tso,node,(StgInt)0,0);
2951 ++(tso->gran.blockcount);
2952 /* Distinction between local and global block is made in blockFetch */
2953 tso->gran.blockedat = CurrentTime[proc];
2955 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime;
2956 ActivateNextThread(proc);
2957 /* tso->link = END_TSO_QUEUE; not really necessary; only for testing */
2962 //@node Index, , Dumping routines, GranSim specific code
2966 //* ActivateNextThread:: @cindex\s-+ActivateNextThread
2967 //* CurrentProc:: @cindex\s-+CurrentProc
2968 //* CurrentTime:: @cindex\s-+CurrentTime
2969 //* GranSimAllocate:: @cindex\s-+GranSimAllocate
2970 //* GranSimBlock:: @cindex\s-+GranSimBlock
2971 //* GranSimExec:: @cindex\s-+GranSimExec
2972 //* GranSimFetch:: @cindex\s-+GranSimFetch
2973 //* GranSimLight_insertThread:: @cindex\s-+GranSimLight_insertThread
2974 //* GranSimSpark:: @cindex\s-+GranSimSpark
2975 //* GranSimSparkAt:: @cindex\s-+GranSimSparkAt
2976 //* GranSimSparkAtAbs:: @cindex\s-+GranSimSparkAtAbs
2977 //* GranSimUnallocate:: @cindex\s-+GranSimUnallocate
2978 //* any_idle:: @cindex\s-+any_idle
2979 //* blockFetch:: @cindex\s-+blockFetch
2980 //* do_the_fetchnode:: @cindex\s-+do_the_fetchnode
2981 //* do_the_fetchreply:: @cindex\s-+do_the_fetchreply
2982 //* do_the_findwork:: @cindex\s-+do_the_findwork
2983 //* do_the_globalblock:: @cindex\s-+do_the_globalblock
2984 //* do_the_movespark:: @cindex\s-+do_the_movespark
2985 //* do_the_movethread:: @cindex\s-+do_the_movethread
2986 //* do_the_startthread:: @cindex\s-+do_the_startthread
2987 //* do_the_unblock:: @cindex\s-+do_the_unblock
2988 //* fetchNode:: @cindex\s-+fetchNode
2989 //* ga_to_proc:: @cindex\s-+ga_to_proc
2990 //* get_next_event:: @cindex\s-+get_next_event
2991 //* get_time_of_next_event:: @cindex\s-+get_time_of_next_event
2992 //* grab_event:: @cindex\s-+grab_event
2993 //* handleFetchRequest:: @cindex\s-+handleFetchRequest
2994 //* handleIdlePEs:: @cindex\s-+handleIdlePEs
2995 //* idlers:: @cindex\s-+idlers
2996 //* insertThread:: @cindex\s-+insertThread
2997 //* insert_event:: @cindex\s-+insert_event
2998 //* is_on_queue:: @cindex\s-+is_on_queue
2999 //* is_unique:: @cindex\s-+is_unique
3000 //* new_event:: @cindex\s-+new_event
3001 //* prepend_event:: @cindex\s-+prepend_event
3002 //* print_event:: @cindex\s-+print_event
3003 //* print_eventq:: @cindex\s-+print_eventq
3004 //* prune_eventq :: @cindex\s-+prune_eventq
3005 //* spark queue:: @cindex\s-+spark queue
3006 //* sparkStealTime:: @cindex\s-+sparkStealTime
3007 //* stealSomething:: @cindex\s-+stealSomething
3008 //* stealSpark:: @cindex\s-+stealSpark
3009 //* stealSparkMagic:: @cindex\s-+stealSparkMagic
3010 //* stealThread:: @cindex\s-+stealThread
3011 //* stealThreadMagic:: @cindex\s-+stealThreadMagic
3012 //* thread_queue_len:: @cindex\s-+thread_queue_len
3013 //* traverse_eventq_for_gc:: @cindex\s-+traverse_eventq_for_gc
3014 //* where_is:: @cindex\s-+where_is