Fix a bug in the new scheduler
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34
35 /* PARALLEL_HASKELL includes go here */
36
37 #include "Sparks.h"
38 #include "Capability.h"
39 #include "Task.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
43 #endif
44 #include "Trace.h"
45 #include "RaiseAsync.h"
46 #include "Threads.h"
47 #include "ThrIOManager.h"
48
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
51 #endif
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56 #include <string.h>
57 #include <stdlib.h>
58 #include <stdarg.h>
59
60 #ifdef HAVE_ERRNO_H
61 #include <errno.h>
62 #endif
63
64 // Turn off inlining when debugging - it obfuscates things
65 #ifdef DEBUG
66 # undef  STATIC_INLINE
67 # define STATIC_INLINE static
68 #endif
69
70 /* -----------------------------------------------------------------------------
71  * Global variables
72  * -------------------------------------------------------------------------- */
73
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping thrads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
79 #endif
80
81 /* Threads blocked on blackholes.
82  * LOCK: sched_mutex+capability, or all capabilities
83  */
84 StgTSO *blackhole_queue = NULL;
85
86 /* The blackhole_queue should be checked for threads to wake up.  See
87  * Schedule.h for more thorough comment.
88  * LOCK: none (doesn't matter if we miss an update)
89  */
90 rtsBool blackholes_need_checking = rtsFalse;
91
92 /* flag that tracks whether we have done any execution in this time slice.
93  * LOCK: currently none, perhaps we should lock (but needs to be
94  * updated in the fast path of the scheduler).
95  */
96 nat recent_activity = ACTIVITY_YES;
97
98 /* if this flag is set as well, give up execution
99  * LOCK: none (changes once, from false->true)
100  */
101 rtsBool sched_state = SCHED_RUNNING;
102
103 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
104  *  exists - earlier gccs apparently didn't.
105  *  -= chak
106  */
107 StgTSO dummy_tso;
108
109 /*
110  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
111  * in an MT setting, needed to signal that a worker thread shouldn't hang around
112  * in the scheduler when it is out of work.
113  */
114 rtsBool shutting_down_scheduler = rtsFalse;
115
116 /*
117  * This mutex protects most of the global scheduler data in
118  * the THREADED_RTS runtime.
119  */
120 #if defined(THREADED_RTS)
121 Mutex sched_mutex;
122 #endif
123
124 #if !defined(mingw32_HOST_OS)
125 #define FORKPROCESS_PRIMOP_SUPPORTED
126 #endif
127
128 /* -----------------------------------------------------------------------------
129  * static function prototypes
130  * -------------------------------------------------------------------------- */
131
132 static Capability *schedule (Capability *initialCapability, Task *task);
133
134 //
135 // These function all encapsulate parts of the scheduler loop, and are
136 // abstracted only to make the structure and control flow of the
137 // scheduler clearer.
138 //
139 static void schedulePreLoop (void);
140 static void scheduleFindWork (Capability *cap);
141 #if defined(THREADED_RTS)
142 static void scheduleYield (Capability **pcap, Task *task);
143 #endif
144 static void scheduleStartSignalHandlers (Capability *cap);
145 static void scheduleCheckBlockedThreads (Capability *cap);
146 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
147 static void scheduleCheckBlackHoles (Capability *cap);
148 static void scheduleDetectDeadlock (Capability *cap, Task *task);
149 static void schedulePushWork(Capability *cap, Task *task);
150 #if defined(PARALLEL_HASKELL)
151 static rtsBool scheduleGetRemoteWork(Capability *cap);
152 static void scheduleSendPendingMessages(void);
153 #endif
154 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
155 static void scheduleActivateSpark(Capability *cap);
156 #endif
157 static void schedulePostRunThread(Capability *cap, StgTSO *t);
158 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
159 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
160                                          StgTSO *t);
161 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
162                                     nat prev_what_next );
163 static void scheduleHandleThreadBlocked( StgTSO *t );
164 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
165                                              StgTSO *t );
166 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
167 static Capability *scheduleDoGC(Capability *cap, Task *task,
168                                 rtsBool force_major);
169
170 static rtsBool checkBlackHoles(Capability *cap);
171
172 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
173 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
174
175 static void deleteThread (Capability *cap, StgTSO *tso);
176 static void deleteAllThreads (Capability *cap);
177
178 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
179 static void deleteThread_(Capability *cap, StgTSO *tso);
180 #endif
181
182 #ifdef DEBUG
183 static char *whatNext_strs[] = {
184   "(unknown)",
185   "ThreadRunGHC",
186   "ThreadInterpret",
187   "ThreadKilled",
188   "ThreadRelocated",
189   "ThreadComplete"
190 };
191 #endif
192
193 /* -----------------------------------------------------------------------------
194  * Putting a thread on the run queue: different scheduling policies
195  * -------------------------------------------------------------------------- */
196
197 STATIC_INLINE void
198 addToRunQueue( Capability *cap, StgTSO *t )
199 {
200 #if defined(PARALLEL_HASKELL)
201     if (RtsFlags.ParFlags.doFairScheduling) { 
202         // this does round-robin scheduling; good for concurrency
203         appendToRunQueue(cap,t);
204     } else {
205         // this does unfair scheduling; good for parallelism
206         pushOnRunQueue(cap,t);
207     }
208 #else
209     // this does round-robin scheduling; good for concurrency
210     appendToRunQueue(cap,t);
211 #endif
212 }
213
214 /* ---------------------------------------------------------------------------
215    Main scheduling loop.
216
217    We use round-robin scheduling, each thread returning to the
218    scheduler loop when one of these conditions is detected:
219
220       * out of heap space
221       * timer expires (thread yields)
222       * thread blocks
223       * thread ends
224       * stack overflow
225
226    GRAN version:
227      In a GranSim setup this loop iterates over the global event queue.
228      This revolves around the global event queue, which determines what 
229      to do next. Therefore, it's more complicated than either the 
230      concurrent or the parallel (GUM) setup.
231   This version has been entirely removed (JB 2008/08).
232
233    GUM version:
234      GUM iterates over incoming messages.
235      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
236      and sends out a fish whenever it has nothing to do; in-between
237      doing the actual reductions (shared code below) it processes the
238      incoming messages and deals with delayed operations 
239      (see PendingFetches).
240      This is not the ugliest code you could imagine, but it's bloody close.
241
242   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
243   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
244   as well as future GUM versions. This file has been refurbished to
245   only contain valid code, which is however incomplete, refers to
246   invalid includes etc.
247
248    ------------------------------------------------------------------------ */
249
250 static Capability *
251 schedule (Capability *initialCapability, Task *task)
252 {
253   StgTSO *t;
254   Capability *cap;
255   StgThreadReturnCode ret;
256 #if defined(PARALLEL_HASKELL)
257   rtsBool receivedFinish = rtsFalse;
258 #endif
259   nat prev_what_next;
260   rtsBool ready_to_gc;
261 #if defined(THREADED_RTS)
262   rtsBool first = rtsTrue;
263 #endif
264   
265   cap = initialCapability;
266
267   // Pre-condition: this task owns initialCapability.
268   // The sched_mutex is *NOT* held
269   // NB. on return, we still hold a capability.
270
271   debugTrace (DEBUG_sched, 
272               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
273               task, initialCapability);
274
275   schedulePreLoop();
276
277   // -----------------------------------------------------------
278   // Scheduler loop starts here:
279
280 #if defined(PARALLEL_HASKELL)
281 #define TERMINATION_CONDITION        (!receivedFinish)
282 #else
283 #define TERMINATION_CONDITION        rtsTrue
284 #endif
285
286   while (TERMINATION_CONDITION) {
287
288     // Check whether we have re-entered the RTS from Haskell without
289     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
290     // call).
291     if (cap->in_haskell) {
292           errorBelch("schedule: re-entered unsafely.\n"
293                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
294           stg_exit(EXIT_FAILURE);
295     }
296
297     // The interruption / shutdown sequence.
298     // 
299     // In order to cleanly shut down the runtime, we want to:
300     //   * make sure that all main threads return to their callers
301     //     with the state 'Interrupted'.
302     //   * clean up all OS threads assocated with the runtime
303     //   * free all memory etc.
304     //
305     // So the sequence for ^C goes like this:
306     //
307     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
308     //     arranges for some Capability to wake up
309     //
310     //   * all threads in the system are halted, and the zombies are
311     //     placed on the run queue for cleaning up.  We acquire all
312     //     the capabilities in order to delete the threads, this is
313     //     done by scheduleDoGC() for convenience (because GC already
314     //     needs to acquire all the capabilities).  We can't kill
315     //     threads involved in foreign calls.
316     // 
317     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
318     //
319     //   * sched_state := SCHED_SHUTTING_DOWN
320     //
321     //   * all workers exit when the run queue on their capability
322     //     drains.  All main threads will also exit when their TSO
323     //     reaches the head of the run queue and they can return.
324     //
325     //   * eventually all Capabilities will shut down, and the RTS can
326     //     exit.
327     //
328     //   * We might be left with threads blocked in foreign calls, 
329     //     we should really attempt to kill these somehow (TODO);
330     
331     switch (sched_state) {
332     case SCHED_RUNNING:
333         break;
334     case SCHED_INTERRUPTING:
335         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
336 #if defined(THREADED_RTS)
337         discardSparksCap(cap);
338 #endif
339         /* scheduleDoGC() deletes all the threads */
340         cap = scheduleDoGC(cap,task,rtsFalse);
341         break;
342     case SCHED_SHUTTING_DOWN:
343         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
344         // If we are a worker, just exit.  If we're a bound thread
345         // then we will exit below when we've removed our TSO from
346         // the run queue.
347         if (task->tso == NULL && emptyRunQueue(cap)) {
348             return cap;
349         }
350         break;
351     default:
352         barf("sched_state: %d", sched_state);
353     }
354
355     scheduleFindWork(cap);
356
357     /* work pushing, currently relevant only for THREADED_RTS:
358        (pushes threads, wakes up idle capabilities for stealing) */
359     schedulePushWork(cap,task);
360
361 #if defined(PARALLEL_HASKELL)
362     /* since we perform a blocking receive and continue otherwise,
363        either we never reach here or we definitely have work! */
364     // from here: non-empty run queue
365     ASSERT(!emptyRunQueue(cap));
366
367     if (PacketsWaiting()) {  /* now process incoming messages, if any
368                                 pending...  
369
370                                 CAUTION: scheduleGetRemoteWork called
371                                 above, waits for messages as well! */
372       processMessages(cap, &receivedFinish);
373     }
374 #endif // PARALLEL_HASKELL: non-empty run queue!
375
376     scheduleDetectDeadlock(cap,task);
377
378 #if defined(THREADED_RTS)
379     cap = task->cap;    // reload cap, it might have changed
380 #endif
381
382     // Normally, the only way we can get here with no threads to
383     // run is if a keyboard interrupt received during 
384     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
385     // Additionally, it is not fatal for the
386     // threaded RTS to reach here with no threads to run.
387     //
388     // win32: might be here due to awaitEvent() being abandoned
389     // as a result of a console event having been delivered.
390     
391 #if defined(THREADED_RTS)
392     if (first) 
393     {
394     // XXX: ToDo
395     //     // don't yield the first time, we want a chance to run this
396     //     // thread for a bit, even if there are others banging at the
397     //     // door.
398     //     first = rtsFalse;
399     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
400     }
401
402   yield:
403     scheduleYield(&cap,task);
404     if (emptyRunQueue(cap)) continue; // look for work again
405 #endif
406
407 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
408     if ( emptyRunQueue(cap) ) {
409         ASSERT(sched_state >= SCHED_INTERRUPTING);
410     }
411 #endif
412
413     // 
414     // Get a thread to run
415     //
416     t = popRunQueue(cap);
417
418     // Sanity check the thread we're about to run.  This can be
419     // expensive if there is lots of thread switching going on...
420     IF_DEBUG(sanity,checkTSO(t));
421
422 #if defined(THREADED_RTS)
423     // Check whether we can run this thread in the current task.
424     // If not, we have to pass our capability to the right task.
425     {
426         Task *bound = t->bound;
427       
428         if (bound) {
429             if (bound == task) {
430                 debugTrace(DEBUG_sched,
431                            "### Running thread %lu in bound thread", (unsigned long)t->id);
432                 // yes, the Haskell thread is bound to the current native thread
433             } else {
434                 debugTrace(DEBUG_sched,
435                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
436                 // no, bound to a different Haskell thread: pass to that thread
437                 pushOnRunQueue(cap,t);
438                 continue;
439             }
440         } else {
441             // The thread we want to run is unbound.
442             if (task->tso) { 
443                 debugTrace(DEBUG_sched,
444                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
445                 // no, the current native thread is bound to a different
446                 // Haskell thread, so pass it to any worker thread
447                 pushOnRunQueue(cap,t);
448                 continue; 
449             }
450         }
451     }
452 #endif
453
454     /* context switches are initiated by the timer signal, unless
455      * the user specified "context switch as often as possible", with
456      * +RTS -C0
457      */
458     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
459         && !emptyThreadQueues(cap)) {
460         cap->context_switch = 1;
461     }
462          
463 run_thread:
464
465     // CurrentTSO is the thread to run.  t might be different if we
466     // loop back to run_thread, so make sure to set CurrentTSO after
467     // that.
468     cap->r.rCurrentTSO = t;
469
470     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
471                               (long)t->id, whatNext_strs[t->what_next]);
472
473     startHeapProfTimer();
474
475     // Check for exceptions blocked on this thread
476     maybePerformBlockedException (cap, t);
477
478     // ----------------------------------------------------------------------
479     // Run the current thread 
480
481     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
482     ASSERT(t->cap == cap);
483     ASSERT(t->bound ? t->bound->cap == cap : 1);
484
485     prev_what_next = t->what_next;
486
487     errno = t->saved_errno;
488 #if mingw32_HOST_OS
489     SetLastError(t->saved_winerror);
490 #endif
491
492     cap->in_haskell = rtsTrue;
493
494     dirty_TSO(cap,t);
495
496 #if defined(THREADED_RTS)
497     if (recent_activity == ACTIVITY_DONE_GC) {
498         // ACTIVITY_DONE_GC means we turned off the timer signal to
499         // conserve power (see #1623).  Re-enable it here.
500         nat prev;
501         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
502         if (prev == ACTIVITY_DONE_GC) {
503             startTimer();
504         }
505     } else {
506         recent_activity = ACTIVITY_YES;
507     }
508 #endif
509
510     switch (prev_what_next) {
511         
512     case ThreadKilled:
513     case ThreadComplete:
514         /* Thread already finished, return to scheduler. */
515         ret = ThreadFinished;
516         break;
517         
518     case ThreadRunGHC:
519     {
520         StgRegTable *r;
521         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
522         cap = regTableToCapability(r);
523         ret = r->rRet;
524         break;
525     }
526     
527     case ThreadInterpret:
528         cap = interpretBCO(cap);
529         ret = cap->r.rRet;
530         break;
531         
532     default:
533         barf("schedule: invalid what_next field");
534     }
535
536     cap->in_haskell = rtsFalse;
537
538     // The TSO might have moved, eg. if it re-entered the RTS and a GC
539     // happened.  So find the new location:
540     t = cap->r.rCurrentTSO;
541
542     // We have run some Haskell code: there might be blackhole-blocked
543     // threads to wake up now.
544     // Lock-free test here should be ok, we're just setting a flag.
545     if ( blackhole_queue != END_TSO_QUEUE ) {
546         blackholes_need_checking = rtsTrue;
547     }
548     
549     // And save the current errno in this thread.
550     // XXX: possibly bogus for SMP because this thread might already
551     // be running again, see code below.
552     t->saved_errno = errno;
553 #if mingw32_HOST_OS
554     // Similarly for Windows error code
555     t->saved_winerror = GetLastError();
556 #endif
557
558 #if defined(THREADED_RTS)
559     // If ret is ThreadBlocked, and this Task is bound to the TSO that
560     // blocked, we are in limbo - the TSO is now owned by whatever it
561     // is blocked on, and may in fact already have been woken up,
562     // perhaps even on a different Capability.  It may be the case
563     // that task->cap != cap.  We better yield this Capability
564     // immediately and return to normaility.
565     if (ret == ThreadBlocked) {
566         debugTrace(DEBUG_sched,
567                    "--<< thread %lu (%s) stopped: blocked",
568                    (unsigned long)t->id, whatNext_strs[t->what_next]);
569         goto yield;
570     }
571 #endif
572
573     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
574     ASSERT(t->cap == cap);
575
576     // ----------------------------------------------------------------------
577     
578     // Costs for the scheduler are assigned to CCS_SYSTEM
579     stopHeapProfTimer();
580 #if defined(PROFILING)
581     CCCS = CCS_SYSTEM;
582 #endif
583     
584     schedulePostRunThread(cap,t);
585
586     t = threadStackUnderflow(task,t);
587
588     ready_to_gc = rtsFalse;
589
590     switch (ret) {
591     case HeapOverflow:
592         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
593         break;
594
595     case StackOverflow:
596         scheduleHandleStackOverflow(cap,task,t);
597         break;
598
599     case ThreadYielding:
600         if (scheduleHandleYield(cap, t, prev_what_next)) {
601             // shortcut for switching between compiler/interpreter:
602             goto run_thread; 
603         }
604         break;
605
606     case ThreadBlocked:
607         scheduleHandleThreadBlocked(t);
608         break;
609
610     case ThreadFinished:
611         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
612         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
613         break;
614
615     default:
616       barf("schedule: invalid thread return code %d", (int)ret);
617     }
618
619     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
620       cap = scheduleDoGC(cap,task,rtsFalse);
621     }
622   } /* end of while() */
623 }
624
625 /* ----------------------------------------------------------------------------
626  * Setting up the scheduler loop
627  * ------------------------------------------------------------------------- */
628
629 static void
630 schedulePreLoop(void)
631 {
632   // initialisation for scheduler - what cannot go into initScheduler()  
633 }
634
635 /* -----------------------------------------------------------------------------
636  * scheduleFindWork()
637  *
638  * Search for work to do, and handle messages from elsewhere.
639  * -------------------------------------------------------------------------- */
640
641 static void
642 scheduleFindWork (Capability *cap)
643 {
644     scheduleStartSignalHandlers(cap);
645
646     // Only check the black holes here if we've nothing else to do.
647     // During normal execution, the black hole list only gets checked
648     // at GC time, to avoid repeatedly traversing this possibly long
649     // list each time around the scheduler.
650     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
651
652     scheduleCheckWakeupThreads(cap);
653
654     scheduleCheckBlockedThreads(cap);
655
656 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
657     // Try to activate one of our own sparks
658     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
659 #endif
660
661 #if defined(THREADED_RTS)
662     // Try to steak work if we don't have any
663     if (emptyRunQueue(cap)) { stealWork(cap); }
664 #endif
665     
666 #if defined(PARALLEL_HASKELL)
667     // if messages have been buffered...
668     scheduleSendPendingMessages();
669 #endif
670
671 #if defined(PARALLEL_HASKELL)
672     if (emptyRunQueue(cap)) {
673         receivedFinish = scheduleGetRemoteWork(cap);
674         continue; //  a new round, (hopefully) with new work
675         /* 
676            in GUM, this a) sends out a FISH and returns IF no fish is
677                            out already
678                         b) (blocking) awaits and receives messages
679            
680            in Eden, this is only the blocking receive, as b) in GUM.
681         */
682     }
683 #endif
684 }
685
686 #if defined(THREADED_RTS)
687 STATIC_INLINE rtsBool
688 shouldYieldCapability (Capability *cap, Task *task)
689 {
690     // we need to yield this capability to someone else if..
691     //   - another thread is initiating a GC
692     //   - another Task is returning from a foreign call
693     //   - the thread at the head of the run queue cannot be run
694     //     by this Task (it is bound to another Task, or it is unbound
695     //     and this task it bound).
696     return (waiting_for_gc || 
697             cap->returning_tasks_hd != NULL ||
698             (!emptyRunQueue(cap) && (task->tso == NULL
699                                      ? cap->run_queue_hd->bound != NULL
700                                      : cap->run_queue_hd->bound != task)));
701 }
702
703 // This is the single place where a Task goes to sleep.  There are
704 // two reasons it might need to sleep:
705 //    - there are no threads to run
706 //    - we need to yield this Capability to someone else 
707 //      (see shouldYieldCapability())
708 //
709 // The return value indicates whether 
710
711 static void
712 scheduleYield (Capability **pcap, Task *task)
713 {
714     Capability *cap = *pcap;
715
716     // if we have work, and we don't need to give up the Capability, continue.
717     if (!emptyRunQueue(cap) && !shouldYieldCapability(cap,task))
718         return;
719
720     // otherwise yield (sleep), and keep yielding if necessary.
721     do {
722         yieldCapability(&cap,task);
723     } 
724     while (shouldYieldCapability(cap,task));
725
726     // note there may still be no threads on the run queue at this
727     // point, the caller has to check.
728
729     *pcap = cap;
730     return;
731 }
732 #endif
733     
734 /* -----------------------------------------------------------------------------
735  * schedulePushWork()
736  *
737  * Push work to other Capabilities if we have some.
738  * -------------------------------------------------------------------------- */
739
740 static void
741 schedulePushWork(Capability *cap USED_IF_THREADS, 
742                  Task *task      USED_IF_THREADS)
743 {
744   /* following code not for PARALLEL_HASKELL. I kept the call general,
745      future GUM versions might use pushing in a distributed setup */
746 #if defined(THREADED_RTS)
747
748     Capability *free_caps[n_capabilities], *cap0;
749     nat i, n_free_caps;
750
751     // migration can be turned off with +RTS -qg
752     if (!RtsFlags.ParFlags.migrate) return;
753
754     // Check whether we have more threads on our run queue, or sparks
755     // in our pool, that we could hand to another Capability.
756     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
757         && sparkPoolSizeCap(cap) < 2) {
758         return;
759     }
760
761     // First grab as many free Capabilities as we can.
762     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
763         cap0 = &capabilities[i];
764         if (cap != cap0 && tryGrabCapability(cap0,task)) {
765             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
766                 // it already has some work, we just grabbed it at 
767                 // the wrong moment.  Or maybe it's deadlocked!
768                 releaseCapability(cap0);
769             } else {
770                 free_caps[n_free_caps++] = cap0;
771             }
772         }
773     }
774
775     // we now have n_free_caps free capabilities stashed in
776     // free_caps[].  Share our run queue equally with them.  This is
777     // probably the simplest thing we could do; improvements we might
778     // want to do include:
779     //
780     //   - giving high priority to moving relatively new threads, on 
781     //     the gournds that they haven't had time to build up a
782     //     working set in the cache on this CPU/Capability.
783     //
784     //   - giving low priority to moving long-lived threads
785
786     if (n_free_caps > 0) {
787         StgTSO *prev, *t, *next;
788         rtsBool pushed_to_all;
789
790         debugTrace(DEBUG_sched, 
791                    "cap %d: %s and %d free capabilities, sharing...", 
792                    cap->no, 
793                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
794                    "excess threads on run queue":"sparks to share (>=2)",
795                    n_free_caps);
796
797         i = 0;
798         pushed_to_all = rtsFalse;
799
800         if (cap->run_queue_hd != END_TSO_QUEUE) {
801             prev = cap->run_queue_hd;
802             t = prev->_link;
803             prev->_link = END_TSO_QUEUE;
804             for (; t != END_TSO_QUEUE; t = next) {
805                 next = t->_link;
806                 t->_link = END_TSO_QUEUE;
807                 if (t->what_next == ThreadRelocated
808                     || t->bound == task // don't move my bound thread
809                     || tsoLocked(t)) {  // don't move a locked thread
810                     setTSOLink(cap, prev, t);
811                     prev = t;
812                 } else if (i == n_free_caps) {
813                     pushed_to_all = rtsTrue;
814                     i = 0;
815                     // keep one for us
816                     setTSOLink(cap, prev, t);
817                     prev = t;
818                 } else {
819                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
820                     appendToRunQueue(free_caps[i],t);
821                     if (t->bound) { t->bound->cap = free_caps[i]; }
822                     t->cap = free_caps[i];
823                     i++;
824                 }
825             }
826             cap->run_queue_tl = prev;
827         }
828
829 #ifdef SPARK_PUSHING
830         /* JB I left this code in place, it would work but is not necessary */
831
832         // If there are some free capabilities that we didn't push any
833         // threads to, then try to push a spark to each one.
834         if (!pushed_to_all) {
835             StgClosure *spark;
836             // i is the next free capability to push to
837             for (; i < n_free_caps; i++) {
838                 if (emptySparkPoolCap(free_caps[i])) {
839                     spark = tryStealSpark(cap->sparks);
840                     if (spark != NULL) {
841                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
842                         newSpark(&(free_caps[i]->r), spark);
843                     }
844                 }
845             }
846         }
847 #endif /* SPARK_PUSHING */
848
849         // release the capabilities
850         for (i = 0; i < n_free_caps; i++) {
851             task->cap = free_caps[i];
852             releaseAndWakeupCapability(free_caps[i]);
853         }
854     }
855     task->cap = cap; // reset to point to our Capability.
856
857 #endif /* THREADED_RTS */
858
859 }
860
861 /* ----------------------------------------------------------------------------
862  * Start any pending signal handlers
863  * ------------------------------------------------------------------------- */
864
865 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
866 static void
867 scheduleStartSignalHandlers(Capability *cap)
868 {
869     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
870         // safe outside the lock
871         startSignalHandlers(cap);
872     }
873 }
874 #else
875 static void
876 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
877 {
878 }
879 #endif
880
881 /* ----------------------------------------------------------------------------
882  * Check for blocked threads that can be woken up.
883  * ------------------------------------------------------------------------- */
884
885 static void
886 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
887 {
888 #if !defined(THREADED_RTS)
889     //
890     // Check whether any waiting threads need to be woken up.  If the
891     // run queue is empty, and there are no other tasks running, we
892     // can wait indefinitely for something to happen.
893     //
894     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
895     {
896         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
897     }
898 #endif
899 }
900
901
902 /* ----------------------------------------------------------------------------
903  * Check for threads woken up by other Capabilities
904  * ------------------------------------------------------------------------- */
905
906 static void
907 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
908 {
909 #if defined(THREADED_RTS)
910     // Any threads that were woken up by other Capabilities get
911     // appended to our run queue.
912     if (!emptyWakeupQueue(cap)) {
913         ACQUIRE_LOCK(&cap->lock);
914         if (emptyRunQueue(cap)) {
915             cap->run_queue_hd = cap->wakeup_queue_hd;
916             cap->run_queue_tl = cap->wakeup_queue_tl;
917         } else {
918             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
919             cap->run_queue_tl = cap->wakeup_queue_tl;
920         }
921         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
922         RELEASE_LOCK(&cap->lock);
923     }
924 #endif
925 }
926
927 /* ----------------------------------------------------------------------------
928  * Check for threads blocked on BLACKHOLEs that can be woken up
929  * ------------------------------------------------------------------------- */
930 static void
931 scheduleCheckBlackHoles (Capability *cap)
932 {
933     if ( blackholes_need_checking ) // check without the lock first
934     {
935         ACQUIRE_LOCK(&sched_mutex);
936         if ( blackholes_need_checking ) {
937             checkBlackHoles(cap);
938             blackholes_need_checking = rtsFalse;
939         }
940         RELEASE_LOCK(&sched_mutex);
941     }
942 }
943
944 /* ----------------------------------------------------------------------------
945  * Detect deadlock conditions and attempt to resolve them.
946  * ------------------------------------------------------------------------- */
947
948 static void
949 scheduleDetectDeadlock (Capability *cap, Task *task)
950 {
951
952 #if defined(PARALLEL_HASKELL)
953     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
954     return;
955 #endif
956
957     /* 
958      * Detect deadlock: when we have no threads to run, there are no
959      * threads blocked, waiting for I/O, or sleeping, and all the
960      * other tasks are waiting for work, we must have a deadlock of
961      * some description.
962      */
963     if ( emptyThreadQueues(cap) )
964     {
965 #if defined(THREADED_RTS)
966         /* 
967          * In the threaded RTS, we only check for deadlock if there
968          * has been no activity in a complete timeslice.  This means
969          * we won't eagerly start a full GC just because we don't have
970          * any threads to run currently.
971          */
972         if (recent_activity != ACTIVITY_INACTIVE) return;
973 #endif
974
975         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
976
977         // Garbage collection can release some new threads due to
978         // either (a) finalizers or (b) threads resurrected because
979         // they are unreachable and will therefore be sent an
980         // exception.  Any threads thus released will be immediately
981         // runnable.
982         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
983
984         recent_activity = ACTIVITY_DONE_GC;
985         // disable timer signals (see #1623)
986         stopTimer();
987         
988         if ( !emptyRunQueue(cap) ) return;
989
990 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
991         /* If we have user-installed signal handlers, then wait
992          * for signals to arrive rather then bombing out with a
993          * deadlock.
994          */
995         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
996             debugTrace(DEBUG_sched,
997                        "still deadlocked, waiting for signals...");
998
999             awaitUserSignals();
1000
1001             if (signals_pending()) {
1002                 startSignalHandlers(cap);
1003             }
1004
1005             // either we have threads to run, or we were interrupted:
1006             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1007
1008             return;
1009         }
1010 #endif
1011
1012 #if !defined(THREADED_RTS)
1013         /* Probably a real deadlock.  Send the current main thread the
1014          * Deadlock exception.
1015          */
1016         if (task->tso) {
1017             switch (task->tso->why_blocked) {
1018             case BlockedOnSTM:
1019             case BlockedOnBlackHole:
1020             case BlockedOnException:
1021             case BlockedOnMVar:
1022                 throwToSingleThreaded(cap, task->tso, 
1023                                       (StgClosure *)nonTermination_closure);
1024                 return;
1025             default:
1026                 barf("deadlock: main thread blocked in a strange way");
1027             }
1028         }
1029         return;
1030 #endif
1031     }
1032 }
1033
1034
1035 /* ----------------------------------------------------------------------------
1036  * Send pending messages (PARALLEL_HASKELL only)
1037  * ------------------------------------------------------------------------- */
1038
1039 #if defined(PARALLEL_HASKELL)
1040 static void
1041 scheduleSendPendingMessages(void)
1042 {
1043
1044 # if defined(PAR) // global Mem.Mgmt., omit for now
1045     if (PendingFetches != END_BF_QUEUE) {
1046         processFetches();
1047     }
1048 # endif
1049     
1050     if (RtsFlags.ParFlags.BufferTime) {
1051         // if we use message buffering, we must send away all message
1052         // packets which have become too old...
1053         sendOldBuffers(); 
1054     }
1055 }
1056 #endif
1057
1058 /* ----------------------------------------------------------------------------
1059  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1060  * ------------------------------------------------------------------------- */
1061
1062 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1063 static void
1064 scheduleActivateSpark(Capability *cap)
1065 {
1066     StgClosure *spark;
1067
1068 /* We only want to stay here if the run queue is empty and we want some
1069    work. We try to turn a spark into a thread, and add it to the run
1070    queue, from where it will be picked up in the next iteration of the
1071    scheduler loop.  
1072 */
1073     if (!emptyRunQueue(cap)) 
1074       /* In the threaded RTS, another task might have pushed a thread
1075          on our run queue in the meantime ? But would need a lock.. */
1076       return;
1077
1078  
1079     // Really we should be using reclaimSpark() here, but
1080     // experimentally it doesn't seem to perform as well as just
1081     // stealing from our own spark pool:
1082     // spark = reclaimSpark(cap->sparks);
1083     spark = tryStealSpark(cap->sparks); // defined in Sparks.c
1084
1085     if (spark != NULL) {
1086       debugTrace(DEBUG_sched,
1087                  "turning spark of closure %p into a thread",
1088                  (StgClosure *)spark);
1089       createSparkThread(cap,spark); // defined in Sparks.c
1090     }
1091 }
1092 #endif // PARALLEL_HASKELL || THREADED_RTS
1093
1094 /* ----------------------------------------------------------------------------
1095  * Get work from a remote node (PARALLEL_HASKELL only)
1096  * ------------------------------------------------------------------------- */
1097     
1098 #if defined(PARALLEL_HASKELL)
1099 static rtsBool /* return value used in PARALLEL_HASKELL only */
1100 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1101 {
1102 #if defined(PARALLEL_HASKELL)
1103   rtsBool receivedFinish = rtsFalse;
1104
1105   // idle() , i.e. send all buffers, wait for work
1106   if (RtsFlags.ParFlags.BufferTime) {
1107         IF_PAR_DEBUG(verbose, 
1108                 debugBelch("...send all pending data,"));
1109         {
1110           nat i;
1111           for (i=1; i<=nPEs; i++)
1112             sendImmediately(i); // send all messages away immediately
1113         }
1114   }
1115
1116   /* this would be the place for fishing in GUM... 
1117
1118      if (no-earlier-fish-around) 
1119           sendFish(choosePe());
1120    */
1121
1122   // Eden:just look for incoming messages (blocking receive)
1123   IF_PAR_DEBUG(verbose, 
1124                debugBelch("...wait for incoming messages...\n"));
1125   processMessages(cap, &receivedFinish); // blocking receive...
1126
1127
1128   return receivedFinish;
1129   // reenter scheduling look after having received something
1130
1131 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1132
1133   return rtsFalse; /* return value unused in THREADED_RTS */
1134
1135 #endif /* PARALLEL_HASKELL */
1136 }
1137 #endif // PARALLEL_HASKELL || THREADED_RTS
1138
1139 /* ----------------------------------------------------------------------------
1140  * After running a thread...
1141  * ------------------------------------------------------------------------- */
1142
1143 static void
1144 schedulePostRunThread (Capability *cap, StgTSO *t)
1145 {
1146     // We have to be able to catch transactions that are in an
1147     // infinite loop as a result of seeing an inconsistent view of
1148     // memory, e.g. 
1149     //
1150     //   atomically $ do
1151     //       [a,b] <- mapM readTVar [ta,tb]
1152     //       when (a == b) loop
1153     //
1154     // and a is never equal to b given a consistent view of memory.
1155     //
1156     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1157         if (!stmValidateNestOfTransactions (t -> trec)) {
1158             debugTrace(DEBUG_sched | DEBUG_stm,
1159                        "trec %p found wasting its time", t);
1160             
1161             // strip the stack back to the
1162             // ATOMICALLY_FRAME, aborting the (nested)
1163             // transaction, and saving the stack of any
1164             // partially-evaluated thunks on the heap.
1165             throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
1166             
1167             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1168         }
1169     }
1170
1171   /* some statistics gathering in the parallel case */
1172 }
1173
1174 /* -----------------------------------------------------------------------------
1175  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1176  * -------------------------------------------------------------------------- */
1177
1178 static rtsBool
1179 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1180 {
1181     // did the task ask for a large block?
1182     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1183         // if so, get one and push it on the front of the nursery.
1184         bdescr *bd;
1185         lnat blocks;
1186         
1187         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1188         
1189         debugTrace(DEBUG_sched,
1190                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1191                    (long)t->id, whatNext_strs[t->what_next], blocks);
1192     
1193         // don't do this if the nursery is (nearly) full, we'll GC first.
1194         if (cap->r.rCurrentNursery->link != NULL ||
1195             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1196                                                // if the nursery has only one block.
1197             
1198             ACQUIRE_SM_LOCK
1199             bd = allocGroup( blocks );
1200             RELEASE_SM_LOCK
1201             cap->r.rNursery->n_blocks += blocks;
1202             
1203             // link the new group into the list
1204             bd->link = cap->r.rCurrentNursery;
1205             bd->u.back = cap->r.rCurrentNursery->u.back;
1206             if (cap->r.rCurrentNursery->u.back != NULL) {
1207                 cap->r.rCurrentNursery->u.back->link = bd;
1208             } else {
1209 #if !defined(THREADED_RTS)
1210                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1211                        g0s0 == cap->r.rNursery);
1212 #endif
1213                 cap->r.rNursery->blocks = bd;
1214             }             
1215             cap->r.rCurrentNursery->u.back = bd;
1216             
1217             // initialise it as a nursery block.  We initialise the
1218             // step, gen_no, and flags field of *every* sub-block in
1219             // this large block, because this is easier than making
1220             // sure that we always find the block head of a large
1221             // block whenever we call Bdescr() (eg. evacuate() and
1222             // isAlive() in the GC would both have to do this, at
1223             // least).
1224             { 
1225                 bdescr *x;
1226                 for (x = bd; x < bd + blocks; x++) {
1227                     x->step = cap->r.rNursery;
1228                     x->gen_no = 0;
1229                     x->flags = 0;
1230                 }
1231             }
1232             
1233             // This assert can be a killer if the app is doing lots
1234             // of large block allocations.
1235             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1236             
1237             // now update the nursery to point to the new block
1238             cap->r.rCurrentNursery = bd;
1239             
1240             // we might be unlucky and have another thread get on the
1241             // run queue before us and steal the large block, but in that
1242             // case the thread will just end up requesting another large
1243             // block.
1244             pushOnRunQueue(cap,t);
1245             return rtsFalse;  /* not actually GC'ing */
1246         }
1247     }
1248     
1249     debugTrace(DEBUG_sched,
1250                "--<< thread %ld (%s) stopped: HeapOverflow",
1251                (long)t->id, whatNext_strs[t->what_next]);
1252
1253     if (cap->context_switch) {
1254         // Sometimes we miss a context switch, e.g. when calling
1255         // primitives in a tight loop, MAYBE_GC() doesn't check the
1256         // context switch flag, and we end up waiting for a GC.
1257         // See #1984, and concurrent/should_run/1984
1258         cap->context_switch = 0;
1259         addToRunQueue(cap,t);
1260     } else {
1261         pushOnRunQueue(cap,t);
1262     }
1263     return rtsTrue;
1264     /* actual GC is done at the end of the while loop in schedule() */
1265 }
1266
1267 /* -----------------------------------------------------------------------------
1268  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1269  * -------------------------------------------------------------------------- */
1270
1271 static void
1272 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1273 {
1274     debugTrace (DEBUG_sched,
1275                 "--<< thread %ld (%s) stopped, StackOverflow", 
1276                 (long)t->id, whatNext_strs[t->what_next]);
1277
1278     /* just adjust the stack for this thread, then pop it back
1279      * on the run queue.
1280      */
1281     { 
1282         /* enlarge the stack */
1283         StgTSO *new_t = threadStackOverflow(cap, t);
1284         
1285         /* The TSO attached to this Task may have moved, so update the
1286          * pointer to it.
1287          */
1288         if (task->tso == t) {
1289             task->tso = new_t;
1290         }
1291         pushOnRunQueue(cap,new_t);
1292     }
1293 }
1294
1295 /* -----------------------------------------------------------------------------
1296  * Handle a thread that returned to the scheduler with ThreadYielding
1297  * -------------------------------------------------------------------------- */
1298
1299 static rtsBool
1300 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1301 {
1302     // Reset the context switch flag.  We don't do this just before
1303     // running the thread, because that would mean we would lose ticks
1304     // during GC, which can lead to unfair scheduling (a thread hogs
1305     // the CPU because the tick always arrives during GC).  This way
1306     // penalises threads that do a lot of allocation, but that seems
1307     // better than the alternative.
1308     cap->context_switch = 0;
1309     
1310     /* put the thread back on the run queue.  Then, if we're ready to
1311      * GC, check whether this is the last task to stop.  If so, wake
1312      * up the GC thread.  getThread will block during a GC until the
1313      * GC is finished.
1314      */
1315 #ifdef DEBUG
1316     if (t->what_next != prev_what_next) {
1317         debugTrace(DEBUG_sched,
1318                    "--<< thread %ld (%s) stopped to switch evaluators", 
1319                    (long)t->id, whatNext_strs[t->what_next]);
1320     } else {
1321         debugTrace(DEBUG_sched,
1322                    "--<< thread %ld (%s) stopped, yielding",
1323                    (long)t->id, whatNext_strs[t->what_next]);
1324     }
1325 #endif
1326     
1327     IF_DEBUG(sanity,
1328              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1329              checkTSO(t));
1330     ASSERT(t->_link == END_TSO_QUEUE);
1331     
1332     // Shortcut if we're just switching evaluators: don't bother
1333     // doing stack squeezing (which can be expensive), just run the
1334     // thread.
1335     if (t->what_next != prev_what_next) {
1336         return rtsTrue;
1337     }
1338
1339     addToRunQueue(cap,t);
1340
1341     return rtsFalse;
1342 }
1343
1344 /* -----------------------------------------------------------------------------
1345  * Handle a thread that returned to the scheduler with ThreadBlocked
1346  * -------------------------------------------------------------------------- */
1347
1348 static void
1349 scheduleHandleThreadBlocked( StgTSO *t
1350 #if !defined(GRAN) && !defined(DEBUG)
1351     STG_UNUSED
1352 #endif
1353     )
1354 {
1355
1356       // We don't need to do anything.  The thread is blocked, and it
1357       // has tidied up its stack and placed itself on whatever queue
1358       // it needs to be on.
1359
1360     // ASSERT(t->why_blocked != NotBlocked);
1361     // Not true: for example,
1362     //    - in THREADED_RTS, the thread may already have been woken
1363     //      up by another Capability.  This actually happens: try
1364     //      conc023 +RTS -N2.
1365     //    - the thread may have woken itself up already, because
1366     //      threadPaused() might have raised a blocked throwTo
1367     //      exception, see maybePerformBlockedException().
1368
1369 #ifdef DEBUG
1370     if (traceClass(DEBUG_sched)) {
1371         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1372                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1373         printThreadBlockage(t);
1374         debugTraceEnd();
1375     }
1376 #endif
1377 }
1378
1379 /* -----------------------------------------------------------------------------
1380  * Handle a thread that returned to the scheduler with ThreadFinished
1381  * -------------------------------------------------------------------------- */
1382
1383 static rtsBool
1384 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1385 {
1386     /* Need to check whether this was a main thread, and if so,
1387      * return with the return value.
1388      *
1389      * We also end up here if the thread kills itself with an
1390      * uncaught exception, see Exception.cmm.
1391      */
1392     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1393                (unsigned long)t->id, whatNext_strs[t->what_next]);
1394
1395       //
1396       // Check whether the thread that just completed was a bound
1397       // thread, and if so return with the result.  
1398       //
1399       // There is an assumption here that all thread completion goes
1400       // through this point; we need to make sure that if a thread
1401       // ends up in the ThreadKilled state, that it stays on the run
1402       // queue so it can be dealt with here.
1403       //
1404
1405       if (t->bound) {
1406
1407           if (t->bound != task) {
1408 #if !defined(THREADED_RTS)
1409               // Must be a bound thread that is not the topmost one.  Leave
1410               // it on the run queue until the stack has unwound to the
1411               // point where we can deal with this.  Leaving it on the run
1412               // queue also ensures that the garbage collector knows about
1413               // this thread and its return value (it gets dropped from the
1414               // step->threads list so there's no other way to find it).
1415               appendToRunQueue(cap,t);
1416               return rtsFalse;
1417 #else
1418               // this cannot happen in the threaded RTS, because a
1419               // bound thread can only be run by the appropriate Task.
1420               barf("finished bound thread that isn't mine");
1421 #endif
1422           }
1423
1424           ASSERT(task->tso == t);
1425
1426           if (t->what_next == ThreadComplete) {
1427               if (task->ret) {
1428                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1429                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1430               }
1431               task->stat = Success;
1432           } else {
1433               if (task->ret) {
1434                   *(task->ret) = NULL;
1435               }
1436               if (sched_state >= SCHED_INTERRUPTING) {
1437                   task->stat = Interrupted;
1438               } else {
1439                   task->stat = Killed;
1440               }
1441           }
1442 #ifdef DEBUG
1443           removeThreadLabel((StgWord)task->tso->id);
1444 #endif
1445           return rtsTrue; // tells schedule() to return
1446       }
1447
1448       return rtsFalse;
1449 }
1450
1451 /* -----------------------------------------------------------------------------
1452  * Perform a heap census
1453  * -------------------------------------------------------------------------- */
1454
1455 static rtsBool
1456 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1457 {
1458     // When we have +RTS -i0 and we're heap profiling, do a census at
1459     // every GC.  This lets us get repeatable runs for debugging.
1460     if (performHeapProfile ||
1461         (RtsFlags.ProfFlags.profileInterval==0 &&
1462          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1463         return rtsTrue;
1464     } else {
1465         return rtsFalse;
1466     }
1467 }
1468
1469 /* -----------------------------------------------------------------------------
1470  * Perform a garbage collection if necessary
1471  * -------------------------------------------------------------------------- */
1472
1473 static Capability *
1474 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1475 {
1476     rtsBool heap_census;
1477 #ifdef THREADED_RTS
1478     /* extern static volatile StgWord waiting_for_gc; 
1479        lives inside capability.c */
1480     rtsBool was_waiting;
1481     nat i;
1482 #endif
1483
1484 #ifdef THREADED_RTS
1485     // In order to GC, there must be no threads running Haskell code.
1486     // Therefore, the GC thread needs to hold *all* the capabilities,
1487     // and release them after the GC has completed.  
1488     //
1489     // This seems to be the simplest way: previous attempts involved
1490     // making all the threads with capabilities give up their
1491     // capabilities and sleep except for the *last* one, which
1492     // actually did the GC.  But it's quite hard to arrange for all
1493     // the other tasks to sleep and stay asleep.
1494     //
1495         
1496     /*  Other capabilities are prevented from running yet more Haskell
1497         threads if waiting_for_gc is set. Tested inside
1498         yieldCapability() and releaseCapability() in Capability.c */
1499
1500     was_waiting = cas(&waiting_for_gc, 0, 1);
1501     if (was_waiting) {
1502         do {
1503             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1504             if (cap) yieldCapability(&cap,task);
1505         } while (waiting_for_gc);
1506         return cap;  // NOTE: task->cap might have changed here
1507     }
1508
1509     setContextSwitches();
1510     for (i=0; i < n_capabilities; i++) {
1511         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1512         if (cap != &capabilities[i]) {
1513             Capability *pcap = &capabilities[i];
1514             // we better hope this task doesn't get migrated to
1515             // another Capability while we're waiting for this one.
1516             // It won't, because load balancing happens while we have
1517             // all the Capabilities, but even so it's a slightly
1518             // unsavoury invariant.
1519             task->cap = pcap;
1520             waitForReturnCapability(&pcap, task);
1521             if (pcap != &capabilities[i]) {
1522                 barf("scheduleDoGC: got the wrong capability");
1523             }
1524         }
1525     }
1526
1527     waiting_for_gc = rtsFalse;
1528 #endif
1529
1530     // so this happens periodically:
1531     if (cap) scheduleCheckBlackHoles(cap);
1532     
1533     IF_DEBUG(scheduler, printAllThreads());
1534
1535     /*
1536      * We now have all the capabilities; if we're in an interrupting
1537      * state, then we should take the opportunity to delete all the
1538      * threads in the system.
1539      */
1540     if (sched_state >= SCHED_INTERRUPTING) {
1541         deleteAllThreads(&capabilities[0]);
1542         sched_state = SCHED_SHUTTING_DOWN;
1543     }
1544     
1545     heap_census = scheduleNeedHeapProfile(rtsTrue);
1546
1547     /* everybody back, start the GC.
1548      * Could do it in this thread, or signal a condition var
1549      * to do it in another thread.  Either way, we need to
1550      * broadcast on gc_pending_cond afterward.
1551      */
1552 #if defined(THREADED_RTS)
1553     debugTrace(DEBUG_sched, "doing GC");
1554 #endif
1555     GarbageCollect(force_major || heap_census);
1556     
1557     if (heap_census) {
1558         debugTrace(DEBUG_sched, "performing heap census");
1559         heapCensus();
1560         performHeapProfile = rtsFalse;
1561     }
1562
1563 #ifdef SPARKBALANCE
1564     /* JB 
1565        Once we are all together... this would be the place to balance all
1566        spark pools. No concurrent stealing or adding of new sparks can
1567        occur. Should be defined in Sparks.c. */
1568     balanceSparkPoolsCaps(n_capabilities, capabilities);
1569 #endif
1570
1571 #if defined(THREADED_RTS)
1572     // release our stash of capabilities.
1573     for (i = 0; i < n_capabilities; i++) {
1574         if (cap != &capabilities[i]) {
1575             task->cap = &capabilities[i];
1576             releaseCapability(&capabilities[i]);
1577         }
1578     }
1579     if (cap) {
1580         task->cap = cap;
1581     } else {
1582         task->cap = NULL;
1583     }
1584 #endif
1585
1586     return cap;
1587 }
1588
1589 /* ---------------------------------------------------------------------------
1590  * Singleton fork(). Do not copy any running threads.
1591  * ------------------------------------------------------------------------- */
1592
1593 pid_t
1594 forkProcess(HsStablePtr *entry
1595 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1596             STG_UNUSED
1597 #endif
1598            )
1599 {
1600 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1601     Task *task;
1602     pid_t pid;
1603     StgTSO* t,*next;
1604     Capability *cap;
1605     nat s;
1606     
1607 #if defined(THREADED_RTS)
1608     if (RtsFlags.ParFlags.nNodes > 1) {
1609         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1610         stg_exit(EXIT_FAILURE);
1611     }
1612 #endif
1613
1614     debugTrace(DEBUG_sched, "forking!");
1615     
1616     // ToDo: for SMP, we should probably acquire *all* the capabilities
1617     cap = rts_lock();
1618     
1619     // no funny business: hold locks while we fork, otherwise if some
1620     // other thread is holding a lock when the fork happens, the data
1621     // structure protected by the lock will forever be in an
1622     // inconsistent state in the child.  See also #1391.
1623     ACQUIRE_LOCK(&sched_mutex);
1624     ACQUIRE_LOCK(&cap->lock);
1625     ACQUIRE_LOCK(&cap->running_task->lock);
1626
1627     pid = fork();
1628     
1629     if (pid) { // parent
1630         
1631         RELEASE_LOCK(&sched_mutex);
1632         RELEASE_LOCK(&cap->lock);
1633         RELEASE_LOCK(&cap->running_task->lock);
1634
1635         // just return the pid
1636         rts_unlock(cap);
1637         return pid;
1638         
1639     } else { // child
1640         
1641 #if defined(THREADED_RTS)
1642         initMutex(&sched_mutex);
1643         initMutex(&cap->lock);
1644         initMutex(&cap->running_task->lock);
1645 #endif
1646
1647         // Now, all OS threads except the thread that forked are
1648         // stopped.  We need to stop all Haskell threads, including
1649         // those involved in foreign calls.  Also we need to delete
1650         // all Tasks, because they correspond to OS threads that are
1651         // now gone.
1652
1653         for (s = 0; s < total_steps; s++) {
1654           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1655             if (t->what_next == ThreadRelocated) {
1656                 next = t->_link;
1657             } else {
1658                 next = t->global_link;
1659                 // don't allow threads to catch the ThreadKilled
1660                 // exception, but we do want to raiseAsync() because these
1661                 // threads may be evaluating thunks that we need later.
1662                 deleteThread_(cap,t);
1663             }
1664           }
1665         }
1666         
1667         // Empty the run queue.  It seems tempting to let all the
1668         // killed threads stay on the run queue as zombies to be
1669         // cleaned up later, but some of them correspond to bound
1670         // threads for which the corresponding Task does not exist.
1671         cap->run_queue_hd = END_TSO_QUEUE;
1672         cap->run_queue_tl = END_TSO_QUEUE;
1673
1674         // Any suspended C-calling Tasks are no more, their OS threads
1675         // don't exist now:
1676         cap->suspended_ccalling_tasks = NULL;
1677
1678         // Empty the threads lists.  Otherwise, the garbage
1679         // collector may attempt to resurrect some of these threads.
1680         for (s = 0; s < total_steps; s++) {
1681             all_steps[s].threads = END_TSO_QUEUE;
1682         }
1683
1684         // Wipe the task list, except the current Task.
1685         ACQUIRE_LOCK(&sched_mutex);
1686         for (task = all_tasks; task != NULL; task=task->all_link) {
1687             if (task != cap->running_task) {
1688 #if defined(THREADED_RTS)
1689                 initMutex(&task->lock); // see #1391
1690 #endif
1691                 discardTask(task);
1692             }
1693         }
1694         RELEASE_LOCK(&sched_mutex);
1695
1696 #if defined(THREADED_RTS)
1697         // Wipe our spare workers list, they no longer exist.  New
1698         // workers will be created if necessary.
1699         cap->spare_workers = NULL;
1700         cap->returning_tasks_hd = NULL;
1701         cap->returning_tasks_tl = NULL;
1702 #endif
1703
1704         // On Unix, all timers are reset in the child, so we need to start
1705         // the timer again.
1706         initTimer();
1707         startTimer();
1708
1709         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1710         rts_checkSchedStatus("forkProcess",cap);
1711         
1712         rts_unlock(cap);
1713         hs_exit();                      // clean up and exit
1714         stg_exit(EXIT_SUCCESS);
1715     }
1716 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1717     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1718     return -1;
1719 #endif
1720 }
1721
1722 /* ---------------------------------------------------------------------------
1723  * Delete all the threads in the system
1724  * ------------------------------------------------------------------------- */
1725    
1726 static void
1727 deleteAllThreads ( Capability *cap )
1728 {
1729     // NOTE: only safe to call if we own all capabilities.
1730
1731     StgTSO* t, *next;
1732     nat s;
1733
1734     debugTrace(DEBUG_sched,"deleting all threads");
1735     for (s = 0; s < total_steps; s++) {
1736       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1737         if (t->what_next == ThreadRelocated) {
1738             next = t->_link;
1739         } else {
1740             next = t->global_link;
1741             deleteThread(cap,t);
1742         }
1743       }
1744     }      
1745
1746     // The run queue now contains a bunch of ThreadKilled threads.  We
1747     // must not throw these away: the main thread(s) will be in there
1748     // somewhere, and the main scheduler loop has to deal with it.
1749     // Also, the run queue is the only thing keeping these threads from
1750     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1751
1752 #if !defined(THREADED_RTS)
1753     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1754     ASSERT(sleeping_queue == END_TSO_QUEUE);
1755 #endif
1756 }
1757
1758 /* -----------------------------------------------------------------------------
1759    Managing the suspended_ccalling_tasks list.
1760    Locks required: sched_mutex
1761    -------------------------------------------------------------------------- */
1762
1763 STATIC_INLINE void
1764 suspendTask (Capability *cap, Task *task)
1765 {
1766     ASSERT(task->next == NULL && task->prev == NULL);
1767     task->next = cap->suspended_ccalling_tasks;
1768     task->prev = NULL;
1769     if (cap->suspended_ccalling_tasks) {
1770         cap->suspended_ccalling_tasks->prev = task;
1771     }
1772     cap->suspended_ccalling_tasks = task;
1773 }
1774
1775 STATIC_INLINE void
1776 recoverSuspendedTask (Capability *cap, Task *task)
1777 {
1778     if (task->prev) {
1779         task->prev->next = task->next;
1780     } else {
1781         ASSERT(cap->suspended_ccalling_tasks == task);
1782         cap->suspended_ccalling_tasks = task->next;
1783     }
1784     if (task->next) {
1785         task->next->prev = task->prev;
1786     }
1787     task->next = task->prev = NULL;
1788 }
1789
1790 /* ---------------------------------------------------------------------------
1791  * Suspending & resuming Haskell threads.
1792  * 
1793  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1794  * its capability before calling the C function.  This allows another
1795  * task to pick up the capability and carry on running Haskell
1796  * threads.  It also means that if the C call blocks, it won't lock
1797  * the whole system.
1798  *
1799  * The Haskell thread making the C call is put to sleep for the
1800  * duration of the call, on the susepended_ccalling_threads queue.  We
1801  * give out a token to the task, which it can use to resume the thread
1802  * on return from the C function.
1803  * ------------------------------------------------------------------------- */
1804    
1805 void *
1806 suspendThread (StgRegTable *reg)
1807 {
1808   Capability *cap;
1809   int saved_errno;
1810   StgTSO *tso;
1811   Task *task;
1812 #if mingw32_HOST_OS
1813   StgWord32 saved_winerror;
1814 #endif
1815
1816   saved_errno = errno;
1817 #if mingw32_HOST_OS
1818   saved_winerror = GetLastError();
1819 #endif
1820
1821   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1822    */
1823   cap = regTableToCapability(reg);
1824
1825   task = cap->running_task;
1826   tso = cap->r.rCurrentTSO;
1827
1828   debugTrace(DEBUG_sched, 
1829              "thread %lu did a safe foreign call", 
1830              (unsigned long)cap->r.rCurrentTSO->id);
1831
1832   // XXX this might not be necessary --SDM
1833   tso->what_next = ThreadRunGHC;
1834
1835   threadPaused(cap,tso);
1836
1837   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1838       tso->why_blocked = BlockedOnCCall;
1839       tso->flags |= TSO_BLOCKEX;
1840       tso->flags &= ~TSO_INTERRUPTIBLE;
1841   } else {
1842       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1843   }
1844
1845   // Hand back capability
1846   task->suspended_tso = tso;
1847
1848   ACQUIRE_LOCK(&cap->lock);
1849
1850   suspendTask(cap,task);
1851   cap->in_haskell = rtsFalse;
1852   releaseCapability_(cap,rtsFalse);
1853   
1854   RELEASE_LOCK(&cap->lock);
1855
1856 #if defined(THREADED_RTS)
1857   /* Preparing to leave the RTS, so ensure there's a native thread/task
1858      waiting to take over.
1859   */
1860   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1861 #endif
1862
1863   errno = saved_errno;
1864 #if mingw32_HOST_OS
1865   SetLastError(saved_winerror);
1866 #endif
1867   return task;
1868 }
1869
1870 StgRegTable *
1871 resumeThread (void *task_)
1872 {
1873     StgTSO *tso;
1874     Capability *cap;
1875     Task *task = task_;
1876     int saved_errno;
1877 #if mingw32_HOST_OS
1878     StgWord32 saved_winerror;
1879 #endif
1880
1881     saved_errno = errno;
1882 #if mingw32_HOST_OS
1883     saved_winerror = GetLastError();
1884 #endif
1885
1886     cap = task->cap;
1887     // Wait for permission to re-enter the RTS with the result.
1888     waitForReturnCapability(&cap,task);
1889     // we might be on a different capability now... but if so, our
1890     // entry on the suspended_ccalling_tasks list will also have been
1891     // migrated.
1892
1893     // Remove the thread from the suspended list
1894     recoverSuspendedTask(cap,task);
1895
1896     tso = task->suspended_tso;
1897     task->suspended_tso = NULL;
1898     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1899     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1900     
1901     if (tso->why_blocked == BlockedOnCCall) {
1902         awakenBlockedExceptionQueue(cap,tso);
1903         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1904     }
1905     
1906     /* Reset blocking status */
1907     tso->why_blocked  = NotBlocked;
1908     
1909     cap->r.rCurrentTSO = tso;
1910     cap->in_haskell = rtsTrue;
1911     errno = saved_errno;
1912 #if mingw32_HOST_OS
1913     SetLastError(saved_winerror);
1914 #endif
1915
1916     /* We might have GC'd, mark the TSO dirty again */
1917     dirty_TSO(cap,tso);
1918
1919     IF_DEBUG(sanity, checkTSO(tso));
1920
1921     return &cap->r;
1922 }
1923
1924 /* ---------------------------------------------------------------------------
1925  * scheduleThread()
1926  *
1927  * scheduleThread puts a thread on the end  of the runnable queue.
1928  * This will usually be done immediately after a thread is created.
1929  * The caller of scheduleThread must create the thread using e.g.
1930  * createThread and push an appropriate closure
1931  * on this thread's stack before the scheduler is invoked.
1932  * ------------------------------------------------------------------------ */
1933
1934 void
1935 scheduleThread(Capability *cap, StgTSO *tso)
1936 {
1937     // The thread goes at the *end* of the run-queue, to avoid possible
1938     // starvation of any threads already on the queue.
1939     appendToRunQueue(cap,tso);
1940 }
1941
1942 void
1943 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1944 {
1945 #if defined(THREADED_RTS)
1946     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1947                               // move this thread from now on.
1948     cpu %= RtsFlags.ParFlags.nNodes;
1949     if (cpu == cap->no) {
1950         appendToRunQueue(cap,tso);
1951     } else {
1952         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1953     }
1954 #else
1955     appendToRunQueue(cap,tso);
1956 #endif
1957 }
1958
1959 Capability *
1960 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1961 {
1962     Task *task;
1963
1964     // We already created/initialised the Task
1965     task = cap->running_task;
1966
1967     // This TSO is now a bound thread; make the Task and TSO
1968     // point to each other.
1969     tso->bound = task;
1970     tso->cap = cap;
1971
1972     task->tso = tso;
1973     task->ret = ret;
1974     task->stat = NoStatus;
1975
1976     appendToRunQueue(cap,tso);
1977
1978     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1979
1980     cap = schedule(cap,task);
1981
1982     ASSERT(task->stat != NoStatus);
1983     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1984
1985     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1986     return cap;
1987 }
1988
1989 /* ----------------------------------------------------------------------------
1990  * Starting Tasks
1991  * ------------------------------------------------------------------------- */
1992
1993 #if defined(THREADED_RTS)
1994 void OSThreadProcAttr
1995 workerStart(Task *task)
1996 {
1997     Capability *cap;
1998
1999     // See startWorkerTask().
2000     ACQUIRE_LOCK(&task->lock);
2001     cap = task->cap;
2002     RELEASE_LOCK(&task->lock);
2003
2004     // set the thread-local pointer to the Task:
2005     taskEnter(task);
2006
2007     // schedule() runs without a lock.
2008     cap = schedule(cap,task);
2009
2010     // On exit from schedule(), we have a Capability.
2011     releaseCapability(cap);
2012     workerTaskStop(task);
2013 }
2014 #endif
2015
2016 /* ---------------------------------------------------------------------------
2017  * initScheduler()
2018  *
2019  * Initialise the scheduler.  This resets all the queues - if the
2020  * queues contained any threads, they'll be garbage collected at the
2021  * next pass.
2022  *
2023  * ------------------------------------------------------------------------ */
2024
2025 void 
2026 initScheduler(void)
2027 {
2028 #if !defined(THREADED_RTS)
2029   blocked_queue_hd  = END_TSO_QUEUE;
2030   blocked_queue_tl  = END_TSO_QUEUE;
2031   sleeping_queue    = END_TSO_QUEUE;
2032 #endif
2033
2034   blackhole_queue   = END_TSO_QUEUE;
2035
2036   sched_state    = SCHED_RUNNING;
2037   recent_activity = ACTIVITY_YES;
2038
2039 #if defined(THREADED_RTS)
2040   /* Initialise the mutex and condition variables used by
2041    * the scheduler. */
2042   initMutex(&sched_mutex);
2043 #endif
2044   
2045   ACQUIRE_LOCK(&sched_mutex);
2046
2047   /* A capability holds the state a native thread needs in
2048    * order to execute STG code. At least one capability is
2049    * floating around (only THREADED_RTS builds have more than one).
2050    */
2051   initCapabilities();
2052
2053   initTaskManager();
2054
2055 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2056   initSparkPools();
2057 #endif
2058
2059 #if defined(THREADED_RTS)
2060   /*
2061    * Eagerly start one worker to run each Capability, except for
2062    * Capability 0.  The idea is that we're probably going to start a
2063    * bound thread on Capability 0 pretty soon, so we don't want a
2064    * worker task hogging it.
2065    */
2066   { 
2067       nat i;
2068       Capability *cap;
2069       for (i = 1; i < n_capabilities; i++) {
2070           cap = &capabilities[i];
2071           ACQUIRE_LOCK(&cap->lock);
2072           startWorkerTask(cap, workerStart);
2073           RELEASE_LOCK(&cap->lock);
2074       }
2075   }
2076 #endif
2077
2078   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2079
2080   RELEASE_LOCK(&sched_mutex);
2081 }
2082
2083 void
2084 exitScheduler(
2085     rtsBool wait_foreign
2086 #if !defined(THREADED_RTS)
2087                          __attribute__((unused))
2088 #endif
2089 )
2090                /* see Capability.c, shutdownCapability() */
2091 {
2092     Task *task = NULL;
2093
2094 #if defined(THREADED_RTS)
2095     ACQUIRE_LOCK(&sched_mutex);
2096     task = newBoundTask();
2097     RELEASE_LOCK(&sched_mutex);
2098 #endif
2099
2100     // If we haven't killed all the threads yet, do it now.
2101     if (sched_state < SCHED_SHUTTING_DOWN) {
2102         sched_state = SCHED_INTERRUPTING;
2103         scheduleDoGC(NULL,task,rtsFalse);    
2104     }
2105     sched_state = SCHED_SHUTTING_DOWN;
2106
2107 #if defined(THREADED_RTS)
2108     { 
2109         nat i;
2110         
2111         for (i = 0; i < n_capabilities; i++) {
2112             shutdownCapability(&capabilities[i], task, wait_foreign);
2113         }
2114         boundTaskExiting(task);
2115         stopTaskManager();
2116     }
2117 #else
2118     freeCapability(&MainCapability);
2119 #endif
2120 }
2121
2122 void
2123 freeScheduler( void )
2124 {
2125     freeTaskManager();
2126     if (n_capabilities != 1) {
2127         stgFree(capabilities);
2128     }
2129 #if defined(THREADED_RTS)
2130     closeMutex(&sched_mutex);
2131 #endif
2132 }
2133
2134 /* -----------------------------------------------------------------------------
2135    performGC
2136
2137    This is the interface to the garbage collector from Haskell land.
2138    We provide this so that external C code can allocate and garbage
2139    collect when called from Haskell via _ccall_GC.
2140    -------------------------------------------------------------------------- */
2141
2142 static void
2143 performGC_(rtsBool force_major)
2144 {
2145     Task *task;
2146     // We must grab a new Task here, because the existing Task may be
2147     // associated with a particular Capability, and chained onto the 
2148     // suspended_ccalling_tasks queue.
2149     ACQUIRE_LOCK(&sched_mutex);
2150     task = newBoundTask();
2151     RELEASE_LOCK(&sched_mutex);
2152     scheduleDoGC(NULL,task,force_major);
2153     boundTaskExiting(task);
2154 }
2155
2156 void
2157 performGC(void)
2158 {
2159     performGC_(rtsFalse);
2160 }
2161
2162 void
2163 performMajorGC(void)
2164 {
2165     performGC_(rtsTrue);
2166 }
2167
2168 /* -----------------------------------------------------------------------------
2169    Stack overflow
2170
2171    If the thread has reached its maximum stack size, then raise the
2172    StackOverflow exception in the offending thread.  Otherwise
2173    relocate the TSO into a larger chunk of memory and adjust its stack
2174    size appropriately.
2175    -------------------------------------------------------------------------- */
2176
2177 static StgTSO *
2178 threadStackOverflow(Capability *cap, StgTSO *tso)
2179 {
2180   nat new_stack_size, stack_words;
2181   lnat new_tso_size;
2182   StgPtr new_sp;
2183   StgTSO *dest;
2184
2185   IF_DEBUG(sanity,checkTSO(tso));
2186
2187   // don't allow throwTo() to modify the blocked_exceptions queue
2188   // while we are moving the TSO:
2189   lockClosure((StgClosure *)tso);
2190
2191   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2192       // NB. never raise a StackOverflow exception if the thread is
2193       // inside Control.Exceptino.block.  It is impractical to protect
2194       // against stack overflow exceptions, since virtually anything
2195       // can raise one (even 'catch'), so this is the only sensible
2196       // thing to do here.  See bug #767.
2197
2198       debugTrace(DEBUG_gc,
2199                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2200                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2201       IF_DEBUG(gc,
2202                /* If we're debugging, just print out the top of the stack */
2203                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2204                                                 tso->sp+64)));
2205
2206       // Send this thread the StackOverflow exception
2207       unlockTSO(tso);
2208       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2209       return tso;
2210   }
2211
2212   /* Try to double the current stack size.  If that takes us over the
2213    * maximum stack size for this thread, then use the maximum instead
2214    * (that is, unless we're already at or over the max size and we
2215    * can't raise the StackOverflow exception (see above), in which
2216    * case just double the size). Finally round up so the TSO ends up as
2217    * a whole number of blocks.
2218    */
2219   if (tso->stack_size >= tso->max_stack_size) {
2220       new_stack_size = tso->stack_size * 2;
2221   } else { 
2222       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2223   }
2224   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2225                                        TSO_STRUCT_SIZE)/sizeof(W_);
2226   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2227   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2228
2229   debugTrace(DEBUG_sched, 
2230              "increasing stack size from %ld words to %d.",
2231              (long)tso->stack_size, new_stack_size);
2232
2233   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2234   TICK_ALLOC_TSO(new_stack_size,0);
2235
2236   /* copy the TSO block and the old stack into the new area */
2237   memcpy(dest,tso,TSO_STRUCT_SIZE);
2238   stack_words = tso->stack + tso->stack_size - tso->sp;
2239   new_sp = (P_)dest + new_tso_size - stack_words;
2240   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2241
2242   /* relocate the stack pointers... */
2243   dest->sp         = new_sp;
2244   dest->stack_size = new_stack_size;
2245         
2246   /* Mark the old TSO as relocated.  We have to check for relocated
2247    * TSOs in the garbage collector and any primops that deal with TSOs.
2248    *
2249    * It's important to set the sp value to just beyond the end
2250    * of the stack, so we don't attempt to scavenge any part of the
2251    * dead TSO's stack.
2252    */
2253   tso->what_next = ThreadRelocated;
2254   setTSOLink(cap,tso,dest);
2255   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2256   tso->why_blocked = NotBlocked;
2257
2258   IF_PAR_DEBUG(verbose,
2259                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2260                      tso->id, tso, tso->stack_size);
2261                /* If we're debugging, just print out the top of the stack */
2262                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2263                                                 tso->sp+64)));
2264   
2265   unlockTSO(dest);
2266   unlockTSO(tso);
2267
2268   IF_DEBUG(sanity,checkTSO(dest));
2269 #if 0
2270   IF_DEBUG(scheduler,printTSO(dest));
2271 #endif
2272
2273   return dest;
2274 }
2275
2276 static StgTSO *
2277 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2278 {
2279     bdescr *bd, *new_bd;
2280     lnat free_w, tso_size_w;
2281     StgTSO *new_tso;
2282
2283     tso_size_w = tso_sizeW(tso);
2284
2285     if (tso_size_w < MBLOCK_SIZE_W || 
2286         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2287     {
2288         return tso;
2289     }
2290
2291     // don't allow throwTo() to modify the blocked_exceptions queue
2292     // while we are moving the TSO:
2293     lockClosure((StgClosure *)tso);
2294
2295     // this is the number of words we'll free
2296     free_w = round_to_mblocks(tso_size_w/2);
2297
2298     bd = Bdescr((StgPtr)tso);
2299     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2300     bd->free = bd->start + TSO_STRUCT_SIZEW;
2301
2302     new_tso = (StgTSO *)new_bd->start;
2303     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2304     new_tso->stack_size = new_bd->free - new_tso->stack;
2305
2306     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2307                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2308
2309     tso->what_next = ThreadRelocated;
2310     tso->_link = new_tso; // no write barrier reqd: same generation
2311
2312     // The TSO attached to this Task may have moved, so update the
2313     // pointer to it.
2314     if (task->tso == tso) {
2315         task->tso = new_tso;
2316     }
2317
2318     unlockTSO(new_tso);
2319     unlockTSO(tso);
2320
2321     IF_DEBUG(sanity,checkTSO(new_tso));
2322
2323     return new_tso;
2324 }
2325
2326 /* ---------------------------------------------------------------------------
2327    Interrupt execution
2328    - usually called inside a signal handler so it mustn't do anything fancy.   
2329    ------------------------------------------------------------------------ */
2330
2331 void
2332 interruptStgRts(void)
2333 {
2334     sched_state = SCHED_INTERRUPTING;
2335     setContextSwitches();
2336     wakeUpRts();
2337 }
2338
2339 /* -----------------------------------------------------------------------------
2340    Wake up the RTS
2341    
2342    This function causes at least one OS thread to wake up and run the
2343    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2344    an external event has arrived that may need servicing (eg. a
2345    keyboard interrupt).
2346
2347    In the single-threaded RTS we don't do anything here; we only have
2348    one thread anyway, and the event that caused us to want to wake up
2349    will have interrupted any blocking system call in progress anyway.
2350    -------------------------------------------------------------------------- */
2351
2352 void
2353 wakeUpRts(void)
2354 {
2355 #if defined(THREADED_RTS)
2356     // This forces the IO Manager thread to wakeup, which will
2357     // in turn ensure that some OS thread wakes up and runs the
2358     // scheduler loop, which will cause a GC and deadlock check.
2359     ioManagerWakeup();
2360 #endif
2361 }
2362
2363 /* -----------------------------------------------------------------------------
2364  * checkBlackHoles()
2365  *
2366  * Check the blackhole_queue for threads that can be woken up.  We do
2367  * this periodically: before every GC, and whenever the run queue is
2368  * empty.
2369  *
2370  * An elegant solution might be to just wake up all the blocked
2371  * threads with awakenBlockedQueue occasionally: they'll go back to
2372  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2373  * doesn't give us a way to tell whether we've actually managed to
2374  * wake up any threads, so we would be busy-waiting.
2375  *
2376  * -------------------------------------------------------------------------- */
2377
2378 static rtsBool
2379 checkBlackHoles (Capability *cap)
2380 {
2381     StgTSO **prev, *t;
2382     rtsBool any_woke_up = rtsFalse;
2383     StgHalfWord type;
2384
2385     // blackhole_queue is global:
2386     ASSERT_LOCK_HELD(&sched_mutex);
2387
2388     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2389
2390     // ASSUMES: sched_mutex
2391     prev = &blackhole_queue;
2392     t = blackhole_queue;
2393     while (t != END_TSO_QUEUE) {
2394         ASSERT(t->why_blocked == BlockedOnBlackHole);
2395         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2396         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2397             IF_DEBUG(sanity,checkTSO(t));
2398             t = unblockOne(cap, t);
2399             *prev = t;
2400             any_woke_up = rtsTrue;
2401         } else {
2402             prev = &t->_link;
2403             t = t->_link;
2404         }
2405     }
2406
2407     return any_woke_up;
2408 }
2409
2410 /* -----------------------------------------------------------------------------
2411    Deleting threads
2412
2413    This is used for interruption (^C) and forking, and corresponds to
2414    raising an exception but without letting the thread catch the
2415    exception.
2416    -------------------------------------------------------------------------- */
2417
2418 static void 
2419 deleteThread (Capability *cap, StgTSO *tso)
2420 {
2421     // NOTE: must only be called on a TSO that we have exclusive
2422     // access to, because we will call throwToSingleThreaded() below.
2423     // The TSO must be on the run queue of the Capability we own, or 
2424     // we must own all Capabilities.
2425
2426     if (tso->why_blocked != BlockedOnCCall &&
2427         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2428         throwToSingleThreaded(cap,tso,NULL);
2429     }
2430 }
2431
2432 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2433 static void 
2434 deleteThread_(Capability *cap, StgTSO *tso)
2435 { // for forkProcess only:
2436   // like deleteThread(), but we delete threads in foreign calls, too.
2437
2438     if (tso->why_blocked == BlockedOnCCall ||
2439         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2440         unblockOne(cap,tso);
2441         tso->what_next = ThreadKilled;
2442     } else {
2443         deleteThread(cap,tso);
2444     }
2445 }
2446 #endif
2447
2448 /* -----------------------------------------------------------------------------
2449    raiseExceptionHelper
2450    
2451    This function is called by the raise# primitve, just so that we can
2452    move some of the tricky bits of raising an exception from C-- into
2453    C.  Who knows, it might be a useful re-useable thing here too.
2454    -------------------------------------------------------------------------- */
2455
2456 StgWord
2457 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2458 {
2459     Capability *cap = regTableToCapability(reg);
2460     StgThunk *raise_closure = NULL;
2461     StgPtr p, next;
2462     StgRetInfoTable *info;
2463     //
2464     // This closure represents the expression 'raise# E' where E
2465     // is the exception raise.  It is used to overwrite all the
2466     // thunks which are currently under evaluataion.
2467     //
2468
2469     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2470     // LDV profiling: stg_raise_info has THUNK as its closure
2471     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2472     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2473     // 1 does not cause any problem unless profiling is performed.
2474     // However, when LDV profiling goes on, we need to linearly scan
2475     // small object pool, where raise_closure is stored, so we should
2476     // use MIN_UPD_SIZE.
2477     //
2478     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2479     //                                 sizeofW(StgClosure)+1);
2480     //
2481
2482     //
2483     // Walk up the stack, looking for the catch frame.  On the way,
2484     // we update any closures pointed to from update frames with the
2485     // raise closure that we just built.
2486     //
2487     p = tso->sp;
2488     while(1) {
2489         info = get_ret_itbl((StgClosure *)p);
2490         next = p + stack_frame_sizeW((StgClosure *)p);
2491         switch (info->i.type) {
2492             
2493         case UPDATE_FRAME:
2494             // Only create raise_closure if we need to.
2495             if (raise_closure == NULL) {
2496                 raise_closure = 
2497                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2498                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2499                 raise_closure->payload[0] = exception;
2500             }
2501             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2502             p = next;
2503             continue;
2504
2505         case ATOMICALLY_FRAME:
2506             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2507             tso->sp = p;
2508             return ATOMICALLY_FRAME;
2509             
2510         case CATCH_FRAME:
2511             tso->sp = p;
2512             return CATCH_FRAME;
2513
2514         case CATCH_STM_FRAME:
2515             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2516             tso->sp = p;
2517             return CATCH_STM_FRAME;
2518             
2519         case STOP_FRAME:
2520             tso->sp = p;
2521             return STOP_FRAME;
2522
2523         case CATCH_RETRY_FRAME:
2524         default:
2525             p = next; 
2526             continue;
2527         }
2528     }
2529 }
2530
2531
2532 /* -----------------------------------------------------------------------------
2533    findRetryFrameHelper
2534
2535    This function is called by the retry# primitive.  It traverses the stack
2536    leaving tso->sp referring to the frame which should handle the retry.  
2537
2538    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2539    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2540
2541    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2542    create) because retries are not considered to be exceptions, despite the
2543    similar implementation.
2544
2545    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2546    not be created within memory transactions.
2547    -------------------------------------------------------------------------- */
2548
2549 StgWord
2550 findRetryFrameHelper (StgTSO *tso)
2551 {
2552   StgPtr           p, next;
2553   StgRetInfoTable *info;
2554
2555   p = tso -> sp;
2556   while (1) {
2557     info = get_ret_itbl((StgClosure *)p);
2558     next = p + stack_frame_sizeW((StgClosure *)p);
2559     switch (info->i.type) {
2560       
2561     case ATOMICALLY_FRAME:
2562         debugTrace(DEBUG_stm,
2563                    "found ATOMICALLY_FRAME at %p during retry", p);
2564         tso->sp = p;
2565         return ATOMICALLY_FRAME;
2566       
2567     case CATCH_RETRY_FRAME:
2568         debugTrace(DEBUG_stm,
2569                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2570         tso->sp = p;
2571         return CATCH_RETRY_FRAME;
2572       
2573     case CATCH_STM_FRAME: {
2574         StgTRecHeader *trec = tso -> trec;
2575         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2576         debugTrace(DEBUG_stm,
2577                    "found CATCH_STM_FRAME at %p during retry", p);
2578         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2579         stmAbortTransaction(tso -> cap, trec);
2580         stmFreeAbortedTRec(tso -> cap, trec);
2581         tso -> trec = outer;
2582         p = next; 
2583         continue;
2584     }
2585       
2586
2587     default:
2588       ASSERT(info->i.type != CATCH_FRAME);
2589       ASSERT(info->i.type != STOP_FRAME);
2590       p = next; 
2591       continue;
2592     }
2593   }
2594 }
2595
2596 /* -----------------------------------------------------------------------------
2597    resurrectThreads is called after garbage collection on the list of
2598    threads found to be garbage.  Each of these threads will be woken
2599    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2600    on an MVar, or NonTermination if the thread was blocked on a Black
2601    Hole.
2602
2603    Locks: assumes we hold *all* the capabilities.
2604    -------------------------------------------------------------------------- */
2605
2606 void
2607 resurrectThreads (StgTSO *threads)
2608 {
2609     StgTSO *tso, *next;
2610     Capability *cap;
2611     step *step;
2612
2613     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2614         next = tso->global_link;
2615
2616         step = Bdescr((P_)tso)->step;
2617         tso->global_link = step->threads;
2618         step->threads = tso;
2619
2620         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2621         
2622         // Wake up the thread on the Capability it was last on
2623         cap = tso->cap;
2624         
2625         switch (tso->why_blocked) {
2626         case BlockedOnMVar:
2627         case BlockedOnException:
2628             /* Called by GC - sched_mutex lock is currently held. */
2629             throwToSingleThreaded(cap, tso,
2630                                   (StgClosure *)blockedOnDeadMVar_closure);
2631             break;
2632         case BlockedOnBlackHole:
2633             throwToSingleThreaded(cap, tso,
2634                                   (StgClosure *)nonTermination_closure);
2635             break;
2636         case BlockedOnSTM:
2637             throwToSingleThreaded(cap, tso,
2638                                   (StgClosure *)blockedIndefinitely_closure);
2639             break;
2640         case NotBlocked:
2641             /* This might happen if the thread was blocked on a black hole
2642              * belonging to a thread that we've just woken up (raiseAsync
2643              * can wake up threads, remember...).
2644              */
2645             continue;
2646         default:
2647             barf("resurrectThreads: thread blocked in a strange way");
2648         }
2649     }
2650 }
2651
2652 /* -----------------------------------------------------------------------------
2653    performPendingThrowTos is called after garbage collection, and
2654    passed a list of threads that were found to have pending throwTos
2655    (tso->blocked_exceptions was not empty), and were blocked.
2656    Normally this doesn't happen, because we would deliver the
2657    exception directly if the target thread is blocked, but there are
2658    small windows where it might occur on a multiprocessor (see
2659    throwTo()).
2660
2661    NB. we must be holding all the capabilities at this point, just
2662    like resurrectThreads().
2663    -------------------------------------------------------------------------- */
2664
2665 void
2666 performPendingThrowTos (StgTSO *threads)
2667 {
2668     StgTSO *tso, *next;
2669     Capability *cap;
2670     step *step;
2671
2672     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2673         next = tso->global_link;
2674
2675         step = Bdescr((P_)tso)->step;
2676         tso->global_link = step->threads;
2677         step->threads = tso;
2678
2679         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2680         
2681         cap = tso->cap;
2682         maybePerformBlockedException(cap, tso);
2683     }
2684 }