Work stealing for sparks
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34
35 /* PARALLEL_HASKELL includes go here */
36
37 #include "Sparks.h"
38 #include "Capability.h"
39 #include "Task.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
43 #endif
44 #include "Trace.h"
45 #include "RaiseAsync.h"
46 #include "Threads.h"
47 #include "ThrIOManager.h"
48
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
51 #endif
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56 #include <string.h>
57 #include <stdlib.h>
58 #include <stdarg.h>
59
60 #ifdef HAVE_ERRNO_H
61 #include <errno.h>
62 #endif
63
64 // Turn off inlining when debugging - it obfuscates things
65 #ifdef DEBUG
66 # undef  STATIC_INLINE
67 # define STATIC_INLINE static
68 #endif
69
70 /* -----------------------------------------------------------------------------
71  * Global variables
72  * -------------------------------------------------------------------------- */
73
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping thrads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
79 #endif
80
81 /* Threads blocked on blackholes.
82  * LOCK: sched_mutex+capability, or all capabilities
83  */
84 StgTSO *blackhole_queue = NULL;
85
86 /* The blackhole_queue should be checked for threads to wake up.  See
87  * Schedule.h for more thorough comment.
88  * LOCK: none (doesn't matter if we miss an update)
89  */
90 rtsBool blackholes_need_checking = rtsFalse;
91
92 /* flag that tracks whether we have done any execution in this time slice.
93  * LOCK: currently none, perhaps we should lock (but needs to be
94  * updated in the fast path of the scheduler).
95  */
96 nat recent_activity = ACTIVITY_YES;
97
98 /* if this flag is set as well, give up execution
99  * LOCK: none (changes once, from false->true)
100  */
101 rtsBool sched_state = SCHED_RUNNING;
102
103 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
104  *  exists - earlier gccs apparently didn't.
105  *  -= chak
106  */
107 StgTSO dummy_tso;
108
109 /*
110  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
111  * in an MT setting, needed to signal that a worker thread shouldn't hang around
112  * in the scheduler when it is out of work.
113  */
114 rtsBool shutting_down_scheduler = rtsFalse;
115
116 /*
117  * This mutex protects most of the global scheduler data in
118  * the THREADED_RTS runtime.
119  */
120 #if defined(THREADED_RTS)
121 Mutex sched_mutex;
122 #endif
123
124 #if !defined(mingw32_HOST_OS)
125 #define FORKPROCESS_PRIMOP_SUPPORTED
126 #endif
127
128 /* -----------------------------------------------------------------------------
129  * static function prototypes
130  * -------------------------------------------------------------------------- */
131
132 static Capability *schedule (Capability *initialCapability, Task *task);
133
134 //
135 // These function all encapsulate parts of the scheduler loop, and are
136 // abstracted only to make the structure and control flow of the
137 // scheduler clearer.
138 //
139 static void schedulePreLoop (void);
140 static void scheduleStartSignalHandlers (Capability *cap);
141 static void scheduleCheckBlockedThreads (Capability *cap);
142 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
143 static void scheduleCheckBlackHoles (Capability *cap);
144 static void scheduleDetectDeadlock (Capability *cap, Task *task);
145 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
146 static void schedulePushWork(Capability *cap, Task *task);
147 static rtsBool scheduleGetRemoteWork(Capability *cap);
148 #if defined(PARALLEL_HASKELL)
149 static void scheduleSendPendingMessages(void);
150 #endif
151 static void scheduleActivateSpark(Capability *cap);
152 #endif
153 static void schedulePostRunThread(Capability *cap, StgTSO *t);
154 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
155 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
156                                          StgTSO *t);
157 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
158                                     nat prev_what_next );
159 static void scheduleHandleThreadBlocked( StgTSO *t );
160 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
161                                              StgTSO *t );
162 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
163 static Capability *scheduleDoGC(Capability *cap, Task *task,
164                                 rtsBool force_major);
165
166 static rtsBool checkBlackHoles(Capability *cap);
167
168 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
169 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
170
171 static void deleteThread (Capability *cap, StgTSO *tso);
172 static void deleteAllThreads (Capability *cap);
173
174 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
175 static void deleteThread_(Capability *cap, StgTSO *tso);
176 #endif
177
178 #ifdef DEBUG
179 static char *whatNext_strs[] = {
180   "(unknown)",
181   "ThreadRunGHC",
182   "ThreadInterpret",
183   "ThreadKilled",
184   "ThreadRelocated",
185   "ThreadComplete"
186 };
187 #endif
188
189 /* -----------------------------------------------------------------------------
190  * Putting a thread on the run queue: different scheduling policies
191  * -------------------------------------------------------------------------- */
192
193 STATIC_INLINE void
194 addToRunQueue( Capability *cap, StgTSO *t )
195 {
196 #if defined(PARALLEL_HASKELL)
197     if (RtsFlags.ParFlags.doFairScheduling) { 
198         // this does round-robin scheduling; good for concurrency
199         appendToRunQueue(cap,t);
200     } else {
201         // this does unfair scheduling; good for parallelism
202         pushOnRunQueue(cap,t);
203     }
204 #else
205     // this does round-robin scheduling; good for concurrency
206     appendToRunQueue(cap,t);
207 #endif
208 }
209
210 /* ---------------------------------------------------------------------------
211    Main scheduling loop.
212
213    We use round-robin scheduling, each thread returning to the
214    scheduler loop when one of these conditions is detected:
215
216       * out of heap space
217       * timer expires (thread yields)
218       * thread blocks
219       * thread ends
220       * stack overflow
221
222    GRAN version:
223      In a GranSim setup this loop iterates over the global event queue.
224      This revolves around the global event queue, which determines what 
225      to do next. Therefore, it's more complicated than either the 
226      concurrent or the parallel (GUM) setup.
227   This version has been entirely removed (JB 2008/08).
228
229    GUM version:
230      GUM iterates over incoming messages.
231      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
232      and sends out a fish whenever it has nothing to do; in-between
233      doing the actual reductions (shared code below) it processes the
234      incoming messages and deals with delayed operations 
235      (see PendingFetches).
236      This is not the ugliest code you could imagine, but it's bloody close.
237
238   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
239   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
240   as well as future GUM versions. This file has been refurbished to
241   only contain valid code, which is however incomplete, refers to
242   invalid includes etc.
243
244    ------------------------------------------------------------------------ */
245
246 static Capability *
247 schedule (Capability *initialCapability, Task *task)
248 {
249   StgTSO *t;
250   Capability *cap;
251   StgThreadReturnCode ret;
252 #if defined(PARALLEL_HASKELL)
253   rtsBool receivedFinish = rtsFalse;
254 #endif
255   nat prev_what_next;
256   rtsBool ready_to_gc;
257 #if defined(THREADED_RTS)
258   rtsBool first = rtsTrue;
259 #endif
260   
261   cap = initialCapability;
262
263   // Pre-condition: this task owns initialCapability.
264   // The sched_mutex is *NOT* held
265   // NB. on return, we still hold a capability.
266
267   debugTrace (DEBUG_sched, 
268               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
269               task, initialCapability);
270
271   schedulePreLoop();
272
273   // -----------------------------------------------------------
274   // Scheduler loop starts here:
275
276 #if defined(PARALLEL_HASKELL)
277 #define TERMINATION_CONDITION        (!receivedFinish)
278 #else
279 #define TERMINATION_CONDITION        rtsTrue
280 #endif
281
282   while (TERMINATION_CONDITION) {
283
284 #if defined(THREADED_RTS)
285       if (first) {
286           // don't yield the first time, we want a chance to run this
287           // thread for a bit, even if there are others banging at the
288           // door.
289           first = rtsFalse;
290           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
291       } else {
292           // Yield the capability to higher-priority tasks if necessary.
293           yieldCapability(&cap, task);
294           /* inside yieldCapability, attempts to steal work from other
295              capabilities, unless the capability has own work. 
296              See (REMARK) below.
297           */
298       }
299 #endif
300
301     /* THIS WAS THE PLACE FOR THREADED_RTS::schedulePushWork(cap,task) */
302       
303     // Check whether we have re-entered the RTS from Haskell without
304     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
305     // call).
306     if (cap->in_haskell) {
307           errorBelch("schedule: re-entered unsafely.\n"
308                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
309           stg_exit(EXIT_FAILURE);
310     }
311
312     // The interruption / shutdown sequence.
313     // 
314     // In order to cleanly shut down the runtime, we want to:
315     //   * make sure that all main threads return to their callers
316     //     with the state 'Interrupted'.
317     //   * clean up all OS threads assocated with the runtime
318     //   * free all memory etc.
319     //
320     // So the sequence for ^C goes like this:
321     //
322     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
323     //     arranges for some Capability to wake up
324     //
325     //   * all threads in the system are halted, and the zombies are
326     //     placed on the run queue for cleaning up.  We acquire all
327     //     the capabilities in order to delete the threads, this is
328     //     done by scheduleDoGC() for convenience (because GC already
329     //     needs to acquire all the capabilities).  We can't kill
330     //     threads involved in foreign calls.
331     // 
332     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
333     //
334     //   * sched_state := SCHED_SHUTTING_DOWN
335     //
336     //   * all workers exit when the run queue on their capability
337     //     drains.  All main threads will also exit when their TSO
338     //     reaches the head of the run queue and they can return.
339     //
340     //   * eventually all Capabilities will shut down, and the RTS can
341     //     exit.
342     //
343     //   * We might be left with threads blocked in foreign calls, 
344     //     we should really attempt to kill these somehow (TODO);
345     
346     switch (sched_state) {
347     case SCHED_RUNNING:
348         break;
349     case SCHED_INTERRUPTING:
350         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
351 #if defined(THREADED_RTS)
352         discardSparksCap(cap);
353 #endif
354         /* scheduleDoGC() deletes all the threads */
355         cap = scheduleDoGC(cap,task,rtsFalse);
356         break;
357     case SCHED_SHUTTING_DOWN:
358         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
359         // If we are a worker, just exit.  If we're a bound thread
360         // then we will exit below when we've removed our TSO from
361         // the run queue.
362         if (task->tso == NULL && emptyRunQueue(cap)) {
363             return cap;
364         }
365         break;
366     default:
367         barf("sched_state: %d", sched_state);
368     }
369
370     /* this was the place to activate a spark, now below... */
371
372     scheduleStartSignalHandlers(cap);
373
374     // Only check the black holes here if we've nothing else to do.
375     // During normal execution, the black hole list only gets checked
376     // at GC time, to avoid repeatedly traversing this possibly long
377     // list each time around the scheduler.
378     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
379
380     scheduleCheckWakeupThreads(cap);
381
382     scheduleCheckBlockedThreads(cap);
383
384 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
385     /* work distribution in multithreaded and parallel systems 
386
387        REMARK: IMHO best location for work-stealing as well.
388        tests above might yield some new jobs, so no need to steal a
389        spark in some cases. I believe the yieldCapability.. above
390        should be moved here.
391     */
392
393 #if defined(PARALLEL_HASKELL)
394     /* if messages have been buffered... a NOOP in THREADED_RTS */
395     scheduleSendPendingMessages();
396 #endif
397
398     /* If the run queue is empty,...*/
399     if (emptyRunQueue(cap)) {
400       /* ...take one of our own sparks and turn it into a thread */
401       scheduleActivateSpark(cap);
402
403         /* if this did not work, try to steal a spark from someone else */
404       if (emptyRunQueue(cap)) {
405 #if defined(PARALLEL_HASKELL)
406         receivedFinish = scheduleGetRemoteWork(cap);
407         continue; //  a new round, (hopefully) with new work
408         /* 
409            in GUM, this a) sends out a FISH and returns IF no fish is
410                            out already
411                         b) (blocking) awaits and receives messages
412            
413            in Eden, this is only the blocking receive, as b) in GUM.
414
415            in Threaded-RTS, this does plain nothing. Stealing routine
416                 is inside Capability.c and called from
417                 yieldCapability() at the very beginning, see REMARK.
418         */
419 #endif
420       }
421     } else { /* i.e. run queue was (initially) not empty */
422       schedulePushWork(cap,task);
423       /* work pushing, currently relevant only for THREADED_RTS:
424          (pushes threads, wakes up idle capabilities for stealing) */
425     }
426
427 #if defined(PARALLEL_HASKELL)
428     /* since we perform a blocking receive and continue otherwise,
429        either we never reach here or we definitely have work! */
430     // from here: non-empty run queue
431     ASSERT(!emptyRunQueue(cap));
432
433     if (PacketsWaiting()) {  /* now process incoming messages, if any
434                                 pending...  
435
436                                 CAUTION: scheduleGetRemoteWork called
437                                 above, waits for messages as well! */
438       processMessages(cap, &receivedFinish);
439     }
440 #endif // PARALLEL_HASKELL: non-empty run queue!
441
442 #endif /* THREADED_RTS || PARALLEL_HASKELL */
443
444     scheduleDetectDeadlock(cap,task);
445 #if defined(THREADED_RTS)
446     cap = task->cap;    // reload cap, it might have changed
447 #endif
448
449     // Normally, the only way we can get here with no threads to
450     // run is if a keyboard interrupt received during 
451     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
452     // Additionally, it is not fatal for the
453     // threaded RTS to reach here with no threads to run.
454     //
455     // win32: might be here due to awaitEvent() being abandoned
456     // as a result of a console event having been delivered.
457     if ( emptyRunQueue(cap) ) {
458 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
459         ASSERT(sched_state >= SCHED_INTERRUPTING);
460 #endif
461         continue; // nothing to do
462     }
463
464     // 
465     // Get a thread to run
466     //
467     t = popRunQueue(cap);
468
469     // Sanity check the thread we're about to run.  This can be
470     // expensive if there is lots of thread switching going on...
471     IF_DEBUG(sanity,checkTSO(t));
472
473 #if defined(THREADED_RTS)
474     // Check whether we can run this thread in the current task.
475     // If not, we have to pass our capability to the right task.
476     {
477         Task *bound = t->bound;
478       
479         if (bound) {
480             if (bound == task) {
481                 debugTrace(DEBUG_sched,
482                            "### Running thread %lu in bound thread", (unsigned long)t->id);
483                 // yes, the Haskell thread is bound to the current native thread
484             } else {
485                 debugTrace(DEBUG_sched,
486                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
487                 // no, bound to a different Haskell thread: pass to that thread
488                 pushOnRunQueue(cap,t);
489                 continue;
490             }
491         } else {
492             // The thread we want to run is unbound.
493             if (task->tso) { 
494                 debugTrace(DEBUG_sched,
495                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
496                 // no, the current native thread is bound to a different
497                 // Haskell thread, so pass it to any worker thread
498                 pushOnRunQueue(cap,t);
499                 continue; 
500             }
501         }
502     }
503 #endif
504
505     /* context switches are initiated by the timer signal, unless
506      * the user specified "context switch as often as possible", with
507      * +RTS -C0
508      */
509     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
510         && !emptyThreadQueues(cap)) {
511         cap->context_switch = 1;
512     }
513          
514 run_thread:
515
516     // CurrentTSO is the thread to run.  t might be different if we
517     // loop back to run_thread, so make sure to set CurrentTSO after
518     // that.
519     cap->r.rCurrentTSO = t;
520
521     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
522                               (long)t->id, whatNext_strs[t->what_next]);
523
524     startHeapProfTimer();
525
526     // Check for exceptions blocked on this thread
527     maybePerformBlockedException (cap, t);
528
529     // ----------------------------------------------------------------------
530     // Run the current thread 
531
532     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
533     ASSERT(t->cap == cap);
534
535     prev_what_next = t->what_next;
536
537     errno = t->saved_errno;
538 #if mingw32_HOST_OS
539     SetLastError(t->saved_winerror);
540 #endif
541
542     cap->in_haskell = rtsTrue;
543
544     dirty_TSO(cap,t);
545
546 #if defined(THREADED_RTS)
547     if (recent_activity == ACTIVITY_DONE_GC) {
548         // ACTIVITY_DONE_GC means we turned off the timer signal to
549         // conserve power (see #1623).  Re-enable it here.
550         nat prev;
551         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
552         if (prev == ACTIVITY_DONE_GC) {
553             startTimer();
554         }
555     } else {
556         recent_activity = ACTIVITY_YES;
557     }
558 #endif
559
560     switch (prev_what_next) {
561         
562     case ThreadKilled:
563     case ThreadComplete:
564         /* Thread already finished, return to scheduler. */
565         ret = ThreadFinished;
566         break;
567         
568     case ThreadRunGHC:
569     {
570         StgRegTable *r;
571         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
572         cap = regTableToCapability(r);
573         ret = r->rRet;
574         break;
575     }
576     
577     case ThreadInterpret:
578         cap = interpretBCO(cap);
579         ret = cap->r.rRet;
580         break;
581         
582     default:
583         barf("schedule: invalid what_next field");
584     }
585
586     cap->in_haskell = rtsFalse;
587
588     // The TSO might have moved, eg. if it re-entered the RTS and a GC
589     // happened.  So find the new location:
590     t = cap->r.rCurrentTSO;
591
592     // We have run some Haskell code: there might be blackhole-blocked
593     // threads to wake up now.
594     // Lock-free test here should be ok, we're just setting a flag.
595     if ( blackhole_queue != END_TSO_QUEUE ) {
596         blackholes_need_checking = rtsTrue;
597     }
598     
599     // And save the current errno in this thread.
600     // XXX: possibly bogus for SMP because this thread might already
601     // be running again, see code below.
602     t->saved_errno = errno;
603 #if mingw32_HOST_OS
604     // Similarly for Windows error code
605     t->saved_winerror = GetLastError();
606 #endif
607
608 #if defined(THREADED_RTS)
609     // If ret is ThreadBlocked, and this Task is bound to the TSO that
610     // blocked, we are in limbo - the TSO is now owned by whatever it
611     // is blocked on, and may in fact already have been woken up,
612     // perhaps even on a different Capability.  It may be the case
613     // that task->cap != cap.  We better yield this Capability
614     // immediately and return to normaility.
615     if (ret == ThreadBlocked) {
616         debugTrace(DEBUG_sched,
617                    "--<< thread %lu (%s) stopped: blocked",
618                    (unsigned long)t->id, whatNext_strs[t->what_next]);
619         continue;
620     }
621 #endif
622
623     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
624     ASSERT(t->cap == cap);
625
626     // ----------------------------------------------------------------------
627     
628     // Costs for the scheduler are assigned to CCS_SYSTEM
629     stopHeapProfTimer();
630 #if defined(PROFILING)
631     CCCS = CCS_SYSTEM;
632 #endif
633     
634     schedulePostRunThread(cap,t);
635
636     t = threadStackUnderflow(task,t);
637
638     ready_to_gc = rtsFalse;
639
640     switch (ret) {
641     case HeapOverflow:
642         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
643         break;
644
645     case StackOverflow:
646         scheduleHandleStackOverflow(cap,task,t);
647         break;
648
649     case ThreadYielding:
650         if (scheduleHandleYield(cap, t, prev_what_next)) {
651             // shortcut for switching between compiler/interpreter:
652             goto run_thread; 
653         }
654         break;
655
656     case ThreadBlocked:
657         scheduleHandleThreadBlocked(t);
658         break;
659
660     case ThreadFinished:
661         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
662         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
663         break;
664
665     default:
666       barf("schedule: invalid thread return code %d", (int)ret);
667     }
668
669     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
670       cap = scheduleDoGC(cap,task,rtsFalse);
671     }
672   } /* end of while() */
673 }
674
675 /* ----------------------------------------------------------------------------
676  * Setting up the scheduler loop
677  * ------------------------------------------------------------------------- */
678
679 static void
680 schedulePreLoop(void)
681 {
682   // initialisation for scheduler - what cannot go into initScheduler()  
683 }
684
685 /* -----------------------------------------------------------------------------
686  * schedulePushWork()
687  *
688  * Push work to other Capabilities if we have some.
689  * -------------------------------------------------------------------------- */
690
691 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
692 static void
693 schedulePushWork(Capability *cap USED_IF_THREADS, 
694                  Task *task      USED_IF_THREADS)
695 {
696   /* following code not for PARALLEL_HASKELL. I kept the call general,
697      future GUM versions might use pushing in a distributed setup */
698 #if defined(THREADED_RTS)
699
700     Capability *free_caps[n_capabilities], *cap0;
701     nat i, n_free_caps;
702
703     // migration can be turned off with +RTS -qg
704     if (!RtsFlags.ParFlags.migrate) return;
705
706     // Check whether we have more threads on our run queue, or sparks
707     // in our pool, that we could hand to another Capability.
708     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
709         && sparkPoolSizeCap(cap) < 2) {
710         return;
711     }
712
713     // First grab as many free Capabilities as we can.
714     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
715         cap0 = &capabilities[i];
716         if (cap != cap0 && tryGrabCapability(cap0,task)) {
717             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
718                 // it already has some work, we just grabbed it at 
719                 // the wrong moment.  Or maybe it's deadlocked!
720                 releaseCapability(cap0);
721             } else {
722                 free_caps[n_free_caps++] = cap0;
723             }
724         }
725     }
726
727     // we now have n_free_caps free capabilities stashed in
728     // free_caps[].  Share our run queue equally with them.  This is
729     // probably the simplest thing we could do; improvements we might
730     // want to do include:
731     //
732     //   - giving high priority to moving relatively new threads, on 
733     //     the gournds that they haven't had time to build up a
734     //     working set in the cache on this CPU/Capability.
735     //
736     //   - giving low priority to moving long-lived threads
737
738     if (n_free_caps > 0) {
739         StgTSO *prev, *t, *next;
740         rtsBool pushed_to_all;
741
742         debugTrace(DEBUG_sched, 
743                    "cap %d: %s and %d free capabilities, sharing...", 
744                    cap->no, 
745                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
746                    "excess threads on run queue":"sparks to share (>=2)",
747                    n_free_caps);
748
749         i = 0;
750         pushed_to_all = rtsFalse;
751
752         if (cap->run_queue_hd != END_TSO_QUEUE) {
753             prev = cap->run_queue_hd;
754             t = prev->_link;
755             prev->_link = END_TSO_QUEUE;
756             for (; t != END_TSO_QUEUE; t = next) {
757                 next = t->_link;
758                 t->_link = END_TSO_QUEUE;
759                 if (t->what_next == ThreadRelocated
760                     || t->bound == task // don't move my bound thread
761                     || tsoLocked(t)) {  // don't move a locked thread
762                     setTSOLink(cap, prev, t);
763                     prev = t;
764                 } else if (i == n_free_caps) {
765                     pushed_to_all = rtsTrue;
766                     i = 0;
767                     // keep one for us
768                     setTSOLink(cap, prev, t);
769                     prev = t;
770                 } else {
771                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
772                     appendToRunQueue(free_caps[i],t);
773                     if (t->bound) { t->bound->cap = free_caps[i]; }
774                     t->cap = free_caps[i];
775                     i++;
776                 }
777             }
778             cap->run_queue_tl = prev;
779         }
780
781 #ifdef SPARK_PUSHING
782         /* JB I left this code in place, it would work but is not necessary */
783
784         // If there are some free capabilities that we didn't push any
785         // threads to, then try to push a spark to each one.
786         if (!pushed_to_all) {
787             StgClosure *spark;
788             // i is the next free capability to push to
789             for (; i < n_free_caps; i++) {
790                 if (emptySparkPoolCap(free_caps[i])) {
791                     spark = findSpark(cap);
792                     if (spark != NULL) {
793                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
794                         newSpark(&(free_caps[i]->r), spark);
795                     }
796                 }
797             }
798         }
799 #endif /* SPARK_PUSHING */
800
801         // release the capabilities
802         for (i = 0; i < n_free_caps; i++) {
803             task->cap = free_caps[i];
804             releaseCapability(free_caps[i]);
805         }
806         // now wake them all up, and they might steal sparks if
807         // the did not get a thread
808         prodAllCapabilities();
809     }
810     task->cap = cap; // reset to point to our Capability.
811
812 #endif /* THREADED_RTS */
813
814 }
815 #endif /* THREADED_RTS || PARALLEL_HASKELL */
816
817 /* ----------------------------------------------------------------------------
818  * Start any pending signal handlers
819  * ------------------------------------------------------------------------- */
820
821 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
822 static void
823 scheduleStartSignalHandlers(Capability *cap)
824 {
825     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
826         // safe outside the lock
827         startSignalHandlers(cap);
828     }
829 }
830 #else
831 static void
832 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
833 {
834 }
835 #endif
836
837 /* ----------------------------------------------------------------------------
838  * Check for blocked threads that can be woken up.
839  * ------------------------------------------------------------------------- */
840
841 static void
842 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
843 {
844 #if !defined(THREADED_RTS)
845     //
846     // Check whether any waiting threads need to be woken up.  If the
847     // run queue is empty, and there are no other tasks running, we
848     // can wait indefinitely for something to happen.
849     //
850     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
851     {
852         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
853     }
854 #endif
855 }
856
857
858 /* ----------------------------------------------------------------------------
859  * Check for threads woken up by other Capabilities
860  * ------------------------------------------------------------------------- */
861
862 static void
863 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
864 {
865 #if defined(THREADED_RTS)
866     // Any threads that were woken up by other Capabilities get
867     // appended to our run queue.
868     if (!emptyWakeupQueue(cap)) {
869         ACQUIRE_LOCK(&cap->lock);
870         if (emptyRunQueue(cap)) {
871             cap->run_queue_hd = cap->wakeup_queue_hd;
872             cap->run_queue_tl = cap->wakeup_queue_tl;
873         } else {
874             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
875             cap->run_queue_tl = cap->wakeup_queue_tl;
876         }
877         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
878         RELEASE_LOCK(&cap->lock);
879     }
880 #endif
881 }
882
883 /* ----------------------------------------------------------------------------
884  * Check for threads blocked on BLACKHOLEs that can be woken up
885  * ------------------------------------------------------------------------- */
886 static void
887 scheduleCheckBlackHoles (Capability *cap)
888 {
889     if ( blackholes_need_checking ) // check without the lock first
890     {
891         ACQUIRE_LOCK(&sched_mutex);
892         if ( blackholes_need_checking ) {
893             checkBlackHoles(cap);
894             blackholes_need_checking = rtsFalse;
895         }
896         RELEASE_LOCK(&sched_mutex);
897     }
898 }
899
900 /* ----------------------------------------------------------------------------
901  * Detect deadlock conditions and attempt to resolve them.
902  * ------------------------------------------------------------------------- */
903
904 static void
905 scheduleDetectDeadlock (Capability *cap, Task *task)
906 {
907
908 #if defined(PARALLEL_HASKELL)
909     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
910     return;
911 #endif
912
913     /* 
914      * Detect deadlock: when we have no threads to run, there are no
915      * threads blocked, waiting for I/O, or sleeping, and all the
916      * other tasks are waiting for work, we must have a deadlock of
917      * some description.
918      */
919     if ( emptyThreadQueues(cap) )
920     {
921 #if defined(THREADED_RTS)
922         /* 
923          * In the threaded RTS, we only check for deadlock if there
924          * has been no activity in a complete timeslice.  This means
925          * we won't eagerly start a full GC just because we don't have
926          * any threads to run currently.
927          */
928         if (recent_activity != ACTIVITY_INACTIVE) return;
929 #endif
930
931         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
932
933         // Garbage collection can release some new threads due to
934         // either (a) finalizers or (b) threads resurrected because
935         // they are unreachable and will therefore be sent an
936         // exception.  Any threads thus released will be immediately
937         // runnable.
938         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
939
940         recent_activity = ACTIVITY_DONE_GC;
941         // disable timer signals (see #1623)
942         stopTimer();
943         
944         if ( !emptyRunQueue(cap) ) return;
945
946 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
947         /* If we have user-installed signal handlers, then wait
948          * for signals to arrive rather then bombing out with a
949          * deadlock.
950          */
951         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
952             debugTrace(DEBUG_sched,
953                        "still deadlocked, waiting for signals...");
954
955             awaitUserSignals();
956
957             if (signals_pending()) {
958                 startSignalHandlers(cap);
959             }
960
961             // either we have threads to run, or we were interrupted:
962             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
963
964             return;
965         }
966 #endif
967
968 #if !defined(THREADED_RTS)
969         /* Probably a real deadlock.  Send the current main thread the
970          * Deadlock exception.
971          */
972         if (task->tso) {
973             switch (task->tso->why_blocked) {
974             case BlockedOnSTM:
975             case BlockedOnBlackHole:
976             case BlockedOnException:
977             case BlockedOnMVar:
978                 throwToSingleThreaded(cap, task->tso, 
979                                       (StgClosure *)nonTermination_closure);
980                 return;
981             default:
982                 barf("deadlock: main thread blocked in a strange way");
983             }
984         }
985         return;
986 #endif
987     }
988 }
989
990
991 /* ----------------------------------------------------------------------------
992  * Send pending messages (PARALLEL_HASKELL only)
993  * ------------------------------------------------------------------------- */
994
995 #if defined(PARALLEL_HASKELL)
996 static void
997 scheduleSendPendingMessages(void)
998 {
999
1000 # if defined(PAR) // global Mem.Mgmt., omit for now
1001     if (PendingFetches != END_BF_QUEUE) {
1002         processFetches();
1003     }
1004 # endif
1005     
1006     if (RtsFlags.ParFlags.BufferTime) {
1007         // if we use message buffering, we must send away all message
1008         // packets which have become too old...
1009         sendOldBuffers(); 
1010     }
1011 }
1012 #endif
1013
1014 /* ----------------------------------------------------------------------------
1015  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1016  * ------------------------------------------------------------------------- */
1017
1018 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1019 static void
1020 scheduleActivateSpark(Capability *cap)
1021 {
1022     StgClosure *spark;
1023
1024 /* We only want to stay here if the run queue is empty and we want some
1025    work. We try to turn a spark into a thread, and add it to the run
1026    queue, from where it will be picked up in the next iteration of the
1027    scheduler loop.  
1028 */
1029     if (!emptyRunQueue(cap)) 
1030       /* In the threaded RTS, another task might have pushed a thread
1031          on our run queue in the meantime ? But would need a lock.. */
1032       return;
1033
1034     spark = findSpark(cap); // defined in Sparks.c
1035
1036     if (spark != NULL) {
1037       debugTrace(DEBUG_sched,
1038                  "turning spark of closure %p into a thread",
1039                  (StgClosure *)spark);
1040       createSparkThread(cap,spark); // defined in Sparks.c
1041     }
1042 }
1043 #endif // PARALLEL_HASKELL || THREADED_RTS
1044
1045 /* ----------------------------------------------------------------------------
1046  * Get work from a remote node (PARALLEL_HASKELL only)
1047  * ------------------------------------------------------------------------- */
1048     
1049 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1050 static rtsBool /* return value used in PARALLEL_HASKELL only */
1051 scheduleGetRemoteWork(Capability *cap)
1052 {
1053 #if defined(PARALLEL_HASKELL)
1054   rtsBool receivedFinish = rtsFalse;
1055
1056   // idle() , i.e. send all buffers, wait for work
1057   if (RtsFlags.ParFlags.BufferTime) {
1058         IF_PAR_DEBUG(verbose, 
1059                 debugBelch("...send all pending data,"));
1060         {
1061           nat i;
1062           for (i=1; i<=nPEs; i++)
1063             sendImmediately(i); // send all messages away immediately
1064         }
1065   }
1066
1067   /* this would be the place for fishing in GUM... 
1068
1069      if (no-earlier-fish-around) 
1070           sendFish(choosePe());
1071    */
1072
1073   // Eden:just look for incoming messages (blocking receive)
1074   IF_PAR_DEBUG(verbose, 
1075                debugBelch("...wait for incoming messages...\n"));
1076   processMessages(cap, &receivedFinish); // blocking receive...
1077
1078
1079   return receivedFinish;
1080   // reenter scheduling look after having received something
1081
1082 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1083
1084   return rtsFalse; /* return value unused in THREADED_RTS */
1085
1086 #endif /* PARALLEL_HASKELL */
1087 }
1088 #endif // PARALLEL_HASKELL || THREADED_RTS
1089
1090 /* ----------------------------------------------------------------------------
1091  * After running a thread...
1092  * ------------------------------------------------------------------------- */
1093
1094 static void
1095 schedulePostRunThread (Capability *cap, StgTSO *t)
1096 {
1097     // We have to be able to catch transactions that are in an
1098     // infinite loop as a result of seeing an inconsistent view of
1099     // memory, e.g. 
1100     //
1101     //   atomically $ do
1102     //       [a,b] <- mapM readTVar [ta,tb]
1103     //       when (a == b) loop
1104     //
1105     // and a is never equal to b given a consistent view of memory.
1106     //
1107     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1108         if (!stmValidateNestOfTransactions (t -> trec)) {
1109             debugTrace(DEBUG_sched | DEBUG_stm,
1110                        "trec %p found wasting its time", t);
1111             
1112             // strip the stack back to the
1113             // ATOMICALLY_FRAME, aborting the (nested)
1114             // transaction, and saving the stack of any
1115             // partially-evaluated thunks on the heap.
1116             throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
1117             
1118             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1119         }
1120     }
1121
1122   /* some statistics gathering in the parallel case */
1123 }
1124
1125 /* -----------------------------------------------------------------------------
1126  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1127  * -------------------------------------------------------------------------- */
1128
1129 static rtsBool
1130 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1131 {
1132     // did the task ask for a large block?
1133     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1134         // if so, get one and push it on the front of the nursery.
1135         bdescr *bd;
1136         lnat blocks;
1137         
1138         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1139         
1140         debugTrace(DEBUG_sched,
1141                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1142                    (long)t->id, whatNext_strs[t->what_next], blocks);
1143     
1144         // don't do this if the nursery is (nearly) full, we'll GC first.
1145         if (cap->r.rCurrentNursery->link != NULL ||
1146             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1147                                                // if the nursery has only one block.
1148             
1149             ACQUIRE_SM_LOCK
1150             bd = allocGroup( blocks );
1151             RELEASE_SM_LOCK
1152             cap->r.rNursery->n_blocks += blocks;
1153             
1154             // link the new group into the list
1155             bd->link = cap->r.rCurrentNursery;
1156             bd->u.back = cap->r.rCurrentNursery->u.back;
1157             if (cap->r.rCurrentNursery->u.back != NULL) {
1158                 cap->r.rCurrentNursery->u.back->link = bd;
1159             } else {
1160 #if !defined(THREADED_RTS)
1161                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1162                        g0s0 == cap->r.rNursery);
1163 #endif
1164                 cap->r.rNursery->blocks = bd;
1165             }             
1166             cap->r.rCurrentNursery->u.back = bd;
1167             
1168             // initialise it as a nursery block.  We initialise the
1169             // step, gen_no, and flags field of *every* sub-block in
1170             // this large block, because this is easier than making
1171             // sure that we always find the block head of a large
1172             // block whenever we call Bdescr() (eg. evacuate() and
1173             // isAlive() in the GC would both have to do this, at
1174             // least).
1175             { 
1176                 bdescr *x;
1177                 for (x = bd; x < bd + blocks; x++) {
1178                     x->step = cap->r.rNursery;
1179                     x->gen_no = 0;
1180                     x->flags = 0;
1181                 }
1182             }
1183             
1184             // This assert can be a killer if the app is doing lots
1185             // of large block allocations.
1186             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1187             
1188             // now update the nursery to point to the new block
1189             cap->r.rCurrentNursery = bd;
1190             
1191             // we might be unlucky and have another thread get on the
1192             // run queue before us and steal the large block, but in that
1193             // case the thread will just end up requesting another large
1194             // block.
1195             pushOnRunQueue(cap,t);
1196             return rtsFalse;  /* not actually GC'ing */
1197         }
1198     }
1199     
1200     debugTrace(DEBUG_sched,
1201                "--<< thread %ld (%s) stopped: HeapOverflow",
1202                (long)t->id, whatNext_strs[t->what_next]);
1203
1204     if (cap->context_switch) {
1205         // Sometimes we miss a context switch, e.g. when calling
1206         // primitives in a tight loop, MAYBE_GC() doesn't check the
1207         // context switch flag, and we end up waiting for a GC.
1208         // See #1984, and concurrent/should_run/1984
1209         cap->context_switch = 0;
1210         addToRunQueue(cap,t);
1211     } else {
1212         pushOnRunQueue(cap,t);
1213     }
1214     return rtsTrue;
1215     /* actual GC is done at the end of the while loop in schedule() */
1216 }
1217
1218 /* -----------------------------------------------------------------------------
1219  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1220  * -------------------------------------------------------------------------- */
1221
1222 static void
1223 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1224 {
1225     debugTrace (DEBUG_sched,
1226                 "--<< thread %ld (%s) stopped, StackOverflow", 
1227                 (long)t->id, whatNext_strs[t->what_next]);
1228
1229     /* just adjust the stack for this thread, then pop it back
1230      * on the run queue.
1231      */
1232     { 
1233         /* enlarge the stack */
1234         StgTSO *new_t = threadStackOverflow(cap, t);
1235         
1236         /* The TSO attached to this Task may have moved, so update the
1237          * pointer to it.
1238          */
1239         if (task->tso == t) {
1240             task->tso = new_t;
1241         }
1242         pushOnRunQueue(cap,new_t);
1243     }
1244 }
1245
1246 /* -----------------------------------------------------------------------------
1247  * Handle a thread that returned to the scheduler with ThreadYielding
1248  * -------------------------------------------------------------------------- */
1249
1250 static rtsBool
1251 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1252 {
1253     // Reset the context switch flag.  We don't do this just before
1254     // running the thread, because that would mean we would lose ticks
1255     // during GC, which can lead to unfair scheduling (a thread hogs
1256     // the CPU because the tick always arrives during GC).  This way
1257     // penalises threads that do a lot of allocation, but that seems
1258     // better than the alternative.
1259     cap->context_switch = 0;
1260     
1261     /* put the thread back on the run queue.  Then, if we're ready to
1262      * GC, check whether this is the last task to stop.  If so, wake
1263      * up the GC thread.  getThread will block during a GC until the
1264      * GC is finished.
1265      */
1266 #ifdef DEBUG
1267     if (t->what_next != prev_what_next) {
1268         debugTrace(DEBUG_sched,
1269                    "--<< thread %ld (%s) stopped to switch evaluators", 
1270                    (long)t->id, whatNext_strs[t->what_next]);
1271     } else {
1272         debugTrace(DEBUG_sched,
1273                    "--<< thread %ld (%s) stopped, yielding",
1274                    (long)t->id, whatNext_strs[t->what_next]);
1275     }
1276 #endif
1277     
1278     IF_DEBUG(sanity,
1279              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1280              checkTSO(t));
1281     ASSERT(t->_link == END_TSO_QUEUE);
1282     
1283     // Shortcut if we're just switching evaluators: don't bother
1284     // doing stack squeezing (which can be expensive), just run the
1285     // thread.
1286     if (t->what_next != prev_what_next) {
1287         return rtsTrue;
1288     }
1289
1290     addToRunQueue(cap,t);
1291
1292     return rtsFalse;
1293 }
1294
1295 /* -----------------------------------------------------------------------------
1296  * Handle a thread that returned to the scheduler with ThreadBlocked
1297  * -------------------------------------------------------------------------- */
1298
1299 static void
1300 scheduleHandleThreadBlocked( StgTSO *t
1301 #if !defined(GRAN) && !defined(DEBUG)
1302     STG_UNUSED
1303 #endif
1304     )
1305 {
1306
1307       // We don't need to do anything.  The thread is blocked, and it
1308       // has tidied up its stack and placed itself on whatever queue
1309       // it needs to be on.
1310
1311     // ASSERT(t->why_blocked != NotBlocked);
1312     // Not true: for example,
1313     //    - in THREADED_RTS, the thread may already have been woken
1314     //      up by another Capability.  This actually happens: try
1315     //      conc023 +RTS -N2.
1316     //    - the thread may have woken itself up already, because
1317     //      threadPaused() might have raised a blocked throwTo
1318     //      exception, see maybePerformBlockedException().
1319
1320 #ifdef DEBUG
1321     if (traceClass(DEBUG_sched)) {
1322         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1323                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1324         printThreadBlockage(t);
1325         debugTraceEnd();
1326     }
1327 #endif
1328 }
1329
1330 /* -----------------------------------------------------------------------------
1331  * Handle a thread that returned to the scheduler with ThreadFinished
1332  * -------------------------------------------------------------------------- */
1333
1334 static rtsBool
1335 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1336 {
1337     /* Need to check whether this was a main thread, and if so,
1338      * return with the return value.
1339      *
1340      * We also end up here if the thread kills itself with an
1341      * uncaught exception, see Exception.cmm.
1342      */
1343     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1344                (unsigned long)t->id, whatNext_strs[t->what_next]);
1345
1346       //
1347       // Check whether the thread that just completed was a bound
1348       // thread, and if so return with the result.  
1349       //
1350       // There is an assumption here that all thread completion goes
1351       // through this point; we need to make sure that if a thread
1352       // ends up in the ThreadKilled state, that it stays on the run
1353       // queue so it can be dealt with here.
1354       //
1355
1356       if (t->bound) {
1357
1358           if (t->bound != task) {
1359 #if !defined(THREADED_RTS)
1360               // Must be a bound thread that is not the topmost one.  Leave
1361               // it on the run queue until the stack has unwound to the
1362               // point where we can deal with this.  Leaving it on the run
1363               // queue also ensures that the garbage collector knows about
1364               // this thread and its return value (it gets dropped from the
1365               // step->threads list so there's no other way to find it).
1366               appendToRunQueue(cap,t);
1367               return rtsFalse;
1368 #else
1369               // this cannot happen in the threaded RTS, because a
1370               // bound thread can only be run by the appropriate Task.
1371               barf("finished bound thread that isn't mine");
1372 #endif
1373           }
1374
1375           ASSERT(task->tso == t);
1376
1377           if (t->what_next == ThreadComplete) {
1378               if (task->ret) {
1379                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1380                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1381               }
1382               task->stat = Success;
1383           } else {
1384               if (task->ret) {
1385                   *(task->ret) = NULL;
1386               }
1387               if (sched_state >= SCHED_INTERRUPTING) {
1388                   task->stat = Interrupted;
1389               } else {
1390                   task->stat = Killed;
1391               }
1392           }
1393 #ifdef DEBUG
1394           removeThreadLabel((StgWord)task->tso->id);
1395 #endif
1396           return rtsTrue; // tells schedule() to return
1397       }
1398
1399       return rtsFalse;
1400 }
1401
1402 /* -----------------------------------------------------------------------------
1403  * Perform a heap census
1404  * -------------------------------------------------------------------------- */
1405
1406 static rtsBool
1407 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1408 {
1409     // When we have +RTS -i0 and we're heap profiling, do a census at
1410     // every GC.  This lets us get repeatable runs for debugging.
1411     if (performHeapProfile ||
1412         (RtsFlags.ProfFlags.profileInterval==0 &&
1413          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1414         return rtsTrue;
1415     } else {
1416         return rtsFalse;
1417     }
1418 }
1419
1420 /* -----------------------------------------------------------------------------
1421  * Perform a garbage collection if necessary
1422  * -------------------------------------------------------------------------- */
1423
1424 static Capability *
1425 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1426 {
1427     rtsBool heap_census;
1428 #ifdef THREADED_RTS
1429     /* extern static volatile StgWord waiting_for_gc; 
1430        lives inside capability.c */
1431     rtsBool was_waiting;
1432     nat i;
1433 #endif
1434
1435 #ifdef THREADED_RTS
1436     // In order to GC, there must be no threads running Haskell code.
1437     // Therefore, the GC thread needs to hold *all* the capabilities,
1438     // and release them after the GC has completed.  
1439     //
1440     // This seems to be the simplest way: previous attempts involved
1441     // making all the threads with capabilities give up their
1442     // capabilities and sleep except for the *last* one, which
1443     // actually did the GC.  But it's quite hard to arrange for all
1444     // the other tasks to sleep and stay asleep.
1445     //
1446         
1447     /*  Other capabilities are prevented from running yet more Haskell
1448         threads if waiting_for_gc is set. Tested inside
1449         yieldCapability() and releaseCapability() in Capability.c */
1450
1451     was_waiting = cas(&waiting_for_gc, 0, 1);
1452     if (was_waiting) {
1453         do {
1454             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1455             if (cap) yieldCapability(&cap,task);
1456         } while (waiting_for_gc);
1457         return cap;  // NOTE: task->cap might have changed here
1458     }
1459
1460     setContextSwitches();
1461     for (i=0; i < n_capabilities; i++) {
1462         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1463         if (cap != &capabilities[i]) {
1464             Capability *pcap = &capabilities[i];
1465             // we better hope this task doesn't get migrated to
1466             // another Capability while we're waiting for this one.
1467             // It won't, because load balancing happens while we have
1468             // all the Capabilities, but even so it's a slightly
1469             // unsavoury invariant.
1470             task->cap = pcap;
1471             waitForReturnCapability(&pcap, task);
1472             if (pcap != &capabilities[i]) {
1473                 barf("scheduleDoGC: got the wrong capability");
1474             }
1475         }
1476     }
1477
1478     waiting_for_gc = rtsFalse;
1479 #endif
1480
1481     // so this happens periodically:
1482     if (cap) scheduleCheckBlackHoles(cap);
1483     
1484     IF_DEBUG(scheduler, printAllThreads());
1485
1486     /*
1487      * We now have all the capabilities; if we're in an interrupting
1488      * state, then we should take the opportunity to delete all the
1489      * threads in the system.
1490      */
1491     if (sched_state >= SCHED_INTERRUPTING) {
1492         deleteAllThreads(&capabilities[0]);
1493         sched_state = SCHED_SHUTTING_DOWN;
1494     }
1495     
1496     heap_census = scheduleNeedHeapProfile(rtsTrue);
1497
1498     /* everybody back, start the GC.
1499      * Could do it in this thread, or signal a condition var
1500      * to do it in another thread.  Either way, we need to
1501      * broadcast on gc_pending_cond afterward.
1502      */
1503 #if defined(THREADED_RTS)
1504     debugTrace(DEBUG_sched, "doing GC");
1505 #endif
1506     GarbageCollect(force_major || heap_census);
1507     
1508     if (heap_census) {
1509         debugTrace(DEBUG_sched, "performing heap census");
1510         heapCensus();
1511         performHeapProfile = rtsFalse;
1512     }
1513
1514 #ifdef SPARKBALANCE
1515     /* JB 
1516        Once we are all together... this would be the place to balance all
1517        spark pools. No concurrent stealing or adding of new sparks can
1518        occur. Should be defined in Sparks.c. */
1519     balanceSparkPoolsCaps(n_capabilities, capabilities);
1520 #endif
1521
1522 #if defined(THREADED_RTS)
1523     // release our stash of capabilities.
1524     for (i = 0; i < n_capabilities; i++) {
1525         if (cap != &capabilities[i]) {
1526             task->cap = &capabilities[i];
1527             releaseCapability(&capabilities[i]);
1528         }
1529     }
1530     if (cap) {
1531         task->cap = cap;
1532     } else {
1533         task->cap = NULL;
1534     }
1535 #endif
1536
1537     return cap;
1538 }
1539
1540 /* ---------------------------------------------------------------------------
1541  * Singleton fork(). Do not copy any running threads.
1542  * ------------------------------------------------------------------------- */
1543
1544 pid_t
1545 forkProcess(HsStablePtr *entry
1546 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1547             STG_UNUSED
1548 #endif
1549            )
1550 {
1551 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1552     Task *task;
1553     pid_t pid;
1554     StgTSO* t,*next;
1555     Capability *cap;
1556     nat s;
1557     
1558 #if defined(THREADED_RTS)
1559     if (RtsFlags.ParFlags.nNodes > 1) {
1560         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1561         stg_exit(EXIT_FAILURE);
1562     }
1563 #endif
1564
1565     debugTrace(DEBUG_sched, "forking!");
1566     
1567     // ToDo: for SMP, we should probably acquire *all* the capabilities
1568     cap = rts_lock();
1569     
1570     // no funny business: hold locks while we fork, otherwise if some
1571     // other thread is holding a lock when the fork happens, the data
1572     // structure protected by the lock will forever be in an
1573     // inconsistent state in the child.  See also #1391.
1574     ACQUIRE_LOCK(&sched_mutex);
1575     ACQUIRE_LOCK(&cap->lock);
1576     ACQUIRE_LOCK(&cap->running_task->lock);
1577
1578     pid = fork();
1579     
1580     if (pid) { // parent
1581         
1582         RELEASE_LOCK(&sched_mutex);
1583         RELEASE_LOCK(&cap->lock);
1584         RELEASE_LOCK(&cap->running_task->lock);
1585
1586         // just return the pid
1587         rts_unlock(cap);
1588         return pid;
1589         
1590     } else { // child
1591         
1592 #if defined(THREADED_RTS)
1593         initMutex(&sched_mutex);
1594         initMutex(&cap->lock);
1595         initMutex(&cap->running_task->lock);
1596 #endif
1597
1598         // Now, all OS threads except the thread that forked are
1599         // stopped.  We need to stop all Haskell threads, including
1600         // those involved in foreign calls.  Also we need to delete
1601         // all Tasks, because they correspond to OS threads that are
1602         // now gone.
1603
1604         for (s = 0; s < total_steps; s++) {
1605           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1606             if (t->what_next == ThreadRelocated) {
1607                 next = t->_link;
1608             } else {
1609                 next = t->global_link;
1610                 // don't allow threads to catch the ThreadKilled
1611                 // exception, but we do want to raiseAsync() because these
1612                 // threads may be evaluating thunks that we need later.
1613                 deleteThread_(cap,t);
1614             }
1615           }
1616         }
1617         
1618         // Empty the run queue.  It seems tempting to let all the
1619         // killed threads stay on the run queue as zombies to be
1620         // cleaned up later, but some of them correspond to bound
1621         // threads for which the corresponding Task does not exist.
1622         cap->run_queue_hd = END_TSO_QUEUE;
1623         cap->run_queue_tl = END_TSO_QUEUE;
1624
1625         // Any suspended C-calling Tasks are no more, their OS threads
1626         // don't exist now:
1627         cap->suspended_ccalling_tasks = NULL;
1628
1629         // Empty the threads lists.  Otherwise, the garbage
1630         // collector may attempt to resurrect some of these threads.
1631         for (s = 0; s < total_steps; s++) {
1632             all_steps[s].threads = END_TSO_QUEUE;
1633         }
1634
1635         // Wipe the task list, except the current Task.
1636         ACQUIRE_LOCK(&sched_mutex);
1637         for (task = all_tasks; task != NULL; task=task->all_link) {
1638             if (task != cap->running_task) {
1639 #if defined(THREADED_RTS)
1640                 initMutex(&task->lock); // see #1391
1641 #endif
1642                 discardTask(task);
1643             }
1644         }
1645         RELEASE_LOCK(&sched_mutex);
1646
1647 #if defined(THREADED_RTS)
1648         // Wipe our spare workers list, they no longer exist.  New
1649         // workers will be created if necessary.
1650         cap->spare_workers = NULL;
1651         cap->returning_tasks_hd = NULL;
1652         cap->returning_tasks_tl = NULL;
1653 #endif
1654
1655         // On Unix, all timers are reset in the child, so we need to start
1656         // the timer again.
1657         initTimer();
1658         startTimer();
1659
1660         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1661         rts_checkSchedStatus("forkProcess",cap);
1662         
1663         rts_unlock(cap);
1664         hs_exit();                      // clean up and exit
1665         stg_exit(EXIT_SUCCESS);
1666     }
1667 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1668     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1669     return -1;
1670 #endif
1671 }
1672
1673 /* ---------------------------------------------------------------------------
1674  * Delete all the threads in the system
1675  * ------------------------------------------------------------------------- */
1676    
1677 static void
1678 deleteAllThreads ( Capability *cap )
1679 {
1680     // NOTE: only safe to call if we own all capabilities.
1681
1682     StgTSO* t, *next;
1683     nat s;
1684
1685     debugTrace(DEBUG_sched,"deleting all threads");
1686     for (s = 0; s < total_steps; s++) {
1687       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1688         if (t->what_next == ThreadRelocated) {
1689             next = t->_link;
1690         } else {
1691             next = t->global_link;
1692             deleteThread(cap,t);
1693         }
1694       }
1695     }      
1696
1697     // The run queue now contains a bunch of ThreadKilled threads.  We
1698     // must not throw these away: the main thread(s) will be in there
1699     // somewhere, and the main scheduler loop has to deal with it.
1700     // Also, the run queue is the only thing keeping these threads from
1701     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1702
1703 #if !defined(THREADED_RTS)
1704     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1705     ASSERT(sleeping_queue == END_TSO_QUEUE);
1706 #endif
1707 }
1708
1709 /* -----------------------------------------------------------------------------
1710    Managing the suspended_ccalling_tasks list.
1711    Locks required: sched_mutex
1712    -------------------------------------------------------------------------- */
1713
1714 STATIC_INLINE void
1715 suspendTask (Capability *cap, Task *task)
1716 {
1717     ASSERT(task->next == NULL && task->prev == NULL);
1718     task->next = cap->suspended_ccalling_tasks;
1719     task->prev = NULL;
1720     if (cap->suspended_ccalling_tasks) {
1721         cap->suspended_ccalling_tasks->prev = task;
1722     }
1723     cap->suspended_ccalling_tasks = task;
1724 }
1725
1726 STATIC_INLINE void
1727 recoverSuspendedTask (Capability *cap, Task *task)
1728 {
1729     if (task->prev) {
1730         task->prev->next = task->next;
1731     } else {
1732         ASSERT(cap->suspended_ccalling_tasks == task);
1733         cap->suspended_ccalling_tasks = task->next;
1734     }
1735     if (task->next) {
1736         task->next->prev = task->prev;
1737     }
1738     task->next = task->prev = NULL;
1739 }
1740
1741 /* ---------------------------------------------------------------------------
1742  * Suspending & resuming Haskell threads.
1743  * 
1744  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1745  * its capability before calling the C function.  This allows another
1746  * task to pick up the capability and carry on running Haskell
1747  * threads.  It also means that if the C call blocks, it won't lock
1748  * the whole system.
1749  *
1750  * The Haskell thread making the C call is put to sleep for the
1751  * duration of the call, on the susepended_ccalling_threads queue.  We
1752  * give out a token to the task, which it can use to resume the thread
1753  * on return from the C function.
1754  * ------------------------------------------------------------------------- */
1755    
1756 void *
1757 suspendThread (StgRegTable *reg)
1758 {
1759   Capability *cap;
1760   int saved_errno;
1761   StgTSO *tso;
1762   Task *task;
1763 #if mingw32_HOST_OS
1764   StgWord32 saved_winerror;
1765 #endif
1766
1767   saved_errno = errno;
1768 #if mingw32_HOST_OS
1769   saved_winerror = GetLastError();
1770 #endif
1771
1772   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1773    */
1774   cap = regTableToCapability(reg);
1775
1776   task = cap->running_task;
1777   tso = cap->r.rCurrentTSO;
1778
1779   debugTrace(DEBUG_sched, 
1780              "thread %lu did a safe foreign call", 
1781              (unsigned long)cap->r.rCurrentTSO->id);
1782
1783   // XXX this might not be necessary --SDM
1784   tso->what_next = ThreadRunGHC;
1785
1786   threadPaused(cap,tso);
1787
1788   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1789       tso->why_blocked = BlockedOnCCall;
1790       tso->flags |= TSO_BLOCKEX;
1791       tso->flags &= ~TSO_INTERRUPTIBLE;
1792   } else {
1793       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1794   }
1795
1796   // Hand back capability
1797   task->suspended_tso = tso;
1798
1799   ACQUIRE_LOCK(&cap->lock);
1800
1801   suspendTask(cap,task);
1802   cap->in_haskell = rtsFalse;
1803   releaseCapability_(cap);
1804   
1805   RELEASE_LOCK(&cap->lock);
1806
1807 #if defined(THREADED_RTS)
1808   /* Preparing to leave the RTS, so ensure there's a native thread/task
1809      waiting to take over.
1810   */
1811   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1812 #endif
1813
1814   errno = saved_errno;
1815 #if mingw32_HOST_OS
1816   SetLastError(saved_winerror);
1817 #endif
1818   return task;
1819 }
1820
1821 StgRegTable *
1822 resumeThread (void *task_)
1823 {
1824     StgTSO *tso;
1825     Capability *cap;
1826     Task *task = task_;
1827     int saved_errno;
1828 #if mingw32_HOST_OS
1829     StgWord32 saved_winerror;
1830 #endif
1831
1832     saved_errno = errno;
1833 #if mingw32_HOST_OS
1834     saved_winerror = GetLastError();
1835 #endif
1836
1837     cap = task->cap;
1838     // Wait for permission to re-enter the RTS with the result.
1839     waitForReturnCapability(&cap,task);
1840     // we might be on a different capability now... but if so, our
1841     // entry on the suspended_ccalling_tasks list will also have been
1842     // migrated.
1843
1844     // Remove the thread from the suspended list
1845     recoverSuspendedTask(cap,task);
1846
1847     tso = task->suspended_tso;
1848     task->suspended_tso = NULL;
1849     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1850     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1851     
1852     if (tso->why_blocked == BlockedOnCCall) {
1853         awakenBlockedExceptionQueue(cap,tso);
1854         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1855     }
1856     
1857     /* Reset blocking status */
1858     tso->why_blocked  = NotBlocked;
1859     
1860     cap->r.rCurrentTSO = tso;
1861     cap->in_haskell = rtsTrue;
1862     errno = saved_errno;
1863 #if mingw32_HOST_OS
1864     SetLastError(saved_winerror);
1865 #endif
1866
1867     /* We might have GC'd, mark the TSO dirty again */
1868     dirty_TSO(cap,tso);
1869
1870     IF_DEBUG(sanity, checkTSO(tso));
1871
1872     return &cap->r;
1873 }
1874
1875 /* ---------------------------------------------------------------------------
1876  * scheduleThread()
1877  *
1878  * scheduleThread puts a thread on the end  of the runnable queue.
1879  * This will usually be done immediately after a thread is created.
1880  * The caller of scheduleThread must create the thread using e.g.
1881  * createThread and push an appropriate closure
1882  * on this thread's stack before the scheduler is invoked.
1883  * ------------------------------------------------------------------------ */
1884
1885 void
1886 scheduleThread(Capability *cap, StgTSO *tso)
1887 {
1888     // The thread goes at the *end* of the run-queue, to avoid possible
1889     // starvation of any threads already on the queue.
1890     appendToRunQueue(cap,tso);
1891 }
1892
1893 void
1894 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1895 {
1896 #if defined(THREADED_RTS)
1897     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1898                               // move this thread from now on.
1899     cpu %= RtsFlags.ParFlags.nNodes;
1900     if (cpu == cap->no) {
1901         appendToRunQueue(cap,tso);
1902     } else {
1903         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1904     }
1905 #else
1906     appendToRunQueue(cap,tso);
1907 #endif
1908 }
1909
1910 Capability *
1911 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1912 {
1913     Task *task;
1914
1915     // We already created/initialised the Task
1916     task = cap->running_task;
1917
1918     // This TSO is now a bound thread; make the Task and TSO
1919     // point to each other.
1920     tso->bound = task;
1921     tso->cap = cap;
1922
1923     task->tso = tso;
1924     task->ret = ret;
1925     task->stat = NoStatus;
1926
1927     appendToRunQueue(cap,tso);
1928
1929     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1930
1931     cap = schedule(cap,task);
1932
1933     ASSERT(task->stat != NoStatus);
1934     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1935
1936     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1937     return cap;
1938 }
1939
1940 /* ----------------------------------------------------------------------------
1941  * Starting Tasks
1942  * ------------------------------------------------------------------------- */
1943
1944 #if defined(THREADED_RTS)
1945 void OSThreadProcAttr
1946 workerStart(Task *task)
1947 {
1948     Capability *cap;
1949
1950     // See startWorkerTask().
1951     ACQUIRE_LOCK(&task->lock);
1952     cap = task->cap;
1953     RELEASE_LOCK(&task->lock);
1954
1955     // set the thread-local pointer to the Task:
1956     taskEnter(task);
1957
1958     // schedule() runs without a lock.
1959     cap = schedule(cap,task);
1960
1961     // On exit from schedule(), we have a Capability.
1962     releaseCapability(cap);
1963     workerTaskStop(task);
1964 }
1965 #endif
1966
1967 /* ---------------------------------------------------------------------------
1968  * initScheduler()
1969  *
1970  * Initialise the scheduler.  This resets all the queues - if the
1971  * queues contained any threads, they'll be garbage collected at the
1972  * next pass.
1973  *
1974  * ------------------------------------------------------------------------ */
1975
1976 void 
1977 initScheduler(void)
1978 {
1979 #if !defined(THREADED_RTS)
1980   blocked_queue_hd  = END_TSO_QUEUE;
1981   blocked_queue_tl  = END_TSO_QUEUE;
1982   sleeping_queue    = END_TSO_QUEUE;
1983 #endif
1984
1985   blackhole_queue   = END_TSO_QUEUE;
1986
1987   sched_state    = SCHED_RUNNING;
1988   recent_activity = ACTIVITY_YES;
1989
1990 #if defined(THREADED_RTS)
1991   /* Initialise the mutex and condition variables used by
1992    * the scheduler. */
1993   initMutex(&sched_mutex);
1994 #endif
1995   
1996   ACQUIRE_LOCK(&sched_mutex);
1997
1998   /* A capability holds the state a native thread needs in
1999    * order to execute STG code. At least one capability is
2000    * floating around (only THREADED_RTS builds have more than one).
2001    */
2002   initCapabilities();
2003
2004   initTaskManager();
2005
2006 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2007   initSparkPools();
2008 #endif
2009
2010 #if defined(THREADED_RTS)
2011   /*
2012    * Eagerly start one worker to run each Capability, except for
2013    * Capability 0.  The idea is that we're probably going to start a
2014    * bound thread on Capability 0 pretty soon, so we don't want a
2015    * worker task hogging it.
2016    */
2017   { 
2018       nat i;
2019       Capability *cap;
2020       for (i = 1; i < n_capabilities; i++) {
2021           cap = &capabilities[i];
2022           ACQUIRE_LOCK(&cap->lock);
2023           startWorkerTask(cap, workerStart);
2024           RELEASE_LOCK(&cap->lock);
2025       }
2026   }
2027 #endif
2028
2029   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2030
2031   RELEASE_LOCK(&sched_mutex);
2032 }
2033
2034 void
2035 exitScheduler(
2036     rtsBool wait_foreign
2037 #if !defined(THREADED_RTS)
2038                          __attribute__((unused))
2039 #endif
2040 )
2041                /* see Capability.c, shutdownCapability() */
2042 {
2043     Task *task = NULL;
2044
2045 #if defined(THREADED_RTS)
2046     ACQUIRE_LOCK(&sched_mutex);
2047     task = newBoundTask();
2048     RELEASE_LOCK(&sched_mutex);
2049 #endif
2050
2051     // If we haven't killed all the threads yet, do it now.
2052     if (sched_state < SCHED_SHUTTING_DOWN) {
2053         sched_state = SCHED_INTERRUPTING;
2054         scheduleDoGC(NULL,task,rtsFalse);    
2055     }
2056     sched_state = SCHED_SHUTTING_DOWN;
2057
2058 #if defined(THREADED_RTS)
2059     { 
2060         nat i;
2061         
2062         for (i = 0; i < n_capabilities; i++) {
2063             shutdownCapability(&capabilities[i], task, wait_foreign);
2064         }
2065         boundTaskExiting(task);
2066         stopTaskManager();
2067     }
2068 #else
2069     freeCapability(&MainCapability);
2070 #endif
2071 }
2072
2073 void
2074 freeScheduler( void )
2075 {
2076     freeTaskManager();
2077     if (n_capabilities != 1) {
2078         stgFree(capabilities);
2079     }
2080 #if defined(THREADED_RTS)
2081     closeMutex(&sched_mutex);
2082 #endif
2083 }
2084
2085 /* -----------------------------------------------------------------------------
2086    performGC
2087
2088    This is the interface to the garbage collector from Haskell land.
2089    We provide this so that external C code can allocate and garbage
2090    collect when called from Haskell via _ccall_GC.
2091    -------------------------------------------------------------------------- */
2092
2093 static void
2094 performGC_(rtsBool force_major)
2095 {
2096     Task *task;
2097     // We must grab a new Task here, because the existing Task may be
2098     // associated with a particular Capability, and chained onto the 
2099     // suspended_ccalling_tasks queue.
2100     ACQUIRE_LOCK(&sched_mutex);
2101     task = newBoundTask();
2102     RELEASE_LOCK(&sched_mutex);
2103     scheduleDoGC(NULL,task,force_major);
2104     boundTaskExiting(task);
2105 }
2106
2107 void
2108 performGC(void)
2109 {
2110     performGC_(rtsFalse);
2111 }
2112
2113 void
2114 performMajorGC(void)
2115 {
2116     performGC_(rtsTrue);
2117 }
2118
2119 /* -----------------------------------------------------------------------------
2120    Stack overflow
2121
2122    If the thread has reached its maximum stack size, then raise the
2123    StackOverflow exception in the offending thread.  Otherwise
2124    relocate the TSO into a larger chunk of memory and adjust its stack
2125    size appropriately.
2126    -------------------------------------------------------------------------- */
2127
2128 static StgTSO *
2129 threadStackOverflow(Capability *cap, StgTSO *tso)
2130 {
2131   nat new_stack_size, stack_words;
2132   lnat new_tso_size;
2133   StgPtr new_sp;
2134   StgTSO *dest;
2135
2136   IF_DEBUG(sanity,checkTSO(tso));
2137
2138   // don't allow throwTo() to modify the blocked_exceptions queue
2139   // while we are moving the TSO:
2140   lockClosure((StgClosure *)tso);
2141
2142   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2143       // NB. never raise a StackOverflow exception if the thread is
2144       // inside Control.Exceptino.block.  It is impractical to protect
2145       // against stack overflow exceptions, since virtually anything
2146       // can raise one (even 'catch'), so this is the only sensible
2147       // thing to do here.  See bug #767.
2148
2149       debugTrace(DEBUG_gc,
2150                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2151                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2152       IF_DEBUG(gc,
2153                /* If we're debugging, just print out the top of the stack */
2154                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2155                                                 tso->sp+64)));
2156
2157       // Send this thread the StackOverflow exception
2158       unlockTSO(tso);
2159       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2160       return tso;
2161   }
2162
2163   /* Try to double the current stack size.  If that takes us over the
2164    * maximum stack size for this thread, then use the maximum instead
2165    * (that is, unless we're already at or over the max size and we
2166    * can't raise the StackOverflow exception (see above), in which
2167    * case just double the size). Finally round up so the TSO ends up as
2168    * a whole number of blocks.
2169    */
2170   if (tso->stack_size >= tso->max_stack_size) {
2171       new_stack_size = tso->stack_size * 2;
2172   } else { 
2173       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2174   }
2175   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2176                                        TSO_STRUCT_SIZE)/sizeof(W_);
2177   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2178   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2179
2180   debugTrace(DEBUG_sched, 
2181              "increasing stack size from %ld words to %d.",
2182              (long)tso->stack_size, new_stack_size);
2183
2184   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2185   TICK_ALLOC_TSO(new_stack_size,0);
2186
2187   /* copy the TSO block and the old stack into the new area */
2188   memcpy(dest,tso,TSO_STRUCT_SIZE);
2189   stack_words = tso->stack + tso->stack_size - tso->sp;
2190   new_sp = (P_)dest + new_tso_size - stack_words;
2191   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2192
2193   /* relocate the stack pointers... */
2194   dest->sp         = new_sp;
2195   dest->stack_size = new_stack_size;
2196         
2197   /* Mark the old TSO as relocated.  We have to check for relocated
2198    * TSOs in the garbage collector and any primops that deal with TSOs.
2199    *
2200    * It's important to set the sp value to just beyond the end
2201    * of the stack, so we don't attempt to scavenge any part of the
2202    * dead TSO's stack.
2203    */
2204   tso->what_next = ThreadRelocated;
2205   setTSOLink(cap,tso,dest);
2206   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2207   tso->why_blocked = NotBlocked;
2208
2209   IF_PAR_DEBUG(verbose,
2210                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2211                      tso->id, tso, tso->stack_size);
2212                /* If we're debugging, just print out the top of the stack */
2213                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2214                                                 tso->sp+64)));
2215   
2216   unlockTSO(dest);
2217   unlockTSO(tso);
2218
2219   IF_DEBUG(sanity,checkTSO(dest));
2220 #if 0
2221   IF_DEBUG(scheduler,printTSO(dest));
2222 #endif
2223
2224   return dest;
2225 }
2226
2227 static StgTSO *
2228 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2229 {
2230     bdescr *bd, *new_bd;
2231     lnat free_w, tso_size_w;
2232     StgTSO *new_tso;
2233
2234     tso_size_w = tso_sizeW(tso);
2235
2236     if (tso_size_w < MBLOCK_SIZE_W || 
2237         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2238     {
2239         return tso;
2240     }
2241
2242     // don't allow throwTo() to modify the blocked_exceptions queue
2243     // while we are moving the TSO:
2244     lockClosure((StgClosure *)tso);
2245
2246     // this is the number of words we'll free
2247     free_w = round_to_mblocks(tso_size_w/2);
2248
2249     bd = Bdescr((StgPtr)tso);
2250     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2251     bd->free = bd->start + TSO_STRUCT_SIZEW;
2252
2253     new_tso = (StgTSO *)new_bd->start;
2254     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2255     new_tso->stack_size = new_bd->free - new_tso->stack;
2256
2257     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2258                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2259
2260     tso->what_next = ThreadRelocated;
2261     tso->_link = new_tso; // no write barrier reqd: same generation
2262
2263     // The TSO attached to this Task may have moved, so update the
2264     // pointer to it.
2265     if (task->tso == tso) {
2266         task->tso = new_tso;
2267     }
2268
2269     unlockTSO(new_tso);
2270     unlockTSO(tso);
2271
2272     IF_DEBUG(sanity,checkTSO(new_tso));
2273
2274     return new_tso;
2275 }
2276
2277 /* ---------------------------------------------------------------------------
2278    Interrupt execution
2279    - usually called inside a signal handler so it mustn't do anything fancy.   
2280    ------------------------------------------------------------------------ */
2281
2282 void
2283 interruptStgRts(void)
2284 {
2285     sched_state = SCHED_INTERRUPTING;
2286     setContextSwitches();
2287     wakeUpRts();
2288 }
2289
2290 /* -----------------------------------------------------------------------------
2291    Wake up the RTS
2292    
2293    This function causes at least one OS thread to wake up and run the
2294    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2295    an external event has arrived that may need servicing (eg. a
2296    keyboard interrupt).
2297
2298    In the single-threaded RTS we don't do anything here; we only have
2299    one thread anyway, and the event that caused us to want to wake up
2300    will have interrupted any blocking system call in progress anyway.
2301    -------------------------------------------------------------------------- */
2302
2303 void
2304 wakeUpRts(void)
2305 {
2306 #if defined(THREADED_RTS)
2307     // This forces the IO Manager thread to wakeup, which will
2308     // in turn ensure that some OS thread wakes up and runs the
2309     // scheduler loop, which will cause a GC and deadlock check.
2310     ioManagerWakeup();
2311 #endif
2312 }
2313
2314 /* -----------------------------------------------------------------------------
2315  * checkBlackHoles()
2316  *
2317  * Check the blackhole_queue for threads that can be woken up.  We do
2318  * this periodically: before every GC, and whenever the run queue is
2319  * empty.
2320  *
2321  * An elegant solution might be to just wake up all the blocked
2322  * threads with awakenBlockedQueue occasionally: they'll go back to
2323  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2324  * doesn't give us a way to tell whether we've actually managed to
2325  * wake up any threads, so we would be busy-waiting.
2326  *
2327  * -------------------------------------------------------------------------- */
2328
2329 static rtsBool
2330 checkBlackHoles (Capability *cap)
2331 {
2332     StgTSO **prev, *t;
2333     rtsBool any_woke_up = rtsFalse;
2334     StgHalfWord type;
2335
2336     // blackhole_queue is global:
2337     ASSERT_LOCK_HELD(&sched_mutex);
2338
2339     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2340
2341     // ASSUMES: sched_mutex
2342     prev = &blackhole_queue;
2343     t = blackhole_queue;
2344     while (t != END_TSO_QUEUE) {
2345         ASSERT(t->why_blocked == BlockedOnBlackHole);
2346         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2347         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2348             IF_DEBUG(sanity,checkTSO(t));
2349             t = unblockOne(cap, t);
2350             *prev = t;
2351             any_woke_up = rtsTrue;
2352         } else {
2353             prev = &t->_link;
2354             t = t->_link;
2355         }
2356     }
2357
2358     return any_woke_up;
2359 }
2360
2361 /* -----------------------------------------------------------------------------
2362    Deleting threads
2363
2364    This is used for interruption (^C) and forking, and corresponds to
2365    raising an exception but without letting the thread catch the
2366    exception.
2367    -------------------------------------------------------------------------- */
2368
2369 static void 
2370 deleteThread (Capability *cap, StgTSO *tso)
2371 {
2372     // NOTE: must only be called on a TSO that we have exclusive
2373     // access to, because we will call throwToSingleThreaded() below.
2374     // The TSO must be on the run queue of the Capability we own, or 
2375     // we must own all Capabilities.
2376
2377     if (tso->why_blocked != BlockedOnCCall &&
2378         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2379         throwToSingleThreaded(cap,tso,NULL);
2380     }
2381 }
2382
2383 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2384 static void 
2385 deleteThread_(Capability *cap, StgTSO *tso)
2386 { // for forkProcess only:
2387   // like deleteThread(), but we delete threads in foreign calls, too.
2388
2389     if (tso->why_blocked == BlockedOnCCall ||
2390         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2391         unblockOne(cap,tso);
2392         tso->what_next = ThreadKilled;
2393     } else {
2394         deleteThread(cap,tso);
2395     }
2396 }
2397 #endif
2398
2399 /* -----------------------------------------------------------------------------
2400    raiseExceptionHelper
2401    
2402    This function is called by the raise# primitve, just so that we can
2403    move some of the tricky bits of raising an exception from C-- into
2404    C.  Who knows, it might be a useful re-useable thing here too.
2405    -------------------------------------------------------------------------- */
2406
2407 StgWord
2408 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2409 {
2410     Capability *cap = regTableToCapability(reg);
2411     StgThunk *raise_closure = NULL;
2412     StgPtr p, next;
2413     StgRetInfoTable *info;
2414     //
2415     // This closure represents the expression 'raise# E' where E
2416     // is the exception raise.  It is used to overwrite all the
2417     // thunks which are currently under evaluataion.
2418     //
2419
2420     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2421     // LDV profiling: stg_raise_info has THUNK as its closure
2422     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2423     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2424     // 1 does not cause any problem unless profiling is performed.
2425     // However, when LDV profiling goes on, we need to linearly scan
2426     // small object pool, where raise_closure is stored, so we should
2427     // use MIN_UPD_SIZE.
2428     //
2429     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2430     //                                 sizeofW(StgClosure)+1);
2431     //
2432
2433     //
2434     // Walk up the stack, looking for the catch frame.  On the way,
2435     // we update any closures pointed to from update frames with the
2436     // raise closure that we just built.
2437     //
2438     p = tso->sp;
2439     while(1) {
2440         info = get_ret_itbl((StgClosure *)p);
2441         next = p + stack_frame_sizeW((StgClosure *)p);
2442         switch (info->i.type) {
2443             
2444         case UPDATE_FRAME:
2445             // Only create raise_closure if we need to.
2446             if (raise_closure == NULL) {
2447                 raise_closure = 
2448                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2449                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2450                 raise_closure->payload[0] = exception;
2451             }
2452             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2453             p = next;
2454             continue;
2455
2456         case ATOMICALLY_FRAME:
2457             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2458             tso->sp = p;
2459             return ATOMICALLY_FRAME;
2460             
2461         case CATCH_FRAME:
2462             tso->sp = p;
2463             return CATCH_FRAME;
2464
2465         case CATCH_STM_FRAME:
2466             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2467             tso->sp = p;
2468             return CATCH_STM_FRAME;
2469             
2470         case STOP_FRAME:
2471             tso->sp = p;
2472             return STOP_FRAME;
2473
2474         case CATCH_RETRY_FRAME:
2475         default:
2476             p = next; 
2477             continue;
2478         }
2479     }
2480 }
2481
2482
2483 /* -----------------------------------------------------------------------------
2484    findRetryFrameHelper
2485
2486    This function is called by the retry# primitive.  It traverses the stack
2487    leaving tso->sp referring to the frame which should handle the retry.  
2488
2489    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2490    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2491
2492    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2493    create) because retries are not considered to be exceptions, despite the
2494    similar implementation.
2495
2496    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2497    not be created within memory transactions.
2498    -------------------------------------------------------------------------- */
2499
2500 StgWord
2501 findRetryFrameHelper (StgTSO *tso)
2502 {
2503   StgPtr           p, next;
2504   StgRetInfoTable *info;
2505
2506   p = tso -> sp;
2507   while (1) {
2508     info = get_ret_itbl((StgClosure *)p);
2509     next = p + stack_frame_sizeW((StgClosure *)p);
2510     switch (info->i.type) {
2511       
2512     case ATOMICALLY_FRAME:
2513         debugTrace(DEBUG_stm,
2514                    "found ATOMICALLY_FRAME at %p during retry", p);
2515         tso->sp = p;
2516         return ATOMICALLY_FRAME;
2517       
2518     case CATCH_RETRY_FRAME:
2519         debugTrace(DEBUG_stm,
2520                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2521         tso->sp = p;
2522         return CATCH_RETRY_FRAME;
2523       
2524     case CATCH_STM_FRAME: {
2525         StgTRecHeader *trec = tso -> trec;
2526         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2527         debugTrace(DEBUG_stm,
2528                    "found CATCH_STM_FRAME at %p during retry", p);
2529         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2530         stmAbortTransaction(tso -> cap, trec);
2531         stmFreeAbortedTRec(tso -> cap, trec);
2532         tso -> trec = outer;
2533         p = next; 
2534         continue;
2535     }
2536       
2537
2538     default:
2539       ASSERT(info->i.type != CATCH_FRAME);
2540       ASSERT(info->i.type != STOP_FRAME);
2541       p = next; 
2542       continue;
2543     }
2544   }
2545 }
2546
2547 /* -----------------------------------------------------------------------------
2548    resurrectThreads is called after garbage collection on the list of
2549    threads found to be garbage.  Each of these threads will be woken
2550    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2551    on an MVar, or NonTermination if the thread was blocked on a Black
2552    Hole.
2553
2554    Locks: assumes we hold *all* the capabilities.
2555    -------------------------------------------------------------------------- */
2556
2557 void
2558 resurrectThreads (StgTSO *threads)
2559 {
2560     StgTSO *tso, *next;
2561     Capability *cap;
2562     step *step;
2563
2564     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2565         next = tso->global_link;
2566
2567         step = Bdescr((P_)tso)->step;
2568         tso->global_link = step->threads;
2569         step->threads = tso;
2570
2571         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2572         
2573         // Wake up the thread on the Capability it was last on
2574         cap = tso->cap;
2575         
2576         switch (tso->why_blocked) {
2577         case BlockedOnMVar:
2578         case BlockedOnException:
2579             /* Called by GC - sched_mutex lock is currently held. */
2580             throwToSingleThreaded(cap, tso,
2581                                   (StgClosure *)blockedOnDeadMVar_closure);
2582             break;
2583         case BlockedOnBlackHole:
2584             throwToSingleThreaded(cap, tso,
2585                                   (StgClosure *)nonTermination_closure);
2586             break;
2587         case BlockedOnSTM:
2588             throwToSingleThreaded(cap, tso,
2589                                   (StgClosure *)blockedIndefinitely_closure);
2590             break;
2591         case NotBlocked:
2592             /* This might happen if the thread was blocked on a black hole
2593              * belonging to a thread that we've just woken up (raiseAsync
2594              * can wake up threads, remember...).
2595              */
2596             continue;
2597         default:
2598             barf("resurrectThreads: thread blocked in a strange way");
2599         }
2600     }
2601 }
2602
2603 /* -----------------------------------------------------------------------------
2604    performPendingThrowTos is called after garbage collection, and
2605    passed a list of threads that were found to have pending throwTos
2606    (tso->blocked_exceptions was not empty), and were blocked.
2607    Normally this doesn't happen, because we would deliver the
2608    exception directly if the target thread is blocked, but there are
2609    small windows where it might occur on a multiprocessor (see
2610    throwTo()).
2611
2612    NB. we must be holding all the capabilities at this point, just
2613    like resurrectThreads().
2614    -------------------------------------------------------------------------- */
2615
2616 void
2617 performPendingThrowTos (StgTSO *threads)
2618 {
2619     StgTSO *tso, *next;
2620     Capability *cap;
2621     step *step;
2622
2623     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2624         next = tso->global_link;
2625
2626         step = Bdescr((P_)tso)->step;
2627         tso->global_link = step->threads;
2628         step->threads = tso;
2629
2630         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2631         
2632         cap = tso->cap;
2633         maybePerformBlockedException(cap, tso);
2634     }
2635 }