2e9d0dda773ec1c3f432815c047f9ae16d7e8497
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34
35 /* PARALLEL_HASKELL includes go here */
36
37 #include "Sparks.h"
38 #include "Capability.h"
39 #include "Task.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
43 #endif
44 #include "Trace.h"
45 #include "RaiseAsync.h"
46 #include "Threads.h"
47 #include "ThrIOManager.h"
48
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
51 #endif
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56 #include <string.h>
57 #include <stdlib.h>
58 #include <stdarg.h>
59
60 #ifdef HAVE_ERRNO_H
61 #include <errno.h>
62 #endif
63
64 // Turn off inlining when debugging - it obfuscates things
65 #ifdef DEBUG
66 # undef  STATIC_INLINE
67 # define STATIC_INLINE static
68 #endif
69
70 /* -----------------------------------------------------------------------------
71  * Global variables
72  * -------------------------------------------------------------------------- */
73
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping thrads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
79 #endif
80
81 /* Threads blocked on blackholes.
82  * LOCK: sched_mutex+capability, or all capabilities
83  */
84 StgTSO *blackhole_queue = NULL;
85
86 /* The blackhole_queue should be checked for threads to wake up.  See
87  * Schedule.h for more thorough comment.
88  * LOCK: none (doesn't matter if we miss an update)
89  */
90 rtsBool blackholes_need_checking = rtsFalse;
91
92 /* flag that tracks whether we have done any execution in this time slice.
93  * LOCK: currently none, perhaps we should lock (but needs to be
94  * updated in the fast path of the scheduler).
95  */
96 nat recent_activity = ACTIVITY_YES;
97
98 /* if this flag is set as well, give up execution
99  * LOCK: none (changes once, from false->true)
100  */
101 rtsBool sched_state = SCHED_RUNNING;
102
103 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
104  *  exists - earlier gccs apparently didn't.
105  *  -= chak
106  */
107 StgTSO dummy_tso;
108
109 /*
110  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
111  * in an MT setting, needed to signal that a worker thread shouldn't hang around
112  * in the scheduler when it is out of work.
113  */
114 rtsBool shutting_down_scheduler = rtsFalse;
115
116 /*
117  * This mutex protects most of the global scheduler data in
118  * the THREADED_RTS runtime.
119  */
120 #if defined(THREADED_RTS)
121 Mutex sched_mutex;
122 #endif
123
124 #if !defined(mingw32_HOST_OS)
125 #define FORKPROCESS_PRIMOP_SUPPORTED
126 #endif
127
128 /* -----------------------------------------------------------------------------
129  * static function prototypes
130  * -------------------------------------------------------------------------- */
131
132 static Capability *schedule (Capability *initialCapability, Task *task);
133
134 //
135 // These function all encapsulate parts of the scheduler loop, and are
136 // abstracted only to make the structure and control flow of the
137 // scheduler clearer.
138 //
139 static void schedulePreLoop (void);
140 static void scheduleFindWork (Capability *cap);
141 #if defined(THREADED_RTS)
142 static void scheduleYield (Capability **pcap, Task *task);
143 #endif
144 static void scheduleStartSignalHandlers (Capability *cap);
145 static void scheduleCheckBlockedThreads (Capability *cap);
146 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
147 static void scheduleCheckBlackHoles (Capability *cap);
148 static void scheduleDetectDeadlock (Capability *cap, Task *task);
149 static void schedulePushWork(Capability *cap, Task *task);
150 #if defined(PARALLEL_HASKELL)
151 static rtsBool scheduleGetRemoteWork(Capability *cap);
152 static void scheduleSendPendingMessages(void);
153 #endif
154 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
155 static void scheduleActivateSpark(Capability *cap);
156 #endif
157 static void schedulePostRunThread(Capability *cap, StgTSO *t);
158 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
159 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
160                                          StgTSO *t);
161 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
162                                     nat prev_what_next );
163 static void scheduleHandleThreadBlocked( StgTSO *t );
164 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
165                                              StgTSO *t );
166 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
167 static Capability *scheduleDoGC(Capability *cap, Task *task,
168                                 rtsBool force_major);
169
170 static rtsBool checkBlackHoles(Capability *cap);
171
172 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
173 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
174
175 static void deleteThread (Capability *cap, StgTSO *tso);
176 static void deleteAllThreads (Capability *cap);
177
178 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
179 static void deleteThread_(Capability *cap, StgTSO *tso);
180 #endif
181
182 #ifdef DEBUG
183 static char *whatNext_strs[] = {
184   "(unknown)",
185   "ThreadRunGHC",
186   "ThreadInterpret",
187   "ThreadKilled",
188   "ThreadRelocated",
189   "ThreadComplete"
190 };
191 #endif
192
193 /* -----------------------------------------------------------------------------
194  * Putting a thread on the run queue: different scheduling policies
195  * -------------------------------------------------------------------------- */
196
197 STATIC_INLINE void
198 addToRunQueue( Capability *cap, StgTSO *t )
199 {
200 #if defined(PARALLEL_HASKELL)
201     if (RtsFlags.ParFlags.doFairScheduling) { 
202         // this does round-robin scheduling; good for concurrency
203         appendToRunQueue(cap,t);
204     } else {
205         // this does unfair scheduling; good for parallelism
206         pushOnRunQueue(cap,t);
207     }
208 #else
209     // this does round-robin scheduling; good for concurrency
210     appendToRunQueue(cap,t);
211 #endif
212 }
213
214 /* ---------------------------------------------------------------------------
215    Main scheduling loop.
216
217    We use round-robin scheduling, each thread returning to the
218    scheduler loop when one of these conditions is detected:
219
220       * out of heap space
221       * timer expires (thread yields)
222       * thread blocks
223       * thread ends
224       * stack overflow
225
226    GRAN version:
227      In a GranSim setup this loop iterates over the global event queue.
228      This revolves around the global event queue, which determines what 
229      to do next. Therefore, it's more complicated than either the 
230      concurrent or the parallel (GUM) setup.
231   This version has been entirely removed (JB 2008/08).
232
233    GUM version:
234      GUM iterates over incoming messages.
235      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
236      and sends out a fish whenever it has nothing to do; in-between
237      doing the actual reductions (shared code below) it processes the
238      incoming messages and deals with delayed operations 
239      (see PendingFetches).
240      This is not the ugliest code you could imagine, but it's bloody close.
241
242   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
243   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
244   as well as future GUM versions. This file has been refurbished to
245   only contain valid code, which is however incomplete, refers to
246   invalid includes etc.
247
248    ------------------------------------------------------------------------ */
249
250 static Capability *
251 schedule (Capability *initialCapability, Task *task)
252 {
253   StgTSO *t;
254   Capability *cap;
255   StgThreadReturnCode ret;
256 #if defined(PARALLEL_HASKELL)
257   rtsBool receivedFinish = rtsFalse;
258 #endif
259   nat prev_what_next;
260   rtsBool ready_to_gc;
261 #if defined(THREADED_RTS)
262   rtsBool first = rtsTrue;
263 #endif
264   
265   cap = initialCapability;
266
267   // Pre-condition: this task owns initialCapability.
268   // The sched_mutex is *NOT* held
269   // NB. on return, we still hold a capability.
270
271   debugTrace (DEBUG_sched, 
272               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
273               task, initialCapability);
274
275   schedulePreLoop();
276
277   // -----------------------------------------------------------
278   // Scheduler loop starts here:
279
280 #if defined(PARALLEL_HASKELL)
281 #define TERMINATION_CONDITION        (!receivedFinish)
282 #else
283 #define TERMINATION_CONDITION        rtsTrue
284 #endif
285
286   while (TERMINATION_CONDITION) {
287
288     // Check whether we have re-entered the RTS from Haskell without
289     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
290     // call).
291     if (cap->in_haskell) {
292           errorBelch("schedule: re-entered unsafely.\n"
293                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
294           stg_exit(EXIT_FAILURE);
295     }
296
297     // The interruption / shutdown sequence.
298     // 
299     // In order to cleanly shut down the runtime, we want to:
300     //   * make sure that all main threads return to their callers
301     //     with the state 'Interrupted'.
302     //   * clean up all OS threads assocated with the runtime
303     //   * free all memory etc.
304     //
305     // So the sequence for ^C goes like this:
306     //
307     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
308     //     arranges for some Capability to wake up
309     //
310     //   * all threads in the system are halted, and the zombies are
311     //     placed on the run queue for cleaning up.  We acquire all
312     //     the capabilities in order to delete the threads, this is
313     //     done by scheduleDoGC() for convenience (because GC already
314     //     needs to acquire all the capabilities).  We can't kill
315     //     threads involved in foreign calls.
316     // 
317     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
318     //
319     //   * sched_state := SCHED_SHUTTING_DOWN
320     //
321     //   * all workers exit when the run queue on their capability
322     //     drains.  All main threads will also exit when their TSO
323     //     reaches the head of the run queue and they can return.
324     //
325     //   * eventually all Capabilities will shut down, and the RTS can
326     //     exit.
327     //
328     //   * We might be left with threads blocked in foreign calls, 
329     //     we should really attempt to kill these somehow (TODO);
330     
331     switch (sched_state) {
332     case SCHED_RUNNING:
333         break;
334     case SCHED_INTERRUPTING:
335         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
336 #if defined(THREADED_RTS)
337         discardSparksCap(cap);
338 #endif
339         /* scheduleDoGC() deletes all the threads */
340         cap = scheduleDoGC(cap,task,rtsFalse);
341         break;
342     case SCHED_SHUTTING_DOWN:
343         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
344         // If we are a worker, just exit.  If we're a bound thread
345         // then we will exit below when we've removed our TSO from
346         // the run queue.
347         if (task->tso == NULL && emptyRunQueue(cap)) {
348             return cap;
349         }
350         break;
351     default:
352         barf("sched_state: %d", sched_state);
353     }
354
355     scheduleFindWork(cap);
356
357     /* work pushing, currently relevant only for THREADED_RTS:
358        (pushes threads, wakes up idle capabilities for stealing) */
359     schedulePushWork(cap,task);
360
361 #if defined(PARALLEL_HASKELL)
362     /* since we perform a blocking receive and continue otherwise,
363        either we never reach here or we definitely have work! */
364     // from here: non-empty run queue
365     ASSERT(!emptyRunQueue(cap));
366
367     if (PacketsWaiting()) {  /* now process incoming messages, if any
368                                 pending...  
369
370                                 CAUTION: scheduleGetRemoteWork called
371                                 above, waits for messages as well! */
372       processMessages(cap, &receivedFinish);
373     }
374 #endif // PARALLEL_HASKELL: non-empty run queue!
375
376     scheduleDetectDeadlock(cap,task);
377
378 #if defined(THREADED_RTS)
379     cap = task->cap;    // reload cap, it might have changed
380 #endif
381
382     // Normally, the only way we can get here with no threads to
383     // run is if a keyboard interrupt received during 
384     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
385     // Additionally, it is not fatal for the
386     // threaded RTS to reach here with no threads to run.
387     //
388     // win32: might be here due to awaitEvent() being abandoned
389     // as a result of a console event having been delivered.
390     
391 #if defined(THREADED_RTS)
392     if (first) 
393     {
394     // XXX: ToDo
395     //     // don't yield the first time, we want a chance to run this
396     //     // thread for a bit, even if there are others banging at the
397     //     // door.
398     //     first = rtsFalse;
399     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
400     }
401
402     scheduleYield(&cap,task);
403     if (emptyRunQueue(cap)) continue; // look for work again
404 #endif
405
406 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
407     if ( emptyRunQueue(cap) ) {
408         ASSERT(sched_state >= SCHED_INTERRUPTING);
409     }
410 #endif
411
412     // 
413     // Get a thread to run
414     //
415     t = popRunQueue(cap);
416
417     // Sanity check the thread we're about to run.  This can be
418     // expensive if there is lots of thread switching going on...
419     IF_DEBUG(sanity,checkTSO(t));
420
421 #if defined(THREADED_RTS)
422     // Check whether we can run this thread in the current task.
423     // If not, we have to pass our capability to the right task.
424     {
425         Task *bound = t->bound;
426       
427         if (bound) {
428             if (bound == task) {
429                 debugTrace(DEBUG_sched,
430                            "### Running thread %lu in bound thread", (unsigned long)t->id);
431                 // yes, the Haskell thread is bound to the current native thread
432             } else {
433                 debugTrace(DEBUG_sched,
434                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
435                 // no, bound to a different Haskell thread: pass to that thread
436                 pushOnRunQueue(cap,t);
437                 continue;
438             }
439         } else {
440             // The thread we want to run is unbound.
441             if (task->tso) { 
442                 debugTrace(DEBUG_sched,
443                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
444                 // no, the current native thread is bound to a different
445                 // Haskell thread, so pass it to any worker thread
446                 pushOnRunQueue(cap,t);
447                 continue; 
448             }
449         }
450     }
451 #endif
452
453     /* context switches are initiated by the timer signal, unless
454      * the user specified "context switch as often as possible", with
455      * +RTS -C0
456      */
457     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
458         && !emptyThreadQueues(cap)) {
459         cap->context_switch = 1;
460     }
461          
462 run_thread:
463
464     // CurrentTSO is the thread to run.  t might be different if we
465     // loop back to run_thread, so make sure to set CurrentTSO after
466     // that.
467     cap->r.rCurrentTSO = t;
468
469     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
470                               (long)t->id, whatNext_strs[t->what_next]);
471
472     startHeapProfTimer();
473
474     // Check for exceptions blocked on this thread
475     maybePerformBlockedException (cap, t);
476
477     // ----------------------------------------------------------------------
478     // Run the current thread 
479
480     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
481     ASSERT(t->cap == cap);
482     ASSERT(t->bound ? t->bound->cap == cap : 1);
483
484     prev_what_next = t->what_next;
485
486     errno = t->saved_errno;
487 #if mingw32_HOST_OS
488     SetLastError(t->saved_winerror);
489 #endif
490
491     cap->in_haskell = rtsTrue;
492
493     dirty_TSO(cap,t);
494
495 #if defined(THREADED_RTS)
496     if (recent_activity == ACTIVITY_DONE_GC) {
497         // ACTIVITY_DONE_GC means we turned off the timer signal to
498         // conserve power (see #1623).  Re-enable it here.
499         nat prev;
500         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
501         if (prev == ACTIVITY_DONE_GC) {
502             startTimer();
503         }
504     } else {
505         recent_activity = ACTIVITY_YES;
506     }
507 #endif
508
509     switch (prev_what_next) {
510         
511     case ThreadKilled:
512     case ThreadComplete:
513         /* Thread already finished, return to scheduler. */
514         ret = ThreadFinished;
515         break;
516         
517     case ThreadRunGHC:
518     {
519         StgRegTable *r;
520         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
521         cap = regTableToCapability(r);
522         ret = r->rRet;
523         break;
524     }
525     
526     case ThreadInterpret:
527         cap = interpretBCO(cap);
528         ret = cap->r.rRet;
529         break;
530         
531     default:
532         barf("schedule: invalid what_next field");
533     }
534
535     cap->in_haskell = rtsFalse;
536
537     // The TSO might have moved, eg. if it re-entered the RTS and a GC
538     // happened.  So find the new location:
539     t = cap->r.rCurrentTSO;
540
541     // We have run some Haskell code: there might be blackhole-blocked
542     // threads to wake up now.
543     // Lock-free test here should be ok, we're just setting a flag.
544     if ( blackhole_queue != END_TSO_QUEUE ) {
545         blackholes_need_checking = rtsTrue;
546     }
547     
548     // And save the current errno in this thread.
549     // XXX: possibly bogus for SMP because this thread might already
550     // be running again, see code below.
551     t->saved_errno = errno;
552 #if mingw32_HOST_OS
553     // Similarly for Windows error code
554     t->saved_winerror = GetLastError();
555 #endif
556
557 #if defined(THREADED_RTS)
558     // If ret is ThreadBlocked, and this Task is bound to the TSO that
559     // blocked, we are in limbo - the TSO is now owned by whatever it
560     // is blocked on, and may in fact already have been woken up,
561     // perhaps even on a different Capability.  It may be the case
562     // that task->cap != cap.  We better yield this Capability
563     // immediately and return to normaility.
564     if (ret == ThreadBlocked) {
565         debugTrace(DEBUG_sched,
566                    "--<< thread %lu (%s) stopped: blocked",
567                    (unsigned long)t->id, whatNext_strs[t->what_next]);
568         continue;
569     }
570 #endif
571
572     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
573     ASSERT(t->cap == cap);
574
575     // ----------------------------------------------------------------------
576     
577     // Costs for the scheduler are assigned to CCS_SYSTEM
578     stopHeapProfTimer();
579 #if defined(PROFILING)
580     CCCS = CCS_SYSTEM;
581 #endif
582     
583     schedulePostRunThread(cap,t);
584
585     t = threadStackUnderflow(task,t);
586
587     ready_to_gc = rtsFalse;
588
589     switch (ret) {
590     case HeapOverflow:
591         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
592         break;
593
594     case StackOverflow:
595         scheduleHandleStackOverflow(cap,task,t);
596         break;
597
598     case ThreadYielding:
599         if (scheduleHandleYield(cap, t, prev_what_next)) {
600             // shortcut for switching between compiler/interpreter:
601             goto run_thread; 
602         }
603         break;
604
605     case ThreadBlocked:
606         scheduleHandleThreadBlocked(t);
607         break;
608
609     case ThreadFinished:
610         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
611         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
612         break;
613
614     default:
615       barf("schedule: invalid thread return code %d", (int)ret);
616     }
617
618     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
619       cap = scheduleDoGC(cap,task,rtsFalse);
620     }
621   } /* end of while() */
622 }
623
624 /* ----------------------------------------------------------------------------
625  * Setting up the scheduler loop
626  * ------------------------------------------------------------------------- */
627
628 static void
629 schedulePreLoop(void)
630 {
631   // initialisation for scheduler - what cannot go into initScheduler()  
632 }
633
634 /* -----------------------------------------------------------------------------
635  * scheduleFindWork()
636  *
637  * Search for work to do, and handle messages from elsewhere.
638  * -------------------------------------------------------------------------- */
639
640 static void
641 scheduleFindWork (Capability *cap)
642 {
643     scheduleStartSignalHandlers(cap);
644
645     // Only check the black holes here if we've nothing else to do.
646     // During normal execution, the black hole list only gets checked
647     // at GC time, to avoid repeatedly traversing this possibly long
648     // list each time around the scheduler.
649     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
650
651     scheduleCheckWakeupThreads(cap);
652
653     scheduleCheckBlockedThreads(cap);
654
655 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
656     // Try to activate one of our own sparks
657     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
658 #endif
659
660 #if defined(THREADED_RTS)
661     // Try to steak work if we don't have any
662     if (emptyRunQueue(cap)) { stealWork(cap); }
663 #endif
664     
665 #if defined(PARALLEL_HASKELL)
666     // if messages have been buffered...
667     scheduleSendPendingMessages();
668 #endif
669
670 #if defined(PARALLEL_HASKELL)
671     if (emptyRunQueue(cap)) {
672         receivedFinish = scheduleGetRemoteWork(cap);
673         continue; //  a new round, (hopefully) with new work
674         /* 
675            in GUM, this a) sends out a FISH and returns IF no fish is
676                            out already
677                         b) (blocking) awaits and receives messages
678            
679            in Eden, this is only the blocking receive, as b) in GUM.
680         */
681     }
682 #endif
683 }
684
685 #if defined(THREADED_RTS)
686 STATIC_INLINE rtsBool
687 shouldYieldCapability (Capability *cap, Task *task)
688 {
689     // we need to yield this capability to someone else if..
690     //   - another thread is initiating a GC
691     //   - another Task is returning from a foreign call
692     //   - the thread at the head of the run queue cannot be run
693     //     by this Task (it is bound to another Task, or it is unbound
694     //     and this task it bound).
695     return (waiting_for_gc || 
696             cap->returning_tasks_hd != NULL ||
697             (!emptyRunQueue(cap) && (task->tso == NULL
698                                      ? cap->run_queue_hd->bound != NULL
699                                      : cap->run_queue_hd->bound != task)));
700 }
701
702 // This is the single place where a Task goes to sleep.  There are
703 // two reasons it might need to sleep:
704 //    - there are no threads to run
705 //    - we need to yield this Capability to someone else 
706 //      (see shouldYieldCapability())
707 //
708 // The return value indicates whether 
709
710 static void
711 scheduleYield (Capability **pcap, Task *task)
712 {
713     Capability *cap = *pcap;
714
715     // if we have work, and we don't need to give up the Capability, continue.
716     if (!emptyRunQueue(cap) && !shouldYieldCapability(cap,task))
717         return;
718
719     // otherwise yield (sleep), and keep yielding if necessary.
720     do {
721         yieldCapability(&cap,task);
722     } 
723     while (shouldYieldCapability(cap,task));
724
725     // note there may still be no threads on the run queue at this
726     // point, the caller has to check.
727
728     *pcap = cap;
729     return;
730 }
731 #endif
732     
733 /* -----------------------------------------------------------------------------
734  * schedulePushWork()
735  *
736  * Push work to other Capabilities if we have some.
737  * -------------------------------------------------------------------------- */
738
739 static void
740 schedulePushWork(Capability *cap USED_IF_THREADS, 
741                  Task *task      USED_IF_THREADS)
742 {
743   /* following code not for PARALLEL_HASKELL. I kept the call general,
744      future GUM versions might use pushing in a distributed setup */
745 #if defined(THREADED_RTS)
746
747     Capability *free_caps[n_capabilities], *cap0;
748     nat i, n_free_caps;
749
750     // migration can be turned off with +RTS -qg
751     if (!RtsFlags.ParFlags.migrate) return;
752
753     // Check whether we have more threads on our run queue, or sparks
754     // in our pool, that we could hand to another Capability.
755     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
756         && sparkPoolSizeCap(cap) < 2) {
757         return;
758     }
759
760     // First grab as many free Capabilities as we can.
761     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
762         cap0 = &capabilities[i];
763         if (cap != cap0 && tryGrabCapability(cap0,task)) {
764             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
765                 // it already has some work, we just grabbed it at 
766                 // the wrong moment.  Or maybe it's deadlocked!
767                 releaseCapability(cap0);
768             } else {
769                 free_caps[n_free_caps++] = cap0;
770             }
771         }
772     }
773
774     // we now have n_free_caps free capabilities stashed in
775     // free_caps[].  Share our run queue equally with them.  This is
776     // probably the simplest thing we could do; improvements we might
777     // want to do include:
778     //
779     //   - giving high priority to moving relatively new threads, on 
780     //     the gournds that they haven't had time to build up a
781     //     working set in the cache on this CPU/Capability.
782     //
783     //   - giving low priority to moving long-lived threads
784
785     if (n_free_caps > 0) {
786         StgTSO *prev, *t, *next;
787         rtsBool pushed_to_all;
788
789         debugTrace(DEBUG_sched, 
790                    "cap %d: %s and %d free capabilities, sharing...", 
791                    cap->no, 
792                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
793                    "excess threads on run queue":"sparks to share (>=2)",
794                    n_free_caps);
795
796         i = 0;
797         pushed_to_all = rtsFalse;
798
799         if (cap->run_queue_hd != END_TSO_QUEUE) {
800             prev = cap->run_queue_hd;
801             t = prev->_link;
802             prev->_link = END_TSO_QUEUE;
803             for (; t != END_TSO_QUEUE; t = next) {
804                 next = t->_link;
805                 t->_link = END_TSO_QUEUE;
806                 if (t->what_next == ThreadRelocated
807                     || t->bound == task // don't move my bound thread
808                     || tsoLocked(t)) {  // don't move a locked thread
809                     setTSOLink(cap, prev, t);
810                     prev = t;
811                 } else if (i == n_free_caps) {
812                     pushed_to_all = rtsTrue;
813                     i = 0;
814                     // keep one for us
815                     setTSOLink(cap, prev, t);
816                     prev = t;
817                 } else {
818                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
819                     appendToRunQueue(free_caps[i],t);
820                     if (t->bound) { t->bound->cap = free_caps[i]; }
821                     t->cap = free_caps[i];
822                     i++;
823                 }
824             }
825             cap->run_queue_tl = prev;
826         }
827
828 #ifdef SPARK_PUSHING
829         /* JB I left this code in place, it would work but is not necessary */
830
831         // If there are some free capabilities that we didn't push any
832         // threads to, then try to push a spark to each one.
833         if (!pushed_to_all) {
834             StgClosure *spark;
835             // i is the next free capability to push to
836             for (; i < n_free_caps; i++) {
837                 if (emptySparkPoolCap(free_caps[i])) {
838                     spark = tryStealSpark(cap->sparks);
839                     if (spark != NULL) {
840                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
841                         newSpark(&(free_caps[i]->r), spark);
842                     }
843                 }
844             }
845         }
846 #endif /* SPARK_PUSHING */
847
848         // release the capabilities
849         for (i = 0; i < n_free_caps; i++) {
850             task->cap = free_caps[i];
851             releaseAndWakeupCapability(free_caps[i]);
852         }
853     }
854     task->cap = cap; // reset to point to our Capability.
855
856 #endif /* THREADED_RTS */
857
858 }
859
860 /* ----------------------------------------------------------------------------
861  * Start any pending signal handlers
862  * ------------------------------------------------------------------------- */
863
864 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
865 static void
866 scheduleStartSignalHandlers(Capability *cap)
867 {
868     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
869         // safe outside the lock
870         startSignalHandlers(cap);
871     }
872 }
873 #else
874 static void
875 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
876 {
877 }
878 #endif
879
880 /* ----------------------------------------------------------------------------
881  * Check for blocked threads that can be woken up.
882  * ------------------------------------------------------------------------- */
883
884 static void
885 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
886 {
887 #if !defined(THREADED_RTS)
888     //
889     // Check whether any waiting threads need to be woken up.  If the
890     // run queue is empty, and there are no other tasks running, we
891     // can wait indefinitely for something to happen.
892     //
893     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
894     {
895         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
896     }
897 #endif
898 }
899
900
901 /* ----------------------------------------------------------------------------
902  * Check for threads woken up by other Capabilities
903  * ------------------------------------------------------------------------- */
904
905 static void
906 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
907 {
908 #if defined(THREADED_RTS)
909     // Any threads that were woken up by other Capabilities get
910     // appended to our run queue.
911     if (!emptyWakeupQueue(cap)) {
912         ACQUIRE_LOCK(&cap->lock);
913         if (emptyRunQueue(cap)) {
914             cap->run_queue_hd = cap->wakeup_queue_hd;
915             cap->run_queue_tl = cap->wakeup_queue_tl;
916         } else {
917             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
918             cap->run_queue_tl = cap->wakeup_queue_tl;
919         }
920         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
921         RELEASE_LOCK(&cap->lock);
922     }
923 #endif
924 }
925
926 /* ----------------------------------------------------------------------------
927  * Check for threads blocked on BLACKHOLEs that can be woken up
928  * ------------------------------------------------------------------------- */
929 static void
930 scheduleCheckBlackHoles (Capability *cap)
931 {
932     if ( blackholes_need_checking ) // check without the lock first
933     {
934         ACQUIRE_LOCK(&sched_mutex);
935         if ( blackholes_need_checking ) {
936             checkBlackHoles(cap);
937             blackholes_need_checking = rtsFalse;
938         }
939         RELEASE_LOCK(&sched_mutex);
940     }
941 }
942
943 /* ----------------------------------------------------------------------------
944  * Detect deadlock conditions and attempt to resolve them.
945  * ------------------------------------------------------------------------- */
946
947 static void
948 scheduleDetectDeadlock (Capability *cap, Task *task)
949 {
950
951 #if defined(PARALLEL_HASKELL)
952     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
953     return;
954 #endif
955
956     /* 
957      * Detect deadlock: when we have no threads to run, there are no
958      * threads blocked, waiting for I/O, or sleeping, and all the
959      * other tasks are waiting for work, we must have a deadlock of
960      * some description.
961      */
962     if ( emptyThreadQueues(cap) )
963     {
964 #if defined(THREADED_RTS)
965         /* 
966          * In the threaded RTS, we only check for deadlock if there
967          * has been no activity in a complete timeslice.  This means
968          * we won't eagerly start a full GC just because we don't have
969          * any threads to run currently.
970          */
971         if (recent_activity != ACTIVITY_INACTIVE) return;
972 #endif
973
974         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
975
976         // Garbage collection can release some new threads due to
977         // either (a) finalizers or (b) threads resurrected because
978         // they are unreachable and will therefore be sent an
979         // exception.  Any threads thus released will be immediately
980         // runnable.
981         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
982
983         recent_activity = ACTIVITY_DONE_GC;
984         // disable timer signals (see #1623)
985         stopTimer();
986         
987         if ( !emptyRunQueue(cap) ) return;
988
989 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
990         /* If we have user-installed signal handlers, then wait
991          * for signals to arrive rather then bombing out with a
992          * deadlock.
993          */
994         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
995             debugTrace(DEBUG_sched,
996                        "still deadlocked, waiting for signals...");
997
998             awaitUserSignals();
999
1000             if (signals_pending()) {
1001                 startSignalHandlers(cap);
1002             }
1003
1004             // either we have threads to run, or we were interrupted:
1005             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1006
1007             return;
1008         }
1009 #endif
1010
1011 #if !defined(THREADED_RTS)
1012         /* Probably a real deadlock.  Send the current main thread the
1013          * Deadlock exception.
1014          */
1015         if (task->tso) {
1016             switch (task->tso->why_blocked) {
1017             case BlockedOnSTM:
1018             case BlockedOnBlackHole:
1019             case BlockedOnException:
1020             case BlockedOnMVar:
1021                 throwToSingleThreaded(cap, task->tso, 
1022                                       (StgClosure *)nonTermination_closure);
1023                 return;
1024             default:
1025                 barf("deadlock: main thread blocked in a strange way");
1026             }
1027         }
1028         return;
1029 #endif
1030     }
1031 }
1032
1033
1034 /* ----------------------------------------------------------------------------
1035  * Send pending messages (PARALLEL_HASKELL only)
1036  * ------------------------------------------------------------------------- */
1037
1038 #if defined(PARALLEL_HASKELL)
1039 static void
1040 scheduleSendPendingMessages(void)
1041 {
1042
1043 # if defined(PAR) // global Mem.Mgmt., omit for now
1044     if (PendingFetches != END_BF_QUEUE) {
1045         processFetches();
1046     }
1047 # endif
1048     
1049     if (RtsFlags.ParFlags.BufferTime) {
1050         // if we use message buffering, we must send away all message
1051         // packets which have become too old...
1052         sendOldBuffers(); 
1053     }
1054 }
1055 #endif
1056
1057 /* ----------------------------------------------------------------------------
1058  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1059  * ------------------------------------------------------------------------- */
1060
1061 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1062 static void
1063 scheduleActivateSpark(Capability *cap)
1064 {
1065     StgClosure *spark;
1066
1067 /* We only want to stay here if the run queue is empty and we want some
1068    work. We try to turn a spark into a thread, and add it to the run
1069    queue, from where it will be picked up in the next iteration of the
1070    scheduler loop.  
1071 */
1072     if (!emptyRunQueue(cap)) 
1073       /* In the threaded RTS, another task might have pushed a thread
1074          on our run queue in the meantime ? But would need a lock.. */
1075       return;
1076
1077  
1078     // Really we should be using reclaimSpark() here, but
1079     // experimentally it doesn't seem to perform as well as just
1080     // stealing from our own spark pool:
1081     // spark = reclaimSpark(cap->sparks);
1082     spark = tryStealSpark(cap->sparks); // defined in Sparks.c
1083
1084     if (spark != NULL) {
1085       debugTrace(DEBUG_sched,
1086                  "turning spark of closure %p into a thread",
1087                  (StgClosure *)spark);
1088       createSparkThread(cap,spark); // defined in Sparks.c
1089     }
1090 }
1091 #endif // PARALLEL_HASKELL || THREADED_RTS
1092
1093 /* ----------------------------------------------------------------------------
1094  * Get work from a remote node (PARALLEL_HASKELL only)
1095  * ------------------------------------------------------------------------- */
1096     
1097 #if defined(PARALLEL_HASKELL)
1098 static rtsBool /* return value used in PARALLEL_HASKELL only */
1099 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1100 {
1101 #if defined(PARALLEL_HASKELL)
1102   rtsBool receivedFinish = rtsFalse;
1103
1104   // idle() , i.e. send all buffers, wait for work
1105   if (RtsFlags.ParFlags.BufferTime) {
1106         IF_PAR_DEBUG(verbose, 
1107                 debugBelch("...send all pending data,"));
1108         {
1109           nat i;
1110           for (i=1; i<=nPEs; i++)
1111             sendImmediately(i); // send all messages away immediately
1112         }
1113   }
1114
1115   /* this would be the place for fishing in GUM... 
1116
1117      if (no-earlier-fish-around) 
1118           sendFish(choosePe());
1119    */
1120
1121   // Eden:just look for incoming messages (blocking receive)
1122   IF_PAR_DEBUG(verbose, 
1123                debugBelch("...wait for incoming messages...\n"));
1124   processMessages(cap, &receivedFinish); // blocking receive...
1125
1126
1127   return receivedFinish;
1128   // reenter scheduling look after having received something
1129
1130 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1131
1132   return rtsFalse; /* return value unused in THREADED_RTS */
1133
1134 #endif /* PARALLEL_HASKELL */
1135 }
1136 #endif // PARALLEL_HASKELL || THREADED_RTS
1137
1138 /* ----------------------------------------------------------------------------
1139  * After running a thread...
1140  * ------------------------------------------------------------------------- */
1141
1142 static void
1143 schedulePostRunThread (Capability *cap, StgTSO *t)
1144 {
1145     // We have to be able to catch transactions that are in an
1146     // infinite loop as a result of seeing an inconsistent view of
1147     // memory, e.g. 
1148     //
1149     //   atomically $ do
1150     //       [a,b] <- mapM readTVar [ta,tb]
1151     //       when (a == b) loop
1152     //
1153     // and a is never equal to b given a consistent view of memory.
1154     //
1155     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1156         if (!stmValidateNestOfTransactions (t -> trec)) {
1157             debugTrace(DEBUG_sched | DEBUG_stm,
1158                        "trec %p found wasting its time", t);
1159             
1160             // strip the stack back to the
1161             // ATOMICALLY_FRAME, aborting the (nested)
1162             // transaction, and saving the stack of any
1163             // partially-evaluated thunks on the heap.
1164             throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
1165             
1166             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1167         }
1168     }
1169
1170   /* some statistics gathering in the parallel case */
1171 }
1172
1173 /* -----------------------------------------------------------------------------
1174  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1175  * -------------------------------------------------------------------------- */
1176
1177 static rtsBool
1178 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1179 {
1180     // did the task ask for a large block?
1181     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1182         // if so, get one and push it on the front of the nursery.
1183         bdescr *bd;
1184         lnat blocks;
1185         
1186         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1187         
1188         debugTrace(DEBUG_sched,
1189                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1190                    (long)t->id, whatNext_strs[t->what_next], blocks);
1191     
1192         // don't do this if the nursery is (nearly) full, we'll GC first.
1193         if (cap->r.rCurrentNursery->link != NULL ||
1194             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1195                                                // if the nursery has only one block.
1196             
1197             ACQUIRE_SM_LOCK
1198             bd = allocGroup( blocks );
1199             RELEASE_SM_LOCK
1200             cap->r.rNursery->n_blocks += blocks;
1201             
1202             // link the new group into the list
1203             bd->link = cap->r.rCurrentNursery;
1204             bd->u.back = cap->r.rCurrentNursery->u.back;
1205             if (cap->r.rCurrentNursery->u.back != NULL) {
1206                 cap->r.rCurrentNursery->u.back->link = bd;
1207             } else {
1208 #if !defined(THREADED_RTS)
1209                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1210                        g0s0 == cap->r.rNursery);
1211 #endif
1212                 cap->r.rNursery->blocks = bd;
1213             }             
1214             cap->r.rCurrentNursery->u.back = bd;
1215             
1216             // initialise it as a nursery block.  We initialise the
1217             // step, gen_no, and flags field of *every* sub-block in
1218             // this large block, because this is easier than making
1219             // sure that we always find the block head of a large
1220             // block whenever we call Bdescr() (eg. evacuate() and
1221             // isAlive() in the GC would both have to do this, at
1222             // least).
1223             { 
1224                 bdescr *x;
1225                 for (x = bd; x < bd + blocks; x++) {
1226                     x->step = cap->r.rNursery;
1227                     x->gen_no = 0;
1228                     x->flags = 0;
1229                 }
1230             }
1231             
1232             // This assert can be a killer if the app is doing lots
1233             // of large block allocations.
1234             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1235             
1236             // now update the nursery to point to the new block
1237             cap->r.rCurrentNursery = bd;
1238             
1239             // we might be unlucky and have another thread get on the
1240             // run queue before us and steal the large block, but in that
1241             // case the thread will just end up requesting another large
1242             // block.
1243             pushOnRunQueue(cap,t);
1244             return rtsFalse;  /* not actually GC'ing */
1245         }
1246     }
1247     
1248     debugTrace(DEBUG_sched,
1249                "--<< thread %ld (%s) stopped: HeapOverflow",
1250                (long)t->id, whatNext_strs[t->what_next]);
1251
1252     if (cap->context_switch) {
1253         // Sometimes we miss a context switch, e.g. when calling
1254         // primitives in a tight loop, MAYBE_GC() doesn't check the
1255         // context switch flag, and we end up waiting for a GC.
1256         // See #1984, and concurrent/should_run/1984
1257         cap->context_switch = 0;
1258         addToRunQueue(cap,t);
1259     } else {
1260         pushOnRunQueue(cap,t);
1261     }
1262     return rtsTrue;
1263     /* actual GC is done at the end of the while loop in schedule() */
1264 }
1265
1266 /* -----------------------------------------------------------------------------
1267  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1268  * -------------------------------------------------------------------------- */
1269
1270 static void
1271 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1272 {
1273     debugTrace (DEBUG_sched,
1274                 "--<< thread %ld (%s) stopped, StackOverflow", 
1275                 (long)t->id, whatNext_strs[t->what_next]);
1276
1277     /* just adjust the stack for this thread, then pop it back
1278      * on the run queue.
1279      */
1280     { 
1281         /* enlarge the stack */
1282         StgTSO *new_t = threadStackOverflow(cap, t);
1283         
1284         /* The TSO attached to this Task may have moved, so update the
1285          * pointer to it.
1286          */
1287         if (task->tso == t) {
1288             task->tso = new_t;
1289         }
1290         pushOnRunQueue(cap,new_t);
1291     }
1292 }
1293
1294 /* -----------------------------------------------------------------------------
1295  * Handle a thread that returned to the scheduler with ThreadYielding
1296  * -------------------------------------------------------------------------- */
1297
1298 static rtsBool
1299 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1300 {
1301     // Reset the context switch flag.  We don't do this just before
1302     // running the thread, because that would mean we would lose ticks
1303     // during GC, which can lead to unfair scheduling (a thread hogs
1304     // the CPU because the tick always arrives during GC).  This way
1305     // penalises threads that do a lot of allocation, but that seems
1306     // better than the alternative.
1307     cap->context_switch = 0;
1308     
1309     /* put the thread back on the run queue.  Then, if we're ready to
1310      * GC, check whether this is the last task to stop.  If so, wake
1311      * up the GC thread.  getThread will block during a GC until the
1312      * GC is finished.
1313      */
1314 #ifdef DEBUG
1315     if (t->what_next != prev_what_next) {
1316         debugTrace(DEBUG_sched,
1317                    "--<< thread %ld (%s) stopped to switch evaluators", 
1318                    (long)t->id, whatNext_strs[t->what_next]);
1319     } else {
1320         debugTrace(DEBUG_sched,
1321                    "--<< thread %ld (%s) stopped, yielding",
1322                    (long)t->id, whatNext_strs[t->what_next]);
1323     }
1324 #endif
1325     
1326     IF_DEBUG(sanity,
1327              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1328              checkTSO(t));
1329     ASSERT(t->_link == END_TSO_QUEUE);
1330     
1331     // Shortcut if we're just switching evaluators: don't bother
1332     // doing stack squeezing (which can be expensive), just run the
1333     // thread.
1334     if (t->what_next != prev_what_next) {
1335         return rtsTrue;
1336     }
1337
1338     addToRunQueue(cap,t);
1339
1340     return rtsFalse;
1341 }
1342
1343 /* -----------------------------------------------------------------------------
1344  * Handle a thread that returned to the scheduler with ThreadBlocked
1345  * -------------------------------------------------------------------------- */
1346
1347 static void
1348 scheduleHandleThreadBlocked( StgTSO *t
1349 #if !defined(GRAN) && !defined(DEBUG)
1350     STG_UNUSED
1351 #endif
1352     )
1353 {
1354
1355       // We don't need to do anything.  The thread is blocked, and it
1356       // has tidied up its stack and placed itself on whatever queue
1357       // it needs to be on.
1358
1359     // ASSERT(t->why_blocked != NotBlocked);
1360     // Not true: for example,
1361     //    - in THREADED_RTS, the thread may already have been woken
1362     //      up by another Capability.  This actually happens: try
1363     //      conc023 +RTS -N2.
1364     //    - the thread may have woken itself up already, because
1365     //      threadPaused() might have raised a blocked throwTo
1366     //      exception, see maybePerformBlockedException().
1367
1368 #ifdef DEBUG
1369     if (traceClass(DEBUG_sched)) {
1370         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1371                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1372         printThreadBlockage(t);
1373         debugTraceEnd();
1374     }
1375 #endif
1376 }
1377
1378 /* -----------------------------------------------------------------------------
1379  * Handle a thread that returned to the scheduler with ThreadFinished
1380  * -------------------------------------------------------------------------- */
1381
1382 static rtsBool
1383 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1384 {
1385     /* Need to check whether this was a main thread, and if so,
1386      * return with the return value.
1387      *
1388      * We also end up here if the thread kills itself with an
1389      * uncaught exception, see Exception.cmm.
1390      */
1391     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1392                (unsigned long)t->id, whatNext_strs[t->what_next]);
1393
1394       //
1395       // Check whether the thread that just completed was a bound
1396       // thread, and if so return with the result.  
1397       //
1398       // There is an assumption here that all thread completion goes
1399       // through this point; we need to make sure that if a thread
1400       // ends up in the ThreadKilled state, that it stays on the run
1401       // queue so it can be dealt with here.
1402       //
1403
1404       if (t->bound) {
1405
1406           if (t->bound != task) {
1407 #if !defined(THREADED_RTS)
1408               // Must be a bound thread that is not the topmost one.  Leave
1409               // it on the run queue until the stack has unwound to the
1410               // point where we can deal with this.  Leaving it on the run
1411               // queue also ensures that the garbage collector knows about
1412               // this thread and its return value (it gets dropped from the
1413               // step->threads list so there's no other way to find it).
1414               appendToRunQueue(cap,t);
1415               return rtsFalse;
1416 #else
1417               // this cannot happen in the threaded RTS, because a
1418               // bound thread can only be run by the appropriate Task.
1419               barf("finished bound thread that isn't mine");
1420 #endif
1421           }
1422
1423           ASSERT(task->tso == t);
1424
1425           if (t->what_next == ThreadComplete) {
1426               if (task->ret) {
1427                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1428                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1429               }
1430               task->stat = Success;
1431           } else {
1432               if (task->ret) {
1433                   *(task->ret) = NULL;
1434               }
1435               if (sched_state >= SCHED_INTERRUPTING) {
1436                   task->stat = Interrupted;
1437               } else {
1438                   task->stat = Killed;
1439               }
1440           }
1441 #ifdef DEBUG
1442           removeThreadLabel((StgWord)task->tso->id);
1443 #endif
1444           return rtsTrue; // tells schedule() to return
1445       }
1446
1447       return rtsFalse;
1448 }
1449
1450 /* -----------------------------------------------------------------------------
1451  * Perform a heap census
1452  * -------------------------------------------------------------------------- */
1453
1454 static rtsBool
1455 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1456 {
1457     // When we have +RTS -i0 and we're heap profiling, do a census at
1458     // every GC.  This lets us get repeatable runs for debugging.
1459     if (performHeapProfile ||
1460         (RtsFlags.ProfFlags.profileInterval==0 &&
1461          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1462         return rtsTrue;
1463     } else {
1464         return rtsFalse;
1465     }
1466 }
1467
1468 /* -----------------------------------------------------------------------------
1469  * Perform a garbage collection if necessary
1470  * -------------------------------------------------------------------------- */
1471
1472 static Capability *
1473 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1474 {
1475     rtsBool heap_census;
1476 #ifdef THREADED_RTS
1477     /* extern static volatile StgWord waiting_for_gc; 
1478        lives inside capability.c */
1479     rtsBool was_waiting;
1480     nat i;
1481 #endif
1482
1483 #ifdef THREADED_RTS
1484     // In order to GC, there must be no threads running Haskell code.
1485     // Therefore, the GC thread needs to hold *all* the capabilities,
1486     // and release them after the GC has completed.  
1487     //
1488     // This seems to be the simplest way: previous attempts involved
1489     // making all the threads with capabilities give up their
1490     // capabilities and sleep except for the *last* one, which
1491     // actually did the GC.  But it's quite hard to arrange for all
1492     // the other tasks to sleep and stay asleep.
1493     //
1494         
1495     /*  Other capabilities are prevented from running yet more Haskell
1496         threads if waiting_for_gc is set. Tested inside
1497         yieldCapability() and releaseCapability() in Capability.c */
1498
1499     was_waiting = cas(&waiting_for_gc, 0, 1);
1500     if (was_waiting) {
1501         do {
1502             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1503             if (cap) yieldCapability(&cap,task);
1504         } while (waiting_for_gc);
1505         return cap;  // NOTE: task->cap might have changed here
1506     }
1507
1508     setContextSwitches();
1509     for (i=0; i < n_capabilities; i++) {
1510         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1511         if (cap != &capabilities[i]) {
1512             Capability *pcap = &capabilities[i];
1513             // we better hope this task doesn't get migrated to
1514             // another Capability while we're waiting for this one.
1515             // It won't, because load balancing happens while we have
1516             // all the Capabilities, but even so it's a slightly
1517             // unsavoury invariant.
1518             task->cap = pcap;
1519             waitForReturnCapability(&pcap, task);
1520             if (pcap != &capabilities[i]) {
1521                 barf("scheduleDoGC: got the wrong capability");
1522             }
1523         }
1524     }
1525
1526     waiting_for_gc = rtsFalse;
1527 #endif
1528
1529     // so this happens periodically:
1530     if (cap) scheduleCheckBlackHoles(cap);
1531     
1532     IF_DEBUG(scheduler, printAllThreads());
1533
1534     /*
1535      * We now have all the capabilities; if we're in an interrupting
1536      * state, then we should take the opportunity to delete all the
1537      * threads in the system.
1538      */
1539     if (sched_state >= SCHED_INTERRUPTING) {
1540         deleteAllThreads(&capabilities[0]);
1541         sched_state = SCHED_SHUTTING_DOWN;
1542     }
1543     
1544     heap_census = scheduleNeedHeapProfile(rtsTrue);
1545
1546     /* everybody back, start the GC.
1547      * Could do it in this thread, or signal a condition var
1548      * to do it in another thread.  Either way, we need to
1549      * broadcast on gc_pending_cond afterward.
1550      */
1551 #if defined(THREADED_RTS)
1552     debugTrace(DEBUG_sched, "doing GC");
1553 #endif
1554     GarbageCollect(force_major || heap_census);
1555     
1556     if (heap_census) {
1557         debugTrace(DEBUG_sched, "performing heap census");
1558         heapCensus();
1559         performHeapProfile = rtsFalse;
1560     }
1561
1562 #ifdef SPARKBALANCE
1563     /* JB 
1564        Once we are all together... this would be the place to balance all
1565        spark pools. No concurrent stealing or adding of new sparks can
1566        occur. Should be defined in Sparks.c. */
1567     balanceSparkPoolsCaps(n_capabilities, capabilities);
1568 #endif
1569
1570 #if defined(THREADED_RTS)
1571     // release our stash of capabilities.
1572     for (i = 0; i < n_capabilities; i++) {
1573         if (cap != &capabilities[i]) {
1574             task->cap = &capabilities[i];
1575             releaseCapability(&capabilities[i]);
1576         }
1577     }
1578     if (cap) {
1579         task->cap = cap;
1580     } else {
1581         task->cap = NULL;
1582     }
1583 #endif
1584
1585     return cap;
1586 }
1587
1588 /* ---------------------------------------------------------------------------
1589  * Singleton fork(). Do not copy any running threads.
1590  * ------------------------------------------------------------------------- */
1591
1592 pid_t
1593 forkProcess(HsStablePtr *entry
1594 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1595             STG_UNUSED
1596 #endif
1597            )
1598 {
1599 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1600     Task *task;
1601     pid_t pid;
1602     StgTSO* t,*next;
1603     Capability *cap;
1604     nat s;
1605     
1606 #if defined(THREADED_RTS)
1607     if (RtsFlags.ParFlags.nNodes > 1) {
1608         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1609         stg_exit(EXIT_FAILURE);
1610     }
1611 #endif
1612
1613     debugTrace(DEBUG_sched, "forking!");
1614     
1615     // ToDo: for SMP, we should probably acquire *all* the capabilities
1616     cap = rts_lock();
1617     
1618     // no funny business: hold locks while we fork, otherwise if some
1619     // other thread is holding a lock when the fork happens, the data
1620     // structure protected by the lock will forever be in an
1621     // inconsistent state in the child.  See also #1391.
1622     ACQUIRE_LOCK(&sched_mutex);
1623     ACQUIRE_LOCK(&cap->lock);
1624     ACQUIRE_LOCK(&cap->running_task->lock);
1625
1626     pid = fork();
1627     
1628     if (pid) { // parent
1629         
1630         RELEASE_LOCK(&sched_mutex);
1631         RELEASE_LOCK(&cap->lock);
1632         RELEASE_LOCK(&cap->running_task->lock);
1633
1634         // just return the pid
1635         rts_unlock(cap);
1636         return pid;
1637         
1638     } else { // child
1639         
1640 #if defined(THREADED_RTS)
1641         initMutex(&sched_mutex);
1642         initMutex(&cap->lock);
1643         initMutex(&cap->running_task->lock);
1644 #endif
1645
1646         // Now, all OS threads except the thread that forked are
1647         // stopped.  We need to stop all Haskell threads, including
1648         // those involved in foreign calls.  Also we need to delete
1649         // all Tasks, because they correspond to OS threads that are
1650         // now gone.
1651
1652         for (s = 0; s < total_steps; s++) {
1653           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1654             if (t->what_next == ThreadRelocated) {
1655                 next = t->_link;
1656             } else {
1657                 next = t->global_link;
1658                 // don't allow threads to catch the ThreadKilled
1659                 // exception, but we do want to raiseAsync() because these
1660                 // threads may be evaluating thunks that we need later.
1661                 deleteThread_(cap,t);
1662             }
1663           }
1664         }
1665         
1666         // Empty the run queue.  It seems tempting to let all the
1667         // killed threads stay on the run queue as zombies to be
1668         // cleaned up later, but some of them correspond to bound
1669         // threads for which the corresponding Task does not exist.
1670         cap->run_queue_hd = END_TSO_QUEUE;
1671         cap->run_queue_tl = END_TSO_QUEUE;
1672
1673         // Any suspended C-calling Tasks are no more, their OS threads
1674         // don't exist now:
1675         cap->suspended_ccalling_tasks = NULL;
1676
1677         // Empty the threads lists.  Otherwise, the garbage
1678         // collector may attempt to resurrect some of these threads.
1679         for (s = 0; s < total_steps; s++) {
1680             all_steps[s].threads = END_TSO_QUEUE;
1681         }
1682
1683         // Wipe the task list, except the current Task.
1684         ACQUIRE_LOCK(&sched_mutex);
1685         for (task = all_tasks; task != NULL; task=task->all_link) {
1686             if (task != cap->running_task) {
1687 #if defined(THREADED_RTS)
1688                 initMutex(&task->lock); // see #1391
1689 #endif
1690                 discardTask(task);
1691             }
1692         }
1693         RELEASE_LOCK(&sched_mutex);
1694
1695 #if defined(THREADED_RTS)
1696         // Wipe our spare workers list, they no longer exist.  New
1697         // workers will be created if necessary.
1698         cap->spare_workers = NULL;
1699         cap->returning_tasks_hd = NULL;
1700         cap->returning_tasks_tl = NULL;
1701 #endif
1702
1703         // On Unix, all timers are reset in the child, so we need to start
1704         // the timer again.
1705         initTimer();
1706         startTimer();
1707
1708         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1709         rts_checkSchedStatus("forkProcess",cap);
1710         
1711         rts_unlock(cap);
1712         hs_exit();                      // clean up and exit
1713         stg_exit(EXIT_SUCCESS);
1714     }
1715 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1716     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1717     return -1;
1718 #endif
1719 }
1720
1721 /* ---------------------------------------------------------------------------
1722  * Delete all the threads in the system
1723  * ------------------------------------------------------------------------- */
1724    
1725 static void
1726 deleteAllThreads ( Capability *cap )
1727 {
1728     // NOTE: only safe to call if we own all capabilities.
1729
1730     StgTSO* t, *next;
1731     nat s;
1732
1733     debugTrace(DEBUG_sched,"deleting all threads");
1734     for (s = 0; s < total_steps; s++) {
1735       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1736         if (t->what_next == ThreadRelocated) {
1737             next = t->_link;
1738         } else {
1739             next = t->global_link;
1740             deleteThread(cap,t);
1741         }
1742       }
1743     }      
1744
1745     // The run queue now contains a bunch of ThreadKilled threads.  We
1746     // must not throw these away: the main thread(s) will be in there
1747     // somewhere, and the main scheduler loop has to deal with it.
1748     // Also, the run queue is the only thing keeping these threads from
1749     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1750
1751 #if !defined(THREADED_RTS)
1752     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1753     ASSERT(sleeping_queue == END_TSO_QUEUE);
1754 #endif
1755 }
1756
1757 /* -----------------------------------------------------------------------------
1758    Managing the suspended_ccalling_tasks list.
1759    Locks required: sched_mutex
1760    -------------------------------------------------------------------------- */
1761
1762 STATIC_INLINE void
1763 suspendTask (Capability *cap, Task *task)
1764 {
1765     ASSERT(task->next == NULL && task->prev == NULL);
1766     task->next = cap->suspended_ccalling_tasks;
1767     task->prev = NULL;
1768     if (cap->suspended_ccalling_tasks) {
1769         cap->suspended_ccalling_tasks->prev = task;
1770     }
1771     cap->suspended_ccalling_tasks = task;
1772 }
1773
1774 STATIC_INLINE void
1775 recoverSuspendedTask (Capability *cap, Task *task)
1776 {
1777     if (task->prev) {
1778         task->prev->next = task->next;
1779     } else {
1780         ASSERT(cap->suspended_ccalling_tasks == task);
1781         cap->suspended_ccalling_tasks = task->next;
1782     }
1783     if (task->next) {
1784         task->next->prev = task->prev;
1785     }
1786     task->next = task->prev = NULL;
1787 }
1788
1789 /* ---------------------------------------------------------------------------
1790  * Suspending & resuming Haskell threads.
1791  * 
1792  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1793  * its capability before calling the C function.  This allows another
1794  * task to pick up the capability and carry on running Haskell
1795  * threads.  It also means that if the C call blocks, it won't lock
1796  * the whole system.
1797  *
1798  * The Haskell thread making the C call is put to sleep for the
1799  * duration of the call, on the susepended_ccalling_threads queue.  We
1800  * give out a token to the task, which it can use to resume the thread
1801  * on return from the C function.
1802  * ------------------------------------------------------------------------- */
1803    
1804 void *
1805 suspendThread (StgRegTable *reg)
1806 {
1807   Capability *cap;
1808   int saved_errno;
1809   StgTSO *tso;
1810   Task *task;
1811 #if mingw32_HOST_OS
1812   StgWord32 saved_winerror;
1813 #endif
1814
1815   saved_errno = errno;
1816 #if mingw32_HOST_OS
1817   saved_winerror = GetLastError();
1818 #endif
1819
1820   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1821    */
1822   cap = regTableToCapability(reg);
1823
1824   task = cap->running_task;
1825   tso = cap->r.rCurrentTSO;
1826
1827   debugTrace(DEBUG_sched, 
1828              "thread %lu did a safe foreign call", 
1829              (unsigned long)cap->r.rCurrentTSO->id);
1830
1831   // XXX this might not be necessary --SDM
1832   tso->what_next = ThreadRunGHC;
1833
1834   threadPaused(cap,tso);
1835
1836   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1837       tso->why_blocked = BlockedOnCCall;
1838       tso->flags |= TSO_BLOCKEX;
1839       tso->flags &= ~TSO_INTERRUPTIBLE;
1840   } else {
1841       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1842   }
1843
1844   // Hand back capability
1845   task->suspended_tso = tso;
1846
1847   ACQUIRE_LOCK(&cap->lock);
1848
1849   suspendTask(cap,task);
1850   cap->in_haskell = rtsFalse;
1851   releaseCapability_(cap,rtsFalse);
1852   
1853   RELEASE_LOCK(&cap->lock);
1854
1855 #if defined(THREADED_RTS)
1856   /* Preparing to leave the RTS, so ensure there's a native thread/task
1857      waiting to take over.
1858   */
1859   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1860 #endif
1861
1862   errno = saved_errno;
1863 #if mingw32_HOST_OS
1864   SetLastError(saved_winerror);
1865 #endif
1866   return task;
1867 }
1868
1869 StgRegTable *
1870 resumeThread (void *task_)
1871 {
1872     StgTSO *tso;
1873     Capability *cap;
1874     Task *task = task_;
1875     int saved_errno;
1876 #if mingw32_HOST_OS
1877     StgWord32 saved_winerror;
1878 #endif
1879
1880     saved_errno = errno;
1881 #if mingw32_HOST_OS
1882     saved_winerror = GetLastError();
1883 #endif
1884
1885     cap = task->cap;
1886     // Wait for permission to re-enter the RTS with the result.
1887     waitForReturnCapability(&cap,task);
1888     // we might be on a different capability now... but if so, our
1889     // entry on the suspended_ccalling_tasks list will also have been
1890     // migrated.
1891
1892     // Remove the thread from the suspended list
1893     recoverSuspendedTask(cap,task);
1894
1895     tso = task->suspended_tso;
1896     task->suspended_tso = NULL;
1897     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1898     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1899     
1900     if (tso->why_blocked == BlockedOnCCall) {
1901         awakenBlockedExceptionQueue(cap,tso);
1902         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1903     }
1904     
1905     /* Reset blocking status */
1906     tso->why_blocked  = NotBlocked;
1907     
1908     cap->r.rCurrentTSO = tso;
1909     cap->in_haskell = rtsTrue;
1910     errno = saved_errno;
1911 #if mingw32_HOST_OS
1912     SetLastError(saved_winerror);
1913 #endif
1914
1915     /* We might have GC'd, mark the TSO dirty again */
1916     dirty_TSO(cap,tso);
1917
1918     IF_DEBUG(sanity, checkTSO(tso));
1919
1920     return &cap->r;
1921 }
1922
1923 /* ---------------------------------------------------------------------------
1924  * scheduleThread()
1925  *
1926  * scheduleThread puts a thread on the end  of the runnable queue.
1927  * This will usually be done immediately after a thread is created.
1928  * The caller of scheduleThread must create the thread using e.g.
1929  * createThread and push an appropriate closure
1930  * on this thread's stack before the scheduler is invoked.
1931  * ------------------------------------------------------------------------ */
1932
1933 void
1934 scheduleThread(Capability *cap, StgTSO *tso)
1935 {
1936     // The thread goes at the *end* of the run-queue, to avoid possible
1937     // starvation of any threads already on the queue.
1938     appendToRunQueue(cap,tso);
1939 }
1940
1941 void
1942 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1943 {
1944 #if defined(THREADED_RTS)
1945     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1946                               // move this thread from now on.
1947     cpu %= RtsFlags.ParFlags.nNodes;
1948     if (cpu == cap->no) {
1949         appendToRunQueue(cap,tso);
1950     } else {
1951         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1952     }
1953 #else
1954     appendToRunQueue(cap,tso);
1955 #endif
1956 }
1957
1958 Capability *
1959 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1960 {
1961     Task *task;
1962
1963     // We already created/initialised the Task
1964     task = cap->running_task;
1965
1966     // This TSO is now a bound thread; make the Task and TSO
1967     // point to each other.
1968     tso->bound = task;
1969     tso->cap = cap;
1970
1971     task->tso = tso;
1972     task->ret = ret;
1973     task->stat = NoStatus;
1974
1975     appendToRunQueue(cap,tso);
1976
1977     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1978
1979     cap = schedule(cap,task);
1980
1981     ASSERT(task->stat != NoStatus);
1982     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1983
1984     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1985     return cap;
1986 }
1987
1988 /* ----------------------------------------------------------------------------
1989  * Starting Tasks
1990  * ------------------------------------------------------------------------- */
1991
1992 #if defined(THREADED_RTS)
1993 void OSThreadProcAttr
1994 workerStart(Task *task)
1995 {
1996     Capability *cap;
1997
1998     // See startWorkerTask().
1999     ACQUIRE_LOCK(&task->lock);
2000     cap = task->cap;
2001     RELEASE_LOCK(&task->lock);
2002
2003     // set the thread-local pointer to the Task:
2004     taskEnter(task);
2005
2006     // schedule() runs without a lock.
2007     cap = schedule(cap,task);
2008
2009     // On exit from schedule(), we have a Capability.
2010     releaseCapability(cap);
2011     workerTaskStop(task);
2012 }
2013 #endif
2014
2015 /* ---------------------------------------------------------------------------
2016  * initScheduler()
2017  *
2018  * Initialise the scheduler.  This resets all the queues - if the
2019  * queues contained any threads, they'll be garbage collected at the
2020  * next pass.
2021  *
2022  * ------------------------------------------------------------------------ */
2023
2024 void 
2025 initScheduler(void)
2026 {
2027 #if !defined(THREADED_RTS)
2028   blocked_queue_hd  = END_TSO_QUEUE;
2029   blocked_queue_tl  = END_TSO_QUEUE;
2030   sleeping_queue    = END_TSO_QUEUE;
2031 #endif
2032
2033   blackhole_queue   = END_TSO_QUEUE;
2034
2035   sched_state    = SCHED_RUNNING;
2036   recent_activity = ACTIVITY_YES;
2037
2038 #if defined(THREADED_RTS)
2039   /* Initialise the mutex and condition variables used by
2040    * the scheduler. */
2041   initMutex(&sched_mutex);
2042 #endif
2043   
2044   ACQUIRE_LOCK(&sched_mutex);
2045
2046   /* A capability holds the state a native thread needs in
2047    * order to execute STG code. At least one capability is
2048    * floating around (only THREADED_RTS builds have more than one).
2049    */
2050   initCapabilities();
2051
2052   initTaskManager();
2053
2054 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2055   initSparkPools();
2056 #endif
2057
2058 #if defined(THREADED_RTS)
2059   /*
2060    * Eagerly start one worker to run each Capability, except for
2061    * Capability 0.  The idea is that we're probably going to start a
2062    * bound thread on Capability 0 pretty soon, so we don't want a
2063    * worker task hogging it.
2064    */
2065   { 
2066       nat i;
2067       Capability *cap;
2068       for (i = 1; i < n_capabilities; i++) {
2069           cap = &capabilities[i];
2070           ACQUIRE_LOCK(&cap->lock);
2071           startWorkerTask(cap, workerStart);
2072           RELEASE_LOCK(&cap->lock);
2073       }
2074   }
2075 #endif
2076
2077   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2078
2079   RELEASE_LOCK(&sched_mutex);
2080 }
2081
2082 void
2083 exitScheduler(
2084     rtsBool wait_foreign
2085 #if !defined(THREADED_RTS)
2086                          __attribute__((unused))
2087 #endif
2088 )
2089                /* see Capability.c, shutdownCapability() */
2090 {
2091     Task *task = NULL;
2092
2093 #if defined(THREADED_RTS)
2094     ACQUIRE_LOCK(&sched_mutex);
2095     task = newBoundTask();
2096     RELEASE_LOCK(&sched_mutex);
2097 #endif
2098
2099     // If we haven't killed all the threads yet, do it now.
2100     if (sched_state < SCHED_SHUTTING_DOWN) {
2101         sched_state = SCHED_INTERRUPTING;
2102         scheduleDoGC(NULL,task,rtsFalse);    
2103     }
2104     sched_state = SCHED_SHUTTING_DOWN;
2105
2106 #if defined(THREADED_RTS)
2107     { 
2108         nat i;
2109         
2110         for (i = 0; i < n_capabilities; i++) {
2111             shutdownCapability(&capabilities[i], task, wait_foreign);
2112         }
2113         boundTaskExiting(task);
2114         stopTaskManager();
2115     }
2116 #else
2117     freeCapability(&MainCapability);
2118 #endif
2119 }
2120
2121 void
2122 freeScheduler( void )
2123 {
2124     freeTaskManager();
2125     if (n_capabilities != 1) {
2126         stgFree(capabilities);
2127     }
2128 #if defined(THREADED_RTS)
2129     closeMutex(&sched_mutex);
2130 #endif
2131 }
2132
2133 /* -----------------------------------------------------------------------------
2134    performGC
2135
2136    This is the interface to the garbage collector from Haskell land.
2137    We provide this so that external C code can allocate and garbage
2138    collect when called from Haskell via _ccall_GC.
2139    -------------------------------------------------------------------------- */
2140
2141 static void
2142 performGC_(rtsBool force_major)
2143 {
2144     Task *task;
2145     // We must grab a new Task here, because the existing Task may be
2146     // associated with a particular Capability, and chained onto the 
2147     // suspended_ccalling_tasks queue.
2148     ACQUIRE_LOCK(&sched_mutex);
2149     task = newBoundTask();
2150     RELEASE_LOCK(&sched_mutex);
2151     scheduleDoGC(NULL,task,force_major);
2152     boundTaskExiting(task);
2153 }
2154
2155 void
2156 performGC(void)
2157 {
2158     performGC_(rtsFalse);
2159 }
2160
2161 void
2162 performMajorGC(void)
2163 {
2164     performGC_(rtsTrue);
2165 }
2166
2167 /* -----------------------------------------------------------------------------
2168    Stack overflow
2169
2170    If the thread has reached its maximum stack size, then raise the
2171    StackOverflow exception in the offending thread.  Otherwise
2172    relocate the TSO into a larger chunk of memory and adjust its stack
2173    size appropriately.
2174    -------------------------------------------------------------------------- */
2175
2176 static StgTSO *
2177 threadStackOverflow(Capability *cap, StgTSO *tso)
2178 {
2179   nat new_stack_size, stack_words;
2180   lnat new_tso_size;
2181   StgPtr new_sp;
2182   StgTSO *dest;
2183
2184   IF_DEBUG(sanity,checkTSO(tso));
2185
2186   // don't allow throwTo() to modify the blocked_exceptions queue
2187   // while we are moving the TSO:
2188   lockClosure((StgClosure *)tso);
2189
2190   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2191       // NB. never raise a StackOverflow exception if the thread is
2192       // inside Control.Exceptino.block.  It is impractical to protect
2193       // against stack overflow exceptions, since virtually anything
2194       // can raise one (even 'catch'), so this is the only sensible
2195       // thing to do here.  See bug #767.
2196
2197       debugTrace(DEBUG_gc,
2198                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2199                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2200       IF_DEBUG(gc,
2201                /* If we're debugging, just print out the top of the stack */
2202                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2203                                                 tso->sp+64)));
2204
2205       // Send this thread the StackOverflow exception
2206       unlockTSO(tso);
2207       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2208       return tso;
2209   }
2210
2211   /* Try to double the current stack size.  If that takes us over the
2212    * maximum stack size for this thread, then use the maximum instead
2213    * (that is, unless we're already at or over the max size and we
2214    * can't raise the StackOverflow exception (see above), in which
2215    * case just double the size). Finally round up so the TSO ends up as
2216    * a whole number of blocks.
2217    */
2218   if (tso->stack_size >= tso->max_stack_size) {
2219       new_stack_size = tso->stack_size * 2;
2220   } else { 
2221       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2222   }
2223   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2224                                        TSO_STRUCT_SIZE)/sizeof(W_);
2225   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2226   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2227
2228   debugTrace(DEBUG_sched, 
2229              "increasing stack size from %ld words to %d.",
2230              (long)tso->stack_size, new_stack_size);
2231
2232   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2233   TICK_ALLOC_TSO(new_stack_size,0);
2234
2235   /* copy the TSO block and the old stack into the new area */
2236   memcpy(dest,tso,TSO_STRUCT_SIZE);
2237   stack_words = tso->stack + tso->stack_size - tso->sp;
2238   new_sp = (P_)dest + new_tso_size - stack_words;
2239   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2240
2241   /* relocate the stack pointers... */
2242   dest->sp         = new_sp;
2243   dest->stack_size = new_stack_size;
2244         
2245   /* Mark the old TSO as relocated.  We have to check for relocated
2246    * TSOs in the garbage collector and any primops that deal with TSOs.
2247    *
2248    * It's important to set the sp value to just beyond the end
2249    * of the stack, so we don't attempt to scavenge any part of the
2250    * dead TSO's stack.
2251    */
2252   tso->what_next = ThreadRelocated;
2253   setTSOLink(cap,tso,dest);
2254   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2255   tso->why_blocked = NotBlocked;
2256
2257   IF_PAR_DEBUG(verbose,
2258                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2259                      tso->id, tso, tso->stack_size);
2260                /* If we're debugging, just print out the top of the stack */
2261                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2262                                                 tso->sp+64)));
2263   
2264   unlockTSO(dest);
2265   unlockTSO(tso);
2266
2267   IF_DEBUG(sanity,checkTSO(dest));
2268 #if 0
2269   IF_DEBUG(scheduler,printTSO(dest));
2270 #endif
2271
2272   return dest;
2273 }
2274
2275 static StgTSO *
2276 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2277 {
2278     bdescr *bd, *new_bd;
2279     lnat free_w, tso_size_w;
2280     StgTSO *new_tso;
2281
2282     tso_size_w = tso_sizeW(tso);
2283
2284     if (tso_size_w < MBLOCK_SIZE_W || 
2285         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2286     {
2287         return tso;
2288     }
2289
2290     // don't allow throwTo() to modify the blocked_exceptions queue
2291     // while we are moving the TSO:
2292     lockClosure((StgClosure *)tso);
2293
2294     // this is the number of words we'll free
2295     free_w = round_to_mblocks(tso_size_w/2);
2296
2297     bd = Bdescr((StgPtr)tso);
2298     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2299     bd->free = bd->start + TSO_STRUCT_SIZEW;
2300
2301     new_tso = (StgTSO *)new_bd->start;
2302     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2303     new_tso->stack_size = new_bd->free - new_tso->stack;
2304
2305     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2306                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2307
2308     tso->what_next = ThreadRelocated;
2309     tso->_link = new_tso; // no write barrier reqd: same generation
2310
2311     // The TSO attached to this Task may have moved, so update the
2312     // pointer to it.
2313     if (task->tso == tso) {
2314         task->tso = new_tso;
2315     }
2316
2317     unlockTSO(new_tso);
2318     unlockTSO(tso);
2319
2320     IF_DEBUG(sanity,checkTSO(new_tso));
2321
2322     return new_tso;
2323 }
2324
2325 /* ---------------------------------------------------------------------------
2326    Interrupt execution
2327    - usually called inside a signal handler so it mustn't do anything fancy.   
2328    ------------------------------------------------------------------------ */
2329
2330 void
2331 interruptStgRts(void)
2332 {
2333     sched_state = SCHED_INTERRUPTING;
2334     setContextSwitches();
2335     wakeUpRts();
2336 }
2337
2338 /* -----------------------------------------------------------------------------
2339    Wake up the RTS
2340    
2341    This function causes at least one OS thread to wake up and run the
2342    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2343    an external event has arrived that may need servicing (eg. a
2344    keyboard interrupt).
2345
2346    In the single-threaded RTS we don't do anything here; we only have
2347    one thread anyway, and the event that caused us to want to wake up
2348    will have interrupted any blocking system call in progress anyway.
2349    -------------------------------------------------------------------------- */
2350
2351 void
2352 wakeUpRts(void)
2353 {
2354 #if defined(THREADED_RTS)
2355     // This forces the IO Manager thread to wakeup, which will
2356     // in turn ensure that some OS thread wakes up and runs the
2357     // scheduler loop, which will cause a GC and deadlock check.
2358     ioManagerWakeup();
2359 #endif
2360 }
2361
2362 /* -----------------------------------------------------------------------------
2363  * checkBlackHoles()
2364  *
2365  * Check the blackhole_queue for threads that can be woken up.  We do
2366  * this periodically: before every GC, and whenever the run queue is
2367  * empty.
2368  *
2369  * An elegant solution might be to just wake up all the blocked
2370  * threads with awakenBlockedQueue occasionally: they'll go back to
2371  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2372  * doesn't give us a way to tell whether we've actually managed to
2373  * wake up any threads, so we would be busy-waiting.
2374  *
2375  * -------------------------------------------------------------------------- */
2376
2377 static rtsBool
2378 checkBlackHoles (Capability *cap)
2379 {
2380     StgTSO **prev, *t;
2381     rtsBool any_woke_up = rtsFalse;
2382     StgHalfWord type;
2383
2384     // blackhole_queue is global:
2385     ASSERT_LOCK_HELD(&sched_mutex);
2386
2387     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2388
2389     // ASSUMES: sched_mutex
2390     prev = &blackhole_queue;
2391     t = blackhole_queue;
2392     while (t != END_TSO_QUEUE) {
2393         ASSERT(t->why_blocked == BlockedOnBlackHole);
2394         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2395         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2396             IF_DEBUG(sanity,checkTSO(t));
2397             t = unblockOne(cap, t);
2398             *prev = t;
2399             any_woke_up = rtsTrue;
2400         } else {
2401             prev = &t->_link;
2402             t = t->_link;
2403         }
2404     }
2405
2406     return any_woke_up;
2407 }
2408
2409 /* -----------------------------------------------------------------------------
2410    Deleting threads
2411
2412    This is used for interruption (^C) and forking, and corresponds to
2413    raising an exception but without letting the thread catch the
2414    exception.
2415    -------------------------------------------------------------------------- */
2416
2417 static void 
2418 deleteThread (Capability *cap, StgTSO *tso)
2419 {
2420     // NOTE: must only be called on a TSO that we have exclusive
2421     // access to, because we will call throwToSingleThreaded() below.
2422     // The TSO must be on the run queue of the Capability we own, or 
2423     // we must own all Capabilities.
2424
2425     if (tso->why_blocked != BlockedOnCCall &&
2426         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2427         throwToSingleThreaded(cap,tso,NULL);
2428     }
2429 }
2430
2431 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2432 static void 
2433 deleteThread_(Capability *cap, StgTSO *tso)
2434 { // for forkProcess only:
2435   // like deleteThread(), but we delete threads in foreign calls, too.
2436
2437     if (tso->why_blocked == BlockedOnCCall ||
2438         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2439         unblockOne(cap,tso);
2440         tso->what_next = ThreadKilled;
2441     } else {
2442         deleteThread(cap,tso);
2443     }
2444 }
2445 #endif
2446
2447 /* -----------------------------------------------------------------------------
2448    raiseExceptionHelper
2449    
2450    This function is called by the raise# primitve, just so that we can
2451    move some of the tricky bits of raising an exception from C-- into
2452    C.  Who knows, it might be a useful re-useable thing here too.
2453    -------------------------------------------------------------------------- */
2454
2455 StgWord
2456 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2457 {
2458     Capability *cap = regTableToCapability(reg);
2459     StgThunk *raise_closure = NULL;
2460     StgPtr p, next;
2461     StgRetInfoTable *info;
2462     //
2463     // This closure represents the expression 'raise# E' where E
2464     // is the exception raise.  It is used to overwrite all the
2465     // thunks which are currently under evaluataion.
2466     //
2467
2468     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2469     // LDV profiling: stg_raise_info has THUNK as its closure
2470     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2471     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2472     // 1 does not cause any problem unless profiling is performed.
2473     // However, when LDV profiling goes on, we need to linearly scan
2474     // small object pool, where raise_closure is stored, so we should
2475     // use MIN_UPD_SIZE.
2476     //
2477     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2478     //                                 sizeofW(StgClosure)+1);
2479     //
2480
2481     //
2482     // Walk up the stack, looking for the catch frame.  On the way,
2483     // we update any closures pointed to from update frames with the
2484     // raise closure that we just built.
2485     //
2486     p = tso->sp;
2487     while(1) {
2488         info = get_ret_itbl((StgClosure *)p);
2489         next = p + stack_frame_sizeW((StgClosure *)p);
2490         switch (info->i.type) {
2491             
2492         case UPDATE_FRAME:
2493             // Only create raise_closure if we need to.
2494             if (raise_closure == NULL) {
2495                 raise_closure = 
2496                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2497                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2498                 raise_closure->payload[0] = exception;
2499             }
2500             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2501             p = next;
2502             continue;
2503
2504         case ATOMICALLY_FRAME:
2505             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2506             tso->sp = p;
2507             return ATOMICALLY_FRAME;
2508             
2509         case CATCH_FRAME:
2510             tso->sp = p;
2511             return CATCH_FRAME;
2512
2513         case CATCH_STM_FRAME:
2514             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2515             tso->sp = p;
2516             return CATCH_STM_FRAME;
2517             
2518         case STOP_FRAME:
2519             tso->sp = p;
2520             return STOP_FRAME;
2521
2522         case CATCH_RETRY_FRAME:
2523         default:
2524             p = next; 
2525             continue;
2526         }
2527     }
2528 }
2529
2530
2531 /* -----------------------------------------------------------------------------
2532    findRetryFrameHelper
2533
2534    This function is called by the retry# primitive.  It traverses the stack
2535    leaving tso->sp referring to the frame which should handle the retry.  
2536
2537    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2538    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2539
2540    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2541    create) because retries are not considered to be exceptions, despite the
2542    similar implementation.
2543
2544    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2545    not be created within memory transactions.
2546    -------------------------------------------------------------------------- */
2547
2548 StgWord
2549 findRetryFrameHelper (StgTSO *tso)
2550 {
2551   StgPtr           p, next;
2552   StgRetInfoTable *info;
2553
2554   p = tso -> sp;
2555   while (1) {
2556     info = get_ret_itbl((StgClosure *)p);
2557     next = p + stack_frame_sizeW((StgClosure *)p);
2558     switch (info->i.type) {
2559       
2560     case ATOMICALLY_FRAME:
2561         debugTrace(DEBUG_stm,
2562                    "found ATOMICALLY_FRAME at %p during retry", p);
2563         tso->sp = p;
2564         return ATOMICALLY_FRAME;
2565       
2566     case CATCH_RETRY_FRAME:
2567         debugTrace(DEBUG_stm,
2568                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2569         tso->sp = p;
2570         return CATCH_RETRY_FRAME;
2571       
2572     case CATCH_STM_FRAME: {
2573         StgTRecHeader *trec = tso -> trec;
2574         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2575         debugTrace(DEBUG_stm,
2576                    "found CATCH_STM_FRAME at %p during retry", p);
2577         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2578         stmAbortTransaction(tso -> cap, trec);
2579         stmFreeAbortedTRec(tso -> cap, trec);
2580         tso -> trec = outer;
2581         p = next; 
2582         continue;
2583     }
2584       
2585
2586     default:
2587       ASSERT(info->i.type != CATCH_FRAME);
2588       ASSERT(info->i.type != STOP_FRAME);
2589       p = next; 
2590       continue;
2591     }
2592   }
2593 }
2594
2595 /* -----------------------------------------------------------------------------
2596    resurrectThreads is called after garbage collection on the list of
2597    threads found to be garbage.  Each of these threads will be woken
2598    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2599    on an MVar, or NonTermination if the thread was blocked on a Black
2600    Hole.
2601
2602    Locks: assumes we hold *all* the capabilities.
2603    -------------------------------------------------------------------------- */
2604
2605 void
2606 resurrectThreads (StgTSO *threads)
2607 {
2608     StgTSO *tso, *next;
2609     Capability *cap;
2610     step *step;
2611
2612     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2613         next = tso->global_link;
2614
2615         step = Bdescr((P_)tso)->step;
2616         tso->global_link = step->threads;
2617         step->threads = tso;
2618
2619         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2620         
2621         // Wake up the thread on the Capability it was last on
2622         cap = tso->cap;
2623         
2624         switch (tso->why_blocked) {
2625         case BlockedOnMVar:
2626         case BlockedOnException:
2627             /* Called by GC - sched_mutex lock is currently held. */
2628             throwToSingleThreaded(cap, tso,
2629                                   (StgClosure *)blockedOnDeadMVar_closure);
2630             break;
2631         case BlockedOnBlackHole:
2632             throwToSingleThreaded(cap, tso,
2633                                   (StgClosure *)nonTermination_closure);
2634             break;
2635         case BlockedOnSTM:
2636             throwToSingleThreaded(cap, tso,
2637                                   (StgClosure *)blockedIndefinitely_closure);
2638             break;
2639         case NotBlocked:
2640             /* This might happen if the thread was blocked on a black hole
2641              * belonging to a thread that we've just woken up (raiseAsync
2642              * can wake up threads, remember...).
2643              */
2644             continue;
2645         default:
2646             barf("resurrectThreads: thread blocked in a strange way");
2647         }
2648     }
2649 }
2650
2651 /* -----------------------------------------------------------------------------
2652    performPendingThrowTos is called after garbage collection, and
2653    passed a list of threads that were found to have pending throwTos
2654    (tso->blocked_exceptions was not empty), and were blocked.
2655    Normally this doesn't happen, because we would deliver the
2656    exception directly if the target thread is blocked, but there are
2657    small windows where it might occur on a multiprocessor (see
2658    throwTo()).
2659
2660    NB. we must be holding all the capabilities at this point, just
2661    like resurrectThreads().
2662    -------------------------------------------------------------------------- */
2663
2664 void
2665 performPendingThrowTos (StgTSO *threads)
2666 {
2667     StgTSO *tso, *next;
2668     Capability *cap;
2669     step *step;
2670
2671     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2672         next = tso->global_link;
2673
2674         step = Bdescr((P_)tso)->step;
2675         tso->global_link = step->threads;
2676         step->threads = tso;
2677
2678         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2679         
2680         cap = tso->cap;
2681         maybePerformBlockedException(cap, tso);
2682     }
2683 }