bfac5745003ef1872dcfc958212d2629c6b0b95d
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PARALLEL_HASKELL          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "OSThreads.h"
46 #include "Storage.h"
47 #include "StgRun.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PARALLEL_HASKELL)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include  "Task.h"
80
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
83 #endif
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif
87
88 #include <string.h>
89 #include <stdlib.h>
90 #include <stdarg.h>
91
92 #ifdef HAVE_ERRNO_H
93 #include <errno.h>
94 #endif
95
96 // Turn off inlining when debugging - it obfuscates things
97 #ifdef DEBUG
98 # undef  STATIC_INLINE
99 # define STATIC_INLINE static
100 #endif
101
102 #ifdef THREADED_RTS
103 #define USED_IN_THREADED_RTS
104 #else
105 #define USED_IN_THREADED_RTS STG_UNUSED
106 #endif
107
108 #ifdef RTS_SUPPORTS_THREADS
109 #define USED_WHEN_RTS_SUPPORTS_THREADS
110 #else
111 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
112 #endif
113
114 /* Main thread queue.
115  * Locks required: sched_mutex.
116  */
117 StgMainThread *main_threads = NULL;
118
119 #if defined(GRAN)
120
121 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
122 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
123
124 /* 
125    In GranSim we have a runnable and a blocked queue for each processor.
126    In order to minimise code changes new arrays run_queue_hds/tls
127    are created. run_queue_hd is then a short cut (macro) for
128    run_queue_hds[CurrentProc] (see GranSim.h).
129    -- HWL
130 */
131 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
132 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
133 StgTSO *ccalling_threadss[MAX_PROC];
134 /* We use the same global list of threads (all_threads) in GranSim as in
135    the std RTS (i.e. we are cheating). However, we don't use this list in
136    the GranSim specific code at the moment (so we are only potentially
137    cheating).  */
138
139 #else /* !GRAN */
140
141 /* Thread queues.
142  * Locks required: sched_mutex.
143  */
144 StgTSO *run_queue_hd = NULL;
145 StgTSO *run_queue_tl = NULL;
146 StgTSO *blocked_queue_hd = NULL;
147 StgTSO *blocked_queue_tl = NULL;
148 StgTSO *blackhole_queue = NULL;
149 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
150
151 #endif
152
153 /* The blackhole_queue should be checked for threads to wake up.  See
154  * Schedule.h for more thorough comment.
155  */
156 rtsBool blackholes_need_checking = rtsFalse;
157
158 /* Linked list of all threads.
159  * Used for detecting garbage collected threads.
160  */
161 StgTSO *all_threads = NULL;
162
163 /* When a thread performs a safe C call (_ccall_GC, using old
164  * terminology), it gets put on the suspended_ccalling_threads
165  * list. Used by the garbage collector.
166  */
167 static StgTSO *suspended_ccalling_threads;
168
169 /* KH: The following two flags are shared memory locations.  There is no need
170        to lock them, since they are only unset at the end of a scheduler
171        operation.
172 */
173
174 /* flag set by signal handler to precipitate a context switch */
175 int context_switch = 0;
176
177 /* if this flag is set as well, give up execution */
178 rtsBool interrupted = rtsFalse;
179
180 /* If this flag is set, we are running Haskell code.  Used to detect
181  * uses of 'foreign import unsafe' that should be 'safe'.
182  */
183 static rtsBool in_haskell = rtsFalse;
184
185 /* Next thread ID to allocate.
186  * Locks required: thread_id_mutex
187  */
188 static StgThreadID next_thread_id = 1;
189
190 /*
191  * Pointers to the state of the current thread.
192  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
193  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
194  */
195  
196 /* The smallest stack size that makes any sense is:
197  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
198  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
199  *  + 1                       (the closure to enter)
200  *  + 1                       (stg_ap_v_ret)
201  *  + 1                       (spare slot req'd by stg_ap_v_ret)
202  *
203  * A thread with this stack will bomb immediately with a stack
204  * overflow, which will increase its stack size.  
205  */
206
207 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
208
209
210 #if defined(GRAN)
211 StgTSO *CurrentTSO;
212 #endif
213
214 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
215  *  exists - earlier gccs apparently didn't.
216  *  -= chak
217  */
218 StgTSO dummy_tso;
219
220 /*
221  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
222  * in an MT setting, needed to signal that a worker thread shouldn't hang around
223  * in the scheduler when it is out of work.
224  */
225 static rtsBool shutting_down_scheduler = rtsFalse;
226
227 #if defined(RTS_SUPPORTS_THREADS)
228 /* ToDo: carefully document the invariants that go together
229  *       with these synchronisation objects.
230  */
231 Mutex     sched_mutex       = INIT_MUTEX_VAR;
232 Mutex     term_mutex        = INIT_MUTEX_VAR;
233
234 #endif /* RTS_SUPPORTS_THREADS */
235
236 #if defined(PARALLEL_HASKELL)
237 StgTSO *LastTSO;
238 rtsTime TimeOfLastYield;
239 rtsBool emitSchedule = rtsTrue;
240 #endif
241
242 #if DEBUG
243 static char *whatNext_strs[] = {
244   "(unknown)",
245   "ThreadRunGHC",
246   "ThreadInterpret",
247   "ThreadKilled",
248   "ThreadRelocated",
249   "ThreadComplete"
250 };
251 #endif
252
253 /* -----------------------------------------------------------------------------
254  * static function prototypes
255  * -------------------------------------------------------------------------- */
256
257 #if defined(RTS_SUPPORTS_THREADS)
258 static void taskStart(void);
259 #endif
260
261 static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
262                       Capability *initialCapability );
263
264 //
265 // These function all encapsulate parts of the scheduler loop, and are
266 // abstracted only to make the structure and control flow of the
267 // scheduler clearer.
268 //
269 static void schedulePreLoop(void);
270 static void scheduleStartSignalHandlers(void);
271 static void scheduleCheckBlockedThreads(void);
272 static void scheduleCheckBlackHoles(void);
273 static void scheduleDetectDeadlock(void);
274 #if defined(GRAN)
275 static StgTSO *scheduleProcessEvent(rtsEvent *event);
276 #endif
277 #if defined(PARALLEL_HASKELL)
278 static StgTSO *scheduleSendPendingMessages(void);
279 static void scheduleActivateSpark(void);
280 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
281 #endif
282 #if defined(PAR) || defined(GRAN)
283 static void scheduleGranParReport(void);
284 #endif
285 static void schedulePostRunThread(void);
286 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
287 static void scheduleHandleStackOverflow( StgTSO *t);
288 static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next );
289 static void scheduleHandleThreadBlocked( StgTSO *t );
290 static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, 
291                                              Capability *cap, StgTSO *t );
292 static rtsBool scheduleDoHeapProfile(void);
293 static void scheduleDoGC(Capability *cap);
294
295 static void unblockThread(StgTSO *tso);
296 static rtsBool checkBlackHoles(void);
297 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
298                                    Capability *initialCapability
299                                    );
300 static void scheduleThread_ (StgTSO* tso);
301 static void AllRoots(evac_fn evac);
302
303 static StgTSO *threadStackOverflow(StgTSO *tso);
304
305 static void raiseAsync_(StgTSO *tso, StgClosure *exception, 
306                         rtsBool stop_at_atomically);
307
308 static void printThreadBlockage(StgTSO *tso);
309 static void printThreadStatus(StgTSO *tso);
310
311 #if defined(PARALLEL_HASKELL)
312 StgTSO * createSparkThread(rtsSpark spark);
313 StgTSO * activateSpark (rtsSpark spark);  
314 #endif
315
316 /* ----------------------------------------------------------------------------
317  * Starting Tasks
318  * ------------------------------------------------------------------------- */
319
320 #if defined(RTS_SUPPORTS_THREADS)
321 static rtsBool startingWorkerThread = rtsFalse;
322
323 static void
324 taskStart(void)
325 {
326   ACQUIRE_LOCK(&sched_mutex);
327   startingWorkerThread = rtsFalse;
328   schedule(NULL,NULL);
329   taskStop();
330   RELEASE_LOCK(&sched_mutex);
331 }
332
333 void
334 startSchedulerTaskIfNecessary(void)
335 {
336     if ( !EMPTY_RUN_QUEUE()
337          && !shutting_down_scheduler // not if we're shutting down
338          && !startingWorkerThread)
339     {
340         // we don't want to start another worker thread
341         // just because the last one hasn't yet reached the
342         // "waiting for capability" state
343         startingWorkerThread = rtsTrue;
344         if (!maybeStartNewWorker(taskStart)) {
345             startingWorkerThread = rtsFalse;
346         }
347     }
348 }
349 #endif
350
351 /* -----------------------------------------------------------------------------
352  * Putting a thread on the run queue: different scheduling policies
353  * -------------------------------------------------------------------------- */
354
355 STATIC_INLINE void
356 addToRunQueue( StgTSO *t )
357 {
358 #if defined(PARALLEL_HASKELL)
359     if (RtsFlags.ParFlags.doFairScheduling) { 
360         // this does round-robin scheduling; good for concurrency
361         APPEND_TO_RUN_QUEUE(t);
362     } else {
363         // this does unfair scheduling; good for parallelism
364         PUSH_ON_RUN_QUEUE(t);
365     }
366 #else
367     // this does round-robin scheduling; good for concurrency
368     APPEND_TO_RUN_QUEUE(t);
369 #endif
370 }
371     
372 /* ---------------------------------------------------------------------------
373    Main scheduling loop.
374
375    We use round-robin scheduling, each thread returning to the
376    scheduler loop when one of these conditions is detected:
377
378       * out of heap space
379       * timer expires (thread yields)
380       * thread blocks
381       * thread ends
382       * stack overflow
383
384    Locking notes:  we acquire the scheduler lock once at the beginning
385    of the scheduler loop, and release it when
386     
387       * running a thread, or
388       * waiting for work, or
389       * waiting for a GC to complete.
390
391    GRAN version:
392      In a GranSim setup this loop iterates over the global event queue.
393      This revolves around the global event queue, which determines what 
394      to do next. Therefore, it's more complicated than either the 
395      concurrent or the parallel (GUM) setup.
396
397    GUM version:
398      GUM iterates over incoming messages.
399      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
400      and sends out a fish whenever it has nothing to do; in-between
401      doing the actual reductions (shared code below) it processes the
402      incoming messages and deals with delayed operations 
403      (see PendingFetches).
404      This is not the ugliest code you could imagine, but it's bloody close.
405
406    ------------------------------------------------------------------------ */
407
408 static void
409 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
410           Capability *initialCapability )
411 {
412   StgTSO *t;
413   Capability *cap;
414   StgThreadReturnCode ret;
415 #if defined(GRAN)
416   rtsEvent *event;
417 #elif defined(PARALLEL_HASKELL)
418   StgTSO *tso;
419   GlobalTaskId pe;
420   rtsBool receivedFinish = rtsFalse;
421 # if defined(DEBUG)
422   nat tp_size, sp_size; // stats only
423 # endif
424 #endif
425   nat prev_what_next;
426   rtsBool ready_to_gc;
427   
428   // Pre-condition: sched_mutex is held.
429   // We might have a capability, passed in as initialCapability.
430   cap = initialCapability;
431
432 #if !defined(RTS_SUPPORTS_THREADS)
433   // simply initialise it in the non-threaded case
434   grabCapability(&cap);
435 #endif
436
437   IF_DEBUG(scheduler,
438            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
439                        mainThread, initialCapability);
440       );
441
442   schedulePreLoop();
443
444   // -----------------------------------------------------------
445   // Scheduler loop starts here:
446
447 #if defined(PARALLEL_HASKELL)
448 #define TERMINATION_CONDITION        (!receivedFinish)
449 #elif defined(GRAN)
450 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
451 #else
452 #define TERMINATION_CONDITION        rtsTrue
453 #endif
454
455   while (TERMINATION_CONDITION) {
456
457 #if defined(GRAN)
458       /* Choose the processor with the next event */
459       CurrentProc = event->proc;
460       CurrentTSO = event->tso;
461 #endif
462
463       IF_DEBUG(scheduler, printAllThreads());
464
465 #if defined(RTS_SUPPORTS_THREADS)
466       // Yield the capability to higher-priority tasks if necessary.
467       //
468       if (cap != NULL) {
469           yieldCapability(&cap);
470       }
471
472       // If we do not currently hold a capability, we wait for one
473       //
474       if (cap == NULL) {
475           waitForCapability(&sched_mutex, &cap,
476                             mainThread ? &mainThread->bound_thread_cond : NULL);
477       }
478
479       // We now have a capability...
480 #endif
481
482     // Check whether we have re-entered the RTS from Haskell without
483     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
484     // call).
485     if (in_haskell) {
486           errorBelch("schedule: re-entered unsafely.\n"
487                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
488           stg_exit(1);
489     }
490
491     //
492     // Test for interruption.  If interrupted==rtsTrue, then either
493     // we received a keyboard interrupt (^C), or the scheduler is
494     // trying to shut down all the tasks (shutting_down_scheduler) in
495     // the threaded RTS.
496     //
497     if (interrupted) {
498         if (shutting_down_scheduler) {
499             IF_DEBUG(scheduler, sched_belch("shutting down"));
500             releaseCapability(cap);
501             if (mainThread) {
502                 mainThread->stat = Interrupted;
503                 mainThread->ret  = NULL;
504             }
505             return;
506         } else {
507             IF_DEBUG(scheduler, sched_belch("interrupted"));
508             deleteAllThreads();
509         }
510     }
511
512 #if defined(not_yet) && defined(SMP)
513     //
514     // Top up the run queue from our spark pool.  We try to make the
515     // number of threads in the run queue equal to the number of
516     // free capabilities.
517     //
518     {
519         StgClosure *spark;
520         if (EMPTY_RUN_QUEUE()) {
521             spark = findSpark(rtsFalse);
522             if (spark == NULL) {
523                 break; /* no more sparks in the pool */
524             } else {
525                 createSparkThread(spark);         
526                 IF_DEBUG(scheduler,
527                          sched_belch("==^^ turning spark of closure %p into a thread",
528                                      (StgClosure *)spark));
529             }
530         }
531     }
532 #endif // SMP
533
534     scheduleStartSignalHandlers();
535
536     // Only check the black holes here if we've nothing else to do.
537     // During normal execution, the black hole list only gets checked
538     // at GC time, to avoid repeatedly traversing this possibly long
539     // list each time around the scheduler.
540     if (EMPTY_RUN_QUEUE()) { scheduleCheckBlackHoles(); }
541
542     scheduleCheckBlockedThreads();
543
544     scheduleDetectDeadlock();
545
546     // Normally, the only way we can get here with no threads to
547     // run is if a keyboard interrupt received during 
548     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
549     // Additionally, it is not fatal for the
550     // threaded RTS to reach here with no threads to run.
551     //
552     // win32: might be here due to awaitEvent() being abandoned
553     // as a result of a console event having been delivered.
554     if ( EMPTY_RUN_QUEUE() ) {
555 #if !defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_HOST_OS)
556         ASSERT(interrupted);
557 #endif
558         continue; // nothing to do
559     }
560
561 #if defined(PARALLEL_HASKELL)
562     scheduleSendPendingMessages();
563     if (EMPTY_RUN_QUEUE() && scheduleActivateSpark()) 
564         continue;
565
566 #if defined(SPARKS)
567     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
568 #endif
569
570     /* If we still have no work we need to send a FISH to get a spark
571        from another PE */
572     if (EMPTY_RUN_QUEUE()) {
573         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
574         ASSERT(rtsFalse); // should not happen at the moment
575     }
576     // from here: non-empty run queue.
577     //  TODO: merge above case with this, only one call processMessages() !
578     if (PacketsWaiting()) {  /* process incoming messages, if
579                                 any pending...  only in else
580                                 because getRemoteWork waits for
581                                 messages as well */
582         receivedFinish = processMessages();
583     }
584 #endif
585
586 #if defined(GRAN)
587     scheduleProcessEvent(event);
588 #endif
589
590     // 
591     // Get a thread to run
592     //
593     ASSERT(run_queue_hd != END_TSO_QUEUE);
594     POP_RUN_QUEUE(t);
595
596 #if defined(GRAN) || defined(PAR)
597     scheduleGranParReport(); // some kind of debuging output
598 #else
599     // Sanity check the thread we're about to run.  This can be
600     // expensive if there is lots of thread switching going on...
601     IF_DEBUG(sanity,checkTSO(t));
602 #endif
603
604 #if defined(RTS_SUPPORTS_THREADS)
605     // Check whether we can run this thread in the current task.
606     // If not, we have to pass our capability to the right task.
607     {
608       StgMainThread *m = t->main;
609       
610       if(m)
611       {
612         if(m == mainThread)
613         {
614           IF_DEBUG(scheduler,
615             sched_belch("### Running thread %d in bound thread", t->id));
616           // yes, the Haskell thread is bound to the current native thread
617         }
618         else
619         {
620           IF_DEBUG(scheduler,
621             sched_belch("### thread %d bound to another OS thread", t->id));
622           // no, bound to a different Haskell thread: pass to that thread
623           PUSH_ON_RUN_QUEUE(t);
624           passCapability(&m->bound_thread_cond);
625           continue;
626         }
627       }
628       else
629       {
630         if(mainThread != NULL)
631         // The thread we want to run is bound.
632         {
633           IF_DEBUG(scheduler,
634             sched_belch("### this OS thread cannot run thread %d", t->id));
635           // no, the current native thread is bound to a different
636           // Haskell thread, so pass it to any worker thread
637           PUSH_ON_RUN_QUEUE(t);
638           passCapabilityToWorker();
639           continue; 
640         }
641       }
642     }
643 #endif
644
645     cap->r.rCurrentTSO = t;
646     
647     /* context switches are now initiated by the timer signal, unless
648      * the user specified "context switch as often as possible", with
649      * +RTS -C0
650      */
651     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
652          && (run_queue_hd != END_TSO_QUEUE
653              || blocked_queue_hd != END_TSO_QUEUE
654              || sleeping_queue != END_TSO_QUEUE)))
655         context_switch = 1;
656
657 run_thread:
658
659     RELEASE_LOCK(&sched_mutex);
660
661     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
662                               (long)t->id, whatNext_strs[t->what_next]));
663
664 #if defined(PROFILING)
665     startHeapProfTimer();
666 #endif
667
668     // ----------------------------------------------------------------------
669     // Run the current thread 
670
671     prev_what_next = t->what_next;
672
673     errno = t->saved_errno;
674     in_haskell = rtsTrue;
675
676     switch (prev_what_next) {
677
678     case ThreadKilled:
679     case ThreadComplete:
680         /* Thread already finished, return to scheduler. */
681         ret = ThreadFinished;
682         break;
683
684     case ThreadRunGHC:
685         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
686         break;
687
688     case ThreadInterpret:
689         ret = interpretBCO(cap);
690         break;
691
692     default:
693       barf("schedule: invalid what_next field");
694     }
695
696     // We have run some Haskell code: there might be blackhole-blocked
697     // threads to wake up now.
698     if ( blackhole_queue != END_TSO_QUEUE ) {
699         blackholes_need_checking = rtsTrue;
700     }
701
702     in_haskell = rtsFalse;
703
704     // The TSO might have moved, eg. if it re-entered the RTS and a GC
705     // happened.  So find the new location:
706     t = cap->r.rCurrentTSO;
707
708     // And save the current errno in this thread.
709     t->saved_errno = errno;
710
711     // ----------------------------------------------------------------------
712     
713     /* Costs for the scheduler are assigned to CCS_SYSTEM */
714 #if defined(PROFILING)
715     stopHeapProfTimer();
716     CCCS = CCS_SYSTEM;
717 #endif
718     
719     ACQUIRE_LOCK(&sched_mutex);
720     
721 #if defined(RTS_SUPPORTS_THREADS)
722     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
723 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
724     IF_DEBUG(scheduler,debugBelch("sched: "););
725 #endif
726     
727     schedulePostRunThread();
728
729     ready_to_gc = rtsFalse;
730
731     switch (ret) {
732     case HeapOverflow:
733         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
734         break;
735
736     case StackOverflow:
737         scheduleHandleStackOverflow(t);
738         break;
739
740     case ThreadYielding:
741         if (scheduleHandleYield(t, prev_what_next)) {
742             // shortcut for switching between compiler/interpreter:
743             goto run_thread; 
744         }
745         break;
746
747     case ThreadBlocked:
748         scheduleHandleThreadBlocked(t);
749         threadPaused(t);
750         break;
751
752     case ThreadFinished:
753         if (scheduleHandleThreadFinished(mainThread, cap, t)) return;;
754         break;
755
756     default:
757       barf("schedule: invalid thread return code %d", (int)ret);
758     }
759
760     if (scheduleDoHeapProfile()) { ready_to_gc = rtsFalse; }
761     if (ready_to_gc) { scheduleDoGC(cap); }
762   } /* end of while() */
763
764   IF_PAR_DEBUG(verbose,
765                debugBelch("== Leaving schedule() after having received Finish\n"));
766 }
767
768 /* ----------------------------------------------------------------------------
769  * Setting up the scheduler loop
770  * ASSUMES: sched_mutex
771  * ------------------------------------------------------------------------- */
772
773 static void
774 schedulePreLoop(void)
775 {
776 #if defined(GRAN) 
777     /* set up first event to get things going */
778     /* ToDo: assign costs for system setup and init MainTSO ! */
779     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
780               ContinueThread, 
781               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
782     
783     IF_DEBUG(gran,
784              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
785                         CurrentTSO);
786              G_TSO(CurrentTSO, 5));
787     
788     if (RtsFlags.GranFlags.Light) {
789         /* Save current time; GranSim Light only */
790         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
791     }      
792 #endif
793 }
794
795 /* ----------------------------------------------------------------------------
796  * Start any pending signal handlers
797  * ASSUMES: sched_mutex
798  * ------------------------------------------------------------------------- */
799
800 static void
801 scheduleStartSignalHandlers(void)
802 {
803 #if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
804     if (signals_pending()) {
805       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
806       startSignalHandlers();
807       ACQUIRE_LOCK(&sched_mutex);
808     }
809 #endif
810 }
811
812 /* ----------------------------------------------------------------------------
813  * Check for blocked threads that can be woken up.
814  * ASSUMES: sched_mutex
815  * ------------------------------------------------------------------------- */
816
817 static void
818 scheduleCheckBlockedThreads(void)
819 {
820     //
821     // Check whether any waiting threads need to be woken up.  If the
822     // run queue is empty, and there are no other tasks running, we
823     // can wait indefinitely for something to happen.
824     //
825     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
826     {
827 #if defined(RTS_SUPPORTS_THREADS)
828         // We shouldn't be here...
829         barf("schedule: awaitEvent() in threaded RTS");
830 #endif
831         awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
832     }
833 }
834
835
836 /* ----------------------------------------------------------------------------
837  * Check for threads blocked on BLACKHOLEs that can be woken up
838  * ASSUMES: sched_mutex
839  * ------------------------------------------------------------------------- */
840 static void
841 scheduleCheckBlackHoles( void )
842 {
843     if ( blackholes_need_checking )
844     {
845         checkBlackHoles();
846         blackholes_need_checking = rtsFalse;
847     }
848 }
849
850 /* ----------------------------------------------------------------------------
851  * Detect deadlock conditions and attempt to resolve them.
852  * ASSUMES: sched_mutex
853  * ------------------------------------------------------------------------- */
854
855 static void
856 scheduleDetectDeadlock(void)
857 {
858     /* 
859      * Detect deadlock: when we have no threads to run, there are no
860      * threads blocked, waiting for I/O, or sleeping, and all the
861      * other tasks are waiting for work, we must have a deadlock of
862      * some description.
863      */
864     if ( EMPTY_THREAD_QUEUES() )
865     {
866 #if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS)
867         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
868
869         // Garbage collection can release some new threads due to
870         // either (a) finalizers or (b) threads resurrected because
871         // they are unreachable and will therefore be sent an
872         // exception.  Any threads thus released will be immediately
873         // runnable.
874         GarbageCollect(GetRoots,rtsTrue);
875         if ( !EMPTY_RUN_QUEUE() ) return;
876
877 #if defined(RTS_USER_SIGNALS)
878         /* If we have user-installed signal handlers, then wait
879          * for signals to arrive rather then bombing out with a
880          * deadlock.
881          */
882         if ( anyUserHandlers() ) {
883             IF_DEBUG(scheduler, 
884                      sched_belch("still deadlocked, waiting for signals..."));
885
886             awaitUserSignals();
887
888             if (signals_pending()) {
889                 RELEASE_LOCK(&sched_mutex);
890                 startSignalHandlers();
891                 ACQUIRE_LOCK(&sched_mutex);
892             }
893
894             // either we have threads to run, or we were interrupted:
895             ASSERT(!EMPTY_RUN_QUEUE() || interrupted);
896         }
897 #endif
898
899         /* Probably a real deadlock.  Send the current main thread the
900          * Deadlock exception (or in the SMP build, send *all* main
901          * threads the deadlock exception, since none of them can make
902          * progress).
903          */
904         {
905             StgMainThread *m;
906             m = main_threads;
907             switch (m->tso->why_blocked) {
908             case BlockedOnBlackHole:
909             case BlockedOnException:
910             case BlockedOnMVar:
911                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
912                 return;
913             default:
914                 barf("deadlock: main thread blocked in a strange way");
915             }
916         }
917
918 #elif defined(RTS_SUPPORTS_THREADS)
919     // ToDo: add deadlock detection in threaded RTS
920 #elif defined(PARALLEL_HASKELL)
921     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
922 #endif
923     }
924 }
925
926 /* ----------------------------------------------------------------------------
927  * Process an event (GRAN only)
928  * ------------------------------------------------------------------------- */
929
930 #if defined(GRAN)
931 static StgTSO *
932 scheduleProcessEvent(rtsEvent *event)
933 {
934     StgTSO *t;
935
936     if (RtsFlags.GranFlags.Light)
937       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
938
939     /* adjust time based on time-stamp */
940     if (event->time > CurrentTime[CurrentProc] &&
941         event->evttype != ContinueThread)
942       CurrentTime[CurrentProc] = event->time;
943     
944     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
945     if (!RtsFlags.GranFlags.Light)
946       handleIdlePEs();
947
948     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
949
950     /* main event dispatcher in GranSim */
951     switch (event->evttype) {
952       /* Should just be continuing execution */
953     case ContinueThread:
954       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
955       /* ToDo: check assertion
956       ASSERT(run_queue_hd != (StgTSO*)NULL &&
957              run_queue_hd != END_TSO_QUEUE);
958       */
959       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
960       if (!RtsFlags.GranFlags.DoAsyncFetch &&
961           procStatus[CurrentProc]==Fetching) {
962         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
963               CurrentTSO->id, CurrentTSO, CurrentProc);
964         goto next_thread;
965       } 
966       /* Ignore ContinueThreads for completed threads */
967       if (CurrentTSO->what_next == ThreadComplete) {
968         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
969               CurrentTSO->id, CurrentTSO, CurrentProc);
970         goto next_thread;
971       } 
972       /* Ignore ContinueThreads for threads that are being migrated */
973       if (PROCS(CurrentTSO)==Nowhere) { 
974         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
975               CurrentTSO->id, CurrentTSO, CurrentProc);
976         goto next_thread;
977       }
978       /* The thread should be at the beginning of the run queue */
979       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
980         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
981               CurrentTSO->id, CurrentTSO, CurrentProc);
982         break; // run the thread anyway
983       }
984       /*
985       new_event(proc, proc, CurrentTime[proc],
986                 FindWork,
987                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
988       goto next_thread; 
989       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
990       break; // now actually run the thread; DaH Qu'vam yImuHbej 
991
992     case FetchNode:
993       do_the_fetchnode(event);
994       goto next_thread;             /* handle next event in event queue  */
995       
996     case GlobalBlock:
997       do_the_globalblock(event);
998       goto next_thread;             /* handle next event in event queue  */
999       
1000     case FetchReply:
1001       do_the_fetchreply(event);
1002       goto next_thread;             /* handle next event in event queue  */
1003       
1004     case UnblockThread:   /* Move from the blocked queue to the tail of */
1005       do_the_unblock(event);
1006       goto next_thread;             /* handle next event in event queue  */
1007       
1008     case ResumeThread:  /* Move from the blocked queue to the tail of */
1009       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1010       event->tso->gran.blocktime += 
1011         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1012       do_the_startthread(event);
1013       goto next_thread;             /* handle next event in event queue  */
1014       
1015     case StartThread:
1016       do_the_startthread(event);
1017       goto next_thread;             /* handle next event in event queue  */
1018       
1019     case MoveThread:
1020       do_the_movethread(event);
1021       goto next_thread;             /* handle next event in event queue  */
1022       
1023     case MoveSpark:
1024       do_the_movespark(event);
1025       goto next_thread;             /* handle next event in event queue  */
1026       
1027     case FindWork:
1028       do_the_findwork(event);
1029       goto next_thread;             /* handle next event in event queue  */
1030       
1031     default:
1032       barf("Illegal event type %u\n", event->evttype);
1033     }  /* switch */
1034     
1035     /* This point was scheduler_loop in the old RTS */
1036
1037     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1038
1039     TimeOfLastEvent = CurrentTime[CurrentProc];
1040     TimeOfNextEvent = get_time_of_next_event();
1041     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1042     // CurrentTSO = ThreadQueueHd;
1043
1044     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1045                          TimeOfNextEvent));
1046
1047     if (RtsFlags.GranFlags.Light) 
1048       GranSimLight_leave_system(event, &ActiveTSO); 
1049
1050     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1051
1052     IF_DEBUG(gran, 
1053              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1054
1055     /* in a GranSim setup the TSO stays on the run queue */
1056     t = CurrentTSO;
1057     /* Take a thread from the run queue. */
1058     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1059
1060     IF_DEBUG(gran, 
1061              debugBelch("GRAN: About to run current thread, which is\n");
1062              G_TSO(t,5));
1063
1064     context_switch = 0; // turned on via GranYield, checking events and time slice
1065
1066     IF_DEBUG(gran, 
1067              DumpGranEvent(GR_SCHEDULE, t));
1068
1069     procStatus[CurrentProc] = Busy;
1070 }
1071 #endif // GRAN
1072
1073 /* ----------------------------------------------------------------------------
1074  * Send pending messages (PARALLEL_HASKELL only)
1075  * ------------------------------------------------------------------------- */
1076
1077 #if defined(PARALLEL_HASKELL)
1078 static StgTSO *
1079 scheduleSendPendingMessages(void)
1080 {
1081     StgSparkPool *pool;
1082     rtsSpark spark;
1083     StgTSO *t;
1084
1085 # if defined(PAR) // global Mem.Mgmt., omit for now
1086     if (PendingFetches != END_BF_QUEUE) {
1087         processFetches();
1088     }
1089 # endif
1090     
1091     if (RtsFlags.ParFlags.BufferTime) {
1092         // if we use message buffering, we must send away all message
1093         // packets which have become too old...
1094         sendOldBuffers(); 
1095     }
1096 }
1097 #endif
1098
1099 /* ----------------------------------------------------------------------------
1100  * Activate spark threads (PARALLEL_HASKELL only)
1101  * ------------------------------------------------------------------------- */
1102
1103 #if defined(PARALLEL_HASKELL)
1104 static void
1105 scheduleActivateSpark(void)
1106 {
1107 #if defined(SPARKS)
1108   ASSERT(EMPTY_RUN_QUEUE());
1109 /* We get here if the run queue is empty and want some work.
1110    We try to turn a spark into a thread, and add it to the run queue,
1111    from where it will be picked up in the next iteration of the scheduler
1112    loop.
1113 */
1114
1115       /* :-[  no local threads => look out for local sparks */
1116       /* the spark pool for the current PE */
1117       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1118       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1119           pool->hd < pool->tl) {
1120         /* 
1121          * ToDo: add GC code check that we really have enough heap afterwards!!
1122          * Old comment:
1123          * If we're here (no runnable threads) and we have pending
1124          * sparks, we must have a space problem.  Get enough space
1125          * to turn one of those pending sparks into a
1126          * thread... 
1127          */
1128
1129         spark = findSpark(rtsFalse);            /* get a spark */
1130         if (spark != (rtsSpark) NULL) {
1131           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1132           IF_PAR_DEBUG(fish, // schedule,
1133                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1134                              tso->id, tso, advisory_thread_count));
1135
1136           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1137             IF_PAR_DEBUG(fish, // schedule,
1138                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1139                             spark));
1140             return rtsFalse; /* failed to generate a thread */
1141           }                  /* otherwise fall through & pick-up new tso */
1142         } else {
1143           IF_PAR_DEBUG(fish, // schedule,
1144                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1145                              spark_queue_len(pool)));
1146           return rtsFalse;  /* failed to generate a thread */
1147         }
1148         return rtsTrue;  /* success in generating a thread */
1149   } else { /* no more threads permitted or pool empty */
1150     return rtsFalse;  /* failed to generateThread */
1151   }
1152 #else
1153   tso = NULL; // avoid compiler warning only
1154   return rtsFalse;  /* dummy in non-PAR setup */
1155 #endif // SPARKS
1156 }
1157 #endif // PARALLEL_HASKELL
1158
1159 /* ----------------------------------------------------------------------------
1160  * Get work from a remote node (PARALLEL_HASKELL only)
1161  * ------------------------------------------------------------------------- */
1162     
1163 #if defined(PARALLEL_HASKELL)
1164 static rtsBool
1165 scheduleGetRemoteWork(rtsBool *receivedFinish)
1166 {
1167   ASSERT(EMPTY_RUN_QUEUE());
1168
1169   if (RtsFlags.ParFlags.BufferTime) {
1170         IF_PAR_DEBUG(verbose, 
1171                 debugBelch("...send all pending data,"));
1172         {
1173           nat i;
1174           for (i=1; i<=nPEs; i++)
1175             sendImmediately(i); // send all messages away immediately
1176         }
1177   }
1178 # ifndef SPARKS
1179         //++EDEN++ idle() , i.e. send all buffers, wait for work
1180         // suppress fishing in EDEN... just look for incoming messages
1181         // (blocking receive)
1182   IF_PAR_DEBUG(verbose, 
1183                debugBelch("...wait for incoming messages...\n"));
1184   *receivedFinish = processMessages(); // blocking receive...
1185
1186         // and reenter scheduling loop after having received something
1187         // (return rtsFalse below)
1188
1189 # else /* activate SPARKS machinery */
1190 /* We get here, if we have no work, tried to activate a local spark, but still
1191    have no work. We try to get a remote spark, by sending a FISH message.
1192    Thread migration should be added here, and triggered when a sequence of 
1193    fishes returns without work. */
1194         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1195
1196       /* =8-[  no local sparks => look for work on other PEs */
1197         /*
1198          * We really have absolutely no work.  Send out a fish
1199          * (there may be some out there already), and wait for
1200          * something to arrive.  We clearly can't run any threads
1201          * until a SCHEDULE or RESUME arrives, and so that's what
1202          * we're hoping to see.  (Of course, we still have to
1203          * respond to other types of messages.)
1204          */
1205         rtsTime now = msTime() /*CURRENT_TIME*/;
1206         IF_PAR_DEBUG(verbose, 
1207                      debugBelch("--  now=%ld\n", now));
1208         IF_PAR_DEBUG(fish, // verbose,
1209              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1210                  (last_fish_arrived_at!=0 &&
1211                   last_fish_arrived_at+delay > now)) {
1212                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1213                      now, last_fish_arrived_at+delay, 
1214                      last_fish_arrived_at,
1215                      delay);
1216              });
1217   
1218         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1219             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1220           if (last_fish_arrived_at==0 ||
1221               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1222             /* outstandingFishes is set in sendFish, processFish;
1223                avoid flooding system with fishes via delay */
1224     next_fish_to_send_at = 0;  
1225   } else {
1226     /* ToDo: this should be done in the main scheduling loop to avoid the
1227              busy wait here; not so bad if fish delay is very small  */
1228     int iq = 0; // DEBUGGING -- HWL
1229     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1230     /* send a fish when ready, but process messages that arrive in the meantime */
1231     do {
1232       if (PacketsWaiting()) {
1233         iq++; // DEBUGGING
1234         *receivedFinish = processMessages();
1235       }
1236       now = msTime();
1237     } while (!*receivedFinish || now<next_fish_to_send_at);
1238     // JB: This means the fish could become obsolete, if we receive
1239     // work. Better check for work again? 
1240     // last line: while (!receivedFinish || !haveWork || now<...)
1241     // next line: if (receivedFinish || haveWork )
1242
1243     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1244       return rtsFalse;  // NB: this will leave scheduler loop
1245                         // immediately after return!
1246                           
1247     IF_PAR_DEBUG(fish, // verbose,
1248                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1249
1250   }
1251
1252     // JB: IMHO, this should all be hidden inside sendFish(...)
1253     /* pe = choosePE(); 
1254        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1255                 NEW_FISH_HUNGER);
1256
1257     // Global statistics: count no. of fishes
1258     if (RtsFlags.ParFlags.ParStats.Global &&
1259          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1260            globalParStats.tot_fish_mess++;
1261            }
1262     */ 
1263
1264   /* delayed fishes must have been sent by now! */
1265   next_fish_to_send_at = 0;  
1266   }
1267       
1268   *receivedFinish = processMessages();
1269 # endif /* SPARKS */
1270
1271  return rtsFalse;
1272  /* NB: this function always returns rtsFalse, meaning the scheduler
1273     loop continues with the next iteration; 
1274     rationale: 
1275       return code means success in finding work; we enter this function
1276       if there is no local work, thus have to send a fish which takes
1277       time until it arrives with work; in the meantime we should process
1278       messages in the main loop;
1279  */
1280 }
1281 #endif // PARALLEL_HASKELL
1282
1283 /* ----------------------------------------------------------------------------
1284  * PAR/GRAN: Report stats & debugging info(?)
1285  * ------------------------------------------------------------------------- */
1286
1287 #if defined(PAR) || defined(GRAN)
1288 static void
1289 scheduleGranParReport(void)
1290 {
1291   ASSERT(run_queue_hd != END_TSO_QUEUE);
1292
1293   /* Take a thread from the run queue, if we have work */
1294   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1295
1296     /* If this TSO has got its outport closed in the meantime, 
1297      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1298      * It has to be marked as TH_DEAD for this purpose.
1299      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1300
1301 JB: TODO: investigate wether state change field could be nuked
1302      entirely and replaced by the normal tso state (whatnext
1303      field). All we want to do is to kill tsos from outside.
1304      */
1305
1306     /* ToDo: write something to the log-file
1307     if (RTSflags.ParFlags.granSimStats && !sameThread)
1308         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1309
1310     CurrentTSO = t;
1311     */
1312     /* the spark pool for the current PE */
1313     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1314
1315     IF_DEBUG(scheduler, 
1316              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1317                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1318
1319     IF_PAR_DEBUG(fish,
1320              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1321                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1322
1323     if (RtsFlags.ParFlags.ParStats.Full && 
1324         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1325         (emitSchedule || // forced emit
1326          (t && LastTSO && t->id != LastTSO->id))) {
1327       /* 
1328          we are running a different TSO, so write a schedule event to log file
1329          NB: If we use fair scheduling we also have to write  a deschedule 
1330              event for LastTSO; with unfair scheduling we know that the
1331              previous tso has blocked whenever we switch to another tso, so
1332              we don't need it in GUM for now
1333       */
1334       IF_PAR_DEBUG(fish, // schedule,
1335                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1336
1337       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1338                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1339       emitSchedule = rtsFalse;
1340     }
1341 }     
1342 #endif
1343
1344 /* ----------------------------------------------------------------------------
1345  * After running a thread...
1346  * ASSUMES: sched_mutex
1347  * ------------------------------------------------------------------------- */
1348
1349 static void
1350 schedulePostRunThread(void)
1351 {
1352 #if defined(PAR)
1353     /* HACK 675: if the last thread didn't yield, make sure to print a 
1354        SCHEDULE event to the log file when StgRunning the next thread, even
1355        if it is the same one as before */
1356     LastTSO = t; 
1357     TimeOfLastYield = CURRENT_TIME;
1358 #endif
1359
1360   /* some statistics gathering in the parallel case */
1361
1362 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1363   switch (ret) {
1364     case HeapOverflow:
1365 # if defined(GRAN)
1366       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1367       globalGranStats.tot_heapover++;
1368 # elif defined(PAR)
1369       globalParStats.tot_heapover++;
1370 # endif
1371       break;
1372
1373      case StackOverflow:
1374 # if defined(GRAN)
1375       IF_DEBUG(gran, 
1376                DumpGranEvent(GR_DESCHEDULE, t));
1377       globalGranStats.tot_stackover++;
1378 # elif defined(PAR)
1379       // IF_DEBUG(par, 
1380       // DumpGranEvent(GR_DESCHEDULE, t);
1381       globalParStats.tot_stackover++;
1382 # endif
1383       break;
1384
1385     case ThreadYielding:
1386 # if defined(GRAN)
1387       IF_DEBUG(gran, 
1388                DumpGranEvent(GR_DESCHEDULE, t));
1389       globalGranStats.tot_yields++;
1390 # elif defined(PAR)
1391       // IF_DEBUG(par, 
1392       // DumpGranEvent(GR_DESCHEDULE, t);
1393       globalParStats.tot_yields++;
1394 # endif
1395       break; 
1396
1397     case ThreadBlocked:
1398 # if defined(GRAN)
1399       IF_DEBUG(scheduler,
1400                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1401                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1402                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1403                if (t->block_info.closure!=(StgClosure*)NULL)
1404                  print_bq(t->block_info.closure);
1405                debugBelch("\n"));
1406
1407       // ??? needed; should emit block before
1408       IF_DEBUG(gran, 
1409                DumpGranEvent(GR_DESCHEDULE, t)); 
1410       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1411       /*
1412         ngoq Dogh!
1413       ASSERT(procStatus[CurrentProc]==Busy || 
1414               ((procStatus[CurrentProc]==Fetching) && 
1415               (t->block_info.closure!=(StgClosure*)NULL)));
1416       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1417           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1418             procStatus[CurrentProc]==Fetching)) 
1419         procStatus[CurrentProc] = Idle;
1420       */
1421 # elif defined(PAR)
1422 //++PAR++  blockThread() writes the event (change?)
1423 # endif
1424     break;
1425
1426   case ThreadFinished:
1427     break;
1428
1429   default:
1430     barf("parGlobalStats: unknown return code");
1431     break;
1432     }
1433 #endif
1434 }
1435
1436 /* -----------------------------------------------------------------------------
1437  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1438  * ASSUMES: sched_mutex
1439  * -------------------------------------------------------------------------- */
1440
1441 static rtsBool
1442 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1443 {
1444     // did the task ask for a large block?
1445     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1446         // if so, get one and push it on the front of the nursery.
1447         bdescr *bd;
1448         lnat blocks;
1449         
1450         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1451         
1452         IF_DEBUG(scheduler,
1453                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1454                             (long)t->id, whatNext_strs[t->what_next], blocks));
1455         
1456         // don't do this if it would push us over the
1457         // alloc_blocks_lim limit; we'll GC first.
1458         if (alloc_blocks + blocks < alloc_blocks_lim) {
1459             
1460             alloc_blocks += blocks;
1461             bd = allocGroup( blocks );
1462             
1463             // link the new group into the list
1464             bd->link = cap->r.rCurrentNursery;
1465             bd->u.back = cap->r.rCurrentNursery->u.back;
1466             if (cap->r.rCurrentNursery->u.back != NULL) {
1467                 cap->r.rCurrentNursery->u.back->link = bd;
1468             } else {
1469 #ifdef SMP
1470                 cap->r.rNursery = g0s0->blocks = bd;
1471 #else
1472                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1473                        g0s0->blocks == cap->r.rNursery);
1474                 cap->r.rNursery = g0s0->blocks = bd;
1475 #endif
1476             }             
1477             cap->r.rCurrentNursery->u.back = bd;
1478             
1479             // initialise it as a nursery block.  We initialise the
1480             // step, gen_no, and flags field of *every* sub-block in
1481             // this large block, because this is easier than making
1482             // sure that we always find the block head of a large
1483             // block whenever we call Bdescr() (eg. evacuate() and
1484             // isAlive() in the GC would both have to do this, at
1485             // least).
1486             { 
1487                 bdescr *x;
1488                 for (x = bd; x < bd + blocks; x++) {
1489                     x->step = g0s0;
1490                     x->gen_no = 0;
1491                     x->flags = 0;
1492                 }
1493             }
1494             
1495 #if !defined(SMP)
1496             // don't forget to update the block count in g0s0.
1497             g0s0->n_blocks += blocks;
1498
1499             // This assert can be a killer if the app is doing lots
1500             // of large block allocations.
1501             ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1502 #endif
1503             
1504             // now update the nursery to point to the new block
1505             cap->r.rCurrentNursery = bd;
1506             
1507             // we might be unlucky and have another thread get on the
1508             // run queue before us and steal the large block, but in that
1509             // case the thread will just end up requesting another large
1510             // block.
1511             PUSH_ON_RUN_QUEUE(t);
1512             return rtsFalse;  /* not actually GC'ing */
1513         }
1514     }
1515     
1516     /* make all the running tasks block on a condition variable,
1517      * maybe set context_switch and wait till they all pile in,
1518      * then have them wait on a GC condition variable.
1519      */
1520     IF_DEBUG(scheduler,
1521              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1522                         (long)t->id, whatNext_strs[t->what_next]));
1523     threadPaused(t);
1524 #if defined(GRAN)
1525     ASSERT(!is_on_queue(t,CurrentProc));
1526 #elif defined(PARALLEL_HASKELL)
1527     /* Currently we emit a DESCHEDULE event before GC in GUM.
1528        ToDo: either add separate event to distinguish SYSTEM time from rest
1529        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1530     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1531         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1532                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1533         emitSchedule = rtsTrue;
1534     }
1535 #endif
1536       
1537     PUSH_ON_RUN_QUEUE(t);
1538     return rtsTrue;
1539     /* actual GC is done at the end of the while loop in schedule() */
1540 }
1541
1542 /* -----------------------------------------------------------------------------
1543  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1544  * ASSUMES: sched_mutex
1545  * -------------------------------------------------------------------------- */
1546
1547 static void
1548 scheduleHandleStackOverflow( StgTSO *t)
1549 {
1550     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1551                                   (long)t->id, whatNext_strs[t->what_next]));
1552     /* just adjust the stack for this thread, then pop it back
1553      * on the run queue.
1554      */
1555     threadPaused(t);
1556     { 
1557         /* enlarge the stack */
1558         StgTSO *new_t = threadStackOverflow(t);
1559         
1560         /* This TSO has moved, so update any pointers to it from the
1561          * main thread stack.  It better not be on any other queues...
1562          * (it shouldn't be).
1563          */
1564         if (t->main != NULL) {
1565             t->main->tso = new_t;
1566         }
1567         PUSH_ON_RUN_QUEUE(new_t);
1568     }
1569 }
1570
1571 /* -----------------------------------------------------------------------------
1572  * Handle a thread that returned to the scheduler with ThreadYielding
1573  * ASSUMES: sched_mutex
1574  * -------------------------------------------------------------------------- */
1575
1576 static rtsBool
1577 scheduleHandleYield( StgTSO *t, nat prev_what_next )
1578 {
1579     // Reset the context switch flag.  We don't do this just before
1580     // running the thread, because that would mean we would lose ticks
1581     // during GC, which can lead to unfair scheduling (a thread hogs
1582     // the CPU because the tick always arrives during GC).  This way
1583     // penalises threads that do a lot of allocation, but that seems
1584     // better than the alternative.
1585     context_switch = 0;
1586     
1587     /* put the thread back on the run queue.  Then, if we're ready to
1588      * GC, check whether this is the last task to stop.  If so, wake
1589      * up the GC thread.  getThread will block during a GC until the
1590      * GC is finished.
1591      */
1592     IF_DEBUG(scheduler,
1593              if (t->what_next != prev_what_next) {
1594                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1595                             (long)t->id, whatNext_strs[t->what_next]);
1596              } else {
1597                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1598                             (long)t->id, whatNext_strs[t->what_next]);
1599              }
1600         );
1601     
1602     IF_DEBUG(sanity,
1603              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1604              checkTSO(t));
1605     ASSERT(t->link == END_TSO_QUEUE);
1606     
1607     // Shortcut if we're just switching evaluators: don't bother
1608     // doing stack squeezing (which can be expensive), just run the
1609     // thread.
1610     if (t->what_next != prev_what_next) {
1611         return rtsTrue;
1612     }
1613     
1614     threadPaused(t);
1615     
1616 #if defined(GRAN)
1617     ASSERT(!is_on_queue(t,CurrentProc));
1618       
1619     IF_DEBUG(sanity,
1620              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1621              checkThreadQsSanity(rtsTrue));
1622
1623 #endif
1624
1625     addToRunQueue(t);
1626
1627 #if defined(GRAN)
1628     /* add a ContinueThread event to actually process the thread */
1629     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1630               ContinueThread,
1631               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1632     IF_GRAN_DEBUG(bq, 
1633                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1634                   G_EVENTQ(0);
1635                   G_CURR_THREADQ(0));
1636 #endif
1637     return rtsFalse;
1638 }
1639
1640 /* -----------------------------------------------------------------------------
1641  * Handle a thread that returned to the scheduler with ThreadBlocked
1642  * ASSUMES: sched_mutex
1643  * -------------------------------------------------------------------------- */
1644
1645 static void
1646 scheduleHandleThreadBlocked( StgTSO *t
1647 #if !defined(GRAN) && !defined(DEBUG)
1648     STG_UNUSED
1649 #endif
1650     )
1651 {
1652 #if defined(GRAN)
1653     IF_DEBUG(scheduler,
1654              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1655                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1656              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1657     
1658     // ??? needed; should emit block before
1659     IF_DEBUG(gran, 
1660              DumpGranEvent(GR_DESCHEDULE, t)); 
1661     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1662     /*
1663       ngoq Dogh!
1664       ASSERT(procStatus[CurrentProc]==Busy || 
1665       ((procStatus[CurrentProc]==Fetching) && 
1666       (t->block_info.closure!=(StgClosure*)NULL)));
1667       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1668       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1669       procStatus[CurrentProc]==Fetching)) 
1670       procStatus[CurrentProc] = Idle;
1671     */
1672 #elif defined(PAR)
1673     IF_DEBUG(scheduler,
1674              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1675                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1676     IF_PAR_DEBUG(bq,
1677                  
1678                  if (t->block_info.closure!=(StgClosure*)NULL) 
1679                  print_bq(t->block_info.closure));
1680     
1681     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1682     blockThread(t);
1683     
1684     /* whatever we schedule next, we must log that schedule */
1685     emitSchedule = rtsTrue;
1686     
1687 #else /* !GRAN */
1688       /* don't need to do anything.  Either the thread is blocked on
1689        * I/O, in which case we'll have called addToBlockedQueue
1690        * previously, or it's blocked on an MVar or Blackhole, in which
1691        * case it'll be on the relevant queue already.
1692        */
1693     ASSERT(t->why_blocked != NotBlocked);
1694     IF_DEBUG(scheduler,
1695              debugBelch("--<< thread %d (%s) stopped: ", 
1696                         t->id, whatNext_strs[t->what_next]);
1697              printThreadBlockage(t);
1698              debugBelch("\n"));
1699     
1700     /* Only for dumping event to log file 
1701        ToDo: do I need this in GranSim, too?
1702        blockThread(t);
1703     */
1704 #endif
1705 }
1706
1707 /* -----------------------------------------------------------------------------
1708  * Handle a thread that returned to the scheduler with ThreadFinished
1709  * ASSUMES: sched_mutex
1710  * -------------------------------------------------------------------------- */
1711
1712 static rtsBool
1713 scheduleHandleThreadFinished( StgMainThread *mainThread
1714                               USED_WHEN_RTS_SUPPORTS_THREADS,
1715                               Capability *cap,
1716                               StgTSO *t )
1717 {
1718     /* Need to check whether this was a main thread, and if so,
1719      * return with the return value.
1720      *
1721      * We also end up here if the thread kills itself with an
1722      * uncaught exception, see Exception.cmm.
1723      */
1724     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1725                                   t->id, whatNext_strs[t->what_next]));
1726
1727 #if defined(GRAN)
1728       endThread(t, CurrentProc); // clean-up the thread
1729 #elif defined(PARALLEL_HASKELL)
1730       /* For now all are advisory -- HWL */
1731       //if(t->priority==AdvisoryPriority) ??
1732       advisory_thread_count--; // JB: Caution with this counter, buggy!
1733       
1734 # if defined(DIST)
1735       if(t->dist.priority==RevalPriority)
1736         FinishReval(t);
1737 # endif
1738     
1739 # if defined(EDENOLD)
1740       // the thread could still have an outport... (BUG)
1741       if (t->eden.outport != -1) {
1742       // delete the outport for the tso which has finished...
1743         IF_PAR_DEBUG(eden_ports,
1744                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1745                               t->eden.outport, t->id));
1746         deleteOPT(t);
1747       }
1748       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1749       if (t->eden.epid != -1) {
1750         IF_PAR_DEBUG(eden_ports,
1751                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1752                            t->id, t->eden.epid));
1753         removeTSOfromProcess(t);
1754       }
1755 # endif 
1756
1757 # if defined(PAR)
1758       if (RtsFlags.ParFlags.ParStats.Full &&
1759           !RtsFlags.ParFlags.ParStats.Suppressed) 
1760         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1761
1762       //  t->par only contains statistics: left out for now...
1763       IF_PAR_DEBUG(fish,
1764                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1765                               t->id,t,t->par.sparkname));
1766 # endif
1767 #endif // PARALLEL_HASKELL
1768
1769       //
1770       // Check whether the thread that just completed was a main
1771       // thread, and if so return with the result.  
1772       //
1773       // There is an assumption here that all thread completion goes
1774       // through this point; we need to make sure that if a thread
1775       // ends up in the ThreadKilled state, that it stays on the run
1776       // queue so it can be dealt with here.
1777       //
1778       if (
1779 #if defined(RTS_SUPPORTS_THREADS)
1780           mainThread != NULL
1781 #else
1782           mainThread->tso == t
1783 #endif
1784           )
1785       {
1786           // We are a bound thread: this must be our thread that just
1787           // completed.
1788           ASSERT(mainThread->tso == t);
1789
1790           if (t->what_next == ThreadComplete) {
1791               if (mainThread->ret) {
1792                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1793                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1794               }
1795               mainThread->stat = Success;
1796           } else {
1797               if (mainThread->ret) {
1798                   *(mainThread->ret) = NULL;
1799               }
1800               if (interrupted) {
1801                   mainThread->stat = Interrupted;
1802               } else {
1803                   mainThread->stat = Killed;
1804               }
1805           }
1806 #ifdef DEBUG
1807           removeThreadLabel((StgWord)mainThread->tso->id);
1808 #endif
1809           if (mainThread->prev == NULL) {
1810               main_threads = mainThread->link;
1811           } else {
1812               mainThread->prev->link = mainThread->link;
1813           }
1814           if (mainThread->link != NULL) {
1815               mainThread->link->prev = NULL;
1816           }
1817           releaseCapability(cap);
1818           return rtsTrue; // tells schedule() to return
1819       }
1820
1821 #ifdef RTS_SUPPORTS_THREADS
1822       ASSERT(t->main == NULL);
1823 #else
1824       if (t->main != NULL) {
1825           // Must be a main thread that is not the topmost one.  Leave
1826           // it on the run queue until the stack has unwound to the
1827           // point where we can deal with this.  Leaving it on the run
1828           // queue also ensures that the garbage collector knows about
1829           // this thread and its return value (it gets dropped from the
1830           // all_threads list so there's no other way to find it).
1831           APPEND_TO_RUN_QUEUE(t);
1832       }
1833 #endif
1834       return rtsFalse;
1835 }
1836
1837 /* -----------------------------------------------------------------------------
1838  * Perform a heap census, if PROFILING
1839  * -------------------------------------------------------------------------- */
1840
1841 static rtsBool
1842 scheduleDoHeapProfile(void)
1843 {
1844 #if defined(PROFILING)
1845     // When we have +RTS -i0 and we're heap profiling, do a census at
1846     // every GC.  This lets us get repeatable runs for debugging.
1847     if (performHeapProfile ||
1848         (RtsFlags.ProfFlags.profileInterval==0 &&
1849          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1850         GarbageCollect(GetRoots, rtsTrue);
1851         heapCensus();
1852         performHeapProfile = rtsFalse;
1853         return rtsTrue;  // true <=> we already GC'd
1854     }
1855 #endif
1856     return rtsFalse;
1857 }
1858
1859 /* -----------------------------------------------------------------------------
1860  * Perform a garbage collection if necessary
1861  * ASSUMES: sched_mutex
1862  * -------------------------------------------------------------------------- */
1863
1864 static void
1865 scheduleDoGC( Capability *cap )
1866 {
1867     StgTSO *t;
1868 #ifdef SMP
1869     int n_capabilities = RtsFlags.ParFlags.nNodes - 1; 
1870            // subtract one because we're already holding one.
1871     Capability *caps[n_capabilities];
1872 #endif
1873
1874 #ifdef SMP
1875     // In order to GC, there must be no threads running Haskell code.
1876     // Therefore, the GC thread needs to hold *all* the capabilities,
1877     // and release them after the GC has completed.  
1878     //
1879     // This seems to be the simplest way: previous attempts involved
1880     // making all the threads with capabilities give up their
1881     // capabilities and sleep except for the *last* one, which
1882     // actually did the GC.  But it's quite hard to arrange for all
1883     // the other tasks to sleep and stay asleep.
1884     //
1885         
1886     caps[n_capabilities] = cap;
1887     while (n_capabilities > 0) {
1888         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
1889         waitForReturnCapability(&sched_mutex, &cap);
1890         n_capabilities--;
1891         caps[n_capabilities] = cap;
1892     }
1893 #endif
1894
1895     /* Kick any transactions which are invalid back to their
1896      * atomically frames.  When next scheduled they will try to
1897      * commit, this commit will fail and they will retry.
1898      */
1899     for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1900         if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1901             if (!stmValidateTransaction (t -> trec)) {
1902                 IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1903                 
1904                 // strip the stack back to the ATOMICALLY_FRAME, aborting
1905                 // the (nested) transaction, and saving the stack of any
1906                 // partially-evaluated thunks on the heap.
1907                 raiseAsync_(t, NULL, rtsTrue);
1908                 
1909 #ifdef REG_R1
1910                 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1911 #endif
1912             }
1913         }
1914     }
1915     
1916     // so this happens periodically:
1917     scheduleCheckBlackHoles();
1918     
1919     /* everybody back, start the GC.
1920      * Could do it in this thread, or signal a condition var
1921      * to do it in another thread.  Either way, we need to
1922      * broadcast on gc_pending_cond afterward.
1923      */
1924 #if defined(RTS_SUPPORTS_THREADS)
1925     IF_DEBUG(scheduler,sched_belch("doing GC"));
1926 #endif
1927     GarbageCollect(GetRoots,rtsFalse);
1928     
1929 #if defined(SMP)
1930     {
1931         // release our stash of capabilities.
1932         nat i;
1933         for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
1934             releaseCapability(caps[i]);
1935         }
1936     }
1937 #endif
1938
1939 #if defined(GRAN)
1940     /* add a ContinueThread event to continue execution of current thread */
1941     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1942               ContinueThread,
1943               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1944     IF_GRAN_DEBUG(bq, 
1945                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1946                   G_EVENTQ(0);
1947                   G_CURR_THREADQ(0));
1948 #endif /* GRAN */
1949 }
1950
1951 /* ---------------------------------------------------------------------------
1952  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1953  * used by Control.Concurrent for error checking.
1954  * ------------------------------------------------------------------------- */
1955  
1956 StgBool
1957 rtsSupportsBoundThreads(void)
1958 {
1959 #ifdef THREADED_RTS
1960   return rtsTrue;
1961 #else
1962   return rtsFalse;
1963 #endif
1964 }
1965
1966 /* ---------------------------------------------------------------------------
1967  * isThreadBound(tso): check whether tso is bound to an OS thread.
1968  * ------------------------------------------------------------------------- */
1969  
1970 StgBool
1971 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1972 {
1973 #ifdef THREADED_RTS
1974   return (tso->main != NULL);
1975 #endif
1976   return rtsFalse;
1977 }
1978
1979 /* ---------------------------------------------------------------------------
1980  * Singleton fork(). Do not copy any running threads.
1981  * ------------------------------------------------------------------------- */
1982
1983 #ifndef mingw32_HOST_OS
1984 #define FORKPROCESS_PRIMOP_SUPPORTED
1985 #endif
1986
1987 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1988 static void 
1989 deleteThreadImmediately(StgTSO *tso);
1990 #endif
1991 StgInt
1992 forkProcess(HsStablePtr *entry
1993 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1994             STG_UNUSED
1995 #endif
1996            )
1997 {
1998 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1999   pid_t pid;
2000   StgTSO* t,*next;
2001   StgMainThread *m;
2002   SchedulerStatus rc;
2003
2004   IF_DEBUG(scheduler,sched_belch("forking!"));
2005   rts_lock(); // This not only acquires sched_mutex, it also
2006               // makes sure that no other threads are running
2007
2008   pid = fork();
2009
2010   if (pid) { /* parent */
2011
2012   /* just return the pid */
2013     rts_unlock();
2014     return pid;
2015     
2016   } else { /* child */
2017     
2018     
2019       // delete all threads
2020     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
2021     
2022     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2023       next = t->link;
2024
2025         // don't allow threads to catch the ThreadKilled exception
2026       deleteThreadImmediately(t);
2027     }
2028     
2029       // wipe the main thread list
2030     while((m = main_threads) != NULL) {
2031       main_threads = m->link;
2032 # ifdef THREADED_RTS
2033       closeCondition(&m->bound_thread_cond);
2034 # endif
2035       stgFree(m);
2036     }
2037     
2038     rc = rts_evalStableIO(entry, NULL);  // run the action
2039     rts_checkSchedStatus("forkProcess",rc);
2040     
2041     rts_unlock();
2042     
2043     hs_exit();                      // clean up and exit
2044     stg_exit(0);
2045   }
2046 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2047   barf("forkProcess#: primop not supported, sorry!\n");
2048   return -1;
2049 #endif
2050 }
2051
2052 /* ---------------------------------------------------------------------------
2053  * deleteAllThreads():  kill all the live threads.
2054  *
2055  * This is used when we catch a user interrupt (^C), before performing
2056  * any necessary cleanups and running finalizers.
2057  *
2058  * Locks: sched_mutex held.
2059  * ------------------------------------------------------------------------- */
2060    
2061 void
2062 deleteAllThreads ( void )
2063 {
2064   StgTSO* t, *next;
2065   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2066   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2067       next = t->global_link;
2068       deleteThread(t);
2069   }      
2070
2071   // The run queue now contains a bunch of ThreadKilled threads.  We
2072   // must not throw these away: the main thread(s) will be in there
2073   // somewhere, and the main scheduler loop has to deal with it.
2074   // Also, the run queue is the only thing keeping these threads from
2075   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2076
2077   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2078   ASSERT(blackhole_queue == END_TSO_QUEUE);
2079   ASSERT(sleeping_queue == END_TSO_QUEUE);
2080 }
2081
2082 /* startThread and  insertThread are now in GranSim.c -- HWL */
2083
2084
2085 /* ---------------------------------------------------------------------------
2086  * Suspending & resuming Haskell threads.
2087  * 
2088  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2089  * its capability before calling the C function.  This allows another
2090  * task to pick up the capability and carry on running Haskell
2091  * threads.  It also means that if the C call blocks, it won't lock
2092  * the whole system.
2093  *
2094  * The Haskell thread making the C call is put to sleep for the
2095  * duration of the call, on the susepended_ccalling_threads queue.  We
2096  * give out a token to the task, which it can use to resume the thread
2097  * on return from the C function.
2098  * ------------------------------------------------------------------------- */
2099    
2100 StgInt
2101 suspendThread( StgRegTable *reg )
2102 {
2103   nat tok;
2104   Capability *cap;
2105   int saved_errno = errno;
2106
2107   /* assume that *reg is a pointer to the StgRegTable part
2108    * of a Capability.
2109    */
2110   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
2111
2112   ACQUIRE_LOCK(&sched_mutex);
2113
2114   IF_DEBUG(scheduler,
2115            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
2116
2117   // XXX this might not be necessary --SDM
2118   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
2119
2120   threadPaused(cap->r.rCurrentTSO);
2121   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
2122   suspended_ccalling_threads = cap->r.rCurrentTSO;
2123
2124   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
2125       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
2126       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
2127   } else {
2128       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
2129   }
2130
2131   /* Use the thread ID as the token; it should be unique */
2132   tok = cap->r.rCurrentTSO->id;
2133
2134   /* Hand back capability */
2135   releaseCapability(cap);
2136   
2137 #if defined(RTS_SUPPORTS_THREADS)
2138   /* Preparing to leave the RTS, so ensure there's a native thread/task
2139      waiting to take over.
2140   */
2141   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
2142 #endif
2143
2144   in_haskell = rtsFalse;
2145   RELEASE_LOCK(&sched_mutex);
2146   
2147   errno = saved_errno;
2148   return tok; 
2149 }
2150
2151 StgRegTable *
2152 resumeThread( StgInt tok )
2153 {
2154   StgTSO *tso, **prev;
2155   Capability *cap;
2156   int saved_errno = errno;
2157
2158 #if defined(RTS_SUPPORTS_THREADS)
2159   /* Wait for permission to re-enter the RTS with the result. */
2160   ACQUIRE_LOCK(&sched_mutex);
2161   waitForReturnCapability(&sched_mutex, &cap);
2162
2163   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
2164 #else
2165   grabCapability(&cap);
2166 #endif
2167
2168   /* Remove the thread off of the suspended list */
2169   prev = &suspended_ccalling_threads;
2170   for (tso = suspended_ccalling_threads; 
2171        tso != END_TSO_QUEUE; 
2172        prev = &tso->link, tso = tso->link) {
2173     if (tso->id == (StgThreadID)tok) {
2174       *prev = tso->link;
2175       break;
2176     }
2177   }
2178   if (tso == END_TSO_QUEUE) {
2179     barf("resumeThread: thread not found");
2180   }
2181   tso->link = END_TSO_QUEUE;
2182   
2183   if(tso->why_blocked == BlockedOnCCall) {
2184       awakenBlockedQueueNoLock(tso->blocked_exceptions);
2185       tso->blocked_exceptions = NULL;
2186   }
2187   
2188   /* Reset blocking status */
2189   tso->why_blocked  = NotBlocked;
2190
2191   cap->r.rCurrentTSO = tso;
2192   in_haskell = rtsTrue;
2193   RELEASE_LOCK(&sched_mutex);
2194   errno = saved_errno;
2195   return &cap->r;
2196 }
2197
2198 /* ---------------------------------------------------------------------------
2199  * Comparing Thread ids.
2200  *
2201  * This is used from STG land in the implementation of the
2202  * instances of Eq/Ord for ThreadIds.
2203  * ------------------------------------------------------------------------ */
2204
2205 int
2206 cmp_thread(StgPtr tso1, StgPtr tso2) 
2207
2208   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2209   StgThreadID id2 = ((StgTSO *)tso2)->id;
2210  
2211   if (id1 < id2) return (-1);
2212   if (id1 > id2) return 1;
2213   return 0;
2214 }
2215
2216 /* ---------------------------------------------------------------------------
2217  * Fetching the ThreadID from an StgTSO.
2218  *
2219  * This is used in the implementation of Show for ThreadIds.
2220  * ------------------------------------------------------------------------ */
2221 int
2222 rts_getThreadId(StgPtr tso) 
2223 {
2224   return ((StgTSO *)tso)->id;
2225 }
2226
2227 #ifdef DEBUG
2228 void
2229 labelThread(StgPtr tso, char *label)
2230 {
2231   int len;
2232   void *buf;
2233
2234   /* Caveat: Once set, you can only set the thread name to "" */
2235   len = strlen(label)+1;
2236   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2237   strncpy(buf,label,len);
2238   /* Update will free the old memory for us */
2239   updateThreadLabel(((StgTSO *)tso)->id,buf);
2240 }
2241 #endif /* DEBUG */
2242
2243 /* ---------------------------------------------------------------------------
2244    Create a new thread.
2245
2246    The new thread starts with the given stack size.  Before the
2247    scheduler can run, however, this thread needs to have a closure
2248    (and possibly some arguments) pushed on its stack.  See
2249    pushClosure() in Schedule.h.
2250
2251    createGenThread() and createIOThread() (in SchedAPI.h) are
2252    convenient packaged versions of this function.
2253
2254    currently pri (priority) is only used in a GRAN setup -- HWL
2255    ------------------------------------------------------------------------ */
2256 #if defined(GRAN)
2257 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2258 StgTSO *
2259 createThread(nat size, StgInt pri)
2260 #else
2261 StgTSO *
2262 createThread(nat size)
2263 #endif
2264 {
2265
2266     StgTSO *tso;
2267     nat stack_size;
2268
2269     /* First check whether we should create a thread at all */
2270 #if defined(PARALLEL_HASKELL)
2271   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2272   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2273     threadsIgnored++;
2274     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2275           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2276     return END_TSO_QUEUE;
2277   }
2278   threadsCreated++;
2279 #endif
2280
2281 #if defined(GRAN)
2282   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2283 #endif
2284
2285   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2286
2287   /* catch ridiculously small stack sizes */
2288   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2289     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2290   }
2291
2292   stack_size = size - TSO_STRUCT_SIZEW;
2293
2294   tso = (StgTSO *)allocate(size);
2295   TICK_ALLOC_TSO(stack_size, 0);
2296
2297   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2298 #if defined(GRAN)
2299   SET_GRAN_HDR(tso, ThisPE);
2300 #endif
2301
2302   // Always start with the compiled code evaluator
2303   tso->what_next = ThreadRunGHC;
2304
2305   tso->id = next_thread_id++; 
2306   tso->why_blocked  = NotBlocked;
2307   tso->blocked_exceptions = NULL;
2308
2309   tso->saved_errno = 0;
2310   tso->main = NULL;
2311   
2312   tso->stack_size   = stack_size;
2313   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2314                               - TSO_STRUCT_SIZEW;
2315   tso->sp           = (P_)&(tso->stack) + stack_size;
2316
2317   tso->trec = NO_TREC;
2318
2319 #ifdef PROFILING
2320   tso->prof.CCCS = CCS_MAIN;
2321 #endif
2322
2323   /* put a stop frame on the stack */
2324   tso->sp -= sizeofW(StgStopFrame);
2325   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2326   tso->link = END_TSO_QUEUE;
2327
2328   // ToDo: check this
2329 #if defined(GRAN)
2330   /* uses more flexible routine in GranSim */
2331   insertThread(tso, CurrentProc);
2332 #else
2333   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2334    * from its creation
2335    */
2336 #endif
2337
2338 #if defined(GRAN) 
2339   if (RtsFlags.GranFlags.GranSimStats.Full) 
2340     DumpGranEvent(GR_START,tso);
2341 #elif defined(PARALLEL_HASKELL)
2342   if (RtsFlags.ParFlags.ParStats.Full) 
2343     DumpGranEvent(GR_STARTQ,tso);
2344   /* HACk to avoid SCHEDULE 
2345      LastTSO = tso; */
2346 #endif
2347
2348   /* Link the new thread on the global thread list.
2349    */
2350   tso->global_link = all_threads;
2351   all_threads = tso;
2352
2353 #if defined(DIST)
2354   tso->dist.priority = MandatoryPriority; //by default that is...
2355 #endif
2356
2357 #if defined(GRAN)
2358   tso->gran.pri = pri;
2359 # if defined(DEBUG)
2360   tso->gran.magic = TSO_MAGIC; // debugging only
2361 # endif
2362   tso->gran.sparkname   = 0;
2363   tso->gran.startedat   = CURRENT_TIME; 
2364   tso->gran.exported    = 0;
2365   tso->gran.basicblocks = 0;
2366   tso->gran.allocs      = 0;
2367   tso->gran.exectime    = 0;
2368   tso->gran.fetchtime   = 0;
2369   tso->gran.fetchcount  = 0;
2370   tso->gran.blocktime   = 0;
2371   tso->gran.blockcount  = 0;
2372   tso->gran.blockedat   = 0;
2373   tso->gran.globalsparks = 0;
2374   tso->gran.localsparks  = 0;
2375   if (RtsFlags.GranFlags.Light)
2376     tso->gran.clock  = Now; /* local clock */
2377   else
2378     tso->gran.clock  = 0;
2379
2380   IF_DEBUG(gran,printTSO(tso));
2381 #elif defined(PARALLEL_HASKELL)
2382 # if defined(DEBUG)
2383   tso->par.magic = TSO_MAGIC; // debugging only
2384 # endif
2385   tso->par.sparkname   = 0;
2386   tso->par.startedat   = CURRENT_TIME; 
2387   tso->par.exported    = 0;
2388   tso->par.basicblocks = 0;
2389   tso->par.allocs      = 0;
2390   tso->par.exectime    = 0;
2391   tso->par.fetchtime   = 0;
2392   tso->par.fetchcount  = 0;
2393   tso->par.blocktime   = 0;
2394   tso->par.blockcount  = 0;
2395   tso->par.blockedat   = 0;
2396   tso->par.globalsparks = 0;
2397   tso->par.localsparks  = 0;
2398 #endif
2399
2400 #if defined(GRAN)
2401   globalGranStats.tot_threads_created++;
2402   globalGranStats.threads_created_on_PE[CurrentProc]++;
2403   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2404   globalGranStats.tot_sq_probes++;
2405 #elif defined(PARALLEL_HASKELL)
2406   // collect parallel global statistics (currently done together with GC stats)
2407   if (RtsFlags.ParFlags.ParStats.Global &&
2408       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2409     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2410     globalParStats.tot_threads_created++;
2411   }
2412 #endif 
2413
2414 #if defined(GRAN)
2415   IF_GRAN_DEBUG(pri,
2416                 sched_belch("==__ schedule: Created TSO %d (%p);",
2417                       CurrentProc, tso, tso->id));
2418 #elif defined(PARALLEL_HASKELL)
2419   IF_PAR_DEBUG(verbose,
2420                sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2421                            (long)tso->id, tso, advisory_thread_count));
2422 #else
2423   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2424                                  (long)tso->id, (long)tso->stack_size));
2425 #endif    
2426   return tso;
2427 }
2428
2429 #if defined(PAR)
2430 /* RFP:
2431    all parallel thread creation calls should fall through the following routine.
2432 */
2433 StgTSO *
2434 createThreadFromSpark(rtsSpark spark) 
2435 { StgTSO *tso;
2436   ASSERT(spark != (rtsSpark)NULL);
2437 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2438   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2439   { threadsIgnored++;
2440     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2441           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2442     return END_TSO_QUEUE;
2443   }
2444   else
2445   { threadsCreated++;
2446     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2447     if (tso==END_TSO_QUEUE)     
2448       barf("createSparkThread: Cannot create TSO");
2449 #if defined(DIST)
2450     tso->priority = AdvisoryPriority;
2451 #endif
2452     pushClosure(tso,spark);
2453     addToRunQueue(tso);
2454     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2455   }
2456   return tso;
2457 }
2458 #endif
2459
2460 /*
2461   Turn a spark into a thread.
2462   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2463 */
2464 #if 0
2465 StgTSO *
2466 activateSpark (rtsSpark spark) 
2467 {
2468   StgTSO *tso;
2469
2470   tso = createSparkThread(spark);
2471   if (RtsFlags.ParFlags.ParStats.Full) {   
2472     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2473       IF_PAR_DEBUG(verbose,
2474                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2475                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2476   }
2477   // ToDo: fwd info on local/global spark to thread -- HWL
2478   // tso->gran.exported =  spark->exported;
2479   // tso->gran.locked =   !spark->global;
2480   // tso->gran.sparkname = spark->name;
2481
2482   return tso;
2483 }
2484 #endif
2485
2486 /* ---------------------------------------------------------------------------
2487  * scheduleThread()
2488  *
2489  * scheduleThread puts a thread on the head of the runnable queue.
2490  * This will usually be done immediately after a thread is created.
2491  * The caller of scheduleThread must create the thread using e.g.
2492  * createThread and push an appropriate closure
2493  * on this thread's stack before the scheduler is invoked.
2494  * ------------------------------------------------------------------------ */
2495
2496 static void
2497 scheduleThread_(StgTSO *tso)
2498 {
2499   // The thread goes at the *end* of the run-queue, to avoid possible
2500   // starvation of any threads already on the queue.
2501   APPEND_TO_RUN_QUEUE(tso);
2502   threadRunnable();
2503 }
2504
2505 void
2506 scheduleThread(StgTSO* tso)
2507 {
2508   ACQUIRE_LOCK(&sched_mutex);
2509   scheduleThread_(tso);
2510   RELEASE_LOCK(&sched_mutex);
2511 }
2512
2513 #if defined(RTS_SUPPORTS_THREADS)
2514 static Condition bound_cond_cache;
2515 static int bound_cond_cache_full = 0;
2516 #endif
2517
2518
2519 SchedulerStatus
2520 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
2521                    Capability *initialCapability)
2522 {
2523     // Precondition: sched_mutex must be held
2524     StgMainThread *m;
2525
2526     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2527     m->tso = tso;
2528     tso->main = m;
2529     m->ret = ret;
2530     m->stat = NoStatus;
2531     m->link = main_threads;
2532     m->prev = NULL;
2533     if (main_threads != NULL) {
2534         main_threads->prev = m;
2535     }
2536     main_threads = m;
2537
2538 #if defined(RTS_SUPPORTS_THREADS)
2539     // Allocating a new condition for each thread is expensive, so we
2540     // cache one.  This is a pretty feeble hack, but it helps speed up
2541     // consecutive call-ins quite a bit.
2542     if (bound_cond_cache_full) {
2543         m->bound_thread_cond = bound_cond_cache;
2544         bound_cond_cache_full = 0;
2545     } else {
2546         initCondition(&m->bound_thread_cond);
2547     }
2548 #endif
2549
2550     /* Put the thread on the main-threads list prior to scheduling the TSO.
2551        Failure to do so introduces a race condition in the MT case (as
2552        identified by Wolfgang Thaller), whereby the new task/OS thread 
2553        created by scheduleThread_() would complete prior to the thread
2554        that spawned it managed to put 'itself' on the main-threads list.
2555        The upshot of it all being that the worker thread wouldn't get to
2556        signal the completion of the its work item for the main thread to
2557        see (==> it got stuck waiting.)    -- sof 6/02.
2558     */
2559     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2560     
2561     APPEND_TO_RUN_QUEUE(tso);
2562     // NB. Don't call threadRunnable() here, because the thread is
2563     // bound and only runnable by *this* OS thread, so waking up other
2564     // workers will just slow things down.
2565
2566     return waitThread_(m, initialCapability);
2567 }
2568
2569 /* ---------------------------------------------------------------------------
2570  * initScheduler()
2571  *
2572  * Initialise the scheduler.  This resets all the queues - if the
2573  * queues contained any threads, they'll be garbage collected at the
2574  * next pass.
2575  *
2576  * ------------------------------------------------------------------------ */
2577
2578 void 
2579 initScheduler(void)
2580 {
2581 #if defined(GRAN)
2582   nat i;
2583
2584   for (i=0; i<=MAX_PROC; i++) {
2585     run_queue_hds[i]      = END_TSO_QUEUE;
2586     run_queue_tls[i]      = END_TSO_QUEUE;
2587     blocked_queue_hds[i]  = END_TSO_QUEUE;
2588     blocked_queue_tls[i]  = END_TSO_QUEUE;
2589     ccalling_threadss[i]  = END_TSO_QUEUE;
2590     blackhole_queue[i]    = END_TSO_QUEUE;
2591     sleeping_queue        = END_TSO_QUEUE;
2592   }
2593 #else
2594   run_queue_hd      = END_TSO_QUEUE;
2595   run_queue_tl      = END_TSO_QUEUE;
2596   blocked_queue_hd  = END_TSO_QUEUE;
2597   blocked_queue_tl  = END_TSO_QUEUE;
2598   blackhole_queue   = END_TSO_QUEUE;
2599   sleeping_queue    = END_TSO_QUEUE;
2600 #endif 
2601
2602   suspended_ccalling_threads  = END_TSO_QUEUE;
2603
2604   main_threads = NULL;
2605   all_threads  = END_TSO_QUEUE;
2606
2607   context_switch = 0;
2608   interrupted    = 0;
2609
2610   RtsFlags.ConcFlags.ctxtSwitchTicks =
2611       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2612       
2613 #if defined(RTS_SUPPORTS_THREADS)
2614   /* Initialise the mutex and condition variables used by
2615    * the scheduler. */
2616   initMutex(&sched_mutex);
2617   initMutex(&term_mutex);
2618 #endif
2619   
2620   ACQUIRE_LOCK(&sched_mutex);
2621
2622   /* A capability holds the state a native thread needs in
2623    * order to execute STG code. At least one capability is
2624    * floating around (only SMP builds have more than one).
2625    */
2626   initCapabilities();
2627   
2628 #if defined(RTS_SUPPORTS_THREADS)
2629   initTaskManager();
2630 #endif
2631
2632 #if defined(SMP)
2633   /* eagerly start some extra workers */
2634   startTasks(RtsFlags.ParFlags.nNodes, taskStart);
2635 #endif
2636
2637 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2638   initSparkPools();
2639 #endif
2640
2641   RELEASE_LOCK(&sched_mutex);
2642 }
2643
2644 void
2645 exitScheduler( void )
2646 {
2647     interrupted = rtsTrue;
2648     shutting_down_scheduler = rtsTrue;
2649 #if defined(RTS_SUPPORTS_THREADS)
2650     if (threadIsTask(osThreadId())) { taskStop(); }
2651     stopTaskManager();
2652 #endif
2653 }
2654
2655 /* ----------------------------------------------------------------------------
2656    Managing the per-task allocation areas.
2657    
2658    Each capability comes with an allocation area.  These are
2659    fixed-length block lists into which allocation can be done.
2660
2661    ToDo: no support for two-space collection at the moment???
2662    ------------------------------------------------------------------------- */
2663
2664 static SchedulerStatus
2665 waitThread_(StgMainThread* m, Capability *initialCapability)
2666 {
2667   SchedulerStatus stat;
2668
2669   // Precondition: sched_mutex must be held.
2670   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2671
2672 #if defined(GRAN)
2673   /* GranSim specific init */
2674   CurrentTSO = m->tso;                // the TSO to run
2675   procStatus[MainProc] = Busy;        // status of main PE
2676   CurrentProc = MainProc;             // PE to run it on
2677   schedule(m,initialCapability);
2678 #else
2679   schedule(m,initialCapability);
2680   ASSERT(m->stat != NoStatus);
2681 #endif
2682
2683   stat = m->stat;
2684
2685 #if defined(RTS_SUPPORTS_THREADS)
2686   // Free the condition variable, returning it to the cache if possible.
2687   if (!bound_cond_cache_full) {
2688       bound_cond_cache = m->bound_thread_cond;
2689       bound_cond_cache_full = 1;
2690   } else {
2691       closeCondition(&m->bound_thread_cond);
2692   }
2693 #endif
2694
2695   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2696   stgFree(m);
2697
2698   // Postcondition: sched_mutex still held
2699   return stat;
2700 }
2701
2702 /* ---------------------------------------------------------------------------
2703    Where are the roots that we know about?
2704
2705         - all the threads on the runnable queue
2706         - all the threads on the blocked queue
2707         - all the threads on the sleeping queue
2708         - all the thread currently executing a _ccall_GC
2709         - all the "main threads"
2710      
2711    ------------------------------------------------------------------------ */
2712
2713 /* This has to be protected either by the scheduler monitor, or by the
2714         garbage collection monitor (probably the latter).
2715         KH @ 25/10/99
2716 */
2717
2718 void
2719 GetRoots( evac_fn evac )
2720 {
2721 #if defined(GRAN)
2722   {
2723     nat i;
2724     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2725       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2726           evac((StgClosure **)&run_queue_hds[i]);
2727       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2728           evac((StgClosure **)&run_queue_tls[i]);
2729       
2730       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2731           evac((StgClosure **)&blocked_queue_hds[i]);
2732       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2733           evac((StgClosure **)&blocked_queue_tls[i]);
2734       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2735           evac((StgClosure **)&ccalling_threads[i]);
2736     }
2737   }
2738
2739   markEventQueue();
2740
2741 #else /* !GRAN */
2742   if (run_queue_hd != END_TSO_QUEUE) {
2743       ASSERT(run_queue_tl != END_TSO_QUEUE);
2744       evac((StgClosure **)&run_queue_hd);
2745       evac((StgClosure **)&run_queue_tl);
2746   }
2747   
2748   if (blocked_queue_hd != END_TSO_QUEUE) {
2749       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2750       evac((StgClosure **)&blocked_queue_hd);
2751       evac((StgClosure **)&blocked_queue_tl);
2752   }
2753   
2754   if (sleeping_queue != END_TSO_QUEUE) {
2755       evac((StgClosure **)&sleeping_queue);
2756   }
2757 #endif 
2758
2759   if (blackhole_queue != END_TSO_QUEUE) {
2760       evac((StgClosure **)&blackhole_queue);
2761   }
2762
2763   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2764       evac((StgClosure **)&suspended_ccalling_threads);
2765   }
2766
2767 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2768   markSparkQueue(evac);
2769 #endif
2770
2771 #if defined(RTS_USER_SIGNALS)
2772   // mark the signal handlers (signals should be already blocked)
2773   markSignalHandlers(evac);
2774 #endif
2775 }
2776
2777 /* -----------------------------------------------------------------------------
2778    performGC
2779
2780    This is the interface to the garbage collector from Haskell land.
2781    We provide this so that external C code can allocate and garbage
2782    collect when called from Haskell via _ccall_GC.
2783
2784    It might be useful to provide an interface whereby the programmer
2785    can specify more roots (ToDo).
2786    
2787    This needs to be protected by the GC condition variable above.  KH.
2788    -------------------------------------------------------------------------- */
2789
2790 static void (*extra_roots)(evac_fn);
2791
2792 void
2793 performGC(void)
2794 {
2795   /* Obligated to hold this lock upon entry */
2796   ACQUIRE_LOCK(&sched_mutex);
2797   GarbageCollect(GetRoots,rtsFalse);
2798   RELEASE_LOCK(&sched_mutex);
2799 }
2800
2801 void
2802 performMajorGC(void)
2803 {
2804   ACQUIRE_LOCK(&sched_mutex);
2805   GarbageCollect(GetRoots,rtsTrue);
2806   RELEASE_LOCK(&sched_mutex);
2807 }
2808
2809 static void
2810 AllRoots(evac_fn evac)
2811 {
2812     GetRoots(evac);             // the scheduler's roots
2813     extra_roots(evac);          // the user's roots
2814 }
2815
2816 void
2817 performGCWithRoots(void (*get_roots)(evac_fn))
2818 {
2819   ACQUIRE_LOCK(&sched_mutex);
2820   extra_roots = get_roots;
2821   GarbageCollect(AllRoots,rtsFalse);
2822   RELEASE_LOCK(&sched_mutex);
2823 }
2824
2825 /* -----------------------------------------------------------------------------
2826    Stack overflow
2827
2828    If the thread has reached its maximum stack size, then raise the
2829    StackOverflow exception in the offending thread.  Otherwise
2830    relocate the TSO into a larger chunk of memory and adjust its stack
2831    size appropriately.
2832    -------------------------------------------------------------------------- */
2833
2834 static StgTSO *
2835 threadStackOverflow(StgTSO *tso)
2836 {
2837   nat new_stack_size, stack_words;
2838   lnat new_tso_size;
2839   StgPtr new_sp;
2840   StgTSO *dest;
2841
2842   IF_DEBUG(sanity,checkTSO(tso));
2843   if (tso->stack_size >= tso->max_stack_size) {
2844
2845     IF_DEBUG(gc,
2846              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2847                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2848              /* If we're debugging, just print out the top of the stack */
2849              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2850                                               tso->sp+64)));
2851
2852     /* Send this thread the StackOverflow exception */
2853     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2854     return tso;
2855   }
2856
2857   /* Try to double the current stack size.  If that takes us over the
2858    * maximum stack size for this thread, then use the maximum instead.
2859    * Finally round up so the TSO ends up as a whole number of blocks.
2860    */
2861   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2862   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2863                                        TSO_STRUCT_SIZE)/sizeof(W_);
2864   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2865   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2866
2867   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2868
2869   dest = (StgTSO *)allocate(new_tso_size);
2870   TICK_ALLOC_TSO(new_stack_size,0);
2871
2872   /* copy the TSO block and the old stack into the new area */
2873   memcpy(dest,tso,TSO_STRUCT_SIZE);
2874   stack_words = tso->stack + tso->stack_size - tso->sp;
2875   new_sp = (P_)dest + new_tso_size - stack_words;
2876   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2877
2878   /* relocate the stack pointers... */
2879   dest->sp         = new_sp;
2880   dest->stack_size = new_stack_size;
2881         
2882   /* Mark the old TSO as relocated.  We have to check for relocated
2883    * TSOs in the garbage collector and any primops that deal with TSOs.
2884    *
2885    * It's important to set the sp value to just beyond the end
2886    * of the stack, so we don't attempt to scavenge any part of the
2887    * dead TSO's stack.
2888    */
2889   tso->what_next = ThreadRelocated;
2890   tso->link = dest;
2891   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2892   tso->why_blocked = NotBlocked;
2893
2894   IF_PAR_DEBUG(verbose,
2895                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2896                      tso->id, tso, tso->stack_size);
2897                /* If we're debugging, just print out the top of the stack */
2898                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2899                                                 tso->sp+64)));
2900   
2901   IF_DEBUG(sanity,checkTSO(tso));
2902 #if 0
2903   IF_DEBUG(scheduler,printTSO(dest));
2904 #endif
2905
2906   return dest;
2907 }
2908
2909 /* ---------------------------------------------------------------------------
2910    Wake up a queue that was blocked on some resource.
2911    ------------------------------------------------------------------------ */
2912
2913 #if defined(GRAN)
2914 STATIC_INLINE void
2915 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2916 {
2917 }
2918 #elif defined(PARALLEL_HASKELL)
2919 STATIC_INLINE void
2920 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2921 {
2922   /* write RESUME events to log file and
2923      update blocked and fetch time (depending on type of the orig closure) */
2924   if (RtsFlags.ParFlags.ParStats.Full) {
2925     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2926                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2927                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2928     if (EMPTY_RUN_QUEUE())
2929       emitSchedule = rtsTrue;
2930
2931     switch (get_itbl(node)->type) {
2932         case FETCH_ME_BQ:
2933           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2934           break;
2935         case RBH:
2936         case FETCH_ME:
2937         case BLACKHOLE_BQ:
2938           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2939           break;
2940 #ifdef DIST
2941         case MVAR:
2942           break;
2943 #endif    
2944         default:
2945           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2946         }
2947       }
2948 }
2949 #endif
2950
2951 #if defined(GRAN)
2952 static StgBlockingQueueElement *
2953 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2954 {
2955     StgTSO *tso;
2956     PEs node_loc, tso_loc;
2957
2958     node_loc = where_is(node); // should be lifted out of loop
2959     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2960     tso_loc = where_is((StgClosure *)tso);
2961     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2962       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2963       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2964       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2965       // insertThread(tso, node_loc);
2966       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2967                 ResumeThread,
2968                 tso, node, (rtsSpark*)NULL);
2969       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2970       // len_local++;
2971       // len++;
2972     } else { // TSO is remote (actually should be FMBQ)
2973       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2974                                   RtsFlags.GranFlags.Costs.gunblocktime +
2975                                   RtsFlags.GranFlags.Costs.latency;
2976       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2977                 UnblockThread,
2978                 tso, node, (rtsSpark*)NULL);
2979       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2980       // len++;
2981     }
2982     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2983     IF_GRAN_DEBUG(bq,
2984                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2985                           (node_loc==tso_loc ? "Local" : "Global"), 
2986                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2987     tso->block_info.closure = NULL;
2988     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2989                              tso->id, tso));
2990 }
2991 #elif defined(PARALLEL_HASKELL)
2992 static StgBlockingQueueElement *
2993 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2994 {
2995     StgBlockingQueueElement *next;
2996
2997     switch (get_itbl(bqe)->type) {
2998     case TSO:
2999       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3000       /* if it's a TSO just push it onto the run_queue */
3001       next = bqe->link;
3002       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3003       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3004       threadRunnable();
3005       unblockCount(bqe, node);
3006       /* reset blocking status after dumping event */
3007       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3008       break;
3009
3010     case BLOCKED_FETCH:
3011       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3012       next = bqe->link;
3013       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3014       PendingFetches = (StgBlockedFetch *)bqe;
3015       break;
3016
3017 # if defined(DEBUG)
3018       /* can ignore this case in a non-debugging setup; 
3019          see comments on RBHSave closures above */
3020     case CONSTR:
3021       /* check that the closure is an RBHSave closure */
3022       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3023              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3024              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3025       break;
3026
3027     default:
3028       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3029            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3030            (StgClosure *)bqe);
3031 # endif
3032     }
3033   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3034   return next;
3035 }
3036
3037 #else /* !GRAN && !PARALLEL_HASKELL */
3038 static StgTSO *
3039 unblockOneLocked(StgTSO *tso)
3040 {
3041   StgTSO *next;
3042
3043   ASSERT(get_itbl(tso)->type == TSO);
3044   ASSERT(tso->why_blocked != NotBlocked);
3045   tso->why_blocked = NotBlocked;
3046   next = tso->link;
3047   tso->link = END_TSO_QUEUE;
3048   APPEND_TO_RUN_QUEUE(tso);
3049   threadRunnable();
3050   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3051   return next;
3052 }
3053 #endif
3054
3055 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3056 INLINE_ME StgBlockingQueueElement *
3057 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3058 {
3059   ACQUIRE_LOCK(&sched_mutex);
3060   bqe = unblockOneLocked(bqe, node);
3061   RELEASE_LOCK(&sched_mutex);
3062   return bqe;
3063 }
3064 #else
3065 INLINE_ME StgTSO *
3066 unblockOne(StgTSO *tso)
3067 {
3068   ACQUIRE_LOCK(&sched_mutex);
3069   tso = unblockOneLocked(tso);
3070   RELEASE_LOCK(&sched_mutex);
3071   return tso;
3072 }
3073 #endif
3074
3075 #if defined(GRAN)
3076 void 
3077 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3078 {
3079   StgBlockingQueueElement *bqe;
3080   PEs node_loc;
3081   nat len = 0; 
3082
3083   IF_GRAN_DEBUG(bq, 
3084                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3085                       node, CurrentProc, CurrentTime[CurrentProc], 
3086                       CurrentTSO->id, CurrentTSO));
3087
3088   node_loc = where_is(node);
3089
3090   ASSERT(q == END_BQ_QUEUE ||
3091          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3092          get_itbl(q)->type == CONSTR); // closure (type constructor)
3093   ASSERT(is_unique(node));
3094
3095   /* FAKE FETCH: magically copy the node to the tso's proc;
3096      no Fetch necessary because in reality the node should not have been 
3097      moved to the other PE in the first place
3098   */
3099   if (CurrentProc!=node_loc) {
3100     IF_GRAN_DEBUG(bq, 
3101                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3102                         node, node_loc, CurrentProc, CurrentTSO->id, 
3103                         // CurrentTSO, where_is(CurrentTSO),
3104                         node->header.gran.procs));
3105     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3106     IF_GRAN_DEBUG(bq, 
3107                   debugBelch("## new bitmask of node %p is %#x\n",
3108                         node, node->header.gran.procs));
3109     if (RtsFlags.GranFlags.GranSimStats.Global) {
3110       globalGranStats.tot_fake_fetches++;
3111     }
3112   }
3113
3114   bqe = q;
3115   // ToDo: check: ASSERT(CurrentProc==node_loc);
3116   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3117     //next = bqe->link;
3118     /* 
3119        bqe points to the current element in the queue
3120        next points to the next element in the queue
3121     */
3122     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3123     //tso_loc = where_is(tso);
3124     len++;
3125     bqe = unblockOneLocked(bqe, node);
3126   }
3127
3128   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3129      the closure to make room for the anchor of the BQ */
3130   if (bqe!=END_BQ_QUEUE) {
3131     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3132     /*
3133     ASSERT((info_ptr==&RBH_Save_0_info) ||
3134            (info_ptr==&RBH_Save_1_info) ||
3135            (info_ptr==&RBH_Save_2_info));
3136     */
3137     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3138     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3139     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3140
3141     IF_GRAN_DEBUG(bq,
3142                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3143                         node, info_type(node)));
3144   }
3145
3146   /* statistics gathering */
3147   if (RtsFlags.GranFlags.GranSimStats.Global) {
3148     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3149     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3150     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3151     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3152   }
3153   IF_GRAN_DEBUG(bq,
3154                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3155                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3156 }
3157 #elif defined(PARALLEL_HASKELL)
3158 void 
3159 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3160 {
3161   StgBlockingQueueElement *bqe;
3162
3163   ACQUIRE_LOCK(&sched_mutex);
3164
3165   IF_PAR_DEBUG(verbose, 
3166                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3167                      node, mytid));
3168 #ifdef DIST  
3169   //RFP
3170   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3171     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3172     return;
3173   }
3174 #endif
3175   
3176   ASSERT(q == END_BQ_QUEUE ||
3177          get_itbl(q)->type == TSO ||           
3178          get_itbl(q)->type == BLOCKED_FETCH || 
3179          get_itbl(q)->type == CONSTR); 
3180
3181   bqe = q;
3182   while (get_itbl(bqe)->type==TSO || 
3183          get_itbl(bqe)->type==BLOCKED_FETCH) {
3184     bqe = unblockOneLocked(bqe, node);
3185   }
3186   RELEASE_LOCK(&sched_mutex);
3187 }
3188
3189 #else   /* !GRAN && !PARALLEL_HASKELL */
3190
3191 void
3192 awakenBlockedQueueNoLock(StgTSO *tso)
3193 {
3194   while (tso != END_TSO_QUEUE) {
3195     tso = unblockOneLocked(tso);
3196   }
3197 }
3198
3199 void
3200 awakenBlockedQueue(StgTSO *tso)
3201 {
3202   ACQUIRE_LOCK(&sched_mutex);
3203   while (tso != END_TSO_QUEUE) {
3204     tso = unblockOneLocked(tso);
3205   }
3206   RELEASE_LOCK(&sched_mutex);
3207 }
3208 #endif
3209
3210 /* ---------------------------------------------------------------------------
3211    Interrupt execution
3212    - usually called inside a signal handler so it mustn't do anything fancy.   
3213    ------------------------------------------------------------------------ */
3214
3215 void
3216 interruptStgRts(void)
3217 {
3218     interrupted    = 1;
3219     context_switch = 1;
3220 }
3221
3222 /* -----------------------------------------------------------------------------
3223    Unblock a thread
3224
3225    This is for use when we raise an exception in another thread, which
3226    may be blocked.
3227    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3228    -------------------------------------------------------------------------- */
3229
3230 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3231 /*
3232   NB: only the type of the blocking queue is different in GranSim and GUM
3233       the operations on the queue-elements are the same
3234       long live polymorphism!
3235
3236   Locks: sched_mutex is held upon entry and exit.
3237
3238 */
3239 static void
3240 unblockThread(StgTSO *tso)
3241 {
3242   StgBlockingQueueElement *t, **last;
3243
3244   switch (tso->why_blocked) {
3245
3246   case NotBlocked:
3247     return;  /* not blocked */
3248
3249   case BlockedOnSTM:
3250     // Be careful: nothing to do here!  We tell the scheduler that the thread
3251     // is runnable and we leave it to the stack-walking code to abort the 
3252     // transaction while unwinding the stack.  We should perhaps have a debugging
3253     // test to make sure that this really happens and that the 'zombie' transaction
3254     // does not get committed.
3255     goto done;
3256
3257   case BlockedOnMVar:
3258     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3259     {
3260       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3261       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3262
3263       last = (StgBlockingQueueElement **)&mvar->head;
3264       for (t = (StgBlockingQueueElement *)mvar->head; 
3265            t != END_BQ_QUEUE; 
3266            last = &t->link, last_tso = t, t = t->link) {
3267         if (t == (StgBlockingQueueElement *)tso) {
3268           *last = (StgBlockingQueueElement *)tso->link;
3269           if (mvar->tail == tso) {
3270             mvar->tail = (StgTSO *)last_tso;
3271           }
3272           goto done;
3273         }
3274       }
3275       barf("unblockThread (MVAR): TSO not found");
3276     }
3277
3278   case BlockedOnBlackHole:
3279     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3280     {
3281       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3282
3283       last = &bq->blocking_queue;
3284       for (t = bq->blocking_queue; 
3285            t != END_BQ_QUEUE; 
3286            last = &t->link, t = t->link) {
3287         if (t == (StgBlockingQueueElement *)tso) {
3288           *last = (StgBlockingQueueElement *)tso->link;
3289           goto done;
3290         }
3291       }
3292       barf("unblockThread (BLACKHOLE): TSO not found");
3293     }
3294
3295   case BlockedOnException:
3296     {
3297       StgTSO *target  = tso->block_info.tso;
3298
3299       ASSERT(get_itbl(target)->type == TSO);
3300
3301       if (target->what_next == ThreadRelocated) {
3302           target = target->link;
3303           ASSERT(get_itbl(target)->type == TSO);
3304       }
3305
3306       ASSERT(target->blocked_exceptions != NULL);
3307
3308       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3309       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3310            t != END_BQ_QUEUE; 
3311            last = &t->link, t = t->link) {
3312         ASSERT(get_itbl(t)->type == TSO);
3313         if (t == (StgBlockingQueueElement *)tso) {
3314           *last = (StgBlockingQueueElement *)tso->link;
3315           goto done;
3316         }
3317       }
3318       barf("unblockThread (Exception): TSO not found");
3319     }
3320
3321   case BlockedOnRead:
3322   case BlockedOnWrite:
3323 #if defined(mingw32_HOST_OS)
3324   case BlockedOnDoProc:
3325 #endif
3326     {
3327       /* take TSO off blocked_queue */
3328       StgBlockingQueueElement *prev = NULL;
3329       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3330            prev = t, t = t->link) {
3331         if (t == (StgBlockingQueueElement *)tso) {
3332           if (prev == NULL) {
3333             blocked_queue_hd = (StgTSO *)t->link;
3334             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3335               blocked_queue_tl = END_TSO_QUEUE;
3336             }
3337           } else {
3338             prev->link = t->link;
3339             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3340               blocked_queue_tl = (StgTSO *)prev;
3341             }
3342           }
3343           goto done;
3344         }
3345       }
3346       barf("unblockThread (I/O): TSO not found");
3347     }
3348
3349   case BlockedOnDelay:
3350     {
3351       /* take TSO off sleeping_queue */
3352       StgBlockingQueueElement *prev = NULL;
3353       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3354            prev = t, t = t->link) {
3355         if (t == (StgBlockingQueueElement *)tso) {
3356           if (prev == NULL) {
3357             sleeping_queue = (StgTSO *)t->link;
3358           } else {
3359             prev->link = t->link;
3360           }
3361           goto done;
3362         }
3363       }
3364       barf("unblockThread (delay): TSO not found");
3365     }
3366
3367   default:
3368     barf("unblockThread");
3369   }
3370
3371  done:
3372   tso->link = END_TSO_QUEUE;
3373   tso->why_blocked = NotBlocked;
3374   tso->block_info.closure = NULL;
3375   PUSH_ON_RUN_QUEUE(tso);
3376 }
3377 #else
3378 static void
3379 unblockThread(StgTSO *tso)
3380 {
3381   StgTSO *t, **last;
3382   
3383   /* To avoid locking unnecessarily. */
3384   if (tso->why_blocked == NotBlocked) {
3385     return;
3386   }
3387
3388   switch (tso->why_blocked) {
3389
3390   case BlockedOnSTM:
3391     // Be careful: nothing to do here!  We tell the scheduler that the thread
3392     // is runnable and we leave it to the stack-walking code to abort the 
3393     // transaction while unwinding the stack.  We should perhaps have a debugging
3394     // test to make sure that this really happens and that the 'zombie' transaction
3395     // does not get committed.
3396     goto done;
3397
3398   case BlockedOnMVar:
3399     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3400     {
3401       StgTSO *last_tso = END_TSO_QUEUE;
3402       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3403
3404       last = &mvar->head;
3405       for (t = mvar->head; t != END_TSO_QUEUE; 
3406            last = &t->link, last_tso = t, t = t->link) {
3407         if (t == tso) {
3408           *last = tso->link;
3409           if (mvar->tail == tso) {
3410             mvar->tail = last_tso;
3411           }
3412           goto done;
3413         }
3414       }
3415       barf("unblockThread (MVAR): TSO not found");
3416     }
3417
3418   case BlockedOnBlackHole:
3419     {
3420       last = &blackhole_queue;
3421       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3422            last = &t->link, t = t->link) {
3423         if (t == tso) {
3424           *last = tso->link;
3425           goto done;
3426         }
3427       }
3428       barf("unblockThread (BLACKHOLE): TSO not found");
3429     }
3430
3431   case BlockedOnException:
3432     {
3433       StgTSO *target  = tso->block_info.tso;
3434
3435       ASSERT(get_itbl(target)->type == TSO);
3436
3437       while (target->what_next == ThreadRelocated) {
3438           target = target->link;
3439           ASSERT(get_itbl(target)->type == TSO);
3440       }
3441       
3442       ASSERT(target->blocked_exceptions != NULL);
3443
3444       last = &target->blocked_exceptions;
3445       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3446            last = &t->link, t = t->link) {
3447         ASSERT(get_itbl(t)->type == TSO);
3448         if (t == tso) {
3449           *last = tso->link;
3450           goto done;
3451         }
3452       }
3453       barf("unblockThread (Exception): TSO not found");
3454     }
3455
3456   case BlockedOnRead:
3457   case BlockedOnWrite:
3458 #if defined(mingw32_HOST_OS)
3459   case BlockedOnDoProc:
3460 #endif
3461     {
3462       StgTSO *prev = NULL;
3463       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3464            prev = t, t = t->link) {
3465         if (t == tso) {
3466           if (prev == NULL) {
3467             blocked_queue_hd = t->link;
3468             if (blocked_queue_tl == t) {
3469               blocked_queue_tl = END_TSO_QUEUE;
3470             }
3471           } else {
3472             prev->link = t->link;
3473             if (blocked_queue_tl == t) {
3474               blocked_queue_tl = prev;
3475             }
3476           }
3477           goto done;
3478         }
3479       }
3480       barf("unblockThread (I/O): TSO not found");
3481     }
3482
3483   case BlockedOnDelay:
3484     {
3485       StgTSO *prev = NULL;
3486       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3487            prev = t, t = t->link) {
3488         if (t == tso) {
3489           if (prev == NULL) {
3490             sleeping_queue = t->link;
3491           } else {
3492             prev->link = t->link;
3493           }
3494           goto done;
3495         }
3496       }
3497       barf("unblockThread (delay): TSO not found");
3498     }
3499
3500   default:
3501     barf("unblockThread");
3502   }
3503
3504  done:
3505   tso->link = END_TSO_QUEUE;
3506   tso->why_blocked = NotBlocked;
3507   tso->block_info.closure = NULL;
3508   APPEND_TO_RUN_QUEUE(tso);
3509 }
3510 #endif
3511
3512 /* -----------------------------------------------------------------------------
3513  * checkBlackHoles()
3514  *
3515  * Check the blackhole_queue for threads that can be woken up.  We do
3516  * this periodically: before every GC, and whenever the run queue is
3517  * empty.
3518  *
3519  * An elegant solution might be to just wake up all the blocked
3520  * threads with awakenBlockedQueue occasionally: they'll go back to
3521  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3522  * doesn't give us a way to tell whether we've actually managed to
3523  * wake up any threads, so we would be busy-waiting.
3524  *
3525  * -------------------------------------------------------------------------- */
3526
3527 static rtsBool
3528 checkBlackHoles( void )
3529 {
3530     StgTSO **prev, *t;
3531     rtsBool any_woke_up = rtsFalse;
3532     StgHalfWord type;
3533
3534     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3535
3536     // ASSUMES: sched_mutex
3537     prev = &blackhole_queue;
3538     t = blackhole_queue;
3539     while (t != END_TSO_QUEUE) {
3540         ASSERT(t->why_blocked == BlockedOnBlackHole);
3541         type = get_itbl(t->block_info.closure)->type;
3542         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3543             t = unblockOneLocked(t);
3544             *prev = t;
3545             any_woke_up = rtsTrue;
3546         } else {
3547             prev = &t->link;
3548             t = t->link;
3549         }
3550     }
3551
3552     return any_woke_up;
3553 }
3554
3555 /* -----------------------------------------------------------------------------
3556  * raiseAsync()
3557  *
3558  * The following function implements the magic for raising an
3559  * asynchronous exception in an existing thread.
3560  *
3561  * We first remove the thread from any queue on which it might be
3562  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3563  *
3564  * We strip the stack down to the innermost CATCH_FRAME, building
3565  * thunks in the heap for all the active computations, so they can 
3566  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3567  * an application of the handler to the exception, and push it on
3568  * the top of the stack.
3569  * 
3570  * How exactly do we save all the active computations?  We create an
3571  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3572  * AP_STACKs pushes everything from the corresponding update frame
3573  * upwards onto the stack.  (Actually, it pushes everything up to the
3574  * next update frame plus a pointer to the next AP_STACK object.
3575  * Entering the next AP_STACK object pushes more onto the stack until we
3576  * reach the last AP_STACK object - at which point the stack should look
3577  * exactly as it did when we killed the TSO and we can continue
3578  * execution by entering the closure on top of the stack.
3579  *
3580  * We can also kill a thread entirely - this happens if either (a) the 
3581  * exception passed to raiseAsync is NULL, or (b) there's no
3582  * CATCH_FRAME on the stack.  In either case, we strip the entire
3583  * stack and replace the thread with a zombie.
3584  *
3585  * Locks: sched_mutex held upon entry nor exit.
3586  *
3587  * -------------------------------------------------------------------------- */
3588  
3589 void 
3590 deleteThread(StgTSO *tso)
3591 {
3592   if (tso->why_blocked != BlockedOnCCall &&
3593       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3594       raiseAsync(tso,NULL);
3595   }
3596 }
3597
3598 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3599 static void 
3600 deleteThreadImmediately(StgTSO *tso)
3601 { // for forkProcess only:
3602   // delete thread without giving it a chance to catch the KillThread exception
3603
3604   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3605       return;
3606   }
3607
3608   if (tso->why_blocked != BlockedOnCCall &&
3609       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3610     unblockThread(tso);
3611   }
3612
3613   tso->what_next = ThreadKilled;
3614 }
3615 #endif
3616
3617 void
3618 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3619 {
3620   /* When raising async exs from contexts where sched_mutex isn't held;
3621      use raiseAsyncWithLock(). */
3622   ACQUIRE_LOCK(&sched_mutex);
3623   raiseAsync(tso,exception);
3624   RELEASE_LOCK(&sched_mutex);
3625 }
3626
3627 void
3628 raiseAsync(StgTSO *tso, StgClosure *exception)
3629 {
3630     raiseAsync_(tso, exception, rtsFalse);
3631 }
3632
3633 static void
3634 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3635 {
3636     StgRetInfoTable *info;
3637     StgPtr sp;
3638   
3639     // Thread already dead?
3640     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3641         return;
3642     }
3643
3644     IF_DEBUG(scheduler, 
3645              sched_belch("raising exception in thread %ld.", (long)tso->id));
3646     
3647     // Remove it from any blocking queues
3648     unblockThread(tso);
3649
3650     sp = tso->sp;
3651     
3652     // The stack freezing code assumes there's a closure pointer on
3653     // the top of the stack, so we have to arrange that this is the case...
3654     //
3655     if (sp[0] == (W_)&stg_enter_info) {
3656         sp++;
3657     } else {
3658         sp--;
3659         sp[0] = (W_)&stg_dummy_ret_closure;
3660     }
3661
3662     while (1) {
3663         nat i;
3664
3665         // 1. Let the top of the stack be the "current closure"
3666         //
3667         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3668         // CATCH_FRAME.
3669         //
3670         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3671         // current closure applied to the chunk of stack up to (but not
3672         // including) the update frame.  This closure becomes the "current
3673         // closure".  Go back to step 2.
3674         //
3675         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3676         // top of the stack applied to the exception.
3677         // 
3678         // 5. If it's a STOP_FRAME, then kill the thread.
3679         // 
3680         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3681         // transaction
3682        
3683         
3684         StgPtr frame;
3685         
3686         frame = sp + 1;
3687         info = get_ret_itbl((StgClosure *)frame);
3688         
3689         while (info->i.type != UPDATE_FRAME
3690                && (info->i.type != CATCH_FRAME || exception == NULL)
3691                && info->i.type != STOP_FRAME
3692                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3693         {
3694             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3695               // IF we find an ATOMICALLY_FRAME then we abort the
3696               // current transaction and propagate the exception.  In
3697               // this case (unlike ordinary exceptions) we do not care
3698               // whether the transaction is valid or not because its
3699               // possible validity cannot have caused the exception
3700               // and will not be visible after the abort.
3701               IF_DEBUG(stm,
3702                        debugBelch("Found atomically block delivering async exception\n"));
3703               stmAbortTransaction(tso -> trec);
3704               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3705             }
3706             frame += stack_frame_sizeW((StgClosure *)frame);
3707             info = get_ret_itbl((StgClosure *)frame);
3708         }
3709         
3710         switch (info->i.type) {
3711             
3712         case ATOMICALLY_FRAME:
3713             ASSERT(stop_at_atomically);
3714             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3715             stmCondemnTransaction(tso -> trec);
3716 #ifdef REG_R1
3717             tso->sp = frame;
3718 #else
3719             // R1 is not a register: the return convention for IO in
3720             // this case puts the return value on the stack, so we
3721             // need to set up the stack to return to the atomically
3722             // frame properly...
3723             tso->sp = frame - 2;
3724             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3725             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3726 #endif
3727             tso->what_next = ThreadRunGHC;
3728             return;
3729
3730         case CATCH_FRAME:
3731             // If we find a CATCH_FRAME, and we've got an exception to raise,
3732             // then build the THUNK raise(exception), and leave it on
3733             // top of the CATCH_FRAME ready to enter.
3734             //
3735         {
3736 #ifdef PROFILING
3737             StgCatchFrame *cf = (StgCatchFrame *)frame;
3738 #endif
3739             StgClosure *raise;
3740             
3741             // we've got an exception to raise, so let's pass it to the
3742             // handler in this frame.
3743             //
3744             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3745             TICK_ALLOC_SE_THK(1,0);
3746             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3747             raise->payload[0] = exception;
3748             
3749             // throw away the stack from Sp up to the CATCH_FRAME.
3750             //
3751             sp = frame - 1;
3752             
3753             /* Ensure that async excpetions are blocked now, so we don't get
3754              * a surprise exception before we get around to executing the
3755              * handler.
3756              */
3757             if (tso->blocked_exceptions == NULL) {
3758                 tso->blocked_exceptions = END_TSO_QUEUE;
3759             }
3760             
3761             /* Put the newly-built THUNK on top of the stack, ready to execute
3762              * when the thread restarts.
3763              */
3764             sp[0] = (W_)raise;
3765             sp[-1] = (W_)&stg_enter_info;
3766             tso->sp = sp-1;
3767             tso->what_next = ThreadRunGHC;
3768             IF_DEBUG(sanity, checkTSO(tso));
3769             return;
3770         }
3771         
3772         case UPDATE_FRAME:
3773         {
3774             StgAP_STACK * ap;
3775             nat words;
3776             
3777             // First build an AP_STACK consisting of the stack chunk above the
3778             // current update frame, with the top word on the stack as the
3779             // fun field.
3780             //
3781             words = frame - sp - 1;
3782             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3783             
3784             ap->size = words;
3785             ap->fun  = (StgClosure *)sp[0];
3786             sp++;
3787             for(i=0; i < (nat)words; ++i) {
3788                 ap->payload[i] = (StgClosure *)*sp++;
3789             }
3790             
3791             SET_HDR(ap,&stg_AP_STACK_info,
3792                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3793             TICK_ALLOC_UP_THK(words+1,0);
3794             
3795             IF_DEBUG(scheduler,
3796                      debugBelch("sched: Updating ");
3797                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3798                      debugBelch(" with ");
3799                      printObj((StgClosure *)ap);
3800                 );
3801
3802             // Replace the updatee with an indirection - happily
3803             // this will also wake up any threads currently
3804             // waiting on the result.
3805             //
3806             // Warning: if we're in a loop, more than one update frame on
3807             // the stack may point to the same object.  Be careful not to
3808             // overwrite an IND_OLDGEN in this case, because we'll screw
3809             // up the mutable lists.  To be on the safe side, don't
3810             // overwrite any kind of indirection at all.  See also
3811             // threadSqueezeStack in GC.c, where we have to make a similar
3812             // check.
3813             //
3814             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3815                 // revert the black hole
3816                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3817                                (StgClosure *)ap);
3818             }
3819             sp += sizeofW(StgUpdateFrame) - 1;
3820             sp[0] = (W_)ap; // push onto stack
3821             break;
3822         }
3823         
3824         case STOP_FRAME:
3825             // We've stripped the entire stack, the thread is now dead.
3826             sp += sizeofW(StgStopFrame);
3827             tso->what_next = ThreadKilled;
3828             tso->sp = sp;
3829             return;
3830             
3831         default:
3832             barf("raiseAsync");
3833         }
3834     }
3835     barf("raiseAsync");
3836 }
3837
3838 /* -----------------------------------------------------------------------------
3839    raiseExceptionHelper
3840    
3841    This function is called by the raise# primitve, just so that we can
3842    move some of the tricky bits of raising an exception from C-- into
3843    C.  Who knows, it might be a useful re-useable thing here too.
3844    -------------------------------------------------------------------------- */
3845
3846 StgWord
3847 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3848 {
3849     StgClosure *raise_closure = NULL;
3850     StgPtr p, next;
3851     StgRetInfoTable *info;
3852     //
3853     // This closure represents the expression 'raise# E' where E
3854     // is the exception raise.  It is used to overwrite all the
3855     // thunks which are currently under evaluataion.
3856     //
3857
3858     //    
3859     // LDV profiling: stg_raise_info has THUNK as its closure
3860     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3861     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3862     // 1 does not cause any problem unless profiling is performed.
3863     // However, when LDV profiling goes on, we need to linearly scan
3864     // small object pool, where raise_closure is stored, so we should
3865     // use MIN_UPD_SIZE.
3866     //
3867     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3868     //                                 sizeofW(StgClosure)+1);
3869     //
3870
3871     //
3872     // Walk up the stack, looking for the catch frame.  On the way,
3873     // we update any closures pointed to from update frames with the
3874     // raise closure that we just built.
3875     //
3876     p = tso->sp;
3877     while(1) {
3878         info = get_ret_itbl((StgClosure *)p);
3879         next = p + stack_frame_sizeW((StgClosure *)p);
3880         switch (info->i.type) {
3881             
3882         case UPDATE_FRAME:
3883             // Only create raise_closure if we need to.
3884             if (raise_closure == NULL) {
3885                 raise_closure = 
3886                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3887                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3888                 raise_closure->payload[0] = exception;
3889             }
3890             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3891             p = next;
3892             continue;
3893
3894         case ATOMICALLY_FRAME:
3895             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3896             tso->sp = p;
3897             return ATOMICALLY_FRAME;
3898             
3899         case CATCH_FRAME:
3900             tso->sp = p;
3901             return CATCH_FRAME;
3902
3903         case CATCH_STM_FRAME:
3904             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3905             tso->sp = p;
3906             return CATCH_STM_FRAME;
3907             
3908         case STOP_FRAME:
3909             tso->sp = p;
3910             return STOP_FRAME;
3911
3912         case CATCH_RETRY_FRAME:
3913         default:
3914             p = next; 
3915             continue;
3916         }
3917     }
3918 }
3919
3920
3921 /* -----------------------------------------------------------------------------
3922    findRetryFrameHelper
3923
3924    This function is called by the retry# primitive.  It traverses the stack
3925    leaving tso->sp referring to the frame which should handle the retry.  
3926
3927    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3928    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3929
3930    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3931    despite the similar implementation.
3932
3933    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3934    not be created within memory transactions.
3935    -------------------------------------------------------------------------- */
3936
3937 StgWord
3938 findRetryFrameHelper (StgTSO *tso)
3939 {
3940   StgPtr           p, next;
3941   StgRetInfoTable *info;
3942
3943   p = tso -> sp;
3944   while (1) {
3945     info = get_ret_itbl((StgClosure *)p);
3946     next = p + stack_frame_sizeW((StgClosure *)p);
3947     switch (info->i.type) {
3948       
3949     case ATOMICALLY_FRAME:
3950       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3951       tso->sp = p;
3952       return ATOMICALLY_FRAME;
3953       
3954     case CATCH_RETRY_FRAME:
3955       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3956       tso->sp = p;
3957       return CATCH_RETRY_FRAME;
3958       
3959     case CATCH_STM_FRAME:
3960     default:
3961       ASSERT(info->i.type != CATCH_FRAME);
3962       ASSERT(info->i.type != STOP_FRAME);
3963       p = next; 
3964       continue;
3965     }
3966   }
3967 }
3968
3969 /* -----------------------------------------------------------------------------
3970    resurrectThreads is called after garbage collection on the list of
3971    threads found to be garbage.  Each of these threads will be woken
3972    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3973    on an MVar, or NonTermination if the thread was blocked on a Black
3974    Hole.
3975
3976    Locks: sched_mutex isn't held upon entry nor exit.
3977    -------------------------------------------------------------------------- */
3978
3979 void
3980 resurrectThreads( StgTSO *threads )
3981 {
3982   StgTSO *tso, *next;
3983
3984   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3985     next = tso->global_link;
3986     tso->global_link = all_threads;
3987     all_threads = tso;
3988     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3989
3990     switch (tso->why_blocked) {
3991     case BlockedOnMVar:
3992     case BlockedOnException:
3993       /* Called by GC - sched_mutex lock is currently held. */
3994       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3995       break;
3996     case BlockedOnBlackHole:
3997       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3998       break;
3999     case BlockedOnSTM:
4000       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
4001       break;
4002     case NotBlocked:
4003       /* This might happen if the thread was blocked on a black hole
4004        * belonging to a thread that we've just woken up (raiseAsync
4005        * can wake up threads, remember...).
4006        */
4007       continue;
4008     default:
4009       barf("resurrectThreads: thread blocked in a strange way");
4010     }
4011   }
4012 }
4013
4014 /* ----------------------------------------------------------------------------
4015  * Debugging: why is a thread blocked
4016  * [Also provides useful information when debugging threaded programs
4017  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4018    ------------------------------------------------------------------------- */
4019
4020 static void
4021 printThreadBlockage(StgTSO *tso)
4022 {
4023   switch (tso->why_blocked) {
4024   case BlockedOnRead:
4025     debugBelch("is blocked on read from fd %ld", tso->block_info.fd);
4026     break;
4027   case BlockedOnWrite:
4028     debugBelch("is blocked on write to fd %ld", tso->block_info.fd);
4029     break;
4030 #if defined(mingw32_HOST_OS)
4031     case BlockedOnDoProc:
4032     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4033     break;
4034 #endif
4035   case BlockedOnDelay:
4036     debugBelch("is blocked until %ld", tso->block_info.target);
4037     break;
4038   case BlockedOnMVar:
4039     debugBelch("is blocked on an MVar");
4040     break;
4041   case BlockedOnException:
4042     debugBelch("is blocked on delivering an exception to thread %d",
4043             tso->block_info.tso->id);
4044     break;
4045   case BlockedOnBlackHole:
4046     debugBelch("is blocked on a black hole");
4047     break;
4048   case NotBlocked:
4049     debugBelch("is not blocked");
4050     break;
4051 #if defined(PARALLEL_HASKELL)
4052   case BlockedOnGA:
4053     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4054             tso->block_info.closure, info_type(tso->block_info.closure));
4055     break;
4056   case BlockedOnGA_NoSend:
4057     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4058             tso->block_info.closure, info_type(tso->block_info.closure));
4059     break;
4060 #endif
4061   case BlockedOnCCall:
4062     debugBelch("is blocked on an external call");
4063     break;
4064   case BlockedOnCCall_NoUnblockExc:
4065     debugBelch("is blocked on an external call (exceptions were already blocked)");
4066     break;
4067   case BlockedOnSTM:
4068     debugBelch("is blocked on an STM operation");
4069     break;
4070   default:
4071     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4072          tso->why_blocked, tso->id, tso);
4073   }
4074 }
4075
4076 static void
4077 printThreadStatus(StgTSO *tso)
4078 {
4079   switch (tso->what_next) {
4080   case ThreadKilled:
4081     debugBelch("has been killed");
4082     break;
4083   case ThreadComplete:
4084     debugBelch("has completed");
4085     break;
4086   default:
4087     printThreadBlockage(tso);
4088   }
4089 }
4090
4091 void
4092 printAllThreads(void)
4093 {
4094   StgTSO *t;
4095
4096 # if defined(GRAN)
4097   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4098   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4099                        time_string, rtsFalse/*no commas!*/);
4100
4101   debugBelch("all threads at [%s]:\n", time_string);
4102 # elif defined(PARALLEL_HASKELL)
4103   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4104   ullong_format_string(CURRENT_TIME,
4105                        time_string, rtsFalse/*no commas!*/);
4106
4107   debugBelch("all threads at [%s]:\n", time_string);
4108 # else
4109   debugBelch("all threads:\n");
4110 # endif
4111
4112   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
4113     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
4114 #if defined(DEBUG)
4115     {
4116       void *label = lookupThreadLabel(t->id);
4117       if (label) debugBelch("[\"%s\"] ",(char *)label);
4118     }
4119 #endif
4120     printThreadStatus(t);
4121     debugBelch("\n");
4122   }
4123 }
4124     
4125 #ifdef DEBUG
4126
4127 /* 
4128    Print a whole blocking queue attached to node (debugging only).
4129 */
4130 # if defined(PARALLEL_HASKELL)
4131 void 
4132 print_bq (StgClosure *node)
4133 {
4134   StgBlockingQueueElement *bqe;
4135   StgTSO *tso;
4136   rtsBool end;
4137
4138   debugBelch("## BQ of closure %p (%s): ",
4139           node, info_type(node));
4140
4141   /* should cover all closures that may have a blocking queue */
4142   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4143          get_itbl(node)->type == FETCH_ME_BQ ||
4144          get_itbl(node)->type == RBH ||
4145          get_itbl(node)->type == MVAR);
4146     
4147   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4148
4149   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4150 }
4151
4152 /* 
4153    Print a whole blocking queue starting with the element bqe.
4154 */
4155 void 
4156 print_bqe (StgBlockingQueueElement *bqe)
4157 {
4158   rtsBool end;
4159
4160   /* 
4161      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4162   */
4163   for (end = (bqe==END_BQ_QUEUE);
4164        !end; // iterate until bqe points to a CONSTR
4165        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4166        bqe = end ? END_BQ_QUEUE : bqe->link) {
4167     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4168     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4169     /* types of closures that may appear in a blocking queue */
4170     ASSERT(get_itbl(bqe)->type == TSO ||           
4171            get_itbl(bqe)->type == BLOCKED_FETCH || 
4172            get_itbl(bqe)->type == CONSTR); 
4173     /* only BQs of an RBH end with an RBH_Save closure */
4174     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4175
4176     switch (get_itbl(bqe)->type) {
4177     case TSO:
4178       debugBelch(" TSO %u (%x),",
4179               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4180       break;
4181     case BLOCKED_FETCH:
4182       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4183               ((StgBlockedFetch *)bqe)->node, 
4184               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4185               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4186               ((StgBlockedFetch *)bqe)->ga.weight);
4187       break;
4188     case CONSTR:
4189       debugBelch(" %s (IP %p),",
4190               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4191                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4192                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4193                "RBH_Save_?"), get_itbl(bqe));
4194       break;
4195     default:
4196       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4197            info_type((StgClosure *)bqe)); // , node, info_type(node));
4198       break;
4199     }
4200   } /* for */
4201   debugBelch("\n");
4202 }
4203 # elif defined(GRAN)
4204 void 
4205 print_bq (StgClosure *node)
4206 {
4207   StgBlockingQueueElement *bqe;
4208   PEs node_loc, tso_loc;
4209   rtsBool end;
4210
4211   /* should cover all closures that may have a blocking queue */
4212   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4213          get_itbl(node)->type == FETCH_ME_BQ ||
4214          get_itbl(node)->type == RBH);
4215     
4216   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4217   node_loc = where_is(node);
4218
4219   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4220           node, info_type(node), node_loc);
4221
4222   /* 
4223      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4224   */
4225   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4226        !end; // iterate until bqe points to a CONSTR
4227        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4228     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4229     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4230     /* types of closures that may appear in a blocking queue */
4231     ASSERT(get_itbl(bqe)->type == TSO ||           
4232            get_itbl(bqe)->type == CONSTR); 
4233     /* only BQs of an RBH end with an RBH_Save closure */
4234     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4235
4236     tso_loc = where_is((StgClosure *)bqe);
4237     switch (get_itbl(bqe)->type) {
4238     case TSO:
4239       debugBelch(" TSO %d (%p) on [PE %d],",
4240               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4241       break;
4242     case CONSTR:
4243       debugBelch(" %s (IP %p),",
4244               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4245                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4246                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4247                "RBH_Save_?"), get_itbl(bqe));
4248       break;
4249     default:
4250       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4251            info_type((StgClosure *)bqe), node, info_type(node));
4252       break;
4253     }
4254   } /* for */
4255   debugBelch("\n");
4256 }
4257 # endif
4258
4259 #if defined(PARALLEL_HASKELL)
4260 static nat
4261 run_queue_len(void)
4262 {
4263   nat i;
4264   StgTSO *tso;
4265
4266   for (i=0, tso=run_queue_hd; 
4267        tso != END_TSO_QUEUE;
4268        i++, tso=tso->link)
4269     /* nothing */
4270
4271   return i;
4272 }
4273 #endif
4274
4275 void
4276 sched_belch(char *s, ...)
4277 {
4278   va_list ap;
4279   va_start(ap,s);
4280 #ifdef RTS_SUPPORTS_THREADS
4281   debugBelch("sched (task %p): ", osThreadId());
4282 #elif defined(PARALLEL_HASKELL)
4283   debugBelch("== ");
4284 #else
4285   debugBelch("sched: ");
4286 #endif
4287   vdebugBelch(s, ap);
4288   debugBelch("\n");
4289   va_end(ap);
4290 }
4291
4292 #endif /* DEBUG */