[project @ 2005-04-13 11:11:31 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PARALLEL_HASKELL          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "OSThreads.h"
46 #include "Storage.h"
47 #include "StgRun.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PARALLEL_HASKELL)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include  "Task.h"
80
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
83 #endif
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif
87
88 #include <string.h>
89 #include <stdlib.h>
90 #include <stdarg.h>
91
92 #ifdef HAVE_ERRNO_H
93 #include <errno.h>
94 #endif
95
96 // Turn off inlining when debugging - it obfuscates things
97 #ifdef DEBUG
98 # undef  STATIC_INLINE
99 # define STATIC_INLINE static
100 #endif
101
102 #ifdef THREADED_RTS
103 #define USED_IN_THREADED_RTS
104 #else
105 #define USED_IN_THREADED_RTS STG_UNUSED
106 #endif
107
108 #ifdef RTS_SUPPORTS_THREADS
109 #define USED_WHEN_RTS_SUPPORTS_THREADS
110 #else
111 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
112 #endif
113
114 /* Main thread queue.
115  * Locks required: sched_mutex.
116  */
117 StgMainThread *main_threads = NULL;
118
119 #if defined(GRAN)
120
121 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
122 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
123
124 /* 
125    In GranSim we have a runnable and a blocked queue for each processor.
126    In order to minimise code changes new arrays run_queue_hds/tls
127    are created. run_queue_hd is then a short cut (macro) for
128    run_queue_hds[CurrentProc] (see GranSim.h).
129    -- HWL
130 */
131 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
132 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
133 StgTSO *ccalling_threadss[MAX_PROC];
134 /* We use the same global list of threads (all_threads) in GranSim as in
135    the std RTS (i.e. we are cheating). However, we don't use this list in
136    the GranSim specific code at the moment (so we are only potentially
137    cheating).  */
138
139 #else /* !GRAN */
140
141 /* Thread queues.
142  * Locks required: sched_mutex.
143  */
144 StgTSO *run_queue_hd = NULL;
145 StgTSO *run_queue_tl = NULL;
146 StgTSO *blocked_queue_hd = NULL;
147 StgTSO *blocked_queue_tl = NULL;
148 StgTSO *blackhole_queue = NULL;
149 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
150
151 #endif
152
153 /* The blackhole_queue should be checked for threads to wake up.  See
154  * Schedule.h for more thorough comment.
155  */
156 rtsBool blackholes_need_checking = rtsFalse;
157
158 /* Linked list of all threads.
159  * Used for detecting garbage collected threads.
160  */
161 StgTSO *all_threads = NULL;
162
163 /* When a thread performs a safe C call (_ccall_GC, using old
164  * terminology), it gets put on the suspended_ccalling_threads
165  * list. Used by the garbage collector.
166  */
167 static StgTSO *suspended_ccalling_threads;
168
169 /* KH: The following two flags are shared memory locations.  There is no need
170        to lock them, since they are only unset at the end of a scheduler
171        operation.
172 */
173
174 /* flag set by signal handler to precipitate a context switch */
175 int context_switch = 0;
176
177 /* if this flag is set as well, give up execution */
178 rtsBool interrupted = rtsFalse;
179
180 /* Next thread ID to allocate.
181  * Locks required: thread_id_mutex
182  */
183 static StgThreadID next_thread_id = 1;
184
185 /*
186  * Pointers to the state of the current thread.
187  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
188  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
189  */
190  
191 /* The smallest stack size that makes any sense is:
192  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
193  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
194  *  + 1                       (the closure to enter)
195  *  + 1                       (stg_ap_v_ret)
196  *  + 1                       (spare slot req'd by stg_ap_v_ret)
197  *
198  * A thread with this stack will bomb immediately with a stack
199  * overflow, which will increase its stack size.  
200  */
201
202 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
203
204
205 #if defined(GRAN)
206 StgTSO *CurrentTSO;
207 #endif
208
209 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
210  *  exists - earlier gccs apparently didn't.
211  *  -= chak
212  */
213 StgTSO dummy_tso;
214
215 /*
216  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
217  * in an MT setting, needed to signal that a worker thread shouldn't hang around
218  * in the scheduler when it is out of work.
219  */
220 static rtsBool shutting_down_scheduler = rtsFalse;
221
222 #if defined(RTS_SUPPORTS_THREADS)
223 /* ToDo: carefully document the invariants that go together
224  *       with these synchronisation objects.
225  */
226 Mutex     sched_mutex       = INIT_MUTEX_VAR;
227 Mutex     term_mutex        = INIT_MUTEX_VAR;
228
229 #endif /* RTS_SUPPORTS_THREADS */
230
231 #if defined(PARALLEL_HASKELL)
232 StgTSO *LastTSO;
233 rtsTime TimeOfLastYield;
234 rtsBool emitSchedule = rtsTrue;
235 #endif
236
237 #if DEBUG
238 static char *whatNext_strs[] = {
239   "(unknown)",
240   "ThreadRunGHC",
241   "ThreadInterpret",
242   "ThreadKilled",
243   "ThreadRelocated",
244   "ThreadComplete"
245 };
246 #endif
247
248 /* -----------------------------------------------------------------------------
249  * static function prototypes
250  * -------------------------------------------------------------------------- */
251
252 #if defined(RTS_SUPPORTS_THREADS)
253 static void taskStart(void);
254 #endif
255
256 static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
257                       Capability *initialCapability );
258
259 //
260 // These function all encapsulate parts of the scheduler loop, and are
261 // abstracted only to make the structure and control flow of the
262 // scheduler clearer.
263 //
264 static void schedulePreLoop(void);
265 static void scheduleStartSignalHandlers(void);
266 static void scheduleCheckBlockedThreads(void);
267 static void scheduleCheckBlackHoles(void);
268 static void scheduleDetectDeadlock(void);
269 #if defined(GRAN)
270 static StgTSO *scheduleProcessEvent(rtsEvent *event);
271 #endif
272 #if defined(PARALLEL_HASKELL)
273 static StgTSO *scheduleSendPendingMessages(void);
274 static void scheduleActivateSpark(void);
275 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
276 #endif
277 #if defined(PAR) || defined(GRAN)
278 static void scheduleGranParReport(void);
279 #endif
280 static void schedulePostRunThread(void);
281 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
282 static void scheduleHandleStackOverflow( StgTSO *t);
283 static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next );
284 static void scheduleHandleThreadBlocked( StgTSO *t );
285 static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, 
286                                              Capability *cap, StgTSO *t );
287 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
288 static void scheduleDoGC(Capability *cap);
289
290 static void unblockThread(StgTSO *tso);
291 static rtsBool checkBlackHoles(void);
292 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
293                                    Capability *initialCapability
294                                    );
295 static void scheduleThread_ (StgTSO* tso);
296 static void AllRoots(evac_fn evac);
297
298 static StgTSO *threadStackOverflow(StgTSO *tso);
299
300 static void raiseAsync_(StgTSO *tso, StgClosure *exception, 
301                         rtsBool stop_at_atomically);
302
303 static void printThreadBlockage(StgTSO *tso);
304 static void printThreadStatus(StgTSO *tso);
305
306 #if defined(PARALLEL_HASKELL)
307 StgTSO * createSparkThread(rtsSpark spark);
308 StgTSO * activateSpark (rtsSpark spark);  
309 #endif
310
311 /* ----------------------------------------------------------------------------
312  * Starting Tasks
313  * ------------------------------------------------------------------------- */
314
315 #if defined(RTS_SUPPORTS_THREADS)
316 static nat startingWorkerThread = 0;
317
318 static void
319 taskStart(void)
320 {
321   ACQUIRE_LOCK(&sched_mutex);
322   startingWorkerThread--;
323   schedule(NULL,NULL);
324   taskStop();
325   RELEASE_LOCK(&sched_mutex);
326 }
327
328 void
329 startSchedulerTaskIfNecessary(void)
330 {
331     if ( !EMPTY_RUN_QUEUE()
332          && !shutting_down_scheduler // not if we're shutting down
333          && startingWorkerThread==0)
334     {
335         // we don't want to start another worker thread
336         // just because the last one hasn't yet reached the
337         // "waiting for capability" state
338         startingWorkerThread++;
339         if (!maybeStartNewWorker(taskStart)) {
340             startingWorkerThread--;
341         }
342     }
343 }
344 #endif
345
346 /* -----------------------------------------------------------------------------
347  * Putting a thread on the run queue: different scheduling policies
348  * -------------------------------------------------------------------------- */
349
350 STATIC_INLINE void
351 addToRunQueue( StgTSO *t )
352 {
353 #if defined(PARALLEL_HASKELL)
354     if (RtsFlags.ParFlags.doFairScheduling) { 
355         // this does round-robin scheduling; good for concurrency
356         APPEND_TO_RUN_QUEUE(t);
357     } else {
358         // this does unfair scheduling; good for parallelism
359         PUSH_ON_RUN_QUEUE(t);
360     }
361 #else
362     // this does round-robin scheduling; good for concurrency
363     APPEND_TO_RUN_QUEUE(t);
364 #endif
365 }
366     
367 /* ---------------------------------------------------------------------------
368    Main scheduling loop.
369
370    We use round-robin scheduling, each thread returning to the
371    scheduler loop when one of these conditions is detected:
372
373       * out of heap space
374       * timer expires (thread yields)
375       * thread blocks
376       * thread ends
377       * stack overflow
378
379    Locking notes:  we acquire the scheduler lock once at the beginning
380    of the scheduler loop, and release it when
381     
382       * running a thread, or
383       * waiting for work, or
384       * waiting for a GC to complete.
385
386    GRAN version:
387      In a GranSim setup this loop iterates over the global event queue.
388      This revolves around the global event queue, which determines what 
389      to do next. Therefore, it's more complicated than either the 
390      concurrent or the parallel (GUM) setup.
391
392    GUM version:
393      GUM iterates over incoming messages.
394      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
395      and sends out a fish whenever it has nothing to do; in-between
396      doing the actual reductions (shared code below) it processes the
397      incoming messages and deals with delayed operations 
398      (see PendingFetches).
399      This is not the ugliest code you could imagine, but it's bloody close.
400
401    ------------------------------------------------------------------------ */
402
403 static void
404 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
405           Capability *initialCapability )
406 {
407   StgTSO *t;
408   Capability *cap;
409   StgThreadReturnCode ret;
410 #if defined(GRAN)
411   rtsEvent *event;
412 #elif defined(PARALLEL_HASKELL)
413   StgTSO *tso;
414   GlobalTaskId pe;
415   rtsBool receivedFinish = rtsFalse;
416 # if defined(DEBUG)
417   nat tp_size, sp_size; // stats only
418 # endif
419 #endif
420   nat prev_what_next;
421   rtsBool ready_to_gc;
422   
423   // Pre-condition: sched_mutex is held.
424   // We might have a capability, passed in as initialCapability.
425   cap = initialCapability;
426
427 #if !defined(RTS_SUPPORTS_THREADS)
428   // simply initialise it in the non-threaded case
429   grabCapability(&cap);
430 #endif
431
432   IF_DEBUG(scheduler,
433            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
434                        mainThread, initialCapability);
435       );
436
437   schedulePreLoop();
438
439   // -----------------------------------------------------------
440   // Scheduler loop starts here:
441
442 #if defined(PARALLEL_HASKELL)
443 #define TERMINATION_CONDITION        (!receivedFinish)
444 #elif defined(GRAN)
445 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
446 #else
447 #define TERMINATION_CONDITION        rtsTrue
448 #endif
449
450   while (TERMINATION_CONDITION) {
451
452 #if defined(GRAN)
453       /* Choose the processor with the next event */
454       CurrentProc = event->proc;
455       CurrentTSO = event->tso;
456 #endif
457
458       IF_DEBUG(scheduler, printAllThreads());
459
460 #if defined(RTS_SUPPORTS_THREADS)
461       // Yield the capability to higher-priority tasks if necessary.
462       //
463       if (cap != NULL) {
464           yieldCapability(&cap);
465       }
466
467       // If we do not currently hold a capability, we wait for one
468       //
469       if (cap == NULL) {
470           waitForCapability(&sched_mutex, &cap,
471                             mainThread ? &mainThread->bound_thread_cond : NULL);
472       }
473
474       // We now have a capability...
475 #endif
476
477     // Check whether we have re-entered the RTS from Haskell without
478     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
479     // call).
480     if (cap->r.rInHaskell) {
481           errorBelch("schedule: re-entered unsafely.\n"
482                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
483           stg_exit(1);
484     }
485
486     //
487     // Test for interruption.  If interrupted==rtsTrue, then either
488     // we received a keyboard interrupt (^C), or the scheduler is
489     // trying to shut down all the tasks (shutting_down_scheduler) in
490     // the threaded RTS.
491     //
492     if (interrupted) {
493         if (shutting_down_scheduler) {
494             IF_DEBUG(scheduler, sched_belch("shutting down"));
495             releaseCapability(cap);
496             if (mainThread) {
497                 mainThread->stat = Interrupted;
498                 mainThread->ret  = NULL;
499             }
500             return;
501         } else {
502             IF_DEBUG(scheduler, sched_belch("interrupted"));
503             deleteAllThreads();
504         }
505     }
506
507 #if defined(not_yet) && defined(SMP)
508     //
509     // Top up the run queue from our spark pool.  We try to make the
510     // number of threads in the run queue equal to the number of
511     // free capabilities.
512     //
513     {
514         StgClosure *spark;
515         if (EMPTY_RUN_QUEUE()) {
516             spark = findSpark(rtsFalse);
517             if (spark == NULL) {
518                 break; /* no more sparks in the pool */
519             } else {
520                 createSparkThread(spark);         
521                 IF_DEBUG(scheduler,
522                          sched_belch("==^^ turning spark of closure %p into a thread",
523                                      (StgClosure *)spark));
524             }
525         }
526     }
527 #endif // SMP
528
529     scheduleStartSignalHandlers();
530
531     // Only check the black holes here if we've nothing else to do.
532     // During normal execution, the black hole list only gets checked
533     // at GC time, to avoid repeatedly traversing this possibly long
534     // list each time around the scheduler.
535     if (EMPTY_RUN_QUEUE()) { scheduleCheckBlackHoles(); }
536
537     scheduleCheckBlockedThreads();
538
539     scheduleDetectDeadlock();
540
541     // Normally, the only way we can get here with no threads to
542     // run is if a keyboard interrupt received during 
543     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
544     // Additionally, it is not fatal for the
545     // threaded RTS to reach here with no threads to run.
546     //
547     // win32: might be here due to awaitEvent() being abandoned
548     // as a result of a console event having been delivered.
549     if ( EMPTY_RUN_QUEUE() ) {
550 #if !defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_HOST_OS)
551         ASSERT(interrupted);
552 #endif
553         continue; // nothing to do
554     }
555
556 #if defined(PARALLEL_HASKELL)
557     scheduleSendPendingMessages();
558     if (EMPTY_RUN_QUEUE() && scheduleActivateSpark()) 
559         continue;
560
561 #if defined(SPARKS)
562     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
563 #endif
564
565     /* If we still have no work we need to send a FISH to get a spark
566        from another PE */
567     if (EMPTY_RUN_QUEUE()) {
568         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
569         ASSERT(rtsFalse); // should not happen at the moment
570     }
571     // from here: non-empty run queue.
572     //  TODO: merge above case with this, only one call processMessages() !
573     if (PacketsWaiting()) {  /* process incoming messages, if
574                                 any pending...  only in else
575                                 because getRemoteWork waits for
576                                 messages as well */
577         receivedFinish = processMessages();
578     }
579 #endif
580
581 #if defined(GRAN)
582     scheduleProcessEvent(event);
583 #endif
584
585     // 
586     // Get a thread to run
587     //
588     ASSERT(run_queue_hd != END_TSO_QUEUE);
589     POP_RUN_QUEUE(t);
590
591 #if defined(GRAN) || defined(PAR)
592     scheduleGranParReport(); // some kind of debuging output
593 #else
594     // Sanity check the thread we're about to run.  This can be
595     // expensive if there is lots of thread switching going on...
596     IF_DEBUG(sanity,checkTSO(t));
597 #endif
598
599 #if defined(RTS_SUPPORTS_THREADS)
600     // Check whether we can run this thread in the current task.
601     // If not, we have to pass our capability to the right task.
602     {
603       StgMainThread *m = t->main;
604       
605       if(m)
606       {
607         if(m == mainThread)
608         {
609           IF_DEBUG(scheduler,
610             sched_belch("### Running thread %d in bound thread", t->id));
611           // yes, the Haskell thread is bound to the current native thread
612         }
613         else
614         {
615           IF_DEBUG(scheduler,
616             sched_belch("### thread %d bound to another OS thread", t->id));
617           // no, bound to a different Haskell thread: pass to that thread
618           PUSH_ON_RUN_QUEUE(t);
619           passCapability(&m->bound_thread_cond);
620           continue;
621         }
622       }
623       else
624       {
625         if(mainThread != NULL)
626         // The thread we want to run is bound.
627         {
628           IF_DEBUG(scheduler,
629             sched_belch("### this OS thread cannot run thread %d", t->id));
630           // no, the current native thread is bound to a different
631           // Haskell thread, so pass it to any worker thread
632           PUSH_ON_RUN_QUEUE(t);
633           passCapabilityToWorker();
634           continue; 
635         }
636       }
637     }
638 #endif
639
640     cap->r.rCurrentTSO = t;
641     
642     /* context switches are now initiated by the timer signal, unless
643      * the user specified "context switch as often as possible", with
644      * +RTS -C0
645      */
646     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
647          && (run_queue_hd != END_TSO_QUEUE
648              || blocked_queue_hd != END_TSO_QUEUE
649              || sleeping_queue != END_TSO_QUEUE)))
650         context_switch = 1;
651
652 run_thread:
653
654     RELEASE_LOCK(&sched_mutex);
655
656     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
657                               (long)t->id, whatNext_strs[t->what_next]));
658
659 #if defined(PROFILING)
660     startHeapProfTimer();
661 #endif
662
663     // ----------------------------------------------------------------------
664     // Run the current thread 
665
666     prev_what_next = t->what_next;
667
668     errno = t->saved_errno;
669     cap->r.rInHaskell = rtsTrue;
670
671     switch (prev_what_next) {
672
673     case ThreadKilled:
674     case ThreadComplete:
675         /* Thread already finished, return to scheduler. */
676         ret = ThreadFinished;
677         break;
678
679     case ThreadRunGHC:
680         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
681         break;
682
683     case ThreadInterpret:
684         ret = interpretBCO(cap);
685         break;
686
687     default:
688       barf("schedule: invalid what_next field");
689     }
690
691     // We have run some Haskell code: there might be blackhole-blocked
692     // threads to wake up now.
693     if ( blackhole_queue != END_TSO_QUEUE ) {
694         blackholes_need_checking = rtsTrue;
695     }
696
697     cap->r.rInHaskell = rtsFalse;
698
699     // The TSO might have moved, eg. if it re-entered the RTS and a GC
700     // happened.  So find the new location:
701     t = cap->r.rCurrentTSO;
702
703     // And save the current errno in this thread.
704     t->saved_errno = errno;
705
706     // ----------------------------------------------------------------------
707     
708     /* Costs for the scheduler are assigned to CCS_SYSTEM */
709 #if defined(PROFILING)
710     stopHeapProfTimer();
711     CCCS = CCS_SYSTEM;
712 #endif
713     
714     ACQUIRE_LOCK(&sched_mutex);
715     
716 #if defined(RTS_SUPPORTS_THREADS)
717     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
718 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
719     IF_DEBUG(scheduler,debugBelch("sched: "););
720 #endif
721     
722     schedulePostRunThread();
723
724     ready_to_gc = rtsFalse;
725
726     switch (ret) {
727     case HeapOverflow:
728         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
729         break;
730
731     case StackOverflow:
732         scheduleHandleStackOverflow(t);
733         break;
734
735     case ThreadYielding:
736         if (scheduleHandleYield(t, prev_what_next)) {
737             // shortcut for switching between compiler/interpreter:
738             goto run_thread; 
739         }
740         break;
741
742     case ThreadBlocked:
743         scheduleHandleThreadBlocked(t);
744         threadPaused(t);
745         break;
746
747     case ThreadFinished:
748         if (scheduleHandleThreadFinished(mainThread, cap, t)) return;;
749         break;
750
751     default:
752       barf("schedule: invalid thread return code %d", (int)ret);
753     }
754
755     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
756     if (ready_to_gc) { scheduleDoGC(cap); }
757   } /* end of while() */
758
759   IF_PAR_DEBUG(verbose,
760                debugBelch("== Leaving schedule() after having received Finish\n"));
761 }
762
763 /* ----------------------------------------------------------------------------
764  * Setting up the scheduler loop
765  * ASSUMES: sched_mutex
766  * ------------------------------------------------------------------------- */
767
768 static void
769 schedulePreLoop(void)
770 {
771 #if defined(GRAN) 
772     /* set up first event to get things going */
773     /* ToDo: assign costs for system setup and init MainTSO ! */
774     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
775               ContinueThread, 
776               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
777     
778     IF_DEBUG(gran,
779              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
780                         CurrentTSO);
781              G_TSO(CurrentTSO, 5));
782     
783     if (RtsFlags.GranFlags.Light) {
784         /* Save current time; GranSim Light only */
785         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
786     }      
787 #endif
788 }
789
790 /* ----------------------------------------------------------------------------
791  * Start any pending signal handlers
792  * ASSUMES: sched_mutex
793  * ------------------------------------------------------------------------- */
794
795 static void
796 scheduleStartSignalHandlers(void)
797 {
798 #if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
799     if (signals_pending()) {
800       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
801       startSignalHandlers();
802       ACQUIRE_LOCK(&sched_mutex);
803     }
804 #endif
805 }
806
807 /* ----------------------------------------------------------------------------
808  * Check for blocked threads that can be woken up.
809  * ASSUMES: sched_mutex
810  * ------------------------------------------------------------------------- */
811
812 static void
813 scheduleCheckBlockedThreads(void)
814 {
815     //
816     // Check whether any waiting threads need to be woken up.  If the
817     // run queue is empty, and there are no other tasks running, we
818     // can wait indefinitely for something to happen.
819     //
820     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
821     {
822 #if defined(RTS_SUPPORTS_THREADS)
823         // We shouldn't be here...
824         barf("schedule: awaitEvent() in threaded RTS");
825 #endif
826         awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
827     }
828 }
829
830
831 /* ----------------------------------------------------------------------------
832  * Check for threads blocked on BLACKHOLEs that can be woken up
833  * ASSUMES: sched_mutex
834  * ------------------------------------------------------------------------- */
835 static void
836 scheduleCheckBlackHoles( void )
837 {
838     if ( blackholes_need_checking )
839     {
840         checkBlackHoles();
841         blackholes_need_checking = rtsFalse;
842     }
843 }
844
845 /* ----------------------------------------------------------------------------
846  * Detect deadlock conditions and attempt to resolve them.
847  * ASSUMES: sched_mutex
848  * ------------------------------------------------------------------------- */
849
850 static void
851 scheduleDetectDeadlock(void)
852 {
853     /* 
854      * Detect deadlock: when we have no threads to run, there are no
855      * threads blocked, waiting for I/O, or sleeping, and all the
856      * other tasks are waiting for work, we must have a deadlock of
857      * some description.
858      */
859     if ( EMPTY_THREAD_QUEUES() )
860     {
861 #if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS)
862         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
863
864         // Garbage collection can release some new threads due to
865         // either (a) finalizers or (b) threads resurrected because
866         // they are unreachable and will therefore be sent an
867         // exception.  Any threads thus released will be immediately
868         // runnable.
869         GarbageCollect(GetRoots,rtsTrue);
870         if ( !EMPTY_RUN_QUEUE() ) return;
871
872 #if defined(RTS_USER_SIGNALS)
873         /* If we have user-installed signal handlers, then wait
874          * for signals to arrive rather then bombing out with a
875          * deadlock.
876          */
877         if ( anyUserHandlers() ) {
878             IF_DEBUG(scheduler, 
879                      sched_belch("still deadlocked, waiting for signals..."));
880
881             awaitUserSignals();
882
883             if (signals_pending()) {
884                 RELEASE_LOCK(&sched_mutex);
885                 startSignalHandlers();
886                 ACQUIRE_LOCK(&sched_mutex);
887             }
888
889             // either we have threads to run, or we were interrupted:
890             ASSERT(!EMPTY_RUN_QUEUE() || interrupted);
891         }
892 #endif
893
894         /* Probably a real deadlock.  Send the current main thread the
895          * Deadlock exception (or in the SMP build, send *all* main
896          * threads the deadlock exception, since none of them can make
897          * progress).
898          */
899         {
900             StgMainThread *m;
901             m = main_threads;
902             switch (m->tso->why_blocked) {
903             case BlockedOnBlackHole:
904             case BlockedOnException:
905             case BlockedOnMVar:
906                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
907                 return;
908             default:
909                 barf("deadlock: main thread blocked in a strange way");
910             }
911         }
912
913 #elif defined(RTS_SUPPORTS_THREADS)
914     // ToDo: add deadlock detection in threaded RTS
915 #elif defined(PARALLEL_HASKELL)
916     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
917 #endif
918     }
919 }
920
921 /* ----------------------------------------------------------------------------
922  * Process an event (GRAN only)
923  * ------------------------------------------------------------------------- */
924
925 #if defined(GRAN)
926 static StgTSO *
927 scheduleProcessEvent(rtsEvent *event)
928 {
929     StgTSO *t;
930
931     if (RtsFlags.GranFlags.Light)
932       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
933
934     /* adjust time based on time-stamp */
935     if (event->time > CurrentTime[CurrentProc] &&
936         event->evttype != ContinueThread)
937       CurrentTime[CurrentProc] = event->time;
938     
939     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
940     if (!RtsFlags.GranFlags.Light)
941       handleIdlePEs();
942
943     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
944
945     /* main event dispatcher in GranSim */
946     switch (event->evttype) {
947       /* Should just be continuing execution */
948     case ContinueThread:
949       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
950       /* ToDo: check assertion
951       ASSERT(run_queue_hd != (StgTSO*)NULL &&
952              run_queue_hd != END_TSO_QUEUE);
953       */
954       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
955       if (!RtsFlags.GranFlags.DoAsyncFetch &&
956           procStatus[CurrentProc]==Fetching) {
957         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
958               CurrentTSO->id, CurrentTSO, CurrentProc);
959         goto next_thread;
960       } 
961       /* Ignore ContinueThreads for completed threads */
962       if (CurrentTSO->what_next == ThreadComplete) {
963         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
964               CurrentTSO->id, CurrentTSO, CurrentProc);
965         goto next_thread;
966       } 
967       /* Ignore ContinueThreads for threads that are being migrated */
968       if (PROCS(CurrentTSO)==Nowhere) { 
969         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
970               CurrentTSO->id, CurrentTSO, CurrentProc);
971         goto next_thread;
972       }
973       /* The thread should be at the beginning of the run queue */
974       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
975         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
976               CurrentTSO->id, CurrentTSO, CurrentProc);
977         break; // run the thread anyway
978       }
979       /*
980       new_event(proc, proc, CurrentTime[proc],
981                 FindWork,
982                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
983       goto next_thread; 
984       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
985       break; // now actually run the thread; DaH Qu'vam yImuHbej 
986
987     case FetchNode:
988       do_the_fetchnode(event);
989       goto next_thread;             /* handle next event in event queue  */
990       
991     case GlobalBlock:
992       do_the_globalblock(event);
993       goto next_thread;             /* handle next event in event queue  */
994       
995     case FetchReply:
996       do_the_fetchreply(event);
997       goto next_thread;             /* handle next event in event queue  */
998       
999     case UnblockThread:   /* Move from the blocked queue to the tail of */
1000       do_the_unblock(event);
1001       goto next_thread;             /* handle next event in event queue  */
1002       
1003     case ResumeThread:  /* Move from the blocked queue to the tail of */
1004       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1005       event->tso->gran.blocktime += 
1006         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1007       do_the_startthread(event);
1008       goto next_thread;             /* handle next event in event queue  */
1009       
1010     case StartThread:
1011       do_the_startthread(event);
1012       goto next_thread;             /* handle next event in event queue  */
1013       
1014     case MoveThread:
1015       do_the_movethread(event);
1016       goto next_thread;             /* handle next event in event queue  */
1017       
1018     case MoveSpark:
1019       do_the_movespark(event);
1020       goto next_thread;             /* handle next event in event queue  */
1021       
1022     case FindWork:
1023       do_the_findwork(event);
1024       goto next_thread;             /* handle next event in event queue  */
1025       
1026     default:
1027       barf("Illegal event type %u\n", event->evttype);
1028     }  /* switch */
1029     
1030     /* This point was scheduler_loop in the old RTS */
1031
1032     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1033
1034     TimeOfLastEvent = CurrentTime[CurrentProc];
1035     TimeOfNextEvent = get_time_of_next_event();
1036     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1037     // CurrentTSO = ThreadQueueHd;
1038
1039     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1040                          TimeOfNextEvent));
1041
1042     if (RtsFlags.GranFlags.Light) 
1043       GranSimLight_leave_system(event, &ActiveTSO); 
1044
1045     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1046
1047     IF_DEBUG(gran, 
1048              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1049
1050     /* in a GranSim setup the TSO stays on the run queue */
1051     t = CurrentTSO;
1052     /* Take a thread from the run queue. */
1053     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1054
1055     IF_DEBUG(gran, 
1056              debugBelch("GRAN: About to run current thread, which is\n");
1057              G_TSO(t,5));
1058
1059     context_switch = 0; // turned on via GranYield, checking events and time slice
1060
1061     IF_DEBUG(gran, 
1062              DumpGranEvent(GR_SCHEDULE, t));
1063
1064     procStatus[CurrentProc] = Busy;
1065 }
1066 #endif // GRAN
1067
1068 /* ----------------------------------------------------------------------------
1069  * Send pending messages (PARALLEL_HASKELL only)
1070  * ------------------------------------------------------------------------- */
1071
1072 #if defined(PARALLEL_HASKELL)
1073 static StgTSO *
1074 scheduleSendPendingMessages(void)
1075 {
1076     StgSparkPool *pool;
1077     rtsSpark spark;
1078     StgTSO *t;
1079
1080 # if defined(PAR) // global Mem.Mgmt., omit for now
1081     if (PendingFetches != END_BF_QUEUE) {
1082         processFetches();
1083     }
1084 # endif
1085     
1086     if (RtsFlags.ParFlags.BufferTime) {
1087         // if we use message buffering, we must send away all message
1088         // packets which have become too old...
1089         sendOldBuffers(); 
1090     }
1091 }
1092 #endif
1093
1094 /* ----------------------------------------------------------------------------
1095  * Activate spark threads (PARALLEL_HASKELL only)
1096  * ------------------------------------------------------------------------- */
1097
1098 #if defined(PARALLEL_HASKELL)
1099 static void
1100 scheduleActivateSpark(void)
1101 {
1102 #if defined(SPARKS)
1103   ASSERT(EMPTY_RUN_QUEUE());
1104 /* We get here if the run queue is empty and want some work.
1105    We try to turn a spark into a thread, and add it to the run queue,
1106    from where it will be picked up in the next iteration of the scheduler
1107    loop.
1108 */
1109
1110       /* :-[  no local threads => look out for local sparks */
1111       /* the spark pool for the current PE */
1112       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1113       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1114           pool->hd < pool->tl) {
1115         /* 
1116          * ToDo: add GC code check that we really have enough heap afterwards!!
1117          * Old comment:
1118          * If we're here (no runnable threads) and we have pending
1119          * sparks, we must have a space problem.  Get enough space
1120          * to turn one of those pending sparks into a
1121          * thread... 
1122          */
1123
1124         spark = findSpark(rtsFalse);            /* get a spark */
1125         if (spark != (rtsSpark) NULL) {
1126           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1127           IF_PAR_DEBUG(fish, // schedule,
1128                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1129                              tso->id, tso, advisory_thread_count));
1130
1131           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1132             IF_PAR_DEBUG(fish, // schedule,
1133                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1134                             spark));
1135             return rtsFalse; /* failed to generate a thread */
1136           }                  /* otherwise fall through & pick-up new tso */
1137         } else {
1138           IF_PAR_DEBUG(fish, // schedule,
1139                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1140                              spark_queue_len(pool)));
1141           return rtsFalse;  /* failed to generate a thread */
1142         }
1143         return rtsTrue;  /* success in generating a thread */
1144   } else { /* no more threads permitted or pool empty */
1145     return rtsFalse;  /* failed to generateThread */
1146   }
1147 #else
1148   tso = NULL; // avoid compiler warning only
1149   return rtsFalse;  /* dummy in non-PAR setup */
1150 #endif // SPARKS
1151 }
1152 #endif // PARALLEL_HASKELL
1153
1154 /* ----------------------------------------------------------------------------
1155  * Get work from a remote node (PARALLEL_HASKELL only)
1156  * ------------------------------------------------------------------------- */
1157     
1158 #if defined(PARALLEL_HASKELL)
1159 static rtsBool
1160 scheduleGetRemoteWork(rtsBool *receivedFinish)
1161 {
1162   ASSERT(EMPTY_RUN_QUEUE());
1163
1164   if (RtsFlags.ParFlags.BufferTime) {
1165         IF_PAR_DEBUG(verbose, 
1166                 debugBelch("...send all pending data,"));
1167         {
1168           nat i;
1169           for (i=1; i<=nPEs; i++)
1170             sendImmediately(i); // send all messages away immediately
1171         }
1172   }
1173 # ifndef SPARKS
1174         //++EDEN++ idle() , i.e. send all buffers, wait for work
1175         // suppress fishing in EDEN... just look for incoming messages
1176         // (blocking receive)
1177   IF_PAR_DEBUG(verbose, 
1178                debugBelch("...wait for incoming messages...\n"));
1179   *receivedFinish = processMessages(); // blocking receive...
1180
1181         // and reenter scheduling loop after having received something
1182         // (return rtsFalse below)
1183
1184 # else /* activate SPARKS machinery */
1185 /* We get here, if we have no work, tried to activate a local spark, but still
1186    have no work. We try to get a remote spark, by sending a FISH message.
1187    Thread migration should be added here, and triggered when a sequence of 
1188    fishes returns without work. */
1189         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1190
1191       /* =8-[  no local sparks => look for work on other PEs */
1192         /*
1193          * We really have absolutely no work.  Send out a fish
1194          * (there may be some out there already), and wait for
1195          * something to arrive.  We clearly can't run any threads
1196          * until a SCHEDULE or RESUME arrives, and so that's what
1197          * we're hoping to see.  (Of course, we still have to
1198          * respond to other types of messages.)
1199          */
1200         rtsTime now = msTime() /*CURRENT_TIME*/;
1201         IF_PAR_DEBUG(verbose, 
1202                      debugBelch("--  now=%ld\n", now));
1203         IF_PAR_DEBUG(fish, // verbose,
1204              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1205                  (last_fish_arrived_at!=0 &&
1206                   last_fish_arrived_at+delay > now)) {
1207                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1208                      now, last_fish_arrived_at+delay, 
1209                      last_fish_arrived_at,
1210                      delay);
1211              });
1212   
1213         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1214             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1215           if (last_fish_arrived_at==0 ||
1216               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1217             /* outstandingFishes is set in sendFish, processFish;
1218                avoid flooding system with fishes via delay */
1219     next_fish_to_send_at = 0;  
1220   } else {
1221     /* ToDo: this should be done in the main scheduling loop to avoid the
1222              busy wait here; not so bad if fish delay is very small  */
1223     int iq = 0; // DEBUGGING -- HWL
1224     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1225     /* send a fish when ready, but process messages that arrive in the meantime */
1226     do {
1227       if (PacketsWaiting()) {
1228         iq++; // DEBUGGING
1229         *receivedFinish = processMessages();
1230       }
1231       now = msTime();
1232     } while (!*receivedFinish || now<next_fish_to_send_at);
1233     // JB: This means the fish could become obsolete, if we receive
1234     // work. Better check for work again? 
1235     // last line: while (!receivedFinish || !haveWork || now<...)
1236     // next line: if (receivedFinish || haveWork )
1237
1238     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1239       return rtsFalse;  // NB: this will leave scheduler loop
1240                         // immediately after return!
1241                           
1242     IF_PAR_DEBUG(fish, // verbose,
1243                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1244
1245   }
1246
1247     // JB: IMHO, this should all be hidden inside sendFish(...)
1248     /* pe = choosePE(); 
1249        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1250                 NEW_FISH_HUNGER);
1251
1252     // Global statistics: count no. of fishes
1253     if (RtsFlags.ParFlags.ParStats.Global &&
1254          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1255            globalParStats.tot_fish_mess++;
1256            }
1257     */ 
1258
1259   /* delayed fishes must have been sent by now! */
1260   next_fish_to_send_at = 0;  
1261   }
1262       
1263   *receivedFinish = processMessages();
1264 # endif /* SPARKS */
1265
1266  return rtsFalse;
1267  /* NB: this function always returns rtsFalse, meaning the scheduler
1268     loop continues with the next iteration; 
1269     rationale: 
1270       return code means success in finding work; we enter this function
1271       if there is no local work, thus have to send a fish which takes
1272       time until it arrives with work; in the meantime we should process
1273       messages in the main loop;
1274  */
1275 }
1276 #endif // PARALLEL_HASKELL
1277
1278 /* ----------------------------------------------------------------------------
1279  * PAR/GRAN: Report stats & debugging info(?)
1280  * ------------------------------------------------------------------------- */
1281
1282 #if defined(PAR) || defined(GRAN)
1283 static void
1284 scheduleGranParReport(void)
1285 {
1286   ASSERT(run_queue_hd != END_TSO_QUEUE);
1287
1288   /* Take a thread from the run queue, if we have work */
1289   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1290
1291     /* If this TSO has got its outport closed in the meantime, 
1292      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1293      * It has to be marked as TH_DEAD for this purpose.
1294      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1295
1296 JB: TODO: investigate wether state change field could be nuked
1297      entirely and replaced by the normal tso state (whatnext
1298      field). All we want to do is to kill tsos from outside.
1299      */
1300
1301     /* ToDo: write something to the log-file
1302     if (RTSflags.ParFlags.granSimStats && !sameThread)
1303         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1304
1305     CurrentTSO = t;
1306     */
1307     /* the spark pool for the current PE */
1308     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1309
1310     IF_DEBUG(scheduler, 
1311              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1312                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1313
1314     IF_PAR_DEBUG(fish,
1315              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1316                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1317
1318     if (RtsFlags.ParFlags.ParStats.Full && 
1319         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1320         (emitSchedule || // forced emit
1321          (t && LastTSO && t->id != LastTSO->id))) {
1322       /* 
1323          we are running a different TSO, so write a schedule event to log file
1324          NB: If we use fair scheduling we also have to write  a deschedule 
1325              event for LastTSO; with unfair scheduling we know that the
1326              previous tso has blocked whenever we switch to another tso, so
1327              we don't need it in GUM for now
1328       */
1329       IF_PAR_DEBUG(fish, // schedule,
1330                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1331
1332       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1333                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1334       emitSchedule = rtsFalse;
1335     }
1336 }     
1337 #endif
1338
1339 /* ----------------------------------------------------------------------------
1340  * After running a thread...
1341  * ASSUMES: sched_mutex
1342  * ------------------------------------------------------------------------- */
1343
1344 static void
1345 schedulePostRunThread(void)
1346 {
1347 #if defined(PAR)
1348     /* HACK 675: if the last thread didn't yield, make sure to print a 
1349        SCHEDULE event to the log file when StgRunning the next thread, even
1350        if it is the same one as before */
1351     LastTSO = t; 
1352     TimeOfLastYield = CURRENT_TIME;
1353 #endif
1354
1355   /* some statistics gathering in the parallel case */
1356
1357 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1358   switch (ret) {
1359     case HeapOverflow:
1360 # if defined(GRAN)
1361       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1362       globalGranStats.tot_heapover++;
1363 # elif defined(PAR)
1364       globalParStats.tot_heapover++;
1365 # endif
1366       break;
1367
1368      case StackOverflow:
1369 # if defined(GRAN)
1370       IF_DEBUG(gran, 
1371                DumpGranEvent(GR_DESCHEDULE, t));
1372       globalGranStats.tot_stackover++;
1373 # elif defined(PAR)
1374       // IF_DEBUG(par, 
1375       // DumpGranEvent(GR_DESCHEDULE, t);
1376       globalParStats.tot_stackover++;
1377 # endif
1378       break;
1379
1380     case ThreadYielding:
1381 # if defined(GRAN)
1382       IF_DEBUG(gran, 
1383                DumpGranEvent(GR_DESCHEDULE, t));
1384       globalGranStats.tot_yields++;
1385 # elif defined(PAR)
1386       // IF_DEBUG(par, 
1387       // DumpGranEvent(GR_DESCHEDULE, t);
1388       globalParStats.tot_yields++;
1389 # endif
1390       break; 
1391
1392     case ThreadBlocked:
1393 # if defined(GRAN)
1394       IF_DEBUG(scheduler,
1395                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1396                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1397                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1398                if (t->block_info.closure!=(StgClosure*)NULL)
1399                  print_bq(t->block_info.closure);
1400                debugBelch("\n"));
1401
1402       // ??? needed; should emit block before
1403       IF_DEBUG(gran, 
1404                DumpGranEvent(GR_DESCHEDULE, t)); 
1405       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1406       /*
1407         ngoq Dogh!
1408       ASSERT(procStatus[CurrentProc]==Busy || 
1409               ((procStatus[CurrentProc]==Fetching) && 
1410               (t->block_info.closure!=(StgClosure*)NULL)));
1411       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1412           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1413             procStatus[CurrentProc]==Fetching)) 
1414         procStatus[CurrentProc] = Idle;
1415       */
1416 # elif defined(PAR)
1417 //++PAR++  blockThread() writes the event (change?)
1418 # endif
1419     break;
1420
1421   case ThreadFinished:
1422     break;
1423
1424   default:
1425     barf("parGlobalStats: unknown return code");
1426     break;
1427     }
1428 #endif
1429 }
1430
1431 /* -----------------------------------------------------------------------------
1432  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1433  * ASSUMES: sched_mutex
1434  * -------------------------------------------------------------------------- */
1435
1436 static rtsBool
1437 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1438 {
1439     // did the task ask for a large block?
1440     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1441         // if so, get one and push it on the front of the nursery.
1442         bdescr *bd;
1443         lnat blocks;
1444         
1445         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1446         
1447         IF_DEBUG(scheduler,
1448                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1449                             (long)t->id, whatNext_strs[t->what_next], blocks));
1450         
1451         // don't do this if it would push us over the
1452         // alloc_blocks_lim limit; we'll GC first.
1453         if (alloc_blocks + blocks < alloc_blocks_lim) {
1454             
1455             alloc_blocks += blocks;
1456             bd = allocGroup( blocks );
1457             
1458             // link the new group into the list
1459             bd->link = cap->r.rCurrentNursery;
1460             bd->u.back = cap->r.rCurrentNursery->u.back;
1461             if (cap->r.rCurrentNursery->u.back != NULL) {
1462                 cap->r.rCurrentNursery->u.back->link = bd;
1463             } else {
1464 #if !defined(SMP)
1465                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1466                        g0s0 == cap->r.rNursery);
1467                 g0s0->blocks = bd;
1468 #endif
1469                 cap->r.rNursery->blocks = bd;
1470             }             
1471             cap->r.rCurrentNursery->u.back = bd;
1472             
1473             // initialise it as a nursery block.  We initialise the
1474             // step, gen_no, and flags field of *every* sub-block in
1475             // this large block, because this is easier than making
1476             // sure that we always find the block head of a large
1477             // block whenever we call Bdescr() (eg. evacuate() and
1478             // isAlive() in the GC would both have to do this, at
1479             // least).
1480             { 
1481                 bdescr *x;
1482                 for (x = bd; x < bd + blocks; x++) {
1483                     x->step = g0s0;
1484                     x->gen_no = 0;
1485                     x->flags = 0;
1486                 }
1487             }
1488             
1489 #if !defined(SMP)
1490             // don't forget to update the block count in g0s0.
1491             g0s0->n_blocks += blocks;
1492
1493             // This assert can be a killer if the app is doing lots
1494             // of large block allocations.
1495             ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1496 #endif
1497             
1498             // now update the nursery to point to the new block
1499             cap->r.rCurrentNursery = bd;
1500             
1501             // we might be unlucky and have another thread get on the
1502             // run queue before us and steal the large block, but in that
1503             // case the thread will just end up requesting another large
1504             // block.
1505             PUSH_ON_RUN_QUEUE(t);
1506             return rtsFalse;  /* not actually GC'ing */
1507         }
1508     }
1509     
1510     /* make all the running tasks block on a condition variable,
1511      * maybe set context_switch and wait till they all pile in,
1512      * then have them wait on a GC condition variable.
1513      */
1514     IF_DEBUG(scheduler,
1515              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1516                         (long)t->id, whatNext_strs[t->what_next]));
1517     threadPaused(t);
1518 #if defined(GRAN)
1519     ASSERT(!is_on_queue(t,CurrentProc));
1520 #elif defined(PARALLEL_HASKELL)
1521     /* Currently we emit a DESCHEDULE event before GC in GUM.
1522        ToDo: either add separate event to distinguish SYSTEM time from rest
1523        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1524     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1525         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1526                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1527         emitSchedule = rtsTrue;
1528     }
1529 #endif
1530       
1531     PUSH_ON_RUN_QUEUE(t);
1532     return rtsTrue;
1533     /* actual GC is done at the end of the while loop in schedule() */
1534 }
1535
1536 /* -----------------------------------------------------------------------------
1537  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1538  * ASSUMES: sched_mutex
1539  * -------------------------------------------------------------------------- */
1540
1541 static void
1542 scheduleHandleStackOverflow( StgTSO *t)
1543 {
1544     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1545                                   (long)t->id, whatNext_strs[t->what_next]));
1546     /* just adjust the stack for this thread, then pop it back
1547      * on the run queue.
1548      */
1549     threadPaused(t);
1550     { 
1551         /* enlarge the stack */
1552         StgTSO *new_t = threadStackOverflow(t);
1553         
1554         /* This TSO has moved, so update any pointers to it from the
1555          * main thread stack.  It better not be on any other queues...
1556          * (it shouldn't be).
1557          */
1558         if (t->main != NULL) {
1559             t->main->tso = new_t;
1560         }
1561         PUSH_ON_RUN_QUEUE(new_t);
1562     }
1563 }
1564
1565 /* -----------------------------------------------------------------------------
1566  * Handle a thread that returned to the scheduler with ThreadYielding
1567  * ASSUMES: sched_mutex
1568  * -------------------------------------------------------------------------- */
1569
1570 static rtsBool
1571 scheduleHandleYield( StgTSO *t, nat prev_what_next )
1572 {
1573     // Reset the context switch flag.  We don't do this just before
1574     // running the thread, because that would mean we would lose ticks
1575     // during GC, which can lead to unfair scheduling (a thread hogs
1576     // the CPU because the tick always arrives during GC).  This way
1577     // penalises threads that do a lot of allocation, but that seems
1578     // better than the alternative.
1579     context_switch = 0;
1580     
1581     /* put the thread back on the run queue.  Then, if we're ready to
1582      * GC, check whether this is the last task to stop.  If so, wake
1583      * up the GC thread.  getThread will block during a GC until the
1584      * GC is finished.
1585      */
1586     IF_DEBUG(scheduler,
1587              if (t->what_next != prev_what_next) {
1588                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1589                             (long)t->id, whatNext_strs[t->what_next]);
1590              } else {
1591                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1592                             (long)t->id, whatNext_strs[t->what_next]);
1593              }
1594         );
1595     
1596     IF_DEBUG(sanity,
1597              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1598              checkTSO(t));
1599     ASSERT(t->link == END_TSO_QUEUE);
1600     
1601     // Shortcut if we're just switching evaluators: don't bother
1602     // doing stack squeezing (which can be expensive), just run the
1603     // thread.
1604     if (t->what_next != prev_what_next) {
1605         return rtsTrue;
1606     }
1607     
1608     threadPaused(t);
1609     
1610 #if defined(GRAN)
1611     ASSERT(!is_on_queue(t,CurrentProc));
1612       
1613     IF_DEBUG(sanity,
1614              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1615              checkThreadQsSanity(rtsTrue));
1616
1617 #endif
1618
1619     addToRunQueue(t);
1620
1621 #if defined(GRAN)
1622     /* add a ContinueThread event to actually process the thread */
1623     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1624               ContinueThread,
1625               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1626     IF_GRAN_DEBUG(bq, 
1627                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1628                   G_EVENTQ(0);
1629                   G_CURR_THREADQ(0));
1630 #endif
1631     return rtsFalse;
1632 }
1633
1634 /* -----------------------------------------------------------------------------
1635  * Handle a thread that returned to the scheduler with ThreadBlocked
1636  * ASSUMES: sched_mutex
1637  * -------------------------------------------------------------------------- */
1638
1639 static void
1640 scheduleHandleThreadBlocked( StgTSO *t
1641 #if !defined(GRAN) && !defined(DEBUG)
1642     STG_UNUSED
1643 #endif
1644     )
1645 {
1646 #if defined(GRAN)
1647     IF_DEBUG(scheduler,
1648              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1649                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1650              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1651     
1652     // ??? needed; should emit block before
1653     IF_DEBUG(gran, 
1654              DumpGranEvent(GR_DESCHEDULE, t)); 
1655     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1656     /*
1657       ngoq Dogh!
1658       ASSERT(procStatus[CurrentProc]==Busy || 
1659       ((procStatus[CurrentProc]==Fetching) && 
1660       (t->block_info.closure!=(StgClosure*)NULL)));
1661       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1662       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1663       procStatus[CurrentProc]==Fetching)) 
1664       procStatus[CurrentProc] = Idle;
1665     */
1666 #elif defined(PAR)
1667     IF_DEBUG(scheduler,
1668              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1669                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1670     IF_PAR_DEBUG(bq,
1671                  
1672                  if (t->block_info.closure!=(StgClosure*)NULL) 
1673                  print_bq(t->block_info.closure));
1674     
1675     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1676     blockThread(t);
1677     
1678     /* whatever we schedule next, we must log that schedule */
1679     emitSchedule = rtsTrue;
1680     
1681 #else /* !GRAN */
1682       /* don't need to do anything.  Either the thread is blocked on
1683        * I/O, in which case we'll have called addToBlockedQueue
1684        * previously, or it's blocked on an MVar or Blackhole, in which
1685        * case it'll be on the relevant queue already.
1686        */
1687     ASSERT(t->why_blocked != NotBlocked);
1688     IF_DEBUG(scheduler,
1689              debugBelch("--<< thread %d (%s) stopped: ", 
1690                         t->id, whatNext_strs[t->what_next]);
1691              printThreadBlockage(t);
1692              debugBelch("\n"));
1693     
1694     /* Only for dumping event to log file 
1695        ToDo: do I need this in GranSim, too?
1696        blockThread(t);
1697     */
1698 #endif
1699 }
1700
1701 /* -----------------------------------------------------------------------------
1702  * Handle a thread that returned to the scheduler with ThreadFinished
1703  * ASSUMES: sched_mutex
1704  * -------------------------------------------------------------------------- */
1705
1706 static rtsBool
1707 scheduleHandleThreadFinished( StgMainThread *mainThread
1708                               USED_WHEN_RTS_SUPPORTS_THREADS,
1709                               Capability *cap,
1710                               StgTSO *t )
1711 {
1712     /* Need to check whether this was a main thread, and if so,
1713      * return with the return value.
1714      *
1715      * We also end up here if the thread kills itself with an
1716      * uncaught exception, see Exception.cmm.
1717      */
1718     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1719                                   t->id, whatNext_strs[t->what_next]));
1720
1721 #if defined(GRAN)
1722       endThread(t, CurrentProc); // clean-up the thread
1723 #elif defined(PARALLEL_HASKELL)
1724       /* For now all are advisory -- HWL */
1725       //if(t->priority==AdvisoryPriority) ??
1726       advisory_thread_count--; // JB: Caution with this counter, buggy!
1727       
1728 # if defined(DIST)
1729       if(t->dist.priority==RevalPriority)
1730         FinishReval(t);
1731 # endif
1732     
1733 # if defined(EDENOLD)
1734       // the thread could still have an outport... (BUG)
1735       if (t->eden.outport != -1) {
1736       // delete the outport for the tso which has finished...
1737         IF_PAR_DEBUG(eden_ports,
1738                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1739                               t->eden.outport, t->id));
1740         deleteOPT(t);
1741       }
1742       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1743       if (t->eden.epid != -1) {
1744         IF_PAR_DEBUG(eden_ports,
1745                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1746                            t->id, t->eden.epid));
1747         removeTSOfromProcess(t);
1748       }
1749 # endif 
1750
1751 # if defined(PAR)
1752       if (RtsFlags.ParFlags.ParStats.Full &&
1753           !RtsFlags.ParFlags.ParStats.Suppressed) 
1754         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1755
1756       //  t->par only contains statistics: left out for now...
1757       IF_PAR_DEBUG(fish,
1758                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1759                               t->id,t,t->par.sparkname));
1760 # endif
1761 #endif // PARALLEL_HASKELL
1762
1763       //
1764       // Check whether the thread that just completed was a main
1765       // thread, and if so return with the result.  
1766       //
1767       // There is an assumption here that all thread completion goes
1768       // through this point; we need to make sure that if a thread
1769       // ends up in the ThreadKilled state, that it stays on the run
1770       // queue so it can be dealt with here.
1771       //
1772       if (
1773 #if defined(RTS_SUPPORTS_THREADS)
1774           mainThread != NULL
1775 #else
1776           mainThread->tso == t
1777 #endif
1778           )
1779       {
1780           // We are a bound thread: this must be our thread that just
1781           // completed.
1782           ASSERT(mainThread->tso == t);
1783
1784           if (t->what_next == ThreadComplete) {
1785               if (mainThread->ret) {
1786                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1787                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1788               }
1789               mainThread->stat = Success;
1790           } else {
1791               if (mainThread->ret) {
1792                   *(mainThread->ret) = NULL;
1793               }
1794               if (interrupted) {
1795                   mainThread->stat = Interrupted;
1796               } else {
1797                   mainThread->stat = Killed;
1798               }
1799           }
1800 #ifdef DEBUG
1801           removeThreadLabel((StgWord)mainThread->tso->id);
1802 #endif
1803           if (mainThread->prev == NULL) {
1804               main_threads = mainThread->link;
1805           } else {
1806               mainThread->prev->link = mainThread->link;
1807           }
1808           if (mainThread->link != NULL) {
1809               mainThread->link->prev = NULL;
1810           }
1811           releaseCapability(cap);
1812           return rtsTrue; // tells schedule() to return
1813       }
1814
1815 #ifdef RTS_SUPPORTS_THREADS
1816       ASSERT(t->main == NULL);
1817 #else
1818       if (t->main != NULL) {
1819           // Must be a main thread that is not the topmost one.  Leave
1820           // it on the run queue until the stack has unwound to the
1821           // point where we can deal with this.  Leaving it on the run
1822           // queue also ensures that the garbage collector knows about
1823           // this thread and its return value (it gets dropped from the
1824           // all_threads list so there's no other way to find it).
1825           APPEND_TO_RUN_QUEUE(t);
1826       }
1827 #endif
1828       return rtsFalse;
1829 }
1830
1831 /* -----------------------------------------------------------------------------
1832  * Perform a heap census, if PROFILING
1833  * -------------------------------------------------------------------------- */
1834
1835 static rtsBool
1836 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1837 {
1838 #if defined(PROFILING)
1839     // When we have +RTS -i0 and we're heap profiling, do a census at
1840     // every GC.  This lets us get repeatable runs for debugging.
1841     if (performHeapProfile ||
1842         (RtsFlags.ProfFlags.profileInterval==0 &&
1843          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1844         GarbageCollect(GetRoots, rtsTrue);
1845         heapCensus();
1846         performHeapProfile = rtsFalse;
1847         return rtsTrue;  // true <=> we already GC'd
1848     }
1849 #endif
1850     return rtsFalse;
1851 }
1852
1853 /* -----------------------------------------------------------------------------
1854  * Perform a garbage collection if necessary
1855  * ASSUMES: sched_mutex
1856  * -------------------------------------------------------------------------- */
1857
1858 static void
1859 scheduleDoGC( Capability *cap STG_UNUSED )
1860 {
1861     StgTSO *t;
1862 #ifdef SMP
1863     static rtsBool waiting_for_gc;
1864     int n_capabilities = RtsFlags.ParFlags.nNodes - 1; 
1865            // subtract one because we're already holding one.
1866     Capability *caps[n_capabilities];
1867 #endif
1868
1869 #ifdef SMP
1870     // In order to GC, there must be no threads running Haskell code.
1871     // Therefore, the GC thread needs to hold *all* the capabilities,
1872     // and release them after the GC has completed.  
1873     //
1874     // This seems to be the simplest way: previous attempts involved
1875     // making all the threads with capabilities give up their
1876     // capabilities and sleep except for the *last* one, which
1877     // actually did the GC.  But it's quite hard to arrange for all
1878     // the other tasks to sleep and stay asleep.
1879     //
1880         
1881     // Someone else is already trying to GC
1882     if (waiting_for_gc) return;
1883     waiting_for_gc = rtsTrue;
1884
1885     caps[n_capabilities] = cap;
1886     while (n_capabilities > 0) {
1887         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
1888         waitForReturnCapability(&sched_mutex, &cap);
1889         n_capabilities--;
1890         caps[n_capabilities] = cap;
1891     }
1892
1893     waiting_for_gc = rtsFalse;
1894 #endif
1895
1896     /* Kick any transactions which are invalid back to their
1897      * atomically frames.  When next scheduled they will try to
1898      * commit, this commit will fail and they will retry.
1899      */
1900     for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1901         if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1902             if (!stmValidateTransaction (t -> trec)) {
1903                 IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1904                 
1905                 // strip the stack back to the ATOMICALLY_FRAME, aborting
1906                 // the (nested) transaction, and saving the stack of any
1907                 // partially-evaluated thunks on the heap.
1908                 raiseAsync_(t, NULL, rtsTrue);
1909                 
1910 #ifdef REG_R1
1911                 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1912 #endif
1913             }
1914         }
1915     }
1916     
1917     // so this happens periodically:
1918     scheduleCheckBlackHoles();
1919     
1920     /* everybody back, start the GC.
1921      * Could do it in this thread, or signal a condition var
1922      * to do it in another thread.  Either way, we need to
1923      * broadcast on gc_pending_cond afterward.
1924      */
1925 #if defined(RTS_SUPPORTS_THREADS)
1926     IF_DEBUG(scheduler,sched_belch("doing GC"));
1927 #endif
1928     GarbageCollect(GetRoots,rtsFalse);
1929     
1930 #if defined(SMP)
1931     {
1932         // release our stash of capabilities.
1933         nat i;
1934         for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
1935             releaseCapability(caps[i]);
1936         }
1937     }
1938 #endif
1939
1940 #if defined(GRAN)
1941     /* add a ContinueThread event to continue execution of current thread */
1942     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1943               ContinueThread,
1944               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1945     IF_GRAN_DEBUG(bq, 
1946                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1947                   G_EVENTQ(0);
1948                   G_CURR_THREADQ(0));
1949 #endif /* GRAN */
1950 }
1951
1952 /* ---------------------------------------------------------------------------
1953  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1954  * used by Control.Concurrent for error checking.
1955  * ------------------------------------------------------------------------- */
1956  
1957 StgBool
1958 rtsSupportsBoundThreads(void)
1959 {
1960 #ifdef THREADED_RTS
1961   return rtsTrue;
1962 #else
1963   return rtsFalse;
1964 #endif
1965 }
1966
1967 /* ---------------------------------------------------------------------------
1968  * isThreadBound(tso): check whether tso is bound to an OS thread.
1969  * ------------------------------------------------------------------------- */
1970  
1971 StgBool
1972 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1973 {
1974 #ifdef THREADED_RTS
1975   return (tso->main != NULL);
1976 #endif
1977   return rtsFalse;
1978 }
1979
1980 /* ---------------------------------------------------------------------------
1981  * Singleton fork(). Do not copy any running threads.
1982  * ------------------------------------------------------------------------- */
1983
1984 #ifndef mingw32_HOST_OS
1985 #define FORKPROCESS_PRIMOP_SUPPORTED
1986 #endif
1987
1988 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1989 static void 
1990 deleteThreadImmediately(StgTSO *tso);
1991 #endif
1992 StgInt
1993 forkProcess(HsStablePtr *entry
1994 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1995             STG_UNUSED
1996 #endif
1997            )
1998 {
1999 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2000   pid_t pid;
2001   StgTSO* t,*next;
2002   StgMainThread *m;
2003   SchedulerStatus rc;
2004
2005   IF_DEBUG(scheduler,sched_belch("forking!"));
2006   rts_lock(); // This not only acquires sched_mutex, it also
2007               // makes sure that no other threads are running
2008
2009   pid = fork();
2010
2011   if (pid) { /* parent */
2012
2013   /* just return the pid */
2014     rts_unlock();
2015     return pid;
2016     
2017   } else { /* child */
2018     
2019     
2020       // delete all threads
2021     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
2022     
2023     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2024       next = t->link;
2025
2026         // don't allow threads to catch the ThreadKilled exception
2027       deleteThreadImmediately(t);
2028     }
2029     
2030       // wipe the main thread list
2031     while((m = main_threads) != NULL) {
2032       main_threads = m->link;
2033 # ifdef THREADED_RTS
2034       closeCondition(&m->bound_thread_cond);
2035 # endif
2036       stgFree(m);
2037     }
2038     
2039     rc = rts_evalStableIO(entry, NULL);  // run the action
2040     rts_checkSchedStatus("forkProcess",rc);
2041     
2042     rts_unlock();
2043     
2044     hs_exit();                      // clean up and exit
2045     stg_exit(0);
2046   }
2047 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2048   barf("forkProcess#: primop not supported, sorry!\n");
2049   return -1;
2050 #endif
2051 }
2052
2053 /* ---------------------------------------------------------------------------
2054  * deleteAllThreads():  kill all the live threads.
2055  *
2056  * This is used when we catch a user interrupt (^C), before performing
2057  * any necessary cleanups and running finalizers.
2058  *
2059  * Locks: sched_mutex held.
2060  * ------------------------------------------------------------------------- */
2061    
2062 void
2063 deleteAllThreads ( void )
2064 {
2065   StgTSO* t, *next;
2066   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2067   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2068       next = t->global_link;
2069       deleteThread(t);
2070   }      
2071
2072   // The run queue now contains a bunch of ThreadKilled threads.  We
2073   // must not throw these away: the main thread(s) will be in there
2074   // somewhere, and the main scheduler loop has to deal with it.
2075   // Also, the run queue is the only thing keeping these threads from
2076   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2077
2078   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2079   ASSERT(blackhole_queue == END_TSO_QUEUE);
2080   ASSERT(sleeping_queue == END_TSO_QUEUE);
2081 }
2082
2083 /* startThread and  insertThread are now in GranSim.c -- HWL */
2084
2085
2086 /* ---------------------------------------------------------------------------
2087  * Suspending & resuming Haskell threads.
2088  * 
2089  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2090  * its capability before calling the C function.  This allows another
2091  * task to pick up the capability and carry on running Haskell
2092  * threads.  It also means that if the C call blocks, it won't lock
2093  * the whole system.
2094  *
2095  * The Haskell thread making the C call is put to sleep for the
2096  * duration of the call, on the susepended_ccalling_threads queue.  We
2097  * give out a token to the task, which it can use to resume the thread
2098  * on return from the C function.
2099  * ------------------------------------------------------------------------- */
2100    
2101 StgInt
2102 suspendThread( StgRegTable *reg )
2103 {
2104   nat tok;
2105   Capability *cap;
2106   int saved_errno = errno;
2107
2108   /* assume that *reg is a pointer to the StgRegTable part
2109    * of a Capability.
2110    */
2111   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
2112
2113   ACQUIRE_LOCK(&sched_mutex);
2114
2115   IF_DEBUG(scheduler,
2116            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
2117
2118   // XXX this might not be necessary --SDM
2119   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
2120
2121   threadPaused(cap->r.rCurrentTSO);
2122   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
2123   suspended_ccalling_threads = cap->r.rCurrentTSO;
2124
2125   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
2126       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
2127       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
2128   } else {
2129       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
2130   }
2131
2132   /* Use the thread ID as the token; it should be unique */
2133   tok = cap->r.rCurrentTSO->id;
2134
2135   /* Hand back capability */
2136   cap->r.rInHaskell = rtsFalse;
2137   releaseCapability(cap);
2138   
2139 #if defined(RTS_SUPPORTS_THREADS)
2140   /* Preparing to leave the RTS, so ensure there's a native thread/task
2141      waiting to take over.
2142   */
2143   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
2144 #endif
2145
2146   RELEASE_LOCK(&sched_mutex);
2147   
2148   errno = saved_errno;
2149   return tok; 
2150 }
2151
2152 StgRegTable *
2153 resumeThread( StgInt tok )
2154 {
2155   StgTSO *tso, **prev;
2156   Capability *cap;
2157   int saved_errno = errno;
2158
2159 #if defined(RTS_SUPPORTS_THREADS)
2160   /* Wait for permission to re-enter the RTS with the result. */
2161   ACQUIRE_LOCK(&sched_mutex);
2162   waitForReturnCapability(&sched_mutex, &cap);
2163
2164   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
2165 #else
2166   grabCapability(&cap);
2167 #endif
2168
2169   /* Remove the thread off of the suspended list */
2170   prev = &suspended_ccalling_threads;
2171   for (tso = suspended_ccalling_threads; 
2172        tso != END_TSO_QUEUE; 
2173        prev = &tso->link, tso = tso->link) {
2174     if (tso->id == (StgThreadID)tok) {
2175       *prev = tso->link;
2176       break;
2177     }
2178   }
2179   if (tso == END_TSO_QUEUE) {
2180     barf("resumeThread: thread not found");
2181   }
2182   tso->link = END_TSO_QUEUE;
2183   
2184   if(tso->why_blocked == BlockedOnCCall) {
2185       awakenBlockedQueueNoLock(tso->blocked_exceptions);
2186       tso->blocked_exceptions = NULL;
2187   }
2188   
2189   /* Reset blocking status */
2190   tso->why_blocked  = NotBlocked;
2191
2192   cap->r.rCurrentTSO = tso;
2193   cap->r.rInHaskell = rtsTrue;
2194   RELEASE_LOCK(&sched_mutex);
2195   errno = saved_errno;
2196   return &cap->r;
2197 }
2198
2199 /* ---------------------------------------------------------------------------
2200  * Comparing Thread ids.
2201  *
2202  * This is used from STG land in the implementation of the
2203  * instances of Eq/Ord for ThreadIds.
2204  * ------------------------------------------------------------------------ */
2205
2206 int
2207 cmp_thread(StgPtr tso1, StgPtr tso2) 
2208
2209   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2210   StgThreadID id2 = ((StgTSO *)tso2)->id;
2211  
2212   if (id1 < id2) return (-1);
2213   if (id1 > id2) return 1;
2214   return 0;
2215 }
2216
2217 /* ---------------------------------------------------------------------------
2218  * Fetching the ThreadID from an StgTSO.
2219  *
2220  * This is used in the implementation of Show for ThreadIds.
2221  * ------------------------------------------------------------------------ */
2222 int
2223 rts_getThreadId(StgPtr tso) 
2224 {
2225   return ((StgTSO *)tso)->id;
2226 }
2227
2228 #ifdef DEBUG
2229 void
2230 labelThread(StgPtr tso, char *label)
2231 {
2232   int len;
2233   void *buf;
2234
2235   /* Caveat: Once set, you can only set the thread name to "" */
2236   len = strlen(label)+1;
2237   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2238   strncpy(buf,label,len);
2239   /* Update will free the old memory for us */
2240   updateThreadLabel(((StgTSO *)tso)->id,buf);
2241 }
2242 #endif /* DEBUG */
2243
2244 /* ---------------------------------------------------------------------------
2245    Create a new thread.
2246
2247    The new thread starts with the given stack size.  Before the
2248    scheduler can run, however, this thread needs to have a closure
2249    (and possibly some arguments) pushed on its stack.  See
2250    pushClosure() in Schedule.h.
2251
2252    createGenThread() and createIOThread() (in SchedAPI.h) are
2253    convenient packaged versions of this function.
2254
2255    currently pri (priority) is only used in a GRAN setup -- HWL
2256    ------------------------------------------------------------------------ */
2257 #if defined(GRAN)
2258 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2259 StgTSO *
2260 createThread(nat size, StgInt pri)
2261 #else
2262 StgTSO *
2263 createThread(nat size)
2264 #endif
2265 {
2266
2267     StgTSO *tso;
2268     nat stack_size;
2269
2270     /* First check whether we should create a thread at all */
2271 #if defined(PARALLEL_HASKELL)
2272   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2273   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2274     threadsIgnored++;
2275     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2276           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2277     return END_TSO_QUEUE;
2278   }
2279   threadsCreated++;
2280 #endif
2281
2282 #if defined(GRAN)
2283   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2284 #endif
2285
2286   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2287
2288   /* catch ridiculously small stack sizes */
2289   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2290     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2291   }
2292
2293   stack_size = size - TSO_STRUCT_SIZEW;
2294
2295   tso = (StgTSO *)allocate(size);
2296   TICK_ALLOC_TSO(stack_size, 0);
2297
2298   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2299 #if defined(GRAN)
2300   SET_GRAN_HDR(tso, ThisPE);
2301 #endif
2302
2303   // Always start with the compiled code evaluator
2304   tso->what_next = ThreadRunGHC;
2305
2306   tso->id = next_thread_id++; 
2307   tso->why_blocked  = NotBlocked;
2308   tso->blocked_exceptions = NULL;
2309
2310   tso->saved_errno = 0;
2311   tso->main = NULL;
2312   
2313   tso->stack_size   = stack_size;
2314   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2315                               - TSO_STRUCT_SIZEW;
2316   tso->sp           = (P_)&(tso->stack) + stack_size;
2317
2318   tso->trec = NO_TREC;
2319
2320 #ifdef PROFILING
2321   tso->prof.CCCS = CCS_MAIN;
2322 #endif
2323
2324   /* put a stop frame on the stack */
2325   tso->sp -= sizeofW(StgStopFrame);
2326   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2327   tso->link = END_TSO_QUEUE;
2328
2329   // ToDo: check this
2330 #if defined(GRAN)
2331   /* uses more flexible routine in GranSim */
2332   insertThread(tso, CurrentProc);
2333 #else
2334   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2335    * from its creation
2336    */
2337 #endif
2338
2339 #if defined(GRAN) 
2340   if (RtsFlags.GranFlags.GranSimStats.Full) 
2341     DumpGranEvent(GR_START,tso);
2342 #elif defined(PARALLEL_HASKELL)
2343   if (RtsFlags.ParFlags.ParStats.Full) 
2344     DumpGranEvent(GR_STARTQ,tso);
2345   /* HACk to avoid SCHEDULE 
2346      LastTSO = tso; */
2347 #endif
2348
2349   /* Link the new thread on the global thread list.
2350    */
2351   tso->global_link = all_threads;
2352   all_threads = tso;
2353
2354 #if defined(DIST)
2355   tso->dist.priority = MandatoryPriority; //by default that is...
2356 #endif
2357
2358 #if defined(GRAN)
2359   tso->gran.pri = pri;
2360 # if defined(DEBUG)
2361   tso->gran.magic = TSO_MAGIC; // debugging only
2362 # endif
2363   tso->gran.sparkname   = 0;
2364   tso->gran.startedat   = CURRENT_TIME; 
2365   tso->gran.exported    = 0;
2366   tso->gran.basicblocks = 0;
2367   tso->gran.allocs      = 0;
2368   tso->gran.exectime    = 0;
2369   tso->gran.fetchtime   = 0;
2370   tso->gran.fetchcount  = 0;
2371   tso->gran.blocktime   = 0;
2372   tso->gran.blockcount  = 0;
2373   tso->gran.blockedat   = 0;
2374   tso->gran.globalsparks = 0;
2375   tso->gran.localsparks  = 0;
2376   if (RtsFlags.GranFlags.Light)
2377     tso->gran.clock  = Now; /* local clock */
2378   else
2379     tso->gran.clock  = 0;
2380
2381   IF_DEBUG(gran,printTSO(tso));
2382 #elif defined(PARALLEL_HASKELL)
2383 # if defined(DEBUG)
2384   tso->par.magic = TSO_MAGIC; // debugging only
2385 # endif
2386   tso->par.sparkname   = 0;
2387   tso->par.startedat   = CURRENT_TIME; 
2388   tso->par.exported    = 0;
2389   tso->par.basicblocks = 0;
2390   tso->par.allocs      = 0;
2391   tso->par.exectime    = 0;
2392   tso->par.fetchtime   = 0;
2393   tso->par.fetchcount  = 0;
2394   tso->par.blocktime   = 0;
2395   tso->par.blockcount  = 0;
2396   tso->par.blockedat   = 0;
2397   tso->par.globalsparks = 0;
2398   tso->par.localsparks  = 0;
2399 #endif
2400
2401 #if defined(GRAN)
2402   globalGranStats.tot_threads_created++;
2403   globalGranStats.threads_created_on_PE[CurrentProc]++;
2404   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2405   globalGranStats.tot_sq_probes++;
2406 #elif defined(PARALLEL_HASKELL)
2407   // collect parallel global statistics (currently done together with GC stats)
2408   if (RtsFlags.ParFlags.ParStats.Global &&
2409       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2410     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2411     globalParStats.tot_threads_created++;
2412   }
2413 #endif 
2414
2415 #if defined(GRAN)
2416   IF_GRAN_DEBUG(pri,
2417                 sched_belch("==__ schedule: Created TSO %d (%p);",
2418                       CurrentProc, tso, tso->id));
2419 #elif defined(PARALLEL_HASKELL)
2420   IF_PAR_DEBUG(verbose,
2421                sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2422                            (long)tso->id, tso, advisory_thread_count));
2423 #else
2424   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2425                                  (long)tso->id, (long)tso->stack_size));
2426 #endif    
2427   return tso;
2428 }
2429
2430 #if defined(PAR)
2431 /* RFP:
2432    all parallel thread creation calls should fall through the following routine.
2433 */
2434 StgTSO *
2435 createThreadFromSpark(rtsSpark spark) 
2436 { StgTSO *tso;
2437   ASSERT(spark != (rtsSpark)NULL);
2438 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2439   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2440   { threadsIgnored++;
2441     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2442           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2443     return END_TSO_QUEUE;
2444   }
2445   else
2446   { threadsCreated++;
2447     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2448     if (tso==END_TSO_QUEUE)     
2449       barf("createSparkThread: Cannot create TSO");
2450 #if defined(DIST)
2451     tso->priority = AdvisoryPriority;
2452 #endif
2453     pushClosure(tso,spark);
2454     addToRunQueue(tso);
2455     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2456   }
2457   return tso;
2458 }
2459 #endif
2460
2461 /*
2462   Turn a spark into a thread.
2463   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2464 */
2465 #if 0
2466 StgTSO *
2467 activateSpark (rtsSpark spark) 
2468 {
2469   StgTSO *tso;
2470
2471   tso = createSparkThread(spark);
2472   if (RtsFlags.ParFlags.ParStats.Full) {   
2473     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2474       IF_PAR_DEBUG(verbose,
2475                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2476                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2477   }
2478   // ToDo: fwd info on local/global spark to thread -- HWL
2479   // tso->gran.exported =  spark->exported;
2480   // tso->gran.locked =   !spark->global;
2481   // tso->gran.sparkname = spark->name;
2482
2483   return tso;
2484 }
2485 #endif
2486
2487 /* ---------------------------------------------------------------------------
2488  * scheduleThread()
2489  *
2490  * scheduleThread puts a thread on the head of the runnable queue.
2491  * This will usually be done immediately after a thread is created.
2492  * The caller of scheduleThread must create the thread using e.g.
2493  * createThread and push an appropriate closure
2494  * on this thread's stack before the scheduler is invoked.
2495  * ------------------------------------------------------------------------ */
2496
2497 static void
2498 scheduleThread_(StgTSO *tso)
2499 {
2500   // The thread goes at the *end* of the run-queue, to avoid possible
2501   // starvation of any threads already on the queue.
2502   APPEND_TO_RUN_QUEUE(tso);
2503   threadRunnable();
2504 }
2505
2506 void
2507 scheduleThread(StgTSO* tso)
2508 {
2509   ACQUIRE_LOCK(&sched_mutex);
2510   scheduleThread_(tso);
2511   RELEASE_LOCK(&sched_mutex);
2512 }
2513
2514 #if defined(RTS_SUPPORTS_THREADS)
2515 static Condition bound_cond_cache;
2516 static int bound_cond_cache_full = 0;
2517 #endif
2518
2519
2520 SchedulerStatus
2521 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
2522                    Capability *initialCapability)
2523 {
2524     // Precondition: sched_mutex must be held
2525     StgMainThread *m;
2526
2527     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2528     m->tso = tso;
2529     tso->main = m;
2530     m->ret = ret;
2531     m->stat = NoStatus;
2532     m->link = main_threads;
2533     m->prev = NULL;
2534     if (main_threads != NULL) {
2535         main_threads->prev = m;
2536     }
2537     main_threads = m;
2538
2539 #if defined(RTS_SUPPORTS_THREADS)
2540     // Allocating a new condition for each thread is expensive, so we
2541     // cache one.  This is a pretty feeble hack, but it helps speed up
2542     // consecutive call-ins quite a bit.
2543     if (bound_cond_cache_full) {
2544         m->bound_thread_cond = bound_cond_cache;
2545         bound_cond_cache_full = 0;
2546     } else {
2547         initCondition(&m->bound_thread_cond);
2548     }
2549 #endif
2550
2551     /* Put the thread on the main-threads list prior to scheduling the TSO.
2552        Failure to do so introduces a race condition in the MT case (as
2553        identified by Wolfgang Thaller), whereby the new task/OS thread 
2554        created by scheduleThread_() would complete prior to the thread
2555        that spawned it managed to put 'itself' on the main-threads list.
2556        The upshot of it all being that the worker thread wouldn't get to
2557        signal the completion of the its work item for the main thread to
2558        see (==> it got stuck waiting.)    -- sof 6/02.
2559     */
2560     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2561     
2562     APPEND_TO_RUN_QUEUE(tso);
2563     // NB. Don't call threadRunnable() here, because the thread is
2564     // bound and only runnable by *this* OS thread, so waking up other
2565     // workers will just slow things down.
2566
2567     return waitThread_(m, initialCapability);
2568 }
2569
2570 /* ---------------------------------------------------------------------------
2571  * initScheduler()
2572  *
2573  * Initialise the scheduler.  This resets all the queues - if the
2574  * queues contained any threads, they'll be garbage collected at the
2575  * next pass.
2576  *
2577  * ------------------------------------------------------------------------ */
2578
2579 void 
2580 initScheduler(void)
2581 {
2582 #if defined(GRAN)
2583   nat i;
2584
2585   for (i=0; i<=MAX_PROC; i++) {
2586     run_queue_hds[i]      = END_TSO_QUEUE;
2587     run_queue_tls[i]      = END_TSO_QUEUE;
2588     blocked_queue_hds[i]  = END_TSO_QUEUE;
2589     blocked_queue_tls[i]  = END_TSO_QUEUE;
2590     ccalling_threadss[i]  = END_TSO_QUEUE;
2591     blackhole_queue[i]    = END_TSO_QUEUE;
2592     sleeping_queue        = END_TSO_QUEUE;
2593   }
2594 #else
2595   run_queue_hd      = END_TSO_QUEUE;
2596   run_queue_tl      = END_TSO_QUEUE;
2597   blocked_queue_hd  = END_TSO_QUEUE;
2598   blocked_queue_tl  = END_TSO_QUEUE;
2599   blackhole_queue   = END_TSO_QUEUE;
2600   sleeping_queue    = END_TSO_QUEUE;
2601 #endif 
2602
2603   suspended_ccalling_threads  = END_TSO_QUEUE;
2604
2605   main_threads = NULL;
2606   all_threads  = END_TSO_QUEUE;
2607
2608   context_switch = 0;
2609   interrupted    = 0;
2610
2611   RtsFlags.ConcFlags.ctxtSwitchTicks =
2612       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2613       
2614 #if defined(RTS_SUPPORTS_THREADS)
2615   /* Initialise the mutex and condition variables used by
2616    * the scheduler. */
2617   initMutex(&sched_mutex);
2618   initMutex(&term_mutex);
2619 #endif
2620   
2621   ACQUIRE_LOCK(&sched_mutex);
2622
2623   /* A capability holds the state a native thread needs in
2624    * order to execute STG code. At least one capability is
2625    * floating around (only SMP builds have more than one).
2626    */
2627   initCapabilities();
2628   
2629 #if defined(RTS_SUPPORTS_THREADS)
2630   initTaskManager();
2631 #endif
2632
2633 #if defined(SMP)
2634   /* eagerly start some extra workers */
2635   startingWorkerThread = RtsFlags.ParFlags.nNodes;
2636   startTasks(RtsFlags.ParFlags.nNodes, taskStart);
2637 #endif
2638
2639 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2640   initSparkPools();
2641 #endif
2642
2643   RELEASE_LOCK(&sched_mutex);
2644 }
2645
2646 void
2647 exitScheduler( void )
2648 {
2649     interrupted = rtsTrue;
2650     shutting_down_scheduler = rtsTrue;
2651 #if defined(RTS_SUPPORTS_THREADS)
2652     if (threadIsTask(osThreadId())) { taskStop(); }
2653     stopTaskManager();
2654 #endif
2655 }
2656
2657 /* ----------------------------------------------------------------------------
2658    Managing the per-task allocation areas.
2659    
2660    Each capability comes with an allocation area.  These are
2661    fixed-length block lists into which allocation can be done.
2662
2663    ToDo: no support for two-space collection at the moment???
2664    ------------------------------------------------------------------------- */
2665
2666 static SchedulerStatus
2667 waitThread_(StgMainThread* m, Capability *initialCapability)
2668 {
2669   SchedulerStatus stat;
2670
2671   // Precondition: sched_mutex must be held.
2672   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2673
2674 #if defined(GRAN)
2675   /* GranSim specific init */
2676   CurrentTSO = m->tso;                // the TSO to run
2677   procStatus[MainProc] = Busy;        // status of main PE
2678   CurrentProc = MainProc;             // PE to run it on
2679   schedule(m,initialCapability);
2680 #else
2681   schedule(m,initialCapability);
2682   ASSERT(m->stat != NoStatus);
2683 #endif
2684
2685   stat = m->stat;
2686
2687 #if defined(RTS_SUPPORTS_THREADS)
2688   // Free the condition variable, returning it to the cache if possible.
2689   if (!bound_cond_cache_full) {
2690       bound_cond_cache = m->bound_thread_cond;
2691       bound_cond_cache_full = 1;
2692   } else {
2693       closeCondition(&m->bound_thread_cond);
2694   }
2695 #endif
2696
2697   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2698   stgFree(m);
2699
2700   // Postcondition: sched_mutex still held
2701   return stat;
2702 }
2703
2704 /* ---------------------------------------------------------------------------
2705    Where are the roots that we know about?
2706
2707         - all the threads on the runnable queue
2708         - all the threads on the blocked queue
2709         - all the threads on the sleeping queue
2710         - all the thread currently executing a _ccall_GC
2711         - all the "main threads"
2712      
2713    ------------------------------------------------------------------------ */
2714
2715 /* This has to be protected either by the scheduler monitor, or by the
2716         garbage collection monitor (probably the latter).
2717         KH @ 25/10/99
2718 */
2719
2720 void
2721 GetRoots( evac_fn evac )
2722 {
2723 #if defined(GRAN)
2724   {
2725     nat i;
2726     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2727       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2728           evac((StgClosure **)&run_queue_hds[i]);
2729       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2730           evac((StgClosure **)&run_queue_tls[i]);
2731       
2732       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2733           evac((StgClosure **)&blocked_queue_hds[i]);
2734       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2735           evac((StgClosure **)&blocked_queue_tls[i]);
2736       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2737           evac((StgClosure **)&ccalling_threads[i]);
2738     }
2739   }
2740
2741   markEventQueue();
2742
2743 #else /* !GRAN */
2744   if (run_queue_hd != END_TSO_QUEUE) {
2745       ASSERT(run_queue_tl != END_TSO_QUEUE);
2746       evac((StgClosure **)&run_queue_hd);
2747       evac((StgClosure **)&run_queue_tl);
2748   }
2749   
2750   if (blocked_queue_hd != END_TSO_QUEUE) {
2751       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2752       evac((StgClosure **)&blocked_queue_hd);
2753       evac((StgClosure **)&blocked_queue_tl);
2754   }
2755   
2756   if (sleeping_queue != END_TSO_QUEUE) {
2757       evac((StgClosure **)&sleeping_queue);
2758   }
2759 #endif 
2760
2761   if (blackhole_queue != END_TSO_QUEUE) {
2762       evac((StgClosure **)&blackhole_queue);
2763   }
2764
2765   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2766       evac((StgClosure **)&suspended_ccalling_threads);
2767   }
2768
2769 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2770   markSparkQueue(evac);
2771 #endif
2772
2773 #if defined(RTS_USER_SIGNALS)
2774   // mark the signal handlers (signals should be already blocked)
2775   markSignalHandlers(evac);
2776 #endif
2777 }
2778
2779 /* -----------------------------------------------------------------------------
2780    performGC
2781
2782    This is the interface to the garbage collector from Haskell land.
2783    We provide this so that external C code can allocate and garbage
2784    collect when called from Haskell via _ccall_GC.
2785
2786    It might be useful to provide an interface whereby the programmer
2787    can specify more roots (ToDo).
2788    
2789    This needs to be protected by the GC condition variable above.  KH.
2790    -------------------------------------------------------------------------- */
2791
2792 static void (*extra_roots)(evac_fn);
2793
2794 void
2795 performGC(void)
2796 {
2797   /* Obligated to hold this lock upon entry */
2798   ACQUIRE_LOCK(&sched_mutex);
2799   GarbageCollect(GetRoots,rtsFalse);
2800   RELEASE_LOCK(&sched_mutex);
2801 }
2802
2803 void
2804 performMajorGC(void)
2805 {
2806   ACQUIRE_LOCK(&sched_mutex);
2807   GarbageCollect(GetRoots,rtsTrue);
2808   RELEASE_LOCK(&sched_mutex);
2809 }
2810
2811 static void
2812 AllRoots(evac_fn evac)
2813 {
2814     GetRoots(evac);             // the scheduler's roots
2815     extra_roots(evac);          // the user's roots
2816 }
2817
2818 void
2819 performGCWithRoots(void (*get_roots)(evac_fn))
2820 {
2821   ACQUIRE_LOCK(&sched_mutex);
2822   extra_roots = get_roots;
2823   GarbageCollect(AllRoots,rtsFalse);
2824   RELEASE_LOCK(&sched_mutex);
2825 }
2826
2827 /* -----------------------------------------------------------------------------
2828    Stack overflow
2829
2830    If the thread has reached its maximum stack size, then raise the
2831    StackOverflow exception in the offending thread.  Otherwise
2832    relocate the TSO into a larger chunk of memory and adjust its stack
2833    size appropriately.
2834    -------------------------------------------------------------------------- */
2835
2836 static StgTSO *
2837 threadStackOverflow(StgTSO *tso)
2838 {
2839   nat new_stack_size, stack_words;
2840   lnat new_tso_size;
2841   StgPtr new_sp;
2842   StgTSO *dest;
2843
2844   IF_DEBUG(sanity,checkTSO(tso));
2845   if (tso->stack_size >= tso->max_stack_size) {
2846
2847     IF_DEBUG(gc,
2848              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2849                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2850              /* If we're debugging, just print out the top of the stack */
2851              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2852                                               tso->sp+64)));
2853
2854     /* Send this thread the StackOverflow exception */
2855     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2856     return tso;
2857   }
2858
2859   /* Try to double the current stack size.  If that takes us over the
2860    * maximum stack size for this thread, then use the maximum instead.
2861    * Finally round up so the TSO ends up as a whole number of blocks.
2862    */
2863   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2864   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2865                                        TSO_STRUCT_SIZE)/sizeof(W_);
2866   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2867   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2868
2869   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2870
2871   dest = (StgTSO *)allocate(new_tso_size);
2872   TICK_ALLOC_TSO(new_stack_size,0);
2873
2874   /* copy the TSO block and the old stack into the new area */
2875   memcpy(dest,tso,TSO_STRUCT_SIZE);
2876   stack_words = tso->stack + tso->stack_size - tso->sp;
2877   new_sp = (P_)dest + new_tso_size - stack_words;
2878   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2879
2880   /* relocate the stack pointers... */
2881   dest->sp         = new_sp;
2882   dest->stack_size = new_stack_size;
2883         
2884   /* Mark the old TSO as relocated.  We have to check for relocated
2885    * TSOs in the garbage collector and any primops that deal with TSOs.
2886    *
2887    * It's important to set the sp value to just beyond the end
2888    * of the stack, so we don't attempt to scavenge any part of the
2889    * dead TSO's stack.
2890    */
2891   tso->what_next = ThreadRelocated;
2892   tso->link = dest;
2893   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2894   tso->why_blocked = NotBlocked;
2895
2896   IF_PAR_DEBUG(verbose,
2897                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2898                      tso->id, tso, tso->stack_size);
2899                /* If we're debugging, just print out the top of the stack */
2900                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2901                                                 tso->sp+64)));
2902   
2903   IF_DEBUG(sanity,checkTSO(tso));
2904 #if 0
2905   IF_DEBUG(scheduler,printTSO(dest));
2906 #endif
2907
2908   return dest;
2909 }
2910
2911 /* ---------------------------------------------------------------------------
2912    Wake up a queue that was blocked on some resource.
2913    ------------------------------------------------------------------------ */
2914
2915 #if defined(GRAN)
2916 STATIC_INLINE void
2917 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2918 {
2919 }
2920 #elif defined(PARALLEL_HASKELL)
2921 STATIC_INLINE void
2922 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2923 {
2924   /* write RESUME events to log file and
2925      update blocked and fetch time (depending on type of the orig closure) */
2926   if (RtsFlags.ParFlags.ParStats.Full) {
2927     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2928                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2929                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2930     if (EMPTY_RUN_QUEUE())
2931       emitSchedule = rtsTrue;
2932
2933     switch (get_itbl(node)->type) {
2934         case FETCH_ME_BQ:
2935           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2936           break;
2937         case RBH:
2938         case FETCH_ME:
2939         case BLACKHOLE_BQ:
2940           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2941           break;
2942 #ifdef DIST
2943         case MVAR:
2944           break;
2945 #endif    
2946         default:
2947           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2948         }
2949       }
2950 }
2951 #endif
2952
2953 #if defined(GRAN)
2954 static StgBlockingQueueElement *
2955 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2956 {
2957     StgTSO *tso;
2958     PEs node_loc, tso_loc;
2959
2960     node_loc = where_is(node); // should be lifted out of loop
2961     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2962     tso_loc = where_is((StgClosure *)tso);
2963     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2964       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2965       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2966       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2967       // insertThread(tso, node_loc);
2968       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2969                 ResumeThread,
2970                 tso, node, (rtsSpark*)NULL);
2971       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2972       // len_local++;
2973       // len++;
2974     } else { // TSO is remote (actually should be FMBQ)
2975       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2976                                   RtsFlags.GranFlags.Costs.gunblocktime +
2977                                   RtsFlags.GranFlags.Costs.latency;
2978       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2979                 UnblockThread,
2980                 tso, node, (rtsSpark*)NULL);
2981       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2982       // len++;
2983     }
2984     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2985     IF_GRAN_DEBUG(bq,
2986                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2987                           (node_loc==tso_loc ? "Local" : "Global"), 
2988                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2989     tso->block_info.closure = NULL;
2990     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2991                              tso->id, tso));
2992 }
2993 #elif defined(PARALLEL_HASKELL)
2994 static StgBlockingQueueElement *
2995 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2996 {
2997     StgBlockingQueueElement *next;
2998
2999     switch (get_itbl(bqe)->type) {
3000     case TSO:
3001       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3002       /* if it's a TSO just push it onto the run_queue */
3003       next = bqe->link;
3004       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3005       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3006       threadRunnable();
3007       unblockCount(bqe, node);
3008       /* reset blocking status after dumping event */
3009       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3010       break;
3011
3012     case BLOCKED_FETCH:
3013       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3014       next = bqe->link;
3015       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3016       PendingFetches = (StgBlockedFetch *)bqe;
3017       break;
3018
3019 # if defined(DEBUG)
3020       /* can ignore this case in a non-debugging setup; 
3021          see comments on RBHSave closures above */
3022     case CONSTR:
3023       /* check that the closure is an RBHSave closure */
3024       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3025              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3026              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3027       break;
3028
3029     default:
3030       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3031            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3032            (StgClosure *)bqe);
3033 # endif
3034     }
3035   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3036   return next;
3037 }
3038
3039 #else /* !GRAN && !PARALLEL_HASKELL */
3040 static StgTSO *
3041 unblockOneLocked(StgTSO *tso)
3042 {
3043   StgTSO *next;
3044
3045   ASSERT(get_itbl(tso)->type == TSO);
3046   ASSERT(tso->why_blocked != NotBlocked);
3047   tso->why_blocked = NotBlocked;
3048   next = tso->link;
3049   tso->link = END_TSO_QUEUE;
3050   APPEND_TO_RUN_QUEUE(tso);
3051   threadRunnable();
3052   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3053   return next;
3054 }
3055 #endif
3056
3057 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3058 INLINE_ME StgBlockingQueueElement *
3059 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3060 {
3061   ACQUIRE_LOCK(&sched_mutex);
3062   bqe = unblockOneLocked(bqe, node);
3063   RELEASE_LOCK(&sched_mutex);
3064   return bqe;
3065 }
3066 #else
3067 INLINE_ME StgTSO *
3068 unblockOne(StgTSO *tso)
3069 {
3070   ACQUIRE_LOCK(&sched_mutex);
3071   tso = unblockOneLocked(tso);
3072   RELEASE_LOCK(&sched_mutex);
3073   return tso;
3074 }
3075 #endif
3076
3077 #if defined(GRAN)
3078 void 
3079 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3080 {
3081   StgBlockingQueueElement *bqe;
3082   PEs node_loc;
3083   nat len = 0; 
3084
3085   IF_GRAN_DEBUG(bq, 
3086                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3087                       node, CurrentProc, CurrentTime[CurrentProc], 
3088                       CurrentTSO->id, CurrentTSO));
3089
3090   node_loc = where_is(node);
3091
3092   ASSERT(q == END_BQ_QUEUE ||
3093          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3094          get_itbl(q)->type == CONSTR); // closure (type constructor)
3095   ASSERT(is_unique(node));
3096
3097   /* FAKE FETCH: magically copy the node to the tso's proc;
3098      no Fetch necessary because in reality the node should not have been 
3099      moved to the other PE in the first place
3100   */
3101   if (CurrentProc!=node_loc) {
3102     IF_GRAN_DEBUG(bq, 
3103                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3104                         node, node_loc, CurrentProc, CurrentTSO->id, 
3105                         // CurrentTSO, where_is(CurrentTSO),
3106                         node->header.gran.procs));
3107     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3108     IF_GRAN_DEBUG(bq, 
3109                   debugBelch("## new bitmask of node %p is %#x\n",
3110                         node, node->header.gran.procs));
3111     if (RtsFlags.GranFlags.GranSimStats.Global) {
3112       globalGranStats.tot_fake_fetches++;
3113     }
3114   }
3115
3116   bqe = q;
3117   // ToDo: check: ASSERT(CurrentProc==node_loc);
3118   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3119     //next = bqe->link;
3120     /* 
3121        bqe points to the current element in the queue
3122        next points to the next element in the queue
3123     */
3124     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3125     //tso_loc = where_is(tso);
3126     len++;
3127     bqe = unblockOneLocked(bqe, node);
3128   }
3129
3130   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3131      the closure to make room for the anchor of the BQ */
3132   if (bqe!=END_BQ_QUEUE) {
3133     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3134     /*
3135     ASSERT((info_ptr==&RBH_Save_0_info) ||
3136            (info_ptr==&RBH_Save_1_info) ||
3137            (info_ptr==&RBH_Save_2_info));
3138     */
3139     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3140     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3141     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3142
3143     IF_GRAN_DEBUG(bq,
3144                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3145                         node, info_type(node)));
3146   }
3147
3148   /* statistics gathering */
3149   if (RtsFlags.GranFlags.GranSimStats.Global) {
3150     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3151     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3152     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3153     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3154   }
3155   IF_GRAN_DEBUG(bq,
3156                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3157                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3158 }
3159 #elif defined(PARALLEL_HASKELL)
3160 void 
3161 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3162 {
3163   StgBlockingQueueElement *bqe;
3164
3165   ACQUIRE_LOCK(&sched_mutex);
3166
3167   IF_PAR_DEBUG(verbose, 
3168                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3169                      node, mytid));
3170 #ifdef DIST  
3171   //RFP
3172   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3173     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3174     return;
3175   }
3176 #endif
3177   
3178   ASSERT(q == END_BQ_QUEUE ||
3179          get_itbl(q)->type == TSO ||           
3180          get_itbl(q)->type == BLOCKED_FETCH || 
3181          get_itbl(q)->type == CONSTR); 
3182
3183   bqe = q;
3184   while (get_itbl(bqe)->type==TSO || 
3185          get_itbl(bqe)->type==BLOCKED_FETCH) {
3186     bqe = unblockOneLocked(bqe, node);
3187   }
3188   RELEASE_LOCK(&sched_mutex);
3189 }
3190
3191 #else   /* !GRAN && !PARALLEL_HASKELL */
3192
3193 void
3194 awakenBlockedQueueNoLock(StgTSO *tso)
3195 {
3196   while (tso != END_TSO_QUEUE) {
3197     tso = unblockOneLocked(tso);
3198   }
3199 }
3200
3201 void
3202 awakenBlockedQueue(StgTSO *tso)
3203 {
3204   ACQUIRE_LOCK(&sched_mutex);
3205   while (tso != END_TSO_QUEUE) {
3206     tso = unblockOneLocked(tso);
3207   }
3208   RELEASE_LOCK(&sched_mutex);
3209 }
3210 #endif
3211
3212 /* ---------------------------------------------------------------------------
3213    Interrupt execution
3214    - usually called inside a signal handler so it mustn't do anything fancy.   
3215    ------------------------------------------------------------------------ */
3216
3217 void
3218 interruptStgRts(void)
3219 {
3220     interrupted    = 1;
3221     context_switch = 1;
3222 }
3223
3224 /* -----------------------------------------------------------------------------
3225    Unblock a thread
3226
3227    This is for use when we raise an exception in another thread, which
3228    may be blocked.
3229    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3230    -------------------------------------------------------------------------- */
3231
3232 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3233 /*
3234   NB: only the type of the blocking queue is different in GranSim and GUM
3235       the operations on the queue-elements are the same
3236       long live polymorphism!
3237
3238   Locks: sched_mutex is held upon entry and exit.
3239
3240 */
3241 static void
3242 unblockThread(StgTSO *tso)
3243 {
3244   StgBlockingQueueElement *t, **last;
3245
3246   switch (tso->why_blocked) {
3247
3248   case NotBlocked:
3249     return;  /* not blocked */
3250
3251   case BlockedOnSTM:
3252     // Be careful: nothing to do here!  We tell the scheduler that the thread
3253     // is runnable and we leave it to the stack-walking code to abort the 
3254     // transaction while unwinding the stack.  We should perhaps have a debugging
3255     // test to make sure that this really happens and that the 'zombie' transaction
3256     // does not get committed.
3257     goto done;
3258
3259   case BlockedOnMVar:
3260     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3261     {
3262       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3263       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3264
3265       last = (StgBlockingQueueElement **)&mvar->head;
3266       for (t = (StgBlockingQueueElement *)mvar->head; 
3267            t != END_BQ_QUEUE; 
3268            last = &t->link, last_tso = t, t = t->link) {
3269         if (t == (StgBlockingQueueElement *)tso) {
3270           *last = (StgBlockingQueueElement *)tso->link;
3271           if (mvar->tail == tso) {
3272             mvar->tail = (StgTSO *)last_tso;
3273           }
3274           goto done;
3275         }
3276       }
3277       barf("unblockThread (MVAR): TSO not found");
3278     }
3279
3280   case BlockedOnBlackHole:
3281     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3282     {
3283       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3284
3285       last = &bq->blocking_queue;
3286       for (t = bq->blocking_queue; 
3287            t != END_BQ_QUEUE; 
3288            last = &t->link, t = t->link) {
3289         if (t == (StgBlockingQueueElement *)tso) {
3290           *last = (StgBlockingQueueElement *)tso->link;
3291           goto done;
3292         }
3293       }
3294       barf("unblockThread (BLACKHOLE): TSO not found");
3295     }
3296
3297   case BlockedOnException:
3298     {
3299       StgTSO *target  = tso->block_info.tso;
3300
3301       ASSERT(get_itbl(target)->type == TSO);
3302
3303       if (target->what_next == ThreadRelocated) {
3304           target = target->link;
3305           ASSERT(get_itbl(target)->type == TSO);
3306       }
3307
3308       ASSERT(target->blocked_exceptions != NULL);
3309
3310       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3311       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3312            t != END_BQ_QUEUE; 
3313            last = &t->link, t = t->link) {
3314         ASSERT(get_itbl(t)->type == TSO);
3315         if (t == (StgBlockingQueueElement *)tso) {
3316           *last = (StgBlockingQueueElement *)tso->link;
3317           goto done;
3318         }
3319       }
3320       barf("unblockThread (Exception): TSO not found");
3321     }
3322
3323   case BlockedOnRead:
3324   case BlockedOnWrite:
3325 #if defined(mingw32_HOST_OS)
3326   case BlockedOnDoProc:
3327 #endif
3328     {
3329       /* take TSO off blocked_queue */
3330       StgBlockingQueueElement *prev = NULL;
3331       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3332            prev = t, t = t->link) {
3333         if (t == (StgBlockingQueueElement *)tso) {
3334           if (prev == NULL) {
3335             blocked_queue_hd = (StgTSO *)t->link;
3336             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3337               blocked_queue_tl = END_TSO_QUEUE;
3338             }
3339           } else {
3340             prev->link = t->link;
3341             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3342               blocked_queue_tl = (StgTSO *)prev;
3343             }
3344           }
3345           goto done;
3346         }
3347       }
3348       barf("unblockThread (I/O): TSO not found");
3349     }
3350
3351   case BlockedOnDelay:
3352     {
3353       /* take TSO off sleeping_queue */
3354       StgBlockingQueueElement *prev = NULL;
3355       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3356            prev = t, t = t->link) {
3357         if (t == (StgBlockingQueueElement *)tso) {
3358           if (prev == NULL) {
3359             sleeping_queue = (StgTSO *)t->link;
3360           } else {
3361             prev->link = t->link;
3362           }
3363           goto done;
3364         }
3365       }
3366       barf("unblockThread (delay): TSO not found");
3367     }
3368
3369   default:
3370     barf("unblockThread");
3371   }
3372
3373  done:
3374   tso->link = END_TSO_QUEUE;
3375   tso->why_blocked = NotBlocked;
3376   tso->block_info.closure = NULL;
3377   PUSH_ON_RUN_QUEUE(tso);
3378 }
3379 #else
3380 static void
3381 unblockThread(StgTSO *tso)
3382 {
3383   StgTSO *t, **last;
3384   
3385   /* To avoid locking unnecessarily. */
3386   if (tso->why_blocked == NotBlocked) {
3387     return;
3388   }
3389
3390   switch (tso->why_blocked) {
3391
3392   case BlockedOnSTM:
3393     // Be careful: nothing to do here!  We tell the scheduler that the thread
3394     // is runnable and we leave it to the stack-walking code to abort the 
3395     // transaction while unwinding the stack.  We should perhaps have a debugging
3396     // test to make sure that this really happens and that the 'zombie' transaction
3397     // does not get committed.
3398     goto done;
3399
3400   case BlockedOnMVar:
3401     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3402     {
3403       StgTSO *last_tso = END_TSO_QUEUE;
3404       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3405
3406       last = &mvar->head;
3407       for (t = mvar->head; t != END_TSO_QUEUE; 
3408            last = &t->link, last_tso = t, t = t->link) {
3409         if (t == tso) {
3410           *last = tso->link;
3411           if (mvar->tail == tso) {
3412             mvar->tail = last_tso;
3413           }
3414           goto done;
3415         }
3416       }
3417       barf("unblockThread (MVAR): TSO not found");
3418     }
3419
3420   case BlockedOnBlackHole:
3421     {
3422       last = &blackhole_queue;
3423       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3424            last = &t->link, t = t->link) {
3425         if (t == tso) {
3426           *last = tso->link;
3427           goto done;
3428         }
3429       }
3430       barf("unblockThread (BLACKHOLE): TSO not found");
3431     }
3432
3433   case BlockedOnException:
3434     {
3435       StgTSO *target  = tso->block_info.tso;
3436
3437       ASSERT(get_itbl(target)->type == TSO);
3438
3439       while (target->what_next == ThreadRelocated) {
3440           target = target->link;
3441           ASSERT(get_itbl(target)->type == TSO);
3442       }
3443       
3444       ASSERT(target->blocked_exceptions != NULL);
3445
3446       last = &target->blocked_exceptions;
3447       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3448            last = &t->link, t = t->link) {
3449         ASSERT(get_itbl(t)->type == TSO);
3450         if (t == tso) {
3451           *last = tso->link;
3452           goto done;
3453         }
3454       }
3455       barf("unblockThread (Exception): TSO not found");
3456     }
3457
3458   case BlockedOnRead:
3459   case BlockedOnWrite:
3460 #if defined(mingw32_HOST_OS)
3461   case BlockedOnDoProc:
3462 #endif
3463     {
3464       StgTSO *prev = NULL;
3465       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3466            prev = t, t = t->link) {
3467         if (t == tso) {
3468           if (prev == NULL) {
3469             blocked_queue_hd = t->link;
3470             if (blocked_queue_tl == t) {
3471               blocked_queue_tl = END_TSO_QUEUE;
3472             }
3473           } else {
3474             prev->link = t->link;
3475             if (blocked_queue_tl == t) {
3476               blocked_queue_tl = prev;
3477             }
3478           }
3479           goto done;
3480         }
3481       }
3482       barf("unblockThread (I/O): TSO not found");
3483     }
3484
3485   case BlockedOnDelay:
3486     {
3487       StgTSO *prev = NULL;
3488       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3489            prev = t, t = t->link) {
3490         if (t == tso) {
3491           if (prev == NULL) {
3492             sleeping_queue = t->link;
3493           } else {
3494             prev->link = t->link;
3495           }
3496           goto done;
3497         }
3498       }
3499       barf("unblockThread (delay): TSO not found");
3500     }
3501
3502   default:
3503     barf("unblockThread");
3504   }
3505
3506  done:
3507   tso->link = END_TSO_QUEUE;
3508   tso->why_blocked = NotBlocked;
3509   tso->block_info.closure = NULL;
3510   APPEND_TO_RUN_QUEUE(tso);
3511 }
3512 #endif
3513
3514 /* -----------------------------------------------------------------------------
3515  * checkBlackHoles()
3516  *
3517  * Check the blackhole_queue for threads that can be woken up.  We do
3518  * this periodically: before every GC, and whenever the run queue is
3519  * empty.
3520  *
3521  * An elegant solution might be to just wake up all the blocked
3522  * threads with awakenBlockedQueue occasionally: they'll go back to
3523  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3524  * doesn't give us a way to tell whether we've actually managed to
3525  * wake up any threads, so we would be busy-waiting.
3526  *
3527  * -------------------------------------------------------------------------- */
3528
3529 static rtsBool
3530 checkBlackHoles( void )
3531 {
3532     StgTSO **prev, *t;
3533     rtsBool any_woke_up = rtsFalse;
3534     StgHalfWord type;
3535
3536     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3537
3538     // ASSUMES: sched_mutex
3539     prev = &blackhole_queue;
3540     t = blackhole_queue;
3541     while (t != END_TSO_QUEUE) {
3542         ASSERT(t->why_blocked == BlockedOnBlackHole);
3543         type = get_itbl(t->block_info.closure)->type;
3544         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3545             t = unblockOneLocked(t);
3546             *prev = t;
3547             any_woke_up = rtsTrue;
3548         } else {
3549             prev = &t->link;
3550             t = t->link;
3551         }
3552     }
3553
3554     return any_woke_up;
3555 }
3556
3557 /* -----------------------------------------------------------------------------
3558  * raiseAsync()
3559  *
3560  * The following function implements the magic for raising an
3561  * asynchronous exception in an existing thread.
3562  *
3563  * We first remove the thread from any queue on which it might be
3564  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3565  *
3566  * We strip the stack down to the innermost CATCH_FRAME, building
3567  * thunks in the heap for all the active computations, so they can 
3568  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3569  * an application of the handler to the exception, and push it on
3570  * the top of the stack.
3571  * 
3572  * How exactly do we save all the active computations?  We create an
3573  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3574  * AP_STACKs pushes everything from the corresponding update frame
3575  * upwards onto the stack.  (Actually, it pushes everything up to the
3576  * next update frame plus a pointer to the next AP_STACK object.
3577  * Entering the next AP_STACK object pushes more onto the stack until we
3578  * reach the last AP_STACK object - at which point the stack should look
3579  * exactly as it did when we killed the TSO and we can continue
3580  * execution by entering the closure on top of the stack.
3581  *
3582  * We can also kill a thread entirely - this happens if either (a) the 
3583  * exception passed to raiseAsync is NULL, or (b) there's no
3584  * CATCH_FRAME on the stack.  In either case, we strip the entire
3585  * stack and replace the thread with a zombie.
3586  *
3587  * Locks: sched_mutex held upon entry nor exit.
3588  *
3589  * -------------------------------------------------------------------------- */
3590  
3591 void 
3592 deleteThread(StgTSO *tso)
3593 {
3594   if (tso->why_blocked != BlockedOnCCall &&
3595       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3596       raiseAsync(tso,NULL);
3597   }
3598 }
3599
3600 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3601 static void 
3602 deleteThreadImmediately(StgTSO *tso)
3603 { // for forkProcess only:
3604   // delete thread without giving it a chance to catch the KillThread exception
3605
3606   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3607       return;
3608   }
3609
3610   if (tso->why_blocked != BlockedOnCCall &&
3611       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3612     unblockThread(tso);
3613   }
3614
3615   tso->what_next = ThreadKilled;
3616 }
3617 #endif
3618
3619 void
3620 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3621 {
3622   /* When raising async exs from contexts where sched_mutex isn't held;
3623      use raiseAsyncWithLock(). */
3624   ACQUIRE_LOCK(&sched_mutex);
3625   raiseAsync(tso,exception);
3626   RELEASE_LOCK(&sched_mutex);
3627 }
3628
3629 void
3630 raiseAsync(StgTSO *tso, StgClosure *exception)
3631 {
3632     raiseAsync_(tso, exception, rtsFalse);
3633 }
3634
3635 static void
3636 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3637 {
3638     StgRetInfoTable *info;
3639     StgPtr sp;
3640   
3641     // Thread already dead?
3642     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3643         return;
3644     }
3645
3646     IF_DEBUG(scheduler, 
3647              sched_belch("raising exception in thread %ld.", (long)tso->id));
3648     
3649     // Remove it from any blocking queues
3650     unblockThread(tso);
3651
3652     sp = tso->sp;
3653     
3654     // The stack freezing code assumes there's a closure pointer on
3655     // the top of the stack, so we have to arrange that this is the case...
3656     //
3657     if (sp[0] == (W_)&stg_enter_info) {
3658         sp++;
3659     } else {
3660         sp--;
3661         sp[0] = (W_)&stg_dummy_ret_closure;
3662     }
3663
3664     while (1) {
3665         nat i;
3666
3667         // 1. Let the top of the stack be the "current closure"
3668         //
3669         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3670         // CATCH_FRAME.
3671         //
3672         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3673         // current closure applied to the chunk of stack up to (but not
3674         // including) the update frame.  This closure becomes the "current
3675         // closure".  Go back to step 2.
3676         //
3677         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3678         // top of the stack applied to the exception.
3679         // 
3680         // 5. If it's a STOP_FRAME, then kill the thread.
3681         // 
3682         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3683         // transaction
3684        
3685         
3686         StgPtr frame;
3687         
3688         frame = sp + 1;
3689         info = get_ret_itbl((StgClosure *)frame);
3690         
3691         while (info->i.type != UPDATE_FRAME
3692                && (info->i.type != CATCH_FRAME || exception == NULL)
3693                && info->i.type != STOP_FRAME
3694                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3695         {
3696             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3697               // IF we find an ATOMICALLY_FRAME then we abort the
3698               // current transaction and propagate the exception.  In
3699               // this case (unlike ordinary exceptions) we do not care
3700               // whether the transaction is valid or not because its
3701               // possible validity cannot have caused the exception
3702               // and will not be visible after the abort.
3703               IF_DEBUG(stm,
3704                        debugBelch("Found atomically block delivering async exception\n"));
3705               stmAbortTransaction(tso -> trec);
3706               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3707             }
3708             frame += stack_frame_sizeW((StgClosure *)frame);
3709             info = get_ret_itbl((StgClosure *)frame);
3710         }
3711         
3712         switch (info->i.type) {
3713             
3714         case ATOMICALLY_FRAME:
3715             ASSERT(stop_at_atomically);
3716             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3717             stmCondemnTransaction(tso -> trec);
3718 #ifdef REG_R1
3719             tso->sp = frame;
3720 #else
3721             // R1 is not a register: the return convention for IO in
3722             // this case puts the return value on the stack, so we
3723             // need to set up the stack to return to the atomically
3724             // frame properly...
3725             tso->sp = frame - 2;
3726             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3727             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3728 #endif
3729             tso->what_next = ThreadRunGHC;
3730             return;
3731
3732         case CATCH_FRAME:
3733             // If we find a CATCH_FRAME, and we've got an exception to raise,
3734             // then build the THUNK raise(exception), and leave it on
3735             // top of the CATCH_FRAME ready to enter.
3736             //
3737         {
3738 #ifdef PROFILING
3739             StgCatchFrame *cf = (StgCatchFrame *)frame;
3740 #endif
3741             StgClosure *raise;
3742             
3743             // we've got an exception to raise, so let's pass it to the
3744             // handler in this frame.
3745             //
3746             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3747             TICK_ALLOC_SE_THK(1,0);
3748             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3749             raise->payload[0] = exception;
3750             
3751             // throw away the stack from Sp up to the CATCH_FRAME.
3752             //
3753             sp = frame - 1;
3754             
3755             /* Ensure that async excpetions are blocked now, so we don't get
3756              * a surprise exception before we get around to executing the
3757              * handler.
3758              */
3759             if (tso->blocked_exceptions == NULL) {
3760                 tso->blocked_exceptions = END_TSO_QUEUE;
3761             }
3762             
3763             /* Put the newly-built THUNK on top of the stack, ready to execute
3764              * when the thread restarts.
3765              */
3766             sp[0] = (W_)raise;
3767             sp[-1] = (W_)&stg_enter_info;
3768             tso->sp = sp-1;
3769             tso->what_next = ThreadRunGHC;
3770             IF_DEBUG(sanity, checkTSO(tso));
3771             return;
3772         }
3773         
3774         case UPDATE_FRAME:
3775         {
3776             StgAP_STACK * ap;
3777             nat words;
3778             
3779             // First build an AP_STACK consisting of the stack chunk above the
3780             // current update frame, with the top word on the stack as the
3781             // fun field.
3782             //
3783             words = frame - sp - 1;
3784             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3785             
3786             ap->size = words;
3787             ap->fun  = (StgClosure *)sp[0];
3788             sp++;
3789             for(i=0; i < (nat)words; ++i) {
3790                 ap->payload[i] = (StgClosure *)*sp++;
3791             }
3792             
3793             SET_HDR(ap,&stg_AP_STACK_info,
3794                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3795             TICK_ALLOC_UP_THK(words+1,0);
3796             
3797             IF_DEBUG(scheduler,
3798                      debugBelch("sched: Updating ");
3799                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3800                      debugBelch(" with ");
3801                      printObj((StgClosure *)ap);
3802                 );
3803
3804             // Replace the updatee with an indirection - happily
3805             // this will also wake up any threads currently
3806             // waiting on the result.
3807             //
3808             // Warning: if we're in a loop, more than one update frame on
3809             // the stack may point to the same object.  Be careful not to
3810             // overwrite an IND_OLDGEN in this case, because we'll screw
3811             // up the mutable lists.  To be on the safe side, don't
3812             // overwrite any kind of indirection at all.  See also
3813             // threadSqueezeStack in GC.c, where we have to make a similar
3814             // check.
3815             //
3816             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3817                 // revert the black hole
3818                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3819                                (StgClosure *)ap);
3820             }
3821             sp += sizeofW(StgUpdateFrame) - 1;
3822             sp[0] = (W_)ap; // push onto stack
3823             break;
3824         }
3825         
3826         case STOP_FRAME:
3827             // We've stripped the entire stack, the thread is now dead.
3828             sp += sizeofW(StgStopFrame);
3829             tso->what_next = ThreadKilled;
3830             tso->sp = sp;
3831             return;
3832             
3833         default:
3834             barf("raiseAsync");
3835         }
3836     }
3837     barf("raiseAsync");
3838 }
3839
3840 /* -----------------------------------------------------------------------------
3841    raiseExceptionHelper
3842    
3843    This function is called by the raise# primitve, just so that we can
3844    move some of the tricky bits of raising an exception from C-- into
3845    C.  Who knows, it might be a useful re-useable thing here too.
3846    -------------------------------------------------------------------------- */
3847
3848 StgWord
3849 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3850 {
3851     StgClosure *raise_closure = NULL;
3852     StgPtr p, next;
3853     StgRetInfoTable *info;
3854     //
3855     // This closure represents the expression 'raise# E' where E
3856     // is the exception raise.  It is used to overwrite all the
3857     // thunks which are currently under evaluataion.
3858     //
3859
3860     //    
3861     // LDV profiling: stg_raise_info has THUNK as its closure
3862     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3863     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3864     // 1 does not cause any problem unless profiling is performed.
3865     // However, when LDV profiling goes on, we need to linearly scan
3866     // small object pool, where raise_closure is stored, so we should
3867     // use MIN_UPD_SIZE.
3868     //
3869     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3870     //                                 sizeofW(StgClosure)+1);
3871     //
3872
3873     //
3874     // Walk up the stack, looking for the catch frame.  On the way,
3875     // we update any closures pointed to from update frames with the
3876     // raise closure that we just built.
3877     //
3878     p = tso->sp;
3879     while(1) {
3880         info = get_ret_itbl((StgClosure *)p);
3881         next = p + stack_frame_sizeW((StgClosure *)p);
3882         switch (info->i.type) {
3883             
3884         case UPDATE_FRAME:
3885             // Only create raise_closure if we need to.
3886             if (raise_closure == NULL) {
3887                 raise_closure = 
3888                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3889                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3890                 raise_closure->payload[0] = exception;
3891             }
3892             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3893             p = next;
3894             continue;
3895
3896         case ATOMICALLY_FRAME:
3897             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3898             tso->sp = p;
3899             return ATOMICALLY_FRAME;
3900             
3901         case CATCH_FRAME:
3902             tso->sp = p;
3903             return CATCH_FRAME;
3904
3905         case CATCH_STM_FRAME:
3906             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3907             tso->sp = p;
3908             return CATCH_STM_FRAME;
3909             
3910         case STOP_FRAME:
3911             tso->sp = p;
3912             return STOP_FRAME;
3913
3914         case CATCH_RETRY_FRAME:
3915         default:
3916             p = next; 
3917             continue;
3918         }
3919     }
3920 }
3921
3922
3923 /* -----------------------------------------------------------------------------
3924    findRetryFrameHelper
3925
3926    This function is called by the retry# primitive.  It traverses the stack
3927    leaving tso->sp referring to the frame which should handle the retry.  
3928
3929    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3930    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3931
3932    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3933    despite the similar implementation.
3934
3935    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3936    not be created within memory transactions.
3937    -------------------------------------------------------------------------- */
3938
3939 StgWord
3940 findRetryFrameHelper (StgTSO *tso)
3941 {
3942   StgPtr           p, next;
3943   StgRetInfoTable *info;
3944
3945   p = tso -> sp;
3946   while (1) {
3947     info = get_ret_itbl((StgClosure *)p);
3948     next = p + stack_frame_sizeW((StgClosure *)p);
3949     switch (info->i.type) {
3950       
3951     case ATOMICALLY_FRAME:
3952       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3953       tso->sp = p;
3954       return ATOMICALLY_FRAME;
3955       
3956     case CATCH_RETRY_FRAME:
3957       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3958       tso->sp = p;
3959       return CATCH_RETRY_FRAME;
3960       
3961     case CATCH_STM_FRAME:
3962     default:
3963       ASSERT(info->i.type != CATCH_FRAME);
3964       ASSERT(info->i.type != STOP_FRAME);
3965       p = next; 
3966       continue;
3967     }
3968   }
3969 }
3970
3971 /* -----------------------------------------------------------------------------
3972    resurrectThreads is called after garbage collection on the list of
3973    threads found to be garbage.  Each of these threads will be woken
3974    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3975    on an MVar, or NonTermination if the thread was blocked on a Black
3976    Hole.
3977
3978    Locks: sched_mutex isn't held upon entry nor exit.
3979    -------------------------------------------------------------------------- */
3980
3981 void
3982 resurrectThreads( StgTSO *threads )
3983 {
3984   StgTSO *tso, *next;
3985
3986   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3987     next = tso->global_link;
3988     tso->global_link = all_threads;
3989     all_threads = tso;
3990     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3991
3992     switch (tso->why_blocked) {
3993     case BlockedOnMVar:
3994     case BlockedOnException:
3995       /* Called by GC - sched_mutex lock is currently held. */
3996       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3997       break;
3998     case BlockedOnBlackHole:
3999       raiseAsync(tso,(StgClosure *)NonTermination_closure);
4000       break;
4001     case BlockedOnSTM:
4002       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
4003       break;
4004     case NotBlocked:
4005       /* This might happen if the thread was blocked on a black hole
4006        * belonging to a thread that we've just woken up (raiseAsync
4007        * can wake up threads, remember...).
4008        */
4009       continue;
4010     default:
4011       barf("resurrectThreads: thread blocked in a strange way");
4012     }
4013   }
4014 }
4015
4016 /* ----------------------------------------------------------------------------
4017  * Debugging: why is a thread blocked
4018  * [Also provides useful information when debugging threaded programs
4019  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4020    ------------------------------------------------------------------------- */
4021
4022 static void
4023 printThreadBlockage(StgTSO *tso)
4024 {
4025   switch (tso->why_blocked) {
4026   case BlockedOnRead:
4027     debugBelch("is blocked on read from fd %ld", tso->block_info.fd);
4028     break;
4029   case BlockedOnWrite:
4030     debugBelch("is blocked on write to fd %ld", tso->block_info.fd);
4031     break;
4032 #if defined(mingw32_HOST_OS)
4033     case BlockedOnDoProc:
4034     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4035     break;
4036 #endif
4037   case BlockedOnDelay:
4038     debugBelch("is blocked until %ld", tso->block_info.target);
4039     break;
4040   case BlockedOnMVar:
4041     debugBelch("is blocked on an MVar");
4042     break;
4043   case BlockedOnException:
4044     debugBelch("is blocked on delivering an exception to thread %d",
4045             tso->block_info.tso->id);
4046     break;
4047   case BlockedOnBlackHole:
4048     debugBelch("is blocked on a black hole");
4049     break;
4050   case NotBlocked:
4051     debugBelch("is not blocked");
4052     break;
4053 #if defined(PARALLEL_HASKELL)
4054   case BlockedOnGA:
4055     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4056             tso->block_info.closure, info_type(tso->block_info.closure));
4057     break;
4058   case BlockedOnGA_NoSend:
4059     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4060             tso->block_info.closure, info_type(tso->block_info.closure));
4061     break;
4062 #endif
4063   case BlockedOnCCall:
4064     debugBelch("is blocked on an external call");
4065     break;
4066   case BlockedOnCCall_NoUnblockExc:
4067     debugBelch("is blocked on an external call (exceptions were already blocked)");
4068     break;
4069   case BlockedOnSTM:
4070     debugBelch("is blocked on an STM operation");
4071     break;
4072   default:
4073     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4074          tso->why_blocked, tso->id, tso);
4075   }
4076 }
4077
4078 static void
4079 printThreadStatus(StgTSO *tso)
4080 {
4081   switch (tso->what_next) {
4082   case ThreadKilled:
4083     debugBelch("has been killed");
4084     break;
4085   case ThreadComplete:
4086     debugBelch("has completed");
4087     break;
4088   default:
4089     printThreadBlockage(tso);
4090   }
4091 }
4092
4093 void
4094 printAllThreads(void)
4095 {
4096   StgTSO *t;
4097
4098 # if defined(GRAN)
4099   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4100   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4101                        time_string, rtsFalse/*no commas!*/);
4102
4103   debugBelch("all threads at [%s]:\n", time_string);
4104 # elif defined(PARALLEL_HASKELL)
4105   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4106   ullong_format_string(CURRENT_TIME,
4107                        time_string, rtsFalse/*no commas!*/);
4108
4109   debugBelch("all threads at [%s]:\n", time_string);
4110 # else
4111   debugBelch("all threads:\n");
4112 # endif
4113
4114   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
4115     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
4116 #if defined(DEBUG)
4117     {
4118       void *label = lookupThreadLabel(t->id);
4119       if (label) debugBelch("[\"%s\"] ",(char *)label);
4120     }
4121 #endif
4122     printThreadStatus(t);
4123     debugBelch("\n");
4124   }
4125 }
4126     
4127 #ifdef DEBUG
4128
4129 /* 
4130    Print a whole blocking queue attached to node (debugging only).
4131 */
4132 # if defined(PARALLEL_HASKELL)
4133 void 
4134 print_bq (StgClosure *node)
4135 {
4136   StgBlockingQueueElement *bqe;
4137   StgTSO *tso;
4138   rtsBool end;
4139
4140   debugBelch("## BQ of closure %p (%s): ",
4141           node, info_type(node));
4142
4143   /* should cover all closures that may have a blocking queue */
4144   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4145          get_itbl(node)->type == FETCH_ME_BQ ||
4146          get_itbl(node)->type == RBH ||
4147          get_itbl(node)->type == MVAR);
4148     
4149   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4150
4151   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4152 }
4153
4154 /* 
4155    Print a whole blocking queue starting with the element bqe.
4156 */
4157 void 
4158 print_bqe (StgBlockingQueueElement *bqe)
4159 {
4160   rtsBool end;
4161
4162   /* 
4163      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4164   */
4165   for (end = (bqe==END_BQ_QUEUE);
4166        !end; // iterate until bqe points to a CONSTR
4167        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4168        bqe = end ? END_BQ_QUEUE : bqe->link) {
4169     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4170     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4171     /* types of closures that may appear in a blocking queue */
4172     ASSERT(get_itbl(bqe)->type == TSO ||           
4173            get_itbl(bqe)->type == BLOCKED_FETCH || 
4174            get_itbl(bqe)->type == CONSTR); 
4175     /* only BQs of an RBH end with an RBH_Save closure */
4176     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4177
4178     switch (get_itbl(bqe)->type) {
4179     case TSO:
4180       debugBelch(" TSO %u (%x),",
4181               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4182       break;
4183     case BLOCKED_FETCH:
4184       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4185               ((StgBlockedFetch *)bqe)->node, 
4186               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4187               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4188               ((StgBlockedFetch *)bqe)->ga.weight);
4189       break;
4190     case CONSTR:
4191       debugBelch(" %s (IP %p),",
4192               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4193                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4194                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4195                "RBH_Save_?"), get_itbl(bqe));
4196       break;
4197     default:
4198       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4199            info_type((StgClosure *)bqe)); // , node, info_type(node));
4200       break;
4201     }
4202   } /* for */
4203   debugBelch("\n");
4204 }
4205 # elif defined(GRAN)
4206 void 
4207 print_bq (StgClosure *node)
4208 {
4209   StgBlockingQueueElement *bqe;
4210   PEs node_loc, tso_loc;
4211   rtsBool end;
4212
4213   /* should cover all closures that may have a blocking queue */
4214   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4215          get_itbl(node)->type == FETCH_ME_BQ ||
4216          get_itbl(node)->type == RBH);
4217     
4218   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4219   node_loc = where_is(node);
4220
4221   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4222           node, info_type(node), node_loc);
4223
4224   /* 
4225      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4226   */
4227   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4228        !end; // iterate until bqe points to a CONSTR
4229        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4230     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4231     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4232     /* types of closures that may appear in a blocking queue */
4233     ASSERT(get_itbl(bqe)->type == TSO ||           
4234            get_itbl(bqe)->type == CONSTR); 
4235     /* only BQs of an RBH end with an RBH_Save closure */
4236     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4237
4238     tso_loc = where_is((StgClosure *)bqe);
4239     switch (get_itbl(bqe)->type) {
4240     case TSO:
4241       debugBelch(" TSO %d (%p) on [PE %d],",
4242               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4243       break;
4244     case CONSTR:
4245       debugBelch(" %s (IP %p),",
4246               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4247                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4248                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4249                "RBH_Save_?"), get_itbl(bqe));
4250       break;
4251     default:
4252       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4253            info_type((StgClosure *)bqe), node, info_type(node));
4254       break;
4255     }
4256   } /* for */
4257   debugBelch("\n");
4258 }
4259 # endif
4260
4261 #if defined(PARALLEL_HASKELL)
4262 static nat
4263 run_queue_len(void)
4264 {
4265   nat i;
4266   StgTSO *tso;
4267
4268   for (i=0, tso=run_queue_hd; 
4269        tso != END_TSO_QUEUE;
4270        i++, tso=tso->link)
4271     /* nothing */
4272
4273   return i;
4274 }
4275 #endif
4276
4277 void
4278 sched_belch(char *s, ...)
4279 {
4280   va_list ap;
4281   va_start(ap,s);
4282 #ifdef RTS_SUPPORTS_THREADS
4283   debugBelch("sched (task %p): ", osThreadId());
4284 #elif defined(PARALLEL_HASKELL)
4285   debugBelch("== ");
4286 #else
4287   debugBelch("sched: ");
4288 #endif
4289   vdebugBelch(s, ap);
4290   debugBelch("\n");
4291   va_end(ap);
4292 }
4293
4294 #endif /* DEBUG */