ca6e426f97e03e2a96874a4d923c4b96a98eb229
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34
35 /* PARALLEL_HASKELL includes go here */
36
37 #include "Sparks.h"
38 #include "Capability.h"
39 #include "Task.h"
40 #include "AwaitEvent.h"
41 #if defined(mingw32_HOST_OS)
42 #include "win32/IOManager.h"
43 #endif
44 #include "Trace.h"
45 #include "RaiseAsync.h"
46 #include "Threads.h"
47 #include "ThrIOManager.h"
48
49 #ifdef HAVE_SYS_TYPES_H
50 #include <sys/types.h>
51 #endif
52 #ifdef HAVE_UNISTD_H
53 #include <unistd.h>
54 #endif
55
56 #include <string.h>
57 #include <stdlib.h>
58 #include <stdarg.h>
59
60 #ifdef HAVE_ERRNO_H
61 #include <errno.h>
62 #endif
63
64 // Turn off inlining when debugging - it obfuscates things
65 #ifdef DEBUG
66 # undef  STATIC_INLINE
67 # define STATIC_INLINE static
68 #endif
69
70 /* -----------------------------------------------------------------------------
71  * Global variables
72  * -------------------------------------------------------------------------- */
73
74 #if !defined(THREADED_RTS)
75 // Blocked/sleeping thrads
76 StgTSO *blocked_queue_hd = NULL;
77 StgTSO *blocked_queue_tl = NULL;
78 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
79 #endif
80
81 /* Threads blocked on blackholes.
82  * LOCK: sched_mutex+capability, or all capabilities
83  */
84 StgTSO *blackhole_queue = NULL;
85
86 /* The blackhole_queue should be checked for threads to wake up.  See
87  * Schedule.h for more thorough comment.
88  * LOCK: none (doesn't matter if we miss an update)
89  */
90 rtsBool blackholes_need_checking = rtsFalse;
91
92 /* flag that tracks whether we have done any execution in this time slice.
93  * LOCK: currently none, perhaps we should lock (but needs to be
94  * updated in the fast path of the scheduler).
95  */
96 nat recent_activity = ACTIVITY_YES;
97
98 /* if this flag is set as well, give up execution
99  * LOCK: none (changes once, from false->true)
100  */
101 rtsBool sched_state = SCHED_RUNNING;
102
103 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
104  *  exists - earlier gccs apparently didn't.
105  *  -= chak
106  */
107 StgTSO dummy_tso;
108
109 /*
110  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
111  * in an MT setting, needed to signal that a worker thread shouldn't hang around
112  * in the scheduler when it is out of work.
113  */
114 rtsBool shutting_down_scheduler = rtsFalse;
115
116 /*
117  * This mutex protects most of the global scheduler data in
118  * the THREADED_RTS runtime.
119  */
120 #if defined(THREADED_RTS)
121 Mutex sched_mutex;
122 #endif
123
124 #if !defined(mingw32_HOST_OS)
125 #define FORKPROCESS_PRIMOP_SUPPORTED
126 #endif
127
128 /* -----------------------------------------------------------------------------
129  * static function prototypes
130  * -------------------------------------------------------------------------- */
131
132 static Capability *schedule (Capability *initialCapability, Task *task);
133
134 //
135 // These function all encapsulate parts of the scheduler loop, and are
136 // abstracted only to make the structure and control flow of the
137 // scheduler clearer.
138 //
139 static void schedulePreLoop (void);
140 static void scheduleFindWork (Capability *cap);
141 #if defined(THREADED_RTS)
142 static void scheduleYield (Capability **pcap, Task *task);
143 #endif
144 static void scheduleStartSignalHandlers (Capability *cap);
145 static void scheduleCheckBlockedThreads (Capability *cap);
146 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
147 static void scheduleCheckBlackHoles (Capability *cap);
148 static void scheduleDetectDeadlock (Capability *cap, Task *task);
149 static void schedulePushWork(Capability *cap, Task *task);
150 #if defined(PARALLEL_HASKELL)
151 static rtsBool scheduleGetRemoteWork(Capability *cap);
152 static void scheduleSendPendingMessages(void);
153 #endif
154 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
155 static void scheduleActivateSpark(Capability *cap);
156 #endif
157 static void schedulePostRunThread(Capability *cap, StgTSO *t);
158 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
159 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
160                                          StgTSO *t);
161 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
162                                     nat prev_what_next );
163 static void scheduleHandleThreadBlocked( StgTSO *t );
164 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
165                                              StgTSO *t );
166 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
167 static Capability *scheduleDoGC(Capability *cap, Task *task,
168                                 rtsBool force_major);
169
170 static rtsBool checkBlackHoles(Capability *cap);
171
172 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
173 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
174
175 static void deleteThread (Capability *cap, StgTSO *tso);
176 static void deleteAllThreads (Capability *cap);
177
178 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
179 static void deleteThread_(Capability *cap, StgTSO *tso);
180 #endif
181
182 #ifdef DEBUG
183 static char *whatNext_strs[] = {
184   "(unknown)",
185   "ThreadRunGHC",
186   "ThreadInterpret",
187   "ThreadKilled",
188   "ThreadRelocated",
189   "ThreadComplete"
190 };
191 #endif
192
193 /* -----------------------------------------------------------------------------
194  * Putting a thread on the run queue: different scheduling policies
195  * -------------------------------------------------------------------------- */
196
197 STATIC_INLINE void
198 addToRunQueue( Capability *cap, StgTSO *t )
199 {
200 #if defined(PARALLEL_HASKELL)
201     if (RtsFlags.ParFlags.doFairScheduling) { 
202         // this does round-robin scheduling; good for concurrency
203         appendToRunQueue(cap,t);
204     } else {
205         // this does unfair scheduling; good for parallelism
206         pushOnRunQueue(cap,t);
207     }
208 #else
209     // this does round-robin scheduling; good for concurrency
210     appendToRunQueue(cap,t);
211 #endif
212 }
213
214 /* ---------------------------------------------------------------------------
215    Main scheduling loop.
216
217    We use round-robin scheduling, each thread returning to the
218    scheduler loop when one of these conditions is detected:
219
220       * out of heap space
221       * timer expires (thread yields)
222       * thread blocks
223       * thread ends
224       * stack overflow
225
226    GRAN version:
227      In a GranSim setup this loop iterates over the global event queue.
228      This revolves around the global event queue, which determines what 
229      to do next. Therefore, it's more complicated than either the 
230      concurrent or the parallel (GUM) setup.
231   This version has been entirely removed (JB 2008/08).
232
233    GUM version:
234      GUM iterates over incoming messages.
235      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
236      and sends out a fish whenever it has nothing to do; in-between
237      doing the actual reductions (shared code below) it processes the
238      incoming messages and deals with delayed operations 
239      (see PendingFetches).
240      This is not the ugliest code you could imagine, but it's bloody close.
241
242   (JB 2008/08) This version was formerly indicated by a PP-Flag PAR,
243   now by PP-flag PARALLEL_HASKELL. The Eden RTS (in GHC-6.x) uses it,
244   as well as future GUM versions. This file has been refurbished to
245   only contain valid code, which is however incomplete, refers to
246   invalid includes etc.
247
248    ------------------------------------------------------------------------ */
249
250 static Capability *
251 schedule (Capability *initialCapability, Task *task)
252 {
253   StgTSO *t;
254   Capability *cap;
255   StgThreadReturnCode ret;
256 #if defined(PARALLEL_HASKELL)
257   rtsBool receivedFinish = rtsFalse;
258 #endif
259   nat prev_what_next;
260   rtsBool ready_to_gc;
261 #if defined(THREADED_RTS)
262   rtsBool first = rtsTrue;
263 #endif
264   
265   cap = initialCapability;
266
267   // Pre-condition: this task owns initialCapability.
268   // The sched_mutex is *NOT* held
269   // NB. on return, we still hold a capability.
270
271   debugTrace (DEBUG_sched, 
272               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
273               task, initialCapability);
274
275   schedulePreLoop();
276
277   // -----------------------------------------------------------
278   // Scheduler loop starts here:
279
280 #if defined(PARALLEL_HASKELL)
281 #define TERMINATION_CONDITION        (!receivedFinish)
282 #else
283 #define TERMINATION_CONDITION        rtsTrue
284 #endif
285
286   while (TERMINATION_CONDITION) {
287
288     // Check whether we have re-entered the RTS from Haskell without
289     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
290     // call).
291     if (cap->in_haskell) {
292           errorBelch("schedule: re-entered unsafely.\n"
293                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
294           stg_exit(EXIT_FAILURE);
295     }
296
297     // The interruption / shutdown sequence.
298     // 
299     // In order to cleanly shut down the runtime, we want to:
300     //   * make sure that all main threads return to their callers
301     //     with the state 'Interrupted'.
302     //   * clean up all OS threads assocated with the runtime
303     //   * free all memory etc.
304     //
305     // So the sequence for ^C goes like this:
306     //
307     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
308     //     arranges for some Capability to wake up
309     //
310     //   * all threads in the system are halted, and the zombies are
311     //     placed on the run queue for cleaning up.  We acquire all
312     //     the capabilities in order to delete the threads, this is
313     //     done by scheduleDoGC() for convenience (because GC already
314     //     needs to acquire all the capabilities).  We can't kill
315     //     threads involved in foreign calls.
316     // 
317     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
318     //
319     //   * sched_state := SCHED_SHUTTING_DOWN
320     //
321     //   * all workers exit when the run queue on their capability
322     //     drains.  All main threads will also exit when their TSO
323     //     reaches the head of the run queue and they can return.
324     //
325     //   * eventually all Capabilities will shut down, and the RTS can
326     //     exit.
327     //
328     //   * We might be left with threads blocked in foreign calls, 
329     //     we should really attempt to kill these somehow (TODO);
330     
331     switch (sched_state) {
332     case SCHED_RUNNING:
333         break;
334     case SCHED_INTERRUPTING:
335         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
336 #if defined(THREADED_RTS)
337         discardSparksCap(cap);
338 #endif
339         /* scheduleDoGC() deletes all the threads */
340         cap = scheduleDoGC(cap,task,rtsFalse);
341         break;
342     case SCHED_SHUTTING_DOWN:
343         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
344         // If we are a worker, just exit.  If we're a bound thread
345         // then we will exit below when we've removed our TSO from
346         // the run queue.
347         if (task->tso == NULL && emptyRunQueue(cap)) {
348             return cap;
349         }
350         break;
351     default:
352         barf("sched_state: %d", sched_state);
353     }
354
355     scheduleFindWork(cap);
356
357     /* work pushing, currently relevant only for THREADED_RTS:
358        (pushes threads, wakes up idle capabilities for stealing) */
359     schedulePushWork(cap,task);
360
361 #if defined(PARALLEL_HASKELL)
362     /* since we perform a blocking receive and continue otherwise,
363        either we never reach here or we definitely have work! */
364     // from here: non-empty run queue
365     ASSERT(!emptyRunQueue(cap));
366
367     if (PacketsWaiting()) {  /* now process incoming messages, if any
368                                 pending...  
369
370                                 CAUTION: scheduleGetRemoteWork called
371                                 above, waits for messages as well! */
372       processMessages(cap, &receivedFinish);
373     }
374 #endif // PARALLEL_HASKELL: non-empty run queue!
375
376     scheduleDetectDeadlock(cap,task);
377
378 #if defined(THREADED_RTS)
379     cap = task->cap;    // reload cap, it might have changed
380 #endif
381
382     // Normally, the only way we can get here with no threads to
383     // run is if a keyboard interrupt received during 
384     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
385     // Additionally, it is not fatal for the
386     // threaded RTS to reach here with no threads to run.
387     //
388     // win32: might be here due to awaitEvent() being abandoned
389     // as a result of a console event having been delivered.
390     
391 #if defined(THREADED_RTS)
392     if (first) 
393     {
394     // XXX: ToDo
395     //     // don't yield the first time, we want a chance to run this
396     //     // thread for a bit, even if there are others banging at the
397     //     // door.
398     //     first = rtsFalse;
399     //     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
400     }
401
402   yield:
403     scheduleYield(&cap,task);
404     if (emptyRunQueue(cap)) continue; // look for work again
405 #endif
406
407 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
408     if ( emptyRunQueue(cap) ) {
409         ASSERT(sched_state >= SCHED_INTERRUPTING);
410     }
411 #endif
412
413     // 
414     // Get a thread to run
415     //
416     t = popRunQueue(cap);
417
418     // Sanity check the thread we're about to run.  This can be
419     // expensive if there is lots of thread switching going on...
420     IF_DEBUG(sanity,checkTSO(t));
421
422 #if defined(THREADED_RTS)
423     // Check whether we can run this thread in the current task.
424     // If not, we have to pass our capability to the right task.
425     {
426         Task *bound = t->bound;
427       
428         if (bound) {
429             if (bound == task) {
430                 debugTrace(DEBUG_sched,
431                            "### Running thread %lu in bound thread", (unsigned long)t->id);
432                 // yes, the Haskell thread is bound to the current native thread
433             } else {
434                 debugTrace(DEBUG_sched,
435                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
436                 // no, bound to a different Haskell thread: pass to that thread
437                 pushOnRunQueue(cap,t);
438                 continue;
439             }
440         } else {
441             // The thread we want to run is unbound.
442             if (task->tso) { 
443                 debugTrace(DEBUG_sched,
444                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
445                 // no, the current native thread is bound to a different
446                 // Haskell thread, so pass it to any worker thread
447                 pushOnRunQueue(cap,t);
448                 continue; 
449             }
450         }
451     }
452 #endif
453
454     /* context switches are initiated by the timer signal, unless
455      * the user specified "context switch as often as possible", with
456      * +RTS -C0
457      */
458     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
459         && !emptyThreadQueues(cap)) {
460         cap->context_switch = 1;
461     }
462          
463 run_thread:
464
465     // CurrentTSO is the thread to run.  t might be different if we
466     // loop back to run_thread, so make sure to set CurrentTSO after
467     // that.
468     cap->r.rCurrentTSO = t;
469
470     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
471                               (long)t->id, whatNext_strs[t->what_next]);
472
473     startHeapProfTimer();
474
475     // Check for exceptions blocked on this thread
476     maybePerformBlockedException (cap, t);
477
478     // ----------------------------------------------------------------------
479     // Run the current thread 
480
481     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
482     ASSERT(t->cap == cap);
483     ASSERT(t->bound ? t->bound->cap == cap : 1);
484
485     prev_what_next = t->what_next;
486
487     errno = t->saved_errno;
488 #if mingw32_HOST_OS
489     SetLastError(t->saved_winerror);
490 #endif
491
492     cap->in_haskell = rtsTrue;
493
494     dirty_TSO(cap,t);
495
496 #if defined(THREADED_RTS)
497     if (recent_activity == ACTIVITY_DONE_GC) {
498         // ACTIVITY_DONE_GC means we turned off the timer signal to
499         // conserve power (see #1623).  Re-enable it here.
500         nat prev;
501         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
502         if (prev == ACTIVITY_DONE_GC) {
503             startTimer();
504         }
505     } else {
506         recent_activity = ACTIVITY_YES;
507     }
508 #endif
509
510     switch (prev_what_next) {
511         
512     case ThreadKilled:
513     case ThreadComplete:
514         /* Thread already finished, return to scheduler. */
515         ret = ThreadFinished;
516         break;
517         
518     case ThreadRunGHC:
519     {
520         StgRegTable *r;
521         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
522         cap = regTableToCapability(r);
523         ret = r->rRet;
524         break;
525     }
526     
527     case ThreadInterpret:
528         cap = interpretBCO(cap);
529         ret = cap->r.rRet;
530         break;
531         
532     default:
533         barf("schedule: invalid what_next field");
534     }
535
536     cap->in_haskell = rtsFalse;
537
538     // The TSO might have moved, eg. if it re-entered the RTS and a GC
539     // happened.  So find the new location:
540     t = cap->r.rCurrentTSO;
541
542     // We have run some Haskell code: there might be blackhole-blocked
543     // threads to wake up now.
544     // Lock-free test here should be ok, we're just setting a flag.
545     if ( blackhole_queue != END_TSO_QUEUE ) {
546         blackholes_need_checking = rtsTrue;
547     }
548     
549     // And save the current errno in this thread.
550     // XXX: possibly bogus for SMP because this thread might already
551     // be running again, see code below.
552     t->saved_errno = errno;
553 #if mingw32_HOST_OS
554     // Similarly for Windows error code
555     t->saved_winerror = GetLastError();
556 #endif
557
558 #if defined(THREADED_RTS)
559     // If ret is ThreadBlocked, and this Task is bound to the TSO that
560     // blocked, we are in limbo - the TSO is now owned by whatever it
561     // is blocked on, and may in fact already have been woken up,
562     // perhaps even on a different Capability.  It may be the case
563     // that task->cap != cap.  We better yield this Capability
564     // immediately and return to normaility.
565     if (ret == ThreadBlocked) {
566         debugTrace(DEBUG_sched,
567                    "--<< thread %lu (%s) stopped: blocked",
568                    (unsigned long)t->id, whatNext_strs[t->what_next]);
569         goto yield;
570     }
571 #endif
572
573     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
574     ASSERT(t->cap == cap);
575
576     // ----------------------------------------------------------------------
577     
578     // Costs for the scheduler are assigned to CCS_SYSTEM
579     stopHeapProfTimer();
580 #if defined(PROFILING)
581     CCCS = CCS_SYSTEM;
582 #endif
583     
584     schedulePostRunThread(cap,t);
585
586     t = threadStackUnderflow(task,t);
587
588     ready_to_gc = rtsFalse;
589
590     switch (ret) {
591     case HeapOverflow:
592         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
593         break;
594
595     case StackOverflow:
596         scheduleHandleStackOverflow(cap,task,t);
597         break;
598
599     case ThreadYielding:
600         if (scheduleHandleYield(cap, t, prev_what_next)) {
601             // shortcut for switching between compiler/interpreter:
602             goto run_thread; 
603         }
604         break;
605
606     case ThreadBlocked:
607         scheduleHandleThreadBlocked(t);
608         break;
609
610     case ThreadFinished:
611         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
612         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
613         break;
614
615     default:
616       barf("schedule: invalid thread return code %d", (int)ret);
617     }
618
619     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
620       cap = scheduleDoGC(cap,task,rtsFalse);
621     }
622   } /* end of while() */
623 }
624
625 /* ----------------------------------------------------------------------------
626  * Setting up the scheduler loop
627  * ------------------------------------------------------------------------- */
628
629 static void
630 schedulePreLoop(void)
631 {
632   // initialisation for scheduler - what cannot go into initScheduler()  
633 }
634
635 /* -----------------------------------------------------------------------------
636  * scheduleFindWork()
637  *
638  * Search for work to do, and handle messages from elsewhere.
639  * -------------------------------------------------------------------------- */
640
641 static void
642 scheduleFindWork (Capability *cap)
643 {
644     scheduleStartSignalHandlers(cap);
645
646     // Only check the black holes here if we've nothing else to do.
647     // During normal execution, the black hole list only gets checked
648     // at GC time, to avoid repeatedly traversing this possibly long
649     // list each time around the scheduler.
650     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
651
652     scheduleCheckWakeupThreads(cap);
653
654     scheduleCheckBlockedThreads(cap);
655
656 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
657     // Try to activate one of our own sparks
658     if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
659 #endif
660
661 #if defined(THREADED_RTS)
662     // Try to steak work if we don't have any
663     if (emptyRunQueue(cap)) { stealWork(cap); }
664 #endif
665     
666 #if defined(PARALLEL_HASKELL)
667     // if messages have been buffered...
668     scheduleSendPendingMessages();
669 #endif
670
671 #if defined(PARALLEL_HASKELL)
672     if (emptyRunQueue(cap)) {
673         receivedFinish = scheduleGetRemoteWork(cap);
674         continue; //  a new round, (hopefully) with new work
675         /* 
676            in GUM, this a) sends out a FISH and returns IF no fish is
677                            out already
678                         b) (blocking) awaits and receives messages
679            
680            in Eden, this is only the blocking receive, as b) in GUM.
681         */
682     }
683 #endif
684 }
685
686 #if defined(THREADED_RTS)
687 STATIC_INLINE rtsBool
688 shouldYieldCapability (Capability *cap, Task *task)
689 {
690     // we need to yield this capability to someone else if..
691     //   - another thread is initiating a GC
692     //   - another Task is returning from a foreign call
693     //   - the thread at the head of the run queue cannot be run
694     //     by this Task (it is bound to another Task, or it is unbound
695     //     and this task it bound).
696     return (waiting_for_gc || 
697             cap->returning_tasks_hd != NULL ||
698             (!emptyRunQueue(cap) && (task->tso == NULL
699                                      ? cap->run_queue_hd->bound != NULL
700                                      : cap->run_queue_hd->bound != task)));
701 }
702
703 // This is the single place where a Task goes to sleep.  There are
704 // two reasons it might need to sleep:
705 //    - there are no threads to run
706 //    - we need to yield this Capability to someone else 
707 //      (see shouldYieldCapability())
708 //
709 // The return value indicates whether 
710
711 static void
712 scheduleYield (Capability **pcap, Task *task)
713 {
714     Capability *cap = *pcap;
715
716     // if we have work, and we don't need to give up the Capability, continue.
717     if (!shouldYieldCapability(cap,task) && 
718         (!emptyRunQueue(cap) || blackholes_need_checking))
719         return;
720
721     // otherwise yield (sleep), and keep yielding if necessary.
722     do {
723         yieldCapability(&cap,task);
724     } 
725     while (shouldYieldCapability(cap,task));
726
727     // note there may still be no threads on the run queue at this
728     // point, the caller has to check.
729
730     *pcap = cap;
731     return;
732 }
733 #endif
734     
735 /* -----------------------------------------------------------------------------
736  * schedulePushWork()
737  *
738  * Push work to other Capabilities if we have some.
739  * -------------------------------------------------------------------------- */
740
741 static void
742 schedulePushWork(Capability *cap USED_IF_THREADS, 
743                  Task *task      USED_IF_THREADS)
744 {
745   /* following code not for PARALLEL_HASKELL. I kept the call general,
746      future GUM versions might use pushing in a distributed setup */
747 #if defined(THREADED_RTS)
748
749     Capability *free_caps[n_capabilities], *cap0;
750     nat i, n_free_caps;
751
752     // migration can be turned off with +RTS -qg
753     if (!RtsFlags.ParFlags.migrate) return;
754
755     // Check whether we have more threads on our run queue, or sparks
756     // in our pool, that we could hand to another Capability.
757     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
758         && sparkPoolSizeCap(cap) < 2) {
759         return;
760     }
761
762     // First grab as many free Capabilities as we can.
763     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
764         cap0 = &capabilities[i];
765         if (cap != cap0 && tryGrabCapability(cap0,task)) {
766             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
767                 // it already has some work, we just grabbed it at 
768                 // the wrong moment.  Or maybe it's deadlocked!
769                 releaseCapability(cap0);
770             } else {
771                 free_caps[n_free_caps++] = cap0;
772             }
773         }
774     }
775
776     // we now have n_free_caps free capabilities stashed in
777     // free_caps[].  Share our run queue equally with them.  This is
778     // probably the simplest thing we could do; improvements we might
779     // want to do include:
780     //
781     //   - giving high priority to moving relatively new threads, on 
782     //     the gournds that they haven't had time to build up a
783     //     working set in the cache on this CPU/Capability.
784     //
785     //   - giving low priority to moving long-lived threads
786
787     if (n_free_caps > 0) {
788         StgTSO *prev, *t, *next;
789         rtsBool pushed_to_all;
790
791         debugTrace(DEBUG_sched, 
792                    "cap %d: %s and %d free capabilities, sharing...", 
793                    cap->no, 
794                    (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
795                    "excess threads on run queue":"sparks to share (>=2)",
796                    n_free_caps);
797
798         i = 0;
799         pushed_to_all = rtsFalse;
800
801         if (cap->run_queue_hd != END_TSO_QUEUE) {
802             prev = cap->run_queue_hd;
803             t = prev->_link;
804             prev->_link = END_TSO_QUEUE;
805             for (; t != END_TSO_QUEUE; t = next) {
806                 next = t->_link;
807                 t->_link = END_TSO_QUEUE;
808                 if (t->what_next == ThreadRelocated
809                     || t->bound == task // don't move my bound thread
810                     || tsoLocked(t)) {  // don't move a locked thread
811                     setTSOLink(cap, prev, t);
812                     prev = t;
813                 } else if (i == n_free_caps) {
814                     pushed_to_all = rtsTrue;
815                     i = 0;
816                     // keep one for us
817                     setTSOLink(cap, prev, t);
818                     prev = t;
819                 } else {
820                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
821                     appendToRunQueue(free_caps[i],t);
822                     if (t->bound) { t->bound->cap = free_caps[i]; }
823                     t->cap = free_caps[i];
824                     i++;
825                 }
826             }
827             cap->run_queue_tl = prev;
828         }
829
830 #ifdef SPARK_PUSHING
831         /* JB I left this code in place, it would work but is not necessary */
832
833         // If there are some free capabilities that we didn't push any
834         // threads to, then try to push a spark to each one.
835         if (!pushed_to_all) {
836             StgClosure *spark;
837             // i is the next free capability to push to
838             for (; i < n_free_caps; i++) {
839                 if (emptySparkPoolCap(free_caps[i])) {
840                     spark = tryStealSpark(cap->sparks);
841                     if (spark != NULL) {
842                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
843                         newSpark(&(free_caps[i]->r), spark);
844                     }
845                 }
846             }
847         }
848 #endif /* SPARK_PUSHING */
849
850         // release the capabilities
851         for (i = 0; i < n_free_caps; i++) {
852             task->cap = free_caps[i];
853             releaseAndWakeupCapability(free_caps[i]);
854         }
855     }
856     task->cap = cap; // reset to point to our Capability.
857
858 #endif /* THREADED_RTS */
859
860 }
861
862 /* ----------------------------------------------------------------------------
863  * Start any pending signal handlers
864  * ------------------------------------------------------------------------- */
865
866 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
867 static void
868 scheduleStartSignalHandlers(Capability *cap)
869 {
870     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
871         // safe outside the lock
872         startSignalHandlers(cap);
873     }
874 }
875 #else
876 static void
877 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
878 {
879 }
880 #endif
881
882 /* ----------------------------------------------------------------------------
883  * Check for blocked threads that can be woken up.
884  * ------------------------------------------------------------------------- */
885
886 static void
887 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
888 {
889 #if !defined(THREADED_RTS)
890     //
891     // Check whether any waiting threads need to be woken up.  If the
892     // run queue is empty, and there are no other tasks running, we
893     // can wait indefinitely for something to happen.
894     //
895     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
896     {
897         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
898     }
899 #endif
900 }
901
902
903 /* ----------------------------------------------------------------------------
904  * Check for threads woken up by other Capabilities
905  * ------------------------------------------------------------------------- */
906
907 static void
908 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
909 {
910 #if defined(THREADED_RTS)
911     // Any threads that were woken up by other Capabilities get
912     // appended to our run queue.
913     if (!emptyWakeupQueue(cap)) {
914         ACQUIRE_LOCK(&cap->lock);
915         if (emptyRunQueue(cap)) {
916             cap->run_queue_hd = cap->wakeup_queue_hd;
917             cap->run_queue_tl = cap->wakeup_queue_tl;
918         } else {
919             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
920             cap->run_queue_tl = cap->wakeup_queue_tl;
921         }
922         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
923         RELEASE_LOCK(&cap->lock);
924     }
925 #endif
926 }
927
928 /* ----------------------------------------------------------------------------
929  * Check for threads blocked on BLACKHOLEs that can be woken up
930  * ------------------------------------------------------------------------- */
931 static void
932 scheduleCheckBlackHoles (Capability *cap)
933 {
934     if ( blackholes_need_checking ) // check without the lock first
935     {
936         ACQUIRE_LOCK(&sched_mutex);
937         if ( blackholes_need_checking ) {
938             blackholes_need_checking = rtsFalse;
939             // important that we reset the flag *before* checking the
940             // blackhole queue, otherwise we could get deadlock.  This
941             // happens as follows: we wake up a thread that
942             // immediately runs on another Capability, blocks on a
943             // blackhole, and then we reset the blackholes_need_checking flag.
944             checkBlackHoles(cap);
945         }
946         RELEASE_LOCK(&sched_mutex);
947     }
948 }
949
950 /* ----------------------------------------------------------------------------
951  * Detect deadlock conditions and attempt to resolve them.
952  * ------------------------------------------------------------------------- */
953
954 static void
955 scheduleDetectDeadlock (Capability *cap, Task *task)
956 {
957
958 #if defined(PARALLEL_HASKELL)
959     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
960     return;
961 #endif
962
963     /* 
964      * Detect deadlock: when we have no threads to run, there are no
965      * threads blocked, waiting for I/O, or sleeping, and all the
966      * other tasks are waiting for work, we must have a deadlock of
967      * some description.
968      */
969     if ( emptyThreadQueues(cap) )
970     {
971 #if defined(THREADED_RTS)
972         /* 
973          * In the threaded RTS, we only check for deadlock if there
974          * has been no activity in a complete timeslice.  This means
975          * we won't eagerly start a full GC just because we don't have
976          * any threads to run currently.
977          */
978         if (recent_activity != ACTIVITY_INACTIVE) return;
979 #endif
980
981         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
982
983         // Garbage collection can release some new threads due to
984         // either (a) finalizers or (b) threads resurrected because
985         // they are unreachable and will therefore be sent an
986         // exception.  Any threads thus released will be immediately
987         // runnable.
988         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
989
990         recent_activity = ACTIVITY_DONE_GC;
991         // disable timer signals (see #1623)
992         stopTimer();
993         
994         if ( !emptyRunQueue(cap) ) return;
995
996 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
997         /* If we have user-installed signal handlers, then wait
998          * for signals to arrive rather then bombing out with a
999          * deadlock.
1000          */
1001         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1002             debugTrace(DEBUG_sched,
1003                        "still deadlocked, waiting for signals...");
1004
1005             awaitUserSignals();
1006
1007             if (signals_pending()) {
1008                 startSignalHandlers(cap);
1009             }
1010
1011             // either we have threads to run, or we were interrupted:
1012             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1013
1014             return;
1015         }
1016 #endif
1017
1018 #if !defined(THREADED_RTS)
1019         /* Probably a real deadlock.  Send the current main thread the
1020          * Deadlock exception.
1021          */
1022         if (task->tso) {
1023             switch (task->tso->why_blocked) {
1024             case BlockedOnSTM:
1025             case BlockedOnBlackHole:
1026             case BlockedOnException:
1027             case BlockedOnMVar:
1028                 throwToSingleThreaded(cap, task->tso, 
1029                                       (StgClosure *)nonTermination_closure);
1030                 return;
1031             default:
1032                 barf("deadlock: main thread blocked in a strange way");
1033             }
1034         }
1035         return;
1036 #endif
1037     }
1038 }
1039
1040
1041 /* ----------------------------------------------------------------------------
1042  * Send pending messages (PARALLEL_HASKELL only)
1043  * ------------------------------------------------------------------------- */
1044
1045 #if defined(PARALLEL_HASKELL)
1046 static void
1047 scheduleSendPendingMessages(void)
1048 {
1049
1050 # if defined(PAR) // global Mem.Mgmt., omit for now
1051     if (PendingFetches != END_BF_QUEUE) {
1052         processFetches();
1053     }
1054 # endif
1055     
1056     if (RtsFlags.ParFlags.BufferTime) {
1057         // if we use message buffering, we must send away all message
1058         // packets which have become too old...
1059         sendOldBuffers(); 
1060     }
1061 }
1062 #endif
1063
1064 /* ----------------------------------------------------------------------------
1065  * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
1066  * ------------------------------------------------------------------------- */
1067
1068 #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
1069 static void
1070 scheduleActivateSpark(Capability *cap)
1071 {
1072     StgClosure *spark;
1073
1074 /* We only want to stay here if the run queue is empty and we want some
1075    work. We try to turn a spark into a thread, and add it to the run
1076    queue, from where it will be picked up in the next iteration of the
1077    scheduler loop.  
1078 */
1079     if (!emptyRunQueue(cap)) 
1080       /* In the threaded RTS, another task might have pushed a thread
1081          on our run queue in the meantime ? But would need a lock.. */
1082       return;
1083
1084  
1085     // Really we should be using reclaimSpark() here, but
1086     // experimentally it doesn't seem to perform as well as just
1087     // stealing from our own spark pool:
1088     // spark = reclaimSpark(cap->sparks);
1089     spark = tryStealSpark(cap->sparks); // defined in Sparks.c
1090
1091     if (spark != NULL) {
1092       debugTrace(DEBUG_sched,
1093                  "turning spark of closure %p into a thread",
1094                  (StgClosure *)spark);
1095       createSparkThread(cap,spark); // defined in Sparks.c
1096     }
1097 }
1098 #endif // PARALLEL_HASKELL || THREADED_RTS
1099
1100 /* ----------------------------------------------------------------------------
1101  * Get work from a remote node (PARALLEL_HASKELL only)
1102  * ------------------------------------------------------------------------- */
1103     
1104 #if defined(PARALLEL_HASKELL)
1105 static rtsBool /* return value used in PARALLEL_HASKELL only */
1106 scheduleGetRemoteWork (Capability *cap STG_UNUSED)
1107 {
1108 #if defined(PARALLEL_HASKELL)
1109   rtsBool receivedFinish = rtsFalse;
1110
1111   // idle() , i.e. send all buffers, wait for work
1112   if (RtsFlags.ParFlags.BufferTime) {
1113         IF_PAR_DEBUG(verbose, 
1114                 debugBelch("...send all pending data,"));
1115         {
1116           nat i;
1117           for (i=1; i<=nPEs; i++)
1118             sendImmediately(i); // send all messages away immediately
1119         }
1120   }
1121
1122   /* this would be the place for fishing in GUM... 
1123
1124      if (no-earlier-fish-around) 
1125           sendFish(choosePe());
1126    */
1127
1128   // Eden:just look for incoming messages (blocking receive)
1129   IF_PAR_DEBUG(verbose, 
1130                debugBelch("...wait for incoming messages...\n"));
1131   processMessages(cap, &receivedFinish); // blocking receive...
1132
1133
1134   return receivedFinish;
1135   // reenter scheduling look after having received something
1136
1137 #else /* !PARALLEL_HASKELL, i.e. THREADED_RTS */
1138
1139   return rtsFalse; /* return value unused in THREADED_RTS */
1140
1141 #endif /* PARALLEL_HASKELL */
1142 }
1143 #endif // PARALLEL_HASKELL || THREADED_RTS
1144
1145 /* ----------------------------------------------------------------------------
1146  * After running a thread...
1147  * ------------------------------------------------------------------------- */
1148
1149 static void
1150 schedulePostRunThread (Capability *cap, StgTSO *t)
1151 {
1152     // We have to be able to catch transactions that are in an
1153     // infinite loop as a result of seeing an inconsistent view of
1154     // memory, e.g. 
1155     //
1156     //   atomically $ do
1157     //       [a,b] <- mapM readTVar [ta,tb]
1158     //       when (a == b) loop
1159     //
1160     // and a is never equal to b given a consistent view of memory.
1161     //
1162     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1163         if (!stmValidateNestOfTransactions (t -> trec)) {
1164             debugTrace(DEBUG_sched | DEBUG_stm,
1165                        "trec %p found wasting its time", t);
1166             
1167             // strip the stack back to the
1168             // ATOMICALLY_FRAME, aborting the (nested)
1169             // transaction, and saving the stack of any
1170             // partially-evaluated thunks on the heap.
1171             throwToSingleThreaded_(cap, t, NULL, rtsTrue, NULL);
1172             
1173             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1174         }
1175     }
1176
1177   /* some statistics gathering in the parallel case */
1178 }
1179
1180 /* -----------------------------------------------------------------------------
1181  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1182  * -------------------------------------------------------------------------- */
1183
1184 static rtsBool
1185 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1186 {
1187     // did the task ask for a large block?
1188     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1189         // if so, get one and push it on the front of the nursery.
1190         bdescr *bd;
1191         lnat blocks;
1192         
1193         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1194         
1195         debugTrace(DEBUG_sched,
1196                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1197                    (long)t->id, whatNext_strs[t->what_next], blocks);
1198     
1199         // don't do this if the nursery is (nearly) full, we'll GC first.
1200         if (cap->r.rCurrentNursery->link != NULL ||
1201             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1202                                                // if the nursery has only one block.
1203             
1204             ACQUIRE_SM_LOCK
1205             bd = allocGroup( blocks );
1206             RELEASE_SM_LOCK
1207             cap->r.rNursery->n_blocks += blocks;
1208             
1209             // link the new group into the list
1210             bd->link = cap->r.rCurrentNursery;
1211             bd->u.back = cap->r.rCurrentNursery->u.back;
1212             if (cap->r.rCurrentNursery->u.back != NULL) {
1213                 cap->r.rCurrentNursery->u.back->link = bd;
1214             } else {
1215 #if !defined(THREADED_RTS)
1216                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1217                        g0s0 == cap->r.rNursery);
1218 #endif
1219                 cap->r.rNursery->blocks = bd;
1220             }             
1221             cap->r.rCurrentNursery->u.back = bd;
1222             
1223             // initialise it as a nursery block.  We initialise the
1224             // step, gen_no, and flags field of *every* sub-block in
1225             // this large block, because this is easier than making
1226             // sure that we always find the block head of a large
1227             // block whenever we call Bdescr() (eg. evacuate() and
1228             // isAlive() in the GC would both have to do this, at
1229             // least).
1230             { 
1231                 bdescr *x;
1232                 for (x = bd; x < bd + blocks; x++) {
1233                     x->step = cap->r.rNursery;
1234                     x->gen_no = 0;
1235                     x->flags = 0;
1236                 }
1237             }
1238             
1239             // This assert can be a killer if the app is doing lots
1240             // of large block allocations.
1241             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1242             
1243             // now update the nursery to point to the new block
1244             cap->r.rCurrentNursery = bd;
1245             
1246             // we might be unlucky and have another thread get on the
1247             // run queue before us and steal the large block, but in that
1248             // case the thread will just end up requesting another large
1249             // block.
1250             pushOnRunQueue(cap,t);
1251             return rtsFalse;  /* not actually GC'ing */
1252         }
1253     }
1254     
1255     debugTrace(DEBUG_sched,
1256                "--<< thread %ld (%s) stopped: HeapOverflow",
1257                (long)t->id, whatNext_strs[t->what_next]);
1258
1259     if (cap->context_switch) {
1260         // Sometimes we miss a context switch, e.g. when calling
1261         // primitives in a tight loop, MAYBE_GC() doesn't check the
1262         // context switch flag, and we end up waiting for a GC.
1263         // See #1984, and concurrent/should_run/1984
1264         cap->context_switch = 0;
1265         addToRunQueue(cap,t);
1266     } else {
1267         pushOnRunQueue(cap,t);
1268     }
1269     return rtsTrue;
1270     /* actual GC is done at the end of the while loop in schedule() */
1271 }
1272
1273 /* -----------------------------------------------------------------------------
1274  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1275  * -------------------------------------------------------------------------- */
1276
1277 static void
1278 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1279 {
1280     debugTrace (DEBUG_sched,
1281                 "--<< thread %ld (%s) stopped, StackOverflow", 
1282                 (long)t->id, whatNext_strs[t->what_next]);
1283
1284     /* just adjust the stack for this thread, then pop it back
1285      * on the run queue.
1286      */
1287     { 
1288         /* enlarge the stack */
1289         StgTSO *new_t = threadStackOverflow(cap, t);
1290         
1291         /* The TSO attached to this Task may have moved, so update the
1292          * pointer to it.
1293          */
1294         if (task->tso == t) {
1295             task->tso = new_t;
1296         }
1297         pushOnRunQueue(cap,new_t);
1298     }
1299 }
1300
1301 /* -----------------------------------------------------------------------------
1302  * Handle a thread that returned to the scheduler with ThreadYielding
1303  * -------------------------------------------------------------------------- */
1304
1305 static rtsBool
1306 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1307 {
1308     // Reset the context switch flag.  We don't do this just before
1309     // running the thread, because that would mean we would lose ticks
1310     // during GC, which can lead to unfair scheduling (a thread hogs
1311     // the CPU because the tick always arrives during GC).  This way
1312     // penalises threads that do a lot of allocation, but that seems
1313     // better than the alternative.
1314     cap->context_switch = 0;
1315     
1316     /* put the thread back on the run queue.  Then, if we're ready to
1317      * GC, check whether this is the last task to stop.  If so, wake
1318      * up the GC thread.  getThread will block during a GC until the
1319      * GC is finished.
1320      */
1321 #ifdef DEBUG
1322     if (t->what_next != prev_what_next) {
1323         debugTrace(DEBUG_sched,
1324                    "--<< thread %ld (%s) stopped to switch evaluators", 
1325                    (long)t->id, whatNext_strs[t->what_next]);
1326     } else {
1327         debugTrace(DEBUG_sched,
1328                    "--<< thread %ld (%s) stopped, yielding",
1329                    (long)t->id, whatNext_strs[t->what_next]);
1330     }
1331 #endif
1332     
1333     IF_DEBUG(sanity,
1334              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1335              checkTSO(t));
1336     ASSERT(t->_link == END_TSO_QUEUE);
1337     
1338     // Shortcut if we're just switching evaluators: don't bother
1339     // doing stack squeezing (which can be expensive), just run the
1340     // thread.
1341     if (t->what_next != prev_what_next) {
1342         return rtsTrue;
1343     }
1344
1345     addToRunQueue(cap,t);
1346
1347     return rtsFalse;
1348 }
1349
1350 /* -----------------------------------------------------------------------------
1351  * Handle a thread that returned to the scheduler with ThreadBlocked
1352  * -------------------------------------------------------------------------- */
1353
1354 static void
1355 scheduleHandleThreadBlocked( StgTSO *t
1356 #if !defined(GRAN) && !defined(DEBUG)
1357     STG_UNUSED
1358 #endif
1359     )
1360 {
1361
1362       // We don't need to do anything.  The thread is blocked, and it
1363       // has tidied up its stack and placed itself on whatever queue
1364       // it needs to be on.
1365
1366     // ASSERT(t->why_blocked != NotBlocked);
1367     // Not true: for example,
1368     //    - in THREADED_RTS, the thread may already have been woken
1369     //      up by another Capability.  This actually happens: try
1370     //      conc023 +RTS -N2.
1371     //    - the thread may have woken itself up already, because
1372     //      threadPaused() might have raised a blocked throwTo
1373     //      exception, see maybePerformBlockedException().
1374
1375 #ifdef DEBUG
1376     if (traceClass(DEBUG_sched)) {
1377         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1378                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1379         printThreadBlockage(t);
1380         debugTraceEnd();
1381     }
1382 #endif
1383 }
1384
1385 /* -----------------------------------------------------------------------------
1386  * Handle a thread that returned to the scheduler with ThreadFinished
1387  * -------------------------------------------------------------------------- */
1388
1389 static rtsBool
1390 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1391 {
1392     /* Need to check whether this was a main thread, and if so,
1393      * return with the return value.
1394      *
1395      * We also end up here if the thread kills itself with an
1396      * uncaught exception, see Exception.cmm.
1397      */
1398     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1399                (unsigned long)t->id, whatNext_strs[t->what_next]);
1400
1401       //
1402       // Check whether the thread that just completed was a bound
1403       // thread, and if so return with the result.  
1404       //
1405       // There is an assumption here that all thread completion goes
1406       // through this point; we need to make sure that if a thread
1407       // ends up in the ThreadKilled state, that it stays on the run
1408       // queue so it can be dealt with here.
1409       //
1410
1411       if (t->bound) {
1412
1413           if (t->bound != task) {
1414 #if !defined(THREADED_RTS)
1415               // Must be a bound thread that is not the topmost one.  Leave
1416               // it on the run queue until the stack has unwound to the
1417               // point where we can deal with this.  Leaving it on the run
1418               // queue also ensures that the garbage collector knows about
1419               // this thread and its return value (it gets dropped from the
1420               // step->threads list so there's no other way to find it).
1421               appendToRunQueue(cap,t);
1422               return rtsFalse;
1423 #else
1424               // this cannot happen in the threaded RTS, because a
1425               // bound thread can only be run by the appropriate Task.
1426               barf("finished bound thread that isn't mine");
1427 #endif
1428           }
1429
1430           ASSERT(task->tso == t);
1431
1432           if (t->what_next == ThreadComplete) {
1433               if (task->ret) {
1434                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1435                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1436               }
1437               task->stat = Success;
1438           } else {
1439               if (task->ret) {
1440                   *(task->ret) = NULL;
1441               }
1442               if (sched_state >= SCHED_INTERRUPTING) {
1443                   task->stat = Interrupted;
1444               } else {
1445                   task->stat = Killed;
1446               }
1447           }
1448 #ifdef DEBUG
1449           removeThreadLabel((StgWord)task->tso->id);
1450 #endif
1451           return rtsTrue; // tells schedule() to return
1452       }
1453
1454       return rtsFalse;
1455 }
1456
1457 /* -----------------------------------------------------------------------------
1458  * Perform a heap census
1459  * -------------------------------------------------------------------------- */
1460
1461 static rtsBool
1462 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1463 {
1464     // When we have +RTS -i0 and we're heap profiling, do a census at
1465     // every GC.  This lets us get repeatable runs for debugging.
1466     if (performHeapProfile ||
1467         (RtsFlags.ProfFlags.profileInterval==0 &&
1468          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1469         return rtsTrue;
1470     } else {
1471         return rtsFalse;
1472     }
1473 }
1474
1475 /* -----------------------------------------------------------------------------
1476  * Perform a garbage collection if necessary
1477  * -------------------------------------------------------------------------- */
1478
1479 static Capability *
1480 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1481 {
1482     rtsBool heap_census;
1483 #ifdef THREADED_RTS
1484     /* extern static volatile StgWord waiting_for_gc; 
1485        lives inside capability.c */
1486     rtsBool was_waiting;
1487     nat i;
1488 #endif
1489
1490 #ifdef THREADED_RTS
1491     // In order to GC, there must be no threads running Haskell code.
1492     // Therefore, the GC thread needs to hold *all* the capabilities,
1493     // and release them after the GC has completed.  
1494     //
1495     // This seems to be the simplest way: previous attempts involved
1496     // making all the threads with capabilities give up their
1497     // capabilities and sleep except for the *last* one, which
1498     // actually did the GC.  But it's quite hard to arrange for all
1499     // the other tasks to sleep and stay asleep.
1500     //
1501         
1502     /*  Other capabilities are prevented from running yet more Haskell
1503         threads if waiting_for_gc is set. Tested inside
1504         yieldCapability() and releaseCapability() in Capability.c */
1505
1506     was_waiting = cas(&waiting_for_gc, 0, 1);
1507     if (was_waiting) {
1508         do {
1509             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1510             if (cap) yieldCapability(&cap,task);
1511         } while (waiting_for_gc);
1512         return cap;  // NOTE: task->cap might have changed here
1513     }
1514
1515     setContextSwitches();
1516     for (i=0; i < n_capabilities; i++) {
1517         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1518         if (cap != &capabilities[i]) {
1519             Capability *pcap = &capabilities[i];
1520             // we better hope this task doesn't get migrated to
1521             // another Capability while we're waiting for this one.
1522             // It won't, because load balancing happens while we have
1523             // all the Capabilities, but even so it's a slightly
1524             // unsavoury invariant.
1525             task->cap = pcap;
1526             waitForReturnCapability(&pcap, task);
1527             if (pcap != &capabilities[i]) {
1528                 barf("scheduleDoGC: got the wrong capability");
1529             }
1530         }
1531     }
1532
1533     waiting_for_gc = rtsFalse;
1534 #endif
1535
1536     // so this happens periodically:
1537     if (cap) scheduleCheckBlackHoles(cap);
1538     
1539     IF_DEBUG(scheduler, printAllThreads());
1540
1541     /*
1542      * We now have all the capabilities; if we're in an interrupting
1543      * state, then we should take the opportunity to delete all the
1544      * threads in the system.
1545      */
1546     if (sched_state >= SCHED_INTERRUPTING) {
1547         deleteAllThreads(&capabilities[0]);
1548         sched_state = SCHED_SHUTTING_DOWN;
1549     }
1550     
1551     heap_census = scheduleNeedHeapProfile(rtsTrue);
1552
1553     /* everybody back, start the GC.
1554      * Could do it in this thread, or signal a condition var
1555      * to do it in another thread.  Either way, we need to
1556      * broadcast on gc_pending_cond afterward.
1557      */
1558 #if defined(THREADED_RTS)
1559     debugTrace(DEBUG_sched, "doing GC");
1560 #endif
1561     GarbageCollect(force_major || heap_census);
1562     
1563     if (heap_census) {
1564         debugTrace(DEBUG_sched, "performing heap census");
1565         heapCensus();
1566         performHeapProfile = rtsFalse;
1567     }
1568
1569 #ifdef SPARKBALANCE
1570     /* JB 
1571        Once we are all together... this would be the place to balance all
1572        spark pools. No concurrent stealing or adding of new sparks can
1573        occur. Should be defined in Sparks.c. */
1574     balanceSparkPoolsCaps(n_capabilities, capabilities);
1575 #endif
1576
1577 #if defined(THREADED_RTS)
1578     // release our stash of capabilities.
1579     for (i = 0; i < n_capabilities; i++) {
1580         if (cap != &capabilities[i]) {
1581             task->cap = &capabilities[i];
1582             releaseCapability(&capabilities[i]);
1583         }
1584     }
1585     if (cap) {
1586         task->cap = cap;
1587     } else {
1588         task->cap = NULL;
1589     }
1590 #endif
1591
1592     return cap;
1593 }
1594
1595 /* ---------------------------------------------------------------------------
1596  * Singleton fork(). Do not copy any running threads.
1597  * ------------------------------------------------------------------------- */
1598
1599 pid_t
1600 forkProcess(HsStablePtr *entry
1601 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1602             STG_UNUSED
1603 #endif
1604            )
1605 {
1606 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1607     Task *task;
1608     pid_t pid;
1609     StgTSO* t,*next;
1610     Capability *cap;
1611     nat s;
1612     
1613 #if defined(THREADED_RTS)
1614     if (RtsFlags.ParFlags.nNodes > 1) {
1615         errorBelch("forking not supported with +RTS -N<n> greater than 1");
1616         stg_exit(EXIT_FAILURE);
1617     }
1618 #endif
1619
1620     debugTrace(DEBUG_sched, "forking!");
1621     
1622     // ToDo: for SMP, we should probably acquire *all* the capabilities
1623     cap = rts_lock();
1624     
1625     // no funny business: hold locks while we fork, otherwise if some
1626     // other thread is holding a lock when the fork happens, the data
1627     // structure protected by the lock will forever be in an
1628     // inconsistent state in the child.  See also #1391.
1629     ACQUIRE_LOCK(&sched_mutex);
1630     ACQUIRE_LOCK(&cap->lock);
1631     ACQUIRE_LOCK(&cap->running_task->lock);
1632
1633     pid = fork();
1634     
1635     if (pid) { // parent
1636         
1637         RELEASE_LOCK(&sched_mutex);
1638         RELEASE_LOCK(&cap->lock);
1639         RELEASE_LOCK(&cap->running_task->lock);
1640
1641         // just return the pid
1642         rts_unlock(cap);
1643         return pid;
1644         
1645     } else { // child
1646         
1647 #if defined(THREADED_RTS)
1648         initMutex(&sched_mutex);
1649         initMutex(&cap->lock);
1650         initMutex(&cap->running_task->lock);
1651 #endif
1652
1653         // Now, all OS threads except the thread that forked are
1654         // stopped.  We need to stop all Haskell threads, including
1655         // those involved in foreign calls.  Also we need to delete
1656         // all Tasks, because they correspond to OS threads that are
1657         // now gone.
1658
1659         for (s = 0; s < total_steps; s++) {
1660           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1661             if (t->what_next == ThreadRelocated) {
1662                 next = t->_link;
1663             } else {
1664                 next = t->global_link;
1665                 // don't allow threads to catch the ThreadKilled
1666                 // exception, but we do want to raiseAsync() because these
1667                 // threads may be evaluating thunks that we need later.
1668                 deleteThread_(cap,t);
1669             }
1670           }
1671         }
1672         
1673         // Empty the run queue.  It seems tempting to let all the
1674         // killed threads stay on the run queue as zombies to be
1675         // cleaned up later, but some of them correspond to bound
1676         // threads for which the corresponding Task does not exist.
1677         cap->run_queue_hd = END_TSO_QUEUE;
1678         cap->run_queue_tl = END_TSO_QUEUE;
1679
1680         // Any suspended C-calling Tasks are no more, their OS threads
1681         // don't exist now:
1682         cap->suspended_ccalling_tasks = NULL;
1683
1684         // Empty the threads lists.  Otherwise, the garbage
1685         // collector may attempt to resurrect some of these threads.
1686         for (s = 0; s < total_steps; s++) {
1687             all_steps[s].threads = END_TSO_QUEUE;
1688         }
1689
1690         // Wipe the task list, except the current Task.
1691         ACQUIRE_LOCK(&sched_mutex);
1692         for (task = all_tasks; task != NULL; task=task->all_link) {
1693             if (task != cap->running_task) {
1694 #if defined(THREADED_RTS)
1695                 initMutex(&task->lock); // see #1391
1696 #endif
1697                 discardTask(task);
1698             }
1699         }
1700         RELEASE_LOCK(&sched_mutex);
1701
1702 #if defined(THREADED_RTS)
1703         // Wipe our spare workers list, they no longer exist.  New
1704         // workers will be created if necessary.
1705         cap->spare_workers = NULL;
1706         cap->returning_tasks_hd = NULL;
1707         cap->returning_tasks_tl = NULL;
1708 #endif
1709
1710         // On Unix, all timers are reset in the child, so we need to start
1711         // the timer again.
1712         initTimer();
1713         startTimer();
1714
1715         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
1716         rts_checkSchedStatus("forkProcess",cap);
1717         
1718         rts_unlock(cap);
1719         hs_exit();                      // clean up and exit
1720         stg_exit(EXIT_SUCCESS);
1721     }
1722 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1723     barf("forkProcess#: primop not supported on this platform, sorry!\n");
1724     return -1;
1725 #endif
1726 }
1727
1728 /* ---------------------------------------------------------------------------
1729  * Delete all the threads in the system
1730  * ------------------------------------------------------------------------- */
1731    
1732 static void
1733 deleteAllThreads ( Capability *cap )
1734 {
1735     // NOTE: only safe to call if we own all capabilities.
1736
1737     StgTSO* t, *next;
1738     nat s;
1739
1740     debugTrace(DEBUG_sched,"deleting all threads");
1741     for (s = 0; s < total_steps; s++) {
1742       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
1743         if (t->what_next == ThreadRelocated) {
1744             next = t->_link;
1745         } else {
1746             next = t->global_link;
1747             deleteThread(cap,t);
1748         }
1749       }
1750     }      
1751
1752     // The run queue now contains a bunch of ThreadKilled threads.  We
1753     // must not throw these away: the main thread(s) will be in there
1754     // somewhere, and the main scheduler loop has to deal with it.
1755     // Also, the run queue is the only thing keeping these threads from
1756     // being GC'd, and we don't want the "main thread has been GC'd" panic.
1757
1758 #if !defined(THREADED_RTS)
1759     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1760     ASSERT(sleeping_queue == END_TSO_QUEUE);
1761 #endif
1762 }
1763
1764 /* -----------------------------------------------------------------------------
1765    Managing the suspended_ccalling_tasks list.
1766    Locks required: sched_mutex
1767    -------------------------------------------------------------------------- */
1768
1769 STATIC_INLINE void
1770 suspendTask (Capability *cap, Task *task)
1771 {
1772     ASSERT(task->next == NULL && task->prev == NULL);
1773     task->next = cap->suspended_ccalling_tasks;
1774     task->prev = NULL;
1775     if (cap->suspended_ccalling_tasks) {
1776         cap->suspended_ccalling_tasks->prev = task;
1777     }
1778     cap->suspended_ccalling_tasks = task;
1779 }
1780
1781 STATIC_INLINE void
1782 recoverSuspendedTask (Capability *cap, Task *task)
1783 {
1784     if (task->prev) {
1785         task->prev->next = task->next;
1786     } else {
1787         ASSERT(cap->suspended_ccalling_tasks == task);
1788         cap->suspended_ccalling_tasks = task->next;
1789     }
1790     if (task->next) {
1791         task->next->prev = task->prev;
1792     }
1793     task->next = task->prev = NULL;
1794 }
1795
1796 /* ---------------------------------------------------------------------------
1797  * Suspending & resuming Haskell threads.
1798  * 
1799  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1800  * its capability before calling the C function.  This allows another
1801  * task to pick up the capability and carry on running Haskell
1802  * threads.  It also means that if the C call blocks, it won't lock
1803  * the whole system.
1804  *
1805  * The Haskell thread making the C call is put to sleep for the
1806  * duration of the call, on the susepended_ccalling_threads queue.  We
1807  * give out a token to the task, which it can use to resume the thread
1808  * on return from the C function.
1809  * ------------------------------------------------------------------------- */
1810    
1811 void *
1812 suspendThread (StgRegTable *reg)
1813 {
1814   Capability *cap;
1815   int saved_errno;
1816   StgTSO *tso;
1817   Task *task;
1818 #if mingw32_HOST_OS
1819   StgWord32 saved_winerror;
1820 #endif
1821
1822   saved_errno = errno;
1823 #if mingw32_HOST_OS
1824   saved_winerror = GetLastError();
1825 #endif
1826
1827   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
1828    */
1829   cap = regTableToCapability(reg);
1830
1831   task = cap->running_task;
1832   tso = cap->r.rCurrentTSO;
1833
1834   debugTrace(DEBUG_sched, 
1835              "thread %lu did a safe foreign call", 
1836              (unsigned long)cap->r.rCurrentTSO->id);
1837
1838   // XXX this might not be necessary --SDM
1839   tso->what_next = ThreadRunGHC;
1840
1841   threadPaused(cap,tso);
1842
1843   if ((tso->flags & TSO_BLOCKEX) == 0)  {
1844       tso->why_blocked = BlockedOnCCall;
1845       tso->flags |= TSO_BLOCKEX;
1846       tso->flags &= ~TSO_INTERRUPTIBLE;
1847   } else {
1848       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
1849   }
1850
1851   // Hand back capability
1852   task->suspended_tso = tso;
1853
1854   ACQUIRE_LOCK(&cap->lock);
1855
1856   suspendTask(cap,task);
1857   cap->in_haskell = rtsFalse;
1858   releaseCapability_(cap,rtsFalse);
1859   
1860   RELEASE_LOCK(&cap->lock);
1861
1862 #if defined(THREADED_RTS)
1863   /* Preparing to leave the RTS, so ensure there's a native thread/task
1864      waiting to take over.
1865   */
1866   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
1867 #endif
1868
1869   errno = saved_errno;
1870 #if mingw32_HOST_OS
1871   SetLastError(saved_winerror);
1872 #endif
1873   return task;
1874 }
1875
1876 StgRegTable *
1877 resumeThread (void *task_)
1878 {
1879     StgTSO *tso;
1880     Capability *cap;
1881     Task *task = task_;
1882     int saved_errno;
1883 #if mingw32_HOST_OS
1884     StgWord32 saved_winerror;
1885 #endif
1886
1887     saved_errno = errno;
1888 #if mingw32_HOST_OS
1889     saved_winerror = GetLastError();
1890 #endif
1891
1892     cap = task->cap;
1893     // Wait for permission to re-enter the RTS with the result.
1894     waitForReturnCapability(&cap,task);
1895     // we might be on a different capability now... but if so, our
1896     // entry on the suspended_ccalling_tasks list will also have been
1897     // migrated.
1898
1899     // Remove the thread from the suspended list
1900     recoverSuspendedTask(cap,task);
1901
1902     tso = task->suspended_tso;
1903     task->suspended_tso = NULL;
1904     tso->_link = END_TSO_QUEUE; // no write barrier reqd
1905     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
1906     
1907     if (tso->why_blocked == BlockedOnCCall) {
1908         awakenBlockedExceptionQueue(cap,tso);
1909         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
1910     }
1911     
1912     /* Reset blocking status */
1913     tso->why_blocked  = NotBlocked;
1914     
1915     cap->r.rCurrentTSO = tso;
1916     cap->in_haskell = rtsTrue;
1917     errno = saved_errno;
1918 #if mingw32_HOST_OS
1919     SetLastError(saved_winerror);
1920 #endif
1921
1922     /* We might have GC'd, mark the TSO dirty again */
1923     dirty_TSO(cap,tso);
1924
1925     IF_DEBUG(sanity, checkTSO(tso));
1926
1927     return &cap->r;
1928 }
1929
1930 /* ---------------------------------------------------------------------------
1931  * scheduleThread()
1932  *
1933  * scheduleThread puts a thread on the end  of the runnable queue.
1934  * This will usually be done immediately after a thread is created.
1935  * The caller of scheduleThread must create the thread using e.g.
1936  * createThread and push an appropriate closure
1937  * on this thread's stack before the scheduler is invoked.
1938  * ------------------------------------------------------------------------ */
1939
1940 void
1941 scheduleThread(Capability *cap, StgTSO *tso)
1942 {
1943     // The thread goes at the *end* of the run-queue, to avoid possible
1944     // starvation of any threads already on the queue.
1945     appendToRunQueue(cap,tso);
1946 }
1947
1948 void
1949 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
1950 {
1951 #if defined(THREADED_RTS)
1952     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
1953                               // move this thread from now on.
1954     cpu %= RtsFlags.ParFlags.nNodes;
1955     if (cpu == cap->no) {
1956         appendToRunQueue(cap,tso);
1957     } else {
1958         wakeupThreadOnCapability(cap, &capabilities[cpu], tso);
1959     }
1960 #else
1961     appendToRunQueue(cap,tso);
1962 #endif
1963 }
1964
1965 Capability *
1966 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
1967 {
1968     Task *task;
1969
1970     // We already created/initialised the Task
1971     task = cap->running_task;
1972
1973     // This TSO is now a bound thread; make the Task and TSO
1974     // point to each other.
1975     tso->bound = task;
1976     tso->cap = cap;
1977
1978     task->tso = tso;
1979     task->ret = ret;
1980     task->stat = NoStatus;
1981
1982     appendToRunQueue(cap,tso);
1983
1984     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
1985
1986     cap = schedule(cap,task);
1987
1988     ASSERT(task->stat != NoStatus);
1989     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
1990
1991     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
1992     return cap;
1993 }
1994
1995 /* ----------------------------------------------------------------------------
1996  * Starting Tasks
1997  * ------------------------------------------------------------------------- */
1998
1999 #if defined(THREADED_RTS)
2000 void OSThreadProcAttr
2001 workerStart(Task *task)
2002 {
2003     Capability *cap;
2004
2005     // See startWorkerTask().
2006     ACQUIRE_LOCK(&task->lock);
2007     cap = task->cap;
2008     RELEASE_LOCK(&task->lock);
2009
2010     // set the thread-local pointer to the Task:
2011     taskEnter(task);
2012
2013     // schedule() runs without a lock.
2014     cap = schedule(cap,task);
2015
2016     // On exit from schedule(), we have a Capability.
2017     releaseCapability(cap);
2018     workerTaskStop(task);
2019 }
2020 #endif
2021
2022 /* ---------------------------------------------------------------------------
2023  * initScheduler()
2024  *
2025  * Initialise the scheduler.  This resets all the queues - if the
2026  * queues contained any threads, they'll be garbage collected at the
2027  * next pass.
2028  *
2029  * ------------------------------------------------------------------------ */
2030
2031 void 
2032 initScheduler(void)
2033 {
2034 #if !defined(THREADED_RTS)
2035   blocked_queue_hd  = END_TSO_QUEUE;
2036   blocked_queue_tl  = END_TSO_QUEUE;
2037   sleeping_queue    = END_TSO_QUEUE;
2038 #endif
2039
2040   blackhole_queue   = END_TSO_QUEUE;
2041
2042   sched_state    = SCHED_RUNNING;
2043   recent_activity = ACTIVITY_YES;
2044
2045 #if defined(THREADED_RTS)
2046   /* Initialise the mutex and condition variables used by
2047    * the scheduler. */
2048   initMutex(&sched_mutex);
2049 #endif
2050   
2051   ACQUIRE_LOCK(&sched_mutex);
2052
2053   /* A capability holds the state a native thread needs in
2054    * order to execute STG code. At least one capability is
2055    * floating around (only THREADED_RTS builds have more than one).
2056    */
2057   initCapabilities();
2058
2059   initTaskManager();
2060
2061 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2062   initSparkPools();
2063 #endif
2064
2065 #if defined(THREADED_RTS)
2066   /*
2067    * Eagerly start one worker to run each Capability, except for
2068    * Capability 0.  The idea is that we're probably going to start a
2069    * bound thread on Capability 0 pretty soon, so we don't want a
2070    * worker task hogging it.
2071    */
2072   { 
2073       nat i;
2074       Capability *cap;
2075       for (i = 1; i < n_capabilities; i++) {
2076           cap = &capabilities[i];
2077           ACQUIRE_LOCK(&cap->lock);
2078           startWorkerTask(cap, workerStart);
2079           RELEASE_LOCK(&cap->lock);
2080       }
2081   }
2082 #endif
2083
2084   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2085
2086   RELEASE_LOCK(&sched_mutex);
2087 }
2088
2089 void
2090 exitScheduler(
2091     rtsBool wait_foreign
2092 #if !defined(THREADED_RTS)
2093                          __attribute__((unused))
2094 #endif
2095 )
2096                /* see Capability.c, shutdownCapability() */
2097 {
2098     Task *task = NULL;
2099
2100 #if defined(THREADED_RTS)
2101     ACQUIRE_LOCK(&sched_mutex);
2102     task = newBoundTask();
2103     RELEASE_LOCK(&sched_mutex);
2104 #endif
2105
2106     // If we haven't killed all the threads yet, do it now.
2107     if (sched_state < SCHED_SHUTTING_DOWN) {
2108         sched_state = SCHED_INTERRUPTING;
2109         scheduleDoGC(NULL,task,rtsFalse);    
2110     }
2111     sched_state = SCHED_SHUTTING_DOWN;
2112
2113 #if defined(THREADED_RTS)
2114     { 
2115         nat i;
2116         
2117         for (i = 0; i < n_capabilities; i++) {
2118             shutdownCapability(&capabilities[i], task, wait_foreign);
2119         }
2120         boundTaskExiting(task);
2121         stopTaskManager();
2122     }
2123 #endif
2124 }
2125
2126 void
2127 freeScheduler( void )
2128 {
2129     freeCapabilities();
2130     freeTaskManager();
2131     if (n_capabilities != 1) {
2132         stgFree(capabilities);
2133     }
2134 #if defined(THREADED_RTS)
2135     closeMutex(&sched_mutex);
2136 #endif
2137 }
2138
2139 /* -----------------------------------------------------------------------------
2140    performGC
2141
2142    This is the interface to the garbage collector from Haskell land.
2143    We provide this so that external C code can allocate and garbage
2144    collect when called from Haskell via _ccall_GC.
2145    -------------------------------------------------------------------------- */
2146
2147 static void
2148 performGC_(rtsBool force_major)
2149 {
2150     Task *task;
2151     // We must grab a new Task here, because the existing Task may be
2152     // associated with a particular Capability, and chained onto the 
2153     // suspended_ccalling_tasks queue.
2154     ACQUIRE_LOCK(&sched_mutex);
2155     task = newBoundTask();
2156     RELEASE_LOCK(&sched_mutex);
2157     scheduleDoGC(NULL,task,force_major);
2158     boundTaskExiting(task);
2159 }
2160
2161 void
2162 performGC(void)
2163 {
2164     performGC_(rtsFalse);
2165 }
2166
2167 void
2168 performMajorGC(void)
2169 {
2170     performGC_(rtsTrue);
2171 }
2172
2173 /* -----------------------------------------------------------------------------
2174    Stack overflow
2175
2176    If the thread has reached its maximum stack size, then raise the
2177    StackOverflow exception in the offending thread.  Otherwise
2178    relocate the TSO into a larger chunk of memory and adjust its stack
2179    size appropriately.
2180    -------------------------------------------------------------------------- */
2181
2182 static StgTSO *
2183 threadStackOverflow(Capability *cap, StgTSO *tso)
2184 {
2185   nat new_stack_size, stack_words;
2186   lnat new_tso_size;
2187   StgPtr new_sp;
2188   StgTSO *dest;
2189
2190   IF_DEBUG(sanity,checkTSO(tso));
2191
2192   // don't allow throwTo() to modify the blocked_exceptions queue
2193   // while we are moving the TSO:
2194   lockClosure((StgClosure *)tso);
2195
2196   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2197       // NB. never raise a StackOverflow exception if the thread is
2198       // inside Control.Exceptino.block.  It is impractical to protect
2199       // against stack overflow exceptions, since virtually anything
2200       // can raise one (even 'catch'), so this is the only sensible
2201       // thing to do here.  See bug #767.
2202
2203       debugTrace(DEBUG_gc,
2204                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2205                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2206       IF_DEBUG(gc,
2207                /* If we're debugging, just print out the top of the stack */
2208                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2209                                                 tso->sp+64)));
2210
2211       // Send this thread the StackOverflow exception
2212       unlockTSO(tso);
2213       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2214       return tso;
2215   }
2216
2217   /* Try to double the current stack size.  If that takes us over the
2218    * maximum stack size for this thread, then use the maximum instead
2219    * (that is, unless we're already at or over the max size and we
2220    * can't raise the StackOverflow exception (see above), in which
2221    * case just double the size). Finally round up so the TSO ends up as
2222    * a whole number of blocks.
2223    */
2224   if (tso->stack_size >= tso->max_stack_size) {
2225       new_stack_size = tso->stack_size * 2;
2226   } else { 
2227       new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2228   }
2229   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2230                                        TSO_STRUCT_SIZE)/sizeof(W_);
2231   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2232   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2233
2234   debugTrace(DEBUG_sched, 
2235              "increasing stack size from %ld words to %d.",
2236              (long)tso->stack_size, new_stack_size);
2237
2238   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2239   TICK_ALLOC_TSO(new_stack_size,0);
2240
2241   /* copy the TSO block and the old stack into the new area */
2242   memcpy(dest,tso,TSO_STRUCT_SIZE);
2243   stack_words = tso->stack + tso->stack_size - tso->sp;
2244   new_sp = (P_)dest + new_tso_size - stack_words;
2245   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2246
2247   /* relocate the stack pointers... */
2248   dest->sp         = new_sp;
2249   dest->stack_size = new_stack_size;
2250         
2251   /* Mark the old TSO as relocated.  We have to check for relocated
2252    * TSOs in the garbage collector and any primops that deal with TSOs.
2253    *
2254    * It's important to set the sp value to just beyond the end
2255    * of the stack, so we don't attempt to scavenge any part of the
2256    * dead TSO's stack.
2257    */
2258   tso->what_next = ThreadRelocated;
2259   setTSOLink(cap,tso,dest);
2260   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2261   tso->why_blocked = NotBlocked;
2262
2263   IF_PAR_DEBUG(verbose,
2264                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2265                      tso->id, tso, tso->stack_size);
2266                /* If we're debugging, just print out the top of the stack */
2267                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2268                                                 tso->sp+64)));
2269   
2270   unlockTSO(dest);
2271   unlockTSO(tso);
2272
2273   IF_DEBUG(sanity,checkTSO(dest));
2274 #if 0
2275   IF_DEBUG(scheduler,printTSO(dest));
2276 #endif
2277
2278   return dest;
2279 }
2280
2281 static StgTSO *
2282 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2283 {
2284     bdescr *bd, *new_bd;
2285     lnat free_w, tso_size_w;
2286     StgTSO *new_tso;
2287
2288     tso_size_w = tso_sizeW(tso);
2289
2290     if (tso_size_w < MBLOCK_SIZE_W || 
2291         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2292     {
2293         return tso;
2294     }
2295
2296     // don't allow throwTo() to modify the blocked_exceptions queue
2297     // while we are moving the TSO:
2298     lockClosure((StgClosure *)tso);
2299
2300     // this is the number of words we'll free
2301     free_w = round_to_mblocks(tso_size_w/2);
2302
2303     bd = Bdescr((StgPtr)tso);
2304     new_bd = splitLargeBlock(bd, free_w / BLOCK_SIZE_W);
2305     bd->free = bd->start + TSO_STRUCT_SIZEW;
2306
2307     new_tso = (StgTSO *)new_bd->start;
2308     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2309     new_tso->stack_size = new_bd->free - new_tso->stack;
2310
2311     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2312                (long)tso->id, tso_size_w, tso_sizeW(new_tso));
2313
2314     tso->what_next = ThreadRelocated;
2315     tso->_link = new_tso; // no write barrier reqd: same generation
2316
2317     // The TSO attached to this Task may have moved, so update the
2318     // pointer to it.
2319     if (task->tso == tso) {
2320         task->tso = new_tso;
2321     }
2322
2323     unlockTSO(new_tso);
2324     unlockTSO(tso);
2325
2326     IF_DEBUG(sanity,checkTSO(new_tso));
2327
2328     return new_tso;
2329 }
2330
2331 /* ---------------------------------------------------------------------------
2332    Interrupt execution
2333    - usually called inside a signal handler so it mustn't do anything fancy.   
2334    ------------------------------------------------------------------------ */
2335
2336 void
2337 interruptStgRts(void)
2338 {
2339     sched_state = SCHED_INTERRUPTING;
2340     setContextSwitches();
2341     wakeUpRts();
2342 }
2343
2344 /* -----------------------------------------------------------------------------
2345    Wake up the RTS
2346    
2347    This function causes at least one OS thread to wake up and run the
2348    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2349    an external event has arrived that may need servicing (eg. a
2350    keyboard interrupt).
2351
2352    In the single-threaded RTS we don't do anything here; we only have
2353    one thread anyway, and the event that caused us to want to wake up
2354    will have interrupted any blocking system call in progress anyway.
2355    -------------------------------------------------------------------------- */
2356
2357 void
2358 wakeUpRts(void)
2359 {
2360 #if defined(THREADED_RTS)
2361     // This forces the IO Manager thread to wakeup, which will
2362     // in turn ensure that some OS thread wakes up and runs the
2363     // scheduler loop, which will cause a GC and deadlock check.
2364     ioManagerWakeup();
2365 #endif
2366 }
2367
2368 /* -----------------------------------------------------------------------------
2369  * checkBlackHoles()
2370  *
2371  * Check the blackhole_queue for threads that can be woken up.  We do
2372  * this periodically: before every GC, and whenever the run queue is
2373  * empty.
2374  *
2375  * An elegant solution might be to just wake up all the blocked
2376  * threads with awakenBlockedQueue occasionally: they'll go back to
2377  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2378  * doesn't give us a way to tell whether we've actually managed to
2379  * wake up any threads, so we would be busy-waiting.
2380  *
2381  * -------------------------------------------------------------------------- */
2382
2383 static rtsBool
2384 checkBlackHoles (Capability *cap)
2385 {
2386     StgTSO **prev, *t;
2387     rtsBool any_woke_up = rtsFalse;
2388     StgHalfWord type;
2389
2390     // blackhole_queue is global:
2391     ASSERT_LOCK_HELD(&sched_mutex);
2392
2393     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2394
2395     // ASSUMES: sched_mutex
2396     prev = &blackhole_queue;
2397     t = blackhole_queue;
2398     while (t != END_TSO_QUEUE) {
2399         ASSERT(t->why_blocked == BlockedOnBlackHole);
2400         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2401         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2402             IF_DEBUG(sanity,checkTSO(t));
2403             t = unblockOne(cap, t);
2404             *prev = t;
2405             any_woke_up = rtsTrue;
2406         } else {
2407             prev = &t->_link;
2408             t = t->_link;
2409         }
2410     }
2411
2412     return any_woke_up;
2413 }
2414
2415 /* -----------------------------------------------------------------------------
2416    Deleting threads
2417
2418    This is used for interruption (^C) and forking, and corresponds to
2419    raising an exception but without letting the thread catch the
2420    exception.
2421    -------------------------------------------------------------------------- */
2422
2423 static void 
2424 deleteThread (Capability *cap, StgTSO *tso)
2425 {
2426     // NOTE: must only be called on a TSO that we have exclusive
2427     // access to, because we will call throwToSingleThreaded() below.
2428     // The TSO must be on the run queue of the Capability we own, or 
2429     // we must own all Capabilities.
2430
2431     if (tso->why_blocked != BlockedOnCCall &&
2432         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2433         throwToSingleThreaded(cap,tso,NULL);
2434     }
2435 }
2436
2437 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2438 static void 
2439 deleteThread_(Capability *cap, StgTSO *tso)
2440 { // for forkProcess only:
2441   // like deleteThread(), but we delete threads in foreign calls, too.
2442
2443     if (tso->why_blocked == BlockedOnCCall ||
2444         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2445         unblockOne(cap,tso);
2446         tso->what_next = ThreadKilled;
2447     } else {
2448         deleteThread(cap,tso);
2449     }
2450 }
2451 #endif
2452
2453 /* -----------------------------------------------------------------------------
2454    raiseExceptionHelper
2455    
2456    This function is called by the raise# primitve, just so that we can
2457    move some of the tricky bits of raising an exception from C-- into
2458    C.  Who knows, it might be a useful re-useable thing here too.
2459    -------------------------------------------------------------------------- */
2460
2461 StgWord
2462 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2463 {
2464     Capability *cap = regTableToCapability(reg);
2465     StgThunk *raise_closure = NULL;
2466     StgPtr p, next;
2467     StgRetInfoTable *info;
2468     //
2469     // This closure represents the expression 'raise# E' where E
2470     // is the exception raise.  It is used to overwrite all the
2471     // thunks which are currently under evaluataion.
2472     //
2473
2474     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2475     // LDV profiling: stg_raise_info has THUNK as its closure
2476     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2477     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2478     // 1 does not cause any problem unless profiling is performed.
2479     // However, when LDV profiling goes on, we need to linearly scan
2480     // small object pool, where raise_closure is stored, so we should
2481     // use MIN_UPD_SIZE.
2482     //
2483     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2484     //                                 sizeofW(StgClosure)+1);
2485     //
2486
2487     //
2488     // Walk up the stack, looking for the catch frame.  On the way,
2489     // we update any closures pointed to from update frames with the
2490     // raise closure that we just built.
2491     //
2492     p = tso->sp;
2493     while(1) {
2494         info = get_ret_itbl((StgClosure *)p);
2495         next = p + stack_frame_sizeW((StgClosure *)p);
2496         switch (info->i.type) {
2497             
2498         case UPDATE_FRAME:
2499             // Only create raise_closure if we need to.
2500             if (raise_closure == NULL) {
2501                 raise_closure = 
2502                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2503                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2504                 raise_closure->payload[0] = exception;
2505             }
2506             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2507             p = next;
2508             continue;
2509
2510         case ATOMICALLY_FRAME:
2511             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2512             tso->sp = p;
2513             return ATOMICALLY_FRAME;
2514             
2515         case CATCH_FRAME:
2516             tso->sp = p;
2517             return CATCH_FRAME;
2518
2519         case CATCH_STM_FRAME:
2520             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
2521             tso->sp = p;
2522             return CATCH_STM_FRAME;
2523             
2524         case STOP_FRAME:
2525             tso->sp = p;
2526             return STOP_FRAME;
2527
2528         case CATCH_RETRY_FRAME:
2529         default:
2530             p = next; 
2531             continue;
2532         }
2533     }
2534 }
2535
2536
2537 /* -----------------------------------------------------------------------------
2538    findRetryFrameHelper
2539
2540    This function is called by the retry# primitive.  It traverses the stack
2541    leaving tso->sp referring to the frame which should handle the retry.  
2542
2543    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
2544    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
2545
2546    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
2547    create) because retries are not considered to be exceptions, despite the
2548    similar implementation.
2549
2550    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
2551    not be created within memory transactions.
2552    -------------------------------------------------------------------------- */
2553
2554 StgWord
2555 findRetryFrameHelper (StgTSO *tso)
2556 {
2557   StgPtr           p, next;
2558   StgRetInfoTable *info;
2559
2560   p = tso -> sp;
2561   while (1) {
2562     info = get_ret_itbl((StgClosure *)p);
2563     next = p + stack_frame_sizeW((StgClosure *)p);
2564     switch (info->i.type) {
2565       
2566     case ATOMICALLY_FRAME:
2567         debugTrace(DEBUG_stm,
2568                    "found ATOMICALLY_FRAME at %p during retry", p);
2569         tso->sp = p;
2570         return ATOMICALLY_FRAME;
2571       
2572     case CATCH_RETRY_FRAME:
2573         debugTrace(DEBUG_stm,
2574                    "found CATCH_RETRY_FRAME at %p during retrry", p);
2575         tso->sp = p;
2576         return CATCH_RETRY_FRAME;
2577       
2578     case CATCH_STM_FRAME: {
2579         StgTRecHeader *trec = tso -> trec;
2580         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
2581         debugTrace(DEBUG_stm,
2582                    "found CATCH_STM_FRAME at %p during retry", p);
2583         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
2584         stmAbortTransaction(tso -> cap, trec);
2585         stmFreeAbortedTRec(tso -> cap, trec);
2586         tso -> trec = outer;
2587         p = next; 
2588         continue;
2589     }
2590       
2591
2592     default:
2593       ASSERT(info->i.type != CATCH_FRAME);
2594       ASSERT(info->i.type != STOP_FRAME);
2595       p = next; 
2596       continue;
2597     }
2598   }
2599 }
2600
2601 /* -----------------------------------------------------------------------------
2602    resurrectThreads is called after garbage collection on the list of
2603    threads found to be garbage.  Each of these threads will be woken
2604    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2605    on an MVar, or NonTermination if the thread was blocked on a Black
2606    Hole.
2607
2608    Locks: assumes we hold *all* the capabilities.
2609    -------------------------------------------------------------------------- */
2610
2611 void
2612 resurrectThreads (StgTSO *threads)
2613 {
2614     StgTSO *tso, *next;
2615     Capability *cap;
2616     step *step;
2617
2618     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2619         next = tso->global_link;
2620
2621         step = Bdescr((P_)tso)->step;
2622         tso->global_link = step->threads;
2623         step->threads = tso;
2624
2625         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
2626         
2627         // Wake up the thread on the Capability it was last on
2628         cap = tso->cap;
2629         
2630         switch (tso->why_blocked) {
2631         case BlockedOnMVar:
2632         case BlockedOnException:
2633             /* Called by GC - sched_mutex lock is currently held. */
2634             throwToSingleThreaded(cap, tso,
2635                                   (StgClosure *)blockedOnDeadMVar_closure);
2636             break;
2637         case BlockedOnBlackHole:
2638             throwToSingleThreaded(cap, tso,
2639                                   (StgClosure *)nonTermination_closure);
2640             break;
2641         case BlockedOnSTM:
2642             throwToSingleThreaded(cap, tso,
2643                                   (StgClosure *)blockedIndefinitely_closure);
2644             break;
2645         case NotBlocked:
2646             /* This might happen if the thread was blocked on a black hole
2647              * belonging to a thread that we've just woken up (raiseAsync
2648              * can wake up threads, remember...).
2649              */
2650             continue;
2651         default:
2652             barf("resurrectThreads: thread blocked in a strange way");
2653         }
2654     }
2655 }
2656
2657 /* -----------------------------------------------------------------------------
2658    performPendingThrowTos is called after garbage collection, and
2659    passed a list of threads that were found to have pending throwTos
2660    (tso->blocked_exceptions was not empty), and were blocked.
2661    Normally this doesn't happen, because we would deliver the
2662    exception directly if the target thread is blocked, but there are
2663    small windows where it might occur on a multiprocessor (see
2664    throwTo()).
2665
2666    NB. we must be holding all the capabilities at this point, just
2667    like resurrectThreads().
2668    -------------------------------------------------------------------------- */
2669
2670 void
2671 performPendingThrowTos (StgTSO *threads)
2672 {
2673     StgTSO *tso, *next;
2674     Capability *cap;
2675     step *step;
2676
2677     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2678         next = tso->global_link;
2679
2680         step = Bdescr((P_)tso)->step;
2681         tso->global_link = step->threads;
2682         step->threads = tso;
2683
2684         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
2685         
2686         cap = tso->cap;
2687         maybePerformBlockedException(cap, tso);
2688     }
2689 }