deadlock fix: reset the flag *after* checking the blackhole queue
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34
35 /* PARALLEL_HASKELL includes go here */
36
37 #include "Sparks.h"
38 #include "Capability.h"
39 #include "Task.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
43 #endif
44 #include "Trace.h"
45 #include "RaiseAsync.h"
46 #include "Threads.h"
47 #include "ThrIOManager.h"
48
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
51 #endif
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56 #include <string.h>
57 #include <stdlib.h>
58 #include <stdarg.h>
59
60 #ifdef HAVE_ERRNO_H
61 #include <errno.h>
62 #endif
63
64 // Turn off inlining when debugging - it obfuscates things
65 #ifdef DEBUG
66 # undef  STATIC_INLINE
67 # define STATIC_INLINE static
68 #endif
69
70 /* -----------------------------------------------------------------------------
71  * Global variables
72  * -------------------------------------------------------------------------- */
73
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping thrads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
79 #endif
80
81 /* Threads blocked on blackholes.
82  * LOCK: sched_mutex+capability, or all capabilities
83  */
84 StgTSO *blackhole_queue = NULL;
85
86 /* The blackhole_queue should be checked for threads to wake up.  See
87  * Schedule.h for more thorough comment.
88  * LOCK: none (doesn't matter if we miss an update)
89  */
90 rtsBool blackholes_need_checking = rtsFalse;
91
92 /* flag that tracks whether we have done any execution in this time slice.
93  * LOCK: currently none, perhaps we should lock (but needs to be
94  * updated in the fast path of the scheduler).
95  */
96 nat recent_activity = ACTIVITY_YES;
97
98 /* if this flag is set as well, give up execution
99  * LOCK: none (changes once, from false->true)
100  */
101 rtsBool sched_state = SCHED_RUNNING;
102
103 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
104  *  exists - earlier gccs apparently didn't.
105  *  -= chak
106  */
107 StgTSO dummy_tso;
108
109 /*
110  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
111  * in an MT setting, needed to signal that a worker thread shouldn't hang around
112  * in the scheduler when it is out of work.
113  */
114 rtsBool shutting_down_scheduler = rtsFalse;
115
116 /*
117  * This mutex protects most of the global scheduler data in
118  * the THREADED_RTS runtime.
119  */
120 #if defined(THREADED_RTS)
121 Mutex sched_mutex;
122 #endif
123
124 #if !defined(mingw32_HOST_OS)
125 #define FORKPROCESS_PRIMOP_SUPPORTED
126 #endif
127
128 /* -----------------------------------------------------------------------------
129  * static function prototypes
130  * -------------------------------------------------------------------------- */
131
132 static Capability *schedule (Capability *initialCapability, Task *task);
133
134 //
135 // These function all encapsulate parts of the scheduler loop, and are
136 // abstracted only to make the structure and control flow of the
137 // scheduler clearer.
138 //
139 static void schedulePreLoop (void);
140 static void scheduleFindWork (Capability *cap);
141 #if defined(THREADED_RTS)
142 static void scheduleYield (Capability **pcap, Task *task);
143 #endif
144 static void scheduleStartSignalHandlers (Capability *cap);
145 static void scheduleCheckBlockedThreads (Capability *cap);
146 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
147 static void scheduleCheckBlackHoles (Capability *cap);
148 static void scheduleDetectDeadlock (Capability *cap, Task *task);
149 static void schedulePushWork(Capability *cap, Task *task);
150 #if defined(PARALLEL_HASKELL)
151 static rtsBool scheduleGetRemoteWork(Capability *cap);
152 static void scheduleSendPendingMessages(void);
153 #endif
154 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
155 static void scheduleActivateSpark(Capability *cap);
156 #endif
157 static void schedulePostRunThread(Capability *cap, StgTSO *t);
158 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
159 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
160                                          StgTSO *t);
161 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
162                                     nat prev_what_next );
163 static void scheduleHandleThreadBlocked( StgTSO *t );
164 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
165                                              StgTSO *t );
166 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
167 static Capability *scheduleDoGC(Capability *cap, Task *task,
168                                 rtsBool force_major);
169
170 static rtsBool checkBlackHoles(Capability *cap);
171
172 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
173 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
174
175 static void deleteThread (Capability *cap, StgTSO *tso);
176 static void deleteAllThreads (Capability *cap);
177
178 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
179 static void deleteThread_(Capability *cap, StgTSO *tso);
180 #endif
181
182 #ifdef DEBUG
183 static char *whatNext_strs[] = {
184   "(unknown)",
185   "ThreadRunGHC",
186   "ThreadInterpret",
187   "ThreadKilled",
188   "ThreadRelocated",
189   "ThreadComplete"
190 };
191 #endif
192
193 /* -----------------------------------------------------------------------------
194  * Putting a thread on the run queue: different scheduling policies
195  * -------------------------------------------------------------------------- */
196
197 STATIC_INLINE void
198 addToRunQueue( Capability *cap, StgTSO *t )
199 {
200 #if defined(PARALLEL_HASKELL)
201     if (RtsFlags.ParFlags.doFairScheduling) { 
202         // this does round-robin scheduling; good for concurrency
203         appendToRunQueue(cap,t);
204     } else {
205         // this does unfair scheduling; good for parallelism
206         pushOnRunQueue(cap,t);
207     }
208 #else
209     // this does round-robin scheduling; good for concurrency
210     appendToRunQueue(cap,t);
211 #endif
212 }
213
214 /* ---------------------------------------------------------------------------
215    Main scheduling loop.
216
217    We use round-robin scheduling, each thread returning to the
218    scheduler loop when one of these conditions is detected:
219
220       * out of heap space
221       * timer expires (thread yields)
222       * thread blocks
223       * thread ends
224       * stack overflow
225
226    GRAN version:
227      In a GranSim setup this loop iterates over the global event queue.
228      This revolves around the global event queue, which determines what 
229      to do next. Therefore, it's more complicated than either the 
230      concurrent or the parallel (GUM) setup.
231   This version has been entirely removed (JB 2008/08).
232
233    GUM version:
234      GUM iterates over incoming messages.
235      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
236      and sends out a fish whenever it has nothing to do; in-between
237      doing the actual reductions (shared code below) it processes the
238      incoming messages and deals with delayed operations 
239      (see PendingFetches).
240      This is not the ugliest code you could imagine, but it's bloody close.
241
242   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
243   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
244   as well as future GUM versions. This file has been refurbished to
245   only contain valid code, which is however incomplete, refers to
246   invalid includes etc.
247
248    ------------------------------------------------------------------------ */
249
250 static Capability *
251 schedule (Capability *initialCapability, Task *task)
252 {
253   StgTSO *t;
254   Capability *cap;
255   StgThreadReturnCode ret;
256 #if defined(PARALLEL_HASKELL)
257   rtsBool receivedFinish = rtsFalse;
258 #endif
259   nat prev_what_next;
260   rtsBool ready_to_gc;
261 #if defined(THREADED_RTS)
262   rtsBool first = rtsTrue;
263 #endif
264   
265   cap = initialCapability;
266
267   // Pre-condition: this task owns initialCapability.
268   // The sched_mutex is *NOT* held
269   // NB. on return, we still hold a capability.
270
271   debugTrace (DEBUG_sched, 
272               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
273               task, initialCapability);
274
275   schedulePreLoop();
276
277   // -----------------------------------------------------------
278   // Scheduler loop starts here:
279
280 #if defined(PARALLEL_HASKELL)
281 #define TERMINATION_CONDITION        (!receivedFinish)
282 #else
283 #define TERMINATION_CONDITION        rtsTrue
284 #endif
285
286   while (TERMINATION_CONDITION) {
287
288     // Check whether we have re-entered the RTS from Haskell without
289     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
290     // call).
291     if (cap->in_haskell) {
292           errorBelch("schedule: re-entered unsafely.\n"
293                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
294           stg_exit(EXIT_FAILURE);
295     }
296
297     // The interruption / shutdown sequence.
298     // 
299     // In order to cleanly shut down the runtime, we want to:
300     //   * make sure that all main threads return to their callers
301     //     with the state 'Interrupted'.
302     //   * clean up all OS threads assocated with the runtime
303     //   * free all memory etc.
304     //
305     // So the sequence for ^C goes like this:
306     //
307     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
308     //     arranges for some Capability to wake up
309     //
310     //   * all threads in the system are halted, and the zombies are
311     //     placed on the run queue for cleaning up.  We acquire all
312     //     the capabilities in order to delete the threads, this is
313     //     done by scheduleDoGC() for convenience (because GC already
314     //     needs to acquire all the capabilities).  We can't kill
315     //     threads involved in foreign calls.
316     // 
317     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
318     //
319     //   * sched_state := SCHED_SHUTTING_DOWN
320     //
321     //   * all workers exit when the run queue on their capability
322     //     drains.  All main threads will also exit when their TSO
323     //     reaches the head of the run queue and they can return.
324     //
325     //   * eventually all Capabilities will shut down, and the RTS can
326     //     exit.
327     //
328     //   * We might be left with threads blocked in foreign calls, 
329     //     we should really attempt to kill these somehow (TODO);
330     
331     switch (sched_state) {
332     case SCHED_RUNNING:
333         break;
334     case SCHED_INTERRUPTING:
335         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
336 #if defined(THREADED_RTS)
337         discardSparksCap(cap);
338 #endif
339         /* scheduleDoGC() deletes all the threads */
340         cap = scheduleDoGC(cap,task,rtsFalse);
341         break;
342     case SCHED_SHUTTING_DOWN:
343         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
344         // If we are a worker, just exit.  If we're a bound thread
345         // then we will exit below when we've removed our TSO from
346         // the run queue.
347         if (task->tso == NULL && emptyRunQueue(cap)) {
348             return cap;
349         }
350         break;
351     default:
352         barf("sched_state: %d", sched_state);
353     }
354
355     scheduleFindWork(cap);
356
357     /* work pushing, currently relevant only for THREADED_RTS:
358        (pushes threads, wakes up idle capabilities for stealing) */
359     schedulePushWork(cap,task);
360
361 #if defined(PARALLEL_HASKELL)
362     /* since we perform a blocking receive and continue otherwise,
363        either we never reach here or we definitely have work! */
364     // from here: non-empty run queue
365     ASSERT(!emptyRunQueue(cap));
366
367     if (PacketsWaiting()) {  /* now process incoming messages, if any
368                                 pending...  
369
370                                 CAUTION: scheduleGetRemoteWork called
371                                 above, waits for messages as well! */
372       processMessages(cap, &receivedFinish);
373     }
374 #endif // PARALLEL_HASKELL: non-empty run queue!
375
376     scheduleDetectDeadlock(cap,task);
377
378 #if defined(THREADED_RTS)
379     cap = task->cap;    // reload cap, it might have changed
380 #endif
381
382     // Normally, the only way we can get here with no threads to
383     // run is if a keyboard interrupt received during 
384     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
385     // Additionally, it is not fatal for the
386     // threaded RTS to reach here with no threads to run.
387     //
388     // win32: might be here due to awaitEvent() being abandoned
389     // as a result of a console event having been delivered.
390     
391 #if defined(THREADED_RTS)
392     if (first) 
393     {
394     // XXX: ToDo
395     //     // don't yield the first time, we want a chance to run this
396     //     // thread for a bit, even if there are others banging at the
397     //     // door.
398     //     first = rtsFalse;
399     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
400     }
401
402   yield:
403     scheduleYield(&cap,task);
404     if (emptyRunQueue(cap)) continue; // look for work again
405 #endif
406
407 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
408     if ( emptyRunQueue(cap) ) {
409         ASSERT(sched_state >= SCHED_INTERRUPTING);
410     }
411 #endif
412
413     // 
414     // Get a thread to run
415     //
416     t = popRunQueue(cap);
417
418     // Sanity check the thread we're about to run.  This can be
419     // expensive if there is lots of thread switching going on...
420     IF_DEBUG(sanity,checkTSO(t));
421
422 #if defined(THREADED_RTS)
423     // Check whether we can run this thread in the current task.
424     // If not, we have to pass our capability to the right task.
425     {
426         Task *bound = t->bound;
427       
428         if (bound) {
429             if (bound == task) {
430                 debugTrace(DEBUG_sched,
431                            "### Running thread %lu in bound thread", (unsigned long)t->id);
432                 // yes, the Haskell thread is bound to the current native thread
433             } else {
434                 debugTrace(DEBUG_sched,
435                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
436                 // no, bound to a different Haskell thread: pass to that thread
437                 pushOnRunQueue(cap,t);
438                 continue;
439             }
440         } else {
441             // The thread we want to run is unbound.
442             if (task->tso) { 
443                 debugTrace(DEBUG_sched,
444                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
445                 // no, the current native thread is bound to a different
446                 // Haskell thread, so pass it to any worker thread
447                 pushOnRunQueue(cap,t);
448                 continue; 
449             }
450         }
451     }
452 #endif
453
454     /* context switches are initiated by the timer signal, unless
455      * the user specified "context switch as often as possible", with
456      * +RTS -C0
457      */
458     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
459         && !emptyThreadQueues(cap)) {
460         cap->context_switch = 1;
461     }
462          
463 run_thread:
464
465     // CurrentTSO is the thread to run.  t might be different if we
466     // loop back to run_thread, so make sure to set CurrentTSO after
467     // that.
468     cap->r.rCurrentTSO = t;
469
470     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
471                               (long)t->id, whatNext_strs[t->what_next]);
472
473     startHeapProfTimer();
474
475     // Check for exceptions blocked on this thread
476     maybePerformBlockedException (cap, t);
477
478     // ----------------------------------------------------------------------
479     // Run the current thread 
480
481     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
482     ASSERT(t->cap == cap);
483     ASSERT(t->bound ? t->bound->cap == cap : 1);
484
485     prev_what_next = t->what_next;
486
487     errno = t->saved_errno;
488 #if mingw32_HOST_OS
489     SetLastError(t->saved_winerror);
490 #endif
491
492     cap->in_haskell = rtsTrue;
493
494     dirty_TSO(cap,t);
495
496 #if defined(THREADED_RTS)
497     if (recent_activity == ACTIVITY_DONE_GC) {
498         // ACTIVITY_DONE_GC means we turned off the timer signal to
499         // conserve power (see #1623).  Re-enable it here.
500         nat prev;
501         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
502         if (prev == ACTIVITY_DONE_GC) {
503             startTimer();
504         }
505     } else {
506         recent_activity = ACTIVITY_YES;
507     }
508 #endif
509
510     switch (prev_what_next) {
511         
512     case ThreadKilled:
513     case ThreadComplete:
514         /* Thread already finished, return to scheduler. */
515         ret = ThreadFinished;
516         break;
517         
518     case ThreadRunGHC:
519     {
520         StgRegTable *r;
521         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
522         cap = regTableToCapability(r);
523         ret = r->rRet;
524         break;
525     }
526     
527     case ThreadInterpret:
528         cap = interpretBCO(cap);
529         ret = cap->r.rRet;
530         break;
531         
532     default:
533         barf("schedule: invalid what_next field");
534     }
535
536     cap->in_haskell = rtsFalse;
537
538     // The TSO might have moved, eg. if it re-entered the RTS and a GC
539     // happened.  So find the new location:
540     t = cap->r.rCurrentTSO;
541
542     // We have run some Haskell code: there might be blackhole-blocked
543     // threads to wake up now.
544     // Lock-free test here should be ok, we're just setting a flag.
545     if ( blackhole_queue != END_TSO_QUEUE ) {
546         blackholes_need_checking = rtsTrue;
547     }
548     
549     // And save the current errno in this thread.
550     // XXX: possibly bogus for SMP because this thread might already
551     // be running again, see code below.
552     t->saved_errno = errno;
553 #if mingw32_HOST_OS
554     // Similarly for Windows error code
555     t->saved_winerror = GetLastError();
556 #endif
557
558 #if defined(THREADED_RTS)
559     // If ret is ThreadBlocked, and this Task is bound to the TSO that
560     // blocked, we are in limbo - the TSO is now owned by whatever it
561     // is blocked on, and may in fact already have been woken up,
562     // perhaps even on a different Capability.  It may be the case
563     // that task->cap != cap.  We better yield this Capability
564     // immediately and return to normaility.
565     if (ret == ThreadBlocked) {
566         debugTrace(DEBUG_sched,
567                    "--<< thread %lu (%s) stopped: blocked",
568                    (unsigned long)t->id, whatNext_strs[t->what_next]);
569         goto yield;
570     }
571 #endif
572
573     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
574     ASSERT(t->cap == cap);
575
576     // ----------------------------------------------------------------------
577     
578     // Costs for the scheduler are assigned to CCS_SYSTEM
579     stopHeapProfTimer();
580 #if defined(PROFILING)
581     CCCS = CCS_SYSTEM;
582 #endif
583     
584     schedulePostRunThread(cap,t);
585
586     t = threadStackUnderflow(task,t);
587
588     ready_to_gc = rtsFalse;
589
590     switch (ret) {
591     case HeapOverflow:
592         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
593         break;
594
595     case StackOverflow:
596         scheduleHandleStackOverflow(cap,task,t);
597         break;
598
599     case ThreadYielding:
600         if (scheduleHandleYield(cap, t, prev_what_next)) {
601             // shortcut for switching between compiler/interpreter:
602             goto run_thread; 
603         }
604         break;
605
606     case ThreadBlocked:
607         scheduleHandleThreadBlocked(t);
608         break;
609
610     case ThreadFinished:
611         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
612         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
613         break;
614
615     default:
616       barf("schedule: invalid thread return code %d", (int)ret);
617     }
618
619     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
620       cap = scheduleDoGC(cap,task,rtsFalse);
621     }
622   } /* end of while() */
623 }
624
625 /* ----------------------------------------------------------------------------
626  * Setting up the scheduler loop
627  * ------------------------------------------------------------------------- */
628
629 static void
630 schedulePreLoop(void)
631 {
632   // initialisation for scheduler - what cannot go into initScheduler()  
633 }
634
635 /* -----------------------------------------------------------------------------
636  * scheduleFindWork()
637  *
638  * Search for work to do, and handle messages from elsewhere.
639  * -------------------------------------------------------------------------- */
640
641 static void
642 scheduleFindWork (Capability *cap)
643 {
644     scheduleStartSignalHandlers(cap);
645
646     // Only check the black holes here if we've nothing else to do.
647     // During normal execution, the black hole list only gets checked
648     // at GC time, to avoid repeatedly traversing this possibly long
649     // list each time around the scheduler.
650     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
651
652     scheduleCheckWakeupThreads(cap);
653
654     scheduleCheckBlockedThreads(cap);
655
656 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
657     // Try to activate one of our own sparks
658     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
659 #endif
660
661 #if defined(THREADED_RTS)
662     // Try to steak work if we don't have any
663     if (emptyRunQueue(cap)) { stealWork(cap); }
664 #endif
665     
666 #if defined(PARALLEL_HASKELL)
667     // if messages have been buffered...
668     scheduleSendPendingMessages();
669 #endif
670
671 #if defined(PARALLEL_HASKELL)
672     if (emptyRunQueue(cap)) {
673         receivedFinish = scheduleGetRemoteWork(cap);
674         continue; //  a new round, (hopefully) with new work
675         /* 
676            in GUM, this a) sends out a FISH and returns IF no fish is
677                            out already
678                         b) (blocking) awaits and receives messages
679            
680            in Eden, this is only the blocking receive, as b) in GUM.
681         */
682     }
683 #endif
684 }
685
686 #if defined(THREADED_RTS)
687 STATIC_INLINE rtsBool
688 shouldYieldCapability (Capability *cap, Task *task)
689 {
690     // we need to yield this capability to someone else if..
691     //   - another thread is initiating a GC
692     //   - another Task is returning from a foreign call
693     //   - the thread at the head of the run queue cannot be run
694     //     by this Task (it is bound to another Task, or it is unbound
695     //     and this task it bound).
696     return (waiting_for_gc || 
697             cap->returning_tasks_hd != NULL ||
698             (!emptyRunQueue(cap) && (task->tso == NULL
699                                      ? cap->run_queue_hd->bound != NULL
700                                      : cap->run_queue_hd->bound != task)));
701 }
702
703 // This is the single place where a Task goes to sleep.  There are
704 // two reasons it might need to sleep:
705 //    - there are no threads to run
706 //    - we need to yield this Capability to someone else 
707 //      (see shouldYieldCapability())
708 //
709 // The return value indicates whether 
710
711 static void
712 scheduleYield (Capability **pcap, Task *task)
713 {
714     Capability *cap = *pcap;
715
716     // if we have work, and we don't need to give up the Capability, continue.
717     if (!emptyRunQueue(cap) && !shouldYieldCapability(cap,task))
718         return;
719
720     // otherwise yield (sleep), and keep yielding if necessary.
721     do {
722         yieldCapability(&cap,task);
723     } 
724     while (shouldYieldCapability(cap,task));
725
726     // note there may still be no threads on the run queue at this
727     // point, the caller has to check.
728
729     *pcap = cap;
730     return;
731 }
732 #endif
733     
734 /* -----------------------------------------------------------------------------
735  * schedulePushWork()
736  *
737  * Push work to other Capabilities if we have some.
738  * -------------------------------------------------------------------------- */
739
740 static void
741 schedulePushWork(Capability *cap USED_IF_THREADS, 
742                  Task *task      USED_IF_THREADS)
743 {
744   /* following code not for PARALLEL_HASKELL. I kept the call general,
745      future GUM versions might use pushing in a distributed setup */
746 #if defined(THREADED_RTS)
747
748     Capability *free_caps[n_capabilities], *cap0;
749     nat i, n_free_caps;
750
751     // migration can be turned off with +RTS -qg
752     if (!RtsFlags.ParFlags.migrate) return;
753
754     // Check whether we have more threads on our run queue, or sparks
755     // in our pool, that we could hand to another Capability.
756     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
757         && sparkPoolSizeCap(cap) < 2) {
758         return;
759     }
760
761     // First grab as many free Capabilities as we can.
762     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
763         cap0 = &capabilities[i];
764         if (cap != cap0 && tryGrabCapability(cap0,task)) {
765             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
766                 // it already has some work, we just grabbed it at 
767                 // the wrong moment.  Or maybe it's deadlocked!
768                 releaseCapability(cap0);
769             } else {
770                 free_caps[n_free_caps++] = cap0;
771             }
772         }
773     }
774
775     // we now have n_free_caps free capabilities stashed in
776     // free_caps[].  Share our run queue equally with them.  This is
777     // probably the simplest thing we could do; improvements we might
778     // want to do include:
779     //
780     //   - giving high priority to moving relatively new threads, on 
781     //     the gournds that they haven't had time to build up a
782     //     working set in the cache on this CPU/Capability.
783     //
784     //   - giving low priority to moving long-lived threads
785
786     if (n_free_caps > 0) {
787         StgTSO *prev, *t, *next;
788         rtsBool pushed_to_all;
789
790         debugTrace(DEBUG_sched, 
791                    "cap %d: %s and %d free capabilities, sharing...", 
792                    cap->no, 
793                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
794                    "excess threads on run queue":"sparks to share (>=2)",
795                    n_free_caps);
796
797         i = 0;
798         pushed_to_all = rtsFalse;
799
800         if (cap->run_queue_hd != END_TSO_QUEUE) {
801             prev = cap->run_queue_hd;
802             t = prev->_link;
803             prev->_link = END_TSO_QUEUE;
804             for (; t != END_TSO_QUEUE; t = next) {
805                 next = t->_link;
806                 t->_link = END_TSO_QUEUE;
807                 if (t->what_next == ThreadRelocated
808                     || t->bound == task // don't move my bound thread
809                     || tsoLocked(t)) {  // don't move a locked thread
810                     setTSOLink(cap, prev, t);
811                     prev = t;
812                 } else if (i == n_free_caps) {
813                     pushed_to_all = rtsTrue;
814                     i = 0;
815                     // keep one for us
816                     setTSOLink(cap, prev, t);
817                     prev = t;
818                 } else {
819                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
820                     appendToRunQueue(free_caps[i],t);
821                     if (t->bound) { t->bound->cap = free_caps[i]; }
822                     t->cap = free_caps[i];
823                     i++;
824                 }
825             }
826             cap->run_queue_tl = prev;
827         }
828
829 #ifdef SPARK_PUSHING
830         /* JB I left this code in place, it would work but is not necessary */
831
832         // If there are some free capabilities that we didn't push any
833         // threads to, then try to push a spark to each one.
834         if (!pushed_to_all) {
835             StgClosure *spark;
836             // i is the next free capability to push to
837             for (; i < n_free_caps; i++) {
838                 if (emptySparkPoolCap(free_caps[i])) {
839                     spark = tryStealSpark(cap->sparks);
840                     if (spark != NULL) {
841                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
842                         newSpark(&(free_caps[i]->r), spark);
843                     }
844                 }
845             }
846         }
847 #endif /* SPARK_PUSHING */
848
849         // release the capabilities
850         for (i = 0; i < n_free_caps; i++) {
851             task->cap = free_caps[i];
852             releaseAndWakeupCapability(free_caps[i]);
853         }
854     }
855     task->cap = cap; // reset to point to our Capability.
856
857 #endif /* THREADED_RTS */
858
859 }
860
861 /* ----------------------------------------------------------------------------
862  * Start any pending signal handlers
863  * ------------------------------------------------------------------------- */
864
865 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
866 static void
867 scheduleStartSignalHandlers(Capability *cap)
868 {
869     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
870         // safe outside the lock
871         startSignalHandlers(cap);
872     }
873 }
874 #else
875 static void
876 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
877 {
878 }
879 #endif
880
881 /* ----------------------------------------------------------------------------
882  * Check for blocked threads that can be woken up.
883  * ------------------------------------------------------------------------- */
884
885 static void
886 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
887 {
888 #if !defined(THREADED_RTS)
889     //
890     // Check whether any waiting threads need to be woken up.  If the
891     // run queue is empty, and there are no other tasks running, we
892     // can wait indefinitely for something to happen.
893     //
894     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
895     {
896         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
897     }
898 #endif
899 }
900
901
902 /* ----------------------------------------------------------------------------
903  * Check for threads woken up by other Capabilities
904  * ------------------------------------------------------------------------- */
905
906 static void
907 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
908 {
909 #if defined(THREADED_RTS)
910     // Any threads that were woken up by other Capabilities get
911     // appended to our run queue.
912     if (!emptyWakeupQueue(cap)) {
913         ACQUIRE_LOCK(&cap->lock);
914         if (emptyRunQueue(cap)) {
915             cap->run_queue_hd = cap->wakeup_queue_hd;
916             cap->run_queue_tl = cap->wakeup_queue_tl;
917         } else {
918             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
919             cap->run_queue_tl = cap->wakeup_queue_tl;
920         }
921         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
922         RELEASE_LOCK(&cap->lock);
923     }
924 #endif
925 }
926
927 /* ----------------------------------------------------------------------------
928  * Check for threads blocked on BLACKHOLEs that can be woken up
929  * ------------------------------------------------------------------------- */
930 static void
931 scheduleCheckBlackHoles (Capability *cap)
932 {
933     if ( blackholes_need_checking ) // check without the lock first
934     {
935         ACQUIRE_LOCK(&sched_mutex);
936         if ( blackholes_need_checking ) {
937             blackholes_need_checking = rtsFalse;
938             // important that we reset the flag *before* checking the
939             // blackhole queue, otherwise we could get deadlock.  This
940             // happens as follows: we wake up a thread that
941             // immediately runs on another Capability, blocks on a
942             // blackhole, and then we reset the blackholes_need_checking flag.
943             checkBlackHoles(cap);
944         }
945         RELEASE_LOCK(&sched_mutex);
946     }
947 }
948
949 /* ----------------------------------------------------------------------------
950  * Detect deadlock conditions and attempt to resolve them.
951  * ------------------------------------------------------------------------- */
952
953 static void
954 scheduleDetectDeadlock (Capability *cap, Task *task)
955 {
956
957 #if defined(PARALLEL_HASKELL)
958     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
959     return;
960 #endif
961
962     /* 
963      * Detect deadlock: when we have no threads to run, there are no
964      * threads blocked, waiting for I/O, or sleeping, and all the
965      * other tasks are waiting for work, we must have a deadlock of
966      * some description.
967      */
968     if ( emptyThreadQueues(cap) )
969     {
970 #if defined(THREADED_RTS)
971         /* 
972          * In the threaded RTS, we only check for deadlock if there
973          * has been no activity in a complete timeslice.  This means
974          * we won't eagerly start a full GC just because we don't have
975          * any threads to run currently.
976          */
977         if (recent_activity != ACTIVITY_INACTIVE) return;
978 #endif
979
980         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
981
982         // Garbage collection can release some new threads due to
983         // either (a) finalizers or (b) threads resurrected because
984         // they are unreachable and will therefore be sent an
985         // exception.  Any threads thus released will be immediately
986         // runnable.
987         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
988
989         recent_activity = ACTIVITY_DONE_GC;
990         // disable timer signals (see #1623)
991         stopTimer();
992         
993         if ( !emptyRunQueue(cap) ) return;
994
995 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
996         /* If we have user-installed signal handlers, then wait
997          * for signals to arrive rather then bombing out with a
998          * deadlock.
999          */
1000         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1001             debugTrace(DEBUG_sched,
1002                        "still deadlocked, waiting for signals...");
1003
1004             awaitUserSignals();
1005
1006             if (signals_pending()) {
1007                 startSignalHandlers(cap);
1008             }
1009
1010             // either we have threads to run, or we were interrupted:
1011             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1012
1013             return;
1014         }
1015 #endif
1016
1017 #if !defined(THREADED_RTS)
1018         /* Probably a real deadlock.  Send the current main thread the
1019          * Deadlock exception.
1020          */
1021         if (task->tso) {
1022             switch (task->tso->why_blocked) {
1023             case BlockedOnSTM:
1024             case BlockedOnBlackHole:
1025             case BlockedOnException:
1026             case BlockedOnMVar:
1027                 throwToSingleThreaded(cap, task->tso, 
1028                                       (StgClosure *)nonTermination_closure);
1029                 return;
1030             default:
1031                 barf("deadlock: main thread blocked in a strange way");
1032             }
1033         }
1034         return;
1035 #endif
1036     }
1037 }
1038
1039
1040 /* ----------------------------------------------------------------------------
1041  * Send pending messages (PARALLEL_HASKELL only)
1042  * ------------------------------------------------------------------------- */
1043
1044 #if defined(PARALLEL_HASKELL)
1045 static void
1046 scheduleSendPendingMessages(void)
1047 {
1048
1049 # if defined(PAR) // global Mem.Mgmt., omit for now
1050     if (PendingFetches != END_BF_QUEUE) {
1051         processFetches();
1052     }
1053 # endif
1054     
1055     if (RtsFlags.ParFlags.BufferTime) {
1056         // if we use message buffering, we must send away all message
1057         // packets which have become too old...
1058         sendOldBuffers(); 
1059     }
1060 }
1061 #endif
1062
1063 /* ----------------------------------------------------------------------------
1064  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1065  * ------------------------------------------------------------------------- */
1066
1067 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1068 static void
1069 scheduleActivateSpark(Capability *cap)
1070 {
1071     StgClosure *spark;
1072
1073 /* We only want to stay here if the run queue is empty and we want some
1074    work. We try to turn a spark into a thread, and add it to the run
1075    queue, from where it will be picked up in the next iteration of the
1076    scheduler loop.  
1077 */
1078     if (!emptyRunQueue(cap)) 
1079       /* In the threaded RTS, another task might have pushed a thread
1080          on our run queue in the meantime ? But would need a lock.. */
1081       return;
1082
1083  
1084     // Really we should be using reclaimSpark() here, but
1085     // experimentally it doesn't seem to perform as well as just
1086     // stealing from our own spark pool:
1087     // spark = reclaimSpark(cap->sparks);
1088     spark = tryStealSpark(cap->sparks); // defined in Sparks.c
1089
1090     if (spark != NULL) {
1091       debugTrace(DEBUG_sched,
1092                  "turning spark of closure %p into a thread",
1093                  (StgClosure *)spark);
1094       createSparkThread(cap,spark); // defined in Sparks.c
1095     }
1096 }
1097 #endif // PARALLEL_HASKELL || THREADED_RTS
1098
1099 /* ----------------------------------------------------------------------------
1100  * Get work from a remote node (PARALLEL_HASKELL only)
1101  * ------------------------------------------------------------------------- */
1102     
1103 #if defined(PARALLEL_HASKELL)
1104 static rtsBool /* return value used in PARALLEL_HASKELL only */
1105 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1106 {
1107 #if defined(PARALLEL_HASKELL)
1108   rtsBool receivedFinish = rtsFalse;
1109
1110   // idle() , i.e. send all buffers, wait for work
1111   if (RtsFlags.ParFlags.BufferTime) {
1112         IF_PAR_DEBUG(verbose, 
1113                 debugBelch("...send all pending data,"));
1114         {
1115           nat i;
1116           for (i=1; i<=nPEs; i++)
1117             sendImmediately(i); // send all messages away immediately
1118         }
1119   }
1120
1121   /* this would be the place for fishing in GUM... 
1122
1123      if (no-earlier-fish-around) 
1124           sendFish(choosePe());
1125    */
1126
1127   // Eden:just look for incoming messages (blocking receive)
1128   IF_PAR_DEBUG(verbose, 
1129                debugBelch("...wait for incoming messages...\n"));
1130   processMessages(cap, &receivedFinish); // blocking receive...
1131
1132
1133   return receivedFinish;
1134   // reenter scheduling look after having received something
1135
1136 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1137
1138   return rtsFalse; /* return value unused in THREADED_RTS */
1139
1140 #endif /* PARALLEL_HASKELL */
1141 }
1142 #endif // PARALLEL_HASKELL || THREADED_RTS
1143
1144 /* ----------------------------------------------------------------------------
1145  * After running a thread...
1146  * ------------------------------------------------------------------------- */
1147
1148 static void
1149 schedulePostRunThread (Capability *cap, StgTSO *t)
1150 {
1151     // We have to be able to catch transactions that are in an
1152     // infinite loop as a result of seeing an inconsistent view of
1153     // memory, e.g. 
1154     //
1155     //   atomically $ do
1156     //       [a,b] <- mapM readTVar [ta,tb]
1157     //       when (a == b) loop
1158     //
1159     // and a is never equal to b given a consistent view of memory.
1160     //
1161     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1162         if (!stmValidateNestOfTransactions (t -> trec)) {
1163             debugTrace(DEBUG_sched | DEBUG_stm,
1164                        "trec %p found wasting its time", t);
1165             
1166             // strip the stack back to the
1167             // ATOMICALLY_FRAME, aborting the (nested)
1168             // transaction, and saving the stack of any
1169             // partially-evaluated thunks on the heap.
1170             throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
1171             
1172             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1173         }
1174     }
1175
1176   /* some statistics gathering in the parallel case */
1177 }
1178
1179 /* -----------------------------------------------------------------------------
1180  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1181  * -------------------------------------------------------------------------- */
1182
1183 static rtsBool
1184 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1185 {
1186     // did the task ask for a large block?
1187     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1188         // if so, get one and push it on the front of the nursery.
1189         bdescr *bd;
1190         lnat blocks;
1191         
1192         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1193         
1194         debugTrace(DEBUG_sched,
1195                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1196                    (long)t->id, whatNext_strs[t->what_next], blocks);
1197     
1198         // don't do this if the nursery is (nearly) full, we'll GC first.
1199         if (cap->r.rCurrentNursery->link != NULL ||
1200             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1201                                                // if the nursery has only one block.
1202             
1203             ACQUIRE_SM_LOCK
1204             bd = allocGroup( blocks );
1205             RELEASE_SM_LOCK
1206             cap->r.rNursery->n_blocks += blocks;
1207             
1208             // link the new group into the list
1209             bd->link = cap->r.rCurrentNursery;
1210             bd->u.back = cap->r.rCurrentNursery->u.back;
1211             if (cap->r.rCurrentNursery->u.back != NULL) {
1212                 cap->r.rCurrentNursery->u.back->link = bd;
1213             } else {
1214 #if !defined(THREADED_RTS)
1215                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1216                        g0s0 == cap->r.rNursery);
1217 #endif
1218                 cap->r.rNursery->blocks = bd;
1219             }             
1220             cap->r.rCurrentNursery->u.back = bd;
1221             
1222             // initialise it as a nursery block.  We initialise the
1223             // step, gen_no, and flags field of *every* sub-block in
1224             // this large block, because this is easier than making
1225             // sure that we always find the block head of a large
1226             // block whenever we call Bdescr() (eg. evacuate() and
1227             // isAlive() in the GC would both have to do this, at
1228             // least).
1229             { 
1230                 bdescr *x;
1231                 for (x = bd; x < bd + blocks; x++) {
1232                     x->step = cap->r.rNursery;
1233                     x->gen_no = 0;
1234                     x->flags = 0;
1235                 }
1236             }
1237             
1238             // This assert can be a killer if the app is doing lots
1239             // of large block allocations.
1240             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1241             
1242             // now update the nursery to point to the new block
1243             cap->r.rCurrentNursery = bd;
1244             
1245             // we might be unlucky and have another thread get on the
1246             // run queue before us and steal the large block, but in that
1247             // case the thread will just end up requesting another large
1248             // block.
1249             pushOnRunQueue(cap,t);
1250             return rtsFalse;  /* not actually GC'ing */
1251         }
1252     }
1253     
1254     debugTrace(DEBUG_sched,
1255                "--<< thread %ld (%s) stopped: HeapOverflow",
1256                (long)t->id, whatNext_strs[t->what_next]);
1257
1258     if (cap->context_switch) {
1259         // Sometimes we miss a context switch, e.g. when calling
1260         // primitives in a tight loop, MAYBE_GC() doesn't check the
1261         // context switch flag, and we end up waiting for a GC.
1262         // See #1984, and concurrent/should_run/1984
1263         cap->context_switch = 0;
1264         addToRunQueue(cap,t);
1265     } else {
1266         pushOnRunQueue(cap,t);
1267     }
1268     return rtsTrue;
1269     /* actual GC is done at the end of the while loop in schedule() */
1270 }
1271
1272 /* -----------------------------------------------------------------------------
1273  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1274  * -------------------------------------------------------------------------- */
1275
1276 static void
1277 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1278 {
1279     debugTrace (DEBUG_sched,
1280                 "--<< thread %ld (%s) stopped, StackOverflow", 
1281                 (long)t->id, whatNext_strs[t->what_next]);
1282
1283     /* just adjust the stack for this thread, then pop it back
1284      * on the run queue.
1285      */
1286     { 
1287         /* enlarge the stack */
1288         StgTSO *new_t = threadStackOverflow(cap, t);
1289         
1290         /* The TSO attached to this Task may have moved, so update the
1291          * pointer to it.
1292          */
1293         if (task->tso == t) {
1294             task->tso = new_t;
1295         }
1296         pushOnRunQueue(cap,new_t);
1297     }
1298 }
1299
1300 /* -----------------------------------------------------------------------------
1301  * Handle a thread that returned to the scheduler with ThreadYielding
1302  * -------------------------------------------------------------------------- */
1303
1304 static rtsBool
1305 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1306 {
1307     // Reset the context switch flag.  We don't do this just before
1308     // running the thread, because that would mean we would lose ticks
1309     // during GC, which can lead to unfair scheduling (a thread hogs
1310     // the CPU because the tick always arrives during GC).  This way
1311     // penalises threads that do a lot of allocation, but that seems
1312     // better than the alternative.
1313     cap->context_switch = 0;
1314     
1315     /* put the thread back on the run queue.  Then, if we're ready to
1316      * GC, check whether this is the last task to stop.  If so, wake
1317      * up the GC thread.  getThread will block during a GC until the
1318      * GC is finished.
1319      */
1320 #ifdef DEBUG
1321     if (t->what_next != prev_what_next) {
1322         debugTrace(DEBUG_sched,
1323                    "--<< thread %ld (%s) stopped to switch evaluators", 
1324                    (long)t->id, whatNext_strs[t->what_next]);
1325     } else {
1326         debugTrace(DEBUG_sched,
1327                    "--<< thread %ld (%s) stopped, yielding",
1328                    (long)t->id, whatNext_strs[t->what_next]);
1329     }
1330 #endif
1331     
1332     IF_DEBUG(sanity,
1333              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1334              checkTSO(t));
1335     ASSERT(t->_link == END_TSO_QUEUE);
1336     
1337     // Shortcut if we're just switching evaluators: don't bother
1338     // doing stack squeezing (which can be expensive), just run the
1339     // thread.
1340     if (t->what_next != prev_what_next) {
1341         return rtsTrue;
1342     }
1343
1344     addToRunQueue(cap,t);
1345
1346     return rtsFalse;
1347 }
1348
1349 /* -----------------------------------------------------------------------------
1350  * Handle a thread that returned to the scheduler with ThreadBlocked
1351  * -------------------------------------------------------------------------- */
1352
1353 static void
1354 scheduleHandleThreadBlocked( StgTSO *t
1355 #if !defined(GRAN) && !defined(DEBUG)
1356     STG_UNUSED
1357 #endif
1358     )
1359 {
1360
1361       // We don't need to do anything.  The thread is blocked, and it
1362       // has tidied up its stack and placed itself on whatever queue
1363       // it needs to be on.
1364
1365     // ASSERT(t->why_blocked != NotBlocked);
1366     // Not true: for example,
1367     //    - in THREADED_RTS, the thread may already have been woken
1368     //      up by another Capability.  This actually happens: try
1369     //      conc023 +RTS -N2.
1370     //    - the thread may have woken itself up already, because
1371     //      threadPaused() might have raised a blocked throwTo
1372     //      exception, see maybePerformBlockedException().
1373
1374 #ifdef DEBUG
1375     if (traceClass(DEBUG_sched)) {
1376         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1377                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1378         printThreadBlockage(t);
1379         debugTraceEnd();
1380     }
1381 #endif
1382 }
1383
1384 /* -----------------------------------------------------------------------------
1385  * Handle a thread that returned to the scheduler with ThreadFinished
1386  * -------------------------------------------------------------------------- */
1387
1388 static rtsBool
1389 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1390 {
1391     /* Need to check whether this was a main thread, and if so,
1392      * return with the return value.
1393      *
1394      * We also end up here if the thread kills itself with an
1395      * uncaught exception, see Exception.cmm.
1396      */
1397     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1398                (unsigned long)t->id, whatNext_strs[t->what_next]);
1399
1400       //
1401       // Check whether the thread that just completed was a bound
1402       // thread, and if so return with the result.  
1403       //
1404       // There is an assumption here that all thread completion goes
1405       // through this point; we need to make sure that if a thread
1406       // ends up in the ThreadKilled state, that it stays on the run
1407       // queue so it can be dealt with here.
1408       //
1409
1410       if (t->bound) {
1411
1412           if (t->bound != task) {
1413 #if !defined(THREADED_RTS)
1414               // Must be a bound thread that is not the topmost one.  Leave
1415               // it on the run queue until the stack has unwound to the
1416               // point where we can deal with this.  Leaving it on the run
1417               // queue also ensures that the garbage collector knows about
1418               // this thread and its return value (it gets dropped from the
1419               // step->threads list so there's no other way to find it).
1420               appendToRunQueue(cap,t);
1421               return rtsFalse;
1422 #else
1423               // this cannot happen in the threaded RTS, because a
1424               // bound thread can only be run by the appropriate Task.
1425               barf("finished bound thread that isn't mine");
1426 #endif
1427           }
1428
1429           ASSERT(task->tso == t);
1430
1431           if (t->what_next == ThreadComplete) {
1432               if (task->ret) {
1433                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1434                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1435               }
1436               task->stat = Success;
1437           } else {
1438               if (task->ret) {
1439                   *(task->ret) = NULL;
1440               }
1441               if (sched_state >= SCHED_INTERRUPTING) {
1442                   task->stat = Interrupted;
1443               } else {
1444                   task->stat = Killed;
1445               }
1446           }
1447 #ifdef DEBUG
1448           removeThreadLabel((StgWord)task->tso->id);
1449 #endif
1450           return rtsTrue; // tells schedule() to return
1451       }
1452
1453       return rtsFalse;
1454 }
1455
1456 /* -----------------------------------------------------------------------------
1457  * Perform a heap census
1458  * -------------------------------------------------------------------------- */
1459
1460 static rtsBool
1461 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1462 {
1463     // When we have +RTS -i0 and we're heap profiling, do a census at
1464     // every GC.  This lets us get repeatable runs for debugging.
1465     if (performHeapProfile ||
1466         (RtsFlags.ProfFlags.profileInterval==0 &&
1467          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1468         return rtsTrue;
1469     } else {
1470         return rtsFalse;
1471     }
1472 }
1473
1474 /* -----------------------------------------------------------------------------
1475  * Perform a garbage collection if necessary
1476  * -------------------------------------------------------------------------- */
1477
1478 static Capability *
1479 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1480 {
1481     rtsBool heap_census;
1482 #ifdef THREADED_RTS
1483     /* extern static volatile StgWord waiting_for_gc; 
1484        lives inside capability.c */
1485     rtsBool was_waiting;
1486     nat i;
1487 #endif
1488
1489 #ifdef THREADED_RTS
1490     // In order to GC, there must be no threads running Haskell code.
1491     // Therefore, the GC thread needs to hold *all* the capabilities,
1492     // and release them after the GC has completed.  
1493     //
1494     // This seems to be the simplest way: previous attempts involved
1495     // making all the threads with capabilities give up their
1496     // capabilities and sleep except for the *last* one, which
1497     // actually did the GC.  But it's quite hard to arrange for all
1498     // the other tasks to sleep and stay asleep.
1499     //
1500         
1501     /*  Other capabilities are prevented from running yet more Haskell
1502         threads if waiting_for_gc is set. Tested inside
1503         yieldCapability() and releaseCapability() in Capability.c */
1504
1505     was_waiting = cas(&waiting_for_gc, 0, 1);
1506     if (was_waiting) {
1507         do {
1508             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1509             if (cap) yieldCapability(&cap,task);
1510         } while (waiting_for_gc);
1511         return cap;  // NOTE: task->cap might have changed here
1512     }
1513
1514     setContextSwitches();
1515     for (i=0; i < n_capabilities; i++) {
1516         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1517         if (cap != &capabilities[i]) {
1518             Capability *pcap = &capabilities[i];
1519             // we better hope this task doesn't get migrated to
1520             // another Capability while we're waiting for this one.
1521             // It won't, because load balancing happens while we have
1522             // all the Capabilities, but even so it's a slightly
1523             // unsavoury invariant.
1524             task->cap = pcap;
1525             waitForReturnCapability(&pcap, task);
1526             if (pcap != &capabilities[i]) {
1527                 barf("scheduleDoGC: got the wrong capability");
1528             }
1529         }
1530     }
1531
1532     waiting_for_gc = rtsFalse;
1533 #endif
1534
1535     // so this happens periodically:
1536     if (cap) scheduleCheckBlackHoles(cap);
1537     
1538     IF_DEBUG(scheduler, printAllThreads());
1539
1540     /*
1541      * We now have all the capabilities; if we're in an interrupting
1542      * state, then we should take the opportunity to delete all the
1543      * threads in the system.
1544      */
1545     if (sched_state >= SCHED_INTERRUPTING) {
1546         deleteAllThreads(&capabilities[0]);
1547         sched_state = SCHED_SHUTTING_DOWN;
1548     }
1549     
1550     heap_census = scheduleNeedHeapProfile(rtsTrue);
1551
1552     /* everybody back, start the GC.
1553      * Could do it in this thread, or signal a condition var
1554      * to do it in another thread.  Either way, we need to
1555      * broadcast on gc_pending_cond afterward.
1556      */
1557 #if defined(THREADED_RTS)
1558     debugTrace(DEBUG_sched, "doing GC");
1559 #endif
1560     GarbageCollect(force_major || heap_census);
1561     
1562     if (heap_census) {
1563         debugTrace(DEBUG_sched, "performing heap census");
1564         heapCensus();
1565         performHeapProfile = rtsFalse;
1566     }
1567
1568 #ifdef SPARKBALANCE
1569     /* JB 
1570        Once we are all together... this would be the place to balance all
1571        spark pools. No concurrent stealing or adding of new sparks can
1572        occur. Should be defined in Sparks.c. */
1573     balanceSparkPoolsCaps(n_capabilities, capabilities);
1574 #endif
1575
1576 #if defined(THREADED_RTS)
1577     // release our stash of capabilities.
1578     for (i = 0; i < n_capabilities; i++) {
1579         if (cap != &capabilities[i]) {
1580             task->cap = &capabilities[i];
1581             releaseCapability(&capabilities[i]);
1582         }
1583     }
1584     if (cap) {
1585         task->cap = cap;
1586     } else {
1587         task->cap = NULL;
1588     }
1589 #endif
1590
1591     return cap;
1592 }
1593
1594 /* ---------------------------------------------------------------------------
1595  * Singleton fork(). Do not copy any running threads.
1596  * ------------------------------------------------------------------------- */
1597
1598 pid_t
1599 forkProcess(HsStablePtr *entry
1600 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1601             STG_UNUSED
1602 #endif
1603            )
1604 {
1605 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1606     Task *task;
1607     pid_t pid;
1608     StgTSO* t,*next;
1609     Capability *cap;
1610     nat s;
1611     
1612 #if defined(THREADED_RTS)
1613     if (RtsFlags.ParFlags.nNodes > 1) {
1614         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1615         stg_exit(EXIT_FAILURE);
1616     }
1617 #endif
1618
1619     debugTrace(DEBUG_sched, "forking!");
1620     
1621     // ToDo: for SMP, we should probably acquire *all* the capabilities
1622     cap = rts_lock();
1623     
1624     // no funny business: hold locks while we fork, otherwise if some
1625     // other thread is holding a lock when the fork happens, the data
1626     // structure protected by the lock will forever be in an
1627     // inconsistent state in the child.  See also #1391.
1628     ACQUIRE_LOCK(&sched_mutex);
1629     ACQUIRE_LOCK(&cap->lock);
1630     ACQUIRE_LOCK(&cap->running_task->lock);
1631
1632     pid = fork();
1633     
1634     if (pid) { // parent
1635         
1636         RELEASE_LOCK(&sched_mutex);
1637         RELEASE_LOCK(&cap->lock);
1638         RELEASE_LOCK(&cap->running_task->lock);
1639
1640         // just return the pid
1641         rts_unlock(cap);
1642         return pid;
1643         
1644     } else { // child
1645         
1646 #if defined(THREADED_RTS)
1647         initMutex(&sched_mutex);
1648         initMutex(&cap->lock);
1649         initMutex(&cap->running_task->lock);
1650 #endif
1651
1652         // Now, all OS threads except the thread that forked are
1653         // stopped.  We need to stop all Haskell threads, including
1654         // those involved in foreign calls.  Also we need to delete
1655         // all Tasks, because they correspond to OS threads that are
1656         // now gone.
1657
1658         for (s = 0; s < total_steps; s++) {
1659           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1660             if (t->what_next == ThreadRelocated) {
1661                 next = t->_link;
1662             } else {
1663                 next = t->global_link;
1664                 // don't allow threads to catch the ThreadKilled
1665                 // exception, but we do want to raiseAsync() because these
1666                 // threads may be evaluating thunks that we need later.
1667                 deleteThread_(cap,t);
1668             }
1669           }
1670         }
1671         
1672         // Empty the run queue.  It seems tempting to let all the
1673         // killed threads stay on the run queue as zombies to be
1674         // cleaned up later, but some of them correspond to bound
1675         // threads for which the corresponding Task does not exist.
1676         cap->run_queue_hd = END_TSO_QUEUE;
1677         cap->run_queue_tl = END_TSO_QUEUE;
1678
1679         // Any suspended C-calling Tasks are no more, their OS threads
1680         // don't exist now:
1681         cap->suspended_ccalling_tasks = NULL;
1682
1683         // Empty the threads lists.  Otherwise, the garbage
1684         // collector may attempt to resurrect some of these threads.
1685         for (s = 0; s < total_steps; s++) {
1686             all_steps[s].threads = END_TSO_QUEUE;
1687         }
1688
1689         // Wipe the task list, except the current Task.
1690         ACQUIRE_LOCK(&sched_mutex);
1691         for (task = all_tasks; task != NULL; task=task->all_link) {
1692             if (task != cap->running_task) {
1693 #if defined(THREADED_RTS)
1694                 initMutex(&task->lock); // see #1391
1695 #endif
1696                 discardTask(task);
1697             }
1698         }
1699         RELEASE_LOCK(&sched_mutex);
1700
1701 #if defined(THREADED_RTS)
1702         // Wipe our spare workers list, they no longer exist.  New
1703         // workers will be created if necessary.
1704         cap->spare_workers = NULL;
1705         cap->returning_tasks_hd = NULL;
1706         cap->returning_tasks_tl = NULL;
1707 #endif
1708
1709         // On Unix, all timers are reset in the child, so we need to start
1710         // the timer again.
1711         initTimer();
1712         startTimer();
1713
1714         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1715         rts_checkSchedStatus("forkProcess",cap);
1716         
1717         rts_unlock(cap);
1718         hs_exit();                      // clean up and exit
1719         stg_exit(EXIT_SUCCESS);
1720     }
1721 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1722     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1723     return -1;
1724 #endif
1725 }
1726
1727 /* ---------------------------------------------------------------------------
1728  * Delete all the threads in the system
1729  * ------------------------------------------------------------------------- */
1730    
1731 static void
1732 deleteAllThreads ( Capability *cap )
1733 {
1734     // NOTE: only safe to call if we own all capabilities.
1735
1736     StgTSO* t, *next;
1737     nat s;
1738
1739     debugTrace(DEBUG_sched,"deleting all threads");
1740     for (s = 0; s < total_steps; s++) {
1741       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1742         if (t->what_next == ThreadRelocated) {
1743             next = t->_link;
1744         } else {
1745             next = t->global_link;
1746             deleteThread(cap,t);
1747         }
1748       }
1749     }      
1750
1751     // The run queue now contains a bunch of ThreadKilled threads.  We
1752     // must not throw these away: the main thread(s) will be in there
1753     // somewhere, and the main scheduler loop has to deal with it.
1754     // Also, the run queue is the only thing keeping these threads from
1755     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1756
1757 #if !defined(THREADED_RTS)
1758     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1759     ASSERT(sleeping_queue == END_TSO_QUEUE);
1760 #endif
1761 }
1762
1763 /* -----------------------------------------------------------------------------
1764    Managing the suspended_ccalling_tasks list.
1765    Locks required: sched_mutex
1766    -------------------------------------------------------------------------- */
1767
1768 STATIC_INLINE void
1769 suspendTask (Capability *cap, Task *task)
1770 {
1771     ASSERT(task->next == NULL && task->prev == NULL);
1772     task->next = cap->suspended_ccalling_tasks;
1773     task->prev = NULL;
1774     if (cap->suspended_ccalling_tasks) {
1775         cap->suspended_ccalling_tasks->prev = task;
1776     }
1777     cap->suspended_ccalling_tasks = task;
1778 }
1779
1780 STATIC_INLINE void
1781 recoverSuspendedTask (Capability *cap, Task *task)
1782 {
1783     if (task->prev) {
1784         task->prev->next = task->next;
1785     } else {
1786         ASSERT(cap->suspended_ccalling_tasks == task);
1787         cap->suspended_ccalling_tasks = task->next;
1788     }
1789     if (task->next) {
1790         task->next->prev = task->prev;
1791     }
1792     task->next = task->prev = NULL;
1793 }
1794
1795 /* ---------------------------------------------------------------------------
1796  * Suspending & resuming Haskell threads.
1797  * 
1798  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1799  * its capability before calling the C function.  This allows another
1800  * task to pick up the capability and carry on running Haskell
1801  * threads.  It also means that if the C call blocks, it won't lock
1802  * the whole system.
1803  *
1804  * The Haskell thread making the C call is put to sleep for the
1805  * duration of the call, on the susepended_ccalling_threads queue.  We
1806  * give out a token to the task, which it can use to resume the thread
1807  * on return from the C function.
1808  * ------------------------------------------------------------------------- */
1809    
1810 void *
1811 suspendThread (StgRegTable *reg)
1812 {
1813   Capability *cap;
1814   int saved_errno;
1815   StgTSO *tso;
1816   Task *task;
1817 #if mingw32_HOST_OS
1818   StgWord32 saved_winerror;
1819 #endif
1820
1821   saved_errno = errno;
1822 #if mingw32_HOST_OS
1823   saved_winerror = GetLastError();
1824 #endif
1825
1826   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1827    */
1828   cap = regTableToCapability(reg);
1829
1830   task = cap->running_task;
1831   tso = cap->r.rCurrentTSO;
1832
1833   debugTrace(DEBUG_sched, 
1834              "thread %lu did a safe foreign call", 
1835              (unsigned long)cap->r.rCurrentTSO->id);
1836
1837   // XXX this might not be necessary --SDM
1838   tso->what_next = ThreadRunGHC;
1839
1840   threadPaused(cap,tso);
1841
1842   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1843       tso->why_blocked = BlockedOnCCall;
1844       tso->flags |= TSO_BLOCKEX;
1845       tso->flags &= ~TSO_INTERRUPTIBLE;
1846   } else {
1847       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1848   }
1849
1850   // Hand back capability
1851   task->suspended_tso = tso;
1852
1853   ACQUIRE_LOCK(&cap->lock);
1854
1855   suspendTask(cap,task);
1856   cap->in_haskell = rtsFalse;
1857   releaseCapability_(cap,rtsFalse);
1858   
1859   RELEASE_LOCK(&cap->lock);
1860
1861 #if defined(THREADED_RTS)
1862   /* Preparing to leave the RTS, so ensure there's a native thread/task
1863      waiting to take over.
1864   */
1865   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1866 #endif
1867
1868   errno = saved_errno;
1869 #if mingw32_HOST_OS
1870   SetLastError(saved_winerror);
1871 #endif
1872   return task;
1873 }
1874
1875 StgRegTable *
1876 resumeThread (void *task_)
1877 {
1878     StgTSO *tso;
1879     Capability *cap;
1880     Task *task = task_;
1881     int saved_errno;
1882 #if mingw32_HOST_OS
1883     StgWord32 saved_winerror;
1884 #endif
1885
1886     saved_errno = errno;
1887 #if mingw32_HOST_OS
1888     saved_winerror = GetLastError();
1889 #endif
1890
1891     cap = task->cap;
1892     // Wait for permission to re-enter the RTS with the result.
1893     waitForReturnCapability(&cap,task);
1894     // we might be on a different capability now... but if so, our
1895     // entry on the suspended_ccalling_tasks list will also have been
1896     // migrated.
1897
1898     // Remove the thread from the suspended list
1899     recoverSuspendedTask(cap,task);
1900
1901     tso = task->suspended_tso;
1902     task->suspended_tso = NULL;
1903     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1904     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1905     
1906     if (tso->why_blocked == BlockedOnCCall) {
1907         awakenBlockedExceptionQueue(cap,tso);
1908         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1909     }
1910     
1911     /* Reset blocking status */
1912     tso->why_blocked  = NotBlocked;
1913     
1914     cap->r.rCurrentTSO = tso;
1915     cap->in_haskell = rtsTrue;
1916     errno = saved_errno;
1917 #if mingw32_HOST_OS
1918     SetLastError(saved_winerror);
1919 #endif
1920
1921     /* We might have GC'd, mark the TSO dirty again */
1922     dirty_TSO(cap,tso);
1923
1924     IF_DEBUG(sanity, checkTSO(tso));
1925
1926     return &cap->r;
1927 }
1928
1929 /* ---------------------------------------------------------------------------
1930  * scheduleThread()
1931  *
1932  * scheduleThread puts a thread on the end  of the runnable queue.
1933  * This will usually be done immediately after a thread is created.
1934  * The caller of scheduleThread must create the thread using e.g.
1935  * createThread and push an appropriate closure
1936  * on this thread's stack before the scheduler is invoked.
1937  * ------------------------------------------------------------------------ */
1938
1939 void
1940 scheduleThread(Capability *cap, StgTSO *tso)
1941 {
1942     // The thread goes at the *end* of the run-queue, to avoid possible
1943     // starvation of any threads already on the queue.
1944     appendToRunQueue(cap,tso);
1945 }
1946
1947 void
1948 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1949 {
1950 #if defined(THREADED_RTS)
1951     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1952                               // move this thread from now on.
1953     cpu %= RtsFlags.ParFlags.nNodes;
1954     if (cpu == cap->no) {
1955         appendToRunQueue(cap,tso);
1956     } else {
1957         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1958     }
1959 #else
1960     appendToRunQueue(cap,tso);
1961 #endif
1962 }
1963
1964 Capability *
1965 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1966 {
1967     Task *task;
1968
1969     // We already created/initialised the Task
1970     task = cap->running_task;
1971
1972     // This TSO is now a bound thread; make the Task and TSO
1973     // point to each other.
1974     tso->bound = task;
1975     tso->cap = cap;
1976
1977     task->tso = tso;
1978     task->ret = ret;
1979     task->stat = NoStatus;
1980
1981     appendToRunQueue(cap,tso);
1982
1983     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1984
1985     cap = schedule(cap,task);
1986
1987     ASSERT(task->stat != NoStatus);
1988     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1989
1990     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1991     return cap;
1992 }
1993
1994 /* ----------------------------------------------------------------------------
1995  * Starting Tasks
1996  * ------------------------------------------------------------------------- */
1997
1998 #if defined(THREADED_RTS)
1999 void OSThreadProcAttr
2000 workerStart(Task *task)
2001 {
2002     Capability *cap;
2003
2004     // See startWorkerTask().
2005     ACQUIRE_LOCK(&task->lock);
2006     cap = task->cap;
2007     RELEASE_LOCK(&task->lock);
2008
2009     // set the thread-local pointer to the Task:
2010     taskEnter(task);
2011
2012     // schedule() runs without a lock.
2013     cap = schedule(cap,task);
2014
2015     // On exit from schedule(), we have a Capability.
2016     releaseCapability(cap);
2017     workerTaskStop(task);
2018 }
2019 #endif
2020
2021 /* ---------------------------------------------------------------------------
2022  * initScheduler()
2023  *
2024  * Initialise the scheduler.  This resets all the queues - if the
2025  * queues contained any threads, they'll be garbage collected at the
2026  * next pass.
2027  *
2028  * ------------------------------------------------------------------------ */
2029
2030 void 
2031 initScheduler(void)
2032 {
2033 #if !defined(THREADED_RTS)
2034   blocked_queue_hd  = END_TSO_QUEUE;
2035   blocked_queue_tl  = END_TSO_QUEUE;
2036   sleeping_queue    = END_TSO_QUEUE;
2037 #endif
2038
2039   blackhole_queue   = END_TSO_QUEUE;
2040
2041   sched_state    = SCHED_RUNNING;
2042   recent_activity = ACTIVITY_YES;
2043
2044 #if defined(THREADED_RTS)
2045   /* Initialise the mutex and condition variables used by
2046    * the scheduler. */
2047   initMutex(&sched_mutex);
2048 #endif
2049   
2050   ACQUIRE_LOCK(&sched_mutex);
2051
2052   /* A capability holds the state a native thread needs in
2053    * order to execute STG code. At least one capability is
2054    * floating around (only THREADED_RTS builds have more than one).
2055    */
2056   initCapabilities();
2057
2058   initTaskManager();
2059
2060 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2061   initSparkPools();
2062 #endif
2063
2064 #if defined(THREADED_RTS)
2065   /*
2066    * Eagerly start one worker to run each Capability, except for
2067    * Capability 0.  The idea is that we're probably going to start a
2068    * bound thread on Capability 0 pretty soon, so we don't want a
2069    * worker task hogging it.
2070    */
2071   { 
2072       nat i;
2073       Capability *cap;
2074       for (i = 1; i < n_capabilities; i++) {
2075           cap = &capabilities[i];
2076           ACQUIRE_LOCK(&cap->lock);
2077           startWorkerTask(cap, workerStart);
2078           RELEASE_LOCK(&cap->lock);
2079       }
2080   }
2081 #endif
2082
2083   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2084
2085   RELEASE_LOCK(&sched_mutex);
2086 }
2087
2088 void
2089 exitScheduler(
2090     rtsBool wait_foreign
2091 #if !defined(THREADED_RTS)
2092                          __attribute__((unused))
2093 #endif
2094 )
2095                /* see Capability.c, shutdownCapability() */
2096 {
2097     Task *task = NULL;
2098
2099 #if defined(THREADED_RTS)
2100     ACQUIRE_LOCK(&sched_mutex);
2101     task = newBoundTask();
2102     RELEASE_LOCK(&sched_mutex);
2103 #endif
2104
2105     // If we haven't killed all the threads yet, do it now.
2106     if (sched_state < SCHED_SHUTTING_DOWN) {
2107         sched_state = SCHED_INTERRUPTING;
2108         scheduleDoGC(NULL,task,rtsFalse);    
2109     }
2110     sched_state = SCHED_SHUTTING_DOWN;
2111
2112 #if defined(THREADED_RTS)
2113     { 
2114         nat i;
2115         
2116         for (i = 0; i < n_capabilities; i++) {
2117             shutdownCapability(&capabilities[i], task, wait_foreign);
2118         }
2119         boundTaskExiting(task);
2120         stopTaskManager();
2121     }
2122 #endif
2123 }
2124
2125 void
2126 freeScheduler( void )
2127 {
2128     freeCapabilities();
2129     freeTaskManager();
2130     if (n_capabilities != 1) {
2131         stgFree(capabilities);
2132     }
2133 #if defined(THREADED_RTS)
2134     closeMutex(&sched_mutex);
2135 #endif
2136 }
2137
2138 /* -----------------------------------------------------------------------------
2139    performGC
2140
2141    This is the interface to the garbage collector from Haskell land.
2142    We provide this so that external C code can allocate and garbage
2143    collect when called from Haskell via _ccall_GC.
2144    -------------------------------------------------------------------------- */
2145
2146 static void
2147 performGC_(rtsBool force_major)
2148 {
2149     Task *task;
2150     // We must grab a new Task here, because the existing Task may be
2151     // associated with a particular Capability, and chained onto the 
2152     // suspended_ccalling_tasks queue.
2153     ACQUIRE_LOCK(&sched_mutex);
2154     task = newBoundTask();
2155     RELEASE_LOCK(&sched_mutex);
2156     scheduleDoGC(NULL,task,force_major);
2157     boundTaskExiting(task);
2158 }
2159
2160 void
2161 performGC(void)
2162 {
2163     performGC_(rtsFalse);
2164 }
2165
2166 void
2167 performMajorGC(void)
2168 {
2169     performGC_(rtsTrue);
2170 }
2171
2172 /* -----------------------------------------------------------------------------
2173    Stack overflow
2174
2175    If the thread has reached its maximum stack size, then raise the
2176    StackOverflow exception in the offending thread.  Otherwise
2177    relocate the TSO into a larger chunk of memory and adjust its stack
2178    size appropriately.
2179    -------------------------------------------------------------------------- */
2180
2181 static StgTSO *
2182 threadStackOverflow(Capability *cap, StgTSO *tso)
2183 {
2184   nat new_stack_size, stack_words;
2185   lnat new_tso_size;
2186   StgPtr new_sp;
2187   StgTSO *dest;
2188
2189   IF_DEBUG(sanity,checkTSO(tso));
2190
2191   // don't allow throwTo() to modify the blocked_exceptions queue
2192   // while we are moving the TSO:
2193   lockClosure((StgClosure *)tso);
2194
2195   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2196       // NB. never raise a StackOverflow exception if the thread is
2197       // inside Control.Exceptino.block.  It is impractical to protect
2198       // against stack overflow exceptions, since virtually anything
2199       // can raise one (even 'catch'), so this is the only sensible
2200       // thing to do here.  See bug #767.
2201
2202       debugTrace(DEBUG_gc,
2203                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2204                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2205       IF_DEBUG(gc,
2206                /* If we're debugging, just print out the top of the stack */
2207                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2208                                                 tso->sp+64)));
2209
2210       // Send this thread the StackOverflow exception
2211       unlockTSO(tso);
2212       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2213       return tso;
2214   }
2215
2216   /* Try to double the current stack size.  If that takes us over the
2217    * maximum stack size for this thread, then use the maximum instead
2218    * (that is, unless we're already at or over the max size and we
2219    * can't raise the StackOverflow exception (see above), in which
2220    * case just double the size). Finally round up so the TSO ends up as
2221    * a whole number of blocks.
2222    */
2223   if (tso->stack_size >= tso->max_stack_size) {
2224       new_stack_size = tso->stack_size * 2;
2225   } else { 
2226       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2227   }
2228   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2229                                        TSO_STRUCT_SIZE)/sizeof(W_);
2230   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2231   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2232
2233   debugTrace(DEBUG_sched, 
2234              "increasing stack size from %ld words to %d.",
2235              (long)tso->stack_size, new_stack_size);
2236
2237   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2238   TICK_ALLOC_TSO(new_stack_size,0);
2239
2240   /* copy the TSO block and the old stack into the new area */
2241   memcpy(dest,tso,TSO_STRUCT_SIZE);
2242   stack_words = tso->stack + tso->stack_size - tso->sp;
2243   new_sp = (P_)dest + new_tso_size - stack_words;
2244   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2245
2246   /* relocate the stack pointers... */
2247   dest->sp         = new_sp;
2248   dest->stack_size = new_stack_size;
2249         
2250   /* Mark the old TSO as relocated.  We have to check for relocated
2251    * TSOs in the garbage collector and any primops that deal with TSOs.
2252    *
2253    * It's important to set the sp value to just beyond the end
2254    * of the stack, so we don't attempt to scavenge any part of the
2255    * dead TSO's stack.
2256    */
2257   tso->what_next = ThreadRelocated;
2258   setTSOLink(cap,tso,dest);
2259   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2260   tso->why_blocked = NotBlocked;
2261
2262   IF_PAR_DEBUG(verbose,
2263                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2264                      tso->id, tso, tso->stack_size);
2265                /* If we're debugging, just print out the top of the stack */
2266                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2267                                                 tso->sp+64)));
2268   
2269   unlockTSO(dest);
2270   unlockTSO(tso);
2271
2272   IF_DEBUG(sanity,checkTSO(dest));
2273 #if 0
2274   IF_DEBUG(scheduler,printTSO(dest));
2275 #endif
2276
2277   return dest;
2278 }
2279
2280 static StgTSO *
2281 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2282 {
2283     bdescr *bd, *new_bd;
2284     lnat free_w, tso_size_w;
2285     StgTSO *new_tso;
2286
2287     tso_size_w = tso_sizeW(tso);
2288
2289     if (tso_size_w < MBLOCK_SIZE_W || 
2290         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2291     {
2292         return tso;
2293     }
2294
2295     // don't allow throwTo() to modify the blocked_exceptions queue
2296     // while we are moving the TSO:
2297     lockClosure((StgClosure *)tso);
2298
2299     // this is the number of words we'll free
2300     free_w = round_to_mblocks(tso_size_w/2);
2301
2302     bd = Bdescr((StgPtr)tso);
2303     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2304     bd->free = bd->start + TSO_STRUCT_SIZEW;
2305
2306     new_tso = (StgTSO *)new_bd->start;
2307     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2308     new_tso->stack_size = new_bd->free - new_tso->stack;
2309
2310     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2311                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2312
2313     tso->what_next = ThreadRelocated;
2314     tso->_link = new_tso; // no write barrier reqd: same generation
2315
2316     // The TSO attached to this Task may have moved, so update the
2317     // pointer to it.
2318     if (task->tso == tso) {
2319         task->tso = new_tso;
2320     }
2321
2322     unlockTSO(new_tso);
2323     unlockTSO(tso);
2324
2325     IF_DEBUG(sanity,checkTSO(new_tso));
2326
2327     return new_tso;
2328 }
2329
2330 /* ---------------------------------------------------------------------------
2331    Interrupt execution
2332    - usually called inside a signal handler so it mustn't do anything fancy.   
2333    ------------------------------------------------------------------------ */
2334
2335 void
2336 interruptStgRts(void)
2337 {
2338     sched_state = SCHED_INTERRUPTING;
2339     setContextSwitches();
2340     wakeUpRts();
2341 }
2342
2343 /* -----------------------------------------------------------------------------
2344    Wake up the RTS
2345    
2346    This function causes at least one OS thread to wake up and run the
2347    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2348    an external event has arrived that may need servicing (eg. a
2349    keyboard interrupt).
2350
2351    In the single-threaded RTS we don't do anything here; we only have
2352    one thread anyway, and the event that caused us to want to wake up
2353    will have interrupted any blocking system call in progress anyway.
2354    -------------------------------------------------------------------------- */
2355
2356 void
2357 wakeUpRts(void)
2358 {
2359 #if defined(THREADED_RTS)
2360     // This forces the IO Manager thread to wakeup, which will
2361     // in turn ensure that some OS thread wakes up and runs the
2362     // scheduler loop, which will cause a GC and deadlock check.
2363     ioManagerWakeup();
2364 #endif
2365 }
2366
2367 /* -----------------------------------------------------------------------------
2368  * checkBlackHoles()
2369  *
2370  * Check the blackhole_queue for threads that can be woken up.  We do
2371  * this periodically: before every GC, and whenever the run queue is
2372  * empty.
2373  *
2374  * An elegant solution might be to just wake up all the blocked
2375  * threads with awakenBlockedQueue occasionally: they'll go back to
2376  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2377  * doesn't give us a way to tell whether we've actually managed to
2378  * wake up any threads, so we would be busy-waiting.
2379  *
2380  * -------------------------------------------------------------------------- */
2381
2382 static rtsBool
2383 checkBlackHoles (Capability *cap)
2384 {
2385     StgTSO **prev, *t;
2386     rtsBool any_woke_up = rtsFalse;
2387     StgHalfWord type;
2388
2389     // blackhole_queue is global:
2390     ASSERT_LOCK_HELD(&sched_mutex);
2391
2392     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2393
2394     // ASSUMES: sched_mutex
2395     prev = &blackhole_queue;
2396     t = blackhole_queue;
2397     while (t != END_TSO_QUEUE) {
2398         ASSERT(t->why_blocked == BlockedOnBlackHole);
2399         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2400         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2401             IF_DEBUG(sanity,checkTSO(t));
2402             t = unblockOne(cap, t);
2403             *prev = t;
2404             any_woke_up = rtsTrue;
2405         } else {
2406             prev = &t->_link;
2407             t = t->_link;
2408         }
2409     }
2410
2411     return any_woke_up;
2412 }
2413
2414 /* -----------------------------------------------------------------------------
2415    Deleting threads
2416
2417    This is used for interruption (^C) and forking, and corresponds to
2418    raising an exception but without letting the thread catch the
2419    exception.
2420    -------------------------------------------------------------------------- */
2421
2422 static void 
2423 deleteThread (Capability *cap, StgTSO *tso)
2424 {
2425     // NOTE: must only be called on a TSO that we have exclusive
2426     // access to, because we will call throwToSingleThreaded() below.
2427     // The TSO must be on the run queue of the Capability we own, or 
2428     // we must own all Capabilities.
2429
2430     if (tso->why_blocked != BlockedOnCCall &&
2431         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2432         throwToSingleThreaded(cap,tso,NULL);
2433     }
2434 }
2435
2436 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2437 static void 
2438 deleteThread_(Capability *cap, StgTSO *tso)
2439 { // for forkProcess only:
2440   // like deleteThread(), but we delete threads in foreign calls, too.
2441
2442     if (tso->why_blocked == BlockedOnCCall ||
2443         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2444         unblockOne(cap,tso);
2445         tso->what_next = ThreadKilled;
2446     } else {
2447         deleteThread(cap,tso);
2448     }
2449 }
2450 #endif
2451
2452 /* -----------------------------------------------------------------------------
2453    raiseExceptionHelper
2454    
2455    This function is called by the raise# primitve, just so that we can
2456    move some of the tricky bits of raising an exception from C-- into
2457    C.  Who knows, it might be a useful re-useable thing here too.
2458    -------------------------------------------------------------------------- */
2459
2460 StgWord
2461 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2462 {
2463     Capability *cap = regTableToCapability(reg);
2464     StgThunk *raise_closure = NULL;
2465     StgPtr p, next;
2466     StgRetInfoTable *info;
2467     //
2468     // This closure represents the expression 'raise# E' where E
2469     // is the exception raise.  It is used to overwrite all the
2470     // thunks which are currently under evaluataion.
2471     //
2472
2473     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2474     // LDV profiling: stg_raise_info has THUNK as its closure
2475     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2476     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2477     // 1 does not cause any problem unless profiling is performed.
2478     // However, when LDV profiling goes on, we need to linearly scan
2479     // small object pool, where raise_closure is stored, so we should
2480     // use MIN_UPD_SIZE.
2481     //
2482     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2483     //                                 sizeofW(StgClosure)+1);
2484     //
2485
2486     //
2487     // Walk up the stack, looking for the catch frame.  On the way,
2488     // we update any closures pointed to from update frames with the
2489     // raise closure that we just built.
2490     //
2491     p = tso->sp;
2492     while(1) {
2493         info = get_ret_itbl((StgClosure *)p);
2494         next = p + stack_frame_sizeW((StgClosure *)p);
2495         switch (info->i.type) {
2496             
2497         case UPDATE_FRAME:
2498             // Only create raise_closure if we need to.
2499             if (raise_closure == NULL) {
2500                 raise_closure = 
2501                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2502                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2503                 raise_closure->payload[0] = exception;
2504             }
2505             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2506             p = next;
2507             continue;
2508
2509         case ATOMICALLY_FRAME:
2510             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2511             tso->sp = p;
2512             return ATOMICALLY_FRAME;
2513             
2514         case CATCH_FRAME:
2515             tso->sp = p;
2516             return CATCH_FRAME;
2517
2518         case CATCH_STM_FRAME:
2519             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2520             tso->sp = p;
2521             return CATCH_STM_FRAME;
2522             
2523         case STOP_FRAME:
2524             tso->sp = p;
2525             return STOP_FRAME;
2526
2527         case CATCH_RETRY_FRAME:
2528         default:
2529             p = next; 
2530             continue;
2531         }
2532     }
2533 }
2534
2535
2536 /* -----------------------------------------------------------------------------
2537    findRetryFrameHelper
2538
2539    This function is called by the retry# primitive.  It traverses the stack
2540    leaving tso->sp referring to the frame which should handle the retry.  
2541
2542    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2543    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2544
2545    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2546    create) because retries are not considered to be exceptions, despite the
2547    similar implementation.
2548
2549    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2550    not be created within memory transactions.
2551    -------------------------------------------------------------------------- */
2552
2553 StgWord
2554 findRetryFrameHelper (StgTSO *tso)
2555 {
2556   StgPtr           p, next;
2557   StgRetInfoTable *info;
2558
2559   p = tso -> sp;
2560   while (1) {
2561     info = get_ret_itbl((StgClosure *)p);
2562     next = p + stack_frame_sizeW((StgClosure *)p);
2563     switch (info->i.type) {
2564       
2565     case ATOMICALLY_FRAME:
2566         debugTrace(DEBUG_stm,
2567                    "found ATOMICALLY_FRAME at %p during retry", p);
2568         tso->sp = p;
2569         return ATOMICALLY_FRAME;
2570       
2571     case CATCH_RETRY_FRAME:
2572         debugTrace(DEBUG_stm,
2573                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2574         tso->sp = p;
2575         return CATCH_RETRY_FRAME;
2576       
2577     case CATCH_STM_FRAME: {
2578         StgTRecHeader *trec = tso -> trec;
2579         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2580         debugTrace(DEBUG_stm,
2581                    "found CATCH_STM_FRAME at %p during retry", p);
2582         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2583         stmAbortTransaction(tso -> cap, trec);
2584         stmFreeAbortedTRec(tso -> cap, trec);
2585         tso -> trec = outer;
2586         p = next; 
2587         continue;
2588     }
2589       
2590
2591     default:
2592       ASSERT(info->i.type != CATCH_FRAME);
2593       ASSERT(info->i.type != STOP_FRAME);
2594       p = next; 
2595       continue;
2596     }
2597   }
2598 }
2599
2600 /* -----------------------------------------------------------------------------
2601    resurrectThreads is called after garbage collection on the list of
2602    threads found to be garbage.  Each of these threads will be woken
2603    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2604    on an MVar, or NonTermination if the thread was blocked on a Black
2605    Hole.
2606
2607    Locks: assumes we hold *all* the capabilities.
2608    -------------------------------------------------------------------------- */
2609
2610 void
2611 resurrectThreads (StgTSO *threads)
2612 {
2613     StgTSO *tso, *next;
2614     Capability *cap;
2615     step *step;
2616
2617     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2618         next = tso->global_link;
2619
2620         step = Bdescr((P_)tso)->step;
2621         tso->global_link = step->threads;
2622         step->threads = tso;
2623
2624         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2625         
2626         // Wake up the thread on the Capability it was last on
2627         cap = tso->cap;
2628         
2629         switch (tso->why_blocked) {
2630         case BlockedOnMVar:
2631         case BlockedOnException:
2632             /* Called by GC - sched_mutex lock is currently held. */
2633             throwToSingleThreaded(cap, tso,
2634                                   (StgClosure *)blockedOnDeadMVar_closure);
2635             break;
2636         case BlockedOnBlackHole:
2637             throwToSingleThreaded(cap, tso,
2638                                   (StgClosure *)nonTermination_closure);
2639             break;
2640         case BlockedOnSTM:
2641             throwToSingleThreaded(cap, tso,
2642                                   (StgClosure *)blockedIndefinitely_closure);
2643             break;
2644         case NotBlocked:
2645             /* This might happen if the thread was blocked on a black hole
2646              * belonging to a thread that we've just woken up (raiseAsync
2647              * can wake up threads, remember...).
2648              */
2649             continue;
2650         default:
2651             barf("resurrectThreads: thread blocked in a strange way");
2652         }
2653     }
2654 }
2655
2656 /* -----------------------------------------------------------------------------
2657    performPendingThrowTos is called after garbage collection, and
2658    passed a list of threads that were found to have pending throwTos
2659    (tso->blocked_exceptions was not empty), and were blocked.
2660    Normally this doesn't happen, because we would deliver the
2661    exception directly if the target thread is blocked, but there are
2662    small windows where it might occur on a multiprocessor (see
2663    throwTo()).
2664
2665    NB. we must be holding all the capabilities at this point, just
2666    like resurrectThreads().
2667    -------------------------------------------------------------------------- */
2668
2669 void
2670 performPendingThrowTos (StgTSO *threads)
2671 {
2672     StgTSO *tso, *next;
2673     Capability *cap;
2674     step *step;
2675
2676     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2677         next = tso->global_link;
2678
2679         step = Bdescr((P_)tso)->step;
2680         tso->global_link = step->threads;
2681         step->threads = tso;
2682
2683         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2684         
2685         cap = tso->cap;
2686         maybePerformBlockedException(cap, tso);
2687     }
2688 }