Move the context_switch flag into the Capability
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34
35 /* PARALLEL_HASKELL includes go here */
36
37 #include "Sparks.h"
38 #include "Capability.h"
39 #include "Task.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
43 #endif
44 #include "Trace.h"
45 #include "RaiseAsync.h"
46 #include "Threads.h"
47 #include "ThrIOManager.h"
48
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
51 #endif
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56 #include <string.h>
57 #include <stdlib.h>
58 #include <stdarg.h>
59
60 #ifdef HAVE_ERRNO_H
61 #include <errno.h>
62 #endif
63
64 // Turn off inlining when debugging - it obfuscates things
65 #ifdef DEBUG
66 # undef  STATIC_INLINE
67 # define STATIC_INLINE static
68 #endif
69
70 /* -----------------------------------------------------------------------------
71  * Global variables
72  * -------------------------------------------------------------------------- */
73
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping thrads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
79 #endif
80
81 /* Threads blocked on blackholes.
82  * LOCK: sched_mutex+capability, or all capabilities
83  */
84 StgTSO *blackhole_queue = NULL;
85
86 /* The blackhole_queue should be checked for threads to wake up.  See
87  * Schedule.h for more thorough comment.
88  * LOCK: none (doesn't matter if we miss an update)
89  */
90 rtsBool blackholes_need_checking = rtsFalse;
91
92 /* flag that tracks whether we have done any execution in this time slice.
93  * LOCK: currently none, perhaps we should lock (but needs to be
94  * updated in the fast path of the scheduler).
95  */
96 nat recent_activity = ACTIVITY_YES;
97
98 /* if this flag is set as well, give up execution
99  * LOCK: none (changes once, from false->true)
100  */
101 rtsBool sched_state = SCHED_RUNNING;
102
103 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
104  *  exists - earlier gccs apparently didn't.
105  *  -= chak
106  */
107 StgTSO dummy_tso;
108
109 /*
110  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
111  * in an MT setting, needed to signal that a worker thread shouldn't hang around
112  * in the scheduler when it is out of work.
113  */
114 rtsBool shutting_down_scheduler = rtsFalse;
115
116 /*
117  * This mutex protects most of the global scheduler data in
118  * the THREADED_RTS runtime.
119  */
120 #if defined(THREADED_RTS)
121 Mutex sched_mutex;
122 #endif
123
124 #if !defined(mingw32_HOST_OS)
125 #define FORKPROCESS_PRIMOP_SUPPORTED
126 #endif
127
128 /* -----------------------------------------------------------------------------
129  * static function prototypes
130  * -------------------------------------------------------------------------- */
131
132 static Capability *schedule (Capability *initialCapability, Task *task);
133
134 //
135 // These function all encapsulate parts of the scheduler loop, and are
136 // abstracted only to make the structure and control flow of the
137 // scheduler clearer.
138 //
139 static void schedulePreLoop (void);
140 #if defined(THREADED_RTS)
141 static void schedulePushWork(Capability *cap, Task *task);
142 #endif
143 static void scheduleStartSignalHandlers (Capability *cap);
144 static void scheduleCheckBlockedThreads (Capability *cap);
145 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
146 static void scheduleCheckBlackHoles (Capability *cap);
147 static void scheduleDetectDeadlock (Capability *cap, Task *task);
148 #if defined(PARALLEL_HASKELL)
149 static rtsBool scheduleGetRemoteWork(Capability *cap);
150 static void scheduleSendPendingMessages(void);
151 static void scheduleActivateSpark(Capability *cap);
152 #endif
153 static void schedulePostRunThread(StgTSO *t);
154 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
155 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
156                                          StgTSO *t);
157 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
158                                     nat prev_what_next );
159 static void scheduleHandleThreadBlocked( StgTSO *t );
160 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
161                                              StgTSO *t );
162 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
163 static Capability *scheduleDoGC(Capability *cap, Task *task,
164                                 rtsBool force_major);
165
166 static rtsBool checkBlackHoles(Capability *cap);
167
168 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
169 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
170
171 static void deleteThread (Capability *cap, StgTSO *tso);
172 static void deleteAllThreads (Capability *cap);
173
174 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
175 static void deleteThread_(Capability *cap, StgTSO *tso);
176 #endif
177
178 #ifdef DEBUG
179 static char *whatNext_strs[] = {
180   "(unknown)",
181   "ThreadRunGHC",
182   "ThreadInterpret",
183   "ThreadKilled",
184   "ThreadRelocated",
185   "ThreadComplete"
186 };
187 #endif
188
189 /* -----------------------------------------------------------------------------
190  * Putting a thread on the run queue: different scheduling policies
191  * -------------------------------------------------------------------------- */
192
193 STATIC_INLINE void
194 addToRunQueue( Capability *cap, StgTSO *t )
195 {
196 #if defined(PARALLEL_HASKELL)
197     if (RtsFlags.ParFlags.doFairScheduling) { 
198         // this does round-robin scheduling; good for concurrency
199         appendToRunQueue(cap,t);
200     } else {
201         // this does unfair scheduling; good for parallelism
202         pushOnRunQueue(cap,t);
203     }
204 #else
205     // this does round-robin scheduling; good for concurrency
206     appendToRunQueue(cap,t);
207 #endif
208 }
209
210 /* ---------------------------------------------------------------------------
211    Main scheduling loop.
212
213    We use round-robin scheduling, each thread returning to the
214    scheduler loop when one of these conditions is detected:
215
216       * out of heap space
217       * timer expires (thread yields)
218       * thread blocks
219       * thread ends
220       * stack overflow
221
222    GRAN version:
223      In a GranSim setup this loop iterates over the global event queue.
224      This revolves around the global event queue, which determines what 
225      to do next. Therefore, it's more complicated than either the 
226      concurrent or the parallel (GUM) setup.
227   This version has been entirely removed (JB 2008/08).
228
229    GUM version:
230      GUM iterates over incoming messages.
231      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
232      and sends out a fish whenever it has nothing to do; in-between
233      doing the actual reductions (shared code below) it processes the
234      incoming messages and deals with delayed operations 
235      (see PendingFetches).
236      This is not the ugliest code you could imagine, but it's bloody close.
237
238   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
239   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
240   as well as future GUM versions. This file has been refurbished to
241   only contain valid code, which is however incomplete, refers to
242   invalid includes etc.
243
244    ------------------------------------------------------------------------ */
245
246 static Capability *
247 schedule (Capability *initialCapability, Task *task)
248 {
249   StgTSO *t;
250   Capability *cap;
251   StgThreadReturnCode ret;
252 #if defined(PARALLEL_HASKELL)
253   rtsBool receivedFinish = rtsFalse;
254 #endif
255   nat prev_what_next;
256   rtsBool ready_to_gc;
257 #if defined(THREADED_RTS)
258   rtsBool first = rtsTrue;
259 #endif
260   
261   cap = initialCapability;
262
263   // Pre-condition: this task owns initialCapability.
264   // The sched_mutex is *NOT* held
265   // NB. on return, we still hold a capability.
266
267   debugTrace (DEBUG_sched, 
268               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
269               task, initialCapability);
270
271   schedulePreLoop();
272
273   // -----------------------------------------------------------
274   // Scheduler loop starts here:
275
276 #if defined(PARALLEL_HASKELL)
277 #define TERMINATION_CONDITION        (!receivedFinish)
278 #else
279 #define TERMINATION_CONDITION        rtsTrue
280 #endif
281
282   while (TERMINATION_CONDITION) {
283
284 #if defined(THREADED_RTS)
285       if (first) {
286           // don't yield the first time, we want a chance to run this
287           // thread for a bit, even if there are others banging at the
288           // door.
289           first = rtsFalse;
290           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
291       } else {
292           // Yield the capability to higher-priority tasks if necessary.
293           yieldCapability(&cap, task);
294       }
295 #endif
296       
297 #if defined(THREADED_RTS)
298       schedulePushWork(cap,task);
299 #endif
300
301     // Check whether we have re-entered the RTS from Haskell without
302     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
303     // call).
304     if (cap->in_haskell) {
305           errorBelch("schedule: re-entered unsafely.\n"
306                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
307           stg_exit(EXIT_FAILURE);
308     }
309
310     // The interruption / shutdown sequence.
311     // 
312     // In order to cleanly shut down the runtime, we want to:
313     //   * make sure that all main threads return to their callers
314     //     with the state 'Interrupted'.
315     //   * clean up all OS threads assocated with the runtime
316     //   * free all memory etc.
317     //
318     // So the sequence for ^C goes like this:
319     //
320     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
321     //     arranges for some Capability to wake up
322     //
323     //   * all threads in the system are halted, and the zombies are
324     //     placed on the run queue for cleaning up.  We acquire all
325     //     the capabilities in order to delete the threads, this is
326     //     done by scheduleDoGC() for convenience (because GC already
327     //     needs to acquire all the capabilities).  We can't kill
328     //     threads involved in foreign calls.
329     // 
330     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
331     //
332     //   * sched_state := SCHED_SHUTTING_DOWN
333     //
334     //   * all workers exit when the run queue on their capability
335     //     drains.  All main threads will also exit when their TSO
336     //     reaches the head of the run queue and they can return.
337     //
338     //   * eventually all Capabilities will shut down, and the RTS can
339     //     exit.
340     //
341     //   * We might be left with threads blocked in foreign calls, 
342     //     we should really attempt to kill these somehow (TODO);
343     
344     switch (sched_state) {
345     case SCHED_RUNNING:
346         break;
347     case SCHED_INTERRUPTING:
348         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
349 #if defined(THREADED_RTS)
350         discardSparksCap(cap);
351 #endif
352         /* scheduleDoGC() deletes all the threads */
353         cap = scheduleDoGC(cap,task,rtsFalse);
354         break;
355     case SCHED_SHUTTING_DOWN:
356         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
357         // If we are a worker, just exit.  If we're a bound thread
358         // then we will exit below when we've removed our TSO from
359         // the run queue.
360         if (task->tso == NULL && emptyRunQueue(cap)) {
361             return cap;
362         }
363         break;
364     default:
365         barf("sched_state: %d", sched_state);
366     }
367
368 #if defined(THREADED_RTS)
369     // If the run queue is empty, take a spark and turn it into a thread.
370     {
371         if (emptyRunQueue(cap)) {
372             StgClosure *spark;
373             spark = findSpark(cap);
374             if (spark != NULL) {
375                 debugTrace(DEBUG_sched,
376                            "turning spark of closure %p into a thread",
377                            (StgClosure *)spark);
378                 createSparkThread(cap,spark);     
379             }
380         }
381     }
382 #endif // THREADED_RTS
383
384     scheduleStartSignalHandlers(cap);
385
386     // Only check the black holes here if we've nothing else to do.
387     // During normal execution, the black hole list only gets checked
388     // at GC time, to avoid repeatedly traversing this possibly long
389     // list each time around the scheduler.
390     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
391
392     scheduleCheckWakeupThreads(cap);
393
394     scheduleCheckBlockedThreads(cap);
395
396 #if defined(PARALLEL_HASKELL)
397     /* message processing and work distribution goes here */ 
398
399     /* if messages have been buffered... a NOOP in THREADED_RTS */
400     scheduleSendPendingMessages();
401
402     /* If the run queue is empty,...*/
403     if (emptyRunQueue(cap)) {
404       /* ...take one of our own sparks and turn it into a thread */
405       scheduleActivateSpark(cap);
406
407         /* if this did not work, try to steal a spark from someone else */
408       if (emptyRunQueue(cap)) {
409         receivedFinish = scheduleGetRemoteWork(cap);
410         continue; //  a new round, (hopefully) with new work
411         /* 
412            in GUM, this a) sends out a FISH and returns IF no fish is
413                            out already
414                         b) (blocking) awaits and receives messages
415            
416            in Eden, this is only the blocking receive, as b) in GUM.
417         */
418       }
419     } 
420
421     /* since we perform a blocking receive and continue otherwise,
422        either we never reach here or we definitely have work! */
423     // from here: non-empty run queue
424     ASSERT(!emptyRunQueue(cap));
425
426     if (PacketsWaiting()) {  /* now process incoming messages, if any
427                                 pending...  
428
429                                 CAUTION: scheduleGetRemoteWork called
430                                 above, waits for messages as well! */
431       processMessages(cap, &receivedFinish);
432     }
433 #endif // PARALLEL_HASKELL
434
435     scheduleDetectDeadlock(cap,task);
436 #if defined(THREADED_RTS)
437     cap = task->cap;    // reload cap, it might have changed
438 #endif
439
440     // Normally, the only way we can get here with no threads to
441     // run is if a keyboard interrupt received during 
442     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
443     // Additionally, it is not fatal for the
444     // threaded RTS to reach here with no threads to run.
445     //
446     // win32: might be here due to awaitEvent() being abandoned
447     // as a result of a console event having been delivered.
448     if ( emptyRunQueue(cap) ) {
449 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
450         ASSERT(sched_state >= SCHED_INTERRUPTING);
451 #endif
452         continue; // nothing to do
453     }
454
455     // 
456     // Get a thread to run
457     //
458     t = popRunQueue(cap);
459
460     // Sanity check the thread we're about to run.  This can be
461     // expensive if there is lots of thread switching going on...
462     IF_DEBUG(sanity,checkTSO(t));
463
464 #if defined(THREADED_RTS)
465     // Check whether we can run this thread in the current task.
466     // If not, we have to pass our capability to the right task.
467     {
468         Task *bound = t->bound;
469       
470         if (bound) {
471             if (bound == task) {
472                 debugTrace(DEBUG_sched,
473                            "### Running thread %lu in bound thread", (unsigned long)t->id);
474                 // yes, the Haskell thread is bound to the current native thread
475             } else {
476                 debugTrace(DEBUG_sched,
477                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
478                 // no, bound to a different Haskell thread: pass to that thread
479                 pushOnRunQueue(cap,t);
480                 continue;
481             }
482         } else {
483             // The thread we want to run is unbound.
484             if (task->tso) { 
485                 debugTrace(DEBUG_sched,
486                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
487                 // no, the current native thread is bound to a different
488                 // Haskell thread, so pass it to any worker thread
489                 pushOnRunQueue(cap,t);
490                 continue; 
491             }
492         }
493     }
494 #endif
495
496     /* context switches are initiated by the timer signal, unless
497      * the user specified "context switch as often as possible", with
498      * +RTS -C0
499      */
500     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
501         && !emptyThreadQueues(cap)) {
502         cap->context_switch = 1;
503     }
504          
505 run_thread:
506
507     // CurrentTSO is the thread to run.  t might be different if we
508     // loop back to run_thread, so make sure to set CurrentTSO after
509     // that.
510     cap->r.rCurrentTSO = t;
511
512     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
513                               (long)t->id, whatNext_strs[t->what_next]);
514
515     startHeapProfTimer();
516
517     // Check for exceptions blocked on this thread
518     maybePerformBlockedException (cap, t);
519
520     // ----------------------------------------------------------------------
521     // Run the current thread 
522
523     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
524     ASSERT(t->cap == cap);
525
526     prev_what_next = t->what_next;
527
528     errno = t->saved_errno;
529 #if mingw32_HOST_OS
530     SetLastError(t->saved_winerror);
531 #endif
532
533     cap->in_haskell = rtsTrue;
534
535     dirty_TSO(cap,t);
536
537 #if defined(THREADED_RTS)
538     if (recent_activity == ACTIVITY_DONE_GC) {
539         // ACTIVITY_DONE_GC means we turned off the timer signal to
540         // conserve power (see #1623).  Re-enable it here.
541         nat prev;
542         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
543         if (prev == ACTIVITY_DONE_GC) {
544             startTimer();
545         }
546     } else {
547         recent_activity = ACTIVITY_YES;
548     }
549 #endif
550
551     switch (prev_what_next) {
552         
553     case ThreadKilled:
554     case ThreadComplete:
555         /* Thread already finished, return to scheduler. */
556         ret = ThreadFinished;
557         break;
558         
559     case ThreadRunGHC:
560     {
561         StgRegTable *r;
562         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
563         cap = regTableToCapability(r);
564         ret = r->rRet;
565         break;
566     }
567     
568     case ThreadInterpret:
569         cap = interpretBCO(cap);
570         ret = cap->r.rRet;
571         break;
572         
573     default:
574         barf("schedule: invalid what_next field");
575     }
576
577     cap->in_haskell = rtsFalse;
578
579     // The TSO might have moved, eg. if it re-entered the RTS and a GC
580     // happened.  So find the new location:
581     t = cap->r.rCurrentTSO;
582
583     // We have run some Haskell code: there might be blackhole-blocked
584     // threads to wake up now.
585     // Lock-free test here should be ok, we're just setting a flag.
586     if ( blackhole_queue != END_TSO_QUEUE ) {
587         blackholes_need_checking = rtsTrue;
588     }
589     
590     // And save the current errno in this thread.
591     // XXX: possibly bogus for SMP because this thread might already
592     // be running again, see code below.
593     t->saved_errno = errno;
594 #if mingw32_HOST_OS
595     // Similarly for Windows error code
596     t->saved_winerror = GetLastError();
597 #endif
598
599 #if defined(THREADED_RTS)
600     // If ret is ThreadBlocked, and this Task is bound to the TSO that
601     // blocked, we are in limbo - the TSO is now owned by whatever it
602     // is blocked on, and may in fact already have been woken up,
603     // perhaps even on a different Capability.  It may be the case
604     // that task->cap != cap.  We better yield this Capability
605     // immediately and return to normaility.
606     if (ret == ThreadBlocked) {
607         debugTrace(DEBUG_sched,
608                    "--<< thread %lu (%s) stopped: blocked",
609                    (unsigned long)t->id, whatNext_strs[t->what_next]);
610         continue;
611     }
612 #endif
613
614     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
615     ASSERT(t->cap == cap);
616
617     // ----------------------------------------------------------------------
618     
619     // Costs for the scheduler are assigned to CCS_SYSTEM
620     stopHeapProfTimer();
621 #if defined(PROFILING)
622     CCCS = CCS_SYSTEM;
623 #endif
624     
625     schedulePostRunThread(t);
626
627     t = threadStackUnderflow(task,t);
628
629     ready_to_gc = rtsFalse;
630
631     switch (ret) {
632     case HeapOverflow:
633         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
634         break;
635
636     case StackOverflow:
637         scheduleHandleStackOverflow(cap,task,t);
638         break;
639
640     case ThreadYielding:
641         if (scheduleHandleYield(cap, t, prev_what_next)) {
642             // shortcut for switching between compiler/interpreter:
643             goto run_thread; 
644         }
645         break;
646
647     case ThreadBlocked:
648         scheduleHandleThreadBlocked(t);
649         break;
650
651     case ThreadFinished:
652         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
653         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
654         break;
655
656     default:
657       barf("schedule: invalid thread return code %d", (int)ret);
658     }
659
660     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
661       cap = scheduleDoGC(cap,task,rtsFalse);
662     }
663   } /* end of while() */
664 }
665
666 /* ----------------------------------------------------------------------------
667  * Setting up the scheduler loop
668  * ------------------------------------------------------------------------- */
669
670 static void
671 schedulePreLoop(void)
672 {
673   // initialisation for scheduler - what cannot go into initScheduler()  
674 }
675
676 /* -----------------------------------------------------------------------------
677  * schedulePushWork()
678  *
679  * Push work to other Capabilities if we have some.
680  * -------------------------------------------------------------------------- */
681
682 #if defined(THREADED_RTS)
683 static void
684 schedulePushWork(Capability *cap USED_IF_THREADS, 
685                  Task *task      USED_IF_THREADS)
686 {
687     Capability *free_caps[n_capabilities], *cap0;
688     nat i, n_free_caps;
689
690     // migration can be turned off with +RTS -qg
691     if (!RtsFlags.ParFlags.migrate) return;
692
693     // Check whether we have more threads on our run queue, or sparks
694     // in our pool, that we could hand to another Capability.
695     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
696         && sparkPoolSizeCap(cap) < 2) {
697         return;
698     }
699
700     // First grab as many free Capabilities as we can.
701     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
702         cap0 = &capabilities[i];
703         if (cap != cap0 && tryGrabCapability(cap0,task)) {
704             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
705                 // it already has some work, we just grabbed it at 
706                 // the wrong moment.  Or maybe it's deadlocked!
707                 releaseCapability(cap0);
708             } else {
709                 free_caps[n_free_caps++] = cap0;
710             }
711         }
712     }
713
714     // we now have n_free_caps free capabilities stashed in
715     // free_caps[].  Share our run queue equally with them.  This is
716     // probably the simplest thing we could do; improvements we might
717     // want to do include:
718     //
719     //   - giving high priority to moving relatively new threads, on 
720     //     the gournds that they haven't had time to build up a
721     //     working set in the cache on this CPU/Capability.
722     //
723     //   - giving low priority to moving long-lived threads
724
725     if (n_free_caps > 0) {
726         StgTSO *prev, *t, *next;
727         rtsBool pushed_to_all;
728
729         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
730
731         i = 0;
732         pushed_to_all = rtsFalse;
733
734         if (cap->run_queue_hd != END_TSO_QUEUE) {
735             prev = cap->run_queue_hd;
736             t = prev->_link;
737             prev->_link = END_TSO_QUEUE;
738             for (; t != END_TSO_QUEUE; t = next) {
739                 next = t->_link;
740                 t->_link = END_TSO_QUEUE;
741                 if (t->what_next == ThreadRelocated
742                     || t->bound == task // don't move my bound thread
743                     || tsoLocked(t)) {  // don't move a locked thread
744                     setTSOLink(cap, prev, t);
745                     prev = t;
746                 } else if (i == n_free_caps) {
747                     pushed_to_all = rtsTrue;
748                     i = 0;
749                     // keep one for us
750                     setTSOLink(cap, prev, t);
751                     prev = t;
752                 } else {
753                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
754                     appendToRunQueue(free_caps[i],t);
755                     if (t->bound) { t->bound->cap = free_caps[i]; }
756                     t->cap = free_caps[i];
757                     i++;
758                 }
759             }
760             cap->run_queue_tl = prev;
761         }
762
763         // If there are some free capabilities that we didn't push any
764         // threads to, then try to push a spark to each one.
765         if (!pushed_to_all) {
766             StgClosure *spark;
767             // i is the next free capability to push to
768             for (; i < n_free_caps; i++) {
769                 if (emptySparkPoolCap(free_caps[i])) {
770                     spark = findSpark(cap);
771                     if (spark != NULL) {
772                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
773                         newSpark(&(free_caps[i]->r), spark);
774                     }
775                 }
776             }
777         }
778
779         // release the capabilities
780         for (i = 0; i < n_free_caps; i++) {
781             task->cap = free_caps[i];
782             releaseCapability(free_caps[i]);
783         }
784     }
785     task->cap = cap; // reset to point to our Capability.
786 }
787 #endif
788
789 /* ----------------------------------------------------------------------------
790  * Start any pending signal handlers
791  * ------------------------------------------------------------------------- */
792
793 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
794 static void
795 scheduleStartSignalHandlers(Capability *cap)
796 {
797     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
798         // safe outside the lock
799         startSignalHandlers(cap);
800     }
801 }
802 #else
803 static void
804 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
805 {
806 }
807 #endif
808
809 /* ----------------------------------------------------------------------------
810  * Check for blocked threads that can be woken up.
811  * ------------------------------------------------------------------------- */
812
813 static void
814 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
815 {
816 #if !defined(THREADED_RTS)
817     //
818     // Check whether any waiting threads need to be woken up.  If the
819     // run queue is empty, and there are no other tasks running, we
820     // can wait indefinitely for something to happen.
821     //
822     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
823     {
824         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
825     }
826 #endif
827 }
828
829
830 /* ----------------------------------------------------------------------------
831  * Check for threads woken up by other Capabilities
832  * ------------------------------------------------------------------------- */
833
834 static void
835 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
836 {
837 #if defined(THREADED_RTS)
838     // Any threads that were woken up by other Capabilities get
839     // appended to our run queue.
840     if (!emptyWakeupQueue(cap)) {
841         ACQUIRE_LOCK(&cap->lock);
842         if (emptyRunQueue(cap)) {
843             cap->run_queue_hd = cap->wakeup_queue_hd;
844             cap->run_queue_tl = cap->wakeup_queue_tl;
845         } else {
846             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
847             cap->run_queue_tl = cap->wakeup_queue_tl;
848         }
849         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
850         RELEASE_LOCK(&cap->lock);
851     }
852 #endif
853 }
854
855 /* ----------------------------------------------------------------------------
856  * Check for threads blocked on BLACKHOLEs that can be woken up
857  * ------------------------------------------------------------------------- */
858 static void
859 scheduleCheckBlackHoles (Capability *cap)
860 {
861     if ( blackholes_need_checking ) // check without the lock first
862     {
863         ACQUIRE_LOCK(&sched_mutex);
864         if ( blackholes_need_checking ) {
865             checkBlackHoles(cap);
866             blackholes_need_checking = rtsFalse;
867         }
868         RELEASE_LOCK(&sched_mutex);
869     }
870 }
871
872 /* ----------------------------------------------------------------------------
873  * Detect deadlock conditions and attempt to resolve them.
874  * ------------------------------------------------------------------------- */
875
876 static void
877 scheduleDetectDeadlock (Capability *cap, Task *task)
878 {
879
880 #if defined(PARALLEL_HASKELL)
881     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
882     return;
883 #endif
884
885     /* 
886      * Detect deadlock: when we have no threads to run, there are no
887      * threads blocked, waiting for I/O, or sleeping, and all the
888      * other tasks are waiting for work, we must have a deadlock of
889      * some description.
890      */
891     if ( emptyThreadQueues(cap) )
892     {
893 #if defined(THREADED_RTS)
894         /* 
895          * In the threaded RTS, we only check for deadlock if there
896          * has been no activity in a complete timeslice.  This means
897          * we won't eagerly start a full GC just because we don't have
898          * any threads to run currently.
899          */
900         if (recent_activity != ACTIVITY_INACTIVE) return;
901 #endif
902
903         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
904
905         // Garbage collection can release some new threads due to
906         // either (a) finalizers or (b) threads resurrected because
907         // they are unreachable and will therefore be sent an
908         // exception.  Any threads thus released will be immediately
909         // runnable.
910         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
911
912         recent_activity = ACTIVITY_DONE_GC;
913         // disable timer signals (see #1623)
914         stopTimer();
915         
916         if ( !emptyRunQueue(cap) ) return;
917
918 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
919         /* If we have user-installed signal handlers, then wait
920          * for signals to arrive rather then bombing out with a
921          * deadlock.
922          */
923         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
924             debugTrace(DEBUG_sched,
925                        "still deadlocked, waiting for signals...");
926
927             awaitUserSignals();
928
929             if (signals_pending()) {
930                 startSignalHandlers(cap);
931             }
932
933             // either we have threads to run, or we were interrupted:
934             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
935
936             return;
937         }
938 #endif
939
940 #if !defined(THREADED_RTS)
941         /* Probably a real deadlock.  Send the current main thread the
942          * Deadlock exception.
943          */
944         if (task->tso) {
945             switch (task->tso->why_blocked) {
946             case BlockedOnSTM:
947             case BlockedOnBlackHole:
948             case BlockedOnException:
949             case BlockedOnMVar:
950                 throwToSingleThreaded(cap, task->tso, 
951                                       (StgClosure *)nonTermination_closure);
952                 return;
953             default:
954                 barf("deadlock: main thread blocked in a strange way");
955             }
956         }
957         return;
958 #endif
959     }
960 }
961
962
963 /* ----------------------------------------------------------------------------
964  * Send pending messages (PARALLEL_HASKELL only)
965  * ------------------------------------------------------------------------- */
966
967 #if defined(PARALLEL_HASKELL)
968 static StgTSO *
969 scheduleSendPendingMessages(void)
970 {
971
972 # if defined(PAR) // global Mem.Mgmt., omit for now
973     if (PendingFetches != END_BF_QUEUE) {
974         processFetches();
975     }
976 # endif
977     
978     if (RtsFlags.ParFlags.BufferTime) {
979         // if we use message buffering, we must send away all message
980         // packets which have become too old...
981         sendOldBuffers(); 
982     }
983 }
984 #endif
985
986 /* ----------------------------------------------------------------------------
987  * Activate spark threads (PARALLEL_HASKELL only)
988  * ------------------------------------------------------------------------- */
989
990 #if defined(PARALLEL_HASKELL)
991 static void
992 scheduleActivateSpark(Capability *cap)
993 {
994     StgClosure *spark;
995
996 /* We only want to stay here if the run queue is empty and we want some
997    work. We try to turn a spark into a thread, and add it to the run
998    queue, from where it will be picked up in the next iteration of the
999    scheduler loop.  
1000 */
1001     if (!emptyRunQueue(cap)) 
1002       /* In the threaded RTS, another task might have pushed a thread
1003          on our run queue in the meantime ? But would need a lock.. */
1004       return;
1005
1006     spark = findSpark(cap); // defined in Sparks.c
1007
1008     if (spark != NULL) {
1009       debugTrace(DEBUG_sched,
1010                  "turning spark of closure %p into a thread",
1011                  (StgClosure *)spark);
1012       createSparkThread(cap,spark); // defined in Sparks.c
1013     }
1014 }
1015 #endif // PARALLEL_HASKELL
1016
1017 /* ----------------------------------------------------------------------------
1018  * Get work from a remote node (PARALLEL_HASKELL only)
1019  * ------------------------------------------------------------------------- */
1020     
1021 #if defined(PARALLEL_HASKELL)
1022 static rtsBool
1023 scheduleGetRemoteWork(Capability *cap)
1024 {
1025 #if defined(PARALLEL_HASKELL)
1026   rtsBool receivedFinish = rtsFalse;
1027
1028   // idle() , i.e. send all buffers, wait for work
1029   if (RtsFlags.ParFlags.BufferTime) {
1030         IF_PAR_DEBUG(verbose, 
1031                 debugBelch("...send all pending data,"));
1032         {
1033           nat i;
1034           for (i=1; i<=nPEs; i++)
1035             sendImmediately(i); // send all messages away immediately
1036         }
1037   }
1038
1039   /* this would be the place for fishing in GUM... 
1040
1041      if (no-earlier-fish-around) 
1042           sendFish(choosePe());
1043    */
1044
1045   // Eden:just look for incoming messages (blocking receive)
1046   IF_PAR_DEBUG(verbose, 
1047                debugBelch("...wait for incoming messages...\n"));
1048   processMessages(cap, &receivedFinish); // blocking receive...
1049
1050
1051   return receivedFinish;
1052   // reenter scheduling look after having received something
1053
1054 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1055
1056   return rtsFalse; /* return value unused in THREADED_RTS */
1057
1058 #endif /* PARALLEL_HASKELL */
1059 }
1060 #endif // PARALLEL_HASKELL
1061
1062 /* ----------------------------------------------------------------------------
1063  * After running a thread...
1064  * ------------------------------------------------------------------------- */
1065
1066 static void
1067 schedulePostRunThread (StgTSO *t)
1068 {
1069     // We have to be able to catch transactions that are in an
1070     // infinite loop as a result of seeing an inconsistent view of
1071     // memory, e.g. 
1072     //
1073     //   atomically $ do
1074     //       [a,b] <- mapM readTVar [ta,tb]
1075     //       when (a == b) loop
1076     //
1077     // and a is never equal to b given a consistent view of memory.
1078     //
1079     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1080         if (!stmValidateNestOfTransactions (t -> trec)) {
1081             debugTrace(DEBUG_sched | DEBUG_stm,
1082                        "trec %p found wasting its time", t);
1083             
1084             // strip the stack back to the
1085             // ATOMICALLY_FRAME, aborting the (nested)
1086             // transaction, and saving the stack of any
1087             // partially-evaluated thunks on the heap.
1088             throwToSingleThreaded_(&capabilities[0], t, 
1089                                    NULL, rtsTrue, NULL);
1090             
1091             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1092         }
1093     }
1094
1095   /* some statistics gathering in the parallel case */
1096 }
1097
1098 /* -----------------------------------------------------------------------------
1099  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1100  * -------------------------------------------------------------------------- */
1101
1102 static rtsBool
1103 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1104 {
1105     // did the task ask for a large block?
1106     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1107         // if so, get one and push it on the front of the nursery.
1108         bdescr *bd;
1109         lnat blocks;
1110         
1111         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1112         
1113         debugTrace(DEBUG_sched,
1114                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1115                    (long)t->id, whatNext_strs[t->what_next], blocks);
1116     
1117         // don't do this if the nursery is (nearly) full, we'll GC first.
1118         if (cap->r.rCurrentNursery->link != NULL ||
1119             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1120                                                // if the nursery has only one block.
1121             
1122             ACQUIRE_SM_LOCK
1123             bd = allocGroup( blocks );
1124             RELEASE_SM_LOCK
1125             cap->r.rNursery->n_blocks += blocks;
1126             
1127             // link the new group into the list
1128             bd->link = cap->r.rCurrentNursery;
1129             bd->u.back = cap->r.rCurrentNursery->u.back;
1130             if (cap->r.rCurrentNursery->u.back != NULL) {
1131                 cap->r.rCurrentNursery->u.back->link = bd;
1132             } else {
1133 #if !defined(THREADED_RTS)
1134                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1135                        g0s0 == cap->r.rNursery);
1136 #endif
1137                 cap->r.rNursery->blocks = bd;
1138             }             
1139             cap->r.rCurrentNursery->u.back = bd;
1140             
1141             // initialise it as a nursery block.  We initialise the
1142             // step, gen_no, and flags field of *every* sub-block in
1143             // this large block, because this is easier than making
1144             // sure that we always find the block head of a large
1145             // block whenever we call Bdescr() (eg. evacuate() and
1146             // isAlive() in the GC would both have to do this, at
1147             // least).
1148             { 
1149                 bdescr *x;
1150                 for (x = bd; x < bd + blocks; x++) {
1151                     x->step = cap->r.rNursery;
1152                     x->gen_no = 0;
1153                     x->flags = 0;
1154                 }
1155             }
1156             
1157             // This assert can be a killer if the app is doing lots
1158             // of large block allocations.
1159             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1160             
1161             // now update the nursery to point to the new block
1162             cap->r.rCurrentNursery = bd;
1163             
1164             // we might be unlucky and have another thread get on the
1165             // run queue before us and steal the large block, but in that
1166             // case the thread will just end up requesting another large
1167             // block.
1168             pushOnRunQueue(cap,t);
1169             return rtsFalse;  /* not actually GC'ing */
1170         }
1171     }
1172     
1173     debugTrace(DEBUG_sched,
1174                "--<< thread %ld (%s) stopped: HeapOverflow",
1175                (long)t->id, whatNext_strs[t->what_next]);
1176
1177     if (cap->context_switch) {
1178         // Sometimes we miss a context switch, e.g. when calling
1179         // primitives in a tight loop, MAYBE_GC() doesn't check the
1180         // context switch flag, and we end up waiting for a GC.
1181         // See #1984, and concurrent/should_run/1984
1182         cap->context_switch = 0;
1183         addToRunQueue(cap,t);
1184     } else {
1185         pushOnRunQueue(cap,t);
1186     }
1187     return rtsTrue;
1188     /* actual GC is done at the end of the while loop in schedule() */
1189 }
1190
1191 /* -----------------------------------------------------------------------------
1192  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1193  * -------------------------------------------------------------------------- */
1194
1195 static void
1196 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1197 {
1198     debugTrace (DEBUG_sched,
1199                 "--<< thread %ld (%s) stopped, StackOverflow", 
1200                 (long)t->id, whatNext_strs[t->what_next]);
1201
1202     /* just adjust the stack for this thread, then pop it back
1203      * on the run queue.
1204      */
1205     { 
1206         /* enlarge the stack */
1207         StgTSO *new_t = threadStackOverflow(cap, t);
1208         
1209         /* The TSO attached to this Task may have moved, so update the
1210          * pointer to it.
1211          */
1212         if (task->tso == t) {
1213             task->tso = new_t;
1214         }
1215         pushOnRunQueue(cap,new_t);
1216     }
1217 }
1218
1219 /* -----------------------------------------------------------------------------
1220  * Handle a thread that returned to the scheduler with ThreadYielding
1221  * -------------------------------------------------------------------------- */
1222
1223 static rtsBool
1224 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1225 {
1226     // Reset the context switch flag.  We don't do this just before
1227     // running the thread, because that would mean we would lose ticks
1228     // during GC, which can lead to unfair scheduling (a thread hogs
1229     // the CPU because the tick always arrives during GC).  This way
1230     // penalises threads that do a lot of allocation, but that seems
1231     // better than the alternative.
1232     cap->context_switch = 0;
1233     
1234     /* put the thread back on the run queue.  Then, if we're ready to
1235      * GC, check whether this is the last task to stop.  If so, wake
1236      * up the GC thread.  getThread will block during a GC until the
1237      * GC is finished.
1238      */
1239 #ifdef DEBUG
1240     if (t->what_next != prev_what_next) {
1241         debugTrace(DEBUG_sched,
1242                    "--<< thread %ld (%s) stopped to switch evaluators", 
1243                    (long)t->id, whatNext_strs[t->what_next]);
1244     } else {
1245         debugTrace(DEBUG_sched,
1246                    "--<< thread %ld (%s) stopped, yielding",
1247                    (long)t->id, whatNext_strs[t->what_next]);
1248     }
1249 #endif
1250     
1251     IF_DEBUG(sanity,
1252              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1253              checkTSO(t));
1254     ASSERT(t->_link == END_TSO_QUEUE);
1255     
1256     // Shortcut if we're just switching evaluators: don't bother
1257     // doing stack squeezing (which can be expensive), just run the
1258     // thread.
1259     if (t->what_next != prev_what_next) {
1260         return rtsTrue;
1261     }
1262
1263     addToRunQueue(cap,t);
1264
1265     return rtsFalse;
1266 }
1267
1268 /* -----------------------------------------------------------------------------
1269  * Handle a thread that returned to the scheduler with ThreadBlocked
1270  * -------------------------------------------------------------------------- */
1271
1272 static void
1273 scheduleHandleThreadBlocked( StgTSO *t
1274 #if !defined(GRAN) && !defined(DEBUG)
1275     STG_UNUSED
1276 #endif
1277     )
1278 {
1279
1280       // We don't need to do anything.  The thread is blocked, and it
1281       // has tidied up its stack and placed itself on whatever queue
1282       // it needs to be on.
1283
1284     // ASSERT(t->why_blocked != NotBlocked);
1285     // Not true: for example,
1286     //    - in THREADED_RTS, the thread may already have been woken
1287     //      up by another Capability.  This actually happens: try
1288     //      conc023 +RTS -N2.
1289     //    - the thread may have woken itself up already, because
1290     //      threadPaused() might have raised a blocked throwTo
1291     //      exception, see maybePerformBlockedException().
1292
1293 #ifdef DEBUG
1294     if (traceClass(DEBUG_sched)) {
1295         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1296                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1297         printThreadBlockage(t);
1298         debugTraceEnd();
1299     }
1300 #endif
1301 }
1302
1303 /* -----------------------------------------------------------------------------
1304  * Handle a thread that returned to the scheduler with ThreadFinished
1305  * -------------------------------------------------------------------------- */
1306
1307 static rtsBool
1308 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1309 {
1310     /* Need to check whether this was a main thread, and if so,
1311      * return with the return value.
1312      *
1313      * We also end up here if the thread kills itself with an
1314      * uncaught exception, see Exception.cmm.
1315      */
1316     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1317                (unsigned long)t->id, whatNext_strs[t->what_next]);
1318
1319       //
1320       // Check whether the thread that just completed was a bound
1321       // thread, and if so return with the result.  
1322       //
1323       // There is an assumption here that all thread completion goes
1324       // through this point; we need to make sure that if a thread
1325       // ends up in the ThreadKilled state, that it stays on the run
1326       // queue so it can be dealt with here.
1327       //
1328
1329       if (t->bound) {
1330
1331           if (t->bound != task) {
1332 #if !defined(THREADED_RTS)
1333               // Must be a bound thread that is not the topmost one.  Leave
1334               // it on the run queue until the stack has unwound to the
1335               // point where we can deal with this.  Leaving it on the run
1336               // queue also ensures that the garbage collector knows about
1337               // this thread and its return value (it gets dropped from the
1338               // step->threads list so there's no other way to find it).
1339               appendToRunQueue(cap,t);
1340               return rtsFalse;
1341 #else
1342               // this cannot happen in the threaded RTS, because a
1343               // bound thread can only be run by the appropriate Task.
1344               barf("finished bound thread that isn't mine");
1345 #endif
1346           }
1347
1348           ASSERT(task->tso == t);
1349
1350           if (t->what_next == ThreadComplete) {
1351               if (task->ret) {
1352                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1353                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1354               }
1355               task->stat = Success;
1356           } else {
1357               if (task->ret) {
1358                   *(task->ret) = NULL;
1359               }
1360               if (sched_state >= SCHED_INTERRUPTING) {
1361                   task->stat = Interrupted;
1362               } else {
1363                   task->stat = Killed;
1364               }
1365           }
1366 #ifdef DEBUG
1367           removeThreadLabel((StgWord)task->tso->id);
1368 #endif
1369           return rtsTrue; // tells schedule() to return
1370       }
1371
1372       return rtsFalse;
1373 }
1374
1375 /* -----------------------------------------------------------------------------
1376  * Perform a heap census
1377  * -------------------------------------------------------------------------- */
1378
1379 static rtsBool
1380 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1381 {
1382     // When we have +RTS -i0 and we're heap profiling, do a census at
1383     // every GC.  This lets us get repeatable runs for debugging.
1384     if (performHeapProfile ||
1385         (RtsFlags.ProfFlags.profileInterval==0 &&
1386          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1387         return rtsTrue;
1388     } else {
1389         return rtsFalse;
1390     }
1391 }
1392
1393 /* -----------------------------------------------------------------------------
1394  * Perform a garbage collection if necessary
1395  * -------------------------------------------------------------------------- */
1396
1397 static Capability *
1398 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1399 {
1400     rtsBool heap_census;
1401 #ifdef THREADED_RTS
1402     /* extern static volatile StgWord waiting_for_gc; 
1403        lives inside capability.c */
1404     rtsBool was_waiting;
1405     nat i;
1406 #endif
1407
1408 #ifdef THREADED_RTS
1409     // In order to GC, there must be no threads running Haskell code.
1410     // Therefore, the GC thread needs to hold *all* the capabilities,
1411     // and release them after the GC has completed.  
1412     //
1413     // This seems to be the simplest way: previous attempts involved
1414     // making all the threads with capabilities give up their
1415     // capabilities and sleep except for the *last* one, which
1416     // actually did the GC.  But it's quite hard to arrange for all
1417     // the other tasks to sleep and stay asleep.
1418     //
1419         
1420     /*  Other capabilities are prevented from running yet more Haskell
1421         threads if waiting_for_gc is set. Tested inside
1422         yieldCapability() and releaseCapability() in Capability.c */
1423
1424     was_waiting = cas(&waiting_for_gc, 0, 1);
1425     if (was_waiting) {
1426         do {
1427             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1428             if (cap) yieldCapability(&cap,task);
1429         } while (waiting_for_gc);
1430         return cap;  // NOTE: task->cap might have changed here
1431     }
1432
1433     setContextSwitches();
1434     for (i=0; i < n_capabilities; i++) {
1435         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1436         if (cap != &capabilities[i]) {
1437             Capability *pcap = &capabilities[i];
1438             // we better hope this task doesn't get migrated to
1439             // another Capability while we're waiting for this one.
1440             // It won't, because load balancing happens while we have
1441             // all the Capabilities, but even so it's a slightly
1442             // unsavoury invariant.
1443             task->cap = pcap;
1444             waitForReturnCapability(&pcap, task);
1445             if (pcap != &capabilities[i]) {
1446                 barf("scheduleDoGC: got the wrong capability");
1447             }
1448         }
1449     }
1450
1451     waiting_for_gc = rtsFalse;
1452 #endif
1453
1454     // so this happens periodically:
1455     if (cap) scheduleCheckBlackHoles(cap);
1456     
1457     IF_DEBUG(scheduler, printAllThreads());
1458
1459     /*
1460      * We now have all the capabilities; if we're in an interrupting
1461      * state, then we should take the opportunity to delete all the
1462      * threads in the system.
1463      */
1464     if (sched_state >= SCHED_INTERRUPTING) {
1465         deleteAllThreads(&capabilities[0]);
1466         sched_state = SCHED_SHUTTING_DOWN;
1467     }
1468     
1469     heap_census = scheduleNeedHeapProfile(rtsTrue);
1470
1471     /* everybody back, start the GC.
1472      * Could do it in this thread, or signal a condition var
1473      * to do it in another thread.  Either way, we need to
1474      * broadcast on gc_pending_cond afterward.
1475      */
1476 #if defined(THREADED_RTS)
1477     debugTrace(DEBUG_sched, "doing GC");
1478 #endif
1479     GarbageCollect(force_major || heap_census);
1480     
1481     if (heap_census) {
1482         debugTrace(DEBUG_sched, "performing heap census");
1483         heapCensus();
1484         performHeapProfile = rtsFalse;
1485     }
1486
1487 #if defined(THREADED_RTS)
1488     // release our stash of capabilities.
1489     for (i = 0; i < n_capabilities; i++) {
1490         if (cap != &capabilities[i]) {
1491             task->cap = &capabilities[i];
1492             releaseCapability(&capabilities[i]);
1493         }
1494     }
1495     if (cap) {
1496         task->cap = cap;
1497     } else {
1498         task->cap = NULL;
1499     }
1500 #endif
1501
1502     return cap;
1503 }
1504
1505 /* ---------------------------------------------------------------------------
1506  * Singleton fork(). Do not copy any running threads.
1507  * ------------------------------------------------------------------------- */
1508
1509 pid_t
1510 forkProcess(HsStablePtr *entry
1511 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1512             STG_UNUSED
1513 #endif
1514            )
1515 {
1516 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1517     Task *task;
1518     pid_t pid;
1519     StgTSO* t,*next;
1520     Capability *cap;
1521     nat s;
1522     
1523 #if defined(THREADED_RTS)
1524     if (RtsFlags.ParFlags.nNodes > 1) {
1525         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1526         stg_exit(EXIT_FAILURE);
1527     }
1528 #endif
1529
1530     debugTrace(DEBUG_sched, "forking!");
1531     
1532     // ToDo: for SMP, we should probably acquire *all* the capabilities
1533     cap = rts_lock();
1534     
1535     // no funny business: hold locks while we fork, otherwise if some
1536     // other thread is holding a lock when the fork happens, the data
1537     // structure protected by the lock will forever be in an
1538     // inconsistent state in the child.  See also #1391.
1539     ACQUIRE_LOCK(&sched_mutex);
1540     ACQUIRE_LOCK(&cap->lock);
1541     ACQUIRE_LOCK(&cap->running_task->lock);
1542
1543     pid = fork();
1544     
1545     if (pid) { // parent
1546         
1547         RELEASE_LOCK(&sched_mutex);
1548         RELEASE_LOCK(&cap->lock);
1549         RELEASE_LOCK(&cap->running_task->lock);
1550
1551         // just return the pid
1552         rts_unlock(cap);
1553         return pid;
1554         
1555     } else { // child
1556         
1557 #if defined(THREADED_RTS)
1558         initMutex(&sched_mutex);
1559         initMutex(&cap->lock);
1560         initMutex(&cap->running_task->lock);
1561 #endif
1562
1563         // Now, all OS threads except the thread that forked are
1564         // stopped.  We need to stop all Haskell threads, including
1565         // those involved in foreign calls.  Also we need to delete
1566         // all Tasks, because they correspond to OS threads that are
1567         // now gone.
1568
1569         for (s = 0; s < total_steps; s++) {
1570           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1571             if (t->what_next == ThreadRelocated) {
1572                 next = t->_link;
1573             } else {
1574                 next = t->global_link;
1575                 // don't allow threads to catch the ThreadKilled
1576                 // exception, but we do want to raiseAsync() because these
1577                 // threads may be evaluating thunks that we need later.
1578                 deleteThread_(cap,t);
1579             }
1580           }
1581         }
1582         
1583         // Empty the run queue.  It seems tempting to let all the
1584         // killed threads stay on the run queue as zombies to be
1585         // cleaned up later, but some of them correspond to bound
1586         // threads for which the corresponding Task does not exist.
1587         cap->run_queue_hd = END_TSO_QUEUE;
1588         cap->run_queue_tl = END_TSO_QUEUE;
1589
1590         // Any suspended C-calling Tasks are no more, their OS threads
1591         // don't exist now:
1592         cap->suspended_ccalling_tasks = NULL;
1593
1594         // Empty the threads lists.  Otherwise, the garbage
1595         // collector may attempt to resurrect some of these threads.
1596         for (s = 0; s < total_steps; s++) {
1597             all_steps[s].threads = END_TSO_QUEUE;
1598         }
1599
1600         // Wipe the task list, except the current Task.
1601         ACQUIRE_LOCK(&sched_mutex);
1602         for (task = all_tasks; task != NULL; task=task->all_link) {
1603             if (task != cap->running_task) {
1604 #if defined(THREADED_RTS)
1605                 initMutex(&task->lock); // see #1391
1606 #endif
1607                 discardTask(task);
1608             }
1609         }
1610         RELEASE_LOCK(&sched_mutex);
1611
1612 #if defined(THREADED_RTS)
1613         // Wipe our spare workers list, they no longer exist.  New
1614         // workers will be created if necessary.
1615         cap->spare_workers = NULL;
1616         cap->returning_tasks_hd = NULL;
1617         cap->returning_tasks_tl = NULL;
1618 #endif
1619
1620         // On Unix, all timers are reset in the child, so we need to start
1621         // the timer again.
1622         initTimer();
1623         startTimer();
1624
1625         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1626         rts_checkSchedStatus("forkProcess",cap);
1627         
1628         rts_unlock(cap);
1629         hs_exit();                      // clean up and exit
1630         stg_exit(EXIT_SUCCESS);
1631     }
1632 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1633     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1634     return -1;
1635 #endif
1636 }
1637
1638 /* ---------------------------------------------------------------------------
1639  * Delete all the threads in the system
1640  * ------------------------------------------------------------------------- */
1641    
1642 static void
1643 deleteAllThreads ( Capability *cap )
1644 {
1645     // NOTE: only safe to call if we own all capabilities.
1646
1647     StgTSO* t, *next;
1648     nat s;
1649
1650     debugTrace(DEBUG_sched,"deleting all threads");
1651     for (s = 0; s < total_steps; s++) {
1652       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1653         if (t->what_next == ThreadRelocated) {
1654             next = t->_link;
1655         } else {
1656             next = t->global_link;
1657             deleteThread(cap,t);
1658         }
1659       }
1660     }      
1661
1662     // The run queue now contains a bunch of ThreadKilled threads.  We
1663     // must not throw these away: the main thread(s) will be in there
1664     // somewhere, and the main scheduler loop has to deal with it.
1665     // Also, the run queue is the only thing keeping these threads from
1666     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1667
1668 #if !defined(THREADED_RTS)
1669     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1670     ASSERT(sleeping_queue == END_TSO_QUEUE);
1671 #endif
1672 }
1673
1674 /* -----------------------------------------------------------------------------
1675    Managing the suspended_ccalling_tasks list.
1676    Locks required: sched_mutex
1677    -------------------------------------------------------------------------- */
1678
1679 STATIC_INLINE void
1680 suspendTask (Capability *cap, Task *task)
1681 {
1682     ASSERT(task->next == NULL && task->prev == NULL);
1683     task->next = cap->suspended_ccalling_tasks;
1684     task->prev = NULL;
1685     if (cap->suspended_ccalling_tasks) {
1686         cap->suspended_ccalling_tasks->prev = task;
1687     }
1688     cap->suspended_ccalling_tasks = task;
1689 }
1690
1691 STATIC_INLINE void
1692 recoverSuspendedTask (Capability *cap, Task *task)
1693 {
1694     if (task->prev) {
1695         task->prev->next = task->next;
1696     } else {
1697         ASSERT(cap->suspended_ccalling_tasks == task);
1698         cap->suspended_ccalling_tasks = task->next;
1699     }
1700     if (task->next) {
1701         task->next->prev = task->prev;
1702     }
1703     task->next = task->prev = NULL;
1704 }
1705
1706 /* ---------------------------------------------------------------------------
1707  * Suspending & resuming Haskell threads.
1708  * 
1709  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1710  * its capability before calling the C function.  This allows another
1711  * task to pick up the capability and carry on running Haskell
1712  * threads.  It also means that if the C call blocks, it won't lock
1713  * the whole system.
1714  *
1715  * The Haskell thread making the C call is put to sleep for the
1716  * duration of the call, on the susepended_ccalling_threads queue.  We
1717  * give out a token to the task, which it can use to resume the thread
1718  * on return from the C function.
1719  * ------------------------------------------------------------------------- */
1720    
1721 void *
1722 suspendThread (StgRegTable *reg)
1723 {
1724   Capability *cap;
1725   int saved_errno;
1726   StgTSO *tso;
1727   Task *task;
1728 #if mingw32_HOST_OS
1729   StgWord32 saved_winerror;
1730 #endif
1731
1732   saved_errno = errno;
1733 #if mingw32_HOST_OS
1734   saved_winerror = GetLastError();
1735 #endif
1736
1737   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1738    */
1739   cap = regTableToCapability(reg);
1740
1741   task = cap->running_task;
1742   tso = cap->r.rCurrentTSO;
1743
1744   debugTrace(DEBUG_sched, 
1745              "thread %lu did a safe foreign call", 
1746              (unsigned long)cap->r.rCurrentTSO->id);
1747
1748   // XXX this might not be necessary --SDM
1749   tso->what_next = ThreadRunGHC;
1750
1751   threadPaused(cap,tso);
1752
1753   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1754       tso->why_blocked = BlockedOnCCall;
1755       tso->flags |= TSO_BLOCKEX;
1756       tso->flags &= ~TSO_INTERRUPTIBLE;
1757   } else {
1758       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1759   }
1760
1761   // Hand back capability
1762   task->suspended_tso = tso;
1763
1764   ACQUIRE_LOCK(&cap->lock);
1765
1766   suspendTask(cap,task);
1767   cap->in_haskell = rtsFalse;
1768   releaseCapability_(cap);
1769   
1770   RELEASE_LOCK(&cap->lock);
1771
1772 #if defined(THREADED_RTS)
1773   /* Preparing to leave the RTS, so ensure there's a native thread/task
1774      waiting to take over.
1775   */
1776   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1777 #endif
1778
1779   errno = saved_errno;
1780 #if mingw32_HOST_OS
1781   SetLastError(saved_winerror);
1782 #endif
1783   return task;
1784 }
1785
1786 StgRegTable *
1787 resumeThread (void *task_)
1788 {
1789     StgTSO *tso;
1790     Capability *cap;
1791     Task *task = task_;
1792     int saved_errno;
1793 #if mingw32_HOST_OS
1794     StgWord32 saved_winerror;
1795 #endif
1796
1797     saved_errno = errno;
1798 #if mingw32_HOST_OS
1799     saved_winerror = GetLastError();
1800 #endif
1801
1802     cap = task->cap;
1803     // Wait for permission to re-enter the RTS with the result.
1804     waitForReturnCapability(&cap,task);
1805     // we might be on a different capability now... but if so, our
1806     // entry on the suspended_ccalling_tasks list will also have been
1807     // migrated.
1808
1809     // Remove the thread from the suspended list
1810     recoverSuspendedTask(cap,task);
1811
1812     tso = task->suspended_tso;
1813     task->suspended_tso = NULL;
1814     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1815     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1816     
1817     if (tso->why_blocked == BlockedOnCCall) {
1818         awakenBlockedExceptionQueue(cap,tso);
1819         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1820     }
1821     
1822     /* Reset blocking status */
1823     tso->why_blocked  = NotBlocked;
1824     
1825     cap->r.rCurrentTSO = tso;
1826     cap->in_haskell = rtsTrue;
1827     errno = saved_errno;
1828 #if mingw32_HOST_OS
1829     SetLastError(saved_winerror);
1830 #endif
1831
1832     /* We might have GC'd, mark the TSO dirty again */
1833     dirty_TSO(cap,tso);
1834
1835     IF_DEBUG(sanity, checkTSO(tso));
1836
1837     return &cap->r;
1838 }
1839
1840 /* ---------------------------------------------------------------------------
1841  * scheduleThread()
1842  *
1843  * scheduleThread puts a thread on the end  of the runnable queue.
1844  * This will usually be done immediately after a thread is created.
1845  * The caller of scheduleThread must create the thread using e.g.
1846  * createThread and push an appropriate closure
1847  * on this thread's stack before the scheduler is invoked.
1848  * ------------------------------------------------------------------------ */
1849
1850 void
1851 scheduleThread(Capability *cap, StgTSO *tso)
1852 {
1853     // The thread goes at the *end* of the run-queue, to avoid possible
1854     // starvation of any threads already on the queue.
1855     appendToRunQueue(cap,tso);
1856 }
1857
1858 void
1859 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1860 {
1861 #if defined(THREADED_RTS)
1862     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1863                               // move this thread from now on.
1864     cpu %= RtsFlags.ParFlags.nNodes;
1865     if (cpu == cap->no) {
1866         appendToRunQueue(cap,tso);
1867     } else {
1868         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1869     }
1870 #else
1871     appendToRunQueue(cap,tso);
1872 #endif
1873 }
1874
1875 Capability *
1876 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1877 {
1878     Task *task;
1879
1880     // We already created/initialised the Task
1881     task = cap->running_task;
1882
1883     // This TSO is now a bound thread; make the Task and TSO
1884     // point to each other.
1885     tso->bound = task;
1886     tso->cap = cap;
1887
1888     task->tso = tso;
1889     task->ret = ret;
1890     task->stat = NoStatus;
1891
1892     appendToRunQueue(cap,tso);
1893
1894     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1895
1896     cap = schedule(cap,task);
1897
1898     ASSERT(task->stat != NoStatus);
1899     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1900
1901     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1902     return cap;
1903 }
1904
1905 /* ----------------------------------------------------------------------------
1906  * Starting Tasks
1907  * ------------------------------------------------------------------------- */
1908
1909 #if defined(THREADED_RTS)
1910 void OSThreadProcAttr
1911 workerStart(Task *task)
1912 {
1913     Capability *cap;
1914
1915     // See startWorkerTask().
1916     ACQUIRE_LOCK(&task->lock);
1917     cap = task->cap;
1918     RELEASE_LOCK(&task->lock);
1919
1920     // set the thread-local pointer to the Task:
1921     taskEnter(task);
1922
1923     // schedule() runs without a lock.
1924     cap = schedule(cap,task);
1925
1926     // On exit from schedule(), we have a Capability.
1927     releaseCapability(cap);
1928     workerTaskStop(task);
1929 }
1930 #endif
1931
1932 /* ---------------------------------------------------------------------------
1933  * initScheduler()
1934  *
1935  * Initialise the scheduler.  This resets all the queues - if the
1936  * queues contained any threads, they'll be garbage collected at the
1937  * next pass.
1938  *
1939  * ------------------------------------------------------------------------ */
1940
1941 void 
1942 initScheduler(void)
1943 {
1944 #if !defined(THREADED_RTS)
1945   blocked_queue_hd  = END_TSO_QUEUE;
1946   blocked_queue_tl  = END_TSO_QUEUE;
1947   sleeping_queue    = END_TSO_QUEUE;
1948 #endif
1949
1950   blackhole_queue   = END_TSO_QUEUE;
1951
1952   sched_state    = SCHED_RUNNING;
1953   recent_activity = ACTIVITY_YES;
1954
1955 #if defined(THREADED_RTS)
1956   /* Initialise the mutex and condition variables used by
1957    * the scheduler. */
1958   initMutex(&sched_mutex);
1959 #endif
1960   
1961   ACQUIRE_LOCK(&sched_mutex);
1962
1963   /* A capability holds the state a native thread needs in
1964    * order to execute STG code. At least one capability is
1965    * floating around (only THREADED_RTS builds have more than one).
1966    */
1967   initCapabilities();
1968
1969   initTaskManager();
1970
1971 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
1972   initSparkPools();
1973 #endif
1974
1975 #if defined(THREADED_RTS)
1976   /*
1977    * Eagerly start one worker to run each Capability, except for
1978    * Capability 0.  The idea is that we're probably going to start a
1979    * bound thread on Capability 0 pretty soon, so we don't want a
1980    * worker task hogging it.
1981    */
1982   { 
1983       nat i;
1984       Capability *cap;
1985       for (i = 1; i < n_capabilities; i++) {
1986           cap = &capabilities[i];
1987           ACQUIRE_LOCK(&cap->lock);
1988           startWorkerTask(cap, workerStart);
1989           RELEASE_LOCK(&cap->lock);
1990       }
1991   }
1992 #endif
1993
1994   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
1995
1996   RELEASE_LOCK(&sched_mutex);
1997 }
1998
1999 void
2000 exitScheduler(
2001     rtsBool wait_foreign
2002 #if !defined(THREADED_RTS)
2003                          __attribute__((unused))
2004 #endif
2005 )
2006                /* see Capability.c, shutdownCapability() */
2007 {
2008     Task *task = NULL;
2009
2010 #if defined(THREADED_RTS)
2011     ACQUIRE_LOCK(&sched_mutex);
2012     task = newBoundTask();
2013     RELEASE_LOCK(&sched_mutex);
2014 #endif
2015
2016     // If we haven't killed all the threads yet, do it now.
2017     if (sched_state < SCHED_SHUTTING_DOWN) {
2018         sched_state = SCHED_INTERRUPTING;
2019         scheduleDoGC(NULL,task,rtsFalse);    
2020     }
2021     sched_state = SCHED_SHUTTING_DOWN;
2022
2023 #if defined(THREADED_RTS)
2024     { 
2025         nat i;
2026         
2027         for (i = 0; i < n_capabilities; i++) {
2028             shutdownCapability(&capabilities[i], task, wait_foreign);
2029         }
2030         boundTaskExiting(task);
2031         stopTaskManager();
2032     }
2033 #else
2034     freeCapability(&MainCapability);
2035 #endif
2036 }
2037
2038 void
2039 freeScheduler( void )
2040 {
2041     freeTaskManager();
2042     if (n_capabilities != 1) {
2043         stgFree(capabilities);
2044     }
2045 #if defined(THREADED_RTS)
2046     closeMutex(&sched_mutex);
2047 #endif
2048 }
2049
2050 /* -----------------------------------------------------------------------------
2051    performGC
2052
2053    This is the interface to the garbage collector from Haskell land.
2054    We provide this so that external C code can allocate and garbage
2055    collect when called from Haskell via _ccall_GC.
2056    -------------------------------------------------------------------------- */
2057
2058 static void
2059 performGC_(rtsBool force_major)
2060 {
2061     Task *task;
2062     // We must grab a new Task here, because the existing Task may be
2063     // associated with a particular Capability, and chained onto the 
2064     // suspended_ccalling_tasks queue.
2065     ACQUIRE_LOCK(&sched_mutex);
2066     task = newBoundTask();
2067     RELEASE_LOCK(&sched_mutex);
2068     scheduleDoGC(NULL,task,force_major);
2069     boundTaskExiting(task);
2070 }
2071
2072 void
2073 performGC(void)
2074 {
2075     performGC_(rtsFalse);
2076 }
2077
2078 void
2079 performMajorGC(void)
2080 {
2081     performGC_(rtsTrue);
2082 }
2083
2084 /* -----------------------------------------------------------------------------
2085    Stack overflow
2086
2087    If the thread has reached its maximum stack size, then raise the
2088    StackOverflow exception in the offending thread.  Otherwise
2089    relocate the TSO into a larger chunk of memory and adjust its stack
2090    size appropriately.
2091    -------------------------------------------------------------------------- */
2092
2093 static StgTSO *
2094 threadStackOverflow(Capability *cap, StgTSO *tso)
2095 {
2096   nat new_stack_size, stack_words;
2097   lnat new_tso_size;
2098   StgPtr new_sp;
2099   StgTSO *dest;
2100
2101   IF_DEBUG(sanity,checkTSO(tso));
2102
2103   // don't allow throwTo() to modify the blocked_exceptions queue
2104   // while we are moving the TSO:
2105   lockClosure((StgClosure *)tso);
2106
2107   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2108       // NB. never raise a StackOverflow exception if the thread is
2109       // inside Control.Exceptino.block.  It is impractical to protect
2110       // against stack overflow exceptions, since virtually anything
2111       // can raise one (even 'catch'), so this is the only sensible
2112       // thing to do here.  See bug #767.
2113
2114       debugTrace(DEBUG_gc,
2115                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2116                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2117       IF_DEBUG(gc,
2118                /* If we're debugging, just print out the top of the stack */
2119                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2120                                                 tso->sp+64)));
2121
2122       // Send this thread the StackOverflow exception
2123       unlockTSO(tso);
2124       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2125       return tso;
2126   }
2127
2128   /* Try to double the current stack size.  If that takes us over the
2129    * maximum stack size for this thread, then use the maximum instead.
2130    * Finally round up so the TSO ends up as a whole number of blocks.
2131    */
2132   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2133   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2134                                        TSO_STRUCT_SIZE)/sizeof(W_);
2135   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2136   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2137
2138   debugTrace(DEBUG_sched, 
2139              "increasing stack size from %ld words to %d.",
2140              (long)tso->stack_size, new_stack_size);
2141
2142   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2143   TICK_ALLOC_TSO(new_stack_size,0);
2144
2145   /* copy the TSO block and the old stack into the new area */
2146   memcpy(dest,tso,TSO_STRUCT_SIZE);
2147   stack_words = tso->stack + tso->stack_size - tso->sp;
2148   new_sp = (P_)dest + new_tso_size - stack_words;
2149   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2150
2151   /* relocate the stack pointers... */
2152   dest->sp         = new_sp;
2153   dest->stack_size = new_stack_size;
2154         
2155   /* Mark the old TSO as relocated.  We have to check for relocated
2156    * TSOs in the garbage collector and any primops that deal with TSOs.
2157    *
2158    * It's important to set the sp value to just beyond the end
2159    * of the stack, so we don't attempt to scavenge any part of the
2160    * dead TSO's stack.
2161    */
2162   tso->what_next = ThreadRelocated;
2163   setTSOLink(cap,tso,dest);
2164   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2165   tso->why_blocked = NotBlocked;
2166
2167   IF_PAR_DEBUG(verbose,
2168                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2169                      tso->id, tso, tso->stack_size);
2170                /* If we're debugging, just print out the top of the stack */
2171                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2172                                                 tso->sp+64)));
2173   
2174   unlockTSO(dest);
2175   unlockTSO(tso);
2176
2177   IF_DEBUG(sanity,checkTSO(dest));
2178 #if 0
2179   IF_DEBUG(scheduler,printTSO(dest));
2180 #endif
2181
2182   return dest;
2183 }
2184
2185 static StgTSO *
2186 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2187 {
2188     bdescr *bd, *new_bd;
2189     lnat free_w, tso_size_w;
2190     StgTSO *new_tso;
2191
2192     tso_size_w = tso_sizeW(tso);
2193
2194     if (tso_size_w < MBLOCK_SIZE_W || 
2195         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2196     {
2197         return tso;
2198     }
2199
2200     // don't allow throwTo() to modify the blocked_exceptions queue
2201     // while we are moving the TSO:
2202     lockClosure((StgClosure *)tso);
2203
2204     // this is the number of words we'll free
2205     free_w = round_to_mblocks(tso_size_w/2);
2206
2207     bd = Bdescr((StgPtr)tso);
2208     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2209     bd->free = bd->start + TSO_STRUCT_SIZEW;
2210
2211     new_tso = (StgTSO *)new_bd->start;
2212     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2213     new_tso->stack_size = new_bd->free - new_tso->stack;
2214
2215     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2216                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2217
2218     tso->what_next = ThreadRelocated;
2219     tso->_link = new_tso; // no write barrier reqd: same generation
2220
2221     // The TSO attached to this Task may have moved, so update the
2222     // pointer to it.
2223     if (task->tso == tso) {
2224         task->tso = new_tso;
2225     }
2226
2227     unlockTSO(new_tso);
2228     unlockTSO(tso);
2229
2230     IF_DEBUG(sanity,checkTSO(new_tso));
2231
2232     return new_tso;
2233 }
2234
2235 /* ---------------------------------------------------------------------------
2236    Interrupt execution
2237    - usually called inside a signal handler so it mustn't do anything fancy.   
2238    ------------------------------------------------------------------------ */
2239
2240 void
2241 interruptStgRts(void)
2242 {
2243     sched_state = SCHED_INTERRUPTING;
2244     setContextSwitches();
2245     wakeUpRts();
2246 }
2247
2248 /* -----------------------------------------------------------------------------
2249    Wake up the RTS
2250    
2251    This function causes at least one OS thread to wake up and run the
2252    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2253    an external event has arrived that may need servicing (eg. a
2254    keyboard interrupt).
2255
2256    In the single-threaded RTS we don't do anything here; we only have
2257    one thread anyway, and the event that caused us to want to wake up
2258    will have interrupted any blocking system call in progress anyway.
2259    -------------------------------------------------------------------------- */
2260
2261 void
2262 wakeUpRts(void)
2263 {
2264 #if defined(THREADED_RTS)
2265     // This forces the IO Manager thread to wakeup, which will
2266     // in turn ensure that some OS thread wakes up and runs the
2267     // scheduler loop, which will cause a GC and deadlock check.
2268     ioManagerWakeup();
2269 #endif
2270 }
2271
2272 /* -----------------------------------------------------------------------------
2273  * checkBlackHoles()
2274  *
2275  * Check the blackhole_queue for threads that can be woken up.  We do
2276  * this periodically: before every GC, and whenever the run queue is
2277  * empty.
2278  *
2279  * An elegant solution might be to just wake up all the blocked
2280  * threads with awakenBlockedQueue occasionally: they'll go back to
2281  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2282  * doesn't give us a way to tell whether we've actually managed to
2283  * wake up any threads, so we would be busy-waiting.
2284  *
2285  * -------------------------------------------------------------------------- */
2286
2287 static rtsBool
2288 checkBlackHoles (Capability *cap)
2289 {
2290     StgTSO **prev, *t;
2291     rtsBool any_woke_up = rtsFalse;
2292     StgHalfWord type;
2293
2294     // blackhole_queue is global:
2295     ASSERT_LOCK_HELD(&sched_mutex);
2296
2297     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2298
2299     // ASSUMES: sched_mutex
2300     prev = &blackhole_queue;
2301     t = blackhole_queue;
2302     while (t != END_TSO_QUEUE) {
2303         ASSERT(t->why_blocked == BlockedOnBlackHole);
2304         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2305         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2306             IF_DEBUG(sanity,checkTSO(t));
2307             t = unblockOne(cap, t);
2308             *prev = t;
2309             any_woke_up = rtsTrue;
2310         } else {
2311             prev = &t->_link;
2312             t = t->_link;
2313         }
2314     }
2315
2316     return any_woke_up;
2317 }
2318
2319 /* -----------------------------------------------------------------------------
2320    Deleting threads
2321
2322    This is used for interruption (^C) and forking, and corresponds to
2323    raising an exception but without letting the thread catch the
2324    exception.
2325    -------------------------------------------------------------------------- */
2326
2327 static void 
2328 deleteThread (Capability *cap, StgTSO *tso)
2329 {
2330     // NOTE: must only be called on a TSO that we have exclusive
2331     // access to, because we will call throwToSingleThreaded() below.
2332     // The TSO must be on the run queue of the Capability we own, or 
2333     // we must own all Capabilities.
2334
2335     if (tso->why_blocked != BlockedOnCCall &&
2336         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2337         throwToSingleThreaded(cap,tso,NULL);
2338     }
2339 }
2340
2341 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2342 static void 
2343 deleteThread_(Capability *cap, StgTSO *tso)
2344 { // for forkProcess only:
2345   // like deleteThread(), but we delete threads in foreign calls, too.
2346
2347     if (tso->why_blocked == BlockedOnCCall ||
2348         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2349         unblockOne(cap,tso);
2350         tso->what_next = ThreadKilled;
2351     } else {
2352         deleteThread(cap,tso);
2353     }
2354 }
2355 #endif
2356
2357 /* -----------------------------------------------------------------------------
2358    raiseExceptionHelper
2359    
2360    This function is called by the raise# primitve, just so that we can
2361    move some of the tricky bits of raising an exception from C-- into
2362    C.  Who knows, it might be a useful re-useable thing here too.
2363    -------------------------------------------------------------------------- */
2364
2365 StgWord
2366 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2367 {
2368     Capability *cap = regTableToCapability(reg);
2369     StgThunk *raise_closure = NULL;
2370     StgPtr p, next;
2371     StgRetInfoTable *info;
2372     //
2373     // This closure represents the expression 'raise# E' where E
2374     // is the exception raise.  It is used to overwrite all the
2375     // thunks which are currently under evaluataion.
2376     //
2377
2378     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2379     // LDV profiling: stg_raise_info has THUNK as its closure
2380     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2381     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2382     // 1 does not cause any problem unless profiling is performed.
2383     // However, when LDV profiling goes on, we need to linearly scan
2384     // small object pool, where raise_closure is stored, so we should
2385     // use MIN_UPD_SIZE.
2386     //
2387     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2388     //                                 sizeofW(StgClosure)+1);
2389     //
2390
2391     //
2392     // Walk up the stack, looking for the catch frame.  On the way,
2393     // we update any closures pointed to from update frames with the
2394     // raise closure that we just built.
2395     //
2396     p = tso->sp;
2397     while(1) {
2398         info = get_ret_itbl((StgClosure *)p);
2399         next = p + stack_frame_sizeW((StgClosure *)p);
2400         switch (info->i.type) {
2401             
2402         case UPDATE_FRAME:
2403             // Only create raise_closure if we need to.
2404             if (raise_closure == NULL) {
2405                 raise_closure = 
2406                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2407                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2408                 raise_closure->payload[0] = exception;
2409             }
2410             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2411             p = next;
2412             continue;
2413
2414         case ATOMICALLY_FRAME:
2415             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2416             tso->sp = p;
2417             return ATOMICALLY_FRAME;
2418             
2419         case CATCH_FRAME:
2420             tso->sp = p;
2421             return CATCH_FRAME;
2422
2423         case CATCH_STM_FRAME:
2424             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2425             tso->sp = p;
2426             return CATCH_STM_FRAME;
2427             
2428         case STOP_FRAME:
2429             tso->sp = p;
2430             return STOP_FRAME;
2431
2432         case CATCH_RETRY_FRAME:
2433         default:
2434             p = next; 
2435             continue;
2436         }
2437     }
2438 }
2439
2440
2441 /* -----------------------------------------------------------------------------
2442    findRetryFrameHelper
2443
2444    This function is called by the retry# primitive.  It traverses the stack
2445    leaving tso->sp referring to the frame which should handle the retry.  
2446
2447    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2448    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2449
2450    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2451    create) because retries are not considered to be exceptions, despite the
2452    similar implementation.
2453
2454    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2455    not be created within memory transactions.
2456    -------------------------------------------------------------------------- */
2457
2458 StgWord
2459 findRetryFrameHelper (StgTSO *tso)
2460 {
2461   StgPtr           p, next;
2462   StgRetInfoTable *info;
2463
2464   p = tso -> sp;
2465   while (1) {
2466     info = get_ret_itbl((StgClosure *)p);
2467     next = p + stack_frame_sizeW((StgClosure *)p);
2468     switch (info->i.type) {
2469       
2470     case ATOMICALLY_FRAME:
2471         debugTrace(DEBUG_stm,
2472                    "found ATOMICALLY_FRAME at %p during retry", p);
2473         tso->sp = p;
2474         return ATOMICALLY_FRAME;
2475       
2476     case CATCH_RETRY_FRAME:
2477         debugTrace(DEBUG_stm,
2478                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2479         tso->sp = p;
2480         return CATCH_RETRY_FRAME;
2481       
2482     case CATCH_STM_FRAME: {
2483         StgTRecHeader *trec = tso -> trec;
2484         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2485         debugTrace(DEBUG_stm,
2486                    "found CATCH_STM_FRAME at %p during retry", p);
2487         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2488         stmAbortTransaction(tso -> cap, trec);
2489         stmFreeAbortedTRec(tso -> cap, trec);
2490         tso -> trec = outer;
2491         p = next; 
2492         continue;
2493     }
2494       
2495
2496     default:
2497       ASSERT(info->i.type != CATCH_FRAME);
2498       ASSERT(info->i.type != STOP_FRAME);
2499       p = next; 
2500       continue;
2501     }
2502   }
2503 }
2504
2505 /* -----------------------------------------------------------------------------
2506    resurrectThreads is called after garbage collection on the list of
2507    threads found to be garbage.  Each of these threads will be woken
2508    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2509    on an MVar, or NonTermination if the thread was blocked on a Black
2510    Hole.
2511
2512    Locks: assumes we hold *all* the capabilities.
2513    -------------------------------------------------------------------------- */
2514
2515 void
2516 resurrectThreads (StgTSO *threads)
2517 {
2518     StgTSO *tso, *next;
2519     Capability *cap;
2520     step *step;
2521
2522     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2523         next = tso->global_link;
2524
2525         step = Bdescr((P_)tso)->step;
2526         tso->global_link = step->threads;
2527         step->threads = tso;
2528
2529         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2530         
2531         // Wake up the thread on the Capability it was last on
2532         cap = tso->cap;
2533         
2534         switch (tso->why_blocked) {
2535         case BlockedOnMVar:
2536         case BlockedOnException:
2537             /* Called by GC - sched_mutex lock is currently held. */
2538             throwToSingleThreaded(cap, tso,
2539                                   (StgClosure *)blockedOnDeadMVar_closure);
2540             break;
2541         case BlockedOnBlackHole:
2542             throwToSingleThreaded(cap, tso,
2543                                   (StgClosure *)nonTermination_closure);
2544             break;
2545         case BlockedOnSTM:
2546             throwToSingleThreaded(cap, tso,
2547                                   (StgClosure *)blockedIndefinitely_closure);
2548             break;
2549         case NotBlocked:
2550             /* This might happen if the thread was blocked on a black hole
2551              * belonging to a thread that we've just woken up (raiseAsync
2552              * can wake up threads, remember...).
2553              */
2554             continue;
2555         default:
2556             barf("resurrectThreads: thread blocked in a strange way");
2557         }
2558     }
2559 }
2560
2561 /* -----------------------------------------------------------------------------
2562    performPendingThrowTos is called after garbage collection, and
2563    passed a list of threads that were found to have pending throwTos
2564    (tso->blocked_exceptions was not empty), and were blocked.
2565    Normally this doesn't happen, because we would deliver the
2566    exception directly if the target thread is blocked, but there are
2567    small windows where it might occur on a multiprocessor (see
2568    throwTo()).
2569
2570    NB. we must be holding all the capabilities at this point, just
2571    like resurrectThreads().
2572    -------------------------------------------------------------------------- */
2573
2574 void
2575 performPendingThrowTos (StgTSO *threads)
2576 {
2577     StgTSO *tso, *next;
2578     Capability *cap;
2579     step *step;
2580
2581     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2582         next = tso->global_link;
2583
2584         step = Bdescr((P_)tso)->step;
2585         tso->global_link = step->threads;
2586         step->threads = tso;
2587
2588         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2589         
2590         cap = tso->cap;
2591         maybePerformBlockedException(cap, tso);
2592     }
2593 }