Split GC.c, and move storage manager into sm/ directory
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "OSThreads.h"
15 #include "Storage.h"
16 #include "StgRun.h"
17 #include "Hooks.h"
18 #include "Schedule.h"
19 #include "StgMiscClosures.h"
20 #include "Interpreter.h"
21 #include "Printer.h"
22 #include "RtsSignals.h"
23 #include "Sanity.h"
24 #include "Stats.h"
25 #include "STM.h"
26 #include "Timer.h"
27 #include "Prelude.h"
28 #include "ThreadLabels.h"
29 #include "LdvProfile.h"
30 #include "Updates.h"
31 #ifdef PROFILING
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #endif
35 #if defined(GRAN) || defined(PARALLEL_HASKELL)
36 # include "GranSimRts.h"
37 # include "GranSim.h"
38 # include "ParallelRts.h"
39 # include "Parallel.h"
40 # include "ParallelDebug.h"
41 # include "FetchMe.h"
42 # include "HLC.h"
43 #endif
44 #include "Sparks.h"
45 #include "Capability.h"
46 #include "Task.h"
47 #include "AwaitEvent.h"
48 #if defined(mingw32_HOST_OS)
49 #include "win32/IOManager.h"
50 #endif
51 #include "Trace.h"
52 #include "RaiseAsync.h"
53 #include "Threads.h"
54
55 #ifdef HAVE_SYS_TYPES_H
56 #include <sys/types.h>
57 #endif
58 #ifdef HAVE_UNISTD_H
59 #include <unistd.h>
60 #endif
61
62 #include <string.h>
63 #include <stdlib.h>
64 #include <stdarg.h>
65
66 #ifdef HAVE_ERRNO_H
67 #include <errno.h>
68 #endif
69
70 // Turn off inlining when debugging - it obfuscates things
71 #ifdef DEBUG
72 # undef  STATIC_INLINE
73 # define STATIC_INLINE static
74 #endif
75
76 /* -----------------------------------------------------------------------------
77  * Global variables
78  * -------------------------------------------------------------------------- */
79
80 #if defined(GRAN)
81
82 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
83 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
84
85 /* 
86    In GranSim we have a runnable and a blocked queue for each processor.
87    In order to minimise code changes new arrays run_queue_hds/tls
88    are created. run_queue_hd is then a short cut (macro) for
89    run_queue_hds[CurrentProc] (see GranSim.h).
90    -- HWL
91 */
92 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
93 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
94 StgTSO *ccalling_threadss[MAX_PROC];
95 /* We use the same global list of threads (all_threads) in GranSim as in
96    the std RTS (i.e. we are cheating). However, we don't use this list in
97    the GranSim specific code at the moment (so we are only potentially
98    cheating).  */
99
100 #else /* !GRAN */
101
102 #if !defined(THREADED_RTS)
103 // Blocked/sleeping thrads
104 StgTSO *blocked_queue_hd = NULL;
105 StgTSO *blocked_queue_tl = NULL;
106 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
107 #endif
108
109 /* Threads blocked on blackholes.
110  * LOCK: sched_mutex+capability, or all capabilities
111  */
112 StgTSO *blackhole_queue = NULL;
113 #endif
114
115 /* The blackhole_queue should be checked for threads to wake up.  See
116  * Schedule.h for more thorough comment.
117  * LOCK: none (doesn't matter if we miss an update)
118  */
119 rtsBool blackholes_need_checking = rtsFalse;
120
121 /* Linked list of all threads.
122  * Used for detecting garbage collected threads.
123  * LOCK: sched_mutex+capability, or all capabilities
124  */
125 StgTSO *all_threads = NULL;
126
127 /* flag set by signal handler to precipitate a context switch
128  * LOCK: none (just an advisory flag)
129  */
130 int context_switch = 0;
131
132 /* flag that tracks whether we have done any execution in this time slice.
133  * LOCK: currently none, perhaps we should lock (but needs to be
134  * updated in the fast path of the scheduler).
135  */
136 nat recent_activity = ACTIVITY_YES;
137
138 /* if this flag is set as well, give up execution
139  * LOCK: none (changes once, from false->true)
140  */
141 rtsBool sched_state = SCHED_RUNNING;
142
143 #if defined(GRAN)
144 StgTSO *CurrentTSO;
145 #endif
146
147 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
148  *  exists - earlier gccs apparently didn't.
149  *  -= chak
150  */
151 StgTSO dummy_tso;
152
153 /*
154  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
155  * in an MT setting, needed to signal that a worker thread shouldn't hang around
156  * in the scheduler when it is out of work.
157  */
158 rtsBool shutting_down_scheduler = rtsFalse;
159
160 /*
161  * This mutex protects most of the global scheduler data in
162  * the THREADED_RTS runtime.
163  */
164 #if defined(THREADED_RTS)
165 Mutex sched_mutex;
166 #endif
167
168 #if defined(PARALLEL_HASKELL)
169 StgTSO *LastTSO;
170 rtsTime TimeOfLastYield;
171 rtsBool emitSchedule = rtsTrue;
172 #endif
173
174 #if !defined(mingw32_HOST_OS)
175 #define FORKPROCESS_PRIMOP_SUPPORTED
176 #endif
177
178 /* -----------------------------------------------------------------------------
179  * static function prototypes
180  * -------------------------------------------------------------------------- */
181
182 static Capability *schedule (Capability *initialCapability, Task *task);
183
184 //
185 // These function all encapsulate parts of the scheduler loop, and are
186 // abstracted only to make the structure and control flow of the
187 // scheduler clearer.
188 //
189 static void schedulePreLoop (void);
190 #if defined(THREADED_RTS)
191 static void schedulePushWork(Capability *cap, Task *task);
192 #endif
193 static void scheduleStartSignalHandlers (Capability *cap);
194 static void scheduleCheckBlockedThreads (Capability *cap);
195 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
196 static void scheduleCheckBlackHoles (Capability *cap);
197 static void scheduleDetectDeadlock (Capability *cap, Task *task);
198 #if defined(GRAN)
199 static StgTSO *scheduleProcessEvent(rtsEvent *event);
200 #endif
201 #if defined(PARALLEL_HASKELL)
202 static StgTSO *scheduleSendPendingMessages(void);
203 static void scheduleActivateSpark(void);
204 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
205 #endif
206 #if defined(PAR) || defined(GRAN)
207 static void scheduleGranParReport(void);
208 #endif
209 static void schedulePostRunThread(void);
210 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
211 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
212                                          StgTSO *t);
213 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
214                                     nat prev_what_next );
215 static void scheduleHandleThreadBlocked( StgTSO *t );
216 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
217                                              StgTSO *t );
218 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
219 static Capability *scheduleDoGC(Capability *cap, Task *task,
220                                 rtsBool force_major);
221
222 static rtsBool checkBlackHoles(Capability *cap);
223
224 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
225
226 static void deleteThread (Capability *cap, StgTSO *tso);
227 static void deleteAllThreads (Capability *cap);
228
229 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
230 static void deleteThread_(Capability *cap, StgTSO *tso);
231 #endif
232
233 #if defined(PARALLEL_HASKELL)
234 StgTSO * createSparkThread(rtsSpark spark);
235 StgTSO * activateSpark (rtsSpark spark);  
236 #endif
237
238 #ifdef DEBUG
239 static char *whatNext_strs[] = {
240   "(unknown)",
241   "ThreadRunGHC",
242   "ThreadInterpret",
243   "ThreadKilled",
244   "ThreadRelocated",
245   "ThreadComplete"
246 };
247 #endif
248
249 /* -----------------------------------------------------------------------------
250  * Putting a thread on the run queue: different scheduling policies
251  * -------------------------------------------------------------------------- */
252
253 STATIC_INLINE void
254 addToRunQueue( Capability *cap, StgTSO *t )
255 {
256 #if defined(PARALLEL_HASKELL)
257     if (RtsFlags.ParFlags.doFairScheduling) { 
258         // this does round-robin scheduling; good for concurrency
259         appendToRunQueue(cap,t);
260     } else {
261         // this does unfair scheduling; good for parallelism
262         pushOnRunQueue(cap,t);
263     }
264 #else
265     // this does round-robin scheduling; good for concurrency
266     appendToRunQueue(cap,t);
267 #endif
268 }
269
270 /* ---------------------------------------------------------------------------
271    Main scheduling loop.
272
273    We use round-robin scheduling, each thread returning to the
274    scheduler loop when one of these conditions is detected:
275
276       * out of heap space
277       * timer expires (thread yields)
278       * thread blocks
279       * thread ends
280       * stack overflow
281
282    GRAN version:
283      In a GranSim setup this loop iterates over the global event queue.
284      This revolves around the global event queue, which determines what 
285      to do next. Therefore, it's more complicated than either the 
286      concurrent or the parallel (GUM) setup.
287
288    GUM version:
289      GUM iterates over incoming messages.
290      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
291      and sends out a fish whenever it has nothing to do; in-between
292      doing the actual reductions (shared code below) it processes the
293      incoming messages and deals with delayed operations 
294      (see PendingFetches).
295      This is not the ugliest code you could imagine, but it's bloody close.
296
297    ------------------------------------------------------------------------ */
298
299 static Capability *
300 schedule (Capability *initialCapability, Task *task)
301 {
302   StgTSO *t;
303   Capability *cap;
304   StgThreadReturnCode ret;
305 #if defined(GRAN)
306   rtsEvent *event;
307 #elif defined(PARALLEL_HASKELL)
308   StgTSO *tso;
309   GlobalTaskId pe;
310   rtsBool receivedFinish = rtsFalse;
311 # if defined(DEBUG)
312   nat tp_size, sp_size; // stats only
313 # endif
314 #endif
315   nat prev_what_next;
316   rtsBool ready_to_gc;
317 #if defined(THREADED_RTS)
318   rtsBool first = rtsTrue;
319 #endif
320   
321   cap = initialCapability;
322
323   // Pre-condition: this task owns initialCapability.
324   // The sched_mutex is *NOT* held
325   // NB. on return, we still hold a capability.
326
327   debugTrace (DEBUG_sched, 
328               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
329               task, initialCapability);
330
331   schedulePreLoop();
332
333   // -----------------------------------------------------------
334   // Scheduler loop starts here:
335
336 #if defined(PARALLEL_HASKELL)
337 #define TERMINATION_CONDITION        (!receivedFinish)
338 #elif defined(GRAN)
339 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
340 #else
341 #define TERMINATION_CONDITION        rtsTrue
342 #endif
343
344   while (TERMINATION_CONDITION) {
345
346 #if defined(GRAN)
347       /* Choose the processor with the next event */
348       CurrentProc = event->proc;
349       CurrentTSO = event->tso;
350 #endif
351
352 #if defined(THREADED_RTS)
353       if (first) {
354           // don't yield the first time, we want a chance to run this
355           // thread for a bit, even if there are others banging at the
356           // door.
357           first = rtsFalse;
358           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
359       } else {
360           // Yield the capability to higher-priority tasks if necessary.
361           yieldCapability(&cap, task);
362       }
363 #endif
364       
365 #if defined(THREADED_RTS)
366       schedulePushWork(cap,task);
367 #endif
368
369     // Check whether we have re-entered the RTS from Haskell without
370     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
371     // call).
372     if (cap->in_haskell) {
373           errorBelch("schedule: re-entered unsafely.\n"
374                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
375           stg_exit(EXIT_FAILURE);
376     }
377
378     // The interruption / shutdown sequence.
379     // 
380     // In order to cleanly shut down the runtime, we want to:
381     //   * make sure that all main threads return to their callers
382     //     with the state 'Interrupted'.
383     //   * clean up all OS threads assocated with the runtime
384     //   * free all memory etc.
385     //
386     // So the sequence for ^C goes like this:
387     //
388     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
389     //     arranges for some Capability to wake up
390     //
391     //   * all threads in the system are halted, and the zombies are
392     //     placed on the run queue for cleaning up.  We acquire all
393     //     the capabilities in order to delete the threads, this is
394     //     done by scheduleDoGC() for convenience (because GC already
395     //     needs to acquire all the capabilities).  We can't kill
396     //     threads involved in foreign calls.
397     // 
398     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
399     //
400     //   * sched_state := SCHED_SHUTTING_DOWN
401     //
402     //   * all workers exit when the run queue on their capability
403     //     drains.  All main threads will also exit when their TSO
404     //     reaches the head of the run queue and they can return.
405     //
406     //   * eventually all Capabilities will shut down, and the RTS can
407     //     exit.
408     //
409     //   * We might be left with threads blocked in foreign calls, 
410     //     we should really attempt to kill these somehow (TODO);
411     
412     switch (sched_state) {
413     case SCHED_RUNNING:
414         break;
415     case SCHED_INTERRUPTING:
416         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
417 #if defined(THREADED_RTS)
418         discardSparksCap(cap);
419 #endif
420         /* scheduleDoGC() deletes all the threads */
421         cap = scheduleDoGC(cap,task,rtsFalse);
422         break;
423     case SCHED_SHUTTING_DOWN:
424         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
425         // If we are a worker, just exit.  If we're a bound thread
426         // then we will exit below when we've removed our TSO from
427         // the run queue.
428         if (task->tso == NULL && emptyRunQueue(cap)) {
429             return cap;
430         }
431         break;
432     default:
433         barf("sched_state: %d", sched_state);
434     }
435
436 #if defined(THREADED_RTS)
437     // If the run queue is empty, take a spark and turn it into a thread.
438     {
439         if (emptyRunQueue(cap)) {
440             StgClosure *spark;
441             spark = findSpark(cap);
442             if (spark != NULL) {
443                 debugTrace(DEBUG_sched,
444                            "turning spark of closure %p into a thread",
445                            (StgClosure *)spark);
446                 createSparkThread(cap,spark);     
447             }
448         }
449     }
450 #endif // THREADED_RTS
451
452     scheduleStartSignalHandlers(cap);
453
454     // Only check the black holes here if we've nothing else to do.
455     // During normal execution, the black hole list only gets checked
456     // at GC time, to avoid repeatedly traversing this possibly long
457     // list each time around the scheduler.
458     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
459
460     scheduleCheckWakeupThreads(cap);
461
462     scheduleCheckBlockedThreads(cap);
463
464     scheduleDetectDeadlock(cap,task);
465 #if defined(THREADED_RTS)
466     cap = task->cap;    // reload cap, it might have changed
467 #endif
468
469     // Normally, the only way we can get here with no threads to
470     // run is if a keyboard interrupt received during 
471     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
472     // Additionally, it is not fatal for the
473     // threaded RTS to reach here with no threads to run.
474     //
475     // win32: might be here due to awaitEvent() being abandoned
476     // as a result of a console event having been delivered.
477     if ( emptyRunQueue(cap) ) {
478 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
479         ASSERT(sched_state >= SCHED_INTERRUPTING);
480 #endif
481         continue; // nothing to do
482     }
483
484 #if defined(PARALLEL_HASKELL)
485     scheduleSendPendingMessages();
486     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
487         continue;
488
489 #if defined(SPARKS)
490     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
491 #endif
492
493     /* If we still have no work we need to send a FISH to get a spark
494        from another PE */
495     if (emptyRunQueue(cap)) {
496         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
497         ASSERT(rtsFalse); // should not happen at the moment
498     }
499     // from here: non-empty run queue.
500     //  TODO: merge above case with this, only one call processMessages() !
501     if (PacketsWaiting()) {  /* process incoming messages, if
502                                 any pending...  only in else
503                                 because getRemoteWork waits for
504                                 messages as well */
505         receivedFinish = processMessages();
506     }
507 #endif
508
509 #if defined(GRAN)
510     scheduleProcessEvent(event);
511 #endif
512
513     // 
514     // Get a thread to run
515     //
516     t = popRunQueue(cap);
517
518 #if defined(GRAN) || defined(PAR)
519     scheduleGranParReport(); // some kind of debuging output
520 #else
521     // Sanity check the thread we're about to run.  This can be
522     // expensive if there is lots of thread switching going on...
523     IF_DEBUG(sanity,checkTSO(t));
524 #endif
525
526 #if defined(THREADED_RTS)
527     // Check whether we can run this thread in the current task.
528     // If not, we have to pass our capability to the right task.
529     {
530         Task *bound = t->bound;
531       
532         if (bound) {
533             if (bound == task) {
534                 debugTrace(DEBUG_sched,
535                            "### Running thread %lu in bound thread", (unsigned long)t->id);
536                 // yes, the Haskell thread is bound to the current native thread
537             } else {
538                 debugTrace(DEBUG_sched,
539                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
540                 // no, bound to a different Haskell thread: pass to that thread
541                 pushOnRunQueue(cap,t);
542                 continue;
543             }
544         } else {
545             // The thread we want to run is unbound.
546             if (task->tso) { 
547                 debugTrace(DEBUG_sched,
548                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
549                 // no, the current native thread is bound to a different
550                 // Haskell thread, so pass it to any worker thread
551                 pushOnRunQueue(cap,t);
552                 continue; 
553             }
554         }
555     }
556 #endif
557
558     cap->r.rCurrentTSO = t;
559     
560     /* context switches are initiated by the timer signal, unless
561      * the user specified "context switch as often as possible", with
562      * +RTS -C0
563      */
564     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
565         && !emptyThreadQueues(cap)) {
566         context_switch = 1;
567     }
568          
569 run_thread:
570
571     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
572                               (long)t->id, whatNext_strs[t->what_next]);
573
574 #if defined(PROFILING)
575     startHeapProfTimer();
576 #endif
577
578     // Check for exceptions blocked on this thread
579     maybePerformBlockedException (cap, t);
580
581     // ----------------------------------------------------------------------
582     // Run the current thread 
583
584     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
585     ASSERT(t->cap == cap);
586
587     prev_what_next = t->what_next;
588
589     errno = t->saved_errno;
590     cap->in_haskell = rtsTrue;
591
592     dirtyTSO(t);
593
594     recent_activity = ACTIVITY_YES;
595
596     switch (prev_what_next) {
597         
598     case ThreadKilled:
599     case ThreadComplete:
600         /* Thread already finished, return to scheduler. */
601         ret = ThreadFinished;
602         break;
603         
604     case ThreadRunGHC:
605     {
606         StgRegTable *r;
607         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
608         cap = regTableToCapability(r);
609         ret = r->rRet;
610         break;
611     }
612     
613     case ThreadInterpret:
614         cap = interpretBCO(cap);
615         ret = cap->r.rRet;
616         break;
617         
618     default:
619         barf("schedule: invalid what_next field");
620     }
621
622     cap->in_haskell = rtsFalse;
623
624     // The TSO might have moved, eg. if it re-entered the RTS and a GC
625     // happened.  So find the new location:
626     t = cap->r.rCurrentTSO;
627
628     // We have run some Haskell code: there might be blackhole-blocked
629     // threads to wake up now.
630     // Lock-free test here should be ok, we're just setting a flag.
631     if ( blackhole_queue != END_TSO_QUEUE ) {
632         blackholes_need_checking = rtsTrue;
633     }
634     
635     // And save the current errno in this thread.
636     // XXX: possibly bogus for SMP because this thread might already
637     // be running again, see code below.
638     t->saved_errno = errno;
639
640 #if defined(THREADED_RTS)
641     // If ret is ThreadBlocked, and this Task is bound to the TSO that
642     // blocked, we are in limbo - the TSO is now owned by whatever it
643     // is blocked on, and may in fact already have been woken up,
644     // perhaps even on a different Capability.  It may be the case
645     // that task->cap != cap.  We better yield this Capability
646     // immediately and return to normaility.
647     if (ret == ThreadBlocked) {
648         debugTrace(DEBUG_sched,
649                    "--<< thread %lu (%s) stopped: blocked",
650                    (unsigned long)t->id, whatNext_strs[t->what_next]);
651         continue;
652     }
653 #endif
654
655     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
656     ASSERT(t->cap == cap);
657
658     // ----------------------------------------------------------------------
659     
660     // Costs for the scheduler are assigned to CCS_SYSTEM
661 #if defined(PROFILING)
662     stopHeapProfTimer();
663     CCCS = CCS_SYSTEM;
664 #endif
665     
666     schedulePostRunThread();
667
668     ready_to_gc = rtsFalse;
669
670     switch (ret) {
671     case HeapOverflow:
672         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
673         break;
674
675     case StackOverflow:
676         scheduleHandleStackOverflow(cap,task,t);
677         break;
678
679     case ThreadYielding:
680         if (scheduleHandleYield(cap, t, prev_what_next)) {
681             // shortcut for switching between compiler/interpreter:
682             goto run_thread; 
683         }
684         break;
685
686     case ThreadBlocked:
687         scheduleHandleThreadBlocked(t);
688         break;
689
690     case ThreadFinished:
691         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
692         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
693         break;
694
695     default:
696       barf("schedule: invalid thread return code %d", (int)ret);
697     }
698
699     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
700     if (ready_to_gc) {
701       cap = scheduleDoGC(cap,task,rtsFalse);
702     }
703   } /* end of while() */
704
705   debugTrace(PAR_DEBUG_verbose,
706              "== Leaving schedule() after having received Finish");
707 }
708
709 /* ----------------------------------------------------------------------------
710  * Setting up the scheduler loop
711  * ------------------------------------------------------------------------- */
712
713 static void
714 schedulePreLoop(void)
715 {
716 #if defined(GRAN) 
717     /* set up first event to get things going */
718     /* ToDo: assign costs for system setup and init MainTSO ! */
719     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
720               ContinueThread, 
721               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
722     
723     debugTrace (DEBUG_gran,
724                 "GRAN: Init CurrentTSO (in schedule) = %p", 
725                 CurrentTSO);
726     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
727     
728     if (RtsFlags.GranFlags.Light) {
729         /* Save current time; GranSim Light only */
730         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
731     }      
732 #endif
733 }
734
735 /* -----------------------------------------------------------------------------
736  * schedulePushWork()
737  *
738  * Push work to other Capabilities if we have some.
739  * -------------------------------------------------------------------------- */
740
741 #if defined(THREADED_RTS)
742 static void
743 schedulePushWork(Capability *cap USED_IF_THREADS, 
744                  Task *task      USED_IF_THREADS)
745 {
746     Capability *free_caps[n_capabilities], *cap0;
747     nat i, n_free_caps;
748
749     // migration can be turned off with +RTS -qg
750     if (!RtsFlags.ParFlags.migrate) return;
751
752     // Check whether we have more threads on our run queue, or sparks
753     // in our pool, that we could hand to another Capability.
754     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
755         && sparkPoolSizeCap(cap) < 2) {
756         return;
757     }
758
759     // First grab as many free Capabilities as we can.
760     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
761         cap0 = &capabilities[i];
762         if (cap != cap0 && tryGrabCapability(cap0,task)) {
763             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
764                 // it already has some work, we just grabbed it at 
765                 // the wrong moment.  Or maybe it's deadlocked!
766                 releaseCapability(cap0);
767             } else {
768                 free_caps[n_free_caps++] = cap0;
769             }
770         }
771     }
772
773     // we now have n_free_caps free capabilities stashed in
774     // free_caps[].  Share our run queue equally with them.  This is
775     // probably the simplest thing we could do; improvements we might
776     // want to do include:
777     //
778     //   - giving high priority to moving relatively new threads, on 
779     //     the gournds that they haven't had time to build up a
780     //     working set in the cache on this CPU/Capability.
781     //
782     //   - giving low priority to moving long-lived threads
783
784     if (n_free_caps > 0) {
785         StgTSO *prev, *t, *next;
786         rtsBool pushed_to_all;
787
788         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
789
790         i = 0;
791         pushed_to_all = rtsFalse;
792
793         if (cap->run_queue_hd != END_TSO_QUEUE) {
794             prev = cap->run_queue_hd;
795             t = prev->link;
796             prev->link = END_TSO_QUEUE;
797             for (; t != END_TSO_QUEUE; t = next) {
798                 next = t->link;
799                 t->link = END_TSO_QUEUE;
800                 if (t->what_next == ThreadRelocated
801                     || t->bound == task // don't move my bound thread
802                     || tsoLocked(t)) {  // don't move a locked thread
803                     prev->link = t;
804                     prev = t;
805                 } else if (i == n_free_caps) {
806                     pushed_to_all = rtsTrue;
807                     i = 0;
808                     // keep one for us
809                     prev->link = t;
810                     prev = t;
811                 } else {
812                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
813                     appendToRunQueue(free_caps[i],t);
814                     if (t->bound) { t->bound->cap = free_caps[i]; }
815                     t->cap = free_caps[i];
816                     i++;
817                 }
818             }
819             cap->run_queue_tl = prev;
820         }
821
822         // If there are some free capabilities that we didn't push any
823         // threads to, then try to push a spark to each one.
824         if (!pushed_to_all) {
825             StgClosure *spark;
826             // i is the next free capability to push to
827             for (; i < n_free_caps; i++) {
828                 if (emptySparkPoolCap(free_caps[i])) {
829                     spark = findSpark(cap);
830                     if (spark != NULL) {
831                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
832                         newSpark(&(free_caps[i]->r), spark);
833                     }
834                 }
835             }
836         }
837
838         // release the capabilities
839         for (i = 0; i < n_free_caps; i++) {
840             task->cap = free_caps[i];
841             releaseCapability(free_caps[i]);
842         }
843     }
844     task->cap = cap; // reset to point to our Capability.
845 }
846 #endif
847
848 /* ----------------------------------------------------------------------------
849  * Start any pending signal handlers
850  * ------------------------------------------------------------------------- */
851
852 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
853 static void
854 scheduleStartSignalHandlers(Capability *cap)
855 {
856     if (signals_pending()) { // safe outside the lock
857         startSignalHandlers(cap);
858     }
859 }
860 #else
861 static void
862 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
863 {
864 }
865 #endif
866
867 /* ----------------------------------------------------------------------------
868  * Check for blocked threads that can be woken up.
869  * ------------------------------------------------------------------------- */
870
871 static void
872 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
873 {
874 #if !defined(THREADED_RTS)
875     //
876     // Check whether any waiting threads need to be woken up.  If the
877     // run queue is empty, and there are no other tasks running, we
878     // can wait indefinitely for something to happen.
879     //
880     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
881     {
882         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
883     }
884 #endif
885 }
886
887
888 /* ----------------------------------------------------------------------------
889  * Check for threads woken up by other Capabilities
890  * ------------------------------------------------------------------------- */
891
892 static void
893 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
894 {
895 #if defined(THREADED_RTS)
896     // Any threads that were woken up by other Capabilities get
897     // appended to our run queue.
898     if (!emptyWakeupQueue(cap)) {
899         ACQUIRE_LOCK(&cap->lock);
900         if (emptyRunQueue(cap)) {
901             cap->run_queue_hd = cap->wakeup_queue_hd;
902             cap->run_queue_tl = cap->wakeup_queue_tl;
903         } else {
904             cap->run_queue_tl->link = cap->wakeup_queue_hd;
905             cap->run_queue_tl = cap->wakeup_queue_tl;
906         }
907         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
908         RELEASE_LOCK(&cap->lock);
909     }
910 #endif
911 }
912
913 /* ----------------------------------------------------------------------------
914  * Check for threads blocked on BLACKHOLEs that can be woken up
915  * ------------------------------------------------------------------------- */
916 static void
917 scheduleCheckBlackHoles (Capability *cap)
918 {
919     if ( blackholes_need_checking ) // check without the lock first
920     {
921         ACQUIRE_LOCK(&sched_mutex);
922         if ( blackholes_need_checking ) {
923             checkBlackHoles(cap);
924             blackholes_need_checking = rtsFalse;
925         }
926         RELEASE_LOCK(&sched_mutex);
927     }
928 }
929
930 /* ----------------------------------------------------------------------------
931  * Detect deadlock conditions and attempt to resolve them.
932  * ------------------------------------------------------------------------- */
933
934 static void
935 scheduleDetectDeadlock (Capability *cap, Task *task)
936 {
937
938 #if defined(PARALLEL_HASKELL)
939     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
940     return;
941 #endif
942
943     /* 
944      * Detect deadlock: when we have no threads to run, there are no
945      * threads blocked, waiting for I/O, or sleeping, and all the
946      * other tasks are waiting for work, we must have a deadlock of
947      * some description.
948      */
949     if ( emptyThreadQueues(cap) )
950     {
951 #if defined(THREADED_RTS)
952         /* 
953          * In the threaded RTS, we only check for deadlock if there
954          * has been no activity in a complete timeslice.  This means
955          * we won't eagerly start a full GC just because we don't have
956          * any threads to run currently.
957          */
958         if (recent_activity != ACTIVITY_INACTIVE) return;
959 #endif
960
961         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
962
963         // Garbage collection can release some new threads due to
964         // either (a) finalizers or (b) threads resurrected because
965         // they are unreachable and will therefore be sent an
966         // exception.  Any threads thus released will be immediately
967         // runnable.
968         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
969
970         recent_activity = ACTIVITY_DONE_GC;
971         
972         if ( !emptyRunQueue(cap) ) return;
973
974 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
975         /* If we have user-installed signal handlers, then wait
976          * for signals to arrive rather then bombing out with a
977          * deadlock.
978          */
979         if ( anyUserHandlers() ) {
980             debugTrace(DEBUG_sched,
981                        "still deadlocked, waiting for signals...");
982
983             awaitUserSignals();
984
985             if (signals_pending()) {
986                 startSignalHandlers(cap);
987             }
988
989             // either we have threads to run, or we were interrupted:
990             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
991         }
992 #endif
993
994 #if !defined(THREADED_RTS)
995         /* Probably a real deadlock.  Send the current main thread the
996          * Deadlock exception.
997          */
998         if (task->tso) {
999             switch (task->tso->why_blocked) {
1000             case BlockedOnSTM:
1001             case BlockedOnBlackHole:
1002             case BlockedOnException:
1003             case BlockedOnMVar:
1004                 throwToSingleThreaded(cap, task->tso, 
1005                                       (StgClosure *)NonTermination_closure);
1006                 return;
1007             default:
1008                 barf("deadlock: main thread blocked in a strange way");
1009             }
1010         }
1011         return;
1012 #endif
1013     }
1014 }
1015
1016 /* ----------------------------------------------------------------------------
1017  * Process an event (GRAN only)
1018  * ------------------------------------------------------------------------- */
1019
1020 #if defined(GRAN)
1021 static StgTSO *
1022 scheduleProcessEvent(rtsEvent *event)
1023 {
1024     StgTSO *t;
1025
1026     if (RtsFlags.GranFlags.Light)
1027       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1028
1029     /* adjust time based on time-stamp */
1030     if (event->time > CurrentTime[CurrentProc] &&
1031         event->evttype != ContinueThread)
1032       CurrentTime[CurrentProc] = event->time;
1033     
1034     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1035     if (!RtsFlags.GranFlags.Light)
1036       handleIdlePEs();
1037
1038     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1039
1040     /* main event dispatcher in GranSim */
1041     switch (event->evttype) {
1042       /* Should just be continuing execution */
1043     case ContinueThread:
1044       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1045       /* ToDo: check assertion
1046       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1047              run_queue_hd != END_TSO_QUEUE);
1048       */
1049       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1050       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1051           procStatus[CurrentProc]==Fetching) {
1052         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1053               CurrentTSO->id, CurrentTSO, CurrentProc);
1054         goto next_thread;
1055       } 
1056       /* Ignore ContinueThreads for completed threads */
1057       if (CurrentTSO->what_next == ThreadComplete) {
1058         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1059               CurrentTSO->id, CurrentTSO, CurrentProc);
1060         goto next_thread;
1061       } 
1062       /* Ignore ContinueThreads for threads that are being migrated */
1063       if (PROCS(CurrentTSO)==Nowhere) { 
1064         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1065               CurrentTSO->id, CurrentTSO, CurrentProc);
1066         goto next_thread;
1067       }
1068       /* The thread should be at the beginning of the run queue */
1069       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1070         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1071               CurrentTSO->id, CurrentTSO, CurrentProc);
1072         break; // run the thread anyway
1073       }
1074       /*
1075       new_event(proc, proc, CurrentTime[proc],
1076                 FindWork,
1077                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1078       goto next_thread; 
1079       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1080       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1081
1082     case FetchNode:
1083       do_the_fetchnode(event);
1084       goto next_thread;             /* handle next event in event queue  */
1085       
1086     case GlobalBlock:
1087       do_the_globalblock(event);
1088       goto next_thread;             /* handle next event in event queue  */
1089       
1090     case FetchReply:
1091       do_the_fetchreply(event);
1092       goto next_thread;             /* handle next event in event queue  */
1093       
1094     case UnblockThread:   /* Move from the blocked queue to the tail of */
1095       do_the_unblock(event);
1096       goto next_thread;             /* handle next event in event queue  */
1097       
1098     case ResumeThread:  /* Move from the blocked queue to the tail of */
1099       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1100       event->tso->gran.blocktime += 
1101         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1102       do_the_startthread(event);
1103       goto next_thread;             /* handle next event in event queue  */
1104       
1105     case StartThread:
1106       do_the_startthread(event);
1107       goto next_thread;             /* handle next event in event queue  */
1108       
1109     case MoveThread:
1110       do_the_movethread(event);
1111       goto next_thread;             /* handle next event in event queue  */
1112       
1113     case MoveSpark:
1114       do_the_movespark(event);
1115       goto next_thread;             /* handle next event in event queue  */
1116       
1117     case FindWork:
1118       do_the_findwork(event);
1119       goto next_thread;             /* handle next event in event queue  */
1120       
1121     default:
1122       barf("Illegal event type %u\n", event->evttype);
1123     }  /* switch */
1124     
1125     /* This point was scheduler_loop in the old RTS */
1126
1127     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1128
1129     TimeOfLastEvent = CurrentTime[CurrentProc];
1130     TimeOfNextEvent = get_time_of_next_event();
1131     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1132     // CurrentTSO = ThreadQueueHd;
1133
1134     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1135                          TimeOfNextEvent));
1136
1137     if (RtsFlags.GranFlags.Light) 
1138       GranSimLight_leave_system(event, &ActiveTSO); 
1139
1140     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1141
1142     IF_DEBUG(gran, 
1143              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1144
1145     /* in a GranSim setup the TSO stays on the run queue */
1146     t = CurrentTSO;
1147     /* Take a thread from the run queue. */
1148     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1149
1150     IF_DEBUG(gran, 
1151              debugBelch("GRAN: About to run current thread, which is\n");
1152              G_TSO(t,5));
1153
1154     context_switch = 0; // turned on via GranYield, checking events and time slice
1155
1156     IF_DEBUG(gran, 
1157              DumpGranEvent(GR_SCHEDULE, t));
1158
1159     procStatus[CurrentProc] = Busy;
1160 }
1161 #endif // GRAN
1162
1163 /* ----------------------------------------------------------------------------
1164  * Send pending messages (PARALLEL_HASKELL only)
1165  * ------------------------------------------------------------------------- */
1166
1167 #if defined(PARALLEL_HASKELL)
1168 static StgTSO *
1169 scheduleSendPendingMessages(void)
1170 {
1171     StgSparkPool *pool;
1172     rtsSpark spark;
1173     StgTSO *t;
1174
1175 # if defined(PAR) // global Mem.Mgmt., omit for now
1176     if (PendingFetches != END_BF_QUEUE) {
1177         processFetches();
1178     }
1179 # endif
1180     
1181     if (RtsFlags.ParFlags.BufferTime) {
1182         // if we use message buffering, we must send away all message
1183         // packets which have become too old...
1184         sendOldBuffers(); 
1185     }
1186 }
1187 #endif
1188
1189 /* ----------------------------------------------------------------------------
1190  * Activate spark threads (PARALLEL_HASKELL only)
1191  * ------------------------------------------------------------------------- */
1192
1193 #if defined(PARALLEL_HASKELL)
1194 static void
1195 scheduleActivateSpark(void)
1196 {
1197 #if defined(SPARKS)
1198   ASSERT(emptyRunQueue());
1199 /* We get here if the run queue is empty and want some work.
1200    We try to turn a spark into a thread, and add it to the run queue,
1201    from where it will be picked up in the next iteration of the scheduler
1202    loop.
1203 */
1204
1205       /* :-[  no local threads => look out for local sparks */
1206       /* the spark pool for the current PE */
1207       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1208       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1209           pool->hd < pool->tl) {
1210         /* 
1211          * ToDo: add GC code check that we really have enough heap afterwards!!
1212          * Old comment:
1213          * If we're here (no runnable threads) and we have pending
1214          * sparks, we must have a space problem.  Get enough space
1215          * to turn one of those pending sparks into a
1216          * thread... 
1217          */
1218
1219         spark = findSpark(rtsFalse);            /* get a spark */
1220         if (spark != (rtsSpark) NULL) {
1221           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1222           IF_PAR_DEBUG(fish, // schedule,
1223                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1224                              tso->id, tso, advisory_thread_count));
1225
1226           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1227             IF_PAR_DEBUG(fish, // schedule,
1228                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1229                             spark));
1230             return rtsFalse; /* failed to generate a thread */
1231           }                  /* otherwise fall through & pick-up new tso */
1232         } else {
1233           IF_PAR_DEBUG(fish, // schedule,
1234                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1235                              spark_queue_len(pool)));
1236           return rtsFalse;  /* failed to generate a thread */
1237         }
1238         return rtsTrue;  /* success in generating a thread */
1239   } else { /* no more threads permitted or pool empty */
1240     return rtsFalse;  /* failed to generateThread */
1241   }
1242 #else
1243   tso = NULL; // avoid compiler warning only
1244   return rtsFalse;  /* dummy in non-PAR setup */
1245 #endif // SPARKS
1246 }
1247 #endif // PARALLEL_HASKELL
1248
1249 /* ----------------------------------------------------------------------------
1250  * Get work from a remote node (PARALLEL_HASKELL only)
1251  * ------------------------------------------------------------------------- */
1252     
1253 #if defined(PARALLEL_HASKELL)
1254 static rtsBool
1255 scheduleGetRemoteWork(rtsBool *receivedFinish)
1256 {
1257   ASSERT(emptyRunQueue());
1258
1259   if (RtsFlags.ParFlags.BufferTime) {
1260         IF_PAR_DEBUG(verbose, 
1261                 debugBelch("...send all pending data,"));
1262         {
1263           nat i;
1264           for (i=1; i<=nPEs; i++)
1265             sendImmediately(i); // send all messages away immediately
1266         }
1267   }
1268 # ifndef SPARKS
1269         //++EDEN++ idle() , i.e. send all buffers, wait for work
1270         // suppress fishing in EDEN... just look for incoming messages
1271         // (blocking receive)
1272   IF_PAR_DEBUG(verbose, 
1273                debugBelch("...wait for incoming messages...\n"));
1274   *receivedFinish = processMessages(); // blocking receive...
1275
1276         // and reenter scheduling loop after having received something
1277         // (return rtsFalse below)
1278
1279 # else /* activate SPARKS machinery */
1280 /* We get here, if we have no work, tried to activate a local spark, but still
1281    have no work. We try to get a remote spark, by sending a FISH message.
1282    Thread migration should be added here, and triggered when a sequence of 
1283    fishes returns without work. */
1284         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1285
1286       /* =8-[  no local sparks => look for work on other PEs */
1287         /*
1288          * We really have absolutely no work.  Send out a fish
1289          * (there may be some out there already), and wait for
1290          * something to arrive.  We clearly can't run any threads
1291          * until a SCHEDULE or RESUME arrives, and so that's what
1292          * we're hoping to see.  (Of course, we still have to
1293          * respond to other types of messages.)
1294          */
1295         rtsTime now = msTime() /*CURRENT_TIME*/;
1296         IF_PAR_DEBUG(verbose, 
1297                      debugBelch("--  now=%ld\n", now));
1298         IF_PAR_DEBUG(fish, // verbose,
1299              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1300                  (last_fish_arrived_at!=0 &&
1301                   last_fish_arrived_at+delay > now)) {
1302                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1303                      now, last_fish_arrived_at+delay, 
1304                      last_fish_arrived_at,
1305                      delay);
1306              });
1307   
1308         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1309             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1310           if (last_fish_arrived_at==0 ||
1311               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1312             /* outstandingFishes is set in sendFish, processFish;
1313                avoid flooding system with fishes via delay */
1314     next_fish_to_send_at = 0;  
1315   } else {
1316     /* ToDo: this should be done in the main scheduling loop to avoid the
1317              busy wait here; not so bad if fish delay is very small  */
1318     int iq = 0; // DEBUGGING -- HWL
1319     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1320     /* send a fish when ready, but process messages that arrive in the meantime */
1321     do {
1322       if (PacketsWaiting()) {
1323         iq++; // DEBUGGING
1324         *receivedFinish = processMessages();
1325       }
1326       now = msTime();
1327     } while (!*receivedFinish || now<next_fish_to_send_at);
1328     // JB: This means the fish could become obsolete, if we receive
1329     // work. Better check for work again? 
1330     // last line: while (!receivedFinish || !haveWork || now<...)
1331     // next line: if (receivedFinish || haveWork )
1332
1333     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1334       return rtsFalse;  // NB: this will leave scheduler loop
1335                         // immediately after return!
1336                           
1337     IF_PAR_DEBUG(fish, // verbose,
1338                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1339
1340   }
1341
1342     // JB: IMHO, this should all be hidden inside sendFish(...)
1343     /* pe = choosePE(); 
1344        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1345                 NEW_FISH_HUNGER);
1346
1347     // Global statistics: count no. of fishes
1348     if (RtsFlags.ParFlags.ParStats.Global &&
1349          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1350            globalParStats.tot_fish_mess++;
1351            }
1352     */ 
1353
1354   /* delayed fishes must have been sent by now! */
1355   next_fish_to_send_at = 0;  
1356   }
1357       
1358   *receivedFinish = processMessages();
1359 # endif /* SPARKS */
1360
1361  return rtsFalse;
1362  /* NB: this function always returns rtsFalse, meaning the scheduler
1363     loop continues with the next iteration; 
1364     rationale: 
1365       return code means success in finding work; we enter this function
1366       if there is no local work, thus have to send a fish which takes
1367       time until it arrives with work; in the meantime we should process
1368       messages in the main loop;
1369  */
1370 }
1371 #endif // PARALLEL_HASKELL
1372
1373 /* ----------------------------------------------------------------------------
1374  * PAR/GRAN: Report stats & debugging info(?)
1375  * ------------------------------------------------------------------------- */
1376
1377 #if defined(PAR) || defined(GRAN)
1378 static void
1379 scheduleGranParReport(void)
1380 {
1381   ASSERT(run_queue_hd != END_TSO_QUEUE);
1382
1383   /* Take a thread from the run queue, if we have work */
1384   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1385
1386     /* If this TSO has got its outport closed in the meantime, 
1387      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1388      * It has to be marked as TH_DEAD for this purpose.
1389      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1390
1391 JB: TODO: investigate wether state change field could be nuked
1392      entirely and replaced by the normal tso state (whatnext
1393      field). All we want to do is to kill tsos from outside.
1394      */
1395
1396     /* ToDo: write something to the log-file
1397     if (RTSflags.ParFlags.granSimStats && !sameThread)
1398         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1399
1400     CurrentTSO = t;
1401     */
1402     /* the spark pool for the current PE */
1403     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1404
1405     IF_DEBUG(scheduler, 
1406              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1407                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1408
1409     IF_PAR_DEBUG(fish,
1410              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1411                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1412
1413     if (RtsFlags.ParFlags.ParStats.Full && 
1414         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1415         (emitSchedule || // forced emit
1416          (t && LastTSO && t->id != LastTSO->id))) {
1417       /* 
1418          we are running a different TSO, so write a schedule event to log file
1419          NB: If we use fair scheduling we also have to write  a deschedule 
1420              event for LastTSO; with unfair scheduling we know that the
1421              previous tso has blocked whenever we switch to another tso, so
1422              we don't need it in GUM for now
1423       */
1424       IF_PAR_DEBUG(fish, // schedule,
1425                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1426
1427       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1428                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1429       emitSchedule = rtsFalse;
1430     }
1431 }     
1432 #endif
1433
1434 /* ----------------------------------------------------------------------------
1435  * After running a thread...
1436  * ------------------------------------------------------------------------- */
1437
1438 static void
1439 schedulePostRunThread(void)
1440 {
1441 #if defined(PAR)
1442     /* HACK 675: if the last thread didn't yield, make sure to print a 
1443        SCHEDULE event to the log file when StgRunning the next thread, even
1444        if it is the same one as before */
1445     LastTSO = t; 
1446     TimeOfLastYield = CURRENT_TIME;
1447 #endif
1448
1449   /* some statistics gathering in the parallel case */
1450
1451 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1452   switch (ret) {
1453     case HeapOverflow:
1454 # if defined(GRAN)
1455       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1456       globalGranStats.tot_heapover++;
1457 # elif defined(PAR)
1458       globalParStats.tot_heapover++;
1459 # endif
1460       break;
1461
1462      case StackOverflow:
1463 # if defined(GRAN)
1464       IF_DEBUG(gran, 
1465                DumpGranEvent(GR_DESCHEDULE, t));
1466       globalGranStats.tot_stackover++;
1467 # elif defined(PAR)
1468       // IF_DEBUG(par, 
1469       // DumpGranEvent(GR_DESCHEDULE, t);
1470       globalParStats.tot_stackover++;
1471 # endif
1472       break;
1473
1474     case ThreadYielding:
1475 # if defined(GRAN)
1476       IF_DEBUG(gran, 
1477                DumpGranEvent(GR_DESCHEDULE, t));
1478       globalGranStats.tot_yields++;
1479 # elif defined(PAR)
1480       // IF_DEBUG(par, 
1481       // DumpGranEvent(GR_DESCHEDULE, t);
1482       globalParStats.tot_yields++;
1483 # endif
1484       break; 
1485
1486     case ThreadBlocked:
1487 # if defined(GRAN)
1488         debugTrace(DEBUG_sched, 
1489                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1490                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1491                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1492                if (t->block_info.closure!=(StgClosure*)NULL)
1493                  print_bq(t->block_info.closure);
1494                debugBelch("\n"));
1495
1496       // ??? needed; should emit block before
1497       IF_DEBUG(gran, 
1498                DumpGranEvent(GR_DESCHEDULE, t)); 
1499       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1500       /*
1501         ngoq Dogh!
1502       ASSERT(procStatus[CurrentProc]==Busy || 
1503               ((procStatus[CurrentProc]==Fetching) && 
1504               (t->block_info.closure!=(StgClosure*)NULL)));
1505       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1506           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1507             procStatus[CurrentProc]==Fetching)) 
1508         procStatus[CurrentProc] = Idle;
1509       */
1510 # elif defined(PAR)
1511 //++PAR++  blockThread() writes the event (change?)
1512 # endif
1513     break;
1514
1515   case ThreadFinished:
1516     break;
1517
1518   default:
1519     barf("parGlobalStats: unknown return code");
1520     break;
1521     }
1522 #endif
1523 }
1524
1525 /* -----------------------------------------------------------------------------
1526  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1527  * -------------------------------------------------------------------------- */
1528
1529 static rtsBool
1530 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1531 {
1532     // did the task ask for a large block?
1533     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1534         // if so, get one and push it on the front of the nursery.
1535         bdescr *bd;
1536         lnat blocks;
1537         
1538         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1539         
1540         debugTrace(DEBUG_sched,
1541                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1542                    (long)t->id, whatNext_strs[t->what_next], blocks);
1543     
1544         // don't do this if the nursery is (nearly) full, we'll GC first.
1545         if (cap->r.rCurrentNursery->link != NULL ||
1546             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1547                                                // if the nursery has only one block.
1548             
1549             ACQUIRE_SM_LOCK
1550             bd = allocGroup( blocks );
1551             RELEASE_SM_LOCK
1552             cap->r.rNursery->n_blocks += blocks;
1553             
1554             // link the new group into the list
1555             bd->link = cap->r.rCurrentNursery;
1556             bd->u.back = cap->r.rCurrentNursery->u.back;
1557             if (cap->r.rCurrentNursery->u.back != NULL) {
1558                 cap->r.rCurrentNursery->u.back->link = bd;
1559             } else {
1560 #if !defined(THREADED_RTS)
1561                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1562                        g0s0 == cap->r.rNursery);
1563 #endif
1564                 cap->r.rNursery->blocks = bd;
1565             }             
1566             cap->r.rCurrentNursery->u.back = bd;
1567             
1568             // initialise it as a nursery block.  We initialise the
1569             // step, gen_no, and flags field of *every* sub-block in
1570             // this large block, because this is easier than making
1571             // sure that we always find the block head of a large
1572             // block whenever we call Bdescr() (eg. evacuate() and
1573             // isAlive() in the GC would both have to do this, at
1574             // least).
1575             { 
1576                 bdescr *x;
1577                 for (x = bd; x < bd + blocks; x++) {
1578                     x->step = cap->r.rNursery;
1579                     x->gen_no = 0;
1580                     x->flags = 0;
1581                 }
1582             }
1583             
1584             // This assert can be a killer if the app is doing lots
1585             // of large block allocations.
1586             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1587             
1588             // now update the nursery to point to the new block
1589             cap->r.rCurrentNursery = bd;
1590             
1591             // we might be unlucky and have another thread get on the
1592             // run queue before us and steal the large block, but in that
1593             // case the thread will just end up requesting another large
1594             // block.
1595             pushOnRunQueue(cap,t);
1596             return rtsFalse;  /* not actually GC'ing */
1597         }
1598     }
1599     
1600     debugTrace(DEBUG_sched,
1601                "--<< thread %ld (%s) stopped: HeapOverflow\n", 
1602                (long)t->id, whatNext_strs[t->what_next]);
1603
1604 #if defined(GRAN)
1605     ASSERT(!is_on_queue(t,CurrentProc));
1606 #elif defined(PARALLEL_HASKELL)
1607     /* Currently we emit a DESCHEDULE event before GC in GUM.
1608        ToDo: either add separate event to distinguish SYSTEM time from rest
1609        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1610     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1611         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1612                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1613         emitSchedule = rtsTrue;
1614     }
1615 #endif
1616       
1617     pushOnRunQueue(cap,t);
1618     return rtsTrue;
1619     /* actual GC is done at the end of the while loop in schedule() */
1620 }
1621
1622 /* -----------------------------------------------------------------------------
1623  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1624  * -------------------------------------------------------------------------- */
1625
1626 static void
1627 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1628 {
1629     debugTrace (DEBUG_sched,
1630                 "--<< thread %ld (%s) stopped, StackOverflow", 
1631                 (long)t->id, whatNext_strs[t->what_next]);
1632
1633     /* just adjust the stack for this thread, then pop it back
1634      * on the run queue.
1635      */
1636     { 
1637         /* enlarge the stack */
1638         StgTSO *new_t = threadStackOverflow(cap, t);
1639         
1640         /* The TSO attached to this Task may have moved, so update the
1641          * pointer to it.
1642          */
1643         if (task->tso == t) {
1644             task->tso = new_t;
1645         }
1646         pushOnRunQueue(cap,new_t);
1647     }
1648 }
1649
1650 /* -----------------------------------------------------------------------------
1651  * Handle a thread that returned to the scheduler with ThreadYielding
1652  * -------------------------------------------------------------------------- */
1653
1654 static rtsBool
1655 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1656 {
1657     // Reset the context switch flag.  We don't do this just before
1658     // running the thread, because that would mean we would lose ticks
1659     // during GC, which can lead to unfair scheduling (a thread hogs
1660     // the CPU because the tick always arrives during GC).  This way
1661     // penalises threads that do a lot of allocation, but that seems
1662     // better than the alternative.
1663     context_switch = 0;
1664     
1665     /* put the thread back on the run queue.  Then, if we're ready to
1666      * GC, check whether this is the last task to stop.  If so, wake
1667      * up the GC thread.  getThread will block during a GC until the
1668      * GC is finished.
1669      */
1670 #ifdef DEBUG
1671     if (t->what_next != prev_what_next) {
1672         debugTrace(DEBUG_sched,
1673                    "--<< thread %ld (%s) stopped to switch evaluators", 
1674                    (long)t->id, whatNext_strs[t->what_next]);
1675     } else {
1676         debugTrace(DEBUG_sched,
1677                    "--<< thread %ld (%s) stopped, yielding",
1678                    (long)t->id, whatNext_strs[t->what_next]);
1679     }
1680 #endif
1681     
1682     IF_DEBUG(sanity,
1683              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1684              checkTSO(t));
1685     ASSERT(t->link == END_TSO_QUEUE);
1686     
1687     // Shortcut if we're just switching evaluators: don't bother
1688     // doing stack squeezing (which can be expensive), just run the
1689     // thread.
1690     if (t->what_next != prev_what_next) {
1691         return rtsTrue;
1692     }
1693     
1694 #if defined(GRAN)
1695     ASSERT(!is_on_queue(t,CurrentProc));
1696       
1697     IF_DEBUG(sanity,
1698              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1699              checkThreadQsSanity(rtsTrue));
1700
1701 #endif
1702
1703     addToRunQueue(cap,t);
1704
1705 #if defined(GRAN)
1706     /* add a ContinueThread event to actually process the thread */
1707     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1708               ContinueThread,
1709               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1710     IF_GRAN_DEBUG(bq, 
1711                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1712                   G_EVENTQ(0);
1713                   G_CURR_THREADQ(0));
1714 #endif
1715     return rtsFalse;
1716 }
1717
1718 /* -----------------------------------------------------------------------------
1719  * Handle a thread that returned to the scheduler with ThreadBlocked
1720  * -------------------------------------------------------------------------- */
1721
1722 static void
1723 scheduleHandleThreadBlocked( StgTSO *t
1724 #if !defined(GRAN) && !defined(DEBUG)
1725     STG_UNUSED
1726 #endif
1727     )
1728 {
1729 #if defined(GRAN)
1730     IF_DEBUG(scheduler,
1731              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1732                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1733              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1734     
1735     // ??? needed; should emit block before
1736     IF_DEBUG(gran, 
1737              DumpGranEvent(GR_DESCHEDULE, t)); 
1738     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1739     /*
1740       ngoq Dogh!
1741       ASSERT(procStatus[CurrentProc]==Busy || 
1742       ((procStatus[CurrentProc]==Fetching) && 
1743       (t->block_info.closure!=(StgClosure*)NULL)));
1744       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1745       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1746       procStatus[CurrentProc]==Fetching)) 
1747       procStatus[CurrentProc] = Idle;
1748     */
1749 #elif defined(PAR)
1750     IF_DEBUG(scheduler,
1751              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1752                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1753     IF_PAR_DEBUG(bq,
1754                  
1755                  if (t->block_info.closure!=(StgClosure*)NULL) 
1756                  print_bq(t->block_info.closure));
1757     
1758     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1759     blockThread(t);
1760     
1761     /* whatever we schedule next, we must log that schedule */
1762     emitSchedule = rtsTrue;
1763     
1764 #else /* !GRAN */
1765
1766       // We don't need to do anything.  The thread is blocked, and it
1767       // has tidied up its stack and placed itself on whatever queue
1768       // it needs to be on.
1769
1770 #if !defined(THREADED_RTS)
1771     ASSERT(t->why_blocked != NotBlocked);
1772              // This might not be true under THREADED_RTS: we don't have
1773              // exclusive access to this TSO, so someone might have
1774              // woken it up by now.  This actually happens: try
1775              // conc023 +RTS -N2.
1776 #endif
1777
1778 #ifdef DEBUG
1779     if (traceClass(DEBUG_sched)) {
1780         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1781                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1782         printThreadBlockage(t);
1783         debugTraceEnd();
1784     }
1785 #endif
1786     
1787     /* Only for dumping event to log file 
1788        ToDo: do I need this in GranSim, too?
1789        blockThread(t);
1790     */
1791 #endif
1792 }
1793
1794 /* -----------------------------------------------------------------------------
1795  * Handle a thread that returned to the scheduler with ThreadFinished
1796  * -------------------------------------------------------------------------- */
1797
1798 static rtsBool
1799 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1800 {
1801     /* Need to check whether this was a main thread, and if so,
1802      * return with the return value.
1803      *
1804      * We also end up here if the thread kills itself with an
1805      * uncaught exception, see Exception.cmm.
1806      */
1807     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1808                (unsigned long)t->id, whatNext_strs[t->what_next]);
1809
1810 #if defined(GRAN)
1811       endThread(t, CurrentProc); // clean-up the thread
1812 #elif defined(PARALLEL_HASKELL)
1813       /* For now all are advisory -- HWL */
1814       //if(t->priority==AdvisoryPriority) ??
1815       advisory_thread_count--; // JB: Caution with this counter, buggy!
1816       
1817 # if defined(DIST)
1818       if(t->dist.priority==RevalPriority)
1819         FinishReval(t);
1820 # endif
1821     
1822 # if defined(EDENOLD)
1823       // the thread could still have an outport... (BUG)
1824       if (t->eden.outport != -1) {
1825       // delete the outport for the tso which has finished...
1826         IF_PAR_DEBUG(eden_ports,
1827                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1828                               t->eden.outport, t->id));
1829         deleteOPT(t);
1830       }
1831       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1832       if (t->eden.epid != -1) {
1833         IF_PAR_DEBUG(eden_ports,
1834                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1835                            t->id, t->eden.epid));
1836         removeTSOfromProcess(t);
1837       }
1838 # endif 
1839
1840 # if defined(PAR)
1841       if (RtsFlags.ParFlags.ParStats.Full &&
1842           !RtsFlags.ParFlags.ParStats.Suppressed) 
1843         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1844
1845       //  t->par only contains statistics: left out for now...
1846       IF_PAR_DEBUG(fish,
1847                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1848                               t->id,t,t->par.sparkname));
1849 # endif
1850 #endif // PARALLEL_HASKELL
1851
1852       //
1853       // Check whether the thread that just completed was a bound
1854       // thread, and if so return with the result.  
1855       //
1856       // There is an assumption here that all thread completion goes
1857       // through this point; we need to make sure that if a thread
1858       // ends up in the ThreadKilled state, that it stays on the run
1859       // queue so it can be dealt with here.
1860       //
1861
1862       if (t->bound) {
1863
1864           if (t->bound != task) {
1865 #if !defined(THREADED_RTS)
1866               // Must be a bound thread that is not the topmost one.  Leave
1867               // it on the run queue until the stack has unwound to the
1868               // point where we can deal with this.  Leaving it on the run
1869               // queue also ensures that the garbage collector knows about
1870               // this thread and its return value (it gets dropped from the
1871               // all_threads list so there's no other way to find it).
1872               appendToRunQueue(cap,t);
1873               return rtsFalse;
1874 #else
1875               // this cannot happen in the threaded RTS, because a
1876               // bound thread can only be run by the appropriate Task.
1877               barf("finished bound thread that isn't mine");
1878 #endif
1879           }
1880
1881           ASSERT(task->tso == t);
1882
1883           if (t->what_next == ThreadComplete) {
1884               if (task->ret) {
1885                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1886                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1887               }
1888               task->stat = Success;
1889           } else {
1890               if (task->ret) {
1891                   *(task->ret) = NULL;
1892               }
1893               if (sched_state >= SCHED_INTERRUPTING) {
1894                   task->stat = Interrupted;
1895               } else {
1896                   task->stat = Killed;
1897               }
1898           }
1899 #ifdef DEBUG
1900           removeThreadLabel((StgWord)task->tso->id);
1901 #endif
1902           return rtsTrue; // tells schedule() to return
1903       }
1904
1905       return rtsFalse;
1906 }
1907
1908 /* -----------------------------------------------------------------------------
1909  * Perform a heap census, if PROFILING
1910  * -------------------------------------------------------------------------- */
1911
1912 static rtsBool
1913 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1914 {
1915 #if defined(PROFILING)
1916     // When we have +RTS -i0 and we're heap profiling, do a census at
1917     // every GC.  This lets us get repeatable runs for debugging.
1918     if (performHeapProfile ||
1919         (RtsFlags.ProfFlags.profileInterval==0 &&
1920          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1921
1922         // checking black holes is necessary before GC, otherwise
1923         // there may be threads that are unreachable except by the
1924         // blackhole queue, which the GC will consider to be
1925         // deadlocked.
1926         scheduleCheckBlackHoles(&MainCapability);
1927
1928         debugTrace(DEBUG_sched, "garbage collecting before heap census");
1929         GarbageCollect(rtsTrue);
1930
1931         debugTrace(DEBUG_sched, "performing heap census");
1932         heapCensus();
1933
1934         performHeapProfile = rtsFalse;
1935         return rtsTrue;  // true <=> we already GC'd
1936     }
1937 #endif
1938     return rtsFalse;
1939 }
1940
1941 /* -----------------------------------------------------------------------------
1942  * Perform a garbage collection if necessary
1943  * -------------------------------------------------------------------------- */
1944
1945 static Capability *
1946 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1947 {
1948     StgTSO *t;
1949 #ifdef THREADED_RTS
1950     static volatile StgWord waiting_for_gc;
1951     rtsBool was_waiting;
1952     nat i;
1953 #endif
1954
1955 #ifdef THREADED_RTS
1956     // In order to GC, there must be no threads running Haskell code.
1957     // Therefore, the GC thread needs to hold *all* the capabilities,
1958     // and release them after the GC has completed.  
1959     //
1960     // This seems to be the simplest way: previous attempts involved
1961     // making all the threads with capabilities give up their
1962     // capabilities and sleep except for the *last* one, which
1963     // actually did the GC.  But it's quite hard to arrange for all
1964     // the other tasks to sleep and stay asleep.
1965     //
1966         
1967     was_waiting = cas(&waiting_for_gc, 0, 1);
1968     if (was_waiting) {
1969         do {
1970             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1971             if (cap) yieldCapability(&cap,task);
1972         } while (waiting_for_gc);
1973         return cap;  // NOTE: task->cap might have changed here
1974     }
1975
1976     for (i=0; i < n_capabilities; i++) {
1977         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1978         if (cap != &capabilities[i]) {
1979             Capability *pcap = &capabilities[i];
1980             // we better hope this task doesn't get migrated to
1981             // another Capability while we're waiting for this one.
1982             // It won't, because load balancing happens while we have
1983             // all the Capabilities, but even so it's a slightly
1984             // unsavoury invariant.
1985             task->cap = pcap;
1986             context_switch = 1;
1987             waitForReturnCapability(&pcap, task);
1988             if (pcap != &capabilities[i]) {
1989                 barf("scheduleDoGC: got the wrong capability");
1990             }
1991         }
1992     }
1993
1994     waiting_for_gc = rtsFalse;
1995 #endif
1996
1997     /* Kick any transactions which are invalid back to their
1998      * atomically frames.  When next scheduled they will try to
1999      * commit, this commit will fail and they will retry.
2000      */
2001     { 
2002         StgTSO *next;
2003
2004         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2005             if (t->what_next == ThreadRelocated) {
2006                 next = t->link;
2007             } else {
2008                 next = t->global_link;
2009                 
2010                 // This is a good place to check for blocked
2011                 // exceptions.  It might be the case that a thread is
2012                 // blocked on delivering an exception to a thread that
2013                 // is also blocked - we try to ensure that this
2014                 // doesn't happen in throwTo(), but it's too hard (or
2015                 // impossible) to close all the race holes, so we
2016                 // accept that some might get through and deal with
2017                 // them here.  A GC will always happen at some point,
2018                 // even if the system is otherwise deadlocked.
2019                 maybePerformBlockedException (&capabilities[0], t);
2020
2021                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2022                     if (!stmValidateNestOfTransactions (t -> trec)) {
2023                         debugTrace(DEBUG_sched | DEBUG_stm,
2024                                    "trec %p found wasting its time", t);
2025                         
2026                         // strip the stack back to the
2027                         // ATOMICALLY_FRAME, aborting the (nested)
2028                         // transaction, and saving the stack of any
2029                         // partially-evaluated thunks on the heap.
2030                         throwToSingleThreaded_(&capabilities[0], t, 
2031                                                NULL, rtsTrue, NULL);
2032                         
2033 #ifdef REG_R1
2034                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2035 #endif
2036                     }
2037                 }
2038             }
2039         }
2040     }
2041     
2042     // so this happens periodically:
2043     if (cap) scheduleCheckBlackHoles(cap);
2044     
2045     IF_DEBUG(scheduler, printAllThreads());
2046
2047     /*
2048      * We now have all the capabilities; if we're in an interrupting
2049      * state, then we should take the opportunity to delete all the
2050      * threads in the system.
2051      */
2052     if (sched_state >= SCHED_INTERRUPTING) {
2053         deleteAllThreads(&capabilities[0]);
2054         sched_state = SCHED_SHUTTING_DOWN;
2055     }
2056
2057     /* everybody back, start the GC.
2058      * Could do it in this thread, or signal a condition var
2059      * to do it in another thread.  Either way, we need to
2060      * broadcast on gc_pending_cond afterward.
2061      */
2062 #if defined(THREADED_RTS)
2063     debugTrace(DEBUG_sched, "doing GC");
2064 #endif
2065     GarbageCollect(force_major);
2066     
2067 #if defined(THREADED_RTS)
2068     // release our stash of capabilities.
2069     for (i = 0; i < n_capabilities; i++) {
2070         if (cap != &capabilities[i]) {
2071             task->cap = &capabilities[i];
2072             releaseCapability(&capabilities[i]);
2073         }
2074     }
2075     if (cap) {
2076         task->cap = cap;
2077     } else {
2078         task->cap = NULL;
2079     }
2080 #endif
2081
2082 #if defined(GRAN)
2083     /* add a ContinueThread event to continue execution of current thread */
2084     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2085               ContinueThread,
2086               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2087     IF_GRAN_DEBUG(bq, 
2088                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2089                   G_EVENTQ(0);
2090                   G_CURR_THREADQ(0));
2091 #endif /* GRAN */
2092
2093     return cap;
2094 }
2095
2096 /* ---------------------------------------------------------------------------
2097  * Singleton fork(). Do not copy any running threads.
2098  * ------------------------------------------------------------------------- */
2099
2100 StgInt
2101 forkProcess(HsStablePtr *entry
2102 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2103             STG_UNUSED
2104 #endif
2105            )
2106 {
2107 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2108     Task *task;
2109     pid_t pid;
2110     StgTSO* t,*next;
2111     Capability *cap;
2112     
2113 #if defined(THREADED_RTS)
2114     if (RtsFlags.ParFlags.nNodes > 1) {
2115         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2116         stg_exit(EXIT_FAILURE);
2117     }
2118 #endif
2119
2120     debugTrace(DEBUG_sched, "forking!");
2121     
2122     // ToDo: for SMP, we should probably acquire *all* the capabilities
2123     cap = rts_lock();
2124     
2125     pid = fork();
2126     
2127     if (pid) { // parent
2128         
2129         // just return the pid
2130         rts_unlock(cap);
2131         return pid;
2132         
2133     } else { // child
2134         
2135         // Now, all OS threads except the thread that forked are
2136         // stopped.  We need to stop all Haskell threads, including
2137         // those involved in foreign calls.  Also we need to delete
2138         // all Tasks, because they correspond to OS threads that are
2139         // now gone.
2140
2141         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2142             if (t->what_next == ThreadRelocated) {
2143                 next = t->link;
2144             } else {
2145                 next = t->global_link;
2146                 // don't allow threads to catch the ThreadKilled
2147                 // exception, but we do want to raiseAsync() because these
2148                 // threads may be evaluating thunks that we need later.
2149                 deleteThread_(cap,t);
2150             }
2151         }
2152         
2153         // Empty the run queue.  It seems tempting to let all the
2154         // killed threads stay on the run queue as zombies to be
2155         // cleaned up later, but some of them correspond to bound
2156         // threads for which the corresponding Task does not exist.
2157         cap->run_queue_hd = END_TSO_QUEUE;
2158         cap->run_queue_tl = END_TSO_QUEUE;
2159
2160         // Any suspended C-calling Tasks are no more, their OS threads
2161         // don't exist now:
2162         cap->suspended_ccalling_tasks = NULL;
2163
2164         // Empty the all_threads list.  Otherwise, the garbage
2165         // collector may attempt to resurrect some of these threads.
2166         all_threads = END_TSO_QUEUE;
2167
2168         // Wipe the task list, except the current Task.
2169         ACQUIRE_LOCK(&sched_mutex);
2170         for (task = all_tasks; task != NULL; task=task->all_link) {
2171             if (task != cap->running_task) {
2172                 discardTask(task);
2173             }
2174         }
2175         RELEASE_LOCK(&sched_mutex);
2176
2177 #if defined(THREADED_RTS)
2178         // Wipe our spare workers list, they no longer exist.  New
2179         // workers will be created if necessary.
2180         cap->spare_workers = NULL;
2181         cap->returning_tasks_hd = NULL;
2182         cap->returning_tasks_tl = NULL;
2183 #endif
2184
2185         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2186         rts_checkSchedStatus("forkProcess",cap);
2187         
2188         rts_unlock(cap);
2189         hs_exit();                      // clean up and exit
2190         stg_exit(EXIT_SUCCESS);
2191     }
2192 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2193     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2194     return -1;
2195 #endif
2196 }
2197
2198 /* ---------------------------------------------------------------------------
2199  * Delete all the threads in the system
2200  * ------------------------------------------------------------------------- */
2201    
2202 static void
2203 deleteAllThreads ( Capability *cap )
2204 {
2205     // NOTE: only safe to call if we own all capabilities.
2206
2207     StgTSO* t, *next;
2208     debugTrace(DEBUG_sched,"deleting all threads");
2209     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2210         if (t->what_next == ThreadRelocated) {
2211             next = t->link;
2212         } else {
2213             next = t->global_link;
2214             deleteThread(cap,t);
2215         }
2216     }      
2217
2218     // The run queue now contains a bunch of ThreadKilled threads.  We
2219     // must not throw these away: the main thread(s) will be in there
2220     // somewhere, and the main scheduler loop has to deal with it.
2221     // Also, the run queue is the only thing keeping these threads from
2222     // being GC'd, and we don't want the "main thread has been GC'd" panic.
2223
2224 #if !defined(THREADED_RTS)
2225     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2226     ASSERT(sleeping_queue == END_TSO_QUEUE);
2227 #endif
2228 }
2229
2230 /* -----------------------------------------------------------------------------
2231    Managing the suspended_ccalling_tasks list.
2232    Locks required: sched_mutex
2233    -------------------------------------------------------------------------- */
2234
2235 STATIC_INLINE void
2236 suspendTask (Capability *cap, Task *task)
2237 {
2238     ASSERT(task->next == NULL && task->prev == NULL);
2239     task->next = cap->suspended_ccalling_tasks;
2240     task->prev = NULL;
2241     if (cap->suspended_ccalling_tasks) {
2242         cap->suspended_ccalling_tasks->prev = task;
2243     }
2244     cap->suspended_ccalling_tasks = task;
2245 }
2246
2247 STATIC_INLINE void
2248 recoverSuspendedTask (Capability *cap, Task *task)
2249 {
2250     if (task->prev) {
2251         task->prev->next = task->next;
2252     } else {
2253         ASSERT(cap->suspended_ccalling_tasks == task);
2254         cap->suspended_ccalling_tasks = task->next;
2255     }
2256     if (task->next) {
2257         task->next->prev = task->prev;
2258     }
2259     task->next = task->prev = NULL;
2260 }
2261
2262 /* ---------------------------------------------------------------------------
2263  * Suspending & resuming Haskell threads.
2264  * 
2265  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2266  * its capability before calling the C function.  This allows another
2267  * task to pick up the capability and carry on running Haskell
2268  * threads.  It also means that if the C call blocks, it won't lock
2269  * the whole system.
2270  *
2271  * The Haskell thread making the C call is put to sleep for the
2272  * duration of the call, on the susepended_ccalling_threads queue.  We
2273  * give out a token to the task, which it can use to resume the thread
2274  * on return from the C function.
2275  * ------------------------------------------------------------------------- */
2276    
2277 void *
2278 suspendThread (StgRegTable *reg)
2279 {
2280   Capability *cap;
2281   int saved_errno = errno;
2282   StgTSO *tso;
2283   Task *task;
2284
2285   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2286    */
2287   cap = regTableToCapability(reg);
2288
2289   task = cap->running_task;
2290   tso = cap->r.rCurrentTSO;
2291
2292   debugTrace(DEBUG_sched, 
2293              "thread %lu did a safe foreign call", 
2294              (unsigned long)cap->r.rCurrentTSO->id);
2295
2296   // XXX this might not be necessary --SDM
2297   tso->what_next = ThreadRunGHC;
2298
2299   threadPaused(cap,tso);
2300
2301   if ((tso->flags & TSO_BLOCKEX) == 0)  {
2302       tso->why_blocked = BlockedOnCCall;
2303       tso->flags |= TSO_BLOCKEX;
2304       tso->flags &= ~TSO_INTERRUPTIBLE;
2305   } else {
2306       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2307   }
2308
2309   // Hand back capability
2310   task->suspended_tso = tso;
2311
2312   ACQUIRE_LOCK(&cap->lock);
2313
2314   suspendTask(cap,task);
2315   cap->in_haskell = rtsFalse;
2316   releaseCapability_(cap);
2317   
2318   RELEASE_LOCK(&cap->lock);
2319
2320 #if defined(THREADED_RTS)
2321   /* Preparing to leave the RTS, so ensure there's a native thread/task
2322      waiting to take over.
2323   */
2324   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2325 #endif
2326
2327   errno = saved_errno;
2328   return task;
2329 }
2330
2331 StgRegTable *
2332 resumeThread (void *task_)
2333 {
2334     StgTSO *tso;
2335     Capability *cap;
2336     int saved_errno = errno;
2337     Task *task = task_;
2338
2339     cap = task->cap;
2340     // Wait for permission to re-enter the RTS with the result.
2341     waitForReturnCapability(&cap,task);
2342     // we might be on a different capability now... but if so, our
2343     // entry on the suspended_ccalling_tasks list will also have been
2344     // migrated.
2345
2346     // Remove the thread from the suspended list
2347     recoverSuspendedTask(cap,task);
2348
2349     tso = task->suspended_tso;
2350     task->suspended_tso = NULL;
2351     tso->link = END_TSO_QUEUE;
2352     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2353     
2354     if (tso->why_blocked == BlockedOnCCall) {
2355         awakenBlockedExceptionQueue(cap,tso);
2356         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2357     }
2358     
2359     /* Reset blocking status */
2360     tso->why_blocked  = NotBlocked;
2361     
2362     cap->r.rCurrentTSO = tso;
2363     cap->in_haskell = rtsTrue;
2364     errno = saved_errno;
2365
2366     /* We might have GC'd, mark the TSO dirty again */
2367     dirtyTSO(tso);
2368
2369     IF_DEBUG(sanity, checkTSO(tso));
2370
2371     return &cap->r;
2372 }
2373
2374 /* ---------------------------------------------------------------------------
2375  * scheduleThread()
2376  *
2377  * scheduleThread puts a thread on the end  of the runnable queue.
2378  * This will usually be done immediately after a thread is created.
2379  * The caller of scheduleThread must create the thread using e.g.
2380  * createThread and push an appropriate closure
2381  * on this thread's stack before the scheduler is invoked.
2382  * ------------------------------------------------------------------------ */
2383
2384 void
2385 scheduleThread(Capability *cap, StgTSO *tso)
2386 {
2387     // The thread goes at the *end* of the run-queue, to avoid possible
2388     // starvation of any threads already on the queue.
2389     appendToRunQueue(cap,tso);
2390 }
2391
2392 void
2393 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2394 {
2395 #if defined(THREADED_RTS)
2396     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2397                               // move this thread from now on.
2398     cpu %= RtsFlags.ParFlags.nNodes;
2399     if (cpu == cap->no) {
2400         appendToRunQueue(cap,tso);
2401     } else {
2402         migrateThreadToCapability_lock(&capabilities[cpu],tso);
2403     }
2404 #else
2405     appendToRunQueue(cap,tso);
2406 #endif
2407 }
2408
2409 Capability *
2410 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2411 {
2412     Task *task;
2413
2414     // We already created/initialised the Task
2415     task = cap->running_task;
2416
2417     // This TSO is now a bound thread; make the Task and TSO
2418     // point to each other.
2419     tso->bound = task;
2420     tso->cap = cap;
2421
2422     task->tso = tso;
2423     task->ret = ret;
2424     task->stat = NoStatus;
2425
2426     appendToRunQueue(cap,tso);
2427
2428     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2429
2430 #if defined(GRAN)
2431     /* GranSim specific init */
2432     CurrentTSO = m->tso;                // the TSO to run
2433     procStatus[MainProc] = Busy;        // status of main PE
2434     CurrentProc = MainProc;             // PE to run it on
2435 #endif
2436
2437     cap = schedule(cap,task);
2438
2439     ASSERT(task->stat != NoStatus);
2440     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2441
2442     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2443     return cap;
2444 }
2445
2446 /* ----------------------------------------------------------------------------
2447  * Starting Tasks
2448  * ------------------------------------------------------------------------- */
2449
2450 #if defined(THREADED_RTS)
2451 void
2452 workerStart(Task *task)
2453 {
2454     Capability *cap;
2455
2456     // See startWorkerTask().
2457     ACQUIRE_LOCK(&task->lock);
2458     cap = task->cap;
2459     RELEASE_LOCK(&task->lock);
2460
2461     // set the thread-local pointer to the Task:
2462     taskEnter(task);
2463
2464     // schedule() runs without a lock.
2465     cap = schedule(cap,task);
2466
2467     // On exit from schedule(), we have a Capability.
2468     releaseCapability(cap);
2469     workerTaskStop(task);
2470 }
2471 #endif
2472
2473 /* ---------------------------------------------------------------------------
2474  * initScheduler()
2475  *
2476  * Initialise the scheduler.  This resets all the queues - if the
2477  * queues contained any threads, they'll be garbage collected at the
2478  * next pass.
2479  *
2480  * ------------------------------------------------------------------------ */
2481
2482 void 
2483 initScheduler(void)
2484 {
2485 #if defined(GRAN)
2486   nat i;
2487   for (i=0; i<=MAX_PROC; i++) {
2488     run_queue_hds[i]      = END_TSO_QUEUE;
2489     run_queue_tls[i]      = END_TSO_QUEUE;
2490     blocked_queue_hds[i]  = END_TSO_QUEUE;
2491     blocked_queue_tls[i]  = END_TSO_QUEUE;
2492     ccalling_threadss[i]  = END_TSO_QUEUE;
2493     blackhole_queue[i]    = END_TSO_QUEUE;
2494     sleeping_queue        = END_TSO_QUEUE;
2495   }
2496 #elif !defined(THREADED_RTS)
2497   blocked_queue_hd  = END_TSO_QUEUE;
2498   blocked_queue_tl  = END_TSO_QUEUE;
2499   sleeping_queue    = END_TSO_QUEUE;
2500 #endif
2501
2502   blackhole_queue   = END_TSO_QUEUE;
2503   all_threads       = END_TSO_QUEUE;
2504
2505   context_switch = 0;
2506   sched_state    = SCHED_RUNNING;
2507
2508 #if defined(THREADED_RTS)
2509   /* Initialise the mutex and condition variables used by
2510    * the scheduler. */
2511   initMutex(&sched_mutex);
2512 #endif
2513   
2514   ACQUIRE_LOCK(&sched_mutex);
2515
2516   /* A capability holds the state a native thread needs in
2517    * order to execute STG code. At least one capability is
2518    * floating around (only THREADED_RTS builds have more than one).
2519    */
2520   initCapabilities();
2521
2522   initTaskManager();
2523
2524 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2525   initSparkPools();
2526 #endif
2527
2528 #if defined(THREADED_RTS)
2529   /*
2530    * Eagerly start one worker to run each Capability, except for
2531    * Capability 0.  The idea is that we're probably going to start a
2532    * bound thread on Capability 0 pretty soon, so we don't want a
2533    * worker task hogging it.
2534    */
2535   { 
2536       nat i;
2537       Capability *cap;
2538       for (i = 1; i < n_capabilities; i++) {
2539           cap = &capabilities[i];
2540           ACQUIRE_LOCK(&cap->lock);
2541           startWorkerTask(cap, workerStart);
2542           RELEASE_LOCK(&cap->lock);
2543       }
2544   }
2545 #endif
2546
2547   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2548
2549   RELEASE_LOCK(&sched_mutex);
2550 }
2551
2552 void
2553 exitScheduler( void )
2554 {
2555     Task *task = NULL;
2556
2557 #if defined(THREADED_RTS)
2558     ACQUIRE_LOCK(&sched_mutex);
2559     task = newBoundTask();
2560     RELEASE_LOCK(&sched_mutex);
2561 #endif
2562
2563     // If we haven't killed all the threads yet, do it now.
2564     if (sched_state < SCHED_SHUTTING_DOWN) {
2565         sched_state = SCHED_INTERRUPTING;
2566         scheduleDoGC(NULL,task,rtsFalse);    
2567     }
2568     sched_state = SCHED_SHUTTING_DOWN;
2569
2570 #if defined(THREADED_RTS)
2571     { 
2572         nat i;
2573         
2574         for (i = 0; i < n_capabilities; i++) {
2575             shutdownCapability(&capabilities[i], task);
2576         }
2577         boundTaskExiting(task);
2578         stopTaskManager();
2579     }
2580     closeMutex(&sched_mutex);
2581 #endif
2582 }
2583
2584 /* ---------------------------------------------------------------------------
2585    Where are the roots that we know about?
2586
2587         - all the threads on the runnable queue
2588         - all the threads on the blocked queue
2589         - all the threads on the sleeping queue
2590         - all the thread currently executing a _ccall_GC
2591         - all the "main threads"
2592      
2593    ------------------------------------------------------------------------ */
2594
2595 /* This has to be protected either by the scheduler monitor, or by the
2596         garbage collection monitor (probably the latter).
2597         KH @ 25/10/99
2598 */
2599
2600 void
2601 GetRoots( evac_fn evac )
2602 {
2603     nat i;
2604     Capability *cap;
2605     Task *task;
2606
2607 #if defined(GRAN)
2608     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2609         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2610             evac((StgClosure **)&run_queue_hds[i]);
2611         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2612             evac((StgClosure **)&run_queue_tls[i]);
2613         
2614         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2615             evac((StgClosure **)&blocked_queue_hds[i]);
2616         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2617             evac((StgClosure **)&blocked_queue_tls[i]);
2618         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2619             evac((StgClosure **)&ccalling_threads[i]);
2620     }
2621
2622     markEventQueue();
2623
2624 #else /* !GRAN */
2625
2626     for (i = 0; i < n_capabilities; i++) {
2627         cap = &capabilities[i];
2628         evac((StgClosure **)(void *)&cap->run_queue_hd);
2629         evac((StgClosure **)(void *)&cap->run_queue_tl);
2630 #if defined(THREADED_RTS)
2631         evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2632         evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2633 #endif
2634         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2635              task=task->next) {
2636             debugTrace(DEBUG_sched,
2637                        "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2638             evac((StgClosure **)(void *)&task->suspended_tso);
2639         }
2640
2641     }
2642     
2643
2644 #if !defined(THREADED_RTS)
2645     evac((StgClosure **)(void *)&blocked_queue_hd);
2646     evac((StgClosure **)(void *)&blocked_queue_tl);
2647     evac((StgClosure **)(void *)&sleeping_queue);
2648 #endif 
2649 #endif
2650
2651     // evac((StgClosure **)&blackhole_queue);
2652
2653 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2654     markSparkQueue(evac);
2655 #endif
2656     
2657 #if defined(RTS_USER_SIGNALS)
2658     // mark the signal handlers (signals should be already blocked)
2659     markSignalHandlers(evac);
2660 #endif
2661 }
2662
2663 /* -----------------------------------------------------------------------------
2664    performGC
2665
2666    This is the interface to the garbage collector from Haskell land.
2667    We provide this so that external C code can allocate and garbage
2668    collect when called from Haskell via _ccall_GC.
2669    -------------------------------------------------------------------------- */
2670
2671 static void
2672 performGC_(rtsBool force_major)
2673 {
2674     Task *task;
2675     // We must grab a new Task here, because the existing Task may be
2676     // associated with a particular Capability, and chained onto the 
2677     // suspended_ccalling_tasks queue.
2678     ACQUIRE_LOCK(&sched_mutex);
2679     task = newBoundTask();
2680     RELEASE_LOCK(&sched_mutex);
2681     scheduleDoGC(NULL,task,force_major);
2682     boundTaskExiting(task);
2683 }
2684
2685 void
2686 performGC(void)
2687 {
2688     performGC_(rtsFalse);
2689 }
2690
2691 void
2692 performMajorGC(void)
2693 {
2694     performGC_(rtsTrue);
2695 }
2696
2697 /* -----------------------------------------------------------------------------
2698    Stack overflow
2699
2700    If the thread has reached its maximum stack size, then raise the
2701    StackOverflow exception in the offending thread.  Otherwise
2702    relocate the TSO into a larger chunk of memory and adjust its stack
2703    size appropriately.
2704    -------------------------------------------------------------------------- */
2705
2706 static StgTSO *
2707 threadStackOverflow(Capability *cap, StgTSO *tso)
2708 {
2709   nat new_stack_size, stack_words;
2710   lnat new_tso_size;
2711   StgPtr new_sp;
2712   StgTSO *dest;
2713
2714   IF_DEBUG(sanity,checkTSO(tso));
2715
2716   // don't allow throwTo() to modify the blocked_exceptions queue
2717   // while we are moving the TSO:
2718   lockClosure((StgClosure *)tso);
2719
2720   if (tso->stack_size >= tso->max_stack_size) {
2721
2722       debugTrace(DEBUG_gc,
2723                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2724                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2725       IF_DEBUG(gc,
2726                /* If we're debugging, just print out the top of the stack */
2727                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2728                                                 tso->sp+64)));
2729
2730       // Send this thread the StackOverflow exception
2731       unlockTSO(tso);
2732       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2733       return tso;
2734   }
2735
2736   /* Try to double the current stack size.  If that takes us over the
2737    * maximum stack size for this thread, then use the maximum instead.
2738    * Finally round up so the TSO ends up as a whole number of blocks.
2739    */
2740   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2741   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2742                                        TSO_STRUCT_SIZE)/sizeof(W_);
2743   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2744   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2745
2746   debugTrace(DEBUG_sched, 
2747              "increasing stack size from %ld words to %d.",
2748              (long)tso->stack_size, new_stack_size);
2749
2750   dest = (StgTSO *)allocate(new_tso_size);
2751   TICK_ALLOC_TSO(new_stack_size,0);
2752
2753   /* copy the TSO block and the old stack into the new area */
2754   memcpy(dest,tso,TSO_STRUCT_SIZE);
2755   stack_words = tso->stack + tso->stack_size - tso->sp;
2756   new_sp = (P_)dest + new_tso_size - stack_words;
2757   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2758
2759   /* relocate the stack pointers... */
2760   dest->sp         = new_sp;
2761   dest->stack_size = new_stack_size;
2762         
2763   /* Mark the old TSO as relocated.  We have to check for relocated
2764    * TSOs in the garbage collector and any primops that deal with TSOs.
2765    *
2766    * It's important to set the sp value to just beyond the end
2767    * of the stack, so we don't attempt to scavenge any part of the
2768    * dead TSO's stack.
2769    */
2770   tso->what_next = ThreadRelocated;
2771   tso->link = dest;
2772   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2773   tso->why_blocked = NotBlocked;
2774
2775   IF_PAR_DEBUG(verbose,
2776                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2777                      tso->id, tso, tso->stack_size);
2778                /* If we're debugging, just print out the top of the stack */
2779                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2780                                                 tso->sp+64)));
2781   
2782   unlockTSO(dest);
2783   unlockTSO(tso);
2784
2785   IF_DEBUG(sanity,checkTSO(dest));
2786 #if 0
2787   IF_DEBUG(scheduler,printTSO(dest));
2788 #endif
2789
2790   return dest;
2791 }
2792
2793 /* ---------------------------------------------------------------------------
2794    Interrupt execution
2795    - usually called inside a signal handler so it mustn't do anything fancy.   
2796    ------------------------------------------------------------------------ */
2797
2798 void
2799 interruptStgRts(void)
2800 {
2801     sched_state = SCHED_INTERRUPTING;
2802     context_switch = 1;
2803     wakeUpRts();
2804 }
2805
2806 /* -----------------------------------------------------------------------------
2807    Wake up the RTS
2808    
2809    This function causes at least one OS thread to wake up and run the
2810    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2811    an external event has arrived that may need servicing (eg. a
2812    keyboard interrupt).
2813
2814    In the single-threaded RTS we don't do anything here; we only have
2815    one thread anyway, and the event that caused us to want to wake up
2816    will have interrupted any blocking system call in progress anyway.
2817    -------------------------------------------------------------------------- */
2818
2819 void
2820 wakeUpRts(void)
2821 {
2822 #if defined(THREADED_RTS)
2823 #if !defined(mingw32_HOST_OS)
2824     // This forces the IO Manager thread to wakeup, which will
2825     // in turn ensure that some OS thread wakes up and runs the
2826     // scheduler loop, which will cause a GC and deadlock check.
2827     ioManagerWakeup();
2828 #else
2829     // On Windows this might be safe enough, because we aren't
2830     // in a signal handler.  Later we should use the IO Manager,
2831     // though.
2832     prodOneCapability();
2833 #endif
2834 #endif
2835 }
2836
2837 /* -----------------------------------------------------------------------------
2838  * checkBlackHoles()
2839  *
2840  * Check the blackhole_queue for threads that can be woken up.  We do
2841  * this periodically: before every GC, and whenever the run queue is
2842  * empty.
2843  *
2844  * An elegant solution might be to just wake up all the blocked
2845  * threads with awakenBlockedQueue occasionally: they'll go back to
2846  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2847  * doesn't give us a way to tell whether we've actually managed to
2848  * wake up any threads, so we would be busy-waiting.
2849  *
2850  * -------------------------------------------------------------------------- */
2851
2852 static rtsBool
2853 checkBlackHoles (Capability *cap)
2854 {
2855     StgTSO **prev, *t;
2856     rtsBool any_woke_up = rtsFalse;
2857     StgHalfWord type;
2858
2859     // blackhole_queue is global:
2860     ASSERT_LOCK_HELD(&sched_mutex);
2861
2862     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2863
2864     // ASSUMES: sched_mutex
2865     prev = &blackhole_queue;
2866     t = blackhole_queue;
2867     while (t != END_TSO_QUEUE) {
2868         ASSERT(t->why_blocked == BlockedOnBlackHole);
2869         type = get_itbl(t->block_info.closure)->type;
2870         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2871             IF_DEBUG(sanity,checkTSO(t));
2872             t = unblockOne(cap, t);
2873             // urk, the threads migrate to the current capability
2874             // here, but we'd like to keep them on the original one.
2875             *prev = t;
2876             any_woke_up = rtsTrue;
2877         } else {
2878             prev = &t->link;
2879             t = t->link;
2880         }
2881     }
2882
2883     return any_woke_up;
2884 }
2885
2886 /* -----------------------------------------------------------------------------
2887    Deleting threads
2888
2889    This is used for interruption (^C) and forking, and corresponds to
2890    raising an exception but without letting the thread catch the
2891    exception.
2892    -------------------------------------------------------------------------- */
2893
2894 static void 
2895 deleteThread (Capability *cap, StgTSO *tso)
2896 {
2897     // NOTE: must only be called on a TSO that we have exclusive
2898     // access to, because we will call throwToSingleThreaded() below.
2899     // The TSO must be on the run queue of the Capability we own, or 
2900     // we must own all Capabilities.
2901
2902     if (tso->why_blocked != BlockedOnCCall &&
2903         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2904         throwToSingleThreaded(cap,tso,NULL);
2905     }
2906 }
2907
2908 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2909 static void 
2910 deleteThread_(Capability *cap, StgTSO *tso)
2911 { // for forkProcess only:
2912   // like deleteThread(), but we delete threads in foreign calls, too.
2913
2914     if (tso->why_blocked == BlockedOnCCall ||
2915         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2916         unblockOne(cap,tso);
2917         tso->what_next = ThreadKilled;
2918     } else {
2919         deleteThread(cap,tso);
2920     }
2921 }
2922 #endif
2923
2924 /* -----------------------------------------------------------------------------
2925    raiseExceptionHelper
2926    
2927    This function is called by the raise# primitve, just so that we can
2928    move some of the tricky bits of raising an exception from C-- into
2929    C.  Who knows, it might be a useful re-useable thing here too.
2930    -------------------------------------------------------------------------- */
2931
2932 StgWord
2933 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2934 {
2935     Capability *cap = regTableToCapability(reg);
2936     StgThunk *raise_closure = NULL;
2937     StgPtr p, next;
2938     StgRetInfoTable *info;
2939     //
2940     // This closure represents the expression 'raise# E' where E
2941     // is the exception raise.  It is used to overwrite all the
2942     // thunks which are currently under evaluataion.
2943     //
2944
2945     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2946     // LDV profiling: stg_raise_info has THUNK as its closure
2947     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2948     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2949     // 1 does not cause any problem unless profiling is performed.
2950     // However, when LDV profiling goes on, we need to linearly scan
2951     // small object pool, where raise_closure is stored, so we should
2952     // use MIN_UPD_SIZE.
2953     //
2954     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2955     //                                 sizeofW(StgClosure)+1);
2956     //
2957
2958     //
2959     // Walk up the stack, looking for the catch frame.  On the way,
2960     // we update any closures pointed to from update frames with the
2961     // raise closure that we just built.
2962     //
2963     p = tso->sp;
2964     while(1) {
2965         info = get_ret_itbl((StgClosure *)p);
2966         next = p + stack_frame_sizeW((StgClosure *)p);
2967         switch (info->i.type) {
2968             
2969         case UPDATE_FRAME:
2970             // Only create raise_closure if we need to.
2971             if (raise_closure == NULL) {
2972                 raise_closure = 
2973                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2974                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2975                 raise_closure->payload[0] = exception;
2976             }
2977             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2978             p = next;
2979             continue;
2980
2981         case ATOMICALLY_FRAME:
2982             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2983             tso->sp = p;
2984             return ATOMICALLY_FRAME;
2985             
2986         case CATCH_FRAME:
2987             tso->sp = p;
2988             return CATCH_FRAME;
2989
2990         case CATCH_STM_FRAME:
2991             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2992             tso->sp = p;
2993             return CATCH_STM_FRAME;
2994             
2995         case STOP_FRAME:
2996             tso->sp = p;
2997             return STOP_FRAME;
2998
2999         case CATCH_RETRY_FRAME:
3000         default:
3001             p = next; 
3002             continue;
3003         }
3004     }
3005 }
3006
3007
3008 /* -----------------------------------------------------------------------------
3009    findRetryFrameHelper
3010
3011    This function is called by the retry# primitive.  It traverses the stack
3012    leaving tso->sp referring to the frame which should handle the retry.  
3013
3014    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3015    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3016
3017    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3018    create) because retries are not considered to be exceptions, despite the
3019    similar implementation.
3020
3021    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3022    not be created within memory transactions.
3023    -------------------------------------------------------------------------- */
3024
3025 StgWord
3026 findRetryFrameHelper (StgTSO *tso)
3027 {
3028   StgPtr           p, next;
3029   StgRetInfoTable *info;
3030
3031   p = tso -> sp;
3032   while (1) {
3033     info = get_ret_itbl((StgClosure *)p);
3034     next = p + stack_frame_sizeW((StgClosure *)p);
3035     switch (info->i.type) {
3036       
3037     case ATOMICALLY_FRAME:
3038         debugTrace(DEBUG_stm,
3039                    "found ATOMICALLY_FRAME at %p during retry", p);
3040         tso->sp = p;
3041         return ATOMICALLY_FRAME;
3042       
3043     case CATCH_RETRY_FRAME:
3044         debugTrace(DEBUG_stm,
3045                    "found CATCH_RETRY_FRAME at %p during retrry", p);
3046         tso->sp = p;
3047         return CATCH_RETRY_FRAME;
3048       
3049     case CATCH_STM_FRAME: {
3050         debugTrace(DEBUG_stm,
3051                    "found CATCH_STM_FRAME at %p during retry", p);
3052         StgTRecHeader *trec = tso -> trec;
3053         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3054         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3055         stmAbortTransaction(tso -> cap, trec);
3056         stmFreeAbortedTRec(tso -> cap, trec);
3057         tso -> trec = outer;
3058         p = next; 
3059         continue;
3060     }
3061       
3062
3063     default:
3064       ASSERT(info->i.type != CATCH_FRAME);
3065       ASSERT(info->i.type != STOP_FRAME);
3066       p = next; 
3067       continue;
3068     }
3069   }
3070 }
3071
3072 /* -----------------------------------------------------------------------------
3073    resurrectThreads is called after garbage collection on the list of
3074    threads found to be garbage.  Each of these threads will be woken
3075    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3076    on an MVar, or NonTermination if the thread was blocked on a Black
3077    Hole.
3078
3079    Locks: assumes we hold *all* the capabilities.
3080    -------------------------------------------------------------------------- */
3081
3082 void
3083 resurrectThreads (StgTSO *threads)
3084 {
3085     StgTSO *tso, *next;
3086     Capability *cap;
3087
3088     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3089         next = tso->global_link;
3090         tso->global_link = all_threads;
3091         all_threads = tso;
3092         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3093         
3094         // Wake up the thread on the Capability it was last on
3095         cap = tso->cap;
3096         
3097         switch (tso->why_blocked) {
3098         case BlockedOnMVar:
3099         case BlockedOnException:
3100             /* Called by GC - sched_mutex lock is currently held. */
3101             throwToSingleThreaded(cap, tso,
3102                                   (StgClosure *)BlockedOnDeadMVar_closure);
3103             break;
3104         case BlockedOnBlackHole:
3105             throwToSingleThreaded(cap, tso,
3106                                   (StgClosure *)NonTermination_closure);
3107             break;
3108         case BlockedOnSTM:
3109             throwToSingleThreaded(cap, tso,
3110                                   (StgClosure *)BlockedIndefinitely_closure);
3111             break;
3112         case NotBlocked:
3113             /* This might happen if the thread was blocked on a black hole
3114              * belonging to a thread that we've just woken up (raiseAsync
3115              * can wake up threads, remember...).
3116              */
3117             continue;
3118         default:
3119             barf("resurrectThreads: thread blocked in a strange way");
3120         }
3121     }
3122 }