[project @ 2005-05-25 08:23:17 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PARALLEL_HASKELL          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "OSThreads.h"
46 #include "Storage.h"
47 #include "StgRun.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PARALLEL_HASKELL)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include  "Task.h"
80
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
83 #endif
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif
87
88 #include <string.h>
89 #include <stdlib.h>
90 #include <stdarg.h>
91
92 #ifdef HAVE_ERRNO_H
93 #include <errno.h>
94 #endif
95
96 // Turn off inlining when debugging - it obfuscates things
97 #ifdef DEBUG
98 # undef  STATIC_INLINE
99 # define STATIC_INLINE static
100 #endif
101
102 #ifdef THREADED_RTS
103 #define USED_IN_THREADED_RTS
104 #else
105 #define USED_IN_THREADED_RTS STG_UNUSED
106 #endif
107
108 #ifdef RTS_SUPPORTS_THREADS
109 #define USED_WHEN_RTS_SUPPORTS_THREADS
110 #else
111 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
112 #endif
113
114 /* Main thread queue.
115  * Locks required: sched_mutex.
116  */
117 StgMainThread *main_threads = NULL;
118
119 #if defined(GRAN)
120
121 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
122 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
123
124 /* 
125    In GranSim we have a runnable and a blocked queue for each processor.
126    In order to minimise code changes new arrays run_queue_hds/tls
127    are created. run_queue_hd is then a short cut (macro) for
128    run_queue_hds[CurrentProc] (see GranSim.h).
129    -- HWL
130 */
131 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
132 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
133 StgTSO *ccalling_threadss[MAX_PROC];
134 /* We use the same global list of threads (all_threads) in GranSim as in
135    the std RTS (i.e. we are cheating). However, we don't use this list in
136    the GranSim specific code at the moment (so we are only potentially
137    cheating).  */
138
139 #else /* !GRAN */
140
141 /* Thread queues.
142  * Locks required: sched_mutex.
143  */
144 StgTSO *run_queue_hd = NULL;
145 StgTSO *run_queue_tl = NULL;
146 StgTSO *blocked_queue_hd = NULL;
147 StgTSO *blocked_queue_tl = NULL;
148 StgTSO *blackhole_queue = NULL;
149 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
150
151 #endif
152
153 /* The blackhole_queue should be checked for threads to wake up.  See
154  * Schedule.h for more thorough comment.
155  */
156 rtsBool blackholes_need_checking = rtsFalse;
157
158 /* Linked list of all threads.
159  * Used for detecting garbage collected threads.
160  */
161 StgTSO *all_threads = NULL;
162
163 /* When a thread performs a safe C call (_ccall_GC, using old
164  * terminology), it gets put on the suspended_ccalling_threads
165  * list. Used by the garbage collector.
166  */
167 static StgTSO *suspended_ccalling_threads;
168
169 /* KH: The following two flags are shared memory locations.  There is no need
170        to lock them, since they are only unset at the end of a scheduler
171        operation.
172 */
173
174 /* flag set by signal handler to precipitate a context switch */
175 int context_switch = 0;
176
177 /* flag that tracks whether we have done any execution in this time slice. */
178 nat recent_activity = ACTIVITY_YES;
179
180 /* if this flag is set as well, give up execution */
181 rtsBool interrupted = rtsFalse;
182
183 /* Next thread ID to allocate.
184  * Locks required: thread_id_mutex
185  */
186 static StgThreadID next_thread_id = 1;
187
188 /*
189  * Pointers to the state of the current thread.
190  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
191  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
192  */
193  
194 /* The smallest stack size that makes any sense is:
195  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
196  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
197  *  + 1                       (the closure to enter)
198  *  + 1                       (stg_ap_v_ret)
199  *  + 1                       (spare slot req'd by stg_ap_v_ret)
200  *
201  * A thread with this stack will bomb immediately with a stack
202  * overflow, which will increase its stack size.  
203  */
204
205 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
206
207
208 #if defined(GRAN)
209 StgTSO *CurrentTSO;
210 #endif
211
212 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
213  *  exists - earlier gccs apparently didn't.
214  *  -= chak
215  */
216 StgTSO dummy_tso;
217
218 /*
219  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
220  * in an MT setting, needed to signal that a worker thread shouldn't hang around
221  * in the scheduler when it is out of work.
222  */
223 static rtsBool shutting_down_scheduler = rtsFalse;
224
225 #if defined(RTS_SUPPORTS_THREADS)
226 /* ToDo: carefully document the invariants that go together
227  *       with these synchronisation objects.
228  */
229 Mutex     sched_mutex       = INIT_MUTEX_VAR;
230 Mutex     term_mutex        = INIT_MUTEX_VAR;
231
232 #endif /* RTS_SUPPORTS_THREADS */
233
234 #if defined(PARALLEL_HASKELL)
235 StgTSO *LastTSO;
236 rtsTime TimeOfLastYield;
237 rtsBool emitSchedule = rtsTrue;
238 #endif
239
240 #if DEBUG
241 static char *whatNext_strs[] = {
242   "(unknown)",
243   "ThreadRunGHC",
244   "ThreadInterpret",
245   "ThreadKilled",
246   "ThreadRelocated",
247   "ThreadComplete"
248 };
249 #endif
250
251 /* -----------------------------------------------------------------------------
252  * static function prototypes
253  * -------------------------------------------------------------------------- */
254
255 #if defined(RTS_SUPPORTS_THREADS)
256 static void taskStart(void);
257 #endif
258
259 static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
260                       Capability *initialCapability );
261
262 //
263 // These function all encapsulate parts of the scheduler loop, and are
264 // abstracted only to make the structure and control flow of the
265 // scheduler clearer.
266 //
267 static void schedulePreLoop(void);
268 static void scheduleStartSignalHandlers(void);
269 static void scheduleCheckBlockedThreads(void);
270 static void scheduleCheckBlackHoles(void);
271 static void scheduleDetectDeadlock(void);
272 #if defined(GRAN)
273 static StgTSO *scheduleProcessEvent(rtsEvent *event);
274 #endif
275 #if defined(PARALLEL_HASKELL)
276 static StgTSO *scheduleSendPendingMessages(void);
277 static void scheduleActivateSpark(void);
278 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
279 #endif
280 #if defined(PAR) || defined(GRAN)
281 static void scheduleGranParReport(void);
282 #endif
283 static void schedulePostRunThread(void);
284 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
285 static void scheduleHandleStackOverflow( StgTSO *t);
286 static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next );
287 static void scheduleHandleThreadBlocked( StgTSO *t );
288 static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, 
289                                              Capability *cap, StgTSO *t );
290 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
291 static void scheduleDoGC(rtsBool force_major);
292
293 static void unblockThread(StgTSO *tso);
294 static rtsBool checkBlackHoles(void);
295 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
296                                    Capability *initialCapability
297                                    );
298 static void scheduleThread_ (StgTSO* tso);
299 static void AllRoots(evac_fn evac);
300
301 static StgTSO *threadStackOverflow(StgTSO *tso);
302
303 static void raiseAsync_(StgTSO *tso, StgClosure *exception, 
304                         rtsBool stop_at_atomically);
305
306 static void printThreadBlockage(StgTSO *tso);
307 static void printThreadStatus(StgTSO *tso);
308 void printThreadQueue(StgTSO *tso);
309
310 #if defined(PARALLEL_HASKELL)
311 StgTSO * createSparkThread(rtsSpark spark);
312 StgTSO * activateSpark (rtsSpark spark);  
313 #endif
314
315 /* ----------------------------------------------------------------------------
316  * Starting Tasks
317  * ------------------------------------------------------------------------- */
318
319 #if defined(RTS_SUPPORTS_THREADS)
320 static nat startingWorkerThread = 0;
321
322 static void
323 taskStart(void)
324 {
325   ACQUIRE_LOCK(&sched_mutex);
326   startingWorkerThread--;
327   schedule(NULL,NULL);
328   taskStop();
329   RELEASE_LOCK(&sched_mutex);
330 }
331
332 void
333 startSchedulerTaskIfNecessary(void)
334 {
335     if ( !EMPTY_RUN_QUEUE()
336          && !shutting_down_scheduler // not if we're shutting down
337          && startingWorkerThread==0)
338     {
339         // we don't want to start another worker thread
340         // just because the last one hasn't yet reached the
341         // "waiting for capability" state
342         startingWorkerThread++;
343         if (!maybeStartNewWorker(taskStart)) {
344             startingWorkerThread--;
345         }
346     }
347 }
348 #endif
349
350 /* -----------------------------------------------------------------------------
351  * Putting a thread on the run queue: different scheduling policies
352  * -------------------------------------------------------------------------- */
353
354 STATIC_INLINE void
355 addToRunQueue( StgTSO *t )
356 {
357 #if defined(PARALLEL_HASKELL)
358     if (RtsFlags.ParFlags.doFairScheduling) { 
359         // this does round-robin scheduling; good for concurrency
360         APPEND_TO_RUN_QUEUE(t);
361     } else {
362         // this does unfair scheduling; good for parallelism
363         PUSH_ON_RUN_QUEUE(t);
364     }
365 #else
366     // this does round-robin scheduling; good for concurrency
367     APPEND_TO_RUN_QUEUE(t);
368 #endif
369 }
370     
371 /* ---------------------------------------------------------------------------
372    Main scheduling loop.
373
374    We use round-robin scheduling, each thread returning to the
375    scheduler loop when one of these conditions is detected:
376
377       * out of heap space
378       * timer expires (thread yields)
379       * thread blocks
380       * thread ends
381       * stack overflow
382
383    Locking notes:  we acquire the scheduler lock once at the beginning
384    of the scheduler loop, and release it when
385     
386       * running a thread, or
387       * waiting for work, or
388       * waiting for a GC to complete.
389
390    GRAN version:
391      In a GranSim setup this loop iterates over the global event queue.
392      This revolves around the global event queue, which determines what 
393      to do next. Therefore, it's more complicated than either the 
394      concurrent or the parallel (GUM) setup.
395
396    GUM version:
397      GUM iterates over incoming messages.
398      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
399      and sends out a fish whenever it has nothing to do; in-between
400      doing the actual reductions (shared code below) it processes the
401      incoming messages and deals with delayed operations 
402      (see PendingFetches).
403      This is not the ugliest code you could imagine, but it's bloody close.
404
405    ------------------------------------------------------------------------ */
406
407 static void
408 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
409           Capability *initialCapability )
410 {
411   StgTSO *t;
412   Capability *cap;
413   StgThreadReturnCode ret;
414 #if defined(GRAN)
415   rtsEvent *event;
416 #elif defined(PARALLEL_HASKELL)
417   StgTSO *tso;
418   GlobalTaskId pe;
419   rtsBool receivedFinish = rtsFalse;
420 # if defined(DEBUG)
421   nat tp_size, sp_size; // stats only
422 # endif
423 #endif
424   nat prev_what_next;
425   rtsBool ready_to_gc;
426   
427   // Pre-condition: sched_mutex is held.
428   // We might have a capability, passed in as initialCapability.
429   cap = initialCapability;
430
431 #if !defined(RTS_SUPPORTS_THREADS)
432   // simply initialise it in the non-threaded case
433   grabCapability(&cap);
434 #endif
435
436   IF_DEBUG(scheduler,
437            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
438                        mainThread, initialCapability);
439       );
440
441   schedulePreLoop();
442
443   // -----------------------------------------------------------
444   // Scheduler loop starts here:
445
446 #if defined(PARALLEL_HASKELL)
447 #define TERMINATION_CONDITION        (!receivedFinish)
448 #elif defined(GRAN)
449 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
450 #else
451 #define TERMINATION_CONDITION        rtsTrue
452 #endif
453
454   while (TERMINATION_CONDITION) {
455
456 #if defined(GRAN)
457       /* Choose the processor with the next event */
458       CurrentProc = event->proc;
459       CurrentTSO = event->tso;
460 #endif
461
462 #if defined(RTS_SUPPORTS_THREADS)
463       // Yield the capability to higher-priority tasks if necessary.
464       //
465       if (cap != NULL) {
466           yieldCapability(&cap, 
467                           mainThread ? &mainThread->bound_thread_cond : NULL );
468       }
469
470       // If we do not currently hold a capability, we wait for one
471       //
472       if (cap == NULL) {
473           waitForCapability(&sched_mutex, &cap,
474                             mainThread ? &mainThread->bound_thread_cond : NULL);
475       }
476
477       // We now have a capability...
478 #endif
479
480 #if 0 /* extra sanity checking */
481       { 
482           StgMainThread *m;
483           for (m = main_threads; m != NULL; m = m->link) {
484               ASSERT(get_itbl(m->tso)->type == TSO);
485           }
486       }
487 #endif
488
489     // Check whether we have re-entered the RTS from Haskell without
490     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
491     // call).
492     if (cap->r.rInHaskell) {
493           errorBelch("schedule: re-entered unsafely.\n"
494                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
495           stg_exit(1);
496     }
497
498     //
499     // Test for interruption.  If interrupted==rtsTrue, then either
500     // we received a keyboard interrupt (^C), or the scheduler is
501     // trying to shut down all the tasks (shutting_down_scheduler) in
502     // the threaded RTS.
503     //
504     if (interrupted) {
505         if (shutting_down_scheduler) {
506             IF_DEBUG(scheduler, sched_belch("shutting down"));
507             releaseCapability(cap);
508             if (mainThread) {
509                 mainThread->stat = Interrupted;
510                 mainThread->ret  = NULL;
511             }
512             return;
513         } else {
514             IF_DEBUG(scheduler, sched_belch("interrupted"));
515             deleteAllThreads();
516         }
517     }
518
519 #if defined(not_yet) && defined(SMP)
520     //
521     // Top up the run queue from our spark pool.  We try to make the
522     // number of threads in the run queue equal to the number of
523     // free capabilities.
524     //
525     {
526         StgClosure *spark;
527         if (EMPTY_RUN_QUEUE()) {
528             spark = findSpark(rtsFalse);
529             if (spark == NULL) {
530                 break; /* no more sparks in the pool */
531             } else {
532                 createSparkThread(spark);         
533                 IF_DEBUG(scheduler,
534                          sched_belch("==^^ turning spark of closure %p into a thread",
535                                      (StgClosure *)spark));
536             }
537         }
538     }
539 #endif // SMP
540
541     scheduleStartSignalHandlers();
542
543     // Only check the black holes here if we've nothing else to do.
544     // During normal execution, the black hole list only gets checked
545     // at GC time, to avoid repeatedly traversing this possibly long
546     // list each time around the scheduler.
547     if (EMPTY_RUN_QUEUE()) { scheduleCheckBlackHoles(); }
548
549     scheduleCheckBlockedThreads();
550
551     scheduleDetectDeadlock();
552
553     // Normally, the only way we can get here with no threads to
554     // run is if a keyboard interrupt received during 
555     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
556     // Additionally, it is not fatal for the
557     // threaded RTS to reach here with no threads to run.
558     //
559     // win32: might be here due to awaitEvent() being abandoned
560     // as a result of a console event having been delivered.
561     if ( EMPTY_RUN_QUEUE() ) {
562 #if !defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_HOST_OS)
563         ASSERT(interrupted);
564 #endif
565         continue; // nothing to do
566     }
567
568 #if defined(PARALLEL_HASKELL)
569     scheduleSendPendingMessages();
570     if (EMPTY_RUN_QUEUE() && scheduleActivateSpark()) 
571         continue;
572
573 #if defined(SPARKS)
574     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
575 #endif
576
577     /* If we still have no work we need to send a FISH to get a spark
578        from another PE */
579     if (EMPTY_RUN_QUEUE()) {
580         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
581         ASSERT(rtsFalse); // should not happen at the moment
582     }
583     // from here: non-empty run queue.
584     //  TODO: merge above case with this, only one call processMessages() !
585     if (PacketsWaiting()) {  /* process incoming messages, if
586                                 any pending...  only in else
587                                 because getRemoteWork waits for
588                                 messages as well */
589         receivedFinish = processMessages();
590     }
591 #endif
592
593 #if defined(GRAN)
594     scheduleProcessEvent(event);
595 #endif
596
597     // 
598     // Get a thread to run
599     //
600     ASSERT(run_queue_hd != END_TSO_QUEUE);
601     POP_RUN_QUEUE(t);
602
603 #if defined(GRAN) || defined(PAR)
604     scheduleGranParReport(); // some kind of debuging output
605 #else
606     // Sanity check the thread we're about to run.  This can be
607     // expensive if there is lots of thread switching going on...
608     IF_DEBUG(sanity,checkTSO(t));
609 #endif
610
611 #if defined(RTS_SUPPORTS_THREADS)
612     // Check whether we can run this thread in the current task.
613     // If not, we have to pass our capability to the right task.
614     {
615       StgMainThread *m = t->main;
616       
617       if(m)
618       {
619         if(m == mainThread)
620         {
621           IF_DEBUG(scheduler,
622             sched_belch("### Running thread %d in bound thread", t->id));
623           // yes, the Haskell thread is bound to the current native thread
624         }
625         else
626         {
627           IF_DEBUG(scheduler,
628             sched_belch("### thread %d bound to another OS thread", t->id));
629           // no, bound to a different Haskell thread: pass to that thread
630           PUSH_ON_RUN_QUEUE(t);
631           continue;
632         }
633       }
634       else
635       {
636         if(mainThread != NULL)
637         // The thread we want to run is unbound.
638         {
639           IF_DEBUG(scheduler,
640             sched_belch("### this OS thread cannot run thread %d", t->id));
641           // no, the current native thread is bound to a different
642           // Haskell thread, so pass it to any worker thread
643           PUSH_ON_RUN_QUEUE(t);
644           continue; 
645         }
646       }
647     }
648 #endif
649
650     cap->r.rCurrentTSO = t;
651     
652     /* context switches are now initiated by the timer signal, unless
653      * the user specified "context switch as often as possible", with
654      * +RTS -C0
655      */
656     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
657          && (run_queue_hd != END_TSO_QUEUE
658              || blocked_queue_hd != END_TSO_QUEUE
659              || sleeping_queue != END_TSO_QUEUE)))
660         context_switch = 1;
661
662 run_thread:
663
664     RELEASE_LOCK(&sched_mutex);
665
666     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
667                               (long)t->id, whatNext_strs[t->what_next]));
668
669 #if defined(PROFILING)
670     startHeapProfTimer();
671 #endif
672
673     // ----------------------------------------------------------------------
674     // Run the current thread 
675
676     prev_what_next = t->what_next;
677
678     errno = t->saved_errno;
679     cap->r.rInHaskell = rtsTrue;
680
681     recent_activity = ACTIVITY_YES;
682
683     switch (prev_what_next) {
684
685     case ThreadKilled:
686     case ThreadComplete:
687         /* Thread already finished, return to scheduler. */
688         ret = ThreadFinished;
689         break;
690
691     case ThreadRunGHC:
692         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
693         break;
694
695     case ThreadInterpret:
696         ret = interpretBCO(cap);
697         break;
698
699     default:
700       barf("schedule: invalid what_next field");
701     }
702
703 #if defined(SMP)
704     // in SMP mode, we might return with a different capability than
705     // we started with, if the Haskell thread made a foreign call.  So
706     // let's find out what our current Capability is:
707     cap = myCapability();
708 #endif
709
710     // We have run some Haskell code: there might be blackhole-blocked
711     // threads to wake up now.
712     if ( blackhole_queue != END_TSO_QUEUE ) {
713         blackholes_need_checking = rtsTrue;
714     }
715
716     cap->r.rInHaskell = rtsFalse;
717
718     // The TSO might have moved, eg. if it re-entered the RTS and a GC
719     // happened.  So find the new location:
720     t = cap->r.rCurrentTSO;
721
722     // And save the current errno in this thread.
723     t->saved_errno = errno;
724
725     // ----------------------------------------------------------------------
726     
727     /* Costs for the scheduler are assigned to CCS_SYSTEM */
728 #if defined(PROFILING)
729     stopHeapProfTimer();
730     CCCS = CCS_SYSTEM;
731 #endif
732     
733     ACQUIRE_LOCK(&sched_mutex);
734     
735 #if defined(RTS_SUPPORTS_THREADS)
736     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
737 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
738     IF_DEBUG(scheduler,debugBelch("sched: "););
739 #endif
740     
741     schedulePostRunThread();
742
743     ready_to_gc = rtsFalse;
744
745     switch (ret) {
746     case HeapOverflow:
747         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
748         break;
749
750     case StackOverflow:
751         scheduleHandleStackOverflow(t);
752         break;
753
754     case ThreadYielding:
755         if (scheduleHandleYield(t, prev_what_next)) {
756             // shortcut for switching between compiler/interpreter:
757             goto run_thread; 
758         }
759         break;
760
761     case ThreadBlocked:
762         scheduleHandleThreadBlocked(t);
763         break;
764
765     case ThreadFinished:
766         if (scheduleHandleThreadFinished(mainThread, cap, t)) return;;
767         break;
768
769     default:
770       barf("schedule: invalid thread return code %d", (int)ret);
771     }
772
773     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
774     if (ready_to_gc) { scheduleDoGC(rtsFalse); }
775   } /* end of while() */
776
777   IF_PAR_DEBUG(verbose,
778                debugBelch("== Leaving schedule() after having received Finish\n"));
779 }
780
781 /* ----------------------------------------------------------------------------
782  * Setting up the scheduler loop
783  * ASSUMES: sched_mutex
784  * ------------------------------------------------------------------------- */
785
786 static void
787 schedulePreLoop(void)
788 {
789 #if defined(GRAN) 
790     /* set up first event to get things going */
791     /* ToDo: assign costs for system setup and init MainTSO ! */
792     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
793               ContinueThread, 
794               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
795     
796     IF_DEBUG(gran,
797              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
798                         CurrentTSO);
799              G_TSO(CurrentTSO, 5));
800     
801     if (RtsFlags.GranFlags.Light) {
802         /* Save current time; GranSim Light only */
803         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
804     }      
805 #endif
806 }
807
808 /* ----------------------------------------------------------------------------
809  * Start any pending signal handlers
810  * ASSUMES: sched_mutex
811  * ------------------------------------------------------------------------- */
812
813 static void
814 scheduleStartSignalHandlers(void)
815 {
816 #if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
817     if (signals_pending()) {
818       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
819       startSignalHandlers();
820       ACQUIRE_LOCK(&sched_mutex);
821     }
822 #endif
823 }
824
825 /* ----------------------------------------------------------------------------
826  * Check for blocked threads that can be woken up.
827  * ASSUMES: sched_mutex
828  * ------------------------------------------------------------------------- */
829
830 static void
831 scheduleCheckBlockedThreads(void)
832 {
833     //
834     // Check whether any waiting threads need to be woken up.  If the
835     // run queue is empty, and there are no other tasks running, we
836     // can wait indefinitely for something to happen.
837     //
838     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
839     {
840 #if defined(RTS_SUPPORTS_THREADS)
841         // We shouldn't be here...
842         barf("schedule: awaitEvent() in threaded RTS");
843 #else
844         awaitEvent( EMPTY_RUN_QUEUE() );
845 #endif
846 }
847
848
849 /* ----------------------------------------------------------------------------
850  * Check for threads blocked on BLACKHOLEs that can be woken up
851  * ASSUMES: sched_mutex
852  * ------------------------------------------------------------------------- */
853 static void
854 scheduleCheckBlackHoles( void )
855 {
856     if ( blackholes_need_checking )
857     {
858         checkBlackHoles();
859         blackholes_need_checking = rtsFalse;
860     }
861 }
862
863 /* ----------------------------------------------------------------------------
864  * Detect deadlock conditions and attempt to resolve them.
865  * ASSUMES: sched_mutex
866  * ------------------------------------------------------------------------- */
867
868 static void
869 scheduleDetectDeadlock()
870 {
871
872 #if defined(PARALLEL_HASKELL)
873     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
874     return;
875 #endif
876
877     /* 
878      * Detect deadlock: when we have no threads to run, there are no
879      * threads blocked, waiting for I/O, or sleeping, and all the
880      * other tasks are waiting for work, we must have a deadlock of
881      * some description.
882      */
883     if ( EMPTY_THREAD_QUEUES() )
884     {
885 #if defined(RTS_SUPPORTS_THREADS)
886         /* 
887          * In the threaded RTS, we only check for deadlock if there
888          * has been no activity in a complete timeslice.  This means
889          * we won't eagerly start a full GC just because we don't have
890          * any threads to run currently.
891          */
892         if (recent_activity != ACTIVITY_INACTIVE) return;
893 #endif
894
895         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
896
897         // Garbage collection can release some new threads due to
898         // either (a) finalizers or (b) threads resurrected because
899         // they are unreachable and will therefore be sent an
900         // exception.  Any threads thus released will be immediately
901         // runnable.
902
903         scheduleDoGC( rtsTrue/*force  major GC*/ );
904         recent_activity = ACTIVITY_DONE_GC;
905         if ( !EMPTY_RUN_QUEUE() ) return;
906
907 #if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
908         /* If we have user-installed signal handlers, then wait
909          * for signals to arrive rather then bombing out with a
910          * deadlock.
911          */
912         if ( anyUserHandlers() ) {
913             IF_DEBUG(scheduler, 
914                      sched_belch("still deadlocked, waiting for signals..."));
915
916             awaitUserSignals();
917
918             if (signals_pending()) {
919                 RELEASE_LOCK(&sched_mutex);
920                 startSignalHandlers();
921                 ACQUIRE_LOCK(&sched_mutex);
922             }
923
924             // either we have threads to run, or we were interrupted:
925             ASSERT(!EMPTY_RUN_QUEUE() || interrupted);
926         }
927 #endif
928
929 #if !defined(RTS_SUPPORTS_THREADS)
930         /* Probably a real deadlock.  Send the current main thread the
931          * Deadlock exception (or in the SMP build, send *all* main
932          * threads the deadlock exception, since none of them can make
933          * progress).
934          */
935         {
936             StgMainThread *m;
937             m = main_threads;
938             switch (m->tso->why_blocked) {
939             case BlockedOnSTM:
940             case BlockedOnBlackHole:
941             case BlockedOnException:
942             case BlockedOnMVar:
943                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
944                 return;
945             default:
946                 barf("deadlock: main thread blocked in a strange way");
947             }
948         }
949 #endif
950     }
951 }
952
953 /* ----------------------------------------------------------------------------
954  * Process an event (GRAN only)
955  * ------------------------------------------------------------------------- */
956
957 #if defined(GRAN)
958 static StgTSO *
959 scheduleProcessEvent(rtsEvent *event)
960 {
961     StgTSO *t;
962
963     if (RtsFlags.GranFlags.Light)
964       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
965
966     /* adjust time based on time-stamp */
967     if (event->time > CurrentTime[CurrentProc] &&
968         event->evttype != ContinueThread)
969       CurrentTime[CurrentProc] = event->time;
970     
971     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
972     if (!RtsFlags.GranFlags.Light)
973       handleIdlePEs();
974
975     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
976
977     /* main event dispatcher in GranSim */
978     switch (event->evttype) {
979       /* Should just be continuing execution */
980     case ContinueThread:
981       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
982       /* ToDo: check assertion
983       ASSERT(run_queue_hd != (StgTSO*)NULL &&
984              run_queue_hd != END_TSO_QUEUE);
985       */
986       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
987       if (!RtsFlags.GranFlags.DoAsyncFetch &&
988           procStatus[CurrentProc]==Fetching) {
989         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
990               CurrentTSO->id, CurrentTSO, CurrentProc);
991         goto next_thread;
992       } 
993       /* Ignore ContinueThreads for completed threads */
994       if (CurrentTSO->what_next == ThreadComplete) {
995         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
996               CurrentTSO->id, CurrentTSO, CurrentProc);
997         goto next_thread;
998       } 
999       /* Ignore ContinueThreads for threads that are being migrated */
1000       if (PROCS(CurrentTSO)==Nowhere) { 
1001         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1002               CurrentTSO->id, CurrentTSO, CurrentProc);
1003         goto next_thread;
1004       }
1005       /* The thread should be at the beginning of the run queue */
1006       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1007         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1008               CurrentTSO->id, CurrentTSO, CurrentProc);
1009         break; // run the thread anyway
1010       }
1011       /*
1012       new_event(proc, proc, CurrentTime[proc],
1013                 FindWork,
1014                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1015       goto next_thread; 
1016       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1017       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1018
1019     case FetchNode:
1020       do_the_fetchnode(event);
1021       goto next_thread;             /* handle next event in event queue  */
1022       
1023     case GlobalBlock:
1024       do_the_globalblock(event);
1025       goto next_thread;             /* handle next event in event queue  */
1026       
1027     case FetchReply:
1028       do_the_fetchreply(event);
1029       goto next_thread;             /* handle next event in event queue  */
1030       
1031     case UnblockThread:   /* Move from the blocked queue to the tail of */
1032       do_the_unblock(event);
1033       goto next_thread;             /* handle next event in event queue  */
1034       
1035     case ResumeThread:  /* Move from the blocked queue to the tail of */
1036       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1037       event->tso->gran.blocktime += 
1038         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1039       do_the_startthread(event);
1040       goto next_thread;             /* handle next event in event queue  */
1041       
1042     case StartThread:
1043       do_the_startthread(event);
1044       goto next_thread;             /* handle next event in event queue  */
1045       
1046     case MoveThread:
1047       do_the_movethread(event);
1048       goto next_thread;             /* handle next event in event queue  */
1049       
1050     case MoveSpark:
1051       do_the_movespark(event);
1052       goto next_thread;             /* handle next event in event queue  */
1053       
1054     case FindWork:
1055       do_the_findwork(event);
1056       goto next_thread;             /* handle next event in event queue  */
1057       
1058     default:
1059       barf("Illegal event type %u\n", event->evttype);
1060     }  /* switch */
1061     
1062     /* This point was scheduler_loop in the old RTS */
1063
1064     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1065
1066     TimeOfLastEvent = CurrentTime[CurrentProc];
1067     TimeOfNextEvent = get_time_of_next_event();
1068     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1069     // CurrentTSO = ThreadQueueHd;
1070
1071     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1072                          TimeOfNextEvent));
1073
1074     if (RtsFlags.GranFlags.Light) 
1075       GranSimLight_leave_system(event, &ActiveTSO); 
1076
1077     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1078
1079     IF_DEBUG(gran, 
1080              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1081
1082     /* in a GranSim setup the TSO stays on the run queue */
1083     t = CurrentTSO;
1084     /* Take a thread from the run queue. */
1085     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1086
1087     IF_DEBUG(gran, 
1088              debugBelch("GRAN: About to run current thread, which is\n");
1089              G_TSO(t,5));
1090
1091     context_switch = 0; // turned on via GranYield, checking events and time slice
1092
1093     IF_DEBUG(gran, 
1094              DumpGranEvent(GR_SCHEDULE, t));
1095
1096     procStatus[CurrentProc] = Busy;
1097 }
1098 #endif // GRAN
1099
1100 /* ----------------------------------------------------------------------------
1101  * Send pending messages (PARALLEL_HASKELL only)
1102  * ------------------------------------------------------------------------- */
1103
1104 #if defined(PARALLEL_HASKELL)
1105 static StgTSO *
1106 scheduleSendPendingMessages(void)
1107 {
1108     StgSparkPool *pool;
1109     rtsSpark spark;
1110     StgTSO *t;
1111
1112 # if defined(PAR) // global Mem.Mgmt., omit for now
1113     if (PendingFetches != END_BF_QUEUE) {
1114         processFetches();
1115     }
1116 # endif
1117     
1118     if (RtsFlags.ParFlags.BufferTime) {
1119         // if we use message buffering, we must send away all message
1120         // packets which have become too old...
1121         sendOldBuffers(); 
1122     }
1123 }
1124 #endif
1125
1126 /* ----------------------------------------------------------------------------
1127  * Activate spark threads (PARALLEL_HASKELL only)
1128  * ------------------------------------------------------------------------- */
1129
1130 #if defined(PARALLEL_HASKELL)
1131 static void
1132 scheduleActivateSpark(void)
1133 {
1134 #if defined(SPARKS)
1135   ASSERT(EMPTY_RUN_QUEUE());
1136 /* We get here if the run queue is empty and want some work.
1137    We try to turn a spark into a thread, and add it to the run queue,
1138    from where it will be picked up in the next iteration of the scheduler
1139    loop.
1140 */
1141
1142       /* :-[  no local threads => look out for local sparks */
1143       /* the spark pool for the current PE */
1144       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1145       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1146           pool->hd < pool->tl) {
1147         /* 
1148          * ToDo: add GC code check that we really have enough heap afterwards!!
1149          * Old comment:
1150          * If we're here (no runnable threads) and we have pending
1151          * sparks, we must have a space problem.  Get enough space
1152          * to turn one of those pending sparks into a
1153          * thread... 
1154          */
1155
1156         spark = findSpark(rtsFalse);            /* get a spark */
1157         if (spark != (rtsSpark) NULL) {
1158           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1159           IF_PAR_DEBUG(fish, // schedule,
1160                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1161                              tso->id, tso, advisory_thread_count));
1162
1163           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1164             IF_PAR_DEBUG(fish, // schedule,
1165                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1166                             spark));
1167             return rtsFalse; /* failed to generate a thread */
1168           }                  /* otherwise fall through & pick-up new tso */
1169         } else {
1170           IF_PAR_DEBUG(fish, // schedule,
1171                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1172                              spark_queue_len(pool)));
1173           return rtsFalse;  /* failed to generate a thread */
1174         }
1175         return rtsTrue;  /* success in generating a thread */
1176   } else { /* no more threads permitted or pool empty */
1177     return rtsFalse;  /* failed to generateThread */
1178   }
1179 #else
1180   tso = NULL; // avoid compiler warning only
1181   return rtsFalse;  /* dummy in non-PAR setup */
1182 #endif // SPARKS
1183 }
1184 #endif // PARALLEL_HASKELL
1185
1186 /* ----------------------------------------------------------------------------
1187  * Get work from a remote node (PARALLEL_HASKELL only)
1188  * ------------------------------------------------------------------------- */
1189     
1190 #if defined(PARALLEL_HASKELL)
1191 static rtsBool
1192 scheduleGetRemoteWork(rtsBool *receivedFinish)
1193 {
1194   ASSERT(EMPTY_RUN_QUEUE());
1195
1196   if (RtsFlags.ParFlags.BufferTime) {
1197         IF_PAR_DEBUG(verbose, 
1198                 debugBelch("...send all pending data,"));
1199         {
1200           nat i;
1201           for (i=1; i<=nPEs; i++)
1202             sendImmediately(i); // send all messages away immediately
1203         }
1204   }
1205 # ifndef SPARKS
1206         //++EDEN++ idle() , i.e. send all buffers, wait for work
1207         // suppress fishing in EDEN... just look for incoming messages
1208         // (blocking receive)
1209   IF_PAR_DEBUG(verbose, 
1210                debugBelch("...wait for incoming messages...\n"));
1211   *receivedFinish = processMessages(); // blocking receive...
1212
1213         // and reenter scheduling loop after having received something
1214         // (return rtsFalse below)
1215
1216 # else /* activate SPARKS machinery */
1217 /* We get here, if we have no work, tried to activate a local spark, but still
1218    have no work. We try to get a remote spark, by sending a FISH message.
1219    Thread migration should be added here, and triggered when a sequence of 
1220    fishes returns without work. */
1221         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1222
1223       /* =8-[  no local sparks => look for work on other PEs */
1224         /*
1225          * We really have absolutely no work.  Send out a fish
1226          * (there may be some out there already), and wait for
1227          * something to arrive.  We clearly can't run any threads
1228          * until a SCHEDULE or RESUME arrives, and so that's what
1229          * we're hoping to see.  (Of course, we still have to
1230          * respond to other types of messages.)
1231          */
1232         rtsTime now = msTime() /*CURRENT_TIME*/;
1233         IF_PAR_DEBUG(verbose, 
1234                      debugBelch("--  now=%ld\n", now));
1235         IF_PAR_DEBUG(fish, // verbose,
1236              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1237                  (last_fish_arrived_at!=0 &&
1238                   last_fish_arrived_at+delay > now)) {
1239                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1240                      now, last_fish_arrived_at+delay, 
1241                      last_fish_arrived_at,
1242                      delay);
1243              });
1244   
1245         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1246             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1247           if (last_fish_arrived_at==0 ||
1248               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1249             /* outstandingFishes is set in sendFish, processFish;
1250                avoid flooding system with fishes via delay */
1251     next_fish_to_send_at = 0;  
1252   } else {
1253     /* ToDo: this should be done in the main scheduling loop to avoid the
1254              busy wait here; not so bad if fish delay is very small  */
1255     int iq = 0; // DEBUGGING -- HWL
1256     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1257     /* send a fish when ready, but process messages that arrive in the meantime */
1258     do {
1259       if (PacketsWaiting()) {
1260         iq++; // DEBUGGING
1261         *receivedFinish = processMessages();
1262       }
1263       now = msTime();
1264     } while (!*receivedFinish || now<next_fish_to_send_at);
1265     // JB: This means the fish could become obsolete, if we receive
1266     // work. Better check for work again? 
1267     // last line: while (!receivedFinish || !haveWork || now<...)
1268     // next line: if (receivedFinish || haveWork )
1269
1270     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1271       return rtsFalse;  // NB: this will leave scheduler loop
1272                         // immediately after return!
1273                           
1274     IF_PAR_DEBUG(fish, // verbose,
1275                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1276
1277   }
1278
1279     // JB: IMHO, this should all be hidden inside sendFish(...)
1280     /* pe = choosePE(); 
1281        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1282                 NEW_FISH_HUNGER);
1283
1284     // Global statistics: count no. of fishes
1285     if (RtsFlags.ParFlags.ParStats.Global &&
1286          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1287            globalParStats.tot_fish_mess++;
1288            }
1289     */ 
1290
1291   /* delayed fishes must have been sent by now! */
1292   next_fish_to_send_at = 0;  
1293   }
1294       
1295   *receivedFinish = processMessages();
1296 # endif /* SPARKS */
1297
1298  return rtsFalse;
1299  /* NB: this function always returns rtsFalse, meaning the scheduler
1300     loop continues with the next iteration; 
1301     rationale: 
1302       return code means success in finding work; we enter this function
1303       if there is no local work, thus have to send a fish which takes
1304       time until it arrives with work; in the meantime we should process
1305       messages in the main loop;
1306  */
1307 }
1308 #endif // PARALLEL_HASKELL
1309
1310 /* ----------------------------------------------------------------------------
1311  * PAR/GRAN: Report stats & debugging info(?)
1312  * ------------------------------------------------------------------------- */
1313
1314 #if defined(PAR) || defined(GRAN)
1315 static void
1316 scheduleGranParReport(void)
1317 {
1318   ASSERT(run_queue_hd != END_TSO_QUEUE);
1319
1320   /* Take a thread from the run queue, if we have work */
1321   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1322
1323     /* If this TSO has got its outport closed in the meantime, 
1324      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1325      * It has to be marked as TH_DEAD for this purpose.
1326      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1327
1328 JB: TODO: investigate wether state change field could be nuked
1329      entirely and replaced by the normal tso state (whatnext
1330      field). All we want to do is to kill tsos from outside.
1331      */
1332
1333     /* ToDo: write something to the log-file
1334     if (RTSflags.ParFlags.granSimStats && !sameThread)
1335         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1336
1337     CurrentTSO = t;
1338     */
1339     /* the spark pool for the current PE */
1340     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1341
1342     IF_DEBUG(scheduler, 
1343              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1344                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1345
1346     IF_PAR_DEBUG(fish,
1347              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1348                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1349
1350     if (RtsFlags.ParFlags.ParStats.Full && 
1351         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1352         (emitSchedule || // forced emit
1353          (t && LastTSO && t->id != LastTSO->id))) {
1354       /* 
1355          we are running a different TSO, so write a schedule event to log file
1356          NB: If we use fair scheduling we also have to write  a deschedule 
1357              event for LastTSO; with unfair scheduling we know that the
1358              previous tso has blocked whenever we switch to another tso, so
1359              we don't need it in GUM for now
1360       */
1361       IF_PAR_DEBUG(fish, // schedule,
1362                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1363
1364       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1365                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1366       emitSchedule = rtsFalse;
1367     }
1368 }     
1369 #endif
1370
1371 /* ----------------------------------------------------------------------------
1372  * After running a thread...
1373  * ASSUMES: sched_mutex
1374  * ------------------------------------------------------------------------- */
1375
1376 static void
1377 schedulePostRunThread(void)
1378 {
1379 #if defined(PAR)
1380     /* HACK 675: if the last thread didn't yield, make sure to print a 
1381        SCHEDULE event to the log file when StgRunning the next thread, even
1382        if it is the same one as before */
1383     LastTSO = t; 
1384     TimeOfLastYield = CURRENT_TIME;
1385 #endif
1386
1387   /* some statistics gathering in the parallel case */
1388
1389 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1390   switch (ret) {
1391     case HeapOverflow:
1392 # if defined(GRAN)
1393       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1394       globalGranStats.tot_heapover++;
1395 # elif defined(PAR)
1396       globalParStats.tot_heapover++;
1397 # endif
1398       break;
1399
1400      case StackOverflow:
1401 # if defined(GRAN)
1402       IF_DEBUG(gran, 
1403                DumpGranEvent(GR_DESCHEDULE, t));
1404       globalGranStats.tot_stackover++;
1405 # elif defined(PAR)
1406       // IF_DEBUG(par, 
1407       // DumpGranEvent(GR_DESCHEDULE, t);
1408       globalParStats.tot_stackover++;
1409 # endif
1410       break;
1411
1412     case ThreadYielding:
1413 # if defined(GRAN)
1414       IF_DEBUG(gran, 
1415                DumpGranEvent(GR_DESCHEDULE, t));
1416       globalGranStats.tot_yields++;
1417 # elif defined(PAR)
1418       // IF_DEBUG(par, 
1419       // DumpGranEvent(GR_DESCHEDULE, t);
1420       globalParStats.tot_yields++;
1421 # endif
1422       break; 
1423
1424     case ThreadBlocked:
1425 # if defined(GRAN)
1426       IF_DEBUG(scheduler,
1427                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1428                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1429                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1430                if (t->block_info.closure!=(StgClosure*)NULL)
1431                  print_bq(t->block_info.closure);
1432                debugBelch("\n"));
1433
1434       // ??? needed; should emit block before
1435       IF_DEBUG(gran, 
1436                DumpGranEvent(GR_DESCHEDULE, t)); 
1437       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1438       /*
1439         ngoq Dogh!
1440       ASSERT(procStatus[CurrentProc]==Busy || 
1441               ((procStatus[CurrentProc]==Fetching) && 
1442               (t->block_info.closure!=(StgClosure*)NULL)));
1443       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1444           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1445             procStatus[CurrentProc]==Fetching)) 
1446         procStatus[CurrentProc] = Idle;
1447       */
1448 # elif defined(PAR)
1449 //++PAR++  blockThread() writes the event (change?)
1450 # endif
1451     break;
1452
1453   case ThreadFinished:
1454     break;
1455
1456   default:
1457     barf("parGlobalStats: unknown return code");
1458     break;
1459     }
1460 #endif
1461 }
1462
1463 /* -----------------------------------------------------------------------------
1464  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1465  * ASSUMES: sched_mutex
1466  * -------------------------------------------------------------------------- */
1467
1468 static rtsBool
1469 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1470 {
1471     // did the task ask for a large block?
1472     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1473         // if so, get one and push it on the front of the nursery.
1474         bdescr *bd;
1475         lnat blocks;
1476         
1477         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1478         
1479         IF_DEBUG(scheduler,
1480                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1481                             (long)t->id, whatNext_strs[t->what_next], blocks));
1482         
1483         // don't do this if the nursery is (nearly) full, we'll GC first.
1484         if (cap->r.rCurrentNursery->link != NULL ||
1485             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1486                                                // if the nursery has only one block.
1487             
1488             bd = allocGroup( blocks );
1489             cap->r.rNursery->n_blocks += blocks;
1490             
1491             // link the new group into the list
1492             bd->link = cap->r.rCurrentNursery;
1493             bd->u.back = cap->r.rCurrentNursery->u.back;
1494             if (cap->r.rCurrentNursery->u.back != NULL) {
1495                 cap->r.rCurrentNursery->u.back->link = bd;
1496             } else {
1497 #if !defined(SMP)
1498                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1499                        g0s0 == cap->r.rNursery);
1500 #endif
1501                 cap->r.rNursery->blocks = bd;
1502             }             
1503             cap->r.rCurrentNursery->u.back = bd;
1504             
1505             // initialise it as a nursery block.  We initialise the
1506             // step, gen_no, and flags field of *every* sub-block in
1507             // this large block, because this is easier than making
1508             // sure that we always find the block head of a large
1509             // block whenever we call Bdescr() (eg. evacuate() and
1510             // isAlive() in the GC would both have to do this, at
1511             // least).
1512             { 
1513                 bdescr *x;
1514                 for (x = bd; x < bd + blocks; x++) {
1515                     x->step = cap->r.rNursery;
1516                     x->gen_no = 0;
1517                     x->flags = 0;
1518                 }
1519             }
1520             
1521             // This assert can be a killer if the app is doing lots
1522             // of large block allocations.
1523             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1524             
1525             // now update the nursery to point to the new block
1526             cap->r.rCurrentNursery = bd;
1527             
1528             // we might be unlucky and have another thread get on the
1529             // run queue before us and steal the large block, but in that
1530             // case the thread will just end up requesting another large
1531             // block.
1532             PUSH_ON_RUN_QUEUE(t);
1533             return rtsFalse;  /* not actually GC'ing */
1534         }
1535     }
1536     
1537     IF_DEBUG(scheduler,
1538              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1539                         (long)t->id, whatNext_strs[t->what_next]));
1540 #if defined(GRAN)
1541     ASSERT(!is_on_queue(t,CurrentProc));
1542 #elif defined(PARALLEL_HASKELL)
1543     /* Currently we emit a DESCHEDULE event before GC in GUM.
1544        ToDo: either add separate event to distinguish SYSTEM time from rest
1545        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1546     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1547         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1548                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1549         emitSchedule = rtsTrue;
1550     }
1551 #endif
1552       
1553     PUSH_ON_RUN_QUEUE(t);
1554     return rtsTrue;
1555     /* actual GC is done at the end of the while loop in schedule() */
1556 }
1557
1558 /* -----------------------------------------------------------------------------
1559  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1560  * ASSUMES: sched_mutex
1561  * -------------------------------------------------------------------------- */
1562
1563 static void
1564 scheduleHandleStackOverflow( StgTSO *t)
1565 {
1566     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1567                                   (long)t->id, whatNext_strs[t->what_next]));
1568     /* just adjust the stack for this thread, then pop it back
1569      * on the run queue.
1570      */
1571     { 
1572         /* enlarge the stack */
1573         StgTSO *new_t = threadStackOverflow(t);
1574         
1575         /* This TSO has moved, so update any pointers to it from the
1576          * main thread stack.  It better not be on any other queues...
1577          * (it shouldn't be).
1578          */
1579         if (t->main != NULL) {
1580             t->main->tso = new_t;
1581         }
1582         PUSH_ON_RUN_QUEUE(new_t);
1583     }
1584 }
1585
1586 /* -----------------------------------------------------------------------------
1587  * Handle a thread that returned to the scheduler with ThreadYielding
1588  * ASSUMES: sched_mutex
1589  * -------------------------------------------------------------------------- */
1590
1591 static rtsBool
1592 scheduleHandleYield( StgTSO *t, nat prev_what_next )
1593 {
1594     // Reset the context switch flag.  We don't do this just before
1595     // running the thread, because that would mean we would lose ticks
1596     // during GC, which can lead to unfair scheduling (a thread hogs
1597     // the CPU because the tick always arrives during GC).  This way
1598     // penalises threads that do a lot of allocation, but that seems
1599     // better than the alternative.
1600     context_switch = 0;
1601     
1602     /* put the thread back on the run queue.  Then, if we're ready to
1603      * GC, check whether this is the last task to stop.  If so, wake
1604      * up the GC thread.  getThread will block during a GC until the
1605      * GC is finished.
1606      */
1607     IF_DEBUG(scheduler,
1608              if (t->what_next != prev_what_next) {
1609                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1610                             (long)t->id, whatNext_strs[t->what_next]);
1611              } else {
1612                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1613                             (long)t->id, whatNext_strs[t->what_next]);
1614              }
1615         );
1616     
1617     IF_DEBUG(sanity,
1618              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1619              checkTSO(t));
1620     ASSERT(t->link == END_TSO_QUEUE);
1621     
1622     // Shortcut if we're just switching evaluators: don't bother
1623     // doing stack squeezing (which can be expensive), just run the
1624     // thread.
1625     if (t->what_next != prev_what_next) {
1626         return rtsTrue;
1627     }
1628     
1629 #if defined(GRAN)
1630     ASSERT(!is_on_queue(t,CurrentProc));
1631       
1632     IF_DEBUG(sanity,
1633              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1634              checkThreadQsSanity(rtsTrue));
1635
1636 #endif
1637
1638     addToRunQueue(t);
1639
1640 #if defined(GRAN)
1641     /* add a ContinueThread event to actually process the thread */
1642     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1643               ContinueThread,
1644               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1645     IF_GRAN_DEBUG(bq, 
1646                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1647                   G_EVENTQ(0);
1648                   G_CURR_THREADQ(0));
1649 #endif
1650     return rtsFalse;
1651 }
1652
1653 /* -----------------------------------------------------------------------------
1654  * Handle a thread that returned to the scheduler with ThreadBlocked
1655  * ASSUMES: sched_mutex
1656  * -------------------------------------------------------------------------- */
1657
1658 static void
1659 scheduleHandleThreadBlocked( StgTSO *t
1660 #if !defined(GRAN) && !defined(DEBUG)
1661     STG_UNUSED
1662 #endif
1663     )
1664 {
1665 #if defined(GRAN)
1666     IF_DEBUG(scheduler,
1667              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1668                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1669              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1670     
1671     // ??? needed; should emit block before
1672     IF_DEBUG(gran, 
1673              DumpGranEvent(GR_DESCHEDULE, t)); 
1674     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1675     /*
1676       ngoq Dogh!
1677       ASSERT(procStatus[CurrentProc]==Busy || 
1678       ((procStatus[Curren
1679    * from its creation
1680    */
1681 #endif
1682
1683 #if defined(GRAN) 
1684   if (RtsFlags.GranFlags.GranSimStats.Full) 
1685     DumpGranEvent(GR_START,tso);
1686 #elif defined(PARALLEL_HASKELL)
1687   if (RtsFlags.ParFlags.ParStats.Full) 
1688     DumpGranEvent(GR_STARTQ,tso);
1689   /* HACk to avoid SCHEDULE 
1690      LastTSO = tso; */
1691 #endif
1692
1693   /* Link the new thread on the global thread list.
1694    */
1695   tso->global_link = all_threads;
1696   all_threads = tso;
1697
1698 #if defined(DIST)
1699   tso->dist.priority = MandatoryPriority; //by default that is...
1700 #endif
1701
1702 #if defined(GRAN)
1703   tso->gran.pri = pri;
1704 # if defined(DEBUG)
1705   tso->gran.magic = TSO_MAGIC; // debugging only
1706 # endif
1707   tso->gran.sparkname   = 0;
1708   tso->gran.startedat   = CURRENT_TIME; 
1709   tso->gran.exported    = 0;
1710   tso->gran.basicblocks = 0;
1711   tso->gran.allocs      = 0;
1712   tso->gran.exectime    = 0;
1713   tso->gran.fetchtime   = 0;
1714   tso->gran.fetchcount  = 0;
1715   tso->gran.blocktime   = 0;
1716   tso->gran.blockcount  = 0;
1717   tso->gran.blockedat   = 0;
1718   tso->gran.globalsparks = 0;
1719   tso->gran.localsparks  = 0;
1720   if (RtsFlags.GranFlags.Light)
1721     tso->gran.clock  = Now; /* local clock */
1722   else
1723     tso->gran.clock  = 0;
1724
1725   IF_DEBUG(gran,printTSO(tso));
1726 #elif defined(PARALLEL_HASKELL)
1727 # if defined(DEBUG)
1728   tso->par.magic = TSO_MAGIC; // debugging only
1729 # endif
1730   tso->par.sparkname   = 0;
1731   tso->par.startedat   = CURRENT_TIME; 
1732   tso->par.exported    = 0;
1733   tso->par.basicblocks = 0;
1734   tso->par.allocs      = 0;
1735   tso->par.exectime    = 0;
1736   tso->par.fetchtime   = 0;
1737   tso->par.fetchcount  = 0;
1738   tso->par.blocktime   = 0;
1739   tso->par.blockcount  = 0;
1740   tso->par.blockedat   = 0;
1741   tso->par.globalsparks = 0;
1742   tso->par.localsparks  = 0;
1743 #endif
1744
1745 #if defined(GRAN)
1746   globalGranStats.tot_threads_created++;
1747   globalGranStats.threads_created_on_PE[CurrentProc]++;
1748   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1749   globalGranStats.tot_sq_probes++;
1750 #elif defined(PARALLEL_HASKELL)
1751   // collect parallel global statistics (currently done together with GC stats)
1752   if (RtsFlags.ParFlags.ParStats.Global &&
1753       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1754     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1755     globalParStats.tot_threads_created++;
1756   }
1757 #endif 
1758
1759 #if defined(GRAN)
1760   IF_GRAN_DEBUG(pri,
1761                 sched_belch("==__ schedule: Created TSO %d (%p);",
1762                       CurrentProc, tso, tso->id));
1763 #elif defined(PARALLEL_HASKELL)
1764   IF_PAR_DEBUG(verbose,
1765                sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
1766                            (long)tso->id, tso, advisory_thread_count));
1767 #else
1768   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1769                                  (long)tso->id, (long)tso->stack_size));
1770 #endif    
1771   return tso;
1772 }
1773
1774 #if defined(PAR)
1775 /* RFP:
1776    all parallel thread creation calls should fall through the following routine.
1777 */
1778 StgTSO *
1779 createThreadFromSpark(rtsSpark spark) 
1780 { StgTSO *tso;
1781   ASSERT(spark != (rtsSpark)NULL);
1782 // JB: TAKE CARE OF THIS COUNTER! BUGGY
1783   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1784   { threadsIgnored++;
1785     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1786           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1787     return END_TSO_QUEUE;
1788   }
1789   else
1790   { threadsCreated++;
1791     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1792     if (tso==END_TSO_QUEUE)     
1793       barf("createSparkThread: Cannot create TSO");
1794 #if defined(DIST)
1795     tso->priority = AdvisoryPriority;
1796 #endif
1797     pushClosure(tso,spark);
1798     addToRunQueue(tso);
1799     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
1800   }
1801   return tso;
1802 }
1803 #endif
1804
1805 /*
1806   Turn a spark into a thread.
1807   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1808 */
1809 #if 0
1810 StgTSO *
1811 activateSpark (rtsSpark spark) 
1812 {
1813   StgTSO *tso;
1814
1815   tso = createSparkThread(spark);
1816   if (RtsFlags.ParFlags.ParStats.Full) {   
1817     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1818       IF_PAR_DEBUG(verbose,
1819                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
1820                               (StgClosure *)spark, info_type((StgClosure *)spark)));
1821   }
1822   // ToDo: fwd info on local/global spark to thread -- HWL
1823   // tso->gran.exported =  spark->exported;
1824   // tso->gran.locked =   !spark->global;
1825   // tso->gran.sparkname = spark->name;
1826
1827   return tso;
1828 }
1829 #endif
1830
1831 /* ---------------------------------------------------------------------------
1832  * scheduleThread()
1833  *
1834  * scheduleThread puts a thread on the head of the runnable queue.
1835  * This will usually be done immediately after a thread is created.
1836  * The caller of scheduleThread must create the thread using e.g.
1837  * createThread and push an appropriate closure
1838  * on this thread's stack before the scheduler is invoked.
1839  * ------------------------------------------------------------------------ */
1840
1841 static void
1842 scheduleThread_(StgTSO *tso)
1843 {
1844   // The thread goes at the *end* of the run-queue, to avoid possible
1845   // starvation of any threads already on the queue.
1846   APPEND_TO_RUN_QUEUE(tso);
1847   threadRunnable();
1848 }
1849
1850 void
1851 scheduleThread(StgTSO* tso)
1852 {
1853   ACQUIRE_LOCK(&sched_mutex);
1854   scheduleThread_(tso);
1855   RELEASE_LOCK(&sched_mutex);
1856 }
1857
1858 #if defined(RTS_SUPPORTS_THREADS)
1859 static Condition bound_cond_cache;
1860 static int bound_cond_cache_full = 0;
1861 #endif
1862
1863
1864 SchedulerStatus
1865 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1866                    Capability *initialCapability)
1867 {
1868     // Precondition: sched_mutex must be held
1869     StgMainThread *m;
1870
1871     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1872     m->tso = tso;
1873     tso->main = m;
1874     m->ret = ret;
1875     m->stat = NoStatus;
1876     m->link = main_threads;
1877     m->prev = NULL;
1878     if (main_threads != NULL) {
1879         main_threads->prev = m;
1880     }
1881     main_threads = m;
1882
1883 #if defined(RTS_SUPPORTS_THREADS)
1884     // Allocating a new condition for each thread is expensive, so we
1885     // cache one.  This is a pretty feeble hack, but it helps speed up
1886     // consecutive call-ins quite a bit.
1887     if (bound_cond_cache_full) {
1888         m->bound_thread_cond = bound_cond_cache;
1889         bound_cond_cache_full = 0;
1890     } else {
1891         initCondition(&m->bound_thread_cond);
1892     }
1893 #endif
1894
1895     /* Put the thread on the main-threads list prior to scheduling the TSO.
1896        Failure to do so introduces a race condition in the MT case (as
1897        identified by Wolfgang Thaller), whereby the new task/OS thread 
1898        created by scheduleThread_() would complete prior to the thread
1899        that spawned it managed to put 'itself' on the main-threads list.
1900        The upshot of it all being that the worker thread wouldn't get to
1901        signal the completion of the its work item for the main thread to
1902        see (==> it got stuck waiting.)    -- sof 6/02.
1903     */
1904     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
1905     
1906     APPEND_TO_RUN_QUEUE(tso);
1907     // NB. Don't call threadRunnable() here, because the thread is
1908     // bound and only runnable by *this* OS thread, so waking up other
1909     // workers will just slow things down.
1910
1911     return waitThread_(m, initialCapability);
1912 }
1913
1914 /* ---------------------------------------------------------------------------
1915  * initScheduler()
1916  *
1917  * Initialise the scheduler.  This resets all the queues - if the
1918  * queues contained any threads, they'll be garbage collected at the
1919  * next pass.
1920  *
1921  * ------------------------------------------------------------------------ */
1922
1923 void 
1924 initScheduler(void)
1925 {
1926 #if defined(GRAN)
1927   nat i;
1928
1929   for (i=0; i<=MAX_PROC; i++) {
1930     run_queue_hds[i]      = END_TSO_QUEUE;
1931     run_queue_tls[i]      = END_TSO_QUEUE;
1932     blocked_queue_hds[i]  = END_TSO_QUEUE;
1933     blocked_queue_tls[i]  = END_TSO_QUEUE;
1934     ccalling_threadss[i]  = END_TSO_QUEUE;
1935     blackhole_queue[i]    = END_TSO_QUEUE;
1936     sleeping_queue        = END_TSO_QUEUE;
1937   }
1938 #else
1939   run_queue_hd      = END_TSO_QUEUE;
1940   run_queue_tl      = END_TSO_QUEUE;
1941   blocked_queue_hd  = END_TSO_QUEUE;
1942   blocked_queue_tl  = END_TSO_QUEUE;
1943   blackhole_queue   = END_TSO_QUEUE;
1944   sleeping_queue    = END_TSO_QUEUE;
1945 #endif 
1946
1947   suspended_ccalling_threads  = END_TSO_QUEUE;
1948
1949   main_threads = NULL;
1950   all_threads  = END_TSO_QUEUE;
1951
1952   context_switch = 0;
1953   interrupted    = 0;
1954
1955   RtsFlags.ConcFlags.ctxtSwitchTicks =
1956       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1957       
1958 #if defined(RTS_SUPPORTS_THREADS)
1959   /* Initialise the mutex and condition variables used by
1960    * the scheduler. */
1961   initMutex(&sched_mutex);
1962   initMutex(&term_mutex);
1963 #endif
1964   
1965   ACQUIRE_LOCK(&sched_mutex);
1966
1967   /* A capability holds the state a native thread needs in
1968    * order to execute STG code. At least one capability is
1969    * floating around (only SMP builds have more than one).
1970    */
1971   initCapabilities();
1972   
1973 #if defined(RTS_SUPPORTS_THREADS)
1974   initTaskManager();
1975 #endif
1976
1977 #if defined(SMP)
1978   /* eagerly start some extra workers */
1979   startingWorkerThread = RtsFlags.ParFlags.nNodes;
1980   startTasks(RtsFlags.ParFlags.nNodes, taskStart);
1981 #endif
1982
1983 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
1984   initSparkPools();
1985 #endif
1986
1987   RELEASE_LOCK(&sched_mutex);
1988 }
1989
1990 void
1991 exitScheduler( void )
1992 {
1993     interrupted = rtsTrue;
1994     shutting_down_scheduler = rtsTrue;
1995 #if defined(RTS_SUPPORTS_THREADS)
1996     if (threadIsTask(osThreadId())) { taskStop(); }
1997     stopTaskManager();
1998 #endif
1999 }
2000
2001 /* ----------------------------------------------------------------------------
2002    Managing the per-task allocation areas.
2003    
2004    Each capability comes with an allocation area.  These are
2005    fixed-length block lists into which allocation can be done.
2006
2007    ToDo: no support for two-space collection at the moment???
2008    ------------------------------------------------------------------------- */
2009
2010 static SchedulerStatus
2011 waitThread_(StgMainThread* m, Capability *initialCapability)
2012 {
2013   SchedulerStatus stat;
2014
2015   // Precondition: sched_mutex must be held.
2016   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2017
2018 #if defined(GRAN)
2019   /* GranSim specific init */
2020   CurrentTSO = m->tso;                // the TSO to run
2021   procStatus[MainProc] = Busy;        // status of main PE
2022   CurrentProc = MainProc;             // PE to run it on
2023   schedule(m,initialCapability);
2024 #else
2025   schedule(m,initialCapability);
2026   ASSERT(m->stat != NoStatus);
2027 #endif
2028
2029   stat = m->stat;
2030
2031 #if defined(RTS_SUPPORTS_THREADS)
2032   // Free the condition variable, returning it to the cache if possible.
2033   if (!bound_cond_cache_full) {
2034       bound_cond_cache = m->bound_thread_cond;
2035       bound_cond_cache_full = 1;
2036   } else {
2037       closeCondition(&m->bound_thread_cond);
2038   }
2039 #endif
2040
2041   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2042   stgFree(m);
2043
2044   // Postcondition: sched_mutex still held
2045   return stat;
2046 }
2047
2048 /* ---------------------------------------------------------------------------
2049    Where are the roots that we know about?
2050
2051         - all the threads on the runnable queue
2052         - all the threads on the blocked queue
2053         - all the threads on the sleeping queue
2054         - all the thread currently executing a _ccall_GC
2055         - all the "main threads"
2056      
2057    ------------------------------------------------------------------------ */
2058
2059 /* This has to be protected either by the scheduler monitor, or by the
2060         garbage collection monitor (probably the latter).
2061         KH @ 25/10/99
2062 */
2063
2064 void
2065 GetRoots( evac_fn evac )
2066 {
2067 #if defined(GRAN)
2068   {
2069     nat i;
2070     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2071       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2072           evac((StgClosure **)&run_queue_hds[i]);
2073       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2074           evac((StgClosure **)&run_queue_tls[i]);
2075       
2076       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2077           evac((StgClosure **)&blocked_queue_hds[i]);
2078       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2079           evac((StgClosure **)&blocked_queue_tls[i]);
2080       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2081           evac((StgClosure **)&ccalling_threads[i]);
2082     }
2083   }
2084
2085   markEventQueue();
2086
2087 #else /* !GRAN */
2088   if (run_queue_hd != END_TSO_QUEUE) {
2089       ASSERT(run_queue_tl != END_TSO_QUEUE);
2090       evac((StgClosure **)&run_queue_hd);
2091       evac((StgClosure **)&run_queue_tl);
2092   }
2093   
2094   if (blocked_queue_hd != END_TSO_QUEUE) {
2095       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2096       evac((StgClosure **)&blocked_queue_hd);
2097       evac((StgClosure **)&blocked_queue_tl);
2098   }
2099   
2100   if (sleeping_queue != END_TSO_QUEUE) {
2101       evac((StgClosure **)&sleeping_queue);
2102   }
2103 #endif 
2104
2105   if (blackhole_queue != END_TSO_QUEUE) {
2106       evac((StgClosure **)&blackhole_queue);
2107   }
2108
2109   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2110       evac((StgClosure **)&suspended_ccalling_threads);
2111   }
2112
2113 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2114   markSparkQueue(evac);
2115 #endif
2116
2117 #if defined(RTS_USER_SIGNALS)
2118   // mark the signal handlers (signals should be already blocked)
2119   markSignalHandlers(evac);
2120 #endif
2121 }
2122
2123 /* -----------------------------------------------------------------------------
2124    performGC
2125
2126    This is the interface to the garbage collector from Haskell land.
2127    We provide this so that external C code can allocate and garbage
2128    collect when called from Haskell via _ccall_GC.
2129
2130    It might be useful to provide an interface whereby the programmer
2131    can specify more roots (ToDo).
2132    
2133    This needs to be protected by the GC condition variable above.  KH.
2134    -------------------------------------------------------------------------- */
2135
2136 static void (*extra_roots)(evac_fn);
2137
2138 void
2139 performGC(void)
2140 {
2141   /* Obligated to hold this lock upon entry */
2142   ACQUIRE_LOCK(&sched_mutex);
2143   GarbageCollect(GetRoots,rtsFalse);
2144   RELEASE_LOCK(&sched_mutex);
2145 }
2146
2147 void
2148 performMajorGC(void)
2149 {
2150   ACQUIRE_LOCK(&sched_mutex);
2151   GarbageCollect(GetRoots,rtsTrue);
2152   RELEASE_LOCK(&sched_mutex);
2153 }
2154
2155 static void
2156 AllRoots(evac_fn evac)
2157 {
2158     GetRoots(evac);             // the scheduler's roots
2159     extra_roots(evac);          // the user's roots
2160 }
2161
2162 void
2163 performGCWithRoots(void (*get_roots)(evac_fn))
2164 {
2165   ACQUIRE_LOCK(&sched_mutex);
2166   extra_roots = get_roots;
2167   GarbageCollect(AllRoots,rtsFalse);
2168   RELEASE_LOCK(&sched_mutex);
2169 }
2170
2171 /* -----------------------------------------------------------------------------
2172    Stack overflow
2173
2174    If the thread has reached its maximum stack size, then raise the
2175    StackOverflow exception in the offending thread.  Otherwise
2176    relocate the TSO into a larger chunk of memory and adjust its stack
2177    size appropriately.
2178    -------------------------------------------------------------------------- */
2179
2180 static StgTSO *
2181 threadStackOverflow(StgTSO *tso)
2182 {
2183   nat new_stack_size, stack_words;
2184   lnat new_tso_size;
2185   StgPtr new_sp;
2186   StgTSO *dest;
2187
2188   IF_DEBUG(sanity,checkTSO(tso));
2189   if (tso->stack_size >= tso->max_stack_size) {
2190
2191     IF_DEBUG(gc,
2192              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2193                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2194              /* If we're debugging, just print out the top of the stack */
2195              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2196                                               tso->sp+64)));
2197
2198     /* Send this thread the StackOverflow exception */
2199     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2200     return tso;
2201   }
2202
2203   /* Try to double the current stack size.  If that takes us over the
2204    * maximum stack size for this thread, then use the maximum instead.
2205    * Finally round up so the TSO ends up as a whole number of blocks.
2206    */
2207   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2208   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2209                                        TSO_STRUCT_SIZE)/sizeof(W_);
2210   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2211   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2212
2213   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2214
2215   dest = (StgTSO *)allocate(new_tso_size);
2216   TICK_ALLOC_TSO(new_stack_size,0);
2217
2218   /* copy the TSO block and the old stack into the new area */
2219   memcpy(dest,tso,TSO_STRUCT_SIZE);
2220   stack_words = tso->stack + tso->stack_size - tso->sp;
2221   new_sp = (P_)dest + new_tso_size - stack_words;
2222   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2223
2224   /* relocate the stack pointers... */
2225   dest->sp         = new_sp;
2226   dest->stack_size = new_stack_size;
2227         
2228   /* Mark the old TSO as relocated.  We have to check for relocated
2229    * TSOs in the garbage collector and any primops that deal with TSOs.
2230    *
2231    * It's important to set the sp value to just beyond the end
2232    * of the stack, so we don't attempt to scavenge any part of the
2233    * dead TSO's stack.
2234    */
2235   tso->what_next = ThreadRelocated;
2236   tso->link = dest;
2237   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2238   tso->why_blocked = NotBlocked;
2239
2240   IF_PAR_DEBUG(verbose,
2241                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2242                      tso->id, tso, tso->stack_size);
2243                /* If we're debugging, just print out the top of the stack */
2244                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2245                                                 tso->sp+64)));
2246   
2247   IF_DEBUG(sanity,checkTSO(tso));
2248 #if 0
2249   IF_DEBUG(scheduler,printTSO(dest));
2250 #endif
2251
2252   return dest;
2253 }
2254
2255 /* ---------------------------------------------------------------------------
2256    Wake up a queue that was blocked on some resource.
2257    ------------------------------------------------------------------------ */
2258
2259 #if defined(GRAN)
2260 STATIC_INLINE void
2261 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2262 {
2263 }
2264 #elif defined(PARALLEL_HASKELL)
2265 STATIC_INLINE void
2266 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2267 {
2268   /* write RESUME events to log file and
2269      update blocked and fetch time (depending on type of the orig closure) */
2270   if (RtsFlags.ParFlags.ParStats.Full) {
2271     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2272                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2273                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2274     if (EMPTY_RUN_QUEUE())
2275       emitSchedule = rtsTrue;
2276
2277     switch (get_itbl(node)->type) {
2278         case FETCH_ME_BQ:
2279           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2280           break;
2281         case RBH:
2282         case FETCH_ME:
2283         case BLACKHOLE_BQ:
2284           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2285           break;
2286 #ifdef DIST
2287         case MVAR:
2288           break;
2289 #endif    
2290         default:
2291           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2292         }
2293       }
2294 }
2295 #endif
2296
2297 #if defined(GRAN)
2298 StgBlockingQueueElement *
2299 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2300 {
2301     StgTSO *tso;
2302     PEs node_loc, tso_loc;
2303
2304     node_loc = where_is(node); // should be lifted out of loop
2305     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2306     tso_loc = where_is((StgClosure *)tso);
2307     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2308       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2309       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2310       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2311       // insertThread(tso, node_loc);
2312       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2313                 ResumeThread,
2314                 tso, node, (rtsSpark*)NULL);
2315       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2316       // len_local++;
2317       // len++;
2318     } else { // TSO is remote (actually should be FMBQ)
2319       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2320                                   RtsFlags.GranFlags.Costs.gunblocktime +
2321                                   RtsFlags.GranFlags.Costs.latency;
2322       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2323                 UnblockThread,
2324                 tso, node, (rtsSpark*)NULL);
2325       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2326       // len++;
2327     }
2328     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2329     IF_GRAN_DEBUG(bq,
2330                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2331                           (node_loc==tso_loc ? "Local" : "Global"), 
2332                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2333     tso->block_info.closure = NULL;
2334     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2335                              tso->id, tso));
2336 }
2337 #elif defined(PARALLEL_HASKELL)
2338 StgBlockingQueueElement *
2339 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2340 {
2341     StgBlockingQueueElement *next;
2342
2343     switch (get_itbl(bqe)->type) {
2344     case TSO:
2345       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2346       /* if it's a TSO just push it onto the run_queue */
2347       next = bqe->link;
2348       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2349       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2350       threadRunnable();
2351       unblockCount(bqe, node);
2352       /* reset blocking status after dumping event */
2353       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2354       break;
2355
2356     case BLOCKED_FETCH:
2357       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2358       next = bqe->link;
2359       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2360       PendingFetches = (StgBlockedFetch *)bqe;
2361       break;
2362
2363 # if defined(DEBUG)
2364       /* can ignore this case in a non-debugging setup; 
2365          see comments on RBHSave closures above */
2366     case CONSTR:
2367       /* check that the closure is an RBHSave closure */
2368       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2369              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2370              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2371       break;
2372
2373     default:
2374       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2375            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2376            (StgClosure *)bqe);
2377 # endif
2378     }
2379   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2380   return next;
2381 }
2382
2383 #else /* !GRAN && !PARALLEL_HASKELL */
2384 StgTSO *
2385 unblockOneLocked(StgTSO *tso)
2386 {
2387   StgTSO *next;
2388
2389   ASSERT(get_itbl(tso)->type == TSO);
2390   ASSERT(tso->why_blocked != NotBlocked);
2391   tso->why_blocked = NotBlocked;
2392   next = tso->link;
2393   tso->link = END_TSO_QUEUE;
2394   APPEND_TO_RUN_QUEUE(tso);
2395   threadRunnable();
2396   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2397   return next;
2398 }
2399 #endif
2400
2401 #if defined(GRAN) || defined(PARALLEL_HASKELL)
2402 INLINE_ME StgBlockingQueueElement *
2403 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2404 {
2405   ACQUIRE_LOCK(&sched_mutex);
2406   bqe = unblockOneLocked(bqe, node);
2407   RELEASE_LOCK(&sched_mutex);
2408   return bqe;
2409 }
2410 #else
2411 INLINE_ME StgTSO *
2412 unblockOne(StgTSO *tso)
2413 {
2414   ACQUIRE_LOCK(&sched_mutex);
2415   tso = unblockOneLocked(tso);
2416   RELEASE_LOCK(&sched_mutex);
2417   return tso;
2418 }
2419 #endif
2420
2421 #if defined(GRAN)
2422 void 
2423 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2424 {
2425   StgBlockingQueueElement *bqe;
2426   PEs node_loc;
2427   nat len = 0; 
2428
2429   IF_GRAN_DEBUG(bq, 
2430                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2431                       node, CurrentProc, CurrentTime[CurrentProc], 
2432                       CurrentTSO->id, CurrentTSO));
2433
2434   node_loc = where_is(node);
2435
2436   ASSERT(q == END_BQ_QUEUE ||
2437          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2438          get_itbl(q)->type == CONSTR); // closure (type constructor)
2439   ASSERT(is_unique(node));
2440
2441   /* FAKE FETCH: magically copy the node to the tso's proc;
2442      no Fetch necessary because in reality the node should not have been 
2443      moved to the other PE in the first place
2444   */
2445   if (CurrentProc!=node_loc) {
2446     IF_GRAN_DEBUG(bq, 
2447                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2448                         node, node_loc, CurrentProc, CurrentTSO->id, 
2449                         // CurrentTSO, where_is(CurrentTSO),
2450                         node->header.gran.procs));
2451     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2452     IF_GRAN_DEBUG(bq, 
2453                   debugBelch("## new bitmask of node %p is %#x\n",
2454                         node, node->header.gran.procs));
2455     if (RtsFlags.GranFlags.GranSimStats.Global) {
2456       globalGranStats.tot_fake_fetches++;
2457     }
2458   }
2459
2460   bqe = q;
2461   // ToDo: check: ASSERT(CurrentProc==node_loc);
2462   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2463     //next = bqe->link;
2464     /* 
2465        bqe points to the current element in the queue
2466        next points to the next element in the queue
2467     */
2468     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2469     //tso_loc = where_is(tso);
2470     len++;
2471     bqe = unblockOneLocked(bqe, node);
2472   }
2473
2474   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2475      the closure to make room for the anchor of the BQ */
2476   if (bqe!=END_BQ_QUEUE) {
2477     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2478     /*
2479     ASSERT((info_ptr==&RBH_Save_0_info) ||
2480            (info_ptr==&RBH_Save_1_info) ||
2481            (info_ptr==&RBH_Save_2_info));
2482     */
2483     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2484     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2485     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2486
2487     IF_GRAN_DEBUG(bq,
2488                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
2489                         node, info_type(node)));
2490   }
2491
2492   /* statistics gathering */
2493   if (RtsFlags.GranFlags.GranSimStats.Global) {
2494     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2495     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2496     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2497     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2498   }
2499   IF_GRAN_DEBUG(bq,
2500                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
2501                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2502 }
2503 #elif defined(PARALLEL_HASKELL)
2504 void 
2505 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2506 {
2507   StgBlockingQueueElement *bqe;
2508
2509   ACQUIRE_LOCK(&sched_mutex);
2510
2511   IF_PAR_DEBUG(verbose, 
2512                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
2513                      node, mytid));
2514 #ifdef DIST  
2515   //RFP
2516   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2517     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
2518     return;
2519   }
2520 #endif
2521   
2522   ASSERT(q == END_BQ_QUEUE ||
2523          get_itbl(q)->type == TSO ||           
2524          get_itbl(q)->type == BLOCKED_FETCH || 
2525          get_itbl(q)->type == CONSTR); 
2526
2527   bqe = q;
2528   while (get_itbl(bqe)->type==TSO || 
2529          get_itbl(bqe)->type==BLOCKED_FETCH) {
2530     bqe = unblockOneLocked(bqe, node);
2531   }
2532   RELEASE_LOCK(&sched_mutex);
2533 }
2534
2535 #else   /* !GRAN && !PARALLEL_HASKELL */
2536
2537 void
2538 awakenBlockedQueueNoLock(StgTSO *tso)
2539 {
2540   while (tso != END_TSO_QUEUE) {
2541     tso = unblockOneLocked(tso);
2542   }
2543 }
2544
2545 void
2546 awakenBlockedQueue(StgTSO *tso)
2547 {
2548   ACQUIRE_LOCK(&sched_mutex);
2549   while (tso != END_TSO_QUEUE) {
2550     tso = unblockOneLocked(tso);
2551   }
2552   RELEASE_LOCK(&sched_mutex);
2553 }
2554 #endif
2555
2556 /* ---------------------------------------------------------------------------
2557    Interrupt execution
2558    - usually called inside a signal handler so it mustn't do anything fancy.   
2559    ------------------------------------------------------------------------ */
2560
2561 void
2562 interruptStgRts(void)
2563 {
2564     interrupted    = 1;
2565     context_switch = 1;
2566     threadRunnable();
2567     /* ToDo: if invoked from a signal handler, this threadRunnable
2568      * only works if there's another thread (not this one) waiting to
2569      * be woken up.
2570      */
2571 }
2572
2573 /* -----------------------------------------------------------------------------
2574    Unblock a thread
2575
2576    This is for use when we raise an exception in another thread, which
2577    may be blocked.
2578    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2579    -------------------------------------------------------------------------- */
2580
2581 #if defined(GRAN) || defined(PARALLEL_HASKELL)
2582 /*
2583   NB: only the type of the blocking queue is different in GranSim and GUM
2584       the operations on the queue-elements are the same
2585       long live polymorphism!
2586
2587   Locks: sched_mutex is held upon entry and exit.
2588
2589 */
2590 static void
2591 unblockThread(StgTSO *tso)
2592 {
2593   StgBlockingQueueElement *t, **last;
2594
2595   switch (tso->why_blocked) {
2596
2597   case NotBlocked:
2598     return;  /* not blocked */
2599
2600   case BlockedOnSTM:
2601     // Be careful: nothing to do here!  We tell the scheduler that the thread
2602     // is runnable and we leave it to the stack-walking code to abort the 
2603     // transaction while unwinding the stack.  We should perhaps have a debugging
2604     // test to make sure that this really happens and that the 'zombie' transaction
2605     // does not get committed.
2606     goto done;
2607
2608   case BlockedOnMVar:
2609     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2610     {
2611       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2612       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2613
2614       last = (StgBlockingQueueElement **)&mvar->head;
2615       for (t = (StgBlockingQueueElement *)mvar->head; 
2616            t != END_BQ_QUEUE; 
2617            last = &t->link, last_tso = t, t = t->link) {
2618         if (t == (StgBlockingQueueElement *)tso) {
2619           *last = (StgBlockingQueueElement *)tso->link;
2620           if (mvar->tail == tso) {
2621             mvar->tail = (StgTSO *)last_tso;
2622           }
2623           goto done;
2624         }
2625       }
2626       barf("unblockThread (MVAR): TSO not found");
2627     }
2628
2629   case BlockedOnBlackHole:
2630     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2631     {
2632       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2633
2634       last = &bq->blocking_queue;
2635       for (t = bq->blocking_queue; 
2636            t != END_BQ_QUEUE; 
2637            last = &t->link, t = t->link) {
2638         if (t == (StgBlockingQueueElement *)tso) {
2639           *last = (StgBlockingQueueElement *)tso->link;
2640           goto done;
2641         }
2642       }
2643       barf("unblockThread (BLACKHOLE): TSO not found");
2644     }
2645
2646   case BlockedOnException:
2647     {
2648       StgTSO *target  = tso->block_info.tso;
2649
2650       ASSERT(get_itbl(target)->type == TSO);
2651
2652       if (target->what_next == ThreadRelocated) {
2653           target = target->link;
2654           ASSERT(get_itbl(target)->type == TSO);
2655       }
2656
2657       ASSERT(target->blocked_exceptions != NULL);
2658
2659       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2660       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2661            t != END_BQ_QUEUE; 
2662            last = &t->link, t = t->link) {
2663         ASSERT(get_itbl(t)->type == TSO);
2664         if (t == (StgBlockingQueueElement *)tso) {
2665           *last = (StgBlockingQueueElement *)tso->link;
2666           goto done;
2667         }
2668       }
2669       barf("unblockThread (Exception): TSO not found");
2670     }
2671
2672   case BlockedOnRead:
2673   case BlockedOnWrite:
2674 #if defined(mingw32_HOST_OS)
2675   case BlockedOnDoProc:
2676 #endif
2677     {
2678       /* take TSO off blocked_queue */
2679       StgBlockingQueueElement *prev = NULL;
2680       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2681            prev = t, t = t->link) {
2682         if (t == (StgBlockingQueueElement *)tso) {
2683           if (prev == NULL) {
2684             blocked_queue_hd = (StgTSO *)t->link;
2685             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2686               blocked_queue_tl = END_TSO_QUEUE;
2687             }
2688           } else {
2689             prev->link = t->link;
2690             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2691               blocked_queue_tl = (StgTSO *)prev;
2692             }
2693           }
2694 #if defined(mingw32_HOST_OS)
2695           /* (Cooperatively) signal that the worker thread should abort
2696            * the request.
2697            */
2698           abandonWorkRequest(tso->block_info.async_result->reqID);
2699 #endif
2700           goto done;
2701         }
2702       }
2703       barf("unblockThread (I/O): TSO not found");
2704     }
2705
2706   case BlockedOnDelay:
2707     {
2708       /* take TSO off sleeping_queue */
2709       StgBlockingQueueElement *prev = NULL;
2710       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2711            prev = t, t = t->link) {
2712         if (t == (StgBlockingQueueElement *)tso) {
2713           if (prev == NULL) {
2714             sleeping_queue = (StgTSO *)t->link;
2715           } else {
2716             prev->link = t->link;
2717           }
2718           goto done;
2719         }
2720       }
2721       barf("unblockThread (delay): TSO not found");
2722     }
2723
2724   default:
2725     barf("unblockThread");
2726   }
2727
2728  done:
2729   tso->link = END_TSO_QUEUE;
2730   tso->why_blocked = NotBlocked;
2731   tso->block_info.closure = NULL;
2732   PUSH_ON_RUN_QUEUE(tso);
2733 }
2734 #else
2735 static void
2736 unblockThread(StgTSO *tso)
2737 {
2738   StgTSO *t, **last;
2739   
2740   /* To avoid locking unnecessarily. */
2741   if (tso->why_blocked == NotBlocked) {
2742     return;
2743   }
2744
2745   switch (tso->why_blocked) {
2746
2747   case BlockedOnSTM:
2748     // Be careful: nothing to do here!  We tell the scheduler that the thread
2749     // is runnable and we leave it to the stack-walking code to abort the 
2750     // transaction while unwinding the stack.  We should perhaps have a debugging
2751     // test to make sure that this really happens and that the 'zombie' transaction
2752     // does not get committed.
2753     goto done;
2754
2755   case BlockedOnMVar:
2756     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2757     {
2758       StgTSO *last_tso = END_TSO_QUEUE;
2759       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2760
2761       last = &mvar->head;
2762       for (t = mvar->head; t != END_TSO_QUEUE; 
2763            last = &t->link, last_tso = t, t = t->link) {
2764         if (t == tso) {
2765           *last = tso->link;
2766           if (mvar->tail == tso) {
2767             mvar->tail = last_tso;
2768           }
2769           goto done;
2770         }
2771       }
2772       barf("unblockThread (MVAR): TSO not found");
2773     }
2774
2775   case BlockedOnBlackHole:
2776     {
2777       last = &blackhole_queue;
2778       for (t = blackhole_queue; t != END_TSO_QUEUE; 
2779            last = &t->link, t = t->link) {
2780         if (t == tso) {
2781           *last = tso->link;
2782           goto done;
2783         }
2784       }
2785       barf("unblockThread (BLACKHOLE): TSO not found");
2786     }
2787
2788   case BlockedOnException:
2789     {
2790       StgTSO *target  = tso->block_info.tso;
2791
2792       ASSERT(get_itbl(target)->type == TSO);
2793
2794       while (target->what_next == ThreadRelocated) {
2795           target = target->link;
2796           ASSERT(get_itbl(target)->type == TSO);
2797       }
2798       
2799       ASSERT(target->blocked_exceptions != NULL);
2800
2801       last = &target->blocked_exceptions;
2802       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2803            last = &t->link, t = t->link) {
2804         ASSERT(get_itbl(t)->type == TSO);
2805         if (t == tso) {
2806           *last = tso->link;
2807           goto done;
2808         }
2809       }
2810       barf("unblockThread (Exception): TSO not found");
2811     }
2812
2813   case BlockedOnRead:
2814   case BlockedOnWrite:
2815 #if defined(mingw32_HOST_OS)
2816   case BlockedOnDoProc:
2817 #endif
2818     {
2819       StgTSO *prev = NULL;
2820       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2821            prev = t, t = t->link) {
2822         if (t == tso) {
2823           if (prev == NULL) {
2824             blocked_queue_hd = t->link;
2825             if (blocked_queue_tl == t) {
2826               blocked_queue_tl = END_TSO_QUEUE;
2827             }
2828           } else {
2829             prev->link = t->link;
2830             if (blocked_queue_tl == t) {
2831               blocked_queue_tl = prev;
2832             }
2833           }
2834 #if defined(mingw32_HOST_OS)
2835           /* (Cooperatively) signal that the worker thread should abort
2836            * the request.
2837            */
2838           abandonWorkRequest(tso->block_info.async_result->reqID);
2839 #endif
2840           goto done;
2841         }
2842       }
2843       barf("unblockThread (I/O): TSO not found");
2844     }
2845
2846   case BlockedOnDelay:
2847     {
2848       StgTSO *prev = NULL;
2849       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2850            prev = t, t = t->link) {
2851         if (t == tso) {
2852           if (prev == NULL) {
2853             sleeping_queue = t->link;
2854           } else {
2855             prev->link = t->link;
2856           }
2857           goto done;
2858         }
2859       }
2860       barf("unblockThread (delay): TSO not found");
2861     }
2862
2863   default:
2864     barf("unblockThread");
2865   }
2866
2867  done:
2868   tso->link = END_TSO_QUEUE;
2869   tso->why_blocked = NotBlocked;
2870   tso->block_info.closure = NULL;
2871   APPEND_TO_RUN_QUEUE(tso);
2872 }
2873 #endif
2874
2875 /* -----------------------------------------------------------------------------
2876  * checkBlackHoles()
2877  *
2878  * Check the blackhole_queue for threads that can be woken up.  We do
2879  * this periodically: before every GC, and whenever the run queue is
2880  * empty.
2881  *
2882  * An elegant solution might be to just wake up all the blocked
2883  * threads with awakenBlockedQueue occasionally: they'll go back to
2884  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2885  * doesn't give us a way to tell whether we've actually managed to
2886  * wake up any threads, so we would be busy-waiting.
2887  *
2888  * -------------------------------------------------------------------------- */
2889
2890 static rtsBool
2891 checkBlackHoles( void )
2892 {
2893     StgTSO **prev, *t;
2894     rtsBool any_woke_up = rtsFalse;
2895     StgHalfWord type;
2896
2897     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
2898
2899     // ASSUMES: sched_mutex
2900     prev = &blackhole_queue;
2901     t = blackhole_queue;
2902     while (t != END_TSO_QUEUE) {
2903         ASSERT(t->why_blocked == BlockedOnBlackHole);
2904         type = get_itbl(t->block_info.closure)->type;
2905         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2906             t = unblockOneLocked(t);
2907             *prev = t;
2908             any_woke_up = rtsTrue;
2909         } else {
2910             prev = &t->link;
2911             t = t->link;
2912         }
2913     }
2914
2915     return any_woke_up;
2916 }
2917
2918 /* -----------------------------------------------------------------------------
2919  * raiseAsync()
2920  *
2921  * The following function implements the magic for raising an
2922  * asynchronous exception in an existing thread.
2923  *
2924  * We first remove the thread from any queue on which it might be
2925  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2926  *
2927  * We strip the stack down to the innermost CATCH_FRAME, building
2928  * thunks in the heap for all the active computations, so they can 
2929  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2930  * an application of the handler to the exception, and push it on
2931  * the top of the stack.
2932  * 
2933  * How exactly do we save all the active computations?  We create an
2934  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2935  * AP_STACKs pushes everything from the corresponding update frame
2936  * upwards onto the stack.  (Actually, it pushes everything up to the
2937  * next update frame plus a pointer to the next AP_STACK object.
2938  * Entering the next AP_STACK object pushes more onto the stack until we
2939  * reach the last AP_STACK object - at which point the stack should look
2940  * exactly as it did when we killed the TSO and we can continue
2941  * execution by entering the closure on top of the stack.
2942  *
2943  * We can also kill a thread entirely - this happens if either (a) the 
2944  * exception passed to raiseAsync is NULL, or (b) there's no
2945  * CATCH_FRAME on the stack.  In either case, we strip the entire
2946  * stack and replace the thread with a zombie.
2947  *
2948  * Locks: sched_mutex held upon entry nor exit.
2949  *
2950  * -------------------------------------------------------------------------- */
2951  
2952 void 
2953 deleteThread(StgTSO *tso)
2954 {
2955   if (tso->why_blocked != BlockedOnCCall &&
2956       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2957       raiseAsync(tso,NULL);
2958   }
2959 }
2960
2961 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2962 static void 
2963 deleteThreadImmediately(StgTSO *tso)
2964 { // for forkProcess only:
2965   // delete thread without giving it a chance to catch the KillThread exception
2966
2967   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2968       return;
2969   }
2970
2971   if (tso->why_blocked != BlockedOnCCall &&
2972       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2973     unblockThread(tso);
2974   }
2975
2976   tso->what_next = ThreadKilled;
2977 }
2978 #endif
2979
2980 void
2981 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
2982 {
2983   /* When raising async exs from contexts where sched_mutex isn't held;
2984      use raiseAsyncWithLock(). */
2985   ACQUIRE_LOCK(&sched_mutex);
2986   raiseAsync(tso,exception);
2987   RELEASE_LOCK(&sched_mutex);
2988 }
2989
2990 void
2991 raiseAsync(StgTSO *tso, StgClosure *exception)
2992 {
2993     raiseAsync_(tso, exception, rtsFalse);
2994 }
2995
2996 static void
2997 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
2998 {
2999     StgRetInfoTable *info;
3000     StgPtr sp;
3001   
3002     // Thread already dead?
3003     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3004         return;
3005     }
3006
3007     IF_DEBUG(scheduler, 
3008              sched_belch("raising exception in thread %ld.", (long)tso->id));
3009     
3010     // Remove it from any blocking queues
3011     unblockThread(tso);
3012
3013     sp = tso->sp;
3014     
3015     // The stack freezing code assumes there's a closure pointer on
3016     // the top of the stack, so we have to arrange that this is the case...
3017     //
3018     if (sp[0] == (W_)&stg_enter_info) {
3019         sp++;
3020     } else {
3021         sp--;
3022         sp[0] = (W_)&stg_dummy_ret_closure;
3023     }
3024
3025     while (1) {
3026         nat i;
3027
3028         // 1. Let the top of the stack be the "current closure"
3029         //
3030         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3031         // CATCH_FRAME.
3032         //
3033         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3034         // current closure applied to the chunk of stack up to (but not
3035         // including) the update frame.  This closure becomes the "current
3036         // closure".  Go back to step 2.
3037         //
3038         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3039         // top of the stack applied to the exception.
3040         // 
3041         // 5. If it's a STOP_FRAME, then kill the thread.
3042         // 
3043         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3044         // transaction
3045        
3046         
3047         StgPtr frame;
3048         
3049         frame = sp + 1;
3050         info = get_ret_itbl((StgClosure *)frame);
3051         
3052         while (info->i.type != UPDATE_FRAME
3053                && (info->i.type != CATCH_FRAME || exception == NULL)
3054                && info->i.type != STOP_FRAME
3055                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3056         {
3057             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3058               // IF we find an ATOMICALLY_FRAME then we abort the
3059               // current transaction and propagate the exception.  In
3060               // this case (unlike ordinary exceptions) we do not care
3061               // whether the transaction is valid or not because its
3062               // possible validity cannot have caused the exception
3063               // and will not be visible after the abort.
3064               IF_DEBUG(stm,
3065                        debugBelch("Found atomically block delivering async exception\n"));
3066               stmAbortTransaction(tso -> trec);
3067               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3068             }
3069             frame += stack_frame_sizeW((StgClosure *)frame);
3070             info = get_ret_itbl((StgClosure *)frame);
3071         }
3072         
3073         switch (info->i.type) {
3074             
3075         case ATOMICALLY_FRAME:
3076             ASSERT(stop_at_atomically);
3077             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3078             stmCondemnTransaction(tso -> trec);
3079 #ifdef REG_R1
3080             tso->sp = frame;
3081 #else
3082             // R1 is not a register: the return convention for IO in
3083             // this case puts the return value on the stack, so we
3084             // need to set up the stack to return to the atomically
3085             // frame properly...
3086             tso->sp = frame - 2;
3087             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3088             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3089 #endif
3090             tso->what_next = ThreadRunGHC;
3091             return;
3092
3093         case CATCH_FRAME:
3094             // If we find a CATCH_FRAME, and we've got an exception to raise,
3095             // then build the THUNK raise(exception), and leave it on
3096             // top of the CATCH_FRAME ready to enter.
3097             //
3098         {
3099 #ifdef PROFILING
3100             StgCatchFrame *cf = (StgCatchFrame *)frame;
3101 #endif
3102             StgThunk *raise;
3103             
3104             // we've got an exception to raise, so let's pass it to the
3105             // handler in this frame.
3106             //
3107             raise = (StgThunk *)allocate(sizeofW(StgThunk)+1);
3108             TICK_ALLOC_SE_THK(1,0);
3109             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3110             raise->payload[0] = exception;
3111             
3112             // throw away the stack from Sp up to the CATCH_FRAME.
3113             //
3114             sp = frame - 1;
3115             
3116             /* Ensure that async excpetions are blocked now, so we don't get
3117              * a surprise exception before we get around to executing the
3118              * handler.
3119              */
3120             if (tso->blocked_exceptions == NULL) {
3121                 tso->blocked_exceptions = END_TSO_QUEUE;
3122             }
3123             
3124             /* Put the newly-built THUNK on top of the stack, ready to execute
3125              * when the thread restarts.
3126              */
3127             sp[0] = (W_)raise;
3128             sp[-1] = (W_)&stg_enter_info;
3129             tso->sp = sp-1;
3130             tso->what_next = ThreadRunGHC;
3131             IF_DEBUG(sanity, checkTSO(tso));
3132             return;
3133         }
3134         
3135         case UPDATE_FRAME:
3136         {
3137             StgAP_STACK * ap;
3138             nat words;
3139             
3140             // First build an AP_STACK consisting of the stack chunk above the
3141             // current update frame, with the top word on the stack as the
3142             // fun field.
3143             //
3144             words = frame - sp - 1;
3145             ap = (StgAP_STACK *)allocate(AP_STACK_sizeW(words));
3146             
3147             ap->size = words;
3148             ap->fun  = (StgClosure *)sp[0];
3149             sp++;
3150             for(i=0; i < (nat)words; ++i) {
3151                 ap->payload[i] = (StgClosure *)*sp++;
3152             }
3153             
3154             SET_HDR(ap,&stg_AP_STACK_info,
3155                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3156             TICK_ALLOC_UP_THK(words+1,0);
3157             
3158             IF_DEBUG(scheduler,
3159                      debugBelch("sched: Updating ");
3160                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3161                      debugBelch(" with ");
3162                      printObj((StgClosure *)ap);
3163                 );
3164
3165             // Replace the updatee with an indirection - happily
3166             // this will also wake up any threads currently
3167             // waiting on the result.
3168             //
3169             // Warning: if we're in a loop, more than one update frame on
3170             // the stack may point to the same object.  Be careful not to
3171             // overwrite an IND_OLDGEN in this case, because we'll screw
3172             // up the mutable lists.  To be on the safe side, don't
3173             // overwrite any kind of indirection at all.  See also
3174             // threadSqueezeStack in GC.c, where we have to make a similar
3175             // check.
3176             //
3177             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3178                 // revert the black hole
3179                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3180                                (StgClosure *)ap);
3181             }
3182             sp += sizeofW(StgUpdateFrame) - 1;
3183             sp[0] = (W_)ap; // push onto stack
3184             break;
3185         }
3186         
3187         case STOP_FRAME:
3188             // We've stripped the entire stack, the thread is now dead.
3189             sp += sizeofW(StgStopFrame);
3190             tso->what_next = ThreadKilled;
3191             tso->sp = sp;
3192             return;
3193             
3194         default:
3195             barf("raiseAsync");
3196         }
3197     }
3198     barf("raiseAsync");
3199 }
3200
3201 /* -----------------------------------------------------------------------------
3202    raiseExceptionHelper
3203    
3204    This function is called by the raise# primitve, just so that we can
3205    move some of the tricky bits of raising an exception from C-- into
3206    C.  Who knows, it might be a useful re-useable thing here too.
3207    -------------------------------------------------------------------------- */
3208
3209 StgWord
3210 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3211 {
3212     StgThunk *raise_closure = NULL;
3213     StgPtr p, next;
3214     StgRetInfoTable *info;
3215     //
3216     // This closure represents the expression 'raise# E' where E
3217     // is the exception raise.  It is used to overwrite all the
3218     // thunks which are currently under evaluataion.
3219     //
3220
3221     //    
3222     // LDV profiling: stg_raise_info has THUNK as its closure
3223     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3224     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3225     // 1 does not cause any problem unless profiling is performed.
3226     // However, when LDV profiling goes on, we need to linearly scan
3227     // small object pool, where raise_closure is stored, so we should
3228     // use MIN_UPD_SIZE.
3229     //
3230     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3231     //                                 sizeofW(StgClosure)+1);
3232     //
3233
3234     //
3235     // Walk up the stack, looking for the catch frame.  On the way,
3236     // we update any closures pointed to from update frames with the
3237     // raise closure that we just built.
3238     //
3239     p = tso->sp;
3240     while(1) {
3241         info = get_ret_itbl((StgClosure *)p);
3242         next = p + stack_frame_sizeW((StgClosure *)p);
3243         switch (info->i.type) {
3244             
3245         case UPDATE_FRAME:
3246             // Only create raise_closure if we need to.
3247             if (raise_closure == NULL) {
3248                 raise_closure = 
3249                     (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE);
3250                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3251                 raise_closure->payload[0] = exception;
3252             }
3253             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3254             p = next;
3255             continue;
3256
3257         case ATOMICALLY_FRAME:
3258             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3259             tso->sp = p;
3260             return ATOMICALLY_FRAME;
3261             
3262         case CATCH_FRAME:
3263             tso->sp = p;
3264             return CATCH_FRAME;
3265
3266         case CATCH_STM_FRAME:
3267             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3268             tso->sp = p;
3269             return CATCH_STM_FRAME;
3270             
3271         case STOP_FRAME:
3272             tso->sp = p;
3273             return STOP_FRAME;
3274
3275         case CATCH_RETRY_FRAME:
3276         default:
3277             p = next; 
3278             continue;
3279         }
3280     }
3281 }
3282
3283
3284 /* -----------------------------------------------------------------------------
3285    findRetryFrameHelper
3286
3287    This function is called by the retry# primitive.  It traverses the stack
3288    leaving tso->sp referring to the frame which should handle the retry.  
3289
3290    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3291    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3292
3293    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3294    despite the similar implementation.
3295
3296    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3297    not be created within memory transactions.
3298    -------------------------------------------------------------------------- */
3299
3300 StgWord
3301 findRetryFrameHelper (StgTSO *tso)
3302 {
3303   StgPtr           p, next;
3304   StgRetInfoTable *info;
3305
3306   p = tso -> sp;
3307   while (1) {
3308     info = get_ret_itbl((StgClosure *)p);
3309     next = p + stack_frame_sizeW((StgClosure *)p);
3310     switch (info->i.type) {
3311       
3312     case ATOMICALLY_FRAME:
3313       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3314       tso->sp = p;
3315       return ATOMICALLY_FRAME;
3316       
3317     case CATCH_RETRY_FRAME:
3318       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3319       tso->sp = p;
3320       return CATCH_RETRY_FRAME;
3321       
3322     case CATCH_STM_FRAME:
3323     default:
3324       ASSERT(info->i.type != CATCH_FRAME);
3325       ASSERT(info->i.type != STOP_FRAME);
3326       p = next; 
3327       continue;
3328     }
3329   }
3330 }
3331
3332 /* -----------------------------------------------------------------------------
3333    resurrectThreads is called after garbage collection on the list of
3334    threads found to be garbage.  Each of these threads will be woken
3335    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3336    on an MVar, or NonTermination if the thread was blocked on a Black
3337    Hole.
3338
3339    Locks: sched_mutex isn't held upon entry nor exit.
3340    -------------------------------------------------------------------------- */
3341
3342 void
3343 resurrectThreads( StgTSO *threads )
3344 {
3345   StgTSO *tso, *next;
3346
3347   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3348     next = tso->global_link;
3349     tso->global_link = all_threads;
3350     all_threads = tso;
3351     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3352
3353     switch (tso->why_blocked) {
3354     case BlockedOnMVar:
3355     case BlockedOnException:
3356       /* Called by GC - sched_mutex lock is currently held. */
3357       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3358       break;
3359     case BlockedOnBlackHole:
3360       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3361       break;
3362     case BlockedOnSTM:
3363       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
3364       break;
3365     case NotBlocked:
3366       /* This might happen if the thread was blocked on a black hole
3367        * belonging to a thread that we've just woken up (raiseAsync
3368        * can wake up threads, remember...).
3369        */
3370       continue;
3371     default:
3372       barf("resurrectThreads: thread blocked in a strange way");
3373     }
3374   }
3375 }
3376
3377 /* ----------------------------------------------------------------------------
3378  * Debugging: why is a thread blocked
3379  * [Also provides useful information when debugging threaded programs
3380  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3381    ------------------------------------------------------------------------- */
3382
3383 static void
3384 printThreadBlockage(StgTSO *tso)
3385 {
3386   switch (tso->why_blocked) {
3387   case BlockedOnRead:
3388     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
3389     break;
3390   case BlockedOnWrite:
3391     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
3392     break;
3393 #if defined(mingw32_HOST_OS)
3394     case BlockedOnDoProc:
3395     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
3396     break;
3397 #endif
3398   case BlockedOnDelay:
3399     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
3400     break;
3401   case BlockedOnMVar:
3402     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
3403     break;
3404   case BlockedOnException:
3405     debugBelch("is blocked on delivering an exception to thread %d",
3406             tso->block_info.tso->id);
3407     break;
3408   case BlockedOnBlackHole:
3409     debugBelch("is blocked on a black hole");
3410     break;
3411   case NotBlocked:
3412     debugBelch("is not blocked");
3413     break;
3414 #if defined(PARALLEL_HASKELL)
3415   case BlockedOnGA:
3416     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3417             tso->block_info.closure, info_type(tso->block_info.closure));
3418     break;
3419   case BlockedOnGA_NoSend:
3420     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3421             tso->block_info.closure, info_type(tso->block_info.closure));
3422     break;
3423 #endif
3424   case BlockedOnCCall:
3425     debugBelch("is blocked on an external call");
3426     break;
3427   case BlockedOnCCall_NoUnblockExc:
3428     debugBelch("is blocked on an external call (exceptions were already blocked)");
3429     break;
3430   case BlockedOnSTM:
3431     debugBelch("is blocked on an STM operation");
3432     break;
3433   default:
3434     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3435          tso->why_blocked, tso->id, tso);
3436   }
3437 }
3438
3439 static void
3440 printThreadStatus(StgTSO *tso)
3441 {
3442   switch (tso->what_next) {
3443   case ThreadKilled:
3444     debugBelch("has 
3445
3446   /* should cover all closures that may have a blocking queue */
3447   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3448          get_itbl(node)->type == FETCH_ME_BQ ||
3449          get_itbl(node)->type == RBH);
3450     
3451   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3452   node_loc = where_is(node);
3453
3454   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
3455           node, info_type(node), node_loc);
3456
3457   /* 
3458      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3459   */
3460   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3461        !end; // iterate until bqe points to a CONSTR
3462        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3463     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3464     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3465     /* types of closures that may appear in a blocking queue */
3466     ASSERT(get_itbl(bqe)->type == TSO ||           
3467            get_itbl(bqe)->type == CONSTR); 
3468     /* only BQs of an RBH end with an RBH_Save closure */
3469     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3470
3471     tso_loc = where_is((StgClosure *)bqe);
3472     switch (get_itbl(bqe)->type) {
3473     case TSO:
3474       debugBelch(" TSO %d (%p) on [PE %d],",
3475               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3476       break;
3477     case CONSTR:
3478       debugBelch(" %s (IP %p),",
3479               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3480                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3481                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3482                "RBH_Save_?"), get_itbl(bqe));
3483       break;
3484     default:
3485       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3486            info_type((StgClosure *)bqe), node, info_type(node));
3487       break;
3488     }
3489   } /* for */
3490   debugBelch("\n");
3491 }
3492 # endif
3493
3494 #if defined(PARALLEL_HASKELL)
3495 static nat
3496 run_queue_len(void)
3497 {
3498   nat i;
3499   StgTSO *tso;
3500
3501   for (i=0, tso=run_queue_hd; 
3502        tso != END_TSO_QUEUE;
3503        i++, tso=tso->link)
3504     /* nothing */
3505
3506   return i;
3507 }
3508 #endif
3509
3510 void
3511 sched_belch(char *s, ...)
3512 {
3513   va_list ap;
3514   va_start(ap,s);
3515 #ifdef RTS_SUPPORTS_THREADS
3516   debugBelch("sched (task %p): ", osThreadId());
3517 #elif defined(PARALLEL_HASKELL)
3518   debugBelch("== ");
3519 #else
3520   debugBelch("sched: ");
3521 #endif
3522   vdebugBelch(s, ap);
3523   debugBelch("\n");
3524   va_end(ap);
3525 }
3526
3527 #endif /* DEBUG */