[project @ 2005-04-07 14:33:30 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PARALLEL_HASKELL          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "OSThreads.h"
46 #include "Storage.h"
47 #include "StgRun.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PARALLEL_HASKELL)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include  "Task.h"
80
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
83 #endif
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif
87
88 #include <string.h>
89 #include <stdlib.h>
90 #include <stdarg.h>
91
92 #ifdef HAVE_ERRNO_H
93 #include <errno.h>
94 #endif
95
96 // Turn off inlining when debugging - it obfuscates things
97 #ifdef DEBUG
98 # undef  STATIC_INLINE
99 # define STATIC_INLINE static
100 #endif
101
102 #ifdef THREADED_RTS
103 #define USED_IN_THREADED_RTS
104 #else
105 #define USED_IN_THREADED_RTS STG_UNUSED
106 #endif
107
108 #ifdef RTS_SUPPORTS_THREADS
109 #define USED_WHEN_RTS_SUPPORTS_THREADS
110 #else
111 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
112 #endif
113
114 /* Main thread queue.
115  * Locks required: sched_mutex.
116  */
117 StgMainThread *main_threads = NULL;
118
119 #if defined(GRAN)
120
121 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
122 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
123
124 /* 
125    In GranSim we have a runnable and a blocked queue for each processor.
126    In order to minimise code changes new arrays run_queue_hds/tls
127    are created. run_queue_hd is then a short cut (macro) for
128    run_queue_hds[CurrentProc] (see GranSim.h).
129    -- HWL
130 */
131 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
132 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
133 StgTSO *ccalling_threadss[MAX_PROC];
134 /* We use the same global list of threads (all_threads) in GranSim as in
135    the std RTS (i.e. we are cheating). However, we don't use this list in
136    the GranSim specific code at the moment (so we are only potentially
137    cheating).  */
138
139 #else /* !GRAN */
140
141 /* Thread queues.
142  * Locks required: sched_mutex.
143  */
144 StgTSO *run_queue_hd = NULL;
145 StgTSO *run_queue_tl = NULL;
146 StgTSO *blocked_queue_hd = NULL;
147 StgTSO *blocked_queue_tl = NULL;
148 StgTSO *blackhole_queue = NULL;
149 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
150
151 #endif
152
153 /* The blackhole_queue should be checked for threads to wake up.  See
154  * Schedule.h for more thorough comment.
155  */
156 rtsBool blackholes_need_checking = rtsFalse;
157
158 /* Linked list of all threads.
159  * Used for detecting garbage collected threads.
160  */
161 StgTSO *all_threads = NULL;
162
163 /* When a thread performs a safe C call (_ccall_GC, using old
164  * terminology), it gets put on the suspended_ccalling_threads
165  * list. Used by the garbage collector.
166  */
167 static StgTSO *suspended_ccalling_threads;
168
169 /* KH: The following two flags are shared memory locations.  There is no need
170        to lock them, since they are only unset at the end of a scheduler
171        operation.
172 */
173
174 /* flag set by signal handler to precipitate a context switch */
175 int context_switch = 0;
176
177 /* if this flag is set as well, give up execution */
178 rtsBool interrupted = rtsFalse;
179
180 /* If this flag is set, we are running Haskell code.  Used to detect
181  * uses of 'foreign import unsafe' that should be 'safe'.
182  */
183 static rtsBool in_haskell = rtsFalse;
184
185 /* Next thread ID to allocate.
186  * Locks required: thread_id_mutex
187  */
188 static StgThreadID next_thread_id = 1;
189
190 /*
191  * Pointers to the state of the current thread.
192  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
193  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
194  */
195  
196 /* The smallest stack size that makes any sense is:
197  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
198  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
199  *  + 1                       (the closure to enter)
200  *  + 1                       (stg_ap_v_ret)
201  *  + 1                       (spare slot req'd by stg_ap_v_ret)
202  *
203  * A thread with this stack will bomb immediately with a stack
204  * overflow, which will increase its stack size.  
205  */
206
207 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
208
209
210 #if defined(GRAN)
211 StgTSO *CurrentTSO;
212 #endif
213
214 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
215  *  exists - earlier gccs apparently didn't.
216  *  -= chak
217  */
218 StgTSO dummy_tso;
219
220 # if defined(SMP)
221 static Condition gc_pending_cond = INIT_COND_VAR;
222 # endif
223
224 static rtsBool ready_to_gc;
225
226 /*
227  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
228  * in an MT setting, needed to signal that a worker thread shouldn't hang around
229  * in the scheduler when it is out of work.
230  */
231 static rtsBool shutting_down_scheduler = rtsFalse;
232
233 #if defined(RTS_SUPPORTS_THREADS)
234 /* ToDo: carefully document the invariants that go together
235  *       with these synchronisation objects.
236  */
237 Mutex     sched_mutex       = INIT_MUTEX_VAR;
238 Mutex     term_mutex        = INIT_MUTEX_VAR;
239
240 #endif /* RTS_SUPPORTS_THREADS */
241
242 #if defined(PARALLEL_HASKELL)
243 StgTSO *LastTSO;
244 rtsTime TimeOfLastYield;
245 rtsBool emitSchedule = rtsTrue;
246 #endif
247
248 #if DEBUG
249 static char *whatNext_strs[] = {
250   "(unknown)",
251   "ThreadRunGHC",
252   "ThreadInterpret",
253   "ThreadKilled",
254   "ThreadRelocated",
255   "ThreadComplete"
256 };
257 #endif
258
259 /* -----------------------------------------------------------------------------
260  * static function prototypes
261  * -------------------------------------------------------------------------- */
262
263 #if defined(RTS_SUPPORTS_THREADS)
264 static void taskStart(void);
265 #endif
266
267 static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
268                       Capability *initialCapability );
269
270 //
271 // These function all encapsulate parts of the scheduler loop, and are
272 // abstracted only to make the structure and control flow of the
273 // scheduler clearer.
274 //
275 static void schedulePreLoop(void);
276 static void scheduleStartSignalHandlers(void);
277 static void scheduleCheckBlockedThreads(void);
278 static void scheduleCheckBlackHoles(void);
279 static void scheduleDetectDeadlock(void);
280 #if defined(GRAN)
281 static StgTSO *scheduleProcessEvent(rtsEvent *event);
282 #endif
283 #if defined(PARALLEL_HASKELL)
284 static StgTSO *scheduleSendPendingMessages(void);
285 static void scheduleActivateSpark(void);
286 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
287 #endif
288 #if defined(PAR) || defined(GRAN)
289 static void scheduleGranParReport(void);
290 #endif
291 static void schedulePostRunThread(void);
292 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
293 static void scheduleHandleStackOverflow( StgTSO *t);
294 static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next );
295 static void scheduleHandleThreadBlocked( StgTSO *t );
296 static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, 
297                                              Capability *cap, StgTSO *t );
298 static void scheduleDoHeapProfile(void);
299 static void scheduleDoGC(void);
300
301 static void unblockThread(StgTSO *tso);
302 static rtsBool checkBlackHoles(void);
303 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
304                                    Capability *initialCapability
305                                    );
306 static void scheduleThread_ (StgTSO* tso);
307 static void AllRoots(evac_fn evac);
308
309 static StgTSO *threadStackOverflow(StgTSO *tso);
310
311 static void raiseAsync_(StgTSO *tso, StgClosure *exception, 
312                         rtsBool stop_at_atomically);
313
314 static void printThreadBlockage(StgTSO *tso);
315 static void printThreadStatus(StgTSO *tso);
316
317 #if defined(PARALLEL_HASKELL)
318 StgTSO * createSparkThread(rtsSpark spark);
319 StgTSO * activateSpark (rtsSpark spark);  
320 #endif
321
322 /* ----------------------------------------------------------------------------
323  * Starting Tasks
324  * ------------------------------------------------------------------------- */
325
326 #if defined(RTS_SUPPORTS_THREADS)
327 static rtsBool startingWorkerThread = rtsFalse;
328
329 static void
330 taskStart(void)
331 {
332   ACQUIRE_LOCK(&sched_mutex);
333   startingWorkerThread = rtsFalse;
334   schedule(NULL,NULL);
335   taskStop();
336   RELEASE_LOCK(&sched_mutex);
337 }
338
339 void
340 startSchedulerTaskIfNecessary(void)
341 {
342     if ( !EMPTY_RUN_QUEUE()
343          && !shutting_down_scheduler // not if we're shutting down
344          && !startingWorkerThread )
345     {
346         // we don't want to start another worker thread
347         // just because the last one hasn't yet reached the
348         // "waiting for capability" state
349         startingWorkerThread = rtsTrue;
350         if (!maybeStartNewWorker(taskStart)) {
351             startingWorkerThread = rtsFalse;
352         }
353     }
354 }
355 #endif
356
357 /* -----------------------------------------------------------------------------
358  * Putting a thread on the run queue: different scheduling policies
359  * -------------------------------------------------------------------------- */
360
361 STATIC_INLINE void
362 addToRunQueue( StgTSO *t )
363 {
364 #if defined(PARALLEL_HASKELL)
365     if (RtsFlags.ParFlags.doFairScheduling) { 
366         // this does round-robin scheduling; good for concurrency
367         APPEND_TO_RUN_QUEUE(t);
368     } else {
369         // this does unfair scheduling; good for parallelism
370         PUSH_ON_RUN_QUEUE(t);
371     }
372 #else
373     // this does round-robin scheduling; good for concurrency
374     APPEND_TO_RUN_QUEUE(t);
375 #endif
376 }
377     
378 /* ---------------------------------------------------------------------------
379    Main scheduling loop.
380
381    We use round-robin scheduling, each thread returning to the
382    scheduler loop when one of these conditions is detected:
383
384       * out of heap space
385       * timer expires (thread yields)
386       * thread blocks
387       * thread ends
388       * stack overflow
389
390    Locking notes:  we acquire the scheduler lock once at the beginning
391    of the scheduler loop, and release it when
392     
393       * running a thread, or
394       * waiting for work, or
395       * waiting for a GC to complete.
396
397    GRAN version:
398      In a GranSim setup this loop iterates over the global event queue.
399      This revolves around the global event queue, which determines what 
400      to do next. Therefore, it's more complicated than either the 
401      concurrent or the parallel (GUM) setup.
402
403    GUM version:
404      GUM iterates over incoming messages.
405      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
406      and sends out a fish whenever it has nothing to do; in-between
407      doing the actual reductions (shared code below) it processes the
408      incoming messages and deals with delayed operations 
409      (see PendingFetches).
410      This is not the ugliest code you could imagine, but it's bloody close.
411
412    ------------------------------------------------------------------------ */
413
414 static void
415 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
416           Capability *initialCapability )
417 {
418   StgTSO *t;
419   Capability *cap;
420   StgThreadReturnCode ret;
421 #if defined(GRAN)
422   rtsEvent *event;
423 #elif defined(PARALLEL_HASKELL)
424   StgTSO *tso;
425   GlobalTaskId pe;
426   rtsBool receivedFinish = rtsFalse;
427 # if defined(DEBUG)
428   nat tp_size, sp_size; // stats only
429 # endif
430 #endif
431   nat prev_what_next;
432   
433   // Pre-condition: sched_mutex is held.
434   // We might have a capability, passed in as initialCapability.
435   cap = initialCapability;
436
437 #if !defined(RTS_SUPPORTS_THREADS)
438   // simply initialise it in the non-threaded case
439   grabCapability(&cap);
440 #endif
441
442   IF_DEBUG(scheduler,
443            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
444                        mainThread, initialCapability);
445       );
446
447   schedulePreLoop();
448
449   // -----------------------------------------------------------
450   // Scheduler loop starts here:
451
452 #if defined(PARALLEL_HASKELL)
453 #define TERMINATION_CONDITION        (!receivedFinish)
454 #elif defined(GRAN)
455 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
456 #else
457 #define TERMINATION_CONDITION        rtsTrue
458 #endif
459
460   while (TERMINATION_CONDITION) {
461
462 #if defined(GRAN)
463       /* Choose the processor with the next event */
464       CurrentProc = event->proc;
465       CurrentTSO = event->tso;
466 #endif
467
468       IF_DEBUG(scheduler, printAllThreads());
469
470 #if defined(SMP)
471       // 
472       // Wait until GC has completed, if necessary.
473       //
474       if (ready_to_gc) {
475           if (cap != NULL) {
476               releaseCapability(cap);
477               IF_DEBUG(scheduler,sched_belch("waiting for GC"));
478               waitCondition( &gc_pending_cond, &sched_mutex );
479           }
480       }
481 #endif
482
483 #if defined(RTS_SUPPORTS_THREADS)
484       // Yield the capability to higher-priority tasks if necessary.
485       //
486       if (cap != NULL) {
487           yieldCapability(&cap);
488       }
489
490       // If we do not currently hold a capability, we wait for one
491       //
492       if (cap == NULL) {
493           waitForCapability(&sched_mutex, &cap,
494                             mainThread ? &mainThread->bound_thread_cond : NULL);
495       }
496
497       // We now have a capability...
498 #endif
499
500     // Check whether we have re-entered the RTS from Haskell without
501     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
502     // call).
503     if (in_haskell) {
504           errorBelch("schedule: re-entered unsafely.\n"
505                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
506           stg_exit(1);
507     }
508
509     //
510     // Test for interruption.  If interrupted==rtsTrue, then either
511     // we received a keyboard interrupt (^C), or the scheduler is
512     // trying to shut down all the tasks (shutting_down_scheduler) in
513     // the threaded RTS.
514     //
515     if (interrupted) {
516         if (shutting_down_scheduler) {
517             IF_DEBUG(scheduler, sched_belch("shutting down"));
518             releaseCapability(cap);
519             if (mainThread) {
520                 mainThread->stat = Interrupted;
521                 mainThread->ret  = NULL;
522             }
523             return;
524         } else {
525             IF_DEBUG(scheduler, sched_belch("interrupted"));
526             deleteAllThreads();
527         }
528     }
529
530 #if defined(not_yet) && defined(SMP)
531     //
532     // Top up the run queue from our spark pool.  We try to make the
533     // number of threads in the run queue equal to the number of
534     // free capabilities.
535     //
536     {
537         StgClosure *spark;
538         if (EMPTY_RUN_QUEUE()) {
539             spark = findSpark(rtsFalse);
540             if (spark == NULL) {
541                 break; /* no more sparks in the pool */
542             } else {
543                 createSparkThread(spark);         
544                 IF_DEBUG(scheduler,
545                          sched_belch("==^^ turning spark of closure %p into a thread",
546                                      (StgClosure *)spark));
547             }
548         }
549     }
550 #endif // SMP
551
552     scheduleStartSignalHandlers();
553
554     // Only check the black holes here if we've nothing else to do.
555     // During normal execution, the black hole list only gets checked
556     // at GC time, to avoid repeatedly traversing this possibly long
557     // list each time around the scheduler.
558     if (EMPTY_RUN_QUEUE()) { scheduleCheckBlackHoles(); }
559
560     scheduleCheckBlockedThreads();
561
562     scheduleDetectDeadlock();
563
564     // Normally, the only way we can get here with no threads to
565     // run is if a keyboard interrupt received during 
566     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
567     // Additionally, it is not fatal for the
568     // threaded RTS to reach here with no threads to run.
569     //
570     // win32: might be here due to awaitEvent() being abandoned
571     // as a result of a console event having been delivered.
572     if ( EMPTY_RUN_QUEUE() ) {
573 #if !defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_HOST_OS)
574         ASSERT(interrupted);
575 #endif
576         continue; // nothing to do
577     }
578
579 #if defined(PARALLEL_HASKELL)
580     scheduleSendPendingMessages();
581     if (EMPTY_RUN_QUEUE() && scheduleActivateSpark()) 
582         continue;
583
584 #if defined(SPARKS)
585     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
586 #endif
587
588     /* If we still have no work we need to send a FISH to get a spark
589        from another PE */
590     if (EMPTY_RUN_QUEUE()) {
591         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
592         ASSERT(rtsFalse); // should not happen at the moment
593     }
594     // from here: non-empty run queue.
595     //  TODO: merge above case with this, only one call processMessages() !
596     if (PacketsWaiting()) {  /* process incoming messages, if
597                                 any pending...  only in else
598                                 because getRemoteWork waits for
599                                 messages as well */
600         receivedFinish = processMessages();
601     }
602 #endif
603
604 #if defined(GRAN)
605     scheduleProcessEvent(event);
606 #endif
607
608     // 
609     // Get a thread to run
610     //
611     ASSERT(run_queue_hd != END_TSO_QUEUE);
612     POP_RUN_QUEUE(t);
613
614 #if defined(GRAN) || defined(PAR)
615     scheduleGranParReport(); // some kind of debuging output
616 #else
617     // Sanity check the thread we're about to run.  This can be
618     // expensive if there is lots of thread switching going on...
619     IF_DEBUG(sanity,checkTSO(t));
620 #endif
621
622 #if defined(RTS_SUPPORTS_THREADS)
623     // Check whether we can run this thread in the current task.
624     // If not, we have to pass our capability to the right task.
625     {
626       StgMainThread *m = t->main;
627       
628       if(m)
629       {
630         if(m == mainThread)
631         {
632           IF_DEBUG(scheduler,
633             sched_belch("### Running thread %d in bound thread", t->id));
634           // yes, the Haskell thread is bound to the current native thread
635         }
636         else
637         {
638           IF_DEBUG(scheduler,
639             sched_belch("### thread %d bound to another OS thread", t->id));
640           // no, bound to a different Haskell thread: pass to that thread
641           PUSH_ON_RUN_QUEUE(t);
642           passCapability(&m->bound_thread_cond);
643           continue;
644         }
645       }
646       else
647       {
648         if(mainThread != NULL)
649         // The thread we want to run is bound.
650         {
651           IF_DEBUG(scheduler,
652             sched_belch("### this OS thread cannot run thread %d", t->id));
653           // no, the current native thread is bound to a different
654           // Haskell thread, so pass it to any worker thread
655           PUSH_ON_RUN_QUEUE(t);
656           passCapabilityToWorker();
657           continue; 
658         }
659       }
660     }
661 #endif
662
663     cap->r.rCurrentTSO = t;
664     
665     /* context switches are now initiated by the timer signal, unless
666      * the user specified "context switch as often as possible", with
667      * +RTS -C0
668      */
669     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
670          && (run_queue_hd != END_TSO_QUEUE
671              || blocked_queue_hd != END_TSO_QUEUE
672              || sleeping_queue != END_TSO_QUEUE)))
673         context_switch = 1;
674
675 run_thread:
676
677     RELEASE_LOCK(&sched_mutex);
678
679     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
680                               (long)t->id, whatNext_strs[t->what_next]));
681
682 #if defined(PROFILING)
683     startHeapProfTimer();
684 #endif
685
686     // ----------------------------------------------------------------------
687     // Run the current thread 
688
689     prev_what_next = t->what_next;
690
691     errno = t->saved_errno;
692     in_haskell = rtsTrue;
693
694     switch (prev_what_next) {
695
696     case ThreadKilled:
697     case ThreadComplete:
698         /* Thread already finished, return to scheduler. */
699         ret = ThreadFinished;
700         break;
701
702     case ThreadRunGHC:
703         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
704         break;
705
706     case ThreadInterpret:
707         ret = interpretBCO(cap);
708         break;
709
710     default:
711       barf("schedule: invalid what_next field");
712     }
713
714     // We have run some Haskell code: there might be blackhole-blocked
715     // threads to wake up now.
716     if ( blackhole_queue != END_TSO_QUEUE ) {
717         blackholes_need_checking = rtsTrue;
718     }
719
720     in_haskell = rtsFalse;
721
722     // The TSO might have moved, eg. if it re-entered the RTS and a GC
723     // happened.  So find the new location:
724     t = cap->r.rCurrentTSO;
725
726     // And save the current errno in this thread.
727     t->saved_errno = errno;
728
729     // ----------------------------------------------------------------------
730     
731     /* Costs for the scheduler are assigned to CCS_SYSTEM */
732 #if defined(PROFILING)
733     stopHeapProfTimer();
734     CCCS = CCS_SYSTEM;
735 #endif
736     
737     ACQUIRE_LOCK(&sched_mutex);
738     
739 #if defined(RTS_SUPPORTS_THREADS)
740     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
741 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
742     IF_DEBUG(scheduler,debugBelch("sched: "););
743 #endif
744     
745     schedulePostRunThread();
746
747     switch (ret) {
748     case HeapOverflow:
749         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
750         break;
751
752     case StackOverflow:
753         scheduleHandleStackOverflow(t);
754         break;
755
756     case ThreadYielding:
757         if (scheduleHandleYield(t, prev_what_next)) {
758             // shortcut for switching between compiler/interpreter:
759             goto run_thread; 
760         }
761         break;
762
763     case ThreadBlocked:
764         scheduleHandleThreadBlocked(t);
765         threadPaused(t);
766         break;
767
768     case ThreadFinished:
769         if (scheduleHandleThreadFinished(mainThread, cap, t)) return;;
770         break;
771
772     default:
773       barf("schedule: invalid thread return code %d", (int)ret);
774     }
775
776     scheduleDoHeapProfile();
777     scheduleDoGC();
778   } /* end of while() */
779
780   IF_PAR_DEBUG(verbose,
781                debugBelch("== Leaving schedule() after having received Finish\n"));
782 }
783
784 /* ----------------------------------------------------------------------------
785  * Setting up the scheduler loop
786  * ASSUMES: sched_mutex
787  * ------------------------------------------------------------------------- */
788
789 static void
790 schedulePreLoop(void)
791 {
792 #if defined(GRAN) 
793     /* set up first event to get things going */
794     /* ToDo: assign costs for system setup and init MainTSO ! */
795     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
796               ContinueThread, 
797               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
798     
799     IF_DEBUG(gran,
800              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
801                         CurrentTSO);
802              G_TSO(CurrentTSO, 5));
803     
804     if (RtsFlags.GranFlags.Light) {
805         /* Save current time; GranSim Light only */
806         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
807     }      
808 #endif
809 }
810
811 /* ----------------------------------------------------------------------------
812  * Start any pending signal handlers
813  * ASSUMES: sched_mutex
814  * ------------------------------------------------------------------------- */
815
816 static void
817 scheduleStartSignalHandlers(void)
818 {
819 #if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
820     if (signals_pending()) {
821       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
822       startSignalHandlers();
823       ACQUIRE_LOCK(&sched_mutex);
824     }
825 #endif
826 }
827
828 /* ----------------------------------------------------------------------------
829  * Check for blocked threads that can be woken up.
830  * ASSUMES: sched_mutex
831  * ------------------------------------------------------------------------- */
832
833 static void
834 scheduleCheckBlockedThreads(void)
835 {
836     //
837     // Check whether any waiting threads need to be woken up.  If the
838     // run queue is empty, and there are no other tasks running, we
839     // can wait indefinitely for something to happen.
840     //
841     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
842     {
843 #if defined(RTS_SUPPORTS_THREADS)
844         // We shouldn't be here...
845         barf("schedule: awaitEvent() in threaded RTS");
846 #endif
847         awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
848     }
849 }
850
851
852 /* ----------------------------------------------------------------------------
853  * Check for threads blocked on BLACKHOLEs that can be woken up
854  * ASSUMES: sched_mutex
855  * ------------------------------------------------------------------------- */
856 static void
857 scheduleCheckBlackHoles( void )
858 {
859     if ( blackholes_need_checking )
860     {
861         checkBlackHoles();
862         blackholes_need_checking = rtsFalse;
863     }
864 }
865
866 /* ----------------------------------------------------------------------------
867  * Detect deadlock conditions and attempt to resolve them.
868  * ASSUMES: sched_mutex
869  * ------------------------------------------------------------------------- */
870
871 static void
872 scheduleDetectDeadlock(void)
873 {
874     /* 
875      * Detect deadlock: when we have no threads to run, there are no
876      * threads blocked, waiting for I/O, or sleeping, and all the
877      * other tasks are waiting for work, we must have a deadlock of
878      * some description.
879      */
880     if ( EMPTY_THREAD_QUEUES() )
881     {
882 #if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS)
883         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
884
885         // Garbage collection can release some new threads due to
886         // either (a) finalizers or (b) threads resurrected because
887         // they are unreachable and will therefore be sent an
888         // exception.  Any threads thus released will be immediately
889         // runnable.
890         GarbageCollect(GetRoots,rtsTrue);
891         if ( !EMPTY_RUN_QUEUE() ) return;
892
893 #if defined(RTS_USER_SIGNALS)
894         /* If we have user-installed signal handlers, then wait
895          * for signals to arrive rather then bombing out with a
896          * deadlock.
897          */
898         if ( anyUserHandlers() ) {
899             IF_DEBUG(scheduler, 
900                      sched_belch("still deadlocked, waiting for signals..."));
901
902             awaitUserSignals();
903
904 #if !defined(RTS_SUPPORTS_THREADS)
905             if (signals_pending()) {
906                 RELEASE_LOCK(&sched_mutex);
907                 startSignalHandlers();
908                 ACQUIRE_LOCK(&sched_mutex);
909             }
910 #endif
911
912             // either we have threads to run, or we were interrupted:
913             ASSERT(!EMPTY_RUN_QUEUE() || interrupted);
914         }
915 #endif
916
917         /* Probably a real deadlock.  Send the current main thread the
918          * Deadlock exception (or in the SMP build, send *all* main
919          * threads the deadlock exception, since none of them can make
920          * progress).
921          */
922         {
923             StgMainThread *m;
924             m = main_threads;
925             switch (m->tso->why_blocked) {
926             case BlockedOnBlackHole:
927             case BlockedOnException:
928             case BlockedOnMVar:
929                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
930                 return;
931             default:
932                 barf("deadlock: main thread blocked in a strange way");
933             }
934         }
935
936 #elif defined(RTS_SUPPORTS_THREADS)
937     // ToDo: add deadlock detection in threaded RTS
938 #elif defined(PARALLEL_HASKELL)
939     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
940 #endif
941     }
942 }
943
944 /* ----------------------------------------------------------------------------
945  * Process an event (GRAN only)
946  * ------------------------------------------------------------------------- */
947
948 #if defined(GRAN)
949 static StgTSO *
950 scheduleProcessEvent(rtsEvent *event)
951 {
952     StgTSO *t;
953
954     if (RtsFlags.GranFlags.Light)
955       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
956
957     /* adjust time based on time-stamp */
958     if (event->time > CurrentTime[CurrentProc] &&
959         event->evttype != ContinueThread)
960       CurrentTime[CurrentProc] = event->time;
961     
962     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
963     if (!RtsFlags.GranFlags.Light)
964       handleIdlePEs();
965
966     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
967
968     /* main event dispatcher in GranSim */
969     switch (event->evttype) {
970       /* Should just be continuing execution */
971     case ContinueThread:
972       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
973       /* ToDo: check assertion
974       ASSERT(run_queue_hd != (StgTSO*)NULL &&
975              run_queue_hd != END_TSO_QUEUE);
976       */
977       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
978       if (!RtsFlags.GranFlags.DoAsyncFetch &&
979           procStatus[CurrentProc]==Fetching) {
980         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
981               CurrentTSO->id, CurrentTSO, CurrentProc);
982         goto next_thread;
983       } 
984       /* Ignore ContinueThreads for completed threads */
985       if (CurrentTSO->what_next == ThreadComplete) {
986         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
987               CurrentTSO->id, CurrentTSO, CurrentProc);
988         goto next_thread;
989       } 
990       /* Ignore ContinueThreads for threads that are being migrated */
991       if (PROCS(CurrentTSO)==Nowhere) { 
992         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
993               CurrentTSO->id, CurrentTSO, CurrentProc);
994         goto next_thread;
995       }
996       /* The thread should be at the beginning of the run queue */
997       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
998         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
999               CurrentTSO->id, CurrentTSO, CurrentProc);
1000         break; // run the thread anyway
1001       }
1002       /*
1003       new_event(proc, proc, CurrentTime[proc],
1004                 FindWork,
1005                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1006       goto next_thread; 
1007       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1008       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1009
1010     case FetchNode:
1011       do_the_fetchnode(event);
1012       goto next_thread;             /* handle next event in event queue  */
1013       
1014     case GlobalBlock:
1015       do_the_globalblock(event);
1016       goto next_thread;             /* handle next event in event queue  */
1017       
1018     case FetchReply:
1019       do_the_fetchreply(event);
1020       goto next_thread;             /* handle next event in event queue  */
1021       
1022     case UnblockThread:   /* Move from the blocked queue to the tail of */
1023       do_the_unblock(event);
1024       goto next_thread;             /* handle next event in event queue  */
1025       
1026     case ResumeThread:  /* Move from the blocked queue to the tail of */
1027       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1028       event->tso->gran.blocktime += 
1029         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1030       do_the_startthread(event);
1031       goto next_thread;             /* handle next event in event queue  */
1032       
1033     case StartThread:
1034       do_the_startthread(event);
1035       goto next_thread;             /* handle next event in event queue  */
1036       
1037     case MoveThread:
1038       do_the_movethread(event);
1039       goto next_thread;             /* handle next event in event queue  */
1040       
1041     case MoveSpark:
1042       do_the_movespark(event);
1043       goto next_thread;             /* handle next event in event queue  */
1044       
1045     case FindWork:
1046       do_the_findwork(event);
1047       goto next_thread;             /* handle next event in event queue  */
1048       
1049     default:
1050       barf("Illegal event type %u\n", event->evttype);
1051     }  /* switch */
1052     
1053     /* This point was scheduler_loop in the old RTS */
1054
1055     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1056
1057     TimeOfLastEvent = CurrentTime[CurrentProc];
1058     TimeOfNextEvent = get_time_of_next_event();
1059     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1060     // CurrentTSO = ThreadQueueHd;
1061
1062     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1063                          TimeOfNextEvent));
1064
1065     if (RtsFlags.GranFlags.Light) 
1066       GranSimLight_leave_system(event, &ActiveTSO); 
1067
1068     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1069
1070     IF_DEBUG(gran, 
1071              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1072
1073     /* in a GranSim setup the TSO stays on the run queue */
1074     t = CurrentTSO;
1075     /* Take a thread from the run queue. */
1076     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1077
1078     IF_DEBUG(gran, 
1079              debugBelch("GRAN: About to run current thread, which is\n");
1080              G_TSO(t,5));
1081
1082     context_switch = 0; // turned on via GranYield, checking events and time slice
1083
1084     IF_DEBUG(gran, 
1085              DumpGranEvent(GR_SCHEDULE, t));
1086
1087     procStatus[CurrentProc] = Busy;
1088 }
1089 #endif // GRAN
1090
1091 /* ----------------------------------------------------------------------------
1092  * Send pending messages (PARALLEL_HASKELL only)
1093  * ------------------------------------------------------------------------- */
1094
1095 #if defined(PARALLEL_HASKELL)
1096 static StgTSO *
1097 scheduleSendPendingMessages(void)
1098 {
1099     StgSparkPool *pool;
1100     rtsSpark spark;
1101     StgTSO *t;
1102
1103 # if defined(PAR) // global Mem.Mgmt., omit for now
1104     if (PendingFetches != END_BF_QUEUE) {
1105         processFetches();
1106     }
1107 # endif
1108     
1109     if (RtsFlags.ParFlags.BufferTime) {
1110         // if we use message buffering, we must send away all message
1111         // packets which have become too old...
1112         sendOldBuffers(); 
1113     }
1114 }
1115 #endif
1116
1117 /* ----------------------------------------------------------------------------
1118  * Activate spark threads (PARALLEL_HASKELL only)
1119  * ------------------------------------------------------------------------- */
1120
1121 #if defined(PARALLEL_HASKELL)
1122 static void
1123 scheduleActivateSpark(void)
1124 {
1125 #if defined(SPARKS)
1126   ASSERT(EMPTY_RUN_QUEUE());
1127 /* We get here if the run queue is empty and want some work.
1128    We try to turn a spark into a thread, and add it to the run queue,
1129    from where it will be picked up in the next iteration of the scheduler
1130    loop.
1131 */
1132
1133       /* :-[  no local threads => look out for local sparks */
1134       /* the spark pool for the current PE */
1135       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1136       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1137           pool->hd < pool->tl) {
1138         /* 
1139          * ToDo: add GC code check that we really have enough heap afterwards!!
1140          * Old comment:
1141          * If we're here (no runnable threads) and we have pending
1142          * sparks, we must have a space problem.  Get enough space
1143          * to turn one of those pending sparks into a
1144          * thread... 
1145          */
1146
1147         spark = findSpark(rtsFalse);            /* get a spark */
1148         if (spark != (rtsSpark) NULL) {
1149           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1150           IF_PAR_DEBUG(fish, // schedule,
1151                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1152                              tso->id, tso, advisory_thread_count));
1153
1154           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1155             IF_PAR_DEBUG(fish, // schedule,
1156                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1157                             spark));
1158             return rtsFalse; /* failed to generate a thread */
1159           }                  /* otherwise fall through & pick-up new tso */
1160         } else {
1161           IF_PAR_DEBUG(fish, // schedule,
1162                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1163                              spark_queue_len(pool)));
1164           return rtsFalse;  /* failed to generate a thread */
1165         }
1166         return rtsTrue;  /* success in generating a thread */
1167   } else { /* no more threads permitted or pool empty */
1168     return rtsFalse;  /* failed to generateThread */
1169   }
1170 #else
1171   tso = NULL; // avoid compiler warning only
1172   return rtsFalse;  /* dummy in non-PAR setup */
1173 #endif // SPARKS
1174 }
1175 #endif // PARALLEL_HASKELL
1176
1177 /* ----------------------------------------------------------------------------
1178  * Get work from a remote node (PARALLEL_HASKELL only)
1179  * ------------------------------------------------------------------------- */
1180     
1181 #if defined(PARALLEL_HASKELL)
1182 static rtsBool
1183 scheduleGetRemoteWork(rtsBool *receivedFinish)
1184 {
1185   ASSERT(EMPTY_RUN_QUEUE());
1186
1187   if (RtsFlags.ParFlags.BufferTime) {
1188         IF_PAR_DEBUG(verbose, 
1189                 debugBelch("...send all pending data,"));
1190         {
1191           nat i;
1192           for (i=1; i<=nPEs; i++)
1193             sendImmediately(i); // send all messages away immediately
1194         }
1195   }
1196 # ifndef SPARKS
1197         //++EDEN++ idle() , i.e. send all buffers, wait for work
1198         // suppress fishing in EDEN... just look for incoming messages
1199         // (blocking receive)
1200   IF_PAR_DEBUG(verbose, 
1201                debugBelch("...wait for incoming messages...\n"));
1202   *receivedFinish = processMessages(); // blocking receive...
1203
1204         // and reenter scheduling loop after having received something
1205         // (return rtsFalse below)
1206
1207 # else /* activate SPARKS machinery */
1208 /* We get here, if we have no work, tried to activate a local spark, but still
1209    have no work. We try to get a remote spark, by sending a FISH message.
1210    Thread migration should be added here, and triggered when a sequence of 
1211    fishes returns without work. */
1212         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1213
1214       /* =8-[  no local sparks => look for work on other PEs */
1215         /*
1216          * We really have absolutely no work.  Send out a fish
1217          * (there may be some out there already), and wait for
1218          * something to arrive.  We clearly can't run any threads
1219          * until a SCHEDULE or RESUME arrives, and so that's what
1220          * we're hoping to see.  (Of course, we still have to
1221          * respond to other types of messages.)
1222          */
1223         rtsTime now = msTime() /*CURRENT_TIME*/;
1224         IF_PAR_DEBUG(verbose, 
1225                      debugBelch("--  now=%ld\n", now));
1226         IF_PAR_DEBUG(fish, // verbose,
1227              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1228                  (last_fish_arrived_at!=0 &&
1229                   last_fish_arrived_at+delay > now)) {
1230                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1231                      now, last_fish_arrived_at+delay, 
1232                      last_fish_arrived_at,
1233                      delay);
1234              });
1235   
1236         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1237             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1238           if (last_fish_arrived_at==0 ||
1239               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1240             /* outstandingFishes is set in sendFish, processFish;
1241                avoid flooding system with fishes via delay */
1242     next_fish_to_send_at = 0;  
1243   } else {
1244     /* ToDo: this should be done in the main scheduling loop to avoid the
1245              busy wait here; not so bad if fish delay is very small  */
1246     int iq = 0; // DEBUGGING -- HWL
1247     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1248     /* send a fish when ready, but process messages that arrive in the meantime */
1249     do {
1250       if (PacketsWaiting()) {
1251         iq++; // DEBUGGING
1252         *receivedFinish = processMessages();
1253       }
1254       now = msTime();
1255     } while (!*receivedFinish || now<next_fish_to_send_at);
1256     // JB: This means the fish could become obsolete, if we receive
1257     // work. Better check for work again? 
1258     // last line: while (!receivedFinish || !haveWork || now<...)
1259     // next line: if (receivedFinish || haveWork )
1260
1261     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1262       return rtsFalse;  // NB: this will leave scheduler loop
1263                         // immediately after return!
1264                           
1265     IF_PAR_DEBUG(fish, // verbose,
1266                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1267
1268   }
1269
1270     // JB: IMHO, this should all be hidden inside sendFish(...)
1271     /* pe = choosePE(); 
1272        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1273                 NEW_FISH_HUNGER);
1274
1275     // Global statistics: count no. of fishes
1276     if (RtsFlags.ParFlags.ParStats.Global &&
1277          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1278            globalParStats.tot_fish_mess++;
1279            }
1280     */ 
1281
1282   /* delayed fishes must have been sent by now! */
1283   next_fish_to_send_at = 0;  
1284   }
1285       
1286   *receivedFinish = processMessages();
1287 # endif /* SPARKS */
1288
1289  return rtsFalse;
1290  /* NB: this function always returns rtsFalse, meaning the scheduler
1291     loop continues with the next iteration; 
1292     rationale: 
1293       return code means success in finding work; we enter this function
1294       if there is no local work, thus have to send a fish which takes
1295       time until it arrives with work; in the meantime we should process
1296       messages in the main loop;
1297  */
1298 }
1299 #endif // PARALLEL_HASKELL
1300
1301 /* ----------------------------------------------------------------------------
1302  * PAR/GRAN: Report stats & debugging info(?)
1303  * ------------------------------------------------------------------------- */
1304
1305 #if defined(PAR) || defined(GRAN)
1306 static void
1307 scheduleGranParReport(void)
1308 {
1309   ASSERT(run_queue_hd != END_TSO_QUEUE);
1310
1311   /* Take a thread from the run queue, if we have work */
1312   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1313
1314     /* If this TSO has got its outport closed in the meantime, 
1315      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1316      * It has to be marked as TH_DEAD for this purpose.
1317      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1318
1319 JB: TODO: investigate wether state change field could be nuked
1320      entirely and replaced by the normal tso state (whatnext
1321      field). All we want to do is to kill tsos from outside.
1322      */
1323
1324     /* ToDo: write something to the log-file
1325     if (RTSflags.ParFlags.granSimStats && !sameThread)
1326         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1327
1328     CurrentTSO = t;
1329     */
1330     /* the spark pool for the current PE */
1331     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1332
1333     IF_DEBUG(scheduler, 
1334              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1335                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1336
1337     IF_PAR_DEBUG(fish,
1338              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1339                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1340
1341     if (RtsFlags.ParFlags.ParStats.Full && 
1342         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1343         (emitSchedule || // forced emit
1344          (t && LastTSO && t->id != LastTSO->id))) {
1345       /* 
1346          we are running a different TSO, so write a schedule event to log file
1347          NB: If we use fair scheduling we also have to write  a deschedule 
1348              event for LastTSO; with unfair scheduling we know that the
1349              previous tso has blocked whenever we switch to another tso, so
1350              we don't need it in GUM for now
1351       */
1352       IF_PAR_DEBUG(fish, // schedule,
1353                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1354
1355       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1356                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1357       emitSchedule = rtsFalse;
1358     }
1359 }     
1360 #endif
1361
1362 /* ----------------------------------------------------------------------------
1363  * After running a thread...
1364  * ASSUMES: sched_mutex
1365  * ------------------------------------------------------------------------- */
1366
1367 static void
1368 schedulePostRunThread(void)
1369 {
1370 #if defined(PAR)
1371     /* HACK 675: if the last thread didn't yield, make sure to print a 
1372        SCHEDULE event to the log file when StgRunning the next thread, even
1373        if it is the same one as before */
1374     LastTSO = t; 
1375     TimeOfLastYield = CURRENT_TIME;
1376 #endif
1377
1378   /* some statistics gathering in the parallel case */
1379
1380 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1381   switch (ret) {
1382     case HeapOverflow:
1383 # if defined(GRAN)
1384       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1385       globalGranStats.tot_heapover++;
1386 # elif defined(PAR)
1387       globalParStats.tot_heapover++;
1388 # endif
1389       break;
1390
1391      case StackOverflow:
1392 # if defined(GRAN)
1393       IF_DEBUG(gran, 
1394                DumpGranEvent(GR_DESCHEDULE, t));
1395       globalGranStats.tot_stackover++;
1396 # elif defined(PAR)
1397       // IF_DEBUG(par, 
1398       // DumpGranEvent(GR_DESCHEDULE, t);
1399       globalParStats.tot_stackover++;
1400 # endif
1401       break;
1402
1403     case ThreadYielding:
1404 # if defined(GRAN)
1405       IF_DEBUG(gran, 
1406                DumpGranEvent(GR_DESCHEDULE, t));
1407       globalGranStats.tot_yields++;
1408 # elif defined(PAR)
1409       // IF_DEBUG(par, 
1410       // DumpGranEvent(GR_DESCHEDULE, t);
1411       globalParStats.tot_yields++;
1412 # endif
1413       break; 
1414
1415     case ThreadBlocked:
1416 # if defined(GRAN)
1417       IF_DEBUG(scheduler,
1418                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1419                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1420                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1421                if (t->block_info.closure!=(StgClosure*)NULL)
1422                  print_bq(t->block_info.closure);
1423                debugBelch("\n"));
1424
1425       // ??? needed; should emit block before
1426       IF_DEBUG(gran, 
1427                DumpGranEvent(GR_DESCHEDULE, t)); 
1428       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1429       /*
1430         ngoq Dogh!
1431       ASSERT(procStatus[CurrentProc]==Busy || 
1432               ((procStatus[CurrentProc]==Fetching) && 
1433               (t->block_info.closure!=(StgClosure*)NULL)));
1434       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1435           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1436             procStatus[CurrentProc]==Fetching)) 
1437         procStatus[CurrentProc] = Idle;
1438       */
1439 # elif defined(PAR)
1440 //++PAR++  blockThread() writes the event (change?)
1441 # endif
1442     break;
1443
1444   case ThreadFinished:
1445     break;
1446
1447   default:
1448     barf("parGlobalStats: unknown return code");
1449     break;
1450     }
1451 #endif
1452 }
1453
1454 /* -----------------------------------------------------------------------------
1455  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1456  * ASSUMES: sched_mutex
1457  * -------------------------------------------------------------------------- */
1458
1459 static rtsBool
1460 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1461 {
1462     // did the task ask for a large block?
1463     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1464         // if so, get one and push it on the front of the nursery.
1465         bdescr *bd;
1466         lnat blocks;
1467         
1468         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1469         
1470         IF_DEBUG(scheduler,
1471                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1472                             (long)t->id, whatNext_strs[t->what_next], blocks));
1473         
1474         // don't do this if it would push us over the
1475         // alloc_blocks_lim limit; we'll GC first.
1476         if (alloc_blocks + blocks < alloc_blocks_lim) {
1477             
1478             alloc_blocks += blocks;
1479             bd = allocGroup( blocks );
1480             
1481             // link the new group into the list
1482             bd->link = cap->r.rCurrentNursery;
1483             bd->u.back = cap->r.rCurrentNursery->u.back;
1484             if (cap->r.rCurrentNursery->u.back != NULL) {
1485                 cap->r.rCurrentNursery->u.back->link = bd;
1486             } else {
1487                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1488                        g0s0->blocks == cap->r.rNursery);
1489                 cap->r.rNursery = g0s0->blocks = bd;
1490             }             
1491             cap->r.rCurrentNursery->u.back = bd;
1492             
1493             // initialise it as a nursery block.  We initialise the
1494             // step, gen_no, and flags field of *every* sub-block in
1495             // this large block, because this is easier than making
1496             // sure that we always find the block head of a large
1497             // block whenever we call Bdescr() (eg. evacuate() and
1498             // isAlive() in the GC would both have to do this, at
1499             // least).
1500             { 
1501                 bdescr *x;
1502                 for (x = bd; x < bd + blocks; x++) {
1503                     x->step = g0s0;
1504                     x->gen_no = 0;
1505                     x->flags = 0;
1506                 }
1507             }
1508             
1509             // don't forget to update the block count in g0s0.
1510             g0s0->n_blocks += blocks;
1511             // This assert can be a killer if the app is doing lots
1512             // of large block allocations.
1513             ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1514             
1515             // now update the nursery to point to the new block
1516             cap->r.rCurrentNursery = bd;
1517             
1518             // we might be unlucky and have another thread get on the
1519             // run queue before us and steal the large block, but in that
1520             // case the thread will just end up requesting another large
1521             // block.
1522             PUSH_ON_RUN_QUEUE(t);
1523             return rtsFalse;  /* not actually GC'ing */
1524         }
1525     }
1526     
1527     /* make all the running tasks block on a condition variable,
1528      * maybe set context_switch and wait till they all pile in,
1529      * then have them wait on a GC condition variable.
1530      */
1531     IF_DEBUG(scheduler,
1532              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1533                         (long)t->id, whatNext_strs[t->what_next]));
1534     threadPaused(t);
1535 #if defined(GRAN)
1536     ASSERT(!is_on_queue(t,CurrentProc));
1537 #elif defined(PARALLEL_HASKELL)
1538     /* Currently we emit a DESCHEDULE event before GC in GUM.
1539        ToDo: either add separate event to distinguish SYSTEM time from rest
1540        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1541     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1542         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1543                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1544         emitSchedule = rtsTrue;
1545     }
1546 #endif
1547       
1548     PUSH_ON_RUN_QUEUE(t);
1549     return rtsTrue;
1550     /* actual GC is done at the end of the while loop in schedule() */
1551 }
1552
1553 /* -----------------------------------------------------------------------------
1554  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1555  * ASSUMES: sched_mutex
1556  * -------------------------------------------------------------------------- */
1557
1558 static void
1559 scheduleHandleStackOverflow( StgTSO *t)
1560 {
1561     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1562                                   (long)t->id, whatNext_strs[t->what_next]));
1563     /* just adjust the stack for this thread, then pop it back
1564      * on the run queue.
1565      */
1566     threadPaused(t);
1567     { 
1568         /* enlarge the stack */
1569         StgTSO *new_t = threadStackOverflow(t);
1570         
1571         /* This TSO has moved, so update any pointers to it from the
1572          * main thread stack.  It better not be on any other queues...
1573          * (it shouldn't be).
1574          */
1575         if (t->main != NULL) {
1576             t->main->tso = new_t;
1577         }
1578         PUSH_ON_RUN_QUEUE(new_t);
1579     }
1580 }
1581
1582 /* -----------------------------------------------------------------------------
1583  * Handle a thread that returned to the scheduler with ThreadYielding
1584  * ASSUMES: sched_mutex
1585  * -------------------------------------------------------------------------- */
1586
1587 static rtsBool
1588 scheduleHandleYield( StgTSO *t, nat prev_what_next )
1589 {
1590     // Reset the context switch flag.  We don't do this just before
1591     // running the thread, because that would mean we would lose ticks
1592     // during GC, which can lead to unfair scheduling (a thread hogs
1593     // the CPU because the tick always arrives during GC).  This way
1594     // penalises threads that do a lot of allocation, but that seems
1595     // better than the alternative.
1596     context_switch = 0;
1597     
1598     /* put the thread back on the run queue.  Then, if we're ready to
1599      * GC, check whether this is the last task to stop.  If so, wake
1600      * up the GC thread.  getThread will block during a GC until the
1601      * GC is finished.
1602      */
1603     IF_DEBUG(scheduler,
1604              if (t->what_next != prev_what_next) {
1605                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1606                             (long)t->id, whatNext_strs[t->what_next]);
1607              } else {
1608                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1609                             (long)t->id, whatNext_strs[t->what_next]);
1610              }
1611         );
1612     
1613     IF_DEBUG(sanity,
1614              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1615              checkTSO(t));
1616     ASSERT(t->link == END_TSO_QUEUE);
1617     
1618     // Shortcut if we're just switching evaluators: don't bother
1619     // doing stack squeezing (which can be expensive), just run the
1620     // thread.
1621     if (t->what_next != prev_what_next) {
1622         return rtsTrue;
1623     }
1624     
1625     threadPaused(t);
1626     
1627 #if defined(GRAN)
1628     ASSERT(!is_on_queue(t,CurrentProc));
1629       
1630     IF_DEBUG(sanity,
1631              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1632              checkThreadQsSanity(rtsTrue));
1633
1634 #endif
1635
1636     addToRunQueue(t);
1637
1638 #if defined(GRAN)
1639     /* add a ContinueThread event to actually process the thread */
1640     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1641               ContinueThread,
1642               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1643     IF_GRAN_DEBUG(bq, 
1644                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1645                   G_EVENTQ(0);
1646                   G_CURR_THREADQ(0));
1647 #endif
1648     return rtsFalse;
1649 }
1650
1651 /* -----------------------------------------------------------------------------
1652  * Handle a thread that returned to the scheduler with ThreadBlocked
1653  * ASSUMES: sched_mutex
1654  * -------------------------------------------------------------------------- */
1655
1656 static void
1657 scheduleHandleThreadBlocked( StgTSO *t
1658 #if !defined(GRAN) && !defined(DEBUG)
1659     STG_UNUSED
1660 #endif
1661     )
1662 {
1663 #if defined(GRAN)
1664     IF_DEBUG(scheduler,
1665              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1666                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1667              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1668     
1669     // ??? needed; should emit block before
1670     IF_DEBUG(gran, 
1671              DumpGranEvent(GR_DESCHEDULE, t)); 
1672     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1673     /*
1674       ngoq Dogh!
1675       ASSERT(procStatus[CurrentProc]==Busy || 
1676       ((procStatus[CurrentProc]==Fetching) && 
1677       (t->block_info.closure!=(StgClosure*)NULL)));
1678       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1679       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1680       procStatus[CurrentProc]==Fetching)) 
1681       procStatus[CurrentProc] = Idle;
1682     */
1683 #elif defined(PAR)
1684     IF_DEBUG(scheduler,
1685              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1686                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1687     IF_PAR_DEBUG(bq,
1688                  
1689                  if (t->block_info.closure!=(StgClosure*)NULL) 
1690                  print_bq(t->block_info.closure));
1691     
1692     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1693     blockThread(t);
1694     
1695     /* whatever we schedule next, we must log that schedule */
1696     emitSchedule = rtsTrue;
1697     
1698 #else /* !GRAN */
1699       /* don't need to do anything.  Either the thread is blocked on
1700        * I/O, in which case we'll have called addToBlockedQueue
1701        * previously, or it's blocked on an MVar or Blackhole, in which
1702        * case it'll be on the relevant queue already.
1703        */
1704     ASSERT(t->why_blocked != NotBlocked);
1705     IF_DEBUG(scheduler,
1706              debugBelch("--<< thread %d (%s) stopped: ", 
1707                         t->id, whatNext_strs[t->what_next]);
1708              printThreadBlockage(t);
1709              debugBelch("\n"));
1710     
1711     /* Only for dumping event to log file 
1712        ToDo: do I need this in GranSim, too?
1713        blockThread(t);
1714     */
1715 #endif
1716 }
1717
1718 /* -----------------------------------------------------------------------------
1719  * Handle a thread that returned to the scheduler with ThreadFinished
1720  * ASSUMES: sched_mutex
1721  * -------------------------------------------------------------------------- */
1722
1723 static rtsBool
1724 scheduleHandleThreadFinished( StgMainThread *mainThread
1725                               USED_WHEN_RTS_SUPPORTS_THREADS,
1726                               Capability *cap,
1727                               StgTSO *t )
1728 {
1729     /* Need to check whether this was a main thread, and if so,
1730      * return with the return value.
1731      *
1732      * We also end up here if the thread kills itself with an
1733      * uncaught exception, see Exception.cmm.
1734      */
1735     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1736                                   t->id, whatNext_strs[t->what_next]));
1737
1738 #if defined(GRAN)
1739       endThread(t, CurrentProc); // clean-up the thread
1740 #elif defined(PARALLEL_HASKELL)
1741       /* For now all are advisory -- HWL */
1742       //if(t->priority==AdvisoryPriority) ??
1743       advisory_thread_count--; // JB: Caution with this counter, buggy!
1744       
1745 # if defined(DIST)
1746       if(t->dist.priority==RevalPriority)
1747         FinishReval(t);
1748 # endif
1749     
1750 # if defined(EDENOLD)
1751       // the thread could still have an outport... (BUG)
1752       if (t->eden.outport != -1) {
1753       // delete the outport for the tso which has finished...
1754         IF_PAR_DEBUG(eden_ports,
1755                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1756                               t->eden.outport, t->id));
1757         deleteOPT(t);
1758       }
1759       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1760       if (t->eden.epid != -1) {
1761         IF_PAR_DEBUG(eden_ports,
1762                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1763                            t->id, t->eden.epid));
1764         removeTSOfromProcess(t);
1765       }
1766 # endif 
1767
1768 # if defined(PAR)
1769       if (RtsFlags.ParFlags.ParStats.Full &&
1770           !RtsFlags.ParFlags.ParStats.Suppressed) 
1771         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1772
1773       //  t->par only contains statistics: left out for now...
1774       IF_PAR_DEBUG(fish,
1775                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1776                               t->id,t,t->par.sparkname));
1777 # endif
1778 #endif // PARALLEL_HASKELL
1779
1780       //
1781       // Check whether the thread that just completed was a main
1782       // thread, and if so return with the result.  
1783       //
1784       // There is an assumption here that all thread completion goes
1785       // through this point; we need to make sure that if a thread
1786       // ends up in the ThreadKilled state, that it stays on the run
1787       // queue so it can be dealt with here.
1788       //
1789       if (
1790 #if defined(RTS_SUPPORTS_THREADS)
1791           mainThread != NULL
1792 #else
1793           mainThread->tso == t
1794 #endif
1795           )
1796       {
1797           // We are a bound thread: this must be our thread that just
1798           // completed.
1799           ASSERT(mainThread->tso == t);
1800
1801           if (t->what_next == ThreadComplete) {
1802               if (mainThread->ret) {
1803                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1804                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1805               }
1806               mainThread->stat = Success;
1807           } else {
1808               if (mainThread->ret) {
1809                   *(mainThread->ret) = NULL;
1810               }
1811               if (interrupted) {
1812                   mainThread->stat = Interrupted;
1813               } else {
1814                   mainThread->stat = Killed;
1815               }
1816           }
1817 #ifdef DEBUG
1818           removeThreadLabel((StgWord)mainThread->tso->id);
1819 #endif
1820           if (mainThread->prev == NULL) {
1821               main_threads = mainThread->link;
1822           } else {
1823               mainThread->prev->link = mainThread->link;
1824           }
1825           if (mainThread->link != NULL) {
1826               mainThread->link->prev = NULL;
1827           }
1828           releaseCapability(cap);
1829           return rtsTrue; // tells schedule() to return
1830       }
1831
1832 #ifdef RTS_SUPPORTS_THREADS
1833       ASSERT(t->main == NULL);
1834 #else
1835       if (t->main != NULL) {
1836           // Must be a main thread that is not the topmost one.  Leave
1837           // it on the run queue until the stack has unwound to the
1838           // point where we can deal with this.  Leaving it on the run
1839           // queue also ensures that the garbage collector knows about
1840           // this thread and its return value (it gets dropped from the
1841           // all_threads list so there's no other way to find it).
1842           APPEND_TO_RUN_QUEUE(t);
1843       }
1844 #endif
1845       return rtsFalse;
1846 }
1847
1848 /* -----------------------------------------------------------------------------
1849  * Perform a heap census, if PROFILING
1850  * -------------------------------------------------------------------------- */
1851
1852 static void
1853 scheduleDoHeapProfile(void)
1854 {
1855 #ifdef PROFILING
1856     // When we have +RTS -i0 and we're heap profiling, do a census at
1857     // every GC.  This lets us get repeatable runs for debugging.
1858     if (performHeapProfile ||
1859         (RtsFlags.ProfFlags.profileInterval==0 &&
1860          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1861         GarbageCollect(GetRoots, rtsTrue);
1862         heapCensus();
1863         performHeapProfile = rtsFalse;
1864         ready_to_gc = rtsFalse; // we already GC'd
1865     }
1866 #endif
1867 }
1868
1869 /* -----------------------------------------------------------------------------
1870  * Perform a garbage collection if necessary
1871  * ASSUMES: sched_mutex
1872  * -------------------------------------------------------------------------- */
1873
1874 static void
1875 scheduleDoGC(void)
1876 {
1877     StgTSO *t;
1878
1879 #ifdef SMP
1880     // The last task to stop actually gets to do the GC.  The rest
1881     // of the tasks release their capabilities and wait gc_pending_cond.
1882     if (ready_to_gc && allFreeCapabilities())
1883 #else
1884     if (ready_to_gc)
1885 #endif
1886     {
1887         /* Kick any transactions which are invalid back to their
1888          * atomically frames.  When next scheduled they will try to
1889          * commit, this commit will fail and they will retry.
1890          */
1891         for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1892             if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1893                 if (!stmValidateTransaction (t -> trec)) {
1894                     IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1895                     
1896                     // strip the stack back to the ATOMICALLY_FRAME, aborting
1897                     // the (nested) transaction, and saving the stack of any
1898                     // partially-evaluated thunks on the heap.
1899                     raiseAsync_(t, NULL, rtsTrue);
1900                     
1901 #ifdef REG_R1
1902                     ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1903 #endif
1904                 }
1905             }
1906         }
1907
1908         // so this happens periodically:
1909         scheduleCheckBlackHoles();
1910
1911         /* everybody back, start the GC.
1912          * Could do it in this thread, or signal a condition var
1913          * to do it in another thread.  Either way, we need to
1914          * broadcast on gc_pending_cond afterward.
1915          */
1916 #if defined(RTS_SUPPORTS_THREADS)
1917         IF_DEBUG(scheduler,sched_belch("doing GC"));
1918 #endif
1919         GarbageCollect(GetRoots,rtsFalse);
1920         ready_to_gc = rtsFalse;
1921 #if defined(SMP)
1922         broadcastCondition(&gc_pending_cond);
1923 #endif
1924 #if defined(GRAN)
1925         /* add a ContinueThread event to continue execution of current thread */
1926         new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1927                   ContinueThread,
1928                   t, (StgClosure*)NULL, (rtsSpark*)NULL);
1929         IF_GRAN_DEBUG(bq, 
1930                       debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1931                       G_EVENTQ(0);
1932                       G_CURR_THREADQ(0));
1933 #endif /* GRAN */
1934     }
1935 }
1936
1937 /* ---------------------------------------------------------------------------
1938  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1939  * used by Control.Concurrent for error checking.
1940  * ------------------------------------------------------------------------- */
1941  
1942 StgBool
1943 rtsSupportsBoundThreads(void)
1944 {
1945 #ifdef THREADED_RTS
1946   return rtsTrue;
1947 #else
1948   return rtsFalse;
1949 #endif
1950 }
1951
1952 /* ---------------------------------------------------------------------------
1953  * isThreadBound(tso): check whether tso is bound to an OS thread.
1954  * ------------------------------------------------------------------------- */
1955  
1956 StgBool
1957 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1958 {
1959 #ifdef THREADED_RTS
1960   return (tso->main != NULL);
1961 #endif
1962   return rtsFalse;
1963 }
1964
1965 /* ---------------------------------------------------------------------------
1966  * Singleton fork(). Do not copy any running threads.
1967  * ------------------------------------------------------------------------- */
1968
1969 #ifndef mingw32_HOST_OS
1970 #define FORKPROCESS_PRIMOP_SUPPORTED
1971 #endif
1972
1973 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1974 static void 
1975 deleteThreadImmediately(StgTSO *tso);
1976 #endif
1977 StgInt
1978 forkProcess(HsStablePtr *entry
1979 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1980             STG_UNUSED
1981 #endif
1982            )
1983 {
1984 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1985   pid_t pid;
1986   StgTSO* t,*next;
1987   StgMainThread *m;
1988   SchedulerStatus rc;
1989
1990   IF_DEBUG(scheduler,sched_belch("forking!"));
1991   rts_lock(); // This not only acquires sched_mutex, it also
1992               // makes sure that no other threads are running
1993
1994   pid = fork();
1995
1996   if (pid) { /* parent */
1997
1998   /* just return the pid */
1999     rts_unlock();
2000     return pid;
2001     
2002   } else { /* child */
2003     
2004     
2005       // delete all threads
2006     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
2007     
2008     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2009       next = t->link;
2010
2011         // don't allow threads to catch the ThreadKilled exception
2012       deleteThreadImmediately(t);
2013     }
2014     
2015       // wipe the main thread list
2016     while((m = main_threads) != NULL) {
2017       main_threads = m->link;
2018 # ifdef THREADED_RTS
2019       closeCondition(&m->bound_thread_cond);
2020 # endif
2021       stgFree(m);
2022     }
2023     
2024     rc = rts_evalStableIO(entry, NULL);  // run the action
2025     rts_checkSchedStatus("forkProcess",rc);
2026     
2027     rts_unlock();
2028     
2029     hs_exit();                      // clean up and exit
2030     stg_exit(0);
2031   }
2032 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2033   barf("forkProcess#: primop not supported, sorry!\n");
2034   return -1;
2035 #endif
2036 }
2037
2038 /* ---------------------------------------------------------------------------
2039  * deleteAllThreads():  kill all the live threads.
2040  *
2041  * This is used when we catch a user interrupt (^C), before performing
2042  * any necessary cleanups and running finalizers.
2043  *
2044  * Locks: sched_mutex held.
2045  * ------------------------------------------------------------------------- */
2046    
2047 void
2048 deleteAllThreads ( void )
2049 {
2050   StgTSO* t, *next;
2051   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2052   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2053       next = t->global_link;
2054       deleteThread(t);
2055   }      
2056
2057   // The run queue now contains a bunch of ThreadKilled threads.  We
2058   // must not throw these away: the main thread(s) will be in there
2059   // somewhere, and the main scheduler loop has to deal with it.
2060   // Also, the run queue is the only thing keeping these threads from
2061   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2062
2063   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2064   ASSERT(blackhole_queue == END_TSO_QUEUE);
2065   ASSERT(sleeping_queue == END_TSO_QUEUE);
2066 }
2067
2068 /* startThread and  insertThread are now in GranSim.c -- HWL */
2069
2070
2071 /* ---------------------------------------------------------------------------
2072  * Suspending & resuming Haskell threads.
2073  * 
2074  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2075  * its capability before calling the C function.  This allows another
2076  * task to pick up the capability and carry on running Haskell
2077  * threads.  It also means that if the C call blocks, it won't lock
2078  * the whole system.
2079  *
2080  * The Haskell thread making the C call is put to sleep for the
2081  * duration of the call, on the susepended_ccalling_threads queue.  We
2082  * give out a token to the task, which it can use to resume the thread
2083  * on return from the C function.
2084  * ------------------------------------------------------------------------- */
2085    
2086 StgInt
2087 suspendThread( StgRegTable *reg )
2088 {
2089   nat tok;
2090   Capability *cap;
2091   int saved_errno = errno;
2092
2093   /* assume that *reg is a pointer to the StgRegTable part
2094    * of a Capability.
2095    */
2096   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
2097
2098   ACQUIRE_LOCK(&sched_mutex);
2099
2100   IF_DEBUG(scheduler,
2101            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
2102
2103   // XXX this might not be necessary --SDM
2104   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
2105
2106   threadPaused(cap->r.rCurrentTSO);
2107   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
2108   suspended_ccalling_threads = cap->r.rCurrentTSO;
2109
2110   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
2111       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
2112       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
2113   } else {
2114       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
2115   }
2116
2117   /* Use the thread ID as the token; it should be unique */
2118   tok = cap->r.rCurrentTSO->id;
2119
2120   /* Hand back capability */
2121   releaseCapability(cap);
2122   
2123 #if defined(RTS_SUPPORTS_THREADS)
2124   /* Preparing to leave the RTS, so ensure there's a native thread/task
2125      waiting to take over.
2126   */
2127   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
2128 #endif
2129
2130   in_haskell = rtsFalse;
2131   RELEASE_LOCK(&sched_mutex);
2132   
2133   errno = saved_errno;
2134   return tok; 
2135 }
2136
2137 StgRegTable *
2138 resumeThread( StgInt tok )
2139 {
2140   StgTSO *tso, **prev;
2141   Capability *cap;
2142   int saved_errno = errno;
2143
2144 #if defined(RTS_SUPPORTS_THREADS)
2145   /* Wait for permission to re-enter the RTS with the result. */
2146   ACQUIRE_LOCK(&sched_mutex);
2147   waitForReturnCapability(&sched_mutex, &cap);
2148
2149   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
2150 #else
2151   grabCapability(&cap);
2152 #endif
2153
2154   /* Remove the thread off of the suspended list */
2155   prev = &suspended_ccalling_threads;
2156   for (tso = suspended_ccalling_threads; 
2157        tso != END_TSO_QUEUE; 
2158        prev = &tso->link, tso = tso->link) {
2159     if (tso->id == (StgThreadID)tok) {
2160       *prev = tso->link;
2161       break;
2162     }
2163   }
2164   if (tso == END_TSO_QUEUE) {
2165     barf("resumeThread: thread not found");
2166   }
2167   tso->link = END_TSO_QUEUE;
2168   
2169   if(tso->why_blocked == BlockedOnCCall) {
2170       awakenBlockedQueueNoLock(tso->blocked_exceptions);
2171       tso->blocked_exceptions = NULL;
2172   }
2173   
2174   /* Reset blocking status */
2175   tso->why_blocked  = NotBlocked;
2176
2177   cap->r.rCurrentTSO = tso;
2178   in_haskell = rtsTrue;
2179   RELEASE_LOCK(&sched_mutex);
2180   errno = saved_errno;
2181   return &cap->r;
2182 }
2183
2184 /* ---------------------------------------------------------------------------
2185  * Comparing Thread ids.
2186  *
2187  * This is used from STG land in the implementation of the
2188  * instances of Eq/Ord for ThreadIds.
2189  * ------------------------------------------------------------------------ */
2190
2191 int
2192 cmp_thread(StgPtr tso1, StgPtr tso2) 
2193
2194   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2195   StgThreadID id2 = ((StgTSO *)tso2)->id;
2196  
2197   if (id1 < id2) return (-1);
2198   if (id1 > id2) return 1;
2199   return 0;
2200 }
2201
2202 /* ---------------------------------------------------------------------------
2203  * Fetching the ThreadID from an StgTSO.
2204  *
2205  * This is used in the implementation of Show for ThreadIds.
2206  * ------------------------------------------------------------------------ */
2207 int
2208 rts_getThreadId(StgPtr tso) 
2209 {
2210   return ((StgTSO *)tso)->id;
2211 }
2212
2213 #ifdef DEBUG
2214 void
2215 labelThread(StgPtr tso, char *label)
2216 {
2217   int len;
2218   void *buf;
2219
2220   /* Caveat: Once set, you can only set the thread name to "" */
2221   len = strlen(label)+1;
2222   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2223   strncpy(buf,label,len);
2224   /* Update will free the old memory for us */
2225   updateThreadLabel(((StgTSO *)tso)->id,buf);
2226 }
2227 #endif /* DEBUG */
2228
2229 /* ---------------------------------------------------------------------------
2230    Create a new thread.
2231
2232    The new thread starts with the given stack size.  Before the
2233    scheduler can run, however, this thread needs to have a closure
2234    (and possibly some arguments) pushed on its stack.  See
2235    pushClosure() in Schedule.h.
2236
2237    createGenThread() and createIOThread() (in SchedAPI.h) are
2238    convenient packaged versions of this function.
2239
2240    currently pri (priority) is only used in a GRAN setup -- HWL
2241    ------------------------------------------------------------------------ */
2242 #if defined(GRAN)
2243 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2244 StgTSO *
2245 createThread(nat size, StgInt pri)
2246 #else
2247 StgTSO *
2248 createThread(nat size)
2249 #endif
2250 {
2251
2252     StgTSO *tso;
2253     nat stack_size;
2254
2255     /* First check whether we should create a thread at all */
2256 #if defined(PARALLEL_HASKELL)
2257   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2258   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2259     threadsIgnored++;
2260     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2261           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2262     return END_TSO_QUEUE;
2263   }
2264   threadsCreated++;
2265 #endif
2266
2267 #if defined(GRAN)
2268   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2269 #endif
2270
2271   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2272
2273   /* catch ridiculously small stack sizes */
2274   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2275     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2276   }
2277
2278   stack_size = size - TSO_STRUCT_SIZEW;
2279
2280   tso = (StgTSO *)allocate(size);
2281   TICK_ALLOC_TSO(stack_size, 0);
2282
2283   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2284 #if defined(GRAN)
2285   SET_GRAN_HDR(tso, ThisPE);
2286 #endif
2287
2288   // Always start with the compiled code evaluator
2289   tso->what_next = ThreadRunGHC;
2290
2291   tso->id = next_thread_id++; 
2292   tso->why_blocked  = NotBlocked;
2293   tso->blocked_exceptions = NULL;
2294
2295   tso->saved_errno = 0;
2296   tso->main = NULL;
2297   
2298   tso->stack_size   = stack_size;
2299   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2300                               - TSO_STRUCT_SIZEW;
2301   tso->sp           = (P_)&(tso->stack) + stack_size;
2302
2303   tso->trec = NO_TREC;
2304
2305 #ifdef PROFILING
2306   tso->prof.CCCS = CCS_MAIN;
2307 #endif
2308
2309   /* put a stop frame on the stack */
2310   tso->sp -= sizeofW(StgStopFrame);
2311   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2312   tso->link = END_TSO_QUEUE;
2313
2314   // ToDo: check this
2315 #if defined(GRAN)
2316   /* uses more flexible routine in GranSim */
2317   insertThread(tso, CurrentProc);
2318 #else
2319   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2320    * from its creation
2321    */
2322 #endif
2323
2324 #if defined(GRAN) 
2325   if (RtsFlags.GranFlags.GranSimStats.Full) 
2326     DumpGranEvent(GR_START,tso);
2327 #elif defined(PARALLEL_HASKELL)
2328   if (RtsFlags.ParFlags.ParStats.Full) 
2329     DumpGranEvent(GR_STARTQ,tso);
2330   /* HACk to avoid SCHEDULE 
2331      LastTSO = tso; */
2332 #endif
2333
2334   /* Link the new thread on the global thread list.
2335    */
2336   tso->global_link = all_threads;
2337   all_threads = tso;
2338
2339 #if defined(DIST)
2340   tso->dist.priority = MandatoryPriority; //by default that is...
2341 #endif
2342
2343 #if defined(GRAN)
2344   tso->gran.pri = pri;
2345 # if defined(DEBUG)
2346   tso->gran.magic = TSO_MAGIC; // debugging only
2347 # endif
2348   tso->gran.sparkname   = 0;
2349   tso->gran.startedat   = CURRENT_TIME; 
2350   tso->gran.exported    = 0;
2351   tso->gran.basicblocks = 0;
2352   tso->gran.allocs      = 0;
2353   tso->gran.exectime    = 0;
2354   tso->gran.fetchtime   = 0;
2355   tso->gran.fetchcount  = 0;
2356   tso->gran.blocktime   = 0;
2357   tso->gran.blockcount  = 0;
2358   tso->gran.blockedat   = 0;
2359   tso->gran.globalsparks = 0;
2360   tso->gran.localsparks  = 0;
2361   if (RtsFlags.GranFlags.Light)
2362     tso->gran.clock  = Now; /* local clock */
2363   else
2364     tso->gran.clock  = 0;
2365
2366   IF_DEBUG(gran,printTSO(tso));
2367 #elif defined(PARALLEL_HASKELL)
2368 # if defined(DEBUG)
2369   tso->par.magic = TSO_MAGIC; // debugging only
2370 # endif
2371   tso->par.sparkname   = 0;
2372   tso->par.startedat   = CURRENT_TIME; 
2373   tso->par.exported    = 0;
2374   tso->par.basicblocks = 0;
2375   tso->par.allocs      = 0;
2376   tso->par.exectime    = 0;
2377   tso->par.fetchtime   = 0;
2378   tso->par.fetchcount  = 0;
2379   tso->par.blocktime   = 0;
2380   tso->par.blockcount  = 0;
2381   tso->par.blockedat   = 0;
2382   tso->par.globalsparks = 0;
2383   tso->par.localsparks  = 0;
2384 #endif
2385
2386 #if defined(GRAN)
2387   globalGranStats.tot_threads_created++;
2388   globalGranStats.threads_created_on_PE[CurrentProc]++;
2389   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2390   globalGranStats.tot_sq_probes++;
2391 #elif defined(PARALLEL_HASKELL)
2392   // collect parallel global statistics (currently done together with GC stats)
2393   if (RtsFlags.ParFlags.ParStats.Global &&
2394       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2395     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2396     globalParStats.tot_threads_created++;
2397   }
2398 #endif 
2399
2400 #if defined(GRAN)
2401   IF_GRAN_DEBUG(pri,
2402                 sched_belch("==__ schedule: Created TSO %d (%p);",
2403                       CurrentProc, tso, tso->id));
2404 #elif defined(PARALLEL_HASKELL)
2405   IF_PAR_DEBUG(verbose,
2406                sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2407                            (long)tso->id, tso, advisory_thread_count));
2408 #else
2409   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2410                                  (long)tso->id, (long)tso->stack_size));
2411 #endif    
2412   return tso;
2413 }
2414
2415 #if defined(PAR)
2416 /* RFP:
2417    all parallel thread creation calls should fall through the following routine.
2418 */
2419 StgTSO *
2420 createThreadFromSpark(rtsSpark spark) 
2421 { StgTSO *tso;
2422   ASSERT(spark != (rtsSpark)NULL);
2423 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2424   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2425   { threadsIgnored++;
2426     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2427           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2428     return END_TSO_QUEUE;
2429   }
2430   else
2431   { threadsCreated++;
2432     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2433     if (tso==END_TSO_QUEUE)     
2434       barf("createSparkThread: Cannot create TSO");
2435 #if defined(DIST)
2436     tso->priority = AdvisoryPriority;
2437 #endif
2438     pushClosure(tso,spark);
2439     addToRunQueue(tso);
2440     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2441   }
2442   return tso;
2443 }
2444 #endif
2445
2446 /*
2447   Turn a spark into a thread.
2448   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2449 */
2450 #if 0
2451 StgTSO *
2452 activateSpark (rtsSpark spark) 
2453 {
2454   StgTSO *tso;
2455
2456   tso = createSparkThread(spark);
2457   if (RtsFlags.ParFlags.ParStats.Full) {   
2458     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2459       IF_PAR_DEBUG(verbose,
2460                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2461                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2462   }
2463   // ToDo: fwd info on local/global spark to thread -- HWL
2464   // tso->gran.exported =  spark->exported;
2465   // tso->gran.locked =   !spark->global;
2466   // tso->gran.sparkname = spark->name;
2467
2468   return tso;
2469 }
2470 #endif
2471
2472 /* ---------------------------------------------------------------------------
2473  * scheduleThread()
2474  *
2475  * scheduleThread puts a thread on the head of the runnable queue.
2476  * This will usually be done immediately after a thread is created.
2477  * The caller of scheduleThread must create the thread using e.g.
2478  * createThread and push an appropriate closure
2479  * on this thread's stack before the scheduler is invoked.
2480  * ------------------------------------------------------------------------ */
2481
2482 static void
2483 scheduleThread_(StgTSO *tso)
2484 {
2485   // The thread goes at the *end* of the run-queue, to avoid possible
2486   // starvation of any threads already on the queue.
2487   APPEND_TO_RUN_QUEUE(tso);
2488   threadRunnable();
2489 }
2490
2491 void
2492 scheduleThread(StgTSO* tso)
2493 {
2494   ACQUIRE_LOCK(&sched_mutex);
2495   scheduleThread_(tso);
2496   RELEASE_LOCK(&sched_mutex);
2497 }
2498
2499 #if defined(RTS_SUPPORTS_THREADS)
2500 static Condition bound_cond_cache;
2501 static int bound_cond_cache_full = 0;
2502 #endif
2503
2504
2505 SchedulerStatus
2506 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
2507                    Capability *initialCapability)
2508 {
2509     // Precondition: sched_mutex must be held
2510     StgMainThread *m;
2511
2512     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2513     m->tso = tso;
2514     tso->main = m;
2515     m->ret = ret;
2516     m->stat = NoStatus;
2517     m->link = main_threads;
2518     m->prev = NULL;
2519     if (main_threads != NULL) {
2520         main_threads->prev = m;
2521     }
2522     main_threads = m;
2523
2524 #if defined(RTS_SUPPORTS_THREADS)
2525     // Allocating a new condition for each thread is expensive, so we
2526     // cache one.  This is a pretty feeble hack, but it helps speed up
2527     // consecutive call-ins quite a bit.
2528     if (bound_cond_cache_full) {
2529         m->bound_thread_cond = bound_cond_cache;
2530         bound_cond_cache_full = 0;
2531     } else {
2532         initCondition(&m->bound_thread_cond);
2533     }
2534 #endif
2535
2536     /* Put the thread on the main-threads list prior to scheduling the TSO.
2537        Failure to do so introduces a race condition in the MT case (as
2538        identified by Wolfgang Thaller), whereby the new task/OS thread 
2539        created by scheduleThread_() would complete prior to the thread
2540        that spawned it managed to put 'itself' on the main-threads list.
2541        The upshot of it all being that the worker thread wouldn't get to
2542        signal the completion of the its work item for the main thread to
2543        see (==> it got stuck waiting.)    -- sof 6/02.
2544     */
2545     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2546     
2547     APPEND_TO_RUN_QUEUE(tso);
2548     // NB. Don't call threadRunnable() here, because the thread is
2549     // bound and only runnable by *this* OS thread, so waking up other
2550     // workers will just slow things down.
2551
2552     return waitThread_(m, initialCapability);
2553 }
2554
2555 /* ---------------------------------------------------------------------------
2556  * initScheduler()
2557  *
2558  * Initialise the scheduler.  This resets all the queues - if the
2559  * queues contained any threads, they'll be garbage collected at the
2560  * next pass.
2561  *
2562  * ------------------------------------------------------------------------ */
2563
2564 void 
2565 initScheduler(void)
2566 {
2567 #if defined(GRAN)
2568   nat i;
2569
2570   for (i=0; i<=MAX_PROC; i++) {
2571     run_queue_hds[i]      = END_TSO_QUEUE;
2572     run_queue_tls[i]      = END_TSO_QUEUE;
2573     blocked_queue_hds[i]  = END_TSO_QUEUE;
2574     blocked_queue_tls[i]  = END_TSO_QUEUE;
2575     ccalling_threadss[i]  = END_TSO_QUEUE;
2576     blackhole_queue[i]    = END_TSO_QUEUE;
2577     sleeping_queue        = END_TSO_QUEUE;
2578   }
2579 #else
2580   run_queue_hd      = END_TSO_QUEUE;
2581   run_queue_tl      = END_TSO_QUEUE;
2582   blocked_queue_hd  = END_TSO_QUEUE;
2583   blocked_queue_tl  = END_TSO_QUEUE;
2584   blackhole_queue   = END_TSO_QUEUE;
2585   sleeping_queue    = END_TSO_QUEUE;
2586 #endif 
2587
2588   suspended_ccalling_threads  = END_TSO_QUEUE;
2589
2590   main_threads = NULL;
2591   all_threads  = END_TSO_QUEUE;
2592
2593   context_switch = 0;
2594   interrupted    = 0;
2595
2596   RtsFlags.ConcFlags.ctxtSwitchTicks =
2597       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2598       
2599 #if defined(RTS_SUPPORTS_THREADS)
2600   /* Initialise the mutex and condition variables used by
2601    * the scheduler. */
2602   initMutex(&sched_mutex);
2603   initMutex(&term_mutex);
2604 #endif
2605   
2606   ACQUIRE_LOCK(&sched_mutex);
2607
2608   /* A capability holds the state a native thread needs in
2609    * order to execute STG code. At least one capability is
2610    * floating around (only SMP builds have more than one).
2611    */
2612   initCapabilities();
2613   
2614 #if defined(RTS_SUPPORTS_THREADS)
2615   initTaskManager();
2616 #endif
2617
2618 #if defined(SMP)
2619   /* eagerly start some extra workers */
2620   startTasks(RtsFlags.ParFlags.nNodes, taskStart);
2621 #endif
2622
2623 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2624   initSparkPools();
2625 #endif
2626
2627   RELEASE_LOCK(&sched_mutex);
2628 }
2629
2630 void
2631 exitScheduler( void )
2632 {
2633     interrupted = rtsTrue;
2634     shutting_down_scheduler = rtsTrue;
2635 #if defined(RTS_SUPPORTS_THREADS)
2636     if (threadIsTask(osThreadId())) { taskStop(); }
2637     stopTaskManager();
2638 #endif
2639 }
2640
2641 /* ----------------------------------------------------------------------------
2642    Managing the per-task allocation areas.
2643    
2644    Each capability comes with an allocation area.  These are
2645    fixed-length block lists into which allocation can be done.
2646
2647    ToDo: no support for two-space collection at the moment???
2648    ------------------------------------------------------------------------- */
2649
2650 static SchedulerStatus
2651 waitThread_(StgMainThread* m, Capability *initialCapability)
2652 {
2653   SchedulerStatus stat;
2654
2655   // Precondition: sched_mutex must be held.
2656   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2657
2658 #if defined(GRAN)
2659   /* GranSim specific init */
2660   CurrentTSO = m->tso;                // the TSO to run
2661   procStatus[MainProc] = Busy;        // status of main PE
2662   CurrentProc = MainProc;             // PE to run it on
2663   schedule(m,initialCapability);
2664 #else
2665   schedule(m,initialCapability);
2666   ASSERT(m->stat != NoStatus);
2667 #endif
2668
2669   stat = m->stat;
2670
2671 #if defined(RTS_SUPPORTS_THREADS)
2672   // Free the condition variable, returning it to the cache if possible.
2673   if (!bound_cond_cache_full) {
2674       bound_cond_cache = m->bound_thread_cond;
2675       bound_cond_cache_full = 1;
2676   } else {
2677       closeCondition(&m->bound_thread_cond);
2678   }
2679 #endif
2680
2681   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2682   stgFree(m);
2683
2684   // Postcondition: sched_mutex still held
2685   return stat;
2686 }
2687
2688 /* ---------------------------------------------------------------------------
2689    Where are the roots that we know about?
2690
2691         - all the threads on the runnable queue
2692         - all the threads on the blocked queue
2693         - all the threads on the sleeping queue
2694         - all the thread currently executing a _ccall_GC
2695         - all the "main threads"
2696      
2697    ------------------------------------------------------------------------ */
2698
2699 /* This has to be protected either by the scheduler monitor, or by the
2700         garbage collection monitor (probably the latter).
2701         KH @ 25/10/99
2702 */
2703
2704 void
2705 GetRoots( evac_fn evac )
2706 {
2707 #if defined(GRAN)
2708   {
2709     nat i;
2710     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2711       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2712           evac((StgClosure **)&run_queue_hds[i]);
2713       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2714           evac((StgClosure **)&run_queue_tls[i]);
2715       
2716       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2717           evac((StgClosure **)&blocked_queue_hds[i]);
2718       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2719           evac((StgClosure **)&blocked_queue_tls[i]);
2720       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2721           evac((StgClosure **)&ccalling_threads[i]);
2722     }
2723   }
2724
2725   markEventQueue();
2726
2727 #else /* !GRAN */
2728   if (run_queue_hd != END_TSO_QUEUE) {
2729       ASSERT(run_queue_tl != END_TSO_QUEUE);
2730       evac((StgClosure **)&run_queue_hd);
2731       evac((StgClosure **)&run_queue_tl);
2732   }
2733   
2734   if (blocked_queue_hd != END_TSO_QUEUE) {
2735       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2736       evac((StgClosure **)&blocked_queue_hd);
2737       evac((StgClosure **)&blocked_queue_tl);
2738   }
2739   
2740   if (sleeping_queue != END_TSO_QUEUE) {
2741       evac((StgClosure **)&sleeping_queue);
2742   }
2743 #endif 
2744
2745   if (blackhole_queue != END_TSO_QUEUE) {
2746       evac((StgClosure **)&blackhole_queue);
2747   }
2748
2749   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2750       evac((StgClosure **)&suspended_ccalling_threads);
2751   }
2752
2753 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2754   markSparkQueue(evac);
2755 #endif
2756
2757 #if defined(RTS_USER_SIGNALS)
2758   // mark the signal handlers (signals should be already blocked)
2759   markSignalHandlers(evac);
2760 #endif
2761 }
2762
2763 /* -----------------------------------------------------------------------------
2764    performGC
2765
2766    This is the interface to the garbage collector from Haskell land.
2767    We provide this so that external C code can allocate and garbage
2768    collect when called from Haskell via _ccall_GC.
2769
2770    It might be useful to provide an interface whereby the programmer
2771    can specify more roots (ToDo).
2772    
2773    This needs to be protected by the GC condition variable above.  KH.
2774    -------------------------------------------------------------------------- */
2775
2776 static void (*extra_roots)(evac_fn);
2777
2778 void
2779 performGC(void)
2780 {
2781   /* Obligated to hold this lock upon entry */
2782   ACQUIRE_LOCK(&sched_mutex);
2783   GarbageCollect(GetRoots,rtsFalse);
2784   RELEASE_LOCK(&sched_mutex);
2785 }
2786
2787 void
2788 performMajorGC(void)
2789 {
2790   ACQUIRE_LOCK(&sched_mutex);
2791   GarbageCollect(GetRoots,rtsTrue);
2792   RELEASE_LOCK(&sched_mutex);
2793 }
2794
2795 static void
2796 AllRoots(evac_fn evac)
2797 {
2798     GetRoots(evac);             // the scheduler's roots
2799     extra_roots(evac);          // the user's roots
2800 }
2801
2802 void
2803 performGCWithRoots(void (*get_roots)(evac_fn))
2804 {
2805   ACQUIRE_LOCK(&sched_mutex);
2806   extra_roots = get_roots;
2807   GarbageCollect(AllRoots,rtsFalse);
2808   RELEASE_LOCK(&sched_mutex);
2809 }
2810
2811 /* -----------------------------------------------------------------------------
2812    Stack overflow
2813
2814    If the thread has reached its maximum stack size, then raise the
2815    StackOverflow exception in the offending thread.  Otherwise
2816    relocate the TSO into a larger chunk of memory and adjust its stack
2817    size appropriately.
2818    -------------------------------------------------------------------------- */
2819
2820 static StgTSO *
2821 threadStackOverflow(StgTSO *tso)
2822 {
2823   nat new_stack_size, stack_words;
2824   lnat new_tso_size;
2825   StgPtr new_sp;
2826   StgTSO *dest;
2827
2828   IF_DEBUG(sanity,checkTSO(tso));
2829   if (tso->stack_size >= tso->max_stack_size) {
2830
2831     IF_DEBUG(gc,
2832              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2833                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2834              /* If we're debugging, just print out the top of the stack */
2835              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2836                                               tso->sp+64)));
2837
2838     /* Send this thread the StackOverflow exception */
2839     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2840     return tso;
2841   }
2842
2843   /* Try to double the current stack size.  If that takes us over the
2844    * maximum stack size for this thread, then use the maximum instead.
2845    * Finally round up so the TSO ends up as a whole number of blocks.
2846    */
2847   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2848   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2849                                        TSO_STRUCT_SIZE)/sizeof(W_);
2850   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2851   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2852
2853   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2854
2855   dest = (StgTSO *)allocate(new_tso_size);
2856   TICK_ALLOC_TSO(new_stack_size,0);
2857
2858   /* copy the TSO block and the old stack into the new area */
2859   memcpy(dest,tso,TSO_STRUCT_SIZE);
2860   stack_words = tso->stack + tso->stack_size - tso->sp;
2861   new_sp = (P_)dest + new_tso_size - stack_words;
2862   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2863
2864   /* relocate the stack pointers... */
2865   dest->sp         = new_sp;
2866   dest->stack_size = new_stack_size;
2867         
2868   /* Mark the old TSO as relocated.  We have to check for relocated
2869    * TSOs in the garbage collector and any primops that deal with TSOs.
2870    *
2871    * It's important to set the sp value to just beyond the end
2872    * of the stack, so we don't attempt to scavenge any part of the
2873    * dead TSO's stack.
2874    */
2875   tso->what_next = ThreadRelocated;
2876   tso->link = dest;
2877   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2878   tso->why_blocked = NotBlocked;
2879
2880   IF_PAR_DEBUG(verbose,
2881                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2882                      tso->id, tso, tso->stack_size);
2883                /* If we're debugging, just print out the top of the stack */
2884                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2885                                                 tso->sp+64)));
2886   
2887   IF_DEBUG(sanity,checkTSO(tso));
2888 #if 0
2889   IF_DEBUG(scheduler,printTSO(dest));
2890 #endif
2891
2892   return dest;
2893 }
2894
2895 /* ---------------------------------------------------------------------------
2896    Wake up a queue that was blocked on some resource.
2897    ------------------------------------------------------------------------ */
2898
2899 #if defined(GRAN)
2900 STATIC_INLINE void
2901 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2902 {
2903 }
2904 #elif defined(PARALLEL_HASKELL)
2905 STATIC_INLINE void
2906 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2907 {
2908   /* write RESUME events to log file and
2909      update blocked and fetch time (depending on type of the orig closure) */
2910   if (RtsFlags.ParFlags.ParStats.Full) {
2911     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2912                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2913                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2914     if (EMPTY_RUN_QUEUE())
2915       emitSchedule = rtsTrue;
2916
2917     switch (get_itbl(node)->type) {
2918         case FETCH_ME_BQ:
2919           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2920           break;
2921         case RBH:
2922         case FETCH_ME:
2923         case BLACKHOLE_BQ:
2924           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2925           break;
2926 #ifdef DIST
2927         case MVAR:
2928           break;
2929 #endif    
2930         default:
2931           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2932         }
2933       }
2934 }
2935 #endif
2936
2937 #if defined(GRAN)
2938 static StgBlockingQueueElement *
2939 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2940 {
2941     StgTSO *tso;
2942     PEs node_loc, tso_loc;
2943
2944     node_loc = where_is(node); // should be lifted out of loop
2945     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2946     tso_loc = where_is((StgClosure *)tso);
2947     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2948       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2949       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2950       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2951       // insertThread(tso, node_loc);
2952       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2953                 ResumeThread,
2954                 tso, node, (rtsSpark*)NULL);
2955       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2956       // len_local++;
2957       // len++;
2958     } else { // TSO is remote (actually should be FMBQ)
2959       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2960                                   RtsFlags.GranFlags.Costs.gunblocktime +
2961                                   RtsFlags.GranFlags.Costs.latency;
2962       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2963                 UnblockThread,
2964                 tso, node, (rtsSpark*)NULL);
2965       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2966       // len++;
2967     }
2968     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2969     IF_GRAN_DEBUG(bq,
2970                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2971                           (node_loc==tso_loc ? "Local" : "Global"), 
2972                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2973     tso->block_info.closure = NULL;
2974     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2975                              tso->id, tso));
2976 }
2977 #elif defined(PARALLEL_HASKELL)
2978 static StgBlockingQueueElement *
2979 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2980 {
2981     StgBlockingQueueElement *next;
2982
2983     switch (get_itbl(bqe)->type) {
2984     case TSO:
2985       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2986       /* if it's a TSO just push it onto the run_queue */
2987       next = bqe->link;
2988       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2989       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2990       threadRunnable();
2991       unblockCount(bqe, node);
2992       /* reset blocking status after dumping event */
2993       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2994       break;
2995
2996     case BLOCKED_FETCH:
2997       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2998       next = bqe->link;
2999       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3000       PendingFetches = (StgBlockedFetch *)bqe;
3001       break;
3002
3003 # if defined(DEBUG)
3004       /* can ignore this case in a non-debugging setup; 
3005          see comments on RBHSave closures above */
3006     case CONSTR:
3007       /* check that the closure is an RBHSave closure */
3008       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3009              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3010              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3011       break;
3012
3013     default:
3014       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3015            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3016            (StgClosure *)bqe);
3017 # endif
3018     }
3019   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3020   return next;
3021 }
3022
3023 #else /* !GRAN && !PARALLEL_HASKELL */
3024 static StgTSO *
3025 unblockOneLocked(StgTSO *tso)
3026 {
3027   StgTSO *next;
3028
3029   ASSERT(get_itbl(tso)->type == TSO);
3030   ASSERT(tso->why_blocked != NotBlocked);
3031   tso->why_blocked = NotBlocked;
3032   next = tso->link;
3033   tso->link = END_TSO_QUEUE;
3034   APPEND_TO_RUN_QUEUE(tso);
3035   threadRunnable();
3036   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3037   return next;
3038 }
3039 #endif
3040
3041 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3042 INLINE_ME StgBlockingQueueElement *
3043 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3044 {
3045   ACQUIRE_LOCK(&sched_mutex);
3046   bqe = unblockOneLocked(bqe, node);
3047   RELEASE_LOCK(&sched_mutex);
3048   return bqe;
3049 }
3050 #else
3051 INLINE_ME StgTSO *
3052 unblockOne(StgTSO *tso)
3053 {
3054   ACQUIRE_LOCK(&sched_mutex);
3055   tso = unblockOneLocked(tso);
3056   RELEASE_LOCK(&sched_mutex);
3057   return tso;
3058 }
3059 #endif
3060
3061 #if defined(GRAN)
3062 void 
3063 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3064 {
3065   StgBlockingQueueElement *bqe;
3066   PEs node_loc;
3067   nat len = 0; 
3068
3069   IF_GRAN_DEBUG(bq, 
3070                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3071                       node, CurrentProc, CurrentTime[CurrentProc], 
3072                       CurrentTSO->id, CurrentTSO));
3073
3074   node_loc = where_is(node);
3075
3076   ASSERT(q == END_BQ_QUEUE ||
3077          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3078          get_itbl(q)->type == CONSTR); // closure (type constructor)
3079   ASSERT(is_unique(node));
3080
3081   /* FAKE FETCH: magically copy the node to the tso's proc;
3082      no Fetch necessary because in reality the node should not have been 
3083      moved to the other PE in the first place
3084   */
3085   if (CurrentProc!=node_loc) {
3086     IF_GRAN_DEBUG(bq, 
3087                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3088                         node, node_loc, CurrentProc, CurrentTSO->id, 
3089                         // CurrentTSO, where_is(CurrentTSO),
3090                         node->header.gran.procs));
3091     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3092     IF_GRAN_DEBUG(bq, 
3093                   debugBelch("## new bitmask of node %p is %#x\n",
3094                         node, node->header.gran.procs));
3095     if (RtsFlags.GranFlags.GranSimStats.Global) {
3096       globalGranStats.tot_fake_fetches++;
3097     }
3098   }
3099
3100   bqe = q;
3101   // ToDo: check: ASSERT(CurrentProc==node_loc);
3102   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3103     //next = bqe->link;
3104     /* 
3105        bqe points to the current element in the queue
3106        next points to the next element in the queue
3107     */
3108     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3109     //tso_loc = where_is(tso);
3110     len++;
3111     bqe = unblockOneLocked(bqe, node);
3112   }
3113
3114   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3115      the closure to make room for the anchor of the BQ */
3116   if (bqe!=END_BQ_QUEUE) {
3117     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3118     /*
3119     ASSERT((info_ptr==&RBH_Save_0_info) ||
3120            (info_ptr==&RBH_Save_1_info) ||
3121            (info_ptr==&RBH_Save_2_info));
3122     */
3123     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3124     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3125     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3126
3127     IF_GRAN_DEBUG(bq,
3128                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3129                         node, info_type(node)));
3130   }
3131
3132   /* statistics gathering */
3133   if (RtsFlags.GranFlags.GranSimStats.Global) {
3134     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3135     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3136     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3137     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3138   }
3139   IF_GRAN_DEBUG(bq,
3140                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3141                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3142 }
3143 #elif defined(PARALLEL_HASKELL)
3144 void 
3145 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3146 {
3147   StgBlockingQueueElement *bqe;
3148
3149   ACQUIRE_LOCK(&sched_mutex);
3150
3151   IF_PAR_DEBUG(verbose, 
3152                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3153                      node, mytid));
3154 #ifdef DIST  
3155   //RFP
3156   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3157     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3158     return;
3159   }
3160 #endif
3161   
3162   ASSERT(q == END_BQ_QUEUE ||
3163          get_itbl(q)->type == TSO ||           
3164          get_itbl(q)->type == BLOCKED_FETCH || 
3165          get_itbl(q)->type == CONSTR); 
3166
3167   bqe = q;
3168   while (get_itbl(bqe)->type==TSO || 
3169          get_itbl(bqe)->type==BLOCKED_FETCH) {
3170     bqe = unblockOneLocked(bqe, node);
3171   }
3172   RELEASE_LOCK(&sched_mutex);
3173 }
3174
3175 #else   /* !GRAN && !PARALLEL_HASKELL */
3176
3177 void
3178 awakenBlockedQueueNoLock(StgTSO *tso)
3179 {
3180   while (tso != END_TSO_QUEUE) {
3181     tso = unblockOneLocked(tso);
3182   }
3183 }
3184
3185 void
3186 awakenBlockedQueue(StgTSO *tso)
3187 {
3188   ACQUIRE_LOCK(&sched_mutex);
3189   while (tso != END_TSO_QUEUE) {
3190     tso = unblockOneLocked(tso);
3191   }
3192   RELEASE_LOCK(&sched_mutex);
3193 }
3194 #endif
3195
3196 /* ---------------------------------------------------------------------------
3197    Interrupt execution
3198    - usually called inside a signal handler so it mustn't do anything fancy.   
3199    ------------------------------------------------------------------------ */
3200
3201 void
3202 interruptStgRts(void)
3203 {
3204     interrupted    = 1;
3205     context_switch = 1;
3206 }
3207
3208 /* -----------------------------------------------------------------------------
3209    Unblock a thread
3210
3211    This is for use when we raise an exception in another thread, which
3212    may be blocked.
3213    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3214    -------------------------------------------------------------------------- */
3215
3216 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3217 /*
3218   NB: only the type of the blocking queue is different in GranSim and GUM
3219       the operations on the queue-elements are the same
3220       long live polymorphism!
3221
3222   Locks: sched_mutex is held upon entry and exit.
3223
3224 */
3225 static void
3226 unblockThread(StgTSO *tso)
3227 {
3228   StgBlockingQueueElement *t, **last;
3229
3230   switch (tso->why_blocked) {
3231
3232   case NotBlocked:
3233     return;  /* not blocked */
3234
3235   case BlockedOnSTM:
3236     // Be careful: nothing to do here!  We tell the scheduler that the thread
3237     // is runnable and we leave it to the stack-walking code to abort the 
3238     // transaction while unwinding the stack.  We should perhaps have a debugging
3239     // test to make sure that this really happens and that the 'zombie' transaction
3240     // does not get committed.
3241     goto done;
3242
3243   case BlockedOnMVar:
3244     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3245     {
3246       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3247       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3248
3249       last = (StgBlockingQueueElement **)&mvar->head;
3250       for (t = (StgBlockingQueueElement *)mvar->head; 
3251            t != END_BQ_QUEUE; 
3252            last = &t->link, last_tso = t, t = t->link) {
3253         if (t == (StgBlockingQueueElement *)tso) {
3254           *last = (StgBlockingQueueElement *)tso->link;
3255           if (mvar->tail == tso) {
3256             mvar->tail = (StgTSO *)last_tso;
3257           }
3258           goto done;
3259         }
3260       }
3261       barf("unblockThread (MVAR): TSO not found");
3262     }
3263
3264   case BlockedOnBlackHole:
3265     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3266     {
3267       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3268
3269       last = &bq->blocking_queue;
3270       for (t = bq->blocking_queue; 
3271            t != END_BQ_QUEUE; 
3272            last = &t->link, t = t->link) {
3273         if (t == (StgBlockingQueueElement *)tso) {
3274           *last = (StgBlockingQueueElement *)tso->link;
3275           goto done;
3276         }
3277       }
3278       barf("unblockThread (BLACKHOLE): TSO not found");
3279     }
3280
3281   case BlockedOnException:
3282     {
3283       StgTSO *target  = tso->block_info.tso;
3284
3285       ASSERT(get_itbl(target)->type == TSO);
3286
3287       if (target->what_next == ThreadRelocated) {
3288           target = target->link;
3289           ASSERT(get_itbl(target)->type == TSO);
3290       }
3291
3292       ASSERT(target->blocked_exceptions != NULL);
3293
3294       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3295       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3296            t != END_BQ_QUEUE; 
3297            last = &t->link, t = t->link) {
3298         ASSERT(get_itbl(t)->type == TSO);
3299         if (t == (StgBlockingQueueElement *)tso) {
3300           *last = (StgBlockingQueueElement *)tso->link;
3301           goto done;
3302         }
3303       }
3304       barf("unblockThread (Exception): TSO not found");
3305     }
3306
3307   case BlockedOnRead:
3308   case BlockedOnWrite:
3309 #if defined(mingw32_HOST_OS)
3310   case BlockedOnDoProc:
3311 #endif
3312     {
3313       /* take TSO off blocked_queue */
3314       StgBlockingQueueElement *prev = NULL;
3315       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3316            prev = t, t = t->link) {
3317         if (t == (StgBlockingQueueElement *)tso) {
3318           if (prev == NULL) {
3319             blocked_queue_hd = (StgTSO *)t->link;
3320             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3321               blocked_queue_tl = END_TSO_QUEUE;
3322             }
3323           } else {
3324             prev->link = t->link;
3325             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3326               blocked_queue_tl = (StgTSO *)prev;
3327             }
3328           }
3329           goto done;
3330         }
3331       }
3332       barf("unblockThread (I/O): TSO not found");
3333     }
3334
3335   case BlockedOnDelay:
3336     {
3337       /* take TSO off sleeping_queue */
3338       StgBlockingQueueElement *prev = NULL;
3339       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3340            prev = t, t = t->link) {
3341         if (t == (StgBlockingQueueElement *)tso) {
3342           if (prev == NULL) {
3343             sleeping_queue = (StgTSO *)t->link;
3344           } else {
3345             prev->link = t->link;
3346           }
3347           goto done;
3348         }
3349       }
3350       barf("unblockThread (delay): TSO not found");
3351     }
3352
3353   default:
3354     barf("unblockThread");
3355   }
3356
3357  done:
3358   tso->link = END_TSO_QUEUE;
3359   tso->why_blocked = NotBlocked;
3360   tso->block_info.closure = NULL;
3361   PUSH_ON_RUN_QUEUE(tso);
3362 }
3363 #else
3364 static void
3365 unblockThread(StgTSO *tso)
3366 {
3367   StgTSO *t, **last;
3368   
3369   /* To avoid locking unnecessarily. */
3370   if (tso->why_blocked == NotBlocked) {
3371     return;
3372   }
3373
3374   switch (tso->why_blocked) {
3375
3376   case BlockedOnSTM:
3377     // Be careful: nothing to do here!  We tell the scheduler that the thread
3378     // is runnable and we leave it to the stack-walking code to abort the 
3379     // transaction while unwinding the stack.  We should perhaps have a debugging
3380     // test to make sure that this really happens and that the 'zombie' transaction
3381     // does not get committed.
3382     goto done;
3383
3384   case BlockedOnMVar:
3385     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3386     {
3387       StgTSO *last_tso = END_TSO_QUEUE;
3388       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3389
3390       last = &mvar->head;
3391       for (t = mvar->head; t != END_TSO_QUEUE; 
3392            last = &t->link, last_tso = t, t = t->link) {
3393         if (t == tso) {
3394           *last = tso->link;
3395           if (mvar->tail == tso) {
3396             mvar->tail = last_tso;
3397           }
3398           goto done;
3399         }
3400       }
3401       barf("unblockThread (MVAR): TSO not found");
3402     }
3403
3404   case BlockedOnBlackHole:
3405     {
3406       last = &blackhole_queue;
3407       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3408            last = &t->link, t = t->link) {
3409         if (t == tso) {
3410           *last = tso->link;
3411           goto done;
3412         }
3413       }
3414       barf("unblockThread (BLACKHOLE): TSO not found");
3415     }
3416
3417   case BlockedOnException:
3418     {
3419       StgTSO *target  = tso->block_info.tso;
3420
3421       ASSERT(get_itbl(target)->type == TSO);
3422
3423       while (target->what_next == ThreadRelocated) {
3424           target = target->link;
3425           ASSERT(get_itbl(target)->type == TSO);
3426       }
3427       
3428       ASSERT(target->blocked_exceptions != NULL);
3429
3430       last = &target->blocked_exceptions;
3431       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3432            last = &t->link, t = t->link) {
3433         ASSERT(get_itbl(t)->type == TSO);
3434         if (t == tso) {
3435           *last = tso->link;
3436           goto done;
3437         }
3438       }
3439       barf("unblockThread (Exception): TSO not found");
3440     }
3441
3442   case BlockedOnRead:
3443   case BlockedOnWrite:
3444 #if defined(mingw32_HOST_OS)
3445   case BlockedOnDoProc:
3446 #endif
3447     {
3448       StgTSO *prev = NULL;
3449       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3450            prev = t, t = t->link) {
3451         if (t == tso) {
3452           if (prev == NULL) {
3453             blocked_queue_hd = t->link;
3454             if (blocked_queue_tl == t) {
3455               blocked_queue_tl = END_TSO_QUEUE;
3456             }
3457           } else {
3458             prev->link = t->link;
3459             if (blocked_queue_tl == t) {
3460               blocked_queue_tl = prev;
3461             }
3462           }
3463           goto done;
3464         }
3465       }
3466       barf("unblockThread (I/O): TSO not found");
3467     }
3468
3469   case BlockedOnDelay:
3470     {
3471       StgTSO *prev = NULL;
3472       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3473            prev = t, t = t->link) {
3474         if (t == tso) {
3475           if (prev == NULL) {
3476             sleeping_queue = t->link;
3477           } else {
3478             prev->link = t->link;
3479           }
3480           goto done;
3481         }
3482       }
3483       barf("unblockThread (delay): TSO not found");
3484     }
3485
3486   default:
3487     barf("unblockThread");
3488   }
3489
3490  done:
3491   tso->link = END_TSO_QUEUE;
3492   tso->why_blocked = NotBlocked;
3493   tso->block_info.closure = NULL;
3494   APPEND_TO_RUN_QUEUE(tso);
3495 }
3496 #endif
3497
3498 /* -----------------------------------------------------------------------------
3499  * checkBlackHoles()
3500  *
3501  * Check the blackhole_queue for threads that can be woken up.  We do
3502  * this periodically: before every GC, and whenever the run queue is
3503  * empty.
3504  *
3505  * An elegant solution might be to just wake up all the blocked
3506  * threads with awakenBlockedQueue occasionally: they'll go back to
3507  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3508  * doesn't give us a way to tell whether we've actually managed to
3509  * wake up any threads, so we would be busy-waiting.
3510  *
3511  * -------------------------------------------------------------------------- */
3512
3513 static rtsBool
3514 checkBlackHoles( void )
3515 {
3516     StgTSO **prev, *t;
3517     rtsBool any_woke_up = rtsFalse;
3518     StgHalfWord type;
3519
3520     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3521
3522     // ASSUMES: sched_mutex
3523     prev = &blackhole_queue;
3524     t = blackhole_queue;
3525     while (t != END_TSO_QUEUE) {
3526         ASSERT(t->why_blocked == BlockedOnBlackHole);
3527         type = get_itbl(t->block_info.closure)->type;
3528         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3529             t = unblockOneLocked(t);
3530             *prev = t;
3531             any_woke_up = rtsTrue;
3532         } else {
3533             prev = &t->link;
3534             t = t->link;
3535         }
3536     }
3537
3538     return any_woke_up;
3539 }
3540
3541 /* -----------------------------------------------------------------------------
3542  * raiseAsync()
3543  *
3544  * The following function implements the magic for raising an
3545  * asynchronous exception in an existing thread.
3546  *
3547  * We first remove the thread from any queue on which it might be
3548  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3549  *
3550  * We strip the stack down to the innermost CATCH_FRAME, building
3551  * thunks in the heap for all the active computations, so they can 
3552  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3553  * an application of the handler to the exception, and push it on
3554  * the top of the stack.
3555  * 
3556  * How exactly do we save all the active computations?  We create an
3557  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3558  * AP_STACKs pushes everything from the corresponding update frame
3559  * upwards onto the stack.  (Actually, it pushes everything up to the
3560  * next update frame plus a pointer to the next AP_STACK object.
3561  * Entering the next AP_STACK object pushes more onto the stack until we
3562  * reach the last AP_STACK object - at which point the stack should look
3563  * exactly as it did when we killed the TSO and we can continue
3564  * execution by entering the closure on top of the stack.
3565  *
3566  * We can also kill a thread entirely - this happens if either (a) the 
3567  * exception passed to raiseAsync is NULL, or (b) there's no
3568  * CATCH_FRAME on the stack.  In either case, we strip the entire
3569  * stack and replace the thread with a zombie.
3570  *
3571  * Locks: sched_mutex held upon entry nor exit.
3572  *
3573  * -------------------------------------------------------------------------- */
3574  
3575 void 
3576 deleteThread(StgTSO *tso)
3577 {
3578   if (tso->why_blocked != BlockedOnCCall &&
3579       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3580       raiseAsync(tso,NULL);
3581   }
3582 }
3583
3584 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3585 static void 
3586 deleteThreadImmediately(StgTSO *tso)
3587 { // for forkProcess only:
3588   // delete thread without giving it a chance to catch the KillThread exception
3589
3590   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3591       return;
3592   }
3593
3594   if (tso->why_blocked != BlockedOnCCall &&
3595       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3596     unblockThread(tso);
3597   }
3598
3599   tso->what_next = ThreadKilled;
3600 }
3601 #endif
3602
3603 void
3604 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3605 {
3606   /* When raising async exs from contexts where sched_mutex isn't held;
3607      use raiseAsyncWithLock(). */
3608   ACQUIRE_LOCK(&sched_mutex);
3609   raiseAsync(tso,exception);
3610   RELEASE_LOCK(&sched_mutex);
3611 }
3612
3613 void
3614 raiseAsync(StgTSO *tso, StgClosure *exception)
3615 {
3616     raiseAsync_(tso, exception, rtsFalse);
3617 }
3618
3619 static void
3620 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3621 {
3622     StgRetInfoTable *info;
3623     StgPtr sp;
3624   
3625     // Thread already dead?
3626     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3627         return;
3628     }
3629
3630     IF_DEBUG(scheduler, 
3631              sched_belch("raising exception in thread %ld.", (long)tso->id));
3632     
3633     // Remove it from any blocking queues
3634     unblockThread(tso);
3635
3636     sp = tso->sp;
3637     
3638     // The stack freezing code assumes there's a closure pointer on
3639     // the top of the stack, so we have to arrange that this is the case...
3640     //
3641     if (sp[0] == (W_)&stg_enter_info) {
3642         sp++;
3643     } else {
3644         sp--;
3645         sp[0] = (W_)&stg_dummy_ret_closure;
3646     }
3647
3648     while (1) {
3649         nat i;
3650
3651         // 1. Let the top of the stack be the "current closure"
3652         //
3653         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3654         // CATCH_FRAME.
3655         //
3656         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3657         // current closure applied to the chunk of stack up to (but not
3658         // including) the update frame.  This closure becomes the "current
3659         // closure".  Go back to step 2.
3660         //
3661         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3662         // top of the stack applied to the exception.
3663         // 
3664         // 5. If it's a STOP_FRAME, then kill the thread.
3665         // 
3666         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3667         // transaction
3668        
3669         
3670         StgPtr frame;
3671         
3672         frame = sp + 1;
3673         info = get_ret_itbl((StgClosure *)frame);
3674         
3675         while (info->i.type != UPDATE_FRAME
3676                && (info->i.type != CATCH_FRAME || exception == NULL)
3677                && info->i.type != STOP_FRAME
3678                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3679         {
3680             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3681               // IF we find an ATOMICALLY_FRAME then we abort the
3682               // current transaction and propagate the exception.  In
3683               // this case (unlike ordinary exceptions) we do not care
3684               // whether the transaction is valid or not because its
3685               // possible validity cannot have caused the exception
3686               // and will not be visible after the abort.
3687               IF_DEBUG(stm,
3688                        debugBelch("Found atomically block delivering async exception\n"));
3689               stmAbortTransaction(tso -> trec);
3690               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3691             }
3692             frame += stack_frame_sizeW((StgClosure *)frame);
3693             info = get_ret_itbl((StgClosure *)frame);
3694         }
3695         
3696         switch (info->i.type) {
3697             
3698         case ATOMICALLY_FRAME:
3699             ASSERT(stop_at_atomically);
3700             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3701             stmCondemnTransaction(tso -> trec);
3702 #ifdef REG_R1
3703             tso->sp = frame;
3704 #else
3705             // R1 is not a register: the return convention for IO in
3706             // this case puts the return value on the stack, so we
3707             // need to set up the stack to return to the atomically
3708             // frame properly...
3709             tso->sp = frame - 2;
3710             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3711             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3712 #endif
3713             tso->what_next = ThreadRunGHC;
3714             return;
3715
3716         case CATCH_FRAME:
3717             // If we find a CATCH_FRAME, and we've got an exception to raise,
3718             // then build the THUNK raise(exception), and leave it on
3719             // top of the CATCH_FRAME ready to enter.
3720             //
3721         {
3722 #ifdef PROFILING
3723             StgCatchFrame *cf = (StgCatchFrame *)frame;
3724 #endif
3725             StgClosure *raise;
3726             
3727             // we've got an exception to raise, so let's pass it to the
3728             // handler in this frame.
3729             //
3730             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3731             TICK_ALLOC_SE_THK(1,0);
3732             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3733             raise->payload[0] = exception;
3734             
3735             // throw away the stack from Sp up to the CATCH_FRAME.
3736             //
3737             sp = frame - 1;
3738             
3739             /* Ensure that async excpetions are blocked now, so we don't get
3740              * a surprise exception before we get around to executing the
3741              * handler.
3742              */
3743             if (tso->blocked_exceptions == NULL) {
3744                 tso->blocked_exceptions = END_TSO_QUEUE;
3745             }
3746             
3747             /* Put the newly-built THUNK on top of the stack, ready to execute
3748              * when the thread restarts.
3749              */
3750             sp[0] = (W_)raise;
3751             sp[-1] = (W_)&stg_enter_info;
3752             tso->sp = sp-1;
3753             tso->what_next = ThreadRunGHC;
3754             IF_DEBUG(sanity, checkTSO(tso));
3755             return;
3756         }
3757         
3758         case UPDATE_FRAME:
3759         {
3760             StgAP_STACK * ap;
3761             nat words;
3762             
3763             // First build an AP_STACK consisting of the stack chunk above the
3764             // current update frame, with the top word on the stack as the
3765             // fun field.
3766             //
3767             words = frame - sp - 1;
3768             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3769             
3770             ap->size = words;
3771             ap->fun  = (StgClosure *)sp[0];
3772             sp++;
3773             for(i=0; i < (nat)words; ++i) {
3774                 ap->payload[i] = (StgClosure *)*sp++;
3775             }
3776             
3777             SET_HDR(ap,&stg_AP_STACK_info,
3778                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3779             TICK_ALLOC_UP_THK(words+1,0);
3780             
3781             IF_DEBUG(scheduler,
3782                      debugBelch("sched: Updating ");
3783                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3784                      debugBelch(" with ");
3785                      printObj((StgClosure *)ap);
3786                 );
3787
3788             // Replace the updatee with an indirection - happily
3789             // this will also wake up any threads currently
3790             // waiting on the result.
3791             //
3792             // Warning: if we're in a loop, more than one update frame on
3793             // the stack may point to the same object.  Be careful not to
3794             // overwrite an IND_OLDGEN in this case, because we'll screw
3795             // up the mutable lists.  To be on the safe side, don't
3796             // overwrite any kind of indirection at all.  See also
3797             // threadSqueezeStack in GC.c, where we have to make a similar
3798             // check.
3799             //
3800             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3801                 // revert the black hole
3802                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3803                                (StgClosure *)ap);
3804             }
3805             sp += sizeofW(StgUpdateFrame) - 1;
3806             sp[0] = (W_)ap; // push onto stack
3807             break;
3808         }
3809         
3810         case STOP_FRAME:
3811             // We've stripped the entire stack, the thread is now dead.
3812             sp += sizeofW(StgStopFrame);
3813             tso->what_next = ThreadKilled;
3814             tso->sp = sp;
3815             return;
3816             
3817         default:
3818             barf("raiseAsync");
3819         }
3820     }
3821     barf("raiseAsync");
3822 }
3823
3824 /* -----------------------------------------------------------------------------
3825    raiseExceptionHelper
3826    
3827    This function is called by the raise# primitve, just so that we can
3828    move some of the tricky bits of raising an exception from C-- into
3829    C.  Who knows, it might be a useful re-useable thing here too.
3830    -------------------------------------------------------------------------- */
3831
3832 StgWord
3833 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3834 {
3835     StgClosure *raise_closure = NULL;
3836     StgPtr p, next;
3837     StgRetInfoTable *info;
3838     //
3839     // This closure represents the expression 'raise# E' where E
3840     // is the exception raise.  It is used to overwrite all the
3841     // thunks which are currently under evaluataion.
3842     //
3843
3844     //    
3845     // LDV profiling: stg_raise_info has THUNK as its closure
3846     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3847     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3848     // 1 does not cause any problem unless profiling is performed.
3849     // However, when LDV profiling goes on, we need to linearly scan
3850     // small object pool, where raise_closure is stored, so we should
3851     // use MIN_UPD_SIZE.
3852     //
3853     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3854     //                                 sizeofW(StgClosure)+1);
3855     //
3856
3857     //
3858     // Walk up the stack, looking for the catch frame.  On the way,
3859     // we update any closures pointed to from update frames with the
3860     // raise closure that we just built.
3861     //
3862     p = tso->sp;
3863     while(1) {
3864         info = get_ret_itbl((StgClosure *)p);
3865         next = p + stack_frame_sizeW((StgClosure *)p);
3866         switch (info->i.type) {
3867             
3868         case UPDATE_FRAME:
3869             // Only create raise_closure if we need to.
3870             if (raise_closure == NULL) {
3871                 raise_closure = 
3872                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3873                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3874                 raise_closure->payload[0] = exception;
3875             }
3876             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3877             p = next;
3878             continue;
3879
3880         case ATOMICALLY_FRAME:
3881             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3882             tso->sp = p;
3883             return ATOMICALLY_FRAME;
3884             
3885         case CATCH_FRAME:
3886             tso->sp = p;
3887             return CATCH_FRAME;
3888
3889         case CATCH_STM_FRAME:
3890             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3891             tso->sp = p;
3892             return CATCH_STM_FRAME;
3893             
3894         case STOP_FRAME:
3895             tso->sp = p;
3896             return STOP_FRAME;
3897
3898         case CATCH_RETRY_FRAME:
3899         default:
3900             p = next; 
3901             continue;
3902         }
3903     }
3904 }
3905
3906
3907 /* -----------------------------------------------------------------------------
3908    findRetryFrameHelper
3909
3910    This function is called by the retry# primitive.  It traverses the stack
3911    leaving tso->sp referring to the frame which should handle the retry.  
3912
3913    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3914    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3915
3916    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3917    despite the similar implementation.
3918
3919    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3920    not be created within memory transactions.
3921    -------------------------------------------------------------------------- */
3922
3923 StgWord
3924 findRetryFrameHelper (StgTSO *tso)
3925 {
3926   StgPtr           p, next;
3927   StgRetInfoTable *info;
3928
3929   p = tso -> sp;
3930   while (1) {
3931     info = get_ret_itbl((StgClosure *)p);
3932     next = p + stack_frame_sizeW((StgClosure *)p);
3933     switch (info->i.type) {
3934       
3935     case ATOMICALLY_FRAME:
3936       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3937       tso->sp = p;
3938       return ATOMICALLY_FRAME;
3939       
3940     case CATCH_RETRY_FRAME:
3941       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3942       tso->sp = p;
3943       return CATCH_RETRY_FRAME;
3944       
3945     case CATCH_STM_FRAME:
3946     default:
3947       ASSERT(info->i.type != CATCH_FRAME);
3948       ASSERT(info->i.type != STOP_FRAME);
3949       p = next; 
3950       continue;
3951     }
3952   }
3953 }
3954
3955 /* -----------------------------------------------------------------------------
3956    resurrectThreads is called after garbage collection on the list of
3957    threads found to be garbage.  Each of these threads will be woken
3958    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3959    on an MVar, or NonTermination if the thread was blocked on a Black
3960    Hole.
3961
3962    Locks: sched_mutex isn't held upon entry nor exit.
3963    -------------------------------------------------------------------------- */
3964
3965 void
3966 resurrectThreads( StgTSO *threads )
3967 {
3968   StgTSO *tso, *next;
3969
3970   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3971     next = tso->global_link;
3972     tso->global_link = all_threads;
3973     all_threads = tso;
3974     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3975
3976     switch (tso->why_blocked) {
3977     case BlockedOnMVar:
3978     case BlockedOnException:
3979       /* Called by GC - sched_mutex lock is currently held. */
3980       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3981       break;
3982     case BlockedOnBlackHole:
3983       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3984       break;
3985     case BlockedOnSTM:
3986       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
3987       break;
3988     case NotBlocked:
3989       /* This might happen if the thread was blocked on a black hole
3990        * belonging to a thread that we've just woken up (raiseAsync
3991        * can wake up threads, remember...).
3992        */
3993       continue;
3994     default:
3995       barf("resurrectThreads: thread blocked in a strange way");
3996     }
3997   }
3998 }
3999
4000 /* ----------------------------------------------------------------------------
4001  * Debugging: why is a thread blocked
4002  * [Also provides useful information when debugging threaded programs
4003  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4004    ------------------------------------------------------------------------- */
4005
4006 static void
4007 printThreadBlockage(StgTSO *tso)
4008 {
4009   switch (tso->why_blocked) {
4010   case BlockedOnRead:
4011     debugBelch("is blocked on read from fd %ld", tso->block_info.fd);
4012     break;
4013   case BlockedOnWrite:
4014     debugBelch("is blocked on write to fd %ld", tso->block_info.fd);
4015     break;
4016 #if defined(mingw32_HOST_OS)
4017     case BlockedOnDoProc:
4018     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4019     break;
4020 #endif
4021   case BlockedOnDelay:
4022     debugBelch("is blocked until %ld", tso->block_info.target);
4023     break;
4024   case BlockedOnMVar:
4025     debugBelch("is blocked on an MVar");
4026     break;
4027   case BlockedOnException:
4028     debugBelch("is blocked on delivering an exception to thread %d",
4029             tso->block_info.tso->id);
4030     break;
4031   case BlockedOnBlackHole:
4032     debugBelch("is blocked on a black hole");
4033     break;
4034   case NotBlocked:
4035     debugBelch("is not blocked");
4036     break;
4037 #if defined(PARALLEL_HASKELL)
4038   case BlockedOnGA:
4039     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4040             tso->block_info.closure, info_type(tso->block_info.closure));
4041     break;
4042   case BlockedOnGA_NoSend:
4043     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4044             tso->block_info.closure, info_type(tso->block_info.closure));
4045     break;
4046 #endif
4047   case BlockedOnCCall:
4048     debugBelch("is blocked on an external call");
4049     break;
4050   case BlockedOnCCall_NoUnblockExc:
4051     debugBelch("is blocked on an external call (exceptions were already blocked)");
4052     break;
4053   case BlockedOnSTM:
4054     debugBelch("is blocked on an STM operation");
4055     break;
4056   default:
4057     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4058          tso->why_blocked, tso->id, tso);
4059   }
4060 }
4061
4062 static void
4063 printThreadStatus(StgTSO *tso)
4064 {
4065   switch (tso->what_next) {
4066   case ThreadKilled:
4067     debugBelch("has been killed");
4068     break;
4069   case ThreadComplete:
4070     debugBelch("has completed");
4071     break;
4072   default:
4073     printThreadBlockage(tso);
4074   }
4075 }
4076
4077 void
4078 printAllThreads(void)
4079 {
4080   StgTSO *t;
4081
4082 # if defined(GRAN)
4083   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4084   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4085                        time_string, rtsFalse/*no commas!*/);
4086
4087   debugBelch("all threads at [%s]:\n", time_string);
4088 # elif defined(PARALLEL_HASKELL)
4089   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4090   ullong_format_string(CURRENT_TIME,
4091                        time_string, rtsFalse/*no commas!*/);
4092
4093   debugBelch("all threads at [%s]:\n", time_string);
4094 # else
4095   debugBelch("all threads:\n");
4096 # endif
4097
4098   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
4099     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
4100 #if defined(DEBUG)
4101     {
4102       void *label = lookupThreadLabel(t->id);
4103       if (label) debugBelch("[\"%s\"] ",(char *)label);
4104     }
4105 #endif
4106     printThreadStatus(t);
4107     debugBelch("\n");
4108   }
4109 }
4110     
4111 #ifdef DEBUG
4112
4113 /* 
4114    Print a whole blocking queue attached to node (debugging only).
4115 */
4116 # if defined(PARALLEL_HASKELL)
4117 void 
4118 print_bq (StgClosure *node)
4119 {
4120   StgBlockingQueueElement *bqe;
4121   StgTSO *tso;
4122   rtsBool end;
4123
4124   debugBelch("## BQ of closure %p (%s): ",
4125           node, info_type(node));
4126
4127   /* should cover all closures that may have a blocking queue */
4128   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4129          get_itbl(node)->type == FETCH_ME_BQ ||
4130          get_itbl(node)->type == RBH ||
4131          get_itbl(node)->type == MVAR);
4132     
4133   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4134
4135   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4136 }
4137
4138 /* 
4139    Print a whole blocking queue starting with the element bqe.
4140 */
4141 void 
4142 print_bqe (StgBlockingQueueElement *bqe)
4143 {
4144   rtsBool end;
4145
4146   /* 
4147      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4148   */
4149   for (end = (bqe==END_BQ_QUEUE);
4150        !end; // iterate until bqe points to a CONSTR
4151        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4152        bqe = end ? END_BQ_QUEUE : bqe->link) {
4153     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4154     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4155     /* types of closures that may appear in a blocking queue */
4156     ASSERT(get_itbl(bqe)->type == TSO ||           
4157            get_itbl(bqe)->type == BLOCKED_FETCH || 
4158            get_itbl(bqe)->type == CONSTR); 
4159     /* only BQs of an RBH end with an RBH_Save closure */
4160     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4161
4162     switch (get_itbl(bqe)->type) {
4163     case TSO:
4164       debugBelch(" TSO %u (%x),",
4165               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4166       break;
4167     case BLOCKED_FETCH:
4168       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4169               ((StgBlockedFetch *)bqe)->node, 
4170               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4171               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4172               ((StgBlockedFetch *)bqe)->ga.weight);
4173       break;
4174     case CONSTR:
4175       debugBelch(" %s (IP %p),",
4176               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4177                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4178                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4179                "RBH_Save_?"), get_itbl(bqe));
4180       break;
4181     default:
4182       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4183            info_type((StgClosure *)bqe)); // , node, info_type(node));
4184       break;
4185     }
4186   } /* for */
4187   debugBelch("\n");
4188 }
4189 # elif defined(GRAN)
4190 void 
4191 print_bq (StgClosure *node)
4192 {
4193   StgBlockingQueueElement *bqe;
4194   PEs node_loc, tso_loc;
4195   rtsBool end;
4196
4197   /* should cover all closures that may have a blocking queue */
4198   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4199          get_itbl(node)->type == FETCH_ME_BQ ||
4200          get_itbl(node)->type == RBH);
4201     
4202   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4203   node_loc = where_is(node);
4204
4205   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4206           node, info_type(node), node_loc);
4207
4208   /* 
4209      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4210   */
4211   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4212        !end; // iterate until bqe points to a CONSTR
4213        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4214     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4215     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4216     /* types of closures that may appear in a blocking queue */
4217     ASSERT(get_itbl(bqe)->type == TSO ||           
4218            get_itbl(bqe)->type == CONSTR); 
4219     /* only BQs of an RBH end with an RBH_Save closure */
4220     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4221
4222     tso_loc = where_is((StgClosure *)bqe);
4223     switch (get_itbl(bqe)->type) {
4224     case TSO:
4225       debugBelch(" TSO %d (%p) on [PE %d],",
4226               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4227       break;
4228     case CONSTR:
4229       debugBelch(" %s (IP %p),",
4230               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4231                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4232                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4233                "RBH_Save_?"), get_itbl(bqe));
4234       break;
4235     default:
4236       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4237            info_type((StgClosure *)bqe), node, info_type(node));
4238       break;
4239     }
4240   } /* for */
4241   debugBelch("\n");
4242 }
4243 # endif
4244
4245 #if defined(PARALLEL_HASKELL)
4246 static nat
4247 run_queue_len(void)
4248 {
4249   nat i;
4250   StgTSO *tso;
4251
4252   for (i=0, tso=run_queue_hd; 
4253        tso != END_TSO_QUEUE;
4254        i++, tso=tso->link)
4255     /* nothing */
4256
4257   return i;
4258 }
4259 #endif
4260
4261 void
4262 sched_belch(char *s, ...)
4263 {
4264   va_list ap;
4265   va_start(ap,s);
4266 #ifdef RTS_SUPPORTS_THREADS
4267   debugBelch("sched (task %p): ", osThreadId());
4268 #elif defined(PARALLEL_HASKELL)
4269   debugBelch("== ");
4270 #else
4271   debugBelch("sched: ");
4272 #endif
4273   vdebugBelch(s, ap);
4274   debugBelch("\n");
4275   va_end(ap);
4276 }
4277
4278 #endif /* DEBUG */