[project @ 2005-04-06 15:27:06 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PARALLEL_HASKELL          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "OSThreads.h"
46 #include "Storage.h"
47 #include "StgRun.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PARALLEL_HASKELL)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include  "Task.h"
80
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
83 #endif
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif
87
88 #include <string.h>
89 #include <stdlib.h>
90 #include <stdarg.h>
91
92 #ifdef HAVE_ERRNO_H
93 #include <errno.h>
94 #endif
95
96 // Turn off inlining when debugging - it obfuscates things
97 #ifdef DEBUG
98 # undef  STATIC_INLINE
99 # define STATIC_INLINE static
100 #endif
101
102 #ifdef THREADED_RTS
103 #define USED_IN_THREADED_RTS
104 #else
105 #define USED_IN_THREADED_RTS STG_UNUSED
106 #endif
107
108 #ifdef RTS_SUPPORTS_THREADS
109 #define USED_WHEN_RTS_SUPPORTS_THREADS
110 #else
111 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
112 #endif
113
114 /* Main thread queue.
115  * Locks required: sched_mutex.
116  */
117 StgMainThread *main_threads = NULL;
118
119 #if defined(GRAN)
120
121 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
122 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
123
124 /* 
125    In GranSim we have a runnable and a blocked queue for each processor.
126    In order to minimise code changes new arrays run_queue_hds/tls
127    are created. run_queue_hd is then a short cut (macro) for
128    run_queue_hds[CurrentProc] (see GranSim.h).
129    -- HWL
130 */
131 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
132 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
133 StgTSO *ccalling_threadss[MAX_PROC];
134 /* We use the same global list of threads (all_threads) in GranSim as in
135    the std RTS (i.e. we are cheating). However, we don't use this list in
136    the GranSim specific code at the moment (so we are only potentially
137    cheating).  */
138
139 #else /* !GRAN */
140
141 /* Thread queues.
142  * Locks required: sched_mutex.
143  */
144 StgTSO *run_queue_hd = NULL;
145 StgTSO *run_queue_tl = NULL;
146 StgTSO *blocked_queue_hd = NULL;
147 StgTSO *blocked_queue_tl = NULL;
148 StgTSO *blackhole_queue = NULL;
149 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
150
151 #endif
152
153 /* The blackhole_queue should be checked for threads to wake up.  See
154  * Schedule.h for more thorough comment.
155  */
156 rtsBool blackholes_need_checking = rtsFalse;
157
158 /* Linked list of all threads.
159  * Used for detecting garbage collected threads.
160  */
161 StgTSO *all_threads = NULL;
162
163 /* When a thread performs a safe C call (_ccall_GC, using old
164  * terminology), it gets put on the suspended_ccalling_threads
165  * list. Used by the garbage collector.
166  */
167 static StgTSO *suspended_ccalling_threads;
168
169 /* KH: The following two flags are shared memory locations.  There is no need
170        to lock them, since they are only unset at the end of a scheduler
171        operation.
172 */
173
174 /* flag set by signal handler to precipitate a context switch */
175 int context_switch = 0;
176
177 /* if this flag is set as well, give up execution */
178 rtsBool interrupted = rtsFalse;
179
180 /* If this flag is set, we are running Haskell code.  Used to detect
181  * uses of 'foreign import unsafe' that should be 'safe'.
182  */
183 static rtsBool in_haskell = rtsFalse;
184
185 /* Next thread ID to allocate.
186  * Locks required: thread_id_mutex
187  */
188 static StgThreadID next_thread_id = 1;
189
190 /*
191  * Pointers to the state of the current thread.
192  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
193  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
194  */
195  
196 /* The smallest stack size that makes any sense is:
197  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
198  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
199  *  + 1                       (the closure to enter)
200  *  + 1                       (stg_ap_v_ret)
201  *  + 1                       (spare slot req'd by stg_ap_v_ret)
202  *
203  * A thread with this stack will bomb immediately with a stack
204  * overflow, which will increase its stack size.  
205  */
206
207 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
208
209
210 #if defined(GRAN)
211 StgTSO *CurrentTSO;
212 #endif
213
214 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
215  *  exists - earlier gccs apparently didn't.
216  *  -= chak
217  */
218 StgTSO dummy_tso;
219
220 # if defined(SMP)
221 static Condition gc_pending_cond = INIT_COND_VAR;
222 # endif
223
224 static rtsBool ready_to_gc;
225
226 /*
227  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
228  * in an MT setting, needed to signal that a worker thread shouldn't hang around
229  * in the scheduler when it is out of work.
230  */
231 static rtsBool shutting_down_scheduler = rtsFalse;
232
233 #if defined(RTS_SUPPORTS_THREADS)
234 /* ToDo: carefully document the invariants that go together
235  *       with these synchronisation objects.
236  */
237 Mutex     sched_mutex       = INIT_MUTEX_VAR;
238 Mutex     term_mutex        = INIT_MUTEX_VAR;
239
240 #endif /* RTS_SUPPORTS_THREADS */
241
242 #if defined(PARALLEL_HASKELL)
243 StgTSO *LastTSO;
244 rtsTime TimeOfLastYield;
245 rtsBool emitSchedule = rtsTrue;
246 #endif
247
248 #if DEBUG
249 static char *whatNext_strs[] = {
250   "(unknown)",
251   "ThreadRunGHC",
252   "ThreadInterpret",
253   "ThreadKilled",
254   "ThreadRelocated",
255   "ThreadComplete"
256 };
257 #endif
258
259 /* -----------------------------------------------------------------------------
260  * static function prototypes
261  * -------------------------------------------------------------------------- */
262
263 #if defined(RTS_SUPPORTS_THREADS)
264 static void taskStart(void);
265 #endif
266
267 static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
268                       Capability *initialCapability );
269
270 //
271 // These function all encapsulate parts of the scheduler loop, and are
272 // abstracted only to make the structure and control flow of the
273 // scheduler clearer.
274 //
275 static void schedulePreLoop(void);
276 static void scheduleStartSignalHandlers(void);
277 static void scheduleCheckBlockedThreads(void);
278 static void scheduleCheckBlackHoles(void);
279 static void scheduleDetectDeadlock(void);
280 #if defined(GRAN)
281 static StgTSO *scheduleProcessEvent(rtsEvent *event);
282 #endif
283 #if defined(PARALLEL_HASKELL)
284 static StgTSO *scheduleSendPendingMessages(void);
285 static void scheduleActivateSpark(void);
286 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
287 #endif
288 #if defined(PAR) || defined(GRAN)
289 static void scheduleGranParReport(void);
290 #endif
291 static void schedulePostRunThread(void);
292 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
293 static void scheduleHandleStackOverflow( StgTSO *t);
294 static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next );
295 static void scheduleHandleThreadBlocked( StgTSO *t );
296 static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, 
297                                              Capability *cap, StgTSO *t );
298 static void scheduleDoHeapProfile(void);
299 static void scheduleDoGC(void);
300
301 static void unblockThread(StgTSO *tso);
302 static rtsBool checkBlackHoles(void);
303 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
304                                    Capability *initialCapability
305                                    );
306 static void scheduleThread_ (StgTSO* tso);
307 static void AllRoots(evac_fn evac);
308
309 static StgTSO *threadStackOverflow(StgTSO *tso);
310
311 static void raiseAsync_(StgTSO *tso, StgClosure *exception, 
312                         rtsBool stop_at_atomically);
313
314 static void printThreadBlockage(StgTSO *tso);
315 static void printThreadStatus(StgTSO *tso);
316
317 #if defined(PARALLEL_HASKELL)
318 StgTSO * createSparkThread(rtsSpark spark);
319 StgTSO * activateSpark (rtsSpark spark);  
320 #endif
321
322 /* ----------------------------------------------------------------------------
323  * Starting Tasks
324  * ------------------------------------------------------------------------- */
325
326 #if defined(RTS_SUPPORTS_THREADS)
327 static rtsBool startingWorkerThread = rtsFalse;
328
329 static void
330 taskStart(void)
331 {
332   ACQUIRE_LOCK(&sched_mutex);
333   startingWorkerThread = rtsFalse;
334   schedule(NULL,NULL);
335   taskStop();
336   RELEASE_LOCK(&sched_mutex);
337 }
338
339 void
340 startSchedulerTaskIfNecessary(void)
341 {
342     if ( !EMPTY_RUN_QUEUE()
343          && !shutting_down_scheduler // not if we're shutting down
344          && !startingWorkerThread )
345     {
346         // we don't want to start another worker thread
347         // just because the last one hasn't yet reached the
348         // "waiting for capability" state
349         startingWorkerThread = rtsTrue;
350         if (!maybeStartNewWorker(taskStart)) {
351             startingWorkerThread = rtsFalse;
352         }
353     }
354 }
355 #endif
356
357 /* -----------------------------------------------------------------------------
358  * Putting a thread on the run queue: different scheduling policies
359  * -------------------------------------------------------------------------- */
360
361 STATIC_INLINE void
362 addToRunQueue( StgTSO *t )
363 {
364 #if defined(PARALLEL_HASKELL)
365     if (RtsFlags.ParFlags.doFairScheduling) { 
366         // this does round-robin scheduling; good for concurrency
367         APPEND_TO_RUN_QUEUE(t);
368     } else {
369         // this does unfair scheduling; good for parallelism
370         PUSH_ON_RUN_QUEUE(t);
371     }
372 #else
373     // this does round-robin scheduling; good for concurrency
374     APPEND_TO_RUN_QUEUE(t);
375 #endif
376 }
377     
378 /* ---------------------------------------------------------------------------
379    Main scheduling loop.
380
381    We use round-robin scheduling, each thread returning to the
382    scheduler loop when one of these conditions is detected:
383
384       * out of heap space
385       * timer expires (thread yields)
386       * thread blocks
387       * thread ends
388       * stack overflow
389
390    Locking notes:  we acquire the scheduler lock once at the beginning
391    of the scheduler loop, and release it when
392     
393       * running a thread, or
394       * waiting for work, or
395       * waiting for a GC to complete.
396
397    GRAN version:
398      In a GranSim setup this loop iterates over the global event queue.
399      This revolves around the global event queue, which determines what 
400      to do next. Therefore, it's more complicated than either the 
401      concurrent or the parallel (GUM) setup.
402
403    GUM version:
404      GUM iterates over incoming messages.
405      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
406      and sends out a fish whenever it has nothing to do; in-between
407      doing the actual reductions (shared code below) it processes the
408      incoming messages and deals with delayed operations 
409      (see PendingFetches).
410      This is not the ugliest code you could imagine, but it's bloody close.
411
412    ------------------------------------------------------------------------ */
413
414 static void
415 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
416           Capability *initialCapability )
417 {
418   StgTSO *t;
419   Capability *cap;
420   StgThreadReturnCode ret;
421 #if defined(GRAN)
422   rtsEvent *event;
423 #elif defined(PARALLEL_HASKELL)
424   StgTSO *tso;
425   GlobalTaskId pe;
426   rtsBool receivedFinish = rtsFalse;
427 # if defined(DEBUG)
428   nat tp_size, sp_size; // stats only
429 # endif
430 #endif
431   nat prev_what_next;
432   
433   // Pre-condition: sched_mutex is held.
434   // We might have a capability, passed in as initialCapability.
435   cap = initialCapability;
436
437 #if !defined(RTS_SUPPORTS_THREADS)
438   // simply initialise it in the non-threaded case
439   grabCapability(&cap);
440 #endif
441
442   IF_DEBUG(scheduler,
443            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
444                        mainThread, initialCapability);
445       );
446
447   schedulePreLoop();
448
449   // -----------------------------------------------------------
450   // Scheduler loop starts here:
451
452 #if defined(PARALLEL_HASKELL)
453 #define TERMINATION_CONDITION        (!receivedFinish)
454 #elif defined(GRAN)
455 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
456 #else
457 #define TERMINATION_CONDITION        rtsTrue
458 #endif
459
460   while (TERMINATION_CONDITION) {
461
462 #if defined(GRAN)
463       /* Choose the processor with the next event */
464       CurrentProc = event->proc;
465       CurrentTSO = event->tso;
466 #endif
467
468       IF_DEBUG(scheduler, printAllThreads());
469
470 #if defined(SMP)
471       // 
472       // Wait until GC has completed, if necessary.
473       //
474       if (ready_to_gc) {
475           if (cap != NULL) {
476               releaseCapability(cap);
477               IF_DEBUG(scheduler,sched_belch("waiting for GC"));
478               waitCondition( &gc_pending_cond, &sched_mutex );
479           }
480       }
481 #endif
482
483 #if defined(RTS_SUPPORTS_THREADS)
484       // Yield the capability to higher-priority tasks if necessary.
485       //
486       if (cap != NULL) {
487           yieldCapability(&cap);
488       }
489
490       // If we do not currently hold a capability, we wait for one
491       //
492       if (cap == NULL) {
493           waitForCapability(&sched_mutex, &cap,
494                             mainThread ? &mainThread->bound_thread_cond : NULL);
495       }
496
497       // We now have a capability...
498 #endif
499
500     // Check whether we have re-entered the RTS from Haskell without
501     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
502     // call).
503     if (in_haskell) {
504           errorBelch("schedule: re-entered unsafely.\n"
505                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
506           stg_exit(1);
507     }
508
509     //
510     // Test for interruption.  If interrupted==rtsTrue, then either
511     // we received a keyboard interrupt (^C), or the scheduler is
512     // trying to shut down all the tasks (shutting_down_scheduler) in
513     // the threaded RTS.
514     //
515     if (interrupted) {
516         if (shutting_down_scheduler) {
517             IF_DEBUG(scheduler, sched_belch("shutting down"));
518             releaseCapability(cap);
519             if (mainThread) {
520                 mainThread->stat = Interrupted;
521                 mainThread->ret  = NULL;
522             }
523             return;
524         } else {
525             IF_DEBUG(scheduler, sched_belch("interrupted"));
526             deleteAllThreads();
527         }
528     }
529
530 #if defined(not_yet) && defined(SMP)
531     //
532     // Top up the run queue from our spark pool.  We try to make the
533     // number of threads in the run queue equal to the number of
534     // free capabilities.
535     //
536     {
537         StgClosure *spark;
538         if (EMPTY_RUN_QUEUE()) {
539             spark = findSpark(rtsFalse);
540             if (spark == NULL) {
541                 break; /* no more sparks in the pool */
542             } else {
543                 createSparkThread(spark);         
544                 IF_DEBUG(scheduler,
545                          sched_belch("==^^ turning spark of closure %p into a thread",
546                                      (StgClosure *)spark));
547             }
548         }
549     }
550 #endif // SMP
551
552     scheduleStartSignalHandlers();
553
554     // Only check the black holes here if we've nothing else to do.
555     // During normal execution, the black hole list only gets checked
556     // at GC time, to avoid repeatedly traversing this possibly long
557     // list each time around the scheduler.
558     if (EMPTY_RUN_QUEUE()) { scheduleCheckBlackHoles(); }
559
560     scheduleCheckBlockedThreads();
561
562     scheduleDetectDeadlock();
563
564     // Normally, the only way we can get here with no threads to
565     // run is if a keyboard interrupt received during 
566     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
567     // Additionally, it is not fatal for the
568     // threaded RTS to reach here with no threads to run.
569     //
570     // win32: might be here due to awaitEvent() being abandoned
571     // as a result of a console event having been delivered.
572     if ( EMPTY_RUN_QUEUE() ) {
573 #if !defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_HOST_OS)
574         ASSERT(interrupted);
575 #endif
576         continue; // nothing to do
577     }
578
579 #if defined(PARALLEL_HASKELL)
580     scheduleSendPendingMessages();
581     if (EMPTY_RUN_QUEUE() && scheduleActivateSpark()) 
582         continue;
583
584 #if defined(SPARKS)
585     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
586 #endif
587
588     /* If we still have no work we need to send a FISH to get a spark
589        from another PE */
590     if (EMPTY_RUN_QUEUE()) {
591         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
592         ASSERT(rtsFalse); // should not happen at the moment
593     }
594     // from here: non-empty run queue.
595     //  TODO: merge above case with this, only one call processMessages() !
596     if (PacketsWaiting()) {  /* process incoming messages, if
597                                 any pending...  only in else
598                                 because getRemoteWork waits for
599                                 messages as well */
600         receivedFinish = processMessages();
601     }
602 #endif
603
604 #if defined(GRAN)
605     scheduleProcessEvent(event);
606 #endif
607
608     // 
609     // Get a thread to run
610     //
611     ASSERT(run_queue_hd != END_TSO_QUEUE);
612     POP_RUN_QUEUE(t);
613
614 #if defined(GRAN) || defined(PAR)
615     scheduleGranParReport(); // some kind of debuging output
616 #else
617     // Sanity check the thread we're about to run.  This can be
618     // expensive if there is lots of thread switching going on...
619     IF_DEBUG(sanity,checkTSO(t));
620 #endif
621
622 #if defined(RTS_SUPPORTS_THREADS)
623     // Check whether we can run this thread in the current task.
624     // If not, we have to pass our capability to the right task.
625     {
626       StgMainThread *m = t->main;
627       
628       if(m)
629       {
630         if(m == mainThread)
631         {
632           IF_DEBUG(scheduler,
633             sched_belch("### Running thread %d in bound thread", t->id));
634           // yes, the Haskell thread is bound to the current native thread
635         }
636         else
637         {
638           IF_DEBUG(scheduler,
639             sched_belch("### thread %d bound to another OS thread", t->id));
640           // no, bound to a different Haskell thread: pass to that thread
641           PUSH_ON_RUN_QUEUE(t);
642           passCapability(&m->bound_thread_cond);
643           continue;
644         }
645       }
646       else
647       {
648         if(mainThread != NULL)
649         // The thread we want to run is bound.
650         {
651           IF_DEBUG(scheduler,
652             sched_belch("### this OS thread cannot run thread %d", t->id));
653           // no, the current native thread is bound to a different
654           // Haskell thread, so pass it to any worker thread
655           PUSH_ON_RUN_QUEUE(t);
656           passCapabilityToWorker();
657           continue; 
658         }
659       }
660     }
661 #endif
662
663     cap->r.rCurrentTSO = t;
664     
665     /* context switches are now initiated by the timer signal, unless
666      * the user specified "context switch as often as possible", with
667      * +RTS -C0
668      */
669     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
670          && (run_queue_hd != END_TSO_QUEUE
671              || blocked_queue_hd != END_TSO_QUEUE
672              || sleeping_queue != END_TSO_QUEUE)))
673         context_switch = 1;
674
675 run_thread:
676
677     RELEASE_LOCK(&sched_mutex);
678
679     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
680                               (long)t->id, whatNext_strs[t->what_next]));
681
682 #if defined(PROFILING)
683     startHeapProfTimer();
684 #endif
685
686     // ----------------------------------------------------------------------
687     // Run the current thread 
688
689     prev_what_next = t->what_next;
690
691     errno = t->saved_errno;
692     in_haskell = rtsTrue;
693
694     switch (prev_what_next) {
695
696     case ThreadKilled:
697     case ThreadComplete:
698         /* Thread already finished, return to scheduler. */
699         ret = ThreadFinished;
700         break;
701
702     case ThreadRunGHC:
703         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
704         break;
705
706     case ThreadInterpret:
707         ret = interpretBCO(cap);
708         break;
709
710     default:
711       barf("schedule: invalid what_next field");
712     }
713
714     // We have run some Haskell code: there might be blackhole-blocked
715     // threads to wake up now.
716     if ( blackhole_queue != END_TSO_QUEUE ) {
717         blackholes_need_checking = rtsTrue;
718     }
719
720     in_haskell = rtsFalse;
721
722     // The TSO might have moved, eg. if it re-entered the RTS and a GC
723     // happened.  So find the new location:
724     t = cap->r.rCurrentTSO;
725
726     // And save the current errno in this thread.
727     t->saved_errno = errno;
728
729     // ----------------------------------------------------------------------
730     
731     /* Costs for the scheduler are assigned to CCS_SYSTEM */
732 #if defined(PROFILING)
733     stopHeapProfTimer();
734     CCCS = CCS_SYSTEM;
735 #endif
736     
737     ACQUIRE_LOCK(&sched_mutex);
738     
739 #if defined(RTS_SUPPORTS_THREADS)
740     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
741 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
742     IF_DEBUG(scheduler,debugBelch("sched: "););
743 #endif
744     
745     schedulePostRunThread();
746
747     switch (ret) {
748     case HeapOverflow:
749         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
750         break;
751
752     case StackOverflow:
753         scheduleHandleStackOverflow(t);
754         break;
755
756     case ThreadYielding:
757         if (scheduleHandleYield(t, prev_what_next)) {
758             // shortcut for switching between compiler/interpreter:
759             goto run_thread; 
760         }
761         break;
762
763     case ThreadBlocked:
764         scheduleHandleThreadBlocked(t);
765         threadPaused(t);
766         break;
767
768     case ThreadFinished:
769         if (scheduleHandleThreadFinished(mainThread, cap, t)) return;;
770         break;
771
772     default:
773       barf("schedule: invalid thread return code %d", (int)ret);
774     }
775
776     scheduleDoHeapProfile();
777     scheduleDoGC();
778   } /* end of while() */
779
780   IF_PAR_DEBUG(verbose,
781                debugBelch("== Leaving schedule() after having received Finish\n"));
782 }
783
784 /* ----------------------------------------------------------------------------
785  * Setting up the scheduler loop
786  * ASSUMES: sched_mutex
787  * ------------------------------------------------------------------------- */
788
789 static void
790 schedulePreLoop(void)
791 {
792 #if defined(GRAN) 
793     /* set up first event to get things going */
794     /* ToDo: assign costs for system setup and init MainTSO ! */
795     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
796               ContinueThread, 
797               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
798     
799     IF_DEBUG(gran,
800              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
801                         CurrentTSO);
802              G_TSO(CurrentTSO, 5));
803     
804     if (RtsFlags.GranFlags.Light) {
805         /* Save current time; GranSim Light only */
806         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
807     }      
808 #endif
809 }
810
811 /* ----------------------------------------------------------------------------
812  * Start any pending signal handlers
813  * ASSUMES: sched_mutex
814  * ------------------------------------------------------------------------- */
815
816 static void
817 scheduleStartSignalHandlers(void)
818 {
819 #if defined(RTS_USER_SIGNALS)
820     if (signals_pending()) {
821       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
822       startSignalHandlers();
823       ACQUIRE_LOCK(&sched_mutex);
824     }
825 #endif
826 }
827
828 /* ----------------------------------------------------------------------------
829  * Check for blocked threads that can be woken up.
830  * ASSUMES: sched_mutex
831  * ------------------------------------------------------------------------- */
832
833 static void
834 scheduleCheckBlockedThreads(void)
835 {
836     //
837     // Check whether any waiting threads need to be woken up.  If the
838     // run queue is empty, and there are no other tasks running, we
839     // can wait indefinitely for something to happen.
840     //
841     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
842     {
843 #if defined(RTS_SUPPORTS_THREADS)
844         // We shouldn't be here...
845         barf("schedule: awaitEvent() in threaded RTS");
846 #endif
847         awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
848     }
849 }
850
851
852 /* ----------------------------------------------------------------------------
853  * Check for threads blocked on BLACKHOLEs that can be woken up
854  * ASSUMES: sched_mutex
855  * ------------------------------------------------------------------------- */
856 static void
857 scheduleCheckBlackHoles( void )
858 {
859     if ( blackholes_need_checking )
860     {
861         checkBlackHoles();
862         blackholes_need_checking = rtsFalse;
863     }
864 }
865
866 /* ----------------------------------------------------------------------------
867  * Detect deadlock conditions and attempt to resolve them.
868  * ASSUMES: sched_mutex
869  * ------------------------------------------------------------------------- */
870
871 static void
872 scheduleDetectDeadlock(void)
873 {
874     /* 
875      * Detect deadlock: when we have no threads to run, there are no
876      * threads blocked, waiting for I/O, or sleeping, and all the
877      * other tasks are waiting for work, we must have a deadlock of
878      * some description.
879      */
880     if ( EMPTY_THREAD_QUEUES() )
881     {
882 #if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS)
883         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
884
885         // Garbage collection can release some new threads due to
886         // either (a) finalizers or (b) threads resurrected because
887         // they are unreachable and will therefore be sent an
888         // exception.  Any threads thus released will be immediately
889         // runnable.
890         GarbageCollect(GetRoots,rtsTrue);
891         if ( !EMPTY_RUN_QUEUE() ) return;
892
893 #if defined(RTS_USER_SIGNALS)
894         /* If we have user-installed signal handlers, then wait
895          * for signals to arrive rather then bombing out with a
896          * deadlock.
897          */
898         if ( anyUserHandlers() ) {
899             IF_DEBUG(scheduler, 
900                      sched_belch("still deadlocked, waiting for signals..."));
901
902             awaitUserSignals();
903
904             if (signals_pending()) {
905                 RELEASE_LOCK(&sched_mutex);
906                 startSignalHandlers();
907                 ACQUIRE_LOCK(&sched_mutex);
908             }
909
910             // either we have threads to run, or we were interrupted:
911             ASSERT(!EMPTY_RUN_QUEUE() || interrupted);
912         }
913 #endif
914
915         /* Probably a real deadlock.  Send the current main thread the
916          * Deadlock exception (or in the SMP build, send *all* main
917          * threads the deadlock exception, since none of them can make
918          * progress).
919          */
920         {
921             StgMainThread *m;
922             m = main_threads;
923             switch (m->tso->why_blocked) {
924             case BlockedOnBlackHole:
925             case BlockedOnException:
926             case BlockedOnMVar:
927                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
928                 return;
929             default:
930                 barf("deadlock: main thread blocked in a strange way");
931             }
932         }
933
934 #elif defined(RTS_SUPPORTS_THREADS)
935     // ToDo: add deadlock detection in threaded RTS
936 #elif defined(PARALLEL_HASKELL)
937     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
938 #endif
939     }
940 }
941
942 /* ----------------------------------------------------------------------------
943  * Process an event (GRAN only)
944  * ------------------------------------------------------------------------- */
945
946 #if defined(GRAN)
947 static StgTSO *
948 scheduleProcessEvent(rtsEvent *event)
949 {
950     StgTSO *t;
951
952     if (RtsFlags.GranFlags.Light)
953       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
954
955     /* adjust time based on time-stamp */
956     if (event->time > CurrentTime[CurrentProc] &&
957         event->evttype != ContinueThread)
958       CurrentTime[CurrentProc] = event->time;
959     
960     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
961     if (!RtsFlags.GranFlags.Light)
962       handleIdlePEs();
963
964     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
965
966     /* main event dispatcher in GranSim */
967     switch (event->evttype) {
968       /* Should just be continuing execution */
969     case ContinueThread:
970       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
971       /* ToDo: check assertion
972       ASSERT(run_queue_hd != (StgTSO*)NULL &&
973              run_queue_hd != END_TSO_QUEUE);
974       */
975       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
976       if (!RtsFlags.GranFlags.DoAsyncFetch &&
977           procStatus[CurrentProc]==Fetching) {
978         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
979               CurrentTSO->id, CurrentTSO, CurrentProc);
980         goto next_thread;
981       } 
982       /* Ignore ContinueThreads for completed threads */
983       if (CurrentTSO->what_next == ThreadComplete) {
984         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
985               CurrentTSO->id, CurrentTSO, CurrentProc);
986         goto next_thread;
987       } 
988       /* Ignore ContinueThreads for threads that are being migrated */
989       if (PROCS(CurrentTSO)==Nowhere) { 
990         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
991               CurrentTSO->id, CurrentTSO, CurrentProc);
992         goto next_thread;
993       }
994       /* The thread should be at the beginning of the run queue */
995       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
996         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
997               CurrentTSO->id, CurrentTSO, CurrentProc);
998         break; // run the thread anyway
999       }
1000       /*
1001       new_event(proc, proc, CurrentTime[proc],
1002                 FindWork,
1003                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1004       goto next_thread; 
1005       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1006       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1007
1008     case FetchNode:
1009       do_the_fetchnode(event);
1010       goto next_thread;             /* handle next event in event queue  */
1011       
1012     case GlobalBlock:
1013       do_the_globalblock(event);
1014       goto next_thread;             /* handle next event in event queue  */
1015       
1016     case FetchReply:
1017       do_the_fetchreply(event);
1018       goto next_thread;             /* handle next event in event queue  */
1019       
1020     case UnblockThread:   /* Move from the blocked queue to the tail of */
1021       do_the_unblock(event);
1022       goto next_thread;             /* handle next event in event queue  */
1023       
1024     case ResumeThread:  /* Move from the blocked queue to the tail of */
1025       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1026       event->tso->gran.blocktime += 
1027         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1028       do_the_startthread(event);
1029       goto next_thread;             /* handle next event in event queue  */
1030       
1031     case StartThread:
1032       do_the_startthread(event);
1033       goto next_thread;             /* handle next event in event queue  */
1034       
1035     case MoveThread:
1036       do_the_movethread(event);
1037       goto next_thread;             /* handle next event in event queue  */
1038       
1039     case MoveSpark:
1040       do_the_movespark(event);
1041       goto next_thread;             /* handle next event in event queue  */
1042       
1043     case FindWork:
1044       do_the_findwork(event);
1045       goto next_thread;             /* handle next event in event queue  */
1046       
1047     default:
1048       barf("Illegal event type %u\n", event->evttype);
1049     }  /* switch */
1050     
1051     /* This point was scheduler_loop in the old RTS */
1052
1053     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1054
1055     TimeOfLastEvent = CurrentTime[CurrentProc];
1056     TimeOfNextEvent = get_time_of_next_event();
1057     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1058     // CurrentTSO = ThreadQueueHd;
1059
1060     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1061                          TimeOfNextEvent));
1062
1063     if (RtsFlags.GranFlags.Light) 
1064       GranSimLight_leave_system(event, &ActiveTSO); 
1065
1066     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1067
1068     IF_DEBUG(gran, 
1069              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1070
1071     /* in a GranSim setup the TSO stays on the run queue */
1072     t = CurrentTSO;
1073     /* Take a thread from the run queue. */
1074     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1075
1076     IF_DEBUG(gran, 
1077              debugBelch("GRAN: About to run current thread, which is\n");
1078              G_TSO(t,5));
1079
1080     context_switch = 0; // turned on via GranYield, checking events and time slice
1081
1082     IF_DEBUG(gran, 
1083              DumpGranEvent(GR_SCHEDULE, t));
1084
1085     procStatus[CurrentProc] = Busy;
1086 }
1087 #endif // GRAN
1088
1089 /* ----------------------------------------------------------------------------
1090  * Send pending messages (PARALLEL_HASKELL only)
1091  * ------------------------------------------------------------------------- */
1092
1093 #if defined(PARALLEL_HASKELL)
1094 static StgTSO *
1095 scheduleSendPendingMessages(void)
1096 {
1097     StgSparkPool *pool;
1098     rtsSpark spark;
1099     StgTSO *t;
1100
1101 # if defined(PAR) // global Mem.Mgmt., omit for now
1102     if (PendingFetches != END_BF_QUEUE) {
1103         processFetches();
1104     }
1105 # endif
1106     
1107     if (RtsFlags.ParFlags.BufferTime) {
1108         // if we use message buffering, we must send away all message
1109         // packets which have become too old...
1110         sendOldBuffers(); 
1111     }
1112 }
1113 #endif
1114
1115 /* ----------------------------------------------------------------------------
1116  * Activate spark threads (PARALLEL_HASKELL only)
1117  * ------------------------------------------------------------------------- */
1118
1119 #if defined(PARALLEL_HASKELL)
1120 static void
1121 scheduleActivateSpark(void)
1122 {
1123 #if defined(SPARKS)
1124   ASSERT(EMPTY_RUN_QUEUE());
1125 /* We get here if the run queue is empty and want some work.
1126    We try to turn a spark into a thread, and add it to the run queue,
1127    from where it will be picked up in the next iteration of the scheduler
1128    loop.
1129 */
1130
1131       /* :-[  no local threads => look out for local sparks */
1132       /* the spark pool for the current PE */
1133       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1134       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1135           pool->hd < pool->tl) {
1136         /* 
1137          * ToDo: add GC code check that we really have enough heap afterwards!!
1138          * Old comment:
1139          * If we're here (no runnable threads) and we have pending
1140          * sparks, we must have a space problem.  Get enough space
1141          * to turn one of those pending sparks into a
1142          * thread... 
1143          */
1144
1145         spark = findSpark(rtsFalse);            /* get a spark */
1146         if (spark != (rtsSpark) NULL) {
1147           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1148           IF_PAR_DEBUG(fish, // schedule,
1149                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1150                              tso->id, tso, advisory_thread_count));
1151
1152           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1153             IF_PAR_DEBUG(fish, // schedule,
1154                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1155                             spark));
1156             return rtsFalse; /* failed to generate a thread */
1157           }                  /* otherwise fall through & pick-up new tso */
1158         } else {
1159           IF_PAR_DEBUG(fish, // schedule,
1160                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1161                              spark_queue_len(pool)));
1162           return rtsFalse;  /* failed to generate a thread */
1163         }
1164         return rtsTrue;  /* success in generating a thread */
1165   } else { /* no more threads permitted or pool empty */
1166     return rtsFalse;  /* failed to generateThread */
1167   }
1168 #else
1169   tso = NULL; // avoid compiler warning only
1170   return rtsFalse;  /* dummy in non-PAR setup */
1171 #endif // SPARKS
1172 }
1173 #endif // PARALLEL_HASKELL
1174
1175 /* ----------------------------------------------------------------------------
1176  * Get work from a remote node (PARALLEL_HASKELL only)
1177  * ------------------------------------------------------------------------- */
1178     
1179 #if defined(PARALLEL_HASKELL)
1180 static rtsBool
1181 scheduleGetRemoteWork(rtsBool *receivedFinish)
1182 {
1183   ASSERT(EMPTY_RUN_QUEUE());
1184
1185   if (RtsFlags.ParFlags.BufferTime) {
1186         IF_PAR_DEBUG(verbose, 
1187                 debugBelch("...send all pending data,"));
1188         {
1189           nat i;
1190           for (i=1; i<=nPEs; i++)
1191             sendImmediately(i); // send all messages away immediately
1192         }
1193   }
1194 # ifndef SPARKS
1195         //++EDEN++ idle() , i.e. send all buffers, wait for work
1196         // suppress fishing in EDEN... just look for incoming messages
1197         // (blocking receive)
1198   IF_PAR_DEBUG(verbose, 
1199                debugBelch("...wait for incoming messages...\n"));
1200   *receivedFinish = processMessages(); // blocking receive...
1201
1202         // and reenter scheduling loop after having received something
1203         // (return rtsFalse below)
1204
1205 # else /* activate SPARKS machinery */
1206 /* We get here, if we have no work, tried to activate a local spark, but still
1207    have no work. We try to get a remote spark, by sending a FISH message.
1208    Thread migration should be added here, and triggered when a sequence of 
1209    fishes returns without work. */
1210         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1211
1212       /* =8-[  no local sparks => look for work on other PEs */
1213         /*
1214          * We really have absolutely no work.  Send out a fish
1215          * (there may be some out there already), and wait for
1216          * something to arrive.  We clearly can't run any threads
1217          * until a SCHEDULE or RESUME arrives, and so that's what
1218          * we're hoping to see.  (Of course, we still have to
1219          * respond to other types of messages.)
1220          */
1221         rtsTime now = msTime() /*CURRENT_TIME*/;
1222         IF_PAR_DEBUG(verbose, 
1223                      debugBelch("--  now=%ld\n", now));
1224         IF_PAR_DEBUG(fish, // verbose,
1225              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1226                  (last_fish_arrived_at!=0 &&
1227                   last_fish_arrived_at+delay > now)) {
1228                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1229                      now, last_fish_arrived_at+delay, 
1230                      last_fish_arrived_at,
1231                      delay);
1232              });
1233   
1234         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1235             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1236           if (last_fish_arrived_at==0 ||
1237               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1238             /* outstandingFishes is set in sendFish, processFish;
1239                avoid flooding system with fishes via delay */
1240     next_fish_to_send_at = 0;  
1241   } else {
1242     /* ToDo: this should be done in the main scheduling loop to avoid the
1243              busy wait here; not so bad if fish delay is very small  */
1244     int iq = 0; // DEBUGGING -- HWL
1245     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1246     /* send a fish when ready, but process messages that arrive in the meantime */
1247     do {
1248       if (PacketsWaiting()) {
1249         iq++; // DEBUGGING
1250         *receivedFinish = processMessages();
1251       }
1252       now = msTime();
1253     } while (!*receivedFinish || now<next_fish_to_send_at);
1254     // JB: This means the fish could become obsolete, if we receive
1255     // work. Better check for work again? 
1256     // last line: while (!receivedFinish || !haveWork || now<...)
1257     // next line: if (receivedFinish || haveWork )
1258
1259     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1260       return rtsFalse;  // NB: this will leave scheduler loop
1261                         // immediately after return!
1262                           
1263     IF_PAR_DEBUG(fish, // verbose,
1264                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1265
1266   }
1267
1268     // JB: IMHO, this should all be hidden inside sendFish(...)
1269     /* pe = choosePE(); 
1270        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1271                 NEW_FISH_HUNGER);
1272
1273     // Global statistics: count no. of fishes
1274     if (RtsFlags.ParFlags.ParStats.Global &&
1275          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1276            globalParStats.tot_fish_mess++;
1277            }
1278     */ 
1279
1280   /* delayed fishes must have been sent by now! */
1281   next_fish_to_send_at = 0;  
1282   }
1283       
1284   *receivedFinish = processMessages();
1285 # endif /* SPARKS */
1286
1287  return rtsFalse;
1288  /* NB: this function always returns rtsFalse, meaning the scheduler
1289     loop continues with the next iteration; 
1290     rationale: 
1291       return code means success in finding work; we enter this function
1292       if there is no local work, thus have to send a fish which takes
1293       time until it arrives with work; in the meantime we should process
1294       messages in the main loop;
1295  */
1296 }
1297 #endif // PARALLEL_HASKELL
1298
1299 /* ----------------------------------------------------------------------------
1300  * PAR/GRAN: Report stats & debugging info(?)
1301  * ------------------------------------------------------------------------- */
1302
1303 #if defined(PAR) || defined(GRAN)
1304 static void
1305 scheduleGranParReport(void)
1306 {
1307   ASSERT(run_queue_hd != END_TSO_QUEUE);
1308
1309   /* Take a thread from the run queue, if we have work */
1310   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1311
1312     /* If this TSO has got its outport closed in the meantime, 
1313      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1314      * It has to be marked as TH_DEAD for this purpose.
1315      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1316
1317 JB: TODO: investigate wether state change field could be nuked
1318      entirely and replaced by the normal tso state (whatnext
1319      field). All we want to do is to kill tsos from outside.
1320      */
1321
1322     /* ToDo: write something to the log-file
1323     if (RTSflags.ParFlags.granSimStats && !sameThread)
1324         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1325
1326     CurrentTSO = t;
1327     */
1328     /* the spark pool for the current PE */
1329     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1330
1331     IF_DEBUG(scheduler, 
1332              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1333                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1334
1335     IF_PAR_DEBUG(fish,
1336              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1337                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1338
1339     if (RtsFlags.ParFlags.ParStats.Full && 
1340         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1341         (emitSchedule || // forced emit
1342          (t && LastTSO && t->id != LastTSO->id))) {
1343       /* 
1344          we are running a different TSO, so write a schedule event to log file
1345          NB: If we use fair scheduling we also have to write  a deschedule 
1346              event for LastTSO; with unfair scheduling we know that the
1347              previous tso has blocked whenever we switch to another tso, so
1348              we don't need it in GUM for now
1349       */
1350       IF_PAR_DEBUG(fish, // schedule,
1351                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1352
1353       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1354                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1355       emitSchedule = rtsFalse;
1356     }
1357 }     
1358 #endif
1359
1360 /* ----------------------------------------------------------------------------
1361  * After running a thread...
1362  * ASSUMES: sched_mutex
1363  * ------------------------------------------------------------------------- */
1364
1365 static void
1366 schedulePostRunThread(void)
1367 {
1368 #if defined(PAR)
1369     /* HACK 675: if the last thread didn't yield, make sure to print a 
1370        SCHEDULE event to the log file when StgRunning the next thread, even
1371        if it is the same one as before */
1372     LastTSO = t; 
1373     TimeOfLastYield = CURRENT_TIME;
1374 #endif
1375
1376   /* some statistics gathering in the parallel case */
1377
1378 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1379   switch (ret) {
1380     case HeapOverflow:
1381 # if defined(GRAN)
1382       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1383       globalGranStats.tot_heapover++;
1384 # elif defined(PAR)
1385       globalParStats.tot_heapover++;
1386 # endif
1387       break;
1388
1389      case StackOverflow:
1390 # if defined(GRAN)
1391       IF_DEBUG(gran, 
1392                DumpGranEvent(GR_DESCHEDULE, t));
1393       globalGranStats.tot_stackover++;
1394 # elif defined(PAR)
1395       // IF_DEBUG(par, 
1396       // DumpGranEvent(GR_DESCHEDULE, t);
1397       globalParStats.tot_stackover++;
1398 # endif
1399       break;
1400
1401     case ThreadYielding:
1402 # if defined(GRAN)
1403       IF_DEBUG(gran, 
1404                DumpGranEvent(GR_DESCHEDULE, t));
1405       globalGranStats.tot_yields++;
1406 # elif defined(PAR)
1407       // IF_DEBUG(par, 
1408       // DumpGranEvent(GR_DESCHEDULE, t);
1409       globalParStats.tot_yields++;
1410 # endif
1411       break; 
1412
1413     case ThreadBlocked:
1414 # if defined(GRAN)
1415       IF_DEBUG(scheduler,
1416                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1417                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1418                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1419                if (t->block_info.closure!=(StgClosure*)NULL)
1420                  print_bq(t->block_info.closure);
1421                debugBelch("\n"));
1422
1423       // ??? needed; should emit block before
1424       IF_DEBUG(gran, 
1425                DumpGranEvent(GR_DESCHEDULE, t)); 
1426       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1427       /*
1428         ngoq Dogh!
1429       ASSERT(procStatus[CurrentProc]==Busy || 
1430               ((procStatus[CurrentProc]==Fetching) && 
1431               (t->block_info.closure!=(StgClosure*)NULL)));
1432       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1433           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1434             procStatus[CurrentProc]==Fetching)) 
1435         procStatus[CurrentProc] = Idle;
1436       */
1437 # elif defined(PAR)
1438 //++PAR++  blockThread() writes the event (change?)
1439 # endif
1440     break;
1441
1442   case ThreadFinished:
1443     break;
1444
1445   default:
1446     barf("parGlobalStats: unknown return code");
1447     break;
1448     }
1449 #endif
1450 }
1451
1452 /* -----------------------------------------------------------------------------
1453  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1454  * ASSUMES: sched_mutex
1455  * -------------------------------------------------------------------------- */
1456
1457 static rtsBool
1458 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1459 {
1460     // did the task ask for a large block?
1461     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1462         // if so, get one and push it on the front of the nursery.
1463         bdescr *bd;
1464         lnat blocks;
1465         
1466         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1467         
1468         IF_DEBUG(scheduler,
1469                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1470                             (long)t->id, whatNext_strs[t->what_next], blocks));
1471         
1472         // don't do this if it would push us over the
1473         // alloc_blocks_lim limit; we'll GC first.
1474         if (alloc_blocks + blocks < alloc_blocks_lim) {
1475             
1476             alloc_blocks += blocks;
1477             bd = allocGroup( blocks );
1478             
1479             // link the new group into the list
1480             bd->link = cap->r.rCurrentNursery;
1481             bd->u.back = cap->r.rCurrentNursery->u.back;
1482             if (cap->r.rCurrentNursery->u.back != NULL) {
1483                 cap->r.rCurrentNursery->u.back->link = bd;
1484             } else {
1485                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1486                        g0s0->blocks == cap->r.rNursery);
1487                 cap->r.rNursery = g0s0->blocks = bd;
1488             }             
1489             cap->r.rCurrentNursery->u.back = bd;
1490             
1491             // initialise it as a nursery block.  We initialise the
1492             // step, gen_no, and flags field of *every* sub-block in
1493             // this large block, because this is easier than making
1494             // sure that we always find the block head of a large
1495             // block whenever we call Bdescr() (eg. evacuate() and
1496             // isAlive() in the GC would both have to do this, at
1497             // least).
1498             { 
1499                 bdescr *x;
1500                 for (x = bd; x < bd + blocks; x++) {
1501                     x->step = g0s0;
1502                     x->gen_no = 0;
1503                     x->flags = 0;
1504                 }
1505             }
1506             
1507             // don't forget to update the block count in g0s0.
1508             g0s0->n_blocks += blocks;
1509             // This assert can be a killer if the app is doing lots
1510             // of large block allocations.
1511             ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1512             
1513             // now update the nursery to point to the new block
1514             cap->r.rCurrentNursery = bd;
1515             
1516             // we might be unlucky and have another thread get on the
1517             // run queue before us and steal the large block, but in that
1518             // case the thread will just end up requesting another large
1519             // block.
1520             PUSH_ON_RUN_QUEUE(t);
1521             return rtsFalse;  /* not actually GC'ing */
1522         }
1523     }
1524     
1525     /* make all the running tasks block on a condition variable,
1526      * maybe set context_switch and wait till they all pile in,
1527      * then have them wait on a GC condition variable.
1528      */
1529     IF_DEBUG(scheduler,
1530              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1531                         (long)t->id, whatNext_strs[t->what_next]));
1532     threadPaused(t);
1533 #if defined(GRAN)
1534     ASSERT(!is_on_queue(t,CurrentProc));
1535 #elif defined(PARALLEL_HASKELL)
1536     /* Currently we emit a DESCHEDULE event before GC in GUM.
1537        ToDo: either add separate event to distinguish SYSTEM time from rest
1538        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1539     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1540         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1541                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1542         emitSchedule = rtsTrue;
1543     }
1544 #endif
1545       
1546     PUSH_ON_RUN_QUEUE(t);
1547     return rtsTrue;
1548     /* actual GC is done at the end of the while loop in schedule() */
1549 }
1550
1551 /* -----------------------------------------------------------------------------
1552  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1553  * ASSUMES: sched_mutex
1554  * -------------------------------------------------------------------------- */
1555
1556 static void
1557 scheduleHandleStackOverflow( StgTSO *t)
1558 {
1559     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1560                                   (long)t->id, whatNext_strs[t->what_next]));
1561     /* just adjust the stack for this thread, then pop it back
1562      * on the run queue.
1563      */
1564     threadPaused(t);
1565     { 
1566         /* enlarge the stack */
1567         StgTSO *new_t = threadStackOverflow(t);
1568         
1569         /* This TSO has moved, so update any pointers to it from the
1570          * main thread stack.  It better not be on any other queues...
1571          * (it shouldn't be).
1572          */
1573         if (t->main != NULL) {
1574             t->main->tso = new_t;
1575         }
1576         PUSH_ON_RUN_QUEUE(new_t);
1577     }
1578 }
1579
1580 /* -----------------------------------------------------------------------------
1581  * Handle a thread that returned to the scheduler with ThreadYielding
1582  * ASSUMES: sched_mutex
1583  * -------------------------------------------------------------------------- */
1584
1585 static rtsBool
1586 scheduleHandleYield( StgTSO *t, nat prev_what_next )
1587 {
1588     // Reset the context switch flag.  We don't do this just before
1589     // running the thread, because that would mean we would lose ticks
1590     // during GC, which can lead to unfair scheduling (a thread hogs
1591     // the CPU because the tick always arrives during GC).  This way
1592     // penalises threads that do a lot of allocation, but that seems
1593     // better than the alternative.
1594     context_switch = 0;
1595     
1596     /* put the thread back on the run queue.  Then, if we're ready to
1597      * GC, check whether this is the last task to stop.  If so, wake
1598      * up the GC thread.  getThread will block during a GC until the
1599      * GC is finished.
1600      */
1601     IF_DEBUG(scheduler,
1602              if (t->what_next != prev_what_next) {
1603                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1604                             (long)t->id, whatNext_strs[t->what_next]);
1605              } else {
1606                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1607                             (long)t->id, whatNext_strs[t->what_next]);
1608              }
1609         );
1610     
1611     IF_DEBUG(sanity,
1612              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1613              checkTSO(t));
1614     ASSERT(t->link == END_TSO_QUEUE);
1615     
1616     // Shortcut if we're just switching evaluators: don't bother
1617     // doing stack squeezing (which can be expensive), just run the
1618     // thread.
1619     if (t->what_next != prev_what_next) {
1620         return rtsTrue;
1621     }
1622     
1623     threadPaused(t);
1624     
1625 #if defined(GRAN)
1626     ASSERT(!is_on_queue(t,CurrentProc));
1627       
1628     IF_DEBUG(sanity,
1629              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1630              checkThreadQsSanity(rtsTrue));
1631
1632 #endif
1633
1634     addToRunQueue(t);
1635
1636 #if defined(GRAN)
1637     /* add a ContinueThread event to actually process the thread */
1638     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1639               ContinueThread,
1640               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1641     IF_GRAN_DEBUG(bq, 
1642                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1643                   G_EVENTQ(0);
1644                   G_CURR_THREADQ(0));
1645 #endif
1646     return rtsFalse;
1647 }
1648
1649 /* -----------------------------------------------------------------------------
1650  * Handle a thread that returned to the scheduler with ThreadBlocked
1651  * ASSUMES: sched_mutex
1652  * -------------------------------------------------------------------------- */
1653
1654 static void
1655 scheduleHandleThreadBlocked( StgTSO *t
1656 #if !defined(GRAN) && !defined(DEBUG)
1657     STG_UNUSED
1658 #endif
1659     )
1660 {
1661 #if defined(GRAN)
1662     IF_DEBUG(scheduler,
1663              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1664                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1665              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1666     
1667     // ??? needed; should emit block before
1668     IF_DEBUG(gran, 
1669              DumpGranEvent(GR_DESCHEDULE, t)); 
1670     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1671     /*
1672       ngoq Dogh!
1673       ASSERT(procStatus[CurrentProc]==Busy || 
1674       ((procStatus[CurrentProc]==Fetching) && 
1675       (t->block_info.closure!=(StgClosure*)NULL)));
1676       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1677       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1678       procStatus[CurrentProc]==Fetching)) 
1679       procStatus[CurrentProc] = Idle;
1680     */
1681 #elif defined(PAR)
1682     IF_DEBUG(scheduler,
1683              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1684                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1685     IF_PAR_DEBUG(bq,
1686                  
1687                  if (t->block_info.closure!=(StgClosure*)NULL) 
1688                  print_bq(t->block_info.closure));
1689     
1690     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1691     blockThread(t);
1692     
1693     /* whatever we schedule next, we must log that schedule */
1694     emitSchedule = rtsTrue;
1695     
1696 #else /* !GRAN */
1697       /* don't need to do anything.  Either the thread is blocked on
1698        * I/O, in which case we'll have called addToBlockedQueue
1699        * previously, or it's blocked on an MVar or Blackhole, in which
1700        * case it'll be on the relevant queue already.
1701        */
1702     ASSERT(t->why_blocked != NotBlocked);
1703     IF_DEBUG(scheduler,
1704              debugBelch("--<< thread %d (%s) stopped: ", 
1705                         t->id, whatNext_strs[t->what_next]);
1706              printThreadBlockage(t);
1707              debugBelch("\n"));
1708     
1709     /* Only for dumping event to log file 
1710        ToDo: do I need this in GranSim, too?
1711        blockThread(t);
1712     */
1713 #endif
1714 }
1715
1716 /* -----------------------------------------------------------------------------
1717  * Handle a thread that returned to the scheduler with ThreadFinished
1718  * ASSUMES: sched_mutex
1719  * -------------------------------------------------------------------------- */
1720
1721 static rtsBool
1722 scheduleHandleThreadFinished( StgMainThread *mainThread
1723                               USED_WHEN_RTS_SUPPORTS_THREADS,
1724                               Capability *cap,
1725                               StgTSO *t )
1726 {
1727     /* Need to check whether this was a main thread, and if so,
1728      * return with the return value.
1729      *
1730      * We also end up here if the thread kills itself with an
1731      * uncaught exception, see Exception.cmm.
1732      */
1733     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1734                                   t->id, whatNext_strs[t->what_next]));
1735
1736 #if defined(GRAN)
1737       endThread(t, CurrentProc); // clean-up the thread
1738 #elif defined(PARALLEL_HASKELL)
1739       /* For now all are advisory -- HWL */
1740       //if(t->priority==AdvisoryPriority) ??
1741       advisory_thread_count--; // JB: Caution with this counter, buggy!
1742       
1743 # if defined(DIST)
1744       if(t->dist.priority==RevalPriority)
1745         FinishReval(t);
1746 # endif
1747     
1748 # if defined(EDENOLD)
1749       // the thread could still have an outport... (BUG)
1750       if (t->eden.outport != -1) {
1751       // delete the outport for the tso which has finished...
1752         IF_PAR_DEBUG(eden_ports,
1753                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1754                               t->eden.outport, t->id));
1755         deleteOPT(t);
1756       }
1757       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1758       if (t->eden.epid != -1) {
1759         IF_PAR_DEBUG(eden_ports,
1760                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1761                            t->id, t->eden.epid));
1762         removeTSOfromProcess(t);
1763       }
1764 # endif 
1765
1766 # if defined(PAR)
1767       if (RtsFlags.ParFlags.ParStats.Full &&
1768           !RtsFlags.ParFlags.ParStats.Suppressed) 
1769         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1770
1771       //  t->par only contains statistics: left out for now...
1772       IF_PAR_DEBUG(fish,
1773                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1774                               t->id,t,t->par.sparkname));
1775 # endif
1776 #endif // PARALLEL_HASKELL
1777
1778       //
1779       // Check whether the thread that just completed was a main
1780       // thread, and if so return with the result.  
1781       //
1782       // There is an assumption here that all thread completion goes
1783       // through this point; we need to make sure that if a thread
1784       // ends up in the ThreadKilled state, that it stays on the run
1785       // queue so it can be dealt with here.
1786       //
1787       if (
1788 #if defined(RTS_SUPPORTS_THREADS)
1789           mainThread != NULL
1790 #else
1791           mainThread->tso == t
1792 #endif
1793           )
1794       {
1795           // We are a bound thread: this must be our thread that just
1796           // completed.
1797           ASSERT(mainThread->tso == t);
1798
1799           if (t->what_next == ThreadComplete) {
1800               if (mainThread->ret) {
1801                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1802                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1803               }
1804               mainThread->stat = Success;
1805           } else {
1806               if (mainThread->ret) {
1807                   *(mainThread->ret) = NULL;
1808               }
1809               if (interrupted) {
1810                   mainThread->stat = Interrupted;
1811               } else {
1812                   mainThread->stat = Killed;
1813               }
1814           }
1815 #ifdef DEBUG
1816           removeThreadLabel((StgWord)mainThread->tso->id);
1817 #endif
1818           if (mainThread->prev == NULL) {
1819               main_threads = mainThread->link;
1820           } else {
1821               mainThread->prev->link = mainThread->link;
1822           }
1823           if (mainThread->link != NULL) {
1824               mainThread->link->prev = NULL;
1825           }
1826           releaseCapability(cap);
1827           return rtsTrue; // tells schedule() to return
1828       }
1829
1830 #ifdef RTS_SUPPORTS_THREADS
1831       ASSERT(t->main == NULL);
1832 #else
1833       if (t->main != NULL) {
1834           // Must be a main thread that is not the topmost one.  Leave
1835           // it on the run queue until the stack has unwound to the
1836           // point where we can deal with this.  Leaving it on the run
1837           // queue also ensures that the garbage collector knows about
1838           // this thread and its return value (it gets dropped from the
1839           // all_threads list so there's no other way to find it).
1840           APPEND_TO_RUN_QUEUE(t);
1841       }
1842 #endif
1843       return rtsFalse;
1844 }
1845
1846 /* -----------------------------------------------------------------------------
1847  * Perform a heap census, if PROFILING
1848  * -------------------------------------------------------------------------- */
1849
1850 static void
1851 scheduleDoHeapProfile(void)
1852 {
1853 #ifdef PROFILING
1854     // When we have +RTS -i0 and we're heap profiling, do a census at
1855     // every GC.  This lets us get repeatable runs for debugging.
1856     if (performHeapProfile ||
1857         (RtsFlags.ProfFlags.profileInterval==0 &&
1858          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1859         GarbageCollect(GetRoots, rtsTrue);
1860         heapCensus();
1861         performHeapProfile = rtsFalse;
1862         ready_to_gc = rtsFalse; // we already GC'd
1863     }
1864 #endif
1865 }
1866
1867 /* -----------------------------------------------------------------------------
1868  * Perform a garbage collection if necessary
1869  * ASSUMES: sched_mutex
1870  * -------------------------------------------------------------------------- */
1871
1872 static void
1873 scheduleDoGC(void)
1874 {
1875     StgTSO *t;
1876
1877 #ifdef SMP
1878     // The last task to stop actually gets to do the GC.  The rest
1879     // of the tasks release their capabilities and wait gc_pending_cond.
1880     if (ready_to_gc && allFreeCapabilities())
1881 #else
1882     if (ready_to_gc)
1883 #endif
1884     {
1885         /* Kick any transactions which are invalid back to their
1886          * atomically frames.  When next scheduled they will try to
1887          * commit, this commit will fail and they will retry.
1888          */
1889         for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1890             if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1891                 if (!stmValidateTransaction (t -> trec)) {
1892                     IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1893                     
1894                     // strip the stack back to the ATOMICALLY_FRAME, aborting
1895                     // the (nested) transaction, and saving the stack of any
1896                     // partially-evaluated thunks on the heap.
1897                     raiseAsync_(t, NULL, rtsTrue);
1898                     
1899 #ifdef REG_R1
1900                     ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1901 #endif
1902                 }
1903             }
1904         }
1905
1906         // so this happens periodically:
1907         scheduleCheckBlackHoles();
1908
1909         /* everybody back, start the GC.
1910          * Could do it in this thread, or signal a condition var
1911          * to do it in another thread.  Either way, we need to
1912          * broadcast on gc_pending_cond afterward.
1913          */
1914 #if defined(RTS_SUPPORTS_THREADS)
1915         IF_DEBUG(scheduler,sched_belch("doing GC"));
1916 #endif
1917         GarbageCollect(GetRoots,rtsFalse);
1918         ready_to_gc = rtsFalse;
1919 #if defined(SMP)
1920         broadcastCondition(&gc_pending_cond);
1921 #endif
1922 #if defined(GRAN)
1923         /* add a ContinueThread event to continue execution of current thread */
1924         new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1925                   ContinueThread,
1926                   t, (StgClosure*)NULL, (rtsSpark*)NULL);
1927         IF_GRAN_DEBUG(bq, 
1928                       debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1929                       G_EVENTQ(0);
1930                       G_CURR_THREADQ(0));
1931 #endif /* GRAN */
1932     }
1933 }
1934
1935 /* ---------------------------------------------------------------------------
1936  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1937  * used by Control.Concurrent for error checking.
1938  * ------------------------------------------------------------------------- */
1939  
1940 StgBool
1941 rtsSupportsBoundThreads(void)
1942 {
1943 #ifdef THREADED_RTS
1944   return rtsTrue;
1945 #else
1946   return rtsFalse;
1947 #endif
1948 }
1949
1950 /* ---------------------------------------------------------------------------
1951  * isThreadBound(tso): check whether tso is bound to an OS thread.
1952  * ------------------------------------------------------------------------- */
1953  
1954 StgBool
1955 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1956 {
1957 #ifdef THREADED_RTS
1958   return (tso->main != NULL);
1959 #endif
1960   return rtsFalse;
1961 }
1962
1963 /* ---------------------------------------------------------------------------
1964  * Singleton fork(). Do not copy any running threads.
1965  * ------------------------------------------------------------------------- */
1966
1967 #ifndef mingw32_HOST_OS
1968 #define FORKPROCESS_PRIMOP_SUPPORTED
1969 #endif
1970
1971 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1972 static void 
1973 deleteThreadImmediately(StgTSO *tso);
1974 #endif
1975 StgInt
1976 forkProcess(HsStablePtr *entry
1977 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1978             STG_UNUSED
1979 #endif
1980            )
1981 {
1982 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1983   pid_t pid;
1984   StgTSO* t,*next;
1985   StgMainThread *m;
1986   SchedulerStatus rc;
1987
1988   IF_DEBUG(scheduler,sched_belch("forking!"));
1989   rts_lock(); // This not only acquires sched_mutex, it also
1990               // makes sure that no other threads are running
1991
1992   pid = fork();
1993
1994   if (pid) { /* parent */
1995
1996   /* just return the pid */
1997     rts_unlock();
1998     return pid;
1999     
2000   } else { /* child */
2001     
2002     
2003       // delete all threads
2004     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
2005     
2006     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2007       next = t->link;
2008
2009         // don't allow threads to catch the ThreadKilled exception
2010       deleteThreadImmediately(t);
2011     }
2012     
2013       // wipe the main thread list
2014     while((m = main_threads) != NULL) {
2015       main_threads = m->link;
2016 # ifdef THREADED_RTS
2017       closeCondition(&m->bound_thread_cond);
2018 # endif
2019       stgFree(m);
2020     }
2021     
2022     rc = rts_evalStableIO(entry, NULL);  // run the action
2023     rts_checkSchedStatus("forkProcess",rc);
2024     
2025     rts_unlock();
2026     
2027     hs_exit();                      // clean up and exit
2028     stg_exit(0);
2029   }
2030 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2031   barf("forkProcess#: primop not supported, sorry!\n");
2032   return -1;
2033 #endif
2034 }
2035
2036 /* ---------------------------------------------------------------------------
2037  * deleteAllThreads():  kill all the live threads.
2038  *
2039  * This is used when we catch a user interrupt (^C), before performing
2040  * any necessary cleanups and running finalizers.
2041  *
2042  * Locks: sched_mutex held.
2043  * ------------------------------------------------------------------------- */
2044    
2045 void
2046 deleteAllThreads ( void )
2047 {
2048   StgTSO* t, *next;
2049   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2050   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2051       next = t->global_link;
2052       deleteThread(t);
2053   }      
2054
2055   // The run queue now contains a bunch of ThreadKilled threads.  We
2056   // must not throw these away: the main thread(s) will be in there
2057   // somewhere, and the main scheduler loop has to deal with it.
2058   // Also, the run queue is the only thing keeping these threads from
2059   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2060
2061   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2062   ASSERT(blackhole_queue == END_TSO_QUEUE);
2063   ASSERT(sleeping_queue == END_TSO_QUEUE);
2064 }
2065
2066 /* startThread and  insertThread are now in GranSim.c -- HWL */
2067
2068
2069 /* ---------------------------------------------------------------------------
2070  * Suspending & resuming Haskell threads.
2071  * 
2072  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2073  * its capability before calling the C function.  This allows another
2074  * task to pick up the capability and carry on running Haskell
2075  * threads.  It also means that if the C call blocks, it won't lock
2076  * the whole system.
2077  *
2078  * The Haskell thread making the C call is put to sleep for the
2079  * duration of the call, on the susepended_ccalling_threads queue.  We
2080  * give out a token to the task, which it can use to resume the thread
2081  * on return from the C function.
2082  * ------------------------------------------------------------------------- */
2083    
2084 StgInt
2085 suspendThread( StgRegTable *reg )
2086 {
2087   nat tok;
2088   Capability *cap;
2089   int saved_errno = errno;
2090
2091   /* assume that *reg is a pointer to the StgRegTable part
2092    * of a Capability.
2093    */
2094   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
2095
2096   ACQUIRE_LOCK(&sched_mutex);
2097
2098   IF_DEBUG(scheduler,
2099            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
2100
2101   // XXX this might not be necessary --SDM
2102   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
2103
2104   threadPaused(cap->r.rCurrentTSO);
2105   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
2106   suspended_ccalling_threads = cap->r.rCurrentTSO;
2107
2108   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
2109       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
2110       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
2111   } else {
2112       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
2113   }
2114
2115   /* Use the thread ID as the token; it should be unique */
2116   tok = cap->r.rCurrentTSO->id;
2117
2118   /* Hand back capability */
2119   releaseCapability(cap);
2120   
2121 #if defined(RTS_SUPPORTS_THREADS)
2122   /* Preparing to leave the RTS, so ensure there's a native thread/task
2123      waiting to take over.
2124   */
2125   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
2126 #endif
2127
2128   in_haskell = rtsFalse;
2129   RELEASE_LOCK(&sched_mutex);
2130   
2131   errno = saved_errno;
2132   return tok; 
2133 }
2134
2135 StgRegTable *
2136 resumeThread( StgInt tok )
2137 {
2138   StgTSO *tso, **prev;
2139   Capability *cap;
2140   int saved_errno = errno;
2141
2142 #if defined(RTS_SUPPORTS_THREADS)
2143   /* Wait for permission to re-enter the RTS with the result. */
2144   ACQUIRE_LOCK(&sched_mutex);
2145   waitForReturnCapability(&sched_mutex, &cap);
2146
2147   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
2148 #else
2149   grabCapability(&cap);
2150 #endif
2151
2152   /* Remove the thread off of the suspended list */
2153   prev = &suspended_ccalling_threads;
2154   for (tso = suspended_ccalling_threads; 
2155        tso != END_TSO_QUEUE; 
2156        prev = &tso->link, tso = tso->link) {
2157     if (tso->id == (StgThreadID)tok) {
2158       *prev = tso->link;
2159       break;
2160     }
2161   }
2162   if (tso == END_TSO_QUEUE) {
2163     barf("resumeThread: thread not found");
2164   }
2165   tso->link = END_TSO_QUEUE;
2166   
2167   if(tso->why_blocked == BlockedOnCCall) {
2168       awakenBlockedQueueNoLock(tso->blocked_exceptions);
2169       tso->blocked_exceptions = NULL;
2170   }
2171   
2172   /* Reset blocking status */
2173   tso->why_blocked  = NotBlocked;
2174
2175   cap->r.rCurrentTSO = tso;
2176   in_haskell = rtsTrue;
2177   RELEASE_LOCK(&sched_mutex);
2178   errno = saved_errno;
2179   return &cap->r;
2180 }
2181
2182 /* ---------------------------------------------------------------------------
2183  * Comparing Thread ids.
2184  *
2185  * This is used from STG land in the implementation of the
2186  * instances of Eq/Ord for ThreadIds.
2187  * ------------------------------------------------------------------------ */
2188
2189 int
2190 cmp_thread(StgPtr tso1, StgPtr tso2) 
2191
2192   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2193   StgThreadID id2 = ((StgTSO *)tso2)->id;
2194  
2195   if (id1 < id2) return (-1);
2196   if (id1 > id2) return 1;
2197   return 0;
2198 }
2199
2200 /* ---------------------------------------------------------------------------
2201  * Fetching the ThreadID from an StgTSO.
2202  *
2203  * This is used in the implementation of Show for ThreadIds.
2204  * ------------------------------------------------------------------------ */
2205 int
2206 rts_getThreadId(StgPtr tso) 
2207 {
2208   return ((StgTSO *)tso)->id;
2209 }
2210
2211 #ifdef DEBUG
2212 void
2213 labelThread(StgPtr tso, char *label)
2214 {
2215   int len;
2216   void *buf;
2217
2218   /* Caveat: Once set, you can only set the thread name to "" */
2219   len = strlen(label)+1;
2220   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2221   strncpy(buf,label,len);
2222   /* Update will free the old memory for us */
2223   updateThreadLabel(((StgTSO *)tso)->id,buf);
2224 }
2225 #endif /* DEBUG */
2226
2227 /* ---------------------------------------------------------------------------
2228    Create a new thread.
2229
2230    The new thread starts with the given stack size.  Before the
2231    scheduler can run, however, this thread needs to have a closure
2232    (and possibly some arguments) pushed on its stack.  See
2233    pushClosure() in Schedule.h.
2234
2235    createGenThread() and createIOThread() (in SchedAPI.h) are
2236    convenient packaged versions of this function.
2237
2238    currently pri (priority) is only used in a GRAN setup -- HWL
2239    ------------------------------------------------------------------------ */
2240 #if defined(GRAN)
2241 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2242 StgTSO *
2243 createThread(nat size, StgInt pri)
2244 #else
2245 StgTSO *
2246 createThread(nat size)
2247 #endif
2248 {
2249
2250     StgTSO *tso;
2251     nat stack_size;
2252
2253     /* First check whether we should create a thread at all */
2254 #if defined(PARALLEL_HASKELL)
2255   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2256   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2257     threadsIgnored++;
2258     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2259           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2260     return END_TSO_QUEUE;
2261   }
2262   threadsCreated++;
2263 #endif
2264
2265 #if defined(GRAN)
2266   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2267 #endif
2268
2269   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2270
2271   /* catch ridiculously small stack sizes */
2272   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2273     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2274   }
2275
2276   stack_size = size - TSO_STRUCT_SIZEW;
2277
2278   tso = (StgTSO *)allocate(size);
2279   TICK_ALLOC_TSO(stack_size, 0);
2280
2281   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2282 #if defined(GRAN)
2283   SET_GRAN_HDR(tso, ThisPE);
2284 #endif
2285
2286   // Always start with the compiled code evaluator
2287   tso->what_next = ThreadRunGHC;
2288
2289   tso->id = next_thread_id++; 
2290   tso->why_blocked  = NotBlocked;
2291   tso->blocked_exceptions = NULL;
2292
2293   tso->saved_errno = 0;
2294   tso->main = NULL;
2295   
2296   tso->stack_size   = stack_size;
2297   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2298                               - TSO_STRUCT_SIZEW;
2299   tso->sp           = (P_)&(tso->stack) + stack_size;
2300
2301   tso->trec = NO_TREC;
2302
2303 #ifdef PROFILING
2304   tso->prof.CCCS = CCS_MAIN;
2305 #endif
2306
2307   /* put a stop frame on the stack */
2308   tso->sp -= sizeofW(StgStopFrame);
2309   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2310   tso->link = END_TSO_QUEUE;
2311
2312   // ToDo: check this
2313 #if defined(GRAN)
2314   /* uses more flexible routine in GranSim */
2315   insertThread(tso, CurrentProc);
2316 #else
2317   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2318    * from its creation
2319    */
2320 #endif
2321
2322 #if defined(GRAN) 
2323   if (RtsFlags.GranFlags.GranSimStats.Full) 
2324     DumpGranEvent(GR_START,tso);
2325 #elif defined(PARALLEL_HASKELL)
2326   if (RtsFlags.ParFlags.ParStats.Full) 
2327     DumpGranEvent(GR_STARTQ,tso);
2328   /* HACk to avoid SCHEDULE 
2329      LastTSO = tso; */
2330 #endif
2331
2332   /* Link the new thread on the global thread list.
2333    */
2334   tso->global_link = all_threads;
2335   all_threads = tso;
2336
2337 #if defined(DIST)
2338   tso->dist.priority = MandatoryPriority; //by default that is...
2339 #endif
2340
2341 #if defined(GRAN)
2342   tso->gran.pri = pri;
2343 # if defined(DEBUG)
2344   tso->gran.magic = TSO_MAGIC; // debugging only
2345 # endif
2346   tso->gran.sparkname   = 0;
2347   tso->gran.startedat   = CURRENT_TIME; 
2348   tso->gran.exported    = 0;
2349   tso->gran.basicblocks = 0;
2350   tso->gran.allocs      = 0;
2351   tso->gran.exectime    = 0;
2352   tso->gran.fetchtime   = 0;
2353   tso->gran.fetchcount  = 0;
2354   tso->gran.blocktime   = 0;
2355   tso->gran.blockcount  = 0;
2356   tso->gran.blockedat   = 0;
2357   tso->gran.globalsparks = 0;
2358   tso->gran.localsparks  = 0;
2359   if (RtsFlags.GranFlags.Light)
2360     tso->gran.clock  = Now; /* local clock */
2361   else
2362     tso->gran.clock  = 0;
2363
2364   IF_DEBUG(gran,printTSO(tso));
2365 #elif defined(PARALLEL_HASKELL)
2366 # if defined(DEBUG)
2367   tso->par.magic = TSO_MAGIC; // debugging only
2368 # endif
2369   tso->par.sparkname   = 0;
2370   tso->par.startedat   = CURRENT_TIME; 
2371   tso->par.exported    = 0;
2372   tso->par.basicblocks = 0;
2373   tso->par.allocs      = 0;
2374   tso->par.exectime    = 0;
2375   tso->par.fetchtime   = 0;
2376   tso->par.fetchcount  = 0;
2377   tso->par.blocktime   = 0;
2378   tso->par.blockcount  = 0;
2379   tso->par.blockedat   = 0;
2380   tso->par.globalsparks = 0;
2381   tso->par.localsparks  = 0;
2382 #endif
2383
2384 #if defined(GRAN)
2385   globalGranStats.tot_threads_created++;
2386   globalGranStats.threads_created_on_PE[CurrentProc]++;
2387   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2388   globalGranStats.tot_sq_probes++;
2389 #elif defined(PARALLEL_HASKELL)
2390   // collect parallel global statistics (currently done together with GC stats)
2391   if (RtsFlags.ParFlags.ParStats.Global &&
2392       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2393     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2394     globalParStats.tot_threads_created++;
2395   }
2396 #endif 
2397
2398 #if defined(GRAN)
2399   IF_GRAN_DEBUG(pri,
2400                 sched_belch("==__ schedule: Created TSO %d (%p);",
2401                       CurrentProc, tso, tso->id));
2402 #elif defined(PARALLEL_HASKELL)
2403   IF_PAR_DEBUG(verbose,
2404                sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2405                            (long)tso->id, tso, advisory_thread_count));
2406 #else
2407   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2408                                  (long)tso->id, (long)tso->stack_size));
2409 #endif    
2410   return tso;
2411 }
2412
2413 #if defined(PAR)
2414 /* RFP:
2415    all parallel thread creation calls should fall through the following routine.
2416 */
2417 StgTSO *
2418 createThreadFromSpark(rtsSpark spark) 
2419 { StgTSO *tso;
2420   ASSERT(spark != (rtsSpark)NULL);
2421 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2422   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2423   { threadsIgnored++;
2424     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2425           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2426     return END_TSO_QUEUE;
2427   }
2428   else
2429   { threadsCreated++;
2430     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2431     if (tso==END_TSO_QUEUE)     
2432       barf("createSparkThread: Cannot create TSO");
2433 #if defined(DIST)
2434     tso->priority = AdvisoryPriority;
2435 #endif
2436     pushClosure(tso,spark);
2437     addToRunQueue(tso);
2438     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2439   }
2440   return tso;
2441 }
2442 #endif
2443
2444 /*
2445   Turn a spark into a thread.
2446   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2447 */
2448 #if 0
2449 StgTSO *
2450 activateSpark (rtsSpark spark) 
2451 {
2452   StgTSO *tso;
2453
2454   tso = createSparkThread(spark);
2455   if (RtsFlags.ParFlags.ParStats.Full) {   
2456     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2457       IF_PAR_DEBUG(verbose,
2458                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2459                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2460   }
2461   // ToDo: fwd info on local/global spark to thread -- HWL
2462   // tso->gran.exported =  spark->exported;
2463   // tso->gran.locked =   !spark->global;
2464   // tso->gran.sparkname = spark->name;
2465
2466   return tso;
2467 }
2468 #endif
2469
2470 /* ---------------------------------------------------------------------------
2471  * scheduleThread()
2472  *
2473  * scheduleThread puts a thread on the head of the runnable queue.
2474  * This will usually be done immediately after a thread is created.
2475  * The caller of scheduleThread must create the thread using e.g.
2476  * createThread and push an appropriate closure
2477  * on this thread's stack before the scheduler is invoked.
2478  * ------------------------------------------------------------------------ */
2479
2480 static void
2481 scheduleThread_(StgTSO *tso)
2482 {
2483   // The thread goes at the *end* of the run-queue, to avoid possible
2484   // starvation of any threads already on the queue.
2485   APPEND_TO_RUN_QUEUE(tso);
2486   threadRunnable();
2487 }
2488
2489 void
2490 scheduleThread(StgTSO* tso)
2491 {
2492   ACQUIRE_LOCK(&sched_mutex);
2493   scheduleThread_(tso);
2494   RELEASE_LOCK(&sched_mutex);
2495 }
2496
2497 #if defined(RTS_SUPPORTS_THREADS)
2498 static Condition bound_cond_cache;
2499 static int bound_cond_cache_full = 0;
2500 #endif
2501
2502
2503 SchedulerStatus
2504 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
2505                    Capability *initialCapability)
2506 {
2507     // Precondition: sched_mutex must be held
2508     StgMainThread *m;
2509
2510     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2511     m->tso = tso;
2512     tso->main = m;
2513     m->ret = ret;
2514     m->stat = NoStatus;
2515     m->link = main_threads;
2516     m->prev = NULL;
2517     if (main_threads != NULL) {
2518         main_threads->prev = m;
2519     }
2520     main_threads = m;
2521
2522 #if defined(RTS_SUPPORTS_THREADS)
2523     // Allocating a new condition for each thread is expensive, so we
2524     // cache one.  This is a pretty feeble hack, but it helps speed up
2525     // consecutive call-ins quite a bit.
2526     if (bound_cond_cache_full) {
2527         m->bound_thread_cond = bound_cond_cache;
2528         bound_cond_cache_full = 0;
2529     } else {
2530         initCondition(&m->bound_thread_cond);
2531     }
2532 #endif
2533
2534     /* Put the thread on the main-threads list prior to scheduling the TSO.
2535        Failure to do so introduces a race condition in the MT case (as
2536        identified by Wolfgang Thaller), whereby the new task/OS thread 
2537        created by scheduleThread_() would complete prior to the thread
2538        that spawned it managed to put 'itself' on the main-threads list.
2539        The upshot of it all being that the worker thread wouldn't get to
2540        signal the completion of the its work item for the main thread to
2541        see (==> it got stuck waiting.)    -- sof 6/02.
2542     */
2543     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2544     
2545     APPEND_TO_RUN_QUEUE(tso);
2546     // NB. Don't call threadRunnable() here, because the thread is
2547     // bound and only runnable by *this* OS thread, so waking up other
2548     // workers will just slow things down.
2549
2550     return waitThread_(m, initialCapability);
2551 }
2552
2553 /* ---------------------------------------------------------------------------
2554  * initScheduler()
2555  *
2556  * Initialise the scheduler.  This resets all the queues - if the
2557  * queues contained any threads, they'll be garbage collected at the
2558  * next pass.
2559  *
2560  * ------------------------------------------------------------------------ */
2561
2562 void 
2563 initScheduler(void)
2564 {
2565 #if defined(GRAN)
2566   nat i;
2567
2568   for (i=0; i<=MAX_PROC; i++) {
2569     run_queue_hds[i]      = END_TSO_QUEUE;
2570     run_queue_tls[i]      = END_TSO_QUEUE;
2571     blocked_queue_hds[i]  = END_TSO_QUEUE;
2572     blocked_queue_tls[i]  = END_TSO_QUEUE;
2573     ccalling_threadss[i]  = END_TSO_QUEUE;
2574     blackhole_queue[i]    = END_TSO_QUEUE;
2575     sleeping_queue        = END_TSO_QUEUE;
2576   }
2577 #else
2578   run_queue_hd      = END_TSO_QUEUE;
2579   run_queue_tl      = END_TSO_QUEUE;
2580   blocked_queue_hd  = END_TSO_QUEUE;
2581   blocked_queue_tl  = END_TSO_QUEUE;
2582   blackhole_queue   = END_TSO_QUEUE;
2583   sleeping_queue    = END_TSO_QUEUE;
2584 #endif 
2585
2586   suspended_ccalling_threads  = END_TSO_QUEUE;
2587
2588   main_threads = NULL;
2589   all_threads  = END_TSO_QUEUE;
2590
2591   context_switch = 0;
2592   interrupted    = 0;
2593
2594   RtsFlags.ConcFlags.ctxtSwitchTicks =
2595       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2596       
2597 #if defined(RTS_SUPPORTS_THREADS)
2598   /* Initialise the mutex and condition variables used by
2599    * the scheduler. */
2600   initMutex(&sched_mutex);
2601   initMutex(&term_mutex);
2602 #endif
2603   
2604   ACQUIRE_LOCK(&sched_mutex);
2605
2606   /* A capability holds the state a native thread needs in
2607    * order to execute STG code. At least one capability is
2608    * floating around (only SMP builds have more than one).
2609    */
2610   initCapabilities();
2611   
2612 #if defined(RTS_SUPPORTS_THREADS)
2613   initTaskManager();
2614 #endif
2615
2616 #if defined(SMP)
2617   /* eagerly start some extra workers */
2618   startTasks(RtsFlags.ParFlags.nNodes, taskStart);
2619 #endif
2620
2621 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2622   initSparkPools();
2623 #endif
2624
2625   RELEASE_LOCK(&sched_mutex);
2626 }
2627
2628 void
2629 exitScheduler( void )
2630 {
2631     interrupted = rtsTrue;
2632     shutting_down_scheduler = rtsTrue;
2633 #if defined(RTS_SUPPORTS_THREADS)
2634     if (threadIsTask(osThreadId())) { taskStop(); }
2635     stopTaskManager();
2636 #endif
2637 }
2638
2639 /* ----------------------------------------------------------------------------
2640    Managing the per-task allocation areas.
2641    
2642    Each capability comes with an allocation area.  These are
2643    fixed-length block lists into which allocation can be done.
2644
2645    ToDo: no support for two-space collection at the moment???
2646    ------------------------------------------------------------------------- */
2647
2648 static SchedulerStatus
2649 waitThread_(StgMainThread* m, Capability *initialCapability)
2650 {
2651   SchedulerStatus stat;
2652
2653   // Precondition: sched_mutex must be held.
2654   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2655
2656 #if defined(GRAN)
2657   /* GranSim specific init */
2658   CurrentTSO = m->tso;                // the TSO to run
2659   procStatus[MainProc] = Busy;        // status of main PE
2660   CurrentProc = MainProc;             // PE to run it on
2661   schedule(m,initialCapability);
2662 #else
2663   schedule(m,initialCapability);
2664   ASSERT(m->stat != NoStatus);
2665 #endif
2666
2667   stat = m->stat;
2668
2669 #if defined(RTS_SUPPORTS_THREADS)
2670   // Free the condition variable, returning it to the cache if possible.
2671   if (!bound_cond_cache_full) {
2672       bound_cond_cache = m->bound_thread_cond;
2673       bound_cond_cache_full = 1;
2674   } else {
2675       closeCondition(&m->bound_thread_cond);
2676   }
2677 #endif
2678
2679   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2680   stgFree(m);
2681
2682   // Postcondition: sched_mutex still held
2683   return stat;
2684 }
2685
2686 /* ---------------------------------------------------------------------------
2687    Where are the roots that we know about?
2688
2689         - all the threads on the runnable queue
2690         - all the threads on the blocked queue
2691         - all the threads on the sleeping queue
2692         - all the thread currently executing a _ccall_GC
2693         - all the "main threads"
2694      
2695    ------------------------------------------------------------------------ */
2696
2697 /* This has to be protected either by the scheduler monitor, or by the
2698         garbage collection monitor (probably the latter).
2699         KH @ 25/10/99
2700 */
2701
2702 void
2703 GetRoots( evac_fn evac )
2704 {
2705 #if defined(GRAN)
2706   {
2707     nat i;
2708     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2709       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2710           evac((StgClosure **)&run_queue_hds[i]);
2711       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2712           evac((StgClosure **)&run_queue_tls[i]);
2713       
2714       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2715           evac((StgClosure **)&blocked_queue_hds[i]);
2716       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2717           evac((StgClosure **)&blocked_queue_tls[i]);
2718       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2719           evac((StgClosure **)&ccalling_threads[i]);
2720     }
2721   }
2722
2723   markEventQueue();
2724
2725 #else /* !GRAN */
2726   if (run_queue_hd != END_TSO_QUEUE) {
2727       ASSERT(run_queue_tl != END_TSO_QUEUE);
2728       evac((StgClosure **)&run_queue_hd);
2729       evac((StgClosure **)&run_queue_tl);
2730   }
2731   
2732   if (blocked_queue_hd != END_TSO_QUEUE) {
2733       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2734       evac((StgClosure **)&blocked_queue_hd);
2735       evac((StgClosure **)&blocked_queue_tl);
2736   }
2737   
2738   if (sleeping_queue != END_TSO_QUEUE) {
2739       evac((StgClosure **)&sleeping_queue);
2740   }
2741 #endif 
2742
2743   if (blackhole_queue != END_TSO_QUEUE) {
2744       evac((StgClosure **)&blackhole_queue);
2745   }
2746
2747   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2748       evac((StgClosure **)&suspended_ccalling_threads);
2749   }
2750
2751 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2752   markSparkQueue(evac);
2753 #endif
2754
2755 #if defined(RTS_USER_SIGNALS)
2756   // mark the signal handlers (signals should be already blocked)
2757   markSignalHandlers(evac);
2758 #endif
2759 }
2760
2761 /* -----------------------------------------------------------------------------
2762    performGC
2763
2764    This is the interface to the garbage collector from Haskell land.
2765    We provide this so that external C code can allocate and garbage
2766    collect when called from Haskell via _ccall_GC.
2767
2768    It might be useful to provide an interface whereby the programmer
2769    can specify more roots (ToDo).
2770    
2771    This needs to be protected by the GC condition variable above.  KH.
2772    -------------------------------------------------------------------------- */
2773
2774 static void (*extra_roots)(evac_fn);
2775
2776 void
2777 performGC(void)
2778 {
2779   /* Obligated to hold this lock upon entry */
2780   ACQUIRE_LOCK(&sched_mutex);
2781   GarbageCollect(GetRoots,rtsFalse);
2782   RELEASE_LOCK(&sched_mutex);
2783 }
2784
2785 void
2786 performMajorGC(void)
2787 {
2788   ACQUIRE_LOCK(&sched_mutex);
2789   GarbageCollect(GetRoots,rtsTrue);
2790   RELEASE_LOCK(&sched_mutex);
2791 }
2792
2793 static void
2794 AllRoots(evac_fn evac)
2795 {
2796     GetRoots(evac);             // the scheduler's roots
2797     extra_roots(evac);          // the user's roots
2798 }
2799
2800 void
2801 performGCWithRoots(void (*get_roots)(evac_fn))
2802 {
2803   ACQUIRE_LOCK(&sched_mutex);
2804   extra_roots = get_roots;
2805   GarbageCollect(AllRoots,rtsFalse);
2806   RELEASE_LOCK(&sched_mutex);
2807 }
2808
2809 /* -----------------------------------------------------------------------------
2810    Stack overflow
2811
2812    If the thread has reached its maximum stack size, then raise the
2813    StackOverflow exception in the offending thread.  Otherwise
2814    relocate the TSO into a larger chunk of memory and adjust its stack
2815    size appropriately.
2816    -------------------------------------------------------------------------- */
2817
2818 static StgTSO *
2819 threadStackOverflow(StgTSO *tso)
2820 {
2821   nat new_stack_size, stack_words;
2822   lnat new_tso_size;
2823   StgPtr new_sp;
2824   StgTSO *dest;
2825
2826   IF_DEBUG(sanity,checkTSO(tso));
2827   if (tso->stack_size >= tso->max_stack_size) {
2828
2829     IF_DEBUG(gc,
2830              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2831                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2832              /* If we're debugging, just print out the top of the stack */
2833              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2834                                               tso->sp+64)));
2835
2836     /* Send this thread the StackOverflow exception */
2837     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2838     return tso;
2839   }
2840
2841   /* Try to double the current stack size.  If that takes us over the
2842    * maximum stack size for this thread, then use the maximum instead.
2843    * Finally round up so the TSO ends up as a whole number of blocks.
2844    */
2845   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2846   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2847                                        TSO_STRUCT_SIZE)/sizeof(W_);
2848   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2849   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2850
2851   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2852
2853   dest = (StgTSO *)allocate(new_tso_size);
2854   TICK_ALLOC_TSO(new_stack_size,0);
2855
2856   /* copy the TSO block and the old stack into the new area */
2857   memcpy(dest,tso,TSO_STRUCT_SIZE);
2858   stack_words = tso->stack + tso->stack_size - tso->sp;
2859   new_sp = (P_)dest + new_tso_size - stack_words;
2860   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2861
2862   /* relocate the stack pointers... */
2863   dest->sp         = new_sp;
2864   dest->stack_size = new_stack_size;
2865         
2866   /* Mark the old TSO as relocated.  We have to check for relocated
2867    * TSOs in the garbage collector and any primops that deal with TSOs.
2868    *
2869    * It's important to set the sp value to just beyond the end
2870    * of the stack, so we don't attempt to scavenge any part of the
2871    * dead TSO's stack.
2872    */
2873   tso->what_next = ThreadRelocated;
2874   tso->link = dest;
2875   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2876   tso->why_blocked = NotBlocked;
2877
2878   IF_PAR_DEBUG(verbose,
2879                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2880                      tso->id, tso, tso->stack_size);
2881                /* If we're debugging, just print out the top of the stack */
2882                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2883                                                 tso->sp+64)));
2884   
2885   IF_DEBUG(sanity,checkTSO(tso));
2886 #if 0
2887   IF_DEBUG(scheduler,printTSO(dest));
2888 #endif
2889
2890   return dest;
2891 }
2892
2893 /* ---------------------------------------------------------------------------
2894    Wake up a queue that was blocked on some resource.
2895    ------------------------------------------------------------------------ */
2896
2897 #if defined(GRAN)
2898 STATIC_INLINE void
2899 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2900 {
2901 }
2902 #elif defined(PARALLEL_HASKELL)
2903 STATIC_INLINE void
2904 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2905 {
2906   /* write RESUME events to log file and
2907      update blocked and fetch time (depending on type of the orig closure) */
2908   if (RtsFlags.ParFlags.ParStats.Full) {
2909     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2910                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2911                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2912     if (EMPTY_RUN_QUEUE())
2913       emitSchedule = rtsTrue;
2914
2915     switch (get_itbl(node)->type) {
2916         case FETCH_ME_BQ:
2917           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2918           break;
2919         case RBH:
2920         case FETCH_ME:
2921         case BLACKHOLE_BQ:
2922           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2923           break;
2924 #ifdef DIST
2925         case MVAR:
2926           break;
2927 #endif    
2928         default:
2929           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2930         }
2931       }
2932 }
2933 #endif
2934
2935 #if defined(GRAN)
2936 static StgBlockingQueueElement *
2937 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2938 {
2939     StgTSO *tso;
2940     PEs node_loc, tso_loc;
2941
2942     node_loc = where_is(node); // should be lifted out of loop
2943     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2944     tso_loc = where_is((StgClosure *)tso);
2945     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2946       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2947       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2948       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2949       // insertThread(tso, node_loc);
2950       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2951                 ResumeThread,
2952                 tso, node, (rtsSpark*)NULL);
2953       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2954       // len_local++;
2955       // len++;
2956     } else { // TSO is remote (actually should be FMBQ)
2957       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2958                                   RtsFlags.GranFlags.Costs.gunblocktime +
2959                                   RtsFlags.GranFlags.Costs.latency;
2960       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2961                 UnblockThread,
2962                 tso, node, (rtsSpark*)NULL);
2963       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2964       // len++;
2965     }
2966     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2967     IF_GRAN_DEBUG(bq,
2968                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2969                           (node_loc==tso_loc ? "Local" : "Global"), 
2970                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2971     tso->block_info.closure = NULL;
2972     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2973                              tso->id, tso));
2974 }
2975 #elif defined(PARALLEL_HASKELL)
2976 static StgBlockingQueueElement *
2977 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2978 {
2979     StgBlockingQueueElement *next;
2980
2981     switch (get_itbl(bqe)->type) {
2982     case TSO:
2983       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2984       /* if it's a TSO just push it onto the run_queue */
2985       next = bqe->link;
2986       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2987       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2988       threadRunnable();
2989       unblockCount(bqe, node);
2990       /* reset blocking status after dumping event */
2991       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2992       break;
2993
2994     case BLOCKED_FETCH:
2995       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2996       next = bqe->link;
2997       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2998       PendingFetches = (StgBlockedFetch *)bqe;
2999       break;
3000
3001 # if defined(DEBUG)
3002       /* can ignore this case in a non-debugging setup; 
3003          see comments on RBHSave closures above */
3004     case CONSTR:
3005       /* check that the closure is an RBHSave closure */
3006       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3007              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3008              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3009       break;
3010
3011     default:
3012       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3013            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3014            (StgClosure *)bqe);
3015 # endif
3016     }
3017   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3018   return next;
3019 }
3020
3021 #else /* !GRAN && !PARALLEL_HASKELL */
3022 static StgTSO *
3023 unblockOneLocked(StgTSO *tso)
3024 {
3025   StgTSO *next;
3026
3027   ASSERT(get_itbl(tso)->type == TSO);
3028   ASSERT(tso->why_blocked != NotBlocked);
3029   tso->why_blocked = NotBlocked;
3030   next = tso->link;
3031   tso->link = END_TSO_QUEUE;
3032   APPEND_TO_RUN_QUEUE(tso);
3033   threadRunnable();
3034   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3035   return next;
3036 }
3037 #endif
3038
3039 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3040 INLINE_ME StgBlockingQueueElement *
3041 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3042 {
3043   ACQUIRE_LOCK(&sched_mutex);
3044   bqe = unblockOneLocked(bqe, node);
3045   RELEASE_LOCK(&sched_mutex);
3046   return bqe;
3047 }
3048 #else
3049 INLINE_ME StgTSO *
3050 unblockOne(StgTSO *tso)
3051 {
3052   ACQUIRE_LOCK(&sched_mutex);
3053   tso = unblockOneLocked(tso);
3054   RELEASE_LOCK(&sched_mutex);
3055   return tso;
3056 }
3057 #endif
3058
3059 #if defined(GRAN)
3060 void 
3061 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3062 {
3063   StgBlockingQueueElement *bqe;
3064   PEs node_loc;
3065   nat len = 0; 
3066
3067   IF_GRAN_DEBUG(bq, 
3068                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3069                       node, CurrentProc, CurrentTime[CurrentProc], 
3070                       CurrentTSO->id, CurrentTSO));
3071
3072   node_loc = where_is(node);
3073
3074   ASSERT(q == END_BQ_QUEUE ||
3075          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3076          get_itbl(q)->type == CONSTR); // closure (type constructor)
3077   ASSERT(is_unique(node));
3078
3079   /* FAKE FETCH: magically copy the node to the tso's proc;
3080      no Fetch necessary because in reality the node should not have been 
3081      moved to the other PE in the first place
3082   */
3083   if (CurrentProc!=node_loc) {
3084     IF_GRAN_DEBUG(bq, 
3085                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3086                         node, node_loc, CurrentProc, CurrentTSO->id, 
3087                         // CurrentTSO, where_is(CurrentTSO),
3088                         node->header.gran.procs));
3089     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3090     IF_GRAN_DEBUG(bq, 
3091                   debugBelch("## new bitmask of node %p is %#x\n",
3092                         node, node->header.gran.procs));
3093     if (RtsFlags.GranFlags.GranSimStats.Global) {
3094       globalGranStats.tot_fake_fetches++;
3095     }
3096   }
3097
3098   bqe = q;
3099   // ToDo: check: ASSERT(CurrentProc==node_loc);
3100   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3101     //next = bqe->link;
3102     /* 
3103        bqe points to the current element in the queue
3104        next points to the next element in the queue
3105     */
3106     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3107     //tso_loc = where_is(tso);
3108     len++;
3109     bqe = unblockOneLocked(bqe, node);
3110   }
3111
3112   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3113      the closure to make room for the anchor of the BQ */
3114   if (bqe!=END_BQ_QUEUE) {
3115     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3116     /*
3117     ASSERT((info_ptr==&RBH_Save_0_info) ||
3118            (info_ptr==&RBH_Save_1_info) ||
3119            (info_ptr==&RBH_Save_2_info));
3120     */
3121     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3122     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3123     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3124
3125     IF_GRAN_DEBUG(bq,
3126                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3127                         node, info_type(node)));
3128   }
3129
3130   /* statistics gathering */
3131   if (RtsFlags.GranFlags.GranSimStats.Global) {
3132     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3133     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3134     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3135     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3136   }
3137   IF_GRAN_DEBUG(bq,
3138                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3139                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3140 }
3141 #elif defined(PARALLEL_HASKELL)
3142 void 
3143 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3144 {
3145   StgBlockingQueueElement *bqe;
3146
3147   ACQUIRE_LOCK(&sched_mutex);
3148
3149   IF_PAR_DEBUG(verbose, 
3150                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3151                      node, mytid));
3152 #ifdef DIST  
3153   //RFP
3154   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3155     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3156     return;
3157   }
3158 #endif
3159   
3160   ASSERT(q == END_BQ_QUEUE ||
3161          get_itbl(q)->type == TSO ||           
3162          get_itbl(q)->type == BLOCKED_FETCH || 
3163          get_itbl(q)->type == CONSTR); 
3164
3165   bqe = q;
3166   while (get_itbl(bqe)->type==TSO || 
3167          get_itbl(bqe)->type==BLOCKED_FETCH) {
3168     bqe = unblockOneLocked(bqe, node);
3169   }
3170   RELEASE_LOCK(&sched_mutex);
3171 }
3172
3173 #else   /* !GRAN && !PARALLEL_HASKELL */
3174
3175 void
3176 awakenBlockedQueueNoLock(StgTSO *tso)
3177 {
3178   while (tso != END_TSO_QUEUE) {
3179     tso = unblockOneLocked(tso);
3180   }
3181 }
3182
3183 void
3184 awakenBlockedQueue(StgTSO *tso)
3185 {
3186   ACQUIRE_LOCK(&sched_mutex);
3187   while (tso != END_TSO_QUEUE) {
3188     tso = unblockOneLocked(tso);
3189   }
3190   RELEASE_LOCK(&sched_mutex);
3191 }
3192 #endif
3193
3194 /* ---------------------------------------------------------------------------
3195    Interrupt execution
3196    - usually called inside a signal handler so it mustn't do anything fancy.   
3197    ------------------------------------------------------------------------ */
3198
3199 void
3200 interruptStgRts(void)
3201 {
3202     interrupted    = 1;
3203     context_switch = 1;
3204 }
3205
3206 /* -----------------------------------------------------------------------------
3207    Unblock a thread
3208
3209    This is for use when we raise an exception in another thread, which
3210    may be blocked.
3211    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3212    -------------------------------------------------------------------------- */
3213
3214 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3215 /*
3216   NB: only the type of the blocking queue is different in GranSim and GUM
3217       the operations on the queue-elements are the same
3218       long live polymorphism!
3219
3220   Locks: sched_mutex is held upon entry and exit.
3221
3222 */
3223 static void
3224 unblockThread(StgTSO *tso)
3225 {
3226   StgBlockingQueueElement *t, **last;
3227
3228   switch (tso->why_blocked) {
3229
3230   case NotBlocked:
3231     return;  /* not blocked */
3232
3233   case BlockedOnSTM:
3234     // Be careful: nothing to do here!  We tell the scheduler that the thread
3235     // is runnable and we leave it to the stack-walking code to abort the 
3236     // transaction while unwinding the stack.  We should perhaps have a debugging
3237     // test to make sure that this really happens and that the 'zombie' transaction
3238     // does not get committed.
3239     goto done;
3240
3241   case BlockedOnMVar:
3242     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3243     {
3244       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3245       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3246
3247       last = (StgBlockingQueueElement **)&mvar->head;
3248       for (t = (StgBlockingQueueElement *)mvar->head; 
3249            t != END_BQ_QUEUE; 
3250            last = &t->link, last_tso = t, t = t->link) {
3251         if (t == (StgBlockingQueueElement *)tso) {
3252           *last = (StgBlockingQueueElement *)tso->link;
3253           if (mvar->tail == tso) {
3254             mvar->tail = (StgTSO *)last_tso;
3255           }
3256           goto done;
3257         }
3258       }
3259       barf("unblockThread (MVAR): TSO not found");
3260     }
3261
3262   case BlockedOnBlackHole:
3263     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3264     {
3265       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3266
3267       last = &bq->blocking_queue;
3268       for (t = bq->blocking_queue; 
3269            t != END_BQ_QUEUE; 
3270            last = &t->link, t = t->link) {
3271         if (t == (StgBlockingQueueElement *)tso) {
3272           *last = (StgBlockingQueueElement *)tso->link;
3273           goto done;
3274         }
3275       }
3276       barf("unblockThread (BLACKHOLE): TSO not found");
3277     }
3278
3279   case BlockedOnException:
3280     {
3281       StgTSO *target  = tso->block_info.tso;
3282
3283       ASSERT(get_itbl(target)->type == TSO);
3284
3285       if (target->what_next == ThreadRelocated) {
3286           target = target->link;
3287           ASSERT(get_itbl(target)->type == TSO);
3288       }
3289
3290       ASSERT(target->blocked_exceptions != NULL);
3291
3292       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3293       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3294            t != END_BQ_QUEUE; 
3295            last = &t->link, t = t->link) {
3296         ASSERT(get_itbl(t)->type == TSO);
3297         if (t == (StgBlockingQueueElement *)tso) {
3298           *last = (StgBlockingQueueElement *)tso->link;
3299           goto done;
3300         }
3301       }
3302       barf("unblockThread (Exception): TSO not found");
3303     }
3304
3305   case BlockedOnRead:
3306   case BlockedOnWrite:
3307 #if defined(mingw32_HOST_OS)
3308   case BlockedOnDoProc:
3309 #endif
3310     {
3311       /* take TSO off blocked_queue */
3312       StgBlockingQueueElement *prev = NULL;
3313       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3314            prev = t, t = t->link) {
3315         if (t == (StgBlockingQueueElement *)tso) {
3316           if (prev == NULL) {
3317             blocked_queue_hd = (StgTSO *)t->link;
3318             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3319               blocked_queue_tl = END_TSO_QUEUE;
3320             }
3321           } else {
3322             prev->link = t->link;
3323             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3324               blocked_queue_tl = (StgTSO *)prev;
3325             }
3326           }
3327           goto done;
3328         }
3329       }
3330       barf("unblockThread (I/O): TSO not found");
3331     }
3332
3333   case BlockedOnDelay:
3334     {
3335       /* take TSO off sleeping_queue */
3336       StgBlockingQueueElement *prev = NULL;
3337       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3338            prev = t, t = t->link) {
3339         if (t == (StgBlockingQueueElement *)tso) {
3340           if (prev == NULL) {
3341             sleeping_queue = (StgTSO *)t->link;
3342           } else {
3343             prev->link = t->link;
3344           }
3345           goto done;
3346         }
3347       }
3348       barf("unblockThread (delay): TSO not found");
3349     }
3350
3351   default:
3352     barf("unblockThread");
3353   }
3354
3355  done:
3356   tso->link = END_TSO_QUEUE;
3357   tso->why_blocked = NotBlocked;
3358   tso->block_info.closure = NULL;
3359   PUSH_ON_RUN_QUEUE(tso);
3360 }
3361 #else
3362 static void
3363 unblockThread(StgTSO *tso)
3364 {
3365   StgTSO *t, **last;
3366   
3367   /* To avoid locking unnecessarily. */
3368   if (tso->why_blocked == NotBlocked) {
3369     return;
3370   }
3371
3372   switch (tso->why_blocked) {
3373
3374   case BlockedOnSTM:
3375     // Be careful: nothing to do here!  We tell the scheduler that the thread
3376     // is runnable and we leave it to the stack-walking code to abort the 
3377     // transaction while unwinding the stack.  We should perhaps have a debugging
3378     // test to make sure that this really happens and that the 'zombie' transaction
3379     // does not get committed.
3380     goto done;
3381
3382   case BlockedOnMVar:
3383     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3384     {
3385       StgTSO *last_tso = END_TSO_QUEUE;
3386       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3387
3388       last = &mvar->head;
3389       for (t = mvar->head; t != END_TSO_QUEUE; 
3390            last = &t->link, last_tso = t, t = t->link) {
3391         if (t == tso) {
3392           *last = tso->link;
3393           if (mvar->tail == tso) {
3394             mvar->tail = last_tso;
3395           }
3396           goto done;
3397         }
3398       }
3399       barf("unblockThread (MVAR): TSO not found");
3400     }
3401
3402   case BlockedOnBlackHole:
3403     {
3404       last = &blackhole_queue;
3405       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3406            last = &t->link, t = t->link) {
3407         if (t == tso) {
3408           *last = tso->link;
3409           goto done;
3410         }
3411       }
3412       barf("unblockThread (BLACKHOLE): TSO not found");
3413     }
3414
3415   case BlockedOnException:
3416     {
3417       StgTSO *target  = tso->block_info.tso;
3418
3419       ASSERT(get_itbl(target)->type == TSO);
3420
3421       while (target->what_next == ThreadRelocated) {
3422           target = target->link;
3423           ASSERT(get_itbl(target)->type == TSO);
3424       }
3425       
3426       ASSERT(target->blocked_exceptions != NULL);
3427
3428       last = &target->blocked_exceptions;
3429       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3430            last = &t->link, t = t->link) {
3431         ASSERT(get_itbl(t)->type == TSO);
3432         if (t == tso) {
3433           *last = tso->link;
3434           goto done;
3435         }
3436       }
3437       barf("unblockThread (Exception): TSO not found");
3438     }
3439
3440   case BlockedOnRead:
3441   case BlockedOnWrite:
3442 #if defined(mingw32_HOST_OS)
3443   case BlockedOnDoProc:
3444 #endif
3445     {
3446       StgTSO *prev = NULL;
3447       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3448            prev = t, t = t->link) {
3449         if (t == tso) {
3450           if (prev == NULL) {
3451             blocked_queue_hd = t->link;
3452             if (blocked_queue_tl == t) {
3453               blocked_queue_tl = END_TSO_QUEUE;
3454             }
3455           } else {
3456             prev->link = t->link;
3457             if (blocked_queue_tl == t) {
3458               blocked_queue_tl = prev;
3459             }
3460           }
3461           goto done;
3462         }
3463       }
3464       barf("unblockThread (I/O): TSO not found");
3465     }
3466
3467   case BlockedOnDelay:
3468     {
3469       StgTSO *prev = NULL;
3470       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3471            prev = t, t = t->link) {
3472         if (t == tso) {
3473           if (prev == NULL) {
3474             sleeping_queue = t->link;
3475           } else {
3476             prev->link = t->link;
3477           }
3478           goto done;
3479         }
3480       }
3481       barf("unblockThread (delay): TSO not found");
3482     }
3483
3484   default:
3485     barf("unblockThread");
3486   }
3487
3488  done:
3489   tso->link = END_TSO_QUEUE;
3490   tso->why_blocked = NotBlocked;
3491   tso->block_info.closure = NULL;
3492   APPEND_TO_RUN_QUEUE(tso);
3493 }
3494 #endif
3495
3496 /* -----------------------------------------------------------------------------
3497  * checkBlackHoles()
3498  *
3499  * Check the blackhole_queue for threads that can be woken up.  We do
3500  * this periodically: before every GC, and whenever the run queue is
3501  * empty.
3502  *
3503  * An elegant solution might be to just wake up all the blocked
3504  * threads with awakenBlockedQueue occasionally: they'll go back to
3505  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3506  * doesn't give us a way to tell whether we've actually managed to
3507  * wake up any threads, so we would be busy-waiting.
3508  *
3509  * -------------------------------------------------------------------------- */
3510
3511 static rtsBool
3512 checkBlackHoles( void )
3513 {
3514     StgTSO **prev, *t;
3515     rtsBool any_woke_up = rtsFalse;
3516     StgHalfWord type;
3517
3518     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3519
3520     // ASSUMES: sched_mutex
3521     prev = &blackhole_queue;
3522     t = blackhole_queue;
3523     while (t != END_TSO_QUEUE) {
3524         ASSERT(t->why_blocked == BlockedOnBlackHole);
3525         type = get_itbl(t->block_info.closure)->type;
3526         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3527             t = unblockOneLocked(t);
3528             *prev = t;
3529             any_woke_up = rtsTrue;
3530         } else {
3531             prev = &t->link;
3532             t = t->link;
3533         }
3534     }
3535
3536     return any_woke_up;
3537 }
3538
3539 /* -----------------------------------------------------------------------------
3540  * raiseAsync()
3541  *
3542  * The following function implements the magic for raising an
3543  * asynchronous exception in an existing thread.
3544  *
3545  * We first remove the thread from any queue on which it might be
3546  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3547  *
3548  * We strip the stack down to the innermost CATCH_FRAME, building
3549  * thunks in the heap for all the active computations, so they can 
3550  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3551  * an application of the handler to the exception, and push it on
3552  * the top of the stack.
3553  * 
3554  * How exactly do we save all the active computations?  We create an
3555  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3556  * AP_STACKs pushes everything from the corresponding update frame
3557  * upwards onto the stack.  (Actually, it pushes everything up to the
3558  * next update frame plus a pointer to the next AP_STACK object.
3559  * Entering the next AP_STACK object pushes more onto the stack until we
3560  * reach the last AP_STACK object - at which point the stack should look
3561  * exactly as it did when we killed the TSO and we can continue
3562  * execution by entering the closure on top of the stack.
3563  *
3564  * We can also kill a thread entirely - this happens if either (a) the 
3565  * exception passed to raiseAsync is NULL, or (b) there's no
3566  * CATCH_FRAME on the stack.  In either case, we strip the entire
3567  * stack and replace the thread with a zombie.
3568  *
3569  * Locks: sched_mutex held upon entry nor exit.
3570  *
3571  * -------------------------------------------------------------------------- */
3572  
3573 void 
3574 deleteThread(StgTSO *tso)
3575 {
3576   if (tso->why_blocked != BlockedOnCCall &&
3577       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3578       raiseAsync(tso,NULL);
3579   }
3580 }
3581
3582 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3583 static void 
3584 deleteThreadImmediately(StgTSO *tso)
3585 { // for forkProcess only:
3586   // delete thread without giving it a chance to catch the KillThread exception
3587
3588   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3589       return;
3590   }
3591
3592   if (tso->why_blocked != BlockedOnCCall &&
3593       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3594     unblockThread(tso);
3595   }
3596
3597   tso->what_next = ThreadKilled;
3598 }
3599 #endif
3600
3601 void
3602 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3603 {
3604   /* When raising async exs from contexts where sched_mutex isn't held;
3605      use raiseAsyncWithLock(). */
3606   ACQUIRE_LOCK(&sched_mutex);
3607   raiseAsync(tso,exception);
3608   RELEASE_LOCK(&sched_mutex);
3609 }
3610
3611 void
3612 raiseAsync(StgTSO *tso, StgClosure *exception)
3613 {
3614     raiseAsync_(tso, exception, rtsFalse);
3615 }
3616
3617 static void
3618 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3619 {
3620     StgRetInfoTable *info;
3621     StgPtr sp;
3622   
3623     // Thread already dead?
3624     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3625         return;
3626     }
3627
3628     IF_DEBUG(scheduler, 
3629              sched_belch("raising exception in thread %ld.", (long)tso->id));
3630     
3631     // Remove it from any blocking queues
3632     unblockThread(tso);
3633
3634     sp = tso->sp;
3635     
3636     // The stack freezing code assumes there's a closure pointer on
3637     // the top of the stack, so we have to arrange that this is the case...
3638     //
3639     if (sp[0] == (W_)&stg_enter_info) {
3640         sp++;
3641     } else {
3642         sp--;
3643         sp[0] = (W_)&stg_dummy_ret_closure;
3644     }
3645
3646     while (1) {
3647         nat i;
3648
3649         // 1. Let the top of the stack be the "current closure"
3650         //
3651         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3652         // CATCH_FRAME.
3653         //
3654         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3655         // current closure applied to the chunk of stack up to (but not
3656         // including) the update frame.  This closure becomes the "current
3657         // closure".  Go back to step 2.
3658         //
3659         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3660         // top of the stack applied to the exception.
3661         // 
3662         // 5. If it's a STOP_FRAME, then kill the thread.
3663         // 
3664         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3665         // transaction
3666        
3667         
3668         StgPtr frame;
3669         
3670         frame = sp + 1;
3671         info = get_ret_itbl((StgClosure *)frame);
3672         
3673         while (info->i.type != UPDATE_FRAME
3674                && (info->i.type != CATCH_FRAME || exception == NULL)
3675                && info->i.type != STOP_FRAME
3676                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3677         {
3678             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3679               // IF we find an ATOMICALLY_FRAME then we abort the
3680               // current transaction and propagate the exception.  In
3681               // this case (unlike ordinary exceptions) we do not care
3682               // whether the transaction is valid or not because its
3683               // possible validity cannot have caused the exception
3684               // and will not be visible after the abort.
3685               IF_DEBUG(stm,
3686                        debugBelch("Found atomically block delivering async exception\n"));
3687               stmAbortTransaction(tso -> trec);
3688               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3689             }
3690             frame += stack_frame_sizeW((StgClosure *)frame);
3691             info = get_ret_itbl((StgClosure *)frame);
3692         }
3693         
3694         switch (info->i.type) {
3695             
3696         case ATOMICALLY_FRAME:
3697             ASSERT(stop_at_atomically);
3698             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3699             stmCondemnTransaction(tso -> trec);
3700 #ifdef REG_R1
3701             tso->sp = frame;
3702 #else
3703             // R1 is not a register: the return convention for IO in
3704             // this case puts the return value on the stack, so we
3705             // need to set up the stack to return to the atomically
3706             // frame properly...
3707             tso->sp = frame - 2;
3708             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3709             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3710 #endif
3711             tso->what_next = ThreadRunGHC;
3712             return;
3713
3714         case CATCH_FRAME:
3715             // If we find a CATCH_FRAME, and we've got an exception to raise,
3716             // then build the THUNK raise(exception), and leave it on
3717             // top of the CATCH_FRAME ready to enter.
3718             //
3719         {
3720 #ifdef PROFILING
3721             StgCatchFrame *cf = (StgCatchFrame *)frame;
3722 #endif
3723             StgClosure *raise;
3724             
3725             // we've got an exception to raise, so let's pass it to the
3726             // handler in this frame.
3727             //
3728             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3729             TICK_ALLOC_SE_THK(1,0);
3730             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3731             raise->payload[0] = exception;
3732             
3733             // throw away the stack from Sp up to the CATCH_FRAME.
3734             //
3735             sp = frame - 1;
3736             
3737             /* Ensure that async excpetions are blocked now, so we don't get
3738              * a surprise exception before we get around to executing the
3739              * handler.
3740              */
3741             if (tso->blocked_exceptions == NULL) {
3742                 tso->blocked_exceptions = END_TSO_QUEUE;
3743             }
3744             
3745             /* Put the newly-built THUNK on top of the stack, ready to execute
3746              * when the thread restarts.
3747              */
3748             sp[0] = (W_)raise;
3749             sp[-1] = (W_)&stg_enter_info;
3750             tso->sp = sp-1;
3751             tso->what_next = ThreadRunGHC;
3752             IF_DEBUG(sanity, checkTSO(tso));
3753             return;
3754         }
3755         
3756         case UPDATE_FRAME:
3757         {
3758             StgAP_STACK * ap;
3759             nat words;
3760             
3761             // First build an AP_STACK consisting of the stack chunk above the
3762             // current update frame, with the top word on the stack as the
3763             // fun field.
3764             //
3765             words = frame - sp - 1;
3766             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3767             
3768             ap->size = words;
3769             ap->fun  = (StgClosure *)sp[0];
3770             sp++;
3771             for(i=0; i < (nat)words; ++i) {
3772                 ap->payload[i] = (StgClosure *)*sp++;
3773             }
3774             
3775             SET_HDR(ap,&stg_AP_STACK_info,
3776                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3777             TICK_ALLOC_UP_THK(words+1,0);
3778             
3779             IF_DEBUG(scheduler,
3780                      debugBelch("sched: Updating ");
3781                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3782                      debugBelch(" with ");
3783                      printObj((StgClosure *)ap);
3784                 );
3785
3786             // Replace the updatee with an indirection - happily
3787             // this will also wake up any threads currently
3788             // waiting on the result.
3789             //
3790             // Warning: if we're in a loop, more than one update frame on
3791             // the stack may point to the same object.  Be careful not to
3792             // overwrite an IND_OLDGEN in this case, because we'll screw
3793             // up the mutable lists.  To be on the safe side, don't
3794             // overwrite any kind of indirection at all.  See also
3795             // threadSqueezeStack in GC.c, where we have to make a similar
3796             // check.
3797             //
3798             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3799                 // revert the black hole
3800                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3801                                (StgClosure *)ap);
3802             }
3803             sp += sizeofW(StgUpdateFrame) - 1;
3804             sp[0] = (W_)ap; // push onto stack
3805             break;
3806         }
3807         
3808         case STOP_FRAME:
3809             // We've stripped the entire stack, the thread is now dead.
3810             sp += sizeofW(StgStopFrame);
3811             tso->what_next = ThreadKilled;
3812             tso->sp = sp;
3813             return;
3814             
3815         default:
3816             barf("raiseAsync");
3817         }
3818     }
3819     barf("raiseAsync");
3820 }
3821
3822 /* -----------------------------------------------------------------------------
3823    raiseExceptionHelper
3824    
3825    This function is called by the raise# primitve, just so that we can
3826    move some of the tricky bits of raising an exception from C-- into
3827    C.  Who knows, it might be a useful re-useable thing here too.
3828    -------------------------------------------------------------------------- */
3829
3830 StgWord
3831 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3832 {
3833     StgClosure *raise_closure = NULL;
3834     StgPtr p, next;
3835     StgRetInfoTable *info;
3836     //
3837     // This closure represents the expression 'raise# E' where E
3838     // is the exception raise.  It is used to overwrite all the
3839     // thunks which are currently under evaluataion.
3840     //
3841
3842     //    
3843     // LDV profiling: stg_raise_info has THUNK as its closure
3844     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3845     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3846     // 1 does not cause any problem unless profiling is performed.
3847     // However, when LDV profiling goes on, we need to linearly scan
3848     // small object pool, where raise_closure is stored, so we should
3849     // use MIN_UPD_SIZE.
3850     //
3851     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3852     //                                 sizeofW(StgClosure)+1);
3853     //
3854
3855     //
3856     // Walk up the stack, looking for the catch frame.  On the way,
3857     // we update any closures pointed to from update frames with the
3858     // raise closure that we just built.
3859     //
3860     p = tso->sp;
3861     while(1) {
3862         info = get_ret_itbl((StgClosure *)p);
3863         next = p + stack_frame_sizeW((StgClosure *)p);
3864         switch (info->i.type) {
3865             
3866         case UPDATE_FRAME:
3867             // Only create raise_closure if we need to.
3868             if (raise_closure == NULL) {
3869                 raise_closure = 
3870                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3871                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3872                 raise_closure->payload[0] = exception;
3873             }
3874             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3875             p = next;
3876             continue;
3877
3878         case ATOMICALLY_FRAME:
3879             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3880             tso->sp = p;
3881             return ATOMICALLY_FRAME;
3882             
3883         case CATCH_FRAME:
3884             tso->sp = p;
3885             return CATCH_FRAME;
3886
3887         case CATCH_STM_FRAME:
3888             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3889             tso->sp = p;
3890             return CATCH_STM_FRAME;
3891             
3892         case STOP_FRAME:
3893             tso->sp = p;
3894             return STOP_FRAME;
3895
3896         case CATCH_RETRY_FRAME:
3897         default:
3898             p = next; 
3899             continue;
3900         }
3901     }
3902 }
3903
3904
3905 /* -----------------------------------------------------------------------------
3906    findRetryFrameHelper
3907
3908    This function is called by the retry# primitive.  It traverses the stack
3909    leaving tso->sp referring to the frame which should handle the retry.  
3910
3911    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3912    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3913
3914    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3915    despite the similar implementation.
3916
3917    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3918    not be created within memory transactions.
3919    -------------------------------------------------------------------------- */
3920
3921 StgWord
3922 findRetryFrameHelper (StgTSO *tso)
3923 {
3924   StgPtr           p, next;
3925   StgRetInfoTable *info;
3926
3927   p = tso -> sp;
3928   while (1) {
3929     info = get_ret_itbl((StgClosure *)p);
3930     next = p + stack_frame_sizeW((StgClosure *)p);
3931     switch (info->i.type) {
3932       
3933     case ATOMICALLY_FRAME:
3934       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3935       tso->sp = p;
3936       return ATOMICALLY_FRAME;
3937       
3938     case CATCH_RETRY_FRAME:
3939       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3940       tso->sp = p;
3941       return CATCH_RETRY_FRAME;
3942       
3943     case CATCH_STM_FRAME:
3944     default:
3945       ASSERT(info->i.type != CATCH_FRAME);
3946       ASSERT(info->i.type != STOP_FRAME);
3947       p = next; 
3948       continue;
3949     }
3950   }
3951 }
3952
3953 /* -----------------------------------------------------------------------------
3954    resurrectThreads is called after garbage collection on the list of
3955    threads found to be garbage.  Each of these threads will be woken
3956    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3957    on an MVar, or NonTermination if the thread was blocked on a Black
3958    Hole.
3959
3960    Locks: sched_mutex isn't held upon entry nor exit.
3961    -------------------------------------------------------------------------- */
3962
3963 void
3964 resurrectThreads( StgTSO *threads )
3965 {
3966   StgTSO *tso, *next;
3967
3968   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3969     next = tso->global_link;
3970     tso->global_link = all_threads;
3971     all_threads = tso;
3972     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3973
3974     switch (tso->why_blocked) {
3975     case BlockedOnMVar:
3976     case BlockedOnException:
3977       /* Called by GC - sched_mutex lock is currently held. */
3978       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3979       break;
3980     case BlockedOnBlackHole:
3981       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3982       break;
3983     case BlockedOnSTM:
3984       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
3985       break;
3986     case NotBlocked:
3987       /* This might happen if the thread was blocked on a black hole
3988        * belonging to a thread that we've just woken up (raiseAsync
3989        * can wake up threads, remember...).
3990        */
3991       continue;
3992     default:
3993       barf("resurrectThreads: thread blocked in a strange way");
3994     }
3995   }
3996 }
3997
3998 /* ----------------------------------------------------------------------------
3999  * Debugging: why is a thread blocked
4000  * [Also provides useful information when debugging threaded programs
4001  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4002    ------------------------------------------------------------------------- */
4003
4004 static void
4005 printThreadBlockage(StgTSO *tso)
4006 {
4007   switch (tso->why_blocked) {
4008   case BlockedOnRead:
4009     debugBelch("is blocked on read from fd %ld", tso->block_info.fd);
4010     break;
4011   case BlockedOnWrite:
4012     debugBelch("is blocked on write to fd %ld", tso->block_info.fd);
4013     break;
4014 #if defined(mingw32_HOST_OS)
4015     case BlockedOnDoProc:
4016     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4017     break;
4018 #endif
4019   case BlockedOnDelay:
4020     debugBelch("is blocked until %ld", tso->block_info.target);
4021     break;
4022   case BlockedOnMVar:
4023     debugBelch("is blocked on an MVar");
4024     break;
4025   case BlockedOnException:
4026     debugBelch("is blocked on delivering an exception to thread %d",
4027             tso->block_info.tso->id);
4028     break;
4029   case BlockedOnBlackHole:
4030     debugBelch("is blocked on a black hole");
4031     break;
4032   case NotBlocked:
4033     debugBelch("is not blocked");
4034     break;
4035 #if defined(PARALLEL_HASKELL)
4036   case BlockedOnGA:
4037     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4038             tso->block_info.closure, info_type(tso->block_info.closure));
4039     break;
4040   case BlockedOnGA_NoSend:
4041     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4042             tso->block_info.closure, info_type(tso->block_info.closure));
4043     break;
4044 #endif
4045   case BlockedOnCCall:
4046     debugBelch("is blocked on an external call");
4047     break;
4048   case BlockedOnCCall_NoUnblockExc:
4049     debugBelch("is blocked on an external call (exceptions were already blocked)");
4050     break;
4051   case BlockedOnSTM:
4052     debugBelch("is blocked on an STM operation");
4053     break;
4054   default:
4055     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4056          tso->why_blocked, tso->id, tso);
4057   }
4058 }
4059
4060 static void
4061 printThreadStatus(StgTSO *tso)
4062 {
4063   switch (tso->what_next) {
4064   case ThreadKilled:
4065     debugBelch("has been killed");
4066     break;
4067   case ThreadComplete:
4068     debugBelch("has completed");
4069     break;
4070   default:
4071     printThreadBlockage(tso);
4072   }
4073 }
4074
4075 void
4076 printAllThreads(void)
4077 {
4078   StgTSO *t;
4079
4080 # if defined(GRAN)
4081   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4082   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4083                        time_string, rtsFalse/*no commas!*/);
4084
4085   debugBelch("all threads at [%s]:\n", time_string);
4086 # elif defined(PARALLEL_HASKELL)
4087   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4088   ullong_format_string(CURRENT_TIME,
4089                        time_string, rtsFalse/*no commas!*/);
4090
4091   debugBelch("all threads at [%s]:\n", time_string);
4092 # else
4093   debugBelch("all threads:\n");
4094 # endif
4095
4096   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
4097     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
4098 #if defined(DEBUG)
4099     {
4100       void *label = lookupThreadLabel(t->id);
4101       if (label) debugBelch("[\"%s\"] ",(char *)label);
4102     }
4103 #endif
4104     printThreadStatus(t);
4105     debugBelch("\n");
4106   }
4107 }
4108     
4109 #ifdef DEBUG
4110
4111 /* 
4112    Print a whole blocking queue attached to node (debugging only).
4113 */
4114 # if defined(PARALLEL_HASKELL)
4115 void 
4116 print_bq (StgClosure *node)
4117 {
4118   StgBlockingQueueElement *bqe;
4119   StgTSO *tso;
4120   rtsBool end;
4121
4122   debugBelch("## BQ of closure %p (%s): ",
4123           node, info_type(node));
4124
4125   /* should cover all closures that may have a blocking queue */
4126   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4127          get_itbl(node)->type == FETCH_ME_BQ ||
4128          get_itbl(node)->type == RBH ||
4129          get_itbl(node)->type == MVAR);
4130     
4131   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4132
4133   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4134 }
4135
4136 /* 
4137    Print a whole blocking queue starting with the element bqe.
4138 */
4139 void 
4140 print_bqe (StgBlockingQueueElement *bqe)
4141 {
4142   rtsBool end;
4143
4144   /* 
4145      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4146   */
4147   for (end = (bqe==END_BQ_QUEUE);
4148        !end; // iterate until bqe points to a CONSTR
4149        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4150        bqe = end ? END_BQ_QUEUE : bqe->link) {
4151     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4152     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4153     /* types of closures that may appear in a blocking queue */
4154     ASSERT(get_itbl(bqe)->type == TSO ||           
4155            get_itbl(bqe)->type == BLOCKED_FETCH || 
4156            get_itbl(bqe)->type == CONSTR); 
4157     /* only BQs of an RBH end with an RBH_Save closure */
4158     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4159
4160     switch (get_itbl(bqe)->type) {
4161     case TSO:
4162       debugBelch(" TSO %u (%x),",
4163               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4164       break;
4165     case BLOCKED_FETCH:
4166       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4167               ((StgBlockedFetch *)bqe)->node, 
4168               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4169               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4170               ((StgBlockedFetch *)bqe)->ga.weight);
4171       break;
4172     case CONSTR:
4173       debugBelch(" %s (IP %p),",
4174               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4175                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4176                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4177                "RBH_Save_?"), get_itbl(bqe));
4178       break;
4179     default:
4180       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4181            info_type((StgClosure *)bqe)); // , node, info_type(node));
4182       break;
4183     }
4184   } /* for */
4185   debugBelch("\n");
4186 }
4187 # elif defined(GRAN)
4188 void 
4189 print_bq (StgClosure *node)
4190 {
4191   StgBlockingQueueElement *bqe;
4192   PEs node_loc, tso_loc;
4193   rtsBool end;
4194
4195   /* should cover all closures that may have a blocking queue */
4196   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4197          get_itbl(node)->type == FETCH_ME_BQ ||
4198          get_itbl(node)->type == RBH);
4199     
4200   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4201   node_loc = where_is(node);
4202
4203   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4204           node, info_type(node), node_loc);
4205
4206   /* 
4207      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4208   */
4209   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4210        !end; // iterate until bqe points to a CONSTR
4211        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4212     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4213     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4214     /* types of closures that may appear in a blocking queue */
4215     ASSERT(get_itbl(bqe)->type == TSO ||           
4216            get_itbl(bqe)->type == CONSTR); 
4217     /* only BQs of an RBH end with an RBH_Save closure */
4218     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4219
4220     tso_loc = where_is((StgClosure *)bqe);
4221     switch (get_itbl(bqe)->type) {
4222     case TSO:
4223       debugBelch(" TSO %d (%p) on [PE %d],",
4224               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4225       break;
4226     case CONSTR:
4227       debugBelch(" %s (IP %p),",
4228               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4229                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4230                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4231                "RBH_Save_?"), get_itbl(bqe));
4232       break;
4233     default:
4234       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4235            info_type((StgClosure *)bqe), node, info_type(node));
4236       break;
4237     }
4238   } /* for */
4239   debugBelch("\n");
4240 }
4241 # endif
4242
4243 #if defined(PARALLEL_HASKELL)
4244 static nat
4245 run_queue_len(void)
4246 {
4247   nat i;
4248   StgTSO *tso;
4249
4250   for (i=0, tso=run_queue_hd; 
4251        tso != END_TSO_QUEUE;
4252        i++, tso=tso->link)
4253     /* nothing */
4254
4255   return i;
4256 }
4257 #endif
4258
4259 void
4260 sched_belch(char *s, ...)
4261 {
4262   va_list ap;
4263   va_start(ap,s);
4264 #ifdef RTS_SUPPORTS_THREADS
4265   debugBelch("sched (task %p): ", osThreadId());
4266 #elif defined(PARALLEL_HASKELL)
4267   debugBelch("== ");
4268 #else
4269   debugBelch("sched: ");
4270 #endif
4271   vdebugBelch(s, ap);
4272   debugBelch("\n");
4273   va_end(ap);
4274 }
4275
4276 #endif /* DEBUG */