Add support for the IO manager thread on Windows
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "OSThreads.h"
15 #include "Storage.h"
16 #include "StgRun.h"
17 #include "Hooks.h"
18 #include "Schedule.h"
19 #include "StgMiscClosures.h"
20 #include "Interpreter.h"
21 #include "Printer.h"
22 #include "RtsSignals.h"
23 #include "Sanity.h"
24 #include "Stats.h"
25 #include "STM.h"
26 #include "Timer.h"
27 #include "Prelude.h"
28 #include "ThreadLabels.h"
29 #include "LdvProfile.h"
30 #include "Updates.h"
31 #ifdef PROFILING
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #endif
35 #if defined(GRAN) || defined(PARALLEL_HASKELL)
36 # include "GranSimRts.h"
37 # include "GranSim.h"
38 # include "ParallelRts.h"
39 # include "Parallel.h"
40 # include "ParallelDebug.h"
41 # include "FetchMe.h"
42 # include "HLC.h"
43 #endif
44 #include "Sparks.h"
45 #include "Capability.h"
46 #include "Task.h"
47 #include "AwaitEvent.h"
48 #if defined(mingw32_HOST_OS)
49 #include "win32/IOManager.h"
50 #endif
51 #include "Trace.h"
52 #include "RaiseAsync.h"
53 #include "Threads.h"
54 #include "ThrIOManager.h"
55
56 #ifdef HAVE_SYS_TYPES_H
57 #include <sys/types.h>
58 #endif
59 #ifdef HAVE_UNISTD_H
60 #include <unistd.h>
61 #endif
62
63 #include <string.h>
64 #include <stdlib.h>
65 #include <stdarg.h>
66
67 #ifdef HAVE_ERRNO_H
68 #include <errno.h>
69 #endif
70
71 // Turn off inlining when debugging - it obfuscates things
72 #ifdef DEBUG
73 # undef  STATIC_INLINE
74 # define STATIC_INLINE static
75 #endif
76
77 /* -----------------------------------------------------------------------------
78  * Global variables
79  * -------------------------------------------------------------------------- */
80
81 #if defined(GRAN)
82
83 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
84 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
85
86 /* 
87    In GranSim we have a runnable and a blocked queue for each processor.
88    In order to minimise code changes new arrays run_queue_hds/tls
89    are created. run_queue_hd is then a short cut (macro) for
90    run_queue_hds[CurrentProc] (see GranSim.h).
91    -- HWL
92 */
93 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
94 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
95 StgTSO *ccalling_threadss[MAX_PROC];
96 /* We use the same global list of threads (all_threads) in GranSim as in
97    the std RTS (i.e. we are cheating). However, we don't use this list in
98    the GranSim specific code at the moment (so we are only potentially
99    cheating).  */
100
101 #else /* !GRAN */
102
103 #if !defined(THREADED_RTS)
104 // Blocked/sleeping thrads
105 StgTSO *blocked_queue_hd = NULL;
106 StgTSO *blocked_queue_tl = NULL;
107 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
108 #endif
109
110 /* Threads blocked on blackholes.
111  * LOCK: sched_mutex+capability, or all capabilities
112  */
113 StgTSO *blackhole_queue = NULL;
114 #endif
115
116 /* The blackhole_queue should be checked for threads to wake up.  See
117  * Schedule.h for more thorough comment.
118  * LOCK: none (doesn't matter if we miss an update)
119  */
120 rtsBool blackholes_need_checking = rtsFalse;
121
122 /* Linked list of all threads.
123  * Used for detecting garbage collected threads.
124  * LOCK: sched_mutex+capability, or all capabilities
125  */
126 StgTSO *all_threads = NULL;
127
128 /* flag set by signal handler to precipitate a context switch
129  * LOCK: none (just an advisory flag)
130  */
131 int context_switch = 0;
132
133 /* flag that tracks whether we have done any execution in this time slice.
134  * LOCK: currently none, perhaps we should lock (but needs to be
135  * updated in the fast path of the scheduler).
136  */
137 nat recent_activity = ACTIVITY_YES;
138
139 /* if this flag is set as well, give up execution
140  * LOCK: none (changes once, from false->true)
141  */
142 rtsBool sched_state = SCHED_RUNNING;
143
144 #if defined(GRAN)
145 StgTSO *CurrentTSO;
146 #endif
147
148 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
149  *  exists - earlier gccs apparently didn't.
150  *  -= chak
151  */
152 StgTSO dummy_tso;
153
154 /*
155  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
156  * in an MT setting, needed to signal that a worker thread shouldn't hang around
157  * in the scheduler when it is out of work.
158  */
159 rtsBool shutting_down_scheduler = rtsFalse;
160
161 /*
162  * This mutex protects most of the global scheduler data in
163  * the THREADED_RTS runtime.
164  */
165 #if defined(THREADED_RTS)
166 Mutex sched_mutex;
167 #endif
168
169 #if defined(PARALLEL_HASKELL)
170 StgTSO *LastTSO;
171 rtsTime TimeOfLastYield;
172 rtsBool emitSchedule = rtsTrue;
173 #endif
174
175 #if !defined(mingw32_HOST_OS)
176 #define FORKPROCESS_PRIMOP_SUPPORTED
177 #endif
178
179 /* -----------------------------------------------------------------------------
180  * static function prototypes
181  * -------------------------------------------------------------------------- */
182
183 static Capability *schedule (Capability *initialCapability, Task *task);
184
185 //
186 // These function all encapsulate parts of the scheduler loop, and are
187 // abstracted only to make the structure and control flow of the
188 // scheduler clearer.
189 //
190 static void schedulePreLoop (void);
191 #if defined(THREADED_RTS)
192 static void schedulePushWork(Capability *cap, Task *task);
193 #endif
194 static void scheduleStartSignalHandlers (Capability *cap);
195 static void scheduleCheckBlockedThreads (Capability *cap);
196 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
197 static void scheduleCheckBlackHoles (Capability *cap);
198 static void scheduleDetectDeadlock (Capability *cap, Task *task);
199 #if defined(GRAN)
200 static StgTSO *scheduleProcessEvent(rtsEvent *event);
201 #endif
202 #if defined(PARALLEL_HASKELL)
203 static StgTSO *scheduleSendPendingMessages(void);
204 static void scheduleActivateSpark(void);
205 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
206 #endif
207 #if defined(PAR) || defined(GRAN)
208 static void scheduleGranParReport(void);
209 #endif
210 static void schedulePostRunThread(void);
211 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
212 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
213                                          StgTSO *t);
214 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
215                                     nat prev_what_next );
216 static void scheduleHandleThreadBlocked( StgTSO *t );
217 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
218                                              StgTSO *t );
219 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
220 static Capability *scheduleDoGC(Capability *cap, Task *task,
221                                 rtsBool force_major);
222
223 static rtsBool checkBlackHoles(Capability *cap);
224
225 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
226
227 static void deleteThread (Capability *cap, StgTSO *tso);
228 static void deleteAllThreads (Capability *cap);
229
230 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
231 static void deleteThread_(Capability *cap, StgTSO *tso);
232 #endif
233
234 #if defined(PARALLEL_HASKELL)
235 StgTSO * createSparkThread(rtsSpark spark);
236 StgTSO * activateSpark (rtsSpark spark);  
237 #endif
238
239 #ifdef DEBUG
240 static char *whatNext_strs[] = {
241   "(unknown)",
242   "ThreadRunGHC",
243   "ThreadInterpret",
244   "ThreadKilled",
245   "ThreadRelocated",
246   "ThreadComplete"
247 };
248 #endif
249
250 /* -----------------------------------------------------------------------------
251  * Putting a thread on the run queue: different scheduling policies
252  * -------------------------------------------------------------------------- */
253
254 STATIC_INLINE void
255 addToRunQueue( Capability *cap, StgTSO *t )
256 {
257 #if defined(PARALLEL_HASKELL)
258     if (RtsFlags.ParFlags.doFairScheduling) { 
259         // this does round-robin scheduling; good for concurrency
260         appendToRunQueue(cap,t);
261     } else {
262         // this does unfair scheduling; good for parallelism
263         pushOnRunQueue(cap,t);
264     }
265 #else
266     // this does round-robin scheduling; good for concurrency
267     appendToRunQueue(cap,t);
268 #endif
269 }
270
271 /* ---------------------------------------------------------------------------
272    Main scheduling loop.
273
274    We use round-robin scheduling, each thread returning to the
275    scheduler loop when one of these conditions is detected:
276
277       * out of heap space
278       * timer expires (thread yields)
279       * thread blocks
280       * thread ends
281       * stack overflow
282
283    GRAN version:
284      In a GranSim setup this loop iterates over the global event queue.
285      This revolves around the global event queue, which determines what 
286      to do next. Therefore, it's more complicated than either the 
287      concurrent or the parallel (GUM) setup.
288
289    GUM version:
290      GUM iterates over incoming messages.
291      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
292      and sends out a fish whenever it has nothing to do; in-between
293      doing the actual reductions (shared code below) it processes the
294      incoming messages and deals with delayed operations 
295      (see PendingFetches).
296      This is not the ugliest code you could imagine, but it's bloody close.
297
298    ------------------------------------------------------------------------ */
299
300 static Capability *
301 schedule (Capability *initialCapability, Task *task)
302 {
303   StgTSO *t;
304   Capability *cap;
305   StgThreadReturnCode ret;
306 #if defined(GRAN)
307   rtsEvent *event;
308 #elif defined(PARALLEL_HASKELL)
309   StgTSO *tso;
310   GlobalTaskId pe;
311   rtsBool receivedFinish = rtsFalse;
312 # if defined(DEBUG)
313   nat tp_size, sp_size; // stats only
314 # endif
315 #endif
316   nat prev_what_next;
317   rtsBool ready_to_gc;
318 #if defined(THREADED_RTS)
319   rtsBool first = rtsTrue;
320 #endif
321   
322   cap = initialCapability;
323
324   // Pre-condition: this task owns initialCapability.
325   // The sched_mutex is *NOT* held
326   // NB. on return, we still hold a capability.
327
328   debugTrace (DEBUG_sched, 
329               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
330               task, initialCapability);
331
332   schedulePreLoop();
333
334   // -----------------------------------------------------------
335   // Scheduler loop starts here:
336
337 #if defined(PARALLEL_HASKELL)
338 #define TERMINATION_CONDITION        (!receivedFinish)
339 #elif defined(GRAN)
340 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
341 #else
342 #define TERMINATION_CONDITION        rtsTrue
343 #endif
344
345   while (TERMINATION_CONDITION) {
346
347 #if defined(GRAN)
348       /* Choose the processor with the next event */
349       CurrentProc = event->proc;
350       CurrentTSO = event->tso;
351 #endif
352
353 #if defined(THREADED_RTS)
354       if (first) {
355           // don't yield the first time, we want a chance to run this
356           // thread for a bit, even if there are others banging at the
357           // door.
358           first = rtsFalse;
359           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
360       } else {
361           // Yield the capability to higher-priority tasks if necessary.
362           yieldCapability(&cap, task);
363       }
364 #endif
365       
366 #if defined(THREADED_RTS)
367       schedulePushWork(cap,task);
368 #endif
369
370     // Check whether we have re-entered the RTS from Haskell without
371     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
372     // call).
373     if (cap->in_haskell) {
374           errorBelch("schedule: re-entered unsafely.\n"
375                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
376           stg_exit(EXIT_FAILURE);
377     }
378
379     // The interruption / shutdown sequence.
380     // 
381     // In order to cleanly shut down the runtime, we want to:
382     //   * make sure that all main threads return to their callers
383     //     with the state 'Interrupted'.
384     //   * clean up all OS threads assocated with the runtime
385     //   * free all memory etc.
386     //
387     // So the sequence for ^C goes like this:
388     //
389     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
390     //     arranges for some Capability to wake up
391     //
392     //   * all threads in the system are halted, and the zombies are
393     //     placed on the run queue for cleaning up.  We acquire all
394     //     the capabilities in order to delete the threads, this is
395     //     done by scheduleDoGC() for convenience (because GC already
396     //     needs to acquire all the capabilities).  We can't kill
397     //     threads involved in foreign calls.
398     // 
399     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
400     //
401     //   * sched_state := SCHED_SHUTTING_DOWN
402     //
403     //   * all workers exit when the run queue on their capability
404     //     drains.  All main threads will also exit when their TSO
405     //     reaches the head of the run queue and they can return.
406     //
407     //   * eventually all Capabilities will shut down, and the RTS can
408     //     exit.
409     //
410     //   * We might be left with threads blocked in foreign calls, 
411     //     we should really attempt to kill these somehow (TODO);
412     
413     switch (sched_state) {
414     case SCHED_RUNNING:
415         break;
416     case SCHED_INTERRUPTING:
417         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
418 #if defined(THREADED_RTS)
419         discardSparksCap(cap);
420 #endif
421         /* scheduleDoGC() deletes all the threads */
422         cap = scheduleDoGC(cap,task,rtsFalse);
423         break;
424     case SCHED_SHUTTING_DOWN:
425         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
426         // If we are a worker, just exit.  If we're a bound thread
427         // then we will exit below when we've removed our TSO from
428         // the run queue.
429         if (task->tso == NULL && emptyRunQueue(cap)) {
430             return cap;
431         }
432         break;
433     default:
434         barf("sched_state: %d", sched_state);
435     }
436
437 #if defined(THREADED_RTS)
438     // If the run queue is empty, take a spark and turn it into a thread.
439     {
440         if (emptyRunQueue(cap)) {
441             StgClosure *spark;
442             spark = findSpark(cap);
443             if (spark != NULL) {
444                 debugTrace(DEBUG_sched,
445                            "turning spark of closure %p into a thread",
446                            (StgClosure *)spark);
447                 createSparkThread(cap,spark);     
448             }
449         }
450     }
451 #endif // THREADED_RTS
452
453     scheduleStartSignalHandlers(cap);
454
455     // Only check the black holes here if we've nothing else to do.
456     // During normal execution, the black hole list only gets checked
457     // at GC time, to avoid repeatedly traversing this possibly long
458     // list each time around the scheduler.
459     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
460
461     scheduleCheckWakeupThreads(cap);
462
463     scheduleCheckBlockedThreads(cap);
464
465     scheduleDetectDeadlock(cap,task);
466 #if defined(THREADED_RTS)
467     cap = task->cap;    // reload cap, it might have changed
468 #endif
469
470     // Normally, the only way we can get here with no threads to
471     // run is if a keyboard interrupt received during 
472     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
473     // Additionally, it is not fatal for the
474     // threaded RTS to reach here with no threads to run.
475     //
476     // win32: might be here due to awaitEvent() being abandoned
477     // as a result of a console event having been delivered.
478     if ( emptyRunQueue(cap) ) {
479 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
480         ASSERT(sched_state >= SCHED_INTERRUPTING);
481 #endif
482         continue; // nothing to do
483     }
484
485 #if defined(PARALLEL_HASKELL)
486     scheduleSendPendingMessages();
487     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
488         continue;
489
490 #if defined(SPARKS)
491     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
492 #endif
493
494     /* If we still have no work we need to send a FISH to get a spark
495        from another PE */
496     if (emptyRunQueue(cap)) {
497         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
498         ASSERT(rtsFalse); // should not happen at the moment
499     }
500     // from here: non-empty run queue.
501     //  TODO: merge above case with this, only one call processMessages() !
502     if (PacketsWaiting()) {  /* process incoming messages, if
503                                 any pending...  only in else
504                                 because getRemoteWork waits for
505                                 messages as well */
506         receivedFinish = processMessages();
507     }
508 #endif
509
510 #if defined(GRAN)
511     scheduleProcessEvent(event);
512 #endif
513
514     // 
515     // Get a thread to run
516     //
517     t = popRunQueue(cap);
518
519 #if defined(GRAN) || defined(PAR)
520     scheduleGranParReport(); // some kind of debuging output
521 #else
522     // Sanity check the thread we're about to run.  This can be
523     // expensive if there is lots of thread switching going on...
524     IF_DEBUG(sanity,checkTSO(t));
525 #endif
526
527 #if defined(THREADED_RTS)
528     // Check whether we can run this thread in the current task.
529     // If not, we have to pass our capability to the right task.
530     {
531         Task *bound = t->bound;
532       
533         if (bound) {
534             if (bound == task) {
535                 debugTrace(DEBUG_sched,
536                            "### Running thread %lu in bound thread", (unsigned long)t->id);
537                 // yes, the Haskell thread is bound to the current native thread
538             } else {
539                 debugTrace(DEBUG_sched,
540                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
541                 // no, bound to a different Haskell thread: pass to that thread
542                 pushOnRunQueue(cap,t);
543                 continue;
544             }
545         } else {
546             // The thread we want to run is unbound.
547             if (task->tso) { 
548                 debugTrace(DEBUG_sched,
549                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
550                 // no, the current native thread is bound to a different
551                 // Haskell thread, so pass it to any worker thread
552                 pushOnRunQueue(cap,t);
553                 continue; 
554             }
555         }
556     }
557 #endif
558
559     cap->r.rCurrentTSO = t;
560     
561     /* context switches are initiated by the timer signal, unless
562      * the user specified "context switch as often as possible", with
563      * +RTS -C0
564      */
565     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
566         && !emptyThreadQueues(cap)) {
567         context_switch = 1;
568     }
569          
570 run_thread:
571
572     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
573                               (long)t->id, whatNext_strs[t->what_next]);
574
575 #if defined(PROFILING)
576     startHeapProfTimer();
577 #endif
578
579     // Check for exceptions blocked on this thread
580     maybePerformBlockedException (cap, t);
581
582     // ----------------------------------------------------------------------
583     // Run the current thread 
584
585     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
586     ASSERT(t->cap == cap);
587
588     prev_what_next = t->what_next;
589
590     errno = t->saved_errno;
591     cap->in_haskell = rtsTrue;
592
593     dirtyTSO(t);
594
595     recent_activity = ACTIVITY_YES;
596
597     switch (prev_what_next) {
598         
599     case ThreadKilled:
600     case ThreadComplete:
601         /* Thread already finished, return to scheduler. */
602         ret = ThreadFinished;
603         break;
604         
605     case ThreadRunGHC:
606     {
607         StgRegTable *r;
608         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
609         cap = regTableToCapability(r);
610         ret = r->rRet;
611         break;
612     }
613     
614     case ThreadInterpret:
615         cap = interpretBCO(cap);
616         ret = cap->r.rRet;
617         break;
618         
619     default:
620         barf("schedule: invalid what_next field");
621     }
622
623     cap->in_haskell = rtsFalse;
624
625     // The TSO might have moved, eg. if it re-entered the RTS and a GC
626     // happened.  So find the new location:
627     t = cap->r.rCurrentTSO;
628
629     // We have run some Haskell code: there might be blackhole-blocked
630     // threads to wake up now.
631     // Lock-free test here should be ok, we're just setting a flag.
632     if ( blackhole_queue != END_TSO_QUEUE ) {
633         blackholes_need_checking = rtsTrue;
634     }
635     
636     // And save the current errno in this thread.
637     // XXX: possibly bogus for SMP because this thread might already
638     // be running again, see code below.
639     t->saved_errno = errno;
640
641 #if defined(THREADED_RTS)
642     // If ret is ThreadBlocked, and this Task is bound to the TSO that
643     // blocked, we are in limbo - the TSO is now owned by whatever it
644     // is blocked on, and may in fact already have been woken up,
645     // perhaps even on a different Capability.  It may be the case
646     // that task->cap != cap.  We better yield this Capability
647     // immediately and return to normaility.
648     if (ret == ThreadBlocked) {
649         debugTrace(DEBUG_sched,
650                    "--<< thread %lu (%s) stopped: blocked",
651                    (unsigned long)t->id, whatNext_strs[t->what_next]);
652         continue;
653     }
654 #endif
655
656     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
657     ASSERT(t->cap == cap);
658
659     // ----------------------------------------------------------------------
660     
661     // Costs for the scheduler are assigned to CCS_SYSTEM
662 #if defined(PROFILING)
663     stopHeapProfTimer();
664     CCCS = CCS_SYSTEM;
665 #endif
666     
667     schedulePostRunThread();
668
669     ready_to_gc = rtsFalse;
670
671     switch (ret) {
672     case HeapOverflow:
673         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
674         break;
675
676     case StackOverflow:
677         scheduleHandleStackOverflow(cap,task,t);
678         break;
679
680     case ThreadYielding:
681         if (scheduleHandleYield(cap, t, prev_what_next)) {
682             // shortcut for switching between compiler/interpreter:
683             goto run_thread; 
684         }
685         break;
686
687     case ThreadBlocked:
688         scheduleHandleThreadBlocked(t);
689         break;
690
691     case ThreadFinished:
692         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
693         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
694         break;
695
696     default:
697       barf("schedule: invalid thread return code %d", (int)ret);
698     }
699
700     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
701     if (ready_to_gc) {
702       cap = scheduleDoGC(cap,task,rtsFalse);
703     }
704   } /* end of while() */
705
706   debugTrace(PAR_DEBUG_verbose,
707              "== Leaving schedule() after having received Finish");
708 }
709
710 /* ----------------------------------------------------------------------------
711  * Setting up the scheduler loop
712  * ------------------------------------------------------------------------- */
713
714 static void
715 schedulePreLoop(void)
716 {
717 #if defined(GRAN) 
718     /* set up first event to get things going */
719     /* ToDo: assign costs for system setup and init MainTSO ! */
720     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
721               ContinueThread, 
722               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
723     
724     debugTrace (DEBUG_gran,
725                 "GRAN: Init CurrentTSO (in schedule) = %p", 
726                 CurrentTSO);
727     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
728     
729     if (RtsFlags.GranFlags.Light) {
730         /* Save current time; GranSim Light only */
731         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
732     }      
733 #endif
734 }
735
736 /* -----------------------------------------------------------------------------
737  * schedulePushWork()
738  *
739  * Push work to other Capabilities if we have some.
740  * -------------------------------------------------------------------------- */
741
742 #if defined(THREADED_RTS)
743 static void
744 schedulePushWork(Capability *cap USED_IF_THREADS, 
745                  Task *task      USED_IF_THREADS)
746 {
747     Capability *free_caps[n_capabilities], *cap0;
748     nat i, n_free_caps;
749
750     // migration can be turned off with +RTS -qg
751     if (!RtsFlags.ParFlags.migrate) return;
752
753     // Check whether we have more threads on our run queue, or sparks
754     // in our pool, that we could hand to another Capability.
755     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
756         && sparkPoolSizeCap(cap) < 2) {
757         return;
758     }
759
760     // First grab as many free Capabilities as we can.
761     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
762         cap0 = &capabilities[i];
763         if (cap != cap0 && tryGrabCapability(cap0,task)) {
764             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
765                 // it already has some work, we just grabbed it at 
766                 // the wrong moment.  Or maybe it's deadlocked!
767                 releaseCapability(cap0);
768             } else {
769                 free_caps[n_free_caps++] = cap0;
770             }
771         }
772     }
773
774     // we now have n_free_caps free capabilities stashed in
775     // free_caps[].  Share our run queue equally with them.  This is
776     // probably the simplest thing we could do; improvements we might
777     // want to do include:
778     //
779     //   - giving high priority to moving relatively new threads, on 
780     //     the gournds that they haven't had time to build up a
781     //     working set in the cache on this CPU/Capability.
782     //
783     //   - giving low priority to moving long-lived threads
784
785     if (n_free_caps > 0) {
786         StgTSO *prev, *t, *next;
787         rtsBool pushed_to_all;
788
789         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
790
791         i = 0;
792         pushed_to_all = rtsFalse;
793
794         if (cap->run_queue_hd != END_TSO_QUEUE) {
795             prev = cap->run_queue_hd;
796             t = prev->link;
797             prev->link = END_TSO_QUEUE;
798             for (; t != END_TSO_QUEUE; t = next) {
799                 next = t->link;
800                 t->link = END_TSO_QUEUE;
801                 if (t->what_next == ThreadRelocated
802                     || t->bound == task // don't move my bound thread
803                     || tsoLocked(t)) {  // don't move a locked thread
804                     prev->link = t;
805                     prev = t;
806                 } else if (i == n_free_caps) {
807                     pushed_to_all = rtsTrue;
808                     i = 0;
809                     // keep one for us
810                     prev->link = t;
811                     prev = t;
812                 } else {
813                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
814                     appendToRunQueue(free_caps[i],t);
815                     if (t->bound) { t->bound->cap = free_caps[i]; }
816                     t->cap = free_caps[i];
817                     i++;
818                 }
819             }
820             cap->run_queue_tl = prev;
821         }
822
823         // If there are some free capabilities that we didn't push any
824         // threads to, then try to push a spark to each one.
825         if (!pushed_to_all) {
826             StgClosure *spark;
827             // i is the next free capability to push to
828             for (; i < n_free_caps; i++) {
829                 if (emptySparkPoolCap(free_caps[i])) {
830                     spark = findSpark(cap);
831                     if (spark != NULL) {
832                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
833                         newSpark(&(free_caps[i]->r), spark);
834                     }
835                 }
836             }
837         }
838
839         // release the capabilities
840         for (i = 0; i < n_free_caps; i++) {
841             task->cap = free_caps[i];
842             releaseCapability(free_caps[i]);
843         }
844     }
845     task->cap = cap; // reset to point to our Capability.
846 }
847 #endif
848
849 /* ----------------------------------------------------------------------------
850  * Start any pending signal handlers
851  * ------------------------------------------------------------------------- */
852
853 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
854 static void
855 scheduleStartSignalHandlers(Capability *cap)
856 {
857     if (signals_pending()) { // safe outside the lock
858         startSignalHandlers(cap);
859     }
860 }
861 #else
862 static void
863 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
864 {
865 }
866 #endif
867
868 /* ----------------------------------------------------------------------------
869  * Check for blocked threads that can be woken up.
870  * ------------------------------------------------------------------------- */
871
872 static void
873 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
874 {
875 #if !defined(THREADED_RTS)
876     //
877     // Check whether any waiting threads need to be woken up.  If the
878     // run queue is empty, and there are no other tasks running, we
879     // can wait indefinitely for something to happen.
880     //
881     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
882     {
883         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
884     }
885 #endif
886 }
887
888
889 /* ----------------------------------------------------------------------------
890  * Check for threads woken up by other Capabilities
891  * ------------------------------------------------------------------------- */
892
893 static void
894 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
895 {
896 #if defined(THREADED_RTS)
897     // Any threads that were woken up by other Capabilities get
898     // appended to our run queue.
899     if (!emptyWakeupQueue(cap)) {
900         ACQUIRE_LOCK(&cap->lock);
901         if (emptyRunQueue(cap)) {
902             cap->run_queue_hd = cap->wakeup_queue_hd;
903             cap->run_queue_tl = cap->wakeup_queue_tl;
904         } else {
905             cap->run_queue_tl->link = cap->wakeup_queue_hd;
906             cap->run_queue_tl = cap->wakeup_queue_tl;
907         }
908         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
909         RELEASE_LOCK(&cap->lock);
910     }
911 #endif
912 }
913
914 /* ----------------------------------------------------------------------------
915  * Check for threads blocked on BLACKHOLEs that can be woken up
916  * ------------------------------------------------------------------------- */
917 static void
918 scheduleCheckBlackHoles (Capability *cap)
919 {
920     if ( blackholes_need_checking ) // check without the lock first
921     {
922         ACQUIRE_LOCK(&sched_mutex);
923         if ( blackholes_need_checking ) {
924             checkBlackHoles(cap);
925             blackholes_need_checking = rtsFalse;
926         }
927         RELEASE_LOCK(&sched_mutex);
928     }
929 }
930
931 /* ----------------------------------------------------------------------------
932  * Detect deadlock conditions and attempt to resolve them.
933  * ------------------------------------------------------------------------- */
934
935 static void
936 scheduleDetectDeadlock (Capability *cap, Task *task)
937 {
938
939 #if defined(PARALLEL_HASKELL)
940     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
941     return;
942 #endif
943
944     /* 
945      * Detect deadlock: when we have no threads to run, there are no
946      * threads blocked, waiting for I/O, or sleeping, and all the
947      * other tasks are waiting for work, we must have a deadlock of
948      * some description.
949      */
950     if ( emptyThreadQueues(cap) )
951     {
952 #if defined(THREADED_RTS)
953         /* 
954          * In the threaded RTS, we only check for deadlock if there
955          * has been no activity in a complete timeslice.  This means
956          * we won't eagerly start a full GC just because we don't have
957          * any threads to run currently.
958          */
959         if (recent_activity != ACTIVITY_INACTIVE) return;
960 #endif
961
962         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
963
964         // Garbage collection can release some new threads due to
965         // either (a) finalizers or (b) threads resurrected because
966         // they are unreachable and will therefore be sent an
967         // exception.  Any threads thus released will be immediately
968         // runnable.
969         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
970
971         recent_activity = ACTIVITY_DONE_GC;
972         
973         if ( !emptyRunQueue(cap) ) return;
974
975 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
976         /* If we have user-installed signal handlers, then wait
977          * for signals to arrive rather then bombing out with a
978          * deadlock.
979          */
980         if ( anyUserHandlers() ) {
981             debugTrace(DEBUG_sched,
982                        "still deadlocked, waiting for signals...");
983
984             awaitUserSignals();
985
986             if (signals_pending()) {
987                 startSignalHandlers(cap);
988             }
989
990             // either we have threads to run, or we were interrupted:
991             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
992         }
993 #endif
994
995 #if !defined(THREADED_RTS)
996         /* Probably a real deadlock.  Send the current main thread the
997          * Deadlock exception.
998          */
999         if (task->tso) {
1000             switch (task->tso->why_blocked) {
1001             case BlockedOnSTM:
1002             case BlockedOnBlackHole:
1003             case BlockedOnException:
1004             case BlockedOnMVar:
1005                 throwToSingleThreaded(cap, task->tso, 
1006                                       (StgClosure *)NonTermination_closure);
1007                 return;
1008             default:
1009                 barf("deadlock: main thread blocked in a strange way");
1010             }
1011         }
1012         return;
1013 #endif
1014     }
1015 }
1016
1017 /* ----------------------------------------------------------------------------
1018  * Process an event (GRAN only)
1019  * ------------------------------------------------------------------------- */
1020
1021 #if defined(GRAN)
1022 static StgTSO *
1023 scheduleProcessEvent(rtsEvent *event)
1024 {
1025     StgTSO *t;
1026
1027     if (RtsFlags.GranFlags.Light)
1028       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1029
1030     /* adjust time based on time-stamp */
1031     if (event->time > CurrentTime[CurrentProc] &&
1032         event->evttype != ContinueThread)
1033       CurrentTime[CurrentProc] = event->time;
1034     
1035     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1036     if (!RtsFlags.GranFlags.Light)
1037       handleIdlePEs();
1038
1039     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1040
1041     /* main event dispatcher in GranSim */
1042     switch (event->evttype) {
1043       /* Should just be continuing execution */
1044     case ContinueThread:
1045       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1046       /* ToDo: check assertion
1047       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1048              run_queue_hd != END_TSO_QUEUE);
1049       */
1050       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1051       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1052           procStatus[CurrentProc]==Fetching) {
1053         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1054               CurrentTSO->id, CurrentTSO, CurrentProc);
1055         goto next_thread;
1056       } 
1057       /* Ignore ContinueThreads for completed threads */
1058       if (CurrentTSO->what_next == ThreadComplete) {
1059         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1060               CurrentTSO->id, CurrentTSO, CurrentProc);
1061         goto next_thread;
1062       } 
1063       /* Ignore ContinueThreads for threads that are being migrated */
1064       if (PROCS(CurrentTSO)==Nowhere) { 
1065         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1066               CurrentTSO->id, CurrentTSO, CurrentProc);
1067         goto next_thread;
1068       }
1069       /* The thread should be at the beginning of the run queue */
1070       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1071         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1072               CurrentTSO->id, CurrentTSO, CurrentProc);
1073         break; // run the thread anyway
1074       }
1075       /*
1076       new_event(proc, proc, CurrentTime[proc],
1077                 FindWork,
1078                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1079       goto next_thread; 
1080       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1081       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1082
1083     case FetchNode:
1084       do_the_fetchnode(event);
1085       goto next_thread;             /* handle next event in event queue  */
1086       
1087     case GlobalBlock:
1088       do_the_globalblock(event);
1089       goto next_thread;             /* handle next event in event queue  */
1090       
1091     case FetchReply:
1092       do_the_fetchreply(event);
1093       goto next_thread;             /* handle next event in event queue  */
1094       
1095     case UnblockThread:   /* Move from the blocked queue to the tail of */
1096       do_the_unblock(event);
1097       goto next_thread;             /* handle next event in event queue  */
1098       
1099     case ResumeThread:  /* Move from the blocked queue to the tail of */
1100       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1101       event->tso->gran.blocktime += 
1102         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1103       do_the_startthread(event);
1104       goto next_thread;             /* handle next event in event queue  */
1105       
1106     case StartThread:
1107       do_the_startthread(event);
1108       goto next_thread;             /* handle next event in event queue  */
1109       
1110     case MoveThread:
1111       do_the_movethread(event);
1112       goto next_thread;             /* handle next event in event queue  */
1113       
1114     case MoveSpark:
1115       do_the_movespark(event);
1116       goto next_thread;             /* handle next event in event queue  */
1117       
1118     case FindWork:
1119       do_the_findwork(event);
1120       goto next_thread;             /* handle next event in event queue  */
1121       
1122     default:
1123       barf("Illegal event type %u\n", event->evttype);
1124     }  /* switch */
1125     
1126     /* This point was scheduler_loop in the old RTS */
1127
1128     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1129
1130     TimeOfLastEvent = CurrentTime[CurrentProc];
1131     TimeOfNextEvent = get_time_of_next_event();
1132     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1133     // CurrentTSO = ThreadQueueHd;
1134
1135     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1136                          TimeOfNextEvent));
1137
1138     if (RtsFlags.GranFlags.Light) 
1139       GranSimLight_leave_system(event, &ActiveTSO); 
1140
1141     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1142
1143     IF_DEBUG(gran, 
1144              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1145
1146     /* in a GranSim setup the TSO stays on the run queue */
1147     t = CurrentTSO;
1148     /* Take a thread from the run queue. */
1149     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1150
1151     IF_DEBUG(gran, 
1152              debugBelch("GRAN: About to run current thread, which is\n");
1153              G_TSO(t,5));
1154
1155     context_switch = 0; // turned on via GranYield, checking events and time slice
1156
1157     IF_DEBUG(gran, 
1158              DumpGranEvent(GR_SCHEDULE, t));
1159
1160     procStatus[CurrentProc] = Busy;
1161 }
1162 #endif // GRAN
1163
1164 /* ----------------------------------------------------------------------------
1165  * Send pending messages (PARALLEL_HASKELL only)
1166  * ------------------------------------------------------------------------- */
1167
1168 #if defined(PARALLEL_HASKELL)
1169 static StgTSO *
1170 scheduleSendPendingMessages(void)
1171 {
1172     StgSparkPool *pool;
1173     rtsSpark spark;
1174     StgTSO *t;
1175
1176 # if defined(PAR) // global Mem.Mgmt., omit for now
1177     if (PendingFetches != END_BF_QUEUE) {
1178         processFetches();
1179     }
1180 # endif
1181     
1182     if (RtsFlags.ParFlags.BufferTime) {
1183         // if we use message buffering, we must send away all message
1184         // packets which have become too old...
1185         sendOldBuffers(); 
1186     }
1187 }
1188 #endif
1189
1190 /* ----------------------------------------------------------------------------
1191  * Activate spark threads (PARALLEL_HASKELL only)
1192  * ------------------------------------------------------------------------- */
1193
1194 #if defined(PARALLEL_HASKELL)
1195 static void
1196 scheduleActivateSpark(void)
1197 {
1198 #if defined(SPARKS)
1199   ASSERT(emptyRunQueue());
1200 /* We get here if the run queue is empty and want some work.
1201    We try to turn a spark into a thread, and add it to the run queue,
1202    from where it will be picked up in the next iteration of the scheduler
1203    loop.
1204 */
1205
1206       /* :-[  no local threads => look out for local sparks */
1207       /* the spark pool for the current PE */
1208       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1209       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1210           pool->hd < pool->tl) {
1211         /* 
1212          * ToDo: add GC code check that we really have enough heap afterwards!!
1213          * Old comment:
1214          * If we're here (no runnable threads) and we have pending
1215          * sparks, we must have a space problem.  Get enough space
1216          * to turn one of those pending sparks into a
1217          * thread... 
1218          */
1219
1220         spark = findSpark(rtsFalse);            /* get a spark */
1221         if (spark != (rtsSpark) NULL) {
1222           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1223           IF_PAR_DEBUG(fish, // schedule,
1224                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1225                              tso->id, tso, advisory_thread_count));
1226
1227           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1228             IF_PAR_DEBUG(fish, // schedule,
1229                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1230                             spark));
1231             return rtsFalse; /* failed to generate a thread */
1232           }                  /* otherwise fall through & pick-up new tso */
1233         } else {
1234           IF_PAR_DEBUG(fish, // schedule,
1235                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1236                              spark_queue_len(pool)));
1237           return rtsFalse;  /* failed to generate a thread */
1238         }
1239         return rtsTrue;  /* success in generating a thread */
1240   } else { /* no more threads permitted or pool empty */
1241     return rtsFalse;  /* failed to generateThread */
1242   }
1243 #else
1244   tso = NULL; // avoid compiler warning only
1245   return rtsFalse;  /* dummy in non-PAR setup */
1246 #endif // SPARKS
1247 }
1248 #endif // PARALLEL_HASKELL
1249
1250 /* ----------------------------------------------------------------------------
1251  * Get work from a remote node (PARALLEL_HASKELL only)
1252  * ------------------------------------------------------------------------- */
1253     
1254 #if defined(PARALLEL_HASKELL)
1255 static rtsBool
1256 scheduleGetRemoteWork(rtsBool *receivedFinish)
1257 {
1258   ASSERT(emptyRunQueue());
1259
1260   if (RtsFlags.ParFlags.BufferTime) {
1261         IF_PAR_DEBUG(verbose, 
1262                 debugBelch("...send all pending data,"));
1263         {
1264           nat i;
1265           for (i=1; i<=nPEs; i++)
1266             sendImmediately(i); // send all messages away immediately
1267         }
1268   }
1269 # ifndef SPARKS
1270         //++EDEN++ idle() , i.e. send all buffers, wait for work
1271         // suppress fishing in EDEN... just look for incoming messages
1272         // (blocking receive)
1273   IF_PAR_DEBUG(verbose, 
1274                debugBelch("...wait for incoming messages...\n"));
1275   *receivedFinish = processMessages(); // blocking receive...
1276
1277         // and reenter scheduling loop after having received something
1278         // (return rtsFalse below)
1279
1280 # else /* activate SPARKS machinery */
1281 /* We get here, if we have no work, tried to activate a local spark, but still
1282    have no work. We try to get a remote spark, by sending a FISH message.
1283    Thread migration should be added here, and triggered when a sequence of 
1284    fishes returns without work. */
1285         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1286
1287       /* =8-[  no local sparks => look for work on other PEs */
1288         /*
1289          * We really have absolutely no work.  Send out a fish
1290          * (there may be some out there already), and wait for
1291          * something to arrive.  We clearly can't run any threads
1292          * until a SCHEDULE or RESUME arrives, and so that's what
1293          * we're hoping to see.  (Of course, we still have to
1294          * respond to other types of messages.)
1295          */
1296         rtsTime now = msTime() /*CURRENT_TIME*/;
1297         IF_PAR_DEBUG(verbose, 
1298                      debugBelch("--  now=%ld\n", now));
1299         IF_PAR_DEBUG(fish, // verbose,
1300              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1301                  (last_fish_arrived_at!=0 &&
1302                   last_fish_arrived_at+delay > now)) {
1303                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1304                      now, last_fish_arrived_at+delay, 
1305                      last_fish_arrived_at,
1306                      delay);
1307              });
1308   
1309         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1310             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1311           if (last_fish_arrived_at==0 ||
1312               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1313             /* outstandingFishes is set in sendFish, processFish;
1314                avoid flooding system with fishes via delay */
1315     next_fish_to_send_at = 0;  
1316   } else {
1317     /* ToDo: this should be done in the main scheduling loop to avoid the
1318              busy wait here; not so bad if fish delay is very small  */
1319     int iq = 0; // DEBUGGING -- HWL
1320     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1321     /* send a fish when ready, but process messages that arrive in the meantime */
1322     do {
1323       if (PacketsWaiting()) {
1324         iq++; // DEBUGGING
1325         *receivedFinish = processMessages();
1326       }
1327       now = msTime();
1328     } while (!*receivedFinish || now<next_fish_to_send_at);
1329     // JB: This means the fish could become obsolete, if we receive
1330     // work. Better check for work again? 
1331     // last line: while (!receivedFinish || !haveWork || now<...)
1332     // next line: if (receivedFinish || haveWork )
1333
1334     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1335       return rtsFalse;  // NB: this will leave scheduler loop
1336                         // immediately after return!
1337                           
1338     IF_PAR_DEBUG(fish, // verbose,
1339                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1340
1341   }
1342
1343     // JB: IMHO, this should all be hidden inside sendFish(...)
1344     /* pe = choosePE(); 
1345        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1346                 NEW_FISH_HUNGER);
1347
1348     // Global statistics: count no. of fishes
1349     if (RtsFlags.ParFlags.ParStats.Global &&
1350          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1351            globalParStats.tot_fish_mess++;
1352            }
1353     */ 
1354
1355   /* delayed fishes must have been sent by now! */
1356   next_fish_to_send_at = 0;  
1357   }
1358       
1359   *receivedFinish = processMessages();
1360 # endif /* SPARKS */
1361
1362  return rtsFalse;
1363  /* NB: this function always returns rtsFalse, meaning the scheduler
1364     loop continues with the next iteration; 
1365     rationale: 
1366       return code means success in finding work; we enter this function
1367       if there is no local work, thus have to send a fish which takes
1368       time until it arrives with work; in the meantime we should process
1369       messages in the main loop;
1370  */
1371 }
1372 #endif // PARALLEL_HASKELL
1373
1374 /* ----------------------------------------------------------------------------
1375  * PAR/GRAN: Report stats & debugging info(?)
1376  * ------------------------------------------------------------------------- */
1377
1378 #if defined(PAR) || defined(GRAN)
1379 static void
1380 scheduleGranParReport(void)
1381 {
1382   ASSERT(run_queue_hd != END_TSO_QUEUE);
1383
1384   /* Take a thread from the run queue, if we have work */
1385   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1386
1387     /* If this TSO has got its outport closed in the meantime, 
1388      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1389      * It has to be marked as TH_DEAD for this purpose.
1390      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1391
1392 JB: TODO: investigate wether state change field could be nuked
1393      entirely and replaced by the normal tso state (whatnext
1394      field). All we want to do is to kill tsos from outside.
1395      */
1396
1397     /* ToDo: write something to the log-file
1398     if (RTSflags.ParFlags.granSimStats && !sameThread)
1399         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1400
1401     CurrentTSO = t;
1402     */
1403     /* the spark pool for the current PE */
1404     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1405
1406     IF_DEBUG(scheduler, 
1407              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1408                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1409
1410     IF_PAR_DEBUG(fish,
1411              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1412                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1413
1414     if (RtsFlags.ParFlags.ParStats.Full && 
1415         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1416         (emitSchedule || // forced emit
1417          (t && LastTSO && t->id != LastTSO->id))) {
1418       /* 
1419          we are running a different TSO, so write a schedule event to log file
1420          NB: If we use fair scheduling we also have to write  a deschedule 
1421              event for LastTSO; with unfair scheduling we know that the
1422              previous tso has blocked whenever we switch to another tso, so
1423              we don't need it in GUM for now
1424       */
1425       IF_PAR_DEBUG(fish, // schedule,
1426                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1427
1428       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1429                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1430       emitSchedule = rtsFalse;
1431     }
1432 }     
1433 #endif
1434
1435 /* ----------------------------------------------------------------------------
1436  * After running a thread...
1437  * ------------------------------------------------------------------------- */
1438
1439 static void
1440 schedulePostRunThread(void)
1441 {
1442 #if defined(PAR)
1443     /* HACK 675: if the last thread didn't yield, make sure to print a 
1444        SCHEDULE event to the log file when StgRunning the next thread, even
1445        if it is the same one as before */
1446     LastTSO = t; 
1447     TimeOfLastYield = CURRENT_TIME;
1448 #endif
1449
1450   /* some statistics gathering in the parallel case */
1451
1452 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1453   switch (ret) {
1454     case HeapOverflow:
1455 # if defined(GRAN)
1456       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1457       globalGranStats.tot_heapover++;
1458 # elif defined(PAR)
1459       globalParStats.tot_heapover++;
1460 # endif
1461       break;
1462
1463      case StackOverflow:
1464 # if defined(GRAN)
1465       IF_DEBUG(gran, 
1466                DumpGranEvent(GR_DESCHEDULE, t));
1467       globalGranStats.tot_stackover++;
1468 # elif defined(PAR)
1469       // IF_DEBUG(par, 
1470       // DumpGranEvent(GR_DESCHEDULE, t);
1471       globalParStats.tot_stackover++;
1472 # endif
1473       break;
1474
1475     case ThreadYielding:
1476 # if defined(GRAN)
1477       IF_DEBUG(gran, 
1478                DumpGranEvent(GR_DESCHEDULE, t));
1479       globalGranStats.tot_yields++;
1480 # elif defined(PAR)
1481       // IF_DEBUG(par, 
1482       // DumpGranEvent(GR_DESCHEDULE, t);
1483       globalParStats.tot_yields++;
1484 # endif
1485       break; 
1486
1487     case ThreadBlocked:
1488 # if defined(GRAN)
1489         debugTrace(DEBUG_sched, 
1490                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1491                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1492                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1493                if (t->block_info.closure!=(StgClosure*)NULL)
1494                  print_bq(t->block_info.closure);
1495                debugBelch("\n"));
1496
1497       // ??? needed; should emit block before
1498       IF_DEBUG(gran, 
1499                DumpGranEvent(GR_DESCHEDULE, t)); 
1500       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1501       /*
1502         ngoq Dogh!
1503       ASSERT(procStatus[CurrentProc]==Busy || 
1504               ((procStatus[CurrentProc]==Fetching) && 
1505               (t->block_info.closure!=(StgClosure*)NULL)));
1506       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1507           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1508             procStatus[CurrentProc]==Fetching)) 
1509         procStatus[CurrentProc] = Idle;
1510       */
1511 # elif defined(PAR)
1512 //++PAR++  blockThread() writes the event (change?)
1513 # endif
1514     break;
1515
1516   case ThreadFinished:
1517     break;
1518
1519   default:
1520     barf("parGlobalStats: unknown return code");
1521     break;
1522     }
1523 #endif
1524 }
1525
1526 /* -----------------------------------------------------------------------------
1527  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1528  * -------------------------------------------------------------------------- */
1529
1530 static rtsBool
1531 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1532 {
1533     // did the task ask for a large block?
1534     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1535         // if so, get one and push it on the front of the nursery.
1536         bdescr *bd;
1537         lnat blocks;
1538         
1539         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1540         
1541         debugTrace(DEBUG_sched,
1542                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1543                    (long)t->id, whatNext_strs[t->what_next], blocks);
1544     
1545         // don't do this if the nursery is (nearly) full, we'll GC first.
1546         if (cap->r.rCurrentNursery->link != NULL ||
1547             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1548                                                // if the nursery has only one block.
1549             
1550             ACQUIRE_SM_LOCK
1551             bd = allocGroup( blocks );
1552             RELEASE_SM_LOCK
1553             cap->r.rNursery->n_blocks += blocks;
1554             
1555             // link the new group into the list
1556             bd->link = cap->r.rCurrentNursery;
1557             bd->u.back = cap->r.rCurrentNursery->u.back;
1558             if (cap->r.rCurrentNursery->u.back != NULL) {
1559                 cap->r.rCurrentNursery->u.back->link = bd;
1560             } else {
1561 #if !defined(THREADED_RTS)
1562                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1563                        g0s0 == cap->r.rNursery);
1564 #endif
1565                 cap->r.rNursery->blocks = bd;
1566             }             
1567             cap->r.rCurrentNursery->u.back = bd;
1568             
1569             // initialise it as a nursery block.  We initialise the
1570             // step, gen_no, and flags field of *every* sub-block in
1571             // this large block, because this is easier than making
1572             // sure that we always find the block head of a large
1573             // block whenever we call Bdescr() (eg. evacuate() and
1574             // isAlive() in the GC would both have to do this, at
1575             // least).
1576             { 
1577                 bdescr *x;
1578                 for (x = bd; x < bd + blocks; x++) {
1579                     x->step = cap->r.rNursery;
1580                     x->gen_no = 0;
1581                     x->flags = 0;
1582                 }
1583             }
1584             
1585             // This assert can be a killer if the app is doing lots
1586             // of large block allocations.
1587             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1588             
1589             // now update the nursery to point to the new block
1590             cap->r.rCurrentNursery = bd;
1591             
1592             // we might be unlucky and have another thread get on the
1593             // run queue before us and steal the large block, but in that
1594             // case the thread will just end up requesting another large
1595             // block.
1596             pushOnRunQueue(cap,t);
1597             return rtsFalse;  /* not actually GC'ing */
1598         }
1599     }
1600     
1601     debugTrace(DEBUG_sched,
1602                "--<< thread %ld (%s) stopped: HeapOverflow\n", 
1603                (long)t->id, whatNext_strs[t->what_next]);
1604
1605 #if defined(GRAN)
1606     ASSERT(!is_on_queue(t,CurrentProc));
1607 #elif defined(PARALLEL_HASKELL)
1608     /* Currently we emit a DESCHEDULE event before GC in GUM.
1609        ToDo: either add separate event to distinguish SYSTEM time from rest
1610        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1611     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1612         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1613                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1614         emitSchedule = rtsTrue;
1615     }
1616 #endif
1617       
1618     pushOnRunQueue(cap,t);
1619     return rtsTrue;
1620     /* actual GC is done at the end of the while loop in schedule() */
1621 }
1622
1623 /* -----------------------------------------------------------------------------
1624  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1625  * -------------------------------------------------------------------------- */
1626
1627 static void
1628 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1629 {
1630     debugTrace (DEBUG_sched,
1631                 "--<< thread %ld (%s) stopped, StackOverflow", 
1632                 (long)t->id, whatNext_strs[t->what_next]);
1633
1634     /* just adjust the stack for this thread, then pop it back
1635      * on the run queue.
1636      */
1637     { 
1638         /* enlarge the stack */
1639         StgTSO *new_t = threadStackOverflow(cap, t);
1640         
1641         /* The TSO attached to this Task may have moved, so update the
1642          * pointer to it.
1643          */
1644         if (task->tso == t) {
1645             task->tso = new_t;
1646         }
1647         pushOnRunQueue(cap,new_t);
1648     }
1649 }
1650
1651 /* -----------------------------------------------------------------------------
1652  * Handle a thread that returned to the scheduler with ThreadYielding
1653  * -------------------------------------------------------------------------- */
1654
1655 static rtsBool
1656 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1657 {
1658     // Reset the context switch flag.  We don't do this just before
1659     // running the thread, because that would mean we would lose ticks
1660     // during GC, which can lead to unfair scheduling (a thread hogs
1661     // the CPU because the tick always arrives during GC).  This way
1662     // penalises threads that do a lot of allocation, but that seems
1663     // better than the alternative.
1664     context_switch = 0;
1665     
1666     /* put the thread back on the run queue.  Then, if we're ready to
1667      * GC, check whether this is the last task to stop.  If so, wake
1668      * up the GC thread.  getThread will block during a GC until the
1669      * GC is finished.
1670      */
1671 #ifdef DEBUG
1672     if (t->what_next != prev_what_next) {
1673         debugTrace(DEBUG_sched,
1674                    "--<< thread %ld (%s) stopped to switch evaluators", 
1675                    (long)t->id, whatNext_strs[t->what_next]);
1676     } else {
1677         debugTrace(DEBUG_sched,
1678                    "--<< thread %ld (%s) stopped, yielding",
1679                    (long)t->id, whatNext_strs[t->what_next]);
1680     }
1681 #endif
1682     
1683     IF_DEBUG(sanity,
1684              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1685              checkTSO(t));
1686     ASSERT(t->link == END_TSO_QUEUE);
1687     
1688     // Shortcut if we're just switching evaluators: don't bother
1689     // doing stack squeezing (which can be expensive), just run the
1690     // thread.
1691     if (t->what_next != prev_what_next) {
1692         return rtsTrue;
1693     }
1694     
1695 #if defined(GRAN)
1696     ASSERT(!is_on_queue(t,CurrentProc));
1697       
1698     IF_DEBUG(sanity,
1699              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1700              checkThreadQsSanity(rtsTrue));
1701
1702 #endif
1703
1704     addToRunQueue(cap,t);
1705
1706 #if defined(GRAN)
1707     /* add a ContinueThread event to actually process the thread */
1708     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1709               ContinueThread,
1710               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1711     IF_GRAN_DEBUG(bq, 
1712                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1713                   G_EVENTQ(0);
1714                   G_CURR_THREADQ(0));
1715 #endif
1716     return rtsFalse;
1717 }
1718
1719 /* -----------------------------------------------------------------------------
1720  * Handle a thread that returned to the scheduler with ThreadBlocked
1721  * -------------------------------------------------------------------------- */
1722
1723 static void
1724 scheduleHandleThreadBlocked( StgTSO *t
1725 #if !defined(GRAN) && !defined(DEBUG)
1726     STG_UNUSED
1727 #endif
1728     )
1729 {
1730 #if defined(GRAN)
1731     IF_DEBUG(scheduler,
1732              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1733                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1734              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1735     
1736     // ??? needed; should emit block before
1737     IF_DEBUG(gran, 
1738              DumpGranEvent(GR_DESCHEDULE, t)); 
1739     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1740     /*
1741       ngoq Dogh!
1742       ASSERT(procStatus[CurrentProc]==Busy || 
1743       ((procStatus[CurrentProc]==Fetching) && 
1744       (t->block_info.closure!=(StgClosure*)NULL)));
1745       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1746       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1747       procStatus[CurrentProc]==Fetching)) 
1748       procStatus[CurrentProc] = Idle;
1749     */
1750 #elif defined(PAR)
1751     IF_DEBUG(scheduler,
1752              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1753                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1754     IF_PAR_DEBUG(bq,
1755                  
1756                  if (t->block_info.closure!=(StgClosure*)NULL) 
1757                  print_bq(t->block_info.closure));
1758     
1759     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1760     blockThread(t);
1761     
1762     /* whatever we schedule next, we must log that schedule */
1763     emitSchedule = rtsTrue;
1764     
1765 #else /* !GRAN */
1766
1767       // We don't need to do anything.  The thread is blocked, and it
1768       // has tidied up its stack and placed itself on whatever queue
1769       // it needs to be on.
1770
1771 #if !defined(THREADED_RTS)
1772     ASSERT(t->why_blocked != NotBlocked);
1773              // This might not be true under THREADED_RTS: we don't have
1774              // exclusive access to this TSO, so someone might have
1775              // woken it up by now.  This actually happens: try
1776              // conc023 +RTS -N2.
1777 #endif
1778
1779 #ifdef DEBUG
1780     if (traceClass(DEBUG_sched)) {
1781         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1782                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1783         printThreadBlockage(t);
1784         debugTraceEnd();
1785     }
1786 #endif
1787     
1788     /* Only for dumping event to log file 
1789        ToDo: do I need this in GranSim, too?
1790        blockThread(t);
1791     */
1792 #endif
1793 }
1794
1795 /* -----------------------------------------------------------------------------
1796  * Handle a thread that returned to the scheduler with ThreadFinished
1797  * -------------------------------------------------------------------------- */
1798
1799 static rtsBool
1800 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1801 {
1802     /* Need to check whether this was a main thread, and if so,
1803      * return with the return value.
1804      *
1805      * We also end up here if the thread kills itself with an
1806      * uncaught exception, see Exception.cmm.
1807      */
1808     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1809                (unsigned long)t->id, whatNext_strs[t->what_next]);
1810
1811 #if defined(GRAN)
1812       endThread(t, CurrentProc); // clean-up the thread
1813 #elif defined(PARALLEL_HASKELL)
1814       /* For now all are advisory -- HWL */
1815       //if(t->priority==AdvisoryPriority) ??
1816       advisory_thread_count--; // JB: Caution with this counter, buggy!
1817       
1818 # if defined(DIST)
1819       if(t->dist.priority==RevalPriority)
1820         FinishReval(t);
1821 # endif
1822     
1823 # if defined(EDENOLD)
1824       // the thread could still have an outport... (BUG)
1825       if (t->eden.outport != -1) {
1826       // delete the outport for the tso which has finished...
1827         IF_PAR_DEBUG(eden_ports,
1828                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1829                               t->eden.outport, t->id));
1830         deleteOPT(t);
1831       }
1832       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1833       if (t->eden.epid != -1) {
1834         IF_PAR_DEBUG(eden_ports,
1835                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1836                            t->id, t->eden.epid));
1837         removeTSOfromProcess(t);
1838       }
1839 # endif 
1840
1841 # if defined(PAR)
1842       if (RtsFlags.ParFlags.ParStats.Full &&
1843           !RtsFlags.ParFlags.ParStats.Suppressed) 
1844         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1845
1846       //  t->par only contains statistics: left out for now...
1847       IF_PAR_DEBUG(fish,
1848                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1849                               t->id,t,t->par.sparkname));
1850 # endif
1851 #endif // PARALLEL_HASKELL
1852
1853       //
1854       // Check whether the thread that just completed was a bound
1855       // thread, and if so return with the result.  
1856       //
1857       // There is an assumption here that all thread completion goes
1858       // through this point; we need to make sure that if a thread
1859       // ends up in the ThreadKilled state, that it stays on the run
1860       // queue so it can be dealt with here.
1861       //
1862
1863       if (t->bound) {
1864
1865           if (t->bound != task) {
1866 #if !defined(THREADED_RTS)
1867               // Must be a bound thread that is not the topmost one.  Leave
1868               // it on the run queue until the stack has unwound to the
1869               // point where we can deal with this.  Leaving it on the run
1870               // queue also ensures that the garbage collector knows about
1871               // this thread and its return value (it gets dropped from the
1872               // all_threads list so there's no other way to find it).
1873               appendToRunQueue(cap,t);
1874               return rtsFalse;
1875 #else
1876               // this cannot happen in the threaded RTS, because a
1877               // bound thread can only be run by the appropriate Task.
1878               barf("finished bound thread that isn't mine");
1879 #endif
1880           }
1881
1882           ASSERT(task->tso == t);
1883
1884           if (t->what_next == ThreadComplete) {
1885               if (task->ret) {
1886                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1887                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1888               }
1889               task->stat = Success;
1890           } else {
1891               if (task->ret) {
1892                   *(task->ret) = NULL;
1893               }
1894               if (sched_state >= SCHED_INTERRUPTING) {
1895                   task->stat = Interrupted;
1896               } else {
1897                   task->stat = Killed;
1898               }
1899           }
1900 #ifdef DEBUG
1901           removeThreadLabel((StgWord)task->tso->id);
1902 #endif
1903           return rtsTrue; // tells schedule() to return
1904       }
1905
1906       return rtsFalse;
1907 }
1908
1909 /* -----------------------------------------------------------------------------
1910  * Perform a heap census, if PROFILING
1911  * -------------------------------------------------------------------------- */
1912
1913 static rtsBool
1914 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1915 {
1916 #if defined(PROFILING)
1917     // When we have +RTS -i0 and we're heap profiling, do a census at
1918     // every GC.  This lets us get repeatable runs for debugging.
1919     if (performHeapProfile ||
1920         (RtsFlags.ProfFlags.profileInterval==0 &&
1921          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1922
1923         // checking black holes is necessary before GC, otherwise
1924         // there may be threads that are unreachable except by the
1925         // blackhole queue, which the GC will consider to be
1926         // deadlocked.
1927         scheduleCheckBlackHoles(&MainCapability);
1928
1929         debugTrace(DEBUG_sched, "garbage collecting before heap census");
1930         GarbageCollect(rtsTrue);
1931
1932         debugTrace(DEBUG_sched, "performing heap census");
1933         heapCensus();
1934
1935         performHeapProfile = rtsFalse;
1936         return rtsTrue;  // true <=> we already GC'd
1937     }
1938 #endif
1939     return rtsFalse;
1940 }
1941
1942 /* -----------------------------------------------------------------------------
1943  * Perform a garbage collection if necessary
1944  * -------------------------------------------------------------------------- */
1945
1946 static Capability *
1947 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1948 {
1949     StgTSO *t;
1950 #ifdef THREADED_RTS
1951     static volatile StgWord waiting_for_gc;
1952     rtsBool was_waiting;
1953     nat i;
1954 #endif
1955
1956 #ifdef THREADED_RTS
1957     // In order to GC, there must be no threads running Haskell code.
1958     // Therefore, the GC thread needs to hold *all* the capabilities,
1959     // and release them after the GC has completed.  
1960     //
1961     // This seems to be the simplest way: previous attempts involved
1962     // making all the threads with capabilities give up their
1963     // capabilities and sleep except for the *last* one, which
1964     // actually did the GC.  But it's quite hard to arrange for all
1965     // the other tasks to sleep and stay asleep.
1966     //
1967         
1968     was_waiting = cas(&waiting_for_gc, 0, 1);
1969     if (was_waiting) {
1970         do {
1971             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1972             if (cap) yieldCapability(&cap,task);
1973         } while (waiting_for_gc);
1974         return cap;  // NOTE: task->cap might have changed here
1975     }
1976
1977     for (i=0; i < n_capabilities; i++) {
1978         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1979         if (cap != &capabilities[i]) {
1980             Capability *pcap = &capabilities[i];
1981             // we better hope this task doesn't get migrated to
1982             // another Capability while we're waiting for this one.
1983             // It won't, because load balancing happens while we have
1984             // all the Capabilities, but even so it's a slightly
1985             // unsavoury invariant.
1986             task->cap = pcap;
1987             context_switch = 1;
1988             waitForReturnCapability(&pcap, task);
1989             if (pcap != &capabilities[i]) {
1990                 barf("scheduleDoGC: got the wrong capability");
1991             }
1992         }
1993     }
1994
1995     waiting_for_gc = rtsFalse;
1996 #endif
1997
1998     /* Kick any transactions which are invalid back to their
1999      * atomically frames.  When next scheduled they will try to
2000      * commit, this commit will fail and they will retry.
2001      */
2002     { 
2003         StgTSO *next;
2004
2005         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2006             if (t->what_next == ThreadRelocated) {
2007                 next = t->link;
2008             } else {
2009                 next = t->global_link;
2010                 
2011                 // This is a good place to check for blocked
2012                 // exceptions.  It might be the case that a thread is
2013                 // blocked on delivering an exception to a thread that
2014                 // is also blocked - we try to ensure that this
2015                 // doesn't happen in throwTo(), but it's too hard (or
2016                 // impossible) to close all the race holes, so we
2017                 // accept that some might get through and deal with
2018                 // them here.  A GC will always happen at some point,
2019                 // even if the system is otherwise deadlocked.
2020                 maybePerformBlockedException (&capabilities[0], t);
2021
2022                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2023                     if (!stmValidateNestOfTransactions (t -> trec)) {
2024                         debugTrace(DEBUG_sched | DEBUG_stm,
2025                                    "trec %p found wasting its time", t);
2026                         
2027                         // strip the stack back to the
2028                         // ATOMICALLY_FRAME, aborting the (nested)
2029                         // transaction, and saving the stack of any
2030                         // partially-evaluated thunks on the heap.
2031                         throwToSingleThreaded_(&capabilities[0], t, 
2032                                                NULL, rtsTrue, NULL);
2033                         
2034 #ifdef REG_R1
2035                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2036 #endif
2037                     }
2038                 }
2039             }
2040         }
2041     }
2042     
2043     // so this happens periodically:
2044     if (cap) scheduleCheckBlackHoles(cap);
2045     
2046     IF_DEBUG(scheduler, printAllThreads());
2047
2048     /*
2049      * We now have all the capabilities; if we're in an interrupting
2050      * state, then we should take the opportunity to delete all the
2051      * threads in the system.
2052      */
2053     if (sched_state >= SCHED_INTERRUPTING) {
2054         deleteAllThreads(&capabilities[0]);
2055         sched_state = SCHED_SHUTTING_DOWN;
2056     }
2057
2058     /* everybody back, start the GC.
2059      * Could do it in this thread, or signal a condition var
2060      * to do it in another thread.  Either way, we need to
2061      * broadcast on gc_pending_cond afterward.
2062      */
2063 #if defined(THREADED_RTS)
2064     debugTrace(DEBUG_sched, "doing GC");
2065 #endif
2066     GarbageCollect(force_major);
2067     
2068 #if defined(THREADED_RTS)
2069     // release our stash of capabilities.
2070     for (i = 0; i < n_capabilities; i++) {
2071         if (cap != &capabilities[i]) {
2072             task->cap = &capabilities[i];
2073             releaseCapability(&capabilities[i]);
2074         }
2075     }
2076     if (cap) {
2077         task->cap = cap;
2078     } else {
2079         task->cap = NULL;
2080     }
2081 #endif
2082
2083 #if defined(GRAN)
2084     /* add a ContinueThread event to continue execution of current thread */
2085     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2086               ContinueThread,
2087               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2088     IF_GRAN_DEBUG(bq, 
2089                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2090                   G_EVENTQ(0);
2091                   G_CURR_THREADQ(0));
2092 #endif /* GRAN */
2093
2094     return cap;
2095 }
2096
2097 /* ---------------------------------------------------------------------------
2098  * Singleton fork(). Do not copy any running threads.
2099  * ------------------------------------------------------------------------- */
2100
2101 StgInt
2102 forkProcess(HsStablePtr *entry
2103 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2104             STG_UNUSED
2105 #endif
2106            )
2107 {
2108 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2109     Task *task;
2110     pid_t pid;
2111     StgTSO* t,*next;
2112     Capability *cap;
2113     
2114 #if defined(THREADED_RTS)
2115     if (RtsFlags.ParFlags.nNodes > 1) {
2116         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2117         stg_exit(EXIT_FAILURE);
2118     }
2119 #endif
2120
2121     debugTrace(DEBUG_sched, "forking!");
2122     
2123     // ToDo: for SMP, we should probably acquire *all* the capabilities
2124     cap = rts_lock();
2125     
2126     pid = fork();
2127     
2128     if (pid) { // parent
2129         
2130         // just return the pid
2131         rts_unlock(cap);
2132         return pid;
2133         
2134     } else { // child
2135         
2136         // Now, all OS threads except the thread that forked are
2137         // stopped.  We need to stop all Haskell threads, including
2138         // those involved in foreign calls.  Also we need to delete
2139         // all Tasks, because they correspond to OS threads that are
2140         // now gone.
2141
2142         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2143             if (t->what_next == ThreadRelocated) {
2144                 next = t->link;
2145             } else {
2146                 next = t->global_link;
2147                 // don't allow threads to catch the ThreadKilled
2148                 // exception, but we do want to raiseAsync() because these
2149                 // threads may be evaluating thunks that we need later.
2150                 deleteThread_(cap,t);
2151             }
2152         }
2153         
2154         // Empty the run queue.  It seems tempting to let all the
2155         // killed threads stay on the run queue as zombies to be
2156         // cleaned up later, but some of them correspond to bound
2157         // threads for which the corresponding Task does not exist.
2158         cap->run_queue_hd = END_TSO_QUEUE;
2159         cap->run_queue_tl = END_TSO_QUEUE;
2160
2161         // Any suspended C-calling Tasks are no more, their OS threads
2162         // don't exist now:
2163         cap->suspended_ccalling_tasks = NULL;
2164
2165         // Empty the all_threads list.  Otherwise, the garbage
2166         // collector may attempt to resurrect some of these threads.
2167         all_threads = END_TSO_QUEUE;
2168
2169         // Wipe the task list, except the current Task.
2170         ACQUIRE_LOCK(&sched_mutex);
2171         for (task = all_tasks; task != NULL; task=task->all_link) {
2172             if (task != cap->running_task) {
2173                 discardTask(task);
2174             }
2175         }
2176         RELEASE_LOCK(&sched_mutex);
2177
2178 #if defined(THREADED_RTS)
2179         // Wipe our spare workers list, they no longer exist.  New
2180         // workers will be created if necessary.
2181         cap->spare_workers = NULL;
2182         cap->returning_tasks_hd = NULL;
2183         cap->returning_tasks_tl = NULL;
2184 #endif
2185
2186         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2187         rts_checkSchedStatus("forkProcess",cap);
2188         
2189         rts_unlock(cap);
2190         hs_exit();                      // clean up and exit
2191         stg_exit(EXIT_SUCCESS);
2192     }
2193 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2194     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2195     return -1;
2196 #endif
2197 }
2198
2199 /* ---------------------------------------------------------------------------
2200  * Delete all the threads in the system
2201  * ------------------------------------------------------------------------- */
2202    
2203 static void
2204 deleteAllThreads ( Capability *cap )
2205 {
2206     // NOTE: only safe to call if we own all capabilities.
2207
2208     StgTSO* t, *next;
2209     debugTrace(DEBUG_sched,"deleting all threads");
2210     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2211         if (t->what_next == ThreadRelocated) {
2212             next = t->link;
2213         } else {
2214             next = t->global_link;
2215             deleteThread(cap,t);
2216         }
2217     }      
2218
2219     // The run queue now contains a bunch of ThreadKilled threads.  We
2220     // must not throw these away: the main thread(s) will be in there
2221     // somewhere, and the main scheduler loop has to deal with it.
2222     // Also, the run queue is the only thing keeping these threads from
2223     // being GC'd, and we don't want the "main thread has been GC'd" panic.
2224
2225 #if !defined(THREADED_RTS)
2226     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2227     ASSERT(sleeping_queue == END_TSO_QUEUE);
2228 #endif
2229 }
2230
2231 /* -----------------------------------------------------------------------------
2232    Managing the suspended_ccalling_tasks list.
2233    Locks required: sched_mutex
2234    -------------------------------------------------------------------------- */
2235
2236 STATIC_INLINE void
2237 suspendTask (Capability *cap, Task *task)
2238 {
2239     ASSERT(task->next == NULL && task->prev == NULL);
2240     task->next = cap->suspended_ccalling_tasks;
2241     task->prev = NULL;
2242     if (cap->suspended_ccalling_tasks) {
2243         cap->suspended_ccalling_tasks->prev = task;
2244     }
2245     cap->suspended_ccalling_tasks = task;
2246 }
2247
2248 STATIC_INLINE void
2249 recoverSuspendedTask (Capability *cap, Task *task)
2250 {
2251     if (task->prev) {
2252         task->prev->next = task->next;
2253     } else {
2254         ASSERT(cap->suspended_ccalling_tasks == task);
2255         cap->suspended_ccalling_tasks = task->next;
2256     }
2257     if (task->next) {
2258         task->next->prev = task->prev;
2259     }
2260     task->next = task->prev = NULL;
2261 }
2262
2263 /* ---------------------------------------------------------------------------
2264  * Suspending & resuming Haskell threads.
2265  * 
2266  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2267  * its capability before calling the C function.  This allows another
2268  * task to pick up the capability and carry on running Haskell
2269  * threads.  It also means that if the C call blocks, it won't lock
2270  * the whole system.
2271  *
2272  * The Haskell thread making the C call is put to sleep for the
2273  * duration of the call, on the susepended_ccalling_threads queue.  We
2274  * give out a token to the task, which it can use to resume the thread
2275  * on return from the C function.
2276  * ------------------------------------------------------------------------- */
2277    
2278 void *
2279 suspendThread (StgRegTable *reg)
2280 {
2281   Capability *cap;
2282   int saved_errno = errno;
2283   StgTSO *tso;
2284   Task *task;
2285
2286   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2287    */
2288   cap = regTableToCapability(reg);
2289
2290   task = cap->running_task;
2291   tso = cap->r.rCurrentTSO;
2292
2293   debugTrace(DEBUG_sched, 
2294              "thread %lu did a safe foreign call", 
2295              (unsigned long)cap->r.rCurrentTSO->id);
2296
2297   // XXX this might not be necessary --SDM
2298   tso->what_next = ThreadRunGHC;
2299
2300   threadPaused(cap,tso);
2301
2302   if ((tso->flags & TSO_BLOCKEX) == 0)  {
2303       tso->why_blocked = BlockedOnCCall;
2304       tso->flags |= TSO_BLOCKEX;
2305       tso->flags &= ~TSO_INTERRUPTIBLE;
2306   } else {
2307       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2308   }
2309
2310   // Hand back capability
2311   task->suspended_tso = tso;
2312
2313   ACQUIRE_LOCK(&cap->lock);
2314
2315   suspendTask(cap,task);
2316   cap->in_haskell = rtsFalse;
2317   releaseCapability_(cap);
2318   
2319   RELEASE_LOCK(&cap->lock);
2320
2321 #if defined(THREADED_RTS)
2322   /* Preparing to leave the RTS, so ensure there's a native thread/task
2323      waiting to take over.
2324   */
2325   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2326 #endif
2327
2328   errno = saved_errno;
2329   return task;
2330 }
2331
2332 StgRegTable *
2333 resumeThread (void *task_)
2334 {
2335     StgTSO *tso;
2336     Capability *cap;
2337     int saved_errno = errno;
2338     Task *task = task_;
2339
2340     cap = task->cap;
2341     // Wait for permission to re-enter the RTS with the result.
2342     waitForReturnCapability(&cap,task);
2343     // we might be on a different capability now... but if so, our
2344     // entry on the suspended_ccalling_tasks list will also have been
2345     // migrated.
2346
2347     // Remove the thread from the suspended list
2348     recoverSuspendedTask(cap,task);
2349
2350     tso = task->suspended_tso;
2351     task->suspended_tso = NULL;
2352     tso->link = END_TSO_QUEUE;
2353     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2354     
2355     if (tso->why_blocked == BlockedOnCCall) {
2356         awakenBlockedExceptionQueue(cap,tso);
2357         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2358     }
2359     
2360     /* Reset blocking status */
2361     tso->why_blocked  = NotBlocked;
2362     
2363     cap->r.rCurrentTSO = tso;
2364     cap->in_haskell = rtsTrue;
2365     errno = saved_errno;
2366
2367     /* We might have GC'd, mark the TSO dirty again */
2368     dirtyTSO(tso);
2369
2370     IF_DEBUG(sanity, checkTSO(tso));
2371
2372     return &cap->r;
2373 }
2374
2375 /* ---------------------------------------------------------------------------
2376  * scheduleThread()
2377  *
2378  * scheduleThread puts a thread on the end  of the runnable queue.
2379  * This will usually be done immediately after a thread is created.
2380  * The caller of scheduleThread must create the thread using e.g.
2381  * createThread and push an appropriate closure
2382  * on this thread's stack before the scheduler is invoked.
2383  * ------------------------------------------------------------------------ */
2384
2385 void
2386 scheduleThread(Capability *cap, StgTSO *tso)
2387 {
2388     // The thread goes at the *end* of the run-queue, to avoid possible
2389     // starvation of any threads already on the queue.
2390     appendToRunQueue(cap,tso);
2391 }
2392
2393 void
2394 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2395 {
2396 #if defined(THREADED_RTS)
2397     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2398                               // move this thread from now on.
2399     cpu %= RtsFlags.ParFlags.nNodes;
2400     if (cpu == cap->no) {
2401         appendToRunQueue(cap,tso);
2402     } else {
2403         migrateThreadToCapability_lock(&capabilities[cpu],tso);
2404     }
2405 #else
2406     appendToRunQueue(cap,tso);
2407 #endif
2408 }
2409
2410 Capability *
2411 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2412 {
2413     Task *task;
2414
2415     // We already created/initialised the Task
2416     task = cap->running_task;
2417
2418     // This TSO is now a bound thread; make the Task and TSO
2419     // point to each other.
2420     tso->bound = task;
2421     tso->cap = cap;
2422
2423     task->tso = tso;
2424     task->ret = ret;
2425     task->stat = NoStatus;
2426
2427     appendToRunQueue(cap,tso);
2428
2429     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2430
2431 #if defined(GRAN)
2432     /* GranSim specific init */
2433     CurrentTSO = m->tso;                // the TSO to run
2434     procStatus[MainProc] = Busy;        // status of main PE
2435     CurrentProc = MainProc;             // PE to run it on
2436 #endif
2437
2438     cap = schedule(cap,task);
2439
2440     ASSERT(task->stat != NoStatus);
2441     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2442
2443     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2444     return cap;
2445 }
2446
2447 /* ----------------------------------------------------------------------------
2448  * Starting Tasks
2449  * ------------------------------------------------------------------------- */
2450
2451 #if defined(THREADED_RTS)
2452 void
2453 workerStart(Task *task)
2454 {
2455     Capability *cap;
2456
2457     // See startWorkerTask().
2458     ACQUIRE_LOCK(&task->lock);
2459     cap = task->cap;
2460     RELEASE_LOCK(&task->lock);
2461
2462     // set the thread-local pointer to the Task:
2463     taskEnter(task);
2464
2465     // schedule() runs without a lock.
2466     cap = schedule(cap,task);
2467
2468     // On exit from schedule(), we have a Capability.
2469     releaseCapability(cap);
2470     workerTaskStop(task);
2471 }
2472 #endif
2473
2474 /* ---------------------------------------------------------------------------
2475  * initScheduler()
2476  *
2477  * Initialise the scheduler.  This resets all the queues - if the
2478  * queues contained any threads, they'll be garbage collected at the
2479  * next pass.
2480  *
2481  * ------------------------------------------------------------------------ */
2482
2483 void 
2484 initScheduler(void)
2485 {
2486 #if defined(GRAN)
2487   nat i;
2488   for (i=0; i<=MAX_PROC; i++) {
2489     run_queue_hds[i]      = END_TSO_QUEUE;
2490     run_queue_tls[i]      = END_TSO_QUEUE;
2491     blocked_queue_hds[i]  = END_TSO_QUEUE;
2492     blocked_queue_tls[i]  = END_TSO_QUEUE;
2493     ccalling_threadss[i]  = END_TSO_QUEUE;
2494     blackhole_queue[i]    = END_TSO_QUEUE;
2495     sleeping_queue        = END_TSO_QUEUE;
2496   }
2497 #elif !defined(THREADED_RTS)
2498   blocked_queue_hd  = END_TSO_QUEUE;
2499   blocked_queue_tl  = END_TSO_QUEUE;
2500   sleeping_queue    = END_TSO_QUEUE;
2501 #endif
2502
2503   blackhole_queue   = END_TSO_QUEUE;
2504   all_threads       = END_TSO_QUEUE;
2505
2506   context_switch = 0;
2507   sched_state    = SCHED_RUNNING;
2508
2509 #if defined(THREADED_RTS)
2510   /* Initialise the mutex and condition variables used by
2511    * the scheduler. */
2512   initMutex(&sched_mutex);
2513 #endif
2514   
2515   ACQUIRE_LOCK(&sched_mutex);
2516
2517   /* A capability holds the state a native thread needs in
2518    * order to execute STG code. At least one capability is
2519    * floating around (only THREADED_RTS builds have more than one).
2520    */
2521   initCapabilities();
2522
2523   initTaskManager();
2524
2525 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2526   initSparkPools();
2527 #endif
2528
2529 #if defined(THREADED_RTS)
2530   /*
2531    * Eagerly start one worker to run each Capability, except for
2532    * Capability 0.  The idea is that we're probably going to start a
2533    * bound thread on Capability 0 pretty soon, so we don't want a
2534    * worker task hogging it.
2535    */
2536   { 
2537       nat i;
2538       Capability *cap;
2539       for (i = 1; i < n_capabilities; i++) {
2540           cap = &capabilities[i];
2541           ACQUIRE_LOCK(&cap->lock);
2542           startWorkerTask(cap, workerStart);
2543           RELEASE_LOCK(&cap->lock);
2544       }
2545   }
2546 #endif
2547
2548   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2549
2550   RELEASE_LOCK(&sched_mutex);
2551 }
2552
2553 void
2554 exitScheduler( void )
2555 {
2556     Task *task = NULL;
2557
2558 #if defined(THREADED_RTS)
2559     ACQUIRE_LOCK(&sched_mutex);
2560     task = newBoundTask();
2561     RELEASE_LOCK(&sched_mutex);
2562 #endif
2563
2564     // If we haven't killed all the threads yet, do it now.
2565     if (sched_state < SCHED_SHUTTING_DOWN) {
2566         sched_state = SCHED_INTERRUPTING;
2567         scheduleDoGC(NULL,task,rtsFalse);    
2568     }
2569     sched_state = SCHED_SHUTTING_DOWN;
2570
2571 #if defined(THREADED_RTS)
2572     { 
2573         nat i;
2574         
2575         for (i = 0; i < n_capabilities; i++) {
2576             shutdownCapability(&capabilities[i], task);
2577         }
2578         boundTaskExiting(task);
2579         stopTaskManager();
2580     }
2581     closeMutex(&sched_mutex);
2582 #endif
2583 }
2584
2585 /* ---------------------------------------------------------------------------
2586    Where are the roots that we know about?
2587
2588         - all the threads on the runnable queue
2589         - all the threads on the blocked queue
2590         - all the threads on the sleeping queue
2591         - all the thread currently executing a _ccall_GC
2592         - all the "main threads"
2593      
2594    ------------------------------------------------------------------------ */
2595
2596 /* This has to be protected either by the scheduler monitor, or by the
2597         garbage collection monitor (probably the latter).
2598         KH @ 25/10/99
2599 */
2600
2601 void
2602 GetRoots( evac_fn evac )
2603 {
2604     nat i;
2605     Capability *cap;
2606     Task *task;
2607
2608 #if defined(GRAN)
2609     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2610         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2611             evac((StgClosure **)&run_queue_hds[i]);
2612         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2613             evac((StgClosure **)&run_queue_tls[i]);
2614         
2615         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2616             evac((StgClosure **)&blocked_queue_hds[i]);
2617         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2618             evac((StgClosure **)&blocked_queue_tls[i]);
2619         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2620             evac((StgClosure **)&ccalling_threads[i]);
2621     }
2622
2623     markEventQueue();
2624
2625 #else /* !GRAN */
2626
2627     for (i = 0; i < n_capabilities; i++) {
2628         cap = &capabilities[i];
2629         evac((StgClosure **)(void *)&cap->run_queue_hd);
2630         evac((StgClosure **)(void *)&cap->run_queue_tl);
2631 #if defined(THREADED_RTS)
2632         evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2633         evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2634 #endif
2635         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2636              task=task->next) {
2637             debugTrace(DEBUG_sched,
2638                        "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2639             evac((StgClosure **)(void *)&task->suspended_tso);
2640         }
2641
2642     }
2643     
2644
2645 #if !defined(THREADED_RTS)
2646     evac((StgClosure **)(void *)&blocked_queue_hd);
2647     evac((StgClosure **)(void *)&blocked_queue_tl);
2648     evac((StgClosure **)(void *)&sleeping_queue);
2649 #endif 
2650 #endif
2651
2652     // evac((StgClosure **)&blackhole_queue);
2653
2654 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2655     markSparkQueue(evac);
2656 #endif
2657     
2658 #if defined(RTS_USER_SIGNALS)
2659     // mark the signal handlers (signals should be already blocked)
2660     markSignalHandlers(evac);
2661 #endif
2662 }
2663
2664 /* -----------------------------------------------------------------------------
2665    performGC
2666
2667    This is the interface to the garbage collector from Haskell land.
2668    We provide this so that external C code can allocate and garbage
2669    collect when called from Haskell via _ccall_GC.
2670    -------------------------------------------------------------------------- */
2671
2672 static void
2673 performGC_(rtsBool force_major)
2674 {
2675     Task *task;
2676     // We must grab a new Task here, because the existing Task may be
2677     // associated with a particular Capability, and chained onto the 
2678     // suspended_ccalling_tasks queue.
2679     ACQUIRE_LOCK(&sched_mutex);
2680     task = newBoundTask();
2681     RELEASE_LOCK(&sched_mutex);
2682     scheduleDoGC(NULL,task,force_major);
2683     boundTaskExiting(task);
2684 }
2685
2686 void
2687 performGC(void)
2688 {
2689     performGC_(rtsFalse);
2690 }
2691
2692 void
2693 performMajorGC(void)
2694 {
2695     performGC_(rtsTrue);
2696 }
2697
2698 /* -----------------------------------------------------------------------------
2699    Stack overflow
2700
2701    If the thread has reached its maximum stack size, then raise the
2702    StackOverflow exception in the offending thread.  Otherwise
2703    relocate the TSO into a larger chunk of memory and adjust its stack
2704    size appropriately.
2705    -------------------------------------------------------------------------- */
2706
2707 static StgTSO *
2708 threadStackOverflow(Capability *cap, StgTSO *tso)
2709 {
2710   nat new_stack_size, stack_words;
2711   lnat new_tso_size;
2712   StgPtr new_sp;
2713   StgTSO *dest;
2714
2715   IF_DEBUG(sanity,checkTSO(tso));
2716
2717   // don't allow throwTo() to modify the blocked_exceptions queue
2718   // while we are moving the TSO:
2719   lockClosure((StgClosure *)tso);
2720
2721   if (tso->stack_size >= tso->max_stack_size) {
2722
2723       debugTrace(DEBUG_gc,
2724                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2725                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2726       IF_DEBUG(gc,
2727                /* If we're debugging, just print out the top of the stack */
2728                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2729                                                 tso->sp+64)));
2730
2731       // Send this thread the StackOverflow exception
2732       unlockTSO(tso);
2733       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2734       return tso;
2735   }
2736
2737   /* Try to double the current stack size.  If that takes us over the
2738    * maximum stack size for this thread, then use the maximum instead.
2739    * Finally round up so the TSO ends up as a whole number of blocks.
2740    */
2741   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2742   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2743                                        TSO_STRUCT_SIZE)/sizeof(W_);
2744   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2745   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2746
2747   debugTrace(DEBUG_sched, 
2748              "increasing stack size from %ld words to %d.",
2749              (long)tso->stack_size, new_stack_size);
2750
2751   dest = (StgTSO *)allocate(new_tso_size);
2752   TICK_ALLOC_TSO(new_stack_size,0);
2753
2754   /* copy the TSO block and the old stack into the new area */
2755   memcpy(dest,tso,TSO_STRUCT_SIZE);
2756   stack_words = tso->stack + tso->stack_size - tso->sp;
2757   new_sp = (P_)dest + new_tso_size - stack_words;
2758   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2759
2760   /* relocate the stack pointers... */
2761   dest->sp         = new_sp;
2762   dest->stack_size = new_stack_size;
2763         
2764   /* Mark the old TSO as relocated.  We have to check for relocated
2765    * TSOs in the garbage collector and any primops that deal with TSOs.
2766    *
2767    * It's important to set the sp value to just beyond the end
2768    * of the stack, so we don't attempt to scavenge any part of the
2769    * dead TSO's stack.
2770    */
2771   tso->what_next = ThreadRelocated;
2772   tso->link = dest;
2773   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2774   tso->why_blocked = NotBlocked;
2775
2776   IF_PAR_DEBUG(verbose,
2777                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2778                      tso->id, tso, tso->stack_size);
2779                /* If we're debugging, just print out the top of the stack */
2780                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2781                                                 tso->sp+64)));
2782   
2783   unlockTSO(dest);
2784   unlockTSO(tso);
2785
2786   IF_DEBUG(sanity,checkTSO(dest));
2787 #if 0
2788   IF_DEBUG(scheduler,printTSO(dest));
2789 #endif
2790
2791   return dest;
2792 }
2793
2794 /* ---------------------------------------------------------------------------
2795    Interrupt execution
2796    - usually called inside a signal handler so it mustn't do anything fancy.   
2797    ------------------------------------------------------------------------ */
2798
2799 void
2800 interruptStgRts(void)
2801 {
2802     sched_state = SCHED_INTERRUPTING;
2803     context_switch = 1;
2804     wakeUpRts();
2805 }
2806
2807 /* -----------------------------------------------------------------------------
2808    Wake up the RTS
2809    
2810    This function causes at least one OS thread to wake up and run the
2811    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2812    an external event has arrived that may need servicing (eg. a
2813    keyboard interrupt).
2814
2815    In the single-threaded RTS we don't do anything here; we only have
2816    one thread anyway, and the event that caused us to want to wake up
2817    will have interrupted any blocking system call in progress anyway.
2818    -------------------------------------------------------------------------- */
2819
2820 void
2821 wakeUpRts(void)
2822 {
2823 #if defined(THREADED_RTS)
2824     // This forces the IO Manager thread to wakeup, which will
2825     // in turn ensure that some OS thread wakes up and runs the
2826     // scheduler loop, which will cause a GC and deadlock check.
2827     ioManagerWakeup();
2828 #endif
2829 }
2830
2831 /* -----------------------------------------------------------------------------
2832  * checkBlackHoles()
2833  *
2834  * Check the blackhole_queue for threads that can be woken up.  We do
2835  * this periodically: before every GC, and whenever the run queue is
2836  * empty.
2837  *
2838  * An elegant solution might be to just wake up all the blocked
2839  * threads with awakenBlockedQueue occasionally: they'll go back to
2840  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2841  * doesn't give us a way to tell whether we've actually managed to
2842  * wake up any threads, so we would be busy-waiting.
2843  *
2844  * -------------------------------------------------------------------------- */
2845
2846 static rtsBool
2847 checkBlackHoles (Capability *cap)
2848 {
2849     StgTSO **prev, *t;
2850     rtsBool any_woke_up = rtsFalse;
2851     StgHalfWord type;
2852
2853     // blackhole_queue is global:
2854     ASSERT_LOCK_HELD(&sched_mutex);
2855
2856     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2857
2858     // ASSUMES: sched_mutex
2859     prev = &blackhole_queue;
2860     t = blackhole_queue;
2861     while (t != END_TSO_QUEUE) {
2862         ASSERT(t->why_blocked == BlockedOnBlackHole);
2863         type = get_itbl(t->block_info.closure)->type;
2864         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2865             IF_DEBUG(sanity,checkTSO(t));
2866             t = unblockOne(cap, t);
2867             // urk, the threads migrate to the current capability
2868             // here, but we'd like to keep them on the original one.
2869             *prev = t;
2870             any_woke_up = rtsTrue;
2871         } else {
2872             prev = &t->link;
2873             t = t->link;
2874         }
2875     }
2876
2877     return any_woke_up;
2878 }
2879
2880 /* -----------------------------------------------------------------------------
2881    Deleting threads
2882
2883    This is used for interruption (^C) and forking, and corresponds to
2884    raising an exception but without letting the thread catch the
2885    exception.
2886    -------------------------------------------------------------------------- */
2887
2888 static void 
2889 deleteThread (Capability *cap, StgTSO *tso)
2890 {
2891     // NOTE: must only be called on a TSO that we have exclusive
2892     // access to, because we will call throwToSingleThreaded() below.
2893     // The TSO must be on the run queue of the Capability we own, or 
2894     // we must own all Capabilities.
2895
2896     if (tso->why_blocked != BlockedOnCCall &&
2897         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2898         throwToSingleThreaded(cap,tso,NULL);
2899     }
2900 }
2901
2902 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2903 static void 
2904 deleteThread_(Capability *cap, StgTSO *tso)
2905 { // for forkProcess only:
2906   // like deleteThread(), but we delete threads in foreign calls, too.
2907
2908     if (tso->why_blocked == BlockedOnCCall ||
2909         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2910         unblockOne(cap,tso);
2911         tso->what_next = ThreadKilled;
2912     } else {
2913         deleteThread(cap,tso);
2914     }
2915 }
2916 #endif
2917
2918 /* -----------------------------------------------------------------------------
2919    raiseExceptionHelper
2920    
2921    This function is called by the raise# primitve, just so that we can
2922    move some of the tricky bits of raising an exception from C-- into
2923    C.  Who knows, it might be a useful re-useable thing here too.
2924    -------------------------------------------------------------------------- */
2925
2926 StgWord
2927 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2928 {
2929     Capability *cap = regTableToCapability(reg);
2930     StgThunk *raise_closure = NULL;
2931     StgPtr p, next;
2932     StgRetInfoTable *info;
2933     //
2934     // This closure represents the expression 'raise# E' where E
2935     // is the exception raise.  It is used to overwrite all the
2936     // thunks which are currently under evaluataion.
2937     //
2938
2939     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2940     // LDV profiling: stg_raise_info has THUNK as its closure
2941     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2942     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2943     // 1 does not cause any problem unless profiling is performed.
2944     // However, when LDV profiling goes on, we need to linearly scan
2945     // small object pool, where raise_closure is stored, so we should
2946     // use MIN_UPD_SIZE.
2947     //
2948     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2949     //                                 sizeofW(StgClosure)+1);
2950     //
2951
2952     //
2953     // Walk up the stack, looking for the catch frame.  On the way,
2954     // we update any closures pointed to from update frames with the
2955     // raise closure that we just built.
2956     //
2957     p = tso->sp;
2958     while(1) {
2959         info = get_ret_itbl((StgClosure *)p);
2960         next = p + stack_frame_sizeW((StgClosure *)p);
2961         switch (info->i.type) {
2962             
2963         case UPDATE_FRAME:
2964             // Only create raise_closure if we need to.
2965             if (raise_closure == NULL) {
2966                 raise_closure = 
2967                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2968                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2969                 raise_closure->payload[0] = exception;
2970             }
2971             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2972             p = next;
2973             continue;
2974
2975         case ATOMICALLY_FRAME:
2976             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2977             tso->sp = p;
2978             return ATOMICALLY_FRAME;
2979             
2980         case CATCH_FRAME:
2981             tso->sp = p;
2982             return CATCH_FRAME;
2983
2984         case CATCH_STM_FRAME:
2985             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2986             tso->sp = p;
2987             return CATCH_STM_FRAME;
2988             
2989         case STOP_FRAME:
2990             tso->sp = p;
2991             return STOP_FRAME;
2992
2993         case CATCH_RETRY_FRAME:
2994         default:
2995             p = next; 
2996             continue;
2997         }
2998     }
2999 }
3000
3001
3002 /* -----------------------------------------------------------------------------
3003    findRetryFrameHelper
3004
3005    This function is called by the retry# primitive.  It traverses the stack
3006    leaving tso->sp referring to the frame which should handle the retry.  
3007
3008    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3009    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3010
3011    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3012    create) because retries are not considered to be exceptions, despite the
3013    similar implementation.
3014
3015    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3016    not be created within memory transactions.
3017    -------------------------------------------------------------------------- */
3018
3019 StgWord
3020 findRetryFrameHelper (StgTSO *tso)
3021 {
3022   StgPtr           p, next;
3023   StgRetInfoTable *info;
3024
3025   p = tso -> sp;
3026   while (1) {
3027     info = get_ret_itbl((StgClosure *)p);
3028     next = p + stack_frame_sizeW((StgClosure *)p);
3029     switch (info->i.type) {
3030       
3031     case ATOMICALLY_FRAME:
3032         debugTrace(DEBUG_stm,
3033                    "found ATOMICALLY_FRAME at %p during retry", p);
3034         tso->sp = p;
3035         return ATOMICALLY_FRAME;
3036       
3037     case CATCH_RETRY_FRAME:
3038         debugTrace(DEBUG_stm,
3039                    "found CATCH_RETRY_FRAME at %p during retrry", p);
3040         tso->sp = p;
3041         return CATCH_RETRY_FRAME;
3042       
3043     case CATCH_STM_FRAME: {
3044         debugTrace(DEBUG_stm,
3045                    "found CATCH_STM_FRAME at %p during retry", p);
3046         StgTRecHeader *trec = tso -> trec;
3047         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3048         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3049         stmAbortTransaction(tso -> cap, trec);
3050         stmFreeAbortedTRec(tso -> cap, trec);
3051         tso -> trec = outer;
3052         p = next; 
3053         continue;
3054     }
3055       
3056
3057     default:
3058       ASSERT(info->i.type != CATCH_FRAME);
3059       ASSERT(info->i.type != STOP_FRAME);
3060       p = next; 
3061       continue;
3062     }
3063   }
3064 }
3065
3066 /* -----------------------------------------------------------------------------
3067    resurrectThreads is called after garbage collection on the list of
3068    threads found to be garbage.  Each of these threads will be woken
3069    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3070    on an MVar, or NonTermination if the thread was blocked on a Black
3071    Hole.
3072
3073    Locks: assumes we hold *all* the capabilities.
3074    -------------------------------------------------------------------------- */
3075
3076 void
3077 resurrectThreads (StgTSO *threads)
3078 {
3079     StgTSO *tso, *next;
3080     Capability *cap;
3081
3082     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3083         next = tso->global_link;
3084         tso->global_link = all_threads;
3085         all_threads = tso;
3086         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3087         
3088         // Wake up the thread on the Capability it was last on
3089         cap = tso->cap;
3090         
3091         switch (tso->why_blocked) {
3092         case BlockedOnMVar:
3093         case BlockedOnException:
3094             /* Called by GC - sched_mutex lock is currently held. */
3095             throwToSingleThreaded(cap, tso,
3096                                   (StgClosure *)BlockedOnDeadMVar_closure);
3097             break;
3098         case BlockedOnBlackHole:
3099             throwToSingleThreaded(cap, tso,
3100                                   (StgClosure *)NonTermination_closure);
3101             break;
3102         case BlockedOnSTM:
3103             throwToSingleThreaded(cap, tso,
3104                                   (StgClosure *)BlockedIndefinitely_closure);
3105             break;
3106         case NotBlocked:
3107             /* This might happen if the thread was blocked on a black hole
3108              * belonging to a thread that we've just woken up (raiseAsync
3109              * can wake up threads, remember...).
3110              */
3111             continue;
3112         default:
3113             barf("resurrectThreads: thread blocked in a strange way");
3114         }
3115     }
3116 }