Don't look at all the threads before each GC.
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #if defined(GRAN) || defined(PARALLEL_HASKELL)
35 # include "GranSimRts.h"
36 # include "GranSim.h"
37 # include "ParallelRts.h"
38 # include "Parallel.h"
39 # include "ParallelDebug.h"
40 # include "FetchMe.h"
41 # include "HLC.h"
42 #endif
43 #include "Sparks.h"
44 #include "Capability.h"
45 #include "Task.h"
46 #include "AwaitEvent.h"
47 #if defined(mingw32_HOST_OS)
48 #include "win32/IOManager.h"
49 #endif
50 #include "Trace.h"
51 #include "RaiseAsync.h"
52 #include "Threads.h"
53 #include "ThrIOManager.h"
54
55 #ifdef HAVE_SYS_TYPES_H
56 #include <sys/types.h>
57 #endif
58 #ifdef HAVE_UNISTD_H
59 #include <unistd.h>
60 #endif
61
62 #include <string.h>
63 #include <stdlib.h>
64 #include <stdarg.h>
65
66 #ifdef HAVE_ERRNO_H
67 #include <errno.h>
68 #endif
69
70 // Turn off inlining when debugging - it obfuscates things
71 #ifdef DEBUG
72 # undef  STATIC_INLINE
73 # define STATIC_INLINE static
74 #endif
75
76 /* -----------------------------------------------------------------------------
77  * Global variables
78  * -------------------------------------------------------------------------- */
79
80 #if defined(GRAN)
81
82 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
83 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
84
85 /* 
86    In GranSim we have a runnable and a blocked queue for each processor.
87    In order to minimise code changes new arrays run_queue_hds/tls
88    are created. run_queue_hd is then a short cut (macro) for
89    run_queue_hds[CurrentProc] (see GranSim.h).
90    -- HWL
91 */
92 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
93 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
94 StgTSO *ccalling_threadss[MAX_PROC];
95 /* We use the same global list of threads (all_threads) in GranSim as in
96    the std RTS (i.e. we are cheating). However, we don't use this list in
97    the GranSim specific code at the moment (so we are only potentially
98    cheating).  */
99
100 #else /* !GRAN */
101
102 #if !defined(THREADED_RTS)
103 // Blocked/sleeping thrads
104 StgTSO *blocked_queue_hd = NULL;
105 StgTSO *blocked_queue_tl = NULL;
106 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
107 #endif
108
109 /* Threads blocked on blackholes.
110  * LOCK: sched_mutex+capability, or all capabilities
111  */
112 StgTSO *blackhole_queue = NULL;
113 #endif
114
115 /* The blackhole_queue should be checked for threads to wake up.  See
116  * Schedule.h for more thorough comment.
117  * LOCK: none (doesn't matter if we miss an update)
118  */
119 rtsBool blackholes_need_checking = rtsFalse;
120
121 /* flag set by signal handler to precipitate a context switch
122  * LOCK: none (just an advisory flag)
123  */
124 int context_switch = 0;
125
126 /* flag that tracks whether we have done any execution in this time slice.
127  * LOCK: currently none, perhaps we should lock (but needs to be
128  * updated in the fast path of the scheduler).
129  */
130 nat recent_activity = ACTIVITY_YES;
131
132 /* if this flag is set as well, give up execution
133  * LOCK: none (changes once, from false->true)
134  */
135 rtsBool sched_state = SCHED_RUNNING;
136
137 #if defined(GRAN)
138 StgTSO *CurrentTSO;
139 #endif
140
141 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
142  *  exists - earlier gccs apparently didn't.
143  *  -= chak
144  */
145 StgTSO dummy_tso;
146
147 /*
148  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
149  * in an MT setting, needed to signal that a worker thread shouldn't hang around
150  * in the scheduler when it is out of work.
151  */
152 rtsBool shutting_down_scheduler = rtsFalse;
153
154 /*
155  * This mutex protects most of the global scheduler data in
156  * the THREADED_RTS runtime.
157  */
158 #if defined(THREADED_RTS)
159 Mutex sched_mutex;
160 #endif
161
162 #if defined(PARALLEL_HASKELL)
163 StgTSO *LastTSO;
164 rtsTime TimeOfLastYield;
165 rtsBool emitSchedule = rtsTrue;
166 #endif
167
168 #if !defined(mingw32_HOST_OS)
169 #define FORKPROCESS_PRIMOP_SUPPORTED
170 #endif
171
172 /* -----------------------------------------------------------------------------
173  * static function prototypes
174  * -------------------------------------------------------------------------- */
175
176 static Capability *schedule (Capability *initialCapability, Task *task);
177
178 //
179 // These function all encapsulate parts of the scheduler loop, and are
180 // abstracted only to make the structure and control flow of the
181 // scheduler clearer.
182 //
183 static void schedulePreLoop (void);
184 #if defined(THREADED_RTS)
185 static void schedulePushWork(Capability *cap, Task *task);
186 #endif
187 static void scheduleStartSignalHandlers (Capability *cap);
188 static void scheduleCheckBlockedThreads (Capability *cap);
189 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
190 static void scheduleCheckBlackHoles (Capability *cap);
191 static void scheduleDetectDeadlock (Capability *cap, Task *task);
192 #if defined(GRAN)
193 static StgTSO *scheduleProcessEvent(rtsEvent *event);
194 #endif
195 #if defined(PARALLEL_HASKELL)
196 static StgTSO *scheduleSendPendingMessages(void);
197 static void scheduleActivateSpark(void);
198 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
199 #endif
200 #if defined(PAR) || defined(GRAN)
201 static void scheduleGranParReport(void);
202 #endif
203 static void schedulePostRunThread(StgTSO *t);
204 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
205 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
206                                          StgTSO *t);
207 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
208                                     nat prev_what_next );
209 static void scheduleHandleThreadBlocked( StgTSO *t );
210 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
211                                              StgTSO *t );
212 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
213 static Capability *scheduleDoGC(Capability *cap, Task *task,
214                                 rtsBool force_major);
215
216 static rtsBool checkBlackHoles(Capability *cap);
217
218 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
219 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
220
221 static void deleteThread (Capability *cap, StgTSO *tso);
222 static void deleteAllThreads (Capability *cap);
223
224 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
225 static void deleteThread_(Capability *cap, StgTSO *tso);
226 #endif
227
228 #if defined(PARALLEL_HASKELL)
229 StgTSO * createSparkThread(rtsSpark spark);
230 StgTSO * activateSpark (rtsSpark spark);  
231 #endif
232
233 #ifdef DEBUG
234 static char *whatNext_strs[] = {
235   "(unknown)",
236   "ThreadRunGHC",
237   "ThreadInterpret",
238   "ThreadKilled",
239   "ThreadRelocated",
240   "ThreadComplete"
241 };
242 #endif
243
244 /* -----------------------------------------------------------------------------
245  * Putting a thread on the run queue: different scheduling policies
246  * -------------------------------------------------------------------------- */
247
248 STATIC_INLINE void
249 addToRunQueue( Capability *cap, StgTSO *t )
250 {
251 #if defined(PARALLEL_HASKELL)
252     if (RtsFlags.ParFlags.doFairScheduling) { 
253         // this does round-robin scheduling; good for concurrency
254         appendToRunQueue(cap,t);
255     } else {
256         // this does unfair scheduling; good for parallelism
257         pushOnRunQueue(cap,t);
258     }
259 #else
260     // this does round-robin scheduling; good for concurrency
261     appendToRunQueue(cap,t);
262 #endif
263 }
264
265 /* ---------------------------------------------------------------------------
266    Main scheduling loop.
267
268    We use round-robin scheduling, each thread returning to the
269    scheduler loop when one of these conditions is detected:
270
271       * out of heap space
272       * timer expires (thread yields)
273       * thread blocks
274       * thread ends
275       * stack overflow
276
277    GRAN version:
278      In a GranSim setup this loop iterates over the global event queue.
279      This revolves around the global event queue, which determines what 
280      to do next. Therefore, it's more complicated than either the 
281      concurrent or the parallel (GUM) setup.
282
283    GUM version:
284      GUM iterates over incoming messages.
285      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
286      and sends out a fish whenever it has nothing to do; in-between
287      doing the actual reductions (shared code below) it processes the
288      incoming messages and deals with delayed operations 
289      (see PendingFetches).
290      This is not the ugliest code you could imagine, but it's bloody close.
291
292    ------------------------------------------------------------------------ */
293
294 static Capability *
295 schedule (Capability *initialCapability, Task *task)
296 {
297   StgTSO *t;
298   Capability *cap;
299   StgThreadReturnCode ret;
300 #if defined(GRAN)
301   rtsEvent *event;
302 #elif defined(PARALLEL_HASKELL)
303   StgTSO *tso;
304   GlobalTaskId pe;
305   rtsBool receivedFinish = rtsFalse;
306 # if defined(DEBUG)
307   nat tp_size, sp_size; // stats only
308 # endif
309 #endif
310   nat prev_what_next;
311   rtsBool ready_to_gc;
312 #if defined(THREADED_RTS)
313   rtsBool first = rtsTrue;
314 #endif
315   
316   cap = initialCapability;
317
318   // Pre-condition: this task owns initialCapability.
319   // The sched_mutex is *NOT* held
320   // NB. on return, we still hold a capability.
321
322   debugTrace (DEBUG_sched, 
323               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
324               task, initialCapability);
325
326   schedulePreLoop();
327
328   // -----------------------------------------------------------
329   // Scheduler loop starts here:
330
331 #if defined(PARALLEL_HASKELL)
332 #define TERMINATION_CONDITION        (!receivedFinish)
333 #elif defined(GRAN)
334 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
335 #else
336 #define TERMINATION_CONDITION        rtsTrue
337 #endif
338
339   while (TERMINATION_CONDITION) {
340
341 #if defined(GRAN)
342       /* Choose the processor with the next event */
343       CurrentProc = event->proc;
344       CurrentTSO = event->tso;
345 #endif
346
347 #if defined(THREADED_RTS)
348       if (first) {
349           // don't yield the first time, we want a chance to run this
350           // thread for a bit, even if there are others banging at the
351           // door.
352           first = rtsFalse;
353           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
354       } else {
355           // Yield the capability to higher-priority tasks if necessary.
356           yieldCapability(&cap, task);
357       }
358 #endif
359       
360 #if defined(THREADED_RTS)
361       schedulePushWork(cap,task);
362 #endif
363
364     // Check whether we have re-entered the RTS from Haskell without
365     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
366     // call).
367     if (cap->in_haskell) {
368           errorBelch("schedule: re-entered unsafely.\n"
369                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
370           stg_exit(EXIT_FAILURE);
371     }
372
373     // The interruption / shutdown sequence.
374     // 
375     // In order to cleanly shut down the runtime, we want to:
376     //   * make sure that all main threads return to their callers
377     //     with the state 'Interrupted'.
378     //   * clean up all OS threads assocated with the runtime
379     //   * free all memory etc.
380     //
381     // So the sequence for ^C goes like this:
382     //
383     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
384     //     arranges for some Capability to wake up
385     //
386     //   * all threads in the system are halted, and the zombies are
387     //     placed on the run queue for cleaning up.  We acquire all
388     //     the capabilities in order to delete the threads, this is
389     //     done by scheduleDoGC() for convenience (because GC already
390     //     needs to acquire all the capabilities).  We can't kill
391     //     threads involved in foreign calls.
392     // 
393     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
394     //
395     //   * sched_state := SCHED_SHUTTING_DOWN
396     //
397     //   * all workers exit when the run queue on their capability
398     //     drains.  All main threads will also exit when their TSO
399     //     reaches the head of the run queue and they can return.
400     //
401     //   * eventually all Capabilities will shut down, and the RTS can
402     //     exit.
403     //
404     //   * We might be left with threads blocked in foreign calls, 
405     //     we should really attempt to kill these somehow (TODO);
406     
407     switch (sched_state) {
408     case SCHED_RUNNING:
409         break;
410     case SCHED_INTERRUPTING:
411         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
412 #if defined(THREADED_RTS)
413         discardSparksCap(cap);
414 #endif
415         /* scheduleDoGC() deletes all the threads */
416         cap = scheduleDoGC(cap,task,rtsFalse);
417         break;
418     case SCHED_SHUTTING_DOWN:
419         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
420         // If we are a worker, just exit.  If we're a bound thread
421         // then we will exit below when we've removed our TSO from
422         // the run queue.
423         if (task->tso == NULL && emptyRunQueue(cap)) {
424             return cap;
425         }
426         break;
427     default:
428         barf("sched_state: %d", sched_state);
429     }
430
431 #if defined(THREADED_RTS)
432     // If the run queue is empty, take a spark and turn it into a thread.
433     {
434         if (emptyRunQueue(cap)) {
435             StgClosure *spark;
436             spark = findSpark(cap);
437             if (spark != NULL) {
438                 debugTrace(DEBUG_sched,
439                            "turning spark of closure %p into a thread",
440                            (StgClosure *)spark);
441                 createSparkThread(cap,spark);     
442             }
443         }
444     }
445 #endif // THREADED_RTS
446
447     scheduleStartSignalHandlers(cap);
448
449     // Only check the black holes here if we've nothing else to do.
450     // During normal execution, the black hole list only gets checked
451     // at GC time, to avoid repeatedly traversing this possibly long
452     // list each time around the scheduler.
453     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
454
455     scheduleCheckWakeupThreads(cap);
456
457     scheduleCheckBlockedThreads(cap);
458
459     scheduleDetectDeadlock(cap,task);
460 #if defined(THREADED_RTS)
461     cap = task->cap;    // reload cap, it might have changed
462 #endif
463
464     // Normally, the only way we can get here with no threads to
465     // run is if a keyboard interrupt received during 
466     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
467     // Additionally, it is not fatal for the
468     // threaded RTS to reach here with no threads to run.
469     //
470     // win32: might be here due to awaitEvent() being abandoned
471     // as a result of a console event having been delivered.
472     if ( emptyRunQueue(cap) ) {
473 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
474         ASSERT(sched_state >= SCHED_INTERRUPTING);
475 #endif
476         continue; // nothing to do
477     }
478
479 #if defined(PARALLEL_HASKELL)
480     scheduleSendPendingMessages();
481     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
482         continue;
483
484 #if defined(SPARKS)
485     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
486 #endif
487
488     /* If we still have no work we need to send a FISH to get a spark
489        from another PE */
490     if (emptyRunQueue(cap)) {
491         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
492         ASSERT(rtsFalse); // should not happen at the moment
493     }
494     // from here: non-empty run queue.
495     //  TODO: merge above case with this, only one call processMessages() !
496     if (PacketsWaiting()) {  /* process incoming messages, if
497                                 any pending...  only in else
498                                 because getRemoteWork waits for
499                                 messages as well */
500         receivedFinish = processMessages();
501     }
502 #endif
503
504 #if defined(GRAN)
505     scheduleProcessEvent(event);
506 #endif
507
508     // 
509     // Get a thread to run
510     //
511     t = popRunQueue(cap);
512
513 #if defined(GRAN) || defined(PAR)
514     scheduleGranParReport(); // some kind of debuging output
515 #else
516     // Sanity check the thread we're about to run.  This can be
517     // expensive if there is lots of thread switching going on...
518     IF_DEBUG(sanity,checkTSO(t));
519 #endif
520
521 #if defined(THREADED_RTS)
522     // Check whether we can run this thread in the current task.
523     // If not, we have to pass our capability to the right task.
524     {
525         Task *bound = t->bound;
526       
527         if (bound) {
528             if (bound == task) {
529                 debugTrace(DEBUG_sched,
530                            "### Running thread %lu in bound thread", (unsigned long)t->id);
531                 // yes, the Haskell thread is bound to the current native thread
532             } else {
533                 debugTrace(DEBUG_sched,
534                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
535                 // no, bound to a different Haskell thread: pass to that thread
536                 pushOnRunQueue(cap,t);
537                 continue;
538             }
539         } else {
540             // The thread we want to run is unbound.
541             if (task->tso) { 
542                 debugTrace(DEBUG_sched,
543                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
544                 // no, the current native thread is bound to a different
545                 // Haskell thread, so pass it to any worker thread
546                 pushOnRunQueue(cap,t);
547                 continue; 
548             }
549         }
550     }
551 #endif
552
553     cap->r.rCurrentTSO = t;
554     
555     /* context switches are initiated by the timer signal, unless
556      * the user specified "context switch as often as possible", with
557      * +RTS -C0
558      */
559     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
560         && !emptyThreadQueues(cap)) {
561         context_switch = 1;
562     }
563          
564 run_thread:
565
566     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
567                               (long)t->id, whatNext_strs[t->what_next]);
568
569     startHeapProfTimer();
570
571     // Check for exceptions blocked on this thread
572     maybePerformBlockedException (cap, t);
573
574     // ----------------------------------------------------------------------
575     // Run the current thread 
576
577     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
578     ASSERT(t->cap == cap);
579
580     prev_what_next = t->what_next;
581
582     errno = t->saved_errno;
583 #if mingw32_HOST_OS
584     SetLastError(t->saved_winerror);
585 #endif
586
587     cap->in_haskell = rtsTrue;
588
589     dirty_TSO(cap,t);
590
591 #if defined(THREADED_RTS)
592     if (recent_activity == ACTIVITY_DONE_GC) {
593         // ACTIVITY_DONE_GC means we turned off the timer signal to
594         // conserve power (see #1623).  Re-enable it here.
595         nat prev;
596         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
597         if (prev == ACTIVITY_DONE_GC) {
598             startTimer();
599         }
600     } else {
601         recent_activity = ACTIVITY_YES;
602     }
603 #endif
604
605     switch (prev_what_next) {
606         
607     case ThreadKilled:
608     case ThreadComplete:
609         /* Thread already finished, return to scheduler. */
610         ret = ThreadFinished;
611         break;
612         
613     case ThreadRunGHC:
614     {
615         StgRegTable *r;
616         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
617         cap = regTableToCapability(r);
618         ret = r->rRet;
619         break;
620     }
621     
622     case ThreadInterpret:
623         cap = interpretBCO(cap);
624         ret = cap->r.rRet;
625         break;
626         
627     default:
628         barf("schedule: invalid what_next field");
629     }
630
631     cap->in_haskell = rtsFalse;
632
633     // The TSO might have moved, eg. if it re-entered the RTS and a GC
634     // happened.  So find the new location:
635     t = cap->r.rCurrentTSO;
636
637     // We have run some Haskell code: there might be blackhole-blocked
638     // threads to wake up now.
639     // Lock-free test here should be ok, we're just setting a flag.
640     if ( blackhole_queue != END_TSO_QUEUE ) {
641         blackholes_need_checking = rtsTrue;
642     }
643     
644     // And save the current errno in this thread.
645     // XXX: possibly bogus for SMP because this thread might already
646     // be running again, see code below.
647     t->saved_errno = errno;
648 #if mingw32_HOST_OS
649     // Similarly for Windows error code
650     t->saved_winerror = GetLastError();
651 #endif
652
653 #if defined(THREADED_RTS)
654     // If ret is ThreadBlocked, and this Task is bound to the TSO that
655     // blocked, we are in limbo - the TSO is now owned by whatever it
656     // is blocked on, and may in fact already have been woken up,
657     // perhaps even on a different Capability.  It may be the case
658     // that task->cap != cap.  We better yield this Capability
659     // immediately and return to normaility.
660     if (ret == ThreadBlocked) {
661         debugTrace(DEBUG_sched,
662                    "--<< thread %lu (%s) stopped: blocked",
663                    (unsigned long)t->id, whatNext_strs[t->what_next]);
664         continue;
665     }
666 #endif
667
668     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
669     ASSERT(t->cap == cap);
670
671     // ----------------------------------------------------------------------
672     
673     // Costs for the scheduler are assigned to CCS_SYSTEM
674     stopHeapProfTimer();
675 #if defined(PROFILING)
676     CCCS = CCS_SYSTEM;
677 #endif
678     
679     schedulePostRunThread(t);
680
681     t = threadStackUnderflow(task,t);
682
683     ready_to_gc = rtsFalse;
684
685     switch (ret) {
686     case HeapOverflow:
687         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
688         break;
689
690     case StackOverflow:
691         scheduleHandleStackOverflow(cap,task,t);
692         break;
693
694     case ThreadYielding:
695         if (scheduleHandleYield(cap, t, prev_what_next)) {
696             // shortcut for switching between compiler/interpreter:
697             goto run_thread; 
698         }
699         break;
700
701     case ThreadBlocked:
702         scheduleHandleThreadBlocked(t);
703         break;
704
705     case ThreadFinished:
706         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
707         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
708         break;
709
710     default:
711       barf("schedule: invalid thread return code %d", (int)ret);
712     }
713
714     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
715       cap = scheduleDoGC(cap,task,rtsFalse);
716     }
717   } /* end of while() */
718 }
719
720 /* ----------------------------------------------------------------------------
721  * Setting up the scheduler loop
722  * ------------------------------------------------------------------------- */
723
724 static void
725 schedulePreLoop(void)
726 {
727 #if defined(GRAN) 
728     /* set up first event to get things going */
729     /* ToDo: assign costs for system setup and init MainTSO ! */
730     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
731               ContinueThread, 
732               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
733     
734     debugTrace (DEBUG_gran,
735                 "GRAN: Init CurrentTSO (in schedule) = %p", 
736                 CurrentTSO);
737     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
738     
739     if (RtsFlags.GranFlags.Light) {
740         /* Save current time; GranSim Light only */
741         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
742     }      
743 #endif
744 }
745
746 /* -----------------------------------------------------------------------------
747  * schedulePushWork()
748  *
749  * Push work to other Capabilities if we have some.
750  * -------------------------------------------------------------------------- */
751
752 #if defined(THREADED_RTS)
753 static void
754 schedulePushWork(Capability *cap USED_IF_THREADS, 
755                  Task *task      USED_IF_THREADS)
756 {
757     Capability *free_caps[n_capabilities], *cap0;
758     nat i, n_free_caps;
759
760     // migration can be turned off with +RTS -qg
761     if (!RtsFlags.ParFlags.migrate) return;
762
763     // Check whether we have more threads on our run queue, or sparks
764     // in our pool, that we could hand to another Capability.
765     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
766         && sparkPoolSizeCap(cap) < 2) {
767         return;
768     }
769
770     // First grab as many free Capabilities as we can.
771     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
772         cap0 = &capabilities[i];
773         if (cap != cap0 && tryGrabCapability(cap0,task)) {
774             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
775                 // it already has some work, we just grabbed it at 
776                 // the wrong moment.  Or maybe it's deadlocked!
777                 releaseCapability(cap0);
778             } else {
779                 free_caps[n_free_caps++] = cap0;
780             }
781         }
782     }
783
784     // we now have n_free_caps free capabilities stashed in
785     // free_caps[].  Share our run queue equally with them.  This is
786     // probably the simplest thing we could do; improvements we might
787     // want to do include:
788     //
789     //   - giving high priority to moving relatively new threads, on 
790     //     the gournds that they haven't had time to build up a
791     //     working set in the cache on this CPU/Capability.
792     //
793     //   - giving low priority to moving long-lived threads
794
795     if (n_free_caps > 0) {
796         StgTSO *prev, *t, *next;
797         rtsBool pushed_to_all;
798
799         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
800
801         i = 0;
802         pushed_to_all = rtsFalse;
803
804         if (cap->run_queue_hd != END_TSO_QUEUE) {
805             prev = cap->run_queue_hd;
806             t = prev->_link;
807             prev->_link = END_TSO_QUEUE;
808             for (; t != END_TSO_QUEUE; t = next) {
809                 next = t->_link;
810                 t->_link = END_TSO_QUEUE;
811                 if (t->what_next == ThreadRelocated
812                     || t->bound == task // don't move my bound thread
813                     || tsoLocked(t)) {  // don't move a locked thread
814                     setTSOLink(cap, prev, t);
815                     prev = t;
816                 } else if (i == n_free_caps) {
817                     pushed_to_all = rtsTrue;
818                     i = 0;
819                     // keep one for us
820                     setTSOLink(cap, prev, t);
821                     prev = t;
822                 } else {
823                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
824                     appendToRunQueue(free_caps[i],t);
825                     if (t->bound) { t->bound->cap = free_caps[i]; }
826                     t->cap = free_caps[i];
827                     i++;
828                 }
829             }
830             cap->run_queue_tl = prev;
831         }
832
833         // If there are some free capabilities that we didn't push any
834         // threads to, then try to push a spark to each one.
835         if (!pushed_to_all) {
836             StgClosure *spark;
837             // i is the next free capability to push to
838             for (; i < n_free_caps; i++) {
839                 if (emptySparkPoolCap(free_caps[i])) {
840                     spark = findSpark(cap);
841                     if (spark != NULL) {
842                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
843                         newSpark(&(free_caps[i]->r), spark);
844                     }
845                 }
846             }
847         }
848
849         // release the capabilities
850         for (i = 0; i < n_free_caps; i++) {
851             task->cap = free_caps[i];
852             releaseCapability(free_caps[i]);
853         }
854     }
855     task->cap = cap; // reset to point to our Capability.
856 }
857 #endif
858
859 /* ----------------------------------------------------------------------------
860  * Start any pending signal handlers
861  * ------------------------------------------------------------------------- */
862
863 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
864 static void
865 scheduleStartSignalHandlers(Capability *cap)
866 {
867     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
868         // safe outside the lock
869         startSignalHandlers(cap);
870     }
871 }
872 #else
873 static void
874 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
875 {
876 }
877 #endif
878
879 /* ----------------------------------------------------------------------------
880  * Check for blocked threads that can be woken up.
881  * ------------------------------------------------------------------------- */
882
883 static void
884 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
885 {
886 #if !defined(THREADED_RTS)
887     //
888     // Check whether any waiting threads need to be woken up.  If the
889     // run queue is empty, and there are no other tasks running, we
890     // can wait indefinitely for something to happen.
891     //
892     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
893     {
894         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
895     }
896 #endif
897 }
898
899
900 /* ----------------------------------------------------------------------------
901  * Check for threads woken up by other Capabilities
902  * ------------------------------------------------------------------------- */
903
904 static void
905 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
906 {
907 #if defined(THREADED_RTS)
908     // Any threads that were woken up by other Capabilities get
909     // appended to our run queue.
910     if (!emptyWakeupQueue(cap)) {
911         ACQUIRE_LOCK(&cap->lock);
912         if (emptyRunQueue(cap)) {
913             cap->run_queue_hd = cap->wakeup_queue_hd;
914             cap->run_queue_tl = cap->wakeup_queue_tl;
915         } else {
916             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
917             cap->run_queue_tl = cap->wakeup_queue_tl;
918         }
919         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
920         RELEASE_LOCK(&cap->lock);
921     }
922 #endif
923 }
924
925 /* ----------------------------------------------------------------------------
926  * Check for threads blocked on BLACKHOLEs that can be woken up
927  * ------------------------------------------------------------------------- */
928 static void
929 scheduleCheckBlackHoles (Capability *cap)
930 {
931     if ( blackholes_need_checking ) // check without the lock first
932     {
933         ACQUIRE_LOCK(&sched_mutex);
934         if ( blackholes_need_checking ) {
935             checkBlackHoles(cap);
936             blackholes_need_checking = rtsFalse;
937         }
938         RELEASE_LOCK(&sched_mutex);
939     }
940 }
941
942 /* ----------------------------------------------------------------------------
943  * Detect deadlock conditions and attempt to resolve them.
944  * ------------------------------------------------------------------------- */
945
946 static void
947 scheduleDetectDeadlock (Capability *cap, Task *task)
948 {
949
950 #if defined(PARALLEL_HASKELL)
951     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
952     return;
953 #endif
954
955     /* 
956      * Detect deadlock: when we have no threads to run, there are no
957      * threads blocked, waiting for I/O, or sleeping, and all the
958      * other tasks are waiting for work, we must have a deadlock of
959      * some description.
960      */
961     if ( emptyThreadQueues(cap) )
962     {
963 #if defined(THREADED_RTS)
964         /* 
965          * In the threaded RTS, we only check for deadlock if there
966          * has been no activity in a complete timeslice.  This means
967          * we won't eagerly start a full GC just because we don't have
968          * any threads to run currently.
969          */
970         if (recent_activity != ACTIVITY_INACTIVE) return;
971 #endif
972
973         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
974
975         // Garbage collection can release some new threads due to
976         // either (a) finalizers or (b) threads resurrected because
977         // they are unreachable and will therefore be sent an
978         // exception.  Any threads thus released will be immediately
979         // runnable.
980         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
981
982         recent_activity = ACTIVITY_DONE_GC;
983         // disable timer signals (see #1623)
984         stopTimer();
985         
986         if ( !emptyRunQueue(cap) ) return;
987
988 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
989         /* If we have user-installed signal handlers, then wait
990          * for signals to arrive rather then bombing out with a
991          * deadlock.
992          */
993         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
994             debugTrace(DEBUG_sched,
995                        "still deadlocked, waiting for signals...");
996
997             awaitUserSignals();
998
999             if (signals_pending()) {
1000                 startSignalHandlers(cap);
1001             }
1002
1003             // either we have threads to run, or we were interrupted:
1004             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1005         }
1006 #endif
1007
1008 #if !defined(THREADED_RTS)
1009         /* Probably a real deadlock.  Send the current main thread the
1010          * Deadlock exception.
1011          */
1012         if (task->tso) {
1013             switch (task->tso->why_blocked) {
1014             case BlockedOnSTM:
1015             case BlockedOnBlackHole:
1016             case BlockedOnException:
1017             case BlockedOnMVar:
1018                 throwToSingleThreaded(cap, task->tso, 
1019                                       (StgClosure *)NonTermination_closure);
1020                 return;
1021             default:
1022                 barf("deadlock: main thread blocked in a strange way");
1023             }
1024         }
1025         return;
1026 #endif
1027     }
1028 }
1029
1030 /* ----------------------------------------------------------------------------
1031  * Process an event (GRAN only)
1032  * ------------------------------------------------------------------------- */
1033
1034 #if defined(GRAN)
1035 static StgTSO *
1036 scheduleProcessEvent(rtsEvent *event)
1037 {
1038     StgTSO *t;
1039
1040     if (RtsFlags.GranFlags.Light)
1041       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1042
1043     /* adjust time based on time-stamp */
1044     if (event->time > CurrentTime[CurrentProc] &&
1045         event->evttype != ContinueThread)
1046       CurrentTime[CurrentProc] = event->time;
1047     
1048     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1049     if (!RtsFlags.GranFlags.Light)
1050       handleIdlePEs();
1051
1052     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1053
1054     /* main event dispatcher in GranSim */
1055     switch (event->evttype) {
1056       /* Should just be continuing execution */
1057     case ContinueThread:
1058       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1059       /* ToDo: check assertion
1060       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1061              run_queue_hd != END_TSO_QUEUE);
1062       */
1063       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1064       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1065           procStatus[CurrentProc]==Fetching) {
1066         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1067               CurrentTSO->id, CurrentTSO, CurrentProc);
1068         goto next_thread;
1069       } 
1070       /* Ignore ContinueThreads for completed threads */
1071       if (CurrentTSO->what_next == ThreadComplete) {
1072         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1073               CurrentTSO->id, CurrentTSO, CurrentProc);
1074         goto next_thread;
1075       } 
1076       /* Ignore ContinueThreads for threads that are being migrated */
1077       if (PROCS(CurrentTSO)==Nowhere) { 
1078         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1079               CurrentTSO->id, CurrentTSO, CurrentProc);
1080         goto next_thread;
1081       }
1082       /* The thread should be at the beginning of the run queue */
1083       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1084         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1085               CurrentTSO->id, CurrentTSO, CurrentProc);
1086         break; // run the thread anyway
1087       }
1088       /*
1089       new_event(proc, proc, CurrentTime[proc],
1090                 FindWork,
1091                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1092       goto next_thread; 
1093       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1094       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1095
1096     case FetchNode:
1097       do_the_fetchnode(event);
1098       goto next_thread;             /* handle next event in event queue  */
1099       
1100     case GlobalBlock:
1101       do_the_globalblock(event);
1102       goto next_thread;             /* handle next event in event queue  */
1103       
1104     case FetchReply:
1105       do_the_fetchreply(event);
1106       goto next_thread;             /* handle next event in event queue  */
1107       
1108     case UnblockThread:   /* Move from the blocked queue to the tail of */
1109       do_the_unblock(event);
1110       goto next_thread;             /* handle next event in event queue  */
1111       
1112     case ResumeThread:  /* Move from the blocked queue to the tail of */
1113       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1114       event->tso->gran.blocktime += 
1115         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1116       do_the_startthread(event);
1117       goto next_thread;             /* handle next event in event queue  */
1118       
1119     case StartThread:
1120       do_the_startthread(event);
1121       goto next_thread;             /* handle next event in event queue  */
1122       
1123     case MoveThread:
1124       do_the_movethread(event);
1125       goto next_thread;             /* handle next event in event queue  */
1126       
1127     case MoveSpark:
1128       do_the_movespark(event);
1129       goto next_thread;             /* handle next event in event queue  */
1130       
1131     case FindWork:
1132       do_the_findwork(event);
1133       goto next_thread;             /* handle next event in event queue  */
1134       
1135     default:
1136       barf("Illegal event type %u\n", event->evttype);
1137     }  /* switch */
1138     
1139     /* This point was scheduler_loop in the old RTS */
1140
1141     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1142
1143     TimeOfLastEvent = CurrentTime[CurrentProc];
1144     TimeOfNextEvent = get_time_of_next_event();
1145     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1146     // CurrentTSO = ThreadQueueHd;
1147
1148     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1149                          TimeOfNextEvent));
1150
1151     if (RtsFlags.GranFlags.Light) 
1152       GranSimLight_leave_system(event, &ActiveTSO); 
1153
1154     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1155
1156     IF_DEBUG(gran, 
1157              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1158
1159     /* in a GranSim setup the TSO stays on the run queue */
1160     t = CurrentTSO;
1161     /* Take a thread from the run queue. */
1162     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1163
1164     IF_DEBUG(gran, 
1165              debugBelch("GRAN: About to run current thread, which is\n");
1166              G_TSO(t,5));
1167
1168     context_switch = 0; // turned on via GranYield, checking events and time slice
1169
1170     IF_DEBUG(gran, 
1171              DumpGranEvent(GR_SCHEDULE, t));
1172
1173     procStatus[CurrentProc] = Busy;
1174 }
1175 #endif // GRAN
1176
1177 /* ----------------------------------------------------------------------------
1178  * Send pending messages (PARALLEL_HASKELL only)
1179  * ------------------------------------------------------------------------- */
1180
1181 #if defined(PARALLEL_HASKELL)
1182 static StgTSO *
1183 scheduleSendPendingMessages(void)
1184 {
1185     StgSparkPool *pool;
1186     rtsSpark spark;
1187     StgTSO *t;
1188
1189 # if defined(PAR) // global Mem.Mgmt., omit for now
1190     if (PendingFetches != END_BF_QUEUE) {
1191         processFetches();
1192     }
1193 # endif
1194     
1195     if (RtsFlags.ParFlags.BufferTime) {
1196         // if we use message buffering, we must send away all message
1197         // packets which have become too old...
1198         sendOldBuffers(); 
1199     }
1200 }
1201 #endif
1202
1203 /* ----------------------------------------------------------------------------
1204  * Activate spark threads (PARALLEL_HASKELL only)
1205  * ------------------------------------------------------------------------- */
1206
1207 #if defined(PARALLEL_HASKELL)
1208 static void
1209 scheduleActivateSpark(void)
1210 {
1211 #if defined(SPARKS)
1212   ASSERT(emptyRunQueue());
1213 /* We get here if the run queue is empty and want some work.
1214    We try to turn a spark into a thread, and add it to the run queue,
1215    from where it will be picked up in the next iteration of the scheduler
1216    loop.
1217 */
1218
1219       /* :-[  no local threads => look out for local sparks */
1220       /* the spark pool for the current PE */
1221       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1222       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1223           pool->hd < pool->tl) {
1224         /* 
1225          * ToDo: add GC code check that we really have enough heap afterwards!!
1226          * Old comment:
1227          * If we're here (no runnable threads) and we have pending
1228          * sparks, we must have a space problem.  Get enough space
1229          * to turn one of those pending sparks into a
1230          * thread... 
1231          */
1232
1233         spark = findSpark(rtsFalse);            /* get a spark */
1234         if (spark != (rtsSpark) NULL) {
1235           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1236           IF_PAR_DEBUG(fish, // schedule,
1237                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1238                              tso->id, tso, advisory_thread_count));
1239
1240           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1241             IF_PAR_DEBUG(fish, // schedule,
1242                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1243                             spark));
1244             return rtsFalse; /* failed to generate a thread */
1245           }                  /* otherwise fall through & pick-up new tso */
1246         } else {
1247           IF_PAR_DEBUG(fish, // schedule,
1248                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1249                              spark_queue_len(pool)));
1250           return rtsFalse;  /* failed to generate a thread */
1251         }
1252         return rtsTrue;  /* success in generating a thread */
1253   } else { /* no more threads permitted or pool empty */
1254     return rtsFalse;  /* failed to generateThread */
1255   }
1256 #else
1257   tso = NULL; // avoid compiler warning only
1258   return rtsFalse;  /* dummy in non-PAR setup */
1259 #endif // SPARKS
1260 }
1261 #endif // PARALLEL_HASKELL
1262
1263 /* ----------------------------------------------------------------------------
1264  * Get work from a remote node (PARALLEL_HASKELL only)
1265  * ------------------------------------------------------------------------- */
1266     
1267 #if defined(PARALLEL_HASKELL)
1268 static rtsBool
1269 scheduleGetRemoteWork(rtsBool *receivedFinish)
1270 {
1271   ASSERT(emptyRunQueue());
1272
1273   if (RtsFlags.ParFlags.BufferTime) {
1274         IF_PAR_DEBUG(verbose, 
1275                 debugBelch("...send all pending data,"));
1276         {
1277           nat i;
1278           for (i=1; i<=nPEs; i++)
1279             sendImmediately(i); // send all messages away immediately
1280         }
1281   }
1282 # ifndef SPARKS
1283         //++EDEN++ idle() , i.e. send all buffers, wait for work
1284         // suppress fishing in EDEN... just look for incoming messages
1285         // (blocking receive)
1286   IF_PAR_DEBUG(verbose, 
1287                debugBelch("...wait for incoming messages...\n"));
1288   *receivedFinish = processMessages(); // blocking receive...
1289
1290         // and reenter scheduling loop after having received something
1291         // (return rtsFalse below)
1292
1293 # else /* activate SPARKS machinery */
1294 /* We get here, if we have no work, tried to activate a local spark, but still
1295    have no work. We try to get a remote spark, by sending a FISH message.
1296    Thread migration should be added here, and triggered when a sequence of 
1297    fishes returns without work. */
1298         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1299
1300       /* =8-[  no local sparks => look for work on other PEs */
1301         /*
1302          * We really have absolutely no work.  Send out a fish
1303          * (there may be some out there already), and wait for
1304          * something to arrive.  We clearly can't run any threads
1305          * until a SCHEDULE or RESUME arrives, and so that's what
1306          * we're hoping to see.  (Of course, we still have to
1307          * respond to other types of messages.)
1308          */
1309         rtsTime now = msTime() /*CURRENT_TIME*/;
1310         IF_PAR_DEBUG(verbose, 
1311                      debugBelch("--  now=%ld\n", now));
1312         IF_PAR_DEBUG(fish, // verbose,
1313              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1314                  (last_fish_arrived_at!=0 &&
1315                   last_fish_arrived_at+delay > now)) {
1316                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1317                      now, last_fish_arrived_at+delay, 
1318                      last_fish_arrived_at,
1319                      delay);
1320              });
1321   
1322         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1323             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1324           if (last_fish_arrived_at==0 ||
1325               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1326             /* outstandingFishes is set in sendFish, processFish;
1327                avoid flooding system with fishes via delay */
1328     next_fish_to_send_at = 0;  
1329   } else {
1330     /* ToDo: this should be done in the main scheduling loop to avoid the
1331              busy wait here; not so bad if fish delay is very small  */
1332     int iq = 0; // DEBUGGING -- HWL
1333     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1334     /* send a fish when ready, but process messages that arrive in the meantime */
1335     do {
1336       if (PacketsWaiting()) {
1337         iq++; // DEBUGGING
1338         *receivedFinish = processMessages();
1339       }
1340       now = msTime();
1341     } while (!*receivedFinish || now<next_fish_to_send_at);
1342     // JB: This means the fish could become obsolete, if we receive
1343     // work. Better check for work again? 
1344     // last line: while (!receivedFinish || !haveWork || now<...)
1345     // next line: if (receivedFinish || haveWork )
1346
1347     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1348       return rtsFalse;  // NB: this will leave scheduler loop
1349                         // immediately after return!
1350                           
1351     IF_PAR_DEBUG(fish, // verbose,
1352                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1353
1354   }
1355
1356     // JB: IMHO, this should all be hidden inside sendFish(...)
1357     /* pe = choosePE(); 
1358        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1359                 NEW_FISH_HUNGER);
1360
1361     // Global statistics: count no. of fishes
1362     if (RtsFlags.ParFlags.ParStats.Global &&
1363          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1364            globalParStats.tot_fish_mess++;
1365            }
1366     */ 
1367
1368   /* delayed fishes must have been sent by now! */
1369   next_fish_to_send_at = 0;  
1370   }
1371       
1372   *receivedFinish = processMessages();
1373 # endif /* SPARKS */
1374
1375  return rtsFalse;
1376  /* NB: this function always returns rtsFalse, meaning the scheduler
1377     loop continues with the next iteration; 
1378     rationale: 
1379       return code means success in finding work; we enter this function
1380       if there is no local work, thus have to send a fish which takes
1381       time until it arrives with work; in the meantime we should process
1382       messages in the main loop;
1383  */
1384 }
1385 #endif // PARALLEL_HASKELL
1386
1387 /* ----------------------------------------------------------------------------
1388  * PAR/GRAN: Report stats & debugging info(?)
1389  * ------------------------------------------------------------------------- */
1390
1391 #if defined(PAR) || defined(GRAN)
1392 static void
1393 scheduleGranParReport(void)
1394 {
1395   ASSERT(run_queue_hd != END_TSO_QUEUE);
1396
1397   /* Take a thread from the run queue, if we have work */
1398   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1399
1400     /* If this TSO has got its outport closed in the meantime, 
1401      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1402      * It has to be marked as TH_DEAD for this purpose.
1403      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1404
1405 JB: TODO: investigate wether state change field could be nuked
1406      entirely and replaced by the normal tso state (whatnext
1407      field). All we want to do is to kill tsos from outside.
1408      */
1409
1410     /* ToDo: write something to the log-file
1411     if (RTSflags.ParFlags.granSimStats && !sameThread)
1412         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1413
1414     CurrentTSO = t;
1415     */
1416     /* the spark pool for the current PE */
1417     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1418
1419     IF_DEBUG(scheduler, 
1420              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1421                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1422
1423     IF_PAR_DEBUG(fish,
1424              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1425                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1426
1427     if (RtsFlags.ParFlags.ParStats.Full && 
1428         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1429         (emitSchedule || // forced emit
1430          (t && LastTSO && t->id != LastTSO->id))) {
1431       /* 
1432          we are running a different TSO, so write a schedule event to log file
1433          NB: If we use fair scheduling we also have to write  a deschedule 
1434              event for LastTSO; with unfair scheduling we know that the
1435              previous tso has blocked whenever we switch to another tso, so
1436              we don't need it in GUM for now
1437       */
1438       IF_PAR_DEBUG(fish, // schedule,
1439                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1440
1441       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1442                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1443       emitSchedule = rtsFalse;
1444     }
1445 }     
1446 #endif
1447
1448 /* ----------------------------------------------------------------------------
1449  * After running a thread...
1450  * ------------------------------------------------------------------------- */
1451
1452 static void
1453 schedulePostRunThread (StgTSO *t)
1454 {
1455     // We have to be able to catch transactions that are in an
1456     // infinite loop as a result of seeing an inconsistent view of
1457     // memory, e.g. 
1458     //
1459     //   atomically $ do
1460     //       [a,b] <- mapM readTVar [ta,tb]
1461     //       when (a == b) loop
1462     //
1463     // and a is never equal to b given a consistent view of memory.
1464     //
1465     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1466         if (!stmValidateNestOfTransactions (t -> trec)) {
1467             debugTrace(DEBUG_sched | DEBUG_stm,
1468                        "trec %p found wasting its time", t);
1469             
1470             // strip the stack back to the
1471             // ATOMICALLY_FRAME, aborting the (nested)
1472             // transaction, and saving the stack of any
1473             // partially-evaluated thunks on the heap.
1474             throwToSingleThreaded_(&capabilities[0], t, 
1475                                    NULL, rtsTrue, NULL);
1476             
1477 #ifdef REG_R1
1478             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1479 #endif
1480         }
1481     }
1482
1483 #if defined(PAR)
1484     /* HACK 675: if the last thread didn't yield, make sure to print a 
1485        SCHEDULE event to the log file when StgRunning the next thread, even
1486        if it is the same one as before */
1487     LastTSO = t; 
1488     TimeOfLastYield = CURRENT_TIME;
1489 #endif
1490
1491   /* some statistics gathering in the parallel case */
1492
1493 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1494   switch (ret) {
1495     case HeapOverflow:
1496 # if defined(GRAN)
1497       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1498       globalGranStats.tot_heapover++;
1499 # elif defined(PAR)
1500       globalParStats.tot_heapover++;
1501 # endif
1502       break;
1503
1504      case StackOverflow:
1505 # if defined(GRAN)
1506       IF_DEBUG(gran, 
1507                DumpGranEvent(GR_DESCHEDULE, t));
1508       globalGranStats.tot_stackover++;
1509 # elif defined(PAR)
1510       // IF_DEBUG(par, 
1511       // DumpGranEvent(GR_DESCHEDULE, t);
1512       globalParStats.tot_stackover++;
1513 # endif
1514       break;
1515
1516     case ThreadYielding:
1517 # if defined(GRAN)
1518       IF_DEBUG(gran, 
1519                DumpGranEvent(GR_DESCHEDULE, t));
1520       globalGranStats.tot_yields++;
1521 # elif defined(PAR)
1522       // IF_DEBUG(par, 
1523       // DumpGranEvent(GR_DESCHEDULE, t);
1524       globalParStats.tot_yields++;
1525 # endif
1526       break; 
1527
1528     case ThreadBlocked:
1529 # if defined(GRAN)
1530         debugTrace(DEBUG_sched, 
1531                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1532                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1533                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1534                if (t->block_info.closure!=(StgClosure*)NULL)
1535                  print_bq(t->block_info.closure);
1536                debugBelch("\n"));
1537
1538       // ??? needed; should emit block before
1539       IF_DEBUG(gran, 
1540                DumpGranEvent(GR_DESCHEDULE, t)); 
1541       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1542       /*
1543         ngoq Dogh!
1544       ASSERT(procStatus[CurrentProc]==Busy || 
1545               ((procStatus[CurrentProc]==Fetching) && 
1546               (t->block_info.closure!=(StgClosure*)NULL)));
1547       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1548           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1549             procStatus[CurrentProc]==Fetching)) 
1550         procStatus[CurrentProc] = Idle;
1551       */
1552 # elif defined(PAR)
1553 //++PAR++  blockThread() writes the event (change?)
1554 # endif
1555     break;
1556
1557   case ThreadFinished:
1558     break;
1559
1560   default:
1561     barf("parGlobalStats: unknown return code");
1562     break;
1563     }
1564 #endif
1565 }
1566
1567 /* -----------------------------------------------------------------------------
1568  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1569  * -------------------------------------------------------------------------- */
1570
1571 static rtsBool
1572 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1573 {
1574     // did the task ask for a large block?
1575     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1576         // if so, get one and push it on the front of the nursery.
1577         bdescr *bd;
1578         lnat blocks;
1579         
1580         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1581         
1582         debugTrace(DEBUG_sched,
1583                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1584                    (long)t->id, whatNext_strs[t->what_next], blocks);
1585     
1586         // don't do this if the nursery is (nearly) full, we'll GC first.
1587         if (cap->r.rCurrentNursery->link != NULL ||
1588             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1589                                                // if the nursery has only one block.
1590             
1591             ACQUIRE_SM_LOCK
1592             bd = allocGroup( blocks );
1593             RELEASE_SM_LOCK
1594             cap->r.rNursery->n_blocks += blocks;
1595             
1596             // link the new group into the list
1597             bd->link = cap->r.rCurrentNursery;
1598             bd->u.back = cap->r.rCurrentNursery->u.back;
1599             if (cap->r.rCurrentNursery->u.back != NULL) {
1600                 cap->r.rCurrentNursery->u.back->link = bd;
1601             } else {
1602 #if !defined(THREADED_RTS)
1603                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1604                        g0s0 == cap->r.rNursery);
1605 #endif
1606                 cap->r.rNursery->blocks = bd;
1607             }             
1608             cap->r.rCurrentNursery->u.back = bd;
1609             
1610             // initialise it as a nursery block.  We initialise the
1611             // step, gen_no, and flags field of *every* sub-block in
1612             // this large block, because this is easier than making
1613             // sure that we always find the block head of a large
1614             // block whenever we call Bdescr() (eg. evacuate() and
1615             // isAlive() in the GC would both have to do this, at
1616             // least).
1617             { 
1618                 bdescr *x;
1619                 for (x = bd; x < bd + blocks; x++) {
1620                     x->step = cap->r.rNursery;
1621                     x->gen_no = 0;
1622                     x->flags = 0;
1623                 }
1624             }
1625             
1626             // This assert can be a killer if the app is doing lots
1627             // of large block allocations.
1628             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1629             
1630             // now update the nursery to point to the new block
1631             cap->r.rCurrentNursery = bd;
1632             
1633             // we might be unlucky and have another thread get on the
1634             // run queue before us and steal the large block, but in that
1635             // case the thread will just end up requesting another large
1636             // block.
1637             pushOnRunQueue(cap,t);
1638             return rtsFalse;  /* not actually GC'ing */
1639         }
1640     }
1641     
1642     debugTrace(DEBUG_sched,
1643                "--<< thread %ld (%s) stopped: HeapOverflow\n", 
1644                (long)t->id, whatNext_strs[t->what_next]);
1645
1646 #if defined(GRAN)
1647     ASSERT(!is_on_queue(t,CurrentProc));
1648 #elif defined(PARALLEL_HASKELL)
1649     /* Currently we emit a DESCHEDULE event before GC in GUM.
1650        ToDo: either add separate event to distinguish SYSTEM time from rest
1651        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1652     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1653         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1654                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1655         emitSchedule = rtsTrue;
1656     }
1657 #endif
1658       
1659     if (context_switch) {
1660         // Sometimes we miss a context switch, e.g. when calling
1661         // primitives in a tight loop, MAYBE_GC() doesn't check the
1662         // context switch flag, and we end up waiting for a GC.
1663         // See #1984, and concurrent/should_run/1984
1664         context_switch = 0;
1665         addToRunQueue(cap,t);
1666     } else {
1667         pushOnRunQueue(cap,t);
1668     }
1669     return rtsTrue;
1670     /* actual GC is done at the end of the while loop in schedule() */
1671 }
1672
1673 /* -----------------------------------------------------------------------------
1674  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1675  * -------------------------------------------------------------------------- */
1676
1677 static void
1678 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1679 {
1680     debugTrace (DEBUG_sched,
1681                 "--<< thread %ld (%s) stopped, StackOverflow", 
1682                 (long)t->id, whatNext_strs[t->what_next]);
1683
1684     /* just adjust the stack for this thread, then pop it back
1685      * on the run queue.
1686      */
1687     { 
1688         /* enlarge the stack */
1689         StgTSO *new_t = threadStackOverflow(cap, t);
1690         
1691         /* The TSO attached to this Task may have moved, so update the
1692          * pointer to it.
1693          */
1694         if (task->tso == t) {
1695             task->tso = new_t;
1696         }
1697         pushOnRunQueue(cap,new_t);
1698     }
1699 }
1700
1701 /* -----------------------------------------------------------------------------
1702  * Handle a thread that returned to the scheduler with ThreadYielding
1703  * -------------------------------------------------------------------------- */
1704
1705 static rtsBool
1706 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1707 {
1708     // Reset the context switch flag.  We don't do this just before
1709     // running the thread, because that would mean we would lose ticks
1710     // during GC, which can lead to unfair scheduling (a thread hogs
1711     // the CPU because the tick always arrives during GC).  This way
1712     // penalises threads that do a lot of allocation, but that seems
1713     // better than the alternative.
1714     context_switch = 0;
1715     
1716     /* put the thread back on the run queue.  Then, if we're ready to
1717      * GC, check whether this is the last task to stop.  If so, wake
1718      * up the GC thread.  getThread will block during a GC until the
1719      * GC is finished.
1720      */
1721 #ifdef DEBUG
1722     if (t->what_next != prev_what_next) {
1723         debugTrace(DEBUG_sched,
1724                    "--<< thread %ld (%s) stopped to switch evaluators", 
1725                    (long)t->id, whatNext_strs[t->what_next]);
1726     } else {
1727         debugTrace(DEBUG_sched,
1728                    "--<< thread %ld (%s) stopped, yielding",
1729                    (long)t->id, whatNext_strs[t->what_next]);
1730     }
1731 #endif
1732     
1733     IF_DEBUG(sanity,
1734              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1735              checkTSO(t));
1736     ASSERT(t->_link == END_TSO_QUEUE);
1737     
1738     // Shortcut if we're just switching evaluators: don't bother
1739     // doing stack squeezing (which can be expensive), just run the
1740     // thread.
1741     if (t->what_next != prev_what_next) {
1742         return rtsTrue;
1743     }
1744     
1745 #if defined(GRAN)
1746     ASSERT(!is_on_queue(t,CurrentProc));
1747       
1748     IF_DEBUG(sanity,
1749              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1750              checkThreadQsSanity(rtsTrue));
1751
1752 #endif
1753
1754     addToRunQueue(cap,t);
1755
1756 #if defined(GRAN)
1757     /* add a ContinueThread event to actually process the thread */
1758     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1759               ContinueThread,
1760               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1761     IF_GRAN_DEBUG(bq, 
1762                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1763                   G_EVENTQ(0);
1764                   G_CURR_THREADQ(0));
1765 #endif
1766     return rtsFalse;
1767 }
1768
1769 /* -----------------------------------------------------------------------------
1770  * Handle a thread that returned to the scheduler with ThreadBlocked
1771  * -------------------------------------------------------------------------- */
1772
1773 static void
1774 scheduleHandleThreadBlocked( StgTSO *t
1775 #if !defined(GRAN) && !defined(DEBUG)
1776     STG_UNUSED
1777 #endif
1778     )
1779 {
1780 #if defined(GRAN)
1781     IF_DEBUG(scheduler,
1782              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1783                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1784              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1785     
1786     // ??? needed; should emit block before
1787     IF_DEBUG(gran, 
1788              DumpGranEvent(GR_DESCHEDULE, t)); 
1789     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1790     /*
1791       ngoq Dogh!
1792       ASSERT(procStatus[CurrentProc]==Busy || 
1793       ((procStatus[CurrentProc]==Fetching) && 
1794       (t->block_info.closure!=(StgClosure*)NULL)));
1795       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1796       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1797       procStatus[CurrentProc]==Fetching)) 
1798       procStatus[CurrentProc] = Idle;
1799     */
1800 #elif defined(PAR)
1801     IF_DEBUG(scheduler,
1802              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1803                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1804     IF_PAR_DEBUG(bq,
1805                  
1806                  if (t->block_info.closure!=(StgClosure*)NULL) 
1807                  print_bq(t->block_info.closure));
1808     
1809     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1810     blockThread(t);
1811     
1812     /* whatever we schedule next, we must log that schedule */
1813     emitSchedule = rtsTrue;
1814     
1815 #else /* !GRAN */
1816
1817       // We don't need to do anything.  The thread is blocked, and it
1818       // has tidied up its stack and placed itself on whatever queue
1819       // it needs to be on.
1820
1821     // ASSERT(t->why_blocked != NotBlocked);
1822     // Not true: for example,
1823     //    - in THREADED_RTS, the thread may already have been woken
1824     //      up by another Capability.  This actually happens: try
1825     //      conc023 +RTS -N2.
1826     //    - the thread may have woken itself up already, because
1827     //      threadPaused() might have raised a blocked throwTo
1828     //      exception, see maybePerformBlockedException().
1829
1830 #ifdef DEBUG
1831     if (traceClass(DEBUG_sched)) {
1832         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1833                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1834         printThreadBlockage(t);
1835         debugTraceEnd();
1836     }
1837 #endif
1838     
1839     /* Only for dumping event to log file 
1840        ToDo: do I need this in GranSim, too?
1841        blockThread(t);
1842     */
1843 #endif
1844 }
1845
1846 /* -----------------------------------------------------------------------------
1847  * Handle a thread that returned to the scheduler with ThreadFinished
1848  * -------------------------------------------------------------------------- */
1849
1850 static rtsBool
1851 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1852 {
1853     /* Need to check whether this was a main thread, and if so,
1854      * return with the return value.
1855      *
1856      * We also end up here if the thread kills itself with an
1857      * uncaught exception, see Exception.cmm.
1858      */
1859     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1860                (unsigned long)t->id, whatNext_strs[t->what_next]);
1861
1862 #if defined(GRAN)
1863       endThread(t, CurrentProc); // clean-up the thread
1864 #elif defined(PARALLEL_HASKELL)
1865       /* For now all are advisory -- HWL */
1866       //if(t->priority==AdvisoryPriority) ??
1867       advisory_thread_count--; // JB: Caution with this counter, buggy!
1868       
1869 # if defined(DIST)
1870       if(t->dist.priority==RevalPriority)
1871         FinishReval(t);
1872 # endif
1873     
1874 # if defined(EDENOLD)
1875       // the thread could still have an outport... (BUG)
1876       if (t->eden.outport != -1) {
1877       // delete the outport for the tso which has finished...
1878         IF_PAR_DEBUG(eden_ports,
1879                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1880                               t->eden.outport, t->id));
1881         deleteOPT(t);
1882       }
1883       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1884       if (t->eden.epid != -1) {
1885         IF_PAR_DEBUG(eden_ports,
1886                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1887                            t->id, t->eden.epid));
1888         removeTSOfromProcess(t);
1889       }
1890 # endif 
1891
1892 # if defined(PAR)
1893       if (RtsFlags.ParFlags.ParStats.Full &&
1894           !RtsFlags.ParFlags.ParStats.Suppressed) 
1895         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1896
1897       //  t->par only contains statistics: left out for now...
1898       IF_PAR_DEBUG(fish,
1899                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1900                               t->id,t,t->par.sparkname));
1901 # endif
1902 #endif // PARALLEL_HASKELL
1903
1904       //
1905       // Check whether the thread that just completed was a bound
1906       // thread, and if so return with the result.  
1907       //
1908       // There is an assumption here that all thread completion goes
1909       // through this point; we need to make sure that if a thread
1910       // ends up in the ThreadKilled state, that it stays on the run
1911       // queue so it can be dealt with here.
1912       //
1913
1914       if (t->bound) {
1915
1916           if (t->bound != task) {
1917 #if !defined(THREADED_RTS)
1918               // Must be a bound thread that is not the topmost one.  Leave
1919               // it on the run queue until the stack has unwound to the
1920               // point where we can deal with this.  Leaving it on the run
1921               // queue also ensures that the garbage collector knows about
1922               // this thread and its return value (it gets dropped from the
1923               // step->threads list so there's no other way to find it).
1924               appendToRunQueue(cap,t);
1925               return rtsFalse;
1926 #else
1927               // this cannot happen in the threaded RTS, because a
1928               // bound thread can only be run by the appropriate Task.
1929               barf("finished bound thread that isn't mine");
1930 #endif
1931           }
1932
1933           ASSERT(task->tso == t);
1934
1935           if (t->what_next == ThreadComplete) {
1936               if (task->ret) {
1937                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1938                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1939               }
1940               task->stat = Success;
1941           } else {
1942               if (task->ret) {
1943                   *(task->ret) = NULL;
1944               }
1945               if (sched_state >= SCHED_INTERRUPTING) {
1946                   task->stat = Interrupted;
1947               } else {
1948                   task->stat = Killed;
1949               }
1950           }
1951 #ifdef DEBUG
1952           removeThreadLabel((StgWord)task->tso->id);
1953 #endif
1954           return rtsTrue; // tells schedule() to return
1955       }
1956
1957       return rtsFalse;
1958 }
1959
1960 /* -----------------------------------------------------------------------------
1961  * Perform a heap census
1962  * -------------------------------------------------------------------------- */
1963
1964 static rtsBool
1965 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1966 {
1967     // When we have +RTS -i0 and we're heap profiling, do a census at
1968     // every GC.  This lets us get repeatable runs for debugging.
1969     if (performHeapProfile ||
1970         (RtsFlags.ProfFlags.profileInterval==0 &&
1971          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1972         return rtsTrue;
1973     } else {
1974         return rtsFalse;
1975     }
1976 }
1977
1978 /* -----------------------------------------------------------------------------
1979  * Perform a garbage collection if necessary
1980  * -------------------------------------------------------------------------- */
1981
1982 static Capability *
1983 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1984 {
1985     StgTSO *t;
1986     rtsBool heap_census;
1987 #ifdef THREADED_RTS
1988     static volatile StgWord waiting_for_gc;
1989     rtsBool was_waiting;
1990     nat i;
1991 #endif
1992
1993 #ifdef THREADED_RTS
1994     // In order to GC, there must be no threads running Haskell code.
1995     // Therefore, the GC thread needs to hold *all* the capabilities,
1996     // and release them after the GC has completed.  
1997     //
1998     // This seems to be the simplest way: previous attempts involved
1999     // making all the threads with capabilities give up their
2000     // capabilities and sleep except for the *last* one, which
2001     // actually did the GC.  But it's quite hard to arrange for all
2002     // the other tasks to sleep and stay asleep.
2003     //
2004         
2005     was_waiting = cas(&waiting_for_gc, 0, 1);
2006     if (was_waiting) {
2007         do {
2008             debugTrace(DEBUG_sched, "someone else is trying to GC...");
2009             if (cap) yieldCapability(&cap,task);
2010         } while (waiting_for_gc);
2011         return cap;  // NOTE: task->cap might have changed here
2012     }
2013
2014     for (i=0; i < n_capabilities; i++) {
2015         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
2016         if (cap != &capabilities[i]) {
2017             Capability *pcap = &capabilities[i];
2018             // we better hope this task doesn't get migrated to
2019             // another Capability while we're waiting for this one.
2020             // It won't, because load balancing happens while we have
2021             // all the Capabilities, but even so it's a slightly
2022             // unsavoury invariant.
2023             task->cap = pcap;
2024             context_switch = 1;
2025             waitForReturnCapability(&pcap, task);
2026             if (pcap != &capabilities[i]) {
2027                 barf("scheduleDoGC: got the wrong capability");
2028             }
2029         }
2030     }
2031
2032     waiting_for_gc = rtsFalse;
2033 #endif
2034
2035     // so this happens periodically:
2036     if (cap) scheduleCheckBlackHoles(cap);
2037     
2038     IF_DEBUG(scheduler, printAllThreads());
2039
2040     /*
2041      * We now have all the capabilities; if we're in an interrupting
2042      * state, then we should take the opportunity to delete all the
2043      * threads in the system.
2044      */
2045     if (sched_state >= SCHED_INTERRUPTING) {
2046         deleteAllThreads(&capabilities[0]);
2047         sched_state = SCHED_SHUTTING_DOWN;
2048     }
2049     
2050     heap_census = scheduleNeedHeapProfile(rtsTrue);
2051
2052     /* everybody back, start the GC.
2053      * Could do it in this thread, or signal a condition var
2054      * to do it in another thread.  Either way, we need to
2055      * broadcast on gc_pending_cond afterward.
2056      */
2057 #if defined(THREADED_RTS)
2058     debugTrace(DEBUG_sched, "doing GC");
2059 #endif
2060     GarbageCollect(force_major || heap_census);
2061     
2062     if (heap_census) {
2063         debugTrace(DEBUG_sched, "performing heap census");
2064         heapCensus();
2065         performHeapProfile = rtsFalse;
2066     }
2067
2068 #if defined(THREADED_RTS)
2069     // release our stash of capabilities.
2070     for (i = 0; i < n_capabilities; i++) {
2071         if (cap != &capabilities[i]) {
2072             task->cap = &capabilities[i];
2073             releaseCapability(&capabilities[i]);
2074         }
2075     }
2076     if (cap) {
2077         task->cap = cap;
2078     } else {
2079         task->cap = NULL;
2080     }
2081 #endif
2082
2083 #if defined(GRAN)
2084     /* add a ContinueThread event to continue execution of current thread */
2085     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2086               ContinueThread,
2087               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2088     IF_GRAN_DEBUG(bq, 
2089                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2090                   G_EVENTQ(0);
2091                   G_CURR_THREADQ(0));
2092 #endif /* GRAN */
2093
2094     return cap;
2095 }
2096
2097 /* ---------------------------------------------------------------------------
2098  * Singleton fork(). Do not copy any running threads.
2099  * ------------------------------------------------------------------------- */
2100
2101 pid_t
2102 forkProcess(HsStablePtr *entry
2103 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2104             STG_UNUSED
2105 #endif
2106            )
2107 {
2108 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2109     Task *task;
2110     pid_t pid;
2111     StgTSO* t,*next;
2112     Capability *cap;
2113     nat s;
2114     
2115 #if defined(THREADED_RTS)
2116     if (RtsFlags.ParFlags.nNodes > 1) {
2117         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2118         stg_exit(EXIT_FAILURE);
2119     }
2120 #endif
2121
2122     debugTrace(DEBUG_sched, "forking!");
2123     
2124     // ToDo: for SMP, we should probably acquire *all* the capabilities
2125     cap = rts_lock();
2126     
2127     // no funny business: hold locks while we fork, otherwise if some
2128     // other thread is holding a lock when the fork happens, the data
2129     // structure protected by the lock will forever be in an
2130     // inconsistent state in the child.  See also #1391.
2131     ACQUIRE_LOCK(&sched_mutex);
2132     ACQUIRE_LOCK(&cap->lock);
2133     ACQUIRE_LOCK(&cap->running_task->lock);
2134
2135     pid = fork();
2136     
2137     if (pid) { // parent
2138         
2139         RELEASE_LOCK(&sched_mutex);
2140         RELEASE_LOCK(&cap->lock);
2141         RELEASE_LOCK(&cap->running_task->lock);
2142
2143         // just return the pid
2144         rts_unlock(cap);
2145         return pid;
2146         
2147     } else { // child
2148         
2149 #if defined(THREADED_RTS)
2150         initMutex(&sched_mutex);
2151         initMutex(&cap->lock);
2152         initMutex(&cap->running_task->lock);
2153 #endif
2154
2155         // Now, all OS threads except the thread that forked are
2156         // stopped.  We need to stop all Haskell threads, including
2157         // those involved in foreign calls.  Also we need to delete
2158         // all Tasks, because they correspond to OS threads that are
2159         // now gone.
2160
2161         for (s = 0; s < total_steps; s++) {
2162           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
2163             if (t->what_next == ThreadRelocated) {
2164                 next = t->_link;
2165             } else {
2166                 next = t->global_link;
2167                 // don't allow threads to catch the ThreadKilled
2168                 // exception, but we do want to raiseAsync() because these
2169                 // threads may be evaluating thunks that we need later.
2170                 deleteThread_(cap,t);
2171             }
2172           }
2173         }
2174         
2175         // Empty the run queue.  It seems tempting to let all the
2176         // killed threads stay on the run queue as zombies to be
2177         // cleaned up later, but some of them correspond to bound
2178         // threads for which the corresponding Task does not exist.
2179         cap->run_queue_hd = END_TSO_QUEUE;
2180         cap->run_queue_tl = END_TSO_QUEUE;
2181
2182         // Any suspended C-calling Tasks are no more, their OS threads
2183         // don't exist now:
2184         cap->suspended_ccalling_tasks = NULL;
2185
2186         // Empty the threads lists.  Otherwise, the garbage
2187         // collector may attempt to resurrect some of these threads.
2188         for (s = 0; s < total_steps; s++) {
2189             all_steps[s].threads = END_TSO_QUEUE;
2190         }
2191
2192         // Wipe the task list, except the current Task.
2193         ACQUIRE_LOCK(&sched_mutex);
2194         for (task = all_tasks; task != NULL; task=task->all_link) {
2195             if (task != cap->running_task) {
2196 #if defined(THREADED_RTS)
2197                 initMutex(&task->lock); // see #1391
2198 #endif
2199                 discardTask(task);
2200             }
2201         }
2202         RELEASE_LOCK(&sched_mutex);
2203
2204 #if defined(THREADED_RTS)
2205         // Wipe our spare workers list, they no longer exist.  New
2206         // workers will be created if necessary.
2207         cap->spare_workers = NULL;
2208         cap->returning_tasks_hd = NULL;
2209         cap->returning_tasks_tl = NULL;
2210 #endif
2211
2212         // On Unix, all timers are reset in the child, so we need to start
2213         // the timer again.
2214         initTimer();
2215         startTimer();
2216
2217         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2218         rts_checkSchedStatus("forkProcess",cap);
2219         
2220         rts_unlock(cap);
2221         hs_exit();                      // clean up and exit
2222         stg_exit(EXIT_SUCCESS);
2223     }
2224 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2225     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2226     return -1;
2227 #endif
2228 }
2229
2230 /* ---------------------------------------------------------------------------
2231  * Delete all the threads in the system
2232  * ------------------------------------------------------------------------- */
2233    
2234 static void
2235 deleteAllThreads ( Capability *cap )
2236 {
2237     // NOTE: only safe to call if we own all capabilities.
2238
2239     StgTSO* t, *next;
2240     nat s;
2241
2242     debugTrace(DEBUG_sched,"deleting all threads");
2243     for (s = 0; s < total_steps; s++) {
2244       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
2245         if (t->what_next == ThreadRelocated) {
2246             next = t->_link;
2247         } else {
2248             next = t->global_link;
2249             deleteThread(cap,t);
2250         }
2251       }
2252     }      
2253
2254     // The run queue now contains a bunch of ThreadKilled threads.  We
2255     // must not throw these away: the main thread(s) will be in there
2256     // somewhere, and the main scheduler loop has to deal with it.
2257     // Also, the run queue is the only thing keeping these threads from
2258     // being GC'd, and we don't want the "main thread has been GC'd" panic.
2259
2260 #if !defined(THREADED_RTS)
2261     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2262     ASSERT(sleeping_queue == END_TSO_QUEUE);
2263 #endif
2264 }
2265
2266 /* -----------------------------------------------------------------------------
2267    Managing the suspended_ccalling_tasks list.
2268    Locks required: sched_mutex
2269    -------------------------------------------------------------------------- */
2270
2271 STATIC_INLINE void
2272 suspendTask (Capability *cap, Task *task)
2273 {
2274     ASSERT(task->next == NULL && task->prev == NULL);
2275     task->next = cap->suspended_ccalling_tasks;
2276     task->prev = NULL;
2277     if (cap->suspended_ccalling_tasks) {
2278         cap->suspended_ccalling_tasks->prev = task;
2279     }
2280     cap->suspended_ccalling_tasks = task;
2281 }
2282
2283 STATIC_INLINE void
2284 recoverSuspendedTask (Capability *cap, Task *task)
2285 {
2286     if (task->prev) {
2287         task->prev->next = task->next;
2288     } else {
2289         ASSERT(cap->suspended_ccalling_tasks == task);
2290         cap->suspended_ccalling_tasks = task->next;
2291     }
2292     if (task->next) {
2293         task->next->prev = task->prev;
2294     }
2295     task->next = task->prev = NULL;
2296 }
2297
2298 /* ---------------------------------------------------------------------------
2299  * Suspending & resuming Haskell threads.
2300  * 
2301  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2302  * its capability before calling the C function.  This allows another
2303  * task to pick up the capability and carry on running Haskell
2304  * threads.  It also means that if the C call blocks, it won't lock
2305  * the whole system.
2306  *
2307  * The Haskell thread making the C call is put to sleep for the
2308  * duration of the call, on the susepended_ccalling_threads queue.  We
2309  * give out a token to the task, which it can use to resume the thread
2310  * on return from the C function.
2311  * ------------------------------------------------------------------------- */
2312    
2313 void *
2314 suspendThread (StgRegTable *reg)
2315 {
2316   Capability *cap;
2317   int saved_errno;
2318   StgTSO *tso;
2319   Task *task;
2320 #if mingw32_HOST_OS
2321   StgWord32 saved_winerror;
2322 #endif
2323
2324   saved_errno = errno;
2325 #if mingw32_HOST_OS
2326   saved_winerror = GetLastError();
2327 #endif
2328
2329   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2330    */
2331   cap = regTableToCapability(reg);
2332
2333   task = cap->running_task;
2334   tso = cap->r.rCurrentTSO;
2335
2336   debugTrace(DEBUG_sched, 
2337              "thread %lu did a safe foreign call", 
2338              (unsigned long)cap->r.rCurrentTSO->id);
2339
2340   // XXX this might not be necessary --SDM
2341   tso->what_next = ThreadRunGHC;
2342
2343   threadPaused(cap,tso);
2344
2345   if ((tso->flags & TSO_BLOCKEX) == 0)  {
2346       tso->why_blocked = BlockedOnCCall;
2347       tso->flags |= TSO_BLOCKEX;
2348       tso->flags &= ~TSO_INTERRUPTIBLE;
2349   } else {
2350       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2351   }
2352
2353   // Hand back capability
2354   task->suspended_tso = tso;
2355
2356   ACQUIRE_LOCK(&cap->lock);
2357
2358   suspendTask(cap,task);
2359   cap->in_haskell = rtsFalse;
2360   releaseCapability_(cap);
2361   
2362   RELEASE_LOCK(&cap->lock);
2363
2364 #if defined(THREADED_RTS)
2365   /* Preparing to leave the RTS, so ensure there's a native thread/task
2366      waiting to take over.
2367   */
2368   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2369 #endif
2370
2371   errno = saved_errno;
2372 #if mingw32_HOST_OS
2373   SetLastError(saved_winerror);
2374 #endif
2375   return task;
2376 }
2377
2378 StgRegTable *
2379 resumeThread (void *task_)
2380 {
2381     StgTSO *tso;
2382     Capability *cap;
2383     Task *task = task_;
2384     int saved_errno;
2385 #if mingw32_HOST_OS
2386     StgWord32 saved_winerror;
2387 #endif
2388
2389     saved_errno = errno;
2390 #if mingw32_HOST_OS
2391     saved_winerror = GetLastError();
2392 #endif
2393
2394     cap = task->cap;
2395     // Wait for permission to re-enter the RTS with the result.
2396     waitForReturnCapability(&cap,task);
2397     // we might be on a different capability now... but if so, our
2398     // entry on the suspended_ccalling_tasks list will also have been
2399     // migrated.
2400
2401     // Remove the thread from the suspended list
2402     recoverSuspendedTask(cap,task);
2403
2404     tso = task->suspended_tso;
2405     task->suspended_tso = NULL;
2406     tso->_link = END_TSO_QUEUE; // no write barrier reqd
2407     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2408     
2409     if (tso->why_blocked == BlockedOnCCall) {
2410         awakenBlockedExceptionQueue(cap,tso);
2411         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2412     }
2413     
2414     /* Reset blocking status */
2415     tso->why_blocked  = NotBlocked;
2416     
2417     cap->r.rCurrentTSO = tso;
2418     cap->in_haskell = rtsTrue;
2419     errno = saved_errno;
2420 #if mingw32_HOST_OS
2421     SetLastError(saved_winerror);
2422 #endif
2423
2424     /* We might have GC'd, mark the TSO dirty again */
2425     dirty_TSO(cap,tso);
2426
2427     IF_DEBUG(sanity, checkTSO(tso));
2428
2429     return &cap->r;
2430 }
2431
2432 /* ---------------------------------------------------------------------------
2433  * scheduleThread()
2434  *
2435  * scheduleThread puts a thread on the end  of the runnable queue.
2436  * This will usually be done immediately after a thread is created.
2437  * The caller of scheduleThread must create the thread using e.g.
2438  * createThread and push an appropriate closure
2439  * on this thread's stack before the scheduler is invoked.
2440  * ------------------------------------------------------------------------ */
2441
2442 void
2443 scheduleThread(Capability *cap, StgTSO *tso)
2444 {
2445     // The thread goes at the *end* of the run-queue, to avoid possible
2446     // starvation of any threads already on the queue.
2447     appendToRunQueue(cap,tso);
2448 }
2449
2450 void
2451 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2452 {
2453 #if defined(THREADED_RTS)
2454     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2455                               // move this thread from now on.
2456     cpu %= RtsFlags.ParFlags.nNodes;
2457     if (cpu == cap->no) {
2458         appendToRunQueue(cap,tso);
2459     } else {
2460         migrateThreadToCapability_lock(&capabilities[cpu],tso);
2461     }
2462 #else
2463     appendToRunQueue(cap,tso);
2464 #endif
2465 }
2466
2467 Capability *
2468 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2469 {
2470     Task *task;
2471
2472     // We already created/initialised the Task
2473     task = cap->running_task;
2474
2475     // This TSO is now a bound thread; make the Task and TSO
2476     // point to each other.
2477     tso->bound = task;
2478     tso->cap = cap;
2479
2480     task->tso = tso;
2481     task->ret = ret;
2482     task->stat = NoStatus;
2483
2484     appendToRunQueue(cap,tso);
2485
2486     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2487
2488 #if defined(GRAN)
2489     /* GranSim specific init */
2490     CurrentTSO = m->tso;                // the TSO to run
2491     procStatus[MainProc] = Busy;        // status of main PE
2492     CurrentProc = MainProc;             // PE to run it on
2493 #endif
2494
2495     cap = schedule(cap,task);
2496
2497     ASSERT(task->stat != NoStatus);
2498     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2499
2500     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2501     return cap;
2502 }
2503
2504 /* ----------------------------------------------------------------------------
2505  * Starting Tasks
2506  * ------------------------------------------------------------------------- */
2507
2508 #if defined(THREADED_RTS)
2509 void
2510 workerStart(Task *task)
2511 {
2512     Capability *cap;
2513
2514     // See startWorkerTask().
2515     ACQUIRE_LOCK(&task->lock);
2516     cap = task->cap;
2517     RELEASE_LOCK(&task->lock);
2518
2519     // set the thread-local pointer to the Task:
2520     taskEnter(task);
2521
2522     // schedule() runs without a lock.
2523     cap = schedule(cap,task);
2524
2525     // On exit from schedule(), we have a Capability.
2526     releaseCapability(cap);
2527     workerTaskStop(task);
2528 }
2529 #endif
2530
2531 /* ---------------------------------------------------------------------------
2532  * initScheduler()
2533  *
2534  * Initialise the scheduler.  This resets all the queues - if the
2535  * queues contained any threads, they'll be garbage collected at the
2536  * next pass.
2537  *
2538  * ------------------------------------------------------------------------ */
2539
2540 void 
2541 initScheduler(void)
2542 {
2543 #if defined(GRAN)
2544   nat i;
2545   for (i=0; i<=MAX_PROC; i++) {
2546     run_queue_hds[i]      = END_TSO_QUEUE;
2547     run_queue_tls[i]      = END_TSO_QUEUE;
2548     blocked_queue_hds[i]  = END_TSO_QUEUE;
2549     blocked_queue_tls[i]  = END_TSO_QUEUE;
2550     ccalling_threadss[i]  = END_TSO_QUEUE;
2551     blackhole_queue[i]    = END_TSO_QUEUE;
2552     sleeping_queue        = END_TSO_QUEUE;
2553   }
2554 #elif !defined(THREADED_RTS)
2555   blocked_queue_hd  = END_TSO_QUEUE;
2556   blocked_queue_tl  = END_TSO_QUEUE;
2557   sleeping_queue    = END_TSO_QUEUE;
2558 #endif
2559
2560   blackhole_queue   = END_TSO_QUEUE;
2561
2562   context_switch = 0;
2563   sched_state    = SCHED_RUNNING;
2564   recent_activity = ACTIVITY_YES;
2565
2566 #if defined(THREADED_RTS)
2567   /* Initialise the mutex and condition variables used by
2568    * the scheduler. */
2569   initMutex(&sched_mutex);
2570 #endif
2571   
2572   ACQUIRE_LOCK(&sched_mutex);
2573
2574   /* A capability holds the state a native thread needs in
2575    * order to execute STG code. At least one capability is
2576    * floating around (only THREADED_RTS builds have more than one).
2577    */
2578   initCapabilities();
2579
2580   initTaskManager();
2581
2582 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2583   initSparkPools();
2584 #endif
2585
2586 #if defined(THREADED_RTS)
2587   /*
2588    * Eagerly start one worker to run each Capability, except for
2589    * Capability 0.  The idea is that we're probably going to start a
2590    * bound thread on Capability 0 pretty soon, so we don't want a
2591    * worker task hogging it.
2592    */
2593   { 
2594       nat i;
2595       Capability *cap;
2596       for (i = 1; i < n_capabilities; i++) {
2597           cap = &capabilities[i];
2598           ACQUIRE_LOCK(&cap->lock);
2599           startWorkerTask(cap, workerStart);
2600           RELEASE_LOCK(&cap->lock);
2601       }
2602   }
2603 #endif
2604
2605   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2606
2607   RELEASE_LOCK(&sched_mutex);
2608 }
2609
2610 void
2611 exitScheduler(
2612     rtsBool wait_foreign
2613 #if !defined(THREADED_RTS)
2614                          __attribute__((unused))
2615 #endif
2616 )
2617                /* see Capability.c, shutdownCapability() */
2618 {
2619     Task *task = NULL;
2620
2621 #if defined(THREADED_RTS)
2622     ACQUIRE_LOCK(&sched_mutex);
2623     task = newBoundTask();
2624     RELEASE_LOCK(&sched_mutex);
2625 #endif
2626
2627     // If we haven't killed all the threads yet, do it now.
2628     if (sched_state < SCHED_SHUTTING_DOWN) {
2629         sched_state = SCHED_INTERRUPTING;
2630         scheduleDoGC(NULL,task,rtsFalse);    
2631     }
2632     sched_state = SCHED_SHUTTING_DOWN;
2633
2634 #if defined(THREADED_RTS)
2635     { 
2636         nat i;
2637         
2638         for (i = 0; i < n_capabilities; i++) {
2639             shutdownCapability(&capabilities[i], task, wait_foreign);
2640         }
2641         boundTaskExiting(task);
2642         stopTaskManager();
2643     }
2644 #else
2645     freeCapability(&MainCapability);
2646 #endif
2647 }
2648
2649 void
2650 freeScheduler( void )
2651 {
2652     freeTaskManager();
2653     if (n_capabilities != 1) {
2654         stgFree(capabilities);
2655     }
2656 #if defined(THREADED_RTS)
2657     closeMutex(&sched_mutex);
2658 #endif
2659 }
2660
2661 /* -----------------------------------------------------------------------------
2662    performGC
2663
2664    This is the interface to the garbage collector from Haskell land.
2665    We provide this so that external C code can allocate and garbage
2666    collect when called from Haskell via _ccall_GC.
2667    -------------------------------------------------------------------------- */
2668
2669 static void
2670 performGC_(rtsBool force_major)
2671 {
2672     Task *task;
2673     // We must grab a new Task here, because the existing Task may be
2674     // associated with a particular Capability, and chained onto the 
2675     // suspended_ccalling_tasks queue.
2676     ACQUIRE_LOCK(&sched_mutex);
2677     task = newBoundTask();
2678     RELEASE_LOCK(&sched_mutex);
2679     scheduleDoGC(NULL,task,force_major);
2680     boundTaskExiting(task);
2681 }
2682
2683 void
2684 performGC(void)
2685 {
2686     performGC_(rtsFalse);
2687 }
2688
2689 void
2690 performMajorGC(void)
2691 {
2692     performGC_(rtsTrue);
2693 }
2694
2695 /* -----------------------------------------------------------------------------
2696    Stack overflow
2697
2698    If the thread has reached its maximum stack size, then raise the
2699    StackOverflow exception in the offending thread.  Otherwise
2700    relocate the TSO into a larger chunk of memory and adjust its stack
2701    size appropriately.
2702    -------------------------------------------------------------------------- */
2703
2704 static StgTSO *
2705 threadStackOverflow(Capability *cap, StgTSO *tso)
2706 {
2707   nat new_stack_size, stack_words;
2708   lnat new_tso_size;
2709   StgPtr new_sp;
2710   StgTSO *dest;
2711
2712   IF_DEBUG(sanity,checkTSO(tso));
2713
2714   // don't allow throwTo() to modify the blocked_exceptions queue
2715   // while we are moving the TSO:
2716   lockClosure((StgClosure *)tso);
2717
2718   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2719       // NB. never raise a StackOverflow exception if the thread is
2720       // inside Control.Exceptino.block.  It is impractical to protect
2721       // against stack overflow exceptions, since virtually anything
2722       // can raise one (even 'catch'), so this is the only sensible
2723       // thing to do here.  See bug #767.
2724
2725       debugTrace(DEBUG_gc,
2726                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2727                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2728       IF_DEBUG(gc,
2729                /* If we're debugging, just print out the top of the stack */
2730                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2731                                                 tso->sp+64)));
2732
2733       // Send this thread the StackOverflow exception
2734       unlockTSO(tso);
2735       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2736       return tso;
2737   }
2738
2739   /* Try to double the current stack size.  If that takes us over the
2740    * maximum stack size for this thread, then use the maximum instead.
2741    * Finally round up so the TSO ends up as a whole number of blocks.
2742    */
2743   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2744   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2745                                        TSO_STRUCT_SIZE)/sizeof(W_);
2746   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2747   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2748
2749   debugTrace(DEBUG_sched, 
2750              "increasing stack size from %ld words to %d.",
2751              (long)tso->stack_size, new_stack_size);
2752
2753   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2754   TICK_ALLOC_TSO(new_stack_size,0);
2755
2756   /* copy the TSO block and the old stack into the new area */
2757   memcpy(dest,tso,TSO_STRUCT_SIZE);
2758   stack_words = tso->stack + tso->stack_size - tso->sp;
2759   new_sp = (P_)dest + new_tso_size - stack_words;
2760   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2761
2762   /* relocate the stack pointers... */
2763   dest->sp         = new_sp;
2764   dest->stack_size = new_stack_size;
2765         
2766   /* Mark the old TSO as relocated.  We have to check for relocated
2767    * TSOs in the garbage collector and any primops that deal with TSOs.
2768    *
2769    * It's important to set the sp value to just beyond the end
2770    * of the stack, so we don't attempt to scavenge any part of the
2771    * dead TSO's stack.
2772    */
2773   tso->what_next = ThreadRelocated;
2774   setTSOLink(cap,tso,dest);
2775   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2776   tso->why_blocked = NotBlocked;
2777
2778   IF_PAR_DEBUG(verbose,
2779                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2780                      tso->id, tso, tso->stack_size);
2781                /* If we're debugging, just print out the top of the stack */
2782                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2783                                                 tso->sp+64)));
2784   
2785   unlockTSO(dest);
2786   unlockTSO(tso);
2787
2788   IF_DEBUG(sanity,checkTSO(dest));
2789 #if 0
2790   IF_DEBUG(scheduler,printTSO(dest));
2791 #endif
2792
2793   return dest;
2794 }
2795
2796 static StgTSO *
2797 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2798 {
2799     bdescr *bd, *new_bd;
2800     lnat new_tso_size_w, tso_size_w;
2801     StgTSO *new_tso;
2802
2803     tso_size_w = tso_sizeW(tso);
2804
2805     if (tso_size_w < MBLOCK_SIZE_W || 
2806         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2807     {
2808         return tso;
2809     }
2810
2811     // don't allow throwTo() to modify the blocked_exceptions queue
2812     // while we are moving the TSO:
2813     lockClosure((StgClosure *)tso);
2814
2815     new_tso_size_w = round_to_mblocks(tso_size_w/2);
2816
2817     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2818                tso->id, tso_size_w, new_tso_size_w);
2819
2820     bd = Bdescr((StgPtr)tso);
2821     new_bd = splitLargeBlock(bd, new_tso_size_w / BLOCK_SIZE_W);
2822
2823     new_tso = (StgTSO *)new_bd->start;
2824     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2825     new_tso->stack_size = new_tso_size_w - TSO_STRUCT_SIZEW;
2826
2827     tso->what_next = ThreadRelocated;
2828     tso->_link = new_tso; // no write barrier reqd: same generation
2829
2830     // The TSO attached to this Task may have moved, so update the
2831     // pointer to it.
2832     if (task->tso == tso) {
2833         task->tso = new_tso;
2834     }
2835
2836     unlockTSO(new_tso);
2837     unlockTSO(tso);
2838
2839     IF_DEBUG(sanity,checkTSO(new_tso));
2840
2841     return new_tso;
2842 }
2843
2844 /* ---------------------------------------------------------------------------
2845    Interrupt execution
2846    - usually called inside a signal handler so it mustn't do anything fancy.   
2847    ------------------------------------------------------------------------ */
2848
2849 void
2850 interruptStgRts(void)
2851 {
2852     sched_state = SCHED_INTERRUPTING;
2853     context_switch = 1;
2854     wakeUpRts();
2855 }
2856
2857 /* -----------------------------------------------------------------------------
2858    Wake up the RTS
2859    
2860    This function causes at least one OS thread to wake up and run the
2861    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2862    an external event has arrived that may need servicing (eg. a
2863    keyboard interrupt).
2864
2865    In the single-threaded RTS we don't do anything here; we only have
2866    one thread anyway, and the event that caused us to want to wake up
2867    will have interrupted any blocking system call in progress anyway.
2868    -------------------------------------------------------------------------- */
2869
2870 void
2871 wakeUpRts(void)
2872 {
2873 #if defined(THREADED_RTS)
2874     // This forces the IO Manager thread to wakeup, which will
2875     // in turn ensure that some OS thread wakes up and runs the
2876     // scheduler loop, which will cause a GC and deadlock check.
2877     ioManagerWakeup();
2878 #endif
2879 }
2880
2881 /* -----------------------------------------------------------------------------
2882  * checkBlackHoles()
2883  *
2884  * Check the blackhole_queue for threads that can be woken up.  We do
2885  * this periodically: before every GC, and whenever the run queue is
2886  * empty.
2887  *
2888  * An elegant solution might be to just wake up all the blocked
2889  * threads with awakenBlockedQueue occasionally: they'll go back to
2890  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2891  * doesn't give us a way to tell whether we've actually managed to
2892  * wake up any threads, so we would be busy-waiting.
2893  *
2894  * -------------------------------------------------------------------------- */
2895
2896 static rtsBool
2897 checkBlackHoles (Capability *cap)
2898 {
2899     StgTSO **prev, *t;
2900     rtsBool any_woke_up = rtsFalse;
2901     StgHalfWord type;
2902
2903     // blackhole_queue is global:
2904     ASSERT_LOCK_HELD(&sched_mutex);
2905
2906     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2907
2908     // ASSUMES: sched_mutex
2909     prev = &blackhole_queue;
2910     t = blackhole_queue;
2911     while (t != END_TSO_QUEUE) {
2912         ASSERT(t->why_blocked == BlockedOnBlackHole);
2913         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2914         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2915             IF_DEBUG(sanity,checkTSO(t));
2916             t = unblockOne(cap, t);
2917             // urk, the threads migrate to the current capability
2918             // here, but we'd like to keep them on the original one.
2919             *prev = t;
2920             any_woke_up = rtsTrue;
2921         } else {
2922             prev = &t->_link;
2923             t = t->_link;
2924         }
2925     }
2926
2927     return any_woke_up;
2928 }
2929
2930 /* -----------------------------------------------------------------------------
2931    Deleting threads
2932
2933    This is used for interruption (^C) and forking, and corresponds to
2934    raising an exception but without letting the thread catch the
2935    exception.
2936    -------------------------------------------------------------------------- */
2937
2938 static void 
2939 deleteThread (Capability *cap, StgTSO *tso)
2940 {
2941     // NOTE: must only be called on a TSO that we have exclusive
2942     // access to, because we will call throwToSingleThreaded() below.
2943     // The TSO must be on the run queue of the Capability we own, or 
2944     // we must own all Capabilities.
2945
2946     if (tso->why_blocked != BlockedOnCCall &&
2947         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2948         throwToSingleThreaded(cap,tso,NULL);
2949     }
2950 }
2951
2952 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2953 static void 
2954 deleteThread_(Capability *cap, StgTSO *tso)
2955 { // for forkProcess only:
2956   // like deleteThread(), but we delete threads in foreign calls, too.
2957
2958     if (tso->why_blocked == BlockedOnCCall ||
2959         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2960         unblockOne(cap,tso);
2961         tso->what_next = ThreadKilled;
2962     } else {
2963         deleteThread(cap,tso);
2964     }
2965 }
2966 #endif
2967
2968 /* -----------------------------------------------------------------------------
2969    raiseExceptionHelper
2970    
2971    This function is called by the raise# primitve, just so that we can
2972    move some of the tricky bits of raising an exception from C-- into
2973    C.  Who knows, it might be a useful re-useable thing here too.
2974    -------------------------------------------------------------------------- */
2975
2976 StgWord
2977 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2978 {
2979     Capability *cap = regTableToCapability(reg);
2980     StgThunk *raise_closure = NULL;
2981     StgPtr p, next;
2982     StgRetInfoTable *info;
2983     //
2984     // This closure represents the expression 'raise# E' where E
2985     // is the exception raise.  It is used to overwrite all the
2986     // thunks which are currently under evaluataion.
2987     //
2988
2989     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2990     // LDV profiling: stg_raise_info has THUNK as its closure
2991     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2992     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2993     // 1 does not cause any problem unless profiling is performed.
2994     // However, when LDV profiling goes on, we need to linearly scan
2995     // small object pool, where raise_closure is stored, so we should
2996     // use MIN_UPD_SIZE.
2997     //
2998     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2999     //                                 sizeofW(StgClosure)+1);
3000     //
3001
3002     //
3003     // Walk up the stack, looking for the catch frame.  On the way,
3004     // we update any closures pointed to from update frames with the
3005     // raise closure that we just built.
3006     //
3007     p = tso->sp;
3008     while(1) {
3009         info = get_ret_itbl((StgClosure *)p);
3010         next = p + stack_frame_sizeW((StgClosure *)p);
3011         switch (info->i.type) {
3012             
3013         case UPDATE_FRAME:
3014             // Only create raise_closure if we need to.
3015             if (raise_closure == NULL) {
3016                 raise_closure = 
3017                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3018                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3019                 raise_closure->payload[0] = exception;
3020             }
3021             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3022             p = next;
3023             continue;
3024
3025         case ATOMICALLY_FRAME:
3026             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3027             tso->sp = p;
3028             return ATOMICALLY_FRAME;
3029             
3030         case CATCH_FRAME:
3031             tso->sp = p;
3032             return CATCH_FRAME;
3033
3034         case CATCH_STM_FRAME:
3035             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3036             tso->sp = p;
3037             return CATCH_STM_FRAME;
3038             
3039         case STOP_FRAME:
3040             tso->sp = p;
3041             return STOP_FRAME;
3042
3043         case CATCH_RETRY_FRAME:
3044         default:
3045             p = next; 
3046             continue;
3047         }
3048     }
3049 }
3050
3051
3052 /* -----------------------------------------------------------------------------
3053    findRetryFrameHelper
3054
3055    This function is called by the retry# primitive.  It traverses the stack
3056    leaving tso->sp referring to the frame which should handle the retry.  
3057
3058    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3059    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3060
3061    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3062    create) because retries are not considered to be exceptions, despite the
3063    similar implementation.
3064
3065    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3066    not be created within memory transactions.
3067    -------------------------------------------------------------------------- */
3068
3069 StgWord
3070 findRetryFrameHelper (StgTSO *tso)
3071 {
3072   StgPtr           p, next;
3073   StgRetInfoTable *info;
3074
3075   p = tso -> sp;
3076   while (1) {
3077     info = get_ret_itbl((StgClosure *)p);
3078     next = p + stack_frame_sizeW((StgClosure *)p);
3079     switch (info->i.type) {
3080       
3081     case ATOMICALLY_FRAME:
3082         debugTrace(DEBUG_stm,
3083                    "found ATOMICALLY_FRAME at %p during retry", p);
3084         tso->sp = p;
3085         return ATOMICALLY_FRAME;
3086       
3087     case CATCH_RETRY_FRAME:
3088         debugTrace(DEBUG_stm,
3089                    "found CATCH_RETRY_FRAME at %p during retrry", p);
3090         tso->sp = p;
3091         return CATCH_RETRY_FRAME;
3092       
3093     case CATCH_STM_FRAME: {
3094         StgTRecHeader *trec = tso -> trec;
3095         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3096         debugTrace(DEBUG_stm,
3097                    "found CATCH_STM_FRAME at %p during retry", p);
3098         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3099         stmAbortTransaction(tso -> cap, trec);
3100         stmFreeAbortedTRec(tso -> cap, trec);
3101         tso -> trec = outer;
3102         p = next; 
3103         continue;
3104     }
3105       
3106
3107     default:
3108       ASSERT(info->i.type != CATCH_FRAME);
3109       ASSERT(info->i.type != STOP_FRAME);
3110       p = next; 
3111       continue;
3112     }
3113   }
3114 }
3115
3116 /* -----------------------------------------------------------------------------
3117    resurrectThreads is called after garbage collection on the list of
3118    threads found to be garbage.  Each of these threads will be woken
3119    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3120    on an MVar, or NonTermination if the thread was blocked on a Black
3121    Hole.
3122
3123    Locks: assumes we hold *all* the capabilities.
3124    -------------------------------------------------------------------------- */
3125
3126 void
3127 resurrectThreads (StgTSO *threads)
3128 {
3129     StgTSO *tso, *next;
3130     Capability *cap;
3131     step *step;
3132
3133     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3134         next = tso->global_link;
3135
3136         step = Bdescr((P_)tso)->step;
3137         tso->global_link = step->threads;
3138         step->threads = tso;
3139
3140         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3141         
3142         // Wake up the thread on the Capability it was last on
3143         cap = tso->cap;
3144         
3145         switch (tso->why_blocked) {
3146         case BlockedOnMVar:
3147         case BlockedOnException:
3148             /* Called by GC - sched_mutex lock is currently held. */
3149             throwToSingleThreaded(cap, tso,
3150                                   (StgClosure *)BlockedOnDeadMVar_closure);
3151             break;
3152         case BlockedOnBlackHole:
3153             throwToSingleThreaded(cap, tso,
3154                                   (StgClosure *)NonTermination_closure);
3155             break;
3156         case BlockedOnSTM:
3157             throwToSingleThreaded(cap, tso,
3158                                   (StgClosure *)BlockedIndefinitely_closure);
3159             break;
3160         case NotBlocked:
3161             /* This might happen if the thread was blocked on a black hole
3162              * belonging to a thread that we've just woken up (raiseAsync
3163              * can wake up threads, remember...).
3164              */
3165             continue;
3166         default:
3167             barf("resurrectThreads: thread blocked in a strange way");
3168         }
3169     }
3170 }
3171
3172 /* -----------------------------------------------------------------------------
3173    performPendingThrowTos is called after garbage collection, and
3174    passed a list of threads that were found to have pending throwTos
3175    (tso->blocked_exceptions was not empty), and were blocked.
3176    Normally this doesn't happen, because we would deliver the
3177    exception directly if the target thread is blocked, but there are
3178    small windows where it might occur on a multiprocessor (see
3179    throwTo()).
3180
3181    NB. we must be holding all the capabilities at this point, just
3182    like resurrectThreads().
3183    -------------------------------------------------------------------------- */
3184
3185 void
3186 performPendingThrowTos (StgTSO *threads)
3187 {
3188     StgTSO *tso, *next;
3189     Capability *cap;
3190     step *step;
3191
3192     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3193         next = tso->global_link;
3194
3195         step = Bdescr((P_)tso)->step;
3196         tso->global_link = step->threads;
3197         step->threads = tso;
3198
3199         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
3200         
3201         cap = tso->cap;
3202         maybePerformBlockedException(cap, tso);
3203     }
3204 }