e17c6534d667cc0475297ee5a02cce9265664c85
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34
35 /* PARALLEL_HASKELL includes go here */
36
37 #include "Sparks.h"
38 #include "Capability.h"
39 #include "Task.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
43 #endif
44 #include "Trace.h"
45 #include "RaiseAsync.h"
46 #include "Threads.h"
47 #include "ThrIOManager.h"
48
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
51 #endif
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56 #include <string.h>
57 #include <stdlib.h>
58 #include <stdarg.h>
59
60 #ifdef HAVE_ERRNO_H
61 #include <errno.h>
62 #endif
63
64 // Turn off inlining when debugging - it obfuscates things
65 #ifdef DEBUG
66 # undef  STATIC_INLINE
67 # define STATIC_INLINE static
68 #endif
69
70 /* -----------------------------------------------------------------------------
71  * Global variables
72  * -------------------------------------------------------------------------- */
73
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping thrads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
79 #endif
80
81 /* Threads blocked on blackholes.
82  * LOCK: sched_mutex+capability, or all capabilities
83  */
84 StgTSO *blackhole_queue = NULL;
85
86 /* The blackhole_queue should be checked for threads to wake up.  See
87  * Schedule.h for more thorough comment.
88  * LOCK: none (doesn't matter if we miss an update)
89  */
90 rtsBool blackholes_need_checking = rtsFalse;
91
92 /* flag that tracks whether we have done any execution in this time slice.
93  * LOCK: currently none, perhaps we should lock (but needs to be
94  * updated in the fast path of the scheduler).
95  */
96 nat recent_activity = ACTIVITY_YES;
97
98 /* if this flag is set as well, give up execution
99  * LOCK: none (changes once, from false->true)
100  */
101 rtsBool sched_state = SCHED_RUNNING;
102
103 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
104  *  exists - earlier gccs apparently didn't.
105  *  -= chak
106  */
107 StgTSO dummy_tso;
108
109 /*
110  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
111  * in an MT setting, needed to signal that a worker thread shouldn't hang around
112  * in the scheduler when it is out of work.
113  */
114 rtsBool shutting_down_scheduler = rtsFalse;
115
116 /*
117  * This mutex protects most of the global scheduler data in
118  * the THREADED_RTS runtime.
119  */
120 #if defined(THREADED_RTS)
121 Mutex sched_mutex;
122 #endif
123
124 #if !defined(mingw32_HOST_OS)
125 #define FORKPROCESS_PRIMOP_SUPPORTED
126 #endif
127
128 /* -----------------------------------------------------------------------------
129  * static function prototypes
130  * -------------------------------------------------------------------------- */
131
132 static Capability *schedule (Capability *initialCapability, Task *task);
133
134 //
135 // These function all encapsulate parts of the scheduler loop, and are
136 // abstracted only to make the structure and control flow of the
137 // scheduler clearer.
138 //
139 static void schedulePreLoop (void);
140 static void scheduleFindWork (Capability *cap);
141 #if defined(THREADED_RTS)
142 static void scheduleYield (Capability **pcap, Task *task);
143 #endif
144 static void scheduleStartSignalHandlers (Capability *cap);
145 static void scheduleCheckBlockedThreads (Capability *cap);
146 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
147 static void scheduleCheckBlackHoles (Capability *cap);
148 static void scheduleDetectDeadlock (Capability *cap, Task *task);
149 static void schedulePushWork(Capability *cap, Task *task);
150 #if defined(PARALLEL_HASKELL)
151 static rtsBool scheduleGetRemoteWork(Capability *cap);
152 static void scheduleSendPendingMessages(void);
153 #endif
154 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
155 static void scheduleActivateSpark(Capability *cap);
156 #endif
157 static void schedulePostRunThread(Capability *cap, StgTSO *t);
158 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
159 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
160                                          StgTSO *t);
161 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
162                                     nat prev_what_next );
163 static void scheduleHandleThreadBlocked( StgTSO *t );
164 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
165                                              StgTSO *t );
166 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
167 static Capability *scheduleDoGC(Capability *cap, Task *task,
168                                 rtsBool force_major);
169
170 static rtsBool checkBlackHoles(Capability *cap);
171
172 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
173 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
174
175 static void deleteThread (Capability *cap, StgTSO *tso);
176 static void deleteAllThreads (Capability *cap);
177
178 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
179 static void deleteThread_(Capability *cap, StgTSO *tso);
180 #endif
181
182 #ifdef DEBUG
183 static char *whatNext_strs[] = {
184   "(unknown)",
185   "ThreadRunGHC",
186   "ThreadInterpret",
187   "ThreadKilled",
188   "ThreadRelocated",
189   "ThreadComplete"
190 };
191 #endif
192
193 /* -----------------------------------------------------------------------------
194  * Putting a thread on the run queue: different scheduling policies
195  * -------------------------------------------------------------------------- */
196
197 STATIC_INLINE void
198 addToRunQueue( Capability *cap, StgTSO *t )
199 {
200 #if defined(PARALLEL_HASKELL)
201     if (RtsFlags.ParFlags.doFairScheduling) { 
202         // this does round-robin scheduling; good for concurrency
203         appendToRunQueue(cap,t);
204     } else {
205         // this does unfair scheduling; good for parallelism
206         pushOnRunQueue(cap,t);
207     }
208 #else
209     // this does round-robin scheduling; good for concurrency
210     appendToRunQueue(cap,t);
211 #endif
212 }
213
214 /* ---------------------------------------------------------------------------
215    Main scheduling loop.
216
217    We use round-robin scheduling, each thread returning to the
218    scheduler loop when one of these conditions is detected:
219
220       * out of heap space
221       * timer expires (thread yields)
222       * thread blocks
223       * thread ends
224       * stack overflow
225
226    GRAN version:
227      In a GranSim setup this loop iterates over the global event queue.
228      This revolves around the global event queue, which determines what 
229      to do next. Therefore, it's more complicated than either the 
230      concurrent or the parallel (GUM) setup.
231   This version has been entirely removed (JB 2008/08).
232
233    GUM version:
234      GUM iterates over incoming messages.
235      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
236      and sends out a fish whenever it has nothing to do; in-between
237      doing the actual reductions (shared code below) it processes the
238      incoming messages and deals with delayed operations 
239      (see PendingFetches).
240      This is not the ugliest code you could imagine, but it's bloody close.
241
242   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
243   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
244   as well as future GUM versions. This file has been refurbished to
245   only contain valid code, which is however incomplete, refers to
246   invalid includes etc.
247
248    ------------------------------------------------------------------------ */
249
250 static Capability *
251 schedule (Capability *initialCapability, Task *task)
252 {
253   StgTSO *t;
254   Capability *cap;
255   StgThreadReturnCode ret;
256 #if defined(PARALLEL_HASKELL)
257   rtsBool receivedFinish = rtsFalse;
258 #endif
259   nat prev_what_next;
260   rtsBool ready_to_gc;
261 #if defined(THREADED_RTS)
262   rtsBool first = rtsTrue;
263 #endif
264   
265   cap = initialCapability;
266
267   // Pre-condition: this task owns initialCapability.
268   // The sched_mutex is *NOT* held
269   // NB. on return, we still hold a capability.
270
271   debugTrace (DEBUG_sched, 
272               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
273               task, initialCapability);
274
275   schedulePreLoop();
276
277   // -----------------------------------------------------------
278   // Scheduler loop starts here:
279
280 #if defined(PARALLEL_HASKELL)
281 #define TERMINATION_CONDITION        (!receivedFinish)
282 #else
283 #define TERMINATION_CONDITION        rtsTrue
284 #endif
285
286   while (TERMINATION_CONDITION) {
287
288     // Check whether we have re-entered the RTS from Haskell without
289     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
290     // call).
291     if (cap->in_haskell) {
292           errorBelch("schedule: re-entered unsafely.\n"
293                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
294           stg_exit(EXIT_FAILURE);
295     }
296
297     // The interruption / shutdown sequence.
298     // 
299     // In order to cleanly shut down the runtime, we want to:
300     //   * make sure that all main threads return to their callers
301     //     with the state 'Interrupted'.
302     //   * clean up all OS threads assocated with the runtime
303     //   * free all memory etc.
304     //
305     // So the sequence for ^C goes like this:
306     //
307     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
308     //     arranges for some Capability to wake up
309     //
310     //   * all threads in the system are halted, and the zombies are
311     //     placed on the run queue for cleaning up.  We acquire all
312     //     the capabilities in order to delete the threads, this is
313     //     done by scheduleDoGC() for convenience (because GC already
314     //     needs to acquire all the capabilities).  We can't kill
315     //     threads involved in foreign calls.
316     // 
317     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
318     //
319     //   * sched_state := SCHED_SHUTTING_DOWN
320     //
321     //   * all workers exit when the run queue on their capability
322     //     drains.  All main threads will also exit when their TSO
323     //     reaches the head of the run queue and they can return.
324     //
325     //   * eventually all Capabilities will shut down, and the RTS can
326     //     exit.
327     //
328     //   * We might be left with threads blocked in foreign calls, 
329     //     we should really attempt to kill these somehow (TODO);
330     
331     switch (sched_state) {
332     case SCHED_RUNNING:
333         break;
334     case SCHED_INTERRUPTING:
335         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
336 #if defined(THREADED_RTS)
337         discardSparksCap(cap);
338 #endif
339         /* scheduleDoGC() deletes all the threads */
340         cap = scheduleDoGC(cap,task,rtsFalse);
341         break;
342     case SCHED_SHUTTING_DOWN:
343         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
344         // If we are a worker, just exit.  If we're a bound thread
345         // then we will exit below when we've removed our TSO from
346         // the run queue.
347         if (task->tso == NULL && emptyRunQueue(cap)) {
348             return cap;
349         }
350         break;
351     default:
352         barf("sched_state: %d", sched_state);
353     }
354
355     scheduleFindWork(cap);
356
357     /* work pushing, currently relevant only for THREADED_RTS:
358        (pushes threads, wakes up idle capabilities for stealing) */
359     schedulePushWork(cap,task);
360
361 #if defined(PARALLEL_HASKELL)
362     /* since we perform a blocking receive and continue otherwise,
363        either we never reach here or we definitely have work! */
364     // from here: non-empty run queue
365     ASSERT(!emptyRunQueue(cap));
366
367     if (PacketsWaiting()) {  /* now process incoming messages, if any
368                                 pending...  
369
370                                 CAUTION: scheduleGetRemoteWork called
371                                 above, waits for messages as well! */
372       processMessages(cap, &receivedFinish);
373     }
374 #endif // PARALLEL_HASKELL: non-empty run queue!
375
376     scheduleDetectDeadlock(cap,task);
377
378 #if defined(THREADED_RTS)
379     cap = task->cap;    // reload cap, it might have changed
380 #endif
381
382     // Normally, the only way we can get here with no threads to
383     // run is if a keyboard interrupt received during 
384     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
385     // Additionally, it is not fatal for the
386     // threaded RTS to reach here with no threads to run.
387     //
388     // win32: might be here due to awaitEvent() being abandoned
389     // as a result of a console event having been delivered.
390     
391 #if defined(THREADED_RTS)
392     if (first) 
393     {
394     // XXX: ToDo
395     //     // don't yield the first time, we want a chance to run this
396     //     // thread for a bit, even if there are others banging at the
397     //     // door.
398     //     first = rtsFalse;
399     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
400     }
401
402     scheduleYield(&cap,task);
403     if (emptyRunQueue(cap)) continue; // look for work again
404 #endif
405
406 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
407     if ( emptyRunQueue(cap) ) {
408         ASSERT(sched_state >= SCHED_INTERRUPTING);
409     }
410 #endif
411
412     // 
413     // Get a thread to run
414     //
415     t = popRunQueue(cap);
416
417     // Sanity check the thread we're about to run.  This can be
418     // expensive if there is lots of thread switching going on...
419     IF_DEBUG(sanity,checkTSO(t));
420
421 #if defined(THREADED_RTS)
422     // Check whether we can run this thread in the current task.
423     // If not, we have to pass our capability to the right task.
424     {
425         Task *bound = t->bound;
426       
427         if (bound) {
428             if (bound == task) {
429                 debugTrace(DEBUG_sched,
430                            "### Running thread %lu in bound thread", (unsigned long)t->id);
431                 // yes, the Haskell thread is bound to the current native thread
432             } else {
433                 debugTrace(DEBUG_sched,
434                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
435                 // no, bound to a different Haskell thread: pass to that thread
436                 pushOnRunQueue(cap,t);
437                 continue;
438             }
439         } else {
440             // The thread we want to run is unbound.
441             if (task->tso) { 
442                 debugTrace(DEBUG_sched,
443                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
444                 // no, the current native thread is bound to a different
445                 // Haskell thread, so pass it to any worker thread
446                 pushOnRunQueue(cap,t);
447                 continue; 
448             }
449         }
450     }
451 #endif
452
453     /* context switches are initiated by the timer signal, unless
454      * the user specified "context switch as often as possible", with
455      * +RTS -C0
456      */
457     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
458         && !emptyThreadQueues(cap)) {
459         cap->context_switch = 1;
460     }
461          
462 run_thread:
463
464     // CurrentTSO is the thread to run.  t might be different if we
465     // loop back to run_thread, so make sure to set CurrentTSO after
466     // that.
467     cap->r.rCurrentTSO = t;
468
469     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
470                               (long)t->id, whatNext_strs[t->what_next]);
471
472     startHeapProfTimer();
473
474     // Check for exceptions blocked on this thread
475     maybePerformBlockedException (cap, t);
476
477     // ----------------------------------------------------------------------
478     // Run the current thread 
479
480     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
481     ASSERT(t->cap == cap);
482
483     prev_what_next = t->what_next;
484
485     errno = t->saved_errno;
486 #if mingw32_HOST_OS
487     SetLastError(t->saved_winerror);
488 #endif
489
490     cap->in_haskell = rtsTrue;
491
492     dirty_TSO(cap,t);
493
494 #if defined(THREADED_RTS)
495     if (recent_activity == ACTIVITY_DONE_GC) {
496         // ACTIVITY_DONE_GC means we turned off the timer signal to
497         // conserve power (see #1623).  Re-enable it here.
498         nat prev;
499         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
500         if (prev == ACTIVITY_DONE_GC) {
501             startTimer();
502         }
503     } else {
504         recent_activity = ACTIVITY_YES;
505     }
506 #endif
507
508     switch (prev_what_next) {
509         
510     case ThreadKilled:
511     case ThreadComplete:
512         /* Thread already finished, return to scheduler. */
513         ret = ThreadFinished;
514         break;
515         
516     case ThreadRunGHC:
517     {
518         StgRegTable *r;
519         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
520         cap = regTableToCapability(r);
521         ret = r->rRet;
522         break;
523     }
524     
525     case ThreadInterpret:
526         cap = interpretBCO(cap);
527         ret = cap->r.rRet;
528         break;
529         
530     default:
531         barf("schedule: invalid what_next field");
532     }
533
534     cap->in_haskell = rtsFalse;
535
536     // The TSO might have moved, eg. if it re-entered the RTS and a GC
537     // happened.  So find the new location:
538     t = cap->r.rCurrentTSO;
539
540     // We have run some Haskell code: there might be blackhole-blocked
541     // threads to wake up now.
542     // Lock-free test here should be ok, we're just setting a flag.
543     if ( blackhole_queue != END_TSO_QUEUE ) {
544         blackholes_need_checking = rtsTrue;
545     }
546     
547     // And save the current errno in this thread.
548     // XXX: possibly bogus for SMP because this thread might already
549     // be running again, see code below.
550     t->saved_errno = errno;
551 #if mingw32_HOST_OS
552     // Similarly for Windows error code
553     t->saved_winerror = GetLastError();
554 #endif
555
556 #if defined(THREADED_RTS)
557     // If ret is ThreadBlocked, and this Task is bound to the TSO that
558     // blocked, we are in limbo - the TSO is now owned by whatever it
559     // is blocked on, and may in fact already have been woken up,
560     // perhaps even on a different Capability.  It may be the case
561     // that task->cap != cap.  We better yield this Capability
562     // immediately and return to normaility.
563     if (ret == ThreadBlocked) {
564         debugTrace(DEBUG_sched,
565                    "--<< thread %lu (%s) stopped: blocked",
566                    (unsigned long)t->id, whatNext_strs[t->what_next]);
567         continue;
568     }
569 #endif
570
571     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
572     ASSERT(t->cap == cap);
573
574     // ----------------------------------------------------------------------
575     
576     // Costs for the scheduler are assigned to CCS_SYSTEM
577     stopHeapProfTimer();
578 #if defined(PROFILING)
579     CCCS = CCS_SYSTEM;
580 #endif
581     
582     schedulePostRunThread(cap,t);
583
584     t = threadStackUnderflow(task,t);
585
586     ready_to_gc = rtsFalse;
587
588     switch (ret) {
589     case HeapOverflow:
590         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
591         break;
592
593     case StackOverflow:
594         scheduleHandleStackOverflow(cap,task,t);
595         break;
596
597     case ThreadYielding:
598         if (scheduleHandleYield(cap, t, prev_what_next)) {
599             // shortcut for switching between compiler/interpreter:
600             goto run_thread; 
601         }
602         break;
603
604     case ThreadBlocked:
605         scheduleHandleThreadBlocked(t);
606         break;
607
608     case ThreadFinished:
609         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
610         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
611         break;
612
613     default:
614       barf("schedule: invalid thread return code %d", (int)ret);
615     }
616
617     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
618       cap = scheduleDoGC(cap,task,rtsFalse);
619     }
620   } /* end of while() */
621 }
622
623 /* ----------------------------------------------------------------------------
624  * Setting up the scheduler loop
625  * ------------------------------------------------------------------------- */
626
627 static void
628 schedulePreLoop(void)
629 {
630   // initialisation for scheduler - what cannot go into initScheduler()  
631 }
632
633 /* -----------------------------------------------------------------------------
634  * scheduleFindWork()
635  *
636  * Search for work to do, and handle messages from elsewhere.
637  * -------------------------------------------------------------------------- */
638
639 static void
640 scheduleFindWork (Capability *cap)
641 {
642     scheduleStartSignalHandlers(cap);
643
644     // Only check the black holes here if we've nothing else to do.
645     // During normal execution, the black hole list only gets checked
646     // at GC time, to avoid repeatedly traversing this possibly long
647     // list each time around the scheduler.
648     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
649
650     scheduleCheckWakeupThreads(cap);
651
652     scheduleCheckBlockedThreads(cap);
653
654 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
655     // Try to activate one of our own sparks
656     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
657 #endif
658
659 #if defined(THREADED_RTS)
660     // Try to steak work if we don't have any
661     if (emptyRunQueue(cap)) { stealWork(cap); }
662 #endif
663     
664 #if defined(PARALLEL_HASKELL)
665     // if messages have been buffered...
666     scheduleSendPendingMessages();
667 #endif
668
669 #if defined(PARALLEL_HASKELL)
670     if (emptyRunQueue(cap)) {
671         receivedFinish = scheduleGetRemoteWork(cap);
672         continue; //  a new round, (hopefully) with new work
673         /* 
674            in GUM, this a) sends out a FISH and returns IF no fish is
675                            out already
676                         b) (blocking) awaits and receives messages
677            
678            in Eden, this is only the blocking receive, as b) in GUM.
679         */
680     }
681 #endif
682 }
683
684 #if defined(THREADED_RTS)
685 STATIC_INLINE rtsBool
686 shouldYieldCapability (Capability *cap, Task *task)
687 {
688     // we need to yield this capability to someone else if..
689     //   - another thread is initiating a GC
690     //   - another Task is returning from a foreign call
691     //   - the thread at the head of the run queue cannot be run
692     //     by this Task (it is bound to another Task, or it is unbound
693     //     and this task it bound).
694     return (waiting_for_gc || 
695             cap->returning_tasks_hd != NULL ||
696             (!emptyRunQueue(cap) && (task->tso == NULL
697                                      ? cap->run_queue_hd->bound != NULL
698                                      : cap->run_queue_hd->bound != task)));
699 }
700
701 // This is the single place where a Task goes to sleep.  There are
702 // two reasons it might need to sleep:
703 //    - there are no threads to run
704 //    - we need to yield this Capability to someone else 
705 //      (see shouldYieldCapability())
706 //
707 // The return value indicates whether 
708
709 static void
710 scheduleYield (Capability **pcap, Task *task)
711 {
712     Capability *cap = *pcap;
713
714     // if we have work, and we don't need to give up the Capability, continue.
715     if (!emptyRunQueue(cap) && !shouldYieldCapability(cap,task))
716         return;
717
718     // otherwise yield (sleep), and keep yielding if necessary.
719     do {
720         yieldCapability(&cap,task);
721     } 
722     while (shouldYieldCapability(cap,task));
723
724     // note there may still be no threads on the run queue at this
725     // point, the caller has to check.
726
727     *pcap = cap;
728     return;
729 }
730 #endif
731     
732 /* -----------------------------------------------------------------------------
733  * schedulePushWork()
734  *
735  * Push work to other Capabilities if we have some.
736  * -------------------------------------------------------------------------- */
737
738 static void
739 schedulePushWork(Capability *cap USED_IF_THREADS, 
740                  Task *task      USED_IF_THREADS)
741 {
742   /* following code not for PARALLEL_HASKELL. I kept the call general,
743      future GUM versions might use pushing in a distributed setup */
744 #if defined(THREADED_RTS)
745
746     Capability *free_caps[n_capabilities], *cap0;
747     nat i, n_free_caps;
748
749     // migration can be turned off with +RTS -qg
750     if (!RtsFlags.ParFlags.migrate) return;
751
752     // Check whether we have more threads on our run queue, or sparks
753     // in our pool, that we could hand to another Capability.
754     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
755         && sparkPoolSizeCap(cap) < 2) {
756         return;
757     }
758
759     // First grab as many free Capabilities as we can.
760     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
761         cap0 = &capabilities[i];
762         if (cap != cap0 && tryGrabCapability(cap0,task)) {
763             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
764                 // it already has some work, we just grabbed it at 
765                 // the wrong moment.  Or maybe it's deadlocked!
766                 releaseCapability(cap0);
767             } else {
768                 free_caps[n_free_caps++] = cap0;
769             }
770         }
771     }
772
773     // we now have n_free_caps free capabilities stashed in
774     // free_caps[].  Share our run queue equally with them.  This is
775     // probably the simplest thing we could do; improvements we might
776     // want to do include:
777     //
778     //   - giving high priority to moving relatively new threads, on 
779     //     the gournds that they haven't had time to build up a
780     //     working set in the cache on this CPU/Capability.
781     //
782     //   - giving low priority to moving long-lived threads
783
784     if (n_free_caps > 0) {
785         StgTSO *prev, *t, *next;
786         rtsBool pushed_to_all;
787
788         debugTrace(DEBUG_sched, 
789                    "cap %d: %s and %d free capabilities, sharing...", 
790                    cap->no, 
791                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
792                    "excess threads on run queue":"sparks to share (>=2)",
793                    n_free_caps);
794
795         i = 0;
796         pushed_to_all = rtsFalse;
797
798         if (cap->run_queue_hd != END_TSO_QUEUE) {
799             prev = cap->run_queue_hd;
800             t = prev->_link;
801             prev->_link = END_TSO_QUEUE;
802             for (; t != END_TSO_QUEUE; t = next) {
803                 next = t->_link;
804                 t->_link = END_TSO_QUEUE;
805                 if (t->what_next == ThreadRelocated
806                     || t->bound == task // don't move my bound thread
807                     || tsoLocked(t)) {  // don't move a locked thread
808                     setTSOLink(cap, prev, t);
809                     prev = t;
810                 } else if (i == n_free_caps) {
811                     pushed_to_all = rtsTrue;
812                     i = 0;
813                     // keep one for us
814                     setTSOLink(cap, prev, t);
815                     prev = t;
816                 } else {
817                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
818                     appendToRunQueue(free_caps[i],t);
819                     if (t->bound) { t->bound->cap = free_caps[i]; }
820                     t->cap = free_caps[i];
821                     i++;
822                 }
823             }
824             cap->run_queue_tl = prev;
825         }
826
827 #ifdef SPARK_PUSHING
828         /* JB I left this code in place, it would work but is not necessary */
829
830         // If there are some free capabilities that we didn't push any
831         // threads to, then try to push a spark to each one.
832         if (!pushed_to_all) {
833             StgClosure *spark;
834             // i is the next free capability to push to
835             for (; i < n_free_caps; i++) {
836                 if (emptySparkPoolCap(free_caps[i])) {
837                     spark = tryStealSpark(cap->sparks);
838                     if (spark != NULL) {
839                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
840                         newSpark(&(free_caps[i]->r), spark);
841                     }
842                 }
843             }
844         }
845 #endif /* SPARK_PUSHING */
846
847         // release the capabilities
848         for (i = 0; i < n_free_caps; i++) {
849             task->cap = free_caps[i];
850             releaseAndWakeupCapability(free_caps[i]);
851         }
852     }
853     task->cap = cap; // reset to point to our Capability.
854
855 #endif /* THREADED_RTS */
856
857 }
858
859 /* ----------------------------------------------------------------------------
860  * Start any pending signal handlers
861  * ------------------------------------------------------------------------- */
862
863 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
864 static void
865 scheduleStartSignalHandlers(Capability *cap)
866 {
867     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
868         // safe outside the lock
869         startSignalHandlers(cap);
870     }
871 }
872 #else
873 static void
874 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
875 {
876 }
877 #endif
878
879 /* ----------------------------------------------------------------------------
880  * Check for blocked threads that can be woken up.
881  * ------------------------------------------------------------------------- */
882
883 static void
884 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
885 {
886 #if !defined(THREADED_RTS)
887     //
888     // Check whether any waiting threads need to be woken up.  If the
889     // run queue is empty, and there are no other tasks running, we
890     // can wait indefinitely for something to happen.
891     //
892     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
893     {
894         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
895     }
896 #endif
897 }
898
899
900 /* ----------------------------------------------------------------------------
901  * Check for threads woken up by other Capabilities
902  * ------------------------------------------------------------------------- */
903
904 static void
905 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
906 {
907 #if defined(THREADED_RTS)
908     // Any threads that were woken up by other Capabilities get
909     // appended to our run queue.
910     if (!emptyWakeupQueue(cap)) {
911         ACQUIRE_LOCK(&cap->lock);
912         if (emptyRunQueue(cap)) {
913             cap->run_queue_hd = cap->wakeup_queue_hd;
914             cap->run_queue_tl = cap->wakeup_queue_tl;
915         } else {
916             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
917             cap->run_queue_tl = cap->wakeup_queue_tl;
918         }
919         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
920         RELEASE_LOCK(&cap->lock);
921     }
922 #endif
923 }
924
925 /* ----------------------------------------------------------------------------
926  * Check for threads blocked on BLACKHOLEs that can be woken up
927  * ------------------------------------------------------------------------- */
928 static void
929 scheduleCheckBlackHoles (Capability *cap)
930 {
931     if ( blackholes_need_checking ) // check without the lock first
932     {
933         ACQUIRE_LOCK(&sched_mutex);
934         if ( blackholes_need_checking ) {
935             checkBlackHoles(cap);
936             blackholes_need_checking = rtsFalse;
937         }
938         RELEASE_LOCK(&sched_mutex);
939     }
940 }
941
942 /* ----------------------------------------------------------------------------
943  * Detect deadlock conditions and attempt to resolve them.
944  * ------------------------------------------------------------------------- */
945
946 static void
947 scheduleDetectDeadlock (Capability *cap, Task *task)
948 {
949
950 #if defined(PARALLEL_HASKELL)
951     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
952     return;
953 #endif
954
955     /* 
956      * Detect deadlock: when we have no threads to run, there are no
957      * threads blocked, waiting for I/O, or sleeping, and all the
958      * other tasks are waiting for work, we must have a deadlock of
959      * some description.
960      */
961     if ( emptyThreadQueues(cap) )
962     {
963 #if defined(THREADED_RTS)
964         /* 
965          * In the threaded RTS, we only check for deadlock if there
966          * has been no activity in a complete timeslice.  This means
967          * we won't eagerly start a full GC just because we don't have
968          * any threads to run currently.
969          */
970         if (recent_activity != ACTIVITY_INACTIVE) return;
971 #endif
972
973         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
974
975         // Garbage collection can release some new threads due to
976         // either (a) finalizers or (b) threads resurrected because
977         // they are unreachable and will therefore be sent an
978         // exception.  Any threads thus released will be immediately
979         // runnable.
980         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
981
982         recent_activity = ACTIVITY_DONE_GC;
983         // disable timer signals (see #1623)
984         stopTimer();
985         
986         if ( !emptyRunQueue(cap) ) return;
987
988 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
989         /* If we have user-installed signal handlers, then wait
990          * for signals to arrive rather then bombing out with a
991          * deadlock.
992          */
993         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
994             debugTrace(DEBUG_sched,
995                        "still deadlocked, waiting for signals...");
996
997             awaitUserSignals();
998
999             if (signals_pending()) {
1000                 startSignalHandlers(cap);
1001             }
1002
1003             // either we have threads to run, or we were interrupted:
1004             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1005
1006             return;
1007         }
1008 #endif
1009
1010 #if !defined(THREADED_RTS)
1011         /* Probably a real deadlock.  Send the current main thread the
1012          * Deadlock exception.
1013          */
1014         if (task->tso) {
1015             switch (task->tso->why_blocked) {
1016             case BlockedOnSTM:
1017             case BlockedOnBlackHole:
1018             case BlockedOnException:
1019             case BlockedOnMVar:
1020                 throwToSingleThreaded(cap, task->tso, 
1021                                       (StgClosure *)nonTermination_closure);
1022                 return;
1023             default:
1024                 barf("deadlock: main thread blocked in a strange way");
1025             }
1026         }
1027         return;
1028 #endif
1029     }
1030 }
1031
1032
1033 /* ----------------------------------------------------------------------------
1034  * Send pending messages (PARALLEL_HASKELL only)
1035  * ------------------------------------------------------------------------- */
1036
1037 #if defined(PARALLEL_HASKELL)
1038 static void
1039 scheduleSendPendingMessages(void)
1040 {
1041
1042 # if defined(PAR) // global Mem.Mgmt., omit for now
1043     if (PendingFetches != END_BF_QUEUE) {
1044         processFetches();
1045     }
1046 # endif
1047     
1048     if (RtsFlags.ParFlags.BufferTime) {
1049         // if we use message buffering, we must send away all message
1050         // packets which have become too old...
1051         sendOldBuffers(); 
1052     }
1053 }
1054 #endif
1055
1056 /* ----------------------------------------------------------------------------
1057  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1058  * ------------------------------------------------------------------------- */
1059
1060 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1061 static void
1062 scheduleActivateSpark(Capability *cap)
1063 {
1064     StgClosure *spark;
1065
1066 /* We only want to stay here if the run queue is empty and we want some
1067    work. We try to turn a spark into a thread, and add it to the run
1068    queue, from where it will be picked up in the next iteration of the
1069    scheduler loop.  
1070 */
1071     if (!emptyRunQueue(cap)) 
1072       /* In the threaded RTS, another task might have pushed a thread
1073          on our run queue in the meantime ? But would need a lock.. */
1074       return;
1075
1076  
1077     // Really we should be using reclaimSpark() here, but
1078     // experimentally it doesn't seem to perform as well as just
1079     // stealing from our own spark pool:
1080     // spark = reclaimSpark(cap->sparks);
1081     spark = tryStealSpark(cap->sparks); // defined in Sparks.c
1082
1083     if (spark != NULL) {
1084       debugTrace(DEBUG_sched,
1085                  "turning spark of closure %p into a thread",
1086                  (StgClosure *)spark);
1087       createSparkThread(cap,spark); // defined in Sparks.c
1088     }
1089 }
1090 #endif // PARALLEL_HASKELL || THREADED_RTS
1091
1092 /* ----------------------------------------------------------------------------
1093  * Get work from a remote node (PARALLEL_HASKELL only)
1094  * ------------------------------------------------------------------------- */
1095     
1096 #if defined(PARALLEL_HASKELL)
1097 static rtsBool /* return value used in PARALLEL_HASKELL only */
1098 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1099 {
1100 #if defined(PARALLEL_HASKELL)
1101   rtsBool receivedFinish = rtsFalse;
1102
1103   // idle() , i.e. send all buffers, wait for work
1104   if (RtsFlags.ParFlags.BufferTime) {
1105         IF_PAR_DEBUG(verbose, 
1106                 debugBelch("...send all pending data,"));
1107         {
1108           nat i;
1109           for (i=1; i<=nPEs; i++)
1110             sendImmediately(i); // send all messages away immediately
1111         }
1112   }
1113
1114   /* this would be the place for fishing in GUM... 
1115
1116      if (no-earlier-fish-around) 
1117           sendFish(choosePe());
1118    */
1119
1120   // Eden:just look for incoming messages (blocking receive)
1121   IF_PAR_DEBUG(verbose, 
1122                debugBelch("...wait for incoming messages...\n"));
1123   processMessages(cap, &receivedFinish); // blocking receive...
1124
1125
1126   return receivedFinish;
1127   // reenter scheduling look after having received something
1128
1129 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1130
1131   return rtsFalse; /* return value unused in THREADED_RTS */
1132
1133 #endif /* PARALLEL_HASKELL */
1134 }
1135 #endif // PARALLEL_HASKELL || THREADED_RTS
1136
1137 /* ----------------------------------------------------------------------------
1138  * After running a thread...
1139  * ------------------------------------------------------------------------- */
1140
1141 static void
1142 schedulePostRunThread (Capability *cap, StgTSO *t)
1143 {
1144     // We have to be able to catch transactions that are in an
1145     // infinite loop as a result of seeing an inconsistent view of
1146     // memory, e.g. 
1147     //
1148     //   atomically $ do
1149     //       [a,b] <- mapM readTVar [ta,tb]
1150     //       when (a == b) loop
1151     //
1152     // and a is never equal to b given a consistent view of memory.
1153     //
1154     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1155         if (!stmValidateNestOfTransactions (t -> trec)) {
1156             debugTrace(DEBUG_sched | DEBUG_stm,
1157                        "trec %p found wasting its time", t);
1158             
1159             // strip the stack back to the
1160             // ATOMICALLY_FRAME, aborting the (nested)
1161             // transaction, and saving the stack of any
1162             // partially-evaluated thunks on the heap.
1163             throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
1164             
1165             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1166         }
1167     }
1168
1169   /* some statistics gathering in the parallel case */
1170 }
1171
1172 /* -----------------------------------------------------------------------------
1173  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1174  * -------------------------------------------------------------------------- */
1175
1176 static rtsBool
1177 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1178 {
1179     // did the task ask for a large block?
1180     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1181         // if so, get one and push it on the front of the nursery.
1182         bdescr *bd;
1183         lnat blocks;
1184         
1185         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1186         
1187         debugTrace(DEBUG_sched,
1188                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1189                    (long)t->id, whatNext_strs[t->what_next], blocks);
1190     
1191         // don't do this if the nursery is (nearly) full, we'll GC first.
1192         if (cap->r.rCurrentNursery->link != NULL ||
1193             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1194                                                // if the nursery has only one block.
1195             
1196             ACQUIRE_SM_LOCK
1197             bd = allocGroup( blocks );
1198             RELEASE_SM_LOCK
1199             cap->r.rNursery->n_blocks += blocks;
1200             
1201             // link the new group into the list
1202             bd->link = cap->r.rCurrentNursery;
1203             bd->u.back = cap->r.rCurrentNursery->u.back;
1204             if (cap->r.rCurrentNursery->u.back != NULL) {
1205                 cap->r.rCurrentNursery->u.back->link = bd;
1206             } else {
1207 #if !defined(THREADED_RTS)
1208                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1209                        g0s0 == cap->r.rNursery);
1210 #endif
1211                 cap->r.rNursery->blocks = bd;
1212             }             
1213             cap->r.rCurrentNursery->u.back = bd;
1214             
1215             // initialise it as a nursery block.  We initialise the
1216             // step, gen_no, and flags field of *every* sub-block in
1217             // this large block, because this is easier than making
1218             // sure that we always find the block head of a large
1219             // block whenever we call Bdescr() (eg. evacuate() and
1220             // isAlive() in the GC would both have to do this, at
1221             // least).
1222             { 
1223                 bdescr *x;
1224                 for (x = bd; x < bd + blocks; x++) {
1225                     x->step = cap->r.rNursery;
1226                     x->gen_no = 0;
1227                     x->flags = 0;
1228                 }
1229             }
1230             
1231             // This assert can be a killer if the app is doing lots
1232             // of large block allocations.
1233             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1234             
1235             // now update the nursery to point to the new block
1236             cap->r.rCurrentNursery = bd;
1237             
1238             // we might be unlucky and have another thread get on the
1239             // run queue before us and steal the large block, but in that
1240             // case the thread will just end up requesting another large
1241             // block.
1242             pushOnRunQueue(cap,t);
1243             return rtsFalse;  /* not actually GC'ing */
1244         }
1245     }
1246     
1247     debugTrace(DEBUG_sched,
1248                "--<< thread %ld (%s) stopped: HeapOverflow",
1249                (long)t->id, whatNext_strs[t->what_next]);
1250
1251     if (cap->context_switch) {
1252         // Sometimes we miss a context switch, e.g. when calling
1253         // primitives in a tight loop, MAYBE_GC() doesn't check the
1254         // context switch flag, and we end up waiting for a GC.
1255         // See #1984, and concurrent/should_run/1984
1256         cap->context_switch = 0;
1257         addToRunQueue(cap,t);
1258     } else {
1259         pushOnRunQueue(cap,t);
1260     }
1261     return rtsTrue;
1262     /* actual GC is done at the end of the while loop in schedule() */
1263 }
1264
1265 /* -----------------------------------------------------------------------------
1266  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1267  * -------------------------------------------------------------------------- */
1268
1269 static void
1270 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1271 {
1272     debugTrace (DEBUG_sched,
1273                 "--<< thread %ld (%s) stopped, StackOverflow", 
1274                 (long)t->id, whatNext_strs[t->what_next]);
1275
1276     /* just adjust the stack for this thread, then pop it back
1277      * on the run queue.
1278      */
1279     { 
1280         /* enlarge the stack */
1281         StgTSO *new_t = threadStackOverflow(cap, t);
1282         
1283         /* The TSO attached to this Task may have moved, so update the
1284          * pointer to it.
1285          */
1286         if (task->tso == t) {
1287             task->tso = new_t;
1288         }
1289         pushOnRunQueue(cap,new_t);
1290     }
1291 }
1292
1293 /* -----------------------------------------------------------------------------
1294  * Handle a thread that returned to the scheduler with ThreadYielding
1295  * -------------------------------------------------------------------------- */
1296
1297 static rtsBool
1298 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1299 {
1300     // Reset the context switch flag.  We don't do this just before
1301     // running the thread, because that would mean we would lose ticks
1302     // during GC, which can lead to unfair scheduling (a thread hogs
1303     // the CPU because the tick always arrives during GC).  This way
1304     // penalises threads that do a lot of allocation, but that seems
1305     // better than the alternative.
1306     cap->context_switch = 0;
1307     
1308     /* put the thread back on the run queue.  Then, if we're ready to
1309      * GC, check whether this is the last task to stop.  If so, wake
1310      * up the GC thread.  getThread will block during a GC until the
1311      * GC is finished.
1312      */
1313 #ifdef DEBUG
1314     if (t->what_next != prev_what_next) {
1315         debugTrace(DEBUG_sched,
1316                    "--<< thread %ld (%s) stopped to switch evaluators", 
1317                    (long)t->id, whatNext_strs[t->what_next]);
1318     } else {
1319         debugTrace(DEBUG_sched,
1320                    "--<< thread %ld (%s) stopped, yielding",
1321                    (long)t->id, whatNext_strs[t->what_next]);
1322     }
1323 #endif
1324     
1325     IF_DEBUG(sanity,
1326              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1327              checkTSO(t));
1328     ASSERT(t->_link == END_TSO_QUEUE);
1329     
1330     // Shortcut if we're just switching evaluators: don't bother
1331     // doing stack squeezing (which can be expensive), just run the
1332     // thread.
1333     if (t->what_next != prev_what_next) {
1334         return rtsTrue;
1335     }
1336
1337     addToRunQueue(cap,t);
1338
1339     return rtsFalse;
1340 }
1341
1342 /* -----------------------------------------------------------------------------
1343  * Handle a thread that returned to the scheduler with ThreadBlocked
1344  * -------------------------------------------------------------------------- */
1345
1346 static void
1347 scheduleHandleThreadBlocked( StgTSO *t
1348 #if !defined(GRAN) && !defined(DEBUG)
1349     STG_UNUSED
1350 #endif
1351     )
1352 {
1353
1354       // We don't need to do anything.  The thread is blocked, and it
1355       // has tidied up its stack and placed itself on whatever queue
1356       // it needs to be on.
1357
1358     // ASSERT(t->why_blocked != NotBlocked);
1359     // Not true: for example,
1360     //    - in THREADED_RTS, the thread may already have been woken
1361     //      up by another Capability.  This actually happens: try
1362     //      conc023 +RTS -N2.
1363     //    - the thread may have woken itself up already, because
1364     //      threadPaused() might have raised a blocked throwTo
1365     //      exception, see maybePerformBlockedException().
1366
1367 #ifdef DEBUG
1368     if (traceClass(DEBUG_sched)) {
1369         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1370                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1371         printThreadBlockage(t);
1372         debugTraceEnd();
1373     }
1374 #endif
1375 }
1376
1377 /* -----------------------------------------------------------------------------
1378  * Handle a thread that returned to the scheduler with ThreadFinished
1379  * -------------------------------------------------------------------------- */
1380
1381 static rtsBool
1382 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1383 {
1384     /* Need to check whether this was a main thread, and if so,
1385      * return with the return value.
1386      *
1387      * We also end up here if the thread kills itself with an
1388      * uncaught exception, see Exception.cmm.
1389      */
1390     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1391                (unsigned long)t->id, whatNext_strs[t->what_next]);
1392
1393       //
1394       // Check whether the thread that just completed was a bound
1395       // thread, and if so return with the result.  
1396       //
1397       // There is an assumption here that all thread completion goes
1398       // through this point; we need to make sure that if a thread
1399       // ends up in the ThreadKilled state, that it stays on the run
1400       // queue so it can be dealt with here.
1401       //
1402
1403       if (t->bound) {
1404
1405           if (t->bound != task) {
1406 #if !defined(THREADED_RTS)
1407               // Must be a bound thread that is not the topmost one.  Leave
1408               // it on the run queue until the stack has unwound to the
1409               // point where we can deal with this.  Leaving it on the run
1410               // queue also ensures that the garbage collector knows about
1411               // this thread and its return value (it gets dropped from the
1412               // step->threads list so there's no other way to find it).
1413               appendToRunQueue(cap,t);
1414               return rtsFalse;
1415 #else
1416               // this cannot happen in the threaded RTS, because a
1417               // bound thread can only be run by the appropriate Task.
1418               barf("finished bound thread that isn't mine");
1419 #endif
1420           }
1421
1422           ASSERT(task->tso == t);
1423
1424           if (t->what_next == ThreadComplete) {
1425               if (task->ret) {
1426                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1427                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1428               }
1429               task->stat = Success;
1430           } else {
1431               if (task->ret) {
1432                   *(task->ret) = NULL;
1433               }
1434               if (sched_state >= SCHED_INTERRUPTING) {
1435                   task->stat = Interrupted;
1436               } else {
1437                   task->stat = Killed;
1438               }
1439           }
1440 #ifdef DEBUG
1441           removeThreadLabel((StgWord)task->tso->id);
1442 #endif
1443           return rtsTrue; // tells schedule() to return
1444       }
1445
1446       return rtsFalse;
1447 }
1448
1449 /* -----------------------------------------------------------------------------
1450  * Perform a heap census
1451  * -------------------------------------------------------------------------- */
1452
1453 static rtsBool
1454 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1455 {
1456     // When we have +RTS -i0 and we're heap profiling, do a census at
1457     // every GC.  This lets us get repeatable runs for debugging.
1458     if (performHeapProfile ||
1459         (RtsFlags.ProfFlags.profileInterval==0 &&
1460          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1461         return rtsTrue;
1462     } else {
1463         return rtsFalse;
1464     }
1465 }
1466
1467 /* -----------------------------------------------------------------------------
1468  * Perform a garbage collection if necessary
1469  * -------------------------------------------------------------------------- */
1470
1471 static Capability *
1472 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1473 {
1474     rtsBool heap_census;
1475 #ifdef THREADED_RTS
1476     /* extern static volatile StgWord waiting_for_gc; 
1477        lives inside capability.c */
1478     rtsBool was_waiting;
1479     nat i;
1480 #endif
1481
1482 #ifdef THREADED_RTS
1483     // In order to GC, there must be no threads running Haskell code.
1484     // Therefore, the GC thread needs to hold *all* the capabilities,
1485     // and release them after the GC has completed.  
1486     //
1487     // This seems to be the simplest way: previous attempts involved
1488     // making all the threads with capabilities give up their
1489     // capabilities and sleep except for the *last* one, which
1490     // actually did the GC.  But it's quite hard to arrange for all
1491     // the other tasks to sleep and stay asleep.
1492     //
1493         
1494     /*  Other capabilities are prevented from running yet more Haskell
1495         threads if waiting_for_gc is set. Tested inside
1496         yieldCapability() and releaseCapability() in Capability.c */
1497
1498     was_waiting = cas(&waiting_for_gc, 0, 1);
1499     if (was_waiting) {
1500         do {
1501             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1502             if (cap) yieldCapability(&cap,task);
1503         } while (waiting_for_gc);
1504         return cap;  // NOTE: task->cap might have changed here
1505     }
1506
1507     setContextSwitches();
1508     for (i=0; i < n_capabilities; i++) {
1509         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1510         if (cap != &capabilities[i]) {
1511             Capability *pcap = &capabilities[i];
1512             // we better hope this task doesn't get migrated to
1513             // another Capability while we're waiting for this one.
1514             // It won't, because load balancing happens while we have
1515             // all the Capabilities, but even so it's a slightly
1516             // unsavoury invariant.
1517             task->cap = pcap;
1518             waitForReturnCapability(&pcap, task);
1519             if (pcap != &capabilities[i]) {
1520                 barf("scheduleDoGC: got the wrong capability");
1521             }
1522         }
1523     }
1524
1525     waiting_for_gc = rtsFalse;
1526 #endif
1527
1528     // so this happens periodically:
1529     if (cap) scheduleCheckBlackHoles(cap);
1530     
1531     IF_DEBUG(scheduler, printAllThreads());
1532
1533     /*
1534      * We now have all the capabilities; if we're in an interrupting
1535      * state, then we should take the opportunity to delete all the
1536      * threads in the system.
1537      */
1538     if (sched_state >= SCHED_INTERRUPTING) {
1539         deleteAllThreads(&capabilities[0]);
1540         sched_state = SCHED_SHUTTING_DOWN;
1541     }
1542     
1543     heap_census = scheduleNeedHeapProfile(rtsTrue);
1544
1545     /* everybody back, start the GC.
1546      * Could do it in this thread, or signal a condition var
1547      * to do it in another thread.  Either way, we need to
1548      * broadcast on gc_pending_cond afterward.
1549      */
1550 #if defined(THREADED_RTS)
1551     debugTrace(DEBUG_sched, "doing GC");
1552 #endif
1553     GarbageCollect(force_major || heap_census);
1554     
1555     if (heap_census) {
1556         debugTrace(DEBUG_sched, "performing heap census");
1557         heapCensus();
1558         performHeapProfile = rtsFalse;
1559     }
1560
1561 #ifdef SPARKBALANCE
1562     /* JB 
1563        Once we are all together... this would be the place to balance all
1564        spark pools. No concurrent stealing or adding of new sparks can
1565        occur. Should be defined in Sparks.c. */
1566     balanceSparkPoolsCaps(n_capabilities, capabilities);
1567 #endif
1568
1569 #if defined(THREADED_RTS)
1570     // release our stash of capabilities.
1571     for (i = 0; i < n_capabilities; i++) {
1572         if (cap != &capabilities[i]) {
1573             task->cap = &capabilities[i];
1574             releaseCapability(&capabilities[i]);
1575         }
1576     }
1577     if (cap) {
1578         task->cap = cap;
1579     } else {
1580         task->cap = NULL;
1581     }
1582 #endif
1583
1584     return cap;
1585 }
1586
1587 /* ---------------------------------------------------------------------------
1588  * Singleton fork(). Do not copy any running threads.
1589  * ------------------------------------------------------------------------- */
1590
1591 pid_t
1592 forkProcess(HsStablePtr *entry
1593 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1594             STG_UNUSED
1595 #endif
1596            )
1597 {
1598 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1599     Task *task;
1600     pid_t pid;
1601     StgTSO* t,*next;
1602     Capability *cap;
1603     nat s;
1604     
1605 #if defined(THREADED_RTS)
1606     if (RtsFlags.ParFlags.nNodes > 1) {
1607         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1608         stg_exit(EXIT_FAILURE);
1609     }
1610 #endif
1611
1612     debugTrace(DEBUG_sched, "forking!");
1613     
1614     // ToDo: for SMP, we should probably acquire *all* the capabilities
1615     cap = rts_lock();
1616     
1617     // no funny business: hold locks while we fork, otherwise if some
1618     // other thread is holding a lock when the fork happens, the data
1619     // structure protected by the lock will forever be in an
1620     // inconsistent state in the child.  See also #1391.
1621     ACQUIRE_LOCK(&sched_mutex);
1622     ACQUIRE_LOCK(&cap->lock);
1623     ACQUIRE_LOCK(&cap->running_task->lock);
1624
1625     pid = fork();
1626     
1627     if (pid) { // parent
1628         
1629         RELEASE_LOCK(&sched_mutex);
1630         RELEASE_LOCK(&cap->lock);
1631         RELEASE_LOCK(&cap->running_task->lock);
1632
1633         // just return the pid
1634         rts_unlock(cap);
1635         return pid;
1636         
1637     } else { // child
1638         
1639 #if defined(THREADED_RTS)
1640         initMutex(&sched_mutex);
1641         initMutex(&cap->lock);
1642         initMutex(&cap->running_task->lock);
1643 #endif
1644
1645         // Now, all OS threads except the thread that forked are
1646         // stopped.  We need to stop all Haskell threads, including
1647         // those involved in foreign calls.  Also we need to delete
1648         // all Tasks, because they correspond to OS threads that are
1649         // now gone.
1650
1651         for (s = 0; s < total_steps; s++) {
1652           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1653             if (t->what_next == ThreadRelocated) {
1654                 next = t->_link;
1655             } else {
1656                 next = t->global_link;
1657                 // don't allow threads to catch the ThreadKilled
1658                 // exception, but we do want to raiseAsync() because these
1659                 // threads may be evaluating thunks that we need later.
1660                 deleteThread_(cap,t);
1661             }
1662           }
1663         }
1664         
1665         // Empty the run queue.  It seems tempting to let all the
1666         // killed threads stay on the run queue as zombies to be
1667         // cleaned up later, but some of them correspond to bound
1668         // threads for which the corresponding Task does not exist.
1669         cap->run_queue_hd = END_TSO_QUEUE;
1670         cap->run_queue_tl = END_TSO_QUEUE;
1671
1672         // Any suspended C-calling Tasks are no more, their OS threads
1673         // don't exist now:
1674         cap->suspended_ccalling_tasks = NULL;
1675
1676         // Empty the threads lists.  Otherwise, the garbage
1677         // collector may attempt to resurrect some of these threads.
1678         for (s = 0; s < total_steps; s++) {
1679             all_steps[s].threads = END_TSO_QUEUE;
1680         }
1681
1682         // Wipe the task list, except the current Task.
1683         ACQUIRE_LOCK(&sched_mutex);
1684         for (task = all_tasks; task != NULL; task=task->all_link) {
1685             if (task != cap->running_task) {
1686 #if defined(THREADED_RTS)
1687                 initMutex(&task->lock); // see #1391
1688 #endif
1689                 discardTask(task);
1690             }
1691         }
1692         RELEASE_LOCK(&sched_mutex);
1693
1694 #if defined(THREADED_RTS)
1695         // Wipe our spare workers list, they no longer exist.  New
1696         // workers will be created if necessary.
1697         cap->spare_workers = NULL;
1698         cap->returning_tasks_hd = NULL;
1699         cap->returning_tasks_tl = NULL;
1700 #endif
1701
1702         // On Unix, all timers are reset in the child, so we need to start
1703         // the timer again.
1704         initTimer();
1705         startTimer();
1706
1707         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1708         rts_checkSchedStatus("forkProcess",cap);
1709         
1710         rts_unlock(cap);
1711         hs_exit();                      // clean up and exit
1712         stg_exit(EXIT_SUCCESS);
1713     }
1714 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1715     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1716     return -1;
1717 #endif
1718 }
1719
1720 /* ---------------------------------------------------------------------------
1721  * Delete all the threads in the system
1722  * ------------------------------------------------------------------------- */
1723    
1724 static void
1725 deleteAllThreads ( Capability *cap )
1726 {
1727     // NOTE: only safe to call if we own all capabilities.
1728
1729     StgTSO* t, *next;
1730     nat s;
1731
1732     debugTrace(DEBUG_sched,"deleting all threads");
1733     for (s = 0; s < total_steps; s++) {
1734       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1735         if (t->what_next == ThreadRelocated) {
1736             next = t->_link;
1737         } else {
1738             next = t->global_link;
1739             deleteThread(cap,t);
1740         }
1741       }
1742     }      
1743
1744     // The run queue now contains a bunch of ThreadKilled threads.  We
1745     // must not throw these away: the main thread(s) will be in there
1746     // somewhere, and the main scheduler loop has to deal with it.
1747     // Also, the run queue is the only thing keeping these threads from
1748     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1749
1750 #if !defined(THREADED_RTS)
1751     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1752     ASSERT(sleeping_queue == END_TSO_QUEUE);
1753 #endif
1754 }
1755
1756 /* -----------------------------------------------------------------------------
1757    Managing the suspended_ccalling_tasks list.
1758    Locks required: sched_mutex
1759    -------------------------------------------------------------------------- */
1760
1761 STATIC_INLINE void
1762 suspendTask (Capability *cap, Task *task)
1763 {
1764     ASSERT(task->next == NULL && task->prev == NULL);
1765     task->next = cap->suspended_ccalling_tasks;
1766     task->prev = NULL;
1767     if (cap->suspended_ccalling_tasks) {
1768         cap->suspended_ccalling_tasks->prev = task;
1769     }
1770     cap->suspended_ccalling_tasks = task;
1771 }
1772
1773 STATIC_INLINE void
1774 recoverSuspendedTask (Capability *cap, Task *task)
1775 {
1776     if (task->prev) {
1777         task->prev->next = task->next;
1778     } else {
1779         ASSERT(cap->suspended_ccalling_tasks == task);
1780         cap->suspended_ccalling_tasks = task->next;
1781     }
1782     if (task->next) {
1783         task->next->prev = task->prev;
1784     }
1785     task->next = task->prev = NULL;
1786 }
1787
1788 /* ---------------------------------------------------------------------------
1789  * Suspending & resuming Haskell threads.
1790  * 
1791  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1792  * its capability before calling the C function.  This allows another
1793  * task to pick up the capability and carry on running Haskell
1794  * threads.  It also means that if the C call blocks, it won't lock
1795  * the whole system.
1796  *
1797  * The Haskell thread making the C call is put to sleep for the
1798  * duration of the call, on the susepended_ccalling_threads queue.  We
1799  * give out a token to the task, which it can use to resume the thread
1800  * on return from the C function.
1801  * ------------------------------------------------------------------------- */
1802    
1803 void *
1804 suspendThread (StgRegTable *reg)
1805 {
1806   Capability *cap;
1807   int saved_errno;
1808   StgTSO *tso;
1809   Task *task;
1810 #if mingw32_HOST_OS
1811   StgWord32 saved_winerror;
1812 #endif
1813
1814   saved_errno = errno;
1815 #if mingw32_HOST_OS
1816   saved_winerror = GetLastError();
1817 #endif
1818
1819   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1820    */
1821   cap = regTableToCapability(reg);
1822
1823   task = cap->running_task;
1824   tso = cap->r.rCurrentTSO;
1825
1826   debugTrace(DEBUG_sched, 
1827              "thread %lu did a safe foreign call", 
1828              (unsigned long)cap->r.rCurrentTSO->id);
1829
1830   // XXX this might not be necessary --SDM
1831   tso->what_next = ThreadRunGHC;
1832
1833   threadPaused(cap,tso);
1834
1835   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1836       tso->why_blocked = BlockedOnCCall;
1837       tso->flags |= TSO_BLOCKEX;
1838       tso->flags &= ~TSO_INTERRUPTIBLE;
1839   } else {
1840       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1841   }
1842
1843   // Hand back capability
1844   task->suspended_tso = tso;
1845
1846   ACQUIRE_LOCK(&cap->lock);
1847
1848   suspendTask(cap,task);
1849   cap->in_haskell = rtsFalse;
1850   releaseCapability_(cap,rtsFalse);
1851   
1852   RELEASE_LOCK(&cap->lock);
1853
1854 #if defined(THREADED_RTS)
1855   /* Preparing to leave the RTS, so ensure there's a native thread/task
1856      waiting to take over.
1857   */
1858   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1859 #endif
1860
1861   errno = saved_errno;
1862 #if mingw32_HOST_OS
1863   SetLastError(saved_winerror);
1864 #endif
1865   return task;
1866 }
1867
1868 StgRegTable *
1869 resumeThread (void *task_)
1870 {
1871     StgTSO *tso;
1872     Capability *cap;
1873     Task *task = task_;
1874     int saved_errno;
1875 #if mingw32_HOST_OS
1876     StgWord32 saved_winerror;
1877 #endif
1878
1879     saved_errno = errno;
1880 #if mingw32_HOST_OS
1881     saved_winerror = GetLastError();
1882 #endif
1883
1884     cap = task->cap;
1885     // Wait for permission to re-enter the RTS with the result.
1886     waitForReturnCapability(&cap,task);
1887     // we might be on a different capability now... but if so, our
1888     // entry on the suspended_ccalling_tasks list will also have been
1889     // migrated.
1890
1891     // Remove the thread from the suspended list
1892     recoverSuspendedTask(cap,task);
1893
1894     tso = task->suspended_tso;
1895     task->suspended_tso = NULL;
1896     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1897     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1898     
1899     if (tso->why_blocked == BlockedOnCCall) {
1900         awakenBlockedExceptionQueue(cap,tso);
1901         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1902     }
1903     
1904     /* Reset blocking status */
1905     tso->why_blocked  = NotBlocked;
1906     
1907     cap->r.rCurrentTSO = tso;
1908     cap->in_haskell = rtsTrue;
1909     errno = saved_errno;
1910 #if mingw32_HOST_OS
1911     SetLastError(saved_winerror);
1912 #endif
1913
1914     /* We might have GC'd, mark the TSO dirty again */
1915     dirty_TSO(cap,tso);
1916
1917     IF_DEBUG(sanity, checkTSO(tso));
1918
1919     return &cap->r;
1920 }
1921
1922 /* ---------------------------------------------------------------------------
1923  * scheduleThread()
1924  *
1925  * scheduleThread puts a thread on the end  of the runnable queue.
1926  * This will usually be done immediately after a thread is created.
1927  * The caller of scheduleThread must create the thread using e.g.
1928  * createThread and push an appropriate closure
1929  * on this thread's stack before the scheduler is invoked.
1930  * ------------------------------------------------------------------------ */
1931
1932 void
1933 scheduleThread(Capability *cap, StgTSO *tso)
1934 {
1935     // The thread goes at the *end* of the run-queue, to avoid possible
1936     // starvation of any threads already on the queue.
1937     appendToRunQueue(cap,tso);
1938 }
1939
1940 void
1941 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1942 {
1943 #if defined(THREADED_RTS)
1944     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1945                               // move this thread from now on.
1946     cpu %= RtsFlags.ParFlags.nNodes;
1947     if (cpu == cap->no) {
1948         appendToRunQueue(cap,tso);
1949     } else {
1950         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1951     }
1952 #else
1953     appendToRunQueue(cap,tso);
1954 #endif
1955 }
1956
1957 Capability *
1958 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1959 {
1960     Task *task;
1961
1962     // We already created/initialised the Task
1963     task = cap->running_task;
1964
1965     // This TSO is now a bound thread; make the Task and TSO
1966     // point to each other.
1967     tso->bound = task;
1968     tso->cap = cap;
1969
1970     task->tso = tso;
1971     task->ret = ret;
1972     task->stat = NoStatus;
1973
1974     appendToRunQueue(cap,tso);
1975
1976     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1977
1978     cap = schedule(cap,task);
1979
1980     ASSERT(task->stat != NoStatus);
1981     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1982
1983     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1984     return cap;
1985 }
1986
1987 /* ----------------------------------------------------------------------------
1988  * Starting Tasks
1989  * ------------------------------------------------------------------------- */
1990
1991 #if defined(THREADED_RTS)
1992 void OSThreadProcAttr
1993 workerStart(Task *task)
1994 {
1995     Capability *cap;
1996
1997     // See startWorkerTask().
1998     ACQUIRE_LOCK(&task->lock);
1999     cap = task->cap;
2000     RELEASE_LOCK(&task->lock);
2001
2002     // set the thread-local pointer to the Task:
2003     taskEnter(task);
2004
2005     // schedule() runs without a lock.
2006     cap = schedule(cap,task);
2007
2008     // On exit from schedule(), we have a Capability.
2009     releaseCapability(cap);
2010     workerTaskStop(task);
2011 }
2012 #endif
2013
2014 /* ---------------------------------------------------------------------------
2015  * initScheduler()
2016  *
2017  * Initialise the scheduler.  This resets all the queues - if the
2018  * queues contained any threads, they'll be garbage collected at the
2019  * next pass.
2020  *
2021  * ------------------------------------------------------------------------ */
2022
2023 void 
2024 initScheduler(void)
2025 {
2026 #if !defined(THREADED_RTS)
2027   blocked_queue_hd  = END_TSO_QUEUE;
2028   blocked_queue_tl  = END_TSO_QUEUE;
2029   sleeping_queue    = END_TSO_QUEUE;
2030 #endif
2031
2032   blackhole_queue   = END_TSO_QUEUE;
2033
2034   sched_state    = SCHED_RUNNING;
2035   recent_activity = ACTIVITY_YES;
2036
2037 #if defined(THREADED_RTS)
2038   /* Initialise the mutex and condition variables used by
2039    * the scheduler. */
2040   initMutex(&sched_mutex);
2041 #endif
2042   
2043   ACQUIRE_LOCK(&sched_mutex);
2044
2045   /* A capability holds the state a native thread needs in
2046    * order to execute STG code. At least one capability is
2047    * floating around (only THREADED_RTS builds have more than one).
2048    */
2049   initCapabilities();
2050
2051   initTaskManager();
2052
2053 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2054   initSparkPools();
2055 #endif
2056
2057 #if defined(THREADED_RTS)
2058   /*
2059    * Eagerly start one worker to run each Capability, except for
2060    * Capability 0.  The idea is that we're probably going to start a
2061    * bound thread on Capability 0 pretty soon, so we don't want a
2062    * worker task hogging it.
2063    */
2064   { 
2065       nat i;
2066       Capability *cap;
2067       for (i = 1; i < n_capabilities; i++) {
2068           cap = &capabilities[i];
2069           ACQUIRE_LOCK(&cap->lock);
2070           startWorkerTask(cap, workerStart);
2071           RELEASE_LOCK(&cap->lock);
2072       }
2073   }
2074 #endif
2075
2076   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2077
2078   RELEASE_LOCK(&sched_mutex);
2079 }
2080
2081 void
2082 exitScheduler(
2083     rtsBool wait_foreign
2084 #if !defined(THREADED_RTS)
2085                          __attribute__((unused))
2086 #endif
2087 )
2088                /* see Capability.c, shutdownCapability() */
2089 {
2090     Task *task = NULL;
2091
2092 #if defined(THREADED_RTS)
2093     ACQUIRE_LOCK(&sched_mutex);
2094     task = newBoundTask();
2095     RELEASE_LOCK(&sched_mutex);
2096 #endif
2097
2098     // If we haven't killed all the threads yet, do it now.
2099     if (sched_state < SCHED_SHUTTING_DOWN) {
2100         sched_state = SCHED_INTERRUPTING;
2101         scheduleDoGC(NULL,task,rtsFalse);    
2102     }
2103     sched_state = SCHED_SHUTTING_DOWN;
2104
2105 #if defined(THREADED_RTS)
2106     { 
2107         nat i;
2108         
2109         for (i = 0; i < n_capabilities; i++) {
2110             shutdownCapability(&capabilities[i], task, wait_foreign);
2111         }
2112         boundTaskExiting(task);
2113         stopTaskManager();
2114     }
2115 #else
2116     freeCapability(&MainCapability);
2117 #endif
2118 }
2119
2120 void
2121 freeScheduler( void )
2122 {
2123     freeTaskManager();
2124     if (n_capabilities != 1) {
2125         stgFree(capabilities);
2126     }
2127 #if defined(THREADED_RTS)
2128     closeMutex(&sched_mutex);
2129 #endif
2130 }
2131
2132 /* -----------------------------------------------------------------------------
2133    performGC
2134
2135    This is the interface to the garbage collector from Haskell land.
2136    We provide this so that external C code can allocate and garbage
2137    collect when called from Haskell via _ccall_GC.
2138    -------------------------------------------------------------------------- */
2139
2140 static void
2141 performGC_(rtsBool force_major)
2142 {
2143     Task *task;
2144     // We must grab a new Task here, because the existing Task may be
2145     // associated with a particular Capability, and chained onto the 
2146     // suspended_ccalling_tasks queue.
2147     ACQUIRE_LOCK(&sched_mutex);
2148     task = newBoundTask();
2149     RELEASE_LOCK(&sched_mutex);
2150     scheduleDoGC(NULL,task,force_major);
2151     boundTaskExiting(task);
2152 }
2153
2154 void
2155 performGC(void)
2156 {
2157     performGC_(rtsFalse);
2158 }
2159
2160 void
2161 performMajorGC(void)
2162 {
2163     performGC_(rtsTrue);
2164 }
2165
2166 /* -----------------------------------------------------------------------------
2167    Stack overflow
2168
2169    If the thread has reached its maximum stack size, then raise the
2170    StackOverflow exception in the offending thread.  Otherwise
2171    relocate the TSO into a larger chunk of memory and adjust its stack
2172    size appropriately.
2173    -------------------------------------------------------------------------- */
2174
2175 static StgTSO *
2176 threadStackOverflow(Capability *cap, StgTSO *tso)
2177 {
2178   nat new_stack_size, stack_words;
2179   lnat new_tso_size;
2180   StgPtr new_sp;
2181   StgTSO *dest;
2182
2183   IF_DEBUG(sanity,checkTSO(tso));
2184
2185   // don't allow throwTo() to modify the blocked_exceptions queue
2186   // while we are moving the TSO:
2187   lockClosure((StgClosure *)tso);
2188
2189   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2190       // NB. never raise a StackOverflow exception if the thread is
2191       // inside Control.Exceptino.block.  It is impractical to protect
2192       // against stack overflow exceptions, since virtually anything
2193       // can raise one (even 'catch'), so this is the only sensible
2194       // thing to do here.  See bug #767.
2195
2196       debugTrace(DEBUG_gc,
2197                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2198                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2199       IF_DEBUG(gc,
2200                /* If we're debugging, just print out the top of the stack */
2201                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2202                                                 tso->sp+64)));
2203
2204       // Send this thread the StackOverflow exception
2205       unlockTSO(tso);
2206       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2207       return tso;
2208   }
2209
2210   /* Try to double the current stack size.  If that takes us over the
2211    * maximum stack size for this thread, then use the maximum instead
2212    * (that is, unless we're already at or over the max size and we
2213    * can't raise the StackOverflow exception (see above), in which
2214    * case just double the size). Finally round up so the TSO ends up as
2215    * a whole number of blocks.
2216    */
2217   if (tso->stack_size >= tso->max_stack_size) {
2218       new_stack_size = tso->stack_size * 2;
2219   } else { 
2220       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2221   }
2222   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2223                                        TSO_STRUCT_SIZE)/sizeof(W_);
2224   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2225   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2226
2227   debugTrace(DEBUG_sched, 
2228              "increasing stack size from %ld words to %d.",
2229              (long)tso->stack_size, new_stack_size);
2230
2231   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2232   TICK_ALLOC_TSO(new_stack_size,0);
2233
2234   /* copy the TSO block and the old stack into the new area */
2235   memcpy(dest,tso,TSO_STRUCT_SIZE);
2236   stack_words = tso->stack + tso->stack_size - tso->sp;
2237   new_sp = (P_)dest + new_tso_size - stack_words;
2238   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2239
2240   /* relocate the stack pointers... */
2241   dest->sp         = new_sp;
2242   dest->stack_size = new_stack_size;
2243         
2244   /* Mark the old TSO as relocated.  We have to check for relocated
2245    * TSOs in the garbage collector and any primops that deal with TSOs.
2246    *
2247    * It's important to set the sp value to just beyond the end
2248    * of the stack, so we don't attempt to scavenge any part of the
2249    * dead TSO's stack.
2250    */
2251   tso->what_next = ThreadRelocated;
2252   setTSOLink(cap,tso,dest);
2253   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2254   tso->why_blocked = NotBlocked;
2255
2256   IF_PAR_DEBUG(verbose,
2257                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2258                      tso->id, tso, tso->stack_size);
2259                /* If we're debugging, just print out the top of the stack */
2260                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2261                                                 tso->sp+64)));
2262   
2263   unlockTSO(dest);
2264   unlockTSO(tso);
2265
2266   IF_DEBUG(sanity,checkTSO(dest));
2267 #if 0
2268   IF_DEBUG(scheduler,printTSO(dest));
2269 #endif
2270
2271   return dest;
2272 }
2273
2274 static StgTSO *
2275 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2276 {
2277     bdescr *bd, *new_bd;
2278     lnat free_w, tso_size_w;
2279     StgTSO *new_tso;
2280
2281     tso_size_w = tso_sizeW(tso);
2282
2283     if (tso_size_w < MBLOCK_SIZE_W || 
2284         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2285     {
2286         return tso;
2287     }
2288
2289     // don't allow throwTo() to modify the blocked_exceptions queue
2290     // while we are moving the TSO:
2291     lockClosure((StgClosure *)tso);
2292
2293     // this is the number of words we'll free
2294     free_w = round_to_mblocks(tso_size_w/2);
2295
2296     bd = Bdescr((StgPtr)tso);
2297     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2298     bd->free = bd->start + TSO_STRUCT_SIZEW;
2299
2300     new_tso = (StgTSO *)new_bd->start;
2301     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2302     new_tso->stack_size = new_bd->free - new_tso->stack;
2303
2304     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2305                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2306
2307     tso->what_next = ThreadRelocated;
2308     tso->_link = new_tso; // no write barrier reqd: same generation
2309
2310     // The TSO attached to this Task may have moved, so update the
2311     // pointer to it.
2312     if (task->tso == tso) {
2313         task->tso = new_tso;
2314     }
2315
2316     unlockTSO(new_tso);
2317     unlockTSO(tso);
2318
2319     IF_DEBUG(sanity,checkTSO(new_tso));
2320
2321     return new_tso;
2322 }
2323
2324 /* ---------------------------------------------------------------------------
2325    Interrupt execution
2326    - usually called inside a signal handler so it mustn't do anything fancy.   
2327    ------------------------------------------------------------------------ */
2328
2329 void
2330 interruptStgRts(void)
2331 {
2332     sched_state = SCHED_INTERRUPTING;
2333     setContextSwitches();
2334     wakeUpRts();
2335 }
2336
2337 /* -----------------------------------------------------------------------------
2338    Wake up the RTS
2339    
2340    This function causes at least one OS thread to wake up and run the
2341    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2342    an external event has arrived that may need servicing (eg. a
2343    keyboard interrupt).
2344
2345    In the single-threaded RTS we don't do anything here; we only have
2346    one thread anyway, and the event that caused us to want to wake up
2347    will have interrupted any blocking system call in progress anyway.
2348    -------------------------------------------------------------------------- */
2349
2350 void
2351 wakeUpRts(void)
2352 {
2353 #if defined(THREADED_RTS)
2354     // This forces the IO Manager thread to wakeup, which will
2355     // in turn ensure that some OS thread wakes up and runs the
2356     // scheduler loop, which will cause a GC and deadlock check.
2357     ioManagerWakeup();
2358 #endif
2359 }
2360
2361 /* -----------------------------------------------------------------------------
2362  * checkBlackHoles()
2363  *
2364  * Check the blackhole_queue for threads that can be woken up.  We do
2365  * this periodically: before every GC, and whenever the run queue is
2366  * empty.
2367  *
2368  * An elegant solution might be to just wake up all the blocked
2369  * threads with awakenBlockedQueue occasionally: they'll go back to
2370  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2371  * doesn't give us a way to tell whether we've actually managed to
2372  * wake up any threads, so we would be busy-waiting.
2373  *
2374  * -------------------------------------------------------------------------- */
2375
2376 static rtsBool
2377 checkBlackHoles (Capability *cap)
2378 {
2379     StgTSO **prev, *t;
2380     rtsBool any_woke_up = rtsFalse;
2381     StgHalfWord type;
2382
2383     // blackhole_queue is global:
2384     ASSERT_LOCK_HELD(&sched_mutex);
2385
2386     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2387
2388     // ASSUMES: sched_mutex
2389     prev = &blackhole_queue;
2390     t = blackhole_queue;
2391     while (t != END_TSO_QUEUE) {
2392         ASSERT(t->why_blocked == BlockedOnBlackHole);
2393         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2394         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2395             IF_DEBUG(sanity,checkTSO(t));
2396             t = unblockOne(cap, t);
2397             *prev = t;
2398             any_woke_up = rtsTrue;
2399         } else {
2400             prev = &t->_link;
2401             t = t->_link;
2402         }
2403     }
2404
2405     return any_woke_up;
2406 }
2407
2408 /* -----------------------------------------------------------------------------
2409    Deleting threads
2410
2411    This is used for interruption (^C) and forking, and corresponds to
2412    raising an exception but without letting the thread catch the
2413    exception.
2414    -------------------------------------------------------------------------- */
2415
2416 static void 
2417 deleteThread (Capability *cap, StgTSO *tso)
2418 {
2419     // NOTE: must only be called on a TSO that we have exclusive
2420     // access to, because we will call throwToSingleThreaded() below.
2421     // The TSO must be on the run queue of the Capability we own, or 
2422     // we must own all Capabilities.
2423
2424     if (tso->why_blocked != BlockedOnCCall &&
2425         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2426         throwToSingleThreaded(cap,tso,NULL);
2427     }
2428 }
2429
2430 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2431 static void 
2432 deleteThread_(Capability *cap, StgTSO *tso)
2433 { // for forkProcess only:
2434   // like deleteThread(), but we delete threads in foreign calls, too.
2435
2436     if (tso->why_blocked == BlockedOnCCall ||
2437         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2438         unblockOne(cap,tso);
2439         tso->what_next = ThreadKilled;
2440     } else {
2441         deleteThread(cap,tso);
2442     }
2443 }
2444 #endif
2445
2446 /* -----------------------------------------------------------------------------
2447    raiseExceptionHelper
2448    
2449    This function is called by the raise# primitve, just so that we can
2450    move some of the tricky bits of raising an exception from C-- into
2451    C.  Who knows, it might be a useful re-useable thing here too.
2452    -------------------------------------------------------------------------- */
2453
2454 StgWord
2455 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2456 {
2457     Capability *cap = regTableToCapability(reg);
2458     StgThunk *raise_closure = NULL;
2459     StgPtr p, next;
2460     StgRetInfoTable *info;
2461     //
2462     // This closure represents the expression 'raise# E' where E
2463     // is the exception raise.  It is used to overwrite all the
2464     // thunks which are currently under evaluataion.
2465     //
2466
2467     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2468     // LDV profiling: stg_raise_info has THUNK as its closure
2469     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2470     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2471     // 1 does not cause any problem unless profiling is performed.
2472     // However, when LDV profiling goes on, we need to linearly scan
2473     // small object pool, where raise_closure is stored, so we should
2474     // use MIN_UPD_SIZE.
2475     //
2476     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2477     //                                 sizeofW(StgClosure)+1);
2478     //
2479
2480     //
2481     // Walk up the stack, looking for the catch frame.  On the way,
2482     // we update any closures pointed to from update frames with the
2483     // raise closure that we just built.
2484     //
2485     p = tso->sp;
2486     while(1) {
2487         info = get_ret_itbl((StgClosure *)p);
2488         next = p + stack_frame_sizeW((StgClosure *)p);
2489         switch (info->i.type) {
2490             
2491         case UPDATE_FRAME:
2492             // Only create raise_closure if we need to.
2493             if (raise_closure == NULL) {
2494                 raise_closure = 
2495                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2496                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2497                 raise_closure->payload[0] = exception;
2498             }
2499             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2500             p = next;
2501             continue;
2502
2503         case ATOMICALLY_FRAME:
2504             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2505             tso->sp = p;
2506             return ATOMICALLY_FRAME;
2507             
2508         case CATCH_FRAME:
2509             tso->sp = p;
2510             return CATCH_FRAME;
2511
2512         case CATCH_STM_FRAME:
2513             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2514             tso->sp = p;
2515             return CATCH_STM_FRAME;
2516             
2517         case STOP_FRAME:
2518             tso->sp = p;
2519             return STOP_FRAME;
2520
2521         case CATCH_RETRY_FRAME:
2522         default:
2523             p = next; 
2524             continue;
2525         }
2526     }
2527 }
2528
2529
2530 /* -----------------------------------------------------------------------------
2531    findRetryFrameHelper
2532
2533    This function is called by the retry# primitive.  It traverses the stack
2534    leaving tso->sp referring to the frame which should handle the retry.  
2535
2536    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2537    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2538
2539    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2540    create) because retries are not considered to be exceptions, despite the
2541    similar implementation.
2542
2543    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2544    not be created within memory transactions.
2545    -------------------------------------------------------------------------- */
2546
2547 StgWord
2548 findRetryFrameHelper (StgTSO *tso)
2549 {
2550   StgPtr           p, next;
2551   StgRetInfoTable *info;
2552
2553   p = tso -> sp;
2554   while (1) {
2555     info = get_ret_itbl((StgClosure *)p);
2556     next = p + stack_frame_sizeW((StgClosure *)p);
2557     switch (info->i.type) {
2558       
2559     case ATOMICALLY_FRAME:
2560         debugTrace(DEBUG_stm,
2561                    "found ATOMICALLY_FRAME at %p during retry", p);
2562         tso->sp = p;
2563         return ATOMICALLY_FRAME;
2564       
2565     case CATCH_RETRY_FRAME:
2566         debugTrace(DEBUG_stm,
2567                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2568         tso->sp = p;
2569         return CATCH_RETRY_FRAME;
2570       
2571     case CATCH_STM_FRAME: {
2572         StgTRecHeader *trec = tso -> trec;
2573         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2574         debugTrace(DEBUG_stm,
2575                    "found CATCH_STM_FRAME at %p during retry", p);
2576         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2577         stmAbortTransaction(tso -> cap, trec);
2578         stmFreeAbortedTRec(tso -> cap, trec);
2579         tso -> trec = outer;
2580         p = next; 
2581         continue;
2582     }
2583       
2584
2585     default:
2586       ASSERT(info->i.type != CATCH_FRAME);
2587       ASSERT(info->i.type != STOP_FRAME);
2588       p = next; 
2589       continue;
2590     }
2591   }
2592 }
2593
2594 /* -----------------------------------------------------------------------------
2595    resurrectThreads is called after garbage collection on the list of
2596    threads found to be garbage.  Each of these threads will be woken
2597    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2598    on an MVar, or NonTermination if the thread was blocked on a Black
2599    Hole.
2600
2601    Locks: assumes we hold *all* the capabilities.
2602    -------------------------------------------------------------------------- */
2603
2604 void
2605 resurrectThreads (StgTSO *threads)
2606 {
2607     StgTSO *tso, *next;
2608     Capability *cap;
2609     step *step;
2610
2611     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2612         next = tso->global_link;
2613
2614         step = Bdescr((P_)tso)->step;
2615         tso->global_link = step->threads;
2616         step->threads = tso;
2617
2618         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2619         
2620         // Wake up the thread on the Capability it was last on
2621         cap = tso->cap;
2622         
2623         switch (tso->why_blocked) {
2624         case BlockedOnMVar:
2625         case BlockedOnException:
2626             /* Called by GC - sched_mutex lock is currently held. */
2627             throwToSingleThreaded(cap, tso,
2628                                   (StgClosure *)blockedOnDeadMVar_closure);
2629             break;
2630         case BlockedOnBlackHole:
2631             throwToSingleThreaded(cap, tso,
2632                                   (StgClosure *)nonTermination_closure);
2633             break;
2634         case BlockedOnSTM:
2635             throwToSingleThreaded(cap, tso,
2636                                   (StgClosure *)blockedIndefinitely_closure);
2637             break;
2638         case NotBlocked:
2639             /* This might happen if the thread was blocked on a black hole
2640              * belonging to a thread that we've just woken up (raiseAsync
2641              * can wake up threads, remember...).
2642              */
2643             continue;
2644         default:
2645             barf("resurrectThreads: thread blocked in a strange way");
2646         }
2647     }
2648 }
2649
2650 /* -----------------------------------------------------------------------------
2651    performPendingThrowTos is called after garbage collection, and
2652    passed a list of threads that were found to have pending throwTos
2653    (tso->blocked_exceptions was not empty), and were blocked.
2654    Normally this doesn't happen, because we would deliver the
2655    exception directly if the target thread is blocked, but there are
2656    small windows where it might occur on a multiprocessor (see
2657    throwTo()).
2658
2659    NB. we must be holding all the capabilities at this point, just
2660    like resurrectThreads().
2661    -------------------------------------------------------------------------- */
2662
2663 void
2664 performPendingThrowTos (StgTSO *threads)
2665 {
2666     StgTSO *tso, *next;
2667     Capability *cap;
2668     step *step;
2669
2670     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2671         next = tso->global_link;
2672
2673         step = Bdescr((P_)tso)->step;
2674         tso->global_link = step->threads;
2675         step->threads = tso;
2676
2677         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2678         
2679         cap = tso->cap;
2680         maybePerformBlockedException(cap, tso);
2681     }
2682 }