Add an --install-signal-handlers=<yes|no> RTS flag; fixes trac #804
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "OSThreads.h"
15 #include "Storage.h"
16 #include "StgRun.h"
17 #include "Hooks.h"
18 #include "Schedule.h"
19 #include "StgMiscClosures.h"
20 #include "Interpreter.h"
21 #include "Printer.h"
22 #include "RtsSignals.h"
23 #include "Sanity.h"
24 #include "Stats.h"
25 #include "STM.h"
26 #include "Timer.h"
27 #include "Prelude.h"
28 #include "ThreadLabels.h"
29 #include "LdvProfile.h"
30 #include "Updates.h"
31 #ifdef PROFILING
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #endif
35 #if defined(GRAN) || defined(PARALLEL_HASKELL)
36 # include "GranSimRts.h"
37 # include "GranSim.h"
38 # include "ParallelRts.h"
39 # include "Parallel.h"
40 # include "ParallelDebug.h"
41 # include "FetchMe.h"
42 # include "HLC.h"
43 #endif
44 #include "Sparks.h"
45 #include "Capability.h"
46 #include "Task.h"
47 #include "AwaitEvent.h"
48 #if defined(mingw32_HOST_OS)
49 #include "win32/IOManager.h"
50 #endif
51 #include "Trace.h"
52 #include "RaiseAsync.h"
53 #include "Threads.h"
54 #include "ThrIOManager.h"
55
56 #ifdef HAVE_SYS_TYPES_H
57 #include <sys/types.h>
58 #endif
59 #ifdef HAVE_UNISTD_H
60 #include <unistd.h>
61 #endif
62
63 #include <string.h>
64 #include <stdlib.h>
65 #include <stdarg.h>
66
67 #ifdef HAVE_ERRNO_H
68 #include <errno.h>
69 #endif
70
71 // Turn off inlining when debugging - it obfuscates things
72 #ifdef DEBUG
73 # undef  STATIC_INLINE
74 # define STATIC_INLINE static
75 #endif
76
77 /* -----------------------------------------------------------------------------
78  * Global variables
79  * -------------------------------------------------------------------------- */
80
81 #if defined(GRAN)
82
83 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
84 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
85
86 /* 
87    In GranSim we have a runnable and a blocked queue for each processor.
88    In order to minimise code changes new arrays run_queue_hds/tls
89    are created. run_queue_hd is then a short cut (macro) for
90    run_queue_hds[CurrentProc] (see GranSim.h).
91    -- HWL
92 */
93 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
94 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
95 StgTSO *ccalling_threadss[MAX_PROC];
96 /* We use the same global list of threads (all_threads) in GranSim as in
97    the std RTS (i.e. we are cheating). However, we don't use this list in
98    the GranSim specific code at the moment (so we are only potentially
99    cheating).  */
100
101 #else /* !GRAN */
102
103 #if !defined(THREADED_RTS)
104 // Blocked/sleeping thrads
105 StgTSO *blocked_queue_hd = NULL;
106 StgTSO *blocked_queue_tl = NULL;
107 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
108 #endif
109
110 /* Threads blocked on blackholes.
111  * LOCK: sched_mutex+capability, or all capabilities
112  */
113 StgTSO *blackhole_queue = NULL;
114 #endif
115
116 /* The blackhole_queue should be checked for threads to wake up.  See
117  * Schedule.h for more thorough comment.
118  * LOCK: none (doesn't matter if we miss an update)
119  */
120 rtsBool blackholes_need_checking = rtsFalse;
121
122 /* Linked list of all threads.
123  * Used for detecting garbage collected threads.
124  * LOCK: sched_mutex+capability, or all capabilities
125  */
126 StgTSO *all_threads = NULL;
127
128 /* flag set by signal handler to precipitate a context switch
129  * LOCK: none (just an advisory flag)
130  */
131 int context_switch = 0;
132
133 /* flag that tracks whether we have done any execution in this time slice.
134  * LOCK: currently none, perhaps we should lock (but needs to be
135  * updated in the fast path of the scheduler).
136  */
137 nat recent_activity = ACTIVITY_YES;
138
139 /* if this flag is set as well, give up execution
140  * LOCK: none (changes once, from false->true)
141  */
142 rtsBool sched_state = SCHED_RUNNING;
143
144 #if defined(GRAN)
145 StgTSO *CurrentTSO;
146 #endif
147
148 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
149  *  exists - earlier gccs apparently didn't.
150  *  -= chak
151  */
152 StgTSO dummy_tso;
153
154 /*
155  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
156  * in an MT setting, needed to signal that a worker thread shouldn't hang around
157  * in the scheduler when it is out of work.
158  */
159 rtsBool shutting_down_scheduler = rtsFalse;
160
161 /*
162  * This mutex protects most of the global scheduler data in
163  * the THREADED_RTS runtime.
164  */
165 #if defined(THREADED_RTS)
166 Mutex sched_mutex;
167 #endif
168
169 #if defined(PARALLEL_HASKELL)
170 StgTSO *LastTSO;
171 rtsTime TimeOfLastYield;
172 rtsBool emitSchedule = rtsTrue;
173 #endif
174
175 #if !defined(mingw32_HOST_OS)
176 #define FORKPROCESS_PRIMOP_SUPPORTED
177 #endif
178
179 /* -----------------------------------------------------------------------------
180  * static function prototypes
181  * -------------------------------------------------------------------------- */
182
183 static Capability *schedule (Capability *initialCapability, Task *task);
184
185 //
186 // These function all encapsulate parts of the scheduler loop, and are
187 // abstracted only to make the structure and control flow of the
188 // scheduler clearer.
189 //
190 static void schedulePreLoop (void);
191 #if defined(THREADED_RTS)
192 static void schedulePushWork(Capability *cap, Task *task);
193 #endif
194 static void scheduleStartSignalHandlers (Capability *cap);
195 static void scheduleCheckBlockedThreads (Capability *cap);
196 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
197 static void scheduleCheckBlackHoles (Capability *cap);
198 static void scheduleDetectDeadlock (Capability *cap, Task *task);
199 #if defined(GRAN)
200 static StgTSO *scheduleProcessEvent(rtsEvent *event);
201 #endif
202 #if defined(PARALLEL_HASKELL)
203 static StgTSO *scheduleSendPendingMessages(void);
204 static void scheduleActivateSpark(void);
205 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
206 #endif
207 #if defined(PAR) || defined(GRAN)
208 static void scheduleGranParReport(void);
209 #endif
210 static void schedulePostRunThread(void);
211 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
212 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
213                                          StgTSO *t);
214 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
215                                     nat prev_what_next );
216 static void scheduleHandleThreadBlocked( StgTSO *t );
217 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
218                                              StgTSO *t );
219 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
220 static Capability *scheduleDoGC(Capability *cap, Task *task,
221                                 rtsBool force_major);
222
223 static rtsBool checkBlackHoles(Capability *cap);
224
225 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
226
227 static void deleteThread (Capability *cap, StgTSO *tso);
228 static void deleteAllThreads (Capability *cap);
229
230 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
231 static void deleteThread_(Capability *cap, StgTSO *tso);
232 #endif
233
234 #if defined(PARALLEL_HASKELL)
235 StgTSO * createSparkThread(rtsSpark spark);
236 StgTSO * activateSpark (rtsSpark spark);  
237 #endif
238
239 #ifdef DEBUG
240 static char *whatNext_strs[] = {
241   "(unknown)",
242   "ThreadRunGHC",
243   "ThreadInterpret",
244   "ThreadKilled",
245   "ThreadRelocated",
246   "ThreadComplete"
247 };
248 #endif
249
250 /* -----------------------------------------------------------------------------
251  * Putting a thread on the run queue: different scheduling policies
252  * -------------------------------------------------------------------------- */
253
254 STATIC_INLINE void
255 addToRunQueue( Capability *cap, StgTSO *t )
256 {
257 #if defined(PARALLEL_HASKELL)
258     if (RtsFlags.ParFlags.doFairScheduling) { 
259         // this does round-robin scheduling; good for concurrency
260         appendToRunQueue(cap,t);
261     } else {
262         // this does unfair scheduling; good for parallelism
263         pushOnRunQueue(cap,t);
264     }
265 #else
266     // this does round-robin scheduling; good for concurrency
267     appendToRunQueue(cap,t);
268 #endif
269 }
270
271 /* ---------------------------------------------------------------------------
272    Main scheduling loop.
273
274    We use round-robin scheduling, each thread returning to the
275    scheduler loop when one of these conditions is detected:
276
277       * out of heap space
278       * timer expires (thread yields)
279       * thread blocks
280       * thread ends
281       * stack overflow
282
283    GRAN version:
284      In a GranSim setup this loop iterates over the global event queue.
285      This revolves around the global event queue, which determines what 
286      to do next. Therefore, it's more complicated than either the 
287      concurrent or the parallel (GUM) setup.
288
289    GUM version:
290      GUM iterates over incoming messages.
291      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
292      and sends out a fish whenever it has nothing to do; in-between
293      doing the actual reductions (shared code below) it processes the
294      incoming messages and deals with delayed operations 
295      (see PendingFetches).
296      This is not the ugliest code you could imagine, but it's bloody close.
297
298    ------------------------------------------------------------------------ */
299
300 static Capability *
301 schedule (Capability *initialCapability, Task *task)
302 {
303   StgTSO *t;
304   Capability *cap;
305   StgThreadReturnCode ret;
306 #if defined(GRAN)
307   rtsEvent *event;
308 #elif defined(PARALLEL_HASKELL)
309   StgTSO *tso;
310   GlobalTaskId pe;
311   rtsBool receivedFinish = rtsFalse;
312 # if defined(DEBUG)
313   nat tp_size, sp_size; // stats only
314 # endif
315 #endif
316   nat prev_what_next;
317   rtsBool ready_to_gc;
318 #if defined(THREADED_RTS)
319   rtsBool first = rtsTrue;
320 #endif
321   
322   cap = initialCapability;
323
324   // Pre-condition: this task owns initialCapability.
325   // The sched_mutex is *NOT* held
326   // NB. on return, we still hold a capability.
327
328   debugTrace (DEBUG_sched, 
329               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
330               task, initialCapability);
331
332   schedulePreLoop();
333
334   // -----------------------------------------------------------
335   // Scheduler loop starts here:
336
337 #if defined(PARALLEL_HASKELL)
338 #define TERMINATION_CONDITION        (!receivedFinish)
339 #elif defined(GRAN)
340 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
341 #else
342 #define TERMINATION_CONDITION        rtsTrue
343 #endif
344
345   while (TERMINATION_CONDITION) {
346
347 #if defined(GRAN)
348       /* Choose the processor with the next event */
349       CurrentProc = event->proc;
350       CurrentTSO = event->tso;
351 #endif
352
353 #if defined(THREADED_RTS)
354       if (first) {
355           // don't yield the first time, we want a chance to run this
356           // thread for a bit, even if there are others banging at the
357           // door.
358           first = rtsFalse;
359           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
360       } else {
361           // Yield the capability to higher-priority tasks if necessary.
362           yieldCapability(&cap, task);
363       }
364 #endif
365       
366 #if defined(THREADED_RTS)
367       schedulePushWork(cap,task);
368 #endif
369
370     // Check whether we have re-entered the RTS from Haskell without
371     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
372     // call).
373     if (cap->in_haskell) {
374           errorBelch("schedule: re-entered unsafely.\n"
375                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
376           stg_exit(EXIT_FAILURE);
377     }
378
379     // The interruption / shutdown sequence.
380     // 
381     // In order to cleanly shut down the runtime, we want to:
382     //   * make sure that all main threads return to their callers
383     //     with the state 'Interrupted'.
384     //   * clean up all OS threads assocated with the runtime
385     //   * free all memory etc.
386     //
387     // So the sequence for ^C goes like this:
388     //
389     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
390     //     arranges for some Capability to wake up
391     //
392     //   * all threads in the system are halted, and the zombies are
393     //     placed on the run queue for cleaning up.  We acquire all
394     //     the capabilities in order to delete the threads, this is
395     //     done by scheduleDoGC() for convenience (because GC already
396     //     needs to acquire all the capabilities).  We can't kill
397     //     threads involved in foreign calls.
398     // 
399     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
400     //
401     //   * sched_state := SCHED_SHUTTING_DOWN
402     //
403     //   * all workers exit when the run queue on their capability
404     //     drains.  All main threads will also exit when their TSO
405     //     reaches the head of the run queue and they can return.
406     //
407     //   * eventually all Capabilities will shut down, and the RTS can
408     //     exit.
409     //
410     //   * We might be left with threads blocked in foreign calls, 
411     //     we should really attempt to kill these somehow (TODO);
412     
413     switch (sched_state) {
414     case SCHED_RUNNING:
415         break;
416     case SCHED_INTERRUPTING:
417         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
418 #if defined(THREADED_RTS)
419         discardSparksCap(cap);
420 #endif
421         /* scheduleDoGC() deletes all the threads */
422         cap = scheduleDoGC(cap,task,rtsFalse);
423         break;
424     case SCHED_SHUTTING_DOWN:
425         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
426         // If we are a worker, just exit.  If we're a bound thread
427         // then we will exit below when we've removed our TSO from
428         // the run queue.
429         if (task->tso == NULL && emptyRunQueue(cap)) {
430             return cap;
431         }
432         break;
433     default:
434         barf("sched_state: %d", sched_state);
435     }
436
437 #if defined(THREADED_RTS)
438     // If the run queue is empty, take a spark and turn it into a thread.
439     {
440         if (emptyRunQueue(cap)) {
441             StgClosure *spark;
442             spark = findSpark(cap);
443             if (spark != NULL) {
444                 debugTrace(DEBUG_sched,
445                            "turning spark of closure %p into a thread",
446                            (StgClosure *)spark);
447                 createSparkThread(cap,spark);     
448             }
449         }
450     }
451 #endif // THREADED_RTS
452
453     scheduleStartSignalHandlers(cap);
454
455     // Only check the black holes here if we've nothing else to do.
456     // During normal execution, the black hole list only gets checked
457     // at GC time, to avoid repeatedly traversing this possibly long
458     // list each time around the scheduler.
459     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
460
461     scheduleCheckWakeupThreads(cap);
462
463     scheduleCheckBlockedThreads(cap);
464
465     scheduleDetectDeadlock(cap,task);
466 #if defined(THREADED_RTS)
467     cap = task->cap;    // reload cap, it might have changed
468 #endif
469
470     // Normally, the only way we can get here with no threads to
471     // run is if a keyboard interrupt received during 
472     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
473     // Additionally, it is not fatal for the
474     // threaded RTS to reach here with no threads to run.
475     //
476     // win32: might be here due to awaitEvent() being abandoned
477     // as a result of a console event having been delivered.
478     if ( emptyRunQueue(cap) ) {
479 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
480         ASSERT(sched_state >= SCHED_INTERRUPTING);
481 #endif
482         continue; // nothing to do
483     }
484
485 #if defined(PARALLEL_HASKELL)
486     scheduleSendPendingMessages();
487     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
488         continue;
489
490 #if defined(SPARKS)
491     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
492 #endif
493
494     /* If we still have no work we need to send a FISH to get a spark
495        from another PE */
496     if (emptyRunQueue(cap)) {
497         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
498         ASSERT(rtsFalse); // should not happen at the moment
499     }
500     // from here: non-empty run queue.
501     //  TODO: merge above case with this, only one call processMessages() !
502     if (PacketsWaiting()) {  /* process incoming messages, if
503                                 any pending...  only in else
504                                 because getRemoteWork waits for
505                                 messages as well */
506         receivedFinish = processMessages();
507     }
508 #endif
509
510 #if defined(GRAN)
511     scheduleProcessEvent(event);
512 #endif
513
514     // 
515     // Get a thread to run
516     //
517     t = popRunQueue(cap);
518
519 #if defined(GRAN) || defined(PAR)
520     scheduleGranParReport(); // some kind of debuging output
521 #else
522     // Sanity check the thread we're about to run.  This can be
523     // expensive if there is lots of thread switching going on...
524     IF_DEBUG(sanity,checkTSO(t));
525 #endif
526
527 #if defined(THREADED_RTS)
528     // Check whether we can run this thread in the current task.
529     // If not, we have to pass our capability to the right task.
530     {
531         Task *bound = t->bound;
532       
533         if (bound) {
534             if (bound == task) {
535                 debugTrace(DEBUG_sched,
536                            "### Running thread %lu in bound thread", (unsigned long)t->id);
537                 // yes, the Haskell thread is bound to the current native thread
538             } else {
539                 debugTrace(DEBUG_sched,
540                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
541                 // no, bound to a different Haskell thread: pass to that thread
542                 pushOnRunQueue(cap,t);
543                 continue;
544             }
545         } else {
546             // The thread we want to run is unbound.
547             if (task->tso) { 
548                 debugTrace(DEBUG_sched,
549                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
550                 // no, the current native thread is bound to a different
551                 // Haskell thread, so pass it to any worker thread
552                 pushOnRunQueue(cap,t);
553                 continue; 
554             }
555         }
556     }
557 #endif
558
559     cap->r.rCurrentTSO = t;
560     
561     /* context switches are initiated by the timer signal, unless
562      * the user specified "context switch as often as possible", with
563      * +RTS -C0
564      */
565     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
566         && !emptyThreadQueues(cap)) {
567         context_switch = 1;
568     }
569          
570 run_thread:
571
572     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
573                               (long)t->id, whatNext_strs[t->what_next]);
574
575 #if defined(PROFILING)
576     startHeapProfTimer();
577 #endif
578
579     // Check for exceptions blocked on this thread
580     maybePerformBlockedException (cap, t);
581
582     // ----------------------------------------------------------------------
583     // Run the current thread 
584
585     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
586     ASSERT(t->cap == cap);
587
588     prev_what_next = t->what_next;
589
590     errno = t->saved_errno;
591 #if mingw32_HOST_OS
592     SetLastError(t->saved_winerror);
593 #endif
594
595     cap->in_haskell = rtsTrue;
596
597     dirtyTSO(t);
598
599     recent_activity = ACTIVITY_YES;
600
601     switch (prev_what_next) {
602         
603     case ThreadKilled:
604     case ThreadComplete:
605         /* Thread already finished, return to scheduler. */
606         ret = ThreadFinished;
607         break;
608         
609     case ThreadRunGHC:
610     {
611         StgRegTable *r;
612         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
613         cap = regTableToCapability(r);
614         ret = r->rRet;
615         break;
616     }
617     
618     case ThreadInterpret:
619         cap = interpretBCO(cap);
620         ret = cap->r.rRet;
621         break;
622         
623     default:
624         barf("schedule: invalid what_next field");
625     }
626
627     cap->in_haskell = rtsFalse;
628
629     // The TSO might have moved, eg. if it re-entered the RTS and a GC
630     // happened.  So find the new location:
631     t = cap->r.rCurrentTSO;
632
633     // We have run some Haskell code: there might be blackhole-blocked
634     // threads to wake up now.
635     // Lock-free test here should be ok, we're just setting a flag.
636     if ( blackhole_queue != END_TSO_QUEUE ) {
637         blackholes_need_checking = rtsTrue;
638     }
639     
640     // And save the current errno in this thread.
641     // XXX: possibly bogus for SMP because this thread might already
642     // be running again, see code below.
643     t->saved_errno = errno;
644 #if mingw32_HOST_OS
645     // Similarly for Windows error code
646     t->saved_winerror = GetLastError();
647 #endif
648
649 #if defined(THREADED_RTS)
650     // If ret is ThreadBlocked, and this Task is bound to the TSO that
651     // blocked, we are in limbo - the TSO is now owned by whatever it
652     // is blocked on, and may in fact already have been woken up,
653     // perhaps even on a different Capability.  It may be the case
654     // that task->cap != cap.  We better yield this Capability
655     // immediately and return to normaility.
656     if (ret == ThreadBlocked) {
657         debugTrace(DEBUG_sched,
658                    "--<< thread %lu (%s) stopped: blocked",
659                    (unsigned long)t->id, whatNext_strs[t->what_next]);
660         continue;
661     }
662 #endif
663
664     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
665     ASSERT(t->cap == cap);
666
667     // ----------------------------------------------------------------------
668     
669     // Costs for the scheduler are assigned to CCS_SYSTEM
670 #if defined(PROFILING)
671     stopHeapProfTimer();
672     CCCS = CCS_SYSTEM;
673 #endif
674     
675     schedulePostRunThread();
676
677     ready_to_gc = rtsFalse;
678
679     switch (ret) {
680     case HeapOverflow:
681         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
682         break;
683
684     case StackOverflow:
685         scheduleHandleStackOverflow(cap,task,t);
686         break;
687
688     case ThreadYielding:
689         if (scheduleHandleYield(cap, t, prev_what_next)) {
690             // shortcut for switching between compiler/interpreter:
691             goto run_thread; 
692         }
693         break;
694
695     case ThreadBlocked:
696         scheduleHandleThreadBlocked(t);
697         break;
698
699     case ThreadFinished:
700         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
701         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
702         break;
703
704     default:
705       barf("schedule: invalid thread return code %d", (int)ret);
706     }
707
708     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
709     if (ready_to_gc) {
710       cap = scheduleDoGC(cap,task,rtsFalse);
711     }
712   } /* end of while() */
713
714   debugTrace(PAR_DEBUG_verbose,
715              "== Leaving schedule() after having received Finish");
716 }
717
718 /* ----------------------------------------------------------------------------
719  * Setting up the scheduler loop
720  * ------------------------------------------------------------------------- */
721
722 static void
723 schedulePreLoop(void)
724 {
725 #if defined(GRAN) 
726     /* set up first event to get things going */
727     /* ToDo: assign costs for system setup and init MainTSO ! */
728     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
729               ContinueThread, 
730               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
731     
732     debugTrace (DEBUG_gran,
733                 "GRAN: Init CurrentTSO (in schedule) = %p", 
734                 CurrentTSO);
735     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
736     
737     if (RtsFlags.GranFlags.Light) {
738         /* Save current time; GranSim Light only */
739         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
740     }      
741 #endif
742 }
743
744 /* -----------------------------------------------------------------------------
745  * schedulePushWork()
746  *
747  * Push work to other Capabilities if we have some.
748  * -------------------------------------------------------------------------- */
749
750 #if defined(THREADED_RTS)
751 static void
752 schedulePushWork(Capability *cap USED_IF_THREADS, 
753                  Task *task      USED_IF_THREADS)
754 {
755     Capability *free_caps[n_capabilities], *cap0;
756     nat i, n_free_caps;
757
758     // migration can be turned off with +RTS -qg
759     if (!RtsFlags.ParFlags.migrate) return;
760
761     // Check whether we have more threads on our run queue, or sparks
762     // in our pool, that we could hand to another Capability.
763     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
764         && sparkPoolSizeCap(cap) < 2) {
765         return;
766     }
767
768     // First grab as many free Capabilities as we can.
769     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
770         cap0 = &capabilities[i];
771         if (cap != cap0 && tryGrabCapability(cap0,task)) {
772             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
773                 // it already has some work, we just grabbed it at 
774                 // the wrong moment.  Or maybe it's deadlocked!
775                 releaseCapability(cap0);
776             } else {
777                 free_caps[n_free_caps++] = cap0;
778             }
779         }
780     }
781
782     // we now have n_free_caps free capabilities stashed in
783     // free_caps[].  Share our run queue equally with them.  This is
784     // probably the simplest thing we could do; improvements we might
785     // want to do include:
786     //
787     //   - giving high priority to moving relatively new threads, on 
788     //     the gournds that they haven't had time to build up a
789     //     working set in the cache on this CPU/Capability.
790     //
791     //   - giving low priority to moving long-lived threads
792
793     if (n_free_caps > 0) {
794         StgTSO *prev, *t, *next;
795         rtsBool pushed_to_all;
796
797         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
798
799         i = 0;
800         pushed_to_all = rtsFalse;
801
802         if (cap->run_queue_hd != END_TSO_QUEUE) {
803             prev = cap->run_queue_hd;
804             t = prev->link;
805             prev->link = END_TSO_QUEUE;
806             for (; t != END_TSO_QUEUE; t = next) {
807                 next = t->link;
808                 t->link = END_TSO_QUEUE;
809                 if (t->what_next == ThreadRelocated
810                     || t->bound == task // don't move my bound thread
811                     || tsoLocked(t)) {  // don't move a locked thread
812                     prev->link = t;
813                     prev = t;
814                 } else if (i == n_free_caps) {
815                     pushed_to_all = rtsTrue;
816                     i = 0;
817                     // keep one for us
818                     prev->link = t;
819                     prev = t;
820                 } else {
821                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
822                     appendToRunQueue(free_caps[i],t);
823                     if (t->bound) { t->bound->cap = free_caps[i]; }
824                     t->cap = free_caps[i];
825                     i++;
826                 }
827             }
828             cap->run_queue_tl = prev;
829         }
830
831         // If there are some free capabilities that we didn't push any
832         // threads to, then try to push a spark to each one.
833         if (!pushed_to_all) {
834             StgClosure *spark;
835             // i is the next free capability to push to
836             for (; i < n_free_caps; i++) {
837                 if (emptySparkPoolCap(free_caps[i])) {
838                     spark = findSpark(cap);
839                     if (spark != NULL) {
840                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
841                         newSpark(&(free_caps[i]->r), spark);
842                     }
843                 }
844             }
845         }
846
847         // release the capabilities
848         for (i = 0; i < n_free_caps; i++) {
849             task->cap = free_caps[i];
850             releaseCapability(free_caps[i]);
851         }
852     }
853     task->cap = cap; // reset to point to our Capability.
854 }
855 #endif
856
857 /* ----------------------------------------------------------------------------
858  * Start any pending signal handlers
859  * ------------------------------------------------------------------------- */
860
861 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
862 static void
863 scheduleStartSignalHandlers(Capability *cap)
864 {
865     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
866         // safe outside the lock
867         startSignalHandlers(cap);
868     }
869 }
870 #else
871 static void
872 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
873 {
874 }
875 #endif
876
877 /* ----------------------------------------------------------------------------
878  * Check for blocked threads that can be woken up.
879  * ------------------------------------------------------------------------- */
880
881 static void
882 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
883 {
884 #if !defined(THREADED_RTS)
885     //
886     // Check whether any waiting threads need to be woken up.  If the
887     // run queue is empty, and there are no other tasks running, we
888     // can wait indefinitely for something to happen.
889     //
890     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
891     {
892         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
893     }
894 #endif
895 }
896
897
898 /* ----------------------------------------------------------------------------
899  * Check for threads woken up by other Capabilities
900  * ------------------------------------------------------------------------- */
901
902 static void
903 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
904 {
905 #if defined(THREADED_RTS)
906     // Any threads that were woken up by other Capabilities get
907     // appended to our run queue.
908     if (!emptyWakeupQueue(cap)) {
909         ACQUIRE_LOCK(&cap->lock);
910         if (emptyRunQueue(cap)) {
911             cap->run_queue_hd = cap->wakeup_queue_hd;
912             cap->run_queue_tl = cap->wakeup_queue_tl;
913         } else {
914             cap->run_queue_tl->link = cap->wakeup_queue_hd;
915             cap->run_queue_tl = cap->wakeup_queue_tl;
916         }
917         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
918         RELEASE_LOCK(&cap->lock);
919     }
920 #endif
921 }
922
923 /* ----------------------------------------------------------------------------
924  * Check for threads blocked on BLACKHOLEs that can be woken up
925  * ------------------------------------------------------------------------- */
926 static void
927 scheduleCheckBlackHoles (Capability *cap)
928 {
929     if ( blackholes_need_checking ) // check without the lock first
930     {
931         ACQUIRE_LOCK(&sched_mutex);
932         if ( blackholes_need_checking ) {
933             checkBlackHoles(cap);
934             blackholes_need_checking = rtsFalse;
935         }
936         RELEASE_LOCK(&sched_mutex);
937     }
938 }
939
940 /* ----------------------------------------------------------------------------
941  * Detect deadlock conditions and attempt to resolve them.
942  * ------------------------------------------------------------------------- */
943
944 static void
945 scheduleDetectDeadlock (Capability *cap, Task *task)
946 {
947
948 #if defined(PARALLEL_HASKELL)
949     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
950     return;
951 #endif
952
953     /* 
954      * Detect deadlock: when we have no threads to run, there are no
955      * threads blocked, waiting for I/O, or sleeping, and all the
956      * other tasks are waiting for work, we must have a deadlock of
957      * some description.
958      */
959     if ( emptyThreadQueues(cap) )
960     {
961 #if defined(THREADED_RTS)
962         /* 
963          * In the threaded RTS, we only check for deadlock if there
964          * has been no activity in a complete timeslice.  This means
965          * we won't eagerly start a full GC just because we don't have
966          * any threads to run currently.
967          */
968         if (recent_activity != ACTIVITY_INACTIVE) return;
969 #endif
970
971         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
972
973         // Garbage collection can release some new threads due to
974         // either (a) finalizers or (b) threads resurrected because
975         // they are unreachable and will therefore be sent an
976         // exception.  Any threads thus released will be immediately
977         // runnable.
978         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
979
980         recent_activity = ACTIVITY_DONE_GC;
981         
982         if ( !emptyRunQueue(cap) ) return;
983
984 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
985         /* If we have user-installed signal handlers, then wait
986          * for signals to arrive rather then bombing out with a
987          * deadlock.
988          */
989         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
990             debugTrace(DEBUG_sched,
991                        "still deadlocked, waiting for signals...");
992
993             awaitUserSignals();
994
995             if (signals_pending()) {
996                 startSignalHandlers(cap);
997             }
998
999             // either we have threads to run, or we were interrupted:
1000             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1001         }
1002 #endif
1003
1004 #if !defined(THREADED_RTS)
1005         /* Probably a real deadlock.  Send the current main thread the
1006          * Deadlock exception.
1007          */
1008         if (task->tso) {
1009             switch (task->tso->why_blocked) {
1010             case BlockedOnSTM:
1011             case BlockedOnBlackHole:
1012             case BlockedOnException:
1013             case BlockedOnMVar:
1014                 throwToSingleThreaded(cap, task->tso, 
1015                                       (StgClosure *)NonTermination_closure);
1016                 return;
1017             default:
1018                 barf("deadlock: main thread blocked in a strange way");
1019             }
1020         }
1021         return;
1022 #endif
1023     }
1024 }
1025
1026 /* ----------------------------------------------------------------------------
1027  * Process an event (GRAN only)
1028  * ------------------------------------------------------------------------- */
1029
1030 #if defined(GRAN)
1031 static StgTSO *
1032 scheduleProcessEvent(rtsEvent *event)
1033 {
1034     StgTSO *t;
1035
1036     if (RtsFlags.GranFlags.Light)
1037       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1038
1039     /* adjust time based on time-stamp */
1040     if (event->time > CurrentTime[CurrentProc] &&
1041         event->evttype != ContinueThread)
1042       CurrentTime[CurrentProc] = event->time;
1043     
1044     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1045     if (!RtsFlags.GranFlags.Light)
1046       handleIdlePEs();
1047
1048     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1049
1050     /* main event dispatcher in GranSim */
1051     switch (event->evttype) {
1052       /* Should just be continuing execution */
1053     case ContinueThread:
1054       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1055       /* ToDo: check assertion
1056       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1057              run_queue_hd != END_TSO_QUEUE);
1058       */
1059       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1060       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1061           procStatus[CurrentProc]==Fetching) {
1062         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1063               CurrentTSO->id, CurrentTSO, CurrentProc);
1064         goto next_thread;
1065       } 
1066       /* Ignore ContinueThreads for completed threads */
1067       if (CurrentTSO->what_next == ThreadComplete) {
1068         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1069               CurrentTSO->id, CurrentTSO, CurrentProc);
1070         goto next_thread;
1071       } 
1072       /* Ignore ContinueThreads for threads that are being migrated */
1073       if (PROCS(CurrentTSO)==Nowhere) { 
1074         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1075               CurrentTSO->id, CurrentTSO, CurrentProc);
1076         goto next_thread;
1077       }
1078       /* The thread should be at the beginning of the run queue */
1079       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1080         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1081               CurrentTSO->id, CurrentTSO, CurrentProc);
1082         break; // run the thread anyway
1083       }
1084       /*
1085       new_event(proc, proc, CurrentTime[proc],
1086                 FindWork,
1087                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1088       goto next_thread; 
1089       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1090       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1091
1092     case FetchNode:
1093       do_the_fetchnode(event);
1094       goto next_thread;             /* handle next event in event queue  */
1095       
1096     case GlobalBlock:
1097       do_the_globalblock(event);
1098       goto next_thread;             /* handle next event in event queue  */
1099       
1100     case FetchReply:
1101       do_the_fetchreply(event);
1102       goto next_thread;             /* handle next event in event queue  */
1103       
1104     case UnblockThread:   /* Move from the blocked queue to the tail of */
1105       do_the_unblock(event);
1106       goto next_thread;             /* handle next event in event queue  */
1107       
1108     case ResumeThread:  /* Move from the blocked queue to the tail of */
1109       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1110       event->tso->gran.blocktime += 
1111         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1112       do_the_startthread(event);
1113       goto next_thread;             /* handle next event in event queue  */
1114       
1115     case StartThread:
1116       do_the_startthread(event);
1117       goto next_thread;             /* handle next event in event queue  */
1118       
1119     case MoveThread:
1120       do_the_movethread(event);
1121       goto next_thread;             /* handle next event in event queue  */
1122       
1123     case MoveSpark:
1124       do_the_movespark(event);
1125       goto next_thread;             /* handle next event in event queue  */
1126       
1127     case FindWork:
1128       do_the_findwork(event);
1129       goto next_thread;             /* handle next event in event queue  */
1130       
1131     default:
1132       barf("Illegal event type %u\n", event->evttype);
1133     }  /* switch */
1134     
1135     /* This point was scheduler_loop in the old RTS */
1136
1137     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1138
1139     TimeOfLastEvent = CurrentTime[CurrentProc];
1140     TimeOfNextEvent = get_time_of_next_event();
1141     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1142     // CurrentTSO = ThreadQueueHd;
1143
1144     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1145                          TimeOfNextEvent));
1146
1147     if (RtsFlags.GranFlags.Light) 
1148       GranSimLight_leave_system(event, &ActiveTSO); 
1149
1150     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1151
1152     IF_DEBUG(gran, 
1153              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1154
1155     /* in a GranSim setup the TSO stays on the run queue */
1156     t = CurrentTSO;
1157     /* Take a thread from the run queue. */
1158     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1159
1160     IF_DEBUG(gran, 
1161              debugBelch("GRAN: About to run current thread, which is\n");
1162              G_TSO(t,5));
1163
1164     context_switch = 0; // turned on via GranYield, checking events and time slice
1165
1166     IF_DEBUG(gran, 
1167              DumpGranEvent(GR_SCHEDULE, t));
1168
1169     procStatus[CurrentProc] = Busy;
1170 }
1171 #endif // GRAN
1172
1173 /* ----------------------------------------------------------------------------
1174  * Send pending messages (PARALLEL_HASKELL only)
1175  * ------------------------------------------------------------------------- */
1176
1177 #if defined(PARALLEL_HASKELL)
1178 static StgTSO *
1179 scheduleSendPendingMessages(void)
1180 {
1181     StgSparkPool *pool;
1182     rtsSpark spark;
1183     StgTSO *t;
1184
1185 # if defined(PAR) // global Mem.Mgmt., omit for now
1186     if (PendingFetches != END_BF_QUEUE) {
1187         processFetches();
1188     }
1189 # endif
1190     
1191     if (RtsFlags.ParFlags.BufferTime) {
1192         // if we use message buffering, we must send away all message
1193         // packets which have become too old...
1194         sendOldBuffers(); 
1195     }
1196 }
1197 #endif
1198
1199 /* ----------------------------------------------------------------------------
1200  * Activate spark threads (PARALLEL_HASKELL only)
1201  * ------------------------------------------------------------------------- */
1202
1203 #if defined(PARALLEL_HASKELL)
1204 static void
1205 scheduleActivateSpark(void)
1206 {
1207 #if defined(SPARKS)
1208   ASSERT(emptyRunQueue());
1209 /* We get here if the run queue is empty and want some work.
1210    We try to turn a spark into a thread, and add it to the run queue,
1211    from where it will be picked up in the next iteration of the scheduler
1212    loop.
1213 */
1214
1215       /* :-[  no local threads => look out for local sparks */
1216       /* the spark pool for the current PE */
1217       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1218       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1219           pool->hd < pool->tl) {
1220         /* 
1221          * ToDo: add GC code check that we really have enough heap afterwards!!
1222          * Old comment:
1223          * If we're here (no runnable threads) and we have pending
1224          * sparks, we must have a space problem.  Get enough space
1225          * to turn one of those pending sparks into a
1226          * thread... 
1227          */
1228
1229         spark = findSpark(rtsFalse);            /* get a spark */
1230         if (spark != (rtsSpark) NULL) {
1231           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1232           IF_PAR_DEBUG(fish, // schedule,
1233                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1234                              tso->id, tso, advisory_thread_count));
1235
1236           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1237             IF_PAR_DEBUG(fish, // schedule,
1238                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1239                             spark));
1240             return rtsFalse; /* failed to generate a thread */
1241           }                  /* otherwise fall through & pick-up new tso */
1242         } else {
1243           IF_PAR_DEBUG(fish, // schedule,
1244                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1245                              spark_queue_len(pool)));
1246           return rtsFalse;  /* failed to generate a thread */
1247         }
1248         return rtsTrue;  /* success in generating a thread */
1249   } else { /* no more threads permitted or pool empty */
1250     return rtsFalse;  /* failed to generateThread */
1251   }
1252 #else
1253   tso = NULL; // avoid compiler warning only
1254   return rtsFalse;  /* dummy in non-PAR setup */
1255 #endif // SPARKS
1256 }
1257 #endif // PARALLEL_HASKELL
1258
1259 /* ----------------------------------------------------------------------------
1260  * Get work from a remote node (PARALLEL_HASKELL only)
1261  * ------------------------------------------------------------------------- */
1262     
1263 #if defined(PARALLEL_HASKELL)
1264 static rtsBool
1265 scheduleGetRemoteWork(rtsBool *receivedFinish)
1266 {
1267   ASSERT(emptyRunQueue());
1268
1269   if (RtsFlags.ParFlags.BufferTime) {
1270         IF_PAR_DEBUG(verbose, 
1271                 debugBelch("...send all pending data,"));
1272         {
1273           nat i;
1274           for (i=1; i<=nPEs; i++)
1275             sendImmediately(i); // send all messages away immediately
1276         }
1277   }
1278 # ifndef SPARKS
1279         //++EDEN++ idle() , i.e. send all buffers, wait for work
1280         // suppress fishing in EDEN... just look for incoming messages
1281         // (blocking receive)
1282   IF_PAR_DEBUG(verbose, 
1283                debugBelch("...wait for incoming messages...\n"));
1284   *receivedFinish = processMessages(); // blocking receive...
1285
1286         // and reenter scheduling loop after having received something
1287         // (return rtsFalse below)
1288
1289 # else /* activate SPARKS machinery */
1290 /* We get here, if we have no work, tried to activate a local spark, but still
1291    have no work. We try to get a remote spark, by sending a FISH message.
1292    Thread migration should be added here, and triggered when a sequence of 
1293    fishes returns without work. */
1294         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1295
1296       /* =8-[  no local sparks => look for work on other PEs */
1297         /*
1298          * We really have absolutely no work.  Send out a fish
1299          * (there may be some out there already), and wait for
1300          * something to arrive.  We clearly can't run any threads
1301          * until a SCHEDULE or RESUME arrives, and so that's what
1302          * we're hoping to see.  (Of course, we still have to
1303          * respond to other types of messages.)
1304          */
1305         rtsTime now = msTime() /*CURRENT_TIME*/;
1306         IF_PAR_DEBUG(verbose, 
1307                      debugBelch("--  now=%ld\n", now));
1308         IF_PAR_DEBUG(fish, // verbose,
1309              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1310                  (last_fish_arrived_at!=0 &&
1311                   last_fish_arrived_at+delay > now)) {
1312                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1313                      now, last_fish_arrived_at+delay, 
1314                      last_fish_arrived_at,
1315                      delay);
1316              });
1317   
1318         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1319             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1320           if (last_fish_arrived_at==0 ||
1321               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1322             /* outstandingFishes is set in sendFish, processFish;
1323                avoid flooding system with fishes via delay */
1324     next_fish_to_send_at = 0;  
1325   } else {
1326     /* ToDo: this should be done in the main scheduling loop to avoid the
1327              busy wait here; not so bad if fish delay is very small  */
1328     int iq = 0; // DEBUGGING -- HWL
1329     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1330     /* send a fish when ready, but process messages that arrive in the meantime */
1331     do {
1332       if (PacketsWaiting()) {
1333         iq++; // DEBUGGING
1334         *receivedFinish = processMessages();
1335       }
1336       now = msTime();
1337     } while (!*receivedFinish || now<next_fish_to_send_at);
1338     // JB: This means the fish could become obsolete, if we receive
1339     // work. Better check for work again? 
1340     // last line: while (!receivedFinish || !haveWork || now<...)
1341     // next line: if (receivedFinish || haveWork )
1342
1343     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1344       return rtsFalse;  // NB: this will leave scheduler loop
1345                         // immediately after return!
1346                           
1347     IF_PAR_DEBUG(fish, // verbose,
1348                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1349
1350   }
1351
1352     // JB: IMHO, this should all be hidden inside sendFish(...)
1353     /* pe = choosePE(); 
1354        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1355                 NEW_FISH_HUNGER);
1356
1357     // Global statistics: count no. of fishes
1358     if (RtsFlags.ParFlags.ParStats.Global &&
1359          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1360            globalParStats.tot_fish_mess++;
1361            }
1362     */ 
1363
1364   /* delayed fishes must have been sent by now! */
1365   next_fish_to_send_at = 0;  
1366   }
1367       
1368   *receivedFinish = processMessages();
1369 # endif /* SPARKS */
1370
1371  return rtsFalse;
1372  /* NB: this function always returns rtsFalse, meaning the scheduler
1373     loop continues with the next iteration; 
1374     rationale: 
1375       return code means success in finding work; we enter this function
1376       if there is no local work, thus have to send a fish which takes
1377       time until it arrives with work; in the meantime we should process
1378       messages in the main loop;
1379  */
1380 }
1381 #endif // PARALLEL_HASKELL
1382
1383 /* ----------------------------------------------------------------------------
1384  * PAR/GRAN: Report stats & debugging info(?)
1385  * ------------------------------------------------------------------------- */
1386
1387 #if defined(PAR) || defined(GRAN)
1388 static void
1389 scheduleGranParReport(void)
1390 {
1391   ASSERT(run_queue_hd != END_TSO_QUEUE);
1392
1393   /* Take a thread from the run queue, if we have work */
1394   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1395
1396     /* If this TSO has got its outport closed in the meantime, 
1397      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1398      * It has to be marked as TH_DEAD for this purpose.
1399      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1400
1401 JB: TODO: investigate wether state change field could be nuked
1402      entirely and replaced by the normal tso state (whatnext
1403      field). All we want to do is to kill tsos from outside.
1404      */
1405
1406     /* ToDo: write something to the log-file
1407     if (RTSflags.ParFlags.granSimStats && !sameThread)
1408         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1409
1410     CurrentTSO = t;
1411     */
1412     /* the spark pool for the current PE */
1413     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1414
1415     IF_DEBUG(scheduler, 
1416              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1417                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1418
1419     IF_PAR_DEBUG(fish,
1420              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1421                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1422
1423     if (RtsFlags.ParFlags.ParStats.Full && 
1424         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1425         (emitSchedule || // forced emit
1426          (t && LastTSO && t->id != LastTSO->id))) {
1427       /* 
1428          we are running a different TSO, so write a schedule event to log file
1429          NB: If we use fair scheduling we also have to write  a deschedule 
1430              event for LastTSO; with unfair scheduling we know that the
1431              previous tso has blocked whenever we switch to another tso, so
1432              we don't need it in GUM for now
1433       */
1434       IF_PAR_DEBUG(fish, // schedule,
1435                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1436
1437       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1438                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1439       emitSchedule = rtsFalse;
1440     }
1441 }     
1442 #endif
1443
1444 /* ----------------------------------------------------------------------------
1445  * After running a thread...
1446  * ------------------------------------------------------------------------- */
1447
1448 static void
1449 schedulePostRunThread(void)
1450 {
1451 #if defined(PAR)
1452     /* HACK 675: if the last thread didn't yield, make sure to print a 
1453        SCHEDULE event to the log file when StgRunning the next thread, even
1454        if it is the same one as before */
1455     LastTSO = t; 
1456     TimeOfLastYield = CURRENT_TIME;
1457 #endif
1458
1459   /* some statistics gathering in the parallel case */
1460
1461 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1462   switch (ret) {
1463     case HeapOverflow:
1464 # if defined(GRAN)
1465       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1466       globalGranStats.tot_heapover++;
1467 # elif defined(PAR)
1468       globalParStats.tot_heapover++;
1469 # endif
1470       break;
1471
1472      case StackOverflow:
1473 # if defined(GRAN)
1474       IF_DEBUG(gran, 
1475                DumpGranEvent(GR_DESCHEDULE, t));
1476       globalGranStats.tot_stackover++;
1477 # elif defined(PAR)
1478       // IF_DEBUG(par, 
1479       // DumpGranEvent(GR_DESCHEDULE, t);
1480       globalParStats.tot_stackover++;
1481 # endif
1482       break;
1483
1484     case ThreadYielding:
1485 # if defined(GRAN)
1486       IF_DEBUG(gran, 
1487                DumpGranEvent(GR_DESCHEDULE, t));
1488       globalGranStats.tot_yields++;
1489 # elif defined(PAR)
1490       // IF_DEBUG(par, 
1491       // DumpGranEvent(GR_DESCHEDULE, t);
1492       globalParStats.tot_yields++;
1493 # endif
1494       break; 
1495
1496     case ThreadBlocked:
1497 # if defined(GRAN)
1498         debugTrace(DEBUG_sched, 
1499                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1500                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1501                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1502                if (t->block_info.closure!=(StgClosure*)NULL)
1503                  print_bq(t->block_info.closure);
1504                debugBelch("\n"));
1505
1506       // ??? needed; should emit block before
1507       IF_DEBUG(gran, 
1508                DumpGranEvent(GR_DESCHEDULE, t)); 
1509       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1510       /*
1511         ngoq Dogh!
1512       ASSERT(procStatus[CurrentProc]==Busy || 
1513               ((procStatus[CurrentProc]==Fetching) && 
1514               (t->block_info.closure!=(StgClosure*)NULL)));
1515       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1516           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1517             procStatus[CurrentProc]==Fetching)) 
1518         procStatus[CurrentProc] = Idle;
1519       */
1520 # elif defined(PAR)
1521 //++PAR++  blockThread() writes the event (change?)
1522 # endif
1523     break;
1524
1525   case ThreadFinished:
1526     break;
1527
1528   default:
1529     barf("parGlobalStats: unknown return code");
1530     break;
1531     }
1532 #endif
1533 }
1534
1535 /* -----------------------------------------------------------------------------
1536  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1537  * -------------------------------------------------------------------------- */
1538
1539 static rtsBool
1540 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1541 {
1542     // did the task ask for a large block?
1543     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1544         // if so, get one and push it on the front of the nursery.
1545         bdescr *bd;
1546         lnat blocks;
1547         
1548         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1549         
1550         debugTrace(DEBUG_sched,
1551                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1552                    (long)t->id, whatNext_strs[t->what_next], blocks);
1553     
1554         // don't do this if the nursery is (nearly) full, we'll GC first.
1555         if (cap->r.rCurrentNursery->link != NULL ||
1556             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1557                                                // if the nursery has only one block.
1558             
1559             ACQUIRE_SM_LOCK
1560             bd = allocGroup( blocks );
1561             RELEASE_SM_LOCK
1562             cap->r.rNursery->n_blocks += blocks;
1563             
1564             // link the new group into the list
1565             bd->link = cap->r.rCurrentNursery;
1566             bd->u.back = cap->r.rCurrentNursery->u.back;
1567             if (cap->r.rCurrentNursery->u.back != NULL) {
1568                 cap->r.rCurrentNursery->u.back->link = bd;
1569             } else {
1570 #if !defined(THREADED_RTS)
1571                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1572                        g0s0 == cap->r.rNursery);
1573 #endif
1574                 cap->r.rNursery->blocks = bd;
1575             }             
1576             cap->r.rCurrentNursery->u.back = bd;
1577             
1578             // initialise it as a nursery block.  We initialise the
1579             // step, gen_no, and flags field of *every* sub-block in
1580             // this large block, because this is easier than making
1581             // sure that we always find the block head of a large
1582             // block whenever we call Bdescr() (eg. evacuate() and
1583             // isAlive() in the GC would both have to do this, at
1584             // least).
1585             { 
1586                 bdescr *x;
1587                 for (x = bd; x < bd + blocks; x++) {
1588                     x->step = cap->r.rNursery;
1589                     x->gen_no = 0;
1590                     x->flags = 0;
1591                 }
1592             }
1593             
1594             // This assert can be a killer if the app is doing lots
1595             // of large block allocations.
1596             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1597             
1598             // now update the nursery to point to the new block
1599             cap->r.rCurrentNursery = bd;
1600             
1601             // we might be unlucky and have another thread get on the
1602             // run queue before us and steal the large block, but in that
1603             // case the thread will just end up requesting another large
1604             // block.
1605             pushOnRunQueue(cap,t);
1606             return rtsFalse;  /* not actually GC'ing */
1607         }
1608     }
1609     
1610     debugTrace(DEBUG_sched,
1611                "--<< thread %ld (%s) stopped: HeapOverflow\n", 
1612                (long)t->id, whatNext_strs[t->what_next]);
1613
1614 #if defined(GRAN)
1615     ASSERT(!is_on_queue(t,CurrentProc));
1616 #elif defined(PARALLEL_HASKELL)
1617     /* Currently we emit a DESCHEDULE event before GC in GUM.
1618        ToDo: either add separate event to distinguish SYSTEM time from rest
1619        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1620     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1621         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1622                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1623         emitSchedule = rtsTrue;
1624     }
1625 #endif
1626       
1627     pushOnRunQueue(cap,t);
1628     return rtsTrue;
1629     /* actual GC is done at the end of the while loop in schedule() */
1630 }
1631
1632 /* -----------------------------------------------------------------------------
1633  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1634  * -------------------------------------------------------------------------- */
1635
1636 static void
1637 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1638 {
1639     debugTrace (DEBUG_sched,
1640                 "--<< thread %ld (%s) stopped, StackOverflow", 
1641                 (long)t->id, whatNext_strs[t->what_next]);
1642
1643     /* just adjust the stack for this thread, then pop it back
1644      * on the run queue.
1645      */
1646     { 
1647         /* enlarge the stack */
1648         StgTSO *new_t = threadStackOverflow(cap, t);
1649         
1650         /* The TSO attached to this Task may have moved, so update the
1651          * pointer to it.
1652          */
1653         if (task->tso == t) {
1654             task->tso = new_t;
1655         }
1656         pushOnRunQueue(cap,new_t);
1657     }
1658 }
1659
1660 /* -----------------------------------------------------------------------------
1661  * Handle a thread that returned to the scheduler with ThreadYielding
1662  * -------------------------------------------------------------------------- */
1663
1664 static rtsBool
1665 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1666 {
1667     // Reset the context switch flag.  We don't do this just before
1668     // running the thread, because that would mean we would lose ticks
1669     // during GC, which can lead to unfair scheduling (a thread hogs
1670     // the CPU because the tick always arrives during GC).  This way
1671     // penalises threads that do a lot of allocation, but that seems
1672     // better than the alternative.
1673     context_switch = 0;
1674     
1675     /* put the thread back on the run queue.  Then, if we're ready to
1676      * GC, check whether this is the last task to stop.  If so, wake
1677      * up the GC thread.  getThread will block during a GC until the
1678      * GC is finished.
1679      */
1680 #ifdef DEBUG
1681     if (t->what_next != prev_what_next) {
1682         debugTrace(DEBUG_sched,
1683                    "--<< thread %ld (%s) stopped to switch evaluators", 
1684                    (long)t->id, whatNext_strs[t->what_next]);
1685     } else {
1686         debugTrace(DEBUG_sched,
1687                    "--<< thread %ld (%s) stopped, yielding",
1688                    (long)t->id, whatNext_strs[t->what_next]);
1689     }
1690 #endif
1691     
1692     IF_DEBUG(sanity,
1693              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1694              checkTSO(t));
1695     ASSERT(t->link == END_TSO_QUEUE);
1696     
1697     // Shortcut if we're just switching evaluators: don't bother
1698     // doing stack squeezing (which can be expensive), just run the
1699     // thread.
1700     if (t->what_next != prev_what_next) {
1701         return rtsTrue;
1702     }
1703     
1704 #if defined(GRAN)
1705     ASSERT(!is_on_queue(t,CurrentProc));
1706       
1707     IF_DEBUG(sanity,
1708              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1709              checkThreadQsSanity(rtsTrue));
1710
1711 #endif
1712
1713     addToRunQueue(cap,t);
1714
1715 #if defined(GRAN)
1716     /* add a ContinueThread event to actually process the thread */
1717     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1718               ContinueThread,
1719               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1720     IF_GRAN_DEBUG(bq, 
1721                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1722                   G_EVENTQ(0);
1723                   G_CURR_THREADQ(0));
1724 #endif
1725     return rtsFalse;
1726 }
1727
1728 /* -----------------------------------------------------------------------------
1729  * Handle a thread that returned to the scheduler with ThreadBlocked
1730  * -------------------------------------------------------------------------- */
1731
1732 static void
1733 scheduleHandleThreadBlocked( StgTSO *t
1734 #if !defined(GRAN) && !defined(DEBUG)
1735     STG_UNUSED
1736 #endif
1737     )
1738 {
1739 #if defined(GRAN)
1740     IF_DEBUG(scheduler,
1741              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1742                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1743              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1744     
1745     // ??? needed; should emit block before
1746     IF_DEBUG(gran, 
1747              DumpGranEvent(GR_DESCHEDULE, t)); 
1748     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1749     /*
1750       ngoq Dogh!
1751       ASSERT(procStatus[CurrentProc]==Busy || 
1752       ((procStatus[CurrentProc]==Fetching) && 
1753       (t->block_info.closure!=(StgClosure*)NULL)));
1754       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1755       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1756       procStatus[CurrentProc]==Fetching)) 
1757       procStatus[CurrentProc] = Idle;
1758     */
1759 #elif defined(PAR)
1760     IF_DEBUG(scheduler,
1761              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1762                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1763     IF_PAR_DEBUG(bq,
1764                  
1765                  if (t->block_info.closure!=(StgClosure*)NULL) 
1766                  print_bq(t->block_info.closure));
1767     
1768     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1769     blockThread(t);
1770     
1771     /* whatever we schedule next, we must log that schedule */
1772     emitSchedule = rtsTrue;
1773     
1774 #else /* !GRAN */
1775
1776       // We don't need to do anything.  The thread is blocked, and it
1777       // has tidied up its stack and placed itself on whatever queue
1778       // it needs to be on.
1779
1780     // ASSERT(t->why_blocked != NotBlocked);
1781     // Not true: for example,
1782     //    - in THREADED_RTS, the thread may already have been woken
1783     //      up by another Capability.  This actually happens: try
1784     //      conc023 +RTS -N2.
1785     //    - the thread may have woken itself up already, because
1786     //      threadPaused() might have raised a blocked throwTo
1787     //      exception, see maybePerformBlockedException().
1788
1789 #ifdef DEBUG
1790     if (traceClass(DEBUG_sched)) {
1791         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1792                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1793         printThreadBlockage(t);
1794         debugTraceEnd();
1795     }
1796 #endif
1797     
1798     /* Only for dumping event to log file 
1799        ToDo: do I need this in GranSim, too?
1800        blockThread(t);
1801     */
1802 #endif
1803 }
1804
1805 /* -----------------------------------------------------------------------------
1806  * Handle a thread that returned to the scheduler with ThreadFinished
1807  * -------------------------------------------------------------------------- */
1808
1809 static rtsBool
1810 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1811 {
1812     /* Need to check whether this was a main thread, and if so,
1813      * return with the return value.
1814      *
1815      * We also end up here if the thread kills itself with an
1816      * uncaught exception, see Exception.cmm.
1817      */
1818     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1819                (unsigned long)t->id, whatNext_strs[t->what_next]);
1820
1821     /* Inform the Hpc that a thread has finished */
1822     hs_hpc_thread_finished_event(t);
1823
1824 #if defined(GRAN)
1825       endThread(t, CurrentProc); // clean-up the thread
1826 #elif defined(PARALLEL_HASKELL)
1827       /* For now all are advisory -- HWL */
1828       //if(t->priority==AdvisoryPriority) ??
1829       advisory_thread_count--; // JB: Caution with this counter, buggy!
1830       
1831 # if defined(DIST)
1832       if(t->dist.priority==RevalPriority)
1833         FinishReval(t);
1834 # endif
1835     
1836 # if defined(EDENOLD)
1837       // the thread could still have an outport... (BUG)
1838       if (t->eden.outport != -1) {
1839       // delete the outport for the tso which has finished...
1840         IF_PAR_DEBUG(eden_ports,
1841                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1842                               t->eden.outport, t->id));
1843         deleteOPT(t);
1844       }
1845       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1846       if (t->eden.epid != -1) {
1847         IF_PAR_DEBUG(eden_ports,
1848                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1849                            t->id, t->eden.epid));
1850         removeTSOfromProcess(t);
1851       }
1852 # endif 
1853
1854 # if defined(PAR)
1855       if (RtsFlags.ParFlags.ParStats.Full &&
1856           !RtsFlags.ParFlags.ParStats.Suppressed) 
1857         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1858
1859       //  t->par only contains statistics: left out for now...
1860       IF_PAR_DEBUG(fish,
1861                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1862                               t->id,t,t->par.sparkname));
1863 # endif
1864 #endif // PARALLEL_HASKELL
1865
1866       //
1867       // Check whether the thread that just completed was a bound
1868       // thread, and if so return with the result.  
1869       //
1870       // There is an assumption here that all thread completion goes
1871       // through this point; we need to make sure that if a thread
1872       // ends up in the ThreadKilled state, that it stays on the run
1873       // queue so it can be dealt with here.
1874       //
1875
1876       if (t->bound) {
1877
1878           if (t->bound != task) {
1879 #if !defined(THREADED_RTS)
1880               // Must be a bound thread that is not the topmost one.  Leave
1881               // it on the run queue until the stack has unwound to the
1882               // point where we can deal with this.  Leaving it on the run
1883               // queue also ensures that the garbage collector knows about
1884               // this thread and its return value (it gets dropped from the
1885               // all_threads list so there's no other way to find it).
1886               appendToRunQueue(cap,t);
1887               return rtsFalse;
1888 #else
1889               // this cannot happen in the threaded RTS, because a
1890               // bound thread can only be run by the appropriate Task.
1891               barf("finished bound thread that isn't mine");
1892 #endif
1893           }
1894
1895           ASSERT(task->tso == t);
1896
1897           if (t->what_next == ThreadComplete) {
1898               if (task->ret) {
1899                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1900                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1901               }
1902               task->stat = Success;
1903           } else {
1904               if (task->ret) {
1905                   *(task->ret) = NULL;
1906               }
1907               if (sched_state >= SCHED_INTERRUPTING) {
1908                   task->stat = Interrupted;
1909               } else {
1910                   task->stat = Killed;
1911               }
1912           }
1913 #ifdef DEBUG
1914           removeThreadLabel((StgWord)task->tso->id);
1915 #endif
1916           return rtsTrue; // tells schedule() to return
1917       }
1918
1919       return rtsFalse;
1920 }
1921
1922 /* -----------------------------------------------------------------------------
1923  * Perform a heap census, if PROFILING
1924  * -------------------------------------------------------------------------- */
1925
1926 static rtsBool
1927 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1928 {
1929 #if defined(PROFILING)
1930     // When we have +RTS -i0 and we're heap profiling, do a census at
1931     // every GC.  This lets us get repeatable runs for debugging.
1932     if (performHeapProfile ||
1933         (RtsFlags.ProfFlags.profileInterval==0 &&
1934          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1935
1936         // checking black holes is necessary before GC, otherwise
1937         // there may be threads that are unreachable except by the
1938         // blackhole queue, which the GC will consider to be
1939         // deadlocked.
1940         scheduleCheckBlackHoles(&MainCapability);
1941
1942         debugTrace(DEBUG_sched, "garbage collecting before heap census");
1943         GarbageCollect(rtsTrue);
1944
1945         debugTrace(DEBUG_sched, "performing heap census");
1946         heapCensus();
1947
1948         performHeapProfile = rtsFalse;
1949         return rtsTrue;  // true <=> we already GC'd
1950     }
1951 #endif
1952     return rtsFalse;
1953 }
1954
1955 /* -----------------------------------------------------------------------------
1956  * Perform a garbage collection if necessary
1957  * -------------------------------------------------------------------------- */
1958
1959 static Capability *
1960 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1961 {
1962     StgTSO *t;
1963 #ifdef THREADED_RTS
1964     static volatile StgWord waiting_for_gc;
1965     rtsBool was_waiting;
1966     nat i;
1967 #endif
1968
1969 #ifdef THREADED_RTS
1970     // In order to GC, there must be no threads running Haskell code.
1971     // Therefore, the GC thread needs to hold *all* the capabilities,
1972     // and release them after the GC has completed.  
1973     //
1974     // This seems to be the simplest way: previous attempts involved
1975     // making all the threads with capabilities give up their
1976     // capabilities and sleep except for the *last* one, which
1977     // actually did the GC.  But it's quite hard to arrange for all
1978     // the other tasks to sleep and stay asleep.
1979     //
1980         
1981     was_waiting = cas(&waiting_for_gc, 0, 1);
1982     if (was_waiting) {
1983         do {
1984             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1985             if (cap) yieldCapability(&cap,task);
1986         } while (waiting_for_gc);
1987         return cap;  // NOTE: task->cap might have changed here
1988     }
1989
1990     for (i=0; i < n_capabilities; i++) {
1991         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1992         if (cap != &capabilities[i]) {
1993             Capability *pcap = &capabilities[i];
1994             // we better hope this task doesn't get migrated to
1995             // another Capability while we're waiting for this one.
1996             // It won't, because load balancing happens while we have
1997             // all the Capabilities, but even so it's a slightly
1998             // unsavoury invariant.
1999             task->cap = pcap;
2000             context_switch = 1;
2001             waitForReturnCapability(&pcap, task);
2002             if (pcap != &capabilities[i]) {
2003                 barf("scheduleDoGC: got the wrong capability");
2004             }
2005         }
2006     }
2007
2008     waiting_for_gc = rtsFalse;
2009 #endif
2010
2011     /* Kick any transactions which are invalid back to their
2012      * atomically frames.  When next scheduled they will try to
2013      * commit, this commit will fail and they will retry.
2014      */
2015     { 
2016         StgTSO *next;
2017
2018         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2019             if (t->what_next == ThreadRelocated) {
2020                 next = t->link;
2021             } else {
2022                 next = t->global_link;
2023                 
2024                 // This is a good place to check for blocked
2025                 // exceptions.  It might be the case that a thread is
2026                 // blocked on delivering an exception to a thread that
2027                 // is also blocked - we try to ensure that this
2028                 // doesn't happen in throwTo(), but it's too hard (or
2029                 // impossible) to close all the race holes, so we
2030                 // accept that some might get through and deal with
2031                 // them here.  A GC will always happen at some point,
2032                 // even if the system is otherwise deadlocked.
2033                 maybePerformBlockedException (&capabilities[0], t);
2034
2035                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2036                     if (!stmValidateNestOfTransactions (t -> trec)) {
2037                         debugTrace(DEBUG_sched | DEBUG_stm,
2038                                    "trec %p found wasting its time", t);
2039                         
2040                         // strip the stack back to the
2041                         // ATOMICALLY_FRAME, aborting the (nested)
2042                         // transaction, and saving the stack of any
2043                         // partially-evaluated thunks on the heap.
2044                         throwToSingleThreaded_(&capabilities[0], t, 
2045                                                NULL, rtsTrue, NULL);
2046                         
2047 #ifdef REG_R1
2048                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2049 #endif
2050                     }
2051                 }
2052             }
2053         }
2054     }
2055     
2056     // so this happens periodically:
2057     if (cap) scheduleCheckBlackHoles(cap);
2058     
2059     IF_DEBUG(scheduler, printAllThreads());
2060
2061     /*
2062      * We now have all the capabilities; if we're in an interrupting
2063      * state, then we should take the opportunity to delete all the
2064      * threads in the system.
2065      */
2066     if (sched_state >= SCHED_INTERRUPTING) {
2067         deleteAllThreads(&capabilities[0]);
2068         sched_state = SCHED_SHUTTING_DOWN;
2069     }
2070
2071     /* everybody back, start the GC.
2072      * Could do it in this thread, or signal a condition var
2073      * to do it in another thread.  Either way, we need to
2074      * broadcast on gc_pending_cond afterward.
2075      */
2076 #if defined(THREADED_RTS)
2077     debugTrace(DEBUG_sched, "doing GC");
2078 #endif
2079     GarbageCollect(force_major);
2080     
2081 #if defined(THREADED_RTS)
2082     // release our stash of capabilities.
2083     for (i = 0; i < n_capabilities; i++) {
2084         if (cap != &capabilities[i]) {
2085             task->cap = &capabilities[i];
2086             releaseCapability(&capabilities[i]);
2087         }
2088     }
2089     if (cap) {
2090         task->cap = cap;
2091     } else {
2092         task->cap = NULL;
2093     }
2094 #endif
2095
2096 #if defined(GRAN)
2097     /* add a ContinueThread event to continue execution of current thread */
2098     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2099               ContinueThread,
2100               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2101     IF_GRAN_DEBUG(bq, 
2102                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2103                   G_EVENTQ(0);
2104                   G_CURR_THREADQ(0));
2105 #endif /* GRAN */
2106
2107     return cap;
2108 }
2109
2110 /* ---------------------------------------------------------------------------
2111  * Singleton fork(). Do not copy any running threads.
2112  * ------------------------------------------------------------------------- */
2113
2114 StgInt
2115 forkProcess(HsStablePtr *entry
2116 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2117             STG_UNUSED
2118 #endif
2119            )
2120 {
2121 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2122     Task *task;
2123     pid_t pid;
2124     StgTSO* t,*next;
2125     Capability *cap;
2126     
2127 #if defined(THREADED_RTS)
2128     if (RtsFlags.ParFlags.nNodes > 1) {
2129         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2130         stg_exit(EXIT_FAILURE);
2131     }
2132 #endif
2133
2134     debugTrace(DEBUG_sched, "forking!");
2135     
2136     // ToDo: for SMP, we should probably acquire *all* the capabilities
2137     cap = rts_lock();
2138     
2139     pid = fork();
2140     
2141     if (pid) { // parent
2142         
2143         // just return the pid
2144         rts_unlock(cap);
2145         return pid;
2146         
2147     } else { // child
2148         
2149         // Now, all OS threads except the thread that forked are
2150         // stopped.  We need to stop all Haskell threads, including
2151         // those involved in foreign calls.  Also we need to delete
2152         // all Tasks, because they correspond to OS threads that are
2153         // now gone.
2154
2155         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2156             if (t->what_next == ThreadRelocated) {
2157                 next = t->link;
2158             } else {
2159                 next = t->global_link;
2160                 // don't allow threads to catch the ThreadKilled
2161                 // exception, but we do want to raiseAsync() because these
2162                 // threads may be evaluating thunks that we need later.
2163                 deleteThread_(cap,t);
2164             }
2165         }
2166         
2167         // Empty the run queue.  It seems tempting to let all the
2168         // killed threads stay on the run queue as zombies to be
2169         // cleaned up later, but some of them correspond to bound
2170         // threads for which the corresponding Task does not exist.
2171         cap->run_queue_hd = END_TSO_QUEUE;
2172         cap->run_queue_tl = END_TSO_QUEUE;
2173
2174         // Any suspended C-calling Tasks are no more, their OS threads
2175         // don't exist now:
2176         cap->suspended_ccalling_tasks = NULL;
2177
2178         // Empty the all_threads list.  Otherwise, the garbage
2179         // collector may attempt to resurrect some of these threads.
2180         all_threads = END_TSO_QUEUE;
2181
2182         // Wipe the task list, except the current Task.
2183         ACQUIRE_LOCK(&sched_mutex);
2184         for (task = all_tasks; task != NULL; task=task->all_link) {
2185             if (task != cap->running_task) {
2186                 discardTask(task);
2187             }
2188         }
2189         RELEASE_LOCK(&sched_mutex);
2190
2191 #if defined(THREADED_RTS)
2192         // Wipe our spare workers list, they no longer exist.  New
2193         // workers will be created if necessary.
2194         cap->spare_workers = NULL;
2195         cap->returning_tasks_hd = NULL;
2196         cap->returning_tasks_tl = NULL;
2197 #endif
2198
2199         // On Unix, all timers are reset in the child, so we need to start
2200         // the timer again.
2201         startTimer();
2202
2203         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2204         rts_checkSchedStatus("forkProcess",cap);
2205         
2206         rts_unlock(cap);
2207         hs_exit();                      // clean up and exit
2208         stg_exit(EXIT_SUCCESS);
2209     }
2210 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2211     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2212     return -1;
2213 #endif
2214 }
2215
2216 /* ---------------------------------------------------------------------------
2217  * Delete all the threads in the system
2218  * ------------------------------------------------------------------------- */
2219    
2220 static void
2221 deleteAllThreads ( Capability *cap )
2222 {
2223     // NOTE: only safe to call if we own all capabilities.
2224
2225     StgTSO* t, *next;
2226     debugTrace(DEBUG_sched,"deleting all threads");
2227     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2228         if (t->what_next == ThreadRelocated) {
2229             next = t->link;
2230         } else {
2231             next = t->global_link;
2232             deleteThread(cap,t);
2233         }
2234     }      
2235
2236     // The run queue now contains a bunch of ThreadKilled threads.  We
2237     // must not throw these away: the main thread(s) will be in there
2238     // somewhere, and the main scheduler loop has to deal with it.
2239     // Also, the run queue is the only thing keeping these threads from
2240     // being GC'd, and we don't want the "main thread has been GC'd" panic.
2241
2242 #if !defined(THREADED_RTS)
2243     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2244     ASSERT(sleeping_queue == END_TSO_QUEUE);
2245 #endif
2246 }
2247
2248 /* -----------------------------------------------------------------------------
2249    Managing the suspended_ccalling_tasks list.
2250    Locks required: sched_mutex
2251    -------------------------------------------------------------------------- */
2252
2253 STATIC_INLINE void
2254 suspendTask (Capability *cap, Task *task)
2255 {
2256     ASSERT(task->next == NULL && task->prev == NULL);
2257     task->next = cap->suspended_ccalling_tasks;
2258     task->prev = NULL;
2259     if (cap->suspended_ccalling_tasks) {
2260         cap->suspended_ccalling_tasks->prev = task;
2261     }
2262     cap->suspended_ccalling_tasks = task;
2263 }
2264
2265 STATIC_INLINE void
2266 recoverSuspendedTask (Capability *cap, Task *task)
2267 {
2268     if (task->prev) {
2269         task->prev->next = task->next;
2270     } else {
2271         ASSERT(cap->suspended_ccalling_tasks == task);
2272         cap->suspended_ccalling_tasks = task->next;
2273     }
2274     if (task->next) {
2275         task->next->prev = task->prev;
2276     }
2277     task->next = task->prev = NULL;
2278 }
2279
2280 /* ---------------------------------------------------------------------------
2281  * Suspending & resuming Haskell threads.
2282  * 
2283  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2284  * its capability before calling the C function.  This allows another
2285  * task to pick up the capability and carry on running Haskell
2286  * threads.  It also means that if the C call blocks, it won't lock
2287  * the whole system.
2288  *
2289  * The Haskell thread making the C call is put to sleep for the
2290  * duration of the call, on the susepended_ccalling_threads queue.  We
2291  * give out a token to the task, which it can use to resume the thread
2292  * on return from the C function.
2293  * ------------------------------------------------------------------------- */
2294    
2295 void *
2296 suspendThread (StgRegTable *reg)
2297 {
2298   Capability *cap;
2299   int saved_errno;
2300   StgTSO *tso;
2301   Task *task;
2302 #if mingw32_HOST_OS
2303   StgWord32 saved_winerror;
2304 #endif
2305
2306   saved_errno = errno;
2307 #if mingw32_HOST_OS
2308   saved_winerror = GetLastError();
2309 #endif
2310
2311   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2312    */
2313   cap = regTableToCapability(reg);
2314
2315   task = cap->running_task;
2316   tso = cap->r.rCurrentTSO;
2317
2318   debugTrace(DEBUG_sched, 
2319              "thread %lu did a safe foreign call", 
2320              (unsigned long)cap->r.rCurrentTSO->id);
2321
2322   // XXX this might not be necessary --SDM
2323   tso->what_next = ThreadRunGHC;
2324
2325   threadPaused(cap,tso);
2326
2327   if ((tso->flags & TSO_BLOCKEX) == 0)  {
2328       tso->why_blocked = BlockedOnCCall;
2329       tso->flags |= TSO_BLOCKEX;
2330       tso->flags &= ~TSO_INTERRUPTIBLE;
2331   } else {
2332       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2333   }
2334
2335   // Hand back capability
2336   task->suspended_tso = tso;
2337
2338   ACQUIRE_LOCK(&cap->lock);
2339
2340   suspendTask(cap,task);
2341   cap->in_haskell = rtsFalse;
2342   releaseCapability_(cap);
2343   
2344   RELEASE_LOCK(&cap->lock);
2345
2346 #if defined(THREADED_RTS)
2347   /* Preparing to leave the RTS, so ensure there's a native thread/task
2348      waiting to take over.
2349   */
2350   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2351 #endif
2352
2353   errno = saved_errno;
2354 #if mingw32_HOST_OS
2355   SetLastError(saved_winerror);
2356 #endif
2357   return task;
2358 }
2359
2360 StgRegTable *
2361 resumeThread (void *task_)
2362 {
2363     StgTSO *tso;
2364     Capability *cap;
2365     Task *task = task_;
2366     int saved_errno;
2367 #if mingw32_HOST_OS
2368     StgWord32 saved_winerror;
2369 #endif
2370
2371     saved_errno = errno;
2372 #if mingw32_HOST_OS
2373     saved_winerror = GetLastError();
2374 #endif
2375
2376     cap = task->cap;
2377     // Wait for permission to re-enter the RTS with the result.
2378     waitForReturnCapability(&cap,task);
2379     // we might be on a different capability now... but if so, our
2380     // entry on the suspended_ccalling_tasks list will also have been
2381     // migrated.
2382
2383     // Remove the thread from the suspended list
2384     recoverSuspendedTask(cap,task);
2385
2386     tso = task->suspended_tso;
2387     task->suspended_tso = NULL;
2388     tso->link = END_TSO_QUEUE;
2389     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2390     
2391     if (tso->why_blocked == BlockedOnCCall) {
2392         awakenBlockedExceptionQueue(cap,tso);
2393         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2394     }
2395     
2396     /* Reset blocking status */
2397     tso->why_blocked  = NotBlocked;
2398     
2399     cap->r.rCurrentTSO = tso;
2400     cap->in_haskell = rtsTrue;
2401     errno = saved_errno;
2402 #if mingw32_HOST_OS
2403     SetLastError(saved_winerror);
2404 #endif
2405
2406     /* We might have GC'd, mark the TSO dirty again */
2407     dirtyTSO(tso);
2408
2409     IF_DEBUG(sanity, checkTSO(tso));
2410
2411     return &cap->r;
2412 }
2413
2414 /* ---------------------------------------------------------------------------
2415  * scheduleThread()
2416  *
2417  * scheduleThread puts a thread on the end  of the runnable queue.
2418  * This will usually be done immediately after a thread is created.
2419  * The caller of scheduleThread must create the thread using e.g.
2420  * createThread and push an appropriate closure
2421  * on this thread's stack before the scheduler is invoked.
2422  * ------------------------------------------------------------------------ */
2423
2424 void
2425 scheduleThread(Capability *cap, StgTSO *tso)
2426 {
2427     // The thread goes at the *end* of the run-queue, to avoid possible
2428     // starvation of any threads already on the queue.
2429     appendToRunQueue(cap,tso);
2430 }
2431
2432 void
2433 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2434 {
2435 #if defined(THREADED_RTS)
2436     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2437                               // move this thread from now on.
2438     cpu %= RtsFlags.ParFlags.nNodes;
2439     if (cpu == cap->no) {
2440         appendToRunQueue(cap,tso);
2441     } else {
2442         migrateThreadToCapability_lock(&capabilities[cpu],tso);
2443     }
2444 #else
2445     appendToRunQueue(cap,tso);
2446 #endif
2447 }
2448
2449 Capability *
2450 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2451 {
2452     Task *task;
2453
2454     // We already created/initialised the Task
2455     task = cap->running_task;
2456
2457     // This TSO is now a bound thread; make the Task and TSO
2458     // point to each other.
2459     tso->bound = task;
2460     tso->cap = cap;
2461
2462     task->tso = tso;
2463     task->ret = ret;
2464     task->stat = NoStatus;
2465
2466     appendToRunQueue(cap,tso);
2467
2468     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2469
2470 #if defined(GRAN)
2471     /* GranSim specific init */
2472     CurrentTSO = m->tso;                // the TSO to run
2473     procStatus[MainProc] = Busy;        // status of main PE
2474     CurrentProc = MainProc;             // PE to run it on
2475 #endif
2476
2477     cap = schedule(cap,task);
2478
2479     ASSERT(task->stat != NoStatus);
2480     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2481
2482     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2483     return cap;
2484 }
2485
2486 /* ----------------------------------------------------------------------------
2487  * Starting Tasks
2488  * ------------------------------------------------------------------------- */
2489
2490 #if defined(THREADED_RTS)
2491 void
2492 workerStart(Task *task)
2493 {
2494     Capability *cap;
2495
2496     // See startWorkerTask().
2497     ACQUIRE_LOCK(&task->lock);
2498     cap = task->cap;
2499     RELEASE_LOCK(&task->lock);
2500
2501     // set the thread-local pointer to the Task:
2502     taskEnter(task);
2503
2504     // schedule() runs without a lock.
2505     cap = schedule(cap,task);
2506
2507     // On exit from schedule(), we have a Capability.
2508     releaseCapability(cap);
2509     workerTaskStop(task);
2510 }
2511 #endif
2512
2513 /* ---------------------------------------------------------------------------
2514  * initScheduler()
2515  *
2516  * Initialise the scheduler.  This resets all the queues - if the
2517  * queues contained any threads, they'll be garbage collected at the
2518  * next pass.
2519  *
2520  * ------------------------------------------------------------------------ */
2521
2522 void 
2523 initScheduler(void)
2524 {
2525 #if defined(GRAN)
2526   nat i;
2527   for (i=0; i<=MAX_PROC; i++) {
2528     run_queue_hds[i]      = END_TSO_QUEUE;
2529     run_queue_tls[i]      = END_TSO_QUEUE;
2530     blocked_queue_hds[i]  = END_TSO_QUEUE;
2531     blocked_queue_tls[i]  = END_TSO_QUEUE;
2532     ccalling_threadss[i]  = END_TSO_QUEUE;
2533     blackhole_queue[i]    = END_TSO_QUEUE;
2534     sleeping_queue        = END_TSO_QUEUE;
2535   }
2536 #elif !defined(THREADED_RTS)
2537   blocked_queue_hd  = END_TSO_QUEUE;
2538   blocked_queue_tl  = END_TSO_QUEUE;
2539   sleeping_queue    = END_TSO_QUEUE;
2540 #endif
2541
2542   blackhole_queue   = END_TSO_QUEUE;
2543   all_threads       = END_TSO_QUEUE;
2544
2545   context_switch = 0;
2546   sched_state    = SCHED_RUNNING;
2547
2548 #if defined(THREADED_RTS)
2549   /* Initialise the mutex and condition variables used by
2550    * the scheduler. */
2551   initMutex(&sched_mutex);
2552 #endif
2553   
2554   ACQUIRE_LOCK(&sched_mutex);
2555
2556   /* A capability holds the state a native thread needs in
2557    * order to execute STG code. At least one capability is
2558    * floating around (only THREADED_RTS builds have more than one).
2559    */
2560   initCapabilities();
2561
2562   initTaskManager();
2563
2564 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2565   initSparkPools();
2566 #endif
2567
2568 #if defined(THREADED_RTS)
2569   /*
2570    * Eagerly start one worker to run each Capability, except for
2571    * Capability 0.  The idea is that we're probably going to start a
2572    * bound thread on Capability 0 pretty soon, so we don't want a
2573    * worker task hogging it.
2574    */
2575   { 
2576       nat i;
2577       Capability *cap;
2578       for (i = 1; i < n_capabilities; i++) {
2579           cap = &capabilities[i];
2580           ACQUIRE_LOCK(&cap->lock);
2581           startWorkerTask(cap, workerStart);
2582           RELEASE_LOCK(&cap->lock);
2583       }
2584   }
2585 #endif
2586
2587   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2588
2589   RELEASE_LOCK(&sched_mutex);
2590 }
2591
2592 void
2593 exitScheduler( void )
2594 {
2595     Task *task = NULL;
2596
2597 #if defined(THREADED_RTS)
2598     ACQUIRE_LOCK(&sched_mutex);
2599     task = newBoundTask();
2600     RELEASE_LOCK(&sched_mutex);
2601 #endif
2602
2603     // If we haven't killed all the threads yet, do it now.
2604     if (sched_state < SCHED_SHUTTING_DOWN) {
2605         sched_state = SCHED_INTERRUPTING;
2606         scheduleDoGC(NULL,task,rtsFalse);    
2607     }
2608     sched_state = SCHED_SHUTTING_DOWN;
2609
2610 #if defined(THREADED_RTS)
2611     { 
2612         nat i;
2613         
2614         for (i = 0; i < n_capabilities; i++) {
2615             shutdownCapability(&capabilities[i], task);
2616         }
2617         boundTaskExiting(task);
2618         stopTaskManager();
2619     }
2620 #else
2621     freeCapability(&MainCapability);
2622 #endif
2623 }
2624
2625 void
2626 freeScheduler( void )
2627 {
2628     freeTaskManager();
2629     if (n_capabilities != 1) {
2630         stgFree(capabilities);
2631     }
2632 #if defined(THREADED_RTS)
2633     closeMutex(&sched_mutex);
2634 #endif
2635 }
2636
2637 /* ---------------------------------------------------------------------------
2638    Where are the roots that we know about?
2639
2640         - all the threads on the runnable queue
2641         - all the threads on the blocked queue
2642         - all the threads on the sleeping queue
2643         - all the thread currently executing a _ccall_GC
2644         - all the "main threads"
2645      
2646    ------------------------------------------------------------------------ */
2647
2648 /* This has to be protected either by the scheduler monitor, or by the
2649         garbage collection monitor (probably the latter).
2650         KH @ 25/10/99
2651 */
2652
2653 void
2654 GetRoots( evac_fn evac )
2655 {
2656     nat i;
2657     Capability *cap;
2658     Task *task;
2659
2660 #if defined(GRAN)
2661     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2662         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2663             evac((StgClosure **)&run_queue_hds[i]);
2664         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2665             evac((StgClosure **)&run_queue_tls[i]);
2666         
2667         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2668             evac((StgClosure **)&blocked_queue_hds[i]);
2669         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2670             evac((StgClosure **)&blocked_queue_tls[i]);
2671         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2672             evac((StgClosure **)&ccalling_threads[i]);
2673     }
2674
2675     markEventQueue();
2676
2677 #else /* !GRAN */
2678
2679     for (i = 0; i < n_capabilities; i++) {
2680         cap = &capabilities[i];
2681         evac((StgClosure **)(void *)&cap->run_queue_hd);
2682         evac((StgClosure **)(void *)&cap->run_queue_tl);
2683 #if defined(THREADED_RTS)
2684         evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2685         evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2686 #endif
2687         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2688              task=task->next) {
2689             debugTrace(DEBUG_sched,
2690                        "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2691             evac((StgClosure **)(void *)&task->suspended_tso);
2692         }
2693
2694     }
2695     
2696
2697 #if !defined(THREADED_RTS)
2698     evac((StgClosure **)(void *)&blocked_queue_hd);
2699     evac((StgClosure **)(void *)&blocked_queue_tl);
2700     evac((StgClosure **)(void *)&sleeping_queue);
2701 #endif 
2702 #endif
2703
2704     // evac((StgClosure **)&blackhole_queue);
2705
2706 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2707     markSparkQueue(evac);
2708 #endif
2709     
2710 #if defined(RTS_USER_SIGNALS)
2711     // mark the signal handlers (signals should be already blocked)
2712     if (RtsFlags.MiscFlags.install_signal_handlers) {
2713         markSignalHandlers(evac);
2714     }
2715 #endif
2716 }
2717
2718 /* -----------------------------------------------------------------------------
2719    performGC
2720
2721    This is the interface to the garbage collector from Haskell land.
2722    We provide this so that external C code can allocate and garbage
2723    collect when called from Haskell via _ccall_GC.
2724    -------------------------------------------------------------------------- */
2725
2726 static void
2727 performGC_(rtsBool force_major)
2728 {
2729     Task *task;
2730     // We must grab a new Task here, because the existing Task may be
2731     // associated with a particular Capability, and chained onto the 
2732     // suspended_ccalling_tasks queue.
2733     ACQUIRE_LOCK(&sched_mutex);
2734     task = newBoundTask();
2735     RELEASE_LOCK(&sched_mutex);
2736     scheduleDoGC(NULL,task,force_major);
2737     boundTaskExiting(task);
2738 }
2739
2740 void
2741 performGC(void)
2742 {
2743     performGC_(rtsFalse);
2744 }
2745
2746 void
2747 performMajorGC(void)
2748 {
2749     performGC_(rtsTrue);
2750 }
2751
2752 /* -----------------------------------------------------------------------------
2753    Stack overflow
2754
2755    If the thread has reached its maximum stack size, then raise the
2756    StackOverflow exception in the offending thread.  Otherwise
2757    relocate the TSO into a larger chunk of memory and adjust its stack
2758    size appropriately.
2759    -------------------------------------------------------------------------- */
2760
2761 static StgTSO *
2762 threadStackOverflow(Capability *cap, StgTSO *tso)
2763 {
2764   nat new_stack_size, stack_words;
2765   lnat new_tso_size;
2766   StgPtr new_sp;
2767   StgTSO *dest;
2768
2769   IF_DEBUG(sanity,checkTSO(tso));
2770
2771   // don't allow throwTo() to modify the blocked_exceptions queue
2772   // while we are moving the TSO:
2773   lockClosure((StgClosure *)tso);
2774
2775   if (tso->stack_size >= tso->max_stack_size) {
2776
2777       debugTrace(DEBUG_gc,
2778                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2779                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2780       IF_DEBUG(gc,
2781                /* If we're debugging, just print out the top of the stack */
2782                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2783                                                 tso->sp+64)));
2784
2785       // Send this thread the StackOverflow exception
2786       unlockTSO(tso);
2787       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2788       return tso;
2789   }
2790
2791   /* Try to double the current stack size.  If that takes us over the
2792    * maximum stack size for this thread, then use the maximum instead.
2793    * Finally round up so the TSO ends up as a whole number of blocks.
2794    */
2795   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2796   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2797                                        TSO_STRUCT_SIZE)/sizeof(W_);
2798   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2799   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2800
2801   debugTrace(DEBUG_sched, 
2802              "increasing stack size from %ld words to %d.",
2803              (long)tso->stack_size, new_stack_size);
2804
2805   dest = (StgTSO *)allocate(new_tso_size);
2806   TICK_ALLOC_TSO(new_stack_size,0);
2807
2808   /* copy the TSO block and the old stack into the new area */
2809   memcpy(dest,tso,TSO_STRUCT_SIZE);
2810   stack_words = tso->stack + tso->stack_size - tso->sp;
2811   new_sp = (P_)dest + new_tso_size - stack_words;
2812   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2813
2814   /* relocate the stack pointers... */
2815   dest->sp         = new_sp;
2816   dest->stack_size = new_stack_size;
2817         
2818   /* Mark the old TSO as relocated.  We have to check for relocated
2819    * TSOs in the garbage collector and any primops that deal with TSOs.
2820    *
2821    * It's important to set the sp value to just beyond the end
2822    * of the stack, so we don't attempt to scavenge any part of the
2823    * dead TSO's stack.
2824    */
2825   tso->what_next = ThreadRelocated;
2826   tso->link = dest;
2827   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2828   tso->why_blocked = NotBlocked;
2829
2830   IF_PAR_DEBUG(verbose,
2831                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2832                      tso->id, tso, tso->stack_size);
2833                /* If we're debugging, just print out the top of the stack */
2834                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2835                                                 tso->sp+64)));
2836   
2837   unlockTSO(dest);
2838   unlockTSO(tso);
2839
2840   IF_DEBUG(sanity,checkTSO(dest));
2841 #if 0
2842   IF_DEBUG(scheduler,printTSO(dest));
2843 #endif
2844
2845   return dest;
2846 }
2847
2848 /* ---------------------------------------------------------------------------
2849    Interrupt execution
2850    - usually called inside a signal handler so it mustn't do anything fancy.   
2851    ------------------------------------------------------------------------ */
2852
2853 void
2854 interruptStgRts(void)
2855 {
2856     sched_state = SCHED_INTERRUPTING;
2857     context_switch = 1;
2858     wakeUpRts();
2859 }
2860
2861 /* -----------------------------------------------------------------------------
2862    Wake up the RTS
2863    
2864    This function causes at least one OS thread to wake up and run the
2865    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2866    an external event has arrived that may need servicing (eg. a
2867    keyboard interrupt).
2868
2869    In the single-threaded RTS we don't do anything here; we only have
2870    one thread anyway, and the event that caused us to want to wake up
2871    will have interrupted any blocking system call in progress anyway.
2872    -------------------------------------------------------------------------- */
2873
2874 void
2875 wakeUpRts(void)
2876 {
2877 #if defined(THREADED_RTS)
2878     // This forces the IO Manager thread to wakeup, which will
2879     // in turn ensure that some OS thread wakes up and runs the
2880     // scheduler loop, which will cause a GC and deadlock check.
2881     ioManagerWakeup();
2882 #endif
2883 }
2884
2885 /* -----------------------------------------------------------------------------
2886  * checkBlackHoles()
2887  *
2888  * Check the blackhole_queue for threads that can be woken up.  We do
2889  * this periodically: before every GC, and whenever the run queue is
2890  * empty.
2891  *
2892  * An elegant solution might be to just wake up all the blocked
2893  * threads with awakenBlockedQueue occasionally: they'll go back to
2894  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2895  * doesn't give us a way to tell whether we've actually managed to
2896  * wake up any threads, so we would be busy-waiting.
2897  *
2898  * -------------------------------------------------------------------------- */
2899
2900 static rtsBool
2901 checkBlackHoles (Capability *cap)
2902 {
2903     StgTSO **prev, *t;
2904     rtsBool any_woke_up = rtsFalse;
2905     StgHalfWord type;
2906
2907     // blackhole_queue is global:
2908     ASSERT_LOCK_HELD(&sched_mutex);
2909
2910     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2911
2912     // ASSUMES: sched_mutex
2913     prev = &blackhole_queue;
2914     t = blackhole_queue;
2915     while (t != END_TSO_QUEUE) {
2916         ASSERT(t->why_blocked == BlockedOnBlackHole);
2917         type = get_itbl(t->block_info.closure)->type;
2918         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2919             IF_DEBUG(sanity,checkTSO(t));
2920             t = unblockOne(cap, t);
2921             // urk, the threads migrate to the current capability
2922             // here, but we'd like to keep them on the original one.
2923             *prev = t;
2924             any_woke_up = rtsTrue;
2925         } else {
2926             prev = &t->link;
2927             t = t->link;
2928         }
2929     }
2930
2931     return any_woke_up;
2932 }
2933
2934 /* -----------------------------------------------------------------------------
2935    Deleting threads
2936
2937    This is used for interruption (^C) and forking, and corresponds to
2938    raising an exception but without letting the thread catch the
2939    exception.
2940    -------------------------------------------------------------------------- */
2941
2942 static void 
2943 deleteThread (Capability *cap, StgTSO *tso)
2944 {
2945     // NOTE: must only be called on a TSO that we have exclusive
2946     // access to, because we will call throwToSingleThreaded() below.
2947     // The TSO must be on the run queue of the Capability we own, or 
2948     // we must own all Capabilities.
2949
2950     if (tso->why_blocked != BlockedOnCCall &&
2951         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2952         throwToSingleThreaded(cap,tso,NULL);
2953     }
2954 }
2955
2956 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2957 static void 
2958 deleteThread_(Capability *cap, StgTSO *tso)
2959 { // for forkProcess only:
2960   // like deleteThread(), but we delete threads in foreign calls, too.
2961
2962     if (tso->why_blocked == BlockedOnCCall ||
2963         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2964         unblockOne(cap,tso);
2965         tso->what_next = ThreadKilled;
2966     } else {
2967         deleteThread(cap,tso);
2968     }
2969 }
2970 #endif
2971
2972 /* -----------------------------------------------------------------------------
2973    raiseExceptionHelper
2974    
2975    This function is called by the raise# primitve, just so that we can
2976    move some of the tricky bits of raising an exception from C-- into
2977    C.  Who knows, it might be a useful re-useable thing here too.
2978    -------------------------------------------------------------------------- */
2979
2980 StgWord
2981 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2982 {
2983     Capability *cap = regTableToCapability(reg);
2984     StgThunk *raise_closure = NULL;
2985     StgPtr p, next;
2986     StgRetInfoTable *info;
2987     //
2988     // This closure represents the expression 'raise# E' where E
2989     // is the exception raise.  It is used to overwrite all the
2990     // thunks which are currently under evaluataion.
2991     //
2992
2993     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2994     // LDV profiling: stg_raise_info has THUNK as its closure
2995     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2996     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2997     // 1 does not cause any problem unless profiling is performed.
2998     // However, when LDV profiling goes on, we need to linearly scan
2999     // small object pool, where raise_closure is stored, so we should
3000     // use MIN_UPD_SIZE.
3001     //
3002     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3003     //                                 sizeofW(StgClosure)+1);
3004     //
3005
3006     //
3007     // Walk up the stack, looking for the catch frame.  On the way,
3008     // we update any closures pointed to from update frames with the
3009     // raise closure that we just built.
3010     //
3011     p = tso->sp;
3012     while(1) {
3013         info = get_ret_itbl((StgClosure *)p);
3014         next = p + stack_frame_sizeW((StgClosure *)p);
3015         switch (info->i.type) {
3016             
3017         case UPDATE_FRAME:
3018             // Only create raise_closure if we need to.
3019             if (raise_closure == NULL) {
3020                 raise_closure = 
3021                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3022                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3023                 raise_closure->payload[0] = exception;
3024             }
3025             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3026             p = next;
3027             continue;
3028
3029         case ATOMICALLY_FRAME:
3030             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3031             tso->sp = p;
3032             return ATOMICALLY_FRAME;
3033             
3034         case CATCH_FRAME:
3035             tso->sp = p;
3036             return CATCH_FRAME;
3037
3038         case CATCH_STM_FRAME:
3039             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3040             tso->sp = p;
3041             return CATCH_STM_FRAME;
3042             
3043         case STOP_FRAME:
3044             tso->sp = p;
3045             return STOP_FRAME;
3046
3047         case CATCH_RETRY_FRAME:
3048         default:
3049             p = next; 
3050             continue;
3051         }
3052     }
3053 }
3054
3055
3056 /* -----------------------------------------------------------------------------
3057    findRetryFrameHelper
3058
3059    This function is called by the retry# primitive.  It traverses the stack
3060    leaving tso->sp referring to the frame which should handle the retry.  
3061
3062    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3063    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3064
3065    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3066    create) because retries are not considered to be exceptions, despite the
3067    similar implementation.
3068
3069    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3070    not be created within memory transactions.
3071    -------------------------------------------------------------------------- */
3072
3073 StgWord
3074 findRetryFrameHelper (StgTSO *tso)
3075 {
3076   StgPtr           p, next;
3077   StgRetInfoTable *info;
3078
3079   p = tso -> sp;
3080   while (1) {
3081     info = get_ret_itbl((StgClosure *)p);
3082     next = p + stack_frame_sizeW((StgClosure *)p);
3083     switch (info->i.type) {
3084       
3085     case ATOMICALLY_FRAME:
3086         debugTrace(DEBUG_stm,
3087                    "found ATOMICALLY_FRAME at %p during retry", p);
3088         tso->sp = p;
3089         return ATOMICALLY_FRAME;
3090       
3091     case CATCH_RETRY_FRAME:
3092         debugTrace(DEBUG_stm,
3093                    "found CATCH_RETRY_FRAME at %p during retrry", p);
3094         tso->sp = p;
3095         return CATCH_RETRY_FRAME;
3096       
3097     case CATCH_STM_FRAME: {
3098         debugTrace(DEBUG_stm,
3099                    "found CATCH_STM_FRAME at %p during retry", p);
3100         StgTRecHeader *trec = tso -> trec;
3101         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3102         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3103         stmAbortTransaction(tso -> cap, trec);
3104         stmFreeAbortedTRec(tso -> cap, trec);
3105         tso -> trec = outer;
3106         p = next; 
3107         continue;
3108     }
3109       
3110
3111     default:
3112       ASSERT(info->i.type != CATCH_FRAME);
3113       ASSERT(info->i.type != STOP_FRAME);
3114       p = next; 
3115       continue;
3116     }
3117   }
3118 }
3119
3120 /* -----------------------------------------------------------------------------
3121    resurrectThreads is called after garbage collection on the list of
3122    threads found to be garbage.  Each of these threads will be woken
3123    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3124    on an MVar, or NonTermination if the thread was blocked on a Black
3125    Hole.
3126
3127    Locks: assumes we hold *all* the capabilities.
3128    -------------------------------------------------------------------------- */
3129
3130 void
3131 resurrectThreads (StgTSO *threads)
3132 {
3133     StgTSO *tso, *next;
3134     Capability *cap;
3135
3136     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3137         next = tso->global_link;
3138         tso->global_link = all_threads;
3139         all_threads = tso;
3140         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3141         
3142         // Wake up the thread on the Capability it was last on
3143         cap = tso->cap;
3144         
3145         switch (tso->why_blocked) {
3146         case BlockedOnMVar:
3147         case BlockedOnException:
3148             /* Called by GC - sched_mutex lock is currently held. */
3149             throwToSingleThreaded(cap, tso,
3150                                   (StgClosure *)BlockedOnDeadMVar_closure);
3151             break;
3152         case BlockedOnBlackHole:
3153             throwToSingleThreaded(cap, tso,
3154                                   (StgClosure *)NonTermination_closure);
3155             break;
3156         case BlockedOnSTM:
3157             throwToSingleThreaded(cap, tso,
3158                                   (StgClosure *)BlockedIndefinitely_closure);
3159             break;
3160         case NotBlocked:
3161             /* This might happen if the thread was blocked on a black hole
3162              * belonging to a thread that we've just woken up (raiseAsync
3163              * can wake up threads, remember...).
3164              */
3165             continue;
3166         default:
3167             barf("resurrectThreads: thread blocked in a strange way");
3168         }
3169     }
3170 }