forkProcess: startTimer() in the child, because the timer will be reset
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "OSThreads.h"
15 #include "Storage.h"
16 #include "StgRun.h"
17 #include "Hooks.h"
18 #include "Schedule.h"
19 #include "StgMiscClosures.h"
20 #include "Interpreter.h"
21 #include "Printer.h"
22 #include "RtsSignals.h"
23 #include "Sanity.h"
24 #include "Stats.h"
25 #include "STM.h"
26 #include "Timer.h"
27 #include "Prelude.h"
28 #include "ThreadLabels.h"
29 #include "LdvProfile.h"
30 #include "Updates.h"
31 #ifdef PROFILING
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #endif
35 #if defined(GRAN) || defined(PARALLEL_HASKELL)
36 # include "GranSimRts.h"
37 # include "GranSim.h"
38 # include "ParallelRts.h"
39 # include "Parallel.h"
40 # include "ParallelDebug.h"
41 # include "FetchMe.h"
42 # include "HLC.h"
43 #endif
44 #include "Sparks.h"
45 #include "Capability.h"
46 #include "Task.h"
47 #include "AwaitEvent.h"
48 #if defined(mingw32_HOST_OS)
49 #include "win32/IOManager.h"
50 #endif
51 #include "Trace.h"
52 #include "RaiseAsync.h"
53 #include "Threads.h"
54 #include "ThrIOManager.h"
55
56 #ifdef HAVE_SYS_TYPES_H
57 #include <sys/types.h>
58 #endif
59 #ifdef HAVE_UNISTD_H
60 #include <unistd.h>
61 #endif
62
63 #include <string.h>
64 #include <stdlib.h>
65 #include <stdarg.h>
66
67 #ifdef HAVE_ERRNO_H
68 #include <errno.h>
69 #endif
70
71 // Turn off inlining when debugging - it obfuscates things
72 #ifdef DEBUG
73 # undef  STATIC_INLINE
74 # define STATIC_INLINE static
75 #endif
76
77 /* -----------------------------------------------------------------------------
78  * Global variables
79  * -------------------------------------------------------------------------- */
80
81 #if defined(GRAN)
82
83 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
84 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
85
86 /* 
87    In GranSim we have a runnable and a blocked queue for each processor.
88    In order to minimise code changes new arrays run_queue_hds/tls
89    are created. run_queue_hd is then a short cut (macro) for
90    run_queue_hds[CurrentProc] (see GranSim.h).
91    -- HWL
92 */
93 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
94 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
95 StgTSO *ccalling_threadss[MAX_PROC];
96 /* We use the same global list of threads (all_threads) in GranSim as in
97    the std RTS (i.e. we are cheating). However, we don't use this list in
98    the GranSim specific code at the moment (so we are only potentially
99    cheating).  */
100
101 #else /* !GRAN */
102
103 #if !defined(THREADED_RTS)
104 // Blocked/sleeping thrads
105 StgTSO *blocked_queue_hd = NULL;
106 StgTSO *blocked_queue_tl = NULL;
107 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
108 #endif
109
110 /* Threads blocked on blackholes.
111  * LOCK: sched_mutex+capability, or all capabilities
112  */
113 StgTSO *blackhole_queue = NULL;
114 #endif
115
116 /* The blackhole_queue should be checked for threads to wake up.  See
117  * Schedule.h for more thorough comment.
118  * LOCK: none (doesn't matter if we miss an update)
119  */
120 rtsBool blackholes_need_checking = rtsFalse;
121
122 /* Linked list of all threads.
123  * Used for detecting garbage collected threads.
124  * LOCK: sched_mutex+capability, or all capabilities
125  */
126 StgTSO *all_threads = NULL;
127
128 /* flag set by signal handler to precipitate a context switch
129  * LOCK: none (just an advisory flag)
130  */
131 int context_switch = 0;
132
133 /* flag that tracks whether we have done any execution in this time slice.
134  * LOCK: currently none, perhaps we should lock (but needs to be
135  * updated in the fast path of the scheduler).
136  */
137 nat recent_activity = ACTIVITY_YES;
138
139 /* if this flag is set as well, give up execution
140  * LOCK: none (changes once, from false->true)
141  */
142 rtsBool sched_state = SCHED_RUNNING;
143
144 #if defined(GRAN)
145 StgTSO *CurrentTSO;
146 #endif
147
148 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
149  *  exists - earlier gccs apparently didn't.
150  *  -= chak
151  */
152 StgTSO dummy_tso;
153
154 /*
155  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
156  * in an MT setting, needed to signal that a worker thread shouldn't hang around
157  * in the scheduler when it is out of work.
158  */
159 rtsBool shutting_down_scheduler = rtsFalse;
160
161 /*
162  * This mutex protects most of the global scheduler data in
163  * the THREADED_RTS runtime.
164  */
165 #if defined(THREADED_RTS)
166 Mutex sched_mutex;
167 #endif
168
169 #if defined(PARALLEL_HASKELL)
170 StgTSO *LastTSO;
171 rtsTime TimeOfLastYield;
172 rtsBool emitSchedule = rtsTrue;
173 #endif
174
175 #if !defined(mingw32_HOST_OS)
176 #define FORKPROCESS_PRIMOP_SUPPORTED
177 #endif
178
179 /* -----------------------------------------------------------------------------
180  * static function prototypes
181  * -------------------------------------------------------------------------- */
182
183 static Capability *schedule (Capability *initialCapability, Task *task);
184
185 //
186 // These function all encapsulate parts of the scheduler loop, and are
187 // abstracted only to make the structure and control flow of the
188 // scheduler clearer.
189 //
190 static void schedulePreLoop (void);
191 #if defined(THREADED_RTS)
192 static void schedulePushWork(Capability *cap, Task *task);
193 #endif
194 static void scheduleStartSignalHandlers (Capability *cap);
195 static void scheduleCheckBlockedThreads (Capability *cap);
196 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
197 static void scheduleCheckBlackHoles (Capability *cap);
198 static void scheduleDetectDeadlock (Capability *cap, Task *task);
199 #if defined(GRAN)
200 static StgTSO *scheduleProcessEvent(rtsEvent *event);
201 #endif
202 #if defined(PARALLEL_HASKELL)
203 static StgTSO *scheduleSendPendingMessages(void);
204 static void scheduleActivateSpark(void);
205 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
206 #endif
207 #if defined(PAR) || defined(GRAN)
208 static void scheduleGranParReport(void);
209 #endif
210 static void schedulePostRunThread(void);
211 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
212 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
213                                          StgTSO *t);
214 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
215                                     nat prev_what_next );
216 static void scheduleHandleThreadBlocked( StgTSO *t );
217 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
218                                              StgTSO *t );
219 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
220 static Capability *scheduleDoGC(Capability *cap, Task *task,
221                                 rtsBool force_major);
222
223 static rtsBool checkBlackHoles(Capability *cap);
224
225 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
226
227 static void deleteThread (Capability *cap, StgTSO *tso);
228 static void deleteAllThreads (Capability *cap);
229
230 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
231 static void deleteThread_(Capability *cap, StgTSO *tso);
232 #endif
233
234 #if defined(PARALLEL_HASKELL)
235 StgTSO * createSparkThread(rtsSpark spark);
236 StgTSO * activateSpark (rtsSpark spark);  
237 #endif
238
239 #ifdef DEBUG
240 static char *whatNext_strs[] = {
241   "(unknown)",
242   "ThreadRunGHC",
243   "ThreadInterpret",
244   "ThreadKilled",
245   "ThreadRelocated",
246   "ThreadComplete"
247 };
248 #endif
249
250 /* -----------------------------------------------------------------------------
251  * Putting a thread on the run queue: different scheduling policies
252  * -------------------------------------------------------------------------- */
253
254 STATIC_INLINE void
255 addToRunQueue( Capability *cap, StgTSO *t )
256 {
257 #if defined(PARALLEL_HASKELL)
258     if (RtsFlags.ParFlags.doFairScheduling) { 
259         // this does round-robin scheduling; good for concurrency
260         appendToRunQueue(cap,t);
261     } else {
262         // this does unfair scheduling; good for parallelism
263         pushOnRunQueue(cap,t);
264     }
265 #else
266     // this does round-robin scheduling; good for concurrency
267     appendToRunQueue(cap,t);
268 #endif
269 }
270
271 /* ---------------------------------------------------------------------------
272    Main scheduling loop.
273
274    We use round-robin scheduling, each thread returning to the
275    scheduler loop when one of these conditions is detected:
276
277       * out of heap space
278       * timer expires (thread yields)
279       * thread blocks
280       * thread ends
281       * stack overflow
282
283    GRAN version:
284      In a GranSim setup this loop iterates over the global event queue.
285      This revolves around the global event queue, which determines what 
286      to do next. Therefore, it's more complicated than either the 
287      concurrent or the parallel (GUM) setup.
288
289    GUM version:
290      GUM iterates over incoming messages.
291      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
292      and sends out a fish whenever it has nothing to do; in-between
293      doing the actual reductions (shared code below) it processes the
294      incoming messages and deals with delayed operations 
295      (see PendingFetches).
296      This is not the ugliest code you could imagine, but it's bloody close.
297
298    ------------------------------------------------------------------------ */
299
300 static Capability *
301 schedule (Capability *initialCapability, Task *task)
302 {
303   StgTSO *t;
304   Capability *cap;
305   StgThreadReturnCode ret;
306 #if defined(GRAN)
307   rtsEvent *event;
308 #elif defined(PARALLEL_HASKELL)
309   StgTSO *tso;
310   GlobalTaskId pe;
311   rtsBool receivedFinish = rtsFalse;
312 # if defined(DEBUG)
313   nat tp_size, sp_size; // stats only
314 # endif
315 #endif
316   nat prev_what_next;
317   rtsBool ready_to_gc;
318 #if defined(THREADED_RTS)
319   rtsBool first = rtsTrue;
320 #endif
321   
322   cap = initialCapability;
323
324   // Pre-condition: this task owns initialCapability.
325   // The sched_mutex is *NOT* held
326   // NB. on return, we still hold a capability.
327
328   debugTrace (DEBUG_sched, 
329               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
330               task, initialCapability);
331
332   schedulePreLoop();
333
334   // -----------------------------------------------------------
335   // Scheduler loop starts here:
336
337 #if defined(PARALLEL_HASKELL)
338 #define TERMINATION_CONDITION        (!receivedFinish)
339 #elif defined(GRAN)
340 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
341 #else
342 #define TERMINATION_CONDITION        rtsTrue
343 #endif
344
345   while (TERMINATION_CONDITION) {
346
347 #if defined(GRAN)
348       /* Choose the processor with the next event */
349       CurrentProc = event->proc;
350       CurrentTSO = event->tso;
351 #endif
352
353 #if defined(THREADED_RTS)
354       if (first) {
355           // don't yield the first time, we want a chance to run this
356           // thread for a bit, even if there are others banging at the
357           // door.
358           first = rtsFalse;
359           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
360       } else {
361           // Yield the capability to higher-priority tasks if necessary.
362           yieldCapability(&cap, task);
363       }
364 #endif
365       
366 #if defined(THREADED_RTS)
367       schedulePushWork(cap,task);
368 #endif
369
370     // Check whether we have re-entered the RTS from Haskell without
371     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
372     // call).
373     if (cap->in_haskell) {
374           errorBelch("schedule: re-entered unsafely.\n"
375                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
376           stg_exit(EXIT_FAILURE);
377     }
378
379     // The interruption / shutdown sequence.
380     // 
381     // In order to cleanly shut down the runtime, we want to:
382     //   * make sure that all main threads return to their callers
383     //     with the state 'Interrupted'.
384     //   * clean up all OS threads assocated with the runtime
385     //   * free all memory etc.
386     //
387     // So the sequence for ^C goes like this:
388     //
389     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
390     //     arranges for some Capability to wake up
391     //
392     //   * all threads in the system are halted, and the zombies are
393     //     placed on the run queue for cleaning up.  We acquire all
394     //     the capabilities in order to delete the threads, this is
395     //     done by scheduleDoGC() for convenience (because GC already
396     //     needs to acquire all the capabilities).  We can't kill
397     //     threads involved in foreign calls.
398     // 
399     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
400     //
401     //   * sched_state := SCHED_SHUTTING_DOWN
402     //
403     //   * all workers exit when the run queue on their capability
404     //     drains.  All main threads will also exit when their TSO
405     //     reaches the head of the run queue and they can return.
406     //
407     //   * eventually all Capabilities will shut down, and the RTS can
408     //     exit.
409     //
410     //   * We might be left with threads blocked in foreign calls, 
411     //     we should really attempt to kill these somehow (TODO);
412     
413     switch (sched_state) {
414     case SCHED_RUNNING:
415         break;
416     case SCHED_INTERRUPTING:
417         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
418 #if defined(THREADED_RTS)
419         discardSparksCap(cap);
420 #endif
421         /* scheduleDoGC() deletes all the threads */
422         cap = scheduleDoGC(cap,task,rtsFalse);
423         break;
424     case SCHED_SHUTTING_DOWN:
425         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
426         // If we are a worker, just exit.  If we're a bound thread
427         // then we will exit below when we've removed our TSO from
428         // the run queue.
429         if (task->tso == NULL && emptyRunQueue(cap)) {
430             return cap;
431         }
432         break;
433     default:
434         barf("sched_state: %d", sched_state);
435     }
436
437 #if defined(THREADED_RTS)
438     // If the run queue is empty, take a spark and turn it into a thread.
439     {
440         if (emptyRunQueue(cap)) {
441             StgClosure *spark;
442             spark = findSpark(cap);
443             if (spark != NULL) {
444                 debugTrace(DEBUG_sched,
445                            "turning spark of closure %p into a thread",
446                            (StgClosure *)spark);
447                 createSparkThread(cap,spark);     
448             }
449         }
450     }
451 #endif // THREADED_RTS
452
453     scheduleStartSignalHandlers(cap);
454
455     // Only check the black holes here if we've nothing else to do.
456     // During normal execution, the black hole list only gets checked
457     // at GC time, to avoid repeatedly traversing this possibly long
458     // list each time around the scheduler.
459     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
460
461     scheduleCheckWakeupThreads(cap);
462
463     scheduleCheckBlockedThreads(cap);
464
465     scheduleDetectDeadlock(cap,task);
466 #if defined(THREADED_RTS)
467     cap = task->cap;    // reload cap, it might have changed
468 #endif
469
470     // Normally, the only way we can get here with no threads to
471     // run is if a keyboard interrupt received during 
472     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
473     // Additionally, it is not fatal for the
474     // threaded RTS to reach here with no threads to run.
475     //
476     // win32: might be here due to awaitEvent() being abandoned
477     // as a result of a console event having been delivered.
478     if ( emptyRunQueue(cap) ) {
479 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
480         ASSERT(sched_state >= SCHED_INTERRUPTING);
481 #endif
482         continue; // nothing to do
483     }
484
485 #if defined(PARALLEL_HASKELL)
486     scheduleSendPendingMessages();
487     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
488         continue;
489
490 #if defined(SPARKS)
491     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
492 #endif
493
494     /* If we still have no work we need to send a FISH to get a spark
495        from another PE */
496     if (emptyRunQueue(cap)) {
497         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
498         ASSERT(rtsFalse); // should not happen at the moment
499     }
500     // from here: non-empty run queue.
501     //  TODO: merge above case with this, only one call processMessages() !
502     if (PacketsWaiting()) {  /* process incoming messages, if
503                                 any pending...  only in else
504                                 because getRemoteWork waits for
505                                 messages as well */
506         receivedFinish = processMessages();
507     }
508 #endif
509
510 #if defined(GRAN)
511     scheduleProcessEvent(event);
512 #endif
513
514     // 
515     // Get a thread to run
516     //
517     t = popRunQueue(cap);
518
519 #if defined(GRAN) || defined(PAR)
520     scheduleGranParReport(); // some kind of debuging output
521 #else
522     // Sanity check the thread we're about to run.  This can be
523     // expensive if there is lots of thread switching going on...
524     IF_DEBUG(sanity,checkTSO(t));
525 #endif
526
527 #if defined(THREADED_RTS)
528     // Check whether we can run this thread in the current task.
529     // If not, we have to pass our capability to the right task.
530     {
531         Task *bound = t->bound;
532       
533         if (bound) {
534             if (bound == task) {
535                 debugTrace(DEBUG_sched,
536                            "### Running thread %lu in bound thread", (unsigned long)t->id);
537                 // yes, the Haskell thread is bound to the current native thread
538             } else {
539                 debugTrace(DEBUG_sched,
540                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
541                 // no, bound to a different Haskell thread: pass to that thread
542                 pushOnRunQueue(cap,t);
543                 continue;
544             }
545         } else {
546             // The thread we want to run is unbound.
547             if (task->tso) { 
548                 debugTrace(DEBUG_sched,
549                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
550                 // no, the current native thread is bound to a different
551                 // Haskell thread, so pass it to any worker thread
552                 pushOnRunQueue(cap,t);
553                 continue; 
554             }
555         }
556     }
557 #endif
558
559     cap->r.rCurrentTSO = t;
560     
561     /* context switches are initiated by the timer signal, unless
562      * the user specified "context switch as often as possible", with
563      * +RTS -C0
564      */
565     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
566         && !emptyThreadQueues(cap)) {
567         context_switch = 1;
568     }
569          
570 run_thread:
571
572     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
573                               (long)t->id, whatNext_strs[t->what_next]);
574
575 #if defined(PROFILING)
576     startHeapProfTimer();
577 #endif
578
579     // Check for exceptions blocked on this thread
580     maybePerformBlockedException (cap, t);
581
582     // ----------------------------------------------------------------------
583     // Run the current thread 
584
585     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
586     ASSERT(t->cap == cap);
587
588     prev_what_next = t->what_next;
589
590     errno = t->saved_errno;
591 #if mingw32_HOST_OS
592     SetLastError(t->saved_winerror);
593 #endif
594
595     cap->in_haskell = rtsTrue;
596
597     dirtyTSO(t);
598
599     recent_activity = ACTIVITY_YES;
600
601     switch (prev_what_next) {
602         
603     case ThreadKilled:
604     case ThreadComplete:
605         /* Thread already finished, return to scheduler. */
606         ret = ThreadFinished;
607         break;
608         
609     case ThreadRunGHC:
610     {
611         StgRegTable *r;
612         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
613         cap = regTableToCapability(r);
614         ret = r->rRet;
615         break;
616     }
617     
618     case ThreadInterpret:
619         cap = interpretBCO(cap);
620         ret = cap->r.rRet;
621         break;
622         
623     default:
624         barf("schedule: invalid what_next field");
625     }
626
627     cap->in_haskell = rtsFalse;
628
629     // The TSO might have moved, eg. if it re-entered the RTS and a GC
630     // happened.  So find the new location:
631     t = cap->r.rCurrentTSO;
632
633     // We have run some Haskell code: there might be blackhole-blocked
634     // threads to wake up now.
635     // Lock-free test here should be ok, we're just setting a flag.
636     if ( blackhole_queue != END_TSO_QUEUE ) {
637         blackholes_need_checking = rtsTrue;
638     }
639     
640     // And save the current errno in this thread.
641     // XXX: possibly bogus for SMP because this thread might already
642     // be running again, see code below.
643     t->saved_errno = errno;
644 #if mingw32_HOST_OS
645     // Similarly for Windows error code
646     t->saved_winerror = GetLastError();
647 #endif
648
649 #if defined(THREADED_RTS)
650     // If ret is ThreadBlocked, and this Task is bound to the TSO that
651     // blocked, we are in limbo - the TSO is now owned by whatever it
652     // is blocked on, and may in fact already have been woken up,
653     // perhaps even on a different Capability.  It may be the case
654     // that task->cap != cap.  We better yield this Capability
655     // immediately and return to normaility.
656     if (ret == ThreadBlocked) {
657         debugTrace(DEBUG_sched,
658                    "--<< thread %lu (%s) stopped: blocked",
659                    (unsigned long)t->id, whatNext_strs[t->what_next]);
660         continue;
661     }
662 #endif
663
664     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
665     ASSERT(t->cap == cap);
666
667     // ----------------------------------------------------------------------
668     
669     // Costs for the scheduler are assigned to CCS_SYSTEM
670 #if defined(PROFILING)
671     stopHeapProfTimer();
672     CCCS = CCS_SYSTEM;
673 #endif
674     
675     schedulePostRunThread();
676
677     ready_to_gc = rtsFalse;
678
679     switch (ret) {
680     case HeapOverflow:
681         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
682         break;
683
684     case StackOverflow:
685         scheduleHandleStackOverflow(cap,task,t);
686         break;
687
688     case ThreadYielding:
689         if (scheduleHandleYield(cap, t, prev_what_next)) {
690             // shortcut for switching between compiler/interpreter:
691             goto run_thread; 
692         }
693         break;
694
695     case ThreadBlocked:
696         scheduleHandleThreadBlocked(t);
697         break;
698
699     case ThreadFinished:
700         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
701         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
702         break;
703
704     default:
705       barf("schedule: invalid thread return code %d", (int)ret);
706     }
707
708     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
709     if (ready_to_gc) {
710       cap = scheduleDoGC(cap,task,rtsFalse);
711     }
712   } /* end of while() */
713
714   debugTrace(PAR_DEBUG_verbose,
715              "== Leaving schedule() after having received Finish");
716 }
717
718 /* ----------------------------------------------------------------------------
719  * Setting up the scheduler loop
720  * ------------------------------------------------------------------------- */
721
722 static void
723 schedulePreLoop(void)
724 {
725 #if defined(GRAN) 
726     /* set up first event to get things going */
727     /* ToDo: assign costs for system setup and init MainTSO ! */
728     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
729               ContinueThread, 
730               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
731     
732     debugTrace (DEBUG_gran,
733                 "GRAN: Init CurrentTSO (in schedule) = %p", 
734                 CurrentTSO);
735     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
736     
737     if (RtsFlags.GranFlags.Light) {
738         /* Save current time; GranSim Light only */
739         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
740     }      
741 #endif
742 }
743
744 /* -----------------------------------------------------------------------------
745  * schedulePushWork()
746  *
747  * Push work to other Capabilities if we have some.
748  * -------------------------------------------------------------------------- */
749
750 #if defined(THREADED_RTS)
751 static void
752 schedulePushWork(Capability *cap USED_IF_THREADS, 
753                  Task *task      USED_IF_THREADS)
754 {
755     Capability *free_caps[n_capabilities], *cap0;
756     nat i, n_free_caps;
757
758     // migration can be turned off with +RTS -qg
759     if (!RtsFlags.ParFlags.migrate) return;
760
761     // Check whether we have more threads on our run queue, or sparks
762     // in our pool, that we could hand to another Capability.
763     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
764         && sparkPoolSizeCap(cap) < 2) {
765         return;
766     }
767
768     // First grab as many free Capabilities as we can.
769     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
770         cap0 = &capabilities[i];
771         if (cap != cap0 && tryGrabCapability(cap0,task)) {
772             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
773                 // it already has some work, we just grabbed it at 
774                 // the wrong moment.  Or maybe it's deadlocked!
775                 releaseCapability(cap0);
776             } else {
777                 free_caps[n_free_caps++] = cap0;
778             }
779         }
780     }
781
782     // we now have n_free_caps free capabilities stashed in
783     // free_caps[].  Share our run queue equally with them.  This is
784     // probably the simplest thing we could do; improvements we might
785     // want to do include:
786     //
787     //   - giving high priority to moving relatively new threads, on 
788     //     the gournds that they haven't had time to build up a
789     //     working set in the cache on this CPU/Capability.
790     //
791     //   - giving low priority to moving long-lived threads
792
793     if (n_free_caps > 0) {
794         StgTSO *prev, *t, *next;
795         rtsBool pushed_to_all;
796
797         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
798
799         i = 0;
800         pushed_to_all = rtsFalse;
801
802         if (cap->run_queue_hd != END_TSO_QUEUE) {
803             prev = cap->run_queue_hd;
804             t = prev->link;
805             prev->link = END_TSO_QUEUE;
806             for (; t != END_TSO_QUEUE; t = next) {
807                 next = t->link;
808                 t->link = END_TSO_QUEUE;
809                 if (t->what_next == ThreadRelocated
810                     || t->bound == task // don't move my bound thread
811                     || tsoLocked(t)) {  // don't move a locked thread
812                     prev->link = t;
813                     prev = t;
814                 } else if (i == n_free_caps) {
815                     pushed_to_all = rtsTrue;
816                     i = 0;
817                     // keep one for us
818                     prev->link = t;
819                     prev = t;
820                 } else {
821                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
822                     appendToRunQueue(free_caps[i],t);
823                     if (t->bound) { t->bound->cap = free_caps[i]; }
824                     t->cap = free_caps[i];
825                     i++;
826                 }
827             }
828             cap->run_queue_tl = prev;
829         }
830
831         // If there are some free capabilities that we didn't push any
832         // threads to, then try to push a spark to each one.
833         if (!pushed_to_all) {
834             StgClosure *spark;
835             // i is the next free capability to push to
836             for (; i < n_free_caps; i++) {
837                 if (emptySparkPoolCap(free_caps[i])) {
838                     spark = findSpark(cap);
839                     if (spark != NULL) {
840                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
841                         newSpark(&(free_caps[i]->r), spark);
842                     }
843                 }
844             }
845         }
846
847         // release the capabilities
848         for (i = 0; i < n_free_caps; i++) {
849             task->cap = free_caps[i];
850             releaseCapability(free_caps[i]);
851         }
852     }
853     task->cap = cap; // reset to point to our Capability.
854 }
855 #endif
856
857 /* ----------------------------------------------------------------------------
858  * Start any pending signal handlers
859  * ------------------------------------------------------------------------- */
860
861 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
862 static void
863 scheduleStartSignalHandlers(Capability *cap)
864 {
865     if (signals_pending()) { // safe outside the lock
866         startSignalHandlers(cap);
867     }
868 }
869 #else
870 static void
871 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
872 {
873 }
874 #endif
875
876 /* ----------------------------------------------------------------------------
877  * Check for blocked threads that can be woken up.
878  * ------------------------------------------------------------------------- */
879
880 static void
881 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
882 {
883 #if !defined(THREADED_RTS)
884     //
885     // Check whether any waiting threads need to be woken up.  If the
886     // run queue is empty, and there are no other tasks running, we
887     // can wait indefinitely for something to happen.
888     //
889     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
890     {
891         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
892     }
893 #endif
894 }
895
896
897 /* ----------------------------------------------------------------------------
898  * Check for threads woken up by other Capabilities
899  * ------------------------------------------------------------------------- */
900
901 static void
902 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
903 {
904 #if defined(THREADED_RTS)
905     // Any threads that were woken up by other Capabilities get
906     // appended to our run queue.
907     if (!emptyWakeupQueue(cap)) {
908         ACQUIRE_LOCK(&cap->lock);
909         if (emptyRunQueue(cap)) {
910             cap->run_queue_hd = cap->wakeup_queue_hd;
911             cap->run_queue_tl = cap->wakeup_queue_tl;
912         } else {
913             cap->run_queue_tl->link = cap->wakeup_queue_hd;
914             cap->run_queue_tl = cap->wakeup_queue_tl;
915         }
916         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
917         RELEASE_LOCK(&cap->lock);
918     }
919 #endif
920 }
921
922 /* ----------------------------------------------------------------------------
923  * Check for threads blocked on BLACKHOLEs that can be woken up
924  * ------------------------------------------------------------------------- */
925 static void
926 scheduleCheckBlackHoles (Capability *cap)
927 {
928     if ( blackholes_need_checking ) // check without the lock first
929     {
930         ACQUIRE_LOCK(&sched_mutex);
931         if ( blackholes_need_checking ) {
932             checkBlackHoles(cap);
933             blackholes_need_checking = rtsFalse;
934         }
935         RELEASE_LOCK(&sched_mutex);
936     }
937 }
938
939 /* ----------------------------------------------------------------------------
940  * Detect deadlock conditions and attempt to resolve them.
941  * ------------------------------------------------------------------------- */
942
943 static void
944 scheduleDetectDeadlock (Capability *cap, Task *task)
945 {
946
947 #if defined(PARALLEL_HASKELL)
948     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
949     return;
950 #endif
951
952     /* 
953      * Detect deadlock: when we have no threads to run, there are no
954      * threads blocked, waiting for I/O, or sleeping, and all the
955      * other tasks are waiting for work, we must have a deadlock of
956      * some description.
957      */
958     if ( emptyThreadQueues(cap) )
959     {
960 #if defined(THREADED_RTS)
961         /* 
962          * In the threaded RTS, we only check for deadlock if there
963          * has been no activity in a complete timeslice.  This means
964          * we won't eagerly start a full GC just because we don't have
965          * any threads to run currently.
966          */
967         if (recent_activity != ACTIVITY_INACTIVE) return;
968 #endif
969
970         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
971
972         // Garbage collection can release some new threads due to
973         // either (a) finalizers or (b) threads resurrected because
974         // they are unreachable and will therefore be sent an
975         // exception.  Any threads thus released will be immediately
976         // runnable.
977         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
978
979         recent_activity = ACTIVITY_DONE_GC;
980         
981         if ( !emptyRunQueue(cap) ) return;
982
983 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
984         /* If we have user-installed signal handlers, then wait
985          * for signals to arrive rather then bombing out with a
986          * deadlock.
987          */
988         if ( anyUserHandlers() ) {
989             debugTrace(DEBUG_sched,
990                        "still deadlocked, waiting for signals...");
991
992             awaitUserSignals();
993
994             if (signals_pending()) {
995                 startSignalHandlers(cap);
996             }
997
998             // either we have threads to run, or we were interrupted:
999             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1000         }
1001 #endif
1002
1003 #if !defined(THREADED_RTS)
1004         /* Probably a real deadlock.  Send the current main thread the
1005          * Deadlock exception.
1006          */
1007         if (task->tso) {
1008             switch (task->tso->why_blocked) {
1009             case BlockedOnSTM:
1010             case BlockedOnBlackHole:
1011             case BlockedOnException:
1012             case BlockedOnMVar:
1013                 throwToSingleThreaded(cap, task->tso, 
1014                                       (StgClosure *)NonTermination_closure);
1015                 return;
1016             default:
1017                 barf("deadlock: main thread blocked in a strange way");
1018             }
1019         }
1020         return;
1021 #endif
1022     }
1023 }
1024
1025 /* ----------------------------------------------------------------------------
1026  * Process an event (GRAN only)
1027  * ------------------------------------------------------------------------- */
1028
1029 #if defined(GRAN)
1030 static StgTSO *
1031 scheduleProcessEvent(rtsEvent *event)
1032 {
1033     StgTSO *t;
1034
1035     if (RtsFlags.GranFlags.Light)
1036       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1037
1038     /* adjust time based on time-stamp */
1039     if (event->time > CurrentTime[CurrentProc] &&
1040         event->evttype != ContinueThread)
1041       CurrentTime[CurrentProc] = event->time;
1042     
1043     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1044     if (!RtsFlags.GranFlags.Light)
1045       handleIdlePEs();
1046
1047     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1048
1049     /* main event dispatcher in GranSim */
1050     switch (event->evttype) {
1051       /* Should just be continuing execution */
1052     case ContinueThread:
1053       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1054       /* ToDo: check assertion
1055       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1056              run_queue_hd != END_TSO_QUEUE);
1057       */
1058       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1059       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1060           procStatus[CurrentProc]==Fetching) {
1061         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1062               CurrentTSO->id, CurrentTSO, CurrentProc);
1063         goto next_thread;
1064       } 
1065       /* Ignore ContinueThreads for completed threads */
1066       if (CurrentTSO->what_next == ThreadComplete) {
1067         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1068               CurrentTSO->id, CurrentTSO, CurrentProc);
1069         goto next_thread;
1070       } 
1071       /* Ignore ContinueThreads for threads that are being migrated */
1072       if (PROCS(CurrentTSO)==Nowhere) { 
1073         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1074               CurrentTSO->id, CurrentTSO, CurrentProc);
1075         goto next_thread;
1076       }
1077       /* The thread should be at the beginning of the run queue */
1078       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1079         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1080               CurrentTSO->id, CurrentTSO, CurrentProc);
1081         break; // run the thread anyway
1082       }
1083       /*
1084       new_event(proc, proc, CurrentTime[proc],
1085                 FindWork,
1086                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1087       goto next_thread; 
1088       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1089       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1090
1091     case FetchNode:
1092       do_the_fetchnode(event);
1093       goto next_thread;             /* handle next event in event queue  */
1094       
1095     case GlobalBlock:
1096       do_the_globalblock(event);
1097       goto next_thread;             /* handle next event in event queue  */
1098       
1099     case FetchReply:
1100       do_the_fetchreply(event);
1101       goto next_thread;             /* handle next event in event queue  */
1102       
1103     case UnblockThread:   /* Move from the blocked queue to the tail of */
1104       do_the_unblock(event);
1105       goto next_thread;             /* handle next event in event queue  */
1106       
1107     case ResumeThread:  /* Move from the blocked queue to the tail of */
1108       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1109       event->tso->gran.blocktime += 
1110         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1111       do_the_startthread(event);
1112       goto next_thread;             /* handle next event in event queue  */
1113       
1114     case StartThread:
1115       do_the_startthread(event);
1116       goto next_thread;             /* handle next event in event queue  */
1117       
1118     case MoveThread:
1119       do_the_movethread(event);
1120       goto next_thread;             /* handle next event in event queue  */
1121       
1122     case MoveSpark:
1123       do_the_movespark(event);
1124       goto next_thread;             /* handle next event in event queue  */
1125       
1126     case FindWork:
1127       do_the_findwork(event);
1128       goto next_thread;             /* handle next event in event queue  */
1129       
1130     default:
1131       barf("Illegal event type %u\n", event->evttype);
1132     }  /* switch */
1133     
1134     /* This point was scheduler_loop in the old RTS */
1135
1136     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1137
1138     TimeOfLastEvent = CurrentTime[CurrentProc];
1139     TimeOfNextEvent = get_time_of_next_event();
1140     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1141     // CurrentTSO = ThreadQueueHd;
1142
1143     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1144                          TimeOfNextEvent));
1145
1146     if (RtsFlags.GranFlags.Light) 
1147       GranSimLight_leave_system(event, &ActiveTSO); 
1148
1149     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1150
1151     IF_DEBUG(gran, 
1152              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1153
1154     /* in a GranSim setup the TSO stays on the run queue */
1155     t = CurrentTSO;
1156     /* Take a thread from the run queue. */
1157     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1158
1159     IF_DEBUG(gran, 
1160              debugBelch("GRAN: About to run current thread, which is\n");
1161              G_TSO(t,5));
1162
1163     context_switch = 0; // turned on via GranYield, checking events and time slice
1164
1165     IF_DEBUG(gran, 
1166              DumpGranEvent(GR_SCHEDULE, t));
1167
1168     procStatus[CurrentProc] = Busy;
1169 }
1170 #endif // GRAN
1171
1172 /* ----------------------------------------------------------------------------
1173  * Send pending messages (PARALLEL_HASKELL only)
1174  * ------------------------------------------------------------------------- */
1175
1176 #if defined(PARALLEL_HASKELL)
1177 static StgTSO *
1178 scheduleSendPendingMessages(void)
1179 {
1180     StgSparkPool *pool;
1181     rtsSpark spark;
1182     StgTSO *t;
1183
1184 # if defined(PAR) // global Mem.Mgmt., omit for now
1185     if (PendingFetches != END_BF_QUEUE) {
1186         processFetches();
1187     }
1188 # endif
1189     
1190     if (RtsFlags.ParFlags.BufferTime) {
1191         // if we use message buffering, we must send away all message
1192         // packets which have become too old...
1193         sendOldBuffers(); 
1194     }
1195 }
1196 #endif
1197
1198 /* ----------------------------------------------------------------------------
1199  * Activate spark threads (PARALLEL_HASKELL only)
1200  * ------------------------------------------------------------------------- */
1201
1202 #if defined(PARALLEL_HASKELL)
1203 static void
1204 scheduleActivateSpark(void)
1205 {
1206 #if defined(SPARKS)
1207   ASSERT(emptyRunQueue());
1208 /* We get here if the run queue is empty and want some work.
1209    We try to turn a spark into a thread, and add it to the run queue,
1210    from where it will be picked up in the next iteration of the scheduler
1211    loop.
1212 */
1213
1214       /* :-[  no local threads => look out for local sparks */
1215       /* the spark pool for the current PE */
1216       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1217       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1218           pool->hd < pool->tl) {
1219         /* 
1220          * ToDo: add GC code check that we really have enough heap afterwards!!
1221          * Old comment:
1222          * If we're here (no runnable threads) and we have pending
1223          * sparks, we must have a space problem.  Get enough space
1224          * to turn one of those pending sparks into a
1225          * thread... 
1226          */
1227
1228         spark = findSpark(rtsFalse);            /* get a spark */
1229         if (spark != (rtsSpark) NULL) {
1230           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1231           IF_PAR_DEBUG(fish, // schedule,
1232                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1233                              tso->id, tso, advisory_thread_count));
1234
1235           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1236             IF_PAR_DEBUG(fish, // schedule,
1237                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1238                             spark));
1239             return rtsFalse; /* failed to generate a thread */
1240           }                  /* otherwise fall through & pick-up new tso */
1241         } else {
1242           IF_PAR_DEBUG(fish, // schedule,
1243                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1244                              spark_queue_len(pool)));
1245           return rtsFalse;  /* failed to generate a thread */
1246         }
1247         return rtsTrue;  /* success in generating a thread */
1248   } else { /* no more threads permitted or pool empty */
1249     return rtsFalse;  /* failed to generateThread */
1250   }
1251 #else
1252   tso = NULL; // avoid compiler warning only
1253   return rtsFalse;  /* dummy in non-PAR setup */
1254 #endif // SPARKS
1255 }
1256 #endif // PARALLEL_HASKELL
1257
1258 /* ----------------------------------------------------------------------------
1259  * Get work from a remote node (PARALLEL_HASKELL only)
1260  * ------------------------------------------------------------------------- */
1261     
1262 #if defined(PARALLEL_HASKELL)
1263 static rtsBool
1264 scheduleGetRemoteWork(rtsBool *receivedFinish)
1265 {
1266   ASSERT(emptyRunQueue());
1267
1268   if (RtsFlags.ParFlags.BufferTime) {
1269         IF_PAR_DEBUG(verbose, 
1270                 debugBelch("...send all pending data,"));
1271         {
1272           nat i;
1273           for (i=1; i<=nPEs; i++)
1274             sendImmediately(i); // send all messages away immediately
1275         }
1276   }
1277 # ifndef SPARKS
1278         //++EDEN++ idle() , i.e. send all buffers, wait for work
1279         // suppress fishing in EDEN... just look for incoming messages
1280         // (blocking receive)
1281   IF_PAR_DEBUG(verbose, 
1282                debugBelch("...wait for incoming messages...\n"));
1283   *receivedFinish = processMessages(); // blocking receive...
1284
1285         // and reenter scheduling loop after having received something
1286         // (return rtsFalse below)
1287
1288 # else /* activate SPARKS machinery */
1289 /* We get here, if we have no work, tried to activate a local spark, but still
1290    have no work. We try to get a remote spark, by sending a FISH message.
1291    Thread migration should be added here, and triggered when a sequence of 
1292    fishes returns without work. */
1293         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1294
1295       /* =8-[  no local sparks => look for work on other PEs */
1296         /*
1297          * We really have absolutely no work.  Send out a fish
1298          * (there may be some out there already), and wait for
1299          * something to arrive.  We clearly can't run any threads
1300          * until a SCHEDULE or RESUME arrives, and so that's what
1301          * we're hoping to see.  (Of course, we still have to
1302          * respond to other types of messages.)
1303          */
1304         rtsTime now = msTime() /*CURRENT_TIME*/;
1305         IF_PAR_DEBUG(verbose, 
1306                      debugBelch("--  now=%ld\n", now));
1307         IF_PAR_DEBUG(fish, // verbose,
1308              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1309                  (last_fish_arrived_at!=0 &&
1310                   last_fish_arrived_at+delay > now)) {
1311                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1312                      now, last_fish_arrived_at+delay, 
1313                      last_fish_arrived_at,
1314                      delay);
1315              });
1316   
1317         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1318             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1319           if (last_fish_arrived_at==0 ||
1320               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1321             /* outstandingFishes is set in sendFish, processFish;
1322                avoid flooding system with fishes via delay */
1323     next_fish_to_send_at = 0;  
1324   } else {
1325     /* ToDo: this should be done in the main scheduling loop to avoid the
1326              busy wait here; not so bad if fish delay is very small  */
1327     int iq = 0; // DEBUGGING -- HWL
1328     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1329     /* send a fish when ready, but process messages that arrive in the meantime */
1330     do {
1331       if (PacketsWaiting()) {
1332         iq++; // DEBUGGING
1333         *receivedFinish = processMessages();
1334       }
1335       now = msTime();
1336     } while (!*receivedFinish || now<next_fish_to_send_at);
1337     // JB: This means the fish could become obsolete, if we receive
1338     // work. Better check for work again? 
1339     // last line: while (!receivedFinish || !haveWork || now<...)
1340     // next line: if (receivedFinish || haveWork )
1341
1342     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1343       return rtsFalse;  // NB: this will leave scheduler loop
1344                         // immediately after return!
1345                           
1346     IF_PAR_DEBUG(fish, // verbose,
1347                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1348
1349   }
1350
1351     // JB: IMHO, this should all be hidden inside sendFish(...)
1352     /* pe = choosePE(); 
1353        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1354                 NEW_FISH_HUNGER);
1355
1356     // Global statistics: count no. of fishes
1357     if (RtsFlags.ParFlags.ParStats.Global &&
1358          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1359            globalParStats.tot_fish_mess++;
1360            }
1361     */ 
1362
1363   /* delayed fishes must have been sent by now! */
1364   next_fish_to_send_at = 0;  
1365   }
1366       
1367   *receivedFinish = processMessages();
1368 # endif /* SPARKS */
1369
1370  return rtsFalse;
1371  /* NB: this function always returns rtsFalse, meaning the scheduler
1372     loop continues with the next iteration; 
1373     rationale: 
1374       return code means success in finding work; we enter this function
1375       if there is no local work, thus have to send a fish which takes
1376       time until it arrives with work; in the meantime we should process
1377       messages in the main loop;
1378  */
1379 }
1380 #endif // PARALLEL_HASKELL
1381
1382 /* ----------------------------------------------------------------------------
1383  * PAR/GRAN: Report stats & debugging info(?)
1384  * ------------------------------------------------------------------------- */
1385
1386 #if defined(PAR) || defined(GRAN)
1387 static void
1388 scheduleGranParReport(void)
1389 {
1390   ASSERT(run_queue_hd != END_TSO_QUEUE);
1391
1392   /* Take a thread from the run queue, if we have work */
1393   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1394
1395     /* If this TSO has got its outport closed in the meantime, 
1396      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1397      * It has to be marked as TH_DEAD for this purpose.
1398      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1399
1400 JB: TODO: investigate wether state change field could be nuked
1401      entirely and replaced by the normal tso state (whatnext
1402      field). All we want to do is to kill tsos from outside.
1403      */
1404
1405     /* ToDo: write something to the log-file
1406     if (RTSflags.ParFlags.granSimStats && !sameThread)
1407         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1408
1409     CurrentTSO = t;
1410     */
1411     /* the spark pool for the current PE */
1412     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1413
1414     IF_DEBUG(scheduler, 
1415              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1416                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1417
1418     IF_PAR_DEBUG(fish,
1419              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1420                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1421
1422     if (RtsFlags.ParFlags.ParStats.Full && 
1423         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1424         (emitSchedule || // forced emit
1425          (t && LastTSO && t->id != LastTSO->id))) {
1426       /* 
1427          we are running a different TSO, so write a schedule event to log file
1428          NB: If we use fair scheduling we also have to write  a deschedule 
1429              event for LastTSO; with unfair scheduling we know that the
1430              previous tso has blocked whenever we switch to another tso, so
1431              we don't need it in GUM for now
1432       */
1433       IF_PAR_DEBUG(fish, // schedule,
1434                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1435
1436       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1437                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1438       emitSchedule = rtsFalse;
1439     }
1440 }     
1441 #endif
1442
1443 /* ----------------------------------------------------------------------------
1444  * After running a thread...
1445  * ------------------------------------------------------------------------- */
1446
1447 static void
1448 schedulePostRunThread(void)
1449 {
1450 #if defined(PAR)
1451     /* HACK 675: if the last thread didn't yield, make sure to print a 
1452        SCHEDULE event to the log file when StgRunning the next thread, even
1453        if it is the same one as before */
1454     LastTSO = t; 
1455     TimeOfLastYield = CURRENT_TIME;
1456 #endif
1457
1458   /* some statistics gathering in the parallel case */
1459
1460 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1461   switch (ret) {
1462     case HeapOverflow:
1463 # if defined(GRAN)
1464       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1465       globalGranStats.tot_heapover++;
1466 # elif defined(PAR)
1467       globalParStats.tot_heapover++;
1468 # endif
1469       break;
1470
1471      case StackOverflow:
1472 # if defined(GRAN)
1473       IF_DEBUG(gran, 
1474                DumpGranEvent(GR_DESCHEDULE, t));
1475       globalGranStats.tot_stackover++;
1476 # elif defined(PAR)
1477       // IF_DEBUG(par, 
1478       // DumpGranEvent(GR_DESCHEDULE, t);
1479       globalParStats.tot_stackover++;
1480 # endif
1481       break;
1482
1483     case ThreadYielding:
1484 # if defined(GRAN)
1485       IF_DEBUG(gran, 
1486                DumpGranEvent(GR_DESCHEDULE, t));
1487       globalGranStats.tot_yields++;
1488 # elif defined(PAR)
1489       // IF_DEBUG(par, 
1490       // DumpGranEvent(GR_DESCHEDULE, t);
1491       globalParStats.tot_yields++;
1492 # endif
1493       break; 
1494
1495     case ThreadBlocked:
1496 # if defined(GRAN)
1497         debugTrace(DEBUG_sched, 
1498                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1499                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1500                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1501                if (t->block_info.closure!=(StgClosure*)NULL)
1502                  print_bq(t->block_info.closure);
1503                debugBelch("\n"));
1504
1505       // ??? needed; should emit block before
1506       IF_DEBUG(gran, 
1507                DumpGranEvent(GR_DESCHEDULE, t)); 
1508       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1509       /*
1510         ngoq Dogh!
1511       ASSERT(procStatus[CurrentProc]==Busy || 
1512               ((procStatus[CurrentProc]==Fetching) && 
1513               (t->block_info.closure!=(StgClosure*)NULL)));
1514       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1515           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1516             procStatus[CurrentProc]==Fetching)) 
1517         procStatus[CurrentProc] = Idle;
1518       */
1519 # elif defined(PAR)
1520 //++PAR++  blockThread() writes the event (change?)
1521 # endif
1522     break;
1523
1524   case ThreadFinished:
1525     break;
1526
1527   default:
1528     barf("parGlobalStats: unknown return code");
1529     break;
1530     }
1531 #endif
1532 }
1533
1534 /* -----------------------------------------------------------------------------
1535  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1536  * -------------------------------------------------------------------------- */
1537
1538 static rtsBool
1539 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1540 {
1541     // did the task ask for a large block?
1542     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1543         // if so, get one and push it on the front of the nursery.
1544         bdescr *bd;
1545         lnat blocks;
1546         
1547         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1548         
1549         debugTrace(DEBUG_sched,
1550                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1551                    (long)t->id, whatNext_strs[t->what_next], blocks);
1552     
1553         // don't do this if the nursery is (nearly) full, we'll GC first.
1554         if (cap->r.rCurrentNursery->link != NULL ||
1555             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1556                                                // if the nursery has only one block.
1557             
1558             ACQUIRE_SM_LOCK
1559             bd = allocGroup( blocks );
1560             RELEASE_SM_LOCK
1561             cap->r.rNursery->n_blocks += blocks;
1562             
1563             // link the new group into the list
1564             bd->link = cap->r.rCurrentNursery;
1565             bd->u.back = cap->r.rCurrentNursery->u.back;
1566             if (cap->r.rCurrentNursery->u.back != NULL) {
1567                 cap->r.rCurrentNursery->u.back->link = bd;
1568             } else {
1569 #if !defined(THREADED_RTS)
1570                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1571                        g0s0 == cap->r.rNursery);
1572 #endif
1573                 cap->r.rNursery->blocks = bd;
1574             }             
1575             cap->r.rCurrentNursery->u.back = bd;
1576             
1577             // initialise it as a nursery block.  We initialise the
1578             // step, gen_no, and flags field of *every* sub-block in
1579             // this large block, because this is easier than making
1580             // sure that we always find the block head of a large
1581             // block whenever we call Bdescr() (eg. evacuate() and
1582             // isAlive() in the GC would both have to do this, at
1583             // least).
1584             { 
1585                 bdescr *x;
1586                 for (x = bd; x < bd + blocks; x++) {
1587                     x->step = cap->r.rNursery;
1588                     x->gen_no = 0;
1589                     x->flags = 0;
1590                 }
1591             }
1592             
1593             // This assert can be a killer if the app is doing lots
1594             // of large block allocations.
1595             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1596             
1597             // now update the nursery to point to the new block
1598             cap->r.rCurrentNursery = bd;
1599             
1600             // we might be unlucky and have another thread get on the
1601             // run queue before us and steal the large block, but in that
1602             // case the thread will just end up requesting another large
1603             // block.
1604             pushOnRunQueue(cap,t);
1605             return rtsFalse;  /* not actually GC'ing */
1606         }
1607     }
1608     
1609     debugTrace(DEBUG_sched,
1610                "--<< thread %ld (%s) stopped: HeapOverflow\n", 
1611                (long)t->id, whatNext_strs[t->what_next]);
1612
1613 #if defined(GRAN)
1614     ASSERT(!is_on_queue(t,CurrentProc));
1615 #elif defined(PARALLEL_HASKELL)
1616     /* Currently we emit a DESCHEDULE event before GC in GUM.
1617        ToDo: either add separate event to distinguish SYSTEM time from rest
1618        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1619     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1620         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1621                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1622         emitSchedule = rtsTrue;
1623     }
1624 #endif
1625       
1626     pushOnRunQueue(cap,t);
1627     return rtsTrue;
1628     /* actual GC is done at the end of the while loop in schedule() */
1629 }
1630
1631 /* -----------------------------------------------------------------------------
1632  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1633  * -------------------------------------------------------------------------- */
1634
1635 static void
1636 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1637 {
1638     debugTrace (DEBUG_sched,
1639                 "--<< thread %ld (%s) stopped, StackOverflow", 
1640                 (long)t->id, whatNext_strs[t->what_next]);
1641
1642     /* just adjust the stack for this thread, then pop it back
1643      * on the run queue.
1644      */
1645     { 
1646         /* enlarge the stack */
1647         StgTSO *new_t = threadStackOverflow(cap, t);
1648         
1649         /* The TSO attached to this Task may have moved, so update the
1650          * pointer to it.
1651          */
1652         if (task->tso == t) {
1653             task->tso = new_t;
1654         }
1655         pushOnRunQueue(cap,new_t);
1656     }
1657 }
1658
1659 /* -----------------------------------------------------------------------------
1660  * Handle a thread that returned to the scheduler with ThreadYielding
1661  * -------------------------------------------------------------------------- */
1662
1663 static rtsBool
1664 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1665 {
1666     // Reset the context switch flag.  We don't do this just before
1667     // running the thread, because that would mean we would lose ticks
1668     // during GC, which can lead to unfair scheduling (a thread hogs
1669     // the CPU because the tick always arrives during GC).  This way
1670     // penalises threads that do a lot of allocation, but that seems
1671     // better than the alternative.
1672     context_switch = 0;
1673     
1674     /* put the thread back on the run queue.  Then, if we're ready to
1675      * GC, check whether this is the last task to stop.  If so, wake
1676      * up the GC thread.  getThread will block during a GC until the
1677      * GC is finished.
1678      */
1679 #ifdef DEBUG
1680     if (t->what_next != prev_what_next) {
1681         debugTrace(DEBUG_sched,
1682                    "--<< thread %ld (%s) stopped to switch evaluators", 
1683                    (long)t->id, whatNext_strs[t->what_next]);
1684     } else {
1685         debugTrace(DEBUG_sched,
1686                    "--<< thread %ld (%s) stopped, yielding",
1687                    (long)t->id, whatNext_strs[t->what_next]);
1688     }
1689 #endif
1690     
1691     IF_DEBUG(sanity,
1692              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1693              checkTSO(t));
1694     ASSERT(t->link == END_TSO_QUEUE);
1695     
1696     // Shortcut if we're just switching evaluators: don't bother
1697     // doing stack squeezing (which can be expensive), just run the
1698     // thread.
1699     if (t->what_next != prev_what_next) {
1700         return rtsTrue;
1701     }
1702     
1703 #if defined(GRAN)
1704     ASSERT(!is_on_queue(t,CurrentProc));
1705       
1706     IF_DEBUG(sanity,
1707              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1708              checkThreadQsSanity(rtsTrue));
1709
1710 #endif
1711
1712     addToRunQueue(cap,t);
1713
1714 #if defined(GRAN)
1715     /* add a ContinueThread event to actually process the thread */
1716     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1717               ContinueThread,
1718               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1719     IF_GRAN_DEBUG(bq, 
1720                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1721                   G_EVENTQ(0);
1722                   G_CURR_THREADQ(0));
1723 #endif
1724     return rtsFalse;
1725 }
1726
1727 /* -----------------------------------------------------------------------------
1728  * Handle a thread that returned to the scheduler with ThreadBlocked
1729  * -------------------------------------------------------------------------- */
1730
1731 static void
1732 scheduleHandleThreadBlocked( StgTSO *t
1733 #if !defined(GRAN) && !defined(DEBUG)
1734     STG_UNUSED
1735 #endif
1736     )
1737 {
1738 #if defined(GRAN)
1739     IF_DEBUG(scheduler,
1740              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1741                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1742              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1743     
1744     // ??? needed; should emit block before
1745     IF_DEBUG(gran, 
1746              DumpGranEvent(GR_DESCHEDULE, t)); 
1747     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1748     /*
1749       ngoq Dogh!
1750       ASSERT(procStatus[CurrentProc]==Busy || 
1751       ((procStatus[CurrentProc]==Fetching) && 
1752       (t->block_info.closure!=(StgClosure*)NULL)));
1753       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1754       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1755       procStatus[CurrentProc]==Fetching)) 
1756       procStatus[CurrentProc] = Idle;
1757     */
1758 #elif defined(PAR)
1759     IF_DEBUG(scheduler,
1760              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1761                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1762     IF_PAR_DEBUG(bq,
1763                  
1764                  if (t->block_info.closure!=(StgClosure*)NULL) 
1765                  print_bq(t->block_info.closure));
1766     
1767     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1768     blockThread(t);
1769     
1770     /* whatever we schedule next, we must log that schedule */
1771     emitSchedule = rtsTrue;
1772     
1773 #else /* !GRAN */
1774
1775       // We don't need to do anything.  The thread is blocked, and it
1776       // has tidied up its stack and placed itself on whatever queue
1777       // it needs to be on.
1778
1779     // ASSERT(t->why_blocked != NotBlocked);
1780     // Not true: for example,
1781     //    - in THREADED_RTS, the thread may already have been woken
1782     //      up by another Capability.  This actually happens: try
1783     //      conc023 +RTS -N2.
1784     //    - the thread may have woken itself up already, because
1785     //      threadPaused() might have raised a blocked throwTo
1786     //      exception, see maybePerformBlockedException().
1787
1788 #ifdef DEBUG
1789     if (traceClass(DEBUG_sched)) {
1790         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1791                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1792         printThreadBlockage(t);
1793         debugTraceEnd();
1794     }
1795 #endif
1796     
1797     /* Only for dumping event to log file 
1798        ToDo: do I need this in GranSim, too?
1799        blockThread(t);
1800     */
1801 #endif
1802 }
1803
1804 /* -----------------------------------------------------------------------------
1805  * Handle a thread that returned to the scheduler with ThreadFinished
1806  * -------------------------------------------------------------------------- */
1807
1808 static rtsBool
1809 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1810 {
1811     /* Need to check whether this was a main thread, and if so,
1812      * return with the return value.
1813      *
1814      * We also end up here if the thread kills itself with an
1815      * uncaught exception, see Exception.cmm.
1816      */
1817     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1818                (unsigned long)t->id, whatNext_strs[t->what_next]);
1819
1820     /* Inform the Hpc that a thread has finished */
1821     hs_hpc_thread_finished_event(t);
1822
1823 #if defined(GRAN)
1824       endThread(t, CurrentProc); // clean-up the thread
1825 #elif defined(PARALLEL_HASKELL)
1826       /* For now all are advisory -- HWL */
1827       //if(t->priority==AdvisoryPriority) ??
1828       advisory_thread_count--; // JB: Caution with this counter, buggy!
1829       
1830 # if defined(DIST)
1831       if(t->dist.priority==RevalPriority)
1832         FinishReval(t);
1833 # endif
1834     
1835 # if defined(EDENOLD)
1836       // the thread could still have an outport... (BUG)
1837       if (t->eden.outport != -1) {
1838       // delete the outport for the tso which has finished...
1839         IF_PAR_DEBUG(eden_ports,
1840                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1841                               t->eden.outport, t->id));
1842         deleteOPT(t);
1843       }
1844       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1845       if (t->eden.epid != -1) {
1846         IF_PAR_DEBUG(eden_ports,
1847                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1848                            t->id, t->eden.epid));
1849         removeTSOfromProcess(t);
1850       }
1851 # endif 
1852
1853 # if defined(PAR)
1854       if (RtsFlags.ParFlags.ParStats.Full &&
1855           !RtsFlags.ParFlags.ParStats.Suppressed) 
1856         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1857
1858       //  t->par only contains statistics: left out for now...
1859       IF_PAR_DEBUG(fish,
1860                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1861                               t->id,t,t->par.sparkname));
1862 # endif
1863 #endif // PARALLEL_HASKELL
1864
1865       //
1866       // Check whether the thread that just completed was a bound
1867       // thread, and if so return with the result.  
1868       //
1869       // There is an assumption here that all thread completion goes
1870       // through this point; we need to make sure that if a thread
1871       // ends up in the ThreadKilled state, that it stays on the run
1872       // queue so it can be dealt with here.
1873       //
1874
1875       if (t->bound) {
1876
1877           if (t->bound != task) {
1878 #if !defined(THREADED_RTS)
1879               // Must be a bound thread that is not the topmost one.  Leave
1880               // it on the run queue until the stack has unwound to the
1881               // point where we can deal with this.  Leaving it on the run
1882               // queue also ensures that the garbage collector knows about
1883               // this thread and its return value (it gets dropped from the
1884               // all_threads list so there's no other way to find it).
1885               appendToRunQueue(cap,t);
1886               return rtsFalse;
1887 #else
1888               // this cannot happen in the threaded RTS, because a
1889               // bound thread can only be run by the appropriate Task.
1890               barf("finished bound thread that isn't mine");
1891 #endif
1892           }
1893
1894           ASSERT(task->tso == t);
1895
1896           if (t->what_next == ThreadComplete) {
1897               if (task->ret) {
1898                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1899                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1900               }
1901               task->stat = Success;
1902           } else {
1903               if (task->ret) {
1904                   *(task->ret) = NULL;
1905               }
1906               if (sched_state >= SCHED_INTERRUPTING) {
1907                   task->stat = Interrupted;
1908               } else {
1909                   task->stat = Killed;
1910               }
1911           }
1912 #ifdef DEBUG
1913           removeThreadLabel((StgWord)task->tso->id);
1914 #endif
1915           return rtsTrue; // tells schedule() to return
1916       }
1917
1918       return rtsFalse;
1919 }
1920
1921 /* -----------------------------------------------------------------------------
1922  * Perform a heap census, if PROFILING
1923  * -------------------------------------------------------------------------- */
1924
1925 static rtsBool
1926 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1927 {
1928 #if defined(PROFILING)
1929     // When we have +RTS -i0 and we're heap profiling, do a census at
1930     // every GC.  This lets us get repeatable runs for debugging.
1931     if (performHeapProfile ||
1932         (RtsFlags.ProfFlags.profileInterval==0 &&
1933          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1934
1935         // checking black holes is necessary before GC, otherwise
1936         // there may be threads that are unreachable except by the
1937         // blackhole queue, which the GC will consider to be
1938         // deadlocked.
1939         scheduleCheckBlackHoles(&MainCapability);
1940
1941         debugTrace(DEBUG_sched, "garbage collecting before heap census");
1942         GarbageCollect(rtsTrue);
1943
1944         debugTrace(DEBUG_sched, "performing heap census");
1945         heapCensus();
1946
1947         performHeapProfile = rtsFalse;
1948         return rtsTrue;  // true <=> we already GC'd
1949     }
1950 #endif
1951     return rtsFalse;
1952 }
1953
1954 /* -----------------------------------------------------------------------------
1955  * Perform a garbage collection if necessary
1956  * -------------------------------------------------------------------------- */
1957
1958 static Capability *
1959 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1960 {
1961     StgTSO *t;
1962 #ifdef THREADED_RTS
1963     static volatile StgWord waiting_for_gc;
1964     rtsBool was_waiting;
1965     nat i;
1966 #endif
1967
1968 #ifdef THREADED_RTS
1969     // In order to GC, there must be no threads running Haskell code.
1970     // Therefore, the GC thread needs to hold *all* the capabilities,
1971     // and release them after the GC has completed.  
1972     //
1973     // This seems to be the simplest way: previous attempts involved
1974     // making all the threads with capabilities give up their
1975     // capabilities and sleep except for the *last* one, which
1976     // actually did the GC.  But it's quite hard to arrange for all
1977     // the other tasks to sleep and stay asleep.
1978     //
1979         
1980     was_waiting = cas(&waiting_for_gc, 0, 1);
1981     if (was_waiting) {
1982         do {
1983             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1984             if (cap) yieldCapability(&cap,task);
1985         } while (waiting_for_gc);
1986         return cap;  // NOTE: task->cap might have changed here
1987     }
1988
1989     for (i=0; i < n_capabilities; i++) {
1990         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1991         if (cap != &capabilities[i]) {
1992             Capability *pcap = &capabilities[i];
1993             // we better hope this task doesn't get migrated to
1994             // another Capability while we're waiting for this one.
1995             // It won't, because load balancing happens while we have
1996             // all the Capabilities, but even so it's a slightly
1997             // unsavoury invariant.
1998             task->cap = pcap;
1999             context_switch = 1;
2000             waitForReturnCapability(&pcap, task);
2001             if (pcap != &capabilities[i]) {
2002                 barf("scheduleDoGC: got the wrong capability");
2003             }
2004         }
2005     }
2006
2007     waiting_for_gc = rtsFalse;
2008 #endif
2009
2010     /* Kick any transactions which are invalid back to their
2011      * atomically frames.  When next scheduled they will try to
2012      * commit, this commit will fail and they will retry.
2013      */
2014     { 
2015         StgTSO *next;
2016
2017         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2018             if (t->what_next == ThreadRelocated) {
2019                 next = t->link;
2020             } else {
2021                 next = t->global_link;
2022                 
2023                 // This is a good place to check for blocked
2024                 // exceptions.  It might be the case that a thread is
2025                 // blocked on delivering an exception to a thread that
2026                 // is also blocked - we try to ensure that this
2027                 // doesn't happen in throwTo(), but it's too hard (or
2028                 // impossible) to close all the race holes, so we
2029                 // accept that some might get through and deal with
2030                 // them here.  A GC will always happen at some point,
2031                 // even if the system is otherwise deadlocked.
2032                 maybePerformBlockedException (&capabilities[0], t);
2033
2034                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2035                     if (!stmValidateNestOfTransactions (t -> trec)) {
2036                         debugTrace(DEBUG_sched | DEBUG_stm,
2037                                    "trec %p found wasting its time", t);
2038                         
2039                         // strip the stack back to the
2040                         // ATOMICALLY_FRAME, aborting the (nested)
2041                         // transaction, and saving the stack of any
2042                         // partially-evaluated thunks on the heap.
2043                         throwToSingleThreaded_(&capabilities[0], t, 
2044                                                NULL, rtsTrue, NULL);
2045                         
2046 #ifdef REG_R1
2047                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2048 #endif
2049                     }
2050                 }
2051             }
2052         }
2053     }
2054     
2055     // so this happens periodically:
2056     if (cap) scheduleCheckBlackHoles(cap);
2057     
2058     IF_DEBUG(scheduler, printAllThreads());
2059
2060     /*
2061      * We now have all the capabilities; if we're in an interrupting
2062      * state, then we should take the opportunity to delete all the
2063      * threads in the system.
2064      */
2065     if (sched_state >= SCHED_INTERRUPTING) {
2066         deleteAllThreads(&capabilities[0]);
2067         sched_state = SCHED_SHUTTING_DOWN;
2068     }
2069
2070     /* everybody back, start the GC.
2071      * Could do it in this thread, or signal a condition var
2072      * to do it in another thread.  Either way, we need to
2073      * broadcast on gc_pending_cond afterward.
2074      */
2075 #if defined(THREADED_RTS)
2076     debugTrace(DEBUG_sched, "doing GC");
2077 #endif
2078     GarbageCollect(force_major);
2079     
2080 #if defined(THREADED_RTS)
2081     // release our stash of capabilities.
2082     for (i = 0; i < n_capabilities; i++) {
2083         if (cap != &capabilities[i]) {
2084             task->cap = &capabilities[i];
2085             releaseCapability(&capabilities[i]);
2086         }
2087     }
2088     if (cap) {
2089         task->cap = cap;
2090     } else {
2091         task->cap = NULL;
2092     }
2093 #endif
2094
2095 #if defined(GRAN)
2096     /* add a ContinueThread event to continue execution of current thread */
2097     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2098               ContinueThread,
2099               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2100     IF_GRAN_DEBUG(bq, 
2101                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2102                   G_EVENTQ(0);
2103                   G_CURR_THREADQ(0));
2104 #endif /* GRAN */
2105
2106     return cap;
2107 }
2108
2109 /* ---------------------------------------------------------------------------
2110  * Singleton fork(). Do not copy any running threads.
2111  * ------------------------------------------------------------------------- */
2112
2113 StgInt
2114 forkProcess(HsStablePtr *entry
2115 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2116             STG_UNUSED
2117 #endif
2118            )
2119 {
2120 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2121     Task *task;
2122     pid_t pid;
2123     StgTSO* t,*next;
2124     Capability *cap;
2125     
2126 #if defined(THREADED_RTS)
2127     if (RtsFlags.ParFlags.nNodes > 1) {
2128         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2129         stg_exit(EXIT_FAILURE);
2130     }
2131 #endif
2132
2133     debugTrace(DEBUG_sched, "forking!");
2134     
2135     // ToDo: for SMP, we should probably acquire *all* the capabilities
2136     cap = rts_lock();
2137     
2138     pid = fork();
2139     
2140     if (pid) { // parent
2141         
2142         // just return the pid
2143         rts_unlock(cap);
2144         return pid;
2145         
2146     } else { // child
2147         
2148         // Now, all OS threads except the thread that forked are
2149         // stopped.  We need to stop all Haskell threads, including
2150         // those involved in foreign calls.  Also we need to delete
2151         // all Tasks, because they correspond to OS threads that are
2152         // now gone.
2153
2154         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2155             if (t->what_next == ThreadRelocated) {
2156                 next = t->link;
2157             } else {
2158                 next = t->global_link;
2159                 // don't allow threads to catch the ThreadKilled
2160                 // exception, but we do want to raiseAsync() because these
2161                 // threads may be evaluating thunks that we need later.
2162                 deleteThread_(cap,t);
2163             }
2164         }
2165         
2166         // Empty the run queue.  It seems tempting to let all the
2167         // killed threads stay on the run queue as zombies to be
2168         // cleaned up later, but some of them correspond to bound
2169         // threads for which the corresponding Task does not exist.
2170         cap->run_queue_hd = END_TSO_QUEUE;
2171         cap->run_queue_tl = END_TSO_QUEUE;
2172
2173         // Any suspended C-calling Tasks are no more, their OS threads
2174         // don't exist now:
2175         cap->suspended_ccalling_tasks = NULL;
2176
2177         // Empty the all_threads list.  Otherwise, the garbage
2178         // collector may attempt to resurrect some of these threads.
2179         all_threads = END_TSO_QUEUE;
2180
2181         // Wipe the task list, except the current Task.
2182         ACQUIRE_LOCK(&sched_mutex);
2183         for (task = all_tasks; task != NULL; task=task->all_link) {
2184             if (task != cap->running_task) {
2185                 discardTask(task);
2186             }
2187         }
2188         RELEASE_LOCK(&sched_mutex);
2189
2190 #if defined(THREADED_RTS)
2191         // Wipe our spare workers list, they no longer exist.  New
2192         // workers will be created if necessary.
2193         cap->spare_workers = NULL;
2194         cap->returning_tasks_hd = NULL;
2195         cap->returning_tasks_tl = NULL;
2196 #endif
2197
2198         // On Unix, all timers are reset in the child, so we need to start
2199         // the timer again.
2200         startTimer();
2201
2202         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2203         rts_checkSchedStatus("forkProcess",cap);
2204         
2205         rts_unlock(cap);
2206         hs_exit();                      // clean up and exit
2207         stg_exit(EXIT_SUCCESS);
2208     }
2209 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2210     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2211     return -1;
2212 #endif
2213 }
2214
2215 /* ---------------------------------------------------------------------------
2216  * Delete all the threads in the system
2217  * ------------------------------------------------------------------------- */
2218    
2219 static void
2220 deleteAllThreads ( Capability *cap )
2221 {
2222     // NOTE: only safe to call if we own all capabilities.
2223
2224     StgTSO* t, *next;
2225     debugTrace(DEBUG_sched,"deleting all threads");
2226     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2227         if (t->what_next == ThreadRelocated) {
2228             next = t->link;
2229         } else {
2230             next = t->global_link;
2231             deleteThread(cap,t);
2232         }
2233     }      
2234
2235     // The run queue now contains a bunch of ThreadKilled threads.  We
2236     // must not throw these away: the main thread(s) will be in there
2237     // somewhere, and the main scheduler loop has to deal with it.
2238     // Also, the run queue is the only thing keeping these threads from
2239     // being GC'd, and we don't want the "main thread has been GC'd" panic.
2240
2241 #if !defined(THREADED_RTS)
2242     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2243     ASSERT(sleeping_queue == END_TSO_QUEUE);
2244 #endif
2245 }
2246
2247 /* -----------------------------------------------------------------------------
2248    Managing the suspended_ccalling_tasks list.
2249    Locks required: sched_mutex
2250    -------------------------------------------------------------------------- */
2251
2252 STATIC_INLINE void
2253 suspendTask (Capability *cap, Task *task)
2254 {
2255     ASSERT(task->next == NULL && task->prev == NULL);
2256     task->next = cap->suspended_ccalling_tasks;
2257     task->prev = NULL;
2258     if (cap->suspended_ccalling_tasks) {
2259         cap->suspended_ccalling_tasks->prev = task;
2260     }
2261     cap->suspended_ccalling_tasks = task;
2262 }
2263
2264 STATIC_INLINE void
2265 recoverSuspendedTask (Capability *cap, Task *task)
2266 {
2267     if (task->prev) {
2268         task->prev->next = task->next;
2269     } else {
2270         ASSERT(cap->suspended_ccalling_tasks == task);
2271         cap->suspended_ccalling_tasks = task->next;
2272     }
2273     if (task->next) {
2274         task->next->prev = task->prev;
2275     }
2276     task->next = task->prev = NULL;
2277 }
2278
2279 /* ---------------------------------------------------------------------------
2280  * Suspending & resuming Haskell threads.
2281  * 
2282  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2283  * its capability before calling the C function.  This allows another
2284  * task to pick up the capability and carry on running Haskell
2285  * threads.  It also means that if the C call blocks, it won't lock
2286  * the whole system.
2287  *
2288  * The Haskell thread making the C call is put to sleep for the
2289  * duration of the call, on the susepended_ccalling_threads queue.  We
2290  * give out a token to the task, which it can use to resume the thread
2291  * on return from the C function.
2292  * ------------------------------------------------------------------------- */
2293    
2294 void *
2295 suspendThread (StgRegTable *reg)
2296 {
2297   Capability *cap;
2298   int saved_errno;
2299   StgTSO *tso;
2300   Task *task;
2301 #if mingw32_HOST_OS
2302   StgWord32 saved_winerror;
2303 #endif
2304
2305   saved_errno = errno;
2306 #if mingw32_HOST_OS
2307   saved_winerror = GetLastError();
2308 #endif
2309
2310   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2311    */
2312   cap = regTableToCapability(reg);
2313
2314   task = cap->running_task;
2315   tso = cap->r.rCurrentTSO;
2316
2317   debugTrace(DEBUG_sched, 
2318              "thread %lu did a safe foreign call", 
2319              (unsigned long)cap->r.rCurrentTSO->id);
2320
2321   // XXX this might not be necessary --SDM
2322   tso->what_next = ThreadRunGHC;
2323
2324   threadPaused(cap,tso);
2325
2326   if ((tso->flags & TSO_BLOCKEX) == 0)  {
2327       tso->why_blocked = BlockedOnCCall;
2328       tso->flags |= TSO_BLOCKEX;
2329       tso->flags &= ~TSO_INTERRUPTIBLE;
2330   } else {
2331       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2332   }
2333
2334   // Hand back capability
2335   task->suspended_tso = tso;
2336
2337   ACQUIRE_LOCK(&cap->lock);
2338
2339   suspendTask(cap,task);
2340   cap->in_haskell = rtsFalse;
2341   releaseCapability_(cap);
2342   
2343   RELEASE_LOCK(&cap->lock);
2344
2345 #if defined(THREADED_RTS)
2346   /* Preparing to leave the RTS, so ensure there's a native thread/task
2347      waiting to take over.
2348   */
2349   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2350 #endif
2351
2352   errno = saved_errno;
2353 #if mingw32_HOST_OS
2354   SetLastError(saved_winerror);
2355 #endif
2356   return task;
2357 }
2358
2359 StgRegTable *
2360 resumeThread (void *task_)
2361 {
2362     StgTSO *tso;
2363     Capability *cap;
2364     Task *task = task_;
2365     int saved_errno;
2366 #if mingw32_HOST_OS
2367     StgWord32 saved_winerror;
2368 #endif
2369
2370     saved_errno = errno;
2371 #if mingw32_HOST_OS
2372     saved_winerror = GetLastError();
2373 #endif
2374
2375     cap = task->cap;
2376     // Wait for permission to re-enter the RTS with the result.
2377     waitForReturnCapability(&cap,task);
2378     // we might be on a different capability now... but if so, our
2379     // entry on the suspended_ccalling_tasks list will also have been
2380     // migrated.
2381
2382     // Remove the thread from the suspended list
2383     recoverSuspendedTask(cap,task);
2384
2385     tso = task->suspended_tso;
2386     task->suspended_tso = NULL;
2387     tso->link = END_TSO_QUEUE;
2388     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2389     
2390     if (tso->why_blocked == BlockedOnCCall) {
2391         awakenBlockedExceptionQueue(cap,tso);
2392         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2393     }
2394     
2395     /* Reset blocking status */
2396     tso->why_blocked  = NotBlocked;
2397     
2398     cap->r.rCurrentTSO = tso;
2399     cap->in_haskell = rtsTrue;
2400     errno = saved_errno;
2401 #if mingw32_HOST_OS
2402     SetLastError(saved_winerror);
2403 #endif
2404
2405     /* We might have GC'd, mark the TSO dirty again */
2406     dirtyTSO(tso);
2407
2408     IF_DEBUG(sanity, checkTSO(tso));
2409
2410     return &cap->r;
2411 }
2412
2413 /* ---------------------------------------------------------------------------
2414  * scheduleThread()
2415  *
2416  * scheduleThread puts a thread on the end  of the runnable queue.
2417  * This will usually be done immediately after a thread is created.
2418  * The caller of scheduleThread must create the thread using e.g.
2419  * createThread and push an appropriate closure
2420  * on this thread's stack before the scheduler is invoked.
2421  * ------------------------------------------------------------------------ */
2422
2423 void
2424 scheduleThread(Capability *cap, StgTSO *tso)
2425 {
2426     // The thread goes at the *end* of the run-queue, to avoid possible
2427     // starvation of any threads already on the queue.
2428     appendToRunQueue(cap,tso);
2429 }
2430
2431 void
2432 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2433 {
2434 #if defined(THREADED_RTS)
2435     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2436                               // move this thread from now on.
2437     cpu %= RtsFlags.ParFlags.nNodes;
2438     if (cpu == cap->no) {
2439         appendToRunQueue(cap,tso);
2440     } else {
2441         migrateThreadToCapability_lock(&capabilities[cpu],tso);
2442     }
2443 #else
2444     appendToRunQueue(cap,tso);
2445 #endif
2446 }
2447
2448 Capability *
2449 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2450 {
2451     Task *task;
2452
2453     // We already created/initialised the Task
2454     task = cap->running_task;
2455
2456     // This TSO is now a bound thread; make the Task and TSO
2457     // point to each other.
2458     tso->bound = task;
2459     tso->cap = cap;
2460
2461     task->tso = tso;
2462     task->ret = ret;
2463     task->stat = NoStatus;
2464
2465     appendToRunQueue(cap,tso);
2466
2467     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2468
2469 #if defined(GRAN)
2470     /* GranSim specific init */
2471     CurrentTSO = m->tso;                // the TSO to run
2472     procStatus[MainProc] = Busy;        // status of main PE
2473     CurrentProc = MainProc;             // PE to run it on
2474 #endif
2475
2476     cap = schedule(cap,task);
2477
2478     ASSERT(task->stat != NoStatus);
2479     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2480
2481     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2482     return cap;
2483 }
2484
2485 /* ----------------------------------------------------------------------------
2486  * Starting Tasks
2487  * ------------------------------------------------------------------------- */
2488
2489 #if defined(THREADED_RTS)
2490 void
2491 workerStart(Task *task)
2492 {
2493     Capability *cap;
2494
2495     // See startWorkerTask().
2496     ACQUIRE_LOCK(&task->lock);
2497     cap = task->cap;
2498     RELEASE_LOCK(&task->lock);
2499
2500     // set the thread-local pointer to the Task:
2501     taskEnter(task);
2502
2503     // schedule() runs without a lock.
2504     cap = schedule(cap,task);
2505
2506     // On exit from schedule(), we have a Capability.
2507     releaseCapability(cap);
2508     workerTaskStop(task);
2509 }
2510 #endif
2511
2512 /* ---------------------------------------------------------------------------
2513  * initScheduler()
2514  *
2515  * Initialise the scheduler.  This resets all the queues - if the
2516  * queues contained any threads, they'll be garbage collected at the
2517  * next pass.
2518  *
2519  * ------------------------------------------------------------------------ */
2520
2521 void 
2522 initScheduler(void)
2523 {
2524 #if defined(GRAN)
2525   nat i;
2526   for (i=0; i<=MAX_PROC; i++) {
2527     run_queue_hds[i]      = END_TSO_QUEUE;
2528     run_queue_tls[i]      = END_TSO_QUEUE;
2529     blocked_queue_hds[i]  = END_TSO_QUEUE;
2530     blocked_queue_tls[i]  = END_TSO_QUEUE;
2531     ccalling_threadss[i]  = END_TSO_QUEUE;
2532     blackhole_queue[i]    = END_TSO_QUEUE;
2533     sleeping_queue        = END_TSO_QUEUE;
2534   }
2535 #elif !defined(THREADED_RTS)
2536   blocked_queue_hd  = END_TSO_QUEUE;
2537   blocked_queue_tl  = END_TSO_QUEUE;
2538   sleeping_queue    = END_TSO_QUEUE;
2539 #endif
2540
2541   blackhole_queue   = END_TSO_QUEUE;
2542   all_threads       = END_TSO_QUEUE;
2543
2544   context_switch = 0;
2545   sched_state    = SCHED_RUNNING;
2546
2547 #if defined(THREADED_RTS)
2548   /* Initialise the mutex and condition variables used by
2549    * the scheduler. */
2550   initMutex(&sched_mutex);
2551 #endif
2552   
2553   ACQUIRE_LOCK(&sched_mutex);
2554
2555   /* A capability holds the state a native thread needs in
2556    * order to execute STG code. At least one capability is
2557    * floating around (only THREADED_RTS builds have more than one).
2558    */
2559   initCapabilities();
2560
2561   initTaskManager();
2562
2563 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2564   initSparkPools();
2565 #endif
2566
2567 #if defined(THREADED_RTS)
2568   /*
2569    * Eagerly start one worker to run each Capability, except for
2570    * Capability 0.  The idea is that we're probably going to start a
2571    * bound thread on Capability 0 pretty soon, so we don't want a
2572    * worker task hogging it.
2573    */
2574   { 
2575       nat i;
2576       Capability *cap;
2577       for (i = 1; i < n_capabilities; i++) {
2578           cap = &capabilities[i];
2579           ACQUIRE_LOCK(&cap->lock);
2580           startWorkerTask(cap, workerStart);
2581           RELEASE_LOCK(&cap->lock);
2582       }
2583   }
2584 #endif
2585
2586   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2587
2588   RELEASE_LOCK(&sched_mutex);
2589 }
2590
2591 void
2592 exitScheduler( void )
2593 {
2594     Task *task = NULL;
2595
2596 #if defined(THREADED_RTS)
2597     ACQUIRE_LOCK(&sched_mutex);
2598     task = newBoundTask();
2599     RELEASE_LOCK(&sched_mutex);
2600 #endif
2601
2602     // If we haven't killed all the threads yet, do it now.
2603     if (sched_state < SCHED_SHUTTING_DOWN) {
2604         sched_state = SCHED_INTERRUPTING;
2605         scheduleDoGC(NULL,task,rtsFalse);    
2606     }
2607     sched_state = SCHED_SHUTTING_DOWN;
2608
2609 #if defined(THREADED_RTS)
2610     { 
2611         nat i;
2612         
2613         for (i = 0; i < n_capabilities; i++) {
2614             shutdownCapability(&capabilities[i], task);
2615         }
2616         boundTaskExiting(task);
2617         stopTaskManager();
2618     }
2619 #else
2620     freeCapability(&MainCapability);
2621 #endif
2622 }
2623
2624 void
2625 freeScheduler( void )
2626 {
2627     freeTaskManager();
2628     if (n_capabilities != 1) {
2629         stgFree(capabilities);
2630     }
2631 #if defined(THREADED_RTS)
2632     closeMutex(&sched_mutex);
2633 #endif
2634 }
2635
2636 /* ---------------------------------------------------------------------------
2637    Where are the roots that we know about?
2638
2639         - all the threads on the runnable queue
2640         - all the threads on the blocked queue
2641         - all the threads on the sleeping queue
2642         - all the thread currently executing a _ccall_GC
2643         - all the "main threads"
2644      
2645    ------------------------------------------------------------------------ */
2646
2647 /* This has to be protected either by the scheduler monitor, or by the
2648         garbage collection monitor (probably the latter).
2649         KH @ 25/10/99
2650 */
2651
2652 void
2653 GetRoots( evac_fn evac )
2654 {
2655     nat i;
2656     Capability *cap;
2657     Task *task;
2658
2659 #if defined(GRAN)
2660     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2661         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2662             evac((StgClosure **)&run_queue_hds[i]);
2663         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2664             evac((StgClosure **)&run_queue_tls[i]);
2665         
2666         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2667             evac((StgClosure **)&blocked_queue_hds[i]);
2668         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2669             evac((StgClosure **)&blocked_queue_tls[i]);
2670         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2671             evac((StgClosure **)&ccalling_threads[i]);
2672     }
2673
2674     markEventQueue();
2675
2676 #else /* !GRAN */
2677
2678     for (i = 0; i < n_capabilities; i++) {
2679         cap = &capabilities[i];
2680         evac((StgClosure **)(void *)&cap->run_queue_hd);
2681         evac((StgClosure **)(void *)&cap->run_queue_tl);
2682 #if defined(THREADED_RTS)
2683         evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2684         evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2685 #endif
2686         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2687              task=task->next) {
2688             debugTrace(DEBUG_sched,
2689                        "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2690             evac((StgClosure **)(void *)&task->suspended_tso);
2691         }
2692
2693     }
2694     
2695
2696 #if !defined(THREADED_RTS)
2697     evac((StgClosure **)(void *)&blocked_queue_hd);
2698     evac((StgClosure **)(void *)&blocked_queue_tl);
2699     evac((StgClosure **)(void *)&sleeping_queue);
2700 #endif 
2701 #endif
2702
2703     // evac((StgClosure **)&blackhole_queue);
2704
2705 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2706     markSparkQueue(evac);
2707 #endif
2708     
2709 #if defined(RTS_USER_SIGNALS)
2710     // mark the signal handlers (signals should be already blocked)
2711     markSignalHandlers(evac);
2712 #endif
2713 }
2714
2715 /* -----------------------------------------------------------------------------
2716    performGC
2717
2718    This is the interface to the garbage collector from Haskell land.
2719    We provide this so that external C code can allocate and garbage
2720    collect when called from Haskell via _ccall_GC.
2721    -------------------------------------------------------------------------- */
2722
2723 static void
2724 performGC_(rtsBool force_major)
2725 {
2726     Task *task;
2727     // We must grab a new Task here, because the existing Task may be
2728     // associated with a particular Capability, and chained onto the 
2729     // suspended_ccalling_tasks queue.
2730     ACQUIRE_LOCK(&sched_mutex);
2731     task = newBoundTask();
2732     RELEASE_LOCK(&sched_mutex);
2733     scheduleDoGC(NULL,task,force_major);
2734     boundTaskExiting(task);
2735 }
2736
2737 void
2738 performGC(void)
2739 {
2740     performGC_(rtsFalse);
2741 }
2742
2743 void
2744 performMajorGC(void)
2745 {
2746     performGC_(rtsTrue);
2747 }
2748
2749 /* -----------------------------------------------------------------------------
2750    Stack overflow
2751
2752    If the thread has reached its maximum stack size, then raise the
2753    StackOverflow exception in the offending thread.  Otherwise
2754    relocate the TSO into a larger chunk of memory and adjust its stack
2755    size appropriately.
2756    -------------------------------------------------------------------------- */
2757
2758 static StgTSO *
2759 threadStackOverflow(Capability *cap, StgTSO *tso)
2760 {
2761   nat new_stack_size, stack_words;
2762   lnat new_tso_size;
2763   StgPtr new_sp;
2764   StgTSO *dest;
2765
2766   IF_DEBUG(sanity,checkTSO(tso));
2767
2768   // don't allow throwTo() to modify the blocked_exceptions queue
2769   // while we are moving the TSO:
2770   lockClosure((StgClosure *)tso);
2771
2772   if (tso->stack_size >= tso->max_stack_size) {
2773
2774       debugTrace(DEBUG_gc,
2775                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2776                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2777       IF_DEBUG(gc,
2778                /* If we're debugging, just print out the top of the stack */
2779                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2780                                                 tso->sp+64)));
2781
2782       // Send this thread the StackOverflow exception
2783       unlockTSO(tso);
2784       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2785       return tso;
2786   }
2787
2788   /* Try to double the current stack size.  If that takes us over the
2789    * maximum stack size for this thread, then use the maximum instead.
2790    * Finally round up so the TSO ends up as a whole number of blocks.
2791    */
2792   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2793   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2794                                        TSO_STRUCT_SIZE)/sizeof(W_);
2795   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2796   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2797
2798   debugTrace(DEBUG_sched, 
2799              "increasing stack size from %ld words to %d.",
2800              (long)tso->stack_size, new_stack_size);
2801
2802   dest = (StgTSO *)allocate(new_tso_size);
2803   TICK_ALLOC_TSO(new_stack_size,0);
2804
2805   /* copy the TSO block and the old stack into the new area */
2806   memcpy(dest,tso,TSO_STRUCT_SIZE);
2807   stack_words = tso->stack + tso->stack_size - tso->sp;
2808   new_sp = (P_)dest + new_tso_size - stack_words;
2809   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2810
2811   /* relocate the stack pointers... */
2812   dest->sp         = new_sp;
2813   dest->stack_size = new_stack_size;
2814         
2815   /* Mark the old TSO as relocated.  We have to check for relocated
2816    * TSOs in the garbage collector and any primops that deal with TSOs.
2817    *
2818    * It's important to set the sp value to just beyond the end
2819    * of the stack, so we don't attempt to scavenge any part of the
2820    * dead TSO's stack.
2821    */
2822   tso->what_next = ThreadRelocated;
2823   tso->link = dest;
2824   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2825   tso->why_blocked = NotBlocked;
2826
2827   IF_PAR_DEBUG(verbose,
2828                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2829                      tso->id, tso, tso->stack_size);
2830                /* If we're debugging, just print out the top of the stack */
2831                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2832                                                 tso->sp+64)));
2833   
2834   unlockTSO(dest);
2835   unlockTSO(tso);
2836
2837   IF_DEBUG(sanity,checkTSO(dest));
2838 #if 0
2839   IF_DEBUG(scheduler,printTSO(dest));
2840 #endif
2841
2842   return dest;
2843 }
2844
2845 /* ---------------------------------------------------------------------------
2846    Interrupt execution
2847    - usually called inside a signal handler so it mustn't do anything fancy.   
2848    ------------------------------------------------------------------------ */
2849
2850 void
2851 interruptStgRts(void)
2852 {
2853     sched_state = SCHED_INTERRUPTING;
2854     context_switch = 1;
2855     wakeUpRts();
2856 }
2857
2858 /* -----------------------------------------------------------------------------
2859    Wake up the RTS
2860    
2861    This function causes at least one OS thread to wake up and run the
2862    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2863    an external event has arrived that may need servicing (eg. a
2864    keyboard interrupt).
2865
2866    In the single-threaded RTS we don't do anything here; we only have
2867    one thread anyway, and the event that caused us to want to wake up
2868    will have interrupted any blocking system call in progress anyway.
2869    -------------------------------------------------------------------------- */
2870
2871 void
2872 wakeUpRts(void)
2873 {
2874 #if defined(THREADED_RTS)
2875     // This forces the IO Manager thread to wakeup, which will
2876     // in turn ensure that some OS thread wakes up and runs the
2877     // scheduler loop, which will cause a GC and deadlock check.
2878     ioManagerWakeup();
2879 #endif
2880 }
2881
2882 /* -----------------------------------------------------------------------------
2883  * checkBlackHoles()
2884  *
2885  * Check the blackhole_queue for threads that can be woken up.  We do
2886  * this periodically: before every GC, and whenever the run queue is
2887  * empty.
2888  *
2889  * An elegant solution might be to just wake up all the blocked
2890  * threads with awakenBlockedQueue occasionally: they'll go back to
2891  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2892  * doesn't give us a way to tell whether we've actually managed to
2893  * wake up any threads, so we would be busy-waiting.
2894  *
2895  * -------------------------------------------------------------------------- */
2896
2897 static rtsBool
2898 checkBlackHoles (Capability *cap)
2899 {
2900     StgTSO **prev, *t;
2901     rtsBool any_woke_up = rtsFalse;
2902     StgHalfWord type;
2903
2904     // blackhole_queue is global:
2905     ASSERT_LOCK_HELD(&sched_mutex);
2906
2907     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2908
2909     // ASSUMES: sched_mutex
2910     prev = &blackhole_queue;
2911     t = blackhole_queue;
2912     while (t != END_TSO_QUEUE) {
2913         ASSERT(t->why_blocked == BlockedOnBlackHole);
2914         type = get_itbl(t->block_info.closure)->type;
2915         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2916             IF_DEBUG(sanity,checkTSO(t));
2917             t = unblockOne(cap, t);
2918             // urk, the threads migrate to the current capability
2919             // here, but we'd like to keep them on the original one.
2920             *prev = t;
2921             any_woke_up = rtsTrue;
2922         } else {
2923             prev = &t->link;
2924             t = t->link;
2925         }
2926     }
2927
2928     return any_woke_up;
2929 }
2930
2931 /* -----------------------------------------------------------------------------
2932    Deleting threads
2933
2934    This is used for interruption (^C) and forking, and corresponds to
2935    raising an exception but without letting the thread catch the
2936    exception.
2937    -------------------------------------------------------------------------- */
2938
2939 static void 
2940 deleteThread (Capability *cap, StgTSO *tso)
2941 {
2942     // NOTE: must only be called on a TSO that we have exclusive
2943     // access to, because we will call throwToSingleThreaded() below.
2944     // The TSO must be on the run queue of the Capability we own, or 
2945     // we must own all Capabilities.
2946
2947     if (tso->why_blocked != BlockedOnCCall &&
2948         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2949         throwToSingleThreaded(cap,tso,NULL);
2950     }
2951 }
2952
2953 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2954 static void 
2955 deleteThread_(Capability *cap, StgTSO *tso)
2956 { // for forkProcess only:
2957   // like deleteThread(), but we delete threads in foreign calls, too.
2958
2959     if (tso->why_blocked == BlockedOnCCall ||
2960         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2961         unblockOne(cap,tso);
2962         tso->what_next = ThreadKilled;
2963     } else {
2964         deleteThread(cap,tso);
2965     }
2966 }
2967 #endif
2968
2969 /* -----------------------------------------------------------------------------
2970    raiseExceptionHelper
2971    
2972    This function is called by the raise# primitve, just so that we can
2973    move some of the tricky bits of raising an exception from C-- into
2974    C.  Who knows, it might be a useful re-useable thing here too.
2975    -------------------------------------------------------------------------- */
2976
2977 StgWord
2978 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2979 {
2980     Capability *cap = regTableToCapability(reg);
2981     StgThunk *raise_closure = NULL;
2982     StgPtr p, next;
2983     StgRetInfoTable *info;
2984     //
2985     // This closure represents the expression 'raise# E' where E
2986     // is the exception raise.  It is used to overwrite all the
2987     // thunks which are currently under evaluataion.
2988     //
2989
2990     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2991     // LDV profiling: stg_raise_info has THUNK as its closure
2992     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2993     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2994     // 1 does not cause any problem unless profiling is performed.
2995     // However, when LDV profiling goes on, we need to linearly scan
2996     // small object pool, where raise_closure is stored, so we should
2997     // use MIN_UPD_SIZE.
2998     //
2999     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3000     //                                 sizeofW(StgClosure)+1);
3001     //
3002
3003     //
3004     // Walk up the stack, looking for the catch frame.  On the way,
3005     // we update any closures pointed to from update frames with the
3006     // raise closure that we just built.
3007     //
3008     p = tso->sp;
3009     while(1) {
3010         info = get_ret_itbl((StgClosure *)p);
3011         next = p + stack_frame_sizeW((StgClosure *)p);
3012         switch (info->i.type) {
3013             
3014         case UPDATE_FRAME:
3015             // Only create raise_closure if we need to.
3016             if (raise_closure == NULL) {
3017                 raise_closure = 
3018                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3019                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3020                 raise_closure->payload[0] = exception;
3021             }
3022             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3023             p = next;
3024             continue;
3025
3026         case ATOMICALLY_FRAME:
3027             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3028             tso->sp = p;
3029             return ATOMICALLY_FRAME;
3030             
3031         case CATCH_FRAME:
3032             tso->sp = p;
3033             return CATCH_FRAME;
3034
3035         case CATCH_STM_FRAME:
3036             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3037             tso->sp = p;
3038             return CATCH_STM_FRAME;
3039             
3040         case STOP_FRAME:
3041             tso->sp = p;
3042             return STOP_FRAME;
3043
3044         case CATCH_RETRY_FRAME:
3045         default:
3046             p = next; 
3047             continue;
3048         }
3049     }
3050 }
3051
3052
3053 /* -----------------------------------------------------------------------------
3054    findRetryFrameHelper
3055
3056    This function is called by the retry# primitive.  It traverses the stack
3057    leaving tso->sp referring to the frame which should handle the retry.  
3058
3059    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3060    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3061
3062    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3063    create) because retries are not considered to be exceptions, despite the
3064    similar implementation.
3065
3066    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3067    not be created within memory transactions.
3068    -------------------------------------------------------------------------- */
3069
3070 StgWord
3071 findRetryFrameHelper (StgTSO *tso)
3072 {
3073   StgPtr           p, next;
3074   StgRetInfoTable *info;
3075
3076   p = tso -> sp;
3077   while (1) {
3078     info = get_ret_itbl((StgClosure *)p);
3079     next = p + stack_frame_sizeW((StgClosure *)p);
3080     switch (info->i.type) {
3081       
3082     case ATOMICALLY_FRAME:
3083         debugTrace(DEBUG_stm,
3084                    "found ATOMICALLY_FRAME at %p during retry", p);
3085         tso->sp = p;
3086         return ATOMICALLY_FRAME;
3087       
3088     case CATCH_RETRY_FRAME:
3089         debugTrace(DEBUG_stm,
3090                    "found CATCH_RETRY_FRAME at %p during retrry", p);
3091         tso->sp = p;
3092         return CATCH_RETRY_FRAME;
3093       
3094     case CATCH_STM_FRAME: {
3095         debugTrace(DEBUG_stm,
3096                    "found CATCH_STM_FRAME at %p during retry", p);
3097         StgTRecHeader *trec = tso -> trec;
3098         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3099         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3100         stmAbortTransaction(tso -> cap, trec);
3101         stmFreeAbortedTRec(tso -> cap, trec);
3102         tso -> trec = outer;
3103         p = next; 
3104         continue;
3105     }
3106       
3107
3108     default:
3109       ASSERT(info->i.type != CATCH_FRAME);
3110       ASSERT(info->i.type != STOP_FRAME);
3111       p = next; 
3112       continue;
3113     }
3114   }
3115 }
3116
3117 /* -----------------------------------------------------------------------------
3118    resurrectThreads is called after garbage collection on the list of
3119    threads found to be garbage.  Each of these threads will be woken
3120    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3121    on an MVar, or NonTermination if the thread was blocked on a Black
3122    Hole.
3123
3124    Locks: assumes we hold *all* the capabilities.
3125    -------------------------------------------------------------------------- */
3126
3127 void
3128 resurrectThreads (StgTSO *threads)
3129 {
3130     StgTSO *tso, *next;
3131     Capability *cap;
3132
3133     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3134         next = tso->global_link;
3135         tso->global_link = all_threads;
3136         all_threads = tso;
3137         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3138         
3139         // Wake up the thread on the Capability it was last on
3140         cap = tso->cap;
3141         
3142         switch (tso->why_blocked) {
3143         case BlockedOnMVar:
3144         case BlockedOnException:
3145             /* Called by GC - sched_mutex lock is currently held. */
3146             throwToSingleThreaded(cap, tso,
3147                                   (StgClosure *)BlockedOnDeadMVar_closure);
3148             break;
3149         case BlockedOnBlackHole:
3150             throwToSingleThreaded(cap, tso,
3151                                   (StgClosure *)NonTermination_closure);
3152             break;
3153         case BlockedOnSTM:
3154             throwToSingleThreaded(cap, tso,
3155                                   (StgClosure *)BlockedIndefinitely_closure);
3156             break;
3157         case NotBlocked:
3158             /* This might happen if the thread was blocked on a black hole
3159              * belonging to a thread that we've just woken up (raiseAsync
3160              * can wake up threads, remember...).
3161              */
3162             continue;
3163         default:
3164             barf("resurrectThreads: thread blocked in a strange way");
3165         }
3166     }
3167 }