Eagerly raise a blocked exception when entering 'unblock' or exiting 'block'
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "OSThreads.h"
15 #include "Storage.h"
16 #include "StgRun.h"
17 #include "Hooks.h"
18 #include "Schedule.h"
19 #include "StgMiscClosures.h"
20 #include "Interpreter.h"
21 #include "Printer.h"
22 #include "RtsSignals.h"
23 #include "Sanity.h"
24 #include "Stats.h"
25 #include "STM.h"
26 #include "Timer.h"
27 #include "Prelude.h"
28 #include "ThreadLabels.h"
29 #include "LdvProfile.h"
30 #include "Updates.h"
31 #ifdef PROFILING
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #endif
35 #if defined(GRAN) || defined(PARALLEL_HASKELL)
36 # include "GranSimRts.h"
37 # include "GranSim.h"
38 # include "ParallelRts.h"
39 # include "Parallel.h"
40 # include "ParallelDebug.h"
41 # include "FetchMe.h"
42 # include "HLC.h"
43 #endif
44 #include "Sparks.h"
45 #include "Capability.h"
46 #include "Task.h"
47 #include "AwaitEvent.h"
48 #if defined(mingw32_HOST_OS)
49 #include "win32/IOManager.h"
50 #endif
51 #include "Trace.h"
52 #include "RaiseAsync.h"
53 #include "Threads.h"
54 #include "ThrIOManager.h"
55
56 #ifdef HAVE_SYS_TYPES_H
57 #include <sys/types.h>
58 #endif
59 #ifdef HAVE_UNISTD_H
60 #include <unistd.h>
61 #endif
62
63 #include <string.h>
64 #include <stdlib.h>
65 #include <stdarg.h>
66
67 #ifdef HAVE_ERRNO_H
68 #include <errno.h>
69 #endif
70
71 // Turn off inlining when debugging - it obfuscates things
72 #ifdef DEBUG
73 # undef  STATIC_INLINE
74 # define STATIC_INLINE static
75 #endif
76
77 /* -----------------------------------------------------------------------------
78  * Global variables
79  * -------------------------------------------------------------------------- */
80
81 #if defined(GRAN)
82
83 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
84 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
85
86 /* 
87    In GranSim we have a runnable and a blocked queue for each processor.
88    In order to minimise code changes new arrays run_queue_hds/tls
89    are created. run_queue_hd is then a short cut (macro) for
90    run_queue_hds[CurrentProc] (see GranSim.h).
91    -- HWL
92 */
93 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
94 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
95 StgTSO *ccalling_threadss[MAX_PROC];
96 /* We use the same global list of threads (all_threads) in GranSim as in
97    the std RTS (i.e. we are cheating). However, we don't use this list in
98    the GranSim specific code at the moment (so we are only potentially
99    cheating).  */
100
101 #else /* !GRAN */
102
103 #if !defined(THREADED_RTS)
104 // Blocked/sleeping thrads
105 StgTSO *blocked_queue_hd = NULL;
106 StgTSO *blocked_queue_tl = NULL;
107 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
108 #endif
109
110 /* Threads blocked on blackholes.
111  * LOCK: sched_mutex+capability, or all capabilities
112  */
113 StgTSO *blackhole_queue = NULL;
114 #endif
115
116 /* The blackhole_queue should be checked for threads to wake up.  See
117  * Schedule.h for more thorough comment.
118  * LOCK: none (doesn't matter if we miss an update)
119  */
120 rtsBool blackholes_need_checking = rtsFalse;
121
122 /* Linked list of all threads.
123  * Used for detecting garbage collected threads.
124  * LOCK: sched_mutex+capability, or all capabilities
125  */
126 StgTSO *all_threads = NULL;
127
128 /* flag set by signal handler to precipitate a context switch
129  * LOCK: none (just an advisory flag)
130  */
131 int context_switch = 0;
132
133 /* flag that tracks whether we have done any execution in this time slice.
134  * LOCK: currently none, perhaps we should lock (but needs to be
135  * updated in the fast path of the scheduler).
136  */
137 nat recent_activity = ACTIVITY_YES;
138
139 /* if this flag is set as well, give up execution
140  * LOCK: none (changes once, from false->true)
141  */
142 rtsBool sched_state = SCHED_RUNNING;
143
144 #if defined(GRAN)
145 StgTSO *CurrentTSO;
146 #endif
147
148 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
149  *  exists - earlier gccs apparently didn't.
150  *  -= chak
151  */
152 StgTSO dummy_tso;
153
154 /*
155  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
156  * in an MT setting, needed to signal that a worker thread shouldn't hang around
157  * in the scheduler when it is out of work.
158  */
159 rtsBool shutting_down_scheduler = rtsFalse;
160
161 /*
162  * This mutex protects most of the global scheduler data in
163  * the THREADED_RTS runtime.
164  */
165 #if defined(THREADED_RTS)
166 Mutex sched_mutex;
167 #endif
168
169 #if defined(PARALLEL_HASKELL)
170 StgTSO *LastTSO;
171 rtsTime TimeOfLastYield;
172 rtsBool emitSchedule = rtsTrue;
173 #endif
174
175 #if !defined(mingw32_HOST_OS)
176 #define FORKPROCESS_PRIMOP_SUPPORTED
177 #endif
178
179 /* -----------------------------------------------------------------------------
180  * static function prototypes
181  * -------------------------------------------------------------------------- */
182
183 static Capability *schedule (Capability *initialCapability, Task *task);
184
185 //
186 // These function all encapsulate parts of the scheduler loop, and are
187 // abstracted only to make the structure and control flow of the
188 // scheduler clearer.
189 //
190 static void schedulePreLoop (void);
191 #if defined(THREADED_RTS)
192 static void schedulePushWork(Capability *cap, Task *task);
193 #endif
194 static void scheduleStartSignalHandlers (Capability *cap);
195 static void scheduleCheckBlockedThreads (Capability *cap);
196 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
197 static void scheduleCheckBlackHoles (Capability *cap);
198 static void scheduleDetectDeadlock (Capability *cap, Task *task);
199 #if defined(GRAN)
200 static StgTSO *scheduleProcessEvent(rtsEvent *event);
201 #endif
202 #if defined(PARALLEL_HASKELL)
203 static StgTSO *scheduleSendPendingMessages(void);
204 static void scheduleActivateSpark(void);
205 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
206 #endif
207 #if defined(PAR) || defined(GRAN)
208 static void scheduleGranParReport(void);
209 #endif
210 static void schedulePostRunThread(void);
211 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
212 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
213                                          StgTSO *t);
214 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
215                                     nat prev_what_next );
216 static void scheduleHandleThreadBlocked( StgTSO *t );
217 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
218                                              StgTSO *t );
219 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
220 static Capability *scheduleDoGC(Capability *cap, Task *task,
221                                 rtsBool force_major);
222
223 static rtsBool checkBlackHoles(Capability *cap);
224
225 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
226
227 static void deleteThread (Capability *cap, StgTSO *tso);
228 static void deleteAllThreads (Capability *cap);
229
230 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
231 static void deleteThread_(Capability *cap, StgTSO *tso);
232 #endif
233
234 #if defined(PARALLEL_HASKELL)
235 StgTSO * createSparkThread(rtsSpark spark);
236 StgTSO * activateSpark (rtsSpark spark);  
237 #endif
238
239 #ifdef DEBUG
240 static char *whatNext_strs[] = {
241   "(unknown)",
242   "ThreadRunGHC",
243   "ThreadInterpret",
244   "ThreadKilled",
245   "ThreadRelocated",
246   "ThreadComplete"
247 };
248 #endif
249
250 /* -----------------------------------------------------------------------------
251  * Putting a thread on the run queue: different scheduling policies
252  * -------------------------------------------------------------------------- */
253
254 STATIC_INLINE void
255 addToRunQueue( Capability *cap, StgTSO *t )
256 {
257 #if defined(PARALLEL_HASKELL)
258     if (RtsFlags.ParFlags.doFairScheduling) { 
259         // this does round-robin scheduling; good for concurrency
260         appendToRunQueue(cap,t);
261     } else {
262         // this does unfair scheduling; good for parallelism
263         pushOnRunQueue(cap,t);
264     }
265 #else
266     // this does round-robin scheduling; good for concurrency
267     appendToRunQueue(cap,t);
268 #endif
269 }
270
271 /* ---------------------------------------------------------------------------
272    Main scheduling loop.
273
274    We use round-robin scheduling, each thread returning to the
275    scheduler loop when one of these conditions is detected:
276
277       * out of heap space
278       * timer expires (thread yields)
279       * thread blocks
280       * thread ends
281       * stack overflow
282
283    GRAN version:
284      In a GranSim setup this loop iterates over the global event queue.
285      This revolves around the global event queue, which determines what 
286      to do next. Therefore, it's more complicated than either the 
287      concurrent or the parallel (GUM) setup.
288
289    GUM version:
290      GUM iterates over incoming messages.
291      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
292      and sends out a fish whenever it has nothing to do; in-between
293      doing the actual reductions (shared code below) it processes the
294      incoming messages and deals with delayed operations 
295      (see PendingFetches).
296      This is not the ugliest code you could imagine, but it's bloody close.
297
298    ------------------------------------------------------------------------ */
299
300 static Capability *
301 schedule (Capability *initialCapability, Task *task)
302 {
303   StgTSO *t;
304   Capability *cap;
305   StgThreadReturnCode ret;
306 #if defined(GRAN)
307   rtsEvent *event;
308 #elif defined(PARALLEL_HASKELL)
309   StgTSO *tso;
310   GlobalTaskId pe;
311   rtsBool receivedFinish = rtsFalse;
312 # if defined(DEBUG)
313   nat tp_size, sp_size; // stats only
314 # endif
315 #endif
316   nat prev_what_next;
317   rtsBool ready_to_gc;
318 #if defined(THREADED_RTS)
319   rtsBool first = rtsTrue;
320 #endif
321   
322   cap = initialCapability;
323
324   // Pre-condition: this task owns initialCapability.
325   // The sched_mutex is *NOT* held
326   // NB. on return, we still hold a capability.
327
328   debugTrace (DEBUG_sched, 
329               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
330               task, initialCapability);
331
332   schedulePreLoop();
333
334   // -----------------------------------------------------------
335   // Scheduler loop starts here:
336
337 #if defined(PARALLEL_HASKELL)
338 #define TERMINATION_CONDITION        (!receivedFinish)
339 #elif defined(GRAN)
340 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
341 #else
342 #define TERMINATION_CONDITION        rtsTrue
343 #endif
344
345   while (TERMINATION_CONDITION) {
346
347 #if defined(GRAN)
348       /* Choose the processor with the next event */
349       CurrentProc = event->proc;
350       CurrentTSO = event->tso;
351 #endif
352
353 #if defined(THREADED_RTS)
354       if (first) {
355           // don't yield the first time, we want a chance to run this
356           // thread for a bit, even if there are others banging at the
357           // door.
358           first = rtsFalse;
359           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
360       } else {
361           // Yield the capability to higher-priority tasks if necessary.
362           yieldCapability(&cap, task);
363       }
364 #endif
365       
366 #if defined(THREADED_RTS)
367       schedulePushWork(cap,task);
368 #endif
369
370     // Check whether we have re-entered the RTS from Haskell without
371     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
372     // call).
373     if (cap->in_haskell) {
374           errorBelch("schedule: re-entered unsafely.\n"
375                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
376           stg_exit(EXIT_FAILURE);
377     }
378
379     // The interruption / shutdown sequence.
380     // 
381     // In order to cleanly shut down the runtime, we want to:
382     //   * make sure that all main threads return to their callers
383     //     with the state 'Interrupted'.
384     //   * clean up all OS threads assocated with the runtime
385     //   * free all memory etc.
386     //
387     // So the sequence for ^C goes like this:
388     //
389     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
390     //     arranges for some Capability to wake up
391     //
392     //   * all threads in the system are halted, and the zombies are
393     //     placed on the run queue for cleaning up.  We acquire all
394     //     the capabilities in order to delete the threads, this is
395     //     done by scheduleDoGC() for convenience (because GC already
396     //     needs to acquire all the capabilities).  We can't kill
397     //     threads involved in foreign calls.
398     // 
399     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
400     //
401     //   * sched_state := SCHED_SHUTTING_DOWN
402     //
403     //   * all workers exit when the run queue on their capability
404     //     drains.  All main threads will also exit when their TSO
405     //     reaches the head of the run queue and they can return.
406     //
407     //   * eventually all Capabilities will shut down, and the RTS can
408     //     exit.
409     //
410     //   * We might be left with threads blocked in foreign calls, 
411     //     we should really attempt to kill these somehow (TODO);
412     
413     switch (sched_state) {
414     case SCHED_RUNNING:
415         break;
416     case SCHED_INTERRUPTING:
417         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
418 #if defined(THREADED_RTS)
419         discardSparksCap(cap);
420 #endif
421         /* scheduleDoGC() deletes all the threads */
422         cap = scheduleDoGC(cap,task,rtsFalse);
423         break;
424     case SCHED_SHUTTING_DOWN:
425         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
426         // If we are a worker, just exit.  If we're a bound thread
427         // then we will exit below when we've removed our TSO from
428         // the run queue.
429         if (task->tso == NULL && emptyRunQueue(cap)) {
430             return cap;
431         }
432         break;
433     default:
434         barf("sched_state: %d", sched_state);
435     }
436
437 #if defined(THREADED_RTS)
438     // If the run queue is empty, take a spark and turn it into a thread.
439     {
440         if (emptyRunQueue(cap)) {
441             StgClosure *spark;
442             spark = findSpark(cap);
443             if (spark != NULL) {
444                 debugTrace(DEBUG_sched,
445                            "turning spark of closure %p into a thread",
446                            (StgClosure *)spark);
447                 createSparkThread(cap,spark);     
448             }
449         }
450     }
451 #endif // THREADED_RTS
452
453     scheduleStartSignalHandlers(cap);
454
455     // Only check the black holes here if we've nothing else to do.
456     // During normal execution, the black hole list only gets checked
457     // at GC time, to avoid repeatedly traversing this possibly long
458     // list each time around the scheduler.
459     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
460
461     scheduleCheckWakeupThreads(cap);
462
463     scheduleCheckBlockedThreads(cap);
464
465     scheduleDetectDeadlock(cap,task);
466 #if defined(THREADED_RTS)
467     cap = task->cap;    // reload cap, it might have changed
468 #endif
469
470     // Normally, the only way we can get here with no threads to
471     // run is if a keyboard interrupt received during 
472     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
473     // Additionally, it is not fatal for the
474     // threaded RTS to reach here with no threads to run.
475     //
476     // win32: might be here due to awaitEvent() being abandoned
477     // as a result of a console event having been delivered.
478     if ( emptyRunQueue(cap) ) {
479 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
480         ASSERT(sched_state >= SCHED_INTERRUPTING);
481 #endif
482         continue; // nothing to do
483     }
484
485 #if defined(PARALLEL_HASKELL)
486     scheduleSendPendingMessages();
487     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
488         continue;
489
490 #if defined(SPARKS)
491     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
492 #endif
493
494     /* If we still have no work we need to send a FISH to get a spark
495        from another PE */
496     if (emptyRunQueue(cap)) {
497         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
498         ASSERT(rtsFalse); // should not happen at the moment
499     }
500     // from here: non-empty run queue.
501     //  TODO: merge above case with this, only one call processMessages() !
502     if (PacketsWaiting()) {  /* process incoming messages, if
503                                 any pending...  only in else
504                                 because getRemoteWork waits for
505                                 messages as well */
506         receivedFinish = processMessages();
507     }
508 #endif
509
510 #if defined(GRAN)
511     scheduleProcessEvent(event);
512 #endif
513
514     // 
515     // Get a thread to run
516     //
517     t = popRunQueue(cap);
518
519 #if defined(GRAN) || defined(PAR)
520     scheduleGranParReport(); // some kind of debuging output
521 #else
522     // Sanity check the thread we're about to run.  This can be
523     // expensive if there is lots of thread switching going on...
524     IF_DEBUG(sanity,checkTSO(t));
525 #endif
526
527 #if defined(THREADED_RTS)
528     // Check whether we can run this thread in the current task.
529     // If not, we have to pass our capability to the right task.
530     {
531         Task *bound = t->bound;
532       
533         if (bound) {
534             if (bound == task) {
535                 debugTrace(DEBUG_sched,
536                            "### Running thread %lu in bound thread", (unsigned long)t->id);
537                 // yes, the Haskell thread is bound to the current native thread
538             } else {
539                 debugTrace(DEBUG_sched,
540                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
541                 // no, bound to a different Haskell thread: pass to that thread
542                 pushOnRunQueue(cap,t);
543                 continue;
544             }
545         } else {
546             // The thread we want to run is unbound.
547             if (task->tso) { 
548                 debugTrace(DEBUG_sched,
549                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
550                 // no, the current native thread is bound to a different
551                 // Haskell thread, so pass it to any worker thread
552                 pushOnRunQueue(cap,t);
553                 continue; 
554             }
555         }
556     }
557 #endif
558
559     cap->r.rCurrentTSO = t;
560     
561     /* context switches are initiated by the timer signal, unless
562      * the user specified "context switch as often as possible", with
563      * +RTS -C0
564      */
565     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
566         && !emptyThreadQueues(cap)) {
567         context_switch = 1;
568     }
569          
570 run_thread:
571
572     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
573                               (long)t->id, whatNext_strs[t->what_next]);
574
575 #if defined(PROFILING)
576     startHeapProfTimer();
577 #endif
578
579     // Check for exceptions blocked on this thread
580     maybePerformBlockedException (cap, t);
581
582     // ----------------------------------------------------------------------
583     // Run the current thread 
584
585     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
586     ASSERT(t->cap == cap);
587
588     prev_what_next = t->what_next;
589
590     errno = t->saved_errno;
591     cap->in_haskell = rtsTrue;
592
593     dirtyTSO(t);
594
595     recent_activity = ACTIVITY_YES;
596
597     switch (prev_what_next) {
598         
599     case ThreadKilled:
600     case ThreadComplete:
601         /* Thread already finished, return to scheduler. */
602         ret = ThreadFinished;
603         break;
604         
605     case ThreadRunGHC:
606     {
607         StgRegTable *r;
608         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
609         cap = regTableToCapability(r);
610         ret = r->rRet;
611         break;
612     }
613     
614     case ThreadInterpret:
615         cap = interpretBCO(cap);
616         ret = cap->r.rRet;
617         break;
618         
619     default:
620         barf("schedule: invalid what_next field");
621     }
622
623     cap->in_haskell = rtsFalse;
624
625     // The TSO might have moved, eg. if it re-entered the RTS and a GC
626     // happened.  So find the new location:
627     t = cap->r.rCurrentTSO;
628
629     // We have run some Haskell code: there might be blackhole-blocked
630     // threads to wake up now.
631     // Lock-free test here should be ok, we're just setting a flag.
632     if ( blackhole_queue != END_TSO_QUEUE ) {
633         blackholes_need_checking = rtsTrue;
634     }
635     
636     // And save the current errno in this thread.
637     // XXX: possibly bogus for SMP because this thread might already
638     // be running again, see code below.
639     t->saved_errno = errno;
640
641 #if defined(THREADED_RTS)
642     // If ret is ThreadBlocked, and this Task is bound to the TSO that
643     // blocked, we are in limbo - the TSO is now owned by whatever it
644     // is blocked on, and may in fact already have been woken up,
645     // perhaps even on a different Capability.  It may be the case
646     // that task->cap != cap.  We better yield this Capability
647     // immediately and return to normaility.
648     if (ret == ThreadBlocked) {
649         debugTrace(DEBUG_sched,
650                    "--<< thread %lu (%s) stopped: blocked",
651                    (unsigned long)t->id, whatNext_strs[t->what_next]);
652         continue;
653     }
654 #endif
655
656     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
657     ASSERT(t->cap == cap);
658
659     // ----------------------------------------------------------------------
660     
661     // Costs for the scheduler are assigned to CCS_SYSTEM
662 #if defined(PROFILING)
663     stopHeapProfTimer();
664     CCCS = CCS_SYSTEM;
665 #endif
666     
667     schedulePostRunThread();
668
669     ready_to_gc = rtsFalse;
670
671     switch (ret) {
672     case HeapOverflow:
673         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
674         break;
675
676     case StackOverflow:
677         scheduleHandleStackOverflow(cap,task,t);
678         break;
679
680     case ThreadYielding:
681         if (scheduleHandleYield(cap, t, prev_what_next)) {
682             // shortcut for switching between compiler/interpreter:
683             goto run_thread; 
684         }
685         break;
686
687     case ThreadBlocked:
688         scheduleHandleThreadBlocked(t);
689         break;
690
691     case ThreadFinished:
692         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
693         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
694         break;
695
696     default:
697       barf("schedule: invalid thread return code %d", (int)ret);
698     }
699
700     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
701     if (ready_to_gc) {
702       cap = scheduleDoGC(cap,task,rtsFalse);
703     }
704   } /* end of while() */
705
706   debugTrace(PAR_DEBUG_verbose,
707              "== Leaving schedule() after having received Finish");
708 }
709
710 /* ----------------------------------------------------------------------------
711  * Setting up the scheduler loop
712  * ------------------------------------------------------------------------- */
713
714 static void
715 schedulePreLoop(void)
716 {
717 #if defined(GRAN) 
718     /* set up first event to get things going */
719     /* ToDo: assign costs for system setup and init MainTSO ! */
720     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
721               ContinueThread, 
722               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
723     
724     debugTrace (DEBUG_gran,
725                 "GRAN: Init CurrentTSO (in schedule) = %p", 
726                 CurrentTSO);
727     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
728     
729     if (RtsFlags.GranFlags.Light) {
730         /* Save current time; GranSim Light only */
731         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
732     }      
733 #endif
734 }
735
736 /* -----------------------------------------------------------------------------
737  * schedulePushWork()
738  *
739  * Push work to other Capabilities if we have some.
740  * -------------------------------------------------------------------------- */
741
742 #if defined(THREADED_RTS)
743 static void
744 schedulePushWork(Capability *cap USED_IF_THREADS, 
745                  Task *task      USED_IF_THREADS)
746 {
747     Capability *free_caps[n_capabilities], *cap0;
748     nat i, n_free_caps;
749
750     // migration can be turned off with +RTS -qg
751     if (!RtsFlags.ParFlags.migrate) return;
752
753     // Check whether we have more threads on our run queue, or sparks
754     // in our pool, that we could hand to another Capability.
755     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
756         && sparkPoolSizeCap(cap) < 2) {
757         return;
758     }
759
760     // First grab as many free Capabilities as we can.
761     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
762         cap0 = &capabilities[i];
763         if (cap != cap0 && tryGrabCapability(cap0,task)) {
764             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
765                 // it already has some work, we just grabbed it at 
766                 // the wrong moment.  Or maybe it's deadlocked!
767                 releaseCapability(cap0);
768             } else {
769                 free_caps[n_free_caps++] = cap0;
770             }
771         }
772     }
773
774     // we now have n_free_caps free capabilities stashed in
775     // free_caps[].  Share our run queue equally with them.  This is
776     // probably the simplest thing we could do; improvements we might
777     // want to do include:
778     //
779     //   - giving high priority to moving relatively new threads, on 
780     //     the gournds that they haven't had time to build up a
781     //     working set in the cache on this CPU/Capability.
782     //
783     //   - giving low priority to moving long-lived threads
784
785     if (n_free_caps > 0) {
786         StgTSO *prev, *t, *next;
787         rtsBool pushed_to_all;
788
789         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
790
791         i = 0;
792         pushed_to_all = rtsFalse;
793
794         if (cap->run_queue_hd != END_TSO_QUEUE) {
795             prev = cap->run_queue_hd;
796             t = prev->link;
797             prev->link = END_TSO_QUEUE;
798             for (; t != END_TSO_QUEUE; t = next) {
799                 next = t->link;
800                 t->link = END_TSO_QUEUE;
801                 if (t->what_next == ThreadRelocated
802                     || t->bound == task // don't move my bound thread
803                     || tsoLocked(t)) {  // don't move a locked thread
804                     prev->link = t;
805                     prev = t;
806                 } else if (i == n_free_caps) {
807                     pushed_to_all = rtsTrue;
808                     i = 0;
809                     // keep one for us
810                     prev->link = t;
811                     prev = t;
812                 } else {
813                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
814                     appendToRunQueue(free_caps[i],t);
815                     if (t->bound) { t->bound->cap = free_caps[i]; }
816                     t->cap = free_caps[i];
817                     i++;
818                 }
819             }
820             cap->run_queue_tl = prev;
821         }
822
823         // If there are some free capabilities that we didn't push any
824         // threads to, then try to push a spark to each one.
825         if (!pushed_to_all) {
826             StgClosure *spark;
827             // i is the next free capability to push to
828             for (; i < n_free_caps; i++) {
829                 if (emptySparkPoolCap(free_caps[i])) {
830                     spark = findSpark(cap);
831                     if (spark != NULL) {
832                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
833                         newSpark(&(free_caps[i]->r), spark);
834                     }
835                 }
836             }
837         }
838
839         // release the capabilities
840         for (i = 0; i < n_free_caps; i++) {
841             task->cap = free_caps[i];
842             releaseCapability(free_caps[i]);
843         }
844     }
845     task->cap = cap; // reset to point to our Capability.
846 }
847 #endif
848
849 /* ----------------------------------------------------------------------------
850  * Start any pending signal handlers
851  * ------------------------------------------------------------------------- */
852
853 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
854 static void
855 scheduleStartSignalHandlers(Capability *cap)
856 {
857     if (signals_pending()) { // safe outside the lock
858         startSignalHandlers(cap);
859     }
860 }
861 #else
862 static void
863 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
864 {
865 }
866 #endif
867
868 /* ----------------------------------------------------------------------------
869  * Check for blocked threads that can be woken up.
870  * ------------------------------------------------------------------------- */
871
872 static void
873 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
874 {
875 #if !defined(THREADED_RTS)
876     //
877     // Check whether any waiting threads need to be woken up.  If the
878     // run queue is empty, and there are no other tasks running, we
879     // can wait indefinitely for something to happen.
880     //
881     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
882     {
883         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
884     }
885 #endif
886 }
887
888
889 /* ----------------------------------------------------------------------------
890  * Check for threads woken up by other Capabilities
891  * ------------------------------------------------------------------------- */
892
893 static void
894 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
895 {
896 #if defined(THREADED_RTS)
897     // Any threads that were woken up by other Capabilities get
898     // appended to our run queue.
899     if (!emptyWakeupQueue(cap)) {
900         ACQUIRE_LOCK(&cap->lock);
901         if (emptyRunQueue(cap)) {
902             cap->run_queue_hd = cap->wakeup_queue_hd;
903             cap->run_queue_tl = cap->wakeup_queue_tl;
904         } else {
905             cap->run_queue_tl->link = cap->wakeup_queue_hd;
906             cap->run_queue_tl = cap->wakeup_queue_tl;
907         }
908         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
909         RELEASE_LOCK(&cap->lock);
910     }
911 #endif
912 }
913
914 /* ----------------------------------------------------------------------------
915  * Check for threads blocked on BLACKHOLEs that can be woken up
916  * ------------------------------------------------------------------------- */
917 static void
918 scheduleCheckBlackHoles (Capability *cap)
919 {
920     if ( blackholes_need_checking ) // check without the lock first
921     {
922         ACQUIRE_LOCK(&sched_mutex);
923         if ( blackholes_need_checking ) {
924             checkBlackHoles(cap);
925             blackholes_need_checking = rtsFalse;
926         }
927         RELEASE_LOCK(&sched_mutex);
928     }
929 }
930
931 /* ----------------------------------------------------------------------------
932  * Detect deadlock conditions and attempt to resolve them.
933  * ------------------------------------------------------------------------- */
934
935 static void
936 scheduleDetectDeadlock (Capability *cap, Task *task)
937 {
938
939 #if defined(PARALLEL_HASKELL)
940     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
941     return;
942 #endif
943
944     /* 
945      * Detect deadlock: when we have no threads to run, there are no
946      * threads blocked, waiting for I/O, or sleeping, and all the
947      * other tasks are waiting for work, we must have a deadlock of
948      * some description.
949      */
950     if ( emptyThreadQueues(cap) )
951     {
952 #if defined(THREADED_RTS)
953         /* 
954          * In the threaded RTS, we only check for deadlock if there
955          * has been no activity in a complete timeslice.  This means
956          * we won't eagerly start a full GC just because we don't have
957          * any threads to run currently.
958          */
959         if (recent_activity != ACTIVITY_INACTIVE) return;
960 #endif
961
962         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
963
964         // Garbage collection can release some new threads due to
965         // either (a) finalizers or (b) threads resurrected because
966         // they are unreachable and will therefore be sent an
967         // exception.  Any threads thus released will be immediately
968         // runnable.
969         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
970
971         recent_activity = ACTIVITY_DONE_GC;
972         
973         if ( !emptyRunQueue(cap) ) return;
974
975 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
976         /* If we have user-installed signal handlers, then wait
977          * for signals to arrive rather then bombing out with a
978          * deadlock.
979          */
980         if ( anyUserHandlers() ) {
981             debugTrace(DEBUG_sched,
982                        "still deadlocked, waiting for signals...");
983
984             awaitUserSignals();
985
986             if (signals_pending()) {
987                 startSignalHandlers(cap);
988             }
989
990             // either we have threads to run, or we were interrupted:
991             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
992         }
993 #endif
994
995 #if !defined(THREADED_RTS)
996         /* Probably a real deadlock.  Send the current main thread the
997          * Deadlock exception.
998          */
999         if (task->tso) {
1000             switch (task->tso->why_blocked) {
1001             case BlockedOnSTM:
1002             case BlockedOnBlackHole:
1003             case BlockedOnException:
1004             case BlockedOnMVar:
1005                 throwToSingleThreaded(cap, task->tso, 
1006                                       (StgClosure *)NonTermination_closure);
1007                 return;
1008             default:
1009                 barf("deadlock: main thread blocked in a strange way");
1010             }
1011         }
1012         return;
1013 #endif
1014     }
1015 }
1016
1017 /* ----------------------------------------------------------------------------
1018  * Process an event (GRAN only)
1019  * ------------------------------------------------------------------------- */
1020
1021 #if defined(GRAN)
1022 static StgTSO *
1023 scheduleProcessEvent(rtsEvent *event)
1024 {
1025     StgTSO *t;
1026
1027     if (RtsFlags.GranFlags.Light)
1028       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1029
1030     /* adjust time based on time-stamp */
1031     if (event->time > CurrentTime[CurrentProc] &&
1032         event->evttype != ContinueThread)
1033       CurrentTime[CurrentProc] = event->time;
1034     
1035     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1036     if (!RtsFlags.GranFlags.Light)
1037       handleIdlePEs();
1038
1039     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1040
1041     /* main event dispatcher in GranSim */
1042     switch (event->evttype) {
1043       /* Should just be continuing execution */
1044     case ContinueThread:
1045       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1046       /* ToDo: check assertion
1047       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1048              run_queue_hd != END_TSO_QUEUE);
1049       */
1050       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1051       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1052           procStatus[CurrentProc]==Fetching) {
1053         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1054               CurrentTSO->id, CurrentTSO, CurrentProc);
1055         goto next_thread;
1056       } 
1057       /* Ignore ContinueThreads for completed threads */
1058       if (CurrentTSO->what_next == ThreadComplete) {
1059         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1060               CurrentTSO->id, CurrentTSO, CurrentProc);
1061         goto next_thread;
1062       } 
1063       /* Ignore ContinueThreads for threads that are being migrated */
1064       if (PROCS(CurrentTSO)==Nowhere) { 
1065         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1066               CurrentTSO->id, CurrentTSO, CurrentProc);
1067         goto next_thread;
1068       }
1069       /* The thread should be at the beginning of the run queue */
1070       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1071         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1072               CurrentTSO->id, CurrentTSO, CurrentProc);
1073         break; // run the thread anyway
1074       }
1075       /*
1076       new_event(proc, proc, CurrentTime[proc],
1077                 FindWork,
1078                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1079       goto next_thread; 
1080       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1081       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1082
1083     case FetchNode:
1084       do_the_fetchnode(event);
1085       goto next_thread;             /* handle next event in event queue  */
1086       
1087     case GlobalBlock:
1088       do_the_globalblock(event);
1089       goto next_thread;             /* handle next event in event queue  */
1090       
1091     case FetchReply:
1092       do_the_fetchreply(event);
1093       goto next_thread;             /* handle next event in event queue  */
1094       
1095     case UnblockThread:   /* Move from the blocked queue to the tail of */
1096       do_the_unblock(event);
1097       goto next_thread;             /* handle next event in event queue  */
1098       
1099     case ResumeThread:  /* Move from the blocked queue to the tail of */
1100       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1101       event->tso->gran.blocktime += 
1102         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1103       do_the_startthread(event);
1104       goto next_thread;             /* handle next event in event queue  */
1105       
1106     case StartThread:
1107       do_the_startthread(event);
1108       goto next_thread;             /* handle next event in event queue  */
1109       
1110     case MoveThread:
1111       do_the_movethread(event);
1112       goto next_thread;             /* handle next event in event queue  */
1113       
1114     case MoveSpark:
1115       do_the_movespark(event);
1116       goto next_thread;             /* handle next event in event queue  */
1117       
1118     case FindWork:
1119       do_the_findwork(event);
1120       goto next_thread;             /* handle next event in event queue  */
1121       
1122     default:
1123       barf("Illegal event type %u\n", event->evttype);
1124     }  /* switch */
1125     
1126     /* This point was scheduler_loop in the old RTS */
1127
1128     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1129
1130     TimeOfLastEvent = CurrentTime[CurrentProc];
1131     TimeOfNextEvent = get_time_of_next_event();
1132     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1133     // CurrentTSO = ThreadQueueHd;
1134
1135     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1136                          TimeOfNextEvent));
1137
1138     if (RtsFlags.GranFlags.Light) 
1139       GranSimLight_leave_system(event, &ActiveTSO); 
1140
1141     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1142
1143     IF_DEBUG(gran, 
1144              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1145
1146     /* in a GranSim setup the TSO stays on the run queue */
1147     t = CurrentTSO;
1148     /* Take a thread from the run queue. */
1149     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1150
1151     IF_DEBUG(gran, 
1152              debugBelch("GRAN: About to run current thread, which is\n");
1153              G_TSO(t,5));
1154
1155     context_switch = 0; // turned on via GranYield, checking events and time slice
1156
1157     IF_DEBUG(gran, 
1158              DumpGranEvent(GR_SCHEDULE, t));
1159
1160     procStatus[CurrentProc] = Busy;
1161 }
1162 #endif // GRAN
1163
1164 /* ----------------------------------------------------------------------------
1165  * Send pending messages (PARALLEL_HASKELL only)
1166  * ------------------------------------------------------------------------- */
1167
1168 #if defined(PARALLEL_HASKELL)
1169 static StgTSO *
1170 scheduleSendPendingMessages(void)
1171 {
1172     StgSparkPool *pool;
1173     rtsSpark spark;
1174     StgTSO *t;
1175
1176 # if defined(PAR) // global Mem.Mgmt., omit for now
1177     if (PendingFetches != END_BF_QUEUE) {
1178         processFetches();
1179     }
1180 # endif
1181     
1182     if (RtsFlags.ParFlags.BufferTime) {
1183         // if we use message buffering, we must send away all message
1184         // packets which have become too old...
1185         sendOldBuffers(); 
1186     }
1187 }
1188 #endif
1189
1190 /* ----------------------------------------------------------------------------
1191  * Activate spark threads (PARALLEL_HASKELL only)
1192  * ------------------------------------------------------------------------- */
1193
1194 #if defined(PARALLEL_HASKELL)
1195 static void
1196 scheduleActivateSpark(void)
1197 {
1198 #if defined(SPARKS)
1199   ASSERT(emptyRunQueue());
1200 /* We get here if the run queue is empty and want some work.
1201    We try to turn a spark into a thread, and add it to the run queue,
1202    from where it will be picked up in the next iteration of the scheduler
1203    loop.
1204 */
1205
1206       /* :-[  no local threads => look out for local sparks */
1207       /* the spark pool for the current PE */
1208       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1209       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1210           pool->hd < pool->tl) {
1211         /* 
1212          * ToDo: add GC code check that we really have enough heap afterwards!!
1213          * Old comment:
1214          * If we're here (no runnable threads) and we have pending
1215          * sparks, we must have a space problem.  Get enough space
1216          * to turn one of those pending sparks into a
1217          * thread... 
1218          */
1219
1220         spark = findSpark(rtsFalse);            /* get a spark */
1221         if (spark != (rtsSpark) NULL) {
1222           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1223           IF_PAR_DEBUG(fish, // schedule,
1224                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1225                              tso->id, tso, advisory_thread_count));
1226
1227           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1228             IF_PAR_DEBUG(fish, // schedule,
1229                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1230                             spark));
1231             return rtsFalse; /* failed to generate a thread */
1232           }                  /* otherwise fall through & pick-up new tso */
1233         } else {
1234           IF_PAR_DEBUG(fish, // schedule,
1235                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1236                              spark_queue_len(pool)));
1237           return rtsFalse;  /* failed to generate a thread */
1238         }
1239         return rtsTrue;  /* success in generating a thread */
1240   } else { /* no more threads permitted or pool empty */
1241     return rtsFalse;  /* failed to generateThread */
1242   }
1243 #else
1244   tso = NULL; // avoid compiler warning only
1245   return rtsFalse;  /* dummy in non-PAR setup */
1246 #endif // SPARKS
1247 }
1248 #endif // PARALLEL_HASKELL
1249
1250 /* ----------------------------------------------------------------------------
1251  * Get work from a remote node (PARALLEL_HASKELL only)
1252  * ------------------------------------------------------------------------- */
1253     
1254 #if defined(PARALLEL_HASKELL)
1255 static rtsBool
1256 scheduleGetRemoteWork(rtsBool *receivedFinish)
1257 {
1258   ASSERT(emptyRunQueue());
1259
1260   if (RtsFlags.ParFlags.BufferTime) {
1261         IF_PAR_DEBUG(verbose, 
1262                 debugBelch("...send all pending data,"));
1263         {
1264           nat i;
1265           for (i=1; i<=nPEs; i++)
1266             sendImmediately(i); // send all messages away immediately
1267         }
1268   }
1269 # ifndef SPARKS
1270         //++EDEN++ idle() , i.e. send all buffers, wait for work
1271         // suppress fishing in EDEN... just look for incoming messages
1272         // (blocking receive)
1273   IF_PAR_DEBUG(verbose, 
1274                debugBelch("...wait for incoming messages...\n"));
1275   *receivedFinish = processMessages(); // blocking receive...
1276
1277         // and reenter scheduling loop after having received something
1278         // (return rtsFalse below)
1279
1280 # else /* activate SPARKS machinery */
1281 /* We get here, if we have no work, tried to activate a local spark, but still
1282    have no work. We try to get a remote spark, by sending a FISH message.
1283    Thread migration should be added here, and triggered when a sequence of 
1284    fishes returns without work. */
1285         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1286
1287       /* =8-[  no local sparks => look for work on other PEs */
1288         /*
1289          * We really have absolutely no work.  Send out a fish
1290          * (there may be some out there already), and wait for
1291          * something to arrive.  We clearly can't run any threads
1292          * until a SCHEDULE or RESUME arrives, and so that's what
1293          * we're hoping to see.  (Of course, we still have to
1294          * respond to other types of messages.)
1295          */
1296         rtsTime now = msTime() /*CURRENT_TIME*/;
1297         IF_PAR_DEBUG(verbose, 
1298                      debugBelch("--  now=%ld\n", now));
1299         IF_PAR_DEBUG(fish, // verbose,
1300              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1301                  (last_fish_arrived_at!=0 &&
1302                   last_fish_arrived_at+delay > now)) {
1303                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1304                      now, last_fish_arrived_at+delay, 
1305                      last_fish_arrived_at,
1306                      delay);
1307              });
1308   
1309         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1310             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1311           if (last_fish_arrived_at==0 ||
1312               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1313             /* outstandingFishes is set in sendFish, processFish;
1314                avoid flooding system with fishes via delay */
1315     next_fish_to_send_at = 0;  
1316   } else {
1317     /* ToDo: this should be done in the main scheduling loop to avoid the
1318              busy wait here; not so bad if fish delay is very small  */
1319     int iq = 0; // DEBUGGING -- HWL
1320     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1321     /* send a fish when ready, but process messages that arrive in the meantime */
1322     do {
1323       if (PacketsWaiting()) {
1324         iq++; // DEBUGGING
1325         *receivedFinish = processMessages();
1326       }
1327       now = msTime();
1328     } while (!*receivedFinish || now<next_fish_to_send_at);
1329     // JB: This means the fish could become obsolete, if we receive
1330     // work. Better check for work again? 
1331     // last line: while (!receivedFinish || !haveWork || now<...)
1332     // next line: if (receivedFinish || haveWork )
1333
1334     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1335       return rtsFalse;  // NB: this will leave scheduler loop
1336                         // immediately after return!
1337                           
1338     IF_PAR_DEBUG(fish, // verbose,
1339                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1340
1341   }
1342
1343     // JB: IMHO, this should all be hidden inside sendFish(...)
1344     /* pe = choosePE(); 
1345        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1346                 NEW_FISH_HUNGER);
1347
1348     // Global statistics: count no. of fishes
1349     if (RtsFlags.ParFlags.ParStats.Global &&
1350          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1351            globalParStats.tot_fish_mess++;
1352            }
1353     */ 
1354
1355   /* delayed fishes must have been sent by now! */
1356   next_fish_to_send_at = 0;  
1357   }
1358       
1359   *receivedFinish = processMessages();
1360 # endif /* SPARKS */
1361
1362  return rtsFalse;
1363  /* NB: this function always returns rtsFalse, meaning the scheduler
1364     loop continues with the next iteration; 
1365     rationale: 
1366       return code means success in finding work; we enter this function
1367       if there is no local work, thus have to send a fish which takes
1368       time until it arrives with work; in the meantime we should process
1369       messages in the main loop;
1370  */
1371 }
1372 #endif // PARALLEL_HASKELL
1373
1374 /* ----------------------------------------------------------------------------
1375  * PAR/GRAN: Report stats & debugging info(?)
1376  * ------------------------------------------------------------------------- */
1377
1378 #if defined(PAR) || defined(GRAN)
1379 static void
1380 scheduleGranParReport(void)
1381 {
1382   ASSERT(run_queue_hd != END_TSO_QUEUE);
1383
1384   /* Take a thread from the run queue, if we have work */
1385   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1386
1387     /* If this TSO has got its outport closed in the meantime, 
1388      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1389      * It has to be marked as TH_DEAD for this purpose.
1390      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1391
1392 JB: TODO: investigate wether state change field could be nuked
1393      entirely and replaced by the normal tso state (whatnext
1394      field). All we want to do is to kill tsos from outside.
1395      */
1396
1397     /* ToDo: write something to the log-file
1398     if (RTSflags.ParFlags.granSimStats && !sameThread)
1399         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1400
1401     CurrentTSO = t;
1402     */
1403     /* the spark pool for the current PE */
1404     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1405
1406     IF_DEBUG(scheduler, 
1407              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1408                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1409
1410     IF_PAR_DEBUG(fish,
1411              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1412                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1413
1414     if (RtsFlags.ParFlags.ParStats.Full && 
1415         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1416         (emitSchedule || // forced emit
1417          (t && LastTSO && t->id != LastTSO->id))) {
1418       /* 
1419          we are running a different TSO, so write a schedule event to log file
1420          NB: If we use fair scheduling we also have to write  a deschedule 
1421              event for LastTSO; with unfair scheduling we know that the
1422              previous tso has blocked whenever we switch to another tso, so
1423              we don't need it in GUM for now
1424       */
1425       IF_PAR_DEBUG(fish, // schedule,
1426                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1427
1428       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1429                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1430       emitSchedule = rtsFalse;
1431     }
1432 }     
1433 #endif
1434
1435 /* ----------------------------------------------------------------------------
1436  * After running a thread...
1437  * ------------------------------------------------------------------------- */
1438
1439 static void
1440 schedulePostRunThread(void)
1441 {
1442 #if defined(PAR)
1443     /* HACK 675: if the last thread didn't yield, make sure to print a 
1444        SCHEDULE event to the log file when StgRunning the next thread, even
1445        if it is the same one as before */
1446     LastTSO = t; 
1447     TimeOfLastYield = CURRENT_TIME;
1448 #endif
1449
1450   /* some statistics gathering in the parallel case */
1451
1452 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1453   switch (ret) {
1454     case HeapOverflow:
1455 # if defined(GRAN)
1456       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1457       globalGranStats.tot_heapover++;
1458 # elif defined(PAR)
1459       globalParStats.tot_heapover++;
1460 # endif
1461       break;
1462
1463      case StackOverflow:
1464 # if defined(GRAN)
1465       IF_DEBUG(gran, 
1466                DumpGranEvent(GR_DESCHEDULE, t));
1467       globalGranStats.tot_stackover++;
1468 # elif defined(PAR)
1469       // IF_DEBUG(par, 
1470       // DumpGranEvent(GR_DESCHEDULE, t);
1471       globalParStats.tot_stackover++;
1472 # endif
1473       break;
1474
1475     case ThreadYielding:
1476 # if defined(GRAN)
1477       IF_DEBUG(gran, 
1478                DumpGranEvent(GR_DESCHEDULE, t));
1479       globalGranStats.tot_yields++;
1480 # elif defined(PAR)
1481       // IF_DEBUG(par, 
1482       // DumpGranEvent(GR_DESCHEDULE, t);
1483       globalParStats.tot_yields++;
1484 # endif
1485       break; 
1486
1487     case ThreadBlocked:
1488 # if defined(GRAN)
1489         debugTrace(DEBUG_sched, 
1490                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1491                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1492                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1493                if (t->block_info.closure!=(StgClosure*)NULL)
1494                  print_bq(t->block_info.closure);
1495                debugBelch("\n"));
1496
1497       // ??? needed; should emit block before
1498       IF_DEBUG(gran, 
1499                DumpGranEvent(GR_DESCHEDULE, t)); 
1500       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1501       /*
1502         ngoq Dogh!
1503       ASSERT(procStatus[CurrentProc]==Busy || 
1504               ((procStatus[CurrentProc]==Fetching) && 
1505               (t->block_info.closure!=(StgClosure*)NULL)));
1506       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1507           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1508             procStatus[CurrentProc]==Fetching)) 
1509         procStatus[CurrentProc] = Idle;
1510       */
1511 # elif defined(PAR)
1512 //++PAR++  blockThread() writes the event (change?)
1513 # endif
1514     break;
1515
1516   case ThreadFinished:
1517     break;
1518
1519   default:
1520     barf("parGlobalStats: unknown return code");
1521     break;
1522     }
1523 #endif
1524 }
1525
1526 /* -----------------------------------------------------------------------------
1527  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1528  * -------------------------------------------------------------------------- */
1529
1530 static rtsBool
1531 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1532 {
1533     // did the task ask for a large block?
1534     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1535         // if so, get one and push it on the front of the nursery.
1536         bdescr *bd;
1537         lnat blocks;
1538         
1539         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1540         
1541         debugTrace(DEBUG_sched,
1542                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1543                    (long)t->id, whatNext_strs[t->what_next], blocks);
1544     
1545         // don't do this if the nursery is (nearly) full, we'll GC first.
1546         if (cap->r.rCurrentNursery->link != NULL ||
1547             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1548                                                // if the nursery has only one block.
1549             
1550             ACQUIRE_SM_LOCK
1551             bd = allocGroup( blocks );
1552             RELEASE_SM_LOCK
1553             cap->r.rNursery->n_blocks += blocks;
1554             
1555             // link the new group into the list
1556             bd->link = cap->r.rCurrentNursery;
1557             bd->u.back = cap->r.rCurrentNursery->u.back;
1558             if (cap->r.rCurrentNursery->u.back != NULL) {
1559                 cap->r.rCurrentNursery->u.back->link = bd;
1560             } else {
1561 #if !defined(THREADED_RTS)
1562                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1563                        g0s0 == cap->r.rNursery);
1564 #endif
1565                 cap->r.rNursery->blocks = bd;
1566             }             
1567             cap->r.rCurrentNursery->u.back = bd;
1568             
1569             // initialise it as a nursery block.  We initialise the
1570             // step, gen_no, and flags field of *every* sub-block in
1571             // this large block, because this is easier than making
1572             // sure that we always find the block head of a large
1573             // block whenever we call Bdescr() (eg. evacuate() and
1574             // isAlive() in the GC would both have to do this, at
1575             // least).
1576             { 
1577                 bdescr *x;
1578                 for (x = bd; x < bd + blocks; x++) {
1579                     x->step = cap->r.rNursery;
1580                     x->gen_no = 0;
1581                     x->flags = 0;
1582                 }
1583             }
1584             
1585             // This assert can be a killer if the app is doing lots
1586             // of large block allocations.
1587             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1588             
1589             // now update the nursery to point to the new block
1590             cap->r.rCurrentNursery = bd;
1591             
1592             // we might be unlucky and have another thread get on the
1593             // run queue before us and steal the large block, but in that
1594             // case the thread will just end up requesting another large
1595             // block.
1596             pushOnRunQueue(cap,t);
1597             return rtsFalse;  /* not actually GC'ing */
1598         }
1599     }
1600     
1601     debugTrace(DEBUG_sched,
1602                "--<< thread %ld (%s) stopped: HeapOverflow\n", 
1603                (long)t->id, whatNext_strs[t->what_next]);
1604
1605 #if defined(GRAN)
1606     ASSERT(!is_on_queue(t,CurrentProc));
1607 #elif defined(PARALLEL_HASKELL)
1608     /* Currently we emit a DESCHEDULE event before GC in GUM.
1609        ToDo: either add separate event to distinguish SYSTEM time from rest
1610        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1611     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1612         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1613                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1614         emitSchedule = rtsTrue;
1615     }
1616 #endif
1617       
1618     pushOnRunQueue(cap,t);
1619     return rtsTrue;
1620     /* actual GC is done at the end of the while loop in schedule() */
1621 }
1622
1623 /* -----------------------------------------------------------------------------
1624  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1625  * -------------------------------------------------------------------------- */
1626
1627 static void
1628 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1629 {
1630     debugTrace (DEBUG_sched,
1631                 "--<< thread %ld (%s) stopped, StackOverflow", 
1632                 (long)t->id, whatNext_strs[t->what_next]);
1633
1634     /* just adjust the stack for this thread, then pop it back
1635      * on the run queue.
1636      */
1637     { 
1638         /* enlarge the stack */
1639         StgTSO *new_t = threadStackOverflow(cap, t);
1640         
1641         /* The TSO attached to this Task may have moved, so update the
1642          * pointer to it.
1643          */
1644         if (task->tso == t) {
1645             task->tso = new_t;
1646         }
1647         pushOnRunQueue(cap,new_t);
1648     }
1649 }
1650
1651 /* -----------------------------------------------------------------------------
1652  * Handle a thread that returned to the scheduler with ThreadYielding
1653  * -------------------------------------------------------------------------- */
1654
1655 static rtsBool
1656 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1657 {
1658     // Reset the context switch flag.  We don't do this just before
1659     // running the thread, because that would mean we would lose ticks
1660     // during GC, which can lead to unfair scheduling (a thread hogs
1661     // the CPU because the tick always arrives during GC).  This way
1662     // penalises threads that do a lot of allocation, but that seems
1663     // better than the alternative.
1664     context_switch = 0;
1665     
1666     /* put the thread back on the run queue.  Then, if we're ready to
1667      * GC, check whether this is the last task to stop.  If so, wake
1668      * up the GC thread.  getThread will block during a GC until the
1669      * GC is finished.
1670      */
1671 #ifdef DEBUG
1672     if (t->what_next != prev_what_next) {
1673         debugTrace(DEBUG_sched,
1674                    "--<< thread %ld (%s) stopped to switch evaluators", 
1675                    (long)t->id, whatNext_strs[t->what_next]);
1676     } else {
1677         debugTrace(DEBUG_sched,
1678                    "--<< thread %ld (%s) stopped, yielding",
1679                    (long)t->id, whatNext_strs[t->what_next]);
1680     }
1681 #endif
1682     
1683     IF_DEBUG(sanity,
1684              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1685              checkTSO(t));
1686     ASSERT(t->link == END_TSO_QUEUE);
1687     
1688     // Shortcut if we're just switching evaluators: don't bother
1689     // doing stack squeezing (which can be expensive), just run the
1690     // thread.
1691     if (t->what_next != prev_what_next) {
1692         return rtsTrue;
1693     }
1694     
1695 #if defined(GRAN)
1696     ASSERT(!is_on_queue(t,CurrentProc));
1697       
1698     IF_DEBUG(sanity,
1699              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1700              checkThreadQsSanity(rtsTrue));
1701
1702 #endif
1703
1704     addToRunQueue(cap,t);
1705
1706 #if defined(GRAN)
1707     /* add a ContinueThread event to actually process the thread */
1708     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1709               ContinueThread,
1710               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1711     IF_GRAN_DEBUG(bq, 
1712                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1713                   G_EVENTQ(0);
1714                   G_CURR_THREADQ(0));
1715 #endif
1716     return rtsFalse;
1717 }
1718
1719 /* -----------------------------------------------------------------------------
1720  * Handle a thread that returned to the scheduler with ThreadBlocked
1721  * -------------------------------------------------------------------------- */
1722
1723 static void
1724 scheduleHandleThreadBlocked( StgTSO *t
1725 #if !defined(GRAN) && !defined(DEBUG)
1726     STG_UNUSED
1727 #endif
1728     )
1729 {
1730 #if defined(GRAN)
1731     IF_DEBUG(scheduler,
1732              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1733                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1734              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1735     
1736     // ??? needed; should emit block before
1737     IF_DEBUG(gran, 
1738              DumpGranEvent(GR_DESCHEDULE, t)); 
1739     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1740     /*
1741       ngoq Dogh!
1742       ASSERT(procStatus[CurrentProc]==Busy || 
1743       ((procStatus[CurrentProc]==Fetching) && 
1744       (t->block_info.closure!=(StgClosure*)NULL)));
1745       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1746       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1747       procStatus[CurrentProc]==Fetching)) 
1748       procStatus[CurrentProc] = Idle;
1749     */
1750 #elif defined(PAR)
1751     IF_DEBUG(scheduler,
1752              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1753                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1754     IF_PAR_DEBUG(bq,
1755                  
1756                  if (t->block_info.closure!=(StgClosure*)NULL) 
1757                  print_bq(t->block_info.closure));
1758     
1759     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1760     blockThread(t);
1761     
1762     /* whatever we schedule next, we must log that schedule */
1763     emitSchedule = rtsTrue;
1764     
1765 #else /* !GRAN */
1766
1767       // We don't need to do anything.  The thread is blocked, and it
1768       // has tidied up its stack and placed itself on whatever queue
1769       // it needs to be on.
1770
1771     // ASSERT(t->why_blocked != NotBlocked);
1772     // Not true: for example,
1773     //    - in THREADED_RTS, the thread may already have been woken
1774     //      up by another Capability.  This actually happens: try
1775     //      conc023 +RTS -N2.
1776     //    - the thread may have woken itself up already, because
1777     //      threadPaused() might have raised a blocked throwTo
1778     //      exception, see maybePerformBlockedException().
1779
1780 #ifdef DEBUG
1781     if (traceClass(DEBUG_sched)) {
1782         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1783                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1784         printThreadBlockage(t);
1785         debugTraceEnd();
1786     }
1787 #endif
1788     
1789     /* Only for dumping event to log file 
1790        ToDo: do I need this in GranSim, too?
1791        blockThread(t);
1792     */
1793 #endif
1794 }
1795
1796 /* -----------------------------------------------------------------------------
1797  * Handle a thread that returned to the scheduler with ThreadFinished
1798  * -------------------------------------------------------------------------- */
1799
1800 static rtsBool
1801 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1802 {
1803     /* Need to check whether this was a main thread, and if so,
1804      * return with the return value.
1805      *
1806      * We also end up here if the thread kills itself with an
1807      * uncaught exception, see Exception.cmm.
1808      */
1809     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1810                (unsigned long)t->id, whatNext_strs[t->what_next]);
1811
1812     /* Inform the Hpc that a thread has finished */
1813     hs_hpc_event("Thread Finished",t);
1814
1815 #if defined(GRAN)
1816       endThread(t, CurrentProc); // clean-up the thread
1817 #elif defined(PARALLEL_HASKELL)
1818       /* For now all are advisory -- HWL */
1819       //if(t->priority==AdvisoryPriority) ??
1820       advisory_thread_count--; // JB: Caution with this counter, buggy!
1821       
1822 # if defined(DIST)
1823       if(t->dist.priority==RevalPriority)
1824         FinishReval(t);
1825 # endif
1826     
1827 # if defined(EDENOLD)
1828       // the thread could still have an outport... (BUG)
1829       if (t->eden.outport != -1) {
1830       // delete the outport for the tso which has finished...
1831         IF_PAR_DEBUG(eden_ports,
1832                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1833                               t->eden.outport, t->id));
1834         deleteOPT(t);
1835       }
1836       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1837       if (t->eden.epid != -1) {
1838         IF_PAR_DEBUG(eden_ports,
1839                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1840                            t->id, t->eden.epid));
1841         removeTSOfromProcess(t);
1842       }
1843 # endif 
1844
1845 # if defined(PAR)
1846       if (RtsFlags.ParFlags.ParStats.Full &&
1847           !RtsFlags.ParFlags.ParStats.Suppressed) 
1848         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1849
1850       //  t->par only contains statistics: left out for now...
1851       IF_PAR_DEBUG(fish,
1852                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1853                               t->id,t,t->par.sparkname));
1854 # endif
1855 #endif // PARALLEL_HASKELL
1856
1857       //
1858       // Check whether the thread that just completed was a bound
1859       // thread, and if so return with the result.  
1860       //
1861       // There is an assumption here that all thread completion goes
1862       // through this point; we need to make sure that if a thread
1863       // ends up in the ThreadKilled state, that it stays on the run
1864       // queue so it can be dealt with here.
1865       //
1866
1867       if (t->bound) {
1868
1869           if (t->bound != task) {
1870 #if !defined(THREADED_RTS)
1871               // Must be a bound thread that is not the topmost one.  Leave
1872               // it on the run queue until the stack has unwound to the
1873               // point where we can deal with this.  Leaving it on the run
1874               // queue also ensures that the garbage collector knows about
1875               // this thread and its return value (it gets dropped from the
1876               // all_threads list so there's no other way to find it).
1877               appendToRunQueue(cap,t);
1878               return rtsFalse;
1879 #else
1880               // this cannot happen in the threaded RTS, because a
1881               // bound thread can only be run by the appropriate Task.
1882               barf("finished bound thread that isn't mine");
1883 #endif
1884           }
1885
1886           ASSERT(task->tso == t);
1887
1888           if (t->what_next == ThreadComplete) {
1889               if (task->ret) {
1890                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1891                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1892               }
1893               task->stat = Success;
1894           } else {
1895               if (task->ret) {
1896                   *(task->ret) = NULL;
1897               }
1898               if (sched_state >= SCHED_INTERRUPTING) {
1899                   task->stat = Interrupted;
1900               } else {
1901                   task->stat = Killed;
1902               }
1903           }
1904 #ifdef DEBUG
1905           removeThreadLabel((StgWord)task->tso->id);
1906 #endif
1907           return rtsTrue; // tells schedule() to return
1908       }
1909
1910       return rtsFalse;
1911 }
1912
1913 /* -----------------------------------------------------------------------------
1914  * Perform a heap census, if PROFILING
1915  * -------------------------------------------------------------------------- */
1916
1917 static rtsBool
1918 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1919 {
1920 #if defined(PROFILING)
1921     // When we have +RTS -i0 and we're heap profiling, do a census at
1922     // every GC.  This lets us get repeatable runs for debugging.
1923     if (performHeapProfile ||
1924         (RtsFlags.ProfFlags.profileInterval==0 &&
1925          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1926
1927         // checking black holes is necessary before GC, otherwise
1928         // there may be threads that are unreachable except by the
1929         // blackhole queue, which the GC will consider to be
1930         // deadlocked.
1931         scheduleCheckBlackHoles(&MainCapability);
1932
1933         debugTrace(DEBUG_sched, "garbage collecting before heap census");
1934         GarbageCollect(rtsTrue);
1935
1936         debugTrace(DEBUG_sched, "performing heap census");
1937         heapCensus();
1938
1939         performHeapProfile = rtsFalse;
1940         return rtsTrue;  // true <=> we already GC'd
1941     }
1942 #endif
1943     return rtsFalse;
1944 }
1945
1946 /* -----------------------------------------------------------------------------
1947  * Perform a garbage collection if necessary
1948  * -------------------------------------------------------------------------- */
1949
1950 static Capability *
1951 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1952 {
1953     StgTSO *t;
1954 #ifdef THREADED_RTS
1955     static volatile StgWord waiting_for_gc;
1956     rtsBool was_waiting;
1957     nat i;
1958 #endif
1959
1960 #ifdef THREADED_RTS
1961     // In order to GC, there must be no threads running Haskell code.
1962     // Therefore, the GC thread needs to hold *all* the capabilities,
1963     // and release them after the GC has completed.  
1964     //
1965     // This seems to be the simplest way: previous attempts involved
1966     // making all the threads with capabilities give up their
1967     // capabilities and sleep except for the *last* one, which
1968     // actually did the GC.  But it's quite hard to arrange for all
1969     // the other tasks to sleep and stay asleep.
1970     //
1971         
1972     was_waiting = cas(&waiting_for_gc, 0, 1);
1973     if (was_waiting) {
1974         do {
1975             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1976             if (cap) yieldCapability(&cap,task);
1977         } while (waiting_for_gc);
1978         return cap;  // NOTE: task->cap might have changed here
1979     }
1980
1981     for (i=0; i < n_capabilities; i++) {
1982         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1983         if (cap != &capabilities[i]) {
1984             Capability *pcap = &capabilities[i];
1985             // we better hope this task doesn't get migrated to
1986             // another Capability while we're waiting for this one.
1987             // It won't, because load balancing happens while we have
1988             // all the Capabilities, but even so it's a slightly
1989             // unsavoury invariant.
1990             task->cap = pcap;
1991             context_switch = 1;
1992             waitForReturnCapability(&pcap, task);
1993             if (pcap != &capabilities[i]) {
1994                 barf("scheduleDoGC: got the wrong capability");
1995             }
1996         }
1997     }
1998
1999     waiting_for_gc = rtsFalse;
2000 #endif
2001
2002     /* Kick any transactions which are invalid back to their
2003      * atomically frames.  When next scheduled they will try to
2004      * commit, this commit will fail and they will retry.
2005      */
2006     { 
2007         StgTSO *next;
2008
2009         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2010             if (t->what_next == ThreadRelocated) {
2011                 next = t->link;
2012             } else {
2013                 next = t->global_link;
2014                 
2015                 // This is a good place to check for blocked
2016                 // exceptions.  It might be the case that a thread is
2017                 // blocked on delivering an exception to a thread that
2018                 // is also blocked - we try to ensure that this
2019                 // doesn't happen in throwTo(), but it's too hard (or
2020                 // impossible) to close all the race holes, so we
2021                 // accept that some might get through and deal with
2022                 // them here.  A GC will always happen at some point,
2023                 // even if the system is otherwise deadlocked.
2024                 maybePerformBlockedException (&capabilities[0], t);
2025
2026                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2027                     if (!stmValidateNestOfTransactions (t -> trec)) {
2028                         debugTrace(DEBUG_sched | DEBUG_stm,
2029                                    "trec %p found wasting its time", t);
2030                         
2031                         // strip the stack back to the
2032                         // ATOMICALLY_FRAME, aborting the (nested)
2033                         // transaction, and saving the stack of any
2034                         // partially-evaluated thunks on the heap.
2035                         throwToSingleThreaded_(&capabilities[0], t, 
2036                                                NULL, rtsTrue, NULL);
2037                         
2038 #ifdef REG_R1
2039                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2040 #endif
2041                     }
2042                 }
2043             }
2044         }
2045     }
2046     
2047     // so this happens periodically:
2048     if (cap) scheduleCheckBlackHoles(cap);
2049     
2050     IF_DEBUG(scheduler, printAllThreads());
2051
2052     /*
2053      * We now have all the capabilities; if we're in an interrupting
2054      * state, then we should take the opportunity to delete all the
2055      * threads in the system.
2056      */
2057     if (sched_state >= SCHED_INTERRUPTING) {
2058         deleteAllThreads(&capabilities[0]);
2059         sched_state = SCHED_SHUTTING_DOWN;
2060     }
2061
2062     /* everybody back, start the GC.
2063      * Could do it in this thread, or signal a condition var
2064      * to do it in another thread.  Either way, we need to
2065      * broadcast on gc_pending_cond afterward.
2066      */
2067 #if defined(THREADED_RTS)
2068     debugTrace(DEBUG_sched, "doing GC");
2069 #endif
2070     GarbageCollect(force_major);
2071     
2072 #if defined(THREADED_RTS)
2073     // release our stash of capabilities.
2074     for (i = 0; i < n_capabilities; i++) {
2075         if (cap != &capabilities[i]) {
2076             task->cap = &capabilities[i];
2077             releaseCapability(&capabilities[i]);
2078         }
2079     }
2080     if (cap) {
2081         task->cap = cap;
2082     } else {
2083         task->cap = NULL;
2084     }
2085 #endif
2086
2087 #if defined(GRAN)
2088     /* add a ContinueThread event to continue execution of current thread */
2089     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2090               ContinueThread,
2091               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2092     IF_GRAN_DEBUG(bq, 
2093                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2094                   G_EVENTQ(0);
2095                   G_CURR_THREADQ(0));
2096 #endif /* GRAN */
2097
2098     return cap;
2099 }
2100
2101 /* ---------------------------------------------------------------------------
2102  * Singleton fork(). Do not copy any running threads.
2103  * ------------------------------------------------------------------------- */
2104
2105 StgInt
2106 forkProcess(HsStablePtr *entry
2107 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2108             STG_UNUSED
2109 #endif
2110            )
2111 {
2112 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2113     Task *task;
2114     pid_t pid;
2115     StgTSO* t,*next;
2116     Capability *cap;
2117     
2118 #if defined(THREADED_RTS)
2119     if (RtsFlags.ParFlags.nNodes > 1) {
2120         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2121         stg_exit(EXIT_FAILURE);
2122     }
2123 #endif
2124
2125     debugTrace(DEBUG_sched, "forking!");
2126     
2127     // ToDo: for SMP, we should probably acquire *all* the capabilities
2128     cap = rts_lock();
2129     
2130     pid = fork();
2131     
2132     if (pid) { // parent
2133         
2134         // just return the pid
2135         rts_unlock(cap);
2136         return pid;
2137         
2138     } else { // child
2139         
2140         // Now, all OS threads except the thread that forked are
2141         // stopped.  We need to stop all Haskell threads, including
2142         // those involved in foreign calls.  Also we need to delete
2143         // all Tasks, because they correspond to OS threads that are
2144         // now gone.
2145
2146         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2147             if (t->what_next == ThreadRelocated) {
2148                 next = t->link;
2149             } else {
2150                 next = t->global_link;
2151                 // don't allow threads to catch the ThreadKilled
2152                 // exception, but we do want to raiseAsync() because these
2153                 // threads may be evaluating thunks that we need later.
2154                 deleteThread_(cap,t);
2155             }
2156         }
2157         
2158         // Empty the run queue.  It seems tempting to let all the
2159         // killed threads stay on the run queue as zombies to be
2160         // cleaned up later, but some of them correspond to bound
2161         // threads for which the corresponding Task does not exist.
2162         cap->run_queue_hd = END_TSO_QUEUE;
2163         cap->run_queue_tl = END_TSO_QUEUE;
2164
2165         // Any suspended C-calling Tasks are no more, their OS threads
2166         // don't exist now:
2167         cap->suspended_ccalling_tasks = NULL;
2168
2169         // Empty the all_threads list.  Otherwise, the garbage
2170         // collector may attempt to resurrect some of these threads.
2171         all_threads = END_TSO_QUEUE;
2172
2173         // Wipe the task list, except the current Task.
2174         ACQUIRE_LOCK(&sched_mutex);
2175         for (task = all_tasks; task != NULL; task=task->all_link) {
2176             if (task != cap->running_task) {
2177                 discardTask(task);
2178             }
2179         }
2180         RELEASE_LOCK(&sched_mutex);
2181
2182 #if defined(THREADED_RTS)
2183         // Wipe our spare workers list, they no longer exist.  New
2184         // workers will be created if necessary.
2185         cap->spare_workers = NULL;
2186         cap->returning_tasks_hd = NULL;
2187         cap->returning_tasks_tl = NULL;
2188 #endif
2189
2190         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2191         rts_checkSchedStatus("forkProcess",cap);
2192         
2193         rts_unlock(cap);
2194         hs_exit();                      // clean up and exit
2195         stg_exit(EXIT_SUCCESS);
2196     }
2197 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2198     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2199     return -1;
2200 #endif
2201 }
2202
2203 /* ---------------------------------------------------------------------------
2204  * Delete all the threads in the system
2205  * ------------------------------------------------------------------------- */
2206    
2207 static void
2208 deleteAllThreads ( Capability *cap )
2209 {
2210     // NOTE: only safe to call if we own all capabilities.
2211
2212     StgTSO* t, *next;
2213     debugTrace(DEBUG_sched,"deleting all threads");
2214     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2215         if (t->what_next == ThreadRelocated) {
2216             next = t->link;
2217         } else {
2218             next = t->global_link;
2219             deleteThread(cap,t);
2220         }
2221     }      
2222
2223     // The run queue now contains a bunch of ThreadKilled threads.  We
2224     // must not throw these away: the main thread(s) will be in there
2225     // somewhere, and the main scheduler loop has to deal with it.
2226     // Also, the run queue is the only thing keeping these threads from
2227     // being GC'd, and we don't want the "main thread has been GC'd" panic.
2228
2229 #if !defined(THREADED_RTS)
2230     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2231     ASSERT(sleeping_queue == END_TSO_QUEUE);
2232 #endif
2233 }
2234
2235 /* -----------------------------------------------------------------------------
2236    Managing the suspended_ccalling_tasks list.
2237    Locks required: sched_mutex
2238    -------------------------------------------------------------------------- */
2239
2240 STATIC_INLINE void
2241 suspendTask (Capability *cap, Task *task)
2242 {
2243     ASSERT(task->next == NULL && task->prev == NULL);
2244     task->next = cap->suspended_ccalling_tasks;
2245     task->prev = NULL;
2246     if (cap->suspended_ccalling_tasks) {
2247         cap->suspended_ccalling_tasks->prev = task;
2248     }
2249     cap->suspended_ccalling_tasks = task;
2250 }
2251
2252 STATIC_INLINE void
2253 recoverSuspendedTask (Capability *cap, Task *task)
2254 {
2255     if (task->prev) {
2256         task->prev->next = task->next;
2257     } else {
2258         ASSERT(cap->suspended_ccalling_tasks == task);
2259         cap->suspended_ccalling_tasks = task->next;
2260     }
2261     if (task->next) {
2262         task->next->prev = task->prev;
2263     }
2264     task->next = task->prev = NULL;
2265 }
2266
2267 /* ---------------------------------------------------------------------------
2268  * Suspending & resuming Haskell threads.
2269  * 
2270  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2271  * its capability before calling the C function.  This allows another
2272  * task to pick up the capability and carry on running Haskell
2273  * threads.  It also means that if the C call blocks, it won't lock
2274  * the whole system.
2275  *
2276  * The Haskell thread making the C call is put to sleep for the
2277  * duration of the call, on the susepended_ccalling_threads queue.  We
2278  * give out a token to the task, which it can use to resume the thread
2279  * on return from the C function.
2280  * ------------------------------------------------------------------------- */
2281    
2282 void *
2283 suspendThread (StgRegTable *reg)
2284 {
2285   Capability *cap;
2286   int saved_errno = errno;
2287   StgTSO *tso;
2288   Task *task;
2289
2290   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2291    */
2292   cap = regTableToCapability(reg);
2293
2294   task = cap->running_task;
2295   tso = cap->r.rCurrentTSO;
2296
2297   debugTrace(DEBUG_sched, 
2298              "thread %lu did a safe foreign call", 
2299              (unsigned long)cap->r.rCurrentTSO->id);
2300
2301   // XXX this might not be necessary --SDM
2302   tso->what_next = ThreadRunGHC;
2303
2304   threadPaused(cap,tso);
2305
2306   if ((tso->flags & TSO_BLOCKEX) == 0)  {
2307       tso->why_blocked = BlockedOnCCall;
2308       tso->flags |= TSO_BLOCKEX;
2309       tso->flags &= ~TSO_INTERRUPTIBLE;
2310   } else {
2311       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2312   }
2313
2314   // Hand back capability
2315   task->suspended_tso = tso;
2316
2317   ACQUIRE_LOCK(&cap->lock);
2318
2319   suspendTask(cap,task);
2320   cap->in_haskell = rtsFalse;
2321   releaseCapability_(cap);
2322   
2323   RELEASE_LOCK(&cap->lock);
2324
2325 #if defined(THREADED_RTS)
2326   /* Preparing to leave the RTS, so ensure there's a native thread/task
2327      waiting to take over.
2328   */
2329   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2330 #endif
2331
2332   errno = saved_errno;
2333   return task;
2334 }
2335
2336 StgRegTable *
2337 resumeThread (void *task_)
2338 {
2339     StgTSO *tso;
2340     Capability *cap;
2341     int saved_errno = errno;
2342     Task *task = task_;
2343
2344     cap = task->cap;
2345     // Wait for permission to re-enter the RTS with the result.
2346     waitForReturnCapability(&cap,task);
2347     // we might be on a different capability now... but if so, our
2348     // entry on the suspended_ccalling_tasks list will also have been
2349     // migrated.
2350
2351     // Remove the thread from the suspended list
2352     recoverSuspendedTask(cap,task);
2353
2354     tso = task->suspended_tso;
2355     task->suspended_tso = NULL;
2356     tso->link = END_TSO_QUEUE;
2357     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2358     
2359     if (tso->why_blocked == BlockedOnCCall) {
2360         awakenBlockedExceptionQueue(cap,tso);
2361         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2362     }
2363     
2364     /* Reset blocking status */
2365     tso->why_blocked  = NotBlocked;
2366     
2367     cap->r.rCurrentTSO = tso;
2368     cap->in_haskell = rtsTrue;
2369     errno = saved_errno;
2370
2371     /* We might have GC'd, mark the TSO dirty again */
2372     dirtyTSO(tso);
2373
2374     IF_DEBUG(sanity, checkTSO(tso));
2375
2376     return &cap->r;
2377 }
2378
2379 /* ---------------------------------------------------------------------------
2380  * scheduleThread()
2381  *
2382  * scheduleThread puts a thread on the end  of the runnable queue.
2383  * This will usually be done immediately after a thread is created.
2384  * The caller of scheduleThread must create the thread using e.g.
2385  * createThread and push an appropriate closure
2386  * on this thread's stack before the scheduler is invoked.
2387  * ------------------------------------------------------------------------ */
2388
2389 void
2390 scheduleThread(Capability *cap, StgTSO *tso)
2391 {
2392     // The thread goes at the *end* of the run-queue, to avoid possible
2393     // starvation of any threads already on the queue.
2394     appendToRunQueue(cap,tso);
2395 }
2396
2397 void
2398 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2399 {
2400 #if defined(THREADED_RTS)
2401     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2402                               // move this thread from now on.
2403     cpu %= RtsFlags.ParFlags.nNodes;
2404     if (cpu == cap->no) {
2405         appendToRunQueue(cap,tso);
2406     } else {
2407         migrateThreadToCapability_lock(&capabilities[cpu],tso);
2408     }
2409 #else
2410     appendToRunQueue(cap,tso);
2411 #endif
2412 }
2413
2414 Capability *
2415 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2416 {
2417     Task *task;
2418
2419     // We already created/initialised the Task
2420     task = cap->running_task;
2421
2422     // This TSO is now a bound thread; make the Task and TSO
2423     // point to each other.
2424     tso->bound = task;
2425     tso->cap = cap;
2426
2427     task->tso = tso;
2428     task->ret = ret;
2429     task->stat = NoStatus;
2430
2431     appendToRunQueue(cap,tso);
2432
2433     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2434
2435 #if defined(GRAN)
2436     /* GranSim specific init */
2437     CurrentTSO = m->tso;                // the TSO to run
2438     procStatus[MainProc] = Busy;        // status of main PE
2439     CurrentProc = MainProc;             // PE to run it on
2440 #endif
2441
2442     cap = schedule(cap,task);
2443
2444     ASSERT(task->stat != NoStatus);
2445     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2446
2447     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2448     return cap;
2449 }
2450
2451 /* ----------------------------------------------------------------------------
2452  * Starting Tasks
2453  * ------------------------------------------------------------------------- */
2454
2455 #if defined(THREADED_RTS)
2456 void
2457 workerStart(Task *task)
2458 {
2459     Capability *cap;
2460
2461     // See startWorkerTask().
2462     ACQUIRE_LOCK(&task->lock);
2463     cap = task->cap;
2464     RELEASE_LOCK(&task->lock);
2465
2466     // set the thread-local pointer to the Task:
2467     taskEnter(task);
2468
2469     // schedule() runs without a lock.
2470     cap = schedule(cap,task);
2471
2472     // On exit from schedule(), we have a Capability.
2473     releaseCapability(cap);
2474     workerTaskStop(task);
2475 }
2476 #endif
2477
2478 /* ---------------------------------------------------------------------------
2479  * initScheduler()
2480  *
2481  * Initialise the scheduler.  This resets all the queues - if the
2482  * queues contained any threads, they'll be garbage collected at the
2483  * next pass.
2484  *
2485  * ------------------------------------------------------------------------ */
2486
2487 void 
2488 initScheduler(void)
2489 {
2490 #if defined(GRAN)
2491   nat i;
2492   for (i=0; i<=MAX_PROC; i++) {
2493     run_queue_hds[i]      = END_TSO_QUEUE;
2494     run_queue_tls[i]      = END_TSO_QUEUE;
2495     blocked_queue_hds[i]  = END_TSO_QUEUE;
2496     blocked_queue_tls[i]  = END_TSO_QUEUE;
2497     ccalling_threadss[i]  = END_TSO_QUEUE;
2498     blackhole_queue[i]    = END_TSO_QUEUE;
2499     sleeping_queue        = END_TSO_QUEUE;
2500   }
2501 #elif !defined(THREADED_RTS)
2502   blocked_queue_hd  = END_TSO_QUEUE;
2503   blocked_queue_tl  = END_TSO_QUEUE;
2504   sleeping_queue    = END_TSO_QUEUE;
2505 #endif
2506
2507   blackhole_queue   = END_TSO_QUEUE;
2508   all_threads       = END_TSO_QUEUE;
2509
2510   context_switch = 0;
2511   sched_state    = SCHED_RUNNING;
2512
2513 #if defined(THREADED_RTS)
2514   /* Initialise the mutex and condition variables used by
2515    * the scheduler. */
2516   initMutex(&sched_mutex);
2517 #endif
2518   
2519   ACQUIRE_LOCK(&sched_mutex);
2520
2521   /* A capability holds the state a native thread needs in
2522    * order to execute STG code. At least one capability is
2523    * floating around (only THREADED_RTS builds have more than one).
2524    */
2525   initCapabilities();
2526
2527   initTaskManager();
2528
2529 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2530   initSparkPools();
2531 #endif
2532
2533 #if defined(THREADED_RTS)
2534   /*
2535    * Eagerly start one worker to run each Capability, except for
2536    * Capability 0.  The idea is that we're probably going to start a
2537    * bound thread on Capability 0 pretty soon, so we don't want a
2538    * worker task hogging it.
2539    */
2540   { 
2541       nat i;
2542       Capability *cap;
2543       for (i = 1; i < n_capabilities; i++) {
2544           cap = &capabilities[i];
2545           ACQUIRE_LOCK(&cap->lock);
2546           startWorkerTask(cap, workerStart);
2547           RELEASE_LOCK(&cap->lock);
2548       }
2549   }
2550 #endif
2551
2552   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2553
2554   RELEASE_LOCK(&sched_mutex);
2555 }
2556
2557 void
2558 exitScheduler( void )
2559 {
2560     Task *task = NULL;
2561
2562 #if defined(THREADED_RTS)
2563     ACQUIRE_LOCK(&sched_mutex);
2564     task = newBoundTask();
2565     RELEASE_LOCK(&sched_mutex);
2566 #endif
2567
2568     // If we haven't killed all the threads yet, do it now.
2569     if (sched_state < SCHED_SHUTTING_DOWN) {
2570         sched_state = SCHED_INTERRUPTING;
2571         scheduleDoGC(NULL,task,rtsFalse);    
2572     }
2573     sched_state = SCHED_SHUTTING_DOWN;
2574
2575 #if defined(THREADED_RTS)
2576     { 
2577         nat i;
2578         
2579         for (i = 0; i < n_capabilities; i++) {
2580             shutdownCapability(&capabilities[i], task);
2581         }
2582         boundTaskExiting(task);
2583         stopTaskManager();
2584     }
2585 #else
2586     freeCapability(&MainCapability);
2587 #endif
2588 }
2589
2590 void
2591 freeScheduler( void )
2592 {
2593     freeTaskManager();
2594     if (n_capabilities != 1) {
2595         stgFree(capabilities);
2596     }
2597 #if defined(THREADED_RTS)
2598     closeMutex(&sched_mutex);
2599 #endif
2600 }
2601
2602 /* ---------------------------------------------------------------------------
2603    Where are the roots that we know about?
2604
2605         - all the threads on the runnable queue
2606         - all the threads on the blocked queue
2607         - all the threads on the sleeping queue
2608         - all the thread currently executing a _ccall_GC
2609         - all the "main threads"
2610      
2611    ------------------------------------------------------------------------ */
2612
2613 /* This has to be protected either by the scheduler monitor, or by the
2614         garbage collection monitor (probably the latter).
2615         KH @ 25/10/99
2616 */
2617
2618 void
2619 GetRoots( evac_fn evac )
2620 {
2621     nat i;
2622     Capability *cap;
2623     Task *task;
2624
2625 #if defined(GRAN)
2626     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2627         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2628             evac((StgClosure **)&run_queue_hds[i]);
2629         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2630             evac((StgClosure **)&run_queue_tls[i]);
2631         
2632         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2633             evac((StgClosure **)&blocked_queue_hds[i]);
2634         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2635             evac((StgClosure **)&blocked_queue_tls[i]);
2636         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2637             evac((StgClosure **)&ccalling_threads[i]);
2638     }
2639
2640     markEventQueue();
2641
2642 #else /* !GRAN */
2643
2644     for (i = 0; i < n_capabilities; i++) {
2645         cap = &capabilities[i];
2646         evac((StgClosure **)(void *)&cap->run_queue_hd);
2647         evac((StgClosure **)(void *)&cap->run_queue_tl);
2648 #if defined(THREADED_RTS)
2649         evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2650         evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2651 #endif
2652         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2653              task=task->next) {
2654             debugTrace(DEBUG_sched,
2655                        "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2656             evac((StgClosure **)(void *)&task->suspended_tso);
2657         }
2658
2659     }
2660     
2661
2662 #if !defined(THREADED_RTS)
2663     evac((StgClosure **)(void *)&blocked_queue_hd);
2664     evac((StgClosure **)(void *)&blocked_queue_tl);
2665     evac((StgClosure **)(void *)&sleeping_queue);
2666 #endif 
2667 #endif
2668
2669     // evac((StgClosure **)&blackhole_queue);
2670
2671 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2672     markSparkQueue(evac);
2673 #endif
2674     
2675 #if defined(RTS_USER_SIGNALS)
2676     // mark the signal handlers (signals should be already blocked)
2677     markSignalHandlers(evac);
2678 #endif
2679 }
2680
2681 /* -----------------------------------------------------------------------------
2682    performGC
2683
2684    This is the interface to the garbage collector from Haskell land.
2685    We provide this so that external C code can allocate and garbage
2686    collect when called from Haskell via _ccall_GC.
2687    -------------------------------------------------------------------------- */
2688
2689 static void
2690 performGC_(rtsBool force_major)
2691 {
2692     Task *task;
2693     // We must grab a new Task here, because the existing Task may be
2694     // associated with a particular Capability, and chained onto the 
2695     // suspended_ccalling_tasks queue.
2696     ACQUIRE_LOCK(&sched_mutex);
2697     task = newBoundTask();
2698     RELEASE_LOCK(&sched_mutex);
2699     scheduleDoGC(NULL,task,force_major);
2700     boundTaskExiting(task);
2701 }
2702
2703 void
2704 performGC(void)
2705 {
2706     performGC_(rtsFalse);
2707 }
2708
2709 void
2710 performMajorGC(void)
2711 {
2712     performGC_(rtsTrue);
2713 }
2714
2715 /* -----------------------------------------------------------------------------
2716    Stack overflow
2717
2718    If the thread has reached its maximum stack size, then raise the
2719    StackOverflow exception in the offending thread.  Otherwise
2720    relocate the TSO into a larger chunk of memory and adjust its stack
2721    size appropriately.
2722    -------------------------------------------------------------------------- */
2723
2724 static StgTSO *
2725 threadStackOverflow(Capability *cap, StgTSO *tso)
2726 {
2727   nat new_stack_size, stack_words;
2728   lnat new_tso_size;
2729   StgPtr new_sp;
2730   StgTSO *dest;
2731
2732   IF_DEBUG(sanity,checkTSO(tso));
2733
2734   // don't allow throwTo() to modify the blocked_exceptions queue
2735   // while we are moving the TSO:
2736   lockClosure((StgClosure *)tso);
2737
2738   if (tso->stack_size >= tso->max_stack_size) {
2739
2740       debugTrace(DEBUG_gc,
2741                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2742                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2743       IF_DEBUG(gc,
2744                /* If we're debugging, just print out the top of the stack */
2745                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2746                                                 tso->sp+64)));
2747
2748       // Send this thread the StackOverflow exception
2749       unlockTSO(tso);
2750       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2751       return tso;
2752   }
2753
2754   /* Try to double the current stack size.  If that takes us over the
2755    * maximum stack size for this thread, then use the maximum instead.
2756    * Finally round up so the TSO ends up as a whole number of blocks.
2757    */
2758   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2759   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2760                                        TSO_STRUCT_SIZE)/sizeof(W_);
2761   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2762   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2763
2764   debugTrace(DEBUG_sched, 
2765              "increasing stack size from %ld words to %d.",
2766              (long)tso->stack_size, new_stack_size);
2767
2768   dest = (StgTSO *)allocate(new_tso_size);
2769   TICK_ALLOC_TSO(new_stack_size,0);
2770
2771   /* copy the TSO block and the old stack into the new area */
2772   memcpy(dest,tso,TSO_STRUCT_SIZE);
2773   stack_words = tso->stack + tso->stack_size - tso->sp;
2774   new_sp = (P_)dest + new_tso_size - stack_words;
2775   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2776
2777   /* relocate the stack pointers... */
2778   dest->sp         = new_sp;
2779   dest->stack_size = new_stack_size;
2780         
2781   /* Mark the old TSO as relocated.  We have to check for relocated
2782    * TSOs in the garbage collector and any primops that deal with TSOs.
2783    *
2784    * It's important to set the sp value to just beyond the end
2785    * of the stack, so we don't attempt to scavenge any part of the
2786    * dead TSO's stack.
2787    */
2788   tso->what_next = ThreadRelocated;
2789   tso->link = dest;
2790   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2791   tso->why_blocked = NotBlocked;
2792
2793   IF_PAR_DEBUG(verbose,
2794                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2795                      tso->id, tso, tso->stack_size);
2796                /* If we're debugging, just print out the top of the stack */
2797                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2798                                                 tso->sp+64)));
2799   
2800   unlockTSO(dest);
2801   unlockTSO(tso);
2802
2803   IF_DEBUG(sanity,checkTSO(dest));
2804 #if 0
2805   IF_DEBUG(scheduler,printTSO(dest));
2806 #endif
2807
2808   return dest;
2809 }
2810
2811 /* ---------------------------------------------------------------------------
2812    Interrupt execution
2813    - usually called inside a signal handler so it mustn't do anything fancy.   
2814    ------------------------------------------------------------------------ */
2815
2816 void
2817 interruptStgRts(void)
2818 {
2819     sched_state = SCHED_INTERRUPTING;
2820     context_switch = 1;
2821     wakeUpRts();
2822 }
2823
2824 /* -----------------------------------------------------------------------------
2825    Wake up the RTS
2826    
2827    This function causes at least one OS thread to wake up and run the
2828    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2829    an external event has arrived that may need servicing (eg. a
2830    keyboard interrupt).
2831
2832    In the single-threaded RTS we don't do anything here; we only have
2833    one thread anyway, and the event that caused us to want to wake up
2834    will have interrupted any blocking system call in progress anyway.
2835    -------------------------------------------------------------------------- */
2836
2837 void
2838 wakeUpRts(void)
2839 {
2840 #if defined(THREADED_RTS)
2841     // This forces the IO Manager thread to wakeup, which will
2842     // in turn ensure that some OS thread wakes up and runs the
2843     // scheduler loop, which will cause a GC and deadlock check.
2844     ioManagerWakeup();
2845 #endif
2846 }
2847
2848 /* -----------------------------------------------------------------------------
2849  * checkBlackHoles()
2850  *
2851  * Check the blackhole_queue for threads that can be woken up.  We do
2852  * this periodically: before every GC, and whenever the run queue is
2853  * empty.
2854  *
2855  * An elegant solution might be to just wake up all the blocked
2856  * threads with awakenBlockedQueue occasionally: they'll go back to
2857  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2858  * doesn't give us a way to tell whether we've actually managed to
2859  * wake up any threads, so we would be busy-waiting.
2860  *
2861  * -------------------------------------------------------------------------- */
2862
2863 static rtsBool
2864 checkBlackHoles (Capability *cap)
2865 {
2866     StgTSO **prev, *t;
2867     rtsBool any_woke_up = rtsFalse;
2868     StgHalfWord type;
2869
2870     // blackhole_queue is global:
2871     ASSERT_LOCK_HELD(&sched_mutex);
2872
2873     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2874
2875     // ASSUMES: sched_mutex
2876     prev = &blackhole_queue;
2877     t = blackhole_queue;
2878     while (t != END_TSO_QUEUE) {
2879         ASSERT(t->why_blocked == BlockedOnBlackHole);
2880         type = get_itbl(t->block_info.closure)->type;
2881         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2882             IF_DEBUG(sanity,checkTSO(t));
2883             t = unblockOne(cap, t);
2884             // urk, the threads migrate to the current capability
2885             // here, but we'd like to keep them on the original one.
2886             *prev = t;
2887             any_woke_up = rtsTrue;
2888         } else {
2889             prev = &t->link;
2890             t = t->link;
2891         }
2892     }
2893
2894     return any_woke_up;
2895 }
2896
2897 /* -----------------------------------------------------------------------------
2898    Deleting threads
2899
2900    This is used for interruption (^C) and forking, and corresponds to
2901    raising an exception but without letting the thread catch the
2902    exception.
2903    -------------------------------------------------------------------------- */
2904
2905 static void 
2906 deleteThread (Capability *cap, StgTSO *tso)
2907 {
2908     // NOTE: must only be called on a TSO that we have exclusive
2909     // access to, because we will call throwToSingleThreaded() below.
2910     // The TSO must be on the run queue of the Capability we own, or 
2911     // we must own all Capabilities.
2912
2913     if (tso->why_blocked != BlockedOnCCall &&
2914         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2915         throwToSingleThreaded(cap,tso,NULL);
2916     }
2917 }
2918
2919 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2920 static void 
2921 deleteThread_(Capability *cap, StgTSO *tso)
2922 { // for forkProcess only:
2923   // like deleteThread(), but we delete threads in foreign calls, too.
2924
2925     if (tso->why_blocked == BlockedOnCCall ||
2926         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2927         unblockOne(cap,tso);
2928         tso->what_next = ThreadKilled;
2929     } else {
2930         deleteThread(cap,tso);
2931     }
2932 }
2933 #endif
2934
2935 /* -----------------------------------------------------------------------------
2936    raiseExceptionHelper
2937    
2938    This function is called by the raise# primitve, just so that we can
2939    move some of the tricky bits of raising an exception from C-- into
2940    C.  Who knows, it might be a useful re-useable thing here too.
2941    -------------------------------------------------------------------------- */
2942
2943 StgWord
2944 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2945 {
2946     Capability *cap = regTableToCapability(reg);
2947     StgThunk *raise_closure = NULL;
2948     StgPtr p, next;
2949     StgRetInfoTable *info;
2950     //
2951     // This closure represents the expression 'raise# E' where E
2952     // is the exception raise.  It is used to overwrite all the
2953     // thunks which are currently under evaluataion.
2954     //
2955
2956     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2957     // LDV profiling: stg_raise_info has THUNK as its closure
2958     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2959     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2960     // 1 does not cause any problem unless profiling is performed.
2961     // However, when LDV profiling goes on, we need to linearly scan
2962     // small object pool, where raise_closure is stored, so we should
2963     // use MIN_UPD_SIZE.
2964     //
2965     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2966     //                                 sizeofW(StgClosure)+1);
2967     //
2968
2969     //
2970     // Walk up the stack, looking for the catch frame.  On the way,
2971     // we update any closures pointed to from update frames with the
2972     // raise closure that we just built.
2973     //
2974     p = tso->sp;
2975     while(1) {
2976         info = get_ret_itbl((StgClosure *)p);
2977         next = p + stack_frame_sizeW((StgClosure *)p);
2978         switch (info->i.type) {
2979             
2980         case UPDATE_FRAME:
2981             // Only create raise_closure if we need to.
2982             if (raise_closure == NULL) {
2983                 raise_closure = 
2984                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2985                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2986                 raise_closure->payload[0] = exception;
2987             }
2988             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2989             p = next;
2990             continue;
2991
2992         case ATOMICALLY_FRAME:
2993             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2994             tso->sp = p;
2995             return ATOMICALLY_FRAME;
2996             
2997         case CATCH_FRAME:
2998             tso->sp = p;
2999             return CATCH_FRAME;
3000
3001         case CATCH_STM_FRAME:
3002             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3003             tso->sp = p;
3004             return CATCH_STM_FRAME;
3005             
3006         case STOP_FRAME:
3007             tso->sp = p;
3008             return STOP_FRAME;
3009
3010         case CATCH_RETRY_FRAME:
3011         default:
3012             p = next; 
3013             continue;
3014         }
3015     }
3016 }
3017
3018
3019 /* -----------------------------------------------------------------------------
3020    findRetryFrameHelper
3021
3022    This function is called by the retry# primitive.  It traverses the stack
3023    leaving tso->sp referring to the frame which should handle the retry.  
3024
3025    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3026    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3027
3028    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3029    create) because retries are not considered to be exceptions, despite the
3030    similar implementation.
3031
3032    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3033    not be created within memory transactions.
3034    -------------------------------------------------------------------------- */
3035
3036 StgWord
3037 findRetryFrameHelper (StgTSO *tso)
3038 {
3039   StgPtr           p, next;
3040   StgRetInfoTable *info;
3041
3042   p = tso -> sp;
3043   while (1) {
3044     info = get_ret_itbl((StgClosure *)p);
3045     next = p + stack_frame_sizeW((StgClosure *)p);
3046     switch (info->i.type) {
3047       
3048     case ATOMICALLY_FRAME:
3049         debugTrace(DEBUG_stm,
3050                    "found ATOMICALLY_FRAME at %p during retry", p);
3051         tso->sp = p;
3052         return ATOMICALLY_FRAME;
3053       
3054     case CATCH_RETRY_FRAME:
3055         debugTrace(DEBUG_stm,
3056                    "found CATCH_RETRY_FRAME at %p during retrry", p);
3057         tso->sp = p;
3058         return CATCH_RETRY_FRAME;
3059       
3060     case CATCH_STM_FRAME: {
3061         debugTrace(DEBUG_stm,
3062                    "found CATCH_STM_FRAME at %p during retry", p);
3063         StgTRecHeader *trec = tso -> trec;
3064         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3065         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3066         stmAbortTransaction(tso -> cap, trec);
3067         stmFreeAbortedTRec(tso -> cap, trec);
3068         tso -> trec = outer;
3069         p = next; 
3070         continue;
3071     }
3072       
3073
3074     default:
3075       ASSERT(info->i.type != CATCH_FRAME);
3076       ASSERT(info->i.type != STOP_FRAME);
3077       p = next; 
3078       continue;
3079     }
3080   }
3081 }
3082
3083 /* -----------------------------------------------------------------------------
3084    resurrectThreads is called after garbage collection on the list of
3085    threads found to be garbage.  Each of these threads will be woken
3086    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3087    on an MVar, or NonTermination if the thread was blocked on a Black
3088    Hole.
3089
3090    Locks: assumes we hold *all* the capabilities.
3091    -------------------------------------------------------------------------- */
3092
3093 void
3094 resurrectThreads (StgTSO *threads)
3095 {
3096     StgTSO *tso, *next;
3097     Capability *cap;
3098
3099     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3100         next = tso->global_link;
3101         tso->global_link = all_threads;
3102         all_threads = tso;
3103         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3104         
3105         // Wake up the thread on the Capability it was last on
3106         cap = tso->cap;
3107         
3108         switch (tso->why_blocked) {
3109         case BlockedOnMVar:
3110         case BlockedOnException:
3111             /* Called by GC - sched_mutex lock is currently held. */
3112             throwToSingleThreaded(cap, tso,
3113                                   (StgClosure *)BlockedOnDeadMVar_closure);
3114             break;
3115         case BlockedOnBlackHole:
3116             throwToSingleThreaded(cap, tso,
3117                                   (StgClosure *)NonTermination_closure);
3118             break;
3119         case BlockedOnSTM:
3120             throwToSingleThreaded(cap, tso,
3121                                   (StgClosure *)BlockedIndefinitely_closure);
3122             break;
3123         case NotBlocked:
3124             /* This might happen if the thread was blocked on a black hole
3125              * belonging to a thread that we've just woken up (raiseAsync
3126              * can wake up threads, remember...).
3127              */
3128             continue;
3129         default:
3130             barf("resurrectThreads: thread blocked in a strange way");
3131         }
3132     }
3133 }