2 Time-stamp: <Tue Mar 06 2001 00:17:42 Stardate: [-30]6285.06 hwloidl>
3 $Id: GranSim.c,v 1.4 2001/03/22 03:51:11 hwloidl Exp $
5 Variables and functions specific to GranSim the parallelism simulator
9 //@node GranSim specific code, , ,
10 //@section GranSim specific code
13 Macros for dealing with the new and improved GA field for simulating
14 parallel execution. Based on @CONCURRENT@ package. The GA field now
15 contains a mask, where the n-th bit stands for the n-th processor, where
16 this data can be found. In case of multiple copies, several bits are
17 set. The total number of processors is bounded by @MAX_PROC@, which
18 should be <= the length of a word in bits. -- HWL
23 //* Prototypes and externs::
24 //* Constants and Variables::
26 //* Global Address Operations::
27 //* Global Event Queue::
28 //* Spark queue functions::
29 //* Scheduling functions::
30 //* Thread Queue routines::
31 //* GranSim functions::
32 //* GranSimLight routines::
33 //* Code for Fetching Nodes::
35 //* Routines directly called from Haskell world::
36 //* Emiting profiling info for GrAnSim::
37 //* Dumping routines::
41 //@node Includes, Prototypes and externs, GranSim specific code, GranSim specific code
42 //@subsection Includes
47 #include "StgMiscClosures.h"
50 #include "SchedAPI.h" // for pushClosure
52 #include "GranSimRts.h"
54 #include "ParallelRts.h"
55 #include "ParallelDebug.h"
57 #include "Storage.h" // for recordMutable
60 //@node Prototypes and externs, Constants and Variables, Includes, GranSim specific code
61 //@subsection Prototypes and externs
66 static inline PEs ga_to_proc(StgWord);
67 static inline rtsBool any_idle(void);
68 static inline nat idlers(void);
69 PEs where_is(StgClosure *node);
71 static rtsBool stealSomething(PEs proc, rtsBool steal_spark, rtsBool steal_thread);
72 static rtsBool stealSpark(PEs proc);
73 static rtsBool stealThread(PEs proc);
74 static rtsBool stealSparkMagic(PEs proc);
75 static rtsBool stealThreadMagic(PEs proc);
76 /* subsumed by stealSomething
77 static void stealThread(PEs proc);
78 static void stealSpark(PEs proc);
80 static rtsTime sparkStealTime(void);
81 static nat natRandom(nat from, nat to);
82 static PEs findRandomPE(PEs proc);
83 static void sortPEsByTime (PEs proc, PEs *pes_by_time,
84 nat *firstp, nat *np);
90 //@node Constants and Variables, Initialisation, Prototypes and externs, GranSim specific code
91 //@subsection Constants and Variables
93 #if defined(GRAN) || defined(PAR)
94 /* See GranSim.h for the definition of the enum gran_event_types */
95 char *gran_event_names[] = {
97 "STEALING", "STOLEN", "STOLEN(Q)",
98 "FETCH", "REPLY", "BLOCK", "RESUME", "RESUME(Q)",
99 "SCHEDULE", "DESCHEDULE",
101 "SPARK", "SPARKAT", "USED", "PRUNED", "EXPORTED", "ACQUIRED",
104 "SYSTEM_START", "SYSTEM_END", /* only for debugging */
109 #if defined(GRAN) /* whole file */
110 char *proc_status_names[] = {
111 "Idle", "Sparking", "Starting", "Fetching", "Fishing", "Busy",
115 /* For internal use (event statistics) only */
116 char *event_names[] =
117 { "ContinueThread", "StartThread", "ResumeThread",
118 "MoveSpark", "MoveThread", "FindWork",
119 "FetchNode", "FetchReply",
120 "GlobalBlock", "UnblockThread"
123 //@cindex CurrentProc
127 ToDo: Create a structure for the processor status and put all the
128 arrays below into it.
131 //@cindex CurrentTime
132 /* One clock for each PE */
133 rtsTime CurrentTime[MAX_PROC];
135 /* Useful to restrict communication; cf fishing model in GUM */
136 nat OutstandingFetches[MAX_PROC], OutstandingFishes[MAX_PROC];
138 /* Status of each PE (new since but independent of GranSim Light) */
139 rtsProcStatus procStatus[MAX_PROC];
141 # if defined(GRAN) && defined(GRAN_CHECK)
142 /* To check if the RTS ever tries to run a thread that should be blocked
143 because of fetching remote data */
144 StgTSO *BlockedOnFetch[MAX_PROC];
145 # define FETCH_MASK_TSO 0x08000000 /* only bits 0, 1, 2 should be used */
148 nat SparksAvail = 0; /* How many sparks are available */
149 nat SurplusThreads = 0; /* How many excess threads are there */
151 /* Do we need to reschedule following a fetch? */
152 rtsBool NeedToReSchedule = rtsFalse, IgnoreEvents = rtsFalse, IgnoreYields = rtsFalse;
153 rtsTime TimeOfNextEvent, TimeOfLastEvent, EndOfTimeSlice; /* checked from the threaded world! */
155 //@cindex spark queue
156 /* GranSim: a globally visible array of spark queues */
157 rtsSparkQ pending_sparks_hds[MAX_PROC];
158 rtsSparkQ pending_sparks_tls[MAX_PROC];
160 nat sparksIgnored = 0, sparksCreated = 0;
162 GlobalGranStats globalGranStats;
164 nat gran_arith_cost, gran_branch_cost, gran_load_cost,
165 gran_store_cost, gran_float_cost;
168 Old comment from 0.29. ToDo: Check and update -- HWL
170 The following variables control the behaviour of GrAnSim. In general, there
171 is one RTS option for enabling each of these features. In getting the
172 desired setup of GranSim the following questions have to be answered:
174 \item {\em Which scheduling algorithm} to use (@RtsFlags.GranFlags.DoFairSchedule@)?
175 Currently only unfair scheduling is supported.
176 \item What to do when remote data is fetched (@RtsFlags.GranFlags.DoAsyncFetch@)?
177 Either block and wait for the
178 data or reschedule and do some other work.
179 Thus, if this variable is true, asynchronous communication is
180 modelled. Block on fetch mainly makes sense for incremental fetching.
182 There is also a simplified fetch variant available
183 (@RtsFlags.GranFlags.SimplifiedFetch@). This variant does not use events to model
184 communication. It is faster but the results will be less accurate.
185 \item How aggressive to be in getting work after a reschedule on fetch
186 (@RtsFlags.GranFlags.FetchStrategy@)?
187 This is determined by the so-called {\em fetching
188 strategy\/}. Currently, there are four possibilities:
190 \item Only run a runnable thread.
191 \item Turn a spark into a thread, if necessary.
192 \item Steal a remote spark, if necessary.
193 \item Steal a runnable thread from another processor, if necessary.
195 The variable @RtsFlags.GranFlags.FetchStrategy@ determines how far to go in this list
196 when rescheduling on a fetch.
197 \item Should sparks or threads be stolen first when looking for work
198 (@RtsFlags.GranFlags.DoStealThreadsFirst@)?
199 The default is to steal sparks first (much cheaper).
200 \item Should the RTS use a lazy thread creation scheme
201 (@RtsFlags.GranFlags.DoAlwaysCreateThreads@)? By default yes i.e.\ sparks are only
202 turned into threads when work is needed. Also note, that sparks
203 can be discarded by the RTS (this is done in the case of an overflow
204 of the spark pool). Setting @RtsFlags.GranFlags.DoAlwaysCreateThreads@ to @True@ forces
205 the creation of threads at the next possibility (i.e.\ when new work
206 is demanded the next time).
207 \item Should data be fetched closure-by-closure or in packets
208 (@RtsFlags.GranFlags.DoBulkFetching@)? The default strategy is a GRIP-like incremental
209 (i.e.\ closure-by-closure) strategy. This makes sense in a
210 low-latency setting but is bad in a high-latency system. Setting
211 @RtsFlags.GranFlags.DoBulkFetching@ to @True@ enables bulk (packet) fetching. Other
212 parameters determine the size of the packets (@pack_buffer_size@) and the number of
213 thunks that should be put into one packet (@RtsFlags.GranFlags.ThunksToPack@).
214 \item If there is no other possibility to find work, should runnable threads
215 be moved to an idle processor (@RtsFlags.GranFlags.DoThreadMigration@)? In any case, the
216 RTS tries to get sparks (either local or remote ones) first. Thread
217 migration is very expensive, since a whole TSO has to be transferred
218 and probably data locality becomes worse in the process. Note, that
219 the closure, which will be evaluated next by that TSO is not
220 transferred together with the TSO (that might block another thread).
221 \item Should the RTS distinguish between sparks created by local nodes and
222 stolen sparks (@RtsFlags.GranFlags.PreferSparksOfLocalNodes@)? The idea is to improve
223 data locality by preferring sparks of local nodes (it is more likely
224 that the data for those sparks is already on the local processor).
225 However, such a distinction also imposes an overhead on the spark
226 queue management, and typically a large number of sparks are
227 generated during execution. By default this variable is set to @False@.
228 \item Should the RTS use granularity control mechanisms? The idea of a
229 granularity control mechanism is to make use of granularity
230 information provided via annotation of the @par@ construct in order
231 to prefer bigger threads when either turning a spark into a thread or
232 when choosing the next thread to schedule. Currently, three such
233 mechanisms are implemented:
235 \item Cut-off: The granularity information is interpreted as a
236 priority. If a threshold priority is given to the RTS, then
237 only those sparks with a higher priority than the threshold
238 are actually created. Other sparks are immediately discarded.
239 This is similar to a usual cut-off mechanism often used in
240 parallel programs, where parallelism is only created if the
241 input data is large enough. With this option, the choice is
242 hidden in the RTS and only the threshold value has to be
243 provided as a parameter to the runtime system.
244 \item Priority Sparking: This mechanism keeps priorities for sparks
245 and chooses the spark with the highest priority when turning
246 a spark into a thread. After that the priority information is
247 discarded. The overhead of this mechanism comes from
248 maintaining a sorted spark queue.
249 \item Priority Scheduling: This mechanism keeps the granularity
250 information for threads, too. Thus, on each reschedule the
251 largest thread is chosen. This mechanism has a higher
252 overhead, as the thread queue is sorted, too.
257 //@node Initialisation, Global Address Operations, Constants and Variables, GranSim specific code
258 //@subsection Initialisation
261 init_gr_stats (void) {
262 memset(&globalGranStats, '\0', sizeof(GlobalGranStats));
265 globalGranStats.noOfEvents = 0;
266 for (i=0; i<MAX_EVENT; i++) globalGranStats.event_counts[i]=0;
268 /* communication stats */
269 globalGranStats.fetch_misses = 0;
270 globalGranStats.tot_low_pri_sparks = 0;
273 globalGranStats.rs_sp_count = 0;
274 globalGranStats.rs_t_count = 0;
275 globalGranStats.ntimes_total = 0,
276 globalGranStats.fl_total = 0;
277 globalGranStats.no_of_steals = 0;
279 /* spark queue stats */
280 globalGranStats.tot_sq_len = 0,
281 globalGranStats.tot_sq_probes = 0;
282 globalGranStats.tot_sparks = 0;
283 globalGranStats.withered_sparks = 0;
284 globalGranStats.tot_add_threads = 0;
285 globalGranStats.tot_tq_len = 0;
286 globalGranStats.non_end_add_threads = 0;
289 globalGranStats.tot_threads_created = 0;
290 for (i=0; i<MAX_PROC; i++) globalGranStats.threads_created_on_PE[i]=0;
294 //@node Global Address Operations, Global Event Queue, Initialisation, GranSim specific code
295 //@subsection Global Address Operations
297 ----------------------------------------------------------------------
298 Global Address Operations
300 These functions perform operations on the global-address (ga) part of a
301 closure. The ga is the only new field (1 word) in a closure introduced by
302 GrAnSim. It serves as a bitmask, indicating on which processor the
303 closure is residing. Since threads are described by Thread State Object
304 (TSO), which is nothing but another kind of closure, this scheme
305 gives placement information about threads.
307 A ga is just a bitmask, so the operations on them are mainly bitmask
308 manipulating functions. Note, that there are important macros like PROCS,
309 IS_LOCAL_TO etc. They are defined in @GrAnSim.lh@.
311 NOTE: In GrAnSim-light we don't maintain placement information. This
312 allows to simulate an arbitrary number of processors. The price we have
313 to pay is the lack of costing any communication properly. In short,
314 GrAnSim-light is meant to reveal the maximal parallelism in a program.
315 From an implementation point of view the important thing is: {\em
316 GrAnSim-light does not maintain global-addresses}. */
318 /* ga_to_proc returns the first processor marked in the bitmask ga.
319 Normally only one bit in ga should be set. But for PLCs all bits
320 are set. That shouldn't hurt since we only need IS_LOCAL_TO for PLCs */
/* ga_to_proc: index of the first PE whose bit is set in the ga bitmask
   (linear scan up to RtsFlags.GranFlags.proc).  The ASSERT guarantees
   that at least one bit was set.  NOTE(review): fragment -- return
   type, braces and final return are on elided lines. */
325 ga_to_proc(StgWord ga)
328 for (i = 0; i < RtsFlags.GranFlags.proc && !IS_LOCAL_TO(ga, i); i++);
329 ASSERT(i<RtsFlags.GranFlags.proc);
333 /* NB: This takes a *node* rather than just a ga as input */
/* where_is: the PE on which (the first copy of) node resides; a thin
   wrapper around ga_to_proc on the node's PROCS bitmask (NB: takes a
   *node*, not a ga -- see note above). */
336 where_is(StgClosure *node)
337 { return (ga_to_proc(PROCS(node))); }
/* is_unique: rtsTrue iff node resides on exactly one PE, i.e. exactly
   one bit is set in its GA bitmask.  NOTE(review): fragment -- return
   type, braces and final return are on elided lines. */
342 is_unique(StgClosure *node)
345 rtsBool unique = rtsFalse;
/* scan all PEs; bail out as soon as a second copy is found */
347 for (i = 0; i < RtsFlags.GranFlags.proc ; i++)
348 if (IS_LOCAL_TO(PROCS(node), i))
349 if (unique) // exactly 1 instance found so far
350 return rtsFalse; // found a 2nd instance => not unique
352 unique = rtsTrue; // found 1st instance
353 ASSERT(unique); // otherwise returned from within loop
/* any_idle: does any PE currently have status Idle?  The loop stops
   scanning as soon as the first Idle PE is found. */
358 static inline rtsBool
359 any_idle(void) { /* any (map (\ i -> procStatus[i] == Idle)) [0,..,MAX_PROC] */
362 for(i=0, any_idle=rtsFalse;
363 !any_idle && i<RtsFlags.GranFlags.proc;
364 any_idle = any_idle || procStatus[i] == Idle, i++)
/* idlers: count of PEs whose status is Idle (accumulated in j). */
370 idlers(void) { /* number of idle PEs */
373 i<RtsFlags.GranFlags.proc;
374 j += (procStatus[i] == Idle) ? 1 : 0, i++)
379 //@node Global Event Queue, Spark queue functions, Global Address Operations, GranSim specific code
380 //@subsection Global Event Queue
382 The following routines implement an ADT of an event-queue (FIFO).
383 ToDo: Put that in an own file(?)
386 /* Pointer to the global event queue; events are currently malloc'ed */
387 rtsEventQ EventHd = NULL;
389 //@cindex get_next_event
393 static rtsEventQ entry = NULL;
395 if (EventHd == NULL) {
396 barf("No next event. This may be caused by a circular data dependency in the program.");
402 if (RtsFlags.GranFlags.GranSimStats.Global) { /* count events */
403 globalGranStats.noOfEvents++;
404 globalGranStats.event_counts[EventHd->evttype]++;
409 IF_GRAN_DEBUG(event_trace,
412 EventHd = EventHd->next;
416 /* When getting the time of the next event we ignore CONTINUETHREAD events:
417 we don't want to be interrupted before the end of the current time slice
418 unless there is something important to handle.
420 //@cindex get_time_of_next_event
/* get_time_of_next_event: time stamp of the next non-ContinueThread
   event in the global event queue, or 0 if only ContinueThread events
   remain (see the comment above: plain ContinueThreads must not cut
   the current time slice short). */
422 get_time_of_next_event(void)
424 rtsEventQ event = EventHd;
426 while (event != NULL && event->evttype==ContinueThread) {
430 return ((rtsTime) 0);
432 return (event->time);
435 /* ToDo: replace malloc/free with a free list */
436 //@cindex insert_event
438 insert_event(newentry)
441 rtsEventType evttype = newentry->evttype;
442 rtsEvent *event, **prev;
444 /* if(evttype >= CONTINUETHREAD1) evttype = CONTINUETHREAD; */
446 /* Search the queue and insert at the right point:
447 FINDWORK before everything, CONTINUETHREAD after everything.
449 This ensures that we find any available work after all threads have
450 executed the current cycle. This level of detail would normally be
451 irrelevant, but matters for ridiculously low latencies...
454 /* Changed the ordering: Now FINDWORK comes after everything but
455 CONTINUETHREAD. This makes sure that a MOVESPARK comes before a
456 FINDWORK. This is important when a GranSimSparkAt happens and
457 DoAlwaysCreateThreads is turned on. Also important if a GC occurs
458 when trying to build a new thread (see much_spark) -- HWL 02/96 */
463 for (event = EventHd, prev=(rtsEvent**)&EventHd;
465 prev = (rtsEvent**)&(event->next), event = event->next) {
467 case FindWork: if ( event->time < newentry->time ||
468 ( (event->time == newentry->time) &&
469 (event->evttype != ContinueThread) ) )
473 case ContinueThread: if ( event->time <= newentry->time )
477 default: if ( event->time < newentry->time ||
478 ((event->time == newentry->time) &&
479 (event->evttype == newentry->evttype)) )
484 /* Insert newentry here (i.e. before event) */
486 newentry->next = event;
496 new_event(proc,creator,time,evttype,tso,node,spark)
499 rtsEventType evttype;
504 rtsEvent *newentry = (rtsEvent *) stgMallocBytes(sizeof(rtsEvent), "new_event");
506 newentry->proc = proc;
507 newentry->creator = creator;
508 newentry->time = time;
509 newentry->evttype = evttype;
511 newentry->node = node;
512 newentry->spark = spark;
513 newentry->gc_info = 0;
514 newentry->next = NULL;
516 insert_event(newentry);
519 fprintf(stderr, "GRAN: new_event: \n");
520 print_event(newentry));
523 //@cindex prepend_event
525 prepend_event(event) /* put event at beginning of EventQueue */
527 { /* only used for GC! */
528 event->next = EventHd;
534 grab_event(void) /* undo prepend_event i.e. get the event */
535 { /* at the head of EventQ but don't free anything */
536 rtsEventQ event = EventHd;
538 if (EventHd == NULL) {
539 barf("No next event (in grab_event). This may be caused by a circular data dependency in the program.");
542 EventHd = EventHd->next;
546 //@cindex traverse_eventq_for_gc
548 traverse_eventq_for_gc(void)
550 rtsEventQ event = EventHd;
552 StgClosure *closurep;
554 StgPtr buffer, bufptr;
557 /* Traverse eventq and replace every FETCHREPLY by a FETCHNODE for the
558 orig closure (root of packed graph). This means that a graph, which is
559 between processors at the time of GC is fetched again at the time when
560 it would have arrived, had there been no GC. Slightly inaccurate but
562 This is only needed for GUM style fetching. -- HWL */
563 if (!RtsFlags.GranFlags.DoBulkFetching)
566 for(event = EventHd; event!=NULL; event=event->next) {
567 if (event->evttype==FetchReply) {
568 buffer = stgCast(StgPtr,event->node);
569 ASSERT(buffer[PACK_FLAG_LOCN]==MAGIC_PACK_FLAG); /* It's a pack buffer */
570 bufsize = buffer[PACK_SIZE_LOCN];
571 closurep = stgCast(StgClosure*,buffer[PACK_HDR_SIZE]);
572 tsop = stgCast(StgTSO*,buffer[PACK_TSO_LOCN]);
574 creator = event->creator; /* similar to unpacking */
575 for (bufptr=buffer+PACK_HDR_SIZE;
576 bufptr<(buffer+bufsize);
578 // if ( (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_SPEC_RBH_TYPE) ||
579 // (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_GEN_RBH_TYPE) ) {
580 if ( GET_INFO(stgCast(StgClosure*,bufptr)) ) {
581 convertFromRBH(stgCast(StgClosure *,bufptr));
585 event->evttype = FetchNode;
586 event->proc = creator;
587 event->creator = proc;
588 event->node = closurep;
598 StgClosure *MarkRoot(StgClosure *root); // prototype
600 rtsEventQ event = EventHd;
603 /* iterate over eventq and register relevant fields in event as roots */
604 for(event = EventHd, len = 0; event!=NULL; event=event->next, len++) {
605 switch (event->evttype) {
607 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
610 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
611 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
614 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
615 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
618 event->spark->node = (StgClosure *)MarkRoot((StgClosure *)event->spark->node);
621 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
626 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
627 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
630 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
631 if (RtsFlags.GranFlags.DoBulkFetching)
632 // ToDo: traverse_eventw_for_gc if GUM-Fetching!!! HWL
633 belch("ghuH: packets in BulkFetching not marked as roots; mayb be fatal");
635 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
638 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
639 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
642 event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
643 event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
646 barf("markEventQueue: trying to mark unknown event @ %p", event);
649 belch("GC: markEventQueue: %d events in queue", len));
653 Prune all ContinueThread events related to tso or node in the eventq.
654 Currently used if a thread leaves STG land with ThreadBlocked status,
655 i.e. it blocked on a closure and has been put on its blocking queue. It
656 will be reawakened via a call to awakenBlockedQueue. Until then no
657 event affecting this tso should appear in the eventq. A bit of a hack,
658 because ideally we shouldn't generate such spurious ContinueThread events
661 //@cindex prune_eventq
663 prune_eventq(tso, node)
666 { rtsEventQ prev = (rtsEventQ)NULL, event = EventHd;
668 /* node unused for now */
670 /* tso must be valid, then */
671 ASSERT(tso!=END_TSO_QUEUE);
672 while (event != NULL) {
673 if (event->evttype==ContinueThread &&
675 IF_GRAN_DEBUG(event_trace, // ToDo: use another debug flag
676 belch("prune_eventq: pruning ContinueThread event for TSO %d (%p) on PE %d @ %lx (%p)",
677 event->tso->id, event->tso, event->proc, event->time, event));
678 if (prev==(rtsEventQ)NULL) { // beginning of eventq
679 EventHd = event->next;
683 prev->next = event->next;
687 } else { // no pruning necessary; go to next event
694 //@cindex print_event
699 char str_tso[16], str_node[16];
702 if (event->tso==END_TSO_QUEUE) {
703 strcpy(str_tso, "______");
706 sprintf(str_tso, "%p", event->tso);
707 tso_id = (event->tso==NULL) ? 0 : event->tso->id;
709 if (event->node==(StgClosure*)NULL) {
710 strcpy(str_node, "______");
712 sprintf(str_node, "%p", event->node);
714 // HWL: shouldn't be necessary; ToDo: nuke
719 fprintf(stderr,"Evt: NIL\n");
721 fprintf(stderr, "Evt: %s (%u), PE %u [%u], Time %lu, TSO %d (%s), Node %s\n", //"Evt: %s (%u), PE %u [%u], Time %u, TSO %s (%#l), Node %s\n",
722 event_names[event->evttype], event->evttype,
723 event->proc, event->creator, event->time,
724 tso_id, str_tso, str_node
725 /*, event->spark, event->next */ );
729 //@cindex print_eventq
736 fprintf(stderr,"Event Queue with root at %p:\n", hd);
737 for (x=hd; x!=NULL; x=x->next) {
743 Spark queue functions are now all in Sparks.c!!
745 //@node Scheduling functions, Thread Queue routines, Spark queue functions, GranSim specific code
746 //@subsection Scheduling functions
749 These functions are variants of thread initialisation and therefore
750 related to initThread and friends in Schedule.c. However, they are
751 specific to a GranSim setup in storing more info in the TSO's statistics
752 buffer and sorting the thread queues etc.
756 A large portion of startThread deals with maintaining a sorted thread
757 queue, which is needed for the Priority Sparking option. Without that
758 complication the code boils down to FIFO handling.
760 //@cindex insertThread
762 insertThread(tso, proc)
766 StgTSO *prev = NULL, *next = NULL;
768 rtsBool found = rtsFalse;
770 ASSERT(CurrentProc==proc);
771 ASSERT(!is_on_queue(tso,proc));
772 /* Idle proc: put the thread on the run queue
773 same for pri spark and basic version */
774 if (run_queue_hds[proc] == END_TSO_QUEUE)
777 ASSERT((CurrentProc==MainProc &&
778 CurrentTime[MainProc]==0 &&
779 procStatus[MainProc]==Idle) ||
780 procStatus[proc]==Starting);
782 run_queue_hds[proc] = run_queue_tls[proc] = tso;
784 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime;
786 /* new_event of ContinueThread has been moved to do_the_startthread */
789 ASSERT(procStatus[proc]==Idle ||
790 procStatus[proc]==Fishing ||
791 procStatus[proc]==Starting);
792 procStatus[proc] = Busy;
797 if (RtsFlags.GranFlags.Light)
798 GranSimLight_insertThread(tso, proc);
800 /* Only for Pri Scheduling: find place where to insert tso into queue */
801 if (RtsFlags.GranFlags.DoPriorityScheduling && tso->gran.pri!=0)
802 /* {add_to_spark_queue}vo' jInIHta'; Qu' wa'DIch yIleghQo' */
803 for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count=0;
804 (next != END_TSO_QUEUE) &&
805 !(found = tso->gran.pri >= next->gran.pri);
806 prev = next, next = next->link, count++)
808 ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
809 (prev==(StgTSO*)NULL || prev->link==next));
812 ASSERT(!found || next != END_TSO_QUEUE);
813 ASSERT(procStatus[proc]!=Idle);
816 /* found can only be rtsTrue if pri scheduling enabled */
817 ASSERT(RtsFlags.GranFlags.DoPriorityScheduling);
818 if (RtsFlags.GranFlags.GranSimStats.Global)
819 globalGranStats.non_end_add_threads++;
820 /* Add tso to ThreadQueue between prev and next */
822 if ( next == (StgTSO*)END_TSO_QUEUE ) {
825 /* no back link for TSO chain */
828 if ( prev == (StgTSO*)END_TSO_QUEUE ) {
829 /* Never add TSO as first elem of thread queue; the first */
830 /* element should be the one that is currently running -- HWL */
832 belch("GRAN: Qagh: NewThread (w/ PriorityScheduling): Trying to add TSO %p (PRI=%d) as first elem of threadQ (%p) on proc %u (@ %u)\n",
833 tso, tso->gran.pri, run_queue_hd, proc,
838 } else { /* !found */ /* or not pri sparking! */
839 /* Add TSO to the end of the thread queue on that processor */
840 run_queue_tls[proc]->link = tso;
841 run_queue_tls[proc] = tso;
843 ASSERT(RtsFlags.GranFlags.DoPriorityScheduling || count==0);
844 CurrentTime[proc] += count * RtsFlags.GranFlags.Costs.pri_sched_overhead +
845 RtsFlags.GranFlags.Costs.threadqueuetime;
847 /* ToDo: check if this is still needed -- HWL
848 if (RtsFlags.GranFlags.DoThreadMigration)
851 if (RtsFlags.GranFlags.GranSimStats.Full &&
852 !(( event_type == GR_START || event_type == GR_STARTQ) &&
853 RtsFlags.GranFlags.labelling) )
854 DumpRawGranEvent(proc, creator, event_type+1, tso, node,
855 tso->gran.sparkname, spark_queue_len(proc));
858 # if defined(GRAN_CHECK)
859 /* Check if thread queue is sorted. Only for testing, really! HWL */
860 if ( RtsFlags.GranFlags.DoPriorityScheduling &&
861 (RtsFlags.GranFlags.Debug.sortedQ) ) {
862 rtsBool sorted = rtsTrue;
865 if (run_queue_hds[proc]==END_TSO_QUEUE ||
866 run_queue_hds[proc]->link==END_TSO_QUEUE) {
867 /* just 1 elem => ok */
869 /* Qu' wa'DIch yIleghQo' (ignore first elem)! */
870 for (prev = run_queue_hds[proc]->link, next = prev->link;
871 (next != END_TSO_QUEUE) ;
872 prev = next, next = prev->link) {
873 ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
874 (prev==(StgTSO*)NULL || prev->link==next));
876 (prev->gran.pri >= next->gran.pri);
880 fprintf(stderr,"Qagh: THREADQ on PE %d is not sorted:\n",
882 G_THREADQ(run_queue_hd,0x1);
889 insertThread, which is only used for GranSim Light, is similar to
890 startThread in that it adds a TSO to a thread queue. However, it assumes
891 that the thread queue is sorted by local clocks and it inserts the TSO at
892 the right place in the queue. Don't create any event, just insert.
894 //@cindex GranSimLight_insertThread
896 GranSimLight_insertThread(tso, proc)
902 rtsBool found = rtsFalse;
904 ASSERT(RtsFlags.GranFlags.Light);
906 /* In GrAnSim-Light we always have an idle `virtual' proc.
907 The semantics of the one-and-only thread queue is different here:
908 all threads in the queue are running (each on its own virtual processor);
909 the queue is only needed internally in the simulator to interleave the
910 reductions of the different processors.
911 The one-and-only thread queue is sorted by the local clocks of the TSOs.
913 ASSERT(run_queue_hds[proc] != END_TSO_QUEUE);
914 ASSERT(tso->link == END_TSO_QUEUE);
916 /* If only one thread in queue so far we emit DESCHEDULE in debug mode */
917 if (RtsFlags.GranFlags.GranSimStats.Full &&
918 (RtsFlags.GranFlags.Debug.checkLight) &&
919 (run_queue_hd->link == END_TSO_QUEUE)) {
920 DumpRawGranEvent(proc, proc, GR_DESCHEDULE,
921 run_queue_hds[proc], (StgClosure*)NULL,
922 tso->gran.sparkname, spark_queue_len(proc)); // ToDo: check spar_queue_len
923 // resched = rtsTrue;
926 /* this routine should only be used in a GrAnSim Light setup */
927 /* && CurrentProc must be 0 in GrAnSim Light setup */
928 ASSERT(RtsFlags.GranFlags.Light && CurrentProc==0);
930 /* Idle proc; same for pri spark and basic version */
931 if (run_queue_hd==END_TSO_QUEUE)
933 run_queue_hd = run_queue_tl = tso;
934 /* MAKE_BUSY(CurrentProc); */
938 for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count = 0;
939 (next != END_TSO_QUEUE) &&
940 !(found = (tso->gran.clock < next->gran.clock));
941 prev = next, next = next->link, count++)
943 ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
944 (prev==(StgTSO*)NULL || prev->link==next));
947 /* found can only be rtsTrue if pri sparking enabled */
949 /* Add tso to ThreadQueue between prev and next */
951 if ( next == END_TSO_QUEUE ) {
952 run_queue_tls[proc] = tso;
954 /* no back link for TSO chain */
957 if ( prev == END_TSO_QUEUE ) {
958 run_queue_hds[proc] = tso;
962 } else { /* !found */ /* or not pri sparking! */
963 /* Add TSO to the end of the thread queue on that processor */
964 run_queue_tls[proc]->link = tso;
965 run_queue_tls[proc] = tso;
968 if ( prev == END_TSO_QUEUE ) { /* new head of queue */
969 new_event(proc, proc, CurrentTime[proc],
971 tso, (StgClosure*)NULL, (rtsSpark*)NULL);
974 if (RtsFlags.GranFlags.GranSimStats.Full &&
975 !(( event_type == GR_START || event_type == GR_STARTQ) &&
976 RtsFlags.GranFlags.labelling) )
977 DumpRawGranEvent(proc, creator, gr_evttype, tso, node,
978 tso->gran.sparkname, spark_queue_len(proc));
984 endThread is responsible for general clean-up after the thread tso has
985 finished. This includes emitting statistics into the profile etc.
/* endThread: general clean-up after tso has finished on proc: emit the
   end event into the profile and mark the PE Idle if its run queue is
   now empty. */
988 endThread(StgTSO *tso, PEs proc)
990 ASSERT(procStatus[proc]==Busy); // coming straight out of STG land
991 ASSERT(tso->what_next==ThreadComplete);
992 // ToDo: prune ContinueThreads for this TSO from event queue
993 DumpEndEvent(proc, tso, rtsFalse /* not mandatory */);
995 /* if this was the last thread on this PE then make it Idle */
996 if (run_queue_hds[proc]==END_TSO_QUEUE) {
/* NOTE(review): the test reads run_queue_hds[proc] but the update
   writes procStatus[CurrentProc] -- presumably proc==CurrentProc here
   (cf. the ASSERT in insertThread); confirm, otherwise this is a bug
   and should be procStatus[proc]. */
997 procStatus[CurrentProc] = Idle;
1001 //@node Thread Queue routines, GranSim functions, Scheduling functions, GranSim specific code
1002 //@subsection Thread Queue routines
1005 Check whether given tso resides on the run queue of the current processor.
1006 Only used for debugging.
1009 //@cindex is_on_queue
1011 is_on_queue (StgTSO *tso, PEs proc)
1016 for (t=run_queue_hds[proc], found=rtsFalse;
1017 t!=END_TSO_QUEUE && !(found = t==tso);
1024 /* This routine is only used for keeping a statistics of thread queue
1025 lengths to evaluate the impact of priority scheduling. -- HWL
1026 {spark_queue_len}vo' jInIHta'
1028 //@cindex thread_queue_len
/* thread_queue_len: length of proc's run queue, found by walking the
   TSO link chain; used only for statistics on the impact of priority
   scheduling (see comment above). */
1030 thread_queue_len(PEs proc)
1032 StgTSO *prev, *next;
1035 for (len = 0, prev = END_TSO_QUEUE, next = run_queue_hds[proc];
1036 next != END_TSO_QUEUE;
1037 len++, prev = next, next = prev->link)
1043 //@node GranSim functions, GranSimLight routines, Thread Queue routines, GranSim specific code
1044 //@subsection GranSim functions
1046 /* ----------------------------------------------------------------- */
1047 /* The main event handling functions; called from Schedule.c (schedule) */
1048 /* ----------------------------------------------------------------- */
/* do_the_globalblock: event handler for a GlobalBlock event — a TSO on
   processor `proc` tried to enter a remote node and must be blocked on
   it (only legal with GUM-style bulk fetching, and never in GranSim
   Light).  Delegates the actual blocking to blockFetch(); if that
   reports the node became local in the meantime, nothing further is
   done.  Otherwise, under synchronous (block-on-fetch) communication
   the head of the run queue is woken up and scheduled; under
   asynchronous fetch another thread is assumed to be running already.
   NOTE(review): the event-type argument of the first new_event call and
   several closing braces are missing from this decimated chunk. */
1050 //@cindex do_the_globalblock
1053 do_the_globalblock(rtsEvent* event)
1055 PEs proc = event->proc; /* proc that requested node */
1056 StgTSO *tso = event->tso; /* tso that requested node */
1057 StgClosure *node = event->node; /* requested, remote node */
1059 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the GlobalBlock\n"));
1060 /* There should be no GLOBALBLOCKs in GrAnSim Light setup */
1061 ASSERT(!RtsFlags.GranFlags.Light);
1062 /* GlobalBlock events only valid with GUM fetching */
1063 ASSERT(RtsFlags.GranFlags.DoBulkFetching);
1065 IF_GRAN_DEBUG(bq, // globalBlock,
1066 if (IS_LOCAL_TO(PROCS(node),proc)) {
1067 belch("## Qagh: GlobalBlock: Blocking TSO %d (%p) on LOCAL node %p (PE %d).\n",
1068 tso->id, tso, node, proc);
1071 /* CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.munpacktime; */
1072 if ( blockFetch(tso,proc,node) != 0 )
1073 return; /* node has become local by now */
1076 ToDo: check whether anything has to be done at all after blockFetch -- HWL
1078 if (!RtsFlags.GranFlags.DoAsyncFetch) { /* head of queue is next thread */
1079 StgTSO* tso = run_queue_hds[proc]; /* awaken next thread */
1080 if (tso != (StgTSO*)NULL) {
1081 new_event(proc, proc, CurrentTime[proc],
1083 tso, (StgClosure*)NULL, (rtsSpark*)NULL);
1084 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime;
1085 if (RtsFlags.GranFlags.GranSimStats.Full)
1086 DumpRawGranEvent(proc, CurrentProc, GR_SCHEDULE, tso,
1087 (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc)); // ToDo: check sparkname and spar_queue_len
1088 procStatus[proc] = Busy; /* might have been fetching */
1090 procStatus[proc] = Idle; /* no work on proc now */
1092 } else { /* RtsFlags.GranFlags.DoAsyncFetch i.e. block-on-fetch */
1093 /* other thread is already running */
1094 /* 'oH 'utbe' 'e' vIHar ; I think that's not needed -- HWL
1095 new_event(proc,proc,CurrentTime[proc],
1096 CONTINUETHREAD,EVENT_TSO(event),
1097 (RtsFlags.GranFlags.DoBulkFetching ? closure :
1098 EVENT_NODE(event)),NULL);
/* do_the_unblock: event handler for an UnBlock event, raised when a
   FetchReply has arrived or a blocking queue has been awakened (never
   in GranSim Light).  Under synchronous communication the blocked time
   is charged to the TSO's block-time statistic and the thread is
   resumed immediately; under asynchronous communication the TSO is
   re-queued with an extra threadqueuetime cost.  In both branches a
   follow-up event is scheduled via new_event.
   NOTE(review): the event-type arguments of the new_event calls and the
   closing braces are missing from this decimated chunk; presumably the
   scheduled event is a ResumeThread — confirm against the full file. */
1104 //@cindex do_the_unblock
1107 do_the_unblock(rtsEvent* event)
1109 PEs proc = event->proc, /* proc that requested node */
1110 creator = event->creator; /* proc that requested node */
1111 StgTSO* tso = event->tso; /* tso that requested node */
1112 StgClosure* node = event->node; /* requested, remote node */
1114 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the UnBlock\n"))
1115 /* There should be no UNBLOCKs in GrAnSim Light setup */
1116 ASSERT(!RtsFlags.GranFlags.Light);
1117 /* UnblockThread means either FetchReply has arrived or
1118 a blocking queue has been awakened;
1119 ToDo: check with assertions
1120 ASSERT(procStatus[proc]==Fetching || IS_BLACK_HOLE(event->node));
1122 if (!RtsFlags.GranFlags.DoAsyncFetch) { /* block-on-fetch */
1123 /* We count block-on-fetch as normal block time */
1124 tso->gran.blocktime += CurrentTime[proc] - tso->gran.blockedat;
1125 /* Dumping now done when processing the event
1126 No costs for contextswitch or thread queueing in this case
1127 if (RtsFlags.GranFlags.GranSimStats.Full)
1128 DumpRawGranEvent(proc, CurrentProc, GR_RESUME, tso,
1129 (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc));
1131 /* Maybe do this in FetchReply already
1132 if (procStatus[proc]==Fetching)
1133 procStatus[proc] = Busy;
1136 new_event(proc, proc, CurrentTime[proc],
1138 tso, node, (rtsSpark*)NULL);
1141 /* Asynchr comm causes additional costs here: */
1142 /* Bring the TSO from the blocked queue into the threadq */
1144 /* In all cases, the UnblockThread causes a ResumeThread to be scheduled */
1145 new_event(proc, proc,
1146 CurrentTime[proc]+RtsFlags.GranFlags.Costs.threadqueuetime,
1148 tso, node, (rtsSpark*)NULL);
/* do_the_fetchnode: event handler for a FetchNode event — processor
   `proc` (which must be CurrentProc) holds a node requested by
   `creator` on behalf of `tso`.  Charges message-unpack cost, dumps a
   GR_FETCH profiling record, then calls handleFetchRequest.  If that
   runs out of heap the current code barfs (the GC retry path below the
   barf — prepend_event / GarbageCollect / grab_event — is dead until
   the ToDo is resolved); the loop repeats while rc == OutOfHeap.
   Never entered in GranSim Light. */
1151 //@cindex do_the_fetchnode
1154 do_the_fetchnode(rtsEvent* event)
1156 PEs proc = event->proc, /* proc that holds the requested node */
1157 creator = event->creator; /* proc that requested node */
1158 StgTSO* tso = event->tso;
1159 StgClosure* node = event->node; /* requested, remote node */
1160 rtsFetchReturnCode rc;
1162 ASSERT(CurrentProc==proc);
1163 /* There should be no FETCHNODEs in GrAnSim Light setup */
1164 ASSERT(!RtsFlags.GranFlags.Light);
1166 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchNode\n"));
1168 CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1170 /* ToDo: check whether this is the right place for dumping the event */
1171 if (RtsFlags.GranFlags.GranSimStats.Full)
1172 DumpRawGranEvent(creator, proc, GR_FETCH, tso, node, (StgInt)0, 0);
1175 rc = handleFetchRequest(node, proc, creator, tso);
1176 if (rc == OutOfHeap) { /* trigger GC */
1177 # if defined(GRAN_CHECK) && defined(GRAN)
1178 if (RtsFlags.GcFlags.giveStats)
1179 fprintf(RtsFlags.GcFlags.statsFile,"***** veQ boSwI' PackNearbyGraph(node %p, tso %p (%d))\n",
1180 node, tso, tso->id);
1182 barf("//// do_the_fetchnode: out of heap after handleFetchRequest; ToDo: call GarbageCollect()");
1183 prepend_event(event);
1184 GarbageCollect(GetRoots, rtsFalse);
1185 // HWL: ToDo: check whether a ContinueThread has to be issued
1186 // HWL old: ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
1187 # if 0 && defined(GRAN_CHECK) && defined(GRAN)
1188 if (RtsFlags.GcFlags.giveStats) {
1189 fprintf(RtsFlags.GcFlags.statsFile,"***** SAVE_Hp=%p, SAVE_HpLim=%p, PACK_HEAP_REQUIRED=%d\n",
1190 Hp, HpLim, 0) ; // PACK_HEAP_REQUIRED); ???
1191 fprintf(stderr,"***** No. of packets so far: %d (total size: %d)\n",
1192 globalGranStats.tot_packets, globalGranStats.tot_packet_size);
1195 event = grab_event();
1196 // Hp -= PACK_HEAP_REQUIRED; // ???
1198 /* GC knows that events are special and follows the pointer i.e. */
1199 /* events are valid even if they moved. An EXIT is triggered */
1200 /* if there is not enough heap after GC. */
1202 } while (rc == OutOfHeap);
/* do_the_fetchreply: event handler for a FetchReply event — the node
   requested earlier by `tso` on `proc` (== CurrentProc) has arrived
   from `creator`.  Charges unpack costs, dumps a GR_REPLY record, then:
   - with bulk (packet) fetching, unpacks the whole graph from the
     rtsPackBuffer carried in the node field;
   - with incremental fetching, tries fetchNode; on a fetch miss (node
     was grabbed by a third PE meanwhile) it re-issues the fetch to the
     node's new location, charges latency, and returns without waking
     the TSO.
   On success the TSO's fetch statistics are updated, the outstanding-
   fetch counter is decremented, and a follow-up event is scheduled.
   NOTE(review): the event-type argument of the final new_event and the
   tail of its ?: expression are missing from this decimated chunk. */
1205 //@cindex do_the_fetchreply
1207 do_the_fetchreply(rtsEvent* event)
1209 PEs proc = event->proc, /* proc that requested node */
1210 creator = event->creator; /* proc that holds the requested node */
1211 StgTSO* tso = event->tso;
1212 StgClosure* node = event->node; /* requested, remote node */
1213 StgClosure* closure=(StgClosure*)NULL;
1215 ASSERT(CurrentProc==proc);
1216 ASSERT(RtsFlags.GranFlags.DoAsyncFetch || procStatus[proc]==Fetching);
1218 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchReply\n"));
1219 /* There should be no FETCHREPLYs in GrAnSim Light setup */
1220 ASSERT(!RtsFlags.GranFlags.Light);
1222 /* assign message unpack costs *before* dumping the event */
1223 CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1225 /* ToDo: check whether this is the right place for dumping the event */
1226 if (RtsFlags.GranFlags.GranSimStats.Full)
1227 DumpRawGranEvent(proc, creator, GR_REPLY, tso, node,
1228 tso->gran.sparkname, spark_queue_len(proc));
1230 /* THIS SHOULD NEVER HAPPEN
1231 If tso is in the BQ of node this means that it actually entered the
1232 remote closure, due to a missing GranSimFetch at the beginning of the
1233 entry code; therefore, this is actually a faked fetch, triggered from
1234 within GranSimBlock;
1235 since tso is both in the EVQ and the BQ for node, we have to take it out
1236 of the BQ first before we can handle the FetchReply;
1237 ToDo: special cases in awakenBlockedQueue, since the BQ magically moved.
1239 if (tso->block_info.closure!=(StgClosure*)NULL) {
1241 belch("## ghuH: TSO %d (%p) in FetchReply is blocked on node %p (shouldn't happen AFAIK)",
1242 tso->id, tso, node));
1243 // unlink_from_bq(tso, node);
1246 if (RtsFlags.GranFlags.DoBulkFetching) { /* bulk (packet) fetching */
1247 rtsPackBuffer *buffer = (rtsPackBuffer*)node;
1248 nat size = buffer->size;
1250 /* NB: Fetch misses can't occur with GUM fetching, as */
1251 /* updatable closure are turned into RBHs and therefore locked */
1252 /* for other processors that try to grab them. */
1254 closure = UnpackGraph(buffer);
1255 CurrentTime[proc] += size * RtsFlags.GranFlags.Costs.munpacktime;
1256 } else // incremental fetching
1257 /* Copy or move node to CurrentProc */
1258 if (fetchNode(node, creator, proc)) {
1259 /* Fetch has failed i.e. node has been grabbed by another PE */
1260 PEs p = where_is(node);
1263 if (RtsFlags.GranFlags.GranSimStats.Global)
1264 globalGranStats.fetch_misses++;
1266 IF_GRAN_DEBUG(thunkStealing,
1267 belch("== Qu'vatlh! fetch miss @ %u: node %p is at proc %u (rather than proc %u)\n",
1268 CurrentTime[proc],node,p,creator));
1270 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
1272 /* Count fetch again !? */
1273 ++(tso->gran.fetchcount);
1274 tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime;
1276 fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) +
1277 RtsFlags.GranFlags.Costs.latency;
1279 /* Chase the grabbed node */
1280 new_event(p, proc, fetchtime,
1282 tso, node, (rtsSpark*)NULL);
1284 # if 0 && defined(GRAN_CHECK) && defined(GRAN) /* Just for testing */
1285 IF_GRAN_DEBUG(blockOnFetch,
1286 BlockedOnFetch[CurrentProc] = tso;) /*-rtsTrue;-*/
1288 IF_GRAN_DEBUG(blockOnFetch_sanity,
1289 tso->type |= FETCH_MASK_TSO;)
1292 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
1294 return; /* NB: no REPLy has been processed; tso still sleeping */
1297 /* -- Qapla'! Fetch has been successful; node is here, now */
1298 ++(event->tso->gran.fetchcount);
1299 event->tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime;
1301 /* this is now done at the beginning of this routine
1302 if (RtsFlags.GranFlags.GranSimStats.Full)
1303 DumpRawGranEvent(proc,event->creator, GR_REPLY, event->tso,
1304 (RtsFlags.GranFlags.DoBulkFetching ?
1307 tso->gran.sparkname, spark_queue_len(proc));
1310 ASSERT(OutstandingFetches[proc] > 0);
1311 --OutstandingFetches[proc];
1312 new_event(proc, proc, CurrentTime[proc],
1314 event->tso, (RtsFlags.GranFlags.DoBulkFetching ?
/* do_the_movethread: event handler for a MoveThread event — a migrated
   TSO arrives on `proc` (== CurrentProc) from `creator`.  Only legal
   with thread migration (-bM) enabled, never in GranSim Light, and the
   receiving PE must have an outstanding fish (or async fetch enabled).
   Charges an estimated unpack cost (5 * munpacktime for the whole TSO),
   dumps a GR_STOLEN record, adjusts the TSO's PE bitmask to this PE,
   inserts the thread into the local run queue, and turns a Fishing PE
   back to Idle.  Updates the global migration counter when enabled. */
1320 //@cindex do_the_movethread
1323 do_the_movethread(rtsEvent* event) {
1324 PEs proc = event->proc, /* proc that requested node */
1325 creator = event->creator; /* proc that holds the requested node */
1326 StgTSO* tso = event->tso;
1328 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveThread\n"));
1330 ASSERT(CurrentProc==proc);
1331 /* There should be no MOVETHREADs in GrAnSim Light setup */
1332 ASSERT(!RtsFlags.GranFlags.Light);
1333 /* MOVETHREAD events should never occur without -bM */
1334 ASSERT(RtsFlags.GranFlags.DoThreadMigration);
1335 /* Bitmask of moved thread should be 0 */
1336 ASSERT(PROCS(tso)==0);
1337 ASSERT(procStatus[proc] == Fishing ||
1338 RtsFlags.GranFlags.DoAsyncFetch);
1339 ASSERT(OutstandingFishes[proc]>0);
1341 /* ToDo: exact costs for unpacking the whole TSO */
1342 CurrentTime[proc] += 5l * RtsFlags.GranFlags.Costs.munpacktime;
1344 /* ToDo: check whether this is the right place for dumping the event */
1345 if (RtsFlags.GranFlags.GranSimStats.Full)
1346 DumpRawGranEvent(proc, creator,
1347 GR_STOLEN, tso, (StgClosure*)NULL, (StgInt)0, 0);
1349 // ToDo: check cost functions
1350 --OutstandingFishes[proc];
1351 SET_GRAN_HDR(tso, ThisPE); // adjust the bitmask for the TSO
1352 insertThread(tso, proc);
1354 if (procStatus[proc]==Fishing)
1355 procStatus[proc] = Idle;
1357 if (RtsFlags.GranFlags.GranSimStats.Global)
1358 globalGranStats.tot_TSOs_migrated++;
/* do_the_movespark: event handler for a MoveSpark event — a stolen
   spark arrives on `proc` (== CurrentProc) from `creator`.  The PE must
   have an outstanding fish (or async fetch enabled).  Charges unpack
   cost, optionally dumps a spark-profiling record, counts sparks whose
   node is no longer worth sparking ("withered"), then adds the spark to
   the local spark queue and turns a Fishing PE back to Idle.  The next
   handleIdlePEs will create a local FindWork to turn the spark into a
   thread (unless another PE steals it first). */
1361 //@cindex do_the_movespark
1364 do_the_movespark(rtsEvent* event) {
1365 PEs proc = event->proc, /* proc that requested spark */
1366 creator = event->creator; /* proc that holds the requested spark */
1367 StgTSO* tso = event->tso;
1368 rtsSparkQ spark = event->spark;
1370 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveSpark\n"))
1372 ASSERT(CurrentProc==proc);
1373 ASSERT(spark!=NULL);
1374 ASSERT(procStatus[proc] == Fishing ||
1375 RtsFlags.GranFlags.DoAsyncFetch);
1376 ASSERT(OutstandingFishes[proc]>0);
1378 CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1380 /* record movement of spark only if spark profiling is turned on */
1381 if (RtsFlags.GranFlags.GranSimStats.Sparks)
1382 DumpRawGranEvent(proc, creator,
1384 tso, spark->node, spark->name, spark_queue_len(proc));
1386 /* global statistics */
1387 if ( RtsFlags.GranFlags.GranSimStats.Global &&
1388 !closure_SHOULD_SPARK(spark->node))
1389 globalGranStats.withered_sparks++;
1390 /* Not adding the spark to the spark queue would be the right */
1391 /* thing here, but it also would be cheating, as this info can't be */
1392 /* available in a real system. -- HWL */
1394 --OutstandingFishes[proc];
1396 add_to_spark_queue(spark);
1398 IF_GRAN_DEBUG(randomSteal, // ToDo: spark-distribution flag
1399 print_sparkq_stats());
1401 /* Should we treat stolen sparks specially? Currently, we don't. */
1403 if (procStatus[proc]==Fishing)
1404 procStatus[proc] = Idle;
1406 /* add_to_spark_queue will increase the time of the current proc. */
1408 If proc was fishing, it is Idle now with the new spark in its spark
1409 pool. This means that the next time handleIdlePEs is called, a local
1410 FindWork will be created on this PE to turn the spark into a thread. Of
1411 course another PE might steal the spark in the meantime (that's why we
1412 are using events rather than inlining all the operations in the first
1417 In the Constellation class version of GranSim the semantics of StartThread
1418 events has changed. Now, StartThread has to perform 3 basic operations:
1419 - create a new thread (previously this was done in ActivateSpark);
1420 - insert the thread into the run queue of the current processor
1421 - generate a new event for actually running the new thread
1422 Note that the insertThread is called via createThread.
/* do_the_startthread: event handler for both StartThread and
   ResumeThread events on `proc` (== CurrentProc).
   - StartThread: the TSO does not exist yet (tso == END_TSO_QUEUE);
     a new thread is created from the spark via createThread (which
     implicitly inserts it into the run queue), the node is pushed onto
     its stack, the spark name is recorded, and threadcreatetime is
     charged.  The .gr event is GR_START or GR_STARTQ depending on
     whether the run queue was empty.
   - ResumeThread: the existing TSO is re-inserted into the run queue;
     the .gr event is GR_RESUME or GR_RESUMEQ analogously.
   In both cases the PE is marked Busy and a follow-up event is
   scheduled so the thread actually runs; global thread/queue-length
   statistics are updated when enabled.
   NOTE(review): the inner `GranEventType gr_evttype` declarations
   shadow the outer one declared at 1434 — the outer variable appears
   unused in the visible code. */
1425 //@cindex do_the_startthread
1428 do_the_startthread(rtsEvent *event)
1430 PEs proc = event->proc; /* proc that requested node */
1431 StgTSO *tso = event->tso; /* tso that requested node */
1432 StgClosure *node = event->node; /* requested, remote node */
1433 rtsSpark *spark = event->spark;
1434 GranEventType gr_evttype;
1436 ASSERT(CurrentProc==proc);
1437 ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1438 ASSERT(event->evttype == ResumeThread || event->evttype == StartThread);
1439 /* if this was called via StartThread: */
1440 ASSERT(event->evttype!=StartThread || tso == END_TSO_QUEUE); // not yet created
1441 // ToDo: check: ASSERT(event->evttype!=StartThread || procStatus[proc]==Starting);
1442 /* if this was called via ResumeThread: */
1443 ASSERT(event->evttype!=ResumeThread ||
1444 RtsFlags.GranFlags.DoAsyncFetch ||!is_on_queue(tso,proc));
1446 /* startThread may have been called from the main event handler upon
1447 finding either a ResumeThread or a StartThread event; set the
1448 gr_evttype (needed for writing to .gr file) accordingly */
1449 // gr_evttype = (event->evttype == ResumeThread) ? GR_RESUME : GR_START;
1451 if ( event->evttype == StartThread ) {
1452 GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ?
1453 GR_START : GR_STARTQ;
1455 tso = createThread(BLOCK_SIZE_W, spark->gran_info);// implicit insertThread!
1456 pushClosure(tso, node);
1458 // ToDo: fwd info on local/global spark to thread -- HWL
1459 // tso->gran.exported = spark->exported;
1460 // tso->gran.locked = !spark->global;
1461 tso->gran.sparkname = spark->name;
1463 ASSERT(CurrentProc==proc);
1464 if (RtsFlags.GranFlags.GranSimStats.Full)
1465 DumpGranEvent(gr_evttype,tso);
1467 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
1468 } else { // event->evttype == ResumeThread
1469 GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ?
1470 GR_RESUME : GR_RESUMEQ;
1472 insertThread(tso, proc);
1474 ASSERT(CurrentProc==proc);
1475 if (RtsFlags.GranFlags.GranSimStats.Full)
1476 DumpGranEvent(gr_evttype,tso);
1479 ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE); // non-empty run queue
1480 procStatus[proc] = Busy;
1481 /* make sure that this thread is actually run */
1482 new_event(proc, proc,
1485 tso, node, (rtsSpark*)NULL);
1487 /* A wee bit of statistics gathering */
1488 if (RtsFlags.GranFlags.GranSimStats.Global) {
1489 globalGranStats.tot_add_threads++;
1490 globalGranStats.tot_tq_len += thread_queue_len(CurrentProc);
/* do_the_findwork: event handler for a FindWork event — processor
   `proc` searches for work, either for itself (creator == proc) or on
   behalf of a fishing PE (`creator`, GUM-style stealing).  If the PE is
   eligible (always-create-threads, fishing, or Idle/Sparking with no
   blocking outstanding fetches), it searches the local spark pool via
   findLocalSpark.  If a spark is found it is activated (activateSpark)
   and deleted from the queue; if not, a local FindWork leaves the PE
   Idle (so stealing is triggered next time), while a fish is forwarded
   by stealSpark on behalf of the originating PE.
   NOTE(review): the heap check guarded by "ToDo: check available heap"
   appears to be inside an inactive comment region in the full file —
   the GarbageCollect(GetRoots) call here lacks the second argument used
   at line 1184; confirm against the undecimated source. */
1495 //@cindex do_the_findwork
1497 do_the_findwork(rtsEvent* event)
1499 PEs proc = event->proc, /* proc to search for work */
1500 creator = event->creator; /* proc that requested work */
1501 rtsSparkQ spark = event->spark;
1502 /* ToDo: check that this size is safe -- HWL */
1504 ToDo: check available heap
1506 nat req_heap = sizeofW(StgTSO) + MIN_STACK_WORDS;
1507 // add this? -- HWL:RtsFlags.ConcFlags.stkChunkSize;
1510 IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the Findwork\n"));
1512 /* If GUM style fishing is enabled, the contents of the spark field says
1513 what to steal (spark(1) or thread(2)); */
1514 ASSERT(!(RtsFlags.GranFlags.Fishing && event->spark==(rtsSpark*)0));
1516 /* Make sure that we have enough heap for creating a new
1517 thread. This is a conservative estimate of the required heap.
1518 This eliminates special checks for GC around NewThread within
1522 ToDo: check available heap
1524 if (Hp + req_heap > HpLim ) {
1526 belch("GC: Doing GC from within Findwork handling (that's bloody dangerous if you ask me)");)
1527 GarbageCollect(GetRoots);
1528 // ReallyPerformThreadGC(req_heap, rtsFalse); old -- HWL
1530 if (procStatus[CurrentProc]==Sparking)
1531 procStatus[CurrentProc]=Idle;
1536 if ( RtsFlags.GranFlags.DoAlwaysCreateThreads ||
1537 RtsFlags.GranFlags.Fishing ||
1538 ((procStatus[proc]==Idle || procStatus[proc]==Sparking) &&
1539 (RtsFlags.GranFlags.FetchStrategy >= 2 ||
1540 OutstandingFetches[proc] == 0)) )
1543 rtsSparkQ prev, spark;
1546 ASSERT(procStatus[proc]==Sparking ||
1547 RtsFlags.GranFlags.DoAlwaysCreateThreads ||
1548 RtsFlags.GranFlags.Fishing);
1550 /* SImmoHwI' yInej! Search spark queue! */
1551 /* gimme_spark (event, &found, &spark); */
1552 findLocalSpark(event, &found, &spark);
1554 if (!found) { /* pagh vumwI' */
1556 If no spark has been found this can mean 2 things:
1557 1/ The FindWork was a fish (i.e. a message sent by another PE) and
1558 the spark pool of the receiver is empty
1559 --> the fish has to be forwarded to another PE
1560 2/ The FindWork was local to this PE (i.e. no communication; in this
1561 case creator==proc) and the spark pool of the PE is not empty
1562 contains only sparks of closures that should not be sparked
1563 (note: if the spark pool were empty, handleIdlePEs wouldn't have
1564 generated a FindWork in the first place)
1565 --> the PE has to be made idle to trigger stealing sparks the next
1566 time handleIdlePEs is performed
1569 ASSERT(pending_sparks_hds[proc]==(rtsSpark*)NULL);
1570 if (creator==proc) {
1571 /* local FindWork */
1572 if (procStatus[proc]==Busy) {
1573 belch("ghuH: PE %d in Busy state while processing local FindWork (spark pool is empty!) @ %lx",
1574 proc, CurrentTime[proc]);
1575 procStatus[proc] = Idle;
1578 /* global FindWork i.e. a Fish */
1579 ASSERT(RtsFlags.GranFlags.Fishing);
1580 /* actually this generates another request from the originating PE */
1581 ASSERT(OutstandingFishes[creator]>0);
1582 OutstandingFishes[creator]--;
1583 /* ToDo: assign costs for sending fish to proc not to creator */
1584 stealSpark(creator); /* might steal from same PE; ToDo: fix */
1585 ASSERT(RtsFlags.GranFlags.maxFishes!=1 || procStatus[creator] == Fishing);
1586 /* any assertions on state of proc possible here? */
1589 /* DaH chu' Qu' yIchen! Now create new work! */
1590 IF_GRAN_DEBUG(findWork,
1591 belch("+- munching spark %p; creating thread for node %p",
1592 spark, spark->node));
1593 activateSpark (event, spark);
1594 ASSERT(spark != (rtsSpark*)NULL);
1595 spark = delete_from_sparkq (spark, proc, rtsTrue);
1598 IF_GRAN_DEBUG(findWork,
1599 belch("+- Contents of spark queues at the end of FindWork @ %lx",
1601 print_sparkq_stats());
1603 /* ToDo: check ; not valid if GC occurs in ActivateSpark */
1605 /* forward fish or */
1607 /* local spark or */
1608 (proc==creator && procStatus[proc]==Starting)) ||
1609 //(!found && procStatus[proc]==Idle) ||
1610 RtsFlags.GranFlags.DoAlwaysCreateThreads);
1612 IF_GRAN_DEBUG(findWork,
1613 belch("+- RTS refuses to findWork on PE %d @ %lx",
1614 proc, CurrentTime[proc]);
1615 belch(" procStatus[%d]=%s, fetch strategy=%d, outstanding fetches[%d]=%d",
1616 proc, proc_status_names[procStatus[proc]],
1617 RtsFlags.GranFlags.FetchStrategy,
1618 proc, OutstandingFetches[proc]));
1622 //@node GranSimLight routines, Code for Fetching Nodes, GranSim functions, GranSim specific code
1623 //@subsection GranSimLight routines
1626 This code is called from the central scheduler after having grabbed a
1627 new event and is only needed for GranSim-Light. It mainly adjusts the
1628 ActiveTSO so that all costs that have to be assigned from within the
1629 scheduler are assigned to the right TSO. The choice of ActiveTSO depends
1630 on the type of event that has been found.
/* GranSimLight_enter_system: GranSim-Light only (asserted).  Called by
   the central scheduler after grabbing a new event; chooses the TSO
   whose virtual clock should absorb scheduler costs (ActiveTSO, passed
   by reference).  First saves the current clock into the previously
   active TSO (emitting a GR_SYSTEM_END record under fair scheduling
   with checkLight debugging), then selects the new ActiveTSO from the
   event type — run-queue head for ContinueThread/FindWork, the event's
   TSO for MoveSpark — restores its clock into CurrentTime, and emits a
   matching GR_SYSTEM_START record.  Unknown event types barf.
   K&R-style definition; the `event` parameter's declaration line is not
   visible in this decimated chunk. */
1634 GranSimLight_enter_system(event, ActiveTSOp)
1636 StgTSO **ActiveTSOp;
1638 StgTSO *ActiveTSO = *ActiveTSOp;
1640 ASSERT (RtsFlags.GranFlags.Light);
1642 /* Restore local clock of the virtual processor attached to CurrentTSO.
1643 All costs will be associated to the `virt. proc' on which the tso
1645 if (ActiveTSO != NULL) { /* already in system area */
1646 ActiveTSO->gran.clock = CurrentTime[CurrentProc];
1647 if (RtsFlags.GranFlags.DoFairSchedule)
1649 if (RtsFlags.GranFlags.GranSimStats.Full &&
1650 RtsFlags.GranFlags.Debug.checkLight)
1651 DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
1654 switch (event->evttype)
1656 case ContinueThread:
1657 case FindWork: /* inaccurate this way */
1658 ActiveTSO = run_queue_hd;
1662 case MoveSpark: /* has tso of virt proc in tso field of event */
1663 ActiveTSO = event->tso;
1665 default: barf("Illegal event type %s (%d) in GrAnSim Light setup\n",
1666 event_names[event->evttype],event->evttype);
1668 CurrentTime[CurrentProc] = ActiveTSO->gran.clock;
1669 if (RtsFlags.GranFlags.DoFairSchedule) {
1670 if (RtsFlags.GranFlags.GranSimStats.Full &&
1671 RtsFlags.GranFlags.Debug.checkLight)
1672 DumpGranEvent(GR_SYSTEM_START,ActiveTSO);
/* GranSimLight_leave_system: GranSim-Light only (asserted).  Mirror of
   GranSimLight_enter_system: saves the clock of the TSO that was active
   during system-area processing (GR_SYSTEM_END record under fair
   scheduling with checkLight), clears ActiveTSO, and restores the clock
   of CurrentTSO's virtual processor (GR_SCHEDULE record).  The trailing
   TSO_LINK/ThreadQueueHd code schedules a ContinueThread for the next
   queued thread if its clock would otherwise run far (1000 ticks) ahead
   of the next event.
   NOTE(review): ActiveTSO is assigned NULL here but the write-back
   through *ActiveTSOp is not visible in this decimated chunk — confirm
   the caller sees the cleared pointer. */
1677 GranSimLight_leave_system(event, ActiveTSOp)
1679 StgTSO **ActiveTSOp;
1681 StgTSO *ActiveTSO = *ActiveTSOp;
1683 ASSERT(RtsFlags.GranFlags.Light);
1685 /* Save time of `virt. proc' which was active since last getevent and
1686 restore time of `virt. proc' where CurrentTSO is living on. */
1687 if(RtsFlags.GranFlags.DoFairSchedule) {
1688 if (RtsFlags.GranFlags.GranSimStats.Full &&
1689 RtsFlags.GranFlags.Debug.checkLight) // ToDo: clean up flags
1690 DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
1692 ActiveTSO->gran.clock = CurrentTime[CurrentProc];
1693 ActiveTSO = (StgTSO*)NULL;
1694 CurrentTime[CurrentProc] = CurrentTSO->gran.clock;
1695 if (RtsFlags.GranFlags.DoFairSchedule /* && resched */ ) {
1696 // resched = rtsFalse;
1697 if (RtsFlags.GranFlags.GranSimStats.Full &&
1698 RtsFlags.GranFlags.Debug.checkLight)
1699 DumpGranEvent(GR_SCHEDULE,run_queue_hd);
1702 if (TSO_LINK(ThreadQueueHd)!=PrelBase_Z91Z93_closure &&
1703 (TimeOfNextEvent == 0 ||
1704 TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000<TimeOfNextEvent)) {
1705 new_event(CurrentProc,CurrentProc,TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000,
1706 CONTINUETHREAD,TSO_LINK(ThreadQueueHd),PrelBase_Z91Z93_closure,NULL);
1707 TimeOfNextEvent = get_time_of_next_event();
1712 //@node Code for Fetching Nodes, Idle PEs, GranSimLight routines, GranSim specific code
1713 //@subsection Code for Fetching Nodes
1716 The following GrAnSim routines simulate the fetching of nodes from a
1717 remote processor. We use a 1 word bitmask to indicate on which processor
1718 a node is lying. Thus, moving or copying a node from one processor to
1719 another just requires an appropriate change in this bitmask (using
1720 @SET_GA@). Additionally, the clocks have to be updated.
1722 A special case arises when the node that is needed by processor A has
1723 been moved from a processor B to a processor C between sending out a
1724 @FETCH@ (from A) and its arrival at B. In that case the @FETCH@ has to
1725 be forwarded to C. This is simulated by issuing another FetchNode event
1726 on processor C with A as creator.
/* fetchNode: simulate fetching `node` from processor `from` to `to`
   (must be CurrentProc) by manipulating the node's 1-word PE bitmask.
   Never used with bulk fetching (UnpackGraph handles that case) nor in
   GranSim Light (both asserted).  Returns NodeHasMoved if the node is
   no longer on either PE (the FETCH must be forwarded); otherwise the
   node is copied (bit OR'ed in) when already in head normal form, or
   moved (bitmask overwritten) when it is a thunk.
   K&R-style definition; parameter declarations and the success return
   are not visible in this decimated chunk. */
1729 /* ngoqvam che' {GrAnSim}! */
1731 /* Fetch node "node" to processor "p" */
1736 fetchNode(node,from,to)
1740 /* In case of RtsFlags.GranFlags.DoBulkFetching this fct should never be
1741 entered! Instead, UnpackGraph is used in ReSchedule */
1742 StgClosure* closure;
1744 ASSERT(to==CurrentProc);
1745 /* Should never be entered in GrAnSim Light setup */
1746 ASSERT(!RtsFlags.GranFlags.Light);
1747 /* fetchNode should never be entered with DoBulkFetching */
1748 ASSERT(!RtsFlags.GranFlags.DoBulkFetching);
1750 /* Now fetch the node */
1751 if (!IS_LOCAL_TO(PROCS(node),from) &&
1752 !IS_LOCAL_TO(PROCS(node),to) )
1753 return NodeHasMoved;
1755 if (closure_HNF(node)) /* node already in head normal form? */
1756 node->header.gran.procs |= PE_NUMBER(to); /* Copy node */
1758 node->header.gran.procs = PE_NUMBER(to); /* Move node */
1764 Process a fetch request.
1766 Cost of sending a packet of size n = C + P*n
1767 where C = packet construction constant,
1768 P = cost of packing one word into a packet
1769 [Should also account for multiple packets].
/* handleFetchRequest: process a fetch request for `node`, requested by
   `tso` on PE `from`, arriving at PE `to` (== CurrentProc; note the
   reversed naming — the request travels from -> to).  Never entered in
   GranSim Light.  Three cases:
   1. Node is already local to `from` (somebody moved it): schedule an
      immediate FETCHREPLY back (a 1-node packet under bulk fetching)
      and return NodeIsLocal.
   2. Node is still on `to`: under bulk fetching, a black-holed node
      blocks the requester (GLOBALBLOCK-style; the outstanding fetch is
      forgotten — see the comment about FETCHNODE swallowed by a BH);
      otherwise the nearby graph is packed (PackNearbyGraph, may return
      OutOfHeap) and a FETCHREPLY with the packet is scheduled after
      packing cost + latency.  Incremental fetching just schedules a
      FETCHREPLY carrying the node itself.
   3. Node has moved to a third PE: forward the request by scheduling a
      new FETCHNODE at the node's current location (fetch-miss
      statistics updated) and return NodeHasMoved.
   Cost model: sending a packet of size n costs C + P*n (see comment at
   1766-1769).  K&R-style definition. */
1772 //@cindex handleFetchRequest
1775 handleFetchRequest(node,to,from,tso)
1776 StgClosure* node; // the node which is requested
1777 PEs to, from; // fetch request: from -> to
1778 StgTSO* tso; // the tso which needs the node
1780 ASSERT(!RtsFlags.GranFlags.Light);
1781 /* ToDo: check assertion */
1782 ASSERT(OutstandingFetches[from]>0);
1784 /* probably wrong place; */
1785 ASSERT(CurrentProc==to);
1787 if (IS_LOCAL_TO(PROCS(node), from)) /* Somebody else moved node already => */
1789 IF_GRAN_DEBUG(thunkStealing,
1790 fprintf(stderr,"ghuH: handleFetchRequest entered with local node %p (%s) (PE %d)\n",
1791 node, info_type(node), from));
1793 if (RtsFlags.GranFlags.DoBulkFetching) {
1795 rtsPackBuffer *graph;
1797 /* Create a 1-node-buffer and schedule a FETCHREPLY now */
1798 graph = PackOneNode(node, tso, &size);
1799 new_event(from, to, CurrentTime[to],
1801 tso, (StgClosure *)graph, (rtsSpark*)NULL);
1803 new_event(from, to, CurrentTime[to],
1805 tso, node, (rtsSpark*)NULL);
1807 IF_GRAN_DEBUG(thunkStealing,
1808 belch("== majQa'! closure %p is local on PE %d already (this is a good thing)", node, from));
1809 return (NodeIsLocal);
1811 else if (IS_LOCAL_TO(PROCS(node), to) ) /* Is node still here? */
1813 if (RtsFlags.GranFlags.DoBulkFetching) { /* {GUM}vo' ngoqvam vInIHta' */
1814 nat size; /* (code from GUM) */
1817 if (IS_BLACK_HOLE(node)) { /* block on BH or RBH */
1818 new_event(from, to, CurrentTime[to],
1820 tso, node, (rtsSpark*)NULL);
1821 /* Note: blockFetch is done when handling GLOBALBLOCK event;
1822 make sure the TSO stays out of the run queue */
1823 /* When this thread is reawoken it does the usual: it tries to
1824 enter the updated node and issues a fetch if it's remote.
1825 It has forgotten that it has sent a fetch already (i.e. a
1826 FETCHNODE is swallowed by a BH, leaving the thread in a BQ) */
1827 --OutstandingFetches[from];
1829 IF_GRAN_DEBUG(thunkStealing,
1830 belch("== majQa'! closure %p on PE %d is a BH (demander=PE %d); faking a FMBQ",
1832 if (RtsFlags.GranFlags.GranSimStats.Global) {
1833 globalGranStats.tot_FMBQs++;
1838 /* The tso requesting the node is blocked and cannot be on a run queue */
1839 ASSERT(!is_on_queue(tso, from));
1841 // ToDo: check whether graph is ever used as an rtsPackBuffer!!
1842 if ((graph = (StgClosure *)PackNearbyGraph(node, tso, &size, 0)) == NULL)
1843 return (OutOfHeap); /* out of heap */
1845 /* Actual moving/copying of node is done on arrival; see FETCHREPLY */
1846 /* Send a reply to the originator */
1847 /* ToDo: Replace that by software costs for doing graph packing! */
1848 CurrentTime[to] += size * RtsFlags.GranFlags.Costs.mpacktime;
1851 CurrentTime[to]+RtsFlags.GranFlags.Costs.latency,
1853 tso, (StgClosure *)graph, (rtsSpark*)NULL);
1855 CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1857 } else { /* incremental (single closure) fetching */
1858 /* Actual moving/copying of node is done on arrival; see FETCHREPLY */
1859 /* Send a reply to the originator */
1860 CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime;
1863 CurrentTime[to]+RtsFlags.GranFlags.Costs.latency,
1865 tso, node, (rtsSpark*)NULL);
1867 CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1871 else /* Qu'vatlh! node has been grabbed by another proc => forward */
1873 PEs node_loc = where_is(node);
1876 IF_GRAN_DEBUG(thunkStealing,
1877 belch("== Qu'vatlh! node %p has been grabbed by PE %d from PE %d (demander=%d) @ %d\n",
1878 node,node_loc,to,from,CurrentTime[to]));
1879 if (RtsFlags.GranFlags.GranSimStats.Global) {
1880 globalGranStats.fetch_misses++;
1883 /* Prepare FORWARD message to proc p_new */
1884 CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime;
1886 fetchtime = stg_max(CurrentTime[to], CurrentTime[node_loc]) +
1887 RtsFlags.GranFlags.Costs.latency;
1889 new_event(node_loc, from, fetchtime,
1891 tso, node, (rtsSpark*)NULL);
1893 CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1895 return (NodeHasMoved);
1900 blockFetch blocks a BlockedFetch node on some kind of black hole.
1902 Taken from gum/HLComms.lc. [find a better place for that ?] -- HWL
1904 {\bf Note:} In GranSim we don't have @FETCHME@ nodes and therefore don't
1905 create @FMBQ@'s (FetchMe blocking queues) to cope with global
1906 blocking. Instead, non-local TSO are put into the BQ in the same way as
1907 local TSOs. However, we have to check if a TSO is local or global in
1908 order to account for the latencies involved and for keeping track of the
1909 number of fetches that are really going on.
/* blockFetch: block `tso` (which was running on PE `proc`) on the
   closure `bh` — a BLACKHOLE, RBH, or BLACKHOLE_BQ.  Adapted from
   gum/HLComms.lc; in GranSim there are no FETCHME/FMBQ nodes, so remote
   TSOs go into the ordinary blocking queue like local ones.
   - If `bh` is no longer a black hole, the TSO is unblocked right away
     via a new event (and a GR_REPLY is dumped); returns NodeIsNoBH.
   - Otherwise GranSimBlock records the block (synchronous fetch also
     dumps GR_BLOCK and updates the TSO's block statistics; the TSO must
     not be on a run queue and must have a clear link field), and the
     TSO is chained into bh's blocking queue, converting a plain
     BLACKHOLE/CAF_BLACKHOLE/SE_* into a BLACKHOLE_BQ first and telling
     the GC via recordMutable.  An FMBQ closure or any non-black-hole
     info table at this point is a bug (barf).
   K&R-style definition; the success return value is not visible in
   this decimated chunk. */
1912 //@cindex blockFetch
1915 blockFetch(tso, proc, bh)
1916 StgTSO* tso; /* TSO which gets blocked */
1917 PEs proc; /* PE where that tso was running */
1918 StgClosure* bh; /* closure to block on (BH, RBH, BQ) */
1923 fprintf(stderr,"## blockFetch: blocking TSO %p (%d)[PE %d] on node %p (%s) [PE %d]. No graph is packed!\n",
1924 tso, tso->id, proc, bh, info_type(bh), where_is(bh)));
1926 if (!IS_BLACK_HOLE(bh)) { /* catches BHs and RBHs */
1928 fprintf(stderr,"## blockFetch: node %p (%s) is not a BH => awakening TSO %p (%d) [PE %u]\n",
1929 bh, info_type(bh), tso, tso->id, proc));
1931 /* No BH anymore => immediately unblock tso */
1932 new_event(proc, proc, CurrentTime[proc],
1934 tso, bh, (rtsSpark*)NULL);
1936 /* Is this always a REPLY to a FETCH in the profile ? */
1937 if (RtsFlags.GranFlags.GranSimStats.Full)
1938 DumpRawGranEvent(proc, proc, GR_REPLY, tso, bh, (StgInt)0, 0);
1939 return (NodeIsNoBH);
1942 /* DaH {BQ}Daq Qu' Suq 'e' wISov!
1943 Now we know that we have to put the tso into the BQ.
1944 2 cases: If block-on-fetch, tso is at head of threadq =>
1945 => take it out of threadq and into BQ
1946 If reschedule-on-fetch, tso is only pointed to be event
1947 => just put it into BQ
1950 if (!RtsFlags.GranFlags.DoAsyncFetch) {
1951 GranSimBlock(tso, proc, bh);
1953 if (RtsFlags.GranFlags.GranSimStats.Full)
1954 DumpRawGranEvent(proc, where_is(bh), GR_BLOCK, tso, bh, (StgInt)0, 0);
1955 ++(tso->gran.blockcount);
1956 tso->gran.blockedat = CurrentTime[proc];
1960 /* after scheduling the GlobalBlock event the TSO is not put into the
1961 run queue again; it is only pointed to via the event we are
1962 processing now; in GranSim 4.xx there is no difference between
1963 synchr and asynchr comm here */
1964 ASSERT(!is_on_queue(tso, proc));
1965 ASSERT(tso->link == END_TSO_QUEUE);
1967 GranSimBlock(tso, proc, bh); /* GranSim statistics gathering */
1969 /* Now, put tso into BQ (similar to blocking entry codes) */
1970 info = get_itbl(bh);
1971 switch (info -> type) {
1974 case CAF_BLACKHOLE: // ToDo: check whether this is a possibly ITBL here
1975 case SE_BLACKHOLE: // ToDo: check whether this is a possibly ITBL here
1976 case SE_CAF_BLACKHOLE:// ToDo: check whether this is a possibly ITBL here
1977 /* basically an inlined version of BLACKHOLE_entry -- HWL */
1978 /* Change the BLACKHOLE into a BLACKHOLE_BQ */
1979 ((StgBlockingQueue *)bh)->header.info = &BLACKHOLE_BQ_info;
1980 /* Put ourselves on the blocking queue for this black hole */
1981 // tso->link=END_TSO_QUEUE; not necessary; see assertion above
1982 ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)tso;
1983 tso->block_info.closure = bh;
1984 recordMutable((StgMutClosure *)bh);
1988 /* basically an inlined version of BLACKHOLE_BQ_entry -- HWL */
1989 tso->link = (StgTSO *) (((StgBlockingQueue*)bh)->blocking_queue);
1990 ((StgBlockingQueue*)bh)->blocking_queue = (StgBlockingQueueElement *)tso;
1991 recordMutable((StgMutClosure *)bh);
1993 # if 0 && defined(GC_MUT_REQUIRED)
1994 ToDo: check whether recordMutable is necessary -- HWL
1996 * If we modify a black hole in the old generation, we have to make
1997 * sure it goes on the mutables list
2000 if (bh <= StorageMgrInfo.OldLim) {
2001 MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
2002 StorageMgrInfo.OldMutables = bh;
2004 MUT_LINK(bh) = MUT_NOT_LINKED;
2009 barf("Qagh: FMBQ closure (%p) found in GrAnSim (TSO=%p (%d))\n",
2015 barf("Qagh: thought %p was a black hole (IP %p (%s))",
2016 bh, info, info_type(bh));
2023 //@node Idle PEs, Routines directly called from Haskell world, Code for Fetching Nodes, GranSim specific code
2024 //@subsection Idle PEs
2027 Export work to idle PEs. This function is called from @ReSchedule@
2028 before dispatching on the current event. @HandleIdlePEs@ iterates over
2029 all PEs, trying to get work for idle PEs. Note, that this is a
2030 simplification compared to GUM's fishing model. We try to compensate for
2031 that by making the cost for stealing work dependent on the number of
2032 idle processors and thereby on the probability with which a randomly
2033 sent fish would find work.
2036 //@cindex handleIdlePEs
/* handleIdlePEs: called from ReSchedule before dispatching the current
   event.  Scans every PE; an Idle PE first tries its own spark pool
   (scheduling a find-work event and marking itself Sparking), otherwise --
   provided it has fewer than maxFishes outstanding steal attempts (0 means
   unlimited) -- it tries to steal remote work: threads first if
   DoStealThreadsFirst is set, then sparks, then threads.
   NOTE(review): the function signature and several interior lines are not
   visible in this chunk (original line numbers jump); the event-type
   arguments of new_event and the steal calls were dropped by extraction. */
2043 IF_DEBUG(gran, fprintf(stderr, "GRAN: handling Idle PEs\n"))
2045 /* Should never be entered in GrAnSim Light setup */
2046 ASSERT(!RtsFlags.GranFlags.Light);
2048 /* Could check whether there are idle PEs if it's a cheap check */
2049 for (p = 0; p < RtsFlags.GranFlags.proc; p++)
2050 if (procStatus[p]==Idle) /* && IS_SPARKING(p) && IS_STARTING(p) */
2051 /* First look for local work i.e. examine local spark pool! */
2052 if (pending_sparks_hds[p]!=(rtsSpark *)NULL) {
/* Local sparks available: wake the PE up at its own current time. */
2053 new_event(p, p, CurrentTime[p],
2055 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
2056 procStatus[p] = Sparking;
2057 } else if ((RtsFlags.GranFlags.maxFishes==0 ||
2058 OutstandingFishes[p]<RtsFlags.GranFlags.maxFishes) ) {
2060 /* If no local work then try to get remote work!
2061 Qu' Hopbe' pagh tu'lu'pu'chugh Qu' Hop yISuq ! */
2062 if (RtsFlags.GranFlags.DoStealThreadsFirst &&
2063 (RtsFlags.GranFlags.FetchStrategy >= 4 || OutstandingFetches[p] == 0))
2065 if (SurplusThreads > 0l) /* Steal a thread */
/* presumably stealThread(p) was called here -- line dropped by extraction */
2068 if (procStatus[p]!=Idle)
2072 if (SparksAvail > 0 &&
2073 (RtsFlags.GranFlags.FetchStrategy >= 3 || OutstandingFetches[p] == 0)) /* Steal a spark */
2076 if (SurplusThreads > 0 &&
2077 (RtsFlags.GranFlags.FetchStrategy >= 4 || OutstandingFetches[p] == 0)) /* Steal a thread */
2083 Steal a spark and schedule moving it to proc. We want to look at PEs in
2084 clock order -- the PE furthest behind in simulated time first. Currently sparks are only stolen
2085 from the @ADVISORY_POOL@ never from the @REQUIRED_POOL@. Eventually,
2086 this should be changed to first steal from the former then from the
2089 We model a sort of fishing mechanism by counting the number of sparks
2090 and threads we are currently stealing. */
2093 Return a random nat value in the interval [from, to)
/* natRandom: scales random() into [from, to).  NOTE(review): the function
   header and the declaration of d (presumably to-from) are not visible in
   this chunk. */
2103 /* random returns a value in [0, RAND_MAX] */
2104 r = (nat) ((float)from + ((float)random()*(float)d)/(float)RAND_MAX);
/* float rounding can yield exactly `to'; clamp back into the interval */
2105 r = (r==to) ? from : r;
2106 ASSERT(from<=r && (r<to || from==to));
2111 Find any PE other than proc. Used for GUM style fishing only.
/* findRandomPE: pick a PE to send a fish to; only valid in Fishing mode.
   NOTE(review): the header, the non-random branch and the return are not
   visible in this chunk. */
2119 ASSERT(RtsFlags.GranFlags.Fishing);
2120 if (RtsFlags.GranFlags.RandomSteal) {
2121 p = natRandom(0,RtsFlags.GranFlags.proc); /* full range of PEs */
2125 IF_GRAN_DEBUG(randomSteal,
2126 belch("^^ RANDOM_STEAL (fishing): stealing from PE %d (current proc is %d)",
2133 Magic code for stealing sparks/threads makes use of global knowledge on
/* sortPEsByTime: fill pes_by_time with all PEs other than proc that have a
   non-empty spark pool and a clock <= the current PE's clock, sorted by
   ascending CurrentTime; *firstp is set to the index of the first PE whose
   clock is later than proc's, *np to the number of candidates.
   Only used for the "magic" (global-knowledge) stealing, never when
   GUM-style Fishing is on (see assertion). */
2137 sortPEsByTime (proc, pes_by_time, firstp, np)
2142 PEs p, temp, n, i, j;
2143 nat first, upb, r=0, q=0;
2145 ASSERT(!RtsFlags.GranFlags.Fishing);
2148 upb = RtsFlags.GranFlags.proc; /* full range of PEs */
2150 if (RtsFlags.GranFlags.RandomSteal) {
2151 r = natRandom(0,RtsFlags.GranFlags.proc); /* full range of PEs */
2157 /* pes_by_time shall contain processors from which we may steal sparks */
2158 for(n=0, p=0; p < RtsFlags.GranFlags.proc; ++p)
2159 if ((proc != p) && // not the current proc
2160 (pending_sparks_hds[p] != (rtsSpark *)NULL) && // non-empty spark pool
2161 (CurrentTime[p] <= CurrentTime[CurrentProc]))
2162 pes_by_time[n++] = p;
2164 /* sort pes_by_time */
/* simple O(n^2) selection-style sort; n is bounded by MAX_PROC, so fine */
2165 for(i=0; i < n; ++i)
2166 for(j=i+1; j < n; ++j)
2167 if (CurrentTime[pes_by_time[i]] > CurrentTime[pes_by_time[j]]) {
/* NOTE(review): this `temp' is declared rtsTime but swaps PEs values,
   shadowing the PEs `temp' above -- works only if rtsTime can hold a PE
   id without truncation; verify */
2168 rtsTime temp = pes_by_time[i];
2169 pes_by_time[i] = pes_by_time[j];
2170 pes_by_time[j] = temp;
2173 /* Choose random processor to steal spark from; first look at processors */
2174 /* that are earlier than the current one (i.e. proc) */
2176 (first < n) && (CurrentTime[pes_by_time[first]] <= CurrentTime[proc]);
2180 /* if the assertion below is true we can get rid of first */
2181 /* ASSERT(first==n); */
2182 /* ToDo: check if first is really needed; find cleaner solution */
2189 Steal a spark (piece of work) from any processor and bring it to proc.
2191 //@cindex stealSpark
/* stealSpark: steal a spark from some other PE and move it to proc (the
   stealer); result says whether a steal succeeded (or, in Fishing mode,
   whether a fish was sent).
   FIX: this function is declared rtsBool in the prototypes at the top of
   the file, but the result of stealSomething was dropped, so callers read
   an indeterminate return value (UB) -- now propagated explicitly. */
2193 stealSpark(PEs proc) { return stealSomething(proc, rtsTrue, rtsFalse); }
2196 Steal a thread from any processor and bring it to proc i.e. thread migration
2198 //@cindex stealThread
/* stealThread: migrate a runnable thread from some other PE to proc;
   result says whether the migration (or fish) was initiated.
   FIX: declared rtsBool in the file-head prototypes, but the result of
   stealSomething was dropped, so callers read an indeterminate return
   value (UB) -- now propagated explicitly. */
2200 stealThread(PEs proc) { return stealSomething(proc, rtsFalse, rtsTrue); }
2203 Steal a spark or a thread and schedule moving it to proc.
2205 //@cindex stealSomething
/* stealSomething: common driver for stealSpark/stealThread.
   In magic (non-Fishing) mode it delegates to stealSparkMagic /
   stealThreadMagic, which use global knowledge of all queues; in GUM-style
   Fishing mode it sends a fish to a random PE and always "succeeds".
   NOTE(review): several condition lines between original 2222-2235 were
   dropped by extraction, so the exact fall-through of the magic branch is
   not visible here. */
2207 stealSomething(proc, steal_spark, steal_thread)
2208 PEs proc; // PE that needs work (stealer)
2209 rtsBool steal_spark, steal_thread; // should a spark and/or thread be stolen
2212 rtsTime fish_arrival_time;
2213 rtsSpark *spark, *prev, *next;
2214 rtsBool stolen = rtsFalse;
2216 ASSERT(steal_spark || steal_thread);
2218 /* Should never be entered in GrAnSim Light setup */
2219 ASSERT(!RtsFlags.GranFlags.Light);
2220 ASSERT(!steal_thread || RtsFlags.GranFlags.DoThreadMigration);
2222 if (!RtsFlags.GranFlags.Fishing) {
2223 // ToDo: check if stealing threads is prefered over stealing sparks
2225 if (stealSparkMagic(proc))
2227 else // no spark found
2229 return stealThreadMagic(proc);
2230 else // no thread found
2232 } else { // ASSERT(steal_thread);
2233 return stealThreadMagic(proc);
2235 barf("stealSomething: never reached");
2238 /* The rest of this function does GUM style fishing */
2240 p = findRandomPE(proc); /* find a random PE other than proc */
2242 /* Message packing costs for sending a Fish; qeq jabbI'ID */
2243 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
2245 /* use another GranEvent for requesting a thread? */
2246 if (steal_spark && RtsFlags.GranFlags.GranSimStats.Sparks)
2247 DumpRawGranEvent(p, proc, SP_REQUESTED,
2248 (StgTSO*)NULL, (StgClosure *)NULL, (StgInt)0, 0);
2250 /* time of the fish arrival on the remote PE */
2251 fish_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
2253 /* Phps use an own Fish event for that? */
2254 /* The contents of the spark component is a HACK:
2255 1 means give me a spark;
2256 2 means give me a thread
2257 0 means give me nothing (this should never happen)
2259 new_event(p, proc, fish_arrival_time,
2261 (StgTSO*)NULL, (StgClosure*)NULL,
2262 (steal_spark ? (rtsSpark*)1 : steal_thread ? (rtsSpark*)2 : (rtsSpark*)0));
2264 ++OutstandingFishes[proc];
2265 /* only with Async fetching? */
2266 if (procStatus[proc]==Idle)
2267 procStatus[proc]=Fishing;
2269 /* time needed to clean up buffers etc after sending a message */
2270 CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
2272 /* If GUM style fishing stealing always succeeds because it only consists
2273 of sending out a fish; of course, when the fish may return
2279 This version of stealing a spark makes use of the global info on all
2280 spark pools etc which is not available in a real parallel system.
2281 This could be extended to test e.g. the impact of perfect load information.
2283 //@cindex stealSparkMagic
/* stealSparkMagic: pick victim PEs in ascending clock order (via
   sortPEsByTime), then scan the victim's spark queue for the first
   eligible spark; the chosen spark is moved to `proc' via a scheduled
   event, sparks whose node no longer SHOULD_SPARK are pruned in passing.
   NOTE(review): extraction dropped a number of lines (e.g. the stealtime
   arithmetic after 2344 and some loop-control lines), so the body as shown
   is incomplete. */
2285 stealSparkMagic(proc)
2288 PEs p=0, i=0, j=0, n=0, first, upb;
2289 rtsSpark *spark=NULL, *next;
2290 PEs pes_by_time[MAX_PROC];
2291 rtsBool stolen = rtsFalse;
2294 /* Should never be entered in GrAnSim Light setup */
2295 ASSERT(!RtsFlags.GranFlags.Light);
2297 sortPEsByTime(proc, pes_by_time, &first, &n);
2299 while (!stolen && n>0) {
/* prefer PEs that are earlier than proc (indices < first), if any */
2300 upb = (first==0) ? n : first;
2301 i = natRandom(0,upb); /* choose a random eligible PE */
2304 IF_GRAN_DEBUG(randomSteal,
2305 belch("^^ stealSparkMagic (random_steal, not fishing): stealing spark from PE %d (current proc is %d)",
2308 ASSERT(pending_sparks_hds[p]!=(rtsSpark *)NULL); /* non-empty spark pool */
2310 /* Now go through rtsSparkQ and steal the first eligible spark */
2312 spark = pending_sparks_hds[p];
2313 while (!stolen && spark != (rtsSpark*)NULL)
2315 /* NB: no prev pointer is needed here because all sparks that are not
2318 if ((procStatus[p]==Idle || procStatus[p]==Sparking || procStatus[p] == Fishing) &&
2319 spark->next==(rtsSpark*)NULL)
2321 /* Be social! Don't steal the only spark of an idle processor
2322 not {spark} neH yInIH !! */
2323 break; /* next PE */
2325 else if (closure_SHOULD_SPARK(spark->node))
2327 /* Don't Steal local sparks;
2328 ToDo: optionally prefer local over global sparks
2329 if (!spark->global) {
2331 continue; next spark
2334 /* found a spark! */
2336 /* Prepare message for sending spark */
2337 CurrentTime[p] += RtsFlags.GranFlags.Costs.mpacktime;
2339 if (RtsFlags.GranFlags.GranSimStats.Sparks)
2340 DumpRawGranEvent(p, (PEs)0, SP_EXPORTED,
2341 (StgTSO*)NULL, spark->node,
2342 spark->name, spark_queue_len(p));
/* steal takes effect at the later of both clocks plus comm costs */
2344 stealtime = (CurrentTime[p] > CurrentTime[proc] ?
2349 new_event(proc, p /* CurrentProc */, stealtime,
2351 (StgTSO*)NULL, spark->node, spark);
2354 ++OutstandingFishes[proc]; /* no. of sparks currently on the fly */
2355 if (procStatus[proc]==Idle)
2356 procStatus[proc] = Fishing;
2357 ++(spark->global); /* record that this is a global spark */
2358 ASSERT(SparksAvail>0);
2359 --SparksAvail; /* on-the-fly sparks are not available */
2360 next = delete_from_sparkq(spark, p, rtsFalse); // don't dispose!
2361 CurrentTime[p] += RtsFlags.GranFlags.Costs.mtidytime;
2363 else /* !(closure_SHOULD_SPARK(SPARK_NODE(spark))) */
2365 IF_GRAN_DEBUG(checkSparkQ,
2366 belch("^^ pruning spark %p (node %p) in stealSparkMagic",
2367 spark, spark->node));
2369 /* if the spark points to a node that should not be sparked,
2370 prune the spark queue at this point */
2371 if (RtsFlags.GranFlags.GranSimStats.Sparks)
2372 DumpRawGranEvent(p, (PEs)0, SP_PRUNED,
2373 (StgTSO*)NULL, spark->node,
2374 spark->name, spark_queue_len(p));
2375 if (RtsFlags.GranFlags.GranSimStats.Global)
2376 globalGranStats.pruned_sparks++;
2378 ASSERT(SparksAvail>0);
2380 spark = delete_from_sparkq(spark, p, rtsTrue);
2382 /* unlink spark (may have been freed!) from sparkq;
2383 if (prev == NULL) // spark was head of spark queue
2384 pending_sparks_hds[p] = spark->next;
2386 prev->next = spark->next;
2387 if (spark->next == NULL)
2388 pending_sparks_tls[p] = prev;
2392 } /* while ... iterating over sparkq */
2394 /* ToDo: assert that PE p still has work left after stealing the spark */
2396 if (!stolen && (n>0)) { /* nothing stealable from proc p :( */
2397 ASSERT(pes_by_time[i]==p);
2399 /* remove p from the list (at pos i) */
2400 for (j=i; j+1<n; j++)
2401 pes_by_time[j] = pes_by_time[j+1];
2404 /* update index to first proc which is later (or equal) than proc */
2407 (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]);
2411 } /* while ... iterating over PEs in pes_by_time */
2413 IF_GRAN_DEBUG(randomSteal,
2415 belch("^^ stealSparkMagic: spark %p (node=%p) stolen by PE %d from PE %d (SparksAvail=%d; idlers=%d)",
2416 spark, spark->node, proc, p,
2417 SparksAvail, idlers());
2419 belch("^^ stealSparkMagic: nothing stolen by PE %d (sparkq len after pruning=%d)(SparksAvail=%d; idlers=%d)",
2420 proc, SparksAvail, idlers()));
2422 if (RtsFlags.GranFlags.GranSimStats.Global &&
2423 stolen && (i!=0)) { /* only for statistics */
2424 globalGranStats.rs_sp_count++;
2425 globalGranStats.ntimes_total += n;
2426 globalGranStats.fl_total += first;
2427 globalGranStats.no_of_steals++;
2434 The old stealThread code, which makes use of global info and does not
2436 NB: most of this is the same as in stealSparkMagic;
2437 only the pieces specific to processing thread queues are different;
2438 long live polymorphism!
2441 //@cindex stealThreadMagic
/* stealThreadMagic: like stealSparkMagic, but migrates the *second*
   runnable thread of a victim PE (never the head, for social reasons) to
   `proc' via a scheduled event.
   FIX: the guard before setting Fishing read `if (procStatus[proc])',
   which (unlike the parallel code in stealSomething and stealSparkMagic,
   both `==Idle') would fire for every non-Idle status -- changed to the
   consistent `==Idle' test.
   NOTE(review): several lines were dropped by extraction (stealtime
   arithmetic, some closing braces), so the body as shown is incomplete. */
2443 stealThreadMagic(proc)
2446 PEs p=0, i=0, j=0, n=0, first, upb;
2447 StgTSO *tso=END_TSO_QUEUE;
2448 PEs pes_by_time[MAX_PROC];
2449 rtsBool stolen = rtsFalse;
2452 /* Should never be entered in GrAnSim Light setup */
2453 ASSERT(!RtsFlags.GranFlags.Light);
2455 sortPEsByTime(proc, pes_by_time, &first, &n);
2457 while (!stolen && n>0) {
2458 upb = (first==0) ? n : first;
2459 i = natRandom(0,upb); /* choose a random eligible PE */
2462 IF_GRAN_DEBUG(randomSteal,
2463 belch("^^ stealThreadMagic (random_steal, not fishing): stealing thread from PE %d (current proc is %d)",
2466 /* Steal the first exportable thread in the runnable queue but
2467 never steal the first in the queue for social reasons;
2468 not Qu' wa'DIch yInIH !!
2470 /* Would be better to search through queue and have options which of
2471 the threads to pick when stealing */
2472 if (run_queue_hds[p] == END_TSO_QUEUE) {
2473 IF_GRAN_DEBUG(randomSteal,
2474 belch("^^ stealThreadMagic: No thread to steal from PE %d (stealer=PE %d)",
2477 tso = run_queue_hds[p]->link; /* tso is *2nd* thread in thread queue */
2481 /* update links in queue */
2482 run_queue_hds[p]->link = tso->link;
2483 if (run_queue_tls[p] == tso)
2484 run_queue_tls[p] = run_queue_hds[p];
2486 /* ToDo: Turn magic constants into params */
2488 CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mpacktime;
2490 stealtime = (CurrentTime[p] > CurrentTime[proc] ?
2494 + 4l * RtsFlags.GranFlags.Costs.additional_latency
2495 + 5l * RtsFlags.GranFlags.Costs.munpacktime;
2497 /* Move the thread; set bitmask to 0 while TSO is `on-the-fly' */
2498 SET_GRAN_HDR(tso,Nowhere /* PE_NUMBER(proc) */);
2500 /* Move from one queue to another */
2501 new_event(proc, p, stealtime,
2503 tso, (StgClosure*)NULL, (rtsSpark*)NULL);
2505 /* MAKE_BUSY(proc); not yet; only when thread is in threadq */
2506 ++OutstandingFishes[proc];
2507 if (procStatus[proc]==Idle)  /* was `if (procStatus[proc])'; see FIX above */
2508 procStatus[proc] = Fishing;
2511 if(RtsFlags.GranFlags.GranSimStats.Full)
2512 DumpRawGranEvent(p, proc,
2514 tso, (StgClosure*)NULL, (StgInt)0, 0);
2516 /* costs for tidying up buffer after having sent it */
2517 CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mtidytime;
2520 /* ToDo: assert that PE p still has work left after stealing the spark */
2522 if (!stolen && (n>0)) { /* nothing stealable from proc p :( */
2523 ASSERT(pes_by_time[i]==p);
2525 /* remove p from the list (at pos i) */
2526 for (j=i; j+1<n; j++)
2527 pes_by_time[j] = pes_by_time[j+1];
2530 /* update index to first proc which is later (or equal) than proc */
2533 (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]);
2537 } /* while ... iterating over PEs in pes_by_time */
2539 IF_GRAN_DEBUG(randomSteal,
2541 belch("^^ stealThreadMagic: stolen TSO %d (%p) by PE %d from PE %d (SparksAvail=%d; idlers=%d)",
2542 tso->id, tso, proc, p,
2543 SparksAvail, idlers());
2545 belch("stealThreadMagic: nothing stolen by PE %d (SparksAvail=%d; idlers=%d)",
2546 proc, SparksAvail, idlers()));
2548 if (RtsFlags.GranFlags.GranSimStats.Global &&
2549 stolen && (i!=0)) { /* only for statistics */
2550 /* ToDo: more statistics on avg thread queue length etc */
2551 globalGranStats.rs_t_count++;
2552 globalGranStats.no_of_migrates++;
2558 //@cindex sparkStealTime
/* sparkStealTime: model the expected delay of finding work by fishing as a
   multiple of the message latency, decreasing linearly with the number of
   idle PEs (more idlers => a random fish finds work less readily, but here
   the formula makes the delay *smaller* as idlers() grows).
   NOTE(review): divides by (proc-1) -- division by zero if the simulation
   is configured with a single PE; confirm callers guard against proc==1. */
2560 sparkStealTime(void)
2562 double fishdelay, sparkdelay, latencydelay;
2563 fishdelay = (double)RtsFlags.GranFlags.proc/2;
2564 sparkdelay = fishdelay -
2565 ((fishdelay-1.0)/(double)(RtsFlags.GranFlags.proc-1))*((double)idlers());
2566 latencydelay = sparkdelay*((double)RtsFlags.GranFlags.Costs.latency);
2568 return((rtsTime)latencydelay);
2571 //@node Routines directly called from Haskell world, Emiting profiling info for GrAnSim, Idle PEs, GranSim specific code
2572 //@subsection Routines directly called from Haskell world
2574 The @GranSim...@ routines in here are directly called via macros from the
2577 First some auxiliary routines.
2580 /* Take the current thread off the thread queue and thereby activate the
2581 next thread. It's assumed that the next ReSchedule after this uses
2582 NEW_THREAD as param.
2583 This fct is called from GranSimBlock and GranSimFetch
2586 //@cindex ActivateNextThread
/* ActivateNextThread(proc): if proc's run queue is now empty, mark it
   Idle; otherwise charge a context-switch cost and (optionally) dump a
   GR_SCHEDULE event for the newly scheduled head thread.  Asserts that it
   is only called on the current PE and that CurrentTSO has already been
   taken off the run queue. */
2589 ActivateNextThread (proc)
2594 This routine is entered either via GranSimFetch or via GranSimBlock.
2595 It has to prepare the CurrentTSO for being blocked and update the
2596 run queue and other statistics on PE proc. The actual enqueuing to the
2597 blocking queue (if coming from GranSimBlock) is done in the entry code
2598 of the BLACKHOLE and BLACKHOLE_BQ closures (see StgMiscClosures.hc).
2600 /* ToDo: add assertions here!! */
2601 //ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE);
2603 // Only necessary if the running thread is at front of the queue
2604 // run_queue_hds[proc] = run_queue_hds[proc]->link;
2605 ASSERT(CurrentProc==proc);
2606 ASSERT(!is_on_queue(CurrentTSO,proc));
2607 if (run_queue_hds[proc]==END_TSO_QUEUE) {
2608 /* NB: this routine is only entered with asynchr comm (see assertion) */
2609 procStatus[proc] = Idle;
2611 /* ToDo: check cost assignment */
2612 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime;
2613 if (RtsFlags.GranFlags.GranSimStats.Full &&
2614 (!RtsFlags.GranFlags.Light || RtsFlags.GranFlags.Debug.checkLight))
2615 /* right flag !?? ^^^ */
2616 DumpRawGranEvent(proc, 0, GR_SCHEDULE, run_queue_hds[proc],
2617 (StgClosure*)NULL, (StgInt)0, 0);
2622 The following GranSim fcts are stg-called from the threaded world.
2625 /* Called from HP_CHK and friends (see StgMacros.h) */
2626 //@cindex GranSimAllocate
2631 CurrentTSO->gran.allocs += n;
2632 ++(CurrentTSO->gran.basicblocks);
2634 if (RtsFlags.GranFlags.GranSimStats.Heap) {
2635 DumpRawGranEvent(CurrentProc, 0, GR_ALLOC, CurrentTSO,
2636 (StgClosure*)NULL, (StgInt)0, n);
2639 CurrentTSO->gran.exectime += RtsFlags.GranFlags.Costs.heapalloc_cost;
2640 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.heapalloc_cost;
2644 Subtract the values added above, if a heap check fails and
2645 so has to be redone.
2647 //@cindex GranSimUnallocate
/* GranSimUnallocate(n): exact inverse of GranSimAllocate -- undo the
   allocation accounting when a failed heap check forces a retry. */
2649 GranSimUnallocate(n)
2652 CurrentTSO->gran.allocs -= n;
2653 --(CurrentTSO->gran.basicblocks);
2655 CurrentTSO->gran.exectime -= RtsFlags.GranFlags.Costs.heapalloc_cost;
2656 CurrentTime[CurrentProc] -= RtsFlags.GranFlags.Costs.heapalloc_cost;
2659 /* NB: We now inline this code via GRAN_EXEC rather than calling this fct */
2660 //@cindex GranSimExec
/* GranSimExec: charge the weighted cost of the given instruction counts
   (arithmetic, branches, loads, stores, float ops) to the current TSO's
   exectime and to the current PE's clock. */
2662 GranSimExec(ariths,branches,loads,stores,floats)
2663 StgWord ariths,branches,loads,stores,floats;
2665 StgWord cost = RtsFlags.GranFlags.Costs.arith_cost*ariths +
2666 RtsFlags.GranFlags.Costs.branch_cost*branches +
2667 RtsFlags.GranFlags.Costs.load_cost * loads +
2668 RtsFlags.GranFlags.Costs.store_cost*stores +
2669 RtsFlags.GranFlags.Costs.float_cost*floats;
2671 CurrentTSO->gran.exectime += cost;
2672 CurrentTime[CurrentProc] += cost;
2676 Fetch the node if it isn't local
2677 -- result indicates whether fetch has been done.
2679 This is GRIP-style single item fetching.
2682 //@cindex GranSimFetch
/* GranSimFetch(node): if node lives on another PE, schedule a FetchNode
   event there, charge packing/latency costs, record the thread as blocked
   and (with DoAsyncFetch) activate the next thread on this PE.  Sets the
   global NeedToReSchedule when GranSim-internal rescheduling is required.
   NOTE(review): extraction dropped a number of lines (return statements,
   some closing braces, the GrAnSim-Light early exit), so control flow as
   shown is incomplete. */
2684 GranSimFetch(node /* , liveness_mask */ )
2686 /* StgInt liveness_mask; */
2688 /* reset the return value (to be checked within STG land) */
2689 NeedToReSchedule = rtsFalse;
2691 if (RtsFlags.GranFlags.Light) {
2692 /* Always reschedule in GrAnSim-Light to prevent one TSO from
2694 new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
2695 ContinueThread,CurrentTSO,node,NULL);
2700 /* Faking an RBH closure:
2701 If the bitmask of the closure is 0 then this node is a fake RBH;
2703 if (node->header.gran.procs == Nowhere) {
2705 belch("## Found fake RBH (node %p); delaying TSO %d (%p)",
2706 node, CurrentTSO->id, CurrentTSO));
/* delay the thread by a fixed 10000 time units and retry */
2708 new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+10000,
2709 ContinueThread, CurrentTSO, node, (rtsSpark*)NULL);
2711 /* Rescheduling (GranSim internal) is necessary */
2712 NeedToReSchedule = rtsTrue;
2717 /* Note: once a node has been fetched, this test will be passed */
2718 if (!IS_LOCAL_TO(PROCS(node),CurrentProc))
2720 PEs p = where_is(node);
2723 IF_GRAN_DEBUG(thunkStealing,
2725 belch("GranSimFetch: Trying to fetch from own processor%u\n", p););
2727 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
2728 /* NB: Fetch is counted on arrival (FetchReply) */
2730 fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) +
2731 RtsFlags.GranFlags.Costs.latency;
2733 new_event(p, CurrentProc, fetchtime,
2734 FetchNode, CurrentTSO, node, (rtsSpark*)NULL);
2736 if (fetchtime<TimeOfNextEvent)
2737 TimeOfNextEvent = fetchtime;
2739 /* About to block */
2740 CurrentTSO->gran.blockedat = CurrentTime[CurrentProc];
2742 ++OutstandingFetches[CurrentProc];
2744 if (RtsFlags.GranFlags.DoAsyncFetch)
2745 /* if asynchr comm is turned on, activate the next thread in the q */
2746 ActivateNextThread(CurrentProc);
2748 procStatus[CurrentProc] = Fetching;
2751 /* ToDo: nuke the entire if (anything special for fair schedule?) */
2752 if (RtsFlags.GranFlags.DoAsyncFetch)
2754 /* Remove CurrentTSO from the queue -- assumes head of queue == CurrentTSO */
2755 if(!RtsFlags.GranFlags.DoFairSchedule)
2757 /* now done in do_the_fetchnode
2758 if (RtsFlags.GranFlags.GranSimStats.Full)
2759 DumpRawGranEvent(CurrentProc, p, GR_FETCH, CurrentTSO,
2760 node, (StgInt)0, 0);
2762 ActivateNextThread(CurrentProc);
2764 # if 0 && defined(GRAN_CHECK)
2765 if (RtsFlags.GranFlags.Debug.blockOnFetch_sanity) {
2766 if (TSO_TYPE(CurrentTSO) & FETCH_MASK_TSO) {
2767 fprintf(stderr,"FetchNode: TSO 0x%x has fetch-mask set @ %d\n",
2768 CurrentTSO,CurrentTime[CurrentProc]);
2769 stg_exit(EXIT_FAILURE);
2771 TSO_TYPE(CurrentTSO) |= FETCH_MASK_TSO;
2775 CurrentTSO->link = END_TSO_QUEUE;
2776 /* CurrentTSO = END_TSO_QUEUE; */
2778 /* CurrentTSO is pointed to by the FetchNode event; it is
2779 on no run queue any more */
2780 } else { /* fair scheduling currently not supported -- HWL */
2781 barf("Asynchr communication is not yet compatible with fair scheduling\n");
2783 } else { /* !RtsFlags.GranFlags.DoAsyncFetch */
2784 procStatus[CurrentProc] = Fetching; // ToDo: BlockedOnFetch;
2785 /* now done in do_the_fetchnode
2786 if (RtsFlags.GranFlags.GranSimStats.Full)
2787 DumpRawGranEvent(CurrentProc, p,
2788 GR_FETCH, CurrentTSO, node, (StgInt)0, 0);
2790 IF_GRAN_DEBUG(blockOnFetch,
2791 BlockedOnFetch[CurrentProc] = CurrentTSO;); /*- rtsTrue; -*/
2795 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mtidytime;
2797 /* Rescheduling (GranSim internal) is necessary */
2798 NeedToReSchedule = rtsTrue;
2805 //@cindex GranSimSpark
/* GranSimSpark(local,node): record creation of a spark for `node' on the
   current PE; optionally dumps an SP_SPARK event, forces the PE to notice
   the spark when DoAlwaysCreateThreads is on, and bumps the thread's
   local/global spark counters.
   NOTE(review): the parameter declarations and the local/global branch
   structure around 2826/2828 are not visible in this chunk. */
2807 GranSimSpark(local,node)
2811 /* ++SparksAvail; Nope; do that in add_to_spark_queue */
2812 if (RtsFlags.GranFlags.GranSimStats.Sparks)
2813 DumpRawGranEvent(CurrentProc, (PEs)0, SP_SPARK,
2814 END_TSO_QUEUE, node, (StgInt)0, spark_queue_len(CurrentProc)-1);
2816 /* Force the PE to take notice of the spark */
2817 if(RtsFlags.GranFlags.DoAlwaysCreateThreads) {
2818 new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
2820 END_TSO_QUEUE, (StgClosure*)NULL, (rtsSpark*)NULL);
2821 if (CurrentTime[CurrentProc]<TimeOfNextEvent)
2822 TimeOfNextEvent = CurrentTime[CurrentProc];
2826 ++CurrentTSO->gran.localsparks;
2828 ++CurrentTSO->gran.globalsparks;
2831 //@cindex GranSimSparkAt
/* GranSimSparkAt: place a spark on the PE where `where' currently lives;
   thin wrapper that resolves the location and delegates to
   GranSimSparkAtAbs. */
2833 GranSimSparkAt(spark,where,identifier)
2835 StgClosure *where; /* This should be a node; alternatively could be a GA */
2838 PEs p = where_is(where);
2839 GranSimSparkAtAbs(spark,p,identifier);
2842 //@cindex GranSimSparkAtAbs
/* GranSimSparkAtAbs: place `spark' on an absolute PE number `proc'.
   If proc is remote, charge packing + latency and schedule a MoveSpark
   event at the later of both clocks; placed sparks are counted like
   stolen sparks via OutstandingFishes.  A NULL spark (removed by
   granularity control) is silently ignored. */
2844 GranSimSparkAtAbs(spark,proc,identifier)
2851 if (spark == (rtsSpark *)NULL) /* Note: Granularity control might have */
2852 return; /* turned a spark into a NULL. */
2854 /* ++SparksAvail; Nope; do that in add_to_spark_queue */
2855 if(RtsFlags.GranFlags.GranSimStats.Sparks)
2856 DumpRawGranEvent(proc,0,SP_SPARKAT,
2857 END_TSO_QUEUE, spark->node, (StgInt)0, spark_queue_len(proc));
2859 if (proc!=CurrentProc) {
2860 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
2861 exporttime = (CurrentTime[proc] > CurrentTime[CurrentProc]?
2862 CurrentTime[proc]: CurrentTime[CurrentProc])
2863 + RtsFlags.GranFlags.Costs.latency;
2865 exporttime = CurrentTime[CurrentProc];
2868 if ( RtsFlags.GranFlags.Light )
2869 /* Need CurrentTSO in event field to associate costs with creating
2870 spark even in a GrAnSim Light setup */
2871 new_event(proc, CurrentProc, exporttime,
2873 CurrentTSO, spark->node, spark);
2875 new_event(proc, CurrentProc, exporttime,
2876 MoveSpark, (StgTSO*)NULL, spark->node, spark);
2877 /* Bit of a hack to treat placed sparks the same as stolen sparks */
2878 ++OutstandingFishes[proc];
2880 /* Force the PE to take notice of the spark (FINDWORK is put after a
2881 MoveSpark into the sparkq!) */
2882 if (RtsFlags.GranFlags.DoAlwaysCreateThreads) {
2883 new_event(CurrentProc,CurrentProc,exporttime+1,
2885 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
2888 if (exporttime<TimeOfNextEvent)
2889 TimeOfNextEvent = exporttime;
2891 if (proc!=CurrentProc) {
2892 CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mtidytime;
2893 ++CurrentTSO->gran.globalsparks;
2895 ++CurrentTSO->gran.localsparks;
2900 This function handles local and global blocking. It's called either
2901 from threaded code (RBH_entry, BH_entry etc) or from blockFetch when
2902 trying to fetch an BH or RBH
2905 //@cindex GranSimBlock
/* GranSimBlock(tso, proc, node): statistics/cost accounting when tso
   blocks on `node' on PE proc.  Asserts tso is local and off the run
   queue, dumps a GR_BLOCK event, bumps blockcount/blockedat, charges the
   thread-queue cost and activates the next thread.  If the node is
   remote (which should not happen), a FetchNode is faked via
   GranSimFetch. */
2907 GranSimBlock(tso, proc, node)
2912 PEs node_proc = where_is(node),
2913 tso_proc = where_is((StgClosure *)tso);
2915 ASSERT(tso_proc==CurrentProc);
2916 // ASSERT(node_proc==CurrentProc);
2918 if (node_proc!=CurrentProc)
2919 belch("## ghuH: TSO %d (%lx) [PE %d] blocks on non-local node %p [PE %d] (no simulation of FETCHMEs)",
2920 tso->id, tso, tso_proc, node, node_proc));
2921 ASSERT(tso->link==END_TSO_QUEUE);
2922 ASSERT(!is_on_queue(tso,proc)); // tso must not be on run queue already!
2923 //ASSERT(tso==run_queue_hds[proc]);
2926 belch("GRAN: TSO %d (%p) [PE %d] blocks on closure %p @ %lx",
2927 tso->id, tso, proc, node, CurrentTime[proc]));
2930 /* THIS SHOULD NEVER HAPPEN!
2931 If tso tries to block on a remote node (i.e. node_proc!=CurrentProc)
2932 we have missed a GranSimFetch before entering this closure;
2933 we hack around it for now, faking a FetchNode;
2934 because GranSimBlock is entered via a BLACKHOLE(_BQ) closure,
2935 tso will be blocked on this closure until the FetchReply occurs.
2939 if (node_proc!=CurrentProc) {
2941 ret = GranSimFetch(node);
2944 belch(".. GranSimBlock: faking a FetchNode of node %p from %d to %d",
2945 node, node_proc, CurrentProc););
2950 if (RtsFlags.GranFlags.GranSimStats.Full)
2951 DumpRawGranEvent(proc,node_proc,GR_BLOCK,tso,node,(StgInt)0,0);
2953 ++(tso->gran.blockcount);
2954 /* Distinction between local and global block is made in blockFetch */
2955 tso->gran.blockedat = CurrentTime[proc];
2957 CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime;
2958 ActivateNextThread(proc);
2959 /* tso->link = END_TSO_QUEUE; not really necessary; only for testing */
2964 //@node Index, , Dumping routines, GranSim specific code
2968 //* ActivateNextThread:: @cindex\s-+ActivateNextThread
2969 //* CurrentProc:: @cindex\s-+CurrentProc
2970 //* CurrentTime:: @cindex\s-+CurrentTime
2971 //* GranSimAllocate:: @cindex\s-+GranSimAllocate
2972 //* GranSimBlock:: @cindex\s-+GranSimBlock
2973 //* GranSimExec:: @cindex\s-+GranSimExec
2974 //* GranSimFetch:: @cindex\s-+GranSimFetch
2975 //* GranSimLight_insertThread:: @cindex\s-+GranSimLight_insertThread
2976 //* GranSimSpark:: @cindex\s-+GranSimSpark
2977 //* GranSimSparkAt:: @cindex\s-+GranSimSparkAt
2978 //* GranSimSparkAtAbs:: @cindex\s-+GranSimSparkAtAbs
2979 //* GranSimUnallocate:: @cindex\s-+GranSimUnallocate
2980 //* any_idle:: @cindex\s-+any_idle
2981 //* blockFetch:: @cindex\s-+blockFetch
2982 //* do_the_fetchnode:: @cindex\s-+do_the_fetchnode
2983 //* do_the_fetchreply:: @cindex\s-+do_the_fetchreply
2984 //* do_the_findwork:: @cindex\s-+do_the_findwork
2985 //* do_the_globalblock:: @cindex\s-+do_the_globalblock
2986 //* do_the_movespark:: @cindex\s-+do_the_movespark
2987 //* do_the_movethread:: @cindex\s-+do_the_movethread
2988 //* do_the_startthread:: @cindex\s-+do_the_startthread
2989 //* do_the_unblock:: @cindex\s-+do_the_unblock
2990 //* fetchNode:: @cindex\s-+fetchNode
2991 //* ga_to_proc:: @cindex\s-+ga_to_proc
2992 //* get_next_event:: @cindex\s-+get_next_event
2993 //* get_time_of_next_event:: @cindex\s-+get_time_of_next_event
2994 //* grab_event:: @cindex\s-+grab_event
2995 //* handleFetchRequest:: @cindex\s-+handleFetchRequest
2996 //* handleIdlePEs:: @cindex\s-+handleIdlePEs
2997 //* idlers:: @cindex\s-+idlers
2998 //* insertThread:: @cindex\s-+insertThread
2999 //* insert_event:: @cindex\s-+insert_event
3000 //* is_on_queue:: @cindex\s-+is_on_queue
3001 //* is_unique:: @cindex\s-+is_unique
3002 //* new_event:: @cindex\s-+new_event
3003 //* prepend_event:: @cindex\s-+prepend_event
3004 //* print_event:: @cindex\s-+print_event
3005 //* print_eventq:: @cindex\s-+print_eventq
3006 //* prune_eventq :: @cindex\s-+prune_eventq
3007 //* spark queue:: @cindex\s-+spark queue
3008 //* sparkStealTime:: @cindex\s-+sparkStealTime
3009 //* stealSomething:: @cindex\s-+stealSomething
3010 //* stealSpark:: @cindex\s-+stealSpark
3011 //* stealSparkMagic:: @cindex\s-+stealSparkMagic
3012 //* stealThread:: @cindex\s-+stealThread
3013 //* stealThreadMagic:: @cindex\s-+stealThreadMagic
3014 //* thread_queue_len:: @cindex\s-+thread_queue_len
3015 //* traverse_eventq_for_gc:: @cindex\s-+traverse_eventq_for_gc
3016 //* where_is:: @cindex\s-+where_is