[project @ 2000-01-14 14:56:40 by simonmar]
[ghc-hetmet.git] / ghc / rts / parallel / GranSim.c
1 /* 
2    Time-stamp: <Sat Dec 11 1999 17:25:27 Stardate: [-30]4033.42 software>
3    $Id: GranSim.c,v 1.2 2000/01/13 14:34:06 hwloidl Exp $
4
5    Variables and functions specific to GranSim the parallelism simulator
6    for GPH.
7 */
8
9 //@node GranSim specific code, , ,
10 //@section GranSim specific code
11
12 /*
13    Macros for dealing with the new and improved GA field for simulating
14    parallel execution. Based on @CONCURRENT@ package. The GA field now
15    contains a mask, where the n-th bit stands for the n-th processor, where
16    this data can be found. In case of multiple copies, several bits are
17    set. The total number of processors is bounded by @MAX_PROC@, which
18    should be <= the length of a word in bits.  -- HWL 
19 */
20
21 //@menu
22 //* Includes::                  
23 //* Prototypes and externs::    
24 //* Constants and Variables::   
25 //* Initialisation::            
26 //* Global Address Operations::  
27 //* Global Event Queue::        
28 //* Spark queue functions::     
29 //* Scheduling functions::      
30 //* Thread Queue routines::     
31 //* GranSim functions::         
32 //* GranSimLight routines::     
33 //* Code for Fetching Nodes::   
34 //* Idle PEs::                  
35 //* Routines directly called from Haskell world::  
36 //* Emiting profiling info for GrAnSim::  
37 //* Dumping routines::          
38 //* Index::                     
39 //@end menu
40
41 //@node Includes, Prototypes and externs, GranSim specific code, GranSim specific code
42 //@subsection Includes
43
44 #include "Rts.h"
45 #include "RtsFlags.h"
46 #include "RtsUtils.h"
47 #include "StgMiscClosures.h"
48 #include "StgTypes.h"
49 #include "Schedule.h"
50 #include "SchedAPI.h"       // for pushClosure
51 #include "GC.h"
52 #include "GranSimRts.h"
53 #include "GranSim.h"
54 #include "ParallelRts.h"
55 #include "ParallelDebug.h"
56 #include "Storage.h"       // for recordMutable
57
58
59 //@node Prototypes and externs, Constants and Variables, Includes, GranSim specific code
60 //@subsection Prototypes and externs
61
62 #if defined(GRAN)
63
64 /* Prototypes */
65 static inline PEs      ga_to_proc(StgWord);
66 static inline rtsBool  any_idle(void);
67 static inline nat      idlers(void);
68        PEs             where_is(StgClosure *node);
69
70 static rtsBool         stealSomething(PEs proc, rtsBool steal_spark, rtsBool steal_thread);
71 static inline rtsBool  stealSpark(PEs proc);
72 static inline rtsBool  stealThread(PEs proc);
73 static rtsBool         stealSparkMagic(PEs proc);
74 static rtsBool         stealThreadMagic(PEs proc);
75 /* subsumed by stealSomething
76 static void            stealThread(PEs proc); 
77 static void            stealSpark(PEs proc);
78 */
79 static rtsTime         sparkStealTime(void);
80 static nat             natRandom(nat from, nat to);
81 static PEs             findRandomPE(PEs proc);
82 static void            sortPEsByTime (PEs proc, PEs *pes_by_time, 
83                                       nat *firstp, nat *np);
84
85 void GetRoots(void);
86
87 #endif /* GRAN */
88
89 //@node Constants and Variables, Initialisation, Prototypes and externs, GranSim specific code
90 //@subsection Constants and Variables
91
92 #if defined(GRAN) || defined(PAR)
93 /* See GranSim.h for the definition of the enum gran_event_types */
94 char *gran_event_names[] = {
95     "START", "START(Q)",
96     "STEALING", "STOLEN", "STOLEN(Q)",
97     "FETCH", "REPLY", "BLOCK", "RESUME", "RESUME(Q)",
98     "SCHEDULE", "DESCHEDULE",
99     "END",
100     "SPARK", "SPARKAT", "USED", "PRUNED", "EXPORTED", "ACQUIRED",
101     "ALLOC",
102     "TERMINATE",
103     "SYSTEM_START", "SYSTEM_END",           /* only for debugging */
104     "??"
105 };
106 #endif
107
108 #if defined(GRAN)                                              /* whole file */
109 char *proc_status_names[] = {
110   "Idle", "Sparking", "Starting", "Fetching", "Fishing", "Busy", 
111   "UnknownProcStatus"
112 };
113
114 /* For internal use (event statistics) only */
115 char *event_names[] =
116     { "ContinueThread", "StartThread", "ResumeThread", 
117       "MoveSpark", "MoveThread", "FindWork",
118       "FetchNode", "FetchReply",
119       "GlobalBlock", "UnblockThread"
120     };
121
122 //@cindex CurrentProc
123 PEs CurrentProc = 0;
124
125 /*
126   ToDo: Create a structure for the processor status and put all the 
127         arrays below into it. 
128   -- HWL */
129
130 //@cindex CurrentTime
131 /* One clock for each PE */
132 rtsTime CurrentTime[MAX_PROC];  
133
134 /* Useful to restrict communication; cf fishing model in GUM */
135 nat OutstandingFetches[MAX_PROC], OutstandingFishes[MAX_PROC];
136
137 /* Status of each PE (new since but independent of GranSim Light) */
138 rtsProcStatus procStatus[MAX_PROC];
139
140 # if defined(GRAN) && defined(GRAN_CHECK)
141 /* To check if the RTS ever tries to run a thread that should be blocked
142    because of fetching remote data */
143 StgTSO *BlockedOnFetch[MAX_PROC];
144 # define FETCH_MASK_TSO  0x08000000      /* only bits 0, 1, 2 should be used */
145 # endif
146
147 nat SparksAvail = 0;     /* How many sparks are available */
148 nat SurplusThreads = 0;  /* How many excess threads are there */
149
150 /* Do we need to reschedule following a fetch? */
151 rtsBool NeedToReSchedule = rtsFalse, IgnoreEvents = rtsFalse, IgnoreYields = rtsFalse; 
152 rtsTime TimeOfNextEvent, TimeOfLastEvent, EndOfTimeSlice; /* checked from the threaded world! */
153
154 //@cindex spark queue
155 /* GranSim: a globally visible array of spark queues */
156 rtsSparkQ pending_sparks_hds[MAX_PROC];
157 rtsSparkQ pending_sparks_tls[MAX_PROC];
158
159 nat sparksIgnored = 0, sparksCreated = 0;
160
161 GlobalGranStats globalGranStats;
162
163 nat gran_arith_cost, gran_branch_cost, gran_load_cost, 
164     gran_store_cost, gran_float_cost;
165
166 /*
167 Old comment from 0.29. ToDo: Check and update -- HWL
168
169 The following variables control the behaviour of GrAnSim. In general, there
170 is one RTS option for enabling each of these features. In getting the
171 desired setup of GranSim the following questions have to be answered:
172 \begin{itemize}
173 \item {\em Which scheduling algorithm} to use (@RtsFlags.GranFlags.DoFairSchedule@)? 
174       Currently only unfair scheduling is supported.
175 \item What to do when remote data is fetched (@RtsFlags.GranFlags.DoAsyncFetch@)? 
176       Either block and wait for the
177       data or reschedule and do some other work.
178       Thus, if this variable is true, asynchronous communication is
179       modelled. Block on fetch mainly makes sense for incremental fetching.
180
181       There is also a simplified fetch variant available
182       (@RtsFlags.GranFlags.SimplifiedFetch@). This variant does not use events to model
183       communication. It is faster but the results will be less accurate.
184 \item How aggressive to be in getting work after a reschedule on fetch
185       (@RtsFlags.GranFlags.FetchStrategy@)?
186       This is determined by the so-called {\em fetching
187       strategy\/}. Currently, there are four possibilities:
188       \begin{enumerate}
189        \item Only run a runnable thread.
190        \item Turn a spark into a thread, if necessary.
191        \item Steal a remote spark, if necessary.
192        \item Steal a runnable thread from another processor, if necessary.
193       \end{itemize}
194       The variable @RtsFlags.GranFlags.FetchStrategy@ determines how far to go in this list
195       when rescheduling on a fetch.
196 \item Should sparks or threads be stolen first when looking for work
197       (@RtsFlags.GranFlags.DoStealThreadsFirst@)? 
198       The default is to steal sparks first (much cheaper).
199 \item Should the RTS use a lazy thread creation scheme
200       (@RtsFlags.GranFlags.DoAlwaysCreateThreads@)?  By default yes i.e.\ sparks are only
201       turned into threads when work is needed. Also note, that sparks
202       can be discarded by the RTS (this is done in the case of an overflow
203       of the spark pool). Setting @RtsFlags.GranFlags.DoAlwaysCreateThreads@  to @True@ forces
204       the creation of threads at the next possibility (i.e.\ when new work
205       is demanded the next time).
206 \item Should data be fetched closure-by-closure or in packets
207       (@RtsFlags.GranFlags.DoBulkFetching@)? The default strategy is a GRIP-like incremental 
208       (i.e.\ closure-by-closure) strategy. This makes sense in a
209       low-latency setting but is bad in a high-latency system. Setting 
210       @RtsFlags.GranFlags.DoBulkFetching@ to @True@ enables bulk (packet) fetching. Other
211       parameters determine the size of the packets (@pack_buffer_size@) and the number of
212       thunks that should be put into one packet (@RtsFlags.GranFlags.ThunksToPack@).
213 \item If there is no other possibility to find work, should runnable threads
214       be moved to an idle processor (@RtsFlags.GranFlags.DoThreadMigration@)? In any case, the
215       RTS tried to get sparks (either local or remote ones) first. Thread
216       migration is very expensive, since a whole TSO has to be transferred
217       and probably data locality becomes worse in the process. Note, that
218       the closure, which will be evaluated next by that TSO is not
219       transferred together with the TSO (that might block another thread).
220 \item Should the RTS distinguish between sparks created by local nodes and
221       stolen sparks (@RtsFlags.GranFlags.PreferSparksOfLocalNodes@)?  The idea is to improve 
222       data locality by preferring sparks of local nodes (it is more likely
223       that the data for those sparks is already on the local processor). 
224       However, such a distinction also imposes an overhead on the spark
225       queue management, and typically a large number of sparks are
226       generated during execution. By default this variable is set to @False@.
227 \item Should the RTS use granularity control mechanisms? The idea of a 
228       granularity control mechanism is to make use of granularity
229       information provided via annotation of the @par@ construct in order
230       to prefer bigger threads when either turning a spark into a thread or
231       when choosing the next thread to schedule. Currently, three such
232       mechanisms are implemented:
233       \begin{itemize}
234         \item Cut-off: The granularity information is interpreted as a
235               priority. If a threshold priority is given to the RTS, then
236               only those sparks with a higher priority than the threshold 
237               are actually created. Other sparks are immediately discarded.
238               This is similar to a usual cut-off mechanism often used in 
239               parallel programs, where parallelism is only created if the 
240               input data is lage enough. With this option, the choice is 
241               hidden in the RTS and only the threshold value has to be 
242               provided as a parameter to the runtime system.
243         \item Priority Sparking: This mechanism keeps priorities for sparks
244               and chooses the spark with the highest priority when turning
245               a spark into a thread. After that the priority information is
246               discarded. The overhead of this mechanism comes from
247               maintaining a sorted spark queue.
248         \item Priority Scheduling: This mechanism keeps the granularity
249               information for threads, to. Thus, on each reschedule the 
250               largest thread is chosen. This mechanism has a higher
251               overhead, as the thread queue is sorted, too.
252        \end{itemize}  
253 \end{itemize}
254 */
255
256 //@node Initialisation, Global Address Operations, Constants and Variables, GranSim specific code
257 //@subsection Initialisation
258
259 void 
260 init_gr_stats (void) {
261   memset(&globalGranStats, '\0', sizeof(GlobalGranStats));
262 #if 0
263   /* event stats */
264   globalGranStats.noOfEvents = 0;
265   for (i=0; i<MAX_EVENT; i++) globalGranStats.event_counts[i]=0;
266
267   /* communication stats */
268   globalGranStats.fetch_misses = 0;
269   globalGranStats.tot_low_pri_sparks = 0;
270
271   /* obscure stats */  
272   globalGranStats.rs_sp_count = 0;
273   globalGranStats.rs_t_count = 0;
274   globalGranStats.ntimes_total = 0, 
275   globalGranStats.fl_total = 0;
276   globalGranStats.no_of_steals = 0;
277
278   /* spark queue stats */
279   globalGranStats.tot_sq_len = 0, 
280   globalGranStats.tot_sq_probes = 0; 
281   globalGranStats.tot_sparks = 0;
282   globalGranStats.withered_sparks = 0;
283   globalGranStats.tot_add_threads = 0;
284   globalGranStats.tot_tq_len = 0;
285   globalGranStats.non_end_add_threads = 0;
286
287   /* thread stats */
288   globalGranStats.tot_threads_created = 0;
289   for (i=0; i<MAX_PROC; i++) globalGranStats.threads_created_on_PE[i]=0;
290 #endif /* 0 */
291 }
292
293 //@node Global Address Operations, Global Event Queue, Initialisation, GranSim specific code
294 //@subsection Global Address Operations
295 /*
296   ----------------------------------------------------------------------
297   Global Address Operations
298
299   These functions perform operations on the global-address (ga) part of a
300   closure. The ga is the only new field (1 word) in a closure introduced by
301   GrAnSim. It serves as a bitmask, indicating on which processor the
302   closure is residing. Since threads are described by Thread State Object
303   (TSO), which is nothing but another kind of closure, this scheme allows
304   gives placement information about threads.
305
306   A ga is just a bitmask, so the operations on them are mainly bitmask
307   manipulating functions. Note, that there are important macros like PROCS,
308   IS_LOCAL_TO etc. They are defined in @GrAnSim.lh@.
309
310   NOTE: In GrAnSim-light we don't maintain placement information. This
311   allows to simulate an arbitrary number of processors. The price we have
312   to be is the lack of costing any communication properly. In short,
313   GrAnSim-light is meant to reveal the maximal parallelism in a program.
314   From an implementation point of view the important thing is: {\em
315   GrAnSim-light does not maintain global-addresses}.  */
316
317 /* ga_to_proc returns the first processor marked in the bitmask ga.
318    Normally only one bit in ga should be set. But for PLCs all bits
319    are set. That shouldn't hurt since we only need IS_LOCAL_TO for PLCs */
320  
321 //@cindex ga_to_proc
322
323 static inline PEs
324 ga_to_proc(StgWord ga)
325 {
326     PEs i;
327     for (i = 0; i < RtsFlags.GranFlags.proc && !IS_LOCAL_TO(ga, i); i++);
328     ASSERT(0<=i && i<RtsFlags.GranFlags.proc);
329     return (i);
330 }
331
332 /* NB: This takes a *node* rather than just a ga as input */
333 //@cindex where_is
334 PEs
335 where_is(StgClosure *node)
336 { return (ga_to_proc(PROCS(node))); }
337
338 // debugging only
339 //@cindex is_unique
340 rtsBool
341 is_unique(StgClosure *node)
342
343   PEs i;
344   rtsBool unique = rtsFalse;
345
346   for (i = 0; i < RtsFlags.GranFlags.proc ; i++)
347     if (IS_LOCAL_TO(PROCS(node), i))
348       if (unique)          // exactly 1 instance found so far
349         return rtsFalse;   // found a 2nd instance => not unique
350       else 
351         unique = rtsTrue;  // found 1st instance 
352   ASSERT(unique);          // otherwise returned from within loop
353   return (unique);
354 }
355
356 //@cindex any_idle
357 static inline rtsBool
358 any_idle(void) { /* any (map (\ i -> procStatus[i] == Idle)) [0,..,MAX_PROC] */
359  PEs i; 
360  rtsBool any_idle; 
361  for(i=0, any_idle=rtsFalse; 
362      !any_idle && i<RtsFlags.GranFlags.proc; 
363      any_idle = any_idle || procStatus[i] == Idle, i++) 
364  {} ;
365 }
366
367 //@cindex idlers
368 static inline nat
369 idlers(void) {  /* number of idle PEs */
370  PEs i, j; 
371  for(i=0, j=0;
372      i<RtsFlags.GranFlags.proc; 
373      j += (procStatus[i] == Idle) ? 1 : 0, i++) 
374  {} ;
375  return j;
376 }
377
378 //@node Global Event Queue, Spark queue functions, Global Address Operations, GranSim specific code
379 //@subsection Global Event Queue
380 /*
381 The following routines implement an ADT of an event-queue (FIFO). 
382 ToDo: Put that in an own file(?)
383 */
384
385 /* Pointer to the global event queue; events are currently malloc'ed */
386 rtsEventQ EventHd = NULL;
387
388 //@cindex get_next_event
389 rtsEvent *
390 get_next_event(void)
391 {
392   static rtsEventQ entry = NULL;
393
394   if (EventHd == NULL) {
395     barf("No next event. This may be caused by a circular data dependency in the program.");
396   }
397
398   if (entry != NULL)
399     free((char *)entry);
400
401   if (RtsFlags.GranFlags.GranSimStats.Global) {     /* count events */
402     globalGranStats.noOfEvents++;
403     globalGranStats.event_counts[EventHd->evttype]++;
404   }
405
406   entry = EventHd;
407
408   IF_GRAN_DEBUG(event_trace,
409            print_event(entry));
410
411   EventHd = EventHd->next;
412   return(entry);
413 }
414
415 /* When getting the time of the next event we ignore CONTINUETHREAD events:
416    we don't want to be interrupted before the end of the current time slice
417    unless there is something important to handle. 
418 */
419 //@cindex get_time_of_next_event
420 rtsTime
421 get_time_of_next_event(void)
422
423   rtsEventQ event = EventHd;
424
425   while (event != NULL && event->evttype==ContinueThread) {
426     event = event->next;
427   }
428   if(event == NULL)
429       return ((rtsTime) 0);
430   else
431       return (event->time);
432 }
433
434 /* ToDo: replace malloc/free with a free list */
435 //@cindex insert_event
436 void
437 insert_event(newentry)
438 rtsEvent *newentry;
439 {
440   rtsEventType evttype = newentry->evttype;
441   rtsEvent *event, **prev;
442
443   /* if(evttype >= CONTINUETHREAD1) evttype = CONTINUETHREAD; */
444
445   /* Search the queue and insert at the right point:
446      FINDWORK before everything, CONTINUETHREAD after everything.
447
448      This ensures that we find any available work after all threads have
449      executed the current cycle.  This level of detail would normally be
450      irrelevant, but matters for ridiculously low latencies...
451   */
452
453   /* Changed the ordering: Now FINDWORK comes after everything but 
454      CONTINUETHREAD. This makes sure that a MOVESPARK comes before a 
455      FINDWORK. This is important when a GranSimSparkAt happens and
456      DoAlwaysCreateThreads is turned on. Also important if a GC occurs
457      when trying to build a new thread (see much_spark)  -- HWL 02/96  */
458
459   if(EventHd == NULL)
460     EventHd = newentry;
461   else {
462     for (event = EventHd, prev=(rtsEvent**)&EventHd; 
463          event != NULL; 
464          prev = (rtsEvent**)&(event->next), event = event->next) {
465       switch (evttype) {
466         case FindWork: if ( event->time < newentry->time ||
467                             ( (event->time == newentry->time) &&
468                               (event->evttype != ContinueThread) ) )
469                          continue;
470                        else
471                          break;
472         case ContinueThread: if ( event->time <= newentry->time )
473                                continue;
474                              else
475                                break;
476         default: if ( event->time < newentry->time || 
477                       ((event->time == newentry->time) &&
478                        (event->evttype == newentry->evttype)) )
479                    continue;
480                  else
481                    break;
482        }
483        /* Insert newentry here (i.e. before event) */
484        *prev = newentry;
485        newentry->next = event;
486        break;
487     }
488     if (event == NULL)
489       *prev = newentry;
490   }
491 }
492
493 //@cindex new_event
494 void
495 new_event(proc,creator,time,evttype,tso,node,spark)
496 PEs proc, creator;
497 rtsTime time;
498 rtsEventType evttype;
499 StgTSO *tso;
500 StgClosure *node;
501 rtsSpark *spark;
502 {
503   rtsEvent *newentry = (rtsEvent *) stgMallocBytes(sizeof(rtsEvent), "new_event");
504
505   newentry->proc     = proc;
506   newentry->creator  = creator;
507   newentry->time     = time;
508   newentry->evttype  = evttype;
509   newentry->tso      = tso;
510   newentry->node     = node;
511   newentry->spark    = spark;
512   newentry->gc_info  = 0;
513   newentry->next     = NULL;
514
515   insert_event(newentry);
516
517   IF_DEBUG(gran, 
518            fprintf(stderr, "GRAN: new_event: \n"); 
519            print_event(newentry))
520 }
521
522 //@cindex prepend_event
523 void
524 prepend_event(event)       /* put event at beginning of EventQueue */
525 rtsEvent *event;
526 {                                 /* only used for GC! */
527  event->next = EventHd;
528  EventHd = event;
529 }
530
531 //@cindex grab_event
532 rtsEventQ
533 grab_event(void)             /* undo prepend_event i.e. get the event */
534 {                        /* at the head of EventQ but don't free anything */
535  rtsEventQ event = EventHd;
536
537  if (EventHd == NULL) {
538    barf("No next event (in grab_event). This may be caused by a circular data dependency in the program.");
539  }
540
541  EventHd = EventHd->next;
542  return (event);
543 }
544
545 //@cindex traverse_eventq_for_gc
546 void 
547 traverse_eventq_for_gc(void)
548 {
549  rtsEventQ event = EventHd;
550  StgWord bufsize;
551  StgClosure *closurep;
552  StgTSO *tsop;
553  StgPtr buffer, bufptr;
554  PEs proc, creator;
555
556  /* Traverse eventq and replace every FETCHREPLY by a FETCHNODE for the
557     orig closure (root of packed graph). This means that a graph, which is
558     between processors at the time of GC is fetched again at the time when
559     it would have arrived, had there been no GC. Slightly inaccurate but
560     safe for GC.
561     This is only needed for GUM style fetchng. -- HWL */
562  if (!RtsFlags.GranFlags.DoBulkFetching)
563    return;
564
565  for(event = EventHd; event!=NULL; event=event->next) {
566    if (event->evttype==FetchReply) {
567      buffer = stgCast(StgPtr,event->node);
568      ASSERT(buffer[PACK_FLAG_LOCN]==MAGIC_PACK_FLAG);  /* It's a pack buffer */
569      bufsize = buffer[PACK_SIZE_LOCN];
570      closurep = stgCast(StgClosure*,buffer[PACK_HDR_SIZE]);
571      tsop = stgCast(StgTSO*,buffer[PACK_TSO_LOCN]);
572      proc = event->proc;
573      creator = event->creator;                 /* similar to unpacking */
574      for (bufptr=buffer+PACK_HDR_SIZE; 
575           bufptr<(buffer+bufsize);
576           bufptr++) {
577          // if ( (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_SPEC_RBH_TYPE) ||
578          //      (INFO_TYPE(INFO_PTR(*bufptr)) == INFO_GEN_RBH_TYPE) ) {
579            if ( GET_INFO(stgCast(StgClosure*,bufptr)) ) {
580              convertFromRBH(stgCast(StgClosure *,bufptr));
581          }
582      }
583      free(buffer);
584      event->evttype = FetchNode;
585      event->proc    = creator;
586      event->creator = proc;
587      event->node    = closurep;
588      event->tso     = tsop;
589      event->gc_info = 0;
590    }
591  }
592 }
593
594 void
595 markEventQueue(void)
596
597   StgClosure *MarkRoot(StgClosure *root); // prototype
598
599   rtsEventQ event = EventHd;
600   nat len;
601
602   /* iterate over eventq and register relevant fields in event as roots */
603   for(event = EventHd, len =  0; event!=NULL; event=event->next, len++) {
604     switch (event->evttype) {
605       case ContinueThread:  
606         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
607         break;
608       case StartThread: 
609         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
610         event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
611         break;
612       case ResumeThread:
613         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
614         event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
615         break;
616       case MoveSpark:
617         event->spark->node = (StgClosure *)MarkRoot((StgClosure *)event->spark->node);
618         break;
619       case MoveThread:
620         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
621         break;
622       case FindWork:
623         break;
624       case FetchNode: 
625         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
626         event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
627         break;
628       case FetchReply:
629         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
630         if (RtsFlags.GranFlags.DoBulkFetching)
631           // ToDo: traverse_eventw_for_gc if GUM-Fetching!!! HWL
632           belch("ghuH: packets in BulkFetching not marked as roots; mayb be fatal");
633         else
634           event->node = (StgTSO *)MarkRoot((StgClosure *)event->node);
635         break;
636       case GlobalBlock:
637         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
638         event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
639         break;
640       case UnblockThread:
641         event->tso = (StgTSO *)MarkRoot((StgClosure *)event->tso);
642         event->node = (StgClosure *)MarkRoot((StgClosure *)event->node);
643         break;
644       default:
645         barf("markEventQueue: trying to mark unknown event @ %p", event);
646     }}
647   IF_DEBUG(gc,
648            belch("GC: markEventQueue: %d events in queue", len));
649 }
650
651 /*
652   Prune all ContinueThread events related to tso or node in the eventq.
653   Currently used if a thread leaves STG land with ThreadBlocked status,
654   i.e. it blocked on a closure and has been put on its blocking queue.  It
655   will be reawakended via a call to awaken_blocked_queue. Until then no
656   event effecting this tso should appear in the eventq.  A bit of a hack,
657   because ideally we shouldn't generate such spurious ContinueThread events
658   in the first place.  
659 */
660 //@cindex prune_eventq 
661 void 
662 prune_eventq(tso, node) 
663 StgTSO *tso; 
664 StgClosure *node; 
665 { rtsEventQ prev = (rtsEventQ)NULL, event = EventHd;
666
667   /* node unused for now */ 
668   ASSERT(node==NULL); 
669   /* tso must be valid, then */
670   ASSERT(tso!=END_TSO_QUEUE);
671   while (event != NULL) {
672     if (event->evttype==ContinueThread && 
673         (event->tso==tso)) {
674       IF_GRAN_DEBUG(event_trace, // ToDo: use another debug flag
675                     belch("prune_eventq: pruning ContinueThread event for TSO %d (%p) on PE %d @ %lx (%p)",
676                           event->tso->id, event->tso, event->proc, event->time, event));
677       if (prev==(rtsEventQ)NULL) { // beginning of eventq
678         EventHd = event->next;
679         free(event); 
680         event = EventHd;
681       } else {
682         prev->next = event->next;
683         free(event); 
684         event = prev->next;
685       }
686     } else { // no pruning necessary; go to next event
687       prev = event;
688       event = event->next;
689     }
690   }
691 }
692
693 //@cindex print_event
694 void
695 print_event(event)
696 rtsEvent *event;
697 {
698   char str_tso[16], str_node[16];
699   StgThreadID tso_id;
700
701   if (event->tso==END_TSO_QUEUE) {
702     strcpy(str_tso, "______");
703     tso_id = 0;
704   } else { 
705     sprintf(str_tso, "%p", event->tso);
706     tso_id = (event->tso==NULL) ? 0 : event->tso->id;
707   }
708   if  (event->node==(StgClosure*)NULL) {
709     strcpy(str_node, "______");
710   } else {
711     sprintf(str_node, "%p", event->node);
712   }
713   // HWL: shouldn't be necessary; ToDo: nuke
714   //str_tso[6]='\0';
715   //str_node[6]='\0';
716
717   if (event==NULL)
718     fprintf(stderr,"Evt: NIL\n");
719   else
720     fprintf(stderr, "Evt: %s (%u), PE %u [%u], Time %lu, TSO %d (%s), Node %s\n", //"Evt: %s (%u), PE %u [%u], Time %u, TSO %s (%#l), Node %s\n",
721               event_names[event->evttype], event->evttype,
722               event->proc, event->creator, event->time, 
723               tso_id, str_tso, str_node
724               /*, event->spark, event->next */ );
725
726 }
727
728 //@cindex print_eventq
729 void
730 print_eventq(hd)
731 rtsEvent *hd;
732 {
733   rtsEvent *x;
734
735   fprintf(stderr,"Event Queue with root at %p:\n", hd);
736   for (x=hd; x!=NULL; x=x->next) {
737     print_event(x);
738   }
739 }
740
741 /* 
742    Spark queue functions are now all  in Sparks.c!!
743 */
744 //@node Scheduling functions, Thread Queue routines, Spark queue functions, GranSim specific code
745 //@subsection Scheduling functions
746
747 /* 
748    These functions are variants of thread initialisation and therefore
749    related to initThread and friends in Schedule.c. However, they are
750    specific to a GranSim setup in storing more info in the TSO's statistics
751    buffer and sorting the thread queues etc.  
752 */
753
754 /*
755    A large portion of startThread deals with maintaining a sorted thread
756    queue, which is needed for the Priority Sparking option. Without that
757    complication the code boils down to FIFO handling.  
758 */
759 //@cindex insertThread
760 void
761 insertThread(tso, proc)
762 StgTSO*     tso;
763 PEs         proc;
764 {
765   StgTSO *prev = NULL, *next = NULL;
766   nat count = 0;
767   rtsBool found = rtsFalse;
768
769   ASSERT(CurrentProc==proc);
770   ASSERT(!is_on_queue(tso,proc));
771   /* Idle proc: put the thread on the run queue
772      same for pri spark and basic version */
773   if (run_queue_hds[proc] == END_TSO_QUEUE)
774     {
775       /* too strong!
776       ASSERT((CurrentProc==MainProc &&   
777               CurrentTime[MainProc]==0 &&
778               procStatus[MainProc]==Idle) ||
779              procStatus[proc]==Starting);
780       */
781       run_queue_hds[proc] = run_queue_tls[proc] = tso;
782
783       CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime;
784
785       /* new_event of ContinueThread has been moved to do_the_startthread */
786
787       /* too strong!
788       ASSERT(procStatus[proc]==Idle || 
789              procStatus[proc]==Fishing || 
790              procStatus[proc]==Starting);
791       procStatus[proc] = Busy;
792       */
793       return;
794     }
795
796   if (RtsFlags.GranFlags.Light)
797     GranSimLight_insertThread(tso, proc);
798
799   /* Only for Pri Scheduling: find place where to insert tso into queue */
800   if (RtsFlags.GranFlags.DoPriorityScheduling && tso->gran.pri!=0)
801     /* {add_to_spark_queue}vo' jInIHta'; Qu' wa'DIch yIleghQo' */
802     for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count=0;
803          (next != END_TSO_QUEUE) && 
804          !(found = tso->gran.pri >= next->gran.pri);
805          prev = next, next = next->link, count++) 
806       { 
807        ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
808               (prev==(StgTSO*)NULL || prev->link==next));
809       }
810
811   ASSERT(!found || next != END_TSO_QUEUE);
812   ASSERT(procStatus[proc]!=Idle);
813  
814   if (found) {
815      /* found can only be rtsTrue if pri scheduling enabled */ 
816      ASSERT(RtsFlags.GranFlags.DoPriorityScheduling);
817      if (RtsFlags.GranFlags.GranSimStats.Global) 
818        globalGranStats.non_end_add_threads++;
819      /* Add tso to ThreadQueue between prev and next */
820      tso->link = next;
821      if ( next == (StgTSO*)END_TSO_QUEUE ) {
822        run_queue_tl = tso;
823      } else {
824        /* no back link for TSO chain */
825      }
826      
827      if ( prev == (StgTSO*)END_TSO_QUEUE ) {
828        /* Never add TSO as first elem of thread queue; the first */
829        /* element should be the one that is currently running -- HWL */
830        IF_DEBUG(gran,
831                 belch("GRAN: Qagh: NewThread (w/ PriorityScheduling): Trying to add TSO %p (PRI=%d) as first elem of threadQ (%p) on proc %u (@ %u)\n",
832                     tso, tso->gran.pri, run_queue_hd, proc,
833                     CurrentTime[proc]));
834      } else {
835       prev->link = tso;
836      }
837   } else { /* !found */ /* or not pri sparking! */
838     /* Add TSO to the end of the thread queue on that processor */
839     run_queue_tls[proc]->link = tso;
840     run_queue_tls[proc] = tso;
841   }
842   ASSERT(RtsFlags.GranFlags.DoPriorityScheduling || count==0);
843   CurrentTime[proc] += count * RtsFlags.GranFlags.Costs.pri_sched_overhead +
844                        RtsFlags.GranFlags.Costs.threadqueuetime;
845
846   /* ToDo: check if this is still needed -- HWL 
847   if (RtsFlags.GranFlags.DoThreadMigration)
848     ++SurplusThreads;
849
850   if (RtsFlags.GranFlags.GranSimStats.Full &&
851       !(( event_type == GR_START || event_type == GR_STARTQ) && 
852         RtsFlags.GranFlags.labelling) )
853     DumpRawGranEvent(proc, creator, event_type+1, tso, node, 
854                      tso->gran.sparkname, spark_queue_len(proc));
855   */
856
857 # if defined(GRAN_CHECK)
858   /* Check if thread queue is sorted. Only for testing, really!  HWL */
859   if ( RtsFlags.GranFlags.DoPriorityScheduling && 
860        (RtsFlags.GranFlags.Debug.sortedQ) ) {
861     rtsBool sorted = rtsTrue;
862     StgTSO *prev, *next;
863
864     if (run_queue_hds[proc]==END_TSO_QUEUE || 
865         run_queue_hds[proc]->link==END_TSO_QUEUE) {
866       /* just 1 elem => ok */
867     } else {
868       /* Qu' wa'DIch yIleghQo' (ignore first elem)! */
869       for (prev = run_queue_hds[proc]->link, next = prev->link;
870            (next != END_TSO_QUEUE) ;
871            prev = next, next = prev->link) {
872         ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
873                (prev==(StgTSO*)NULL || prev->link==next));
874         sorted = sorted && 
875                  (prev->gran.pri >= next->gran.pri);
876       }
877     }
878     if (!sorted) {
879       fprintf(stderr,"Qagh: THREADQ on PE %d is not sorted:\n",
880               CurrentProc);
881       G_THREADQ(run_queue_hd,0x1);
882     }
883   }
884 # endif
885 }
886
887 /*
888   insertThread, which is only used for GranSim Light, is similar to
889   startThread in that it adds a TSO to a thread queue. However, it assumes
890   that the thread queue is sorted by local clocks and it inserts the TSO at
891   the right place in the queue. Don't create any event, just insert.  
892 */
893 //@cindex GranSimLight_insertThread
894 rtsBool
895 GranSimLight_insertThread(tso, proc)
896 StgTSO* tso;
897 PEs proc;
898 {
899   StgTSO *prev, *next;
900   nat count = 0;
901   rtsBool found = rtsFalse;
902
903   ASSERT(RtsFlags.GranFlags.Light);
904
905   /* In GrAnSim-Light we always have an idle `virtual' proc.
906      The semantics of the one-and-only thread queue is different here:
907      all threads in the queue are running (each on its own virtual processor);
908      the queue is only needed internally in the simulator to interleave the
909      reductions of the different processors.
910      The one-and-only thread queue is sorted by the local clocks of the TSOs.
911   */
912   ASSERT(run_queue_hds[proc] != END_TSO_QUEUE);
913   ASSERT(tso->link == END_TSO_QUEUE);
914
915   /* If only one thread in queue so far we emit DESCHEDULE in debug mode */
916   if (RtsFlags.GranFlags.GranSimStats.Full &&
917       (RtsFlags.GranFlags.Debug.checkLight) && 
918       (run_queue_hd->link == END_TSO_QUEUE)) {
919     DumpRawGranEvent(proc, proc, GR_DESCHEDULE,
920                      run_queue_hds[proc], (StgClosure*)NULL, 
921                      tso->gran.sparkname, spark_queue_len(proc)); // ToDo: check spar_queue_len
922     // resched = rtsTrue;
923   }
924
925   /* this routine should only be used in a GrAnSim Light setup */
926   /* && CurrentProc must be 0 in GrAnSim Light setup */
927   ASSERT(RtsFlags.GranFlags.Light && CurrentProc==0);
928
929   /* Idle proc; same for pri spark and basic version */
930   if (run_queue_hd==END_TSO_QUEUE)
931     {
932       run_queue_hd = run_queue_tl = tso;
933       /* MAKE_BUSY(CurrentProc); */
934       return rtsTrue;
935     }
936
937   for (prev = run_queue_hds[proc], next = run_queue_hds[proc]->link, count = 0;
938        (next != END_TSO_QUEUE) && 
939        !(found = (tso->gran.clock < next->gran.clock));
940        prev = next, next = next->link, count++) 
941     { 
942        ASSERT((prev!=(StgTSO*)NULL || next==run_queue_hds[proc]) &&
943               (prev==(StgTSO*)NULL || prev->link==next));
944     }
945
946   /* found can only be rtsTrue if pri sparking enabled */ 
947   if (found) {
948      /* Add tso to ThreadQueue between prev and next */
949      tso->link = next;
950      if ( next == END_TSO_QUEUE ) {
951        run_queue_tls[proc] = tso;
952      } else {
953        /* no back link for TSO chain */
954      }
955      
956      if ( prev == END_TSO_QUEUE ) {
957        run_queue_hds[proc] = tso;
958      } else {
959        prev->link = tso;
960      }
961   } else { /* !found */ /* or not pri sparking! */
962     /* Add TSO to the end of the thread queue on that processor */
963     run_queue_tls[proc]->link = tso;
964     run_queue_tls[proc] = tso;
965   }
966
967   if ( prev == END_TSO_QUEUE ) {        /* new head of queue */
968     new_event(proc, proc, CurrentTime[proc],
969               ContinueThread,
970               tso, (StgClosure*)NULL, (rtsSpark*)NULL);
971   }
972   /*
973   if (RtsFlags.GranFlags.GranSimStats.Full && 
974       !(( event_type == GR_START || event_type == GR_STARTQ) && 
975         RtsFlags.GranFlags.labelling) )
976     DumpRawGranEvent(proc, creator, gr_evttype, tso, node,
977                      tso->gran.sparkname, spark_queue_len(proc));
978   */
979   return rtsTrue;
980 }
981
982 /*
983   endThread is responsible for general clean-up after the thread tso has
984   finished. This includes emitting statistics into the profile etc.  
985 */
986 void
987 endThread(StgTSO *tso, PEs proc) 
988 {
989   ASSERT(procStatus[proc]==Busy);        // coming straight out of STG land
990   ASSERT(tso->whatNext==ThreadComplete);
991   // ToDo: prune ContinueThreads for this TSO from event queue
992   DumpEndEvent(proc, tso, rtsFalse /* not mandatory */);
993
994   /* if this was the last thread on this PE then make it Idle */
995   if (run_queue_hds[proc]==END_TSO_QUEUE) {
996     procStatus[CurrentProc] = Idle;
997   }
998 }
999
1000 //@node Thread Queue routines, GranSim functions, Scheduling functions, GranSim specific code
1001 //@subsection Thread Queue routines
1002
1003 /* 
1004    Check whether given tso resides on the run queue of the current processor.
1005    Only used for debugging.
1006 */
1007    
1008 //@cindex is_on_queue
1009 rtsBool
1010 is_on_queue (StgTSO *tso, PEs proc) 
1011 {
1012   StgTSO *t;
1013   rtsBool found;
1014
1015   for (t=run_queue_hds[proc], found=rtsFalse; 
1016        t!=END_TSO_QUEUE && !(found = t==tso);
1017        t=t->link)
1018     /* nothing */ ;
1019
1020   return found;
1021 }
1022
1023 /* This routine  is only  used for keeping   a statistics  of thread  queue
1024    lengths to evaluate the impact of priority scheduling. -- HWL 
1025    {spark_queue_len}vo' jInIHta'
1026 */
1027 //@cindex thread_queue_len
1028 nat
1029 thread_queue_len(PEs proc) 
1030 {
1031  StgTSO *prev, *next;
1032  nat len;
1033
1034  for (len = 0, prev = END_TSO_QUEUE, next = run_queue_hds[proc];
1035       next != END_TSO_QUEUE; 
1036       len++, prev = next, next = prev->link)
1037    {}
1038
1039  return (len);
1040 }
1041
1042 //@node GranSim functions, GranSimLight routines, Thread Queue routines, GranSim specific code
1043 //@subsection GranSim functions
1044
1045 /* -----------------------------------------------------------------  */
1046 /* The main event handling functions; called from Schedule.c (schedule) */
1047 /* -----------------------------------------------------------------  */
1048  
1049 //@cindex do_the_globalblock
1050
1051 void 
1052 do_the_globalblock(rtsEvent* event)
1053
1054   PEs proc          = event->proc;        /* proc that requested node */
1055   StgTSO *tso       = event->tso;         /* tso that requested node */
1056   StgClosure  *node = event->node;        /* requested, remote node */
1057
1058   IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the GlobalBlock\n"));
1059   /* There should be no GLOBALBLOCKs in GrAnSim Light setup */
1060   ASSERT(!RtsFlags.GranFlags.Light);
1061   /* GlobalBlock events only valid with GUM fetching */
1062   ASSERT(RtsFlags.GranFlags.DoBulkFetching);
1063
1064   IF_GRAN_DEBUG(bq, // globalBlock,
1065     if (IS_LOCAL_TO(PROCS(node),proc)) {
1066       belch("## Qagh: GlobalBlock: Blocking TSO %d (%p) on LOCAL node %p (PE %d).\n",
1067             tso->id, tso, node, proc);
1068     });
1069
1070   /* CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.munpacktime; */
1071   if ( blockFetch(tso,proc,node) != 0 )
1072     return;                     /* node has become local by now */
1073
1074 #if 0
1075  ToDo: check whether anything has to be done at all after blockFetch -- HWL
1076
1077   if (!RtsFlags.GranFlags.DoAsyncFetch) { /* head of queue is next thread */
1078     StgTSO* tso = run_queue_hds[proc];       /* awaken next thread */
1079     if (tso != (StgTSO*)NULL) {
1080       new_event(proc, proc, CurrentTime[proc],
1081                 ContinueThread,
1082                 tso, (StgClosure*)NULL, (rtsSpark*)NULL);
1083       CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime;
1084       if (RtsFlags.GranFlags.GranSimStats.Full)
1085         DumpRawGranEvent(proc, CurrentProc, GR_SCHEDULE, tso,
1086                          (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc));  // ToDo: check sparkname and spar_queue_len
1087       procStatus[proc] = Busy;                  /* might have been fetching */
1088     } else {
1089       procStatus[proc] = Idle;                     /* no work on proc now */
1090     }
1091   } else {  /* RtsFlags.GranFlags.DoAsyncFetch i.e. block-on-fetch */
1092               /* other thread is already running */
1093               /* 'oH 'utbe' 'e' vIHar ; I think that's not needed -- HWL 
1094               new_event(proc,proc,CurrentTime[proc],
1095                        CONTINUETHREAD,EVENT_TSO(event),
1096                        (RtsFlags.GranFlags.DoBulkFetching ? closure :
1097                        EVENT_NODE(event)),NULL);
1098               */
1099   }
1100 #endif
1101 }
1102
1103 //@cindex do_the_unblock
1104
1105 void 
1106 do_the_unblock(rtsEvent* event) 
1107 {
1108   PEs proc = event->proc,       /* proc that requested node */
1109       creator = event->creator; /* proc that requested node */
1110   StgTSO* tso = event->tso;     /* tso that requested node */
1111   StgClosure* node = event->node;  /* requested, remote node */
1112   
1113   IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the UnBlock\n"))
1114   /* There should be no UNBLOCKs in GrAnSim Light setup */
1115   ASSERT(!RtsFlags.GranFlags.Light);
1116   /* UnblockThread means either FetchReply has arrived or
1117      a blocking queue has been awakened;
1118      ToDo: check with assertions
1119   ASSERT(procStatus[proc]==Fetching || IS_BLACK_HOLE(event->node));
1120   */
1121   if (!RtsFlags.GranFlags.DoAsyncFetch) {  /* block-on-fetch */
1122     /* We count block-on-fetch as normal block time */    
1123     tso->gran.blocktime += CurrentTime[proc] - tso->gran.blockedat;
1124     /* Dumping now done when processing the event
1125        No costs for contextswitch or thread queueing in this case 
1126        if (RtsFlags.GranFlags.GranSimStats.Full)
1127          DumpRawGranEvent(proc, CurrentProc, GR_RESUME, tso, 
1128                           (StgClosure*)NULL, tso->gran.sparkname, spark_queue_len(CurrentProc));
1129     */
1130     /* Maybe do this in FetchReply already 
1131     if (procStatus[proc]==Fetching)
1132       procStatus[proc] = Busy;
1133     */
1134     /*
1135     new_event(proc, proc, CurrentTime[proc],
1136               ContinueThread,
1137               tso, node, (rtsSpark*)NULL);
1138     */
1139   } else {
1140     /* Asynchr comm causes additional costs here: */
1141     /* Bring the TSO from the blocked queue into the threadq */
1142   }
1143   /* In all cases, the UnblockThread causes a ResumeThread to be scheduled */
1144   new_event(proc, proc, 
1145             CurrentTime[proc]+RtsFlags.GranFlags.Costs.threadqueuetime,
1146             ResumeThread,
1147             tso, node, (rtsSpark*)NULL);
1148 }
1149
1150 //@cindex do_the_fetchnode
1151
1152 void
1153 do_the_fetchnode(rtsEvent* event)
1154 {
1155   PEs proc = event->proc,       /* proc that holds the requested node */
1156       creator = event->creator; /* proc that requested node */
1157   StgTSO* tso = event->tso;
1158   StgClosure* node = event->node;  /* requested, remote node */
1159   rtsFetchReturnCode rc;
1160
1161   ASSERT(CurrentProc==proc);
1162   /* There should be no FETCHNODEs in GrAnSim Light setup */
1163   ASSERT(!RtsFlags.GranFlags.Light);
1164
1165   IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchNode\n"));
1166
1167   CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1168
1169   /* ToDo: check whether this is the right place for dumping the event */
1170   if (RtsFlags.GranFlags.GranSimStats.Full)
1171     DumpRawGranEvent(creator, proc, GR_FETCH, tso, node, (StgInt)0, 0);
1172
1173   do {
1174     rc = handleFetchRequest(node, proc, creator, tso);
1175     if (rc == OutOfHeap) {                                   /* trigger GC */
1176 # if defined(GRAN_CHECK)  && defined(GRAN)
1177      if (RtsFlags.GcFlags.giveStats)
1178        fprintf(RtsFlags.GcFlags.statsFile,"*****   veQ boSwI'  PackNearbyGraph(node %p, tso %p (%d))\n",
1179                 node, tso, tso->id);
1180 # endif
1181      prepend_event(event);
1182      GarbageCollect(GetRoots); 
1183      // HWL: ToDo: check whether a ContinueThread has to be issued
1184      // HWL old: ReallyPerformThreadGC(PACK_HEAP_REQUIRED, rtsFalse);
1185 # if defined(GRAN_CHECK)  && defined(GRAN)
1186      if (RtsFlags.GcFlags.giveStats) {
1187        fprintf(RtsFlags.GcFlags.statsFile,"*****      SAVE_Hp=%p, SAVE_HpLim=%p, PACK_HEAP_REQUIRED=%d\n",
1188                 Hp, HpLim, 0) ; // PACK_HEAP_REQUIRED);  ???
1189        fprintf(stderr,"*****      No. of packets so far: %d (total size: %d)\n", 
1190                 globalGranStats.tot_packets, globalGranStats.tot_packet_size);
1191      }
1192 # endif 
1193      event = grab_event();
1194      // Hp -= PACK_HEAP_REQUIRED; // ???
1195
1196      /* GC knows that events are special and follows the pointer i.e. */
1197      /* events are valid even if they moved. An EXIT is triggered */
1198      /* if there is not enough heap after GC. */
1199     }
1200   } while (rc == OutOfHeap);
1201 }
1202
1203 //@cindex do_the_fetchreply
1204 void 
1205 do_the_fetchreply(rtsEvent* event)
1206 {
1207   PEs proc = event->proc,       /* proc that requested node */
1208       creator = event->creator; /* proc that holds the requested node */
1209   StgTSO* tso = event->tso;
1210   StgClosure* node = event->node;  /* requested, remote node */
1211   StgClosure* closure=(StgClosure*)NULL;
1212
1213   ASSERT(CurrentProc==proc);
1214   ASSERT(RtsFlags.GranFlags.DoAsyncFetch || procStatus[proc]==Fetching);
1215
1216   IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the FetchReply\n"));
1217   /* There should be no FETCHREPLYs in GrAnSim Light setup */
1218   ASSERT(!RtsFlags.GranFlags.Light);
1219
1220   /* assign message unpack costs *before* dumping the event */
1221   CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1222   
1223   /* ToDo: check whether this is the right place for dumping the event */
1224   if (RtsFlags.GranFlags.GranSimStats.Full)
1225     DumpRawGranEvent(proc, creator, GR_REPLY, tso, node, 
1226                       tso->gran.sparkname, spark_queue_len(proc));
1227
1228   /* THIS SHOULD NEVER HAPPEN 
1229      If tso is in the BQ of node this means that it actually entered the 
1230      remote closure, due to a missing GranSimFetch at the beginning of the 
1231      entry code; therefore, this is actually a faked fetch, triggered from 
1232      within GranSimBlock; 
1233      since tso is both in the EVQ and the BQ for node, we have to take it out 
1234      of the BQ first before we can handle the FetchReply;
1235      ToDo: special cases in awaken_blocked_queue, since the BQ magically moved.
1236   */
1237   if (tso->blocked_on!=(StgClosure*)NULL) {
1238     IF_GRAN_DEBUG(bq,
1239                   belch("## ghuH: TSO %d (%p) in FetchReply is blocked on node %p (shouldn't happen AFAIK)",
1240                         tso->id, tso, node));
1241     // unlink_from_bq(tso, node);
1242   }
1243     
1244   if (RtsFlags.GranFlags.DoBulkFetching) {      /* bulk (packet) fetching */
1245     rtsPackBuffer *buffer = (rtsPackBuffer*)node;
1246     nat size = buffer->size;
1247   
1248     /* NB: Fetch misses can't occur with GUM fetching, as */
1249     /* updatable closure are turned into RBHs and therefore locked */
1250     /* for other processors that try to grab them. */
1251   
1252     closure = UnpackGraph(buffer);
1253     CurrentTime[proc] += size * RtsFlags.GranFlags.Costs.munpacktime;
1254   } else  // incremental fetching
1255       /* Copy or  move node to CurrentProc */
1256       if (fetchNode(node, creator, proc)) {
1257         /* Fetch has failed i.e. node has been grabbed by another PE */
1258         PEs p = where_is(node);
1259         rtsTime fetchtime;
1260      
1261         if (RtsFlags.GranFlags.GranSimStats.Global)
1262           globalGranStats.fetch_misses++;
1263
1264         IF_GRAN_DEBUG(thunkStealing,
1265                  belch("== Qu'vatlh! fetch miss @ %u: node %p is at proc %u (rather than proc %u)\n",
1266                        CurrentTime[proc],node,p,creator));
1267
1268         CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
1269         
1270         /* Count fetch again !? */
1271         ++(tso->gran.fetchcount);
1272         tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime;
1273         
1274         fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) +
1275                     RtsFlags.GranFlags.Costs.latency;
1276         
1277         /* Chase the grabbed node */
1278         new_event(p, proc, fetchtime,
1279                   FetchNode,
1280                   tso, node, (rtsSpark*)NULL);
1281
1282 # if 0 && defined(GRAN_CHECK) && defined(GRAN) /* Just for testing */
1283        IF_GRAN_DEBUG(blockOnFetch,
1284                      BlockedOnFetch[CurrentProc] = tso;) /*-rtsTrue;-*/
1285         
1286        IF_GRAN_DEBUG(blockOnFetch_sanity,
1287                      tso->type |= FETCH_MASK_TSO;)
1288 # endif
1289
1290         CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
1291         
1292         return; /* NB: no REPLy has been processed; tso still sleeping */
1293     }
1294
1295     /* -- Qapla'! Fetch has been successful; node is here, now  */
1296     ++(event->tso->gran.fetchcount);
1297     event->tso->gran.fetchtime += RtsFlags.GranFlags.Costs.fetchtime;
1298
1299     /* this is now done at the beginning of this routine
1300     if (RtsFlags.GranFlags.GranSimStats.Full)
1301        DumpRawGranEvent(proc,event->creator, GR_REPLY, event->tso,
1302                         (RtsFlags.GranFlags.DoBulkFetching ? 
1303                                closure : 
1304                                event->node),
1305                         tso->gran.sparkname, spark_queue_len(proc));
1306     */
1307
1308     --OutstandingFetches[proc];
1309     ASSERT(OutstandingFetches[proc] >= 0);
1310     new_event(proc, proc, CurrentTime[proc],
1311               ResumeThread,
1312               event->tso, (RtsFlags.GranFlags.DoBulkFetching ? 
1313                            closure : 
1314                            event->node),
1315               (rtsSpark*)NULL);
1316 }
1317
1318 //@cindex do_the_movethread
1319
1320 void
1321 do_the_movethread(rtsEvent* event) {
1322   PEs proc = event->proc,       /* proc that requested node */
1323       creator = event->creator; /* proc that holds the requested node */
1324   StgTSO* tso = event->tso;
1325
1326  IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveThread\n"));
1327
1328  ASSERT(CurrentProc==proc);
1329  /* There should be no MOVETHREADs in GrAnSim Light setup */
1330  ASSERT(!RtsFlags.GranFlags.Light);
1331  /* MOVETHREAD events should never occur without -bM */
1332  ASSERT(RtsFlags.GranFlags.DoThreadMigration);
1333  /* Bitmask of moved thread should be 0 */
1334  ASSERT(PROCS(tso)==0);
1335  ASSERT(procStatus[proc] == Fishing ||
1336         RtsFlags.GranFlags.DoAsyncFetch);
1337  ASSERT(OutstandingFishes[proc]>0);
1338
1339  /* ToDo: exact costs for unpacking the whole TSO  */
1340  CurrentTime[proc] +=  5l * RtsFlags.GranFlags.Costs.munpacktime;
1341
1342  /* ToDo: check whether this is the right place for dumping the event */
1343  if (RtsFlags.GranFlags.GranSimStats.Full)
1344    DumpRawGranEvent(proc, creator, 
1345                     GR_STOLEN, tso, (StgClosure*)NULL, (StgInt)0, 0);
1346
1347  // ToDo: check cost functions
1348  --OutstandingFishes[proc];
1349  SET_GRAN_HDR(tso, ThisPE);         // adjust the bitmask for the TSO
1350  insertThread(tso, proc);
1351
1352  if (procStatus[proc]==Fishing)
1353    procStatus[proc] = Idle;
1354
1355  if (RtsFlags.GranFlags.GranSimStats.Global)
1356    globalGranStats.tot_TSOs_migrated++;
1357 }
1358
1359 //@cindex do_the_movespark
1360
1361 void
1362 do_the_movespark(rtsEvent* event) {
1363  PEs proc = event->proc,       /* proc that requested spark */
1364      creator = event->creator; /* proc that holds the requested spark */
1365  StgTSO* tso = event->tso;
1366  rtsSparkQ spark = event->spark;
1367
1368  IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the MoveSpark\n"))
1369
1370  ASSERT(CurrentProc==proc);
1371  ASSERT(spark!=NULL);
1372  ASSERT(procStatus[proc] == Fishing ||
1373         RtsFlags.GranFlags.DoAsyncFetch);
1374  ASSERT(OutstandingFishes[proc]>0); 
1375
1376  CurrentTime[proc] += RtsFlags.GranFlags.Costs.munpacktime;
1377           
1378  /* record movement of spark only if spark profiling is turned on */
1379  if (RtsFlags.GranFlags.GranSimStats.Sparks)
1380     DumpRawGranEvent(proc, creator,
1381                      SP_ACQUIRED,
1382                      tso, spark->node, spark->name, spark_queue_len(proc));
1383
1384  /* global statistics */
1385  if ( RtsFlags.GranFlags.GranSimStats.Global &&
1386       !closure_SHOULD_SPARK(spark->node))
1387    globalGranStats.withered_sparks++;
1388    /* Not adding the spark to the spark queue would be the right */
1389    /* thing here, but it also would be cheating, as this info can't be */
1390    /* available in a real system. -- HWL */
1391
1392  --OutstandingFishes[proc];
1393
1394  add_to_spark_queue(spark);
1395
1396  IF_GRAN_DEBUG(randomSteal, // ToDo: spark-distribution flag
1397                print_sparkq_stats());
1398
1399  /* Should we treat stolen sparks specially? Currently, we don't. */
1400
1401  if (procStatus[proc]==Fishing)
1402    procStatus[proc] = Idle;
1403
1404  /* add_to_spark_queue will increase the time of the current proc. */
1405  /*
1406    If proc was fishing, it is Idle now with the new spark in its spark
1407    pool. This means that the next time handleIdlePEs is called, a local
1408    FindWork will be created on this PE to turn the spark into a thread. Of
1409    course another PE might steal the spark in the meantime (that's why we
1410    are using events rather than inlining all the operations in the first
1411    place). */
1412 }
1413
1414 /*
1415   In the Constellation class version of GranSim the semantics of StarThread
1416   events has changed. Now, StartThread has to perform 3 basic operations:
1417    - create a new thread (previously this was done in ActivateSpark);
1418    - insert the thread into the run queue of the current processor
1419    - generate a new event for actually running the new thread
1420   Note that the insertThread is called via createThread. 
1421 */
1422   
1423 //@cindex do_the_startthread
1424
1425 void
1426 do_the_startthread(rtsEvent *event)
1427 {
1428   PEs proc          = event->proc;        /* proc that requested node */
1429   StgTSO *tso       = event->tso;         /* tso that requested node */
1430   StgClosure  *node = event->node;        /* requested, remote node */
1431   rtsSpark *spark   = event->spark;
1432   GranEventType gr_evttype;
1433
1434   ASSERT(CurrentProc==proc);
1435   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1436   ASSERT(event->evttype == ResumeThread || event->evttype == StartThread);
1437   /* if this was called via StartThread: */
1438   ASSERT(event->evttype!=StartThread || tso == END_TSO_QUEUE); // not yet created
1439   // ToDo: check: ASSERT(event->evttype!=StartThread || procStatus[proc]==Starting);
1440   /* if this was called via ResumeThread: */
1441   ASSERT(event->evttype!=ResumeThread || 
1442            RtsFlags.GranFlags.DoAsyncFetch ||!is_on_queue(tso,proc)); 
1443
1444   /* startThread may have been called from the main event handler upon
1445      finding either a ResumeThread or a StartThread event; set the
1446      gr_evttype (needed for writing to .gr file) accordingly */
1447   // gr_evttype = (event->evttype == ResumeThread) ? GR_RESUME : GR_START;
1448
1449   if ( event->evttype == StartThread ) {
1450     GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ? 
1451                                  GR_START : GR_STARTQ;
1452
1453     tso = createThread(BLOCK_SIZE_W, spark->gran_info);// implicit insertThread!
1454     pushClosure(tso, node);
1455
1456     // ToDo: fwd info on local/global spark to thread -- HWL
1457     // tso->gran.exported =  spark->exported;
1458     // tso->gran.locked =   !spark->global;
1459     tso->gran.sparkname = spark->name;
1460
1461     ASSERT(CurrentProc==proc);
1462     if (RtsFlags.GranFlags.GranSimStats.Full)
1463       DumpGranEvent(gr_evttype,tso);
1464
1465     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcreatetime;
1466   } else { // event->evttype == ResumeThread
1467     GranEventType gr_evttype = (run_queue_hds[proc]==END_TSO_QUEUE) ? 
1468                                  GR_RESUME : GR_RESUMEQ;
1469
1470     insertThread(tso, proc);
1471
1472     ASSERT(CurrentProc==proc);
1473     if (RtsFlags.GranFlags.GranSimStats.Full)
1474       DumpGranEvent(gr_evttype,tso);
1475   }
1476
1477   ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE); // non-empty run queue
1478   procStatus[proc] = Busy;
1479   /* make sure that this thread is actually run */
1480   new_event(proc, proc, 
1481             CurrentTime[proc],
1482             ContinueThread,
1483             tso, node, (rtsSpark*)NULL);
1484   
1485   /* A wee bit of statistics gathering */
1486   if (RtsFlags.GranFlags.GranSimStats.Global) {
1487     globalGranStats.tot_add_threads++;
1488     globalGranStats.tot_tq_len += thread_queue_len(CurrentProc);
1489   }
1490
1491 }
1492
1493 //@cindex do_the_findwork
1494 void
1495 do_the_findwork(rtsEvent* event) 
1496 {
1497   PEs proc = event->proc,       /* proc to search for work */
1498       creator = event->creator; /* proc that requested work */
1499   rtsSparkQ spark = event->spark;
1500   /* ToDo: check that this size is safe -- HWL */
1501   nat req_heap = sizeofW(StgTSO) + MIN_STACK_WORDS;
1502                  // add this? -- HWL:RtsFlags.ConcFlags.stkChunkSize;
1503
1504   IF_DEBUG(gran, fprintf(stderr, "GRAN: doing the Findwork\n"));
1505
1506   /* If GUM style fishing is enabled, the contents of the spark field says
1507      what to steal (spark(1) or thread(2)); */
1508   ASSERT(!(RtsFlags.GranFlags.Fishing && event->spark==(rtsSpark*)0));
1509
1510   /* Make sure that we have enough heap for creating a new
1511      thread. This is a conservative estimate of the required heap.
1512      This eliminates special checks for GC around NewThread within
1513      ActivateSpark.                                                 */
1514   
1515   if (Hp + req_heap > HpLim ) {
1516     IF_DEBUG(gc, 
1517              belch("GC: Doing GC from within Findwork handling (that's bloody dangerous if you ask me)");)
1518       GarbageCollect(GetRoots);
1519       // ReallyPerformThreadGC(req_heap, rtsFalse);   old -- HWL
1520       Hp -= req_heap;
1521       if (procStatus[CurrentProc]==Sparking) 
1522         procStatus[CurrentProc]=Idle;
1523       return;
1524   }
1525   
1526   if ( RtsFlags.GranFlags.DoAlwaysCreateThreads ||
1527        RtsFlags.GranFlags.Fishing ||
1528        ((procStatus[proc]==Idle || procStatus[proc]==Sparking) &&
1529         (RtsFlags.GranFlags.FetchStrategy >= 2 || 
1530          OutstandingFetches[proc] == 0)) ) 
1531    {
1532     rtsBool found;
1533     rtsSparkQ  prev, spark;
1534     
1535     /* ToDo: check */
1536     ASSERT(procStatus[proc]==Sparking ||
1537            RtsFlags.GranFlags.DoAlwaysCreateThreads ||
1538            RtsFlags.GranFlags.Fishing);
1539     
1540     /* SImmoHwI' yInej! Search spark queue! */
1541     /* gimme_spark (event, &found, &spark); */
1542     findLocalSpark(event, &found, &spark);
1543
1544     if (!found) { /* pagh vumwI' */
1545       /*
1546         If no spark has been found this can mean 2 things:
1547          1/ The FindWork was a fish (i.e. a message sent by another PE) and 
1548             the spark pool of the receiver is empty
1549             --> the fish has to be forwarded to another PE
1550          2/ The FindWork was local to this PE (i.e. no communication; in this
1551             case creator==proc) and the spark pool of the PE is not empty 
1552             contains only sparks of closures that should not be sparked 
1553             (note: if the spark pool were empty, handleIdlePEs wouldn't have 
1554             generated a FindWork in the first place)
1555             --> the PE has to be made idle to trigger stealing sparks the next
1556                 time handleIdlePEs is performed
1557       */ 
1558
1559       ASSERT(pending_sparks_hds[proc]==(rtsSpark*)NULL);
1560       if (creator==proc) {
1561         /* local FindWork */
1562         if (procStatus[proc]==Busy) {
1563           belch("ghuH: PE %d in Busy state while processing local FindWork (spark pool is empty!) @ %lx",
1564                 proc, CurrentTime[proc]);
1565           procStatus[proc] = Idle;
1566         }
1567       } else {
1568         /* global FindWork i.e. a Fish */
1569         ASSERT(RtsFlags.GranFlags.Fishing);
1570         /* actually this generates another request from the originating PE */
1571         ASSERT(OutstandingFishes[creator]>0);
1572         OutstandingFishes[creator]--;
1573         /* ToDo: assign costs for sending fish to proc not to creator */
1574         stealSpark(creator); /* might steal from same PE; ToDo: fix */
1575         ASSERT(RtsFlags.GranFlags.maxFishes!=1 || procStatus[creator] == Fishing);
1576         /* any assertions on state of proc possible here? */
1577       }
1578     } else {
1579       /* DaH chu' Qu' yIchen! Now create new work! */ 
1580       IF_GRAN_DEBUG(findWork,
1581                     belch("+- munching spark %p; creating thread for node %p",
1582                           spark, spark->node));
1583       activateSpark (event, spark);
1584       ASSERT(spark != (rtsSpark*)NULL);
1585       spark = delete_from_sparkq (spark, proc, rtsTrue);
1586     }
1587
1588     IF_GRAN_DEBUG(findWork,
1589                   belch("+- Contents of spark queues at the end of FindWork @ %lx",
1590                         CurrentTime[proc]); 
1591                   print_sparkq_stats());
1592
1593     /* ToDo: check ; not valid if GC occurs in ActivateSpark */
1594     ASSERT(!found ||
1595             /* forward fish  or */
1596             (proc!=creator ||
1597             /* local spark  or */
1598             (proc==creator && procStatus[proc]==Starting)) || 
1599            //(!found && procStatus[proc]==Idle) ||
1600            RtsFlags.GranFlags.DoAlwaysCreateThreads); 
1601    } else {
1602     IF_GRAN_DEBUG(findWork,
1603                   belch("+- RTS refuses to findWork on PE %d @ %lx",
1604                         proc, CurrentTime[proc]);
1605                   belch("  procStatus[%d]=%s, fetch strategy=%d, outstanding fetches[%d]=%d", 
1606                         proc, proc_status_names[procStatus[proc]],
1607                         RtsFlags.GranFlags.FetchStrategy, 
1608                         proc, OutstandingFetches[proc]));
1609    }  
1610 }
1611  
1612 //@node GranSimLight routines, Code for Fetching Nodes, GranSim functions, GranSim specific code
1613 //@subsection GranSimLight routines
1614
1615 /* 
1616    This code is called from the central scheduler after having rgabbed a
1617    new event and is only needed for GranSim-Light. It mainly adjusts the
1618    ActiveTSO so that all costs that have to be assigned from within the
1619    scheduler are assigned to the right TSO. The choice of ActiveTSO depends
1620    on the type of event that has been found.  
1621 */
1622
1623 void
1624 GranSimLight_enter_system(event, ActiveTSOp)
1625 rtsEvent *event;
1626 StgTSO **ActiveTSOp;
1627 {
1628   StgTSO *ActiveTSO = *ActiveTSOp;
1629
1630   ASSERT (RtsFlags.GranFlags.Light);
1631   
1632   /* Restore local clock of the virtual processor attached to CurrentTSO.
1633      All costs will be associated to the `virt. proc' on which the tso
1634      is living. */
1635   if (ActiveTSO != NULL) {                     /* already in system area */
1636     ActiveTSO->gran.clock = CurrentTime[CurrentProc];
1637     if (RtsFlags.GranFlags.DoFairSchedule)
1638       {
1639         if (RtsFlags.GranFlags.GranSimStats.Full &&
1640             RtsFlags.GranFlags.Debug.checkLight)
1641           DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
1642       }
1643   }
1644   switch (event->evttype)
1645     { 
1646     case ContinueThread: 
1647     case FindWork:       /* inaccurate this way */
1648       ActiveTSO = run_queue_hd;
1649       break;
1650     case ResumeThread:   
1651     case StartThread:
1652     case MoveSpark:      /* has tso of virt proc in tso field of event */
1653       ActiveTSO = event->tso;
1654       break;
1655     default: barf("Illegal event type %s (%d) in GrAnSim Light setup\n",
1656                   event_names[event->evttype],event->evttype);
1657     }
1658   CurrentTime[CurrentProc] = ActiveTSO->gran.clock;
1659   if (RtsFlags.GranFlags.DoFairSchedule) {
1660       if (RtsFlags.GranFlags.GranSimStats.Full &&
1661           RtsFlags.GranFlags.Debug.checkLight)
1662         DumpGranEvent(GR_SYSTEM_START,ActiveTSO);
1663   }
1664 }
1665
1666 void
1667 GranSimLight_leave_system(event, ActiveTSOp)
1668 rtsEvent *event;
1669 StgTSO **ActiveTSOp;
1670 {
1671   StgTSO *ActiveTSO = *ActiveTSOp;
1672
1673   ASSERT(RtsFlags.GranFlags.Light);
1674
1675   /* Save time of `virt. proc' which was active since last getevent and
1676      restore time of `virt. proc' where CurrentTSO is living on. */
1677   if(RtsFlags.GranFlags.DoFairSchedule) {
1678     if (RtsFlags.GranFlags.GranSimStats.Full &&
1679         RtsFlags.GranFlags.Debug.checkLight) // ToDo: clean up flags
1680       DumpGranEvent(GR_SYSTEM_END,ActiveTSO);
1681   }
1682   ActiveTSO->gran.clock = CurrentTime[CurrentProc];
1683   ActiveTSO = (StgTSO*)NULL;
1684   CurrentTime[CurrentProc] = CurrentTSO->gran.clock;
1685   if (RtsFlags.GranFlags.DoFairSchedule /* &&  resched */ ) {
1686     // resched = rtsFalse;
1687     if (RtsFlags.GranFlags.GranSimStats.Full &&
1688         RtsFlags.GranFlags.Debug.checkLight)
1689       DumpGranEvent(GR_SCHEDULE,run_queue_hd);
1690   }
1691   /* 
1692      if (TSO_LINK(ThreadQueueHd)!=PrelBase_Z91Z93_closure &&
1693      (TimeOfNextEvent == 0 ||
1694      TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000<TimeOfNextEvent)) {
1695      new_event(CurrentProc,CurrentProc,TSO_CLOCK(TSO_LINK(ThreadQueueHd))+1000,
1696      CONTINUETHREAD,TSO_LINK(ThreadQueueHd),PrelBase_Z91Z93_closure,NULL);
1697      TimeOfNextEvent = get_time_of_next_event();
1698      }
1699   */
1700 }
1701
1702 //@node Code for Fetching Nodes, Idle PEs, GranSimLight routines, GranSim specific code
1703 //@subsection Code for Fetching Nodes
1704
1705 /*
1706    The following GrAnSim routines simulate the fetching of nodes from a
1707    remote processor. We use a 1 word bitmask to indicate on which processor
1708    a node is lying. Thus, moving or copying a node from one processor to
1709    another just requires an appropriate change in this bitmask (using
1710    @SET_GA@).  Additionally, the clocks have to be updated.
1711
1712    A special case arises when the node that is needed by processor A has
1713    been moved from a processor B to a processor C between sending out a
1714    @FETCH@ (from A) and its arrival at B. In that case the @FETCH@ has to
1715    be forwarded to C. This is simulated by issuing another FetchNode event
1716    on processor C with A as creator.
1717 */
1718  
1719 /* ngoqvam che' {GrAnSim}! */
1720
1721 /* Fetch node "node" to processor "p" */
1722
1723 //@cindex fetchNode
1724
1725 rtsFetchReturnCode
1726 fetchNode(node,from,to)
1727 StgClosure* node;
1728 PEs from, to;
1729 {
1730   /* In case of RtsFlags.GranFlags.DoBulkFetching this fct should never be 
1731      entered! Instead, UnpackGraph is used in ReSchedule */
1732   StgClosure* closure;
1733
1734   ASSERT(to==CurrentProc);
1735   /* Should never be entered  in GrAnSim Light setup */
1736   ASSERT(!RtsFlags.GranFlags.Light);
1737   /* fetchNode should never be entered with DoBulkFetching */
1738   ASSERT(!RtsFlags.GranFlags.DoBulkFetching);
1739
1740   /* Now fetch the node */
1741   if (!IS_LOCAL_TO(PROCS(node),from) &&
1742       !IS_LOCAL_TO(PROCS(node),to) ) 
1743     return NodeHasMoved;
1744   
1745   if (closure_HNF(node))                /* node already in head normal form? */
1746     node->header.gran.procs |= PE_NUMBER(to);           /* Copy node */
1747   else
1748     node->header.gran.procs = PE_NUMBER(to);            /* Move node */
1749
1750   return Ok;
1751 }
1752
1753 /* 
1754    Process a fetch request. 
1755    
1756    Cost of sending a packet of size n = C + P*n
1757    where C = packet construction constant, 
1758          P = cost of packing one word into a packet
1759    [Should also account for multiple packets].
1760 */
1761
1762 //@cindex handleFetchRequest
1763
1764 rtsFetchReturnCode
1765 handleFetchRequest(node,to,from,tso)
1766 StgClosure* node;   // the node which is requested
1767 PEs to, from;       // fetch request: from -> to
1768 StgTSO* tso;        // the tso which needs the node
1769 {
1770   ASSERT(!RtsFlags.GranFlags.Light);
1771   /* ToDo: check assertion */
1772   ASSERT(OutstandingFetches[from]>0);
1773
1774   /* probably wrong place; */
1775   ASSERT(CurrentProc==to);
1776
1777   if (IS_LOCAL_TO(PROCS(node), from)) /* Somebody else moved node already => */
1778     {                                 /* start tso */
1779       IF_GRAN_DEBUG(thunkStealing,
1780                     fprintf(stderr,"ghuH: handleFetchRequest entered with local node %p (%s) (PE %d)\n", 
1781                             node, info_type(node), from));
1782
1783       if (RtsFlags.GranFlags.DoBulkFetching) {
1784         nat size;
1785         rtsPackBuffer *graph;
1786
1787         /* Create a 1-node-buffer and schedule a FETCHREPLY now */
1788         graph = PackOneNode(node, tso, &size); 
1789         new_event(from, to, CurrentTime[to],
1790                   FetchReply,
1791                   tso, graph, (rtsSpark*)NULL);
1792       } else {
1793         new_event(from, to, CurrentTime[to],
1794                   FetchReply,
1795                   tso, node, (rtsSpark*)NULL);
1796       }
1797       IF_GRAN_DEBUG(thunkStealing,
1798                     belch("== majQa'! closure %p is local on PE %d already (this is a good thing)", node, from));
1799       return (NodeIsLocal);
1800     }
1801   else if (IS_LOCAL_TO(PROCS(node), to) )   /* Is node still here? */
1802     {
1803       if (RtsFlags.GranFlags.DoBulkFetching) { /* {GUM}vo' ngoqvam vInIHta' */
1804         nat size;                              /* (code from GUM) */
1805         StgClosure* graph;
1806
1807         if (IS_BLACK_HOLE(node)) {   /* block on BH or RBH */
1808           new_event(from, to, CurrentTime[to],
1809                     GlobalBlock,
1810                     tso, node, (rtsSpark*)NULL);
1811           /* Note: blockFetch is done when handling GLOBALBLOCK event; 
1812                    make sure the TSO stays out of the run queue */
1813           /* When this thread is reawoken it does the usual: it tries to 
1814              enter the updated node and issues a fetch if it's remote.
1815              It has forgotten that it has sent a fetch already (i.e. a
1816              FETCHNODE is swallowed by a BH, leaving the thread in a BQ) */
1817           --OutstandingFetches[from];
1818
1819           IF_GRAN_DEBUG(thunkStealing,
1820                         belch("== majQa'! closure %p on PE %d is a BH (demander=PE %d); faking a FMBQ", 
1821                               node, to, from));
1822           if (RtsFlags.GranFlags.GranSimStats.Global) {
1823             globalGranStats.tot_FMBQs++;
1824           }
1825           return (NodeIsBH);
1826         }
1827
1828         /* The tso requesting the node is blocked and cannot be on a run queue */
1829         ASSERT(!is_on_queue(tso, from));
1830
1831         if ((graph = PackNearbyGraph(node, tso, &size)) == NULL) 
1832           return (OutOfHeap);  /* out of heap */
1833
1834         /* Actual moving/copying of node is done on arrival; see FETCHREPLY */
1835         /* Send a reply to the originator */
1836         /* ToDo: Replace that by software costs for doing graph packing! */
1837         CurrentTime[to] += size * RtsFlags.GranFlags.Costs.mpacktime;
1838
1839         new_event(from, to,
1840                   CurrentTime[to]+RtsFlags.GranFlags.Costs.latency,
1841                   FetchReply,
1842                   tso, (StgClosure *)graph, (rtsSpark*)NULL);
1843         
1844         CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1845         return (Ok);
1846       } else {                   /* incremental (single closure) fetching */
1847         /* Actual moving/copying of node is done on arrival; see FETCHREPLY */
1848         /* Send a reply to the originator */
1849         CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime;
1850
1851         new_event(from, to,
1852                   CurrentTime[to]+RtsFlags.GranFlags.Costs.latency,
1853                   FetchReply,
1854                   tso, node, (rtsSpark*)NULL);
1855       
1856         CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1857         return (Ok);
1858       }
1859     }
1860   else       /* Qu'vatlh! node has been grabbed by another proc => forward */
1861     {    
1862       PEs node_loc = where_is(node);
1863       rtsTime fetchtime;
1864
1865       IF_GRAN_DEBUG(thunkStealing,
1866                     belch("== Qu'vatlh! node %p has been grabbed by PE %d from PE %d (demander=%d) @ %d\n",
1867                           node,node_loc,to,from,CurrentTime[to]));
1868       if (RtsFlags.GranFlags.GranSimStats.Global) {
1869         globalGranStats.fetch_misses++;
1870       }
1871
1872       /* Prepare FORWARD message to proc p_new */
1873       CurrentTime[to] += RtsFlags.GranFlags.Costs.mpacktime;
1874       
1875       fetchtime = stg_max(CurrentTime[to], CurrentTime[node_loc]) +
1876                   RtsFlags.GranFlags.Costs.latency;
1877           
1878       new_event(node_loc, from, fetchtime,
1879                 FetchNode,
1880                 tso, node, (rtsSpark*)NULL);
1881
1882       CurrentTime[to] += RtsFlags.GranFlags.Costs.mtidytime;
1883
1884       return (NodeHasMoved);
1885     }
1886 }
1887
1888 /*
1889    blockFetch blocks a BlockedFetch node on some kind of black hole.
1890
1891    Taken from gum/HLComms.lc.   [find a  better  place for that ?] --  HWL  
1892
1893    {\bf Note:} In GranSim we don't have @FETCHME@ nodes and therefore don't
1894    create @FMBQ@'s (FetchMe blocking queues) to cope with global
1895    blocking. Instead, non-local TSO are put into the BQ in the same way as
1896    local TSOs. However, we have to check if a TSO is local or global in
1897    order to account for the latencies involved and for keeping track of the
1898    number of fetches that are really going on.  
1899 */
1900
1901 //@cindex blockFetch
1902
1903 rtsFetchReturnCode
1904 blockFetch(tso, proc, bh)
1905 StgTSO* tso;                        /* TSO which gets blocked */
1906 PEs proc;                           /* PE where that tso was running */
1907 StgClosure* bh;                     /* closure to block on (BH, RBH, BQ) */
1908 {
1909   StgInfoTable *info;
1910
1911   IF_GRAN_DEBUG(bq,
1912                 fprintf(stderr,"## blockFetch: blocking TSO %p (%d)[PE %d] on node %p (%s) [PE %d]. No graph is packed!\n", 
1913                 tso, tso->id, proc, bh, info_type(bh), where_is(bh)));
1914
1915     if (!IS_BLACK_HOLE(bh)) {                      /* catches BHs and RBHs */
1916       IF_GRAN_DEBUG(bq,
1917                     fprintf(stderr,"## blockFetch: node %p (%s) is not a BH => awakening TSO %p (%d) [PE %u]\n", 
1918                             bh, info_type(bh), tso, tso->id, proc));
1919
1920       /* No BH anymore => immediately unblock tso */
1921       new_event(proc, proc, CurrentTime[proc],
1922                 UnblockThread,
1923                 tso, bh, (rtsSpark*)NULL);
1924
1925       /* Is this always a REPLY to a FETCH in the profile ? */
1926       if (RtsFlags.GranFlags.GranSimStats.Full)
1927         DumpRawGranEvent(proc, proc, GR_REPLY, tso, bh, (StgInt)0, 0);
1928       return (NodeIsNoBH);
1929     }
1930
1931     /* DaH {BQ}Daq Qu' Suq 'e' wISov!
1932        Now we know that we have to put the tso into the BQ.
1933        2 cases: If block-on-fetch, tso is at head of threadq => 
1934                 => take it out of threadq and into BQ
1935                 If reschedule-on-fetch, tso is only pointed to be event
1936                 => just put it into BQ
1937
1938     ngoq ngo'!!
1939     if (!RtsFlags.GranFlags.DoAsyncFetch) {
1940       GranSimBlock(tso, proc, bh);
1941     } else {
1942       if (RtsFlags.GranFlags.GranSimStats.Full)
1943         DumpRawGranEvent(proc, where_is(bh), GR_BLOCK, tso, bh, (StgInt)0, 0);
1944       ++(tso->gran.blockcount);
1945       tso->gran.blockedat = CurrentTime[proc];
1946     }
1947     */
1948
1949     /* after scheduling the GlobalBlock event the TSO is not put into the
1950        run queue again; it is only pointed to via the event we are
1951        processing now; in GranSim 4.xx there is no difference between
1952        synchr and asynchr comm here */
1953     ASSERT(!is_on_queue(tso, proc));
1954     ASSERT(tso->link == END_TSO_QUEUE);
1955
1956     GranSimBlock(tso, proc, bh);  /* GranSim statistics gathering */
1957
1958     /* Now, put tso into BQ (similar to blocking entry codes) */
1959     info = get_itbl(bh);
1960     switch (info -> type) {
1961       case RBH:
1962       case BLACKHOLE:
1963       case CAF_BLACKHOLE: // ToDo: check whether this is a possibly ITBL here
1964       case SE_BLACKHOLE:   // ToDo: check whether this is a possibly ITBL here
1965       case SE_CAF_BLACKHOLE:// ToDo: check whether this is a possibly ITBL here
1966         /* basically an inlined version of BLACKHOLE_entry -- HWL */
1967         /* Change the BLACKHOLE into a BLACKHOLE_BQ */
1968         ((StgBlockingQueue *)bh)->header.info = &BLACKHOLE_BQ_info;
1969         /* Put ourselves on the blocking queue for this black hole */
1970         // tso->link=END_TSO_QUEUE;   not necessary; see assertion above
1971         ((StgBlockingQueue *)bh)->blocking_queue = (StgBlockingQueueElement *)tso;
1972         tso->blocked_on = bh;
1973         recordMutable((StgMutClosure *)bh);
1974         break;
1975
1976     case BLACKHOLE_BQ:
1977         /* basically an inlined version of BLACKHOLE_BQ_entry -- HWL */
1978         tso->link = (StgTSO *) (((StgBlockingQueue*)bh)->blocking_queue); 
1979         ((StgBlockingQueue*)bh)->blocking_queue = (StgBlockingQueueElement *)tso;
1980         recordMutable((StgMutClosure *)bh);
1981
1982 # if 0 && defined(GC_MUT_REQUIRED)
1983         ToDo: check whether recordMutable is necessary -- HWL
1984         /*
1985          * If we modify a black hole in the old generation, we have to make 
1986          * sure it goes on the mutables list
1987          */
1988
1989         if (bh <= StorageMgrInfo.OldLim) {
1990             MUT_LINK(bh) = (W_) StorageMgrInfo.OldMutables;
1991             StorageMgrInfo.OldMutables = bh;
1992         } else
1993             MUT_LINK(bh) = MUT_NOT_LINKED;
1994 # endif
1995         break;
1996
1997     case FETCH_ME_BQ:
1998         barf("Qagh: FMBQ closure (%p) found in GrAnSim (TSO=%p (%d))\n",
1999              bh, tso, tso->id);
2000
2001     default:
2002         {
2003           G_PRINT_NODE(bh);
2004           barf("Qagh: thought %p was a black hole (IP %p (%s))",
2005                   bh, info, info_type(get_itbl(bh)));
2006         }
2007       }
2008     return (Ok);
2009 }
2010
2011
2012 //@node Idle PEs, Routines directly called from Haskell world, Code for Fetching Nodes, GranSim specific code
2013 //@subsection Idle PEs
2014
2015 /*
2016    Export work to idle PEs. This function is called from @ReSchedule@
2017    before dispatching on the current event. @HandleIdlePEs@ iterates over
2018    all PEs, trying to get work for idle PEs. Note, that this is a
2019    simplification compared to GUM's fishing model. We try to compensate for
2020    that by making the cost for stealing work dependent on the number of
2021    idle processors and thereby on the probability with which a randomly
2022    sent fish would find work.  
2023 */
2024
2025 //@cindex handleIdlePEs
2026
2027 void
2028 handleIdlePEs(void)
2029 {
2030   PEs p;
2031
2032   IF_DEBUG(gran, fprintf(stderr, "GRAN: handling Idle PEs\n"))
2033
2034   /* Should never be entered in GrAnSim Light setup */
2035   ASSERT(!RtsFlags.GranFlags.Light);
2036
2037   /* Could check whether there are idle PEs if it's a cheap check */
2038   for (p = 0; p < RtsFlags.GranFlags.proc; p++) 
2039     if (procStatus[p]==Idle)  /*  && IS_SPARKING(p) && IS_STARTING(p) */
2040       /* First look for local work i.e. examine local spark pool! */
2041       if (pending_sparks_hds[p]!=(rtsSpark *)NULL) {
2042         new_event(p, p, CurrentTime[p],
2043                   FindWork,
2044                   (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
2045         procStatus[p] = Sparking;
2046       } else if ((RtsFlags.GranFlags.maxFishes==0 ||
2047                   OutstandingFishes[p]<RtsFlags.GranFlags.maxFishes) ) {
2048
2049         /* If no local work then try to get remote work! 
2050            Qu' Hopbe' pagh tu'lu'pu'chugh Qu' Hop yISuq ! */
2051         if (RtsFlags.GranFlags.DoStealThreadsFirst && 
2052             (RtsFlags.GranFlags.FetchStrategy >= 4 || OutstandingFetches[p] == 0))
2053           {
2054             if (SurplusThreads > 0l)                    /* Steal a thread */
2055               stealThread(p);
2056           
2057             if (procStatus[p]!=Idle)
2058               break;
2059           }
2060         
2061         if (SparksAvail > 0 && 
2062             (RtsFlags.GranFlags.FetchStrategy >= 3 || OutstandingFetches[p] == 0)) /* Steal a spark */
2063           stealSpark(p);
2064         
2065         if (SurplusThreads > 0 && 
2066             (RtsFlags.GranFlags.FetchStrategy >= 4 || OutstandingFetches[p] == 0)) /* Steal a thread */
2067           stealThread(p);
2068       }
2069 }
2070
2071 /*
2072    Steal a spark and schedule moving it to proc. We want to look at PEs in
2073    clock order -- most retarded first.  Currently sparks are only stolen
2074    from the @ADVISORY_POOL@ never from the @REQUIRED_POOL@. Eventually,
2075    this should be changed to first steal from the former then from the
2076    latter.
2077
2078    We model a sort of fishing mechanism by counting the number of sparks
2079    and threads we are currently stealing.  */
2080
2081 /* 
2082    Return a random nat value in the intervall [from, to) 
2083 */
2084 static nat 
2085 natRandom(from, to)
2086 nat from, to;
2087 {
2088   nat r, d;
2089
2090   ASSERT(from<=to);
2091   d = to - from;
2092   /* random returns a value in [0, RAND_MAX] */
2093   r = (nat) ((float)from + ((float)random()*(float)d)/(float)RAND_MAX);
2094   r = (r==to) ? from : r;
2095   ASSERT(from<=r && (r<to || from==to));
2096   return r;  
2097 }
2098
2099 /* 
2100    Find any PE other than proc. Used for GUM style fishing only.
2101 */
2102 static PEs 
2103 findRandomPE (proc)
2104 PEs proc;
2105 {
2106   nat p;
2107
2108   ASSERT(RtsFlags.GranFlags.Fishing);
2109   if (RtsFlags.GranFlags.RandomSteal) {
2110     p = natRandom(0,RtsFlags.GranFlags.proc);  /* full range of PEs */
2111   } else {
2112     p = 0;
2113   }
2114   IF_GRAN_DEBUG(randomSteal,
2115                 belch("^^ RANDOM_STEAL (fishing): stealing from PE %d (current proc is %d)",
2116                       p, proc);)
2117     
2118   return (PEs)p;
2119 }
2120
2121 /*
2122   Magic code for stealing sparks/threads makes use of global knowledge on
2123   spark queues.  
2124 */
2125 static void
2126 sortPEsByTime (proc, pes_by_time, firstp, np) 
2127 PEs proc;
2128 PEs *pes_by_time;
2129 nat *firstp, *np;
2130 {
2131   PEs p, temp, n, i, j;
2132   nat first, upb, r=0, q=0;
2133
2134   ASSERT(!RtsFlags.GranFlags.Fishing);
2135
2136 #if 0  
2137   upb = RtsFlags.GranFlags.proc;            /* full range of PEs */
2138
2139   if (RtsFlags.GranFlags.RandomSteal) {
2140     r = natRandom(0,RtsFlags.GranFlags.proc);  /* full range of PEs */
2141   } else {
2142     r = 0;
2143   }
2144 #endif
2145
2146   /* pes_by_time shall contain processors from which we may steal sparks */ 
2147   for(n=0, p=0; p < RtsFlags.GranFlags.proc; ++p)
2148     if ((proc != p) &&                       // not the current proc
2149         (pending_sparks_hds[p] != (rtsSpark *)NULL) && // non-empty spark pool
2150         (CurrentTime[p] <= CurrentTime[CurrentProc]))
2151       pes_by_time[n++] = p;
2152
2153   /* sort pes_by_time */
2154   for(i=0; i < n; ++i)
2155     for(j=i+1; j < n; ++j)
2156       if (CurrentTime[pes_by_time[i]] > CurrentTime[pes_by_time[j]]) {
2157         rtsTime temp = pes_by_time[i];
2158         pes_by_time[i] = pes_by_time[j];
2159         pes_by_time[j] = temp;
2160       }
2161
2162   /* Choose random processor to steal spark from; first look at processors */
2163   /* that are earlier than the current one (i.e. proc) */
2164   for(first=0; 
2165       (first < n) && (CurrentTime[pes_by_time[first]] <= CurrentTime[proc]);
2166       ++first)
2167     /* nothing */ ;
2168
2169   /* if the assertion below is true we can get rid of first */
2170   /* ASSERT(first==n); */
2171   /* ToDo: check if first is really needed; find cleaner solution */
2172
2173   *firstp = first;
2174   *np = n;
2175 }
2176
2177 /* 
2178    Steal a spark (piece of work) from any processor and bring it to proc.
2179 */
2180 //@cindex stealSpark
2181 static inline rtsBool 
2182 stealSpark(PEs proc) { stealSomething(proc, rtsTrue, rtsFalse); }
2183
2184 /* 
2185    Steal a thread from any processor and bring it to proc i.e. thread migration
2186 */
2187 //@cindex stealThread
2188 static inline rtsBool 
2189 stealThread(PEs proc) { stealSomething(proc, rtsFalse, rtsTrue); }
2190
2191 /* 
2192    Steal a spark or a thread and schedule moving it to proc.
2193 */
2194 //@cindex stealSomething
2195 static rtsBool
2196 stealSomething(proc, steal_spark, steal_thread)
2197 PEs proc;                           // PE that needs work (stealer)
2198 rtsBool steal_spark, steal_thread;  // should a spark and/or thread be stolen
2199 {
2200   PEs p;
2201   rtsTime fish_arrival_time;
2202   rtsSpark *spark, *prev, *next;
2203   rtsBool stolen = rtsFalse;
2204
2205   ASSERT(steal_spark || steal_thread);
2206
2207   /* Should never be entered in GrAnSim Light setup */
2208   ASSERT(!RtsFlags.GranFlags.Light);
2209   ASSERT(!steal_thread || RtsFlags.GranFlags.DoThreadMigration);
2210
2211   if (!RtsFlags.GranFlags.Fishing) {
2212     // ToDo: check if stealing threads is prefered over stealing sparks
2213     if (steal_spark) {
2214       if (stealSparkMagic(proc))
2215         return rtsTrue;
2216       else                             // no spark found
2217         if (steal_thread)
2218           return stealThreadMagic(proc);
2219         else                           // no thread found
2220           return rtsFalse;             
2221     } else {                           // ASSERT(steal_thread);
2222       return stealThreadMagic(proc);
2223     }
2224     barf("stealSomething: never reached");
2225   }
2226
2227   /* The rest of this function does GUM style fishing */
2228   
2229   p = findRandomPE(proc); /* find a random PE other than proc */
2230   
2231   /* Message packing costs for sending a Fish; qeq jabbI'ID */
2232   CurrentTime[proc] += RtsFlags.GranFlags.Costs.mpacktime;
2233   
2234   /* use another GranEvent for requesting a thread? */
2235   if (steal_spark && RtsFlags.GranFlags.GranSimStats.Sparks)
2236     DumpRawGranEvent(p, proc, SP_REQUESTED,
2237                      (StgTSO*)NULL, (StgClosure *)NULL, (StgInt)0, 0);
2238
2239   /* time of the fish arrival on the remote PE */
2240   fish_arrival_time = CurrentTime[proc] + RtsFlags.GranFlags.Costs.latency;
2241   
2242   /* Phps use an own Fish event for that? */
2243   /* The contents of the spark component is a HACK:
2244       1 means give me a spark;
2245       2 means give me a thread
2246       0 means give me nothing (this should never happen)
2247   */
2248   new_event(p, proc, fish_arrival_time,
2249             FindWork,
2250             (StgTSO*)NULL, (StgClosure*)NULL, 
2251             (steal_spark ? (rtsSpark*)1 : steal_thread ? (rtsSpark*)2 : (rtsSpark*)0));
2252   
2253   ++OutstandingFishes[proc];
2254   /* only with Async fetching? */
2255   if (procStatus[proc]==Idle)  
2256     procStatus[proc]=Fishing;
2257   
2258   /* time needed to clean up buffers etc after sending a message */
2259   CurrentTime[proc] += RtsFlags.GranFlags.Costs.mtidytime;
2260
2261   /* If GUM style fishing stealing always succeeds because it only consists
2262      of sending out a fish; of course, when the fish may return
2263      empty-handed! */
2264   return rtsTrue;
2265 }
2266
2267 /* 
2268    This version of stealing a spark makes use of the global info on all
2269    spark pools etc which is not available in a real parallel system.
2270    This could be extended to test e.g. the impact of perfect load information.
2271 */
2272 //@cindex stealSparkMagic
2273 static rtsBool
2274 stealSparkMagic(proc)
2275 PEs proc;
2276 {
2277   PEs p, i, j, n, first, upb;
2278   rtsSpark *spark, *next;
2279   PEs pes_by_time[MAX_PROC];
2280   rtsBool stolen = rtsFalse;
2281   rtsTime stealtime;
2282
2283   /* Should never be entered in GrAnSim Light setup */
2284   ASSERT(!RtsFlags.GranFlags.Light);
2285
2286   sortPEsByTime(proc, pes_by_time, &first, &n);
2287
2288   while (!stolen && n>0) {
2289     upb = (first==0) ? n : first;
2290     i = natRandom(0,upb);                /* choose a random eligible PE */
2291     p = pes_by_time[i];
2292
2293     IF_GRAN_DEBUG(randomSteal,
2294                   belch("^^ stealSparkMagic (random_steal, not fishing): stealing spark from PE %d (current proc is %d)",
2295                         p, proc));
2296       
2297     ASSERT(pending_sparks_hds[p]!=(rtsSpark *)NULL); /* non-empty spark pool */
2298
2299     /* Now go through rtsSparkQ and steal the first eligible spark */
2300     
2301     spark = pending_sparks_hds[p]; 
2302     while (!stolen && spark != (rtsSpark*)NULL)
2303       {
2304         /* NB: no prev pointer is needed here because all sparks that are not 
2305            chosen are pruned
2306         */
2307         if ((procStatus[p]==Idle || procStatus[p]==Sparking || procStatus[p] == Fishing) &&
2308             spark->next==(rtsSpark*)NULL) 
2309           {
2310             /* Be social! Don't steal the only spark of an idle processor 
2311                not {spark} neH yInIH !! */
2312             break; /* next PE */
2313           } 
2314         else if (closure_SHOULD_SPARK(spark->node))
2315           {
2316             /* Don't Steal local sparks; 
2317                ToDo: optionally prefer local over global sparks
2318             if (!spark->global) {
2319               prev=spark;
2320               continue;                  next spark
2321             }
2322             */
2323             /* found a spark! */
2324
2325             /* Prepare message for sending spark */
2326             CurrentTime[p] += RtsFlags.GranFlags.Costs.mpacktime;
2327
2328             if (RtsFlags.GranFlags.GranSimStats.Sparks)
2329               DumpRawGranEvent(p, (PEs)0, SP_EXPORTED,
2330                                (StgTSO*)NULL, spark->node,
2331                                spark->name, spark_queue_len(p));
2332
2333             stealtime = (CurrentTime[p] > CurrentTime[proc] ? 
2334                            CurrentTime[p] : 
2335                            CurrentTime[proc])
2336                         + sparkStealTime();
2337
2338             new_event(proc, p /* CurrentProc */, stealtime,
2339                       MoveSpark,
2340                       (StgTSO*)NULL, spark->node, spark);
2341             
2342             stolen = rtsTrue;
2343             ++OutstandingFishes[proc]; /* no. of sparks currently on the fly */
2344             if (procStatus[proc]==Idle)
2345               procStatus[proc] = Fishing;
2346             ++(spark->global);         /* record that this is a global spark */
2347             ASSERT(SparksAvail>0);
2348             --SparksAvail;            /* on-the-fly sparks are not available */
2349             next = delete_from_sparkq(spark, p, rtsFalse); // don't dispose!
2350             CurrentTime[p] += RtsFlags.GranFlags.Costs.mtidytime;
2351           }
2352         else   /* !(closure_SHOULD_SPARK(SPARK_NODE(spark))) */
2353           {
2354            IF_GRAN_DEBUG(checkSparkQ,
2355                          belch("^^ pruning spark %p (node %p) in stealSparkMagic",
2356                                spark, spark->node));
2357
2358             /* if the spark points to a node that should not be sparked,
2359                prune the spark queue at this point */
2360             if (RtsFlags.GranFlags.GranSimStats.Sparks)
2361               DumpRawGranEvent(p, (PEs)0, SP_PRUNED,
2362                                (StgTSO*)NULL, spark->node,
2363                                spark->name, spark_queue_len(p));
2364             if (RtsFlags.GranFlags.GranSimStats.Global)
2365               globalGranStats.pruned_sparks++;
2366             
2367             ASSERT(SparksAvail>0);
2368             --SparksAvail;
2369             spark = delete_from_sparkq(spark, p, rtsTrue);
2370           }
2371         /* unlink spark (may have been freed!) from sparkq;
2372         if (prev == NULL) // spark was head of spark queue
2373           pending_sparks_hds[p] = spark->next;
2374         else  
2375           prev->next = spark->next;
2376         if (spark->next == NULL)
2377           pending_sparks_tls[p] = prev;
2378         else  
2379           next->prev = prev;
2380         */
2381       }                    /* while ...    iterating over sparkq */
2382
2383     /* ToDo: assert that PE p still has work left after stealing the spark */
2384
2385     if (!stolen && (n>0)) {  /* nothing stealable from proc p :( */
2386       ASSERT(pes_by_time[i]==p);
2387
2388       /* remove p from the list (at pos i) */
2389       for (j=i; j+1<n; j++)
2390         pes_by_time[j] = pes_by_time[j+1];
2391       n--;
2392       
2393       /* update index to first proc which is later (or equal) than proc */
2394       for ( ;
2395             (first>0) &&
2396               (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]);
2397             first--)
2398         /* nothing */ ;
2399     } 
2400   }  /* while ... iterating over PEs in pes_by_time */
2401
2402   IF_GRAN_DEBUG(randomSteal,
2403                 if (stolen)
2404                   belch("^^ stealSparkMagic: spark %p (node=%p) stolen by PE %d from PE %d (SparksAvail=%d; idlers=%d)",
2405                        spark, spark->node, proc, p, 
2406                        SparksAvail, idlers());
2407                 else  
2408                   belch("^^ stealSparkMagic: nothing stolen by PE %d (sparkq len after pruning=%d)(SparksAvail=%d; idlers=%d)",
2409                         proc, SparksAvail, idlers()));
2410
2411   if (RtsFlags.GranFlags.GranSimStats.Global &&
2412       stolen && (i!=0)) {                          /* only for statistics */
2413     globalGranStats.rs_sp_count++;
2414     globalGranStats.ntimes_total += n;
2415     globalGranStats.fl_total += first;
2416     globalGranStats.no_of_steals++;
2417   }
2418
2419   return stolen;
2420 }
2421
2422 /* 
2423    The old stealThread code, which makes use of global info and does not
2424    send out fishes.  
2425    NB: most of this is the same as in stealSparkMagic;
2426        only the pieces specific to processing thread queues are different; 
2427        long live polymorphism!  
2428 */
2429
2430 //@cindex stealThreadMagic
2431 static rtsBool
2432 stealThreadMagic(proc)
2433 PEs proc;
2434 {
2435   PEs p, i, j, n, first, upb;
2436   StgTSO *tso;
2437   PEs pes_by_time[MAX_PROC];
2438   rtsBool stolen = rtsFalse;
2439   rtsTime stealtime;
2440
2441   /* Should never be entered in GrAnSim Light setup */
2442   ASSERT(!RtsFlags.GranFlags.Light);
2443
2444   sortPEsByTime(proc, pes_by_time, &first, &n);
2445
2446   while (!stolen && n>0) {
2447     upb = (first==0) ? n : first;
2448     i = natRandom(0,upb);                /* choose a random eligible PE */
2449     p = pes_by_time[i];
2450
2451     IF_GRAN_DEBUG(randomSteal,
2452                   belch("^^ stealThreadMagic (random_steal, not fishing): stealing thread from PE %d (current proc is %d)",
2453                         p, proc));
2454       
2455     /* Steal the first exportable thread in the runnable queue but
2456        never steal the first in the queue for social reasons;
2457        not Qu' wa'DIch yInIH !!
2458     */
2459     /* Would be better to search through queue and have options which of
2460        the threads to pick when stealing */
2461     if (run_queue_hds[p] == END_TSO_QUEUE) {
2462       IF_GRAN_DEBUG(randomSteal,
2463                     belch("^^ stealThreadMagic: No thread to steal from PE %d (stealer=PE %d)", 
2464                           p, proc));
2465     } else {
2466       tso = run_queue_hds[p]->link;  /* tso is *2nd* thread in thread queue */
2467       /* Found one */
2468       stolen = rtsTrue;
2469
2470       /* update links in queue */
2471       run_queue_hds[p]->link = tso->link;
2472       if (run_queue_tls[p] == tso)
2473         run_queue_tls[p] = run_queue_hds[p];
2474       
2475       /* ToDo: Turn magic constants into params */
2476       
2477       CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mpacktime;
2478       
2479       stealtime = (CurrentTime[p] > CurrentTime[proc] ? 
2480                    CurrentTime[p] : 
2481                    CurrentTime[proc])
2482         + sparkStealTime() 
2483         + 4l * RtsFlags.GranFlags.Costs.additional_latency
2484         + 5l * RtsFlags.GranFlags.Costs.munpacktime;
2485
2486       /* Move the thread; set bitmask to 0 while TSO is `on-the-fly' */
2487       SET_GRAN_HDR(tso,Nowhere /* PE_NUMBER(proc) */); 
2488
2489       /* Move from one queue to another */
2490       new_event(proc, p, stealtime,
2491                 MoveThread,
2492                 tso, (StgClosure*)NULL, (rtsSpark*)NULL);
2493
2494       /* MAKE_BUSY(proc);  not yet; only when thread is in threadq */
2495       ++OutstandingFishes[proc];
2496       if (procStatus[proc])
2497         procStatus[proc] = Fishing;
2498       --SurplusThreads;
2499
2500       if(RtsFlags.GranFlags.GranSimStats.Full)
2501         DumpRawGranEvent(p, proc, 
2502                          GR_STEALING, 
2503                          tso, (StgClosure*)NULL, (StgInt)0, 0);
2504       
2505       /* costs for tidying up buffer after having sent it */
2506       CurrentTime[p] += 5l * RtsFlags.GranFlags.Costs.mtidytime;
2507     }
2508
2509     /* ToDo: assert that PE p still has work left after stealing the spark */
2510
2511     if (!stolen && (n>0)) {  /* nothing stealable from proc p :( */
2512       ASSERT(pes_by_time[i]==p);
2513
2514       /* remove p from the list (at pos i) */
2515       for (j=i; j+1<n; j++)
2516         pes_by_time[j] = pes_by_time[j+1];
2517       n--;
2518       
2519       /* update index to first proc which is later (or equal) than proc */
2520       for ( ;
2521             (first>0) &&
2522               (CurrentTime[pes_by_time[first-1]]>CurrentTime[proc]);
2523             first--)
2524         /* nothing */ ;
2525     } 
2526   }  /* while ... iterating over PEs in pes_by_time */
2527
2528   IF_GRAN_DEBUG(randomSteal,
2529                 if (stolen)
2530                   belch("^^ stealThreadMagic: stolen TSO %d (%p) by PE %d from PE %d (SparksAvail=%d; idlers=%d)",
2531                         tso->id, tso, proc, p,
2532                         SparksAvail, idlers());
2533                 else
2534                   belch("stealThreadMagic: nothing stolen by PE %d (SparksAvail=%d; idlers=%d)",
2535                         proc, SparksAvail, idlers()));
2536
2537   if (RtsFlags.GranFlags.GranSimStats.Global &&
2538       stolen && (i!=0)) { /* only for statistics */
2539     /* ToDo: more statistics on avg thread queue lenght etc */
2540     globalGranStats.rs_t_count++;
2541     globalGranStats.no_of_migrates++;
2542   }
2543
2544   return stolen;
2545 }
2546
2547 //@cindex sparkStealTime
2548 static rtsTime
2549 sparkStealTime(void)
2550 {
2551   double fishdelay, sparkdelay, latencydelay;
2552   fishdelay =  (double)RtsFlags.GranFlags.proc/2;
2553   sparkdelay = fishdelay - 
2554           ((fishdelay-1)/(double)(RtsFlags.GranFlags.proc-1))*(double)idlers();
2555   latencydelay = sparkdelay*((double)RtsFlags.GranFlags.Costs.latency);
2556
2557   return((rtsTime)latencydelay);
2558 }
2559
2560 //@node Routines directly called from Haskell world, Emiting profiling info for GrAnSim, Idle PEs, GranSim specific code
2561 //@subsection Routines directly called from Haskell world
2562 /* 
2563 The @GranSim...@ routines in here are directly called via macros from the
2564 threaded world. 
2565
2566 First some auxiliary routines.
2567 */
2568
2569 /* Take the current thread off the thread queue and thereby activate the 
2570    next thread. It's assumed that the next ReSchedule after this uses 
2571    NEW_THREAD as param. 
2572    This fct is called from GranSimBlock and GranSimFetch 
2573 */
2574
2575 //@cindex ActivateNextThread
2576
2577 void 
2578 ActivateNextThread (proc)
2579 PEs proc;
2580 {
2581   StgTSO *t;
2582   /*
2583     This routine is entered either via GranSimFetch or via GranSimBlock.
2584     It has to prepare the CurrentTSO for being blocked and update the
2585     run queue and other statistics on PE proc. The actual enqueuing to the 
2586     blocking queue (if coming from GranSimBlock) is done in the entry code 
2587     of the BLACKHOLE and BLACKHOLE_BQ closures (see StgMiscClosures.hc).
2588   */
2589   /* ToDo: add assertions here!! */
2590   //ASSERT(run_queue_hds[proc]!=END_TSO_QUEUE);
2591
2592   // Only necessary if the running thread is at front of the queue
2593   // run_queue_hds[proc] = run_queue_hds[proc]->link;
2594   ASSERT(CurrentProc==proc);
2595   ASSERT(!is_on_queue(CurrentTSO,proc));
2596   if (run_queue_hds[proc]==END_TSO_QUEUE) {
2597     /* NB: this routine is only entered with asynchr comm (see assertion) */
2598     procStatus[proc] = Idle;
2599   } else {
2600     /* ToDo: check cost assignment */
2601     CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadcontextswitchtime;
2602     if (RtsFlags.GranFlags.GranSimStats.Full && 
2603         (!RtsFlags.GranFlags.Light || RtsFlags.GranFlags.Debug.checkLight)) 
2604                                       /* right flag !?? ^^^ */ 
2605       DumpRawGranEvent(proc, 0, GR_SCHEDULE, run_queue_hds[proc],
2606                        (StgClosure*)NULL, (StgInt)0, 0);
2607   }
2608 }
2609
2610 /* 
2611    The following GranSim fcts are stg-called from the threaded world.    
2612 */
2613
2614 /* Called from HP_CHK and friends (see StgMacros.h)  */
2615 //@cindex GranSimAllocate
2616 void 
2617 GranSimAllocate(n)
2618 StgInt n;
2619 {
2620   CurrentTSO->gran.allocs += n;
2621   ++(CurrentTSO->gran.basicblocks);
2622
2623   if (RtsFlags.GranFlags.GranSimStats.Heap) {
2624       DumpRawGranEvent(CurrentProc, 0, GR_ALLOC, CurrentTSO,
2625                        (StgClosure*)NULL, (StgInt)0, n);
2626   }
2627   
2628   CurrentTSO->gran.exectime += RtsFlags.GranFlags.Costs.heapalloc_cost;
2629   CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.heapalloc_cost;
2630 }
2631
2632 /*
2633   Subtract the values added above, if a heap check fails and
2634   so has to be redone.
2635 */
2636 //@cindex GranSimUnallocate
2637 void 
2638 GranSimUnallocate(n)
2639 StgInt n;
2640 {
2641   CurrentTSO->gran.allocs -= n;
2642   --(CurrentTSO->gran.basicblocks);
2643   
2644   CurrentTSO->gran.exectime -= RtsFlags.GranFlags.Costs.heapalloc_cost;
2645   CurrentTime[CurrentProc] -= RtsFlags.GranFlags.Costs.heapalloc_cost;
2646 }
2647
2648 /* NB: We now inline this code via GRAN_EXEC rather than calling this fct */
2649 //@cindex GranSimExec
2650 void 
2651 GranSimExec(ariths,branches,loads,stores,floats)
2652 StgWord ariths,branches,loads,stores,floats;
2653 {
2654   StgWord cost = RtsFlags.GranFlags.Costs.arith_cost*ariths + 
2655             RtsFlags.GranFlags.Costs.branch_cost*branches + 
2656             RtsFlags.GranFlags.Costs.load_cost * loads +
2657             RtsFlags.GranFlags.Costs.store_cost*stores + 
2658             RtsFlags.GranFlags.Costs.float_cost*floats;
2659
2660   CurrentTSO->gran.exectime += cost;
2661   CurrentTime[CurrentProc] += cost;
2662 }
2663
2664 /* 
2665    Fetch the node if it isn't local
2666    -- result indicates whether fetch has been done.
2667
2668    This is GRIP-style single item fetching.
2669 */
2670
2671 //@cindex GranSimFetch
2672 StgInt 
2673 GranSimFetch(node /* , liveness_mask */ )
2674 StgClosure *node;
2675 /* StgInt liveness_mask; */
2676 {
2677   /* reset the return value (to be checked within STG land) */
2678   NeedToReSchedule = rtsFalse;   
2679
2680   if (RtsFlags.GranFlags.Light) {
2681      /* Always reschedule in GrAnSim-Light to prevent one TSO from
2682         running off too far 
2683      new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
2684               ContinueThread,CurrentTSO,node,NULL);
2685      */
2686      return(0); 
2687   }
2688
2689   /* Faking an RBH closure:
2690      If the bitmask of the closure is 0 then this node is a fake RBH;
2691   */
2692   if (node->header.gran.procs == Nowhere) {
2693     IF_GRAN_DEBUG(bq,
2694                   belch("## Found fake RBH (node %p); delaying TSO %d (%p)", 
2695                         node, CurrentTSO->id, CurrentTSO));
2696                   
2697     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc]+10000,
2698               ContinueThread, CurrentTSO, node, (rtsSpark*)NULL);
2699
2700     /* Rescheduling (GranSim internal) is necessary */
2701     NeedToReSchedule = rtsTrue;
2702     
2703     return(1); 
2704   }
2705
2706   /* Note: once a node has been fetched, this test will be passed */
2707   if (!IS_LOCAL_TO(PROCS(node),CurrentProc))
2708     {
2709       PEs p = where_is(node);
2710       rtsTime fetchtime;
2711       
2712       IF_GRAN_DEBUG(thunkStealing,
2713                     if (p==CurrentProc) 
2714                       belch("GranSimFetch: Trying to fetch from own processor%u\n", p););
2715       
2716       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
2717       /* NB: Fetch is counted on arrival (FetchReply) */
2718       
2719       fetchtime = stg_max(CurrentTime[CurrentProc],CurrentTime[p]) +
2720         RtsFlags.GranFlags.Costs.latency;
2721       
2722       new_event(p, CurrentProc, fetchtime,
2723                 FetchNode, CurrentTSO, node, (rtsSpark*)NULL);
2724       
2725       if (fetchtime<TimeOfNextEvent)
2726         TimeOfNextEvent = fetchtime;
2727       
2728       /* About to block */
2729       CurrentTSO->gran.blockedat = CurrentTime[CurrentProc];
2730       
2731       ++OutstandingFetches[CurrentProc];
2732       
2733       if (RtsFlags.GranFlags.DoAsyncFetch) 
2734         /* if asynchr comm is turned on, activate the next thread in the q */
2735         ActivateNextThread(CurrentProc);
2736       else
2737         procStatus[CurrentProc] = Fetching;
2738
2739 #if 0 
2740       /* ToDo: nuke the entire if (anything special for fair schedule?) */
2741       if (RtsFlags.GranFlags.DoAsyncFetch) 
2742         {
2743           /* Remove CurrentTSO from the queue -- assumes head of queue == CurrentTSO */
2744           if(!RtsFlags.GranFlags.DoFairSchedule)
2745             {
2746               /* now done in do_the_fetchnode 
2747               if (RtsFlags.GranFlags.GranSimStats.Full)
2748                 DumpRawGranEvent(CurrentProc, p, GR_FETCH, CurrentTSO,
2749                                  node, (StgInt)0, 0);
2750               */                                
2751               ActivateNextThread(CurrentProc);
2752               
2753 # if 0 && defined(GRAN_CHECK)
2754               if (RtsFlags.GranFlags.Debug.blockOnFetch_sanity) {
2755                 if (TSO_TYPE(CurrentTSO) & FETCH_MASK_TSO) {
2756                   fprintf(stderr,"FetchNode: TSO 0x%x has fetch-mask set @ %d\n",
2757                           CurrentTSO,CurrentTime[CurrentProc]);
2758                   stg_exit(EXIT_FAILURE);
2759                 } else {
2760                   TSO_TYPE(CurrentTSO) |= FETCH_MASK_TSO;
2761                 }
2762               }
2763 # endif
2764               CurrentTSO->link = END_TSO_QUEUE;
2765               /* CurrentTSO = END_TSO_QUEUE; */
2766               
2767               /* CurrentTSO is pointed to by the FetchNode event; it is
2768                  on no run queue any more */
2769           } else {  /* fair scheduling currently not supported -- HWL */
2770             barf("Asynchr communication is not yet compatible with fair scheduling\n");
2771           }
2772         } else {                /* !RtsFlags.GranFlags.DoAsyncFetch */
2773           procStatus[CurrentProc] = Fetching; // ToDo: BlockedOnFetch;
2774           /* now done in do_the_fetchnode 
2775           if (RtsFlags.GranFlags.GranSimStats.Full)
2776             DumpRawGranEvent(CurrentProc, p,
2777                              GR_FETCH, CurrentTSO, node, (StgInt)0, 0);
2778           */
2779           IF_GRAN_DEBUG(blockOnFetch, 
2780                         BlockedOnFetch[CurrentProc] = CurrentTSO;); /*- rtsTrue; -*/
2781         }
2782 #endif /* 0 */
2783
2784       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mtidytime;
2785       
2786       /* Rescheduling (GranSim internal) is necessary */
2787       NeedToReSchedule = rtsTrue;
2788       
2789       return(1); 
2790     }
2791   return(0);
2792 }
2793
2794 //@cindex GranSimSpark
2795 void 
2796 GranSimSpark(local,node)
2797 StgInt local;
2798 StgClosure *node;
2799 {
2800   /* ++SparksAvail;  Nope; do that in add_to_spark_queue */
2801   if (RtsFlags.GranFlags.GranSimStats.Sparks)
2802     DumpRawGranEvent(CurrentProc, (PEs)0, SP_SPARK,
2803                      END_TSO_QUEUE, node, (StgInt)0, spark_queue_len(CurrentProc)-1);
2804
2805   /* Force the PE to take notice of the spark */
2806   if(RtsFlags.GranFlags.DoAlwaysCreateThreads) {
2807     new_event(CurrentProc,CurrentProc,CurrentTime[CurrentProc],
2808               FindWork,
2809               END_TSO_QUEUE, (StgClosure*)NULL, (rtsSpark*)NULL);
2810     if (CurrentTime[CurrentProc]<TimeOfNextEvent)
2811       TimeOfNextEvent = CurrentTime[CurrentProc];
2812   }
2813
2814   if(local)
2815     ++CurrentTSO->gran.localsparks;
2816   else
2817     ++CurrentTSO->gran.globalsparks;
2818 }
2819
2820 //@cindex GranSimSparkAt
2821 void 
2822 GranSimSparkAt(spark,where,identifier)
2823 rtsSpark *spark;
2824 StgClosure *where;    /* This should be a node; alternatively could be a GA */
2825 StgInt identifier;
2826 {
2827   PEs p = where_is(where);
2828   GranSimSparkAtAbs(spark,p,identifier);
2829 }
2830
2831 //@cindex GranSimSparkAtAbs
2832 void 
2833 GranSimSparkAtAbs(spark,proc,identifier)
2834 rtsSpark *spark;
2835 PEs proc;        
2836 StgInt identifier;
2837 {
2838   rtsTime exporttime;
2839
2840   if (spark == (rtsSpark *)NULL) /* Note: Granularity control might have */
2841     return;                          /* turned a spark into a NULL. */
2842
2843   /* ++SparksAvail; Nope; do that in add_to_spark_queue */
2844   if(RtsFlags.GranFlags.GranSimStats.Sparks)
2845     DumpRawGranEvent(proc,0,SP_SPARKAT,
2846                      END_TSO_QUEUE, spark->node, (StgInt)0, spark_queue_len(proc));
2847
2848   if (proc!=CurrentProc) {
2849     CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime;
2850     exporttime = (CurrentTime[proc] > CurrentTime[CurrentProc]? 
2851                   CurrentTime[proc]: CurrentTime[CurrentProc])
2852                  + RtsFlags.GranFlags.Costs.latency;
2853   } else {
2854     exporttime = CurrentTime[CurrentProc];
2855   }
2856
2857   if ( RtsFlags.GranFlags.Light )
2858     /* Need CurrentTSO in event field to associate costs with creating
2859        spark even in a GrAnSim Light setup */
2860     new_event(proc, CurrentProc, exporttime,
2861               MoveSpark,
2862               CurrentTSO, spark->node, spark);
2863   else
2864     new_event(proc, CurrentProc, exporttime,
2865               MoveSpark, (StgTSO*)NULL, spark->node, spark);
2866   /* Bit of a hack to treat placed sparks the same as stolen sparks */
2867   ++OutstandingFishes[proc];
2868
2869   /* Force the PE to take notice of the spark (FINDWORK is put after a
2870      MoveSpark into the sparkq!) */
2871   if (RtsFlags.GranFlags.DoAlwaysCreateThreads) {
2872     new_event(CurrentProc,CurrentProc,exporttime+1,
2873               FindWork,
2874               (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
2875   }
2876
2877   if (exporttime<TimeOfNextEvent)
2878     TimeOfNextEvent = exporttime;
2879
2880   if (proc!=CurrentProc) {
2881     CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mtidytime;
2882     ++CurrentTSO->gran.globalsparks;
2883   } else { 
2884     ++CurrentTSO->gran.localsparks;
2885   }
2886 }
2887
2888 /* 
2889    This function handles local and global blocking.  It's called either
2890    from threaded code (RBH_entry, BH_entry etc) or from blockFetch when
2891    trying to fetch an BH or RBH 
2892 */
2893
2894 //@cindex GranSimBlock
2895 void 
2896 GranSimBlock(tso, proc, node)
2897 StgTSO *tso;
2898 PEs proc;
2899 StgClosure *node;
2900 {
2901   PEs node_proc = where_is(node), tso_proc = where_is(tso);
2902
2903   ASSERT(tso_proc==CurrentProc);
2904   // ASSERT(node_proc==CurrentProc);
2905   IF_GRAN_DEBUG(bq,
2906                 if (node_proc!=CurrentProc) 
2907                   belch("## ghuH: TSO %d (%lx) [PE %d] blocks on non-local node %p [PE %d] (no simulation of FETCHMEs)",
2908                         tso->id, tso, tso_proc, node, node_proc)); 
2909   ASSERT(tso->link==END_TSO_QUEUE);
2910   ASSERT(!is_on_queue(tso,proc)); // tso must not be on run queue already!
2911   //ASSERT(tso==run_queue_hds[proc]);
2912
2913   IF_DEBUG(gran,
2914            belch("GRAN: TSO %d (%p) [PE %d] blocks on closure %p @ %lx",
2915                  tso->id, tso, proc, node, CurrentTime[proc]);)
2916
2917
2918     /* THIS SHOULD NEVER HAPPEN!
2919        If tso tries to block on a remote node (i.e. node_proc!=CurrentProc)
2920        we have missed a GranSimFetch before entering this closure;
2921        we hack around it for now, faking a FetchNode; 
2922        because GranSimBlock is entered via a BLACKHOLE(_BQ) closure,
2923        tso will be blocked on this closure until the FetchReply occurs.
2924
2925        ngoq Dogh! 
2926
2927     if (node_proc!=CurrentProc) {
2928       StgInt ret;
2929       ret = GranSimFetch(node);
2930       IF_GRAN_DEBUG(bq,
2931                     if (ret)
2932                       belch(".. GranSimBlock: faking a FetchNode of node %p from %d to %d",
2933                             node, node_proc, CurrentProc););
2934       return;
2935     }
2936     */
2937
2938   if (RtsFlags.GranFlags.GranSimStats.Full)
2939     DumpRawGranEvent(proc,node_proc,GR_BLOCK,tso,node,(StgInt)0,0);
2940
2941   ++(tso->gran.blockcount);
2942   /* Distinction  between local and global block is made in blockFetch */
2943   tso->gran.blockedat = CurrentTime[proc];
2944
2945   CurrentTime[proc] += RtsFlags.GranFlags.Costs.threadqueuetime;
2946   ActivateNextThread(proc);
2947   /* tso->link = END_TSO_QUEUE;    not really necessary; only for testing */
2948 }
2949
2950 #endif /* GRAN */
2951
2952 //@node Index,  , Dumping routines, GranSim specific code
2953 //@subsection Index
2954
2955 //@index
2956 //* ActivateNextThread::  @cindex\s-+ActivateNextThread
2957 //* CurrentProc::  @cindex\s-+CurrentProc
2958 //* CurrentTime::  @cindex\s-+CurrentTime
2959 //* GranSimAllocate::  @cindex\s-+GranSimAllocate
2960 //* GranSimBlock::  @cindex\s-+GranSimBlock
2961 //* GranSimExec::  @cindex\s-+GranSimExec
2962 //* GranSimFetch::  @cindex\s-+GranSimFetch
2963 //* GranSimLight_insertThread::  @cindex\s-+GranSimLight_insertThread
2964 //* GranSimSpark::  @cindex\s-+GranSimSpark
2965 //* GranSimSparkAt::  @cindex\s-+GranSimSparkAt
2966 //* GranSimSparkAtAbs::  @cindex\s-+GranSimSparkAtAbs
2967 //* GranSimUnallocate::  @cindex\s-+GranSimUnallocate
2968 //* any_idle::  @cindex\s-+any_idle
2969 //* blockFetch::  @cindex\s-+blockFetch
2970 //* do_the_fetchnode::  @cindex\s-+do_the_fetchnode
2971 //* do_the_fetchreply::  @cindex\s-+do_the_fetchreply
2972 //* do_the_findwork::  @cindex\s-+do_the_findwork
2973 //* do_the_globalblock::  @cindex\s-+do_the_globalblock
2974 //* do_the_movespark::  @cindex\s-+do_the_movespark
2975 //* do_the_movethread::  @cindex\s-+do_the_movethread
2976 //* do_the_startthread::  @cindex\s-+do_the_startthread
2977 //* do_the_unblock::  @cindex\s-+do_the_unblock
2978 //* fetchNode::  @cindex\s-+fetchNode
2979 //* ga_to_proc::  @cindex\s-+ga_to_proc
2980 //* get_next_event::  @cindex\s-+get_next_event
2981 //* get_time_of_next_event::  @cindex\s-+get_time_of_next_event
2982 //* grab_event::  @cindex\s-+grab_event
2983 //* handleFetchRequest::  @cindex\s-+handleFetchRequest
2984 //* handleIdlePEs::  @cindex\s-+handleIdlePEs
2985 //* idlers::  @cindex\s-+idlers
2986 //* insertThread::  @cindex\s-+insertThread
2987 //* insert_event::  @cindex\s-+insert_event
2988 //* is_on_queue::  @cindex\s-+is_on_queue
2989 //* is_unique::  @cindex\s-+is_unique
2990 //* new_event::  @cindex\s-+new_event
2991 //* prepend_event::  @cindex\s-+prepend_event
2992 //* print_event::  @cindex\s-+print_event
2993 //* print_eventq::  @cindex\s-+print_eventq
2994 //* prune_eventq ::  @cindex\s-+prune_eventq 
2995 //* spark queue::  @cindex\s-+spark queue
2996 //* sparkStealTime::  @cindex\s-+sparkStealTime
2997 //* stealSomething::  @cindex\s-+stealSomething
2998 //* stealSpark::  @cindex\s-+stealSpark
2999 //* stealSparkMagic::  @cindex\s-+stealSparkMagic
3000 //* stealThread::  @cindex\s-+stealThread
3001 //* stealThreadMagic::  @cindex\s-+stealThreadMagic
3002 //* thread_queue_len::  @cindex\s-+thread_queue_len
3003 //* traverse_eventq_for_gc::  @cindex\s-+traverse_eventq_for_gc
3004 //* where_is::  @cindex\s-+where_is
3005 //@end index