Fixed uninitialised FunBind fun_tick field
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "OSThreads.h"
15 #include "Storage.h"
16 #include "StgRun.h"
17 #include "Hooks.h"
18 #include "Schedule.h"
19 #include "StgMiscClosures.h"
20 #include "Interpreter.h"
21 #include "Printer.h"
22 #include "RtsSignals.h"
23 #include "Sanity.h"
24 #include "Stats.h"
25 #include "STM.h"
26 #include "Timer.h"
27 #include "Prelude.h"
28 #include "ThreadLabels.h"
29 #include "LdvProfile.h"
30 #include "Updates.h"
31 #ifdef PROFILING
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #endif
35 #if defined(GRAN) || defined(PARALLEL_HASKELL)
36 # include "GranSimRts.h"
37 # include "GranSim.h"
38 # include "ParallelRts.h"
39 # include "Parallel.h"
40 # include "ParallelDebug.h"
41 # include "FetchMe.h"
42 # include "HLC.h"
43 #endif
44 #include "Sparks.h"
45 #include "Capability.h"
46 #include "Task.h"
47 #include "AwaitEvent.h"
48 #if defined(mingw32_HOST_OS)
49 #include "win32/IOManager.h"
50 #endif
51 #include "Trace.h"
52 #include "RaiseAsync.h"
53 #include "Threads.h"
54 #include "ThrIOManager.h"
55
56 #ifdef HAVE_SYS_TYPES_H
57 #include <sys/types.h>
58 #endif
59 #ifdef HAVE_UNISTD_H
60 #include <unistd.h>
61 #endif
62
63 #include <string.h>
64 #include <stdlib.h>
65 #include <stdarg.h>
66
67 #ifdef HAVE_ERRNO_H
68 #include <errno.h>
69 #endif
70
71 // Turn off inlining when debugging - it obfuscates things
72 #ifdef DEBUG
73 # undef  STATIC_INLINE
74 # define STATIC_INLINE static
75 #endif
76
77 /* -----------------------------------------------------------------------------
78  * Global variables
79  * -------------------------------------------------------------------------- */
80
81 #if defined(GRAN)
82
83 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
84 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
85
86 /* 
87    In GranSim we have a runnable and a blocked queue for each processor.
88    In order to minimise code changes new arrays run_queue_hds/tls
89    are created. run_queue_hd is then a short cut (macro) for
90    run_queue_hds[CurrentProc] (see GranSim.h).
91    -- HWL
92 */
93 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
94 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
95 StgTSO *ccalling_threadss[MAX_PROC];
96 /* We use the same global list of threads (all_threads) in GranSim as in
97    the std RTS (i.e. we are cheating). However, we don't use this list in
98    the GranSim specific code at the moment (so we are only potentially
99    cheating).  */
100
101 #else /* !GRAN */
102
103 #if !defined(THREADED_RTS)
104 // Blocked/sleeping thrads
105 StgTSO *blocked_queue_hd = NULL;
106 StgTSO *blocked_queue_tl = NULL;
107 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
108 #endif
109
110 /* Threads blocked on blackholes.
111  * LOCK: sched_mutex+capability, or all capabilities
112  */
113 StgTSO *blackhole_queue = NULL;
114 #endif
115
116 /* The blackhole_queue should be checked for threads to wake up.  See
117  * Schedule.h for more thorough comment.
118  * LOCK: none (doesn't matter if we miss an update)
119  */
120 rtsBool blackholes_need_checking = rtsFalse;
121
122 /* Linked list of all threads.
123  * Used for detecting garbage collected threads.
124  * LOCK: sched_mutex+capability, or all capabilities
125  */
126 StgTSO *all_threads = NULL;
127
128 /* flag set by signal handler to precipitate a context switch
129  * LOCK: none (just an advisory flag)
130  */
131 int context_switch = 0;
132
133 /* flag that tracks whether we have done any execution in this time slice.
134  * LOCK: currently none, perhaps we should lock (but needs to be
135  * updated in the fast path of the scheduler).
136  */
137 nat recent_activity = ACTIVITY_YES;
138
139 /* if this flag is set as well, give up execution
140  * LOCK: none (changes once, from false->true)
141  */
142 rtsBool sched_state = SCHED_RUNNING;
143
144 #if defined(GRAN)
145 StgTSO *CurrentTSO;
146 #endif
147
148 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
149  *  exists - earlier gccs apparently didn't.
150  *  -= chak
151  */
152 StgTSO dummy_tso;
153
154 /*
155  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
156  * in an MT setting, needed to signal that a worker thread shouldn't hang around
157  * in the scheduler when it is out of work.
158  */
159 rtsBool shutting_down_scheduler = rtsFalse;
160
161 /*
162  * This mutex protects most of the global scheduler data in
163  * the THREADED_RTS runtime.
164  */
165 #if defined(THREADED_RTS)
166 Mutex sched_mutex;
167 #endif
168
169 #if defined(PARALLEL_HASKELL)
170 StgTSO *LastTSO;
171 rtsTime TimeOfLastYield;
172 rtsBool emitSchedule = rtsTrue;
173 #endif
174
175 #if !defined(mingw32_HOST_OS)
176 #define FORKPROCESS_PRIMOP_SUPPORTED
177 #endif
178
179 /* -----------------------------------------------------------------------------
180  * static function prototypes
181  * -------------------------------------------------------------------------- */
182
183 static Capability *schedule (Capability *initialCapability, Task *task);
184
185 //
186 // These function all encapsulate parts of the scheduler loop, and are
187 // abstracted only to make the structure and control flow of the
188 // scheduler clearer.
189 //
190 static void schedulePreLoop (void);
191 #if defined(THREADED_RTS)
192 static void schedulePushWork(Capability *cap, Task *task);
193 #endif
194 static void scheduleStartSignalHandlers (Capability *cap);
195 static void scheduleCheckBlockedThreads (Capability *cap);
196 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
197 static void scheduleCheckBlackHoles (Capability *cap);
198 static void scheduleDetectDeadlock (Capability *cap, Task *task);
199 #if defined(GRAN)
200 static StgTSO *scheduleProcessEvent(rtsEvent *event);
201 #endif
202 #if defined(PARALLEL_HASKELL)
203 static StgTSO *scheduleSendPendingMessages(void);
204 static void scheduleActivateSpark(void);
205 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
206 #endif
207 #if defined(PAR) || defined(GRAN)
208 static void scheduleGranParReport(void);
209 #endif
210 static void schedulePostRunThread(void);
211 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
212 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
213                                          StgTSO *t);
214 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
215                                     nat prev_what_next );
216 static void scheduleHandleThreadBlocked( StgTSO *t );
217 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
218                                              StgTSO *t );
219 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
220 static Capability *scheduleDoGC(Capability *cap, Task *task,
221                                 rtsBool force_major);
222
223 static rtsBool checkBlackHoles(Capability *cap);
224
225 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
226
227 static void deleteThread (Capability *cap, StgTSO *tso);
228 static void deleteAllThreads (Capability *cap);
229
230 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
231 static void deleteThread_(Capability *cap, StgTSO *tso);
232 #endif
233
234 #if defined(PARALLEL_HASKELL)
235 StgTSO * createSparkThread(rtsSpark spark);
236 StgTSO * activateSpark (rtsSpark spark);  
237 #endif
238
239 #ifdef DEBUG
240 static char *whatNext_strs[] = {
241   "(unknown)",
242   "ThreadRunGHC",
243   "ThreadInterpret",
244   "ThreadKilled",
245   "ThreadRelocated",
246   "ThreadComplete"
247 };
248 #endif
249
250 /* -----------------------------------------------------------------------------
251  * Putting a thread on the run queue: different scheduling policies
252  * -------------------------------------------------------------------------- */
253
254 STATIC_INLINE void
255 addToRunQueue( Capability *cap, StgTSO *t )
256 {
257 #if defined(PARALLEL_HASKELL)
258     if (RtsFlags.ParFlags.doFairScheduling) { 
259         // this does round-robin scheduling; good for concurrency
260         appendToRunQueue(cap,t);
261     } else {
262         // this does unfair scheduling; good for parallelism
263         pushOnRunQueue(cap,t);
264     }
265 #else
266     // this does round-robin scheduling; good for concurrency
267     appendToRunQueue(cap,t);
268 #endif
269 }
270
271 /* ---------------------------------------------------------------------------
272    Main scheduling loop.
273
274    We use round-robin scheduling, each thread returning to the
275    scheduler loop when one of these conditions is detected:
276
277       * out of heap space
278       * timer expires (thread yields)
279       * thread blocks
280       * thread ends
281       * stack overflow
282
283    GRAN version:
284      In a GranSim setup this loop iterates over the global event queue.
285      This revolves around the global event queue, which determines what 
286      to do next. Therefore, it's more complicated than either the 
287      concurrent or the parallel (GUM) setup.
288
289    GUM version:
290      GUM iterates over incoming messages.
291      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
292      and sends out a fish whenever it has nothing to do; in-between
293      doing the actual reductions (shared code below) it processes the
294      incoming messages and deals with delayed operations 
295      (see PendingFetches).
296      This is not the ugliest code you could imagine, but it's bloody close.
297
298    ------------------------------------------------------------------------ */
299
300 static Capability *
301 schedule (Capability *initialCapability, Task *task)
302 {
303   StgTSO *t;
304   Capability *cap;
305   StgThreadReturnCode ret;
306 #if defined(GRAN)
307   rtsEvent *event;
308 #elif defined(PARALLEL_HASKELL)
309   StgTSO *tso;
310   GlobalTaskId pe;
311   rtsBool receivedFinish = rtsFalse;
312 # if defined(DEBUG)
313   nat tp_size, sp_size; // stats only
314 # endif
315 #endif
316   nat prev_what_next;
317   rtsBool ready_to_gc;
318 #if defined(THREADED_RTS)
319   rtsBool first = rtsTrue;
320 #endif
321   
322   cap = initialCapability;
323
324   // Pre-condition: this task owns initialCapability.
325   // The sched_mutex is *NOT* held
326   // NB. on return, we still hold a capability.
327
328   debugTrace (DEBUG_sched, 
329               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
330               task, initialCapability);
331
332   schedulePreLoop();
333
334   // -----------------------------------------------------------
335   // Scheduler loop starts here:
336
337 #if defined(PARALLEL_HASKELL)
338 #define TERMINATION_CONDITION        (!receivedFinish)
339 #elif defined(GRAN)
340 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
341 #else
342 #define TERMINATION_CONDITION        rtsTrue
343 #endif
344
345   while (TERMINATION_CONDITION) {
346
347 #if defined(GRAN)
348       /* Choose the processor with the next event */
349       CurrentProc = event->proc;
350       CurrentTSO = event->tso;
351 #endif
352
353 #if defined(THREADED_RTS)
354       if (first) {
355           // don't yield the first time, we want a chance to run this
356           // thread for a bit, even if there are others banging at the
357           // door.
358           first = rtsFalse;
359           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
360       } else {
361           // Yield the capability to higher-priority tasks if necessary.
362           yieldCapability(&cap, task);
363       }
364 #endif
365       
366 #if defined(THREADED_RTS)
367       schedulePushWork(cap,task);
368 #endif
369
370     // Check whether we have re-entered the RTS from Haskell without
371     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
372     // call).
373     if (cap->in_haskell) {
374           errorBelch("schedule: re-entered unsafely.\n"
375                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
376           stg_exit(EXIT_FAILURE);
377     }
378
379     // The interruption / shutdown sequence.
380     // 
381     // In order to cleanly shut down the runtime, we want to:
382     //   * make sure that all main threads return to their callers
383     //     with the state 'Interrupted'.
384     //   * clean up all OS threads assocated with the runtime
385     //   * free all memory etc.
386     //
387     // So the sequence for ^C goes like this:
388     //
389     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
390     //     arranges for some Capability to wake up
391     //
392     //   * all threads in the system are halted, and the zombies are
393     //     placed on the run queue for cleaning up.  We acquire all
394     //     the capabilities in order to delete the threads, this is
395     //     done by scheduleDoGC() for convenience (because GC already
396     //     needs to acquire all the capabilities).  We can't kill
397     //     threads involved in foreign calls.
398     // 
399     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
400     //
401     //   * sched_state := SCHED_SHUTTING_DOWN
402     //
403     //   * all workers exit when the run queue on their capability
404     //     drains.  All main threads will also exit when their TSO
405     //     reaches the head of the run queue and they can return.
406     //
407     //   * eventually all Capabilities will shut down, and the RTS can
408     //     exit.
409     //
410     //   * We might be left with threads blocked in foreign calls, 
411     //     we should really attempt to kill these somehow (TODO);
412     
413     switch (sched_state) {
414     case SCHED_RUNNING:
415         break;
416     case SCHED_INTERRUPTING:
417         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
418 #if defined(THREADED_RTS)
419         discardSparksCap(cap);
420 #endif
421         /* scheduleDoGC() deletes all the threads */
422         cap = scheduleDoGC(cap,task,rtsFalse);
423         break;
424     case SCHED_SHUTTING_DOWN:
425         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
426         // If we are a worker, just exit.  If we're a bound thread
427         // then we will exit below when we've removed our TSO from
428         // the run queue.
429         if (task->tso == NULL && emptyRunQueue(cap)) {
430             return cap;
431         }
432         break;
433     default:
434         barf("sched_state: %d", sched_state);
435     }
436
437 #if defined(THREADED_RTS)
438     // If the run queue is empty, take a spark and turn it into a thread.
439     {
440         if (emptyRunQueue(cap)) {
441             StgClosure *spark;
442             spark = findSpark(cap);
443             if (spark != NULL) {
444                 debugTrace(DEBUG_sched,
445                            "turning spark of closure %p into a thread",
446                            (StgClosure *)spark);
447                 createSparkThread(cap,spark);     
448             }
449         }
450     }
451 #endif // THREADED_RTS
452
453     scheduleStartSignalHandlers(cap);
454
455     // Only check the black holes here if we've nothing else to do.
456     // During normal execution, the black hole list only gets checked
457     // at GC time, to avoid repeatedly traversing this possibly long
458     // list each time around the scheduler.
459     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
460
461     scheduleCheckWakeupThreads(cap);
462
463     scheduleCheckBlockedThreads(cap);
464
465     scheduleDetectDeadlock(cap,task);
466 #if defined(THREADED_RTS)
467     cap = task->cap;    // reload cap, it might have changed
468 #endif
469
470     // Normally, the only way we can get here with no threads to
471     // run is if a keyboard interrupt received during 
472     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
473     // Additionally, it is not fatal for the
474     // threaded RTS to reach here with no threads to run.
475     //
476     // win32: might be here due to awaitEvent() being abandoned
477     // as a result of a console event having been delivered.
478     if ( emptyRunQueue(cap) ) {
479 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
480         ASSERT(sched_state >= SCHED_INTERRUPTING);
481 #endif
482         continue; // nothing to do
483     }
484
485 #if defined(PARALLEL_HASKELL)
486     scheduleSendPendingMessages();
487     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
488         continue;
489
490 #if defined(SPARKS)
491     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
492 #endif
493
494     /* If we still have no work we need to send a FISH to get a spark
495        from another PE */
496     if (emptyRunQueue(cap)) {
497         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
498         ASSERT(rtsFalse); // should not happen at the moment
499     }
500     // from here: non-empty run queue.
501     //  TODO: merge above case with this, only one call processMessages() !
502     if (PacketsWaiting()) {  /* process incoming messages, if
503                                 any pending...  only in else
504                                 because getRemoteWork waits for
505                                 messages as well */
506         receivedFinish = processMessages();
507     }
508 #endif
509
510 #if defined(GRAN)
511     scheduleProcessEvent(event);
512 #endif
513
514     // 
515     // Get a thread to run
516     //
517     t = popRunQueue(cap);
518
519 #if defined(GRAN) || defined(PAR)
520     scheduleGranParReport(); // some kind of debuging output
521 #else
522     // Sanity check the thread we're about to run.  This can be
523     // expensive if there is lots of thread switching going on...
524     IF_DEBUG(sanity,checkTSO(t));
525 #endif
526
527 #if defined(THREADED_RTS)
528     // Check whether we can run this thread in the current task.
529     // If not, we have to pass our capability to the right task.
530     {
531         Task *bound = t->bound;
532       
533         if (bound) {
534             if (bound == task) {
535                 debugTrace(DEBUG_sched,
536                            "### Running thread %lu in bound thread", (unsigned long)t->id);
537                 // yes, the Haskell thread is bound to the current native thread
538             } else {
539                 debugTrace(DEBUG_sched,
540                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
541                 // no, bound to a different Haskell thread: pass to that thread
542                 pushOnRunQueue(cap,t);
543                 continue;
544             }
545         } else {
546             // The thread we want to run is unbound.
547             if (task->tso) { 
548                 debugTrace(DEBUG_sched,
549                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
550                 // no, the current native thread is bound to a different
551                 // Haskell thread, so pass it to any worker thread
552                 pushOnRunQueue(cap,t);
553                 continue; 
554             }
555         }
556     }
557 #endif
558
559     cap->r.rCurrentTSO = t;
560     
561     /* context switches are initiated by the timer signal, unless
562      * the user specified "context switch as often as possible", with
563      * +RTS -C0
564      */
565     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
566         && !emptyThreadQueues(cap)) {
567         context_switch = 1;
568     }
569          
570 run_thread:
571
572     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
573                               (long)t->id, whatNext_strs[t->what_next]);
574
575 #if defined(PROFILING)
576     startHeapProfTimer();
577 #endif
578
579     // Check for exceptions blocked on this thread
580     maybePerformBlockedException (cap, t);
581
582     // ----------------------------------------------------------------------
583     // Run the current thread 
584
585     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
586     ASSERT(t->cap == cap);
587
588     prev_what_next = t->what_next;
589
590     errno = t->saved_errno;
591     cap->in_haskell = rtsTrue;
592
593     dirtyTSO(t);
594
595     recent_activity = ACTIVITY_YES;
596
597     switch (prev_what_next) {
598         
599     case ThreadKilled:
600     case ThreadComplete:
601         /* Thread already finished, return to scheduler. */
602         ret = ThreadFinished;
603         break;
604         
605     case ThreadRunGHC:
606     {
607         StgRegTable *r;
608         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
609         cap = regTableToCapability(r);
610         ret = r->rRet;
611         break;
612     }
613     
614     case ThreadInterpret:
615         cap = interpretBCO(cap);
616         ret = cap->r.rRet;
617         break;
618         
619     default:
620         barf("schedule: invalid what_next field");
621     }
622
623     cap->in_haskell = rtsFalse;
624
625     // The TSO might have moved, eg. if it re-entered the RTS and a GC
626     // happened.  So find the new location:
627     t = cap->r.rCurrentTSO;
628
629     // We have run some Haskell code: there might be blackhole-blocked
630     // threads to wake up now.
631     // Lock-free test here should be ok, we're just setting a flag.
632     if ( blackhole_queue != END_TSO_QUEUE ) {
633         blackholes_need_checking = rtsTrue;
634     }
635     
636     // And save the current errno in this thread.
637     // XXX: possibly bogus for SMP because this thread might already
638     // be running again, see code below.
639     t->saved_errno = errno;
640
641 #if defined(THREADED_RTS)
642     // If ret is ThreadBlocked, and this Task is bound to the TSO that
643     // blocked, we are in limbo - the TSO is now owned by whatever it
644     // is blocked on, and may in fact already have been woken up,
645     // perhaps even on a different Capability.  It may be the case
646     // that task->cap != cap.  We better yield this Capability
647     // immediately and return to normaility.
648     if (ret == ThreadBlocked) {
649         debugTrace(DEBUG_sched,
650                    "--<< thread %lu (%s) stopped: blocked",
651                    (unsigned long)t->id, whatNext_strs[t->what_next]);
652         continue;
653     }
654 #endif
655
656     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
657     ASSERT(t->cap == cap);
658
659     // ----------------------------------------------------------------------
660     
661     // Costs for the scheduler are assigned to CCS_SYSTEM
662 #if defined(PROFILING)
663     stopHeapProfTimer();
664     CCCS = CCS_SYSTEM;
665 #endif
666     
667     schedulePostRunThread();
668
669     ready_to_gc = rtsFalse;
670
671     switch (ret) {
672     case HeapOverflow:
673         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
674         break;
675
676     case StackOverflow:
677         scheduleHandleStackOverflow(cap,task,t);
678         break;
679
680     case ThreadYielding:
681         if (scheduleHandleYield(cap, t, prev_what_next)) {
682             // shortcut for switching between compiler/interpreter:
683             goto run_thread; 
684         }
685         break;
686
687     case ThreadBlocked:
688         scheduleHandleThreadBlocked(t);
689         break;
690
691     case ThreadFinished:
692         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
693         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
694         break;
695
696     default:
697       barf("schedule: invalid thread return code %d", (int)ret);
698     }
699
700     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
701     if (ready_to_gc) {
702       cap = scheduleDoGC(cap,task,rtsFalse);
703     }
704   } /* end of while() */
705
706   debugTrace(PAR_DEBUG_verbose,
707              "== Leaving schedule() after having received Finish");
708 }
709
710 /* ----------------------------------------------------------------------------
711  * Setting up the scheduler loop
712  * ------------------------------------------------------------------------- */
713
714 static void
715 schedulePreLoop(void)
716 {
717 #if defined(GRAN) 
718     /* set up first event to get things going */
719     /* ToDo: assign costs for system setup and init MainTSO ! */
720     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
721               ContinueThread, 
722               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
723     
724     debugTrace (DEBUG_gran,
725                 "GRAN: Init CurrentTSO (in schedule) = %p", 
726                 CurrentTSO);
727     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
728     
729     if (RtsFlags.GranFlags.Light) {
730         /* Save current time; GranSim Light only */
731         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
732     }      
733 #endif
734 }
735
736 /* -----------------------------------------------------------------------------
737  * schedulePushWork()
738  *
739  * Push work to other Capabilities if we have some.
740  * -------------------------------------------------------------------------- */
741
742 #if defined(THREADED_RTS)
743 static void
744 schedulePushWork(Capability *cap USED_IF_THREADS, 
745                  Task *task      USED_IF_THREADS)
746 {
747     Capability *free_caps[n_capabilities], *cap0;
748     nat i, n_free_caps;
749
750     // migration can be turned off with +RTS -qg
751     if (!RtsFlags.ParFlags.migrate) return;
752
753     // Check whether we have more threads on our run queue, or sparks
754     // in our pool, that we could hand to another Capability.
755     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
756         && sparkPoolSizeCap(cap) < 2) {
757         return;
758     }
759
760     // First grab as many free Capabilities as we can.
761     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
762         cap0 = &capabilities[i];
763         if (cap != cap0 && tryGrabCapability(cap0,task)) {
764             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
765                 // it already has some work, we just grabbed it at 
766                 // the wrong moment.  Or maybe it's deadlocked!
767                 releaseCapability(cap0);
768             } else {
769                 free_caps[n_free_caps++] = cap0;
770             }
771         }
772     }
773
774     // we now have n_free_caps free capabilities stashed in
775     // free_caps[].  Share our run queue equally with them.  This is
776     // probably the simplest thing we could do; improvements we might
777     // want to do include:
778     //
779     //   - giving high priority to moving relatively new threads, on 
780     //     the gournds that they haven't had time to build up a
781     //     working set in the cache on this CPU/Capability.
782     //
783     //   - giving low priority to moving long-lived threads
784
785     if (n_free_caps > 0) {
786         StgTSO *prev, *t, *next;
787         rtsBool pushed_to_all;
788
789         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
790
791         i = 0;
792         pushed_to_all = rtsFalse;
793
794         if (cap->run_queue_hd != END_TSO_QUEUE) {
795             prev = cap->run_queue_hd;
796             t = prev->link;
797             prev->link = END_TSO_QUEUE;
798             for (; t != END_TSO_QUEUE; t = next) {
799                 next = t->link;
800                 t->link = END_TSO_QUEUE;
801                 if (t->what_next == ThreadRelocated
802                     || t->bound == task // don't move my bound thread
803                     || tsoLocked(t)) {  // don't move a locked thread
804                     prev->link = t;
805                     prev = t;
806                 } else if (i == n_free_caps) {
807                     pushed_to_all = rtsTrue;
808                     i = 0;
809                     // keep one for us
810                     prev->link = t;
811                     prev = t;
812                 } else {
813                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
814                     appendToRunQueue(free_caps[i],t);
815                     if (t->bound) { t->bound->cap = free_caps[i]; }
816                     t->cap = free_caps[i];
817                     i++;
818                 }
819             }
820             cap->run_queue_tl = prev;
821         }
822
823         // If there are some free capabilities that we didn't push any
824         // threads to, then try to push a spark to each one.
825         if (!pushed_to_all) {
826             StgClosure *spark;
827             // i is the next free capability to push to
828             for (; i < n_free_caps; i++) {
829                 if (emptySparkPoolCap(free_caps[i])) {
830                     spark = findSpark(cap);
831                     if (spark != NULL) {
832                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
833                         newSpark(&(free_caps[i]->r), spark);
834                     }
835                 }
836             }
837         }
838
839         // release the capabilities
840         for (i = 0; i < n_free_caps; i++) {
841             task->cap = free_caps[i];
842             releaseCapability(free_caps[i]);
843         }
844     }
845     task->cap = cap; // reset to point to our Capability.
846 }
847 #endif
848
849 /* ----------------------------------------------------------------------------
850  * Start any pending signal handlers
851  * ------------------------------------------------------------------------- */
852
853 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
854 static void
855 scheduleStartSignalHandlers(Capability *cap)
856 {
857     if (signals_pending()) { // safe outside the lock
858         startSignalHandlers(cap);
859     }
860 }
861 #else
862 static void
863 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
864 {
865 }
866 #endif
867
868 /* ----------------------------------------------------------------------------
869  * Check for blocked threads that can be woken up.
870  * ------------------------------------------------------------------------- */
871
872 static void
873 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
874 {
875 #if !defined(THREADED_RTS)
876     //
877     // Check whether any waiting threads need to be woken up.  If the
878     // run queue is empty, and there are no other tasks running, we
879     // can wait indefinitely for something to happen.
880     //
881     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
882     {
883         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
884     }
885 #endif
886 }
887
888
889 /* ----------------------------------------------------------------------------
890  * Check for threads woken up by other Capabilities
891  * ------------------------------------------------------------------------- */
892
893 static void
894 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
895 {
896 #if defined(THREADED_RTS)
897     // Any threads that were woken up by other Capabilities get
898     // appended to our run queue.
899     if (!emptyWakeupQueue(cap)) {
900         ACQUIRE_LOCK(&cap->lock);
901         if (emptyRunQueue(cap)) {
902             cap->run_queue_hd = cap->wakeup_queue_hd;
903             cap->run_queue_tl = cap->wakeup_queue_tl;
904         } else {
905             cap->run_queue_tl->link = cap->wakeup_queue_hd;
906             cap->run_queue_tl = cap->wakeup_queue_tl;
907         }
908         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
909         RELEASE_LOCK(&cap->lock);
910     }
911 #endif
912 }
913
914 /* ----------------------------------------------------------------------------
915  * Check for threads blocked on BLACKHOLEs that can be woken up
916  * ------------------------------------------------------------------------- */
917 static void
918 scheduleCheckBlackHoles (Capability *cap)
919 {
920     if ( blackholes_need_checking ) // check without the lock first
921     {
922         ACQUIRE_LOCK(&sched_mutex);
923         if ( blackholes_need_checking ) {
924             checkBlackHoles(cap);
925             blackholes_need_checking = rtsFalse;
926         }
927         RELEASE_LOCK(&sched_mutex);
928     }
929 }
930
931 /* ----------------------------------------------------------------------------
932  * Detect deadlock conditions and attempt to resolve them.
933  * ------------------------------------------------------------------------- */
934
935 static void
936 scheduleDetectDeadlock (Capability *cap, Task *task)
937 {
938
939 #if defined(PARALLEL_HASKELL)
940     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
941     return;
942 #endif
943
944     /* 
945      * Detect deadlock: when we have no threads to run, there are no
946      * threads blocked, waiting for I/O, or sleeping, and all the
947      * other tasks are waiting for work, we must have a deadlock of
948      * some description.
949      */
950     if ( emptyThreadQueues(cap) )
951     {
952 #if defined(THREADED_RTS)
953         /* 
954          * In the threaded RTS, we only check for deadlock if there
955          * has been no activity in a complete timeslice.  This means
956          * we won't eagerly start a full GC just because we don't have
957          * any threads to run currently.
958          */
959         if (recent_activity != ACTIVITY_INACTIVE) return;
960 #endif
961
962         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
963
964         // Garbage collection can release some new threads due to
965         // either (a) finalizers or (b) threads resurrected because
966         // they are unreachable and will therefore be sent an
967         // exception.  Any threads thus released will be immediately
968         // runnable.
969         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
970
971         recent_activity = ACTIVITY_DONE_GC;
972         
973         if ( !emptyRunQueue(cap) ) return;
974
975 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
976         /* If we have user-installed signal handlers, then wait
977          * for signals to arrive rather then bombing out with a
978          * deadlock.
979          */
980         if ( anyUserHandlers() ) {
981             debugTrace(DEBUG_sched,
982                        "still deadlocked, waiting for signals...");
983
984             awaitUserSignals();
985
986             if (signals_pending()) {
987                 startSignalHandlers(cap);
988             }
989
990             // either we have threads to run, or we were interrupted:
991             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
992         }
993 #endif
994
995 #if !defined(THREADED_RTS)
996         /* Probably a real deadlock.  Send the current main thread the
997          * Deadlock exception.
998          */
999         if (task->tso) {
1000             switch (task->tso->why_blocked) {
1001             case BlockedOnSTM:
1002             case BlockedOnBlackHole:
1003             case BlockedOnException:
1004             case BlockedOnMVar:
1005                 throwToSingleThreaded(cap, task->tso, 
1006                                       (StgClosure *)NonTermination_closure);
1007                 return;
1008             default:
1009                 barf("deadlock: main thread blocked in a strange way");
1010             }
1011         }
1012         return;
1013 #endif
1014     }
1015 }
1016
1017 /* ----------------------------------------------------------------------------
1018  * Process an event (GRAN only)
1019  * ------------------------------------------------------------------------- */
1020
1021 #if defined(GRAN)
1022 static StgTSO *
1023 scheduleProcessEvent(rtsEvent *event)
1024 {
1025     StgTSO *t;
1026
1027     if (RtsFlags.GranFlags.Light)
1028       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1029
1030     /* adjust time based on time-stamp */
1031     if (event->time > CurrentTime[CurrentProc] &&
1032         event->evttype != ContinueThread)
1033       CurrentTime[CurrentProc] = event->time;
1034     
1035     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1036     if (!RtsFlags.GranFlags.Light)
1037       handleIdlePEs();
1038
1039     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1040
1041     /* main event dispatcher in GranSim */
1042     switch (event->evttype) {
1043       /* Should just be continuing execution */
1044     case ContinueThread:
1045       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1046       /* ToDo: check assertion
1047       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1048              run_queue_hd != END_TSO_QUEUE);
1049       */
1050       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1051       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1052           procStatus[CurrentProc]==Fetching) {
1053         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1054               CurrentTSO->id, CurrentTSO, CurrentProc);
1055         goto next_thread;
1056       } 
1057       /* Ignore ContinueThreads for completed threads */
1058       if (CurrentTSO->what_next == ThreadComplete) {
1059         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1060               CurrentTSO->id, CurrentTSO, CurrentProc);
1061         goto next_thread;
1062       } 
1063       /* Ignore ContinueThreads for threads that are being migrated */
1064       if (PROCS(CurrentTSO)==Nowhere) { 
1065         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1066               CurrentTSO->id, CurrentTSO, CurrentProc);
1067         goto next_thread;
1068       }
1069       /* The thread should be at the beginning of the run queue */
1070       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1071         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1072               CurrentTSO->id, CurrentTSO, CurrentProc);
1073         break; // run the thread anyway
1074       }
1075       /*
1076       new_event(proc, proc, CurrentTime[proc],
1077                 FindWork,
1078                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1079       goto next_thread; 
1080       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1081       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1082
1083     case FetchNode:
1084       do_the_fetchnode(event);
1085       goto next_thread;             /* handle next event in event queue  */
1086       
1087     case GlobalBlock:
1088       do_the_globalblock(event);
1089       goto next_thread;             /* handle next event in event queue  */
1090       
1091     case FetchReply:
1092       do_the_fetchreply(event);
1093       goto next_thread;             /* handle next event in event queue  */
1094       
1095     case UnblockThread:   /* Move from the blocked queue to the tail of */
1096       do_the_unblock(event);
1097       goto next_thread;             /* handle next event in event queue  */
1098       
1099     case ResumeThread:  /* Move from the blocked queue to the tail of */
1100       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1101       event->tso->gran.blocktime += 
1102         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1103       do_the_startthread(event);
1104       goto next_thread;             /* handle next event in event queue  */
1105       
1106     case StartThread:
1107       do_the_startthread(event);
1108       goto next_thread;             /* handle next event in event queue  */
1109       
1110     case MoveThread:
1111       do_the_movethread(event);
1112       goto next_thread;             /* handle next event in event queue  */
1113       
1114     case MoveSpark:
1115       do_the_movespark(event);
1116       goto next_thread;             /* handle next event in event queue  */
1117       
1118     case FindWork:
1119       do_the_findwork(event);
1120       goto next_thread;             /* handle next event in event queue  */
1121       
1122     default:
1123       barf("Illegal event type %u\n", event->evttype);
1124     }  /* switch */
1125     
1126     /* This point was scheduler_loop in the old RTS */
1127
1128     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1129
1130     TimeOfLastEvent = CurrentTime[CurrentProc];
1131     TimeOfNextEvent = get_time_of_next_event();
1132     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1133     // CurrentTSO = ThreadQueueHd;
1134
1135     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1136                          TimeOfNextEvent));
1137
1138     if (RtsFlags.GranFlags.Light) 
1139       GranSimLight_leave_system(event, &ActiveTSO); 
1140
1141     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1142
1143     IF_DEBUG(gran, 
1144              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1145
1146     /* in a GranSim setup the TSO stays on the run queue */
1147     t = CurrentTSO;
1148     /* Take a thread from the run queue. */
1149     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1150
1151     IF_DEBUG(gran, 
1152              debugBelch("GRAN: About to run current thread, which is\n");
1153              G_TSO(t,5));
1154
1155     context_switch = 0; // turned on via GranYield, checking events and time slice
1156
1157     IF_DEBUG(gran, 
1158              DumpGranEvent(GR_SCHEDULE, t));
1159
1160     procStatus[CurrentProc] = Busy;
1161 }
1162 #endif // GRAN
1163
1164 /* ----------------------------------------------------------------------------
1165  * Send pending messages (PARALLEL_HASKELL only)
1166  * ------------------------------------------------------------------------- */
1167
1168 #if defined(PARALLEL_HASKELL)
1169 static StgTSO *
1170 scheduleSendPendingMessages(void)
1171 {
1172     StgSparkPool *pool;
1173     rtsSpark spark;
1174     StgTSO *t;
1175
1176 # if defined(PAR) // global Mem.Mgmt., omit for now
1177     if (PendingFetches != END_BF_QUEUE) {
1178         processFetches();
1179     }
1180 # endif
1181     
1182     if (RtsFlags.ParFlags.BufferTime) {
1183         // if we use message buffering, we must send away all message
1184         // packets which have become too old...
1185         sendOldBuffers(); 
1186     }
1187 }
1188 #endif
1189
1190 /* ----------------------------------------------------------------------------
1191  * Activate spark threads (PARALLEL_HASKELL only)
1192  * ------------------------------------------------------------------------- */
1193
1194 #if defined(PARALLEL_HASKELL)
1195 static void
1196 scheduleActivateSpark(void)
1197 {
1198 #if defined(SPARKS)
1199   ASSERT(emptyRunQueue());
1200 /* We get here if the run queue is empty and want some work.
1201    We try to turn a spark into a thread, and add it to the run queue,
1202    from where it will be picked up in the next iteration of the scheduler
1203    loop.
1204 */
1205
1206       /* :-[  no local threads => look out for local sparks */
1207       /* the spark pool for the current PE */
1208       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1209       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1210           pool->hd < pool->tl) {
1211         /* 
1212          * ToDo: add GC code check that we really have enough heap afterwards!!
1213          * Old comment:
1214          * If we're here (no runnable threads) and we have pending
1215          * sparks, we must have a space problem.  Get enough space
1216          * to turn one of those pending sparks into a
1217          * thread... 
1218          */
1219
1220         spark = findSpark(rtsFalse);            /* get a spark */
1221         if (spark != (rtsSpark) NULL) {
1222           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1223           IF_PAR_DEBUG(fish, // schedule,
1224                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1225                              tso->id, tso, advisory_thread_count));
1226
1227           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1228             IF_PAR_DEBUG(fish, // schedule,
1229                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1230                             spark));
1231             return rtsFalse; /* failed to generate a thread */
1232           }                  /* otherwise fall through & pick-up new tso */
1233         } else {
1234           IF_PAR_DEBUG(fish, // schedule,
1235                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1236                              spark_queue_len(pool)));
1237           return rtsFalse;  /* failed to generate a thread */
1238         }
1239         return rtsTrue;  /* success in generating a thread */
1240   } else { /* no more threads permitted or pool empty */
1241     return rtsFalse;  /* failed to generateThread */
1242   }
1243 #else
1244   tso = NULL; // avoid compiler warning only
1245   return rtsFalse;  /* dummy in non-PAR setup */
1246 #endif // SPARKS
1247 }
1248 #endif // PARALLEL_HASKELL
1249
1250 /* ----------------------------------------------------------------------------
1251  * Get work from a remote node (PARALLEL_HASKELL only)
1252  * ------------------------------------------------------------------------- */
1253     
1254 #if defined(PARALLEL_HASKELL)
1255 static rtsBool
1256 scheduleGetRemoteWork(rtsBool *receivedFinish)
1257 {
1258   ASSERT(emptyRunQueue());
1259
1260   if (RtsFlags.ParFlags.BufferTime) {
1261         IF_PAR_DEBUG(verbose, 
1262                 debugBelch("...send all pending data,"));
1263         {
1264           nat i;
1265           for (i=1; i<=nPEs; i++)
1266             sendImmediately(i); // send all messages away immediately
1267         }
1268   }
1269 # ifndef SPARKS
1270         //++EDEN++ idle() , i.e. send all buffers, wait for work
1271         // suppress fishing in EDEN... just look for incoming messages
1272         // (blocking receive)
1273   IF_PAR_DEBUG(verbose, 
1274                debugBelch("...wait for incoming messages...\n"));
1275   *receivedFinish = processMessages(); // blocking receive...
1276
1277         // and reenter scheduling loop after having received something
1278         // (return rtsFalse below)
1279
1280 # else /* activate SPARKS machinery */
1281 /* We get here, if we have no work, tried to activate a local spark, but still
1282    have no work. We try to get a remote spark, by sending a FISH message.
1283    Thread migration should be added here, and triggered when a sequence of 
1284    fishes returns without work. */
1285         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1286
1287       /* =8-[  no local sparks => look for work on other PEs */
1288         /*
1289          * We really have absolutely no work.  Send out a fish
1290          * (there may be some out there already), and wait for
1291          * something to arrive.  We clearly can't run any threads
1292          * until a SCHEDULE or RESUME arrives, and so that's what
1293          * we're hoping to see.  (Of course, we still have to
1294          * respond to other types of messages.)
1295          */
1296         rtsTime now = msTime() /*CURRENT_TIME*/;
1297         IF_PAR_DEBUG(verbose, 
1298                      debugBelch("--  now=%ld\n", now));
1299         IF_PAR_DEBUG(fish, // verbose,
1300              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1301                  (last_fish_arrived_at!=0 &&
1302                   last_fish_arrived_at+delay > now)) {
1303                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1304                      now, last_fish_arrived_at+delay, 
1305                      last_fish_arrived_at,
1306                      delay);
1307              });
1308   
1309         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1310             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1311           if (last_fish_arrived_at==0 ||
1312               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1313             /* outstandingFishes is set in sendFish, processFish;
1314                avoid flooding system with fishes via delay */
1315     next_fish_to_send_at = 0;  
1316   } else {
1317     /* ToDo: this should be done in the main scheduling loop to avoid the
1318              busy wait here; not so bad if fish delay is very small  */
1319     int iq = 0; // DEBUGGING -- HWL
1320     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1321     /* send a fish when ready, but process messages that arrive in the meantime */
1322     do {
1323       if (PacketsWaiting()) {
1324         iq++; // DEBUGGING
1325         *receivedFinish = processMessages();
1326       }
1327       now = msTime();
1328     } while (!*receivedFinish || now<next_fish_to_send_at);
1329     // JB: This means the fish could become obsolete, if we receive
1330     // work. Better check for work again? 
1331     // last line: while (!receivedFinish || !haveWork || now<...)
1332     // next line: if (receivedFinish || haveWork )
1333
1334     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1335       return rtsFalse;  // NB: this will leave scheduler loop
1336                         // immediately after return!
1337                           
1338     IF_PAR_DEBUG(fish, // verbose,
1339                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1340
1341   }
1342
1343     // JB: IMHO, this should all be hidden inside sendFish(...)
1344     /* pe = choosePE(); 
1345        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1346                 NEW_FISH_HUNGER);
1347
1348     // Global statistics: count no. of fishes
1349     if (RtsFlags.ParFlags.ParStats.Global &&
1350          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1351            globalParStats.tot_fish_mess++;
1352            }
1353     */ 
1354
1355   /* delayed fishes must have been sent by now! */
1356   next_fish_to_send_at = 0;  
1357   }
1358       
1359   *receivedFinish = processMessages();
1360 # endif /* SPARKS */
1361
1362  return rtsFalse;
1363  /* NB: this function always returns rtsFalse, meaning the scheduler
1364     loop continues with the next iteration; 
1365     rationale: 
1366       return code means success in finding work; we enter this function
1367       if there is no local work, thus have to send a fish which takes
1368       time until it arrives with work; in the meantime we should process
1369       messages in the main loop;
1370  */
1371 }
1372 #endif // PARALLEL_HASKELL
1373
1374 /* ----------------------------------------------------------------------------
1375  * PAR/GRAN: Report stats & debugging info(?)
1376  * ------------------------------------------------------------------------- */
1377
1378 #if defined(PAR) || defined(GRAN)
1379 static void
1380 scheduleGranParReport(void)
1381 {
1382   ASSERT(run_queue_hd != END_TSO_QUEUE);
1383
1384   /* Take a thread from the run queue, if we have work */
1385   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1386
1387     /* If this TSO has got its outport closed in the meantime, 
1388      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1389      * It has to be marked as TH_DEAD for this purpose.
1390      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1391
1392 JB: TODO: investigate wether state change field could be nuked
1393      entirely and replaced by the normal tso state (whatnext
1394      field). All we want to do is to kill tsos from outside.
1395      */
1396
1397     /* ToDo: write something to the log-file
1398     if (RTSflags.ParFlags.granSimStats && !sameThread)
1399         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1400
1401     CurrentTSO = t;
1402     */
1403     /* the spark pool for the current PE */
1404     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1405
1406     IF_DEBUG(scheduler, 
1407              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1408                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1409
1410     IF_PAR_DEBUG(fish,
1411              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1412                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1413
1414     if (RtsFlags.ParFlags.ParStats.Full && 
1415         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1416         (emitSchedule || // forced emit
1417          (t && LastTSO && t->id != LastTSO->id))) {
1418       /* 
1419          we are running a different TSO, so write a schedule event to log file
1420          NB: If we use fair scheduling we also have to write  a deschedule 
1421              event for LastTSO; with unfair scheduling we know that the
1422              previous tso has blocked whenever we switch to another tso, so
1423              we don't need it in GUM for now
1424       */
1425       IF_PAR_DEBUG(fish, // schedule,
1426                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1427
1428       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1429                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1430       emitSchedule = rtsFalse;
1431     }
1432 }     
1433 #endif
1434
1435 /* ----------------------------------------------------------------------------
1436  * After running a thread...
1437  * ------------------------------------------------------------------------- */
1438
1439 static void
1440 schedulePostRunThread(void)
1441 {
1442 #if defined(PAR)
1443     /* HACK 675: if the last thread didn't yield, make sure to print a 
1444        SCHEDULE event to the log file when StgRunning the next thread, even
1445        if it is the same one as before */
1446     LastTSO = t; 
1447     TimeOfLastYield = CURRENT_TIME;
1448 #endif
1449
1450   /* some statistics gathering in the parallel case */
1451
1452 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1453   switch (ret) {
1454     case HeapOverflow:
1455 # if defined(GRAN)
1456       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1457       globalGranStats.tot_heapover++;
1458 # elif defined(PAR)
1459       globalParStats.tot_heapover++;
1460 # endif
1461       break;
1462
1463      case StackOverflow:
1464 # if defined(GRAN)
1465       IF_DEBUG(gran, 
1466                DumpGranEvent(GR_DESCHEDULE, t));
1467       globalGranStats.tot_stackover++;
1468 # elif defined(PAR)
1469       // IF_DEBUG(par, 
1470       // DumpGranEvent(GR_DESCHEDULE, t);
1471       globalParStats.tot_stackover++;
1472 # endif
1473       break;
1474
1475     case ThreadYielding:
1476 # if defined(GRAN)
1477       IF_DEBUG(gran, 
1478                DumpGranEvent(GR_DESCHEDULE, t));
1479       globalGranStats.tot_yields++;
1480 # elif defined(PAR)
1481       // IF_DEBUG(par, 
1482       // DumpGranEvent(GR_DESCHEDULE, t);
1483       globalParStats.tot_yields++;
1484 # endif
1485       break; 
1486
1487     case ThreadBlocked:
1488 # if defined(GRAN)
1489         debugTrace(DEBUG_sched, 
1490                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1491                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1492                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1493                if (t->block_info.closure!=(StgClosure*)NULL)
1494                  print_bq(t->block_info.closure);
1495                debugBelch("\n"));
1496
1497       // ??? needed; should emit block before
1498       IF_DEBUG(gran, 
1499                DumpGranEvent(GR_DESCHEDULE, t)); 
1500       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1501       /*
1502         ngoq Dogh!
1503       ASSERT(procStatus[CurrentProc]==Busy || 
1504               ((procStatus[CurrentProc]==Fetching) && 
1505               (t->block_info.closure!=(StgClosure*)NULL)));
1506       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1507           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1508             procStatus[CurrentProc]==Fetching)) 
1509         procStatus[CurrentProc] = Idle;
1510       */
1511 # elif defined(PAR)
1512 //++PAR++  blockThread() writes the event (change?)
1513 # endif
1514     break;
1515
1516   case ThreadFinished:
1517     break;
1518
1519   default:
1520     barf("parGlobalStats: unknown return code");
1521     break;
1522     }
1523 #endif
1524 }
1525
1526 /* -----------------------------------------------------------------------------
1527  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1528  * -------------------------------------------------------------------------- */
1529
1530 static rtsBool
1531 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1532 {
1533     // did the task ask for a large block?
1534     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1535         // if so, get one and push it on the front of the nursery.
1536         bdescr *bd;
1537         lnat blocks;
1538         
1539         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1540         
1541         debugTrace(DEBUG_sched,
1542                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1543                    (long)t->id, whatNext_strs[t->what_next], blocks);
1544     
1545         // don't do this if the nursery is (nearly) full, we'll GC first.
1546         if (cap->r.rCurrentNursery->link != NULL ||
1547             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1548                                                // if the nursery has only one block.
1549             
1550             ACQUIRE_SM_LOCK
1551             bd = allocGroup( blocks );
1552             RELEASE_SM_LOCK
1553             cap->r.rNursery->n_blocks += blocks;
1554             
1555             // link the new group into the list
1556             bd->link = cap->r.rCurrentNursery;
1557             bd->u.back = cap->r.rCurrentNursery->u.back;
1558             if (cap->r.rCurrentNursery->u.back != NULL) {
1559                 cap->r.rCurrentNursery->u.back->link = bd;
1560             } else {
1561 #if !defined(THREADED_RTS)
1562                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1563                        g0s0 == cap->r.rNursery);
1564 #endif
1565                 cap->r.rNursery->blocks = bd;
1566             }             
1567             cap->r.rCurrentNursery->u.back = bd;
1568             
1569             // initialise it as a nursery block.  We initialise the
1570             // step, gen_no, and flags field of *every* sub-block in
1571             // this large block, because this is easier than making
1572             // sure that we always find the block head of a large
1573             // block whenever we call Bdescr() (eg. evacuate() and
1574             // isAlive() in the GC would both have to do this, at
1575             // least).
1576             { 
1577                 bdescr *x;
1578                 for (x = bd; x < bd + blocks; x++) {
1579                     x->step = cap->r.rNursery;
1580                     x->gen_no = 0;
1581                     x->flags = 0;
1582                 }
1583             }
1584             
1585             // This assert can be a killer if the app is doing lots
1586             // of large block allocations.
1587             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1588             
1589             // now update the nursery to point to the new block
1590             cap->r.rCurrentNursery = bd;
1591             
1592             // we might be unlucky and have another thread get on the
1593             // run queue before us and steal the large block, but in that
1594             // case the thread will just end up requesting another large
1595             // block.
1596             pushOnRunQueue(cap,t);
1597             return rtsFalse;  /* not actually GC'ing */
1598         }
1599     }
1600     
1601     debugTrace(DEBUG_sched,
1602                "--<< thread %ld (%s) stopped: HeapOverflow\n", 
1603                (long)t->id, whatNext_strs[t->what_next]);
1604
1605 #if defined(GRAN)
1606     ASSERT(!is_on_queue(t,CurrentProc));
1607 #elif defined(PARALLEL_HASKELL)
1608     /* Currently we emit a DESCHEDULE event before GC in GUM.
1609        ToDo: either add separate event to distinguish SYSTEM time from rest
1610        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1611     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1612         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1613                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1614         emitSchedule = rtsTrue;
1615     }
1616 #endif
1617       
1618     pushOnRunQueue(cap,t);
1619     return rtsTrue;
1620     /* actual GC is done at the end of the while loop in schedule() */
1621 }
1622
1623 /* -----------------------------------------------------------------------------
1624  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1625  * -------------------------------------------------------------------------- */
1626
1627 static void
1628 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1629 {
1630     debugTrace (DEBUG_sched,
1631                 "--<< thread %ld (%s) stopped, StackOverflow", 
1632                 (long)t->id, whatNext_strs[t->what_next]);
1633
1634     /* just adjust the stack for this thread, then pop it back
1635      * on the run queue.
1636      */
1637     { 
1638         /* enlarge the stack */
1639         StgTSO *new_t = threadStackOverflow(cap, t);
1640         
1641         /* The TSO attached to this Task may have moved, so update the
1642          * pointer to it.
1643          */
1644         if (task->tso == t) {
1645             task->tso = new_t;
1646         }
1647         pushOnRunQueue(cap,new_t);
1648     }
1649 }
1650
1651 /* -----------------------------------------------------------------------------
1652  * Handle a thread that returned to the scheduler with ThreadYielding
1653  * -------------------------------------------------------------------------- */
1654
1655 static rtsBool
1656 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1657 {
1658     // Reset the context switch flag.  We don't do this just before
1659     // running the thread, because that would mean we would lose ticks
1660     // during GC, which can lead to unfair scheduling (a thread hogs
1661     // the CPU because the tick always arrives during GC).  This way
1662     // penalises threads that do a lot of allocation, but that seems
1663     // better than the alternative.
1664     context_switch = 0;
1665     
1666     /* put the thread back on the run queue.  Then, if we're ready to
1667      * GC, check whether this is the last task to stop.  If so, wake
1668      * up the GC thread.  getThread will block during a GC until the
1669      * GC is finished.
1670      */
1671 #ifdef DEBUG
1672     if (t->what_next != prev_what_next) {
1673         debugTrace(DEBUG_sched,
1674                    "--<< thread %ld (%s) stopped to switch evaluators", 
1675                    (long)t->id, whatNext_strs[t->what_next]);
1676     } else {
1677         debugTrace(DEBUG_sched,
1678                    "--<< thread %ld (%s) stopped, yielding",
1679                    (long)t->id, whatNext_strs[t->what_next]);
1680     }
1681 #endif
1682     
1683     IF_DEBUG(sanity,
1684              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1685              checkTSO(t));
1686     ASSERT(t->link == END_TSO_QUEUE);
1687     
1688     // Shortcut if we're just switching evaluators: don't bother
1689     // doing stack squeezing (which can be expensive), just run the
1690     // thread.
1691     if (t->what_next != prev_what_next) {
1692         return rtsTrue;
1693     }
1694     
1695 #if defined(GRAN)
1696     ASSERT(!is_on_queue(t,CurrentProc));
1697       
1698     IF_DEBUG(sanity,
1699              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1700              checkThreadQsSanity(rtsTrue));
1701
1702 #endif
1703
1704     addToRunQueue(cap,t);
1705
1706 #if defined(GRAN)
1707     /* add a ContinueThread event to actually process the thread */
1708     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1709               ContinueThread,
1710               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1711     IF_GRAN_DEBUG(bq, 
1712                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1713                   G_EVENTQ(0);
1714                   G_CURR_THREADQ(0));
1715 #endif
1716     return rtsFalse;
1717 }
1718
1719 /* -----------------------------------------------------------------------------
1720  * Handle a thread that returned to the scheduler with ThreadBlocked
1721  * -------------------------------------------------------------------------- */
1722
1723 static void
1724 scheduleHandleThreadBlocked( StgTSO *t
1725 #if !defined(GRAN) && !defined(DEBUG)
1726     STG_UNUSED
1727 #endif
1728     )
1729 {
1730 #if defined(GRAN)
1731     IF_DEBUG(scheduler,
1732              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1733                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1734              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1735     
1736     // ??? needed; should emit block before
1737     IF_DEBUG(gran, 
1738              DumpGranEvent(GR_DESCHEDULE, t)); 
1739     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1740     /*
1741       ngoq Dogh!
1742       ASSERT(procStatus[CurrentProc]==Busy || 
1743       ((procStatus[CurrentProc]==Fetching) && 
1744       (t->block_info.closure!=(StgClosure*)NULL)));
1745       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1746       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1747       procStatus[CurrentProc]==Fetching)) 
1748       procStatus[CurrentProc] = Idle;
1749     */
1750 #elif defined(PAR)
1751     IF_DEBUG(scheduler,
1752              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1753                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1754     IF_PAR_DEBUG(bq,
1755                  
1756                  if (t->block_info.closure!=(StgClosure*)NULL) 
1757                  print_bq(t->block_info.closure));
1758     
1759     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1760     blockThread(t);
1761     
1762     /* whatever we schedule next, we must log that schedule */
1763     emitSchedule = rtsTrue;
1764     
1765 #else /* !GRAN */
1766
1767       // We don't need to do anything.  The thread is blocked, and it
1768       // has tidied up its stack and placed itself on whatever queue
1769       // it needs to be on.
1770
1771 #if !defined(THREADED_RTS)
1772     ASSERT(t->why_blocked != NotBlocked);
1773              // This might not be true under THREADED_RTS: we don't have
1774              // exclusive access to this TSO, so someone might have
1775              // woken it up by now.  This actually happens: try
1776              // conc023 +RTS -N2.
1777 #endif
1778
1779 #ifdef DEBUG
1780     if (traceClass(DEBUG_sched)) {
1781         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1782                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1783         printThreadBlockage(t);
1784         debugTraceEnd();
1785     }
1786 #endif
1787     
1788     /* Only for dumping event to log file 
1789        ToDo: do I need this in GranSim, too?
1790        blockThread(t);
1791     */
1792 #endif
1793 }
1794
1795 /* -----------------------------------------------------------------------------
1796  * Handle a thread that returned to the scheduler with ThreadFinished
1797  * -------------------------------------------------------------------------- */
1798
1799 static rtsBool
1800 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1801 {
1802     /* Need to check whether this was a main thread, and if so,
1803      * return with the return value.
1804      *
1805      * We also end up here if the thread kills itself with an
1806      * uncaught exception, see Exception.cmm.
1807      */
1808     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1809                (unsigned long)t->id, whatNext_strs[t->what_next]);
1810
1811     /* Inform the Hpc that a thread has finished */
1812     hs_hpc_event("Thread Finished",t);
1813
1814 #if defined(GRAN)
1815       endThread(t, CurrentProc); // clean-up the thread
1816 #elif defined(PARALLEL_HASKELL)
1817       /* For now all are advisory -- HWL */
1818       //if(t->priority==AdvisoryPriority) ??
1819       advisory_thread_count--; // JB: Caution with this counter, buggy!
1820       
1821 # if defined(DIST)
1822       if(t->dist.priority==RevalPriority)
1823         FinishReval(t);
1824 # endif
1825     
1826 # if defined(EDENOLD)
1827       // the thread could still have an outport... (BUG)
1828       if (t->eden.outport != -1) {
1829       // delete the outport for the tso which has finished...
1830         IF_PAR_DEBUG(eden_ports,
1831                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1832                               t->eden.outport, t->id));
1833         deleteOPT(t);
1834       }
1835       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1836       if (t->eden.epid != -1) {
1837         IF_PAR_DEBUG(eden_ports,
1838                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1839                            t->id, t->eden.epid));
1840         removeTSOfromProcess(t);
1841       }
1842 # endif 
1843
1844 # if defined(PAR)
1845       if (RtsFlags.ParFlags.ParStats.Full &&
1846           !RtsFlags.ParFlags.ParStats.Suppressed) 
1847         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1848
1849       //  t->par only contains statistics: left out for now...
1850       IF_PAR_DEBUG(fish,
1851                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1852                               t->id,t,t->par.sparkname));
1853 # endif
1854 #endif // PARALLEL_HASKELL
1855
1856       //
1857       // Check whether the thread that just completed was a bound
1858       // thread, and if so return with the result.  
1859       //
1860       // There is an assumption here that all thread completion goes
1861       // through this point; we need to make sure that if a thread
1862       // ends up in the ThreadKilled state, that it stays on the run
1863       // queue so it can be dealt with here.
1864       //
1865
1866       if (t->bound) {
1867
1868           if (t->bound != task) {
1869 #if !defined(THREADED_RTS)
1870               // Must be a bound thread that is not the topmost one.  Leave
1871               // it on the run queue until the stack has unwound to the
1872               // point where we can deal with this.  Leaving it on the run
1873               // queue also ensures that the garbage collector knows about
1874               // this thread and its return value (it gets dropped from the
1875               // all_threads list so there's no other way to find it).
1876               appendToRunQueue(cap,t);
1877               return rtsFalse;
1878 #else
1879               // this cannot happen in the threaded RTS, because a
1880               // bound thread can only be run by the appropriate Task.
1881               barf("finished bound thread that isn't mine");
1882 #endif
1883           }
1884
1885           ASSERT(task->tso == t);
1886
1887           if (t->what_next == ThreadComplete) {
1888               if (task->ret) {
1889                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1890                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1891               }
1892               task->stat = Success;
1893           } else {
1894               if (task->ret) {
1895                   *(task->ret) = NULL;
1896               }
1897               if (sched_state >= SCHED_INTERRUPTING) {
1898                   task->stat = Interrupted;
1899               } else {
1900                   task->stat = Killed;
1901               }
1902           }
1903 #ifdef DEBUG
1904           removeThreadLabel((StgWord)task->tso->id);
1905 #endif
1906           return rtsTrue; // tells schedule() to return
1907       }
1908
1909       return rtsFalse;
1910 }
1911
1912 /* -----------------------------------------------------------------------------
1913  * Perform a heap census, if PROFILING
1914  * -------------------------------------------------------------------------- */
1915
1916 static rtsBool
1917 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1918 {
1919 #if defined(PROFILING)
1920     // When we have +RTS -i0 and we're heap profiling, do a census at
1921     // every GC.  This lets us get repeatable runs for debugging.
1922     if (performHeapProfile ||
1923         (RtsFlags.ProfFlags.profileInterval==0 &&
1924          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1925
1926         // checking black holes is necessary before GC, otherwise
1927         // there may be threads that are unreachable except by the
1928         // blackhole queue, which the GC will consider to be
1929         // deadlocked.
1930         scheduleCheckBlackHoles(&MainCapability);
1931
1932         debugTrace(DEBUG_sched, "garbage collecting before heap census");
1933         GarbageCollect(rtsTrue);
1934
1935         debugTrace(DEBUG_sched, "performing heap census");
1936         heapCensus();
1937
1938         performHeapProfile = rtsFalse;
1939         return rtsTrue;  // true <=> we already GC'd
1940     }
1941 #endif
1942     return rtsFalse;
1943 }
1944
1945 /* -----------------------------------------------------------------------------
1946  * Perform a garbage collection if necessary
1947  * -------------------------------------------------------------------------- */
1948
1949 static Capability *
1950 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1951 {
1952     StgTSO *t;
1953 #ifdef THREADED_RTS
1954     static volatile StgWord waiting_for_gc;
1955     rtsBool was_waiting;
1956     nat i;
1957 #endif
1958
1959 #ifdef THREADED_RTS
1960     // In order to GC, there must be no threads running Haskell code.
1961     // Therefore, the GC thread needs to hold *all* the capabilities,
1962     // and release them after the GC has completed.  
1963     //
1964     // This seems to be the simplest way: previous attempts involved
1965     // making all the threads with capabilities give up their
1966     // capabilities and sleep except for the *last* one, which
1967     // actually did the GC.  But it's quite hard to arrange for all
1968     // the other tasks to sleep and stay asleep.
1969     //
1970         
1971     was_waiting = cas(&waiting_for_gc, 0, 1);
1972     if (was_waiting) {
1973         do {
1974             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1975             if (cap) yieldCapability(&cap,task);
1976         } while (waiting_for_gc);
1977         return cap;  // NOTE: task->cap might have changed here
1978     }
1979
1980     for (i=0; i < n_capabilities; i++) {
1981         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1982         if (cap != &capabilities[i]) {
1983             Capability *pcap = &capabilities[i];
1984             // we better hope this task doesn't get migrated to
1985             // another Capability while we're waiting for this one.
1986             // It won't, because load balancing happens while we have
1987             // all the Capabilities, but even so it's a slightly
1988             // unsavoury invariant.
1989             task->cap = pcap;
1990             context_switch = 1;
1991             waitForReturnCapability(&pcap, task);
1992             if (pcap != &capabilities[i]) {
1993                 barf("scheduleDoGC: got the wrong capability");
1994             }
1995         }
1996     }
1997
1998     waiting_for_gc = rtsFalse;
1999 #endif
2000
2001     /* Kick any transactions which are invalid back to their
2002      * atomically frames.  When next scheduled they will try to
2003      * commit, this commit will fail and they will retry.
2004      */
2005     { 
2006         StgTSO *next;
2007
2008         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2009             if (t->what_next == ThreadRelocated) {
2010                 next = t->link;
2011             } else {
2012                 next = t->global_link;
2013                 
2014                 // This is a good place to check for blocked
2015                 // exceptions.  It might be the case that a thread is
2016                 // blocked on delivering an exception to a thread that
2017                 // is also blocked - we try to ensure that this
2018                 // doesn't happen in throwTo(), but it's too hard (or
2019                 // impossible) to close all the race holes, so we
2020                 // accept that some might get through and deal with
2021                 // them here.  A GC will always happen at some point,
2022                 // even if the system is otherwise deadlocked.
2023                 maybePerformBlockedException (&capabilities[0], t);
2024
2025                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2026                     if (!stmValidateNestOfTransactions (t -> trec)) {
2027                         debugTrace(DEBUG_sched | DEBUG_stm,
2028                                    "trec %p found wasting its time", t);
2029                         
2030                         // strip the stack back to the
2031                         // ATOMICALLY_FRAME, aborting the (nested)
2032                         // transaction, and saving the stack of any
2033                         // partially-evaluated thunks on the heap.
2034                         throwToSingleThreaded_(&capabilities[0], t, 
2035                                                NULL, rtsTrue, NULL);
2036                         
2037 #ifdef REG_R1
2038                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2039 #endif
2040                     }
2041                 }
2042             }
2043         }
2044     }
2045     
2046     // so this happens periodically:
2047     if (cap) scheduleCheckBlackHoles(cap);
2048     
2049     IF_DEBUG(scheduler, printAllThreads());
2050
2051     /*
2052      * We now have all the capabilities; if we're in an interrupting
2053      * state, then we should take the opportunity to delete all the
2054      * threads in the system.
2055      */
2056     if (sched_state >= SCHED_INTERRUPTING) {
2057         deleteAllThreads(&capabilities[0]);
2058         sched_state = SCHED_SHUTTING_DOWN;
2059     }
2060
2061     /* everybody back, start the GC.
2062      * Could do it in this thread, or signal a condition var
2063      * to do it in another thread.  Either way, we need to
2064      * broadcast on gc_pending_cond afterward.
2065      */
2066 #if defined(THREADED_RTS)
2067     debugTrace(DEBUG_sched, "doing GC");
2068 #endif
2069     GarbageCollect(force_major);
2070     
2071 #if defined(THREADED_RTS)
2072     // release our stash of capabilities.
2073     for (i = 0; i < n_capabilities; i++) {
2074         if (cap != &capabilities[i]) {
2075             task->cap = &capabilities[i];
2076             releaseCapability(&capabilities[i]);
2077         }
2078     }
2079     if (cap) {
2080         task->cap = cap;
2081     } else {
2082         task->cap = NULL;
2083     }
2084 #endif
2085
2086 #if defined(GRAN)
2087     /* add a ContinueThread event to continue execution of current thread */
2088     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2089               ContinueThread,
2090               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2091     IF_GRAN_DEBUG(bq, 
2092                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2093                   G_EVENTQ(0);
2094                   G_CURR_THREADQ(0));
2095 #endif /* GRAN */
2096
2097     return cap;
2098 }
2099
2100 /* ---------------------------------------------------------------------------
2101  * Singleton fork(). Do not copy any running threads.
2102  * ------------------------------------------------------------------------- */
2103
2104 StgInt
2105 forkProcess(HsStablePtr *entry
2106 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2107             STG_UNUSED
2108 #endif
2109            )
2110 {
2111 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2112     Task *task;
2113     pid_t pid;
2114     StgTSO* t,*next;
2115     Capability *cap;
2116     
2117 #if defined(THREADED_RTS)
2118     if (RtsFlags.ParFlags.nNodes > 1) {
2119         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2120         stg_exit(EXIT_FAILURE);
2121     }
2122 #endif
2123
2124     debugTrace(DEBUG_sched, "forking!");
2125     
2126     // ToDo: for SMP, we should probably acquire *all* the capabilities
2127     cap = rts_lock();
2128     
2129     pid = fork();
2130     
2131     if (pid) { // parent
2132         
2133         // just return the pid
2134         rts_unlock(cap);
2135         return pid;
2136         
2137     } else { // child
2138         
2139         // Now, all OS threads except the thread that forked are
2140         // stopped.  We need to stop all Haskell threads, including
2141         // those involved in foreign calls.  Also we need to delete
2142         // all Tasks, because they correspond to OS threads that are
2143         // now gone.
2144
2145         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2146             if (t->what_next == ThreadRelocated) {
2147                 next = t->link;
2148             } else {
2149                 next = t->global_link;
2150                 // don't allow threads to catch the ThreadKilled
2151                 // exception, but we do want to raiseAsync() because these
2152                 // threads may be evaluating thunks that we need later.
2153                 deleteThread_(cap,t);
2154             }
2155         }
2156         
2157         // Empty the run queue.  It seems tempting to let all the
2158         // killed threads stay on the run queue as zombies to be
2159         // cleaned up later, but some of them correspond to bound
2160         // threads for which the corresponding Task does not exist.
2161         cap->run_queue_hd = END_TSO_QUEUE;
2162         cap->run_queue_tl = END_TSO_QUEUE;
2163
2164         // Any suspended C-calling Tasks are no more, their OS threads
2165         // don't exist now:
2166         cap->suspended_ccalling_tasks = NULL;
2167
2168         // Empty the all_threads list.  Otherwise, the garbage
2169         // collector may attempt to resurrect some of these threads.
2170         all_threads = END_TSO_QUEUE;
2171
2172         // Wipe the task list, except the current Task.
2173         ACQUIRE_LOCK(&sched_mutex);
2174         for (task = all_tasks; task != NULL; task=task->all_link) {
2175             if (task != cap->running_task) {
2176                 discardTask(task);
2177             }
2178         }
2179         RELEASE_LOCK(&sched_mutex);
2180
2181 #if defined(THREADED_RTS)
2182         // Wipe our spare workers list, they no longer exist.  New
2183         // workers will be created if necessary.
2184         cap->spare_workers = NULL;
2185         cap->returning_tasks_hd = NULL;
2186         cap->returning_tasks_tl = NULL;
2187 #endif
2188
2189         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2190         rts_checkSchedStatus("forkProcess",cap);
2191         
2192         rts_unlock(cap);
2193         hs_exit();                      // clean up and exit
2194         stg_exit(EXIT_SUCCESS);
2195     }
2196 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2197     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2198     return -1;
2199 #endif
2200 }
2201
2202 /* ---------------------------------------------------------------------------
2203  * Delete all the threads in the system
2204  * ------------------------------------------------------------------------- */
2205    
2206 static void
2207 deleteAllThreads ( Capability *cap )
2208 {
2209     // NOTE: only safe to call if we own all capabilities.
2210
2211     StgTSO* t, *next;
2212     debugTrace(DEBUG_sched,"deleting all threads");
2213     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2214         if (t->what_next == ThreadRelocated) {
2215             next = t->link;
2216         } else {
2217             next = t->global_link;
2218             deleteThread(cap,t);
2219         }
2220     }      
2221
2222     // The run queue now contains a bunch of ThreadKilled threads.  We
2223     // must not throw these away: the main thread(s) will be in there
2224     // somewhere, and the main scheduler loop has to deal with it.
2225     // Also, the run queue is the only thing keeping these threads from
2226     // being GC'd, and we don't want the "main thread has been GC'd" panic.
2227
2228 #if !defined(THREADED_RTS)
2229     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2230     ASSERT(sleeping_queue == END_TSO_QUEUE);
2231 #endif
2232 }
2233
2234 /* -----------------------------------------------------------------------------
2235    Managing the suspended_ccalling_tasks list.
2236    Locks required: sched_mutex
2237    -------------------------------------------------------------------------- */
2238
2239 STATIC_INLINE void
2240 suspendTask (Capability *cap, Task *task)
2241 {
2242     ASSERT(task->next == NULL && task->prev == NULL);
2243     task->next = cap->suspended_ccalling_tasks;
2244     task->prev = NULL;
2245     if (cap->suspended_ccalling_tasks) {
2246         cap->suspended_ccalling_tasks->prev = task;
2247     }
2248     cap->suspended_ccalling_tasks = task;
2249 }
2250
2251 STATIC_INLINE void
2252 recoverSuspendedTask (Capability *cap, Task *task)
2253 {
2254     if (task->prev) {
2255         task->prev->next = task->next;
2256     } else {
2257         ASSERT(cap->suspended_ccalling_tasks == task);
2258         cap->suspended_ccalling_tasks = task->next;
2259     }
2260     if (task->next) {
2261         task->next->prev = task->prev;
2262     }
2263     task->next = task->prev = NULL;
2264 }
2265
2266 /* ---------------------------------------------------------------------------
2267  * Suspending & resuming Haskell threads.
2268  * 
2269  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2270  * its capability before calling the C function.  This allows another
2271  * task to pick up the capability and carry on running Haskell
2272  * threads.  It also means that if the C call blocks, it won't lock
2273  * the whole system.
2274  *
2275  * The Haskell thread making the C call is put to sleep for the
2276  * duration of the call, on the susepended_ccalling_threads queue.  We
2277  * give out a token to the task, which it can use to resume the thread
2278  * on return from the C function.
2279  * ------------------------------------------------------------------------- */
2280    
2281 void *
2282 suspendThread (StgRegTable *reg)
2283 {
2284   Capability *cap;
2285   int saved_errno = errno;
2286   StgTSO *tso;
2287   Task *task;
2288
2289   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2290    */
2291   cap = regTableToCapability(reg);
2292
2293   task = cap->running_task;
2294   tso = cap->r.rCurrentTSO;
2295
2296   debugTrace(DEBUG_sched, 
2297              "thread %lu did a safe foreign call", 
2298              (unsigned long)cap->r.rCurrentTSO->id);
2299
2300   // XXX this might not be necessary --SDM
2301   tso->what_next = ThreadRunGHC;
2302
2303   threadPaused(cap,tso);
2304
2305   if ((tso->flags & TSO_BLOCKEX) == 0)  {
2306       tso->why_blocked = BlockedOnCCall;
2307       tso->flags |= TSO_BLOCKEX;
2308       tso->flags &= ~TSO_INTERRUPTIBLE;
2309   } else {
2310       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2311   }
2312
2313   // Hand back capability
2314   task->suspended_tso = tso;
2315
2316   ACQUIRE_LOCK(&cap->lock);
2317
2318   suspendTask(cap,task);
2319   cap->in_haskell = rtsFalse;
2320   releaseCapability_(cap);
2321   
2322   RELEASE_LOCK(&cap->lock);
2323
2324 #if defined(THREADED_RTS)
2325   /* Preparing to leave the RTS, so ensure there's a native thread/task
2326      waiting to take over.
2327   */
2328   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2329 #endif
2330
2331   errno = saved_errno;
2332   return task;
2333 }
2334
2335 StgRegTable *
2336 resumeThread (void *task_)
2337 {
2338     StgTSO *tso;
2339     Capability *cap;
2340     int saved_errno = errno;
2341     Task *task = task_;
2342
2343     cap = task->cap;
2344     // Wait for permission to re-enter the RTS with the result.
2345     waitForReturnCapability(&cap,task);
2346     // we might be on a different capability now... but if so, our
2347     // entry on the suspended_ccalling_tasks list will also have been
2348     // migrated.
2349
2350     // Remove the thread from the suspended list
2351     recoverSuspendedTask(cap,task);
2352
2353     tso = task->suspended_tso;
2354     task->suspended_tso = NULL;
2355     tso->link = END_TSO_QUEUE;
2356     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2357     
2358     if (tso->why_blocked == BlockedOnCCall) {
2359         awakenBlockedExceptionQueue(cap,tso);
2360         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2361     }
2362     
2363     /* Reset blocking status */
2364     tso->why_blocked  = NotBlocked;
2365     
2366     cap->r.rCurrentTSO = tso;
2367     cap->in_haskell = rtsTrue;
2368     errno = saved_errno;
2369
2370     /* We might have GC'd, mark the TSO dirty again */
2371     dirtyTSO(tso);
2372
2373     IF_DEBUG(sanity, checkTSO(tso));
2374
2375     return &cap->r;
2376 }
2377
2378 /* ---------------------------------------------------------------------------
2379  * scheduleThread()
2380  *
2381  * scheduleThread puts a thread on the end  of the runnable queue.
2382  * This will usually be done immediately after a thread is created.
2383  * The caller of scheduleThread must create the thread using e.g.
2384  * createThread and push an appropriate closure
2385  * on this thread's stack before the scheduler is invoked.
2386  * ------------------------------------------------------------------------ */
2387
2388 void
2389 scheduleThread(Capability *cap, StgTSO *tso)
2390 {
2391     // The thread goes at the *end* of the run-queue, to avoid possible
2392     // starvation of any threads already on the queue.
2393     appendToRunQueue(cap,tso);
2394 }
2395
2396 void
2397 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2398 {
2399 #if defined(THREADED_RTS)
2400     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2401                               // move this thread from now on.
2402     cpu %= RtsFlags.ParFlags.nNodes;
2403     if (cpu == cap->no) {
2404         appendToRunQueue(cap,tso);
2405     } else {
2406         migrateThreadToCapability_lock(&capabilities[cpu],tso);
2407     }
2408 #else
2409     appendToRunQueue(cap,tso);
2410 #endif
2411 }
2412
2413 Capability *
2414 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2415 {
2416     Task *task;
2417
2418     // We already created/initialised the Task
2419     task = cap->running_task;
2420
2421     // This TSO is now a bound thread; make the Task and TSO
2422     // point to each other.
2423     tso->bound = task;
2424     tso->cap = cap;
2425
2426     task->tso = tso;
2427     task->ret = ret;
2428     task->stat = NoStatus;
2429
2430     appendToRunQueue(cap,tso);
2431
2432     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2433
2434 #if defined(GRAN)
2435     /* GranSim specific init */
2436     CurrentTSO = m->tso;                // the TSO to run
2437     procStatus[MainProc] = Busy;        // status of main PE
2438     CurrentProc = MainProc;             // PE to run it on
2439 #endif
2440
2441     cap = schedule(cap,task);
2442
2443     ASSERT(task->stat != NoStatus);
2444     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2445
2446     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2447     return cap;
2448 }
2449
2450 /* ----------------------------------------------------------------------------
2451  * Starting Tasks
2452  * ------------------------------------------------------------------------- */
2453
2454 #if defined(THREADED_RTS)
2455 void
2456 workerStart(Task *task)
2457 {
2458     Capability *cap;
2459
2460     // See startWorkerTask().
2461     ACQUIRE_LOCK(&task->lock);
2462     cap = task->cap;
2463     RELEASE_LOCK(&task->lock);
2464
2465     // set the thread-local pointer to the Task:
2466     taskEnter(task);
2467
2468     // schedule() runs without a lock.
2469     cap = schedule(cap,task);
2470
2471     // On exit from schedule(), we have a Capability.
2472     releaseCapability(cap);
2473     workerTaskStop(task);
2474 }
2475 #endif
2476
2477 /* ---------------------------------------------------------------------------
2478  * initScheduler()
2479  *
2480  * Initialise the scheduler.  This resets all the queues - if the
2481  * queues contained any threads, they'll be garbage collected at the
2482  * next pass.
2483  *
2484  * ------------------------------------------------------------------------ */
2485
2486 void 
2487 initScheduler(void)
2488 {
2489 #if defined(GRAN)
2490   nat i;
2491   for (i=0; i<=MAX_PROC; i++) {
2492     run_queue_hds[i]      = END_TSO_QUEUE;
2493     run_queue_tls[i]      = END_TSO_QUEUE;
2494     blocked_queue_hds[i]  = END_TSO_QUEUE;
2495     blocked_queue_tls[i]  = END_TSO_QUEUE;
2496     ccalling_threadss[i]  = END_TSO_QUEUE;
2497     blackhole_queue[i]    = END_TSO_QUEUE;
2498     sleeping_queue        = END_TSO_QUEUE;
2499   }
2500 #elif !defined(THREADED_RTS)
2501   blocked_queue_hd  = END_TSO_QUEUE;
2502   blocked_queue_tl  = END_TSO_QUEUE;
2503   sleeping_queue    = END_TSO_QUEUE;
2504 #endif
2505
2506   blackhole_queue   = END_TSO_QUEUE;
2507   all_threads       = END_TSO_QUEUE;
2508
2509   context_switch = 0;
2510   sched_state    = SCHED_RUNNING;
2511
2512 #if defined(THREADED_RTS)
2513   /* Initialise the mutex and condition variables used by
2514    * the scheduler. */
2515   initMutex(&sched_mutex);
2516 #endif
2517   
2518   ACQUIRE_LOCK(&sched_mutex);
2519
2520   /* A capability holds the state a native thread needs in
2521    * order to execute STG code. At least one capability is
2522    * floating around (only THREADED_RTS builds have more than one).
2523    */
2524   initCapabilities();
2525
2526   initTaskManager();
2527
2528 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2529   initSparkPools();
2530 #endif
2531
2532 #if defined(THREADED_RTS)
2533   /*
2534    * Eagerly start one worker to run each Capability, except for
2535    * Capability 0.  The idea is that we're probably going to start a
2536    * bound thread on Capability 0 pretty soon, so we don't want a
2537    * worker task hogging it.
2538    */
2539   { 
2540       nat i;
2541       Capability *cap;
2542       for (i = 1; i < n_capabilities; i++) {
2543           cap = &capabilities[i];
2544           ACQUIRE_LOCK(&cap->lock);
2545           startWorkerTask(cap, workerStart);
2546           RELEASE_LOCK(&cap->lock);
2547       }
2548   }
2549 #endif
2550
2551   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2552
2553   RELEASE_LOCK(&sched_mutex);
2554 }
2555
2556 void
2557 exitScheduler( void )
2558 {
2559     Task *task = NULL;
2560
2561 #if defined(THREADED_RTS)
2562     ACQUIRE_LOCK(&sched_mutex);
2563     task = newBoundTask();
2564     RELEASE_LOCK(&sched_mutex);
2565 #endif
2566
2567     // If we haven't killed all the threads yet, do it now.
2568     if (sched_state < SCHED_SHUTTING_DOWN) {
2569         sched_state = SCHED_INTERRUPTING;
2570         scheduleDoGC(NULL,task,rtsFalse);    
2571     }
2572     sched_state = SCHED_SHUTTING_DOWN;
2573
2574 #if defined(THREADED_RTS)
2575     { 
2576         nat i;
2577         
2578         for (i = 0; i < n_capabilities; i++) {
2579             shutdownCapability(&capabilities[i], task);
2580         }
2581         boundTaskExiting(task);
2582         stopTaskManager();
2583     }
2584 #else
2585     freeCapability(&MainCapability);
2586 #endif
2587 }
2588
2589 void
2590 freeScheduler( void )
2591 {
2592     freeTaskManager();
2593     if (n_capabilities != 1) {
2594         stgFree(capabilities);
2595     }
2596 #if defined(THREADED_RTS)
2597     closeMutex(&sched_mutex);
2598 #endif
2599 }
2600
2601 /* ---------------------------------------------------------------------------
2602    Where are the roots that we know about?
2603
2604         - all the threads on the runnable queue
2605         - all the threads on the blocked queue
2606         - all the threads on the sleeping queue
2607         - all the thread currently executing a _ccall_GC
2608         - all the "main threads"
2609      
2610    ------------------------------------------------------------------------ */
2611
2612 /* This has to be protected either by the scheduler monitor, or by the
2613         garbage collection monitor (probably the latter).
2614         KH @ 25/10/99
2615 */
2616
2617 void
2618 GetRoots( evac_fn evac )
2619 {
2620     nat i;
2621     Capability *cap;
2622     Task *task;
2623
2624 #if defined(GRAN)
2625     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2626         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2627             evac((StgClosure **)&run_queue_hds[i]);
2628         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2629             evac((StgClosure **)&run_queue_tls[i]);
2630         
2631         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2632             evac((StgClosure **)&blocked_queue_hds[i]);
2633         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2634             evac((StgClosure **)&blocked_queue_tls[i]);
2635         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2636             evac((StgClosure **)&ccalling_threads[i]);
2637     }
2638
2639     markEventQueue();
2640
2641 #else /* !GRAN */
2642
2643     for (i = 0; i < n_capabilities; i++) {
2644         cap = &capabilities[i];
2645         evac((StgClosure **)(void *)&cap->run_queue_hd);
2646         evac((StgClosure **)(void *)&cap->run_queue_tl);
2647 #if defined(THREADED_RTS)
2648         evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2649         evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2650 #endif
2651         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2652              task=task->next) {
2653             debugTrace(DEBUG_sched,
2654                        "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2655             evac((StgClosure **)(void *)&task->suspended_tso);
2656         }
2657
2658     }
2659     
2660
2661 #if !defined(THREADED_RTS)
2662     evac((StgClosure **)(void *)&blocked_queue_hd);
2663     evac((StgClosure **)(void *)&blocked_queue_tl);
2664     evac((StgClosure **)(void *)&sleeping_queue);
2665 #endif 
2666 #endif
2667
2668     // evac((StgClosure **)&blackhole_queue);
2669
2670 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2671     markSparkQueue(evac);
2672 #endif
2673     
2674 #if defined(RTS_USER_SIGNALS)
2675     // mark the signal handlers (signals should be already blocked)
2676     markSignalHandlers(evac);
2677 #endif
2678 }
2679
2680 /* -----------------------------------------------------------------------------
2681    performGC
2682
2683    This is the interface to the garbage collector from Haskell land.
2684    We provide this so that external C code can allocate and garbage
2685    collect when called from Haskell via _ccall_GC.
2686    -------------------------------------------------------------------------- */
2687
2688 static void
2689 performGC_(rtsBool force_major)
2690 {
2691     Task *task;
2692     // We must grab a new Task here, because the existing Task may be
2693     // associated with a particular Capability, and chained onto the 
2694     // suspended_ccalling_tasks queue.
2695     ACQUIRE_LOCK(&sched_mutex);
2696     task = newBoundTask();
2697     RELEASE_LOCK(&sched_mutex);
2698     scheduleDoGC(NULL,task,force_major);
2699     boundTaskExiting(task);
2700 }
2701
2702 void
2703 performGC(void)
2704 {
2705     performGC_(rtsFalse);
2706 }
2707
2708 void
2709 performMajorGC(void)
2710 {
2711     performGC_(rtsTrue);
2712 }
2713
2714 /* -----------------------------------------------------------------------------
2715    Stack overflow
2716
2717    If the thread has reached its maximum stack size, then raise the
2718    StackOverflow exception in the offending thread.  Otherwise
2719    relocate the TSO into a larger chunk of memory and adjust its stack
2720    size appropriately.
2721    -------------------------------------------------------------------------- */
2722
2723 static StgTSO *
2724 threadStackOverflow(Capability *cap, StgTSO *tso)
2725 {
2726   nat new_stack_size, stack_words;
2727   lnat new_tso_size;
2728   StgPtr new_sp;
2729   StgTSO *dest;
2730
2731   IF_DEBUG(sanity,checkTSO(tso));
2732
2733   // don't allow throwTo() to modify the blocked_exceptions queue
2734   // while we are moving the TSO:
2735   lockClosure((StgClosure *)tso);
2736
2737   if (tso->stack_size >= tso->max_stack_size) {
2738
2739       debugTrace(DEBUG_gc,
2740                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2741                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2742       IF_DEBUG(gc,
2743                /* If we're debugging, just print out the top of the stack */
2744                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2745                                                 tso->sp+64)));
2746
2747       // Send this thread the StackOverflow exception
2748       unlockTSO(tso);
2749       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2750       return tso;
2751   }
2752
2753   /* Try to double the current stack size.  If that takes us over the
2754    * maximum stack size for this thread, then use the maximum instead.
2755    * Finally round up so the TSO ends up as a whole number of blocks.
2756    */
2757   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2758   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2759                                        TSO_STRUCT_SIZE)/sizeof(W_);
2760   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2761   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2762
2763   debugTrace(DEBUG_sched, 
2764              "increasing stack size from %ld words to %d.",
2765              (long)tso->stack_size, new_stack_size);
2766
2767   dest = (StgTSO *)allocate(new_tso_size);
2768   TICK_ALLOC_TSO(new_stack_size,0);
2769
2770   /* copy the TSO block and the old stack into the new area */
2771   memcpy(dest,tso,TSO_STRUCT_SIZE);
2772   stack_words = tso->stack + tso->stack_size - tso->sp;
2773   new_sp = (P_)dest + new_tso_size - stack_words;
2774   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2775
2776   /* relocate the stack pointers... */
2777   dest->sp         = new_sp;
2778   dest->stack_size = new_stack_size;
2779         
2780   /* Mark the old TSO as relocated.  We have to check for relocated
2781    * TSOs in the garbage collector and any primops that deal with TSOs.
2782    *
2783    * It's important to set the sp value to just beyond the end
2784    * of the stack, so we don't attempt to scavenge any part of the
2785    * dead TSO's stack.
2786    */
2787   tso->what_next = ThreadRelocated;
2788   tso->link = dest;
2789   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2790   tso->why_blocked = NotBlocked;
2791
2792   IF_PAR_DEBUG(verbose,
2793                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2794                      tso->id, tso, tso->stack_size);
2795                /* If we're debugging, just print out the top of the stack */
2796                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2797                                                 tso->sp+64)));
2798   
2799   unlockTSO(dest);
2800   unlockTSO(tso);
2801
2802   IF_DEBUG(sanity,checkTSO(dest));
2803 #if 0
2804   IF_DEBUG(scheduler,printTSO(dest));
2805 #endif
2806
2807   return dest;
2808 }
2809
2810 /* ---------------------------------------------------------------------------
2811    Interrupt execution
2812    - usually called inside a signal handler so it mustn't do anything fancy.   
2813    ------------------------------------------------------------------------ */
2814
2815 void
2816 interruptStgRts(void)
2817 {
2818     sched_state = SCHED_INTERRUPTING;
2819     context_switch = 1;
2820     wakeUpRts();
2821 }
2822
2823 /* -----------------------------------------------------------------------------
2824    Wake up the RTS
2825    
2826    This function causes at least one OS thread to wake up and run the
2827    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2828    an external event has arrived that may need servicing (eg. a
2829    keyboard interrupt).
2830
2831    In the single-threaded RTS we don't do anything here; we only have
2832    one thread anyway, and the event that caused us to want to wake up
2833    will have interrupted any blocking system call in progress anyway.
2834    -------------------------------------------------------------------------- */
2835
2836 void
2837 wakeUpRts(void)
2838 {
2839 #if defined(THREADED_RTS)
2840     // This forces the IO Manager thread to wakeup, which will
2841     // in turn ensure that some OS thread wakes up and runs the
2842     // scheduler loop, which will cause a GC and deadlock check.
2843     ioManagerWakeup();
2844 #endif
2845 }
2846
2847 /* -----------------------------------------------------------------------------
2848  * checkBlackHoles()
2849  *
2850  * Check the blackhole_queue for threads that can be woken up.  We do
2851  * this periodically: before every GC, and whenever the run queue is
2852  * empty.
2853  *
2854  * An elegant solution might be to just wake up all the blocked
2855  * threads with awakenBlockedQueue occasionally: they'll go back to
2856  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2857  * doesn't give us a way to tell whether we've actually managed to
2858  * wake up any threads, so we would be busy-waiting.
2859  *
2860  * -------------------------------------------------------------------------- */
2861
2862 static rtsBool
2863 checkBlackHoles (Capability *cap)
2864 {
2865     StgTSO **prev, *t;
2866     rtsBool any_woke_up = rtsFalse;
2867     StgHalfWord type;
2868
2869     // blackhole_queue is global:
2870     ASSERT_LOCK_HELD(&sched_mutex);
2871
2872     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2873
2874     // ASSUMES: sched_mutex
2875     prev = &blackhole_queue;
2876     t = blackhole_queue;
2877     while (t != END_TSO_QUEUE) {
2878         ASSERT(t->why_blocked == BlockedOnBlackHole);
2879         type = get_itbl(t->block_info.closure)->type;
2880         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2881             IF_DEBUG(sanity,checkTSO(t));
2882             t = unblockOne(cap, t);
2883             // urk, the threads migrate to the current capability
2884             // here, but we'd like to keep them on the original one.
2885             *prev = t;
2886             any_woke_up = rtsTrue;
2887         } else {
2888             prev = &t->link;
2889             t = t->link;
2890         }
2891     }
2892
2893     return any_woke_up;
2894 }
2895
2896 /* -----------------------------------------------------------------------------
2897    Deleting threads
2898
2899    This is used for interruption (^C) and forking, and corresponds to
2900    raising an exception but without letting the thread catch the
2901    exception.
2902    -------------------------------------------------------------------------- */
2903
2904 static void 
2905 deleteThread (Capability *cap, StgTSO *tso)
2906 {
2907     // NOTE: must only be called on a TSO that we have exclusive
2908     // access to, because we will call throwToSingleThreaded() below.
2909     // The TSO must be on the run queue of the Capability we own, or 
2910     // we must own all Capabilities.
2911
2912     if (tso->why_blocked != BlockedOnCCall &&
2913         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2914         throwToSingleThreaded(cap,tso,NULL);
2915     }
2916 }
2917
2918 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2919 static void 
2920 deleteThread_(Capability *cap, StgTSO *tso)
2921 { // for forkProcess only:
2922   // like deleteThread(), but we delete threads in foreign calls, too.
2923
2924     if (tso->why_blocked == BlockedOnCCall ||
2925         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2926         unblockOne(cap,tso);
2927         tso->what_next = ThreadKilled;
2928     } else {
2929         deleteThread(cap,tso);
2930     }
2931 }
2932 #endif
2933
2934 /* -----------------------------------------------------------------------------
2935    raiseExceptionHelper
2936    
2937    This function is called by the raise# primitve, just so that we can
2938    move some of the tricky bits of raising an exception from C-- into
2939    C.  Who knows, it might be a useful re-useable thing here too.
2940    -------------------------------------------------------------------------- */
2941
2942 StgWord
2943 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2944 {
2945     Capability *cap = regTableToCapability(reg);
2946     StgThunk *raise_closure = NULL;
2947     StgPtr p, next;
2948     StgRetInfoTable *info;
2949     //
2950     // This closure represents the expression 'raise# E' where E
2951     // is the exception raise.  It is used to overwrite all the
2952     // thunks which are currently under evaluataion.
2953     //
2954
2955     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2956     // LDV profiling: stg_raise_info has THUNK as its closure
2957     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2958     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2959     // 1 does not cause any problem unless profiling is performed.
2960     // However, when LDV profiling goes on, we need to linearly scan
2961     // small object pool, where raise_closure is stored, so we should
2962     // use MIN_UPD_SIZE.
2963     //
2964     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2965     //                                 sizeofW(StgClosure)+1);
2966     //
2967
2968     //
2969     // Walk up the stack, looking for the catch frame.  On the way,
2970     // we update any closures pointed to from update frames with the
2971     // raise closure that we just built.
2972     //
2973     p = tso->sp;
2974     while(1) {
2975         info = get_ret_itbl((StgClosure *)p);
2976         next = p + stack_frame_sizeW((StgClosure *)p);
2977         switch (info->i.type) {
2978             
2979         case UPDATE_FRAME:
2980             // Only create raise_closure if we need to.
2981             if (raise_closure == NULL) {
2982                 raise_closure = 
2983                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2984                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2985                 raise_closure->payload[0] = exception;
2986             }
2987             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2988             p = next;
2989             continue;
2990
2991         case ATOMICALLY_FRAME:
2992             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2993             tso->sp = p;
2994             return ATOMICALLY_FRAME;
2995             
2996         case CATCH_FRAME:
2997             tso->sp = p;
2998             return CATCH_FRAME;
2999
3000         case CATCH_STM_FRAME:
3001             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3002             tso->sp = p;
3003             return CATCH_STM_FRAME;
3004             
3005         case STOP_FRAME:
3006             tso->sp = p;
3007             return STOP_FRAME;
3008
3009         case CATCH_RETRY_FRAME:
3010         default:
3011             p = next; 
3012             continue;
3013         }
3014     }
3015 }
3016
3017
3018 /* -----------------------------------------------------------------------------
3019    findRetryFrameHelper
3020
3021    This function is called by the retry# primitive.  It traverses the stack
3022    leaving tso->sp referring to the frame which should handle the retry.  
3023
3024    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3025    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3026
3027    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3028    create) because retries are not considered to be exceptions, despite the
3029    similar implementation.
3030
3031    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3032    not be created within memory transactions.
3033    -------------------------------------------------------------------------- */
3034
3035 StgWord
3036 findRetryFrameHelper (StgTSO *tso)
3037 {
3038   StgPtr           p, next;
3039   StgRetInfoTable *info;
3040
3041   p = tso -> sp;
3042   while (1) {
3043     info = get_ret_itbl((StgClosure *)p);
3044     next = p + stack_frame_sizeW((StgClosure *)p);
3045     switch (info->i.type) {
3046       
3047     case ATOMICALLY_FRAME:
3048         debugTrace(DEBUG_stm,
3049                    "found ATOMICALLY_FRAME at %p during retry", p);
3050         tso->sp = p;
3051         return ATOMICALLY_FRAME;
3052       
3053     case CATCH_RETRY_FRAME:
3054         debugTrace(DEBUG_stm,
3055                    "found CATCH_RETRY_FRAME at %p during retrry", p);
3056         tso->sp = p;
3057         return CATCH_RETRY_FRAME;
3058       
3059     case CATCH_STM_FRAME: {
3060         debugTrace(DEBUG_stm,
3061                    "found CATCH_STM_FRAME at %p during retry", p);
3062         StgTRecHeader *trec = tso -> trec;
3063         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3064         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3065         stmAbortTransaction(tso -> cap, trec);
3066         stmFreeAbortedTRec(tso -> cap, trec);
3067         tso -> trec = outer;
3068         p = next; 
3069         continue;
3070     }
3071       
3072
3073     default:
3074       ASSERT(info->i.type != CATCH_FRAME);
3075       ASSERT(info->i.type != STOP_FRAME);
3076       p = next; 
3077       continue;
3078     }
3079   }
3080 }
3081
3082 /* -----------------------------------------------------------------------------
3083    resurrectThreads is called after garbage collection on the list of
3084    threads found to be garbage.  Each of these threads will be woken
3085    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3086    on an MVar, or NonTermination if the thread was blocked on a Black
3087    Hole.
3088
3089    Locks: assumes we hold *all* the capabilities.
3090    -------------------------------------------------------------------------- */
3091
3092 void
3093 resurrectThreads (StgTSO *threads)
3094 {
3095     StgTSO *tso, *next;
3096     Capability *cap;
3097
3098     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3099         next = tso->global_link;
3100         tso->global_link = all_threads;
3101         all_threads = tso;
3102         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3103         
3104         // Wake up the thread on the Capability it was last on
3105         cap = tso->cap;
3106         
3107         switch (tso->why_blocked) {
3108         case BlockedOnMVar:
3109         case BlockedOnException:
3110             /* Called by GC - sched_mutex lock is currently held. */
3111             throwToSingleThreaded(cap, tso,
3112                                   (StgClosure *)BlockedOnDeadMVar_closure);
3113             break;
3114         case BlockedOnBlackHole:
3115             throwToSingleThreaded(cap, tso,
3116                                   (StgClosure *)NonTermination_closure);
3117             break;
3118         case BlockedOnSTM:
3119             throwToSingleThreaded(cap, tso,
3120                                   (StgClosure *)BlockedIndefinitely_closure);
3121             break;
3122         case NotBlocked:
3123             /* This might happen if the thread was blocked on a black hole
3124              * belonging to a thread that we've just woken up (raiseAsync
3125              * can wake up threads, remember...).
3126              */
3127             continue;
3128         default:
3129             barf("resurrectThreads: thread blocked in a strange way");
3130         }
3131     }
3132 }