RTS tidyup sweep, first phase
[ghc-hetmet.git] / rts / parallel / GranSim.c
1 /* 
2    Time-stamp: <2009-07-06 21:48:36 simonmar>
3
4    Variables and functions specific to GranSim the parallelism simulator
5    for GPH.
6 */
7
8 //@node GranSim specific code, , ,
9 //@section GranSim specific code
10
11 /*
12    Macros for dealing with the new and improved GA field for simulating
13    parallel execution. Based on @CONCURRENT@ package. The GA field now
14    contains a mask, where the n-th bit stands for the n-th processor, where
15    this data can be found. In case of multiple copies, several bits are
16    set. The total number of processors is bounded by @MAX_PROC@, which
17    should be <= the length of a word in bits.  -- HWL 
18 */
19
20 //@menu
21 //* Includes::                  
22 //* Prototypes and externs::    
23 //* Constants and Variables::   
24 //* Initialisation::            
25 //* Global Address Operations::  
26 //* Global Event Queue::        
27 //* Spark queue functions::     
28 //* Scheduling functions::      
29 //* Thread Queue routines::     
30 //* GranSim functions::         
31 //* GranSimLight routines::     
32 //* Code for Fetching Nodes::   
33 //* Idle PEs::                  
34 //* Routines directly called from Haskell world::  
35 //* Emiting profiling info for GrAnSim::  
36 //* Dumping routines::          
37 //* Index::                     
38 //@end menu
39
40 //@node Includes, Prototypes and externs, GranSim specific code, GranSim specific code
41 //@subsection Includes
42
43 #if defined(GRAN)
44
45 #include "Rts.h"
46 #include "RtsFlags.h"
47 #include "RtsUtils.h"
48 #include "StgMiscClosures.h"
49 #include "StgTypes.h"
50 #include "Storage.h"       // for recordMutable
51 #include "Schedule.h"
52 #include "SchedAPI.h"       // for pushClosure
53 #include "GranSimRts.h"
54 #include "GranSim.h"
55 #include "ParallelRts.h"
56 #include "ParallelDebug.h"
57 #include "Sparks.h"
58
59
60 //@node Prototypes and externs, Constants and Variables, Includes, GranSim specific code
61 //@subsection Prototypes and externs
62
63 /* Prototypes */
64 static inline PEs      ga_to_proc(StgWord);
65 static inline rtsBool  any_idle(void);
66 static inline nat      idlers(void);
67        PEs             where_is(StgClosure *node);
68
69 static rtsBool         stealSomething(PEs proc, rtsBool steal_spark, rtsBool steal_thread);
70 static rtsBool         stealSpark(PEs proc);
71 static rtsBool         stealThread(PEs proc);
72 static rtsBool         stealSparkMagic(PEs proc);
73 static rtsBool         stealThreadMagic(PEs proc);
74 /* subsumed by stealSomething
75 static void            stealThread(PEs proc); 
76 static void            stealSpark(PEs proc);
77 */
78 static rtsTime         sparkStealTime(void);
79 static nat             natRandom(nat from, nat to);
80 static PEs             findRandomPE(PEs proc);
81 static void            sortPEsByTime (PEs proc, PEs *pes_by_time, 
82                                       nat *firstp, nat *np);
83
84 void GetRoots(void);
85
86 #endif /* GRAN */
87
88 //@node Constants and Variables, Initialisation, Prototypes and externs, GranSim specific code
89 //@subsection Constants and Variables
90
91 #if defined(GRAN) || defined(PAR)
92 /* See GranSim.h for the definition of the enum gran_event_types */
93 char *gran_event_names[] = {
94     "START", "START(Q)",
95     "STEALING", "STOLEN", "STOLEN(Q)",
96     "FETCH", "REPLY", "BLOCK", "RESUME", "RESUME(Q)",
97     "SCHEDULE", "DESCHEDULE",
98     "END",
99     "SPARK", "SPARKAT", "USED", "PRUNED", "EXPORTED", "ACQUIRED",
100     "ALLOC",
101     "TERMINATE",
102     "SYSTEM_START", "SYSTEM_END",           /* only for debugging */
103     "??"
104 };
105 #endif
106
107 #if defined(GRAN)                                              /* whole file */
108 char *proc_status_names[] = {
109   "Idle", "Sparking", "Starting", "Fetching", "Fishing", "Busy", 
110   "UnknownProcStatus"
111 };
112
113 /* For internal use (event statistics) only */
114 char *event_names[] =
115     { "ContinueThread", "StartThread", "ResumeThread", 
116       "MoveSpark", "MoveThread", "FindWork",
117       "FetchNode", "FetchReply",
118       "GlobalBlock", "UnblockThread"
119     };
120
121 //@cindex CurrentProc
122 PEs CurrentProc = 0;
123
124 /*
125   ToDo: Create a structure for the processor status and put all the 
126         arrays below into it. 
127   -- HWL */
128
129 //@cindex CurrentTime
130 /* One clock for each PE */
131 rtsTime CurrentTime[MAX_PROC];  
132
133 /* Useful to restrict communication; cf fishing model in GUM */
134 nat OutstandingFetches[MAX_PROC], OutstandingFishes[MAX_PROC];
135
136 /* Status of each PE (new since but independent of GranSim Light) */
137 rtsProcStatus procStatus[MAX_PROC];
138
139 # if defined(GRAN) && defined(GRAN_CHECK)
140 /* To check if the RTS ever tries to run a thread that should be blocked
141    because of fetching remote data */
142 StgTSO *BlockedOnFetch[MAX_PROC];
143 # define FETCH_MASK_TSO  0x08000000      /* only bits 0, 1, 2 should be used */
144 # endif
145
146 nat SparksAvail = 0;     /* How many sparks are available */
147 nat SurplusThreads = 0;  /* How many excess threads are there */
148
149 /* Do we need to reschedule following a fetch? */
150 rtsBool NeedToReSchedule = rtsFalse, IgnoreEvents = rtsFalse, IgnoreYields = rtsFalse; 
151 rtsTime TimeOfNextEvent, TimeOfLastEvent, EndOfTimeSlice; /* checked from the threaded world! */
152
153 //@cindex spark queue
154 /* GranSim: a globally visible array of spark queues */
155 rtsSparkQ pending_sparks_hds[MAX_PROC];
156 rtsSparkQ pending_sparks_tls[MAX_PROC];
157
158 nat sparksIgnored = 0, sparksCreated = 0;
159
160 GlobalGranStats globalGranStats;
161
162 nat gran_arith_cost, gran_branch_cost, gran_load_cost, 
163     gran_store_cost, gran_float_cost;
164
165 /*
166 Old comment from 0.29. ToDo: Check and update -- HWL
167
168 The following variables control the behaviour of GrAnSim. In general, there
169 is one RTS option for enabling each of these features. In getting the
170 desired setup of GranSim the following questions have to be answered:
171 \begin{itemize}
172 \item {\em Which scheduling algorithm} to use (@RtsFlags.GranFlags.DoFairSchedule@)? 
173       Currently only unfair scheduling is supported.
174 \item What to do when remote data is fetched (@RtsFlags.GranFlags.DoAsyncFetch@)? 
175       Either block and wait for the
176       data or reschedule and do some other work.
177       Thus, if this variable is true, asynchronous communication is
178       modelled. Block on fetch mainly makes sense for incremental fetching.
179
180       There is also a simplified fetch variant available
181       (@RtsFlags.GranFlags.SimplifiedFetch@). This variant does not use events to model
182       communication. It is faster but the results will be less accurate.
183 \item How aggressive to be in getting work after a reschedule on fetch
184       (@RtsFlags.GranFlags.FetchStrategy@)?
185       This is determined by the so-called {\em fetching
186       strategy\/}. Currently, there are four possibilities:
187       \begin{enumerate}
188        \item Only run a runnable thread.
189        \item Turn a spark into a thread, if necessary.
190        \item Steal a remote spark, if necessary.
191        \item Steal a runnable thread from another processor, if necessary.
192       \end{itemize}
193       The variable @RtsFlags.GranFlags.FetchStrategy@ determines how far to go in this list
194       when rescheduling on a fetch.
195 \item Should sparks or threads be stolen first when looking for work
196       (@RtsFlags.GranFlags.DoStealThreadsFirst@)? 
197       The default is to steal sparks first (much cheaper).
198 \item Should the RTS use a lazy thread creation scheme
199       (@RtsFlags.GranFlags.DoAlwaysCreateThreads@)?  By default yes i.e.\ sparks are only
200       turned into threads when work is needed. Also note, that sparks
201       can be discarded by the RTS (this is done in the case of an overflow
202       of the spark pool). Setting @RtsFlags.GranFlags.DoAlwaysCreateThreads@  to @True@ forces
203       the creation of threads at the next possibility (i.e.\ when new work
204       is demanded the next time).
205 \item Should data be fetched closure-by-closure or in packets
206       (@RtsFlags.GranFlags.DoBulkFetching@)? The default strategy is a GRIP-like incremental 
207       (i.e.\ closure-by-closure) strategy. This makes sense in a
208       low-latency setting but is bad in a high-latency system. Setting 
209       @RtsFlags.GranFlags.DoBulkFetching@ to @True@ enables bulk (packet) fetching. Other
210       parameters determine the size of the packets (@pack_buffer_size@) and the number of
211       thunks that should be put into one packet (@RtsFlags.GranFlags.ThunksToPack@).
212 \item If there is no other possibility to find work, should runnable threads
213       be moved to an idle processor (@RtsFlags.GranFlags.DoThreadMigration@)? In any case, the
214       RTS tried to get sparks (either local or remote ones) first. Thread
215       migration is very expensive, since a whole TSO has to be transferred
216       and probably data locality becomes worse in the process. Note, that
217       the closure, which will be evaluated next by that TSO is not
218       transferred together with the TSO (that might block another thread).
219 \item Should the RTS distinguish between sparks created by local nodes and
220       stolen sparks (@RtsFlags.GranFlags.PreferSparksOfLocalNodes@)?  The idea is to improve 
221       data locality by preferring sparks of local nodes (it is more likely
222       that the data for those sparks is already on the local processor). 
223       However, such a distinction also imposes an overhead on the spark
224       queue management, and typically a large number of sparks are
225       generated during execution. By default this variable is set to @False@.
226 \item Should the RTS use granularity control mechanisms? The idea of a 
227       granularity control mechanism is to make use of granularity
228       information provided via annotation of the @par@ construct in order
229       to prefer bigger threads when either turning a spark into a thread or
230       when choosing the next thread to schedule. Currently, three such
231       mechanisms are implemented:
232       \begin{itemize}
233         \item Cut-off: The granularity information is interpreted as a
234               priority. If a threshold priority is given to the RTS, then
235               only those sparks with a higher priority than the threshold 
236               are actually created. Other sparks are immediately discarded.
237               This is similar to a usual cut-off mechanism often used in 
238               parallel programs, where parallelism is only created if the 
239               input data is lage enough. With this option, the choice is 
240               hidden in the RTS and only the threshold value has to be 
241               provided as a parameter to the runtime system.
242         \item Priority Sparking: This mechanism keeps priorities for sparks
243               and chooses the spark with the highest priority when turning
244               a spark into a thread. After that the priority information is
245               discarded. The overhead of this mechanism comes from
246               maintaining a sorted spark queue.
247         \item Priority Scheduling: This mechanism keeps the granularity
248               information for threads, to. Thus, on each reschedule the 
249               largest thread is chosen. This mechanism has a higher
250               overhead, as the thread queue is sorted, too.
251        \end{itemize}  
252 \end{itemize}
253 */
254
255 //@node Initialisation, Global Address Operations, Constants and Variables, GranSim specific code
256 //@subsection Initialisation
257
258 void 
259 init_gr_stats (void) {
260   memset(&globalGranStats, '\0', sizeof(GlobalGranStats));
261 #if 0
262   /* event stats */
263   globalGranStats.noOfEvents = 0;
264   for (i=0; i<MAX_EVENT; i++) globalGranStats.event_counts[i]=0;
265
266   /* communication stats */
267   globalGranStats.fetch_misses = 0;
268   globalGranStats.tot_low_pri_sparks = 0;
269
270   /* obscure stats */  
271   globalGranStats.rs_sp_count = 0;
272   globalGranStats.rs_t_count = 0;
273   globalGranStats.ntimes_total = 0, 
274   globalGranStats.fl_total = 0;
275   globalGranStats.no_of_steals = 0;
276
277   /* spark queue stats */
278   globalGranStats.tot_sq_len = 0, 
279   globalGranStats.tot_sq_probes = 0; 
280   globalGranStats.tot_sparks = 0;
281   globalGranStats.withered_sparks = 0;
282   globalGranStats.tot_add_threads = 0;
283   globalGranStats.tot_tq_len = 0;
284   globalGranStats.non_end_add_threads = 0;
285
286   /* thread stats */
287   globalGranStats.tot_threads_created = 0;
288   for (i=0; i<MAX_PROC; i++) globalGranStats.threads_created_on_PE[i]=0;
289 #endif /* 0 */
290 }
291
292 //@node Global Address Operations, Global Event Queue, Initialisation, GranSim specific code
293 //@subsection Global Address Operations
294 /*
295   ----------------------------------------------------------------------
296   Global Address Operations
297
298   These functions perform operations on the global-address (ga) part of a
299   closure. The ga is the only new field (1 word) in a closure introduced by
300   GrAnSim. It serves as a bitmask, indicating on which processor the
301   closure is residing. Since threads are described by Thread State Object
302   (TSO), which is nothing but another kind of closure, this scheme allows
303   gives placement information about threads.
304
305   A ga is just a bitmask, so the operations on them are mainly bitmask
306   manipulating functions. Note, that there are important macros like PROCS,
307   IS_LOCAL_TO etc. They are defined in @GrAnSim.lh@.
308
309   NOTE: In GrAnSim-light we don't maintain placement information. This
310   allows to simulate an arbitrary number of processors. The price we have
311   to be is the lack of costing any communication properly. In short,
312   GrAnSim-light is meant to reveal the maximal parallelism in a program.
313   From an implementation point of view the important thing is: {\em
314   GrAnSim-light does not maintain global-addresses}.  */
315
316 /* ga_to_proc returns the first processor marked in the bitmask ga.
317    Normally only one bit in ga should be set. But for PLCs all bits
318    are set. That shouldn't hurt since we only need IS_LOCAL_TO for PLCs */
319  
320 //@cindex ga_to_proc
321
322 static inline PEs
323 ga_to_proc(StgWord ga)
324 {
325     PEs i;
326     for (i = 0; i < RtsFlags.GranFlags.proc && !IS_LOCAL_TO(ga, i); i++);
327     ASSERT(i<RtsFlags.GranFlags.proc);
328     return (i);
329 }
330
331 /* NB: This takes a *node* rather than just a ga as input */
332 //@cindex where_is
333 PEs
334 where_is(StgClosure *node)
335 { return (ga_to_proc(PROCS(node))); }
336
337 // debugging only
338 //@cindex is_unique
339 rtsBool
340 is_unique(StgClosure *node)
341
342   PEs i;
343   rtsBool unique = rtsFalse;
344
345   for (i = 0; i < RtsFlags.GranFlags.proc ; i++)
346     if (IS_LOCAL_TO(PROCS(node), i))
347       if (unique)          // exactly 1 instance found so far
348         return rtsFalse;   // found a 2nd instance => not unique
349       else 
350         unique = rtsTrue;  // found 1st instance 
351   ASSERT(unique);          // otherwise returned from within loop
352   return (unique);
353 }
354
355 //@cindex any_idle
356 static inline rtsBool
357 any_idle(void) { /* any (map (\ i -> procStatus[i] == Idle)) [0,..,MAX_PROC] */
358  PEs i; 
359  rtsBool any_idle; 
360  for(i=0, any_idle=rtsFalse; 
361      !any_idle && i<RtsFlags.GranFlags.proc; 
362      any_idle = any_idle || procStatus[i] == Idle, i++) 
363  {} ;
364 }
365
366 //@cindex idlers
367 static inline nat
368 idlers(void) {  /* number of idle PEs */
369  PEs i, j; 
370  for(i=0, j=0;
371      i<RtsFlags.GranFlags.proc; 
372      j += (procStatus[i] == Idle) ? 1 : 0, i++) 
373  {} ;
374  return j;
375 }
376
377 //@node Global Event Queue, Spark queue functions, Global Address Operations, GranSim specific code
378 //@subsection Global Event Queue
379 /*
380 The following routines implement an ADT of an event-queue (FIFO). 
381 ToDo: Put that in an own file(?)
382 */
383
384 /* Pointer to the global event queue; events are currently malloc'ed */
385 rtsEventQ EventHd = NULL;
386
387 //@cindex get_next_event
388 rtsEvent *
389 get_next_event(void)
390 {
391   static rtsEventQ entry = NULL;
392
393   if (EventHd == NULL) {
394     barf("No next event. This may be caused by a circular data dependency in the program.");
395   }
396
397   if (entry != NULL)
398     free((char *)entry);
399
400   if (RtsFlags.GranFlags.GranSimStats.Global) {     /* count events */
401     globalGranStats.noOfEvents++;
402     globalGranStats.event_counts[EventHd->evttype]++;
403   }
404
405   entry = EventHd;
406
407   IF_GRAN_DEBUG(event_trace,
408            print_event(entry));
409
410   EventHd = EventHd->next;
411   return(entry);
412 }
413
414 /* When getting the time of the next event we ignore CONTINUETHREAD events:
415    we don't want to be interrupted before the end of the current time slice
416    unless there is something important to handle. 
417 */
418 //@cindex get_time_of_next_event
419 rtsTime
420 get_time_of_next_event(void)
421
422   rtsEventQ event = EventHd;
423
424   while (event != NULL && event->evttype==ContinueThread) {
425     event = event->next;
426   }
427   if(event == NULL)
428       return ((rtsTime) 0);
429   else
430       return (event->time);
431 }
432
433 /* ToDo: replace malloc/free with a free list */
434 //@cindex insert_event
435 void
436 insert_event(newentry)
437 rtsEvent *newentry;
438 {
439   rtsEventType evttype = newentry->evttype;
440   rtsEvent *event, **prev;
441
442   /* if(evttype >= CONTINUETHREAD1) evttype = CONTINUETHREAD; */
443
444   /* Search the queue and insert at the right point:
445      FINDWORK before everything, CONTINUETHREAD after everything.
446
447      This ensures that we find any available work after all threads have
448      executed the current cycle.  This level of detail would normally be
449      irrelevant, but matters for ridiculously low latencies...
450   */
451
452   /* Changed the ordering: Now FINDWORK comes after everything but 
453      CONTINUETHREAD. This makes sure that a MOVESPARK comes before a 
454      FINDWORK. This is important when a GranSimSparkAt happens and
455      DoAlwaysCreateThreads is turned on. Also important if a GC occurs
456      when trying to build a new thread (see much_spark)  -- HWL 02/96  */
457
458   if(EventHd == NULL)
459     EventHd = newentry;
460   else {
461     for (event = EventHd, prev=(rtsEvent**)&EventHd; 
462          event != NULL; 
463          prev = (rtsEvent**)&(event->next), event = event->next) {
464       switch (evttype) {
465         case FindWork: if ( event->time < newentry->time ||
466                             ( (event->time == newentry->time) &&
467                               (event->evttype != ContinueThread) ) )
468                          continue;
469                        else
470                          break;
471         case ContinueThread: if ( event->time <= newentry->time )
472                                continue;
473                              else
474                                break;
475         default: if ( event->time < newentry->time || 
476                       ((event->time == newentry->time) &&
477                        (event->evttype == newentry->evttype)) )
478                    continue;
479                  else
480                    break;
481        }
482        /* Insert newentry here (i.e. before event) */
483        *prev = newentry;
484        newentry->next = event;
485        break;
486     }
487     if (event == NULL)
488       *prev = newentry;
489   }
490 }
491
492 //@cindex new_event
493 void
494 new_event(proc,creator,time,evttype,tso,node,spark)
495 PEs proc, creator;
496 rtsTime time;
497 rtsEventType evttype;
498 StgTSO *tso;
499 StgClosure *node;
500 rtsSpark *spark;
501 {
502   rtsEvent *newentry = (rtsEvent *) stgMallocBytes(sizeof(rtsEvent), "new_event");
503
504   newentry->proc     = proc;
505   newentry->creator  = creator;
506   newentry->time     = time;
507   newentry->evttype  = evttype;
508   newentry->tso      = tso;
509   newentry->node     = node;
510   newentry->spark    = spark;
511   newentry->gc_info  = 0;
512   newentry->next     = NULL;
513
514   insert_event(newentry);
515
516   IF_DEBUG(gran, 
517            fprintf(stderr, "GRAN: new_event: \n"); 
518            print_event(newentry));
519 }
520
521 //@cindex prepend_event
522 void
523 prepend_event(event)       /* put event at beginning of EventQueue */
524 rtsEvent *event;
525 {                                 /* only used for GC! */
526  event->next = EventHd;
527  EventHd = event;
528 }
529
530 //@cindex grab_event
531 rtsEventQ
532 grab_event(void)             /* undo prepend_event i.e. get the event */
533 {                        /* at the head of EventQ but don't free anything */
534  rtsEventQ event = EventHd;
535
536  if (EventHd == NULL) {
537    barf("No next event (in grab_event). This may be caused by a circular data dependency in the program.");
538  }
539
540  EventHd = EventHd->next;
541  return (event);
542 }
543
544 //@cindex traverse_eventq_for_gc
545 void 
546 traverse_eventq_for_gc(void)
547 {
548  rtsEventQ event = EventHd;
549  StgWord bufsize;
550  StgClosure *closurep;
551  StgTSO *tsop;
552  StgPtr buffer, bufptr;
553  PEs proc, creator;
554
555  /* Traverse eventq and replace every FETCHREPLY by a FETCHNODE for the
556     orig closure (root of packed graph). This means that a graph, which is
557     between processors at the time of GC is fetched again at the time when
558     it would have arrived, had there been no GC. Slightly inaccurate but
559     safe for GC.
560     This is only needed for GUM style fetchng. -- HWL */
561  if (!RtsFlags.GranFlags.DoBulkFetching)
562    return;
563
564  for(event = EventHd; event!=NULL; event=event->next) {
565    if (event->evttype==FetchReply) {
566      buffer = stgCast(StgPtr,event->node);
567      ASSERT(buffer[PACK_FLAG_LOCN]==MAGIC_PACK_FLAG);  /* It's a pack buffer */
568      bufsize = buffer[PACK_SIZE_LOCN];
569      closurep = stgCast(StgClosure*,buffer[PACK_HDR_SIZE]);
570      tsop = stgCast(StgTSO*,buffer[PACK_TSO_LOCN]);
571      proc = event->proc;
572      creator = event->creator;                 /* similar to unpacking */
573      for (bufptr=buffer+PACK_HDR_SIZE; 
574           bufptr<(buffer+bufsize);
575           bufptr++) {
576          // if ( (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_SPEC_RBH_TYPE) ||
577          //      (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_GEN_RBH_TYPE) ) {
578            if ( GET_INFO(stgCast(StgClosure*,bufptr)) ) {
579              convertFromRBH(stgCast(StgClosure *,bufptr));
580          }
581      }
582      free(buffer);
583      event->evttype = FetchNode;
584      event->proc    = creator;
585      event->creator = proc;
586      event->node    = closurep;
587      event->tso     = tsop;
588      event->gc_info = 0;
589    }
590  }
591 }
592
593 void
594 markEventQueue(void)
595
596   StgClosure *MarkRoot(StgClosure *root); // prototype
597
598   rtsEventQ event = EventHd;
599   nat len;
600
601   /* iterate over eventq and register relevant fields in event as roots */
602   for(event = EventHd, len =  0; event!=NULL; event=event->next, len++) {
603     switch (event->evttype) {
604       case ContinueThread:  
605         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
606         break;
607       case StartThread: 
608         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
609         event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
610         break;
611       case ResumeThread:
612         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
613         event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
614         break;
615       case MoveSpark:
616         event->spark->node = (StgClosure *)MarkRoot((StgClosure *)event->spark->node);
617         break;
618       case MoveThread:
619         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
620         break;
621       case FindWork:
622         break;
623       case FetchNode: 
624         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
625         event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
626         break;
627       case FetchReply:
628         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
629         if (RtsFlags.GranFlags.DoBulkFetching)
630           // ToDo: traverse_eventw_for_gc if GUM-Fetching!!! HWL
631           belch("ghuH: packets in BulkFetching not marked as roots; mayb be fatal");
632         else
633           event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
634         break;
635       case GlobalBlock:
636         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
637         event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
638         break;
639       case UnblockThread:
640         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
641         event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
642         break;
643       default:
644         barf("markEventQueue: trying to mark unknown event @ %p", event);
645     }}
646   IF_DEBUG(gc,
647            belch("GC: markEventQueue: %d events in queue", len));
648 }
649
650 /*
651   Prune all ContinueThread events related to tso or node in the eventq.
652   Currently used if a thread leaves STG land with ThreadBlocked status,
653   i.e. it blocked on a closure and has been put on its blocking queue.  It
654   will be reawakended via a call to awakenBlockedQueue. Until then no
655   event effecting this tso should appear in the eventq.  A bit of a hack,
656   because ideally we shouldn't generate such spurious ContinueThread events
657   in the first place.  
658 */
659 //@cindex prune_eventq 
660 void 
661 prune_eventq(tso, node) 
662 StgTSO *tso; 
663 StgClosure *node; 
664 { rtsEventQ prev = (rtsEventQ)NULL, event = EventHd;
665
666   /* node unused for now */ 
667   ASSERT(node==NULL); 
668   /* tso must be valid, then */
669   ASSERT(tso!=END_TSO_QUEUE);
670   while (event != NULL) {
671     if (event->evttype==ContinueThread && 
672         (event->tso==tso)) {
673       IF_GRAN_DEBUG(event_trace, // ToDo: use another debug flag
674                     belch("prune_eventq: pruning ContinueThread event for TSO %d (%p) on PE %d @ %lx (%p)",
675                           event->tso->id, event->tso, event->proc, event->time, event));
676       if (prev==(rtsEventQ)NULL) { // beginning of eventq
677         EventHd = event->next;
678         free(event); 
679         event = EventHd;
680       } else {
681         prev->next = event->next;
682         free(event); 
683         event = prev->next;
684       }
685     } else { // no pruning necessary; go to next event
686       prev = event;
687       event = event->next;
688     }
689   }
690 }
691
692 //@cindex print_event
693 void
694 print_event(event)
695 rtsEvent *event;
696 {
697   char str_tso[16], str_node[16];
698   StgThreadID tso_id;
699
700   if (event->tso==END_TSO_QUEUE) {
701     strcpy(str_tso, "______");
702     tso_id = 0;
703   } else { 
704     sprintf(str_tso, "%p", event->tso);
705     tso_id = (event->tso==NULL) ? 0 : event->tso->id;
706   }
707   if  (event->node==(StgClosure*)NULL) {
708     strcpy(str_node, "______");
709   } else {
710     sprintf(str_node, "%p", event->node);
711   }
712   // HWL: shouldn't be necessary; ToDo: nuke
713   //str_tso[6]='\0';
714   //str_node[6]='\0';
715
716   if (event==NULL)
717     fprintf(stderr,"Evt: NIL\n");
718   else
719     fprintf(stderr, "Evt: %s (%u), PE %u [%u], Time %lu, TSO %d (%s), Node %s\n", //"Evt: %s (%u), PE %u [%u], Time %u, TSO %s (%#l), Node %s\n",
720               event_names[event->evttype], event->evttype,
721               event->proc, event->creator, event->time, 
722               tso_id, str_tso, str_node
723               /*, event->spark, event->next */ );
724
725 }
726
727 //@cindex print_eventq
728 void
729 print_eventq(hd)
730 rtsEvent *hd;
731 {
732   rtsEvent *x;
733
734   fprintf(stderr,"Event Queue with root at %p:\n", hd);
735   for (x=hd; x!=NULL; x=x->next) {
736     print_event(x);
737   }
738 }
739
740 /* 
741    Spark queue functions are now all  in Sparks.c!!
742 */
743 //@node Scheduling functions, Thread Queue routines, Spark queue functions, GranSim specific code
744 //@subsection Scheduling functions
745
746 /* 
747    These functions are variants of thread initialisation and therefore
748    related to initThread and friends in Schedule.c. However, they are
749    specific to a GranSim setup in storing more info in the TSO's statistics
750    buffer and sorting the thread queues etc.  
751 */
752
753 /*
754    A large portion of startThread deals with maintaining a sorted thread
755    queue, which is needed for the Priority Sparking option. Without that
756    complication the code boils down to FIFO handling.  
757 */
758 //@cindex insertThread
759 void
760 insertThread(tso, proc)
761 StgTSO*     tso;
762 PEs         proc;
763 {
764   StgTSO *prev = NULL, *next = NULL;
765   nat count = 0;
766   rtsBool found = rtsFalse;
767
768   ASSERT(CurrentProc==proc);
769   ASSERT(!is_on_queue(tso,proc));
770   /* Idle proc: put the thread on the run queue
771      same for pri spark and basic version */
772   if (run_queue_hds[proc] == END_TSO_QUEUE)
773     {
774       /* too strong!
775       ASSERT((CurrentProc==MainProc &&   
776               CurrentTime[MainProc]==0 &&
777               procStatus[MainProc]==Idle) ||
778              procStatus[proc]==Starting);
779       */
780       run_queue_hds[proc] = run_queue_tls[proc] = tso;
781
782       CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime;
783
784       /* new_event of ContinueThread has been moved to do_the_startthread */
785
786       /* too strong!
787       ASSERT(procStatus[proc]==Idle || 
788              procStatus[proc]==Fishing || 
789              procStatus[proc]==Starting);
790       procStatus[proc] = Busy;
791       */
792       return;
793     }
794
795   if (RtsFlags.GranFlags.Light)
796     GranSimLight_insertThread(tso, proc);
797
798   /* Only for Pri Scheduling: find place where to insert tso into queue */
799   if (RtsFlags.GranFlags.DoPriorityScheduling && tso->gran.pri!=0)
800     /* {add_to_spark_queue}vo' jInIHta'; Qu' wa'DIch yIleghQo' */
801     for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count=0;
802          (next != END_TSO_QUEUE) && 
803          !(found = tso->gran.pri >= next->gran.pri);
804          prev = next, next = next->link, count++) 
805       { 
806        ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
807               (prev==(StgTSO*)NULL || prev->link==next));
808       }
809
810   ASSERT(!found || next != END_TSO_QUEUE);
811   ASSERT(procStatus[proc]!=Idle);
812  
813   if (found) {
814      /* found can only be rtsTrue if pri scheduling enabled */ 
815      ASSERT(RtsFlags.GranFlags.DoPriorityScheduling);
816      if (RtsFlags.GranFlags.GranSimStats.Global) 
817        globalGranStats.non_end_add_threads++;
818      /* Add tso to ThreadQueue between prev and next */
819      tso->link = next;
820      if ( next == (StgTSO*)END_TSO_QUEUE ) {
821        run_queue_tl = tso;
822      } else {
823        /* no back link for TSO chain */
824      }
825      
826      if ( prev == (StgTSO*)END_TSO_QUEUE ) {
827        /* Never add TSO as first elem of thread queue; the first */
828        /* element should be the one that is currently running -- HWL */
829        IF_DEBUG(gran,
830                 belch("GRAN: Qagh: NewThread (w/ PriorityScheduling): Trying to add TSO %p (PRI=%d) as first elem of threadQ (%p) on proc %u (@ %u)\n",
831                     tso, tso->gran.pri, run_queue_hd, proc,
832                     CurrentTime[proc]));
833      } else {
834       prev->link = tso;
835      }
836   } else { /* !found */ /* or not pri sparking! */
837     /* Add TSO to the end of the thread queue on that processor */
838     run_queue_tls[proc]->link = tso;
839     run_queue_tls[proc] = tso;
840   }
841   ASSERT(RtsFlags.GranFlags.DoPriorityScheduling || count==0);
842   CurrentTime[proc] += count * RtsFlags.GranFlags.Costs.pri_sched_overhead +
843                        RtsFlags.GranFlags.Costs.threadqueuetime;
844
845   /* ToDo: check if this is still needed -- HWL 
846   if (RtsFlags.GranFlags.DoThreadMigration)
847     ++SurplusThreads;
848
849   if (RtsFlags.GranFlags.GranSimStats.Full &&
850       !(( event_type == GR_START || event_type == GR_STARTQ) && 
851         RtsFlags.GranFlags.labelling) )
852     DumpRawGranEvent(proc, creator, event_type+1, tso, node, 
853                      tso->gran.sparkname, spark_queue_len(proc));
854   */
855
856 # if defined(GRAN_CHECK)
857   /* Check if thread queue is sorted. Only for testing, really!  HWL */
858   if ( RtsFlags.GranFlags.DoPriorityScheduling && 
859        (RtsFlags.GranFlags.Debug.sortedQ) ) {
860     rtsBool sorted = rtsTrue;
861     StgTSO *prev, *next;
862
863     if (run_queue_hds[proc]==END_TSO_QUEUE || 
864         run_queue_hds[proc]->link==END_TSO_QUEUE) {
865       /* just 1 elem => ok */
866     } else {
867       /* Qu' wa'DIch yIleghQo' (ignore first elem)! */
868       for (prev = run_queue_hds[proc]->link, next = prev->link;
869            (next != END_TSO_QUEUE) ;
870            prev = next, next = prev->link) {
871         ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
872                (prev==(StgTSO*)NULL || prev->link==next));
873         sorted = sorted && 
874                  (prev->gran.pri >= next->gran.pri);
875       }
876     }
877     if (!sorted) {
878       fprintf(stderr,"Qagh: THREADQ on PE %d is not sorted:\n",
879               CurrentProc);
880       G_THREADQ(run_queue_hd,0x1);
881     }
882   }
883 # endif
884 }
885
886 /*
887   insertThread, which is only used for GranSim Light, is similar to
888   startThread in that it adds a TSO to a thread queue. However, it assumes
889   that the thread queue is sorted by local clocks and it inserts the TSO at
890   the right place in the queue. Don't create any event, just insert.  
891 */
892 //@cindex GranSimLight_insertThread
893 rtsBool
894 GranSimLight_insertThread(tso, proc)
895 StgTSO* tso;
896 PEs proc;
897 {
898   StgTSO *prev, *next;
899   nat count = 0;
900   rtsBool found = rtsFalse;
901
902   ASSERT(RtsFlags.GranFlags.Light);
903
904   /* In GrAnSim-Light we always have an idle `virtual' proc.
905      The semantics of the one-and-only thread queue is different here:
906      all threads in the queue are running (each on its own virtual processor);
907      the queue is only needed internally in the simulator to interleave the
908      reductions of the different processors.
909      The one-and-only thread queue is sorted by the local clocks of the TSOs.
910   */
911   ASSERT(run_queue_hds[proc] != END_TSO_QUEUE);
912   ASSERT(tso->link == END_TSO_QUEUE);
913
914   /* If only one thread in queue so far we emit DESCHEDULE in debug mode */
915   if (RtsFlags.GranFlags.GranSimStats.Full &&
916       (RtsFlags.GranFlags.Debug.checkLight) && 
917       (run_queue_hd->link == END_TSO_QUEUE)) {
918     DumpRawGranEvent(proc, proc, GR_DESCHEDULE,
919                      run_queue_hds[proc], (StgClosure*)NULL, 
920                      tso->gran.sparkname, spark_queue_len(proc)); // ToDo: check spar_queue_len
921     // resched = rtsTrue;
922   }
923
924   /* this routine should only be used in a GrAnSim Light setup */
925   /* && CurrentProc must be 0 in GrAnSim Light setup */
926   ASSERT(RtsFlags.GranFlags.Light && CurrentProc==0);
927
928   /* Idle proc; same for pri spark and basic version */
929   if (run_queue_hd==END_TSO_QUEUE)
930     {
931       run_queue_hd = run_queue_tl = tso;
932       /* MAKE_BUSY(CurrentProc); */
933       return rtsTrue;
934     }
935
936   for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count = 0;
937        (next != END_TSO_QUEUE) && 
938        !(found = (tso->gran.clock < next->gran.clock));
939        prev = next, next = next->link, count++) 
940     { 
941        ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
942               (prev==(StgTSO*)NULL || prev->link==next));
943     }
944
945   /* found can only be rtsTrue if pri sparking enabled */ 
946   if (found) {
947      /* Add tso to ThreadQueue between prev and next */
948      tso->link = next;
949      if ( next == END_TSO_QUEUE ) {
950        run_queue_tls[proc] = tso;
951      } else {
952        /* no back link for TSO chain */
953      }
954      
955      if ( prev == END_TSO_QUEUE ) {
956        run_queue_hds[proc] = tso;
957      } else {
958        prev->link = tso;
959      }
960   } else { /* !found */ /* or not pri sparking! */
961     /* Add TSO to the end of the thread queue on that processor */
962     run_queue_tls[proc]->link = tso;
963     run_queue_tls[proc] = tso;
964   }
965
966   if ( prev == END_TSO_QUEUE ) {        /* new head of queue */
967     new_event(proc, proc, CurrentTime[proc],
968               ContinueThread,
969               tso, (StgClosure*)NULL, (rtsSpark*)NULL);
970   }
971   /*
972   if (RtsFlags.GranFlags.GranSimStats.Full && 
973       !(( event_type == GR_START || event_type == GR_STARTQ) && 
974         RtsFlags.GranFlags.labelling) )
975     DumpRawGranEvent(proc, creator, gr_evttype, tso, node,
976                      tso->gran.sparkname, spark_queue_len(proc));
977   */
978   return rtsTrue;
979 }
980
981 /*
982   endThread is responsible for general clean-up after the thread tso has
983   finished. This includes emitting statistics into the profile etc.  
984 */
985 void
986 endThread(StgTSO *tso, PEs proc) 
987 {
988   ASSERT(procStatus[proc]==Busy);        // coming straight out of STG land
989   ASSERT(tso->what_next==ThreadComplete);
990   // ToDo: prune ContinueThreads for this TSO from event queue
991   DumpEndEvent(proc, tso, rtsFalse /* not mandatory */);
992
993   /* if this was the last thread on this PE then make it Idle */
994   if (run_queue_hds[proc]==END_TSO_QUEUE) {
995     procStatus[CurrentProc] = Idle;
996   }
997 }
998
999 //@node Thread Queue routines, GranSim functions, Scheduling functions, GranSim specific code
1000 //@subsection Thread Queue routines
1001
1002 /* 
1003    Check whether given tso resides on the run queue of the current processor.
1004    Only used for debugging.
1005 */
1006    
1007 //@cindex is_on_queue
1008 rtsBool
1009 is_on_queue (StgTSO *tso, PEs proc) 
1010 {
1011   StgTSO *t;
1012   rtsBool found;
1013
1014   for (t=run_queue_hds[proc], found=rtsFalse; 
1015        t!=END_TSO_QUEUE && !(found = t==tso);
1016        t=t->link)
1017     /* nothing */ ;
1018
1019   return found;
1020 }
1021
1022 /* This routine  is only  used for keeping   a statistics  of thread  queue
1023    lengths to evaluate the impact of priority scheduling. -- HWL 
1024    {spark_queue_len}vo' jInIHta'
1025 */
1026 //@cindex thread_queue_len
1027 nat
1028 thread_queue_len(PEs proc) 
1029 {
1030  StgTSO *prev, *next;
1031  nat len;
1032
1033  for (len = 0, prev = END_TSO_QUEUE, next = run_queue_hds[proc];
1034       next != END_TSO_QUEUE; 
1035       len++, prev = next, next = prev->link)
1036    {}
1037
1038  return (len);
1039 }
1040
1041 //@node GranSim functions, GranSimLight routines, Thread Queue routines, GranSim specific code
1042 //@subsection GranSim functions
1043
1044 /* -----------------------------------------------------------------  */
1045 /* The main event handling functions; called from Schedule.c (schedule) */
1046 /* -----------------------------------------------------------------  */
1047  
1048 //@cindex do_the_globalblock
1049
1050 void 
1051 do_the_globalblock(rtsEvent* event)
1052
1053   PEs proc          = event->proc;        /* proc that requested node */
1054   StgTSO *tso       = event->tso;         /* tso that requested node */
1055   StgClosure  *node = event->node;        /* requested, remote node */
1056
1057   IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the GlobalBlock\n"));
1058   /* There should be no GLOBALBLOCKs in GrAnSim Light setup */
1059   ASSERT(!RtsFlags.GranFlags.Light);
1060   /* GlobalBlock events only valid with GUM fetching */
1061   ASSERT(RtsFlags.GranFlags.DoBulkFetching);
1062
1063   IF_GRAN_DEBUG(bq, // globalBlock,
1064     if (IS_LOCAL_TO(PROCS(node),proc)) {
1065       belch("## Qagh: GlobalBlock: Blocking TSO %d (%p) on LOCAL node %p (PE %d).\n",
1066             tso->id, tso, node, proc);
1067     });
1068
1069   /* CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.munpacktime; */
1070   if ( blockFetch(tso,proc,node) != 0 )
1071     return;                     /* node has become local by now */
1072
1073 #if 0
1074  ToDo: check whether anything has to be done at all after blockFetch -- HWL
1075
1076   if (!RtsFlags.GranFlags.DoAsyncFetch) { /* head of queue is next thread */
1077     StgTSO* tso = run_queue_hds[proc];       /* awaken next thread */
1078     if (tso != (StgTSO*)NULL) {
1079       new_event(proc, proc, CurrentTime[proc],
1080                 ContinueThread,
1081                 tso, (StgClosure*)NULL, (rtsSpark*)NULL);
1082       CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime;
1083       if (RtsFlags.GranFlags.GranSimStats.Full)
1084         DumpRawGranEvent(proc, CurrentProc, GR_SCHEDULE, tso,
1085                          (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc));  // ToDo: check sparkname and spar_queue_len
1086       procStatus[proc] = Busy;                  /* might have been fetching */
1087     } else {
1088       procStatus[proc] = Idle;                     /* no work on proc now */
1089     }
1090   } else {  /* RtsFlags.GranFlags.DoAsyncFetch i.e. block-on-fetch */
1091               /* other thread is already running */
1092               /* 'oH 'utbe' 'e' vIHar ; I think that's not needed -- HWL 
1093               new_event(proc,proc,CurrentTime[proc],
1094                        CONTINUETHREAD,EVENT_TSO(event),
1095                        (RtsFlags.GranFlags.DoBulkFetching ? closure :
1096                        EVENT_NODE(event)),NULL);
1097               */
1098   }
1099 #endif
1100 }
1101
1102 //@cindex do_the_unblock
1103
1104 void 
1105 do_the_unblock(rtsEvent* event) 
1106 {
1107   PEs proc = event->proc,       /* proc that requested node */
1108       creator = event->creator; /* proc that requested node */
1109   StgTSO* tso = event->tso;     /* tso that requested node */
1110   StgClosure* node = event->node;  /* requested, remote node */
1111   
1112   IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the UnBlock\n"))
1113   /* There should be no UNBLOCKs in GrAnSim Light setup */
1114   ASSERT(!RtsFlags.GranFlags.Light);
1115   /* UnblockThread means either FetchReply has arrived or
1116      a blocking queue has been awakened;
1117      ToDo: check with assertions
1118   ASSERT(procStatus[proc]==Fetching || IS_BLACK_HOLE(event->node));
1119   */
1120   if (!RtsFlags.GranFlags.DoAsyncFetch) {  /* block-on-fetch */
1121     /* We count block-on-fetch as normal block time */    
1122     tso->gran.blocktime += CurrentTime[proc] - tso->gran.blockedat;
1123     /* Dumping now done when processing the event
1124        No costs for contextswitch or thread queueing in this case 
1125        if (RtsFlags.GranFlags.GranSimStats.Full)
1126          DumpRawGranEvent(proc, CurrentProc, GR_RESUME, tso, 
1127                           (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc));
1128     */
1129     /* Maybe do this in FetchReply already 
1130     if (procStatus[proc]==Fetching)
1131       procStatus[proc] = Busy;
1132     */
1133     /*
1134     new_event(proc, proc, CurrentTime[proc],
1135               ContinueThread,
1136               tso, node, (rtsSpark*)NULL);
1137     */
1138   } else {
1139     /* Asynchr comm causes additional costs here: */
1140     /* Bring the TSO from the blocked queue into the threadq */
1141   }
1142   /* In all cases, the UnblockThread causes a ResumeThread to be scheduled */
1143   new_event(proc, proc, 
1144             CurrentTime[proc]+RtsFlags.GranFlags.Costs.threadqueuetime,
1145             ResumeThread,
1146             tso, node, (rtsSpark*)NULL);
1147 }
1148
1149 //@cindex do_the_fetchnode
1150
1151 void
1152 do_the_fetchnode(rtsEvent* event)
1153 {
1154   PEs proc = event->proc,       /* proc that holds the requested node */
1155       creator = event->creator; /* proc that requested node */
1156   StgTSO* tso = event->tso;
1157   StgClosure* node = event->node;  /* requested, remote node */
1158   rtsFetchReturnCode rc;
1159
1160   ASSERT(CurrentProc==proc);
1161   /* There should be no FETCHNODEs in GrAnSim Light setup */
1162   ASSERT(!RtsFlags.GranFlags.Light);
1163
1164   IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchNode\n"));
1165
1166   CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1167
1168   /* ToDo: check whether this is the right place for dumping the event */
1169   if (RtsFlags.GranFlags.GranSimStats.Full)
1170     DumpRawGranEvent(creator, proc, GR_FETCH, tso, node, (StgInt)0, 0);
1171
1172   do {
1173     rc = handleFetchRequest(node, proc, creator, tso);
1174     if (rc == OutOfHeap) {                                   /* trigger GC */
1175 # if defined(GRAN_CHECK)  && defined(GRAN)
1176      if (RtsFlags.GcFlags.giveStats)
1177        fprintf(RtsFlags.GcFlags.statsFile,"*****   veQ boSwI'  PackNearbyGraph(node %p, tso %p (%d))\n",
1178                 node, tso, tso->id);
1179 # endif
1180      barf("//// do_the_fetchnode: out of heap after handleFetchRequest; ToDo: call GarbageCollect()");
1181      prepend_event(event);
1182      GarbageCollect(GetRoots, rtsFalse); 
1183      // HWL: ToDo: check whether a ContinueThread has to be issued
1184      // HWL old: ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
1185 # if 0 && defined(GRAN_CHECK)  && defined(GRAN)
1186      if (RtsFlags.GcFlags.giveStats) {
1187        fprintf(RtsFlags.GcFlags.statsFile,"*****      SAVE_Hp=%p, SAVE_HpLim=%p, PACK_HEAP_REQUIRED=%d\n",
1188                 Hp, HpLim, 0) ; // PACK_HEAP_REQUIRED);  ???
1189        fprintf(stderr,"*****      No. of packets so far: %d (total size: %d)\n", 
1190                 globalGranStats.tot_packets, globalGranStats.tot_packet_size);
1191      }
1192 # endif 
1193      event = grab_event();
1194      // Hp -= PACK_HEAP_REQUIRED; // ???
1195
1196      /* GC knows that events are special and follows the pointer i.e. */
1197      /* events are valid even if they moved. An EXIT is triggered */
1198      /* if there is not enough heap after GC. */
1199     }
1200   } while (rc == OutOfHeap);
1201 }
1202
1203 //@cindex do_the_fetchreply
1204 void 
1205 do_the_fetchreply(rtsEvent* event)
1206 {
1207   PEs proc = event->proc,       /* proc that requested node */
1208       creator = event->creator; /* proc that holds the requested node */
1209   StgTSO* tso = event->tso;
1210   StgClosure* node = event->node;  /* requested, remote node */
1211   StgClosure* closure=(StgClosure*)NULL;
1212
1213   ASSERT(CurrentProc==proc);
1214   ASSERT(RtsFlags.GranFlags.DoAsyncFetch || procStatus[proc]==Fetching);
1215
1216   IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchReply\n"));
1217   /* There should be no FETCHREPLYs in GrAnSim Light setup */
1218   ASSERT(!RtsFlags.GranFlags.Light);
1219
1220   /* assign message unpack costs *before* dumping the event */
1221   CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1222   
1223   /* ToDo: check whether this is the right place for dumping the event */
1224   if (RtsFlags.GranFlags.GranSimStats.Full)
1225     DumpRawGranEvent(proc, creator, GR_REPLY, tso, node, 
1226                       tso->gran.sparkname, spark_queue_len(proc));
1227
1228   /* THIS SHOULD NEVER HAPPEN 
1229      If tso is in the BQ of node this means that it actually entered the 
1230      remote closure, due to a missing GranSimFetch at the beginning of the 
1231      entry code; therefore, this is actually a faked fetch, triggered from 
1232      within GranSimBlock; 
1233      since tso is both in the EVQ and the BQ for node, we have to take it out 
1234      of the BQ first before we can handle the FetchReply;
1235      ToDo: special cases in awakenBlockedQueue, since the BQ magically moved.
1236   */
1237   if (tso->block_info.closure!=(StgClosure*)NULL) {
1238     IF_GRAN_DEBUG(bq,
1239                   belch("## ghuH: TSO %d (%p) in FetchReply is blocked on node %p (shouldn't happen AFAIK)",
1240                         tso->id, tso, node));
1241     // unlink_from_bq(tso, node);
1242   }
1243     
1244   if (RtsFlags.GranFlags.DoBulkFetching) {      /* bulk (packet) fetching */
1245     rtsPackBuffer *buffer = (rtsPackBuffer*)node;
1246     nat size = buffer->size;
1247   
1248     /* NB: Fetch misses can't occur with GUM fetching, as */
1249     /* updatable closure are turned into RBHs and therefore locked */
1250     /* for other processors that try to grab them. */
1251   
1252     closure = UnpackGraph(buffer);
1253     CurrentTime[proc] += size * RtsFlags.GranFlags.Costs.munpacktime;
1254   } else  // incremental fetching
1255       /* Copy or  move node to CurrentProc */
1256       if (fetchNode(node, creator, proc)) {
1257         /* Fetch has failed i.e. node has been grabbed by another PE */
1258         PEs p = where_is(node);
1259         rtsTime fetchtime;
1260      
1261         if (RtsFlags.GranFlags.GranSimStats.Global)
1262           globalGranStats.fetch_misses++;
1263
1264         IF_GRAN_DEBUG(thunkStealing,
1265                  belch("== Qu'vatlh! fetch miss @ %u: node %p is at proc %u (rather than proc %u)\n",
1266                        CurrentTime[proc],node,p,creator));
1267
1268         CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
1269         
1270         /* Count fetch again !? */
1271         ++(tso->gran.fetchcount);
1272         tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime;
1273         
1274         fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) +
1275                     RtsFlags.GranFlags.Costs.latency;
1276         
1277         /* Chase the grabbed node */
1278         new_event(p, proc, fetchtime,
1279                   FetchNode,
1280                   tso, node, (rtsSpark*)NULL);
1281
1282 # if 0 && defined(GRAN_CHECK) && defined(GRAN) /* Just for testing */
1283        IF_GRAN_DEBUG(blockOnFetch,
1284                      BlockedOnFetch[CurrentProc] = tso;) /*-rtsTrue;-*/
1285         
1286        IF_GRAN_DEBUG(blockOnFetch_sanity,
1287                      tso->type |= FETCH_MASK_TSO;)
1288 # endif
1289
1290         CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
1291         
1292         return; /* NB: no REPLy has been processed; tso still sleeping */
1293     }
1294
1295     /* -- Qapla'! Fetch has been successful; node is here, now  */
1296     ++(event->tso->gran.fetchcount);
1297     event->tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime;
1298
1299     /* this is now done at the beginning of this routine
1300     if (RtsFlags.GranFlags.GranSimStats.Full)
1301        DumpRawGranEvent(proc,event->creator, GR_REPLY, event->tso,
1302                         (RtsFlags.GranFlags.DoBulkFetching ? 
1303                                closure : 
1304                                event->node),
1305                         tso->gran.sparkname, spark_queue_len(proc));
1306     */
1307
1308     ASSERT(OutstandingFetches[proc] > 0);
1309     --OutstandingFetches[proc];
1310     new_event(proc, proc, CurrentTime[proc],
1311               ResumeThread,
1312               event->tso, (RtsFlags.GranFlags.DoBulkFetching ? 
1313                            closure : 
1314                            event->node),
1315               (rtsSpark*)NULL);
1316 }
1317
1318 //@cindex do_the_movethread
1319
1320 void
1321 do_the_movethread(rtsEvent* event) {
1322   PEs proc = event->proc,       /* proc that requested node */
1323       creator = event->creator; /* proc that holds the requested node */
1324   StgTSO* tso = event->tso;
1325
1326  IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveThread\n"));
1327
1328  ASSERT(CurrentProc==proc);
1329  /* There should be no MOVETHREADs in GrAnSim Light setup */
1330  ASSERT(!RtsFlags.GranFlags.Light);
1331  /* MOVETHREAD events should never occur without -bM */
1332  ASSERT(RtsFlags.GranFlags.DoThreadMigration);
1333  /* Bitmask of moved thread should be 0 */
1334  ASSERT(PROCS(tso)==0);
1335  ASSERT(procStatus[proc] == Fishing ||
1336         RtsFlags.GranFlags.DoAsyncFetch);
1337  ASSERT(OutstandingFishes[proc]>0);
1338
1339  /* ToDo: exact costs for unpacking the whole TSO  */
1340  CurrentTime[proc] +=  5l * RtsFlags.GranFlags.Costs.munpacktime;
1341
1342  /* ToDo: check whether this is the right place for dumping the event */
1343  if (RtsFlags.GranFlags.GranSimStats.Full)
1344    DumpRawGranEvent(proc, creator, 
1345                     GR_STOLEN, tso, (StgClosure*)NULL, (StgInt)0, 0);
1346
1347  // ToDo: check cost functions
1348  --OutstandingFishes[proc];
1349  SET_GRAN_HDR(tso, ThisPE);         // adjust the bitmask for the TSO
1350  insertThread(tso, proc);
1351
1352  if (procStatus[proc]==Fishing)
1353    procStatus[proc] = Idle;
1354
1355  if (RtsFlags.GranFlags.GranSimStats.Global)
1356    globalGranStats.tot_TSOs_migrated++;
1357 }
1358
1359 //@cindex do_the_movespark
1360
1361 void
1362 do_the_movespark(rtsEvent* event) {
1363  PEs proc = event->proc,       /* proc that requested spark */
1364      creator = event->creator; /* proc that holds the requested spark */
1365  StgTSO* tso = event->tso;
1366  rtsSparkQ spark = event->spark;
1367
1368  IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveSpark\n"))
1369
1370  ASSERT(CurrentProc==proc);
1371  ASSERT(spark!=NULL);
1372  ASSERT(procStatus[proc] == Fishing ||
1373         RtsFlags.GranFlags.DoAsyncFetch);
1374  ASSERT(OutstandingFishes[proc]>0); 
1375
1376  CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1377           
1378  /* record movement of spark only if spark profiling is turned on */
1379  if (RtsFlags.GranFlags.GranSimStats.Sparks)
1380     DumpRawGranEvent(proc, creator,
1381                      SP_ACQUIRED,
1382                      tso, spark->node, spark->name, spark_queue_len(proc));
1383
1384  /* global statistics */
1385  if ( RtsFlags.GranFlags.GranSimStats.Global &&
1386       !closure_SHOULD_SPARK(spark->node))
1387    globalGranStats.withered_sparks++;
1388    /* Not adding the spark to the spark queue would be the right */
1389    /* thing here, but it also would be cheating, as this info can't be */
1390    /* available in a real system. -- HWL */
1391
1392  --OutstandingFishes[proc];
1393
1394  add_to_spark_queue(spark);
1395
1396  IF_GRAN_DEBUG(randomSteal, // ToDo: spark-distribution flag
1397                print_sparkq_stats());
1398
1399  /* Should we treat stolen sparks specially? Currently, we don't. */
1400
1401  if (procStatus[proc]==Fishing)
1402    procStatus[proc] = Idle;
1403
1404  /* add_to_spark_queue will increase the time of the current proc. */
1405  /*
1406    If proc was fishing, it is Idle now with the new spark in its spark
1407    pool. This means that the next time handleIdlePEs is called, a local
1408    FindWork will be created on this PE to turn the spark into a thread. Of
1409    course another PE might steal the spark in the meantime (that's why we
1410    are using events rather than inlining all the operations in the first
1411    place). */
1412 }
1413
1414 /*
1415   In the Constellation class version of GranSim the semantics of StarThread
1416   events has changed. Now, StartThread has to perform 3 basic operations:
1417    - create a new thread (previously this was done in ActivateSpark);
1418    - insert the thread into the run queue of the current processor
1419    - generate a new event for actually running the new thread
1420   Note that the insertThread is called via createThread. 
1421 */
1422   
1423 //@cindex do_the_startthread
1424
1425 void
1426 do_the_startthread(rtsEvent *event)
1427 {
1428   PEs proc          = event->proc;        /* proc that requested node */
1429   StgTSO *tso       = event->tso;         /* tso that requested node */
1430   StgClosure  *node = event->node;        /* requested, remote node */
1431   rtsSpark *spark   = event->spark;
1432   GranEventType gr_evttype;
1433
1434   ASSERT(CurrentProc==proc);
1435   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1436   ASSERT(event->evttype == ResumeThread || event->evttype == StartThread);
1437   /* if this was called via StartThread: */
1438   ASSERT(event->evttype!=StartThread || tso == END_TSO_QUEUE); // not yet created
1439   // ToDo: check: ASSERT(event->evttype!=StartThread || procStatus[proc]==Starting);
1440   /* if this was called via ResumeThread: */
1441   ASSERT(event->evttype!=ResumeThread || 
1442            RtsFlags.GranFlags.DoAsyncFetch ||!is_on_queue(tso,proc)); 
1443
1444   /* startThread may have been called from the main event handler upon
1445      finding either a ResumeThread or a StartThread event; set the
1446      gr_evttype (needed for writing to .gr file) accordingly */
1447   // gr_evttype = (event->evttype == ResumeThread) ? GR_RESUME : GR_START;
1448
1449   if ( event->evttype == StartThread ) {
1450     GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ? 
1451                                  GR_START : GR_STARTQ;
1452
1453     tso = createThread(BLOCK_SIZE_W, spark->gran_info);// implicit insertThread!
1454     pushClosure(tso, node);
1455
1456     // ToDo: fwd info on local/global spark to thread -- HWL
1457     // tso->gran.exported =  spark->exported;
1458     // tso->gran.locked =   !spark->global;
1459     tso->gran.sparkname = spark->name;
1460
1461     ASSERT(CurrentProc==proc);
1462     if (RtsFlags.GranFlags.GranSimStats.Full)
1463       DumpGranEvent(gr_evttype,tso);
1464
1465     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
1466   } else { // event->evttype == ResumeThread
1467     GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ? 
1468                                  GR_RESUME : GR_RESUMEQ;
1469
1470     insertThread(tso, proc);
1471
1472     ASSERT(CurrentProc==proc);
1473     if (RtsFlags.GranFlags.GranSimStats.Full)
1474       DumpGranEvent(gr_evttype,tso);
1475   }
1476
1477   ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE); // non-empty run queue
1478   procStatus[proc] = Busy;
1479   /* make sure that this thread is actually run */
1480   new_event(proc, proc, 
1481             CurrentTime[proc],
1482             ContinueThread,
1483             tso, node, (rtsSpark*)NULL);
1484   
1485   /* A wee bit of statistics gathering */
1486   if (RtsFlags.GranFlags.GranSimStats.Global) {
1487     globalGranStats.tot_add_threads++;
1488     globalGranStats.tot_tq_len += thread_queue_len(CurrentProc);
1489   }
1490
1491 }
1492
1493 //@cindex do_the_findwork
1494 void
1495 do_the_findwork(rtsEvent* event) 
1496 {
1497   PEs proc = event->proc,       /* proc to search for work */
1498       creator = event->creator; /* proc that requested work */
1499   rtsSparkQ spark = event->spark;
1500   /* ToDo: check that this size is safe -- HWL */
1501 #if 0
1502  ToDo: check available heap
1503
1504   nat req_heap = sizeofW(StgTSO) + MIN_STACK_WORDS;
1505                  // add this? -- HWL:RtsFlags.ConcFlags.stkChunkSize;
1506 #endif
1507
1508   IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the Findwork\n"));
1509
1510   /* If GUM style fishing is enabled, the contents of the spark field says
1511      what to steal (spark(1) or thread(2)); */
1512   ASSERT(!(RtsFlags.GranFlags.Fishing && event->spark==(rtsSpark*)0));
1513
1514   /* Make sure that we have enough heap for creating a new
1515      thread. This is a conservative estimate of the required heap.
1516      This eliminates special checks for GC around NewThread within
1517      ActivateSpark.                                                 */
1518
1519 #if 0
1520  ToDo: check available heap
1521
1522   if (Hp + req_heap > HpLim ) {
1523     IF_DEBUG(gc, 
1524              belch("GC: Doing GC from within Findwork handling (that's bloody dangerous if you ask me)");)
1525       GarbageCollect(GetRoots);
1526       // ReallyPerformThreadGC(req_heap, rtsFalse);   old -- HWL
1527       Hp -= req_heap;
1528       if (procStatus[CurrentProc]==Sparking) 
1529         procStatus[CurrentProc]=Idle;
1530       return;
1531   }
1532 #endif
1533   
1534   if ( RtsFlags.GranFlags.DoAlwaysCreateThreads ||
1535        RtsFlags.GranFlags.Fishing ||
1536        ((procStatus[proc]==Idle || procStatus[proc]==Sparking) &&
1537         (RtsFlags.GranFlags.FetchStrategy >= 2 || 
1538          OutstandingFetches[proc] == 0)) ) 
1539    {
1540     rtsBool found;
1541     rtsSparkQ  prev, spark;
1542     
1543     /* ToDo: check */
1544     ASSERT(procStatus[proc]==Sparking ||
1545            RtsFlags.GranFlags.DoAlwaysCreateThreads ||
1546            RtsFlags.GranFlags.Fishing);
1547     
1548     /* SImmoHwI' yInej! Search spark queue! */
1549     /* gimme_spark (event, &found, &spark); */
1550     findLocalSpark(event, &found, &spark);
1551
1552     if (!found) { /* pagh vumwI' */
1553       /*
1554         If no spark has been found this can mean 2 things:
1555          1/ The FindWork was a fish (i.e. a message sent by another PE) and 
1556             the spark pool of the receiver is empty
1557             --> the fish has to be forwarded to another PE
1558          2/ The FindWork was local to this PE (i.e. no communication; in this
1559             case creator==proc) and the spark pool of the PE is not empty 
1560             contains only sparks of closures that should not be sparked 
1561             (note: if the spark pool were empty, handleIdlePEs wouldn't have 
1562             generated a FindWork in the first place)
1563             --> the PE has to be made idle to trigger stealing sparks the next
1564                 time handleIdlePEs is performed
1565       */ 
1566
1567       ASSERT(pending_sparks_hds[proc]==(rtsSpark*)NULL);
1568       if (creator==proc) {
1569         /* local FindWork */
1570         if (procStatus[proc]==Busy) {
1571           belch("ghuH: PE %d in Busy state while processing local FindWork (spark pool is empty!) @ %lx",
1572                 proc, CurrentTime[proc]);
1573           procStatus[proc] = Idle;
1574         }
1575       } else {
1576         /* global FindWork i.e. a Fish */
1577         ASSERT(RtsFlags.GranFlags.Fishing);
1578         /* actually this generates another request from the originating PE */
1579         ASSERT(OutstandingFishes[creator]>0);
1580         OutstandingFishes[creator]--;
1581         /* ToDo: assign costs for sending fish to proc not to creator */
1582         stealSpark(creator); /* might steal from same PE; ToDo: fix */
1583         ASSERT(RtsFlags.GranFlags.maxFishes!=1 || procStatus[creator] == Fishing);
1584         /* any assertions on state of proc possible here? */
1585       }
1586     } else {
1587       /* DaH chu' Qu' yIchen! Now create new work! */ 
1588       IF_GRAN_DEBUG(findWork,
1589                     belch("+- munching spark %p; creating thread for node %p",
1590                           spark, spark->node));
1591       activateSpark (event, spark);
1592       ASSERT(spark != (rtsSpark*)NULL);
1593       spark = delete_from_sparkq (spark, proc, rtsTrue);
1594     }
1595
1596     IF_GRAN_DEBUG(findWork,
1597                   belch("+- Contents of spark queues at the end of FindWork @ %lx",
1598                         CurrentTime[proc]); 
1599                   print_sparkq_stats());
1600
1601     /* ToDo: check ; not valid if GC occurs in ActivateSpark */
1602     ASSERT(!found ||
1603             /* forward fish  or */
1604             (proc!=creator ||
1605             /* local spark  or */
1606             (proc==creator && procStatus[proc]==Starting)) || 
1607            //(!found && procStatus[proc]==Idle) ||
1608            RtsFlags.GranFlags.DoAlwaysCreateThreads); 
1609    } else {
1610     IF_GRAN_DEBUG(findWork,
1611                   belch("+- RTS refuses to findWork on PE %d @ %lx",
1612                         proc, CurrentTime[proc]);
1613                   belch("  procStatus[%d]=%s, fetch strategy=%d, outstanding fetches[%d]=%d", 
1614                         proc, proc_status_names[procStatus[proc]],
1615                         RtsFlags.GranFlags.FetchStrategy, 
1616                         proc, OutstandingFetches[proc]));
1617    }  
1618 }
1619  
1620 //@node GranSimLight routines, Code for Fetching Nodes, GranSim functions, GranSim specific code
1621 //@subsection GranSimLight routines
1622
1623 /* 
1624    This code is called from the central scheduler after having rgabbed a
1625    new event and is only needed for GranSim-Light. It mainly adjusts the
1626    ActiveTSO so that all costs that have to be assigned from within the
1627    scheduler are assigned to the right TSO. The choice of ActiveTSO depends
1628    on the type of event that has been found.  
1629 */
1630
1631 void
1632 GranSimLight_enter_system(event, ActiveTSOp)
1633 rtsEvent *event;
1634 StgTSO **ActiveTSOp;
1635 {
1636   StgTSO *ActiveTSO = *ActiveTSOp;
1637
1638   ASSERT (RtsFlags.GranFlags.Light);
1639   
1640   /* Restore local clock of the virtual processor attached to CurrentTSO.
1641      All costs will be associated to the `virt. proc' on which the tso
1642      is living. */
1643   if (ActiveTSO != NULL) {                     /* already in system area */
1644     ActiveTSO->gran.clock = CurrentTime[CurrentProc];
1645     if (RtsFlags.GranFlags.DoFairSchedule)
1646       {
1647         if (RtsFlags.GranFlags.GranSimStats.Full &&
1648             RtsFlags.GranFlags.Debug.checkLight)
1649           DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
1650       }
1651   }
1652   switch (event->evttype)
1653     { 
1654     case ContinueThread: 
1655     case FindWork:       /* inaccurate this way */
1656       ActiveTSO = run_queue_hd;
1657       break;
1658     case ResumeThread:   
1659     case StartThread:
1660     case MoveSpark:      /* has tso of virt proc in tso field of event */
1661       ActiveTSO = event->tso;
1662       break;
1663     default: barf("Illegal event type %s (%d) in GrAnSim Light setup\n",
1664                   event_names[event->evttype],event->evttype);
1665     }
1666   CurrentTime[CurrentProc] = ActiveTSO->gran.clock;
1667   if (RtsFlags.GranFlags.DoFairSchedule) {
1668       if (RtsFlags.GranFlags.GranSimStats.Full &&
1669           RtsFlags.GranFlags.Debug.checkLight)
1670         DumpGranEvent(GR_SYSTEM_START,ActiveTSO);
1671   }
1672 }
1673
1674 void
1675 GranSimLight_leave_system(event, ActiveTSOp)
1676 rtsEvent *event;
1677 StgTSO **ActiveTSOp;
1678 {
1679   StgTSO *ActiveTSO = *ActiveTSOp;
1680
1681   ASSERT(RtsFlags.GranFlags.Light);
1682
1683   /* Save time of `virt. proc' which was active since last getevent and
1684      restore time of `virt. proc' where CurrentTSO is living on. */
1685   if(RtsFlags.GranFlags.DoFairSchedule) {
1686     if (RtsFlags.GranFlags.GranSimStats.Full &&
1687         RtsFlags.GranFlags.Debug.checkLight) // ToDo: clean up flags
1688       DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
1689   }
1690   ActiveTSO->gran.clock = CurrentTime[CurrentProc];
1691   ActiveTSO = (StgTSO*)NULL;
1692   CurrentTime[CurrentProc] = CurrentTSO->gran.clock;
1693   if (RtsFlags.GranFlags.DoFairSchedule /* &&  resched */ ) {
1694     // resched = rtsFalse;
1695     if (RtsFlags.GranFlags.GranSimStats.Full &&
1696         RtsFlags.GranFlags.Debug.checkLight)
1697       DumpGranEvent(GR_SCHEDULE,run_queue_hd);
1698   }
1699   /* 
1700      if (TSO_LINK(ThreadQueueHd)!=PrelBase_Z91Z93_closure &&
1701      (TimeOfNextEvent == 0 ||
1702      TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000<TimeOfNextEvent)) {
1703      new_event(CurrentProc,CurrentProc,TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000,
1704      CONTINUETHREAD,TSO_LINK(ThreadQueueHd),PrelBase_Z91Z93_closure,NULL);
1705      TimeOfNextEvent = get_time_of_next_event();
1706      }
1707   */
1708 }
1709
1710 //@node Code for Fetching Nodes, Idle PEs, GranSimLight routines, GranSim specific code
1711 //@subsection Code for Fetching Nodes
1712
1713 /*
1714    The following GrAnSim routines simulate the fetching of nodes from a
1715    remote processor. We use a 1 word bitmask to indicate on which processor
1716    a node is lying. Thus, moving or copying a node from one processor to
1717    another just requires an appropriate change in this bitmask (using
1718    @SET_GA@).  Additionally, the clocks have to be updated.
1719
1720    A special case arises when the node that is needed by processor A has
1721    been moved from a processor B to a processor C between sending out a
1722    @FETCH@ (from A) and its arrival at B. In that case the @FETCH@ has to
1723    be forwarded to C. This is simulated by issuing another FetchNode event
1724    on processor C with A as creator.
1725 */
1726  
1727 /* ngoqvam che' {GrAnSim}! */
1728
1729 /* Fetch node "node" to processor "p" */
1730
1731 //@cindex fetchNode
1732
1733 rtsFetchReturnCode
1734 fetchNode(node,from,to)
1735 StgClosure* node;
1736 PEs from, to;
1737 {
1738   /* In case of RtsFlags.GranFlags.DoBulkFetching this fct should never be 
1739      entered! Instead, UnpackGraph is used in ReSchedule */
1740   StgClosure* closure;
1741
1742   ASSERT(to==CurrentProc);
1743   /* Should never be entered  in GrAnSim Light setup */
1744   ASSERT(!RtsFlags.GranFlags.Light);
1745   /* fetchNode should never be entered with DoBulkFetching */
1746   ASSERT(!RtsFlags.GranFlags.DoBulkFetching);
1747
1748   /* Now fetch the node */
1749   if (!IS_LOCAL_TO(PROCS(node),from) &&
1750       !IS_LOCAL_TO(PROCS(node),to) ) 
1751     return NodeHasMoved;
1752   
1753   if (closure_HNF(node))                /* node already in head normal form? */
1754     node->header.gran.procs |= PE_NUMBER(to);           /* Copy node */
1755   else
1756     node->header.gran.procs = PE_NUMBER(to);            /* Move node */
1757
1758   return Ok;
1759 }
1760
1761 /* 
1762    Process a fetch request. 
1763    
1764    Cost of sending a packet of size n = C + P*n
1765    where C = packet construction constant, 
1766          P = cost of packing one word into a packet
1767    [Should also account for multiple packets].
1768 */
1769
1770 //@cindex handleFetchRequest
1771
1772 rtsFetchReturnCode
1773 handleFetchRequest(node,to,from,tso)
1774 StgClosure* node;   // the node which is requested
1775 PEs to, from;       // fetch request: from -> to
1776 StgTSO* tso;        // the tso which needs the node
1777 {
1778   ASSERT(!RtsFlags.GranFlags.Light);
1779   /* ToDo: check assertion */
1780   ASSERT(OutstandingFetches[from]>0);
1781
1782   /* probably wrong place; */
1783   ASSERT(CurrentProc==to);
1784
1785   if (IS_LOCAL_TO(PROCS(node), from)) /* Somebody else moved node already => */
1786     {                                 /* start tso */
1787       IF_GRAN_DEBUG(thunkStealing,
1788                     fprintf(stderr,"ghuH: handleFetchRequest entered with local node %p (%s) (PE %d)\n", 
1789                             node, info_type(node), from));
1790
1791       if (RtsFlags.GranFlags.DoBulkFetching) {
1792         nat size;
1793         rtsPackBuffer *graph;
1794
1795         /* Create a 1-node-buffer and schedule a FETCHREPLY now */
1796         graph = PackOneNode(node, tso, &size); 
1797         new_event(from, to, CurrentTime[to],
1798                   FetchReply,
1799                   tso, (StgClosure *)graph, (rtsSpark*)NULL);
1800       } else {
1801         new_event(from, to, CurrentTime[to],
1802                   FetchReply,
1803                   tso, node, (rtsSpark*)NULL);
1804       }
1805       IF_GRAN_DEBUG(thunkStealing,
1806                     belch("== majQa'! closure %p is local on PE %d already (this is a good thing)", node, from));
1807       return (NodeIsLocal);
1808     }
1809   else if (IS_LOCAL_TO(PROCS(node), to) )   /* Is node still here? */
1810     {
1811       if (RtsFlags.GranFlags.DoBulkFetching) { /* {GUM}vo' ngoqvam vInIHta' */
1812         nat size;                              /* (code from GUM) */
1813         StgClosure* graph;
1814
1815         if (IS_BLACK_HOLE(node)) {   /* block on BH or RBH */
1816           new_event(from, to, CurrentTime[to],
1817                     GlobalBlock,
1818                     tso, node, (rtsSpark*)NULL);
1819           /* Note: blockFetch is done when handling GLOBALBLOCK event; 
1820                    make sure the TSO stays out of the run queue */
1821           /* When this thread is reawoken it does the usual: it tries to 
1822              enter the updated node and issues a fetch if it's remote.
1823              It has forgotten that it has sent a fetch already (i.e. a
1824              FETCHNODE is swallowed by a BH, leaving the thread in a BQ) */
1825           --OutstandingFetches[from];
1826
1827           IF_GRAN_DEBUG(thunkStealing,
1828                         belch("== majQa'! closure %p on PE %d is a BH (demander=PE %d); faking a FMBQ", 
1829                               node, to, from));
1830           if (RtsFlags.GranFlags.GranSimStats.Global) {
1831             globalGranStats.tot_FMBQs++;
1832           }
1833           return (NodeIsBH);
1834         }
1835
1836         /* The tso requesting the node is blocked and cannot be on a run queue */
1837         ASSERT(!is_on_queue(tso, from));
1838         
1839         // ToDo: check whether graph is ever used as an rtsPackBuffer!!
1840         if ((graph = (StgClosure *)PackNearbyGraph(node, tso, &size, 0)) == NULL) 
1841           return (OutOfHeap);  /* out of heap */
1842
1843         /* Actual moving/copying of node is done on arrival; see FETCHREPLY */
1844         /* Send a reply to the originator */
1845         /* ToDo: Replace that by software costs for doing graph packing! */
1846         CurrentTime[to] += size * RtsFlags.GranFlags.Costs.mpacktime;
1847
1848         new_event(from, to,
1849                   CurrentTime[to]+RtsFlags.GranFlags.Costs.latency,
1850                   FetchReply,
1851                   tso, (StgClosure *)graph, (rtsSpark*)NULL);
1852         
1853         CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1854         return (Ok);
1855       } else {                   /* incremental (single closure) fetching */
1856         /* Actual moving/copying of node is done on arrival; see FETCHREPLY */
1857         /* Send a reply to the originator */
1858         CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime;
1859
1860         new_event(from, to,
1861                   CurrentTime[to]+RtsFlags.GranFlags.Costs.latency,
1862                   FetchReply,
1863                   tso, node, (rtsSpark*)NULL);
1864       
1865         CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1866         return (Ok);
1867       }
1868     }
1869   else       /* Qu'vatlh! node has been grabbed by another proc => forward */
1870     {    
1871       PEs node_loc = where_is(node);
1872       rtsTime fetchtime;
1873
1874       IF_GRAN_DEBUG(thunkStealing,
1875                     belch("== Qu'vatlh! node %p has been grabbed by PE %d from PE %d (demander=%d) @ %d\n",
1876                           node,node_loc,to,from,CurrentTime[to]));
1877       if (RtsFlags.GranFlags.GranSimStats.Global) {
1878         globalGranStats.fetch_misses++;
1879       }
1880
1881       /* Prepare FORWARD message to proc p_new */
1882       CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime;
1883       
1884       fetchtime = stg_max(CurrentTime[to], CurrentTime[node_loc]) +
1885                   RtsFlags.GranFlags.Costs.latency;
1886           
1887       new_event(node_loc, from, fetchtime,
1888                 FetchNode,
1889                 tso, node, (rtsSpark*)NULL);
1890
1891       CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1892
1893       return (NodeHasMoved);
1894     }
1895 }
1896
1897 /*
1898    blockFetch blocks a BlockedFetch node on some kind of black hole.
1899
1900    Taken from gum/HLComms.lc.   [find a  better  place for that ?] --  HWL  
1901
1902    {\bf Note:} In GranSim we don't have @FETCHME@ nodes and therefore don't
1903    create @FMBQ@'s (FetchMe blocking queues) to cope with global
1904    blocking. Instead, non-local TSO are put into the BQ in the same way as
1905    local TSOs. However, we have to check if a TSO is local or global in
1906    order to account for the latencies involved and for keeping track of the
1907    number of fetches that are really going on.  
1908 */
1909
1910 //@cindex blockFetch
1911
1912 rtsFetchReturnCode
1913 blockFetch(tso, proc, bh)
1914 StgTSO* tso;                        /* TSO which gets blocked */
1915 PEs proc;                           /* PE where that tso was running */
1916 StgClosure* bh;                     /* closure to block on (BH, RBH, BQ) */
1917 {
1918   StgInfoTable *info;
1919
1920   IF_GRAN_DEBUG(bq,
1921                 fprintf(stderr,"## blockFetch: blocking TSO %p (%d)[PE %d] on node %p (%s) [PE %d]. No graph is packed!\n", 
1922                 tso, tso->id, proc, bh, info_type(bh), where_is(bh)));
1923
1924     if (!IS_BLACK_HOLE(bh)) {                      /* catches BHs and RBHs */
1925       IF_GRAN_DEBUG(bq,
1926                     fprintf(stderr,"## blockFetch: node %p (%s) is not a BH => awakening TSO %p (%d) [PE %u]\n", 
1927                             bh, info_type(bh), tso, tso->id, proc));
1928
1929       /* No BH anymore => immediately unblock tso */
1930       new_event(proc, proc, CurrentTime[proc],
1931                 UnblockThread,
1932                 tso, bh, (rtsSpark*)NULL);
1933
1934       /* Is this always a REPLY to a FETCH in the profile ? */
1935       if (RtsFlags.GranFlags.GranSimStats.Full)
1936         DumpRawGranEvent(proc, proc, GR_REPLY, tso, bh, (StgInt)0, 0);
1937       return (NodeIsNoBH);
1938     }
1939
1940     /* DaH {BQ}Daq Qu' Suq 'e' wISov!
1941        Now we know that we have to put the tso into the BQ.
1942        2 cases: If block-on-fetch, tso is at head of threadq => 
1943                 => take it out of threadq and into BQ
1944                 If reschedule-on-fetch, tso is only pointed to be event
1945                 => just put it into BQ
1946
1947     ngoq ngo'!!
1948     if (!RtsFlags.GranFlags.DoAsyncFetch) {
1949       GranSimBlock(tso, proc, bh);
1950     } else {
1951       if (RtsFlags.GranFlags.GranSimStats.Full)
1952         DumpRawGranEvent(proc, where_is(bh), GR_BLOCK, tso, bh, (StgInt)0, 0);
1953       ++(tso->gran.blockcount);
1954       tso->gran.blockedat = CurrentTime[proc];
1955     }
1956     */
1957
1958     /* after scheduling the GlobalBlock event the TSO is not put into the
1959        run queue again; it is only pointed to via the event we are
1960        processing now; in GranSim 4.xx there is no difference between
1961        synchr and asynchr comm here */
1962     ASSERT(!is_on_queue(tso, proc));
1963     ASSERT(tso->link == END_TSO_QUEUE);
1964
1965     GranSimBlock(tso, proc, bh);  /* GranSim statistics gathering */
1966
1967     /* Now, put tso into BQ (similar to blocking entry codes) */
1968     info = get_itbl(bh);
1969     switch (info -> type) {
1970       case RBH:
1971       case BLACKHOLE:
1972       case CAF_BLACKHOLE: // ToDo: check whether this is a possibly ITBL here
1973       case SE_BLACKHOLE:   // ToDo: check whether this is a possibly ITBL here
1974       case SE_CAF_BLACKHOLE:// ToDo: check whether this is a possibly ITBL here
1975         /* basically an inlined version of BLACKHOLE_entry -- HWL */
1976         /* Change the BLACKHOLE into a BLACKHOLE_BQ */
1977         ((StgBlockingQueue *)bh)->header.info = &BLACKHOLE_BQ_info;
1978         /* Put ourselves on the blocking queue for this black hole */
1979         // tso->link=END_TSO_QUEUE;   not necessary; see assertion above
1980         ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)tso;
1981         tso->block_info.closure = bh;
1982         recordMutable((StgMutClosure *)bh);
1983         break;
1984
1985     case BLACKHOLE_BQ:
1986         /* basically an inlined version of BLACKHOLE_BQ_entry -- HWL */
1987         tso->link = (StgTSO *) (((StgBlockingQueue*)bh)->blocking_queue); 
1988         ((StgBlockingQueue*)bh)->blocking_queue = (StgBlockingQueueElement *)tso;
1989         recordMutable((StgMutClosure *)bh);
1990
1991 # if 0 && defined(GC_MUT_REQUIRED)
1992         ToDo: check whether recordMutable is necessary -- HWL
1993         /*
1994          * If we modify a black hole in the old generation, we have to make 
1995          * sure it goes on the mutables list
1996          */
1997
1998         if (bh <= StorageMgrInfo.OldLim) {
1999             MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
2000             StorageMgrInfo.OldMutables = bh;
2001         } else
2002             MUT_LINK(bh) = MUT_NOT_LINKED;
2003 # endif
2004         break;
2005
2006     case FETCH_ME_BQ:
2007         barf("Qagh: FMBQ closure (%p) found in GrAnSim (TSO=%p (%d))\n",
2008              bh, tso, tso->id);
2009
2010     default:
2011         {
2012           G_PRINT_NODE(bh);
2013           barf("Qagh: thought %p was a black hole (IP %p (%s))",
2014                   bh, info, info_type(bh));
2015         }
2016       }
2017     return (Ok);
2018 }
2019
2020
2021 //@node Idle PEs, Routines directly called from Haskell world, Code for Fetching Nodes, GranSim specific code
2022 //@subsection Idle PEs
2023
2024 /*
2025    Export work to idle PEs. This function is called from @ReSchedule@
2026    before dispatching on the current event. @HandleIdlePEs@ iterates over
2027    all PEs, trying to get work for idle PEs. Note, that this is a
2028    simplification compared to GUM's fishing model. We try to compensate for
2029    that by making the cost for stealing work dependent on the number of
2030    idle processors and thereby on the probability with which a randomly
2031    sent fish would find work.  
2032 */
2033
2034 //@cindex handleIdlePEs
2035
2036 void
2037 handleIdlePEs(void)
2038 {
2039   PEs p;
2040
2041   IF_DEBUG(gran, fprintf(stderr, "GRAN: handling Idle PEs\n"))
2042
2043   /* Should never be entered in GrAnSim Light setup */
2044   ASSERT(!RtsFlags.GranFlags.Light);
2045
2046   /* Could check whether there are idle PEs if it's a cheap check */
2047   for (p = 0; p < RtsFlags.GranFlags.proc; p++) 
2048     if (procStatus[p]==Idle)  /*  && IS_SPARKING(p) && IS_STARTING(p) */
2049       /* First look for local work i.e. examine local spark pool! */
2050       if (pending_sparks_hds[p]!=(rtsSpark *)NULL) {
2051         new_event(p, p, CurrentTime[p],
2052                   FindWork,
2053                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
2054         procStatus[p] = Sparking;
2055       } else if ((RtsFlags.GranFlags.maxFishes==0 ||
2056                   OutstandingFishes[p]<RtsFlags.GranFlags.maxFishes) ) {
2057
2058         /* If no local work then try to get remote work! 
2059            Qu' Hopbe' pagh tu'lu'pu'chugh Qu' Hop yISuq ! */
2060         if (RtsFlags.GranFlags.DoStealThreadsFirst && 
2061             (RtsFlags.GranFlags.FetchStrategy >= 4 || OutstandingFetches[p] == 0))
2062           {
2063             if (SurplusThreads > 0l)                    /* Steal a thread */
2064               stealThread(p);
2065           
2066             if (procStatus[p]!=Idle)
2067               break;
2068           }
2069         
2070         if (SparksAvail > 0 && 
2071             (RtsFlags.GranFlags.FetchStrategy >= 3 || OutstandingFetches[p] == 0)) /* Steal a spark */
2072           stealSpark(p);
2073         
2074         if (SurplusThreads > 0 && 
2075             (RtsFlags.GranFlags.FetchStrategy >= 4 || OutstandingFetches[p] == 0)) /* Steal a thread */
2076           stealThread(p);
2077       }
2078 }
2079
2080 /*
2081    Steal a spark and schedule moving it to proc. We want to look at PEs in
2082    clock order -- most retarded first.  Currently sparks are only stolen
2083    from the @ADVISORY_POOL@ never from the @REQUIRED_POOL@. Eventually,
2084    this should be changed to first steal from the former then from the
2085    latter.
2086
2087    We model a sort of fishing mechanism by counting the number of sparks
2088    and threads we are currently stealing.  */
2089
2090 /* 
2091    Return a random nat value in the intervall [from, to) 
2092 */
2093 static nat 
2094 natRandom(from, to)
2095 nat from, to;
2096 {
2097   nat r, d;
2098
2099   ASSERT(from<=to);
2100   d = to - from;
2101   /* random returns a value in [0, RAND_MAX] */
2102   r = (nat) ((float)from + ((float)random()*(float)d)/(float)RAND_MAX);
2103   r = (r==to) ? from : r;
2104   ASSERT(from<=r && (r<to || from==to));
2105   return r;  
2106 }
2107
2108 /* 
2109    Find any PE other than proc. Used for GUM style fishing only.
2110 */
2111 static PEs 
2112 findRandomPE (proc)
2113 PEs proc;
2114 {
2115   nat p;
2116
2117   ASSERT(RtsFlags.GranFlags.Fishing);
2118   if (RtsFlags.GranFlags.RandomSteal) {
2119     p = natRandom(0,RtsFlags.GranFlags.proc);  /* full range of PEs */
2120   } else {
2121     p = 0;
2122   }
2123   IF_GRAN_DEBUG(randomSteal,
2124                 belch("^^ RANDOM_STEAL (fishing): stealing from PE %d (current proc is %d)",
2125                       p, proc));
2126     
2127   return (PEs)p;
2128 }
2129
2130 /*
2131   Magic code for stealing sparks/threads makes use of global knowledge on
2132   spark queues.  
2133 */
2134 static void
2135 sortPEsByTime (proc, pes_by_time, firstp, np) 
2136 PEs proc;
2137 PEs *pes_by_time;
2138 nat *firstp, *np;
2139 {
2140   PEs p, temp, n, i, j;
2141   nat first, upb, r=0, q=0;
2142
2143   ASSERT(!RtsFlags.GranFlags.Fishing);
2144
2145 #if 0  
2146   upb = RtsFlags.GranFlags.proc;            /* full range of PEs */
2147
2148   if (RtsFlags.GranFlags.RandomSteal) {
2149     r = natRandom(0,RtsFlags.GranFlags.proc);  /* full range of PEs */
2150   } else {
2151     r = 0;
2152   }
2153 #endif
2154
2155   /* pes_by_time shall contain processors from which we may steal sparks */ 
2156   for(n=0, p=0; p < RtsFlags.GranFlags.proc; ++p)
2157     if ((proc != p) &&                       // not the current proc
2158         (pending_sparks_hds[p] != (rtsSpark *)NULL) && // non-empty spark pool
2159         (CurrentTime[p] <= CurrentTime[CurrentProc]))
2160       pes_by_time[n++] = p;
2161
2162   /* sort pes_by_time */
2163   for(i=0; i < n; ++i)
2164     for(j=i+1; j < n; ++j)
2165       if (CurrentTime[pes_by_time[i]] > CurrentTime[pes_by_time[j]]) {
2166         rtsTime temp = pes_by_time[i];
2167         pes_by_time[i] = pes_by_time[j];
2168         pes_by_time[j] = temp;
2169       }
2170
2171   /* Choose random processor to steal spark from; first look at processors */
2172   /* that are earlier than the current one (i.e. proc) */
2173   for(first=0; 
2174       (first < n) && (CurrentTime[pes_by_time[first]] <= CurrentTime[proc]);
2175       ++first)
2176     /* nothing */ ;
2177
2178   /* if the assertion below is true we can get rid of first */
2179   /* ASSERT(first==n); */
2180   /* ToDo: check if first is really needed; find cleaner solution */
2181
2182   *firstp = first;
2183   *np = n;
2184 }
2185
2186 /* 
2187    Steal a spark (piece of work) from any processor and bring it to proc.
2188 */
2189 //@cindex stealSpark
2190 static rtsBool 
2191 stealSpark(PEs proc) { stealSomething(proc, rtsTrue, rtsFalse); }
2192
2193 /* 
2194    Steal a thread from any processor and bring it to proc i.e. thread migration
2195 */
2196 //@cindex stealThread
2197 static rtsBool 
2198 stealThread(PEs proc) { stealSomething(proc, rtsFalse, rtsTrue); }
2199
2200 /* 
2201    Steal a spark or a thread and schedule moving it to proc.
2202 */
2203 //@cindex stealSomething
2204 static rtsBool
2205 stealSomething(proc, steal_spark, steal_thread)
2206 PEs proc;                           // PE that needs work (stealer)
2207 rtsBool steal_spark, steal_thread;  // should a spark and/or thread be stolen
2208 {
2209   PEs p;
2210   rtsTime fish_arrival_time;
2211   rtsSpark *spark, *prev, *next;
2212   rtsBool stolen = rtsFalse;
2213
2214   ASSERT(steal_spark || steal_thread);
2215
2216   /* Should never be entered in GrAnSim Light setup */
2217   ASSERT(!RtsFlags.GranFlags.Light);
2218   ASSERT(!steal_thread || RtsFlags.GranFlags.DoThreadMigration);
2219
2220   if (!RtsFlags.GranFlags.Fishing) {
2221     // ToDo: check if stealing threads is prefered over stealing sparks
2222     if (steal_spark) {
2223       if (stealSparkMagic(proc))
2224         return rtsTrue;
2225       else                             // no spark found
2226         if (steal_thread)
2227           return stealThreadMagic(proc);
2228         else                           // no thread found
2229           return rtsFalse;             
2230     } else {                           // ASSERT(steal_thread);
2231       return stealThreadMagic(proc);
2232     }
2233     barf("stealSomething: never reached");
2234   }
2235
2236   /* The rest of this function does GUM style fishing */
2237   
2238   p = findRandomPE(proc); /* find a random PE other than proc */
2239   
2240   /* Message packing costs for sending a Fish; qeq jabbI'ID */
2241   CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
2242   
2243   /* use another GranEvent for requesting a thread? */
2244   if (steal_spark && RtsFlags.GranFlags.GranSimStats.Sparks)
2245     DumpRawGranEvent(p, proc, SP_REQUESTED,
2246                      (StgTSO*)NULL, (StgClosure *)NULL, (StgInt)0, 0);
2247
2248   /* time of the fish arrival on the remote PE */
2249   fish_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
2250   
2251   /* Phps use an own Fish event for that? */
2252   /* The contents of the spark component is a HACK:
2253       1 means give me a spark;
2254       2 means give me a thread
2255       0 means give me nothing (this should never happen)
2256   */
2257   new_event(p, proc, fish_arrival_time,
2258             FindWork,
2259             (StgTSO*)NULL, (StgClosure*)NULL, 
2260             (steal_spark ? (rtsSpark*)1 : steal_thread ? (rtsSpark*)2 : (rtsSpark*)0));
2261   
2262   ++OutstandingFishes[proc];
2263   /* only with Async fetching? */
2264   if (procStatus[proc]==Idle)  
2265     procStatus[proc]=Fishing;
2266   
2267   /* time needed to clean up buffers etc after sending a message */
2268   CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
2269
2270   /* If GUM style fishing stealing always succeeds because it only consists
2271      of sending out a fish; of course, when the fish may return
2272      empty-handed! */
2273   return rtsTrue;
2274 }
2275
2276 /* 
2277    This version of stealing a spark makes use of the global info on all
2278    spark pools etc which is not available in a real parallel system.
2279    This could be extended to test e.g. the impact of perfect load information.
2280 */
2281 //@cindex stealSparkMagic
2282 static rtsBool
2283 stealSparkMagic(proc)
2284 PEs proc;
2285 {
2286   PEs p=0, i=0, j=0, n=0, first, upb;
2287   rtsSpark *spark=NULL, *next;
2288   PEs pes_by_time[MAX_PROC];
2289   rtsBool stolen = rtsFalse;
2290   rtsTime stealtime;
2291
2292   /* Should never be entered in GrAnSim Light setup */
2293   ASSERT(!RtsFlags.GranFlags.Light);
2294
2295   sortPEsByTime(proc, pes_by_time, &first, &n);
2296
2297   while (!stolen && n>0) {
2298     upb = (first==0) ? n : first;
2299     i = natRandom(0,upb);                /* choose a random eligible PE */
2300     p = pes_by_time[i];
2301
2302     IF_GRAN_DEBUG(randomSteal,
2303                   belch("^^ stealSparkMagic (random_steal, not fishing): stealing spark from PE %d (current proc is %d)",
2304                         p, proc));
2305       
2306     ASSERT(pending_sparks_hds[p]!=(rtsSpark *)NULL); /* non-empty spark pool */
2307
2308     /* Now go through rtsSparkQ and steal the first eligible spark */
2309     
2310     spark = pending_sparks_hds[p]; 
2311     while (!stolen && spark != (rtsSpark*)NULL)
2312       {
2313         /* NB: no prev pointer is needed here because all sparks that are not 
2314            chosen are pruned
2315         */
2316         if ((procStatus[p]==Idle || procStatus[p]==Sparking || procStatus[p] == Fishing) &&
2317             spark->next==(rtsSpark*)NULL) 
2318           {
2319             /* Be social! Don't steal the only spark of an idle processor 
2320                not {spark} neH yInIH !! */
2321             break; /* next PE */
2322           } 
2323         else if (closure_SHOULD_SPARK(spark->node))
2324           {
2325             /* Don't Steal local sparks; 
2326                ToDo: optionally prefer local over global sparks
2327             if (!spark->global) {
2328               prev=spark;
2329               continue;                  next spark
2330             }
2331             */
2332             /* found a spark! */
2333
2334             /* Prepare message for sending spark */
2335             CurrentTime[p] += RtsFlags.GranFlags.Costs.mpacktime;
2336
2337             if (RtsFlags.GranFlags.GranSimStats.Sparks)
2338               DumpRawGranEvent(p, (PEs)0, SP_EXPORTED,
2339                                (StgTSO*)NULL, spark->node,
2340                                spark->name, spark_queue_len(p));
2341
2342             stealtime = (CurrentTime[p] > CurrentTime[proc] ? 
2343                            CurrentTime[p] : 
2344                            CurrentTime[proc])
2345                         + sparkStealTime();
2346
2347             new_event(proc, p /* CurrentProc */, stealtime,
2348                       MoveSpark,
2349                       (StgTSO*)NULL, spark->node, spark);
2350             
2351             stolen = rtsTrue;
2352             ++OutstandingFishes[proc]; /* no. of sparks currently on the fly */
2353             if (procStatus[proc]==Idle)
2354               procStatus[proc] = Fishing;
2355             ++(spark->global);         /* record that this is a global spark */
2356             ASSERT(SparksAvail>0);
2357             --SparksAvail;            /* on-the-fly sparks are not available */
2358             next = delete_from_sparkq(spark, p, rtsFalse); // don't dispose!
2359             CurrentTime[p] += RtsFlags.GranFlags.Costs.mtidytime;
2360           }
2361         else   /* !(closure_SHOULD_SPARK(SPARK_NODE(spark))) */
2362           {
2363            IF_GRAN_DEBUG(checkSparkQ,
2364                          belch("^^ pruning spark %p (node %p) in stealSparkMagic",
2365                                spark, spark->node));
2366
2367             /* if the spark points to a node that should not be sparked,
2368                prune the spark queue at this point */
2369             if (RtsFlags.GranFlags.GranSimStats.Sparks)
2370               DumpRawGranEvent(p, (PEs)0, SP_PRUNED,
2371                                (StgTSO*)NULL, spark->node,
2372                                spark->name, spark_queue_len(p));
2373             if (RtsFlags.GranFlags.GranSimStats.Global)
2374               globalGranStats.pruned_sparks++;
2375             
2376             ASSERT(SparksAvail>0);
2377             --SparksAvail;
2378             spark = delete_from_sparkq(spark, p, rtsTrue);
2379           }
2380         /* unlink spark (may have been freed!) from sparkq;
2381         if (prev == NULL) // spark was head of spark queue
2382           pending_sparks_hds[p] = spark->next;
2383         else  
2384           prev->next = spark->next;
2385         if (spark->next == NULL)
2386           pending_sparks_tls[p] = prev;
2387         else  
2388           next->prev = prev;
2389         */
2390       }                    /* while ...    iterating over sparkq */
2391
2392     /* ToDo: assert that PE p still has work left after stealing the spark */
2393
2394     if (!stolen && (n>0)) {  /* nothing stealable from proc p :( */
2395       ASSERT(pes_by_time[i]==p);
2396
2397       /* remove p from the list (at pos i) */
2398       for (j=i; j+1<n; j++)
2399         pes_by_time[j] = pes_by_time[j+1];
2400       n--;
2401       
2402       /* update index to first proc which is later (or equal) than proc */
2403       for ( ;
2404             (first>0) &&
2405               (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]);
2406             first--)
2407         /* nothing */ ;
2408     } 
2409   }  /* while ... iterating over PEs in pes_by_time */
2410
2411   IF_GRAN_DEBUG(randomSteal,
2412                 if (stolen)
2413                   belch("^^ stealSparkMagic: spark %p (node=%p) stolen by PE %d from PE %d (SparksAvail=%d; idlers=%d)",
2414                        spark, spark->node, proc, p, 
2415                        SparksAvail, idlers());
2416                 else  
2417                   belch("^^ stealSparkMagic: nothing stolen by PE %d (sparkq len after pruning=%d)(SparksAvail=%d; idlers=%d)",
2418                         proc, SparksAvail, idlers()));
2419
2420   if (RtsFlags.GranFlags.GranSimStats.Global &&
2421       stolen && (i!=0)) {                          /* only for statistics */
2422     globalGranStats.rs_sp_count++;
2423     globalGranStats.ntimes_total += n;
2424     globalGranStats.fl_total += first;
2425     globalGranStats.no_of_steals++;
2426   }
2427
2428   return stolen;
2429 }
2430
2431 /* 
2432    The old stealThread code, which makes use of global info and does not
2433    send out fishes.  
2434    NB: most of this is the same as in stealSparkMagic;
2435        only the pieces specific to processing thread queues are different; 
2436        long live polymorphism!  
2437 */
2438
2439 //@cindex stealThreadMagic
2440 static rtsBool
2441 stealThreadMagic(proc)
2442 PEs proc;
2443 {
2444   PEs p=0, i=0, j=0, n=0, first, upb;
2445   StgTSO *tso=END_TSO_QUEUE;
2446   PEs pes_by_time[MAX_PROC];
2447   rtsBool stolen = rtsFalse;
2448   rtsTime stealtime;
2449
2450   /* Should never be entered in GrAnSim Light setup */
2451   ASSERT(!RtsFlags.GranFlags.Light);
2452
2453   sortPEsByTime(proc, pes_by_time, &first, &n);
2454
2455   while (!stolen && n>0) {
2456     upb = (first==0) ? n : first;
2457     i = natRandom(0,upb);                /* choose a random eligible PE */
2458     p = pes_by_time[i];
2459
2460     IF_GRAN_DEBUG(randomSteal,
2461                   belch("^^ stealThreadMagic (random_steal, not fishing): stealing thread from PE %d (current proc is %d)",
2462                         p, proc));
2463       
2464     /* Steal the first exportable thread in the runnable queue but
2465        never steal the first in the queue for social reasons;
2466        not Qu' wa'DIch yInIH !!
2467     */
2468     /* Would be better to search through queue and have options which of
2469        the threads to pick when stealing */
2470     if (run_queue_hds[p] == END_TSO_QUEUE) {
2471       IF_GRAN_DEBUG(randomSteal,
2472                     belch("^^ stealThreadMagic: No thread to steal from PE %d (stealer=PE %d)", 
2473                           p, proc));
2474     } else {
2475       tso = run_queue_hds[p]->link;  /* tso is *2nd* thread in thread queue */
2476       /* Found one */
2477       stolen = rtsTrue;
2478
2479       /* update links in queue */
2480       run_queue_hds[p]->link = tso->link;
2481       if (run_queue_tls[p] == tso)
2482         run_queue_tls[p] = run_queue_hds[p];
2483       
2484       /* ToDo: Turn magic constants into params */
2485       
2486       CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mpacktime;
2487       
2488       stealtime = (CurrentTime[p] > CurrentTime[proc] ? 
2489                    CurrentTime[p] : 
2490                    CurrentTime[proc])
2491         + sparkStealTime() 
2492         + 4l * RtsFlags.GranFlags.Costs.additional_latency
2493         + 5l * RtsFlags.GranFlags.Costs.munpacktime;
2494
2495       /* Move the thread; set bitmask to 0 while TSO is `on-the-fly' */
2496       SET_GRAN_HDR(tso,Nowhere /* PE_NUMBER(proc) */); 
2497
2498       /* Move from one queue to another */
2499       new_event(proc, p, stealtime,
2500                 MoveThread,
2501                 tso, (StgClosure*)NULL, (rtsSpark*)NULL);
2502
2503       /* MAKE_BUSY(proc);  not yet; only when thread is in threadq */
2504       ++OutstandingFishes[proc];
2505       if (procStatus[proc])
2506         procStatus[proc] = Fishing;
2507       --SurplusThreads;
2508
2509       if(RtsFlags.GranFlags.GranSimStats.Full)
2510         DumpRawGranEvent(p, proc, 
2511                          GR_STEALING, 
2512                          tso, (StgClosure*)NULL, (StgInt)0, 0);
2513       
2514       /* costs for tidying up buffer after having sent it */
2515       CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mtidytime;
2516     }
2517
2518     /* ToDo: assert that PE p still has work left after stealing the spark */
2519
2520     if (!stolen && (n>0)) {  /* nothing stealable from proc p :( */
2521       ASSERT(pes_by_time[i]==p);
2522
2523       /* remove p from the list (at pos i) */
2524       for (j=i; j+1<n; j++)
2525         pes_by_time[j] = pes_by_time[j+1];
2526       n--;
2527       
2528       /* update index to first proc which is later (or equal) than proc */
2529       for ( ;
2530             (first>0) &&
2531               (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]);
2532             first--)
2533         /* nothing */ ;
2534     } 
2535   }  /* while ... iterating over PEs in pes_by_time */
2536
2537   IF_GRAN_DEBUG(randomSteal,
2538                 if (stolen)
2539                   belch("^^ stealThreadMagic: stolen TSO %d (%p) by PE %d from PE %d (SparksAvail=%d; idlers=%d)",
2540                         tso->id, tso, proc, p,
2541                         SparksAvail, idlers());
2542                 else
2543                   belch("stealThreadMagic: nothing stolen by PE %d (SparksAvail=%d; idlers=%d)",
2544                         proc, SparksAvail, idlers()));
2545
2546   if (RtsFlags.GranFlags.GranSimStats.Global &&
2547       stolen && (i!=0)) { /* only for statistics */
2548     /* ToDo: more statistics on avg thread queue lenght etc */
2549     globalGranStats.rs_t_count++;
2550     globalGranStats.no_of_migrates++;
2551   }
2552
2553   return stolen;
2554 }
2555
2556 //@cindex sparkStealTime
2557 static rtsTime
2558 sparkStealTime(void)
2559 {
2560   double fishdelay, sparkdelay, latencydelay;
2561   fishdelay =  (double)RtsFlags.GranFlags.proc/2;
2562   sparkdelay = fishdelay - 
2563           ((fishdelay-1.0)/(double)(RtsFlags.GranFlags.proc-1))*((double)idlers());
2564   latencydelay = sparkdelay*((double)RtsFlags.GranFlags.Costs.latency);
2565
2566   return((rtsTime)latencydelay);
2567 }
2568
2569 //@node Routines directly called from Haskell world, Emiting profiling info for GrAnSim, Idle PEs, GranSim specific code
2570 //@subsection Routines directly called from Haskell world
2571 /* 
2572 The @GranSim...@ routines in here are directly called via macros from the
2573 threaded world. 
2574
2575 First some auxiliary routines.
2576 */
2577
2578 /* Take the current thread off the thread queue and thereby activate the 
2579    next thread. It's assumed that the next ReSchedule after this uses 
2580    NEW_THREAD as param. 
2581    This fct is called from GranSimBlock and GranSimFetch 
2582 */
2583
2584 //@cindex ActivateNextThread
2585
2586 void 
2587 ActivateNextThread (proc)
2588 PEs proc;
2589 {
2590   StgTSO *t;
2591   /*
2592     This routine is entered either via GranSimFetch or via GranSimBlock.
2593     It has to prepare the CurrentTSO for being blocked and update the
2594     run queue and other statistics on PE proc. The actual enqueuing to the 
2595     blocking queue (if coming from GranSimBlock) is done in the entry code 
2596     of the BLACKHOLE and BLACKHOLE_BQ closures (see StgMiscClosures.hc).
2597   */
2598   /* ToDo: add assertions here!! */
2599   //ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE);
2600
2601   // Only necessary if the running thread is at front of the queue
2602   // run_queue_hds[proc] = run_queue_hds[proc]->link;
2603   ASSERT(CurrentProc==proc);
2604   ASSERT(!is_on_queue(CurrentTSO,proc));
2605   if (run_queue_hds[proc]==END_TSO_QUEUE) {
2606     /* NB: this routine is only entered with asynchr comm (see assertion) */
2607     procStatus[proc] = Idle;
2608   } else {
2609     /* ToDo: check cost assignment */
2610     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime;
2611     if (RtsFlags.GranFlags.GranSimStats.Full && 
2612         (!RtsFlags.GranFlags.Light || RtsFlags.GranFlags.Debug.checkLight)) 
2613                                       /* right flag !?? ^^^ */ 
2614       DumpRawGranEvent(proc, 0, GR_SCHEDULE, run_queue_hds[proc],
2615                        (StgClosure*)NULL, (StgInt)0, 0);
2616   }
2617 }
2618
2619 /* 
2620    The following GranSim fcts are stg-called from the threaded world.    
2621 */
2622
2623 /* Called from HP_CHK and friends (see StgMacros.h)  */
2624 //@cindex GranSimAllocate
2625 void 
2626 GranSimAllocate(n)
2627 StgInt n;
2628 {
2629   CurrentTSO->gran.allocs += n;
2630   ++(CurrentTSO->gran.basicblocks);
2631
2632   if (RtsFlags.GranFlags.GranSimStats.Heap) {
2633       DumpRawGranEvent(CurrentProc, 0, GR_ALLOC, CurrentTSO,
2634                        (StgClosure*)NULL, (StgInt)0, n);
2635   }
2636   
2637   CurrentTSO->gran.exectime += RtsFlags.GranFlags.Costs.heapalloc_cost;
2638   CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.heapalloc_cost;
2639 }
2640
2641 /*
2642   Subtract the values added above, if a heap check fails and
2643   so has to be redone.
2644 */
2645 //@cindex GranSimUnallocate
2646 void 
2647 GranSimUnallocate(n)
2648 StgInt n;
2649 {
2650   CurrentTSO->gran.allocs -= n;
2651   --(CurrentTSO->gran.basicblocks);
2652   
2653   CurrentTSO->gran.exectime -= RtsFlags.GranFlags.Costs.heapalloc_cost;
2654   CurrentTime[CurrentProc] -= RtsFlags.GranFlags.Costs.heapalloc_cost;
2655 }
2656
2657 /* NB: We now inline this code via GRAN_EXEC rather than calling this fct */
2658 //@cindex GranSimExec
2659 void 
2660 GranSimExec(ariths,branches,loads,stores,floats)
2661 StgWord ariths,branches,loads,stores,floats;
2662 {
2663   StgWord cost = RtsFlags.GranFlags.Costs.arith_cost*ariths + 
2664             RtsFlags.GranFlags.Costs.branch_cost*branches + 
2665             RtsFlags.GranFlags.Costs.load_cost * loads +
2666             RtsFlags.GranFlags.Costs.store_cost*stores + 
2667             RtsFlags.GranFlags.Costs.float_cost*floats;
2668
2669   CurrentTSO->gran.exectime += cost;
2670   CurrentTime[CurrentProc] += cost;
2671 }
2672
2673 /* 
2674    Fetch the node if it isn't local
2675    -- result indicates whether fetch has been done.
2676
2677    This is GRIP-style single item fetching.
2678 */
2679
2680 //@cindex GranSimFetch
2681 StgInt 
2682 GranSimFetch(node /* , liveness_mask */ )
2683 StgClosure *node;
2684 /* StgInt liveness_mask; */
2685 {
2686   /* reset the return value (to be checked within STG land) */
2687   NeedToReSchedule = rtsFalse;   
2688
2689   if (RtsFlags.GranFlags.Light) {
2690      /* Always reschedule in GrAnSim-Light to prevent one TSO from
2691         running off too far 
2692      new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
2693               ContinueThread,CurrentTSO,node,NULL);
2694      */
2695      return(0); 
2696   }
2697
2698   /* Faking an RBH closure:
2699      If the bitmask of the closure is 0 then this node is a fake RBH;
2700   */
2701   if (node->header.gran.procs == Nowhere) {
2702     IF_GRAN_DEBUG(bq,
2703                   belch("## Found fake RBH (node %p); delaying TSO %d (%p)", 
2704                         node, CurrentTSO->id, CurrentTSO));
2705                   
2706     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+10000,
2707               ContinueThread, CurrentTSO, node, (rtsSpark*)NULL);
2708
2709     /* Rescheduling (GranSim internal) is necessary */
2710     NeedToReSchedule = rtsTrue;
2711     
2712     return(1); 
2713   }
2714
2715   /* Note: once a node has been fetched, this test will be passed */
2716   if (!IS_LOCAL_TO(PROCS(node),CurrentProc))
2717     {
2718       PEs p = where_is(node);
2719       rtsTime fetchtime;
2720       
2721       IF_GRAN_DEBUG(thunkStealing,
2722                     if (p==CurrentProc) 
2723                       belch("GranSimFetch: Trying to fetch from own processor%u\n", p););
2724       
2725       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
2726       /* NB: Fetch is counted on arrival (FetchReply) */
2727       
2728       fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) +
2729         RtsFlags.GranFlags.Costs.latency;
2730       
2731       new_event(p, CurrentProc, fetchtime,
2732                 FetchNode, CurrentTSO, node, (rtsSpark*)NULL);
2733       
2734       if (fetchtime<TimeOfNextEvent)
2735         TimeOfNextEvent = fetchtime;
2736       
2737       /* About to block */
2738       CurrentTSO->gran.blockedat = CurrentTime[CurrentProc];
2739       
2740       ++OutstandingFetches[CurrentProc];
2741       
2742       if (RtsFlags.GranFlags.DoAsyncFetch) 
2743         /* if asynchr comm is turned on, activate the next thread in the q */
2744         ActivateNextThread(CurrentProc);
2745       else
2746         procStatus[CurrentProc] = Fetching;
2747
2748 #if 0 
2749       /* ToDo: nuke the entire if (anything special for fair schedule?) */
2750       if (RtsFlags.GranFlags.DoAsyncFetch) 
2751         {
2752           /* Remove CurrentTSO from the queue -- assumes head of queue == CurrentTSO */
2753           if(!RtsFlags.GranFlags.DoFairSchedule)
2754             {
2755               /* now done in do_the_fetchnode 
2756               if (RtsFlags.GranFlags.GranSimStats.Full)
2757                 DumpRawGranEvent(CurrentProc, p, GR_FETCH, CurrentTSO,
2758                                  node, (StgInt)0, 0);
2759               */                                
2760               ActivateNextThread(CurrentProc);
2761               
2762 # if 0 && defined(GRAN_CHECK)
2763               if (RtsFlags.GranFlags.Debug.blockOnFetch_sanity) {
2764                 if (TSO_TYPE(CurrentTSO) & FETCH_MASK_TSO) {
2765                   fprintf(stderr,"FetchNode: TSO 0x%x has fetch-mask set @ %d\n",
2766                           CurrentTSO,CurrentTime[CurrentProc]);
2767                   stg_exit(EXIT_FAILURE);
2768                 } else {
2769                   TSO_TYPE(CurrentTSO) |= FETCH_MASK_TSO;
2770                 }
2771               }
2772 # endif
2773               CurrentTSO->link = END_TSO_QUEUE;
2774               /* CurrentTSO = END_TSO_QUEUE; */
2775               
2776               /* CurrentTSO is pointed to by the FetchNode event; it is
2777                  on no run queue any more */
2778           } else {  /* fair scheduling currently not supported -- HWL */
2779             barf("Asynchr communication is not yet compatible with fair scheduling\n");
2780           }
2781         } else {                /* !RtsFlags.GranFlags.DoAsyncFetch */
2782           procStatus[CurrentProc] = Fetching; // ToDo: BlockedOnFetch;
2783           /* now done in do_the_fetchnode 
2784           if (RtsFlags.GranFlags.GranSimStats.Full)
2785             DumpRawGranEvent(CurrentProc, p,
2786                              GR_FETCH, CurrentTSO, node, (StgInt)0, 0);
2787           */
2788           IF_GRAN_DEBUG(blockOnFetch, 
2789                         BlockedOnFetch[CurrentProc] = CurrentTSO;); /*- rtsTrue; -*/
2790         }
2791 #endif /* 0 */
2792
2793       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mtidytime;
2794       
2795       /* Rescheduling (GranSim internal) is necessary */
2796       NeedToReSchedule = rtsTrue;
2797       
2798       return(1); 
2799     }
2800   return(0);
2801 }
2802
2803 //@cindex GranSimSpark
2804 void 
2805 GranSimSpark(local,node)
2806 StgInt local;
2807 StgClosure *node;
2808 {
2809   /* ++SparksAvail;  Nope; do that in add_to_spark_queue */
2810   if (RtsFlags.GranFlags.GranSimStats.Sparks)
2811     DumpRawGranEvent(CurrentProc, (PEs)0, SP_SPARK,
2812                      END_TSO_QUEUE, node, (StgInt)0, spark_queue_len(CurrentProc)-1);
2813
2814   /* Force the PE to take notice of the spark */
2815   if(RtsFlags.GranFlags.DoAlwaysCreateThreads) {
2816     new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
2817               FindWork,
2818               END_TSO_QUEUE, (StgClosure*)NULL, (rtsSpark*)NULL);
2819     if (CurrentTime[CurrentProc]<TimeOfNextEvent)
2820       TimeOfNextEvent = CurrentTime[CurrentProc];
2821   }
2822
2823   if(local)
2824     ++CurrentTSO->gran.localsparks;
2825   else
2826     ++CurrentTSO->gran.globalsparks;
2827 }
2828
2829 //@cindex GranSimSparkAt
2830 void 
2831 GranSimSparkAt(spark,where,identifier)
2832 rtsSpark *spark;
2833 StgClosure *where;    /* This should be a node; alternatively could be a GA */
2834 StgInt identifier;
2835 {
2836   PEs p = where_is(where);
2837   GranSimSparkAtAbs(spark,p,identifier);
2838 }
2839
2840 //@cindex GranSimSparkAtAbs
2841 void 
2842 GranSimSparkAtAbs(spark,proc,identifier)
2843 rtsSpark *spark;
2844 PEs proc;        
2845 StgInt identifier;
2846 {
2847   rtsTime exporttime;
2848
2849   if (spark == (rtsSpark *)NULL) /* Note: Granularity control might have */
2850     return;                          /* turned a spark into a NULL. */
2851
2852   /* ++SparksAvail; Nope; do that in add_to_spark_queue */
2853   if(RtsFlags.GranFlags.GranSimStats.Sparks)
2854     DumpRawGranEvent(proc,0,SP_SPARKAT,
2855                      END_TSO_QUEUE, spark->node, (StgInt)0, spark_queue_len(proc));
2856
2857   if (proc!=CurrentProc) {
2858     CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
2859     exporttime = (CurrentTime[proc] > CurrentTime[CurrentProc]? 
2860                   CurrentTime[proc]: CurrentTime[CurrentProc])
2861                  + RtsFlags.GranFlags.Costs.latency;
2862   } else {
2863     exporttime = CurrentTime[CurrentProc];
2864   }
2865
2866   if ( RtsFlags.GranFlags.Light )
2867     /* Need CurrentTSO in event field to associate costs with creating
2868        spark even in a GrAnSim Light setup */
2869     new_event(proc, CurrentProc, exporttime,
2870               MoveSpark,
2871               CurrentTSO, spark->node, spark);
2872   else
2873     new_event(proc, CurrentProc, exporttime,
2874               MoveSpark, (StgTSO*)NULL, spark->node, spark);
2875   /* Bit of a hack to treat placed sparks the same as stolen sparks */
2876   ++OutstandingFishes[proc];
2877
2878   /* Force the PE to take notice of the spark (FINDWORK is put after a
2879      MoveSpark into the sparkq!) */
2880   if (RtsFlags.GranFlags.DoAlwaysCreateThreads) {
2881     new_event(CurrentProc,CurrentProc,exporttime+1,
2882               FindWork,
2883               (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
2884   }
2885
2886   if (exporttime<TimeOfNextEvent)
2887     TimeOfNextEvent = exporttime;
2888
2889   if (proc!=CurrentProc) {
2890     CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mtidytime;
2891     ++CurrentTSO->gran.globalsparks;
2892   } else { 
2893     ++CurrentTSO->gran.localsparks;
2894   }
2895 }
2896
2897 /* 
2898    This function handles local and global blocking.  It's called either
2899    from threaded code (RBH_entry, BH_entry etc) or from blockFetch when
2900    trying to fetch an BH or RBH 
2901 */
2902
2903 //@cindex GranSimBlock
2904 void 
2905 GranSimBlock(tso, proc, node)
2906 StgTSO *tso;
2907 PEs proc;
2908 StgClosure *node;
2909 {
2910   PEs node_proc = where_is(node), 
2911       tso_proc = where_is((StgClosure *)tso);
2912
2913   ASSERT(tso_proc==CurrentProc);
2914   // ASSERT(node_proc==CurrentProc);
2915   IF_GRAN_DEBUG(bq,
2916                 if (node_proc!=CurrentProc) 
2917                   belch("## ghuH: TSO %d (%lx) [PE %d] blocks on non-local node %p [PE %d] (no simulation of FETCHMEs)",
2918                         tso->id, tso, tso_proc, node, node_proc)); 
2919   ASSERT(tso->link==END_TSO_QUEUE);
2920   ASSERT(!is_on_queue(tso,proc)); // tso must not be on run queue already!
2921   //ASSERT(tso==run_queue_hds[proc]);
2922
2923   IF_DEBUG(gran,
2924            belch("GRAN: TSO %d (%p) [PE %d] blocks on closure %p @ %lx",
2925                  tso->id, tso, proc, node, CurrentTime[proc]));
2926
2927
2928     /* THIS SHOULD NEVER HAPPEN!
2929        If tso tries to block on a remote node (i.e. node_proc!=CurrentProc)
2930        we have missed a GranSimFetch before entering this closure;
2931        we hack around it for now, faking a FetchNode; 
2932        because GranSimBlock is entered via a BLACKHOLE(_BQ) closure,
2933        tso will be blocked on this closure until the FetchReply occurs.
2934
2935        ngoq Dogh! 
2936
2937     if (node_proc!=CurrentProc) {
2938       StgInt ret;
2939       ret = GranSimFetch(node);
2940       IF_GRAN_DEBUG(bq,
2941                     if (ret)
2942                       belch(".. GranSimBlock: faking a FetchNode of node %p from %d to %d",
2943                             node, node_proc, CurrentProc););
2944       return;
2945     }
2946     */
2947
2948   if (RtsFlags.GranFlags.GranSimStats.Full)
2949     DumpRawGranEvent(proc,node_proc,GR_BLOCK,tso,node,(StgInt)0,0);
2950
2951   ++(tso->gran.blockcount);
2952   /* Distinction  between local and global block is made in blockFetch */
2953   tso->gran.blockedat = CurrentTime[proc];
2954
2955   CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime;
2956   ActivateNextThread(proc);
2957   /* tso->link = END_TSO_QUEUE;    not really necessary; only for testing */
2958 }
2959
2960 #endif /* GRAN */
2961
2962 //@node Index,  , Dumping routines, GranSim specific code
2963 //@subsection Index
2964
2965 //@index
2966 //* ActivateNextThread::  @cindex\s-+ActivateNextThread
2967 //* CurrentProc::  @cindex\s-+CurrentProc
2968 //* CurrentTime::  @cindex\s-+CurrentTime
2969 //* GranSimAllocate::  @cindex\s-+GranSimAllocate
2970 //* GranSimBlock::  @cindex\s-+GranSimBlock
2971 //* GranSimExec::  @cindex\s-+GranSimExec
2972 //* GranSimFetch::  @cindex\s-+GranSimFetch
2973 //* GranSimLight_insertThread::  @cindex\s-+GranSimLight_insertThread
2974 //* GranSimSpark::  @cindex\s-+GranSimSpark
2975 //* GranSimSparkAt::  @cindex\s-+GranSimSparkAt
2976 //* GranSimSparkAtAbs::  @cindex\s-+GranSimSparkAtAbs
2977 //* GranSimUnallocate::  @cindex\s-+GranSimUnallocate
2978 //* any_idle::  @cindex\s-+any_idle
2979 //* blockFetch::  @cindex\s-+blockFetch
2980 //* do_the_fetchnode::  @cindex\s-+do_the_fetchnode
2981 //* do_the_fetchreply::  @cindex\s-+do_the_fetchreply
2982 //* do_the_findwork::  @cindex\s-+do_the_findwork
2983 //* do_the_globalblock::  @cindex\s-+do_the_globalblock
2984 //* do_the_movespark::  @cindex\s-+do_the_movespark
2985 //* do_the_movethread::  @cindex\s-+do_the_movethread
2986 //* do_the_startthread::  @cindex\s-+do_the_startthread
2987 //* do_the_unblock::  @cindex\s-+do_the_unblock
2988 //* fetchNode::  @cindex\s-+fetchNode
2989 //* ga_to_proc::  @cindex\s-+ga_to_proc
2990 //* get_next_event::  @cindex\s-+get_next_event
2991 //* get_time_of_next_event::  @cindex\s-+get_time_of_next_event
2992 //* grab_event::  @cindex\s-+grab_event
2993 //* handleFetchRequest::  @cindex\s-+handleFetchRequest
2994 //* handleIdlePEs::  @cindex\s-+handleIdlePEs
2995 //* idlers::  @cindex\s-+idlers
2996 //* insertThread::  @cindex\s-+insertThread
2997 //* insert_event::  @cindex\s-+insert_event
2998 //* is_on_queue::  @cindex\s-+is_on_queue
2999 //* is_unique::  @cindex\s-+is_unique
3000 //* new_event::  @cindex\s-+new_event
3001 //* prepend_event::  @cindex\s-+prepend_event
3002 //* print_event::  @cindex\s-+print_event
3003 //* print_eventq::  @cindex\s-+print_eventq
3004 //* prune_eventq ::  @cindex\s-+prune_eventq 
3005 //* spark queue::  @cindex\s-+spark queue
3006 //* sparkStealTime::  @cindex\s-+sparkStealTime
3007 //* stealSomething::  @cindex\s-+stealSomething
3008 //* stealSpark::  @cindex\s-+stealSpark
3009 //* stealSparkMagic::  @cindex\s-+stealSparkMagic
3010 //* stealThread::  @cindex\s-+stealThread
3011 //* stealThreadMagic::  @cindex\s-+stealThreadMagic
3012 //* thread_queue_len::  @cindex\s-+thread_queue_len
3013 //* traverse_eventq_for_gc::  @cindex\s-+traverse_eventq_for_gc
3014 //* where_is::  @cindex\s-+where_is
3015 //@end index