6f4e5f963855585b18b3978f661fca1886ab4760
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "Hooks.h"
48 #define COMPILING_SCHEDULER
49 #include "Schedule.h"
50 #include "StgMiscClosures.h"
51 #include "Storage.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PAR)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include "OSThreads.h"
80 #include  "Task.h"
81
82 #ifdef HAVE_SYS_TYPES_H
83 #include <sys/types.h>
84 #endif
85 #ifdef HAVE_UNISTD_H
86 #include <unistd.h>
87 #endif
88
89 #include <string.h>
90 #include <stdlib.h>
91 #include <stdarg.h>
92
93 #ifdef HAVE_ERRNO_H
94 #include <errno.h>
95 #endif
96
97 #ifdef THREADED_RTS
98 #define USED_IN_THREADED_RTS
99 #else
100 #define USED_IN_THREADED_RTS STG_UNUSED
101 #endif
102
103 #ifdef RTS_SUPPORTS_THREADS
104 #define USED_WHEN_RTS_SUPPORTS_THREADS
105 #else
106 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
107 #endif
108
109 /* Main thread queue.
110  * Locks required: sched_mutex.
111  */
112 StgMainThread *main_threads = NULL;
113
114 /* Thread queues.
115  * Locks required: sched_mutex.
116  */
117 #if defined(GRAN)
118
119 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
120 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
121
122 /* 
123    In GranSim we have a runnable and a blocked queue for each processor.
124    In order to minimise code changes new arrays run_queue_hds/tls
125    are created. run_queue_hd is then a short cut (macro) for
126    run_queue_hds[CurrentProc] (see GranSim.h).
127    -- HWL
128 */
129 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
130 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
131 StgTSO *ccalling_threadss[MAX_PROC];
132 /* We use the same global list of threads (all_threads) in GranSim as in
133    the std RTS (i.e. we are cheating). However, we don't use this list in
134    the GranSim specific code at the moment (so we are only potentially
135    cheating).  */
136
137 #else /* !GRAN */
138
139 StgTSO *run_queue_hd = NULL;
140 StgTSO *run_queue_tl = NULL;
141 StgTSO *blocked_queue_hd = NULL;
142 StgTSO *blocked_queue_tl = NULL;
143 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
144
145 #endif
146
147 /* Linked list of all threads.
148  * Used for detecting garbage collected threads.
149  */
150 StgTSO *all_threads = NULL;
151
152 /* When a thread performs a safe C call (_ccall_GC, using old
153  * terminology), it gets put on the suspended_ccalling_threads
154  * list. Used by the garbage collector.
155  */
156 static StgTSO *suspended_ccalling_threads;
157
158 static StgTSO *threadStackOverflow(StgTSO *tso);
159
160 /* KH: The following two flags are shared memory locations.  There is no need
161        to lock them, since they are only unset at the end of a scheduler
162        operation.
163 */
164
165 /* flag set by signal handler to precipitate a context switch */
166 int context_switch = 0;
167
168 /* if this flag is set as well, give up execution */
169 rtsBool interrupted = rtsFalse;
170
171 /* If this flag is set, we are running Haskell code.  Used to detect
172  * uses of 'foreign import unsafe' that should be 'safe'.
173  */
174 rtsBool in_haskell = rtsFalse;
175
176 /* Next thread ID to allocate.
177  * Locks required: thread_id_mutex
178  */
179 static StgThreadID next_thread_id = 1;
180
181 /*
182  * Pointers to the state of the current thread.
183  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
184  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
185  */
186  
187 /* The smallest stack size that makes any sense is:
188  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
189  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
190  *  + 1                       (the closure to enter)
191  *  + 1                       (stg_ap_v_ret)
192  *  + 1                       (spare slot req'd by stg_ap_v_ret)
193  *
194  * A thread with this stack will bomb immediately with a stack
195  * overflow, which will increase its stack size.  
196  */
197
198 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
199
200
201 #if defined(GRAN)
202 StgTSO *CurrentTSO;
203 #endif
204
205 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
206  *  exists - earlier gccs apparently didn't.
207  *  -= chak
208  */
209 StgTSO dummy_tso;
210
211 static rtsBool ready_to_gc;
212
213 /*
214  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
215  * in an MT setting, needed to signal that a worker thread shouldn't hang around
216  * in the scheduler when it is out of work.
217  */
218 static rtsBool shutting_down_scheduler = rtsFalse;
219
220 void            addToBlockedQueue ( StgTSO *tso );
221
222 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
223        void     interruptStgRts   ( void );
224
225 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
226 static void     detectBlackHoles  ( void );
227 #endif
228
229 static void     raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);
230
231 #if defined(RTS_SUPPORTS_THREADS)
232 /* ToDo: carefully document the invariants that go together
233  *       with these synchronisation objects.
234  */
235 Mutex     sched_mutex       = INIT_MUTEX_VAR;
236 Mutex     term_mutex        = INIT_MUTEX_VAR;
237
238 #endif /* RTS_SUPPORTS_THREADS */
239
240 #if defined(PAR)
241 StgTSO *LastTSO;
242 rtsTime TimeOfLastYield;
243 rtsBool emitSchedule = rtsTrue;
244 #endif
245
246 #if DEBUG
247 static char *whatNext_strs[] = {
248   "(unknown)",
249   "ThreadRunGHC",
250   "ThreadInterpret",
251   "ThreadKilled",
252   "ThreadRelocated",
253   "ThreadComplete"
254 };
255 #endif
256
257 #if defined(PAR)
258 StgTSO * createSparkThread(rtsSpark spark);
259 StgTSO * activateSpark (rtsSpark spark);  
260 #endif
261
262 /* ----------------------------------------------------------------------------
263  * Starting Tasks
264  * ------------------------------------------------------------------------- */
265
266 #if defined(RTS_SUPPORTS_THREADS)
267 static rtsBool startingWorkerThread = rtsFalse;
268
269 static void taskStart(void);
270 static void
271 taskStart(void)
272 {
273   ACQUIRE_LOCK(&sched_mutex);
274   startingWorkerThread = rtsFalse;
275   schedule(NULL,NULL);
276   RELEASE_LOCK(&sched_mutex);
277 }
278
279 void
280 startSchedulerTaskIfNecessary(void)
281 {
282   if(run_queue_hd != END_TSO_QUEUE
283     || blocked_queue_hd != END_TSO_QUEUE
284     || sleeping_queue != END_TSO_QUEUE)
285   {
286     if(!startingWorkerThread)
287     { // we don't want to start another worker thread
288       // just because the last one hasn't yet reached the
289       // "waiting for capability" state
290       startingWorkerThread = rtsTrue;
291       if(!startTask(taskStart))
292       {
293         startingWorkerThread = rtsFalse;
294       }
295     }
296   }
297 }
298 #endif
299
300 /* ---------------------------------------------------------------------------
301    Main scheduling loop.
302
303    We use round-robin scheduling, each thread returning to the
304    scheduler loop when one of these conditions is detected:
305
306       * out of heap space
307       * timer expires (thread yields)
308       * thread blocks
309       * thread ends
310       * stack overflow
311
312    Locking notes:  we acquire the scheduler lock once at the beginning
313    of the scheduler loop, and release it when
314     
315       * running a thread, or
316       * waiting for work, or
317       * waiting for a GC to complete.
318
319    GRAN version:
320      In a GranSim setup this loop iterates over the global event queue.
321      This revolves around the global event queue, which determines what 
322      to do next. Therefore, it's more complicated than either the 
323      concurrent or the parallel (GUM) setup.
324
325    GUM version:
326      GUM iterates over incoming messages.
327      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
328      and sends out a fish whenever it has nothing to do; in-between
329      doing the actual reductions (shared code below) it processes the
330      incoming messages and deals with delayed operations 
331      (see PendingFetches).
332      This is not the ugliest code you could imagine, but it's bloody close.
333
334    ------------------------------------------------------------------------ */
335 static void
336 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
337           Capability *initialCapability )
338 {
339   StgTSO *t;
340   Capability *cap;
341   StgThreadReturnCode ret;
342 #if defined(GRAN)
343   rtsEvent *event;
344 #elif defined(PAR)
345   StgSparkPool *pool;
346   rtsSpark spark;
347   StgTSO *tso;
348   GlobalTaskId pe;
349   rtsBool receivedFinish = rtsFalse;
350 # if defined(DEBUG)
351   nat tp_size, sp_size; // stats only
352 # endif
353 #endif
354   rtsBool was_interrupted = rtsFalse;
355   nat prev_what_next;
356   
357   // Pre-condition: sched_mutex is held.
358   // We might have a capability, passed in as initialCapability.
359   cap = initialCapability;
360
361 #if defined(RTS_SUPPORTS_THREADS)
362   //
363   // in the threaded case, the capability is either passed in via the
364   // initialCapability parameter, or initialized inside the scheduler
365   // loop 
366   //
367   IF_DEBUG(scheduler,
368            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
369                        mainThread, initialCapability);
370       );
371 #else
372   // simply initialise it in the non-threaded case
373   grabCapability(&cap);
374 #endif
375
376 #if defined(GRAN)
377   /* set up first event to get things going */
378   /* ToDo: assign costs for system setup and init MainTSO ! */
379   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
380             ContinueThread, 
381             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
382
383   IF_DEBUG(gran,
384            debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
385            G_TSO(CurrentTSO, 5));
386
387   if (RtsFlags.GranFlags.Light) {
388     /* Save current time; GranSim Light only */
389     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
390   }      
391
392   event = get_next_event();
393
394   while (event!=(rtsEvent*)NULL) {
395     /* Choose the processor with the next event */
396     CurrentProc = event->proc;
397     CurrentTSO = event->tso;
398
399 #elif defined(PAR)
400
401   while (!receivedFinish) {    /* set by processMessages */
402                                /* when receiving PP_FINISH message         */ 
403
404 #else // everything except GRAN and PAR
405
406   while (1) {
407
408 #endif
409
410      IF_DEBUG(scheduler, printAllThreads());
411
412 #if defined(RTS_SUPPORTS_THREADS)
413       // Yield the capability to higher-priority tasks if necessary.
414       //
415       if (cap != NULL) {
416           yieldCapability(&cap);
417       }
418
419       // If we do not currently hold a capability, we wait for one
420       //
421       if (cap == NULL) {
422           waitForCapability(&sched_mutex, &cap,
423                             mainThread ? &mainThread->bound_thread_cond : NULL);
424       }
425
426       // We now have a capability...
427 #endif
428
429     // Check whether we have re-entered the RTS from Haskell without
430     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
431     // call).
432     if (in_haskell) {
433           errorBelch("schedule: re-entered unsafely.\n"
434                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
435           stg_exit(1);
436     }
437
438     //
439     // If we're interrupted (the user pressed ^C, or some other
440     // termination condition occurred), kill all the currently running
441     // threads.
442     //
443     if (interrupted) {
444         IF_DEBUG(scheduler, sched_belch("interrupted"));
445         interrupted = rtsFalse;
446         was_interrupted = rtsTrue;
447 #if defined(RTS_SUPPORTS_THREADS)
448         // In the threaded RTS, deadlock detection doesn't work,
449         // so just exit right away.
450         errorBelch("interrupted");
451         releaseCapability(cap);
452         RELEASE_LOCK(&sched_mutex);
453         shutdownHaskellAndExit(EXIT_SUCCESS);
454 #else
455         deleteAllThreads();
456 #endif
457     }
458
459 #if defined(RTS_USER_SIGNALS)
460     // check for signals each time around the scheduler
461     if (signals_pending()) {
462       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
463       startSignalHandlers();
464       ACQUIRE_LOCK(&sched_mutex);
465     }
466 #endif
467
468     //
469     // Check whether any waiting threads need to be woken up.  If the
470     // run queue is empty, and there are no other tasks running, we
471     // can wait indefinitely for something to happen.
472     //
473     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
474     {
475 #if defined(RTS_SUPPORTS_THREADS)
476         // We shouldn't be here...
477         barf("schedule: awaitEvent() in threaded RTS");
478 #endif
479         awaitEvent( EMPTY_RUN_QUEUE() );
480     }
481     // we can be interrupted while waiting for I/O...
482     if (interrupted) continue;
483
484     /* 
485      * Detect deadlock: when we have no threads to run, there are no
486      * threads waiting on I/O or sleeping, and all the other tasks are
487      * waiting for work, we must have a deadlock of some description.
488      *
489      * We first try to find threads blocked on themselves (ie. black
490      * holes), and generate NonTermination exceptions where necessary.
491      *
492      * If no threads are black holed, we have a deadlock situation, so
493      * inform all the main threads.
494      */
495 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
496     if (   EMPTY_THREAD_QUEUES() )
497     {
498         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
499
500         // Garbage collection can release some new threads due to
501         // either (a) finalizers or (b) threads resurrected because
502         // they are unreachable and will therefore be sent an
503         // exception.  Any threads thus released will be immediately
504         // runnable.
505         GarbageCollect(GetRoots,rtsTrue);
506         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
507
508 #if defined(RTS_USER_SIGNALS)
509         /* If we have user-installed signal handlers, then wait
510          * for signals to arrive rather then bombing out with a
511          * deadlock.
512          */
513         if ( anyUserHandlers() ) {
514             IF_DEBUG(scheduler, 
515                      sched_belch("still deadlocked, waiting for signals..."));
516
517             awaitUserSignals();
518
519             // we might be interrupted...
520             if (interrupted) { continue; }
521
522             if (signals_pending()) {
523                 RELEASE_LOCK(&sched_mutex);
524                 startSignalHandlers();
525                 ACQUIRE_LOCK(&sched_mutex);
526             }
527             ASSERT(!EMPTY_RUN_QUEUE());
528             goto not_deadlocked;
529         }
530 #endif
531
532         /* Probably a real deadlock.  Send the current main thread the
533          * Deadlock exception (or in the SMP build, send *all* main
534          * threads the deadlock exception, since none of them can make
535          * progress).
536          */
537         {
538             StgMainThread *m;
539             m = main_threads;
540             switch (m->tso->why_blocked) {
541             case BlockedOnBlackHole:
542             case BlockedOnException:
543             case BlockedOnMVar:
544                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
545                 break;
546             default:
547                 barf("deadlock: main thread blocked in a strange way");
548             }
549         }
550     }
551   not_deadlocked:
552
553 #elif defined(RTS_SUPPORTS_THREADS)
554     // ToDo: add deadlock detection in threaded RTS
555 #elif defined(PAR)
556     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
557 #endif
558
559 #if defined(RTS_SUPPORTS_THREADS) || defined(mingw32_HOST_OS)
560     /* win32: might be back here due to awaitEvent() being abandoned
561      * as a result of a console event having been delivered.
562      */
563     if ( EMPTY_RUN_QUEUE() ) {
564         continue; // nothing to do
565     }
566 #endif
567
568 #if defined(GRAN)
569     if (RtsFlags.GranFlags.Light)
570       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
571
572     /* adjust time based on time-stamp */
573     if (event->time > CurrentTime[CurrentProc] &&
574         event->evttype != ContinueThread)
575       CurrentTime[CurrentProc] = event->time;
576     
577     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
578     if (!RtsFlags.GranFlags.Light)
579       handleIdlePEs();
580
581     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
582
583     /* main event dispatcher in GranSim */
584     switch (event->evttype) {
585       /* Should just be continuing execution */
586     case ContinueThread:
587       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
588       /* ToDo: check assertion
589       ASSERT(run_queue_hd != (StgTSO*)NULL &&
590              run_queue_hd != END_TSO_QUEUE);
591       */
592       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
593       if (!RtsFlags.GranFlags.DoAsyncFetch &&
594           procStatus[CurrentProc]==Fetching) {
595         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
596               CurrentTSO->id, CurrentTSO, CurrentProc);
597         goto next_thread;
598       } 
599       /* Ignore ContinueThreads for completed threads */
600       if (CurrentTSO->what_next == ThreadComplete) {
601         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
602               CurrentTSO->id, CurrentTSO, CurrentProc);
603         goto next_thread;
604       } 
605       /* Ignore ContinueThreads for threads that are being migrated */
606       if (PROCS(CurrentTSO)==Nowhere) { 
607         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
608               CurrentTSO->id, CurrentTSO, CurrentProc);
609         goto next_thread;
610       }
611       /* The thread should be at the beginning of the run queue */
612       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
613         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
614               CurrentTSO->id, CurrentTSO, CurrentProc);
615         break; // run the thread anyway
616       }
617       /*
618       new_event(proc, proc, CurrentTime[proc],
619                 FindWork,
620                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
621       goto next_thread; 
622       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
623       break; // now actually run the thread; DaH Qu'vam yImuHbej 
624
625     case FetchNode:
626       do_the_fetchnode(event);
627       goto next_thread;             /* handle next event in event queue  */
628       
629     case GlobalBlock:
630       do_the_globalblock(event);
631       goto next_thread;             /* handle next event in event queue  */
632       
633     case FetchReply:
634       do_the_fetchreply(event);
635       goto next_thread;             /* handle next event in event queue  */
636       
637     case UnblockThread:   /* Move from the blocked queue to the tail of */
638       do_the_unblock(event);
639       goto next_thread;             /* handle next event in event queue  */
640       
641     case ResumeThread:  /* Move from the blocked queue to the tail of */
642       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
643       event->tso->gran.blocktime += 
644         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
645       do_the_startthread(event);
646       goto next_thread;             /* handle next event in event queue  */
647       
648     case StartThread:
649       do_the_startthread(event);
650       goto next_thread;             /* handle next event in event queue  */
651       
652     case MoveThread:
653       do_the_movethread(event);
654       goto next_thread;             /* handle next event in event queue  */
655       
656     case MoveSpark:
657       do_the_movespark(event);
658       goto next_thread;             /* handle next event in event queue  */
659       
660     case FindWork:
661       do_the_findwork(event);
662       goto next_thread;             /* handle next event in event queue  */
663       
664     default:
665       barf("Illegal event type %u\n", event->evttype);
666     }  /* switch */
667     
668     /* This point was scheduler_loop in the old RTS */
669
670     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
671
672     TimeOfLastEvent = CurrentTime[CurrentProc];
673     TimeOfNextEvent = get_time_of_next_event();
674     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
675     // CurrentTSO = ThreadQueueHd;
676
677     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
678                          TimeOfNextEvent));
679
680     if (RtsFlags.GranFlags.Light) 
681       GranSimLight_leave_system(event, &ActiveTSO); 
682
683     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
684
685     IF_DEBUG(gran, 
686              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
687
688     /* in a GranSim setup the TSO stays on the run queue */
689     t = CurrentTSO;
690     /* Take a thread from the run queue. */
691     POP_RUN_QUEUE(t); // take_off_run_queue(t);
692
693     IF_DEBUG(gran, 
694              debugBelch("GRAN: About to run current thread, which is\n");
695              G_TSO(t,5));
696
697     context_switch = 0; // turned on via GranYield, checking events and time slice
698
699     IF_DEBUG(gran, 
700              DumpGranEvent(GR_SCHEDULE, t));
701
702     procStatus[CurrentProc] = Busy;
703
704 #elif defined(PAR)
705     if (PendingFetches != END_BF_QUEUE) {
706         processFetches();
707     }
708
709     /* ToDo: phps merge with spark activation above */
710     /* check whether we have local work and send requests if we have none */
711     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
712       /* :-[  no local threads => look out for local sparks */
713       /* the spark pool for the current PE */
714       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
715       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
716           pool->hd < pool->tl) {
717         /* 
718          * ToDo: add GC code check that we really have enough heap afterwards!!
719          * Old comment:
720          * If we're here (no runnable threads) and we have pending
721          * sparks, we must have a space problem.  Get enough space
722          * to turn one of those pending sparks into a
723          * thread... 
724          */
725
726         spark = findSpark(rtsFalse);                /* get a spark */
727         if (spark != (rtsSpark) NULL) {
728           tso = activateSpark(spark);       /* turn the spark into a thread */
729           IF_PAR_DEBUG(schedule,
730                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
731                              tso->id, tso, advisory_thread_count));
732
733           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
734             debugBelch("==^^ failed to activate spark\n");
735             goto next_thread;
736           }               /* otherwise fall through & pick-up new tso */
737         } else {
738           IF_PAR_DEBUG(verbose,
739                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
740                              spark_queue_len(pool)));
741           goto next_thread;
742         }
743       }
744
745       /* If we still have no work we need to send a FISH to get a spark
746          from another PE 
747       */
748       if (EMPTY_RUN_QUEUE()) {
749       /* =8-[  no local sparks => look for work on other PEs */
750         /*
751          * We really have absolutely no work.  Send out a fish
752          * (there may be some out there already), and wait for
753          * something to arrive.  We clearly can't run any threads
754          * until a SCHEDULE or RESUME arrives, and so that's what
755          * we're hoping to see.  (Of course, we still have to
756          * respond to other types of messages.)
757          */
758         TIME now = msTime() /*CURRENT_TIME*/;
759         IF_PAR_DEBUG(verbose, 
760                      debugBelch("--  now=%ld\n", now));
761         IF_PAR_DEBUG(verbose,
762                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
763                          (last_fish_arrived_at!=0 &&
764                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
765                        debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
766                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
767                              last_fish_arrived_at,
768                              RtsFlags.ParFlags.fishDelay, now);
769                      });
770         
771         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
772             (last_fish_arrived_at==0 ||
773              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
774           /* outstandingFishes is set in sendFish, processFish;
775              avoid flooding system with fishes via delay */
776           pe = choosePE();
777           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
778                    NEW_FISH_HUNGER);
779
780           // Global statistics: count no. of fishes
781           if (RtsFlags.ParFlags.ParStats.Global &&
782               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
783             globalParStats.tot_fish_mess++;
784           }
785         }
786       
787         receivedFinish = processMessages();
788         goto next_thread;
789       }
790     } else if (PacketsWaiting()) {  /* Look for incoming messages */
791       receivedFinish = processMessages();
792     }
793
794     /* Now we are sure that we have some work available */
795     ASSERT(run_queue_hd != END_TSO_QUEUE);
796
797     /* Take a thread from the run queue, if we have work */
798     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
799     IF_DEBUG(sanity,checkTSO(t));
800
801     /* ToDo: write something to the log-file
802     if (RTSflags.ParFlags.granSimStats && !sameThread)
803         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
804
805     CurrentTSO = t;
806     */
807     /* the spark pool for the current PE */
808     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
809
810     IF_DEBUG(scheduler, 
811              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
812                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
813
814 # if 1
815     if (0 && RtsFlags.ParFlags.ParStats.Full && 
816         t && LastTSO && t->id != LastTSO->id && 
817         LastTSO->why_blocked == NotBlocked && 
818         LastTSO->what_next != ThreadComplete) {
819       // if previously scheduled TSO not blocked we have to record the context switch
820       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
821                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
822     }
823
824     if (RtsFlags.ParFlags.ParStats.Full && 
825         (emitSchedule /* forced emit */ ||
826         (t && LastTSO && t->id != LastTSO->id))) {
827       /* 
828          we are running a different TSO, so write a schedule event to log file
829          NB: If we use fair scheduling we also have to write  a deschedule 
830              event for LastTSO; with unfair scheduling we know that the
831              previous tso has blocked whenever we switch to another tso, so
832              we don't need it in GUM for now
833       */
834       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
835                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
836       emitSchedule = rtsFalse;
837     }
838      
839 # endif
840 #else /* !GRAN && !PAR */
841   
842     // grab a thread from the run queue
843     ASSERT(run_queue_hd != END_TSO_QUEUE);
844     POP_RUN_QUEUE(t);
845
846     // Sanity check the thread we're about to run.  This can be
847     // expensive if there is lots of thread switching going on...
848     IF_DEBUG(sanity,checkTSO(t));
849 #endif
850
851 #ifdef THREADED_RTS
852     {
853       StgMainThread *m = t->main;
854       
855       if(m)
856       {
857         if(m == mainThread)
858         {
859           IF_DEBUG(scheduler,
860             sched_belch("### Running thread %d in bound thread", t->id));
861           // yes, the Haskell thread is bound to the current native thread
862         }
863         else
864         {
865           IF_DEBUG(scheduler,
866             sched_belch("### thread %d bound to another OS thread", t->id));
867           // no, bound to a different Haskell thread: pass to that thread
868           PUSH_ON_RUN_QUEUE(t);
869           passCapability(&m->bound_thread_cond);
870           continue;
871         }
872       }
873       else
874       {
875         if(mainThread != NULL)
876         // The thread we want to run is bound.
877         {
878           IF_DEBUG(scheduler,
879             sched_belch("### this OS thread cannot run thread %d", t->id));
880           // no, the current native thread is bound to a different
881           // Haskell thread, so pass it to any worker thread
882           PUSH_ON_RUN_QUEUE(t);
883           passCapabilityToWorker();
884           continue; 
885         }
886       }
887     }
888 #endif
889
890     cap->r.rCurrentTSO = t;
891     
892     /* context switches are now initiated by the timer signal, unless
893      * the user specified "context switch as often as possible", with
894      * +RTS -C0
895      */
896     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
897          && (run_queue_hd != END_TSO_QUEUE
898              || blocked_queue_hd != END_TSO_QUEUE
899              || sleeping_queue != END_TSO_QUEUE)))
900         context_switch = 1;
901
902 run_thread:
903
904     RELEASE_LOCK(&sched_mutex);
905
906     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
907                               (long)t->id, whatNext_strs[t->what_next]));
908
909 #ifdef PROFILING
910     startHeapProfTimer();
911 #endif
912
913     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
914     /* Run the current thread 
915      */
916     prev_what_next = t->what_next;
917
918     errno = t->saved_errno;
919     in_haskell = rtsTrue;
920
921     switch (prev_what_next) {
922
923     case ThreadKilled:
924     case ThreadComplete:
925         /* Thread already finished, return to scheduler. */
926         ret = ThreadFinished;
927         break;
928
929     case ThreadRunGHC:
930         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
931         break;
932
933     case ThreadInterpret:
934         ret = interpretBCO(cap);
935         break;
936
937     default:
938       barf("schedule: invalid what_next field");
939     }
940
941     in_haskell = rtsFalse;
942
943     // The TSO might have moved, so find the new location:
944     t = cap->r.rCurrentTSO;
945
946     // And save the current errno in this thread.
947     t->saved_errno = errno;
948
949     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
950     
951     /* Costs for the scheduler are assigned to CCS_SYSTEM */
952 #ifdef PROFILING
953     stopHeapProfTimer();
954     CCCS = CCS_SYSTEM;
955 #endif
956     
957     ACQUIRE_LOCK(&sched_mutex);
958     
959 #ifdef RTS_SUPPORTS_THREADS
960     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
961 #elif !defined(GRAN) && !defined(PAR)
962     IF_DEBUG(scheduler,debugBelch("sched: "););
963 #endif
964     
965 #if defined(PAR)
966     /* HACK 675: if the last thread didn't yield, make sure to print a 
967        SCHEDULE event to the log file when StgRunning the next thread, even
968        if it is the same one as before */
969     LastTSO = t; 
970     TimeOfLastYield = CURRENT_TIME;
971 #endif
972
973     switch (ret) {
974     case HeapOverflow:
975 #if defined(GRAN)
976       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
977       globalGranStats.tot_heapover++;
978 #elif defined(PAR)
979       globalParStats.tot_heapover++;
980 #endif
981
982       // did the task ask for a large block?
983       if (cap->r.rHpAlloc > BLOCK_SIZE) {
984           // if so, get one and push it on the front of the nursery.
985           bdescr *bd;
986           nat blocks;
987           
988           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
989
990           IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n", 
991                                    (long)t->id, whatNext_strs[t->what_next], blocks));
992
993           // don't do this if it would push us over the
994           // alloc_blocks_lim limit; we'll GC first.
995           if (alloc_blocks + blocks < alloc_blocks_lim) {
996
997               alloc_blocks += blocks;
998               bd = allocGroup( blocks );
999
1000               // link the new group into the list
1001               bd->link = cap->r.rCurrentNursery;
1002               bd->u.back = cap->r.rCurrentNursery->u.back;
1003               if (cap->r.rCurrentNursery->u.back != NULL) {
1004                   cap->r.rCurrentNursery->u.back->link = bd;
1005               } else {
1006                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1007                          g0s0->blocks == cap->r.rNursery);
1008                   cap->r.rNursery = g0s0->blocks = bd;
1009               }           
1010               cap->r.rCurrentNursery->u.back = bd;
1011
1012               // initialise it as a nursery block.  We initialise the
1013               // step, gen_no, and flags field of *every* sub-block in
1014               // this large block, because this is easier than making
1015               // sure that we always find the block head of a large
1016               // block whenever we call Bdescr() (eg. evacuate() and
1017               // isAlive() in the GC would both have to do this, at
1018               // least).
1019               { 
1020                   bdescr *x;
1021                   for (x = bd; x < bd + blocks; x++) {
1022                       x->step = g0s0;
1023                       x->gen_no = 0;
1024                       x->flags = 0;
1025                   }
1026               }
1027
1028               // don't forget to update the block count in g0s0.
1029               g0s0->n_blocks += blocks;
1030               // This assert can be a killer if the app is doing lots
1031               // of large block allocations.
1032               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1033
1034               // now update the nursery to point to the new block
1035               cap->r.rCurrentNursery = bd;
1036
1037               // we might be unlucky and have another thread get on the
1038               // run queue before us and steal the large block, but in that
1039               // case the thread will just end up requesting another large
1040               // block.
1041               PUSH_ON_RUN_QUEUE(t);
1042               break;
1043           }
1044       }
1045
1046       /* make all the running tasks block on a condition variable,
1047        * maybe set context_switch and wait till they all pile in,
1048        * then have them wait on a GC condition variable.
1049        */
1050       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1051                                (long)t->id, whatNext_strs[t->what_next]));
1052       threadPaused(t);
1053 #if defined(GRAN)
1054       ASSERT(!is_on_queue(t,CurrentProc));
1055 #elif defined(PAR)
1056       /* Currently we emit a DESCHEDULE event before GC in GUM.
1057          ToDo: either add separate event to distinguish SYSTEM time from rest
1058                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1059       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1060         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1061                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1062         emitSchedule = rtsTrue;
1063       }
1064 #endif
1065       
1066       ready_to_gc = rtsTrue;
1067       PUSH_ON_RUN_QUEUE(t);
1068       /* actual GC is done at the end of the while loop */
1069       break;
1070       
1071     case StackOverflow:
1072 #if defined(GRAN)
1073       IF_DEBUG(gran, 
1074                DumpGranEvent(GR_DESCHEDULE, t));
1075       globalGranStats.tot_stackover++;
1076 #elif defined(PAR)
1077       // IF_DEBUG(par, 
1078       // DumpGranEvent(GR_DESCHEDULE, t);
1079       globalParStats.tot_stackover++;
1080 #endif
1081       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1082                                (long)t->id, whatNext_strs[t->what_next]));
1083       /* just adjust the stack for this thread, then pop it back
1084        * on the run queue.
1085        */
1086       threadPaused(t);
1087       { 
1088         /* enlarge the stack */
1089         StgTSO *new_t = threadStackOverflow(t);
1090         
1091         /* This TSO has moved, so update any pointers to it from the
1092          * main thread stack.  It better not be on any other queues...
1093          * (it shouldn't be).
1094          */
1095         if (t->main != NULL) {
1096             t->main->tso = new_t;
1097         }
1098         PUSH_ON_RUN_QUEUE(new_t);
1099       }
1100       break;
1101
1102     case ThreadYielding:
1103       // Reset the context switch flag.  We don't do this just before
1104       // running the thread, because that would mean we would lose ticks
1105       // during GC, which can lead to unfair scheduling (a thread hogs
1106       // the CPU because the tick always arrives during GC).  This way
1107       // penalises threads that do a lot of allocation, but that seems
1108       // better than the alternative.
1109       context_switch = 0;
1110
1111 #if defined(GRAN)
1112       IF_DEBUG(gran, 
1113                DumpGranEvent(GR_DESCHEDULE, t));
1114       globalGranStats.tot_yields++;
1115 #elif defined(PAR)
1116       // IF_DEBUG(par, 
1117       // DumpGranEvent(GR_DESCHEDULE, t);
1118       globalParStats.tot_yields++;
1119 #endif
1120       /* put the thread back on the run queue.  Then, if we're ready to
1121        * GC, check whether this is the last task to stop.  If so, wake
1122        * up the GC thread.  getThread will block during a GC until the
1123        * GC is finished.
1124        */
1125       IF_DEBUG(scheduler,
1126                if (t->what_next != prev_what_next) {
1127                    debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1128                          (long)t->id, whatNext_strs[t->what_next]);
1129                } else {
1130                    debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1131                          (long)t->id, whatNext_strs[t->what_next]);
1132                }
1133                );
1134
1135       IF_DEBUG(sanity,
1136                //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1137                checkTSO(t));
1138       ASSERT(t->link == END_TSO_QUEUE);
1139
1140       // Shortcut if we're just switching evaluators: don't bother
1141       // doing stack squeezing (which can be expensive), just run the
1142       // thread.
1143       if (t->what_next != prev_what_next) {
1144           goto run_thread;
1145       }
1146
1147       threadPaused(t);
1148
1149 #if defined(GRAN)
1150       ASSERT(!is_on_queue(t,CurrentProc));
1151
1152       IF_DEBUG(sanity,
1153                //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1154                checkThreadQsSanity(rtsTrue));
1155 #endif
1156
1157 #if defined(PAR)
1158       if (RtsFlags.ParFlags.doFairScheduling) { 
1159         /* this does round-robin scheduling; good for concurrency */
1160         APPEND_TO_RUN_QUEUE(t);
1161       } else {
1162         /* this does unfair scheduling; good for parallelism */
1163         PUSH_ON_RUN_QUEUE(t);
1164       }
1165 #else
1166       // this does round-robin scheduling; good for concurrency
1167       APPEND_TO_RUN_QUEUE(t);
1168 #endif
1169
1170 #if defined(GRAN)
1171       /* add a ContinueThread event to actually process the thread */
1172       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1173                 ContinueThread,
1174                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1175       IF_GRAN_DEBUG(bq, 
1176                debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1177                G_EVENTQ(0);
1178                G_CURR_THREADQ(0));
1179 #endif /* GRAN */
1180       break;
1181
1182     case ThreadBlocked:
1183 #if defined(GRAN)
1184       IF_DEBUG(scheduler,
1185                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1186                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1187                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1188
1189       // ??? needed; should emit block before
1190       IF_DEBUG(gran, 
1191                DumpGranEvent(GR_DESCHEDULE, t)); 
1192       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1193       /*
1194         ngoq Dogh!
1195       ASSERT(procStatus[CurrentProc]==Busy || 
1196               ((procStatus[CurrentProc]==Fetching) && 
1197               (t->block_info.closure!=(StgClosure*)NULL)));
1198       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1199           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1200             procStatus[CurrentProc]==Fetching)) 
1201         procStatus[CurrentProc] = Idle;
1202       */
1203 #elif defined(PAR)
1204       IF_DEBUG(scheduler,
1205                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1206                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1207       IF_PAR_DEBUG(bq,
1208
1209                    if (t->block_info.closure!=(StgClosure*)NULL) 
1210                      print_bq(t->block_info.closure));
1211
1212       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1213       blockThread(t);
1214
1215       /* whatever we schedule next, we must log that schedule */
1216       emitSchedule = rtsTrue;
1217
1218 #else /* !GRAN */
1219       /* don't need to do anything.  Either the thread is blocked on
1220        * I/O, in which case we'll have called addToBlockedQueue
1221        * previously, or it's blocked on an MVar or Blackhole, in which
1222        * case it'll be on the relevant queue already.
1223        */
1224       ASSERT(t->why_blocked != NotBlocked);
1225       IF_DEBUG(scheduler,
1226                debugBelch("--<< thread %d (%s) stopped: ", 
1227                        t->id, whatNext_strs[t->what_next]);
1228                printThreadBlockage(t);
1229                debugBelch("\n"));
1230
1231       /* Only for dumping event to log file 
1232          ToDo: do I need this in GranSim, too?
1233       blockThread(t);
1234       */
1235 #endif
1236       threadPaused(t);
1237       break;
1238
1239     case ThreadFinished:
1240       /* Need to check whether this was a main thread, and if so, signal
1241        * the task that started it with the return value.  If we have no
1242        * more main threads, we probably need to stop all the tasks until
1243        * we get a new one.
1244        */
1245       /* We also end up here if the thread kills itself with an
1246        * uncaught exception, see Exception.hc.
1247        */
1248       IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1249                                t->id, whatNext_strs[t->what_next]));
1250 #if defined(GRAN)
1251       endThread(t, CurrentProc); // clean-up the thread
1252 #elif defined(PAR)
1253       /* For now all are advisory -- HWL */
1254       //if(t->priority==AdvisoryPriority) ??
1255       advisory_thread_count--;
1256       
1257 # ifdef DIST
1258       if(t->dist.priority==RevalPriority)
1259         FinishReval(t);
1260 # endif
1261       
1262       if (RtsFlags.ParFlags.ParStats.Full &&
1263           !RtsFlags.ParFlags.ParStats.Suppressed) 
1264         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1265 #endif
1266
1267       //
1268       // Check whether the thread that just completed was a main
1269       // thread, and if so return with the result.  
1270       //
1271       // There is an assumption here that all thread completion goes
1272       // through this point; we need to make sure that if a thread
1273       // ends up in the ThreadKilled state, that it stays on the run
1274       // queue so it can be dealt with here.
1275       //
1276       if (
1277 #if defined(RTS_SUPPORTS_THREADS)
1278           mainThread != NULL
1279 #else
1280           mainThread->tso == t
1281 #endif
1282           )
1283       {
1284           // We are a bound thread: this must be our thread that just
1285           // completed.
1286           ASSERT(mainThread->tso == t);
1287
1288           if (t->what_next == ThreadComplete) {
1289               if (mainThread->ret) {
1290                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1291                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1292               }
1293               mainThread->stat = Success;
1294           } else {
1295               if (mainThread->ret) {
1296                   *(mainThread->ret) = NULL;
1297               }
1298               if (was_interrupted) {
1299                   mainThread->stat = Interrupted;
1300               } else {
1301                   mainThread->stat = Killed;
1302               }
1303           }
1304 #ifdef DEBUG
1305           removeThreadLabel((StgWord)mainThread->tso->id);
1306 #endif
1307           if (mainThread->prev == NULL) {
1308               main_threads = mainThread->link;
1309           } else {
1310               mainThread->prev->link = mainThread->link;
1311           }
1312           if (mainThread->link != NULL) {
1313               mainThread->link->prev = NULL;
1314           }
1315           releaseCapability(cap);
1316           return;
1317       }
1318
1319 #ifdef RTS_SUPPORTS_THREADS
1320       ASSERT(t->main == NULL);
1321 #else
1322       if (t->main != NULL) {
1323           // Must be a main thread that is not the topmost one.  Leave
1324           // it on the run queue until the stack has unwound to the
1325           // point where we can deal with this.  Leaving it on the run
1326           // queue also ensures that the garbage collector knows about
1327           // this thread and its return value (it gets dropped from the
1328           // all_threads list so there's no other way to find it).
1329           APPEND_TO_RUN_QUEUE(t);
1330       }
1331 #endif
1332       break;
1333
1334     default:
1335       barf("schedule: invalid thread return code %d", (int)ret);
1336     }
1337
1338 #ifdef PROFILING
1339     // When we have +RTS -i0 and we're heap profiling, do a census at
1340     // every GC.  This lets us get repeatable runs for debugging.
1341     if (performHeapProfile ||
1342         (RtsFlags.ProfFlags.profileInterval==0 &&
1343          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1344         GarbageCollect(GetRoots, rtsTrue);
1345         heapCensus();
1346         performHeapProfile = rtsFalse;
1347         ready_to_gc = rtsFalse; // we already GC'd
1348     }
1349 #endif
1350
1351     if (ready_to_gc) {
1352       /* Kick any transactions which are invalid back to their atomically frames.
1353        * When next scheduled they will try to commit, this commit will fail and
1354        * they will retry. */
1355       for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1356         if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1357           if (!stmValidateTransaction (t -> trec)) {
1358             IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1359
1360             // strip the stack back to the ATOMICALLY_FRAME, aborting
1361             // the (nested) transaction, and saving the stack of any
1362             // partially-evaluated thunks on the heap.
1363             raiseAsync_(t, NULL, rtsTrue);
1364             
1365 #ifdef REG_R1
1366             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1367 #endif
1368           }
1369         }
1370       }
1371
1372       /* everybody back, start the GC.
1373        * Could do it in this thread, or signal a condition var
1374        * to do it in another thread.  Either way, we need to
1375        * broadcast on gc_pending_cond afterward.
1376        */
1377 #if defined(RTS_SUPPORTS_THREADS)
1378       IF_DEBUG(scheduler,sched_belch("doing GC"));
1379 #endif
1380       GarbageCollect(GetRoots,rtsFalse);
1381       ready_to_gc = rtsFalse;
1382 #if defined(GRAN)
1383       /* add a ContinueThread event to continue execution of current thread */
1384       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1385                 ContinueThread,
1386                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1387       IF_GRAN_DEBUG(bq, 
1388                debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1389                G_EVENTQ(0);
1390                G_CURR_THREADQ(0));
1391 #endif /* GRAN */
1392     }
1393
1394 #if defined(GRAN)
1395   next_thread:
1396     IF_GRAN_DEBUG(unused,
1397                   print_eventq(EventHd));
1398
1399     event = get_next_event();
1400 #elif defined(PAR)
1401   next_thread:
1402     /* ToDo: wait for next message to arrive rather than busy wait */
1403 #endif /* GRAN */
1404
1405   } /* end of while(1) */
1406
1407   IF_PAR_DEBUG(verbose,
1408                debugBelch("== Leaving schedule() after having received Finish\n"));
1409 }
1410
1411 /* ---------------------------------------------------------------------------
1412  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1413  * used by Control.Concurrent for error checking.
1414  * ------------------------------------------------------------------------- */
1415  
1416 StgBool
1417 rtsSupportsBoundThreads(void)
1418 {
1419 #ifdef THREADED_RTS
1420   return rtsTrue;
1421 #else
1422   return rtsFalse;
1423 #endif
1424 }
1425
1426 /* ---------------------------------------------------------------------------
1427  * isThreadBound(tso): check whether tso is bound to an OS thread.
1428  * ------------------------------------------------------------------------- */
1429  
1430 StgBool
1431 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1432 {
1433 #ifdef THREADED_RTS
1434   return (tso->main != NULL);
1435 #endif
1436   return rtsFalse;
1437 }
1438
1439 /* ---------------------------------------------------------------------------
1440  * Singleton fork(). Do not copy any running threads.
1441  * ------------------------------------------------------------------------- */
1442
1443 #ifndef mingw32_HOST_OS
1444 #define FORKPROCESS_PRIMOP_SUPPORTED
1445 #endif
1446
1447 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1448 static void 
1449 deleteThreadImmediately(StgTSO *tso);
1450 #endif
1451 StgInt
1452 forkProcess(HsStablePtr *entry
1453 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1454             STG_UNUSED
1455 #endif
1456            )
1457 {
1458 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1459   pid_t pid;
1460   StgTSO* t,*next;
1461   StgMainThread *m;
1462   SchedulerStatus rc;
1463
1464   IF_DEBUG(scheduler,sched_belch("forking!"));
1465   rts_lock(); // This not only acquires sched_mutex, it also
1466               // makes sure that no other threads are running
1467
1468   pid = fork();
1469
1470   if (pid) { /* parent */
1471
1472   /* just return the pid */
1473     rts_unlock();
1474     return pid;
1475     
1476   } else { /* child */
1477     
1478     
1479       // delete all threads
1480     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1481     
1482     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1483       next = t->link;
1484
1485         // don't allow threads to catch the ThreadKilled exception
1486       deleteThreadImmediately(t);
1487     }
1488     
1489       // wipe the main thread list
1490     while((m = main_threads) != NULL) {
1491       main_threads = m->link;
1492 # ifdef THREADED_RTS
1493       closeCondition(&m->bound_thread_cond);
1494 # endif
1495       stgFree(m);
1496     }
1497     
1498     rc = rts_evalStableIO(entry, NULL);  // run the action
1499     rts_checkSchedStatus("forkProcess",rc);
1500     
1501     rts_unlock();
1502     
1503     hs_exit();                      // clean up and exit
1504     stg_exit(0);
1505   }
1506 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1507   barf("forkProcess#: primop not supported, sorry!\n");
1508   return -1;
1509 #endif
1510 }
1511
1512 /* ---------------------------------------------------------------------------
1513  * deleteAllThreads():  kill all the live threads.
1514  *
1515  * This is used when we catch a user interrupt (^C), before performing
1516  * any necessary cleanups and running finalizers.
1517  *
1518  * Locks: sched_mutex held.
1519  * ------------------------------------------------------------------------- */
1520    
1521 void
1522 deleteAllThreads ( void )
1523 {
1524   StgTSO* t, *next;
1525   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1526   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1527       next = t->global_link;
1528       deleteThread(t);
1529   }      
1530
1531   // The run queue now contains a bunch of ThreadKilled threads.  We
1532   // must not throw these away: the main thread(s) will be in there
1533   // somewhere, and the main scheduler loop has to deal with it.
1534   // Also, the run queue is the only thing keeping these threads from
1535   // being GC'd, and we don't want the "main thread has been GC'd" panic.
1536
1537   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1538   ASSERT(sleeping_queue == END_TSO_QUEUE);
1539 }
1540
1541 /* startThread and  insertThread are now in GranSim.c -- HWL */
1542
1543
1544 /* ---------------------------------------------------------------------------
1545  * Suspending & resuming Haskell threads.
1546  * 
1547  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1548  * its capability before calling the C function.  This allows another
1549  * task to pick up the capability and carry on running Haskell
1550  * threads.  It also means that if the C call blocks, it won't lock
1551  * the whole system.
1552  *
1553  * The Haskell thread making the C call is put to sleep for the
1554  * duration of the call, on the susepended_ccalling_threads queue.  We
1555  * give out a token to the task, which it can use to resume the thread
1556  * on return from the C function.
1557  * ------------------------------------------------------------------------- */
1558    
1559 StgInt
1560 suspendThread( StgRegTable *reg )
1561 {
1562   nat tok;
1563   Capability *cap;
1564   int saved_errno = errno;
1565
1566   /* assume that *reg is a pointer to the StgRegTable part
1567    * of a Capability.
1568    */
1569   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1570
1571   ACQUIRE_LOCK(&sched_mutex);
1572
1573   IF_DEBUG(scheduler,
1574            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1575
1576   // XXX this might not be necessary --SDM
1577   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1578
1579   threadPaused(cap->r.rCurrentTSO);
1580   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1581   suspended_ccalling_threads = cap->r.rCurrentTSO;
1582
1583   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1584       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1585       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1586   } else {
1587       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1588   }
1589
1590   /* Use the thread ID as the token; it should be unique */
1591   tok = cap->r.rCurrentTSO->id;
1592
1593   /* Hand back capability */
1594   releaseCapability(cap);
1595   
1596 #if defined(RTS_SUPPORTS_THREADS)
1597   /* Preparing to leave the RTS, so ensure there's a native thread/task
1598      waiting to take over.
1599   */
1600   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1601 #endif
1602
1603   in_haskell = rtsFalse;
1604   RELEASE_LOCK(&sched_mutex);
1605   
1606   errno = saved_errno;
1607   return tok; 
1608 }
1609
1610 StgRegTable *
1611 resumeThread( StgInt tok )
1612 {
1613   StgTSO *tso, **prev;
1614   Capability *cap;
1615   int saved_errno = errno;
1616
1617 #if defined(RTS_SUPPORTS_THREADS)
1618   /* Wait for permission to re-enter the RTS with the result. */
1619   ACQUIRE_LOCK(&sched_mutex);
1620   waitForReturnCapability(&sched_mutex, &cap);
1621
1622   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1623 #else
1624   grabCapability(&cap);
1625 #endif
1626
1627   /* Remove the thread off of the suspended list */
1628   prev = &suspended_ccalling_threads;
1629   for (tso = suspended_ccalling_threads; 
1630        tso != END_TSO_QUEUE; 
1631        prev = &tso->link, tso = tso->link) {
1632     if (tso->id == (StgThreadID)tok) {
1633       *prev = tso->link;
1634       break;
1635     }
1636   }
1637   if (tso == END_TSO_QUEUE) {
1638     barf("resumeThread: thread not found");
1639   }
1640   tso->link = END_TSO_QUEUE;
1641   
1642   if(tso->why_blocked == BlockedOnCCall) {
1643       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1644       tso->blocked_exceptions = NULL;
1645   }
1646   
1647   /* Reset blocking status */
1648   tso->why_blocked  = NotBlocked;
1649
1650   cap->r.rCurrentTSO = tso;
1651   in_haskell = rtsTrue;
1652   RELEASE_LOCK(&sched_mutex);
1653   errno = saved_errno;
1654   return &cap->r;
1655 }
1656
1657
1658 /* ---------------------------------------------------------------------------
1659  * Static functions
1660  * ------------------------------------------------------------------------ */
1661 static void unblockThread(StgTSO *tso);
1662
1663 /* ---------------------------------------------------------------------------
1664  * Comparing Thread ids.
1665  *
1666  * This is used from STG land in the implementation of the
1667  * instances of Eq/Ord for ThreadIds.
1668  * ------------------------------------------------------------------------ */
1669
1670 int
1671 cmp_thread(StgPtr tso1, StgPtr tso2) 
1672
1673   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1674   StgThreadID id2 = ((StgTSO *)tso2)->id;
1675  
1676   if (id1 < id2) return (-1);
1677   if (id1 > id2) return 1;
1678   return 0;
1679 }
1680
1681 /* ---------------------------------------------------------------------------
1682  * Fetching the ThreadID from an StgTSO.
1683  *
1684  * This is used in the implementation of Show for ThreadIds.
1685  * ------------------------------------------------------------------------ */
1686 int
1687 rts_getThreadId(StgPtr tso) 
1688 {
1689   return ((StgTSO *)tso)->id;
1690 }
1691
1692 #ifdef DEBUG
1693 void
1694 labelThread(StgPtr tso, char *label)
1695 {
1696   int len;
1697   void *buf;
1698
1699   /* Caveat: Once set, you can only set the thread name to "" */
1700   len = strlen(label)+1;
1701   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1702   strncpy(buf,label,len);
1703   /* Update will free the old memory for us */
1704   updateThreadLabel(((StgTSO *)tso)->id,buf);
1705 }
1706 #endif /* DEBUG */
1707
1708 /* ---------------------------------------------------------------------------
1709    Create a new thread.
1710
1711    The new thread starts with the given stack size.  Before the
1712    scheduler can run, however, this thread needs to have a closure
1713    (and possibly some arguments) pushed on its stack.  See
1714    pushClosure() in Schedule.h.
1715
1716    createGenThread() and createIOThread() (in SchedAPI.h) are
1717    convenient packaged versions of this function.
1718
1719    currently pri (priority) is only used in a GRAN setup -- HWL
1720    ------------------------------------------------------------------------ */
1721 #if defined(GRAN)
1722 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1723 StgTSO *
1724 createThread(nat size, StgInt pri)
1725 #else
1726 StgTSO *
1727 createThread(nat size)
1728 #endif
1729 {
1730
1731     StgTSO *tso;
1732     nat stack_size;
1733
1734     /* First check whether we should create a thread at all */
1735 #if defined(PAR)
1736   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1737   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1738     threadsIgnored++;
1739     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
1740           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1741     return END_TSO_QUEUE;
1742   }
1743   threadsCreated++;
1744 #endif
1745
1746 #if defined(GRAN)
1747   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1748 #endif
1749
1750   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1751
1752   /* catch ridiculously small stack sizes */
1753   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1754     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1755   }
1756
1757   stack_size = size - TSO_STRUCT_SIZEW;
1758
1759   tso = (StgTSO *)allocate(size);
1760   TICK_ALLOC_TSO(stack_size, 0);
1761
1762   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1763 #if defined(GRAN)
1764   SET_GRAN_HDR(tso, ThisPE);
1765 #endif
1766
1767   // Always start with the compiled code evaluator
1768   tso->what_next = ThreadRunGHC;
1769
1770   tso->id = next_thread_id++; 
1771   tso->why_blocked  = NotBlocked;
1772   tso->blocked_exceptions = NULL;
1773
1774   tso->saved_errno = 0;
1775   tso->main = NULL;
1776   
1777   tso->stack_size   = stack_size;
1778   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1779                               - TSO_STRUCT_SIZEW;
1780   tso->sp           = (P_)&(tso->stack) + stack_size;
1781
1782   tso->trec = NO_TREC;
1783
1784 #ifdef PROFILING
1785   tso->prof.CCCS = CCS_MAIN;
1786 #endif
1787
1788   /* put a stop frame on the stack */
1789   tso->sp -= sizeofW(StgStopFrame);
1790   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1791   tso->link = END_TSO_QUEUE;
1792
1793   // ToDo: check this
1794 #if defined(GRAN)
1795   /* uses more flexible routine in GranSim */
1796   insertThread(tso, CurrentProc);
1797 #else
1798   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1799    * from its creation
1800    */
1801 #endif
1802
1803 #if defined(GRAN) 
1804   if (RtsFlags.GranFlags.GranSimStats.Full) 
1805     DumpGranEvent(GR_START,tso);
1806 #elif defined(PAR)
1807   if (RtsFlags.ParFlags.ParStats.Full) 
1808     DumpGranEvent(GR_STARTQ,tso);
1809   /* HACk to avoid SCHEDULE 
1810      LastTSO = tso; */
1811 #endif
1812
1813   /* Link the new thread on the global thread list.
1814    */
1815   tso->global_link = all_threads;
1816   all_threads = tso;
1817
1818 #if defined(DIST)
1819   tso->dist.priority = MandatoryPriority; //by default that is...
1820 #endif
1821
1822 #if defined(GRAN)
1823   tso->gran.pri = pri;
1824 # if defined(DEBUG)
1825   tso->gran.magic = TSO_MAGIC; // debugging only
1826 # endif
1827   tso->gran.sparkname   = 0;
1828   tso->gran.startedat   = CURRENT_TIME; 
1829   tso->gran.exported    = 0;
1830   tso->gran.basicblocks = 0;
1831   tso->gran.allocs      = 0;
1832   tso->gran.exectime    = 0;
1833   tso->gran.fetchtime   = 0;
1834   tso->gran.fetchcount  = 0;
1835   tso->gran.blocktime   = 0;
1836   tso->gran.blockcount  = 0;
1837   tso->gran.blockedat   = 0;
1838   tso->gran.globalsparks = 0;
1839   tso->gran.localsparks  = 0;
1840   if (RtsFlags.GranFlags.Light)
1841     tso->gran.clock  = Now; /* local clock */
1842   else
1843     tso->gran.clock  = 0;
1844
1845   IF_DEBUG(gran,printTSO(tso));
1846 #elif defined(PAR)
1847 # if defined(DEBUG)
1848   tso->par.magic = TSO_MAGIC; // debugging only
1849 # endif
1850   tso->par.sparkname   = 0;
1851   tso->par.startedat   = CURRENT_TIME; 
1852   tso->par.exported    = 0;
1853   tso->par.basicblocks = 0;
1854   tso->par.allocs      = 0;
1855   tso->par.exectime    = 0;
1856   tso->par.fetchtime   = 0;
1857   tso->par.fetchcount  = 0;
1858   tso->par.blocktime   = 0;
1859   tso->par.blockcount  = 0;
1860   tso->par.blockedat   = 0;
1861   tso->par.globalsparks = 0;
1862   tso->par.localsparks  = 0;
1863 #endif
1864
1865 #if defined(GRAN)
1866   globalGranStats.tot_threads_created++;
1867   globalGranStats.threads_created_on_PE[CurrentProc]++;
1868   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1869   globalGranStats.tot_sq_probes++;
1870 #elif defined(PAR)
1871   // collect parallel global statistics (currently done together with GC stats)
1872   if (RtsFlags.ParFlags.ParStats.Global &&
1873       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1874     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1875     globalParStats.tot_threads_created++;
1876   }
1877 #endif 
1878
1879 #if defined(GRAN)
1880   IF_GRAN_DEBUG(pri,
1881                 sched_belch("==__ schedule: Created TSO %d (%p);",
1882                       CurrentProc, tso, tso->id));
1883 #elif defined(PAR)
1884     IF_PAR_DEBUG(verbose,
1885                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
1886                        (long)tso->id, tso, advisory_thread_count));
1887 #else
1888   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1889                                 (long)tso->id, (long)tso->stack_size));
1890 #endif    
1891   return tso;
1892 }
1893
1894 #if defined(PAR)
1895 /* RFP:
1896    all parallel thread creation calls should fall through the following routine.
1897 */
1898 StgTSO *
1899 createSparkThread(rtsSpark spark) 
1900 { StgTSO *tso;
1901   ASSERT(spark != (rtsSpark)NULL);
1902   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1903   { threadsIgnored++;
1904     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1905           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1906     return END_TSO_QUEUE;
1907   }
1908   else
1909   { threadsCreated++;
1910     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1911     if (tso==END_TSO_QUEUE)     
1912       barf("createSparkThread: Cannot create TSO");
1913 #if defined(DIST)
1914     tso->priority = AdvisoryPriority;
1915 #endif
1916     pushClosure(tso,spark);
1917     PUSH_ON_RUN_QUEUE(tso);
1918     advisory_thread_count++;    
1919   }
1920   return tso;
1921 }
1922 #endif
1923
1924 /*
1925   Turn a spark into a thread.
1926   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1927 */
1928 #if defined(PAR)
1929 StgTSO *
1930 activateSpark (rtsSpark spark) 
1931 {
1932   StgTSO *tso;
1933
1934   tso = createSparkThread(spark);
1935   if (RtsFlags.ParFlags.ParStats.Full) {   
1936     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1937     IF_PAR_DEBUG(verbose,
1938                  debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
1939                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1940   }
1941   // ToDo: fwd info on local/global spark to thread -- HWL
1942   // tso->gran.exported =  spark->exported;
1943   // tso->gran.locked =   !spark->global;
1944   // tso->gran.sparkname = spark->name;
1945
1946   return tso;
1947 }
1948 #endif
1949
1950 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1951                                    Capability *initialCapability
1952                                    );
1953
1954
1955 /* ---------------------------------------------------------------------------
1956  * scheduleThread()
1957  *
1958  * scheduleThread puts a thread on the head of the runnable queue.
1959  * This will usually be done immediately after a thread is created.
1960  * The caller of scheduleThread must create the thread using e.g.
1961  * createThread and push an appropriate closure
1962  * on this thread's stack before the scheduler is invoked.
1963  * ------------------------------------------------------------------------ */
1964
1965 static void scheduleThread_ (StgTSO* tso);
1966
1967 void
1968 scheduleThread_(StgTSO *tso)
1969 {
1970   // The thread goes at the *end* of the run-queue, to avoid possible
1971   // starvation of any threads already on the queue.
1972   APPEND_TO_RUN_QUEUE(tso);
1973   threadRunnable();
1974 }
1975
1976 void
1977 scheduleThread(StgTSO* tso)
1978 {
1979   ACQUIRE_LOCK(&sched_mutex);
1980   scheduleThread_(tso);
1981   RELEASE_LOCK(&sched_mutex);
1982 }
1983
1984 #if defined(RTS_SUPPORTS_THREADS)
1985 static Condition bound_cond_cache;
1986 static int bound_cond_cache_full = 0;
1987 #endif
1988
1989
1990 SchedulerStatus
1991 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1992                    Capability *initialCapability)
1993 {
1994     // Precondition: sched_mutex must be held
1995     StgMainThread *m;
1996
1997     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1998     m->tso = tso;
1999     tso->main = m;
2000     m->ret = ret;
2001     m->stat = NoStatus;
2002     m->link = main_threads;
2003     m->prev = NULL;
2004     if (main_threads != NULL) {
2005         main_threads->prev = m;
2006     }
2007     main_threads = m;
2008
2009 #if defined(RTS_SUPPORTS_THREADS)
2010     // Allocating a new condition for each thread is expensive, so we
2011     // cache one.  This is a pretty feeble hack, but it helps speed up
2012     // consecutive call-ins quite a bit.
2013     if (bound_cond_cache_full) {
2014         m->bound_thread_cond = bound_cond_cache;
2015         bound_cond_cache_full = 0;
2016     } else {
2017         initCondition(&m->bound_thread_cond);
2018     }
2019 #endif
2020
2021     /* Put the thread on the main-threads list prior to scheduling the TSO.
2022        Failure to do so introduces a race condition in the MT case (as
2023        identified by Wolfgang Thaller), whereby the new task/OS thread 
2024        created by scheduleThread_() would complete prior to the thread
2025        that spawned it managed to put 'itself' on the main-threads list.
2026        The upshot of it all being that the worker thread wouldn't get to
2027        signal the completion of the its work item for the main thread to
2028        see (==> it got stuck waiting.)    -- sof 6/02.
2029     */
2030     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2031     
2032     APPEND_TO_RUN_QUEUE(tso);
2033     // NB. Don't call threadRunnable() here, because the thread is
2034     // bound and only runnable by *this* OS thread, so waking up other
2035     // workers will just slow things down.
2036
2037     return waitThread_(m, initialCapability);
2038 }
2039
2040 /* ---------------------------------------------------------------------------
2041  * initScheduler()
2042  *
2043  * Initialise the scheduler.  This resets all the queues - if the
2044  * queues contained any threads, they'll be garbage collected at the
2045  * next pass.
2046  *
2047  * ------------------------------------------------------------------------ */
2048
2049 void 
2050 initScheduler(void)
2051 {
2052 #if defined(GRAN)
2053   nat i;
2054
2055   for (i=0; i<=MAX_PROC; i++) {
2056     run_queue_hds[i]      = END_TSO_QUEUE;
2057     run_queue_tls[i]      = END_TSO_QUEUE;
2058     blocked_queue_hds[i]  = END_TSO_QUEUE;
2059     blocked_queue_tls[i]  = END_TSO_QUEUE;
2060     ccalling_threadss[i]  = END_TSO_QUEUE;
2061     sleeping_queue        = END_TSO_QUEUE;
2062   }
2063 #else
2064   run_queue_hd      = END_TSO_QUEUE;
2065   run_queue_tl      = END_TSO_QUEUE;
2066   blocked_queue_hd  = END_TSO_QUEUE;
2067   blocked_queue_tl  = END_TSO_QUEUE;
2068   sleeping_queue    = END_TSO_QUEUE;
2069 #endif 
2070
2071   suspended_ccalling_threads  = END_TSO_QUEUE;
2072
2073   main_threads = NULL;
2074   all_threads  = END_TSO_QUEUE;
2075
2076   context_switch = 0;
2077   interrupted    = 0;
2078
2079   RtsFlags.ConcFlags.ctxtSwitchTicks =
2080       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2081       
2082 #if defined(RTS_SUPPORTS_THREADS)
2083   /* Initialise the mutex and condition variables used by
2084    * the scheduler. */
2085   initMutex(&sched_mutex);
2086   initMutex(&term_mutex);
2087 #endif
2088   
2089   ACQUIRE_LOCK(&sched_mutex);
2090
2091   /* A capability holds the state a native thread needs in
2092    * order to execute STG code. At least one capability is
2093    * floating around (only SMP builds have more than one).
2094    */
2095   initCapabilities();
2096   
2097 #if defined(RTS_SUPPORTS_THREADS)
2098     /* start our haskell execution tasks */
2099     startTaskManager(0,taskStart);
2100 #endif
2101
2102 #if /* defined(SMP) ||*/ defined(PAR)
2103   initSparkPools();
2104 #endif
2105
2106   RELEASE_LOCK(&sched_mutex);
2107 }
2108
2109 void
2110 exitScheduler( void )
2111 {
2112 #if defined(RTS_SUPPORTS_THREADS)
2113   stopTaskManager();
2114 #endif
2115   shutting_down_scheduler = rtsTrue;
2116 }
2117
2118 /* ----------------------------------------------------------------------------
2119    Managing the per-task allocation areas.
2120    
2121    Each capability comes with an allocation area.  These are
2122    fixed-length block lists into which allocation can be done.
2123
2124    ToDo: no support for two-space collection at the moment???
2125    ------------------------------------------------------------------------- */
2126
2127 static
2128 SchedulerStatus
2129 waitThread_(StgMainThread* m, Capability *initialCapability)
2130 {
2131   SchedulerStatus stat;
2132
2133   // Precondition: sched_mutex must be held.
2134   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2135
2136 #if defined(GRAN)
2137   /* GranSim specific init */
2138   CurrentTSO = m->tso;                // the TSO to run
2139   procStatus[MainProc] = Busy;        // status of main PE
2140   CurrentProc = MainProc;             // PE to run it on
2141   schedule(m,initialCapability);
2142 #else
2143   schedule(m,initialCapability);
2144   ASSERT(m->stat != NoStatus);
2145 #endif
2146
2147   stat = m->stat;
2148
2149 #if defined(RTS_SUPPORTS_THREADS)
2150   // Free the condition variable, returning it to the cache if possible.
2151   if (!bound_cond_cache_full) {
2152       bound_cond_cache = m->bound_thread_cond;
2153       bound_cond_cache_full = 1;
2154   } else {
2155       closeCondition(&m->bound_thread_cond);
2156   }
2157 #endif
2158
2159   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2160   stgFree(m);
2161
2162   // Postcondition: sched_mutex still held
2163   return stat;
2164 }
2165
2166 /* ---------------------------------------------------------------------------
2167    Where are the roots that we know about?
2168
2169         - all the threads on the runnable queue
2170         - all the threads on the blocked queue
2171         - all the threads on the sleeping queue
2172         - all the thread currently executing a _ccall_GC
2173         - all the "main threads"
2174      
2175    ------------------------------------------------------------------------ */
2176
2177 /* This has to be protected either by the scheduler monitor, or by the
2178         garbage collection monitor (probably the latter).
2179         KH @ 25/10/99
2180 */
2181
2182 void
2183 GetRoots( evac_fn evac )
2184 {
2185 #if defined(GRAN)
2186   {
2187     nat i;
2188     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2189       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2190           evac((StgClosure **)&run_queue_hds[i]);
2191       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2192           evac((StgClosure **)&run_queue_tls[i]);
2193       
2194       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2195           evac((StgClosure **)&blocked_queue_hds[i]);
2196       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2197           evac((StgClosure **)&blocked_queue_tls[i]);
2198       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2199           evac((StgClosure **)&ccalling_threads[i]);
2200     }
2201   }
2202
2203   markEventQueue();
2204
2205 #else /* !GRAN */
2206   if (run_queue_hd != END_TSO_QUEUE) {
2207       ASSERT(run_queue_tl != END_TSO_QUEUE);
2208       evac((StgClosure **)&run_queue_hd);
2209       evac((StgClosure **)&run_queue_tl);
2210   }
2211   
2212   if (blocked_queue_hd != END_TSO_QUEUE) {
2213       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2214       evac((StgClosure **)&blocked_queue_hd);
2215       evac((StgClosure **)&blocked_queue_tl);
2216   }
2217   
2218   if (sleeping_queue != END_TSO_QUEUE) {
2219       evac((StgClosure **)&sleeping_queue);
2220   }
2221 #endif 
2222
2223   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2224       evac((StgClosure **)&suspended_ccalling_threads);
2225   }
2226
2227 #if defined(PAR) || defined(GRAN)
2228   markSparkQueue(evac);
2229 #endif
2230
2231 #if defined(RTS_USER_SIGNALS)
2232   // mark the signal handlers (signals should be already blocked)
2233   markSignalHandlers(evac);
2234 #endif
2235 }
2236
2237 /* -----------------------------------------------------------------------------
2238    performGC
2239
2240    This is the interface to the garbage collector from Haskell land.
2241    We provide this so that external C code can allocate and garbage
2242    collect when called from Haskell via _ccall_GC.
2243
2244    It might be useful to provide an interface whereby the programmer
2245    can specify more roots (ToDo).
2246    
2247    This needs to be protected by the GC condition variable above.  KH.
2248    -------------------------------------------------------------------------- */
2249
2250 static void (*extra_roots)(evac_fn);
2251
2252 void
2253 performGC(void)
2254 {
2255   /* Obligated to hold this lock upon entry */
2256   ACQUIRE_LOCK(&sched_mutex);
2257   GarbageCollect(GetRoots,rtsFalse);
2258   RELEASE_LOCK(&sched_mutex);
2259 }
2260
2261 void
2262 performMajorGC(void)
2263 {
2264   ACQUIRE_LOCK(&sched_mutex);
2265   GarbageCollect(GetRoots,rtsTrue);
2266   RELEASE_LOCK(&sched_mutex);
2267 }
2268
2269 static void
2270 AllRoots(evac_fn evac)
2271 {
2272     GetRoots(evac);             // the scheduler's roots
2273     extra_roots(evac);          // the user's roots
2274 }
2275
2276 void
2277 performGCWithRoots(void (*get_roots)(evac_fn))
2278 {
2279   ACQUIRE_LOCK(&sched_mutex);
2280   extra_roots = get_roots;
2281   GarbageCollect(AllRoots,rtsFalse);
2282   RELEASE_LOCK(&sched_mutex);
2283 }
2284
2285 /* -----------------------------------------------------------------------------
2286    Stack overflow
2287
2288    If the thread has reached its maximum stack size, then raise the
2289    StackOverflow exception in the offending thread.  Otherwise
2290    relocate the TSO into a larger chunk of memory and adjust its stack
2291    size appropriately.
2292    -------------------------------------------------------------------------- */
2293
2294 static StgTSO *
2295 threadStackOverflow(StgTSO *tso)
2296 {
2297   nat new_stack_size, new_tso_size, stack_words;
2298   StgPtr new_sp;
2299   StgTSO *dest;
2300
2301   IF_DEBUG(sanity,checkTSO(tso));
2302   if (tso->stack_size >= tso->max_stack_size) {
2303
2304     IF_DEBUG(gc,
2305              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2306                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2307              /* If we're debugging, just print out the top of the stack */
2308              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2309                                               tso->sp+64)));
2310
2311     /* Send this thread the StackOverflow exception */
2312     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2313     return tso;
2314   }
2315
2316   /* Try to double the current stack size.  If that takes us over the
2317    * maximum stack size for this thread, then use the maximum instead.
2318    * Finally round up so the TSO ends up as a whole number of blocks.
2319    */
2320   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2321   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2322                                        TSO_STRUCT_SIZE)/sizeof(W_);
2323   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2324   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2325
2326   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2327
2328   dest = (StgTSO *)allocate(new_tso_size);
2329   TICK_ALLOC_TSO(new_stack_size,0);
2330
2331   /* copy the TSO block and the old stack into the new area */
2332   memcpy(dest,tso,TSO_STRUCT_SIZE);
2333   stack_words = tso->stack + tso->stack_size - tso->sp;
2334   new_sp = (P_)dest + new_tso_size - stack_words;
2335   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2336
2337   /* relocate the stack pointers... */
2338   dest->sp         = new_sp;
2339   dest->stack_size = new_stack_size;
2340         
2341   /* Mark the old TSO as relocated.  We have to check for relocated
2342    * TSOs in the garbage collector and any primops that deal with TSOs.
2343    *
2344    * It's important to set the sp value to just beyond the end
2345    * of the stack, so we don't attempt to scavenge any part of the
2346    * dead TSO's stack.
2347    */
2348   tso->what_next = ThreadRelocated;
2349   tso->link = dest;
2350   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2351   tso->why_blocked = NotBlocked;
2352
2353   IF_PAR_DEBUG(verbose,
2354                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2355                      tso->id, tso, tso->stack_size);
2356                /* If we're debugging, just print out the top of the stack */
2357                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2358                                                 tso->sp+64)));
2359   
2360   IF_DEBUG(sanity,checkTSO(tso));
2361 #if 0
2362   IF_DEBUG(scheduler,printTSO(dest));
2363 #endif
2364
2365   return dest;
2366 }
2367
2368 /* ---------------------------------------------------------------------------
2369    Wake up a queue that was blocked on some resource.
2370    ------------------------------------------------------------------------ */
2371
2372 #if defined(GRAN)
2373 STATIC_INLINE void
2374 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2375 {
2376 }
2377 #elif defined(PAR)
2378 STATIC_INLINE void
2379 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2380 {
2381   /* write RESUME events to log file and
2382      update blocked and fetch time (depending on type of the orig closure) */
2383   if (RtsFlags.ParFlags.ParStats.Full) {
2384     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2385                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2386                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2387     if (EMPTY_RUN_QUEUE())
2388       emitSchedule = rtsTrue;
2389
2390     switch (get_itbl(node)->type) {
2391         case FETCH_ME_BQ:
2392           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2393           break;
2394         case RBH:
2395         case FETCH_ME:
2396         case BLACKHOLE_BQ:
2397           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2398           break;
2399 #ifdef DIST
2400         case MVAR:
2401           break;
2402 #endif    
2403         default:
2404           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2405         }
2406       }
2407 }
2408 #endif
2409
2410 #if defined(GRAN)
2411 static StgBlockingQueueElement *
2412 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2413 {
2414     StgTSO *tso;
2415     PEs node_loc, tso_loc;
2416
2417     node_loc = where_is(node); // should be lifted out of loop
2418     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2419     tso_loc = where_is((StgClosure *)tso);
2420     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2421       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2422       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2423       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2424       // insertThread(tso, node_loc);
2425       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2426                 ResumeThread,
2427                 tso, node, (rtsSpark*)NULL);
2428       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2429       // len_local++;
2430       // len++;
2431     } else { // TSO is remote (actually should be FMBQ)
2432       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2433                                   RtsFlags.GranFlags.Costs.gunblocktime +
2434                                   RtsFlags.GranFlags.Costs.latency;
2435       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2436                 UnblockThread,
2437                 tso, node, (rtsSpark*)NULL);
2438       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2439       // len++;
2440     }
2441     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2442     IF_GRAN_DEBUG(bq,
2443                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2444                           (node_loc==tso_loc ? "Local" : "Global"), 
2445                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2446     tso->block_info.closure = NULL;
2447     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2448                              tso->id, tso));
2449 }
2450 #elif defined(PAR)
2451 static StgBlockingQueueElement *
2452 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2453 {
2454     StgBlockingQueueElement *next;
2455
2456     switch (get_itbl(bqe)->type) {
2457     case TSO:
2458       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2459       /* if it's a TSO just push it onto the run_queue */
2460       next = bqe->link;
2461       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2462       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2463       threadRunnable();
2464       unblockCount(bqe, node);
2465       /* reset blocking status after dumping event */
2466       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2467       break;
2468
2469     case BLOCKED_FETCH:
2470       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2471       next = bqe->link;
2472       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2473       PendingFetches = (StgBlockedFetch *)bqe;
2474       break;
2475
2476 # if defined(DEBUG)
2477       /* can ignore this case in a non-debugging setup; 
2478          see comments on RBHSave closures above */
2479     case CONSTR:
2480       /* check that the closure is an RBHSave closure */
2481       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2482              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2483              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2484       break;
2485
2486     default:
2487       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2488            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2489            (StgClosure *)bqe);
2490 # endif
2491     }
2492   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2493   return next;
2494 }
2495
2496 #else /* !GRAN && !PAR */
2497 static StgTSO *
2498 unblockOneLocked(StgTSO *tso)
2499 {
2500   StgTSO *next;
2501
2502   ASSERT(get_itbl(tso)->type == TSO);
2503   ASSERT(tso->why_blocked != NotBlocked);
2504   tso->why_blocked = NotBlocked;
2505   next = tso->link;
2506   tso->link = END_TSO_QUEUE;
2507   APPEND_TO_RUN_QUEUE(tso);
2508   threadRunnable();
2509   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2510   return next;
2511 }
2512 #endif
2513
2514 #if defined(GRAN) || defined(PAR)
2515 INLINE_ME StgBlockingQueueElement *
2516 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2517 {
2518   ACQUIRE_LOCK(&sched_mutex);
2519   bqe = unblockOneLocked(bqe, node);
2520   RELEASE_LOCK(&sched_mutex);
2521   return bqe;
2522 }
2523 #else
2524 INLINE_ME StgTSO *
2525 unblockOne(StgTSO *tso)
2526 {
2527   ACQUIRE_LOCK(&sched_mutex);
2528   tso = unblockOneLocked(tso);
2529   RELEASE_LOCK(&sched_mutex);
2530   return tso;
2531 }
2532 #endif
2533
2534 #if defined(GRAN)
2535 void 
2536 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2537 {
2538   StgBlockingQueueElement *bqe;
2539   PEs node_loc;
2540   nat len = 0; 
2541
2542   IF_GRAN_DEBUG(bq, 
2543                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2544                       node, CurrentProc, CurrentTime[CurrentProc], 
2545                       CurrentTSO->id, CurrentTSO));
2546
2547   node_loc = where_is(node);
2548
2549   ASSERT(q == END_BQ_QUEUE ||
2550          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2551          get_itbl(q)->type == CONSTR); // closure (type constructor)
2552   ASSERT(is_unique(node));
2553
2554   /* FAKE FETCH: magically copy the node to the tso's proc;
2555      no Fetch necessary because in reality the node should not have been 
2556      moved to the other PE in the first place
2557   */
2558   if (CurrentProc!=node_loc) {
2559     IF_GRAN_DEBUG(bq, 
2560                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2561                         node, node_loc, CurrentProc, CurrentTSO->id, 
2562                         // CurrentTSO, where_is(CurrentTSO),
2563                         node->header.gran.procs));
2564     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2565     IF_GRAN_DEBUG(bq, 
2566                   debugBelch("## new bitmask of node %p is %#x\n",
2567                         node, node->header.gran.procs));
2568     if (RtsFlags.GranFlags.GranSimStats.Global) {
2569       globalGranStats.tot_fake_fetches++;
2570     }
2571   }
2572
2573   bqe = q;
2574   // ToDo: check: ASSERT(CurrentProc==node_loc);
2575   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2576     //next = bqe->link;
2577     /* 
2578        bqe points to the current element in the queue
2579        next points to the next element in the queue
2580     */
2581     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2582     //tso_loc = where_is(tso);
2583     len++;
2584     bqe = unblockOneLocked(bqe, node);
2585   }
2586
2587   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2588      the closure to make room for the anchor of the BQ */
2589   if (bqe!=END_BQ_QUEUE) {
2590     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2591     /*
2592     ASSERT((info_ptr==&RBH_Save_0_info) ||
2593            (info_ptr==&RBH_Save_1_info) ||
2594            (info_ptr==&RBH_Save_2_info));
2595     */
2596     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2597     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2598     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2599
2600     IF_GRAN_DEBUG(bq,
2601                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
2602                         node, info_type(node)));
2603   }
2604
2605   /* statistics gathering */
2606   if (RtsFlags.GranFlags.GranSimStats.Global) {
2607     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2608     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2609     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2610     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2611   }
2612   IF_GRAN_DEBUG(bq,
2613                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
2614                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2615 }
2616 #elif defined(PAR)
2617 void 
2618 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2619 {
2620   StgBlockingQueueElement *bqe;
2621
2622   ACQUIRE_LOCK(&sched_mutex);
2623
2624   IF_PAR_DEBUG(verbose, 
2625                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
2626                      node, mytid));
2627 #ifdef DIST  
2628   //RFP
2629   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2630     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
2631     return;
2632   }
2633 #endif
2634   
2635   ASSERT(q == END_BQ_QUEUE ||
2636          get_itbl(q)->type == TSO ||           
2637          get_itbl(q)->type == BLOCKED_FETCH || 
2638          get_itbl(q)->type == CONSTR); 
2639
2640   bqe = q;
2641   while (get_itbl(bqe)->type==TSO || 
2642          get_itbl(bqe)->type==BLOCKED_FETCH) {
2643     bqe = unblockOneLocked(bqe, node);
2644   }
2645   RELEASE_LOCK(&sched_mutex);
2646 }
2647
2648 #else   /* !GRAN && !PAR */
2649
2650 void
2651 awakenBlockedQueueNoLock(StgTSO *tso)
2652 {
2653   while (tso != END_TSO_QUEUE) {
2654     tso = unblockOneLocked(tso);
2655   }
2656 }
2657
2658 void
2659 awakenBlockedQueue(StgTSO *tso)
2660 {
2661   ACQUIRE_LOCK(&sched_mutex);
2662   while (tso != END_TSO_QUEUE) {
2663     tso = unblockOneLocked(tso);
2664   }
2665   RELEASE_LOCK(&sched_mutex);
2666 }
2667 #endif
2668
2669 /* ---------------------------------------------------------------------------
2670    Interrupt execution
2671    - usually called inside a signal handler so it mustn't do anything fancy.   
2672    ------------------------------------------------------------------------ */
2673
2674 void
2675 interruptStgRts(void)
2676 {
2677     interrupted    = 1;
2678     context_switch = 1;
2679 }
2680
2681 /* -----------------------------------------------------------------------------
2682    Unblock a thread
2683
2684    This is for use when we raise an exception in another thread, which
2685    may be blocked.
2686    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2687    -------------------------------------------------------------------------- */
2688
2689 #if defined(GRAN) || defined(PAR)
2690 /*
2691   NB: only the type of the blocking queue is different in GranSim and GUM
2692       the operations on the queue-elements are the same
2693       long live polymorphism!
2694
2695   Locks: sched_mutex is held upon entry and exit.
2696
2697 */
2698 static void
2699 unblockThread(StgTSO *tso)
2700 {
2701   StgBlockingQueueElement *t, **last;
2702
2703   switch (tso->why_blocked) {
2704
2705   case NotBlocked:
2706     return;  /* not blocked */
2707
2708   case BlockedOnSTM:
2709     // Be careful: nothing to do here!  We tell the scheduler that the thread
2710     // is runnable and we leave it to the stack-walking code to abort the 
2711     // transaction while unwinding the stack.  We should perhaps have a debugging
2712     // test to make sure that this really happens and that the 'zombie' transaction
2713     // does not get committed.
2714     goto done;
2715
2716   case BlockedOnMVar:
2717     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2718     {
2719       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2720       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2721
2722       last = (StgBlockingQueueElement **)&mvar->head;
2723       for (t = (StgBlockingQueueElement *)mvar->head; 
2724            t != END_BQ_QUEUE; 
2725            last = &t->link, last_tso = t, t = t->link) {
2726         if (t == (StgBlockingQueueElement *)tso) {
2727           *last = (StgBlockingQueueElement *)tso->link;
2728           if (mvar->tail == tso) {
2729             mvar->tail = (StgTSO *)last_tso;
2730           }
2731           goto done;
2732         }
2733       }
2734       barf("unblockThread (MVAR): TSO not found");
2735     }
2736
2737   case BlockedOnBlackHole:
2738     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2739     {
2740       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2741
2742       last = &bq->blocking_queue;
2743       for (t = bq->blocking_queue; 
2744            t != END_BQ_QUEUE; 
2745            last = &t->link, t = t->link) {
2746         if (t == (StgBlockingQueueElement *)tso) {
2747           *last = (StgBlockingQueueElement *)tso->link;
2748           goto done;
2749         }
2750       }
2751       barf("unblockThread (BLACKHOLE): TSO not found");
2752     }
2753
2754   case BlockedOnException:
2755     {
2756       StgTSO *target  = tso->block_info.tso;
2757
2758       ASSERT(get_itbl(target)->type == TSO);
2759
2760       if (target->what_next == ThreadRelocated) {
2761           target = target->link;
2762           ASSERT(get_itbl(target)->type == TSO);
2763       }
2764
2765       ASSERT(target->blocked_exceptions != NULL);
2766
2767       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2768       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2769            t != END_BQ_QUEUE; 
2770            last = &t->link, t = t->link) {
2771         ASSERT(get_itbl(t)->type == TSO);
2772         if (t == (StgBlockingQueueElement *)tso) {
2773           *last = (StgBlockingQueueElement *)tso->link;
2774           goto done;
2775         }
2776       }
2777       barf("unblockThread (Exception): TSO not found");
2778     }
2779
2780   case BlockedOnRead:
2781   case BlockedOnWrite:
2782 #if defined(mingw32_HOST_OS)
2783   case BlockedOnDoProc:
2784 #endif
2785     {
2786       /* take TSO off blocked_queue */
2787       StgBlockingQueueElement *prev = NULL;
2788       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2789            prev = t, t = t->link) {
2790         if (t == (StgBlockingQueueElement *)tso) {
2791           if (prev == NULL) {
2792             blocked_queue_hd = (StgTSO *)t->link;
2793             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2794               blocked_queue_tl = END_TSO_QUEUE;
2795             }
2796           } else {
2797             prev->link = t->link;
2798             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2799               blocked_queue_tl = (StgTSO *)prev;
2800             }
2801           }
2802           goto done;
2803         }
2804       }
2805       barf("unblockThread (I/O): TSO not found");
2806     }
2807
2808   case BlockedOnDelay:
2809     {
2810       /* take TSO off sleeping_queue */
2811       StgBlockingQueueElement *prev = NULL;
2812       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2813            prev = t, t = t->link) {
2814         if (t == (StgBlockingQueueElement *)tso) {
2815           if (prev == NULL) {
2816             sleeping_queue = (StgTSO *)t->link;
2817           } else {
2818             prev->link = t->link;
2819           }
2820           goto done;
2821         }
2822       }
2823       barf("unblockThread (delay): TSO not found");
2824     }
2825
2826   default:
2827     barf("unblockThread");
2828   }
2829
2830  done:
2831   tso->link = END_TSO_QUEUE;
2832   tso->why_blocked = NotBlocked;
2833   tso->block_info.closure = NULL;
2834   PUSH_ON_RUN_QUEUE(tso);
2835 }
2836 #else
2837 static void
2838 unblockThread(StgTSO *tso)
2839 {
2840   StgTSO *t, **last;
2841   
2842   /* To avoid locking unnecessarily. */
2843   if (tso->why_blocked == NotBlocked) {
2844     return;
2845   }
2846
2847   switch (tso->why_blocked) {
2848
2849   case BlockedOnSTM:
2850     // Be careful: nothing to do here!  We tell the scheduler that the thread
2851     // is runnable and we leave it to the stack-walking code to abort the 
2852     // transaction while unwinding the stack.  We should perhaps have a debugging
2853     // test to make sure that this really happens and that the 'zombie' transaction
2854     // does not get committed.
2855     goto done;
2856
2857   case BlockedOnMVar:
2858     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2859     {
2860       StgTSO *last_tso = END_TSO_QUEUE;
2861       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2862
2863       last = &mvar->head;
2864       for (t = mvar->head; t != END_TSO_QUEUE; 
2865            last = &t->link, last_tso = t, t = t->link) {
2866         if (t == tso) {
2867           *last = tso->link;
2868           if (mvar->tail == tso) {
2869             mvar->tail = last_tso;
2870           }
2871           goto done;
2872         }
2873       }
2874       barf("unblockThread (MVAR): TSO not found");
2875     }
2876
2877   case BlockedOnBlackHole:
2878     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2879     {
2880       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2881
2882       last = &bq->blocking_queue;
2883       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2884            last = &t->link, t = t->link) {
2885         if (t == tso) {
2886           *last = tso->link;
2887           goto done;
2888         }
2889       }
2890       barf("unblockThread (BLACKHOLE): TSO not found");
2891     }
2892
2893   case BlockedOnException:
2894     {
2895       StgTSO *target  = tso->block_info.tso;
2896
2897       ASSERT(get_itbl(target)->type == TSO);
2898
2899       while (target->what_next == ThreadRelocated) {
2900           target = target->link;
2901           ASSERT(get_itbl(target)->type == TSO);
2902       }
2903       
2904       ASSERT(target->blocked_exceptions != NULL);
2905
2906       last = &target->blocked_exceptions;
2907       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2908            last = &t->link, t = t->link) {
2909         ASSERT(get_itbl(t)->type == TSO);
2910         if (t == tso) {
2911           *last = tso->link;
2912           goto done;
2913         }
2914       }
2915       barf("unblockThread (Exception): TSO not found");
2916     }
2917
2918   case BlockedOnRead:
2919   case BlockedOnWrite:
2920 #if defined(mingw32_HOST_OS)
2921   case BlockedOnDoProc:
2922 #endif
2923     {
2924       StgTSO *prev = NULL;
2925       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2926            prev = t, t = t->link) {
2927         if (t == tso) {
2928           if (prev == NULL) {
2929             blocked_queue_hd = t->link;
2930             if (blocked_queue_tl == t) {
2931               blocked_queue_tl = END_TSO_QUEUE;
2932             }
2933           } else {
2934             prev->link = t->link;
2935             if (blocked_queue_tl == t) {
2936               blocked_queue_tl = prev;
2937             }
2938           }
2939           goto done;
2940         }
2941       }
2942       barf("unblockThread (I/O): TSO not found");
2943     }
2944
2945   case BlockedOnDelay:
2946     {
2947       StgTSO *prev = NULL;
2948       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2949            prev = t, t = t->link) {
2950         if (t == tso) {
2951           if (prev == NULL) {
2952             sleeping_queue = t->link;
2953           } else {
2954             prev->link = t->link;
2955           }
2956           goto done;
2957         }
2958       }
2959       barf("unblockThread (delay): TSO not found");
2960     }
2961
2962   default:
2963     barf("unblockThread");
2964   }
2965
2966  done:
2967   tso->link = END_TSO_QUEUE;
2968   tso->why_blocked = NotBlocked;
2969   tso->block_info.closure = NULL;
2970   APPEND_TO_RUN_QUEUE(tso);
2971 }
2972 #endif
2973
2974 /* -----------------------------------------------------------------------------
2975  * raiseAsync()
2976  *
2977  * The following function implements the magic for raising an
2978  * asynchronous exception in an existing thread.
2979  *
2980  * We first remove the thread from any queue on which it might be
2981  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2982  *
2983  * We strip the stack down to the innermost CATCH_FRAME, building
2984  * thunks in the heap for all the active computations, so they can 
2985  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2986  * an application of the handler to the exception, and push it on
2987  * the top of the stack.
2988  * 
2989  * How exactly do we save all the active computations?  We create an
2990  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2991  * AP_STACKs pushes everything from the corresponding update frame
2992  * upwards onto the stack.  (Actually, it pushes everything up to the
2993  * next update frame plus a pointer to the next AP_STACK object.
2994  * Entering the next AP_STACK object pushes more onto the stack until we
2995  * reach the last AP_STACK object - at which point the stack should look
2996  * exactly as it did when we killed the TSO and we can continue
2997  * execution by entering the closure on top of the stack.
2998  *
2999  * We can also kill a thread entirely - this happens if either (a) the 
3000  * exception passed to raiseAsync is NULL, or (b) there's no
3001  * CATCH_FRAME on the stack.  In either case, we strip the entire
3002  * stack and replace the thread with a zombie.
3003  *
3004  * Locks: sched_mutex held upon entry nor exit.
3005  *
3006  * -------------------------------------------------------------------------- */
3007  
3008 void 
3009 deleteThread(StgTSO *tso)
3010 {
3011   if (tso->why_blocked != BlockedOnCCall &&
3012       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3013       raiseAsync(tso,NULL);
3014   }
3015 }
3016
3017 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3018 static void 
3019 deleteThreadImmediately(StgTSO *tso)
3020 { // for forkProcess only:
3021   // delete thread without giving it a chance to catch the KillThread exception
3022
3023   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3024       return;
3025   }
3026
3027   if (tso->why_blocked != BlockedOnCCall &&
3028       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3029     unblockThread(tso);
3030   }
3031
3032   tso->what_next = ThreadKilled;
3033 }
3034 #endif
3035
3036 void
3037 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3038 {
3039   /* When raising async exs from contexts where sched_mutex isn't held;
3040      use raiseAsyncWithLock(). */
3041   ACQUIRE_LOCK(&sched_mutex);
3042   raiseAsync(tso,exception);
3043   RELEASE_LOCK(&sched_mutex);
3044 }
3045
3046 void
3047 raiseAsync(StgTSO *tso, StgClosure *exception)
3048 {
3049     raiseAsync_(tso, exception, rtsFalse);
3050 }
3051
3052 static void
3053 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3054 {
3055     StgRetInfoTable *info;
3056     StgPtr sp;
3057   
3058     // Thread already dead?
3059     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3060         return;
3061     }
3062
3063     IF_DEBUG(scheduler, 
3064              sched_belch("raising exception in thread %ld.", (long)tso->id));
3065     
3066     // Remove it from any blocking queues
3067     unblockThread(tso);
3068
3069     sp = tso->sp;
3070     
3071     // The stack freezing code assumes there's a closure pointer on
3072     // the top of the stack, so we have to arrange that this is the case...
3073     //
3074     if (sp[0] == (W_)&stg_enter_info) {
3075         sp++;
3076     } else {
3077         sp--;
3078         sp[0] = (W_)&stg_dummy_ret_closure;
3079     }
3080
3081     while (1) {
3082         nat i;
3083
3084         // 1. Let the top of the stack be the "current closure"
3085         //
3086         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3087         // CATCH_FRAME.
3088         //
3089         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3090         // current closure applied to the chunk of stack up to (but not
3091         // including) the update frame.  This closure becomes the "current
3092         // closure".  Go back to step 2.
3093         //
3094         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3095         // top of the stack applied to the exception.
3096         // 
3097         // 5. If it's a STOP_FRAME, then kill the thread.
3098         // 
3099         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3100         // transaction
3101        
3102         
3103         StgPtr frame;
3104         
3105         frame = sp + 1;
3106         info = get_ret_itbl((StgClosure *)frame);
3107         
3108         while (info->i.type != UPDATE_FRAME
3109                && (info->i.type != CATCH_FRAME || exception == NULL)
3110                && info->i.type != STOP_FRAME
3111                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3112         {
3113             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3114               // IF we find an ATOMICALLY_FRAME then we abort the
3115               // current transaction and propagate the exception.  In
3116               // this case (unlike ordinary exceptions) we do not care
3117               // whether the transaction is valid or not because its
3118               // possible validity cannot have caused the exception
3119               // and will not be visible after the abort.
3120               IF_DEBUG(stm,
3121                        debugBelch("Found atomically block delivering async exception\n"));
3122               stmAbortTransaction(tso -> trec);
3123               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3124             }
3125             frame += stack_frame_sizeW((StgClosure *)frame);
3126             info = get_ret_itbl((StgClosure *)frame);
3127         }
3128         
3129         switch (info->i.type) {
3130             
3131         case ATOMICALLY_FRAME:
3132             ASSERT(stop_at_atomically);
3133             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3134             stmCondemnTransaction(tso -> trec);
3135 #ifdef REG_R1
3136             tso->sp = frame;
3137 #else
3138             // R1 is not a register: the return convention for IO in
3139             // this case puts the return value on the stack, so we
3140             // need to set up the stack to return to the atomically
3141             // frame properly...
3142             tso->sp = frame - 2;
3143             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3144             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3145 #endif
3146             tso->what_next = ThreadRunGHC;
3147             return;
3148
3149         case CATCH_FRAME:
3150             // If we find a CATCH_FRAME, and we've got an exception to raise,
3151             // then build the THUNK raise(exception), and leave it on
3152             // top of the CATCH_FRAME ready to enter.
3153             //
3154         {
3155 #ifdef PROFILING
3156             StgCatchFrame *cf = (StgCatchFrame *)frame;
3157 #endif
3158             StgClosure *raise;
3159             
3160             // we've got an exception to raise, so let's pass it to the
3161             // handler in this frame.
3162             //
3163             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3164             TICK_ALLOC_SE_THK(1,0);
3165             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3166             raise->payload[0] = exception;
3167             
3168             // throw away the stack from Sp up to the CATCH_FRAME.
3169             //
3170             sp = frame - 1;
3171             
3172             /* Ensure that async excpetions are blocked now, so we don't get
3173              * a surprise exception before we get around to executing the
3174              * handler.
3175              */
3176             if (tso->blocked_exceptions == NULL) {
3177                 tso->blocked_exceptions = END_TSO_QUEUE;
3178             }
3179             
3180             /* Put the newly-built THUNK on top of the stack, ready to execute
3181              * when the thread restarts.
3182              */
3183             sp[0] = (W_)raise;
3184             sp[-1] = (W_)&stg_enter_info;
3185             tso->sp = sp-1;
3186             tso->what_next = ThreadRunGHC;
3187             IF_DEBUG(sanity, checkTSO(tso));
3188             return;
3189         }
3190         
3191         case UPDATE_FRAME:
3192         {
3193             StgAP_STACK * ap;
3194             nat words;
3195             
3196             // First build an AP_STACK consisting of the stack chunk above the
3197             // current update frame, with the top word on the stack as the
3198             // fun field.
3199             //
3200             words = frame - sp - 1;
3201             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3202             
3203             ap->size = words;
3204             ap->fun  = (StgClosure *)sp[0];
3205             sp++;
3206             for(i=0; i < (nat)words; ++i) {
3207                 ap->payload[i] = (StgClosure *)*sp++;
3208             }
3209             
3210             SET_HDR(ap,&stg_AP_STACK_info,
3211                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3212             TICK_ALLOC_UP_THK(words+1,0);
3213             
3214             IF_DEBUG(scheduler,
3215                      debugBelch("sched: Updating ");
3216                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3217                      debugBelch(" with ");
3218                      printObj((StgClosure *)ap);
3219                 );
3220
3221             // Replace the updatee with an indirection - happily
3222             // this will also wake up any threads currently
3223             // waiting on the result.
3224             //
3225             // Warning: if we're in a loop, more than one update frame on
3226             // the stack may point to the same object.  Be careful not to
3227             // overwrite an IND_OLDGEN in this case, because we'll screw
3228             // up the mutable lists.  To be on the safe side, don't
3229             // overwrite any kind of indirection at all.  See also
3230             // threadSqueezeStack in GC.c, where we have to make a similar
3231             // check.
3232             //
3233             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3234                 // revert the black hole
3235                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3236                                (StgClosure *)ap);
3237             }
3238             sp += sizeofW(StgUpdateFrame) - 1;
3239             sp[0] = (W_)ap; // push onto stack
3240             break;
3241         }
3242         
3243         case STOP_FRAME:
3244             // We've stripped the entire stack, the thread is now dead.
3245             sp += sizeofW(StgStopFrame);
3246             tso->what_next = ThreadKilled;
3247             tso->sp = sp;
3248             return;
3249             
3250         default:
3251             barf("raiseAsync");
3252         }
3253     }
3254     barf("raiseAsync");
3255 }
3256
3257 /* -----------------------------------------------------------------------------
3258    raiseExceptionHelper
3259    
3260    This function is called by the raise# primitve, just so that we can
3261    move some of the tricky bits of raising an exception from C-- into
3262    C.  Who knows, it might be a useful re-useable thing here too.
3263    -------------------------------------------------------------------------- */
3264
3265 StgWord
3266 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3267 {
3268     StgClosure *raise_closure = NULL;
3269     StgPtr p, next;
3270     StgRetInfoTable *info;
3271     //
3272     // This closure represents the expression 'raise# E' where E
3273     // is the exception raise.  It is used to overwrite all the
3274     // thunks which are currently under evaluataion.
3275     //
3276
3277     //    
3278     // LDV profiling: stg_raise_info has THUNK as its closure
3279     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3280     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3281     // 1 does not cause any problem unless profiling is performed.
3282     // However, when LDV profiling goes on, we need to linearly scan
3283     // small object pool, where raise_closure is stored, so we should
3284     // use MIN_UPD_SIZE.
3285     //
3286     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3287     //                                 sizeofW(StgClosure)+1);
3288     //
3289
3290     //
3291     // Walk up the stack, looking for the catch frame.  On the way,
3292     // we update any closures pointed to from update frames with the
3293     // raise closure that we just built.
3294     //
3295     p = tso->sp;
3296     while(1) {
3297         info = get_ret_itbl((StgClosure *)p);
3298         next = p + stack_frame_sizeW((StgClosure *)p);
3299         switch (info->i.type) {
3300             
3301         case UPDATE_FRAME:
3302             // Only create raise_closure if we need to.
3303             if (raise_closure == NULL) {
3304                 raise_closure = 
3305                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3306                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3307                 raise_closure->payload[0] = exception;
3308             }
3309             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3310             p = next;
3311             continue;
3312
3313         case ATOMICALLY_FRAME:
3314             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3315             tso->sp = p;
3316             return ATOMICALLY_FRAME;
3317             
3318         case CATCH_FRAME:
3319             tso->sp = p;
3320             return CATCH_FRAME;
3321
3322         case CATCH_STM_FRAME:
3323             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3324             tso->sp = p;
3325             return CATCH_STM_FRAME;
3326             
3327         case STOP_FRAME:
3328             tso->sp = p;
3329             return STOP_FRAME;
3330
3331         case CATCH_RETRY_FRAME:
3332         default:
3333             p = next; 
3334             continue;
3335         }
3336     }
3337 }
3338
3339
3340 /* -----------------------------------------------------------------------------
3341    findRetryFrameHelper
3342
3343    This function is called by the retry# primitive.  It traverses the stack
3344    leaving tso->sp referring to the frame which should handle the retry.  
3345
3346    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3347    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3348
3349    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3350    despite the similar implementation.
3351
3352    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3353    not be created within memory transactions.
3354    -------------------------------------------------------------------------- */
3355
3356 StgWord
3357 findRetryFrameHelper (StgTSO *tso)
3358 {
3359   StgPtr           p, next;
3360   StgRetInfoTable *info;
3361
3362   p = tso -> sp;
3363   while (1) {
3364     info = get_ret_itbl((StgClosure *)p);
3365     next = p + stack_frame_sizeW((StgClosure *)p);
3366     switch (info->i.type) {
3367       
3368     case ATOMICALLY_FRAME:
3369       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3370       tso->sp = p;
3371       return ATOMICALLY_FRAME;
3372       
3373     case CATCH_RETRY_FRAME:
3374       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3375       tso->sp = p;
3376       return CATCH_RETRY_FRAME;
3377       
3378     case CATCH_STM_FRAME:
3379     default:
3380       ASSERT(info->i.type != CATCH_FRAME);
3381       ASSERT(info->i.type != STOP_FRAME);
3382       p = next; 
3383       continue;
3384     }
3385   }
3386 }
3387
3388 /* -----------------------------------------------------------------------------
3389    resurrectThreads is called after garbage collection on the list of
3390    threads found to be garbage.  Each of these threads will be woken
3391    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3392    on an MVar, or NonTermination if the thread was blocked on a Black
3393    Hole.
3394
3395    Locks: sched_mutex isn't held upon entry nor exit.
3396    -------------------------------------------------------------------------- */
3397
3398 void
3399 resurrectThreads( StgTSO *threads )
3400 {
3401   StgTSO *tso, *next;
3402
3403   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3404     next = tso->global_link;
3405     tso->global_link = all_threads;
3406     all_threads = tso;
3407     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3408
3409     switch (tso->why_blocked) {
3410     case BlockedOnMVar:
3411     case BlockedOnException:
3412       /* Called by GC - sched_mutex lock is currently held. */
3413       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3414       break;
3415     case BlockedOnBlackHole:
3416       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3417       break;
3418     case BlockedOnSTM:
3419       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
3420       break;
3421     case NotBlocked:
3422       /* This might happen if the thread was blocked on a black hole
3423        * belonging to a thread that we've just woken up (raiseAsync
3424        * can wake up threads, remember...).
3425        */
3426       continue;
3427     default:
3428       barf("resurrectThreads: thread blocked in a strange way");
3429     }
3430   }
3431 }
3432
3433 /* ----------------------------------------------------------------------------
3434  * Debugging: why is a thread blocked
3435  * [Also provides useful information when debugging threaded programs
3436  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3437    ------------------------------------------------------------------------- */
3438
3439 static
3440 void
3441 printThreadBlockage(StgTSO *tso)
3442 {
3443   switch (tso->why_blocked) {
3444   case BlockedOnRead:
3445     debugBelch("is blocked on read from fd %d", tso->block_info.fd);
3446     break;
3447   case BlockedOnWrite:
3448     debugBelch("is blocked on write to fd %d", tso->block_info.fd);
3449     break;
3450 #if defined(mingw32_HOST_OS)
3451     case BlockedOnDoProc:
3452     debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3453     break;
3454 #endif
3455   case BlockedOnDelay:
3456     debugBelch("is blocked until %d", tso->block_info.target);
3457     break;
3458   case BlockedOnMVar:
3459     debugBelch("is blocked on an MVar");
3460     break;
3461   case BlockedOnException:
3462     debugBelch("is blocked on delivering an exception to thread %d",
3463             tso->block_info.tso->id);
3464     break;
3465   case BlockedOnBlackHole:
3466     debugBelch("is blocked on a black hole");
3467     break;
3468   case NotBlocked:
3469     debugBelch("is not blocked");
3470     break;
3471 #if defined(PAR)
3472   case BlockedOnGA:
3473     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3474             tso->block_info.closure, info_type(tso->block_info.closure));
3475     break;
3476   case BlockedOnGA_NoSend:
3477     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3478             tso->block_info.closure, info_type(tso->block_info.closure));
3479     break;
3480 #endif
3481   case BlockedOnCCall:
3482     debugBelch("is blocked on an external call");
3483     break;
3484   case BlockedOnCCall_NoUnblockExc:
3485     debugBelch("is blocked on an external call (exceptions were already blocked)");
3486     break;
3487   case BlockedOnSTM:
3488     debugBelch("is blocked on an STM operation");
3489     break;
3490   default:
3491     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3492          tso->why_blocked, tso->id, tso);
3493   }
3494 }
3495
3496 static
3497 void
3498 printThreadStatus(StgTSO *tso)
3499 {
3500   switch (tso->what_next) {
3501   case ThreadKilled:
3502     debugBelch("has been killed");
3503     break;
3504   case ThreadComplete:
3505     debugBelch("has completed");
3506     break;
3507   default:
3508     printThreadBlockage(tso);
3509   }
3510 }
3511
3512 void
3513 printAllThreads(void)
3514 {
3515   StgTSO *t;
3516
3517 # if defined(GRAN)
3518   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3519   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3520                        time_string, rtsFalse/*no commas!*/);
3521
3522   debugBelch("all threads at [%s]:\n", time_string);
3523 # elif defined(PAR)
3524   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3525   ullong_format_string(CURRENT_TIME,
3526                        time_string, rtsFalse/*no commas!*/);
3527
3528   debugBelch("all threads at [%s]:\n", time_string);
3529 # else
3530   debugBelch("all threads:\n");
3531 # endif
3532
3533   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3534     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
3535 #if defined(DEBUG)
3536     {
3537       void *label = lookupThreadLabel(t->id);
3538       if (label) debugBelch("[\"%s\"] ",(char *)label);
3539     }
3540 #endif
3541     printThreadStatus(t);
3542     debugBelch("\n");
3543   }
3544 }
3545     
3546 #ifdef DEBUG
3547
3548 /* 
3549    Print a whole blocking queue attached to node (debugging only).
3550 */
3551 # if defined(PAR)
3552 void 
3553 print_bq (StgClosure *node)
3554 {
3555   StgBlockingQueueElement *bqe;
3556   StgTSO *tso;
3557   rtsBool end;
3558
3559   debugBelch("## BQ of closure %p (%s): ",
3560           node, info_type(node));
3561
3562   /* should cover all closures that may have a blocking queue */
3563   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3564          get_itbl(node)->type == FETCH_ME_BQ ||
3565          get_itbl(node)->type == RBH ||
3566          get_itbl(node)->type == MVAR);
3567     
3568   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3569
3570   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3571 }
3572
3573 /* 
3574    Print a whole blocking queue starting with the element bqe.
3575 */
3576 void 
3577 print_bqe (StgBlockingQueueElement *bqe)
3578 {
3579   rtsBool end;
3580
3581   /* 
3582      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3583   */
3584   for (end = (bqe==END_BQ_QUEUE);
3585        !end; // iterate until bqe points to a CONSTR
3586        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3587        bqe = end ? END_BQ_QUEUE : bqe->link) {
3588     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3589     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3590     /* types of closures that may appear in a blocking queue */
3591     ASSERT(get_itbl(bqe)->type == TSO ||           
3592            get_itbl(bqe)->type == BLOCKED_FETCH || 
3593            get_itbl(bqe)->type == CONSTR); 
3594     /* only BQs of an RBH end with an RBH_Save closure */
3595     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3596
3597     switch (get_itbl(bqe)->type) {
3598     case TSO:
3599       debugBelch(" TSO %u (%x),",
3600               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3601       break;
3602     case BLOCKED_FETCH:
3603       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
3604               ((StgBlockedFetch *)bqe)->node, 
3605               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3606               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3607               ((StgBlockedFetch *)bqe)->ga.weight);
3608       break;
3609     case CONSTR:
3610       debugBelch(" %s (IP %p),",
3611               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3612                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3613                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3614                "RBH_Save_?"), get_itbl(bqe));
3615       break;
3616     default:
3617       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3618            info_type((StgClosure *)bqe)); // , node, info_type(node));
3619       break;
3620     }
3621   } /* for */
3622   debugBelch("\n");
3623 }
3624 # elif defined(GRAN)
3625 void 
3626 print_bq (StgClosure *node)
3627 {
3628   StgBlockingQueueElement *bqe;
3629   PEs node_loc, tso_loc;
3630   rtsBool end;
3631
3632   /* should cover all closures that may have a blocking queue */
3633   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3634          get_itbl(node)->type == FETCH_ME_BQ ||
3635          get_itbl(node)->type == RBH);
3636     
3637   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3638   node_loc = where_is(node);
3639
3640   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
3641           node, info_type(node), node_loc);
3642
3643   /* 
3644      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3645   */
3646   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3647        !end; // iterate until bqe points to a CONSTR
3648        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3649     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3650     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3651     /* types of closures that may appear in a blocking queue */
3652     ASSERT(get_itbl(bqe)->type == TSO ||           
3653            get_itbl(bqe)->type == CONSTR); 
3654     /* only BQs of an RBH end with an RBH_Save closure */
3655     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3656
3657     tso_loc = where_is((StgClosure *)bqe);
3658     switch (get_itbl(bqe)->type) {
3659     case TSO:
3660       debugBelch(" TSO %d (%p) on [PE %d],",
3661               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3662       break;
3663     case CONSTR:
3664       debugBelch(" %s (IP %p),",
3665               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3666                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3667                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3668                "RBH_Save_?"), get_itbl(bqe));
3669       break;
3670     default:
3671       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3672            info_type((StgClosure *)bqe), node, info_type(node));
3673       break;
3674     }
3675   } /* for */
3676   debugBelch("\n");
3677 }
3678 #else
3679 /* 
3680    Nice and easy: only TSOs on the blocking queue
3681 */
3682 void 
3683 print_bq (StgClosure *node)
3684 {
3685   StgTSO *tso;
3686
3687   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3688   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3689        tso != END_TSO_QUEUE; 
3690        tso=tso->link) {
3691     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3692     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3693     debugBelch(" TSO %d (%p),", tso->id, tso);
3694   }
3695   debugBelch("\n");
3696 }
3697 # endif
3698
3699 #if defined(PAR)
3700 static nat
3701 run_queue_len(void)
3702 {
3703   nat i;
3704   StgTSO *tso;
3705
3706   for (i=0, tso=run_queue_hd; 
3707        tso != END_TSO_QUEUE;
3708        i++, tso=tso->link)
3709     /* nothing */
3710
3711   return i;
3712 }
3713 #endif
3714
3715 void
3716 sched_belch(char *s, ...)
3717 {
3718   va_list ap;
3719   va_start(ap,s);
3720 #ifdef RTS_SUPPORTS_THREADS
3721   debugBelch("sched (task %p): ", osThreadId());
3722 #elif defined(PAR)
3723   debugBelch("== ");
3724 #else
3725   debugBelch("sched: ");
3726 #endif
3727   vdebugBelch(s, ap);
3728   debugBelch("\n");
3729   va_end(ap);
3730 }
3731
3732 #endif /* DEBUG */