[project @ 2003-12-16 13:27:31 by simonmar]
[ghc-hetmet.git] / ghc / rts / parallel / GranSim.c
1 /* 
2    Time-stamp: <Tue Mar 06 2001 00:17:42 Stardate: [-30]6285.06 hwloidl>
3    $Id: GranSim.c,v 1.5 2001/07/23 17:23:20 simonmar Exp $
4
5    Variables and functions specific to GranSim the parallelism simulator
6    for GPH.
7 */
8
9 //@node GranSim specific code, , ,
10 //@section GranSim specific code
11
12 /*
13    Macros for dealing with the new and improved GA field for simulating
14    parallel execution. Based on @CONCURRENT@ package. The GA field now
15    contains a mask, where the n-th bit stands for the n-th processor, where
16    this data can be found. In case of multiple copies, several bits are
17    set. The total number of processors is bounded by @MAX_PROC@, which
18    should be <= the length of a word in bits.  -- HWL 
19 */
20
21 //@menu
22 //* Includes::                  
23 //* Prototypes and externs::    
24 //* Constants and Variables::   
25 //* Initialisation::            
26 //* Global Address Operations::  
27 //* Global Event Queue::        
28 //* Spark queue functions::     
29 //* Scheduling functions::      
30 //* Thread Queue routines::     
31 //* GranSim functions::         
32 //* GranSimLight routines::     
33 //* Code for Fetching Nodes::   
34 //* Idle PEs::                  
35 //* Routines directly called from Haskell world::  
36 //* Emiting profiling info for GrAnSim::  
37 //* Dumping routines::          
38 //* Index::                     
39 //@end menu
40
41 //@node Includes, Prototypes and externs, GranSim specific code, GranSim specific code
42 //@subsection Includes
43
44 #include "Rts.h"
45 #include "RtsFlags.h"
46 #include "RtsUtils.h"
47 #include "StgMiscClosures.h"
48 #include "StgTypes.h"
49 #include "Schedule.h"
50 #include "SchedAPI.h"       // for pushClosure
51 #include "GranSimRts.h"
52 #include "GranSim.h"
53 #include "ParallelRts.h"
54 #include "ParallelDebug.h"
55 #include "Sparks.h"
56 #include "Storage.h"       // for recordMutable
57
58
59 //@node Prototypes and externs, Constants and Variables, Includes, GranSim specific code
60 //@subsection Prototypes and externs
61
62 #if defined(GRAN)
63
64 /* Prototypes */
65 static inline PEs      ga_to_proc(StgWord);
66 static inline rtsBool  any_idle(void);
67 static inline nat      idlers(void);
68        PEs             where_is(StgClosure *node);
69
70 static rtsBool         stealSomething(PEs proc, rtsBool steal_spark, rtsBool steal_thread);
71 static rtsBool         stealSpark(PEs proc);
72 static rtsBool         stealThread(PEs proc);
73 static rtsBool         stealSparkMagic(PEs proc);
74 static rtsBool         stealThreadMagic(PEs proc);
75 /* subsumed by stealSomething
76 static void            stealThread(PEs proc); 
77 static void            stealSpark(PEs proc);
78 */
79 static rtsTime         sparkStealTime(void);
80 static nat             natRandom(nat from, nat to);
81 static PEs             findRandomPE(PEs proc);
82 static void            sortPEsByTime (PEs proc, PEs *pes_by_time, 
83                                       nat *firstp, nat *np);
84
85 void GetRoots(void);
86
87 #endif /* GRAN */
88
89 //@node Constants and Variables, Initialisation, Prototypes and externs, GranSim specific code
90 //@subsection Constants and Variables
91
92 #if defined(GRAN) || defined(PAR)
93 /* See GranSim.h for the definition of the enum gran_event_types */
94 char *gran_event_names[] = {
95     "START", "START(Q)",
96     "STEALING", "STOLEN", "STOLEN(Q)",
97     "FETCH", "REPLY", "BLOCK", "RESUME", "RESUME(Q)",
98     "SCHEDULE", "DESCHEDULE",
99     "END",
100     "SPARK", "SPARKAT", "USED", "PRUNED", "EXPORTED", "ACQUIRED",
101     "ALLOC",
102     "TERMINATE",
103     "SYSTEM_START", "SYSTEM_END",           /* only for debugging */
104     "??"
105 };
106 #endif
107
108 #if defined(GRAN)                                              /* whole file */
109 char *proc_status_names[] = {
110   "Idle", "Sparking", "Starting", "Fetching", "Fishing", "Busy", 
111   "UnknownProcStatus"
112 };
113
114 /* For internal use (event statistics) only */
115 char *event_names[] =
116     { "ContinueThread", "StartThread", "ResumeThread", 
117       "MoveSpark", "MoveThread", "FindWork",
118       "FetchNode", "FetchReply",
119       "GlobalBlock", "UnblockThread"
120     };
121
122 //@cindex CurrentProc
123 PEs CurrentProc = 0;
124
125 /*
126   ToDo: Create a structure for the processor status and put all the 
127         arrays below into it. 
128   -- HWL */
129
130 //@cindex CurrentTime
131 /* One clock for each PE */
132 rtsTime CurrentTime[MAX_PROC];  
133
134 /* Useful to restrict communication; cf fishing model in GUM */
135 nat OutstandingFetches[MAX_PROC], OutstandingFishes[MAX_PROC];
136
137 /* Status of each PE (new since but independent of GranSim Light) */
138 rtsProcStatus procStatus[MAX_PROC];
139
140 # if defined(GRAN) && defined(GRAN_CHECK)
141 /* To check if the RTS ever tries to run a thread that should be blocked
142    because of fetching remote data */
143 StgTSO *BlockedOnFetch[MAX_PROC];
144 # define FETCH_MASK_TSO  0x08000000      /* only bits 0, 1, 2 should be used */
145 # endif
146
147 nat SparksAvail = 0;     /* How many sparks are available */
148 nat SurplusThreads = 0;  /* How many excess threads are there */
149
150 /* Do we need to reschedule following a fetch? */
151 rtsBool NeedToReSchedule = rtsFalse, IgnoreEvents = rtsFalse, IgnoreYields = rtsFalse; 
152 rtsTime TimeOfNextEvent, TimeOfLastEvent, EndOfTimeSlice; /* checked from the threaded world! */
153
154 //@cindex spark queue
155 /* GranSim: a globally visible array of spark queues */
156 rtsSparkQ pending_sparks_hds[MAX_PROC];
157 rtsSparkQ pending_sparks_tls[MAX_PROC];
158
159 nat sparksIgnored = 0, sparksCreated = 0;
160
161 GlobalGranStats globalGranStats;
162
163 nat gran_arith_cost, gran_branch_cost, gran_load_cost, 
164     gran_store_cost, gran_float_cost;
165
166 /*
167 Old comment from 0.29. ToDo: Check and update -- HWL
168
169 The following variables control the behaviour of GrAnSim. In general, there
170 is one RTS option for enabling each of these features. In getting the
171 desired setup of GranSim the following questions have to be answered:
172 \begin{itemize}
173 \item {\em Which scheduling algorithm} to use (@RtsFlags.GranFlags.DoFairSchedule@)? 
174       Currently only unfair scheduling is supported.
175 \item What to do when remote data is fetched (@RtsFlags.GranFlags.DoAsyncFetch@)? 
176       Either block and wait for the
177       data or reschedule and do some other work.
178       Thus, if this variable is true, asynchronous communication is
179       modelled. Block on fetch mainly makes sense for incremental fetching.
180
181       There is also a simplified fetch variant available
182       (@RtsFlags.GranFlags.SimplifiedFetch@). This variant does not use events to model
183       communication. It is faster but the results will be less accurate.
184 \item How aggressive to be in getting work after a reschedule on fetch
185       (@RtsFlags.GranFlags.FetchStrategy@)?
186       This is determined by the so-called {\em fetching
187       strategy\/}. Currently, there are four possibilities:
188       \begin{enumerate}
189        \item Only run a runnable thread.
190        \item Turn a spark into a thread, if necessary.
191        \item Steal a remote spark, if necessary.
192        \item Steal a runnable thread from another processor, if necessary.
193       \end{itemize}
194       The variable @RtsFlags.GranFlags.FetchStrategy@ determines how far to go in this list
195       when rescheduling on a fetch.
196 \item Should sparks or threads be stolen first when looking for work
197       (@RtsFlags.GranFlags.DoStealThreadsFirst@)? 
198       The default is to steal sparks first (much cheaper).
199 \item Should the RTS use a lazy thread creation scheme
200       (@RtsFlags.GranFlags.DoAlwaysCreateThreads@)?  By default yes i.e.\ sparks are only
201       turned into threads when work is needed. Also note, that sparks
202       can be discarded by the RTS (this is done in the case of an overflow
203       of the spark pool). Setting @RtsFlags.GranFlags.DoAlwaysCreateThreads@  to @True@ forces
204       the creation of threads at the next possibility (i.e.\ when new work
205       is demanded the next time).
206 \item Should data be fetched closure-by-closure or in packets
207       (@RtsFlags.GranFlags.DoBulkFetching@)? The default strategy is a GRIP-like incremental 
208       (i.e.\ closure-by-closure) strategy. This makes sense in a
209       low-latency setting but is bad in a high-latency system. Setting 
210       @RtsFlags.GranFlags.DoBulkFetching@ to @True@ enables bulk (packet) fetching. Other
211       parameters determine the size of the packets (@pack_buffer_size@) and the number of
212       thunks that should be put into one packet (@RtsFlags.GranFlags.ThunksToPack@).
213 \item If there is no other possibility to find work, should runnable threads
214       be moved to an idle processor (@RtsFlags.GranFlags.DoThreadMigration@)? In any case, the
215       RTS tried to get sparks (either local or remote ones) first. Thread
216       migration is very expensive, since a whole TSO has to be transferred
217       and probably data locality becomes worse in the process. Note, that
218       the closure, which will be evaluated next by that TSO is not
219       transferred together with the TSO (that might block another thread).
220 \item Should the RTS distinguish between sparks created by local nodes and
221       stolen sparks (@RtsFlags.GranFlags.PreferSparksOfLocalNodes@)?  The idea is to improve 
222       data locality by preferring sparks of local nodes (it is more likely
223       that the data for those sparks is already on the local processor). 
224       However, such a distinction also imposes an overhead on the spark
225       queue management, and typically a large number of sparks are
226       generated during execution. By default this variable is set to @False@.
227 \item Should the RTS use granularity control mechanisms? The idea of a 
228       granularity control mechanism is to make use of granularity
229       information provided via annotation of the @par@ construct in order
230       to prefer bigger threads when either turning a spark into a thread or
231       when choosing the next thread to schedule. Currently, three such
232       mechanisms are implemented:
233       \begin{itemize}
234         \item Cut-off: The granularity information is interpreted as a
235               priority. If a threshold priority is given to the RTS, then
236               only those sparks with a higher priority than the threshold 
237               are actually created. Other sparks are immediately discarded.
238               This is similar to a usual cut-off mechanism often used in 
239               parallel programs, where parallelism is only created if the 
240               input data is lage enough. With this option, the choice is 
241               hidden in the RTS and only the threshold value has to be 
242               provided as a parameter to the runtime system.
243         \item Priority Sparking: This mechanism keeps priorities for sparks
244               and chooses the spark with the highest priority when turning
245               a spark into a thread. After that the priority information is
246               discarded. The overhead of this mechanism comes from
247               maintaining a sorted spark queue.
248         \item Priority Scheduling: This mechanism keeps the granularity
249               information for threads, to. Thus, on each reschedule the 
250               largest thread is chosen. This mechanism has a higher
251               overhead, as the thread queue is sorted, too.
252        \end{itemize}  
253 \end{itemize}
254 */
255
256 //@node Initialisation, Global Address Operations, Constants and Variables, GranSim specific code
257 //@subsection Initialisation
258
259 void 
260 init_gr_stats (void) {
261   memset(&globalGranStats, '\0', sizeof(GlobalGranStats));
262 #if 0
263   /* event stats */
264   globalGranStats.noOfEvents = 0;
265   for (i=0; i<MAX_EVENT; i++) globalGranStats.event_counts[i]=0;
266
267   /* communication stats */
268   globalGranStats.fetch_misses = 0;
269   globalGranStats.tot_low_pri_sparks = 0;
270
271   /* obscure stats */  
272   globalGranStats.rs_sp_count = 0;
273   globalGranStats.rs_t_count = 0;
274   globalGranStats.ntimes_total = 0, 
275   globalGranStats.fl_total = 0;
276   globalGranStats.no_of_steals = 0;
277
278   /* spark queue stats */
279   globalGranStats.tot_sq_len = 0, 
280   globalGranStats.tot_sq_probes = 0; 
281   globalGranStats.tot_sparks = 0;
282   globalGranStats.withered_sparks = 0;
283   globalGranStats.tot_add_threads = 0;
284   globalGranStats.tot_tq_len = 0;
285   globalGranStats.non_end_add_threads = 0;
286
287   /* thread stats */
288   globalGranStats.tot_threads_created = 0;
289   for (i=0; i<MAX_PROC; i++) globalGranStats.threads_created_on_PE[i]=0;
290 #endif /* 0 */
291 }
292
293 //@node Global Address Operations, Global Event Queue, Initialisation, GranSim specific code
294 //@subsection Global Address Operations
295 /*
296   ----------------------------------------------------------------------
297   Global Address Operations
298
299   These functions perform operations on the global-address (ga) part of a
300   closure. The ga is the only new field (1 word) in a closure introduced by
301   GrAnSim. It serves as a bitmask, indicating on which processor the
302   closure is residing. Since threads are described by Thread State Object
303   (TSO), which is nothing but another kind of closure, this scheme allows
304   gives placement information about threads.
305
306   A ga is just a bitmask, so the operations on them are mainly bitmask
307   manipulating functions. Note, that there are important macros like PROCS,
308   IS_LOCAL_TO etc. They are defined in @GrAnSim.lh@.
309
310   NOTE: In GrAnSim-light we don't maintain placement information. This
311   allows to simulate an arbitrary number of processors. The price we have
312   to be is the lack of costing any communication properly. In short,
313   GrAnSim-light is meant to reveal the maximal parallelism in a program.
314   From an implementation point of view the important thing is: {\em
315   GrAnSim-light does not maintain global-addresses}.  */
316
317 /* ga_to_proc returns the first processor marked in the bitmask ga.
318    Normally only one bit in ga should be set. But for PLCs all bits
319    are set. That shouldn't hurt since we only need IS_LOCAL_TO for PLCs */
320  
321 //@cindex ga_to_proc
322
323 static inline PEs
324 ga_to_proc(StgWord ga)
325 {
326     PEs i;
327     for (i = 0; i < RtsFlags.GranFlags.proc && !IS_LOCAL_TO(ga, i); i++);
328     ASSERT(i<RtsFlags.GranFlags.proc);
329     return (i);
330 }
331
332 /* NB: This takes a *node* rather than just a ga as input */
333 //@cindex where_is
334 PEs
335 where_is(StgClosure *node)
336 { return (ga_to_proc(PROCS(node))); }
337
338 // debugging only
339 //@cindex is_unique
340 rtsBool
341 is_unique(StgClosure *node)
342
343   PEs i;
344   rtsBool unique = rtsFalse;
345
346   for (i = 0; i < RtsFlags.GranFlags.proc ; i++)
347     if (IS_LOCAL_TO(PROCS(node), i))
348       if (unique)          // exactly 1 instance found so far
349         return rtsFalse;   // found a 2nd instance => not unique
350       else 
351         unique = rtsTrue;  // found 1st instance 
352   ASSERT(unique);          // otherwise returned from within loop
353   return (unique);
354 }
355
356 //@cindex any_idle
357 static inline rtsBool
358 any_idle(void) { /* any (map (\ i -> procStatus[i] == Idle)) [0,..,MAX_PROC] */
359  PEs i; 
360  rtsBool any_idle; 
361  for(i=0, any_idle=rtsFalse; 
362      !any_idle && i<RtsFlags.GranFlags.proc; 
363      any_idle = any_idle || procStatus[i] == Idle, i++) 
364  {} ;
365 }
366
367 //@cindex idlers
368 static inline nat
369 idlers(void) {  /* number of idle PEs */
370  PEs i, j; 
371  for(i=0, j=0;
372      i<RtsFlags.GranFlags.proc; 
373      j += (procStatus[i] == Idle) ? 1 : 0, i++) 
374  {} ;
375  return j;
376 }
377
378 //@node Global Event Queue, Spark queue functions, Global Address Operations, GranSim specific code
379 //@subsection Global Event Queue
380 /*
381 The following routines implement an ADT of an event-queue (FIFO). 
382 ToDo: Put that in an own file(?)
383 */
384
385 /* Pointer to the global event queue; events are currently malloc'ed */
386 rtsEventQ EventHd = NULL;
387
388 //@cindex get_next_event
389 rtsEvent *
390 get_next_event(void)
391 {
392   static rtsEventQ entry = NULL;
393
394   if (EventHd == NULL) {
395     barf("No next event. This may be caused by a circular data dependency in the program.");
396   }
397
398   if (entry != NULL)
399     free((char *)entry);
400
401   if (RtsFlags.GranFlags.GranSimStats.Global) {     /* count events */
402     globalGranStats.noOfEvents++;
403     globalGranStats.event_counts[EventHd->evttype]++;
404   }
405
406   entry = EventHd;
407
408   IF_GRAN_DEBUG(event_trace,
409            print_event(entry));
410
411   EventHd = EventHd->next;
412   return(entry);
413 }
414
415 /* When getting the time of the next event we ignore CONTINUETHREAD events:
416    we don't want to be interrupted before the end of the current time slice
417    unless there is something important to handle. 
418 */
419 //@cindex get_time_of_next_event
420 rtsTime
421 get_time_of_next_event(void)
422
423   rtsEventQ event = EventHd;
424
425   while (event != NULL && event->evttype==ContinueThread) {
426     event = event->next;
427   }
428   if(event == NULL)
429       return ((rtsTime) 0);
430   else
431       return (event->time);
432 }
433
434 /* ToDo: replace malloc/free with a free list */
435 //@cindex insert_event
436 void
437 insert_event(newentry)
438 rtsEvent *newentry;
439 {
440   rtsEventType evttype = newentry->evttype;
441   rtsEvent *event, **prev;
442
443   /* if(evttype >= CONTINUETHREAD1) evttype = CONTINUETHREAD; */
444
445   /* Search the queue and insert at the right point:
446      FINDWORK before everything, CONTINUETHREAD after everything.
447
448      This ensures that we find any available work after all threads have
449      executed the current cycle.  This level of detail would normally be
450      irrelevant, but matters for ridiculously low latencies...
451   */
452
453   /* Changed the ordering: Now FINDWORK comes after everything but 
454      CONTINUETHREAD. This makes sure that a MOVESPARK comes before a 
455      FINDWORK. This is important when a GranSimSparkAt happens and
456      DoAlwaysCreateThreads is turned on. Also important if a GC occurs
457      when trying to build a new thread (see much_spark)  -- HWL 02/96  */
458
459   if(EventHd == NULL)
460     EventHd = newentry;
461   else {
462     for (event = EventHd, prev=(rtsEvent**)&EventHd; 
463          event != NULL; 
464          prev = (rtsEvent**)&(event->next), event = event->next) {
465       switch (evttype) {
466         case FindWork: if ( event->time < newentry->time ||
467                             ( (event->time == newentry->time) &&
468                               (event->evttype != ContinueThread) ) )
469                          continue;
470                        else
471                          break;
472         case ContinueThread: if ( event->time <= newentry->time )
473                                continue;
474                              else
475                                break;
476         default: if ( event->time < newentry->time || 
477                       ((event->time == newentry->time) &&
478                        (event->evttype == newentry->evttype)) )
479                    continue;
480                  else
481                    break;
482        }
483        /* Insert newentry here (i.e. before event) */
484        *prev = newentry;
485        newentry->next = event;
486        break;
487     }
488     if (event == NULL)
489       *prev = newentry;
490   }
491 }
492
493 //@cindex new_event
494 void
495 new_event(proc,creator,time,evttype,tso,node,spark)
496 PEs proc, creator;
497 rtsTime time;
498 rtsEventType evttype;
499 StgTSO *tso;
500 StgClosure *node;
501 rtsSpark *spark;
502 {
503   rtsEvent *newentry = (rtsEvent *) stgMallocBytes(sizeof(rtsEvent), "new_event");
504
505   newentry->proc     = proc;
506   newentry->creator  = creator;
507   newentry->time     = time;
508   newentry->evttype  = evttype;
509   newentry->tso      = tso;
510   newentry->node     = node;
511   newentry->spark    = spark;
512   newentry->gc_info  = 0;
513   newentry->next     = NULL;
514
515   insert_event(newentry);
516
517   IF_DEBUG(gran, 
518            fprintf(stderr, "GRAN: new_event: \n"); 
519            print_event(newentry));
520 }
521
522 //@cindex prepend_event
523 void
524 prepend_event(event)       /* put event at beginning of EventQueue */
525 rtsEvent *event;
526 {                                 /* only used for GC! */
527  event->next = EventHd;
528  EventHd = event;
529 }
530
531 //@cindex grab_event
532 rtsEventQ
533 grab_event(void)             /* undo prepend_event i.e. get the event */
534 {                        /* at the head of EventQ but don't free anything */
535  rtsEventQ event = EventHd;
536
537  if (EventHd == NULL) {
538    barf("No next event (in grab_event). This may be caused by a circular data dependency in the program.");
539  }
540
541  EventHd = EventHd->next;
542  return (event);
543 }
544
545 //@cindex traverse_eventq_for_gc
546 void 
547 traverse_eventq_for_gc(void)
548 {
549  rtsEventQ event = EventHd;
550  StgWord bufsize;
551  StgClosure *closurep;
552  StgTSO *tsop;
553  StgPtr buffer, bufptr;
554  PEs proc, creator;
555
556  /* Traverse eventq and replace every FETCHREPLY by a FETCHNODE for the
557     orig closure (root of packed graph). This means that a graph, which is
558     between processors at the time of GC is fetched again at the time when
559     it would have arrived, had there been no GC. Slightly inaccurate but
560     safe for GC.
561     This is only needed for GUM style fetchng. -- HWL */
562  if (!RtsFlags.GranFlags.DoBulkFetching)
563    return;
564
565  for(event = EventHd; event!=NULL; event=event->next) {
566    if (event->evttype==FetchReply) {
567      buffer = stgCast(StgPtr,event->node);
568      ASSERT(buffer[PACK_FLAG_LOCN]==MAGIC_PACK_FLAG);  /* It's a pack buffer */
569      bufsize = buffer[PACK_SIZE_LOCN];
570      closurep = stgCast(StgClosure*,buffer[PACK_HDR_SIZE]);
571      tsop = stgCast(StgTSO*,buffer[PACK_TSO_LOCN]);
572      proc = event->proc;
573      creator = event->creator;                 /* similar to unpacking */
574      for (bufptr=buffer+PACK_HDR_SIZE; 
575           bufptr<(buffer+bufsize);
576           bufptr++) {
577          // if ( (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_SPEC_RBH_TYPE) ||
578          //      (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_GEN_RBH_TYPE) ) {
579            if ( GET_INFO(stgCast(StgClosure*,bufptr)) ) {
580              convertFromRBH(stgCast(StgClosure *,bufptr));
581          }
582      }
583      free(buffer);
584      event->evttype = FetchNode;
585      event->proc    = creator;
586      event->creator = proc;
587      event->node    = closurep;
588      event->tso     = tsop;
589      event->gc_info = 0;
590    }
591  }
592 }
593
594 void
595 markEventQueue(void)
596
597   StgClosure *MarkRoot(StgClosure *root); // prototype
598
599   rtsEventQ event = EventHd;
600   nat len;
601
602   /* iterate over eventq and register relevant fields in event as roots */
603   for(event = EventHd, len =  0; event!=NULL; event=event->next, len++) {
604     switch (event->evttype) {
605       case ContinueThread:  
606         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
607         break;
608       case StartThread: 
609         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
610         event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
611         break;
612       case ResumeThread:
613         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
614         event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
615         break;
616       case MoveSpark:
617         event->spark->node = (StgClosure *)MarkRoot((StgClosure *)event->spark->node);
618         break;
619       case MoveThread:
620         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
621         break;
622       case FindWork:
623         break;
624       case FetchNode: 
625         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
626         event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
627         break;
628       case FetchReply:
629         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
630         if (RtsFlags.GranFlags.DoBulkFetching)
631           // ToDo: traverse_eventw_for_gc if GUM-Fetching!!! HWL
632           belch("ghuH: packets in BulkFetching not marked as roots; mayb be fatal");
633         else
634           event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
635         break;
636       case GlobalBlock:
637         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
638         event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
639         break;
640       case UnblockThread:
641         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
642         event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
643         break;
644       default:
645         barf("markEventQueue: trying to mark unknown event @ %p", event);
646     }}
647   IF_DEBUG(gc,
648            belch("GC: markEventQueue: %d events in queue", len));
649 }
650
651 /*
652   Prune all ContinueThread events related to tso or node in the eventq.
653   Currently used if a thread leaves STG land with ThreadBlocked status,
654   i.e. it blocked on a closure and has been put on its blocking queue.  It
655   will be reawakended via a call to awakenBlockedQueue. Until then no
656   event effecting this tso should appear in the eventq.  A bit of a hack,
657   because ideally we shouldn't generate such spurious ContinueThread events
658   in the first place.  
659 */
660 //@cindex prune_eventq 
661 void 
662 prune_eventq(tso, node) 
663 StgTSO *tso; 
664 StgClosure *node; 
665 { rtsEventQ prev = (rtsEventQ)NULL, event = EventHd;
666
667   /* node unused for now */ 
668   ASSERT(node==NULL); 
669   /* tso must be valid, then */
670   ASSERT(tso!=END_TSO_QUEUE);
671   while (event != NULL) {
672     if (event->evttype==ContinueThread && 
673         (event->tso==tso)) {
674       IF_GRAN_DEBUG(event_trace, // ToDo: use another debug flag
675                     belch("prune_eventq: pruning ContinueThread event for TSO %d (%p) on PE %d @ %lx (%p)",
676                           event->tso->id, event->tso, event->proc, event->time, event));
677       if (prev==(rtsEventQ)NULL) { // beginning of eventq
678         EventHd = event->next;
679         free(event); 
680         event = EventHd;
681       } else {
682         prev->next = event->next;
683         free(event); 
684         event = prev->next;
685       }
686     } else { // no pruning necessary; go to next event
687       prev = event;
688       event = event->next;
689     }
690   }
691 }
692
693 //@cindex print_event
694 void
695 print_event(event)
696 rtsEvent *event;
697 {
698   char str_tso[16], str_node[16];
699   StgThreadID tso_id;
700
701   if (event->tso==END_TSO_QUEUE) {
702     strcpy(str_tso, "______");
703     tso_id = 0;
704   } else { 
705     sprintf(str_tso, "%p", event->tso);
706     tso_id = (event->tso==NULL) ? 0 : event->tso->id;
707   }
708   if  (event->node==(StgClosure*)NULL) {
709     strcpy(str_node, "______");
710   } else {
711     sprintf(str_node, "%p", event->node);
712   }
713   // HWL: shouldn't be necessary; ToDo: nuke
714   //str_tso[6]='\0';
715   //str_node[6]='\0';
716
717   if (event==NULL)
718     fprintf(stderr,"Evt: NIL\n");
719   else
720     fprintf(stderr, "Evt: %s (%u), PE %u [%u], Time %lu, TSO %d (%s), Node %s\n", //"Evt: %s (%u), PE %u [%u], Time %u, TSO %s (%#l), Node %s\n",
721               event_names[event->evttype], event->evttype,
722               event->proc, event->creator, event->time, 
723               tso_id, str_tso, str_node
724               /*, event->spark, event->next */ );
725
726 }
727
728 //@cindex print_eventq
729 void
730 print_eventq(hd)
731 rtsEvent *hd;
732 {
733   rtsEvent *x;
734
735   fprintf(stderr,"Event Queue with root at %p:\n", hd);
736   for (x=hd; x!=NULL; x=x->next) {
737     print_event(x);
738   }
739 }
740
741 /* 
742    Spark queue functions are now all  in Sparks.c!!
743 */
744 //@node Scheduling functions, Thread Queue routines, Spark queue functions, GranSim specific code
745 //@subsection Scheduling functions
746
747 /* 
748    These functions are variants of thread initialisation and therefore
749    related to initThread and friends in Schedule.c. However, they are
750    specific to a GranSim setup in storing more info in the TSO's statistics
751    buffer and sorting the thread queues etc.  
752 */
753
754 /*
755    A large portion of startThread deals with maintaining a sorted thread
756    queue, which is needed for the Priority Sparking option. Without that
757    complication the code boils down to FIFO handling.  
758 */
759 //@cindex insertThread
760 void
761 insertThread(tso, proc)
762 StgTSO*     tso;
763 PEs         proc;
764 {
765   StgTSO *prev = NULL, *next = NULL;
766   nat count = 0;
767   rtsBool found = rtsFalse;
768
769   ASSERT(CurrentProc==proc);
770   ASSERT(!is_on_queue(tso,proc));
771   /* Idle proc: put the thread on the run queue
772      same for pri spark and basic version */
773   if (run_queue_hds[proc] == END_TSO_QUEUE)
774     {
775       /* too strong!
776       ASSERT((CurrentProc==MainProc &&   
777               CurrentTime[MainProc]==0 &&
778               procStatus[MainProc]==Idle) ||
779              procStatus[proc]==Starting);
780       */
781       run_queue_hds[proc] = run_queue_tls[proc] = tso;
782
783       CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime;
784
785       /* new_event of ContinueThread has been moved to do_the_startthread */
786
787       /* too strong!
788       ASSERT(procStatus[proc]==Idle || 
789              procStatus[proc]==Fishing || 
790              procStatus[proc]==Starting);
791       procStatus[proc] = Busy;
792       */
793       return;
794     }
795
796   if (RtsFlags.GranFlags.Light)
797     GranSimLight_insertThread(tso, proc);
798
799   /* Only for Pri Scheduling: find place where to insert tso into queue */
800   if (RtsFlags.GranFlags.DoPriorityScheduling && tso->gran.pri!=0)
801     /* {add_to_spark_queue}vo' jInIHta'; Qu' wa'DIch yIleghQo' */
802     for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count=0;
803          (next != END_TSO_QUEUE) && 
804          !(found = tso->gran.pri >= next->gran.pri);
805          prev = next, next = next->link, count++) 
806       { 
807        ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
808               (prev==(StgTSO*)NULL || prev->link==next));
809       }
810
811   ASSERT(!found || next != END_TSO_QUEUE);
812   ASSERT(procStatus[proc]!=Idle);
813  
814   if (found) {
815      /* found can only be rtsTrue if pri scheduling enabled */ 
816      ASSERT(RtsFlags.GranFlags.DoPriorityScheduling);
817      if (RtsFlags.GranFlags.GranSimStats.Global) 
818        globalGranStats.non_end_add_threads++;
819      /* Add tso to ThreadQueue between prev and next */
820      tso->link = next;
821      if ( next == (StgTSO*)END_TSO_QUEUE ) {
822        run_queue_tl = tso;
823      } else {
824        /* no back link for TSO chain */
825      }
826      
827      if ( prev == (StgTSO*)END_TSO_QUEUE ) {
828        /* Never add TSO as first elem of thread queue; the first */
829        /* element should be the one that is currently running -- HWL */
830        IF_DEBUG(gran,
831                 belch("GRAN: Qagh: NewThread (w/ PriorityScheduling): Trying to add TSO %p (PRI=%d) as first elem of threadQ (%p) on proc %u (@ %u)\n",
832                     tso, tso->gran.pri, run_queue_hd, proc,
833                     CurrentTime[proc]));
834      } else {
835       prev->link = tso;
836      }
837   } else { /* !found */ /* or not pri sparking! */
838     /* Add TSO to the end of the thread queue on that processor */
839     run_queue_tls[proc]->link = tso;
840     run_queue_tls[proc] = tso;
841   }
842   ASSERT(RtsFlags.GranFlags.DoPriorityScheduling || count==0);
843   CurrentTime[proc] += count * RtsFlags.GranFlags.Costs.pri_sched_overhead +
844                        RtsFlags.GranFlags.Costs.threadqueuetime;
845
846   /* ToDo: check if this is still needed -- HWL 
847   if (RtsFlags.GranFlags.DoThreadMigration)
848     ++SurplusThreads;
849
850   if (RtsFlags.GranFlags.GranSimStats.Full &&
851       !(( event_type == GR_START || event_type == GR_STARTQ) && 
852         RtsFlags.GranFlags.labelling) )
853     DumpRawGranEvent(proc, creator, event_type+1, tso, node, 
854                      tso->gran.sparkname, spark_queue_len(proc));
855   */
856
857 # if defined(GRAN_CHECK)
858   /* Check if thread queue is sorted. Only for testing, really!  HWL */
859   if ( RtsFlags.GranFlags.DoPriorityScheduling && 
860        (RtsFlags.GranFlags.Debug.sortedQ) ) {
861     rtsBool sorted = rtsTrue;
862     StgTSO *prev, *next;
863
864     if (run_queue_hds[proc]==END_TSO_QUEUE || 
865         run_queue_hds[proc]->link==END_TSO_QUEUE) {
866       /* just 1 elem => ok */
867     } else {
868       /* Qu' wa'DIch yIleghQo' (ignore first elem)! */
869       for (prev = run_queue_hds[proc]->link, next = prev->link;
870            (next != END_TSO_QUEUE) ;
871            prev = next, next = prev->link) {
872         ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
873                (prev==(StgTSO*)NULL || prev->link==next));
874         sorted = sorted && 
875                  (prev->gran.pri >= next->gran.pri);
876       }
877     }
878     if (!sorted) {
879       fprintf(stderr,"Qagh: THREADQ on PE %d is not sorted:\n",
880               CurrentProc);
881       G_THREADQ(run_queue_hd,0x1);
882     }
883   }
884 # endif
885 }
886
887 /*
888   insertThread, which is only used for GranSim Light, is similar to
889   startThread in that it adds a TSO to a thread queue. However, it assumes
890   that the thread queue is sorted by local clocks and it inserts the TSO at
891   the right place in the queue. Don't create any event, just insert.  
892 */
893 //@cindex GranSimLight_insertThread
894 rtsBool
895 GranSimLight_insertThread(tso, proc)
896 StgTSO* tso;
897 PEs proc;
898 {
899   StgTSO *prev, *next;
900   nat count = 0;
901   rtsBool found = rtsFalse;
902
903   ASSERT(RtsFlags.GranFlags.Light);
904
905   /* In GrAnSim-Light we always have an idle `virtual' proc.
906      The semantics of the one-and-only thread queue is different here:
907      all threads in the queue are running (each on its own virtual processor);
908      the queue is only needed internally in the simulator to interleave the
909      reductions of the different processors.
910      The one-and-only thread queue is sorted by the local clocks of the TSOs.
911   */
912   ASSERT(run_queue_hds[proc] != END_TSO_QUEUE);
913   ASSERT(tso->link == END_TSO_QUEUE);
914
915   /* If only one thread in queue so far we emit DESCHEDULE in debug mode */
916   if (RtsFlags.GranFlags.GranSimStats.Full &&
917       (RtsFlags.GranFlags.Debug.checkLight) && 
918       (run_queue_hd->link == END_TSO_QUEUE)) {
919     DumpRawGranEvent(proc, proc, GR_DESCHEDULE,
920                      run_queue_hds[proc], (StgClosure*)NULL, 
921                      tso->gran.sparkname, spark_queue_len(proc)); // ToDo: check spar_queue_len
922     // resched = rtsTrue;
923   }
924
925   /* this routine should only be used in a GrAnSim Light setup */
926   /* && CurrentProc must be 0 in GrAnSim Light setup */
927   ASSERT(RtsFlags.GranFlags.Light && CurrentProc==0);
928
929   /* Idle proc; same for pri spark and basic version */
930   if (run_queue_hd==END_TSO_QUEUE)
931     {
932       run_queue_hd = run_queue_tl = tso;
933       /* MAKE_BUSY(CurrentProc); */
934       return rtsTrue;
935     }
936
937   for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count = 0;
938        (next != END_TSO_QUEUE) && 
939        !(found = (tso->gran.clock < next->gran.clock));
940        prev = next, next = next->link, count++) 
941     { 
942        ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
943               (prev==(StgTSO*)NULL || prev->link==next));
944     }
945
946   /* found can only be rtsTrue if pri sparking enabled */ 
947   if (found) {
948      /* Add tso to ThreadQueue between prev and next */
949      tso->link = next;
950      if ( next == END_TSO_QUEUE ) {
951        run_queue_tls[proc] = tso;
952      } else {
953        /* no back link for TSO chain */
954      }
955      
956      if ( prev == END_TSO_QUEUE ) {
957        run_queue_hds[proc] = tso;
958      } else {
959        prev->link = tso;
960      }
961   } else { /* !found */ /* or not pri sparking! */
962     /* Add TSO to the end of the thread queue on that processor */
963     run_queue_tls[proc]->link = tso;
964     run_queue_tls[proc] = tso;
965   }
966
967   if ( prev == END_TSO_QUEUE ) {        /* new head of queue */
968     new_event(proc, proc, CurrentTime[proc],
969               ContinueThread,
970               tso, (StgClosure*)NULL, (rtsSpark*)NULL);
971   }
972   /*
973   if (RtsFlags.GranFlags.GranSimStats.Full && 
974       !(( event_type == GR_START || event_type == GR_STARTQ) && 
975         RtsFlags.GranFlags.labelling) )
976     DumpRawGranEvent(proc, creator, gr_evttype, tso, node,
977                      tso->gran.sparkname, spark_queue_len(proc));
978   */
979   return rtsTrue;
980 }
981
982 /*
983   endThread is responsible for general clean-up after the thread tso has
984   finished. This includes emitting statistics into the profile etc.  
985 */
986 void
987 endThread(StgTSO *tso, PEs proc) 
988 {
989   ASSERT(procStatus[proc]==Busy);        // coming straight out of STG land
990   ASSERT(tso->what_next==ThreadComplete);
991   // ToDo: prune ContinueThreads for this TSO from event queue
992   DumpEndEvent(proc, tso, rtsFalse /* not mandatory */);
993
994   /* if this was the last thread on this PE then make it Idle */
995   if (run_queue_hds[proc]==END_TSO_QUEUE) {
996     procStatus[CurrentProc] = Idle;
997   }
998 }
999
1000 //@node Thread Queue routines, GranSim functions, Scheduling functions, GranSim specific code
1001 //@subsection Thread Queue routines
1002
1003 /* 
1004    Check whether given tso resides on the run queue of the current processor.
1005    Only used for debugging.
1006 */
1007    
1008 //@cindex is_on_queue
1009 rtsBool
1010 is_on_queue (StgTSO *tso, PEs proc) 
1011 {
1012   StgTSO *t;
1013   rtsBool found;
1014
1015   for (t=run_queue_hds[proc], found=rtsFalse; 
1016        t!=END_TSO_QUEUE && !(found = t==tso);
1017        t=t->link)
1018     /* nothing */ ;
1019
1020   return found;
1021 }
1022
1023 /* This routine  is only  used for keeping   a statistics  of thread  queue
1024    lengths to evaluate the impact of priority scheduling. -- HWL 
1025    {spark_queue_len}vo' jInIHta'
1026 */
1027 //@cindex thread_queue_len
1028 nat
1029 thread_queue_len(PEs proc) 
1030 {
1031  StgTSO *prev, *next;
1032  nat len;
1033
1034  for (len = 0, prev = END_TSO_QUEUE, next = run_queue_hds[proc];
1035       next != END_TSO_QUEUE; 
1036       len++, prev = next, next = prev->link)
1037    {}
1038
1039  return (len);
1040 }
1041
1042 //@node GranSim functions, GranSimLight routines, Thread Queue routines, GranSim specific code
1043 //@subsection GranSim functions
1044
1045 /* -----------------------------------------------------------------  */
1046 /* The main event handling functions; called from Schedule.c (schedule) */
1047 /* -----------------------------------------------------------------  */
1048  
1049 //@cindex do_the_globalblock
1050
1051 void 
1052 do_the_globalblock(rtsEvent* event)
1053
1054   PEs proc          = event->proc;        /* proc that requested node */
1055   StgTSO *tso       = event->tso;         /* tso that requested node */
1056   StgClosure  *node = event->node;        /* requested, remote node */
1057
1058   IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the GlobalBlock\n"));
1059   /* There should be no GLOBALBLOCKs in GrAnSim Light setup */
1060   ASSERT(!RtsFlags.GranFlags.Light);
1061   /* GlobalBlock events only valid with GUM fetching */
1062   ASSERT(RtsFlags.GranFlags.DoBulkFetching);
1063
1064   IF_GRAN_DEBUG(bq, // globalBlock,
1065     if (IS_LOCAL_TO(PROCS(node),proc)) {
1066       belch("## Qagh: GlobalBlock: Blocking TSO %d (%p) on LOCAL node %p (PE %d).\n",
1067             tso->id, tso, node, proc);
1068     });
1069
1070   /* CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.munpacktime; */
1071   if ( blockFetch(tso,proc,node) != 0 )
1072     return;                     /* node has become local by now */
1073
1074 #if 0
1075  ToDo: check whether anything has to be done at all after blockFetch -- HWL
1076
1077   if (!RtsFlags.GranFlags.DoAsyncFetch) { /* head of queue is next thread */
1078     StgTSO* tso = run_queue_hds[proc];       /* awaken next thread */
1079     if (tso != (StgTSO*)NULL) {
1080       new_event(proc, proc, CurrentTime[proc],
1081                 ContinueThread,
1082                 tso, (StgClosure*)NULL, (rtsSpark*)NULL);
1083       CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime;
1084       if (RtsFlags.GranFlags.GranSimStats.Full)
1085         DumpRawGranEvent(proc, CurrentProc, GR_SCHEDULE, tso,
1086                          (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc));  // ToDo: check sparkname and spar_queue_len
1087       procStatus[proc] = Busy;                  /* might have been fetching */
1088     } else {
1089       procStatus[proc] = Idle;                     /* no work on proc now */
1090     }
1091   } else {  /* RtsFlags.GranFlags.DoAsyncFetch i.e. block-on-fetch */
1092               /* other thread is already running */
1093               /* 'oH 'utbe' 'e' vIHar ; I think that's not needed -- HWL 
1094               new_event(proc,proc,CurrentTime[proc],
1095                        CONTINUETHREAD,EVENT_TSO(event),
1096                        (RtsFlags.GranFlags.DoBulkFetching ? closure :
1097                        EVENT_NODE(event)),NULL);
1098               */
1099   }
1100 #endif
1101 }
1102
1103 //@cindex do_the_unblock
1104
1105 void 
1106 do_the_unblock(rtsEvent* event) 
1107 {
1108   PEs proc = event->proc,       /* proc that requested node */
1109       creator = event->creator; /* proc that requested node */
1110   StgTSO* tso = event->tso;     /* tso that requested node */
1111   StgClosure* node = event->node;  /* requested, remote node */
1112   
1113   IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the UnBlock\n"))
1114   /* There should be no UNBLOCKs in GrAnSim Light setup */
1115   ASSERT(!RtsFlags.GranFlags.Light);
1116   /* UnblockThread means either FetchReply has arrived or
1117      a blocking queue has been awakened;
1118      ToDo: check with assertions
1119   ASSERT(procStatus[proc]==Fetching || IS_BLACK_HOLE(event->node));
1120   */
1121   if (!RtsFlags.GranFlags.DoAsyncFetch) {  /* block-on-fetch */
1122     /* We count block-on-fetch as normal block time */    
1123     tso->gran.blocktime += CurrentTime[proc] - tso->gran.blockedat;
1124     /* Dumping now done when processing the event
1125        No costs for contextswitch or thread queueing in this case 
1126        if (RtsFlags.GranFlags.GranSimStats.Full)
1127          DumpRawGranEvent(proc, CurrentProc, GR_RESUME, tso, 
1128                           (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc));
1129     */
1130     /* Maybe do this in FetchReply already 
1131     if (procStatus[proc]==Fetching)
1132       procStatus[proc] = Busy;
1133     */
1134     /*
1135     new_event(proc, proc, CurrentTime[proc],
1136               ContinueThread,
1137               tso, node, (rtsSpark*)NULL);
1138     */
1139   } else {
1140     /* Asynchr comm causes additional costs here: */
1141     /* Bring the TSO from the blocked queue into the threadq */
1142   }
1143   /* In all cases, the UnblockThread causes a ResumeThread to be scheduled */
1144   new_event(proc, proc, 
1145             CurrentTime[proc]+RtsFlags.GranFlags.Costs.threadqueuetime,
1146             ResumeThread,
1147             tso, node, (rtsSpark*)NULL);
1148 }
1149
1150 //@cindex do_the_fetchnode
1151
1152 void
1153 do_the_fetchnode(rtsEvent* event)
1154 {
1155   PEs proc = event->proc,       /* proc that holds the requested node */
1156       creator = event->creator; /* proc that requested node */
1157   StgTSO* tso = event->tso;
1158   StgClosure* node = event->node;  /* requested, remote node */
1159   rtsFetchReturnCode rc;
1160
1161   ASSERT(CurrentProc==proc);
1162   /* There should be no FETCHNODEs in GrAnSim Light setup */
1163   ASSERT(!RtsFlags.GranFlags.Light);
1164
1165   IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchNode\n"));
1166
1167   CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1168
1169   /* ToDo: check whether this is the right place for dumping the event */
1170   if (RtsFlags.GranFlags.GranSimStats.Full)
1171     DumpRawGranEvent(creator, proc, GR_FETCH, tso, node, (StgInt)0, 0);
1172
1173   do {
1174     rc = handleFetchRequest(node, proc, creator, tso);
1175     if (rc == OutOfHeap) {                                   /* trigger GC */
1176 # if defined(GRAN_CHECK)  && defined(GRAN)
1177      if (RtsFlags.GcFlags.giveStats)
1178        fprintf(RtsFlags.GcFlags.statsFile,"*****   veQ boSwI'  PackNearbyGraph(node %p, tso %p (%d))\n",
1179                 node, tso, tso->id);
1180 # endif
1181      barf("//// do_the_fetchnode: out of heap after handleFetchRequest; ToDo: call GarbageCollect()");
1182      prepend_event(event);
1183      GarbageCollect(GetRoots, rtsFalse); 
1184      // HWL: ToDo: check whether a ContinueThread has to be issued
1185      // HWL old: ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
1186 # if 0 && defined(GRAN_CHECK)  && defined(GRAN)
1187      if (RtsFlags.GcFlags.giveStats) {
1188        fprintf(RtsFlags.GcFlags.statsFile,"*****      SAVE_Hp=%p, SAVE_HpLim=%p, PACK_HEAP_REQUIRED=%d\n",
1189                 Hp, HpLim, 0) ; // PACK_HEAP_REQUIRED);  ???
1190        fprintf(stderr,"*****      No. of packets so far: %d (total size: %d)\n", 
1191                 globalGranStats.tot_packets, globalGranStats.tot_packet_size);
1192      }
1193 # endif 
1194      event = grab_event();
1195      // Hp -= PACK_HEAP_REQUIRED; // ???
1196
1197      /* GC knows that events are special and follows the pointer i.e. */
1198      /* events are valid even if they moved. An EXIT is triggered */
1199      /* if there is not enough heap after GC. */
1200     }
1201   } while (rc == OutOfHeap);
1202 }
1203
1204 //@cindex do_the_fetchreply
1205 void 
1206 do_the_fetchreply(rtsEvent* event)
1207 {
1208   PEs proc = event->proc,       /* proc that requested node */
1209       creator = event->creator; /* proc that holds the requested node */
1210   StgTSO* tso = event->tso;
1211   StgClosure* node = event->node;  /* requested, remote node */
1212   StgClosure* closure=(StgClosure*)NULL;
1213
1214   ASSERT(CurrentProc==proc);
1215   ASSERT(RtsFlags.GranFlags.DoAsyncFetch || procStatus[proc]==Fetching);
1216
1217   IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchReply\n"));
1218   /* There should be no FETCHREPLYs in GrAnSim Light setup */
1219   ASSERT(!RtsFlags.GranFlags.Light);
1220
1221   /* assign message unpack costs *before* dumping the event */
1222   CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1223   
1224   /* ToDo: check whether this is the right place for dumping the event */
1225   if (RtsFlags.GranFlags.GranSimStats.Full)
1226     DumpRawGranEvent(proc, creator, GR_REPLY, tso, node, 
1227                       tso->gran.sparkname, spark_queue_len(proc));
1228
1229   /* THIS SHOULD NEVER HAPPEN 
1230      If tso is in the BQ of node this means that it actually entered the 
1231      remote closure, due to a missing GranSimFetch at the beginning of the 
1232      entry code; therefore, this is actually a faked fetch, triggered from 
1233      within GranSimBlock; 
1234      since tso is both in the EVQ and the BQ for node, we have to take it out 
1235      of the BQ first before we can handle the FetchReply;
1236      ToDo: special cases in awakenBlockedQueue, since the BQ magically moved.
1237   */
1238   if (tso->block_info.closure!=(StgClosure*)NULL) {
1239     IF_GRAN_DEBUG(bq,
1240                   belch("## ghuH: TSO %d (%p) in FetchReply is blocked on node %p (shouldn't happen AFAIK)",
1241                         tso->id, tso, node));
1242     // unlink_from_bq(tso, node);
1243   }
1244     
1245   if (RtsFlags.GranFlags.DoBulkFetching) {      /* bulk (packet) fetching */
1246     rtsPackBuffer *buffer = (rtsPackBuffer*)node;
1247     nat size = buffer->size;
1248   
1249     /* NB: Fetch misses can't occur with GUM fetching, as */
1250     /* updatable closure are turned into RBHs and therefore locked */
1251     /* for other processors that try to grab them. */
1252   
1253     closure = UnpackGraph(buffer);
1254     CurrentTime[proc] += size * RtsFlags.GranFlags.Costs.munpacktime;
1255   } else  // incremental fetching
1256       /* Copy or  move node to CurrentProc */
1257       if (fetchNode(node, creator, proc)) {
1258         /* Fetch has failed i.e. node has been grabbed by another PE */
1259         PEs p = where_is(node);
1260         rtsTime fetchtime;
1261      
1262         if (RtsFlags.GranFlags.GranSimStats.Global)
1263           globalGranStats.fetch_misses++;
1264
1265         IF_GRAN_DEBUG(thunkStealing,
1266                  belch("== Qu'vatlh! fetch miss @ %u: node %p is at proc %u (rather than proc %u)\n",
1267                        CurrentTime[proc],node,p,creator));
1268
1269         CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
1270         
1271         /* Count fetch again !? */
1272         ++(tso->gran.fetchcount);
1273         tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime;
1274         
1275         fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) +
1276                     RtsFlags.GranFlags.Costs.latency;
1277         
1278         /* Chase the grabbed node */
1279         new_event(p, proc, fetchtime,
1280                   FetchNode,
1281                   tso, node, (rtsSpark*)NULL);
1282
1283 # if 0 && defined(GRAN_CHECK) && defined(GRAN) /* Just for testing */
1284        IF_GRAN_DEBUG(blockOnFetch,
1285                      BlockedOnFetch[CurrentProc] = tso;) /*-rtsTrue;-*/
1286         
1287        IF_GRAN_DEBUG(blockOnFetch_sanity,
1288                      tso->type |= FETCH_MASK_TSO;)
1289 # endif
1290
1291         CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
1292         
1293         return; /* NB: no REPLy has been processed; tso still sleeping */
1294     }
1295
1296     /* -- Qapla'! Fetch has been successful; node is here, now  */
1297     ++(event->tso->gran.fetchcount);
1298     event->tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime;
1299
1300     /* this is now done at the beginning of this routine
1301     if (RtsFlags.GranFlags.GranSimStats.Full)
1302        DumpRawGranEvent(proc,event->creator, GR_REPLY, event->tso,
1303                         (RtsFlags.GranFlags.DoBulkFetching ? 
1304                                closure : 
1305                                event->node),
1306                         tso->gran.sparkname, spark_queue_len(proc));
1307     */
1308
1309     ASSERT(OutstandingFetches[proc] > 0);
1310     --OutstandingFetches[proc];
1311     new_event(proc, proc, CurrentTime[proc],
1312               ResumeThread,
1313               event->tso, (RtsFlags.GranFlags.DoBulkFetching ? 
1314                            closure : 
1315                            event->node),
1316               (rtsSpark*)NULL);
1317 }
1318
1319 //@cindex do_the_movethread
1320
1321 void
1322 do_the_movethread(rtsEvent* event) {
1323   PEs proc = event->proc,       /* proc that requested node */
1324       creator = event->creator; /* proc that holds the requested node */
1325   StgTSO* tso = event->tso;
1326
1327  IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveThread\n"));
1328
1329  ASSERT(CurrentProc==proc);
1330  /* There should be no MOVETHREADs in GrAnSim Light setup */
1331  ASSERT(!RtsFlags.GranFlags.Light);
1332  /* MOVETHREAD events should never occur without -bM */
1333  ASSERT(RtsFlags.GranFlags.DoThreadMigration);
1334  /* Bitmask of moved thread should be 0 */
1335  ASSERT(PROCS(tso)==0);
1336  ASSERT(procStatus[proc] == Fishing ||
1337         RtsFlags.GranFlags.DoAsyncFetch);
1338  ASSERT(OutstandingFishes[proc]>0);
1339
1340  /* ToDo: exact costs for unpacking the whole TSO  */
1341  CurrentTime[proc] +=  5l * RtsFlags.GranFlags.Costs.munpacktime;
1342
1343  /* ToDo: check whether this is the right place for dumping the event */
1344  if (RtsFlags.GranFlags.GranSimStats.Full)
1345    DumpRawGranEvent(proc, creator, 
1346                     GR_STOLEN, tso, (StgClosure*)NULL, (StgInt)0, 0);
1347
1348  // ToDo: check cost functions
1349  --OutstandingFishes[proc];
1350  SET_GRAN_HDR(tso, ThisPE);         // adjust the bitmask for the TSO
1351  insertThread(tso, proc);
1352
1353  if (procStatus[proc]==Fishing)
1354    procStatus[proc] = Idle;
1355
1356  if (RtsFlags.GranFlags.GranSimStats.Global)
1357    globalGranStats.tot_TSOs_migrated++;
1358 }
1359
1360 //@cindex do_the_movespark
1361
1362 void
1363 do_the_movespark(rtsEvent* event) {
1364  PEs proc = event->proc,       /* proc that requested spark */
1365      creator = event->creator; /* proc that holds the requested spark */
1366  StgTSO* tso = event->tso;
1367  rtsSparkQ spark = event->spark;
1368
1369  IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveSpark\n"))
1370
1371  ASSERT(CurrentProc==proc);
1372  ASSERT(spark!=NULL);
1373  ASSERT(procStatus[proc] == Fishing ||
1374         RtsFlags.GranFlags.DoAsyncFetch);
1375  ASSERT(OutstandingFishes[proc]>0); 
1376
1377  CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1378           
1379  /* record movement of spark only if spark profiling is turned on */
1380  if (RtsFlags.GranFlags.GranSimStats.Sparks)
1381     DumpRawGranEvent(proc, creator,
1382                      SP_ACQUIRED,
1383                      tso, spark->node, spark->name, spark_queue_len(proc));
1384
1385  /* global statistics */
1386  if ( RtsFlags.GranFlags.GranSimStats.Global &&
1387       !closure_SHOULD_SPARK(spark->node))
1388    globalGranStats.withered_sparks++;
1389    /* Not adding the spark to the spark queue would be the right */
1390    /* thing here, but it also would be cheating, as this info can't be */
1391    /* available in a real system. -- HWL */
1392
1393  --OutstandingFishes[proc];
1394
1395  add_to_spark_queue(spark);
1396
1397  IF_GRAN_DEBUG(randomSteal, // ToDo: spark-distribution flag
1398                print_sparkq_stats());
1399
1400  /* Should we treat stolen sparks specially? Currently, we don't. */
1401
1402  if (procStatus[proc]==Fishing)
1403    procStatus[proc] = Idle;
1404
1405  /* add_to_spark_queue will increase the time of the current proc. */
1406  /*
1407    If proc was fishing, it is Idle now with the new spark in its spark
1408    pool. This means that the next time handleIdlePEs is called, a local
1409    FindWork will be created on this PE to turn the spark into a thread. Of
1410    course another PE might steal the spark in the meantime (that's why we
1411    are using events rather than inlining all the operations in the first
1412    place). */
1413 }
1414
1415 /*
1416   In the Constellation class version of GranSim the semantics of StarThread
1417   events has changed. Now, StartThread has to perform 3 basic operations:
1418    - create a new thread (previously this was done in ActivateSpark);
1419    - insert the thread into the run queue of the current processor
1420    - generate a new event for actually running the new thread
1421   Note that the insertThread is called via createThread. 
1422 */
1423   
1424 //@cindex do_the_startthread
1425
1426 void
1427 do_the_startthread(rtsEvent *event)
1428 {
1429   PEs proc          = event->proc;        /* proc that requested node */
1430   StgTSO *tso       = event->tso;         /* tso that requested node */
1431   StgClosure  *node = event->node;        /* requested, remote node */
1432   rtsSpark *spark   = event->spark;
1433   GranEventType gr_evttype;
1434
1435   ASSERT(CurrentProc==proc);
1436   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1437   ASSERT(event->evttype == ResumeThread || event->evttype == StartThread);
1438   /* if this was called via StartThread: */
1439   ASSERT(event->evttype!=StartThread || tso == END_TSO_QUEUE); // not yet created
1440   // ToDo: check: ASSERT(event->evttype!=StartThread || procStatus[proc]==Starting);
1441   /* if this was called via ResumeThread: */
1442   ASSERT(event->evttype!=ResumeThread || 
1443            RtsFlags.GranFlags.DoAsyncFetch ||!is_on_queue(tso,proc)); 
1444
1445   /* startThread may have been called from the main event handler upon
1446      finding either a ResumeThread or a StartThread event; set the
1447      gr_evttype (needed for writing to .gr file) accordingly */
1448   // gr_evttype = (event->evttype == ResumeThread) ? GR_RESUME : GR_START;
1449
1450   if ( event->evttype == StartThread ) {
1451     GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ? 
1452                                  GR_START : GR_STARTQ;
1453
1454     tso = createThread(BLOCK_SIZE_W, spark->gran_info);// implicit insertThread!
1455     pushClosure(tso, node);
1456
1457     // ToDo: fwd info on local/global spark to thread -- HWL
1458     // tso->gran.exported =  spark->exported;
1459     // tso->gran.locked =   !spark->global;
1460     tso->gran.sparkname = spark->name;
1461
1462     ASSERT(CurrentProc==proc);
1463     if (RtsFlags.GranFlags.GranSimStats.Full)
1464       DumpGranEvent(gr_evttype,tso);
1465
1466     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
1467   } else { // event->evttype == ResumeThread
1468     GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ? 
1469                                  GR_RESUME : GR_RESUMEQ;
1470
1471     insertThread(tso, proc);
1472
1473     ASSERT(CurrentProc==proc);
1474     if (RtsFlags.GranFlags.GranSimStats.Full)
1475       DumpGranEvent(gr_evttype,tso);
1476   }
1477
1478   ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE); // non-empty run queue
1479   procStatus[proc] = Busy;
1480   /* make sure that this thread is actually run */
1481   new_event(proc, proc, 
1482             CurrentTime[proc],
1483             ContinueThread,
1484             tso, node, (rtsSpark*)NULL);
1485   
1486   /* A wee bit of statistics gathering */
1487   if (RtsFlags.GranFlags.GranSimStats.Global) {
1488     globalGranStats.tot_add_threads++;
1489     globalGranStats.tot_tq_len += thread_queue_len(CurrentProc);
1490   }
1491
1492 }
1493
1494 //@cindex do_the_findwork
1495 void
1496 do_the_findwork(rtsEvent* event) 
1497 {
1498   PEs proc = event->proc,       /* proc to search for work */
1499       creator = event->creator; /* proc that requested work */
1500   rtsSparkQ spark = event->spark;
1501   /* ToDo: check that this size is safe -- HWL */
1502 #if 0
1503  ToDo: check available heap
1504
1505   nat req_heap = sizeofW(StgTSO) + MIN_STACK_WORDS;
1506                  // add this? -- HWL:RtsFlags.ConcFlags.stkChunkSize;
1507 #endif
1508
1509   IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the Findwork\n"));
1510
1511   /* If GUM style fishing is enabled, the contents of the spark field says
1512      what to steal (spark(1) or thread(2)); */
1513   ASSERT(!(RtsFlags.GranFlags.Fishing && event->spark==(rtsSpark*)0));
1514
1515   /* Make sure that we have enough heap for creating a new
1516      thread. This is a conservative estimate of the required heap.
1517      This eliminates special checks for GC around NewThread within
1518      ActivateSpark.                                                 */
1519
1520 #if 0
1521  ToDo: check available heap
1522
1523   if (Hp + req_heap > HpLim ) {
1524     IF_DEBUG(gc, 
1525              belch("GC: Doing GC from within Findwork handling (that's bloody dangerous if you ask me)");)
1526       GarbageCollect(GetRoots);
1527       // ReallyPerformThreadGC(req_heap, rtsFalse);   old -- HWL
1528       Hp -= req_heap;
1529       if (procStatus[CurrentProc]==Sparking) 
1530         procStatus[CurrentProc]=Idle;
1531       return;
1532   }
1533 #endif
1534   
1535   if ( RtsFlags.GranFlags.DoAlwaysCreateThreads ||
1536        RtsFlags.GranFlags.Fishing ||
1537        ((procStatus[proc]==Idle || procStatus[proc]==Sparking) &&
1538         (RtsFlags.GranFlags.FetchStrategy >= 2 || 
1539          OutstandingFetches[proc] == 0)) ) 
1540    {
1541     rtsBool found;
1542     rtsSparkQ  prev, spark;
1543     
1544     /* ToDo: check */
1545     ASSERT(procStatus[proc]==Sparking ||
1546            RtsFlags.GranFlags.DoAlwaysCreateThreads ||
1547            RtsFlags.GranFlags.Fishing);
1548     
1549     /* SImmoHwI' yInej! Search spark queue! */
1550     /* gimme_spark (event, &found, &spark); */
1551     findLocalSpark(event, &found, &spark);
1552
1553     if (!found) { /* pagh vumwI' */
1554       /*
1555         If no spark has been found this can mean 2 things:
1556          1/ The FindWork was a fish (i.e. a message sent by another PE) and 
1557             the spark pool of the receiver is empty
1558             --> the fish has to be forwarded to another PE
1559          2/ The FindWork was local to this PE (i.e. no communication; in this
1560             case creator==proc) and the spark pool of the PE is not empty 
1561             contains only sparks of closures that should not be sparked 
1562             (note: if the spark pool were empty, handleIdlePEs wouldn't have 
1563             generated a FindWork in the first place)
1564             --> the PE has to be made idle to trigger stealing sparks the next
1565                 time handleIdlePEs is performed
1566       */ 
1567
1568       ASSERT(pending_sparks_hds[proc]==(rtsSpark*)NULL);
1569       if (creator==proc) {
1570         /* local FindWork */
1571         if (procStatus[proc]==Busy) {
1572           belch("ghuH: PE %d in Busy state while processing local FindWork (spark pool is empty!) @ %lx",
1573                 proc, CurrentTime[proc]);
1574           procStatus[proc] = Idle;
1575         }
1576       } else {
1577         /* global FindWork i.e. a Fish */
1578         ASSERT(RtsFlags.GranFlags.Fishing);
1579         /* actually this generates another request from the originating PE */
1580         ASSERT(OutstandingFishes[creator]>0);
1581         OutstandingFishes[creator]--;
1582         /* ToDo: assign costs for sending fish to proc not to creator */
1583         stealSpark(creator); /* might steal from same PE; ToDo: fix */
1584         ASSERT(RtsFlags.GranFlags.maxFishes!=1 || procStatus[creator] == Fishing);
1585         /* any assertions on state of proc possible here? */
1586       }
1587     } else {
1588       /* DaH chu' Qu' yIchen! Now create new work! */ 
1589       IF_GRAN_DEBUG(findWork,
1590                     belch("+- munching spark %p; creating thread for node %p",
1591                           spark, spark->node));
1592       activateSpark (event, spark);
1593       ASSERT(spark != (rtsSpark*)NULL);
1594       spark = delete_from_sparkq (spark, proc, rtsTrue);
1595     }
1596
1597     IF_GRAN_DEBUG(findWork,
1598                   belch("+- Contents of spark queues at the end of FindWork @ %lx",
1599                         CurrentTime[proc]); 
1600                   print_sparkq_stats());
1601
1602     /* ToDo: check ; not valid if GC occurs in ActivateSpark */
1603     ASSERT(!found ||
1604             /* forward fish  or */
1605             (proc!=creator ||
1606             /* local spark  or */
1607             (proc==creator && procStatus[proc]==Starting)) || 
1608            //(!found && procStatus[proc]==Idle) ||
1609            RtsFlags.GranFlags.DoAlwaysCreateThreads); 
1610    } else {
1611     IF_GRAN_DEBUG(findWork,
1612                   belch("+- RTS refuses to findWork on PE %d @ %lx",
1613                         proc, CurrentTime[proc]);
1614                   belch("  procStatus[%d]=%s, fetch strategy=%d, outstanding fetches[%d]=%d", 
1615                         proc, proc_status_names[procStatus[proc]],
1616                         RtsFlags.GranFlags.FetchStrategy, 
1617                         proc, OutstandingFetches[proc]));
1618    }  
1619 }
1620  
1621 //@node GranSimLight routines, Code for Fetching Nodes, GranSim functions, GranSim specific code
1622 //@subsection GranSimLight routines
1623
1624 /* 
1625    This code is called from the central scheduler after having rgabbed a
1626    new event and is only needed for GranSim-Light. It mainly adjusts the
1627    ActiveTSO so that all costs that have to be assigned from within the
1628    scheduler are assigned to the right TSO. The choice of ActiveTSO depends
1629    on the type of event that has been found.  
1630 */
1631
1632 void
1633 GranSimLight_enter_system(event, ActiveTSOp)
1634 rtsEvent *event;
1635 StgTSO **ActiveTSOp;
1636 {
1637   StgTSO *ActiveTSO = *ActiveTSOp;
1638
1639   ASSERT (RtsFlags.GranFlags.Light);
1640   
1641   /* Restore local clock of the virtual processor attached to CurrentTSO.
1642      All costs will be associated to the `virt. proc' on which the tso
1643      is living. */
1644   if (ActiveTSO != NULL) {                     /* already in system area */
1645     ActiveTSO->gran.clock = CurrentTime[CurrentProc];
1646     if (RtsFlags.GranFlags.DoFairSchedule)
1647       {
1648         if (RtsFlags.GranFlags.GranSimStats.Full &&
1649             RtsFlags.GranFlags.Debug.checkLight)
1650           DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
1651       }
1652   }
1653   switch (event->evttype)
1654     { 
1655     case ContinueThread: 
1656     case FindWork:       /* inaccurate this way */
1657       ActiveTSO = run_queue_hd;
1658       break;
1659     case ResumeThread:   
1660     case StartThread:
1661     case MoveSpark:      /* has tso of virt proc in tso field of event */
1662       ActiveTSO = event->tso;
1663       break;
1664     default: barf("Illegal event type %s (%d) in GrAnSim Light setup\n",
1665                   event_names[event->evttype],event->evttype);
1666     }
1667   CurrentTime[CurrentProc] = ActiveTSO->gran.clock;
1668   if (RtsFlags.GranFlags.DoFairSchedule) {
1669       if (RtsFlags.GranFlags.GranSimStats.Full &&
1670           RtsFlags.GranFlags.Debug.checkLight)
1671         DumpGranEvent(GR_SYSTEM_START,ActiveTSO);
1672   }
1673 }
1674
1675 void
1676 GranSimLight_leave_system(event, ActiveTSOp)
1677 rtsEvent *event;
1678 StgTSO **ActiveTSOp;
1679 {
1680   StgTSO *ActiveTSO = *ActiveTSOp;
1681
1682   ASSERT(RtsFlags.GranFlags.Light);
1683
1684   /* Save time of `virt. proc' which was active since last getevent and
1685      restore time of `virt. proc' where CurrentTSO is living on. */
1686   if(RtsFlags.GranFlags.DoFairSchedule) {
1687     if (RtsFlags.GranFlags.GranSimStats.Full &&
1688         RtsFlags.GranFlags.Debug.checkLight) // ToDo: clean up flags
1689       DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
1690   }
1691   ActiveTSO->gran.clock = CurrentTime[CurrentProc];
1692   ActiveTSO = (StgTSO*)NULL;
1693   CurrentTime[CurrentProc] = CurrentTSO->gran.clock;
1694   if (RtsFlags.GranFlags.DoFairSchedule /* &&  resched */ ) {
1695     // resched = rtsFalse;
1696     if (RtsFlags.GranFlags.GranSimStats.Full &&
1697         RtsFlags.GranFlags.Debug.checkLight)
1698       DumpGranEvent(GR_SCHEDULE,run_queue_hd);
1699   }
1700   /* 
1701      if (TSO_LINK(ThreadQueueHd)!=PrelBase_Z91Z93_closure &&
1702      (TimeOfNextEvent == 0 ||
1703      TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000<TimeOfNextEvent)) {
1704      new_event(CurrentProc,CurrentProc,TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000,
1705      CONTINUETHREAD,TSO_LINK(ThreadQueueHd),PrelBase_Z91Z93_closure,NULL);
1706      TimeOfNextEvent = get_time_of_next_event();
1707      }
1708   */
1709 }
1710
1711 //@node Code for Fetching Nodes, Idle PEs, GranSimLight routines, GranSim specific code
1712 //@subsection Code for Fetching Nodes
1713
1714 /*
1715    The following GrAnSim routines simulate the fetching of nodes from a
1716    remote processor. We use a 1 word bitmask to indicate on which processor
1717    a node is lying. Thus, moving or copying a node from one processor to
1718    another just requires an appropriate change in this bitmask (using
1719    @SET_GA@).  Additionally, the clocks have to be updated.
1720
1721    A special case arises when the node that is needed by processor A has
1722    been moved from a processor B to a processor C between sending out a
1723    @FETCH@ (from A) and its arrival at B. In that case the @FETCH@ has to
1724    be forwarded to C. This is simulated by issuing another FetchNode event
1725    on processor C with A as creator.
1726 */
1727  
1728 /* ngoqvam che' {GrAnSim}! */
1729
1730 /* Fetch node "node" to processor "p" */
1731
1732 //@cindex fetchNode
1733
1734 rtsFetchReturnCode
1735 fetchNode(node,from,to)
1736 StgClosure* node;
1737 PEs from, to;
1738 {
1739   /* In case of RtsFlags.GranFlags.DoBulkFetching this fct should never be 
1740      entered! Instead, UnpackGraph is used in ReSchedule */
1741   StgClosure* closure;
1742
1743   ASSERT(to==CurrentProc);
1744   /* Should never be entered  in GrAnSim Light setup */
1745   ASSERT(!RtsFlags.GranFlags.Light);
1746   /* fetchNode should never be entered with DoBulkFetching */
1747   ASSERT(!RtsFlags.GranFlags.DoBulkFetching);
1748
1749   /* Now fetch the node */
1750   if (!IS_LOCAL_TO(PROCS(node),from) &&
1751       !IS_LOCAL_TO(PROCS(node),to) ) 
1752     return NodeHasMoved;
1753   
1754   if (closure_HNF(node))                /* node already in head normal form? */
1755     node->header.gran.procs |= PE_NUMBER(to);           /* Copy node */
1756   else
1757     node->header.gran.procs = PE_NUMBER(to);            /* Move node */
1758
1759   return Ok;
1760 }
1761
1762 /* 
1763    Process a fetch request. 
1764    
1765    Cost of sending a packet of size n = C + P*n
1766    where C = packet construction constant, 
1767          P = cost of packing one word into a packet
1768    [Should also account for multiple packets].
1769 */
1770
1771 //@cindex handleFetchRequest
1772
1773 rtsFetchReturnCode
1774 handleFetchRequest(node,to,from,tso)
1775 StgClosure* node;   // the node which is requested
1776 PEs to, from;       // fetch request: from -> to
1777 StgTSO* tso;        // the tso which needs the node
1778 {
1779   ASSERT(!RtsFlags.GranFlags.Light);
1780   /* ToDo: check assertion */
1781   ASSERT(OutstandingFetches[from]>0);
1782
1783   /* probably wrong place; */
1784   ASSERT(CurrentProc==to);
1785
1786   if (IS_LOCAL_TO(PROCS(node), from)) /* Somebody else moved node already => */
1787     {                                 /* start tso */
1788       IF_GRAN_DEBUG(thunkStealing,
1789                     fprintf(stderr,"ghuH: handleFetchRequest entered with local node %p (%s) (PE %d)\n", 
1790                             node, info_type(node), from));
1791
1792       if (RtsFlags.GranFlags.DoBulkFetching) {
1793         nat size;
1794         rtsPackBuffer *graph;
1795
1796         /* Create a 1-node-buffer and schedule a FETCHREPLY now */
1797         graph = PackOneNode(node, tso, &size); 
1798         new_event(from, to, CurrentTime[to],
1799                   FetchReply,
1800                   tso, (StgClosure *)graph, (rtsSpark*)NULL);
1801       } else {
1802         new_event(from, to, CurrentTime[to],
1803                   FetchReply,
1804                   tso, node, (rtsSpark*)NULL);
1805       }
1806       IF_GRAN_DEBUG(thunkStealing,
1807                     belch("== majQa'! closure %p is local on PE %d already (this is a good thing)", node, from));
1808       return (NodeIsLocal);
1809     }
1810   else if (IS_LOCAL_TO(PROCS(node), to) )   /* Is node still here? */
1811     {
1812       if (RtsFlags.GranFlags.DoBulkFetching) { /* {GUM}vo' ngoqvam vInIHta' */
1813         nat size;                              /* (code from GUM) */
1814         StgClosure* graph;
1815
1816         if (IS_BLACK_HOLE(node)) {   /* block on BH or RBH */
1817           new_event(from, to, CurrentTime[to],
1818                     GlobalBlock,
1819                     tso, node, (rtsSpark*)NULL);
1820           /* Note: blockFetch is done when handling GLOBALBLOCK event; 
1821                    make sure the TSO stays out of the run queue */
1822           /* When this thread is reawoken it does the usual: it tries to 
1823              enter the updated node and issues a fetch if it's remote.
1824              It has forgotten that it has sent a fetch already (i.e. a
1825              FETCHNODE is swallowed by a BH, leaving the thread in a BQ) */
1826           --OutstandingFetches[from];
1827
1828           IF_GRAN_DEBUG(thunkStealing,
1829                         belch("== majQa'! closure %p on PE %d is a BH (demander=PE %d); faking a FMBQ", 
1830                               node, to, from));
1831           if (RtsFlags.GranFlags.GranSimStats.Global) {
1832             globalGranStats.tot_FMBQs++;
1833           }
1834           return (NodeIsBH);
1835         }
1836
1837         /* The tso requesting the node is blocked and cannot be on a run queue */
1838         ASSERT(!is_on_queue(tso, from));
1839         
1840         // ToDo: check whether graph is ever used as an rtsPackBuffer!!
1841         if ((graph = (StgClosure *)PackNearbyGraph(node, tso, &size, 0)) == NULL) 
1842           return (OutOfHeap);  /* out of heap */
1843
1844         /* Actual moving/copying of node is done on arrival; see FETCHREPLY */
1845         /* Send a reply to the originator */
1846         /* ToDo: Replace that by software costs for doing graph packing! */
1847         CurrentTime[to] += size * RtsFlags.GranFlags.Costs.mpacktime;
1848
1849         new_event(from, to,
1850                   CurrentTime[to]+RtsFlags.GranFlags.Costs.latency,
1851                   FetchReply,
1852                   tso, (StgClosure *)graph, (rtsSpark*)NULL);
1853         
1854         CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1855         return (Ok);
1856       } else {                   /* incremental (single closure) fetching */
1857         /* Actual moving/copying of node is done on arrival; see FETCHREPLY */
1858         /* Send a reply to the originator */
1859         CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime;
1860
1861         new_event(from, to,
1862                   CurrentTime[to]+RtsFlags.GranFlags.Costs.latency,
1863                   FetchReply,
1864                   tso, node, (rtsSpark*)NULL);
1865       
1866         CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1867         return (Ok);
1868       }
1869     }
1870   else       /* Qu'vatlh! node has been grabbed by another proc => forward */
1871     {    
1872       PEs node_loc = where_is(node);
1873       rtsTime fetchtime;
1874
1875       IF_GRAN_DEBUG(thunkStealing,
1876                     belch("== Qu'vatlh! node %p has been grabbed by PE %d from PE %d (demander=%d) @ %d\n",
1877                           node,node_loc,to,from,CurrentTime[to]));
1878       if (RtsFlags.GranFlags.GranSimStats.Global) {
1879         globalGranStats.fetch_misses++;
1880       }
1881
1882       /* Prepare FORWARD message to proc p_new */
1883       CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime;
1884       
1885       fetchtime = stg_max(CurrentTime[to], CurrentTime[node_loc]) +
1886                   RtsFlags.GranFlags.Costs.latency;
1887           
1888       new_event(node_loc, from, fetchtime,
1889                 FetchNode,
1890                 tso, node, (rtsSpark*)NULL);
1891
1892       CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1893
1894       return (NodeHasMoved);
1895     }
1896 }
1897
1898 /*
1899    blockFetch blocks a BlockedFetch node on some kind of black hole.
1900
1901    Taken from gum/HLComms.lc.   [find a  better  place for that ?] --  HWL  
1902
1903    {\bf Note:} In GranSim we don't have @FETCHME@ nodes and therefore don't
1904    create @FMBQ@'s (FetchMe blocking queues) to cope with global
1905    blocking. Instead, non-local TSO are put into the BQ in the same way as
1906    local TSOs. However, we have to check if a TSO is local or global in
1907    order to account for the latencies involved and for keeping track of the
1908    number of fetches that are really going on.  
1909 */
1910
1911 //@cindex blockFetch
1912
1913 rtsFetchReturnCode
1914 blockFetch(tso, proc, bh)
1915 StgTSO* tso;                        /* TSO which gets blocked */
1916 PEs proc;                           /* PE where that tso was running */
1917 StgClosure* bh;                     /* closure to block on (BH, RBH, BQ) */
1918 {
1919   StgInfoTable *info;
1920
1921   IF_GRAN_DEBUG(bq,
1922                 fprintf(stderr,"## blockFetch: blocking TSO %p (%d)[PE %d] on node %p (%s) [PE %d]. No graph is packed!\n", 
1923                 tso, tso->id, proc, bh, info_type(bh), where_is(bh)));
1924
1925     if (!IS_BLACK_HOLE(bh)) {                      /* catches BHs and RBHs */
1926       IF_GRAN_DEBUG(bq,
1927                     fprintf(stderr,"## blockFetch: node %p (%s) is not a BH => awakening TSO %p (%d) [PE %u]\n", 
1928                             bh, info_type(bh), tso, tso->id, proc));
1929
1930       /* No BH anymore => immediately unblock tso */
1931       new_event(proc, proc, CurrentTime[proc],
1932                 UnblockThread,
1933                 tso, bh, (rtsSpark*)NULL);
1934
1935       /* Is this always a REPLY to a FETCH in the profile ? */
1936       if (RtsFlags.GranFlags.GranSimStats.Full)
1937         DumpRawGranEvent(proc, proc, GR_REPLY, tso, bh, (StgInt)0, 0);
1938       return (NodeIsNoBH);
1939     }
1940
1941     /* DaH {BQ}Daq Qu' Suq 'e' wISov!
1942        Now we know that we have to put the tso into the BQ.
1943        2 cases: If block-on-fetch, tso is at head of threadq => 
1944                 => take it out of threadq and into BQ
1945                 If reschedule-on-fetch, tso is only pointed to be event
1946                 => just put it into BQ
1947
1948     ngoq ngo'!!
1949     if (!RtsFlags.GranFlags.DoAsyncFetch) {
1950       GranSimBlock(tso, proc, bh);
1951     } else {
1952       if (RtsFlags.GranFlags.GranSimStats.Full)
1953         DumpRawGranEvent(proc, where_is(bh), GR_BLOCK, tso, bh, (StgInt)0, 0);
1954       ++(tso->gran.blockcount);
1955       tso->gran.blockedat = CurrentTime[proc];
1956     }
1957     */
1958
1959     /* after scheduling the GlobalBlock event the TSO is not put into the
1960        run queue again; it is only pointed to via the event we are
1961        processing now; in GranSim 4.xx there is no difference between
1962        synchr and asynchr comm here */
1963     ASSERT(!is_on_queue(tso, proc));
1964     ASSERT(tso->link == END_TSO_QUEUE);
1965
1966     GranSimBlock(tso, proc, bh);  /* GranSim statistics gathering */
1967
1968     /* Now, put tso into BQ (similar to blocking entry codes) */
1969     info = get_itbl(bh);
1970     switch (info -> type) {
1971       case RBH:
1972       case BLACKHOLE:
1973       case CAF_BLACKHOLE: // ToDo: check whether this is a possibly ITBL here
1974       case SE_BLACKHOLE:   // ToDo: check whether this is a possibly ITBL here
1975       case SE_CAF_BLACKHOLE:// ToDo: check whether this is a possibly ITBL here
1976         /* basically an inlined version of BLACKHOLE_entry -- HWL */
1977         /* Change the BLACKHOLE into a BLACKHOLE_BQ */
1978         ((StgBlockingQueue *)bh)->header.info = &BLACKHOLE_BQ_info;
1979         /* Put ourselves on the blocking queue for this black hole */
1980         // tso->link=END_TSO_QUEUE;   not necessary; see assertion above
1981         ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)tso;
1982         tso->block_info.closure = bh;
1983         recordMutable((StgMutClosure *)bh);
1984         break;
1985
1986     case BLACKHOLE_BQ:
1987         /* basically an inlined version of BLACKHOLE_BQ_entry -- HWL */
1988         tso->link = (StgTSO *) (((StgBlockingQueue*)bh)->blocking_queue); 
1989         ((StgBlockingQueue*)bh)->blocking_queue = (StgBlockingQueueElement *)tso;
1990         recordMutable((StgMutClosure *)bh);
1991
1992 # if 0 && defined(GC_MUT_REQUIRED)
1993         ToDo: check whether recordMutable is necessary -- HWL
1994         /*
1995          * If we modify a black hole in the old generation, we have to make 
1996          * sure it goes on the mutables list
1997          */
1998
1999         if (bh <= StorageMgrInfo.OldLim) {
2000             MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
2001             StorageMgrInfo.OldMutables = bh;
2002         } else
2003             MUT_LINK(bh) = MUT_NOT_LINKED;
2004 # endif
2005         break;
2006
2007     case FETCH_ME_BQ:
2008         barf("Qagh: FMBQ closure (%p) found in GrAnSim (TSO=%p (%d))\n",
2009              bh, tso, tso->id);
2010
2011     default:
2012         {
2013           G_PRINT_NODE(bh);
2014           barf("Qagh: thought %p was a black hole (IP %p (%s))",
2015                   bh, info, info_type(bh));
2016         }
2017       }
2018     return (Ok);
2019 }
2020
2021
2022 //@node Idle PEs, Routines directly called from Haskell world, Code for Fetching Nodes, GranSim specific code
2023 //@subsection Idle PEs
2024
2025 /*
2026    Export work to idle PEs. This function is called from @ReSchedule@
2027    before dispatching on the current event. @HandleIdlePEs@ iterates over
2028    all PEs, trying to get work for idle PEs. Note, that this is a
2029    simplification compared to GUM's fishing model. We try to compensate for
2030    that by making the cost for stealing work dependent on the number of
2031    idle processors and thereby on the probability with which a randomly
2032    sent fish would find work.  
2033 */
2034
2035 //@cindex handleIdlePEs
2036
2037 void
2038 handleIdlePEs(void)
2039 {
2040   PEs p;
2041
2042   IF_DEBUG(gran, fprintf(stderr, "GRAN: handling Idle PEs\n"))
2043
2044   /* Should never be entered in GrAnSim Light setup */
2045   ASSERT(!RtsFlags.GranFlags.Light);
2046
2047   /* Could check whether there are idle PEs if it's a cheap check */
2048   for (p = 0; p < RtsFlags.GranFlags.proc; p++) 
2049     if (procStatus[p]==Idle)  /*  && IS_SPARKING(p) && IS_STARTING(p) */
2050       /* First look for local work i.e. examine local spark pool! */
2051       if (pending_sparks_hds[p]!=(rtsSpark *)NULL) {
2052         new_event(p, p, CurrentTime[p],
2053                   FindWork,
2054                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
2055         procStatus[p] = Sparking;
2056       } else if ((RtsFlags.GranFlags.maxFishes==0 ||
2057                   OutstandingFishes[p]<RtsFlags.GranFlags.maxFishes) ) {
2058
2059         /* If no local work then try to get remote work! 
2060            Qu' Hopbe' pagh tu'lu'pu'chugh Qu' Hop yISuq ! */
2061         if (RtsFlags.GranFlags.DoStealThreadsFirst && 
2062             (RtsFlags.GranFlags.FetchStrategy >= 4 || OutstandingFetches[p] == 0))
2063           {
2064             if (SurplusThreads > 0l)                    /* Steal a thread */
2065               stealThread(p);
2066           
2067             if (procStatus[p]!=Idle)
2068               break;
2069           }
2070         
2071         if (SparksAvail > 0 && 
2072             (RtsFlags.GranFlags.FetchStrategy >= 3 || OutstandingFetches[p] == 0)) /* Steal a spark */
2073           stealSpark(p);
2074         
2075         if (SurplusThreads > 0 && 
2076             (RtsFlags.GranFlags.FetchStrategy >= 4 || OutstandingFetches[p] == 0)) /* Steal a thread */
2077           stealThread(p);
2078       }
2079 }
2080
2081 /*
2082    Steal a spark and schedule moving it to proc. We want to look at PEs in
2083    clock order -- most retarded first.  Currently sparks are only stolen
2084    from the @ADVISORY_POOL@ never from the @REQUIRED_POOL@. Eventually,
2085    this should be changed to first steal from the former then from the
2086    latter.
2087
2088    We model a sort of fishing mechanism by counting the number of sparks
2089    and threads we are currently stealing.  */
2090
2091 /* 
2092    Return a random nat value in the intervall [from, to) 
2093 */
2094 static nat 
2095 natRandom(from, to)
2096 nat from, to;
2097 {
2098   nat r, d;
2099
2100   ASSERT(from<=to);
2101   d = to - from;
2102   /* random returns a value in [0, RAND_MAX] */
2103   r = (nat) ((float)from + ((float)random()*(float)d)/(float)RAND_MAX);
2104   r = (r==to) ? from : r;
2105   ASSERT(from<=r && (r<to || from==to));
2106   return r;  
2107 }
2108
2109 /* 
2110    Find any PE other than proc. Used for GUM style fishing only.
2111 */
2112 static PEs 
2113 findRandomPE (proc)
2114 PEs proc;
2115 {
2116   nat p;
2117
2118   ASSERT(RtsFlags.GranFlags.Fishing);
2119   if (RtsFlags.GranFlags.RandomSteal) {
2120     p = natRandom(0,RtsFlags.GranFlags.proc);  /* full range of PEs */
2121   } else {
2122     p = 0;
2123   }
2124   IF_GRAN_DEBUG(randomSteal,
2125                 belch("^^ RANDOM_STEAL (fishing): stealing from PE %d (current proc is %d)",
2126                       p, proc));
2127     
2128   return (PEs)p;
2129 }
2130
2131 /*
2132   Magic code for stealing sparks/threads makes use of global knowledge on
2133   spark queues.  
2134 */
2135 static void
2136 sortPEsByTime (proc, pes_by_time, firstp, np) 
2137 PEs proc;
2138 PEs *pes_by_time;
2139 nat *firstp, *np;
2140 {
2141   PEs p, temp, n, i, j;
2142   nat first, upb, r=0, q=0;
2143
2144   ASSERT(!RtsFlags.GranFlags.Fishing);
2145
2146 #if 0  
2147   upb = RtsFlags.GranFlags.proc;            /* full range of PEs */
2148
2149   if (RtsFlags.GranFlags.RandomSteal) {
2150     r = natRandom(0,RtsFlags.GranFlags.proc);  /* full range of PEs */
2151   } else {
2152     r = 0;
2153   }
2154 #endif
2155
2156   /* pes_by_time shall contain processors from which we may steal sparks */ 
2157   for(n=0, p=0; p < RtsFlags.GranFlags.proc; ++p)
2158     if ((proc != p) &&                       // not the current proc
2159         (pending_sparks_hds[p] != (rtsSpark *)NULL) && // non-empty spark pool
2160         (CurrentTime[p] <= CurrentTime[CurrentProc]))
2161       pes_by_time[n++] = p;
2162
2163   /* sort pes_by_time */
2164   for(i=0; i < n; ++i)
2165     for(j=i+1; j < n; ++j)
2166       if (CurrentTime[pes_by_time[i]] > CurrentTime[pes_by_time[j]]) {
2167         rtsTime temp = pes_by_time[i];
2168         pes_by_time[i] = pes_by_time[j];
2169         pes_by_time[j] = temp;
2170       }
2171
2172   /* Choose random processor to steal spark from; first look at processors */
2173   /* that are earlier than the current one (i.e. proc) */
2174   for(first=0; 
2175       (first < n) && (CurrentTime[pes_by_time[first]] <= CurrentTime[proc]);
2176       ++first)
2177     /* nothing */ ;
2178
2179   /* if the assertion below is true we can get rid of first */
2180   /* ASSERT(first==n); */
2181   /* ToDo: check if first is really needed; find cleaner solution */
2182
2183   *firstp = first;
2184   *np = n;
2185 }
2186
2187 /* 
2188    Steal a spark (piece of work) from any processor and bring it to proc.
2189 */
2190 //@cindex stealSpark
2191 static rtsBool 
2192 stealSpark(PEs proc) { stealSomething(proc, rtsTrue, rtsFalse); }
2193
2194 /* 
2195    Steal a thread from any processor and bring it to proc i.e. thread migration
2196 */
2197 //@cindex stealThread
2198 static rtsBool 
2199 stealThread(PEs proc) { stealSomething(proc, rtsFalse, rtsTrue); }
2200
2201 /* 
2202    Steal a spark or a thread and schedule moving it to proc.
2203 */
2204 //@cindex stealSomething
2205 static rtsBool
2206 stealSomething(proc, steal_spark, steal_thread)
2207 PEs proc;                           // PE that needs work (stealer)
2208 rtsBool steal_spark, steal_thread;  // should a spark and/or thread be stolen
2209 {
2210   PEs p;
2211   rtsTime fish_arrival_time;
2212   rtsSpark *spark, *prev, *next;
2213   rtsBool stolen = rtsFalse;
2214
2215   ASSERT(steal_spark || steal_thread);
2216
2217   /* Should never be entered in GrAnSim Light setup */
2218   ASSERT(!RtsFlags.GranFlags.Light);
2219   ASSERT(!steal_thread || RtsFlags.GranFlags.DoThreadMigration);
2220
2221   if (!RtsFlags.GranFlags.Fishing) {
2222     // ToDo: check if stealing threads is prefered over stealing sparks
2223     if (steal_spark) {
2224       if (stealSparkMagic(proc))
2225         return rtsTrue;
2226       else                             // no spark found
2227         if (steal_thread)
2228           return stealThreadMagic(proc);
2229         else                           // no thread found
2230           return rtsFalse;             
2231     } else {                           // ASSERT(steal_thread);
2232       return stealThreadMagic(proc);
2233     }
2234     barf("stealSomething: never reached");
2235   }
2236
2237   /* The rest of this function does GUM style fishing */
2238   
2239   p = findRandomPE(proc); /* find a random PE other than proc */
2240   
2241   /* Message packing costs for sending a Fish; qeq jabbI'ID */
2242   CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
2243   
2244   /* use another GranEvent for requesting a thread? */
2245   if (steal_spark && RtsFlags.GranFlags.GranSimStats.Sparks)
2246     DumpRawGranEvent(p, proc, SP_REQUESTED,
2247                      (StgTSO*)NULL, (StgClosure *)NULL, (StgInt)0, 0);
2248
2249   /* time of the fish arrival on the remote PE */
2250   fish_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
2251   
2252   /* Phps use an own Fish event for that? */
2253   /* The contents of the spark component is a HACK:
2254       1 means give me a spark;
2255       2 means give me a thread
2256       0 means give me nothing (this should never happen)
2257   */
2258   new_event(p, proc, fish_arrival_time,
2259             FindWork,
2260             (StgTSO*)NULL, (StgClosure*)NULL, 
2261             (steal_spark ? (rtsSpark*)1 : steal_thread ? (rtsSpark*)2 : (rtsSpark*)0));
2262   
2263   ++OutstandingFishes[proc];
2264   /* only with Async fetching? */
2265   if (procStatus[proc]==Idle)  
2266     procStatus[proc]=Fishing;
2267   
2268   /* time needed to clean up buffers etc after sending a message */
2269   CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
2270
2271   /* If GUM style fishing stealing always succeeds because it only consists
2272      of sending out a fish; of course, when the fish may return
2273      empty-handed! */
2274   return rtsTrue;
2275 }
2276
2277 /* 
2278    This version of stealing a spark makes use of the global info on all
2279    spark pools etc which is not available in a real parallel system.
2280    This could be extended to test e.g. the impact of perfect load information.
2281 */
2282 //@cindex stealSparkMagic
2283 static rtsBool
2284 stealSparkMagic(proc)
2285 PEs proc;
2286 {
2287   PEs p=0, i=0, j=0, n=0, first, upb;
2288   rtsSpark *spark=NULL, *next;
2289   PEs pes_by_time[MAX_PROC];
2290   rtsBool stolen = rtsFalse;
2291   rtsTime stealtime;
2292
2293   /* Should never be entered in GrAnSim Light setup */
2294   ASSERT(!RtsFlags.GranFlags.Light);
2295
2296   sortPEsByTime(proc, pes_by_time, &first, &n);
2297
2298   while (!stolen && n>0) {
2299     upb = (first==0) ? n : first;
2300     i = natRandom(0,upb);                /* choose a random eligible PE */
2301     p = pes_by_time[i];
2302
2303     IF_GRAN_DEBUG(randomSteal,
2304                   belch("^^ stealSparkMagic (random_steal, not fishing): stealing spark from PE %d (current proc is %d)",
2305                         p, proc));
2306       
2307     ASSERT(pending_sparks_hds[p]!=(rtsSpark *)NULL); /* non-empty spark pool */
2308
2309     /* Now go through rtsSparkQ and steal the first eligible spark */
2310     
2311     spark = pending_sparks_hds[p]; 
2312     while (!stolen && spark != (rtsSpark*)NULL)
2313       {
2314         /* NB: no prev pointer is needed here because all sparks that are not 
2315            chosen are pruned
2316         */
2317         if ((procStatus[p]==Idle || procStatus[p]==Sparking || procStatus[p] == Fishing) &&
2318             spark->next==(rtsSpark*)NULL) 
2319           {
2320             /* Be social! Don't steal the only spark of an idle processor 
2321                not {spark} neH yInIH !! */
2322             break; /* next PE */
2323           } 
2324         else if (closure_SHOULD_SPARK(spark->node))
2325           {
2326             /* Don't Steal local sparks; 
2327                ToDo: optionally prefer local over global sparks
2328             if (!spark->global) {
2329               prev=spark;
2330               continue;                  next spark
2331             }
2332             */
2333             /* found a spark! */
2334
2335             /* Prepare message for sending spark */
2336             CurrentTime[p] += RtsFlags.GranFlags.Costs.mpacktime;
2337
2338             if (RtsFlags.GranFlags.GranSimStats.Sparks)
2339               DumpRawGranEvent(p, (PEs)0, SP_EXPORTED,
2340                                (StgTSO*)NULL, spark->node,
2341                                spark->name, spark_queue_len(p));
2342
2343             stealtime = (CurrentTime[p] > CurrentTime[proc] ? 
2344                            CurrentTime[p] : 
2345                            CurrentTime[proc])
2346                         + sparkStealTime();
2347
2348             new_event(proc, p /* CurrentProc */, stealtime,
2349                       MoveSpark,
2350                       (StgTSO*)NULL, spark->node, spark);
2351             
2352             stolen = rtsTrue;
2353             ++OutstandingFishes[proc]; /* no. of sparks currently on the fly */
2354             if (procStatus[proc]==Idle)
2355               procStatus[proc] = Fishing;
2356             ++(spark->global);         /* record that this is a global spark */
2357             ASSERT(SparksAvail>0);
2358             --SparksAvail;            /* on-the-fly sparks are not available */
2359             next = delete_from_sparkq(spark, p, rtsFalse); // don't dispose!
2360             CurrentTime[p] += RtsFlags.GranFlags.Costs.mtidytime;
2361           }
2362         else   /* !(closure_SHOULD_SPARK(SPARK_NODE(spark))) */
2363           {
2364            IF_GRAN_DEBUG(checkSparkQ,
2365                          belch("^^ pruning spark %p (node %p) in stealSparkMagic",
2366                                spark, spark->node));
2367
2368             /* if the spark points to a node that should not be sparked,
2369                prune the spark queue at this point */
2370             if (RtsFlags.GranFlags.GranSimStats.Sparks)
2371               DumpRawGranEvent(p, (PEs)0, SP_PRUNED,
2372                                (StgTSO*)NULL, spark->node,
2373                                spark->name, spark_queue_len(p));
2374             if (RtsFlags.GranFlags.GranSimStats.Global)
2375               globalGranStats.pruned_sparks++;
2376             
2377             ASSERT(SparksAvail>0);
2378             --SparksAvail;
2379             spark = delete_from_sparkq(spark, p, rtsTrue);
2380           }
2381         /* unlink spark (may have been freed!) from sparkq;
2382         if (prev == NULL) // spark was head of spark queue
2383           pending_sparks_hds[p] = spark->next;
2384         else  
2385           prev->next = spark->next;
2386         if (spark->next == NULL)
2387           pending_sparks_tls[p] = prev;
2388         else  
2389           next->prev = prev;
2390         */
2391       }                    /* while ...    iterating over sparkq */
2392
2393     /* ToDo: assert that PE p still has work left after stealing the spark */
2394
2395     if (!stolen && (n>0)) {  /* nothing stealable from proc p :( */
2396       ASSERT(pes_by_time[i]==p);
2397
2398       /* remove p from the list (at pos i) */
2399       for (j=i; j+1<n; j++)
2400         pes_by_time[j] = pes_by_time[j+1];
2401       n--;
2402       
2403       /* update index to first proc which is later (or equal) than proc */
2404       for ( ;
2405             (first>0) &&
2406               (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]);
2407             first--)
2408         /* nothing */ ;
2409     } 
2410   }  /* while ... iterating over PEs in pes_by_time */
2411
2412   IF_GRAN_DEBUG(randomSteal,
2413                 if (stolen)
2414                   belch("^^ stealSparkMagic: spark %p (node=%p) stolen by PE %d from PE %d (SparksAvail=%d; idlers=%d)",
2415                        spark, spark->node, proc, p, 
2416                        SparksAvail, idlers());
2417                 else  
2418                   belch("^^ stealSparkMagic: nothing stolen by PE %d (sparkq len after pruning=%d)(SparksAvail=%d; idlers=%d)",
2419                         proc, SparksAvail, idlers()));
2420
2421   if (RtsFlags.GranFlags.GranSimStats.Global &&
2422       stolen && (i!=0)) {                          /* only for statistics */
2423     globalGranStats.rs_sp_count++;
2424     globalGranStats.ntimes_total += n;
2425     globalGranStats.fl_total += first;
2426     globalGranStats.no_of_steals++;
2427   }
2428
2429   return stolen;
2430 }
2431
2432 /* 
2433    The old stealThread code, which makes use of global info and does not
2434    send out fishes.  
2435    NB: most of this is the same as in stealSparkMagic;
2436        only the pieces specific to processing thread queues are different; 
2437        long live polymorphism!  
2438 */
2439
2440 //@cindex stealThreadMagic
2441 static rtsBool
2442 stealThreadMagic(proc)
2443 PEs proc;
2444 {
2445   PEs p=0, i=0, j=0, n=0, first, upb;
2446   StgTSO *tso=END_TSO_QUEUE;
2447   PEs pes_by_time[MAX_PROC];
2448   rtsBool stolen = rtsFalse;
2449   rtsTime stealtime;
2450
2451   /* Should never be entered in GrAnSim Light setup */
2452   ASSERT(!RtsFlags.GranFlags.Light);
2453
2454   sortPEsByTime(proc, pes_by_time, &first, &n);
2455
2456   while (!stolen && n>0) {
2457     upb = (first==0) ? n : first;
2458     i = natRandom(0,upb);                /* choose a random eligible PE */
2459     p = pes_by_time[i];
2460
2461     IF_GRAN_DEBUG(randomSteal,
2462                   belch("^^ stealThreadMagic (random_steal, not fishing): stealing thread from PE %d (current proc is %d)",
2463                         p, proc));
2464       
2465     /* Steal the first exportable thread in the runnable queue but
2466        never steal the first in the queue for social reasons;
2467        not Qu' wa'DIch yInIH !!
2468     */
2469     /* Would be better to search through queue and have options which of
2470        the threads to pick when stealing */
2471     if (run_queue_hds[p] == END_TSO_QUEUE) {
2472       IF_GRAN_DEBUG(randomSteal,
2473                     belch("^^ stealThreadMagic: No thread to steal from PE %d (stealer=PE %d)", 
2474                           p, proc));
2475     } else {
2476       tso = run_queue_hds[p]->link;  /* tso is *2nd* thread in thread queue */
2477       /* Found one */
2478       stolen = rtsTrue;
2479
2480       /* update links in queue */
2481       run_queue_hds[p]->link = tso->link;
2482       if (run_queue_tls[p] == tso)
2483         run_queue_tls[p] = run_queue_hds[p];
2484       
2485       /* ToDo: Turn magic constants into params */
2486       
2487       CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mpacktime;
2488       
2489       stealtime = (CurrentTime[p] > CurrentTime[proc] ? 
2490                    CurrentTime[p] : 
2491                    CurrentTime[proc])
2492         + sparkStealTime() 
2493         + 4l * RtsFlags.GranFlags.Costs.additional_latency
2494         + 5l * RtsFlags.GranFlags.Costs.munpacktime;
2495
2496       /* Move the thread; set bitmask to 0 while TSO is `on-the-fly' */
2497       SET_GRAN_HDR(tso,Nowhere /* PE_NUMBER(proc) */); 
2498
2499       /* Move from one queue to another */
2500       new_event(proc, p, stealtime,
2501                 MoveThread,
2502                 tso, (StgClosure*)NULL, (rtsSpark*)NULL);
2503
2504       /* MAKE_BUSY(proc);  not yet; only when thread is in threadq */
2505       ++OutstandingFishes[proc];
2506       if (procStatus[proc])
2507         procStatus[proc] = Fishing;
2508       --SurplusThreads;
2509
2510       if(RtsFlags.GranFlags.GranSimStats.Full)
2511         DumpRawGranEvent(p, proc, 
2512                          GR_STEALING, 
2513                          tso, (StgClosure*)NULL, (StgInt)0, 0);
2514       
2515       /* costs for tidying up buffer after having sent it */
2516       CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mtidytime;
2517     }
2518
2519     /* ToDo: assert that PE p still has work left after stealing the spark */
2520
2521     if (!stolen && (n>0)) {  /* nothing stealable from proc p :( */
2522       ASSERT(pes_by_time[i]==p);
2523
2524       /* remove p from the list (at pos i) */
2525       for (j=i; j+1<n; j++)
2526         pes_by_time[j] = pes_by_time[j+1];
2527       n--;
2528       
2529       /* update index to first proc which is later (or equal) than proc */
2530       for ( ;
2531             (first>0) &&
2532               (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]);
2533             first--)
2534         /* nothing */ ;
2535     } 
2536   }  /* while ... iterating over PEs in pes_by_time */
2537
2538   IF_GRAN_DEBUG(randomSteal,
2539                 if (stolen)
2540                   belch("^^ stealThreadMagic: stolen TSO %d (%p) by PE %d from PE %d (SparksAvail=%d; idlers=%d)",
2541                         tso->id, tso, proc, p,
2542                         SparksAvail, idlers());
2543                 else
2544                   belch("stealThreadMagic: nothing stolen by PE %d (SparksAvail=%d; idlers=%d)",
2545                         proc, SparksAvail, idlers()));
2546
2547   if (RtsFlags.GranFlags.GranSimStats.Global &&
2548       stolen && (i!=0)) { /* only for statistics */
2549     /* ToDo: more statistics on avg thread queue lenght etc */
2550     globalGranStats.rs_t_count++;
2551     globalGranStats.no_of_migrates++;
2552   }
2553
2554   return stolen;
2555 }
2556
2557 //@cindex sparkStealTime
2558 static rtsTime
2559 sparkStealTime(void)
2560 {
2561   double fishdelay, sparkdelay, latencydelay;
2562   fishdelay =  (double)RtsFlags.GranFlags.proc/2;
2563   sparkdelay = fishdelay - 
2564           ((fishdelay-1.0)/(double)(RtsFlags.GranFlags.proc-1))*((double)idlers());
2565   latencydelay = sparkdelay*((double)RtsFlags.GranFlags.Costs.latency);
2566
2567   return((rtsTime)latencydelay);
2568 }
2569
2570 //@node Routines directly called from Haskell world, Emiting profiling info for GrAnSim, Idle PEs, GranSim specific code
2571 //@subsection Routines directly called from Haskell world
2572 /* 
2573 The @GranSim...@ routines in here are directly called via macros from the
2574 threaded world. 
2575
2576 First some auxiliary routines.
2577 */
2578
2579 /* Take the current thread off the thread queue and thereby activate the 
2580    next thread. It's assumed that the next ReSchedule after this uses 
2581    NEW_THREAD as param. 
2582    This fct is called from GranSimBlock and GranSimFetch 
2583 */
2584
2585 //@cindex ActivateNextThread
2586
2587 void 
2588 ActivateNextThread (proc)
2589 PEs proc;
2590 {
2591   StgTSO *t;
2592   /*
2593     This routine is entered either via GranSimFetch or via GranSimBlock.
2594     It has to prepare the CurrentTSO for being blocked and update the
2595     run queue and other statistics on PE proc. The actual enqueuing to the 
2596     blocking queue (if coming from GranSimBlock) is done in the entry code 
2597     of the BLACKHOLE and BLACKHOLE_BQ closures (see StgMiscClosures.hc).
2598   */
2599   /* ToDo: add assertions here!! */
2600   //ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE);
2601
2602   // Only necessary if the running thread is at front of the queue
2603   // run_queue_hds[proc] = run_queue_hds[proc]->link;
2604   ASSERT(CurrentProc==proc);
2605   ASSERT(!is_on_queue(CurrentTSO,proc));
2606   if (run_queue_hds[proc]==END_TSO_QUEUE) {
2607     /* NB: this routine is only entered with asynchr comm (see assertion) */
2608     procStatus[proc] = Idle;
2609   } else {
2610     /* ToDo: check cost assignment */
2611     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime;
2612     if (RtsFlags.GranFlags.GranSimStats.Full && 
2613         (!RtsFlags.GranFlags.Light || RtsFlags.GranFlags.Debug.checkLight)) 
2614                                       /* right flag !?? ^^^ */ 
2615       DumpRawGranEvent(proc, 0, GR_SCHEDULE, run_queue_hds[proc],
2616                        (StgClosure*)NULL, (StgInt)0, 0);
2617   }
2618 }
2619
2620 /* 
2621    The following GranSim fcts are stg-called from the threaded world.    
2622 */
2623
2624 /* Called from HP_CHK and friends (see StgMacros.h)  */
2625 //@cindex GranSimAllocate
2626 void 
2627 GranSimAllocate(n)
2628 StgInt n;
2629 {
2630   CurrentTSO->gran.allocs += n;
2631   ++(CurrentTSO->gran.basicblocks);
2632
2633   if (RtsFlags.GranFlags.GranSimStats.Heap) {
2634       DumpRawGranEvent(CurrentProc, 0, GR_ALLOC, CurrentTSO,
2635                        (StgClosure*)NULL, (StgInt)0, n);
2636   }
2637   
2638   CurrentTSO->gran.exectime += RtsFlags.GranFlags.Costs.heapalloc_cost;
2639   CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.heapalloc_cost;
2640 }
2641
2642 /*
2643   Subtract the values added above, if a heap check fails and
2644   so has to be redone.
2645 */
2646 //@cindex GranSimUnallocate
2647 void 
2648 GranSimUnallocate(n)
2649 StgInt n;
2650 {
2651   CurrentTSO->gran.allocs -= n;
2652   --(CurrentTSO->gran.basicblocks);
2653   
2654   CurrentTSO->gran.exectime -= RtsFlags.GranFlags.Costs.heapalloc_cost;
2655   CurrentTime[CurrentProc] -= RtsFlags.GranFlags.Costs.heapalloc_cost;
2656 }
2657
2658 /* NB: We now inline this code via GRAN_EXEC rather than calling this fct */
2659 //@cindex GranSimExec
2660 void 
2661 GranSimExec(ariths,branches,loads,stores,floats)
2662 StgWord ariths,branches,loads,stores,floats;
2663 {
2664   StgWord cost = RtsFlags.GranFlags.Costs.arith_cost*ariths + 
2665             RtsFlags.GranFlags.Costs.branch_cost*branches + 
2666             RtsFlags.GranFlags.Costs.load_cost * loads +
2667             RtsFlags.GranFlags.Costs.store_cost*stores + 
2668             RtsFlags.GranFlags.Costs.float_cost*floats;
2669
2670   CurrentTSO->gran.exectime += cost;
2671   CurrentTime[CurrentProc] += cost;
2672 }
2673
2674 /* 
2675    Fetch the node if it isn't local
2676    -- result indicates whether fetch has been done.
2677
2678    This is GRIP-style single item fetching.
2679 */
2680
2681 //@cindex GranSimFetch
2682 StgInt 
2683 GranSimFetch(node /* , liveness_mask */ )
2684 StgClosure *node;
2685 /* StgInt liveness_mask; */
2686 {
2687   /* reset the return value (to be checked within STG land) */
2688   NeedToReSchedule = rtsFalse;   
2689
2690   if (RtsFlags.GranFlags.Light) {
2691      /* Always reschedule in GrAnSim-Light to prevent one TSO from
2692         running off too far 
2693      new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
2694               ContinueThread,CurrentTSO,node,NULL);
2695      */
2696      return(0); 
2697   }
2698
2699   /* Faking an RBH closure:
2700      If the bitmask of the closure is 0 then this node is a fake RBH;
2701   */
2702   if (node->header.gran.procs == Nowhere) {
2703     IF_GRAN_DEBUG(bq,
2704                   belch("## Found fake RBH (node %p); delaying TSO %d (%p)", 
2705                         node, CurrentTSO->id, CurrentTSO));
2706                   
2707     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+10000,
2708               ContinueThread, CurrentTSO, node, (rtsSpark*)NULL);
2709
2710     /* Rescheduling (GranSim internal) is necessary */
2711     NeedToReSchedule = rtsTrue;
2712     
2713     return(1); 
2714   }
2715
2716   /* Note: once a node has been fetched, this test will be passed */
2717   if (!IS_LOCAL_TO(PROCS(node),CurrentProc))
2718     {
2719       PEs p = where_is(node);
2720       rtsTime fetchtime;
2721       
2722       IF_GRAN_DEBUG(thunkStealing,
2723                     if (p==CurrentProc) 
2724                       belch("GranSimFetch: Trying to fetch from own processor%u\n", p););
2725       
2726       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
2727       /* NB: Fetch is counted on arrival (FetchReply) */
2728       
2729       fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) +
2730         RtsFlags.GranFlags.Costs.latency;
2731       
2732       new_event(p, CurrentProc, fetchtime,
2733                 FetchNode, CurrentTSO, node, (rtsSpark*)NULL);
2734       
2735       if (fetchtime<TimeOfNextEvent)
2736         TimeOfNextEvent = fetchtime;
2737       
2738       /* About to block */
2739       CurrentTSO->gran.blockedat = CurrentTime[CurrentProc];
2740       
2741       ++OutstandingFetches[CurrentProc];
2742       
2743       if (RtsFlags.GranFlags.DoAsyncFetch) 
2744         /* if asynchr comm is turned on, activate the next thread in the q */
2745         ActivateNextThread(CurrentProc);
2746       else
2747         procStatus[CurrentProc] = Fetching;
2748
2749 #if 0 
2750       /* ToDo: nuke the entire if (anything special for fair schedule?) */
2751       if (RtsFlags.GranFlags.DoAsyncFetch) 
2752         {
2753           /* Remove CurrentTSO from the queue -- assumes head of queue == CurrentTSO */
2754           if(!RtsFlags.GranFlags.DoFairSchedule)
2755             {
2756               /* now done in do_the_fetchnode 
2757               if (RtsFlags.GranFlags.GranSimStats.Full)
2758                 DumpRawGranEvent(CurrentProc, p, GR_FETCH, CurrentTSO,
2759                                  node, (StgInt)0, 0);
2760               */                                
2761               ActivateNextThread(CurrentProc);
2762               
2763 # if 0 && defined(GRAN_CHECK)
2764               if (RtsFlags.GranFlags.Debug.blockOnFetch_sanity) {
2765                 if (TSO_TYPE(CurrentTSO) & FETCH_MASK_TSO) {
2766                   fprintf(stderr,"FetchNode: TSO 0x%x has fetch-mask set @ %d\n",
2767                           CurrentTSO,CurrentTime[CurrentProc]);
2768                   stg_exit(EXIT_FAILURE);
2769                 } else {
2770                   TSO_TYPE(CurrentTSO) |= FETCH_MASK_TSO;
2771                 }
2772               }
2773 # endif
2774               CurrentTSO->link = END_TSO_QUEUE;
2775               /* CurrentTSO = END_TSO_QUEUE; */
2776               
2777               /* CurrentTSO is pointed to by the FetchNode event; it is
2778                  on no run queue any more */
2779           } else {  /* fair scheduling currently not supported -- HWL */
2780             barf("Asynchr communication is not yet compatible with fair scheduling\n");
2781           }
2782         } else {                /* !RtsFlags.GranFlags.DoAsyncFetch */
2783           procStatus[CurrentProc] = Fetching; // ToDo: BlockedOnFetch;
2784           /* now done in do_the_fetchnode 
2785           if (RtsFlags.GranFlags.GranSimStats.Full)
2786             DumpRawGranEvent(CurrentProc, p,
2787                              GR_FETCH, CurrentTSO, node, (StgInt)0, 0);
2788           */
2789           IF_GRAN_DEBUG(blockOnFetch, 
2790                         BlockedOnFetch[CurrentProc] = CurrentTSO;); /*- rtsTrue; -*/
2791         }
2792 #endif /* 0 */
2793
2794       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mtidytime;
2795       
2796       /* Rescheduling (GranSim internal) is necessary */
2797       NeedToReSchedule = rtsTrue;
2798       
2799       return(1); 
2800     }
2801   return(0);
2802 }
2803
2804 //@cindex GranSimSpark
2805 void 
2806 GranSimSpark(local,node)
2807 StgInt local;
2808 StgClosure *node;
2809 {
2810   /* ++SparksAvail;  Nope; do that in add_to_spark_queue */
2811   if (RtsFlags.GranFlags.GranSimStats.Sparks)
2812     DumpRawGranEvent(CurrentProc, (PEs)0, SP_SPARK,
2813                      END_TSO_QUEUE, node, (StgInt)0, spark_queue_len(CurrentProc)-1);
2814
2815   /* Force the PE to take notice of the spark */
2816   if(RtsFlags.GranFlags.DoAlwaysCreateThreads) {
2817     new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
2818               FindWork,
2819               END_TSO_QUEUE, (StgClosure*)NULL, (rtsSpark*)NULL);
2820     if (CurrentTime[CurrentProc]<TimeOfNextEvent)
2821       TimeOfNextEvent = CurrentTime[CurrentProc];
2822   }
2823
2824   if(local)
2825     ++CurrentTSO->gran.localsparks;
2826   else
2827     ++CurrentTSO->gran.globalsparks;
2828 }
2829
2830 //@cindex GranSimSparkAt
2831 void 
2832 GranSimSparkAt(spark,where,identifier)
2833 rtsSpark *spark;
2834 StgClosure *where;    /* This should be a node; alternatively could be a GA */
2835 StgInt identifier;
2836 {
2837   PEs p = where_is(where);
2838   GranSimSparkAtAbs(spark,p,identifier);
2839 }
2840
2841 //@cindex GranSimSparkAtAbs
2842 void 
2843 GranSimSparkAtAbs(spark,proc,identifier)
2844 rtsSpark *spark;
2845 PEs proc;        
2846 StgInt identifier;
2847 {
2848   rtsTime exporttime;
2849
2850   if (spark == (rtsSpark *)NULL) /* Note: Granularity control might have */
2851     return;                          /* turned a spark into a NULL. */
2852
2853   /* ++SparksAvail; Nope; do that in add_to_spark_queue */
2854   if(RtsFlags.GranFlags.GranSimStats.Sparks)
2855     DumpRawGranEvent(proc,0,SP_SPARKAT,
2856                      END_TSO_QUEUE, spark->node, (StgInt)0, spark_queue_len(proc));
2857
2858   if (proc!=CurrentProc) {
2859     CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
2860     exporttime = (CurrentTime[proc] > CurrentTime[CurrentProc]? 
2861                   CurrentTime[proc]: CurrentTime[CurrentProc])
2862                  + RtsFlags.GranFlags.Costs.latency;
2863   } else {
2864     exporttime = CurrentTime[CurrentProc];
2865   }
2866
2867   if ( RtsFlags.GranFlags.Light )
2868     /* Need CurrentTSO in event field to associate costs with creating
2869        spark even in a GrAnSim Light setup */
2870     new_event(proc, CurrentProc, exporttime,
2871               MoveSpark,
2872               CurrentTSO, spark->node, spark);
2873   else
2874     new_event(proc, CurrentProc, exporttime,
2875               MoveSpark, (StgTSO*)NULL, spark->node, spark);
2876   /* Bit of a hack to treat placed sparks the same as stolen sparks */
2877   ++OutstandingFishes[proc];
2878
2879   /* Force the PE to take notice of the spark (FINDWORK is put after a
2880      MoveSpark into the sparkq!) */
2881   if (RtsFlags.GranFlags.DoAlwaysCreateThreads) {
2882     new_event(CurrentProc,CurrentProc,exporttime+1,
2883               FindWork,
2884               (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
2885   }
2886
2887   if (exporttime<TimeOfNextEvent)
2888     TimeOfNextEvent = exporttime;
2889
2890   if (proc!=CurrentProc) {
2891     CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mtidytime;
2892     ++CurrentTSO->gran.globalsparks;
2893   } else { 
2894     ++CurrentTSO->gran.localsparks;
2895   }
2896 }
2897
2898 /* 
2899    This function handles local and global blocking.  It's called either
2900    from threaded code (RBH_entry, BH_entry etc) or from blockFetch when
2901    trying to fetch an BH or RBH 
2902 */
2903
2904 //@cindex GranSimBlock
2905 void 
2906 GranSimBlock(tso, proc, node)
2907 StgTSO *tso;
2908 PEs proc;
2909 StgClosure *node;
2910 {
2911   PEs node_proc = where_is(node), 
2912       tso_proc = where_is((StgClosure *)tso);
2913
2914   ASSERT(tso_proc==CurrentProc);
2915   // ASSERT(node_proc==CurrentProc);
2916   IF_GRAN_DEBUG(bq,
2917                 if (node_proc!=CurrentProc) 
2918                   belch("## ghuH: TSO %d (%lx) [PE %d] blocks on non-local node %p [PE %d] (no simulation of FETCHMEs)",
2919                         tso->id, tso, tso_proc, node, node_proc)); 
2920   ASSERT(tso->link==END_TSO_QUEUE);
2921   ASSERT(!is_on_queue(tso,proc)); // tso must not be on run queue already!
2922   //ASSERT(tso==run_queue_hds[proc]);
2923
2924   IF_DEBUG(gran,
2925            belch("GRAN: TSO %d (%p) [PE %d] blocks on closure %p @ %lx",
2926                  tso->id, tso, proc, node, CurrentTime[proc]));
2927
2928
2929     /* THIS SHOULD NEVER HAPPEN!
2930        If tso tries to block on a remote node (i.e. node_proc!=CurrentProc)
2931        we have missed a GranSimFetch before entering this closure;
2932        we hack around it for now, faking a FetchNode; 
2933        because GranSimBlock is entered via a BLACKHOLE(_BQ) closure,
2934        tso will be blocked on this closure until the FetchReply occurs.
2935
2936        ngoq Dogh! 
2937
2938     if (node_proc!=CurrentProc) {
2939       StgInt ret;
2940       ret = GranSimFetch(node);
2941       IF_GRAN_DEBUG(bq,
2942                     if (ret)
2943                       belch(".. GranSimBlock: faking a FetchNode of node %p from %d to %d",
2944                             node, node_proc, CurrentProc););
2945       return;
2946     }
2947     */
2948
2949   if (RtsFlags.GranFlags.GranSimStats.Full)
2950     DumpRawGranEvent(proc,node_proc,GR_BLOCK,tso,node,(StgInt)0,0);
2951
2952   ++(tso->gran.blockcount);
2953   /* Distinction  between local and global block is made in blockFetch */
2954   tso->gran.blockedat = CurrentTime[proc];
2955
2956   CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime;
2957   ActivateNextThread(proc);
2958   /* tso->link = END_TSO_QUEUE;    not really necessary; only for testing */
2959 }
2960
2961 #endif /* GRAN */
2962
2963 //@node Index,  , Dumping routines, GranSim specific code
2964 //@subsection Index
2965
2966 //@index
2967 //* ActivateNextThread::  @cindex\s-+ActivateNextThread
2968 //* CurrentProc::  @cindex\s-+CurrentProc
2969 //* CurrentTime::  @cindex\s-+CurrentTime
2970 //* GranSimAllocate::  @cindex\s-+GranSimAllocate
2971 //* GranSimBlock::  @cindex\s-+GranSimBlock
2972 //* GranSimExec::  @cindex\s-+GranSimExec
2973 //* GranSimFetch::  @cindex\s-+GranSimFetch
2974 //* GranSimLight_insertThread::  @cindex\s-+GranSimLight_insertThread
2975 //* GranSimSpark::  @cindex\s-+GranSimSpark
2976 //* GranSimSparkAt::  @cindex\s-+GranSimSparkAt
2977 //* GranSimSparkAtAbs::  @cindex\s-+GranSimSparkAtAbs
2978 //* GranSimUnallocate::  @cindex\s-+GranSimUnallocate
2979 //* any_idle::  @cindex\s-+any_idle
2980 //* blockFetch::  @cindex\s-+blockFetch
2981 //* do_the_fetchnode::  @cindex\s-+do_the_fetchnode
2982 //* do_the_fetchreply::  @cindex\s-+do_the_fetchreply
2983 //* do_the_findwork::  @cindex\s-+do_the_findwork
2984 //* do_the_globalblock::  @cindex\s-+do_the_globalblock
2985 //* do_the_movespark::  @cindex\s-+do_the_movespark
2986 //* do_the_movethread::  @cindex\s-+do_the_movethread
2987 //* do_the_startthread::  @cindex\s-+do_the_startthread
2988 //* do_the_unblock::  @cindex\s-+do_the_unblock
2989 //* fetchNode::  @cindex\s-+fetchNode
2990 //* ga_to_proc::  @cindex\s-+ga_to_proc
2991 //* get_next_event::  @cindex\s-+get_next_event
2992 //* get_time_of_next_event::  @cindex\s-+get_time_of_next_event
2993 //* grab_event::  @cindex\s-+grab_event
2994 //* handleFetchRequest::  @cindex\s-+handleFetchRequest
2995 //* handleIdlePEs::  @cindex\s-+handleIdlePEs
2996 //* idlers::  @cindex\s-+idlers
2997 //* insertThread::  @cindex\s-+insertThread
2998 //* insert_event::  @cindex\s-+insert_event
2999 //* is_on_queue::  @cindex\s-+is_on_queue
3000 //* is_unique::  @cindex\s-+is_unique
3001 //* new_event::  @cindex\s-+new_event
3002 //* prepend_event::  @cindex\s-+prepend_event
3003 //* print_event::  @cindex\s-+print_event
3004 //* print_eventq::  @cindex\s-+print_eventq
3005 //* prune_eventq ::  @cindex\s-+prune_eventq 
3006 //* spark queue::  @cindex\s-+spark queue
3007 //* sparkStealTime::  @cindex\s-+sparkStealTime
3008 //* stealSomething::  @cindex\s-+stealSomething
3009 //* stealSpark::  @cindex\s-+stealSpark
3010 //* stealSparkMagic::  @cindex\s-+stealSparkMagic
3011 //* stealThread::  @cindex\s-+stealThread
3012 //* stealThreadMagic::  @cindex\s-+stealThreadMagic
3013 //* thread_queue_len::  @cindex\s-+thread_queue_len
3014 //* traverse_eventq_for_gc::  @cindex\s-+traverse_eventq_for_gc
3015 //* where_is::  @cindex\s-+where_is
3016 //@end index