Follow extensible exception changes
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #if defined(GRAN) || defined(PARALLEL_HASKELL)
35 # include "GranSimRts.h"
36 # include "GranSim.h"
37 # include "ParallelRts.h"
38 # include "Parallel.h"
39 # include "ParallelDebug.h"
40 # include "FetchMe.h"
41 # include "HLC.h"
42 #endif
43 #include "Sparks.h"
44 #include "Capability.h"
45 #include "Task.h"
46 #include "AwaitEvent.h"
47 #if defined(mingw32_HOST_OS)
48 #include "win32/IOManager.h"
49 #endif
50 #include "Trace.h"
51 #include "RaiseAsync.h"
52 #include "Threads.h"
53 #include "ThrIOManager.h"
54
55 #ifdef HAVE_SYS_TYPES_H
56 #include <sys/types.h>
57 #endif
58 #ifdef HAVE_UNISTD_H
59 #include <unistd.h>
60 #endif
61
62 #include <string.h>
63 #include <stdlib.h>
64 #include <stdarg.h>
65
66 #ifdef HAVE_ERRNO_H
67 #include <errno.h>
68 #endif
69
70 // Turn off inlining when debugging - it obfuscates things
71 #ifdef DEBUG
72 # undef  STATIC_INLINE
73 # define STATIC_INLINE static
74 #endif
75
76 /* -----------------------------------------------------------------------------
77  * Global variables
78  * -------------------------------------------------------------------------- */
79
80 #if defined(GRAN)
81
82 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
83 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
84
85 /* 
86    In GranSim we have a runnable and a blocked queue for each processor.
87    In order to minimise code changes new arrays run_queue_hds/tls
88    are created. run_queue_hd is then a short cut (macro) for
89    run_queue_hds[CurrentProc] (see GranSim.h).
90    -- HWL
91 */
92 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
93 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
94 StgTSO *ccalling_threadss[MAX_PROC];
95 /* We use the same global list of threads (all_threads) in GranSim as in
96    the std RTS (i.e. we are cheating). However, we don't use this list in
97    the GranSim specific code at the moment (so we are only potentially
98    cheating).  */
99
100 #else /* !GRAN */
101
102 #if !defined(THREADED_RTS)
103 // Blocked/sleeping thrads
104 StgTSO *blocked_queue_hd = NULL;
105 StgTSO *blocked_queue_tl = NULL;
106 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
107 #endif
108
109 /* Threads blocked on blackholes.
110  * LOCK: sched_mutex+capability, or all capabilities
111  */
112 StgTSO *blackhole_queue = NULL;
113 #endif
114
115 /* The blackhole_queue should be checked for threads to wake up.  See
116  * Schedule.h for more thorough comment.
117  * LOCK: none (doesn't matter if we miss an update)
118  */
119 rtsBool blackholes_need_checking = rtsFalse;
120
121 /* flag set by signal handler to precipitate a context switch
122  * LOCK: none (just an advisory flag)
123  */
124 int context_switch = 0;
125
126 /* flag that tracks whether we have done any execution in this time slice.
127  * LOCK: currently none, perhaps we should lock (but needs to be
128  * updated in the fast path of the scheduler).
129  */
130 nat recent_activity = ACTIVITY_YES;
131
132 /* if this flag is set as well, give up execution
133  * LOCK: none (changes once, from false->true)
134  */
135 rtsBool sched_state = SCHED_RUNNING;
136
137 #if defined(GRAN)
138 StgTSO *CurrentTSO;
139 #endif
140
141 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
142  *  exists - earlier gccs apparently didn't.
143  *  -= chak
144  */
145 StgTSO dummy_tso;
146
147 /*
148  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
149  * in an MT setting, needed to signal that a worker thread shouldn't hang around
150  * in the scheduler when it is out of work.
151  */
152 rtsBool shutting_down_scheduler = rtsFalse;
153
154 /*
155  * This mutex protects most of the global scheduler data in
156  * the THREADED_RTS runtime.
157  */
158 #if defined(THREADED_RTS)
159 Mutex sched_mutex;
160 #endif
161
162 #if defined(PARALLEL_HASKELL)
163 StgTSO *LastTSO;
164 rtsTime TimeOfLastYield;
165 rtsBool emitSchedule = rtsTrue;
166 #endif
167
168 #if !defined(mingw32_HOST_OS)
169 #define FORKPROCESS_PRIMOP_SUPPORTED
170 #endif
171
172 /* -----------------------------------------------------------------------------
173  * static function prototypes
174  * -------------------------------------------------------------------------- */
175
176 static Capability *schedule (Capability *initialCapability, Task *task);
177
178 //
179 // These function all encapsulate parts of the scheduler loop, and are
180 // abstracted only to make the structure and control flow of the
181 // scheduler clearer.
182 //
183 static void schedulePreLoop (void);
184 #if defined(THREADED_RTS)
185 static void schedulePushWork(Capability *cap, Task *task);
186 #endif
187 static void scheduleStartSignalHandlers (Capability *cap);
188 static void scheduleCheckBlockedThreads (Capability *cap);
189 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
190 static void scheduleCheckBlackHoles (Capability *cap);
191 static void scheduleDetectDeadlock (Capability *cap, Task *task);
192 #if defined(GRAN)
193 static StgTSO *scheduleProcessEvent(rtsEvent *event);
194 #endif
195 #if defined(PARALLEL_HASKELL)
196 static StgTSO *scheduleSendPendingMessages(void);
197 static void scheduleActivateSpark(void);
198 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
199 #endif
200 #if defined(PAR) || defined(GRAN)
201 static void scheduleGranParReport(void);
202 #endif
203 static void schedulePostRunThread(StgTSO *t);
204 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
205 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
206                                          StgTSO *t);
207 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
208                                     nat prev_what_next );
209 static void scheduleHandleThreadBlocked( StgTSO *t );
210 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
211                                              StgTSO *t );
212 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
213 static Capability *scheduleDoGC(Capability *cap, Task *task,
214                                 rtsBool force_major);
215
216 static rtsBool checkBlackHoles(Capability *cap);
217
218 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
219 static StgTSO *threadStackUnderflow(Task *task, StgTSO *tso);
220
221 static void deleteThread (Capability *cap, StgTSO *tso);
222 static void deleteAllThreads (Capability *cap);
223
224 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
225 static void deleteThread_(Capability *cap, StgTSO *tso);
226 #endif
227
228 #if defined(PARALLEL_HASKELL)
229 StgTSO * createSparkThread(rtsSpark spark);
230 StgTSO * activateSpark (rtsSpark spark);  
231 #endif
232
233 #ifdef DEBUG
234 static char *whatNext_strs[] = {
235   "(unknown)",
236   "ThreadRunGHC",
237   "ThreadInterpret",
238   "ThreadKilled",
239   "ThreadRelocated",
240   "ThreadComplete"
241 };
242 #endif
243
244 /* -----------------------------------------------------------------------------
245  * Putting a thread on the run queue: different scheduling policies
246  * -------------------------------------------------------------------------- */
247
248 STATIC_INLINE void
249 addToRunQueue( Capability *cap, StgTSO *t )
250 {
251 #if defined(PARALLEL_HASKELL)
252     if (RtsFlags.ParFlags.doFairScheduling) { 
253         // this does round-robin scheduling; good for concurrency
254         appendToRunQueue(cap,t);
255     } else {
256         // this does unfair scheduling; good for parallelism
257         pushOnRunQueue(cap,t);
258     }
259 #else
260     // this does round-robin scheduling; good for concurrency
261     appendToRunQueue(cap,t);
262 #endif
263 }
264
265 /* ---------------------------------------------------------------------------
266    Main scheduling loop.
267
268    We use round-robin scheduling, each thread returning to the
269    scheduler loop when one of these conditions is detected:
270
271       * out of heap space
272       * timer expires (thread yields)
273       * thread blocks
274       * thread ends
275       * stack overflow
276
277    GRAN version:
278      In a GranSim setup this loop iterates over the global event queue.
279      This revolves around the global event queue, which determines what 
280      to do next. Therefore, it's more complicated than either the 
281      concurrent or the parallel (GUM) setup.
282
283    GUM version:
284      GUM iterates over incoming messages.
285      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
286      and sends out a fish whenever it has nothing to do; in-between
287      doing the actual reductions (shared code below) it processes the
288      incoming messages and deals with delayed operations 
289      (see PendingFetches).
290      This is not the ugliest code you could imagine, but it's bloody close.
291
292    ------------------------------------------------------------------------ */
293
294 static Capability *
295 schedule (Capability *initialCapability, Task *task)
296 {
297   StgTSO *t;
298   Capability *cap;
299   StgThreadReturnCode ret;
300 #if defined(GRAN)
301   rtsEvent *event;
302 #elif defined(PARALLEL_HASKELL)
303   StgTSO *tso;
304   GlobalTaskId pe;
305   rtsBool receivedFinish = rtsFalse;
306 # if defined(DEBUG)
307   nat tp_size, sp_size; // stats only
308 # endif
309 #endif
310   nat prev_what_next;
311   rtsBool ready_to_gc;
312 #if defined(THREADED_RTS)
313   rtsBool first = rtsTrue;
314 #endif
315   
316   cap = initialCapability;
317
318   // Pre-condition: this task owns initialCapability.
319   // The sched_mutex is *NOT* held
320   // NB. on return, we still hold a capability.
321
322   debugTrace (DEBUG_sched, 
323               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
324               task, initialCapability);
325
326   schedulePreLoop();
327
328   // -----------------------------------------------------------
329   // Scheduler loop starts here:
330
331 #if defined(PARALLEL_HASKELL)
332 #define TERMINATION_CONDITION        (!receivedFinish)
333 #elif defined(GRAN)
334 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
335 #else
336 #define TERMINATION_CONDITION        rtsTrue
337 #endif
338
339   while (TERMINATION_CONDITION) {
340
341 #if defined(GRAN)
342       /* Choose the processor with the next event */
343       CurrentProc = event->proc;
344       CurrentTSO = event->tso;
345 #endif
346
347 #if defined(THREADED_RTS)
348       if (first) {
349           // don't yield the first time, we want a chance to run this
350           // thread for a bit, even if there are others banging at the
351           // door.
352           first = rtsFalse;
353           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
354       } else {
355           // Yield the capability to higher-priority tasks if necessary.
356           yieldCapability(&cap, task);
357       }
358 #endif
359       
360 #if defined(THREADED_RTS)
361       schedulePushWork(cap,task);
362 #endif
363
364     // Check whether we have re-entered the RTS from Haskell without
365     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
366     // call).
367     if (cap->in_haskell) {
368           errorBelch("schedule: re-entered unsafely.\n"
369                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
370           stg_exit(EXIT_FAILURE);
371     }
372
373     // The interruption / shutdown sequence.
374     // 
375     // In order to cleanly shut down the runtime, we want to:
376     //   * make sure that all main threads return to their callers
377     //     with the state 'Interrupted'.
378     //   * clean up all OS threads assocated with the runtime
379     //   * free all memory etc.
380     //
381     // So the sequence for ^C goes like this:
382     //
383     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
384     //     arranges for some Capability to wake up
385     //
386     //   * all threads in the system are halted, and the zombies are
387     //     placed on the run queue for cleaning up.  We acquire all
388     //     the capabilities in order to delete the threads, this is
389     //     done by scheduleDoGC() for convenience (because GC already
390     //     needs to acquire all the capabilities).  We can't kill
391     //     threads involved in foreign calls.
392     // 
393     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
394     //
395     //   * sched_state := SCHED_SHUTTING_DOWN
396     //
397     //   * all workers exit when the run queue on their capability
398     //     drains.  All main threads will also exit when their TSO
399     //     reaches the head of the run queue and they can return.
400     //
401     //   * eventually all Capabilities will shut down, and the RTS can
402     //     exit.
403     //
404     //   * We might be left with threads blocked in foreign calls, 
405     //     we should really attempt to kill these somehow (TODO);
406     
407     switch (sched_state) {
408     case SCHED_RUNNING:
409         break;
410     case SCHED_INTERRUPTING:
411         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
412 #if defined(THREADED_RTS)
413         discardSparksCap(cap);
414 #endif
415         /* scheduleDoGC() deletes all the threads */
416         cap = scheduleDoGC(cap,task,rtsFalse);
417         break;
418     case SCHED_SHUTTING_DOWN:
419         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
420         // If we are a worker, just exit.  If we're a bound thread
421         // then we will exit below when we've removed our TSO from
422         // the run queue.
423         if (task->tso == NULL && emptyRunQueue(cap)) {
424             return cap;
425         }
426         break;
427     default:
428         barf("sched_state: %d", sched_state);
429     }
430
431 #if defined(THREADED_RTS)
432     // If the run queue is empty, take a spark and turn it into a thread.
433     {
434         if (emptyRunQueue(cap)) {
435             StgClosure *spark;
436             spark = findSpark(cap);
437             if (spark != NULL) {
438                 debugTrace(DEBUG_sched,
439                            "turning spark of closure %p into a thread",
440                            (StgClosure *)spark);
441                 createSparkThread(cap,spark);     
442             }
443         }
444     }
445 #endif // THREADED_RTS
446
447     scheduleStartSignalHandlers(cap);
448
449     // Only check the black holes here if we've nothing else to do.
450     // During normal execution, the black hole list only gets checked
451     // at GC time, to avoid repeatedly traversing this possibly long
452     // list each time around the scheduler.
453     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
454
455     scheduleCheckWakeupThreads(cap);
456
457     scheduleCheckBlockedThreads(cap);
458
459     scheduleDetectDeadlock(cap,task);
460 #if defined(THREADED_RTS)
461     cap = task->cap;    // reload cap, it might have changed
462 #endif
463
464     // Normally, the only way we can get here with no threads to
465     // run is if a keyboard interrupt received during 
466     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
467     // Additionally, it is not fatal for the
468     // threaded RTS to reach here with no threads to run.
469     //
470     // win32: might be here due to awaitEvent() being abandoned
471     // as a result of a console event having been delivered.
472     if ( emptyRunQueue(cap) ) {
473 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
474         ASSERT(sched_state >= SCHED_INTERRUPTING);
475 #endif
476         continue; // nothing to do
477     }
478
479 #if defined(PARALLEL_HASKELL)
480     scheduleSendPendingMessages();
481     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
482         continue;
483
484 #if defined(SPARKS)
485     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
486 #endif
487
488     /* If we still have no work we need to send a FISH to get a spark
489        from another PE */
490     if (emptyRunQueue(cap)) {
491         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
492         ASSERT(rtsFalse); // should not happen at the moment
493     }
494     // from here: non-empty run queue.
495     //  TODO: merge above case with this, only one call processMessages() !
496     if (PacketsWaiting()) {  /* process incoming messages, if
497                                 any pending...  only in else
498                                 because getRemoteWork waits for
499                                 messages as well */
500         receivedFinish = processMessages();
501     }
502 #endif
503
504 #if defined(GRAN)
505     scheduleProcessEvent(event);
506 #endif
507
508     // 
509     // Get a thread to run
510     //
511     t = popRunQueue(cap);
512
513 #if defined(GRAN) || defined(PAR)
514     scheduleGranParReport(); // some kind of debuging output
515 #else
516     // Sanity check the thread we're about to run.  This can be
517     // expensive if there is lots of thread switching going on...
518     IF_DEBUG(sanity,checkTSO(t));
519 #endif
520
521 #if defined(THREADED_RTS)
522     // Check whether we can run this thread in the current task.
523     // If not, we have to pass our capability to the right task.
524     {
525         Task *bound = t->bound;
526       
527         if (bound) {
528             if (bound == task) {
529                 debugTrace(DEBUG_sched,
530                            "### Running thread %lu in bound thread", (unsigned long)t->id);
531                 // yes, the Haskell thread is bound to the current native thread
532             } else {
533                 debugTrace(DEBUG_sched,
534                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
535                 // no, bound to a different Haskell thread: pass to that thread
536                 pushOnRunQueue(cap,t);
537                 continue;
538             }
539         } else {
540             // The thread we want to run is unbound.
541             if (task->tso) { 
542                 debugTrace(DEBUG_sched,
543                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
544                 // no, the current native thread is bound to a different
545                 // Haskell thread, so pass it to any worker thread
546                 pushOnRunQueue(cap,t);
547                 continue; 
548             }
549         }
550     }
551 #endif
552
553     /* context switches are initiated by the timer signal, unless
554      * the user specified "context switch as often as possible", with
555      * +RTS -C0
556      */
557     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
558         && !emptyThreadQueues(cap)) {
559         context_switch = 1;
560     }
561          
562 run_thread:
563
564     // CurrentTSO is the thread to run.  t might be different if we
565     // loop back to run_thread, so make sure to set CurrentTSO after
566     // that.
567     cap->r.rCurrentTSO = t;
568
569     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
570                               (long)t->id, whatNext_strs[t->what_next]);
571
572     startHeapProfTimer();
573
574     // Check for exceptions blocked on this thread
575     maybePerformBlockedException (cap, t);
576
577     // ----------------------------------------------------------------------
578     // Run the current thread 
579
580     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
581     ASSERT(t->cap == cap);
582
583     prev_what_next = t->what_next;
584
585     errno = t->saved_errno;
586 #if mingw32_HOST_OS
587     SetLastError(t->saved_winerror);
588 #endif
589
590     cap->in_haskell = rtsTrue;
591
592     dirty_TSO(cap,t);
593
594 #if defined(THREADED_RTS)
595     if (recent_activity == ACTIVITY_DONE_GC) {
596         // ACTIVITY_DONE_GC means we turned off the timer signal to
597         // conserve power (see #1623).  Re-enable it here.
598         nat prev;
599         prev = xchg((P_)&recent_activity, ACTIVITY_YES);
600         if (prev == ACTIVITY_DONE_GC) {
601             startTimer();
602         }
603     } else {
604         recent_activity = ACTIVITY_YES;
605     }
606 #endif
607
608     switch (prev_what_next) {
609         
610     case ThreadKilled:
611     case ThreadComplete:
612         /* Thread already finished, return to scheduler. */
613         ret = ThreadFinished;
614         break;
615         
616     case ThreadRunGHC:
617     {
618         StgRegTable *r;
619         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
620         cap = regTableToCapability(r);
621         ret = r->rRet;
622         break;
623     }
624     
625     case ThreadInterpret:
626         cap = interpretBCO(cap);
627         ret = cap->r.rRet;
628         break;
629         
630     default:
631         barf("schedule: invalid what_next field");
632     }
633
634     cap->in_haskell = rtsFalse;
635
636     // The TSO might have moved, eg. if it re-entered the RTS and a GC
637     // happened.  So find the new location:
638     t = cap->r.rCurrentTSO;
639
640     // We have run some Haskell code: there might be blackhole-blocked
641     // threads to wake up now.
642     // Lock-free test here should be ok, we're just setting a flag.
643     if ( blackhole_queue != END_TSO_QUEUE ) {
644         blackholes_need_checking = rtsTrue;
645     }
646     
647     // And save the current errno in this thread.
648     // XXX: possibly bogus for SMP because this thread might already
649     // be running again, see code below.
650     t->saved_errno = errno;
651 #if mingw32_HOST_OS
652     // Similarly for Windows error code
653     t->saved_winerror = GetLastError();
654 #endif
655
656 #if defined(THREADED_RTS)
657     // If ret is ThreadBlocked, and this Task is bound to the TSO that
658     // blocked, we are in limbo - the TSO is now owned by whatever it
659     // is blocked on, and may in fact already have been woken up,
660     // perhaps even on a different Capability.  It may be the case
661     // that task->cap != cap.  We better yield this Capability
662     // immediately and return to normaility.
663     if (ret == ThreadBlocked) {
664         debugTrace(DEBUG_sched,
665                    "--<< thread %lu (%s) stopped: blocked",
666                    (unsigned long)t->id, whatNext_strs[t->what_next]);
667         continue;
668     }
669 #endif
670
671     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
672     ASSERT(t->cap == cap);
673
674     // ----------------------------------------------------------------------
675     
676     // Costs for the scheduler are assigned to CCS_SYSTEM
677     stopHeapProfTimer();
678 #if defined(PROFILING)
679     CCCS = CCS_SYSTEM;
680 #endif
681     
682     schedulePostRunThread(t);
683
684     t = threadStackUnderflow(task,t);
685
686     ready_to_gc = rtsFalse;
687
688     switch (ret) {
689     case HeapOverflow:
690         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
691         break;
692
693     case StackOverflow:
694         scheduleHandleStackOverflow(cap,task,t);
695         break;
696
697     case ThreadYielding:
698         if (scheduleHandleYield(cap, t, prev_what_next)) {
699             // shortcut for switching between compiler/interpreter:
700             goto run_thread; 
701         }
702         break;
703
704     case ThreadBlocked:
705         scheduleHandleThreadBlocked(t);
706         break;
707
708     case ThreadFinished:
709         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
710         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
711         break;
712
713     default:
714       barf("schedule: invalid thread return code %d", (int)ret);
715     }
716
717     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
718       cap = scheduleDoGC(cap,task,rtsFalse);
719     }
720   } /* end of while() */
721 }
722
723 /* ----------------------------------------------------------------------------
724  * Setting up the scheduler loop
725  * ------------------------------------------------------------------------- */
726
727 static void
728 schedulePreLoop(void)
729 {
730 #if defined(GRAN) 
731     /* set up first event to get things going */
732     /* ToDo: assign costs for system setup and init MainTSO ! */
733     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
734               ContinueThread, 
735               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
736     
737     debugTrace (DEBUG_gran,
738                 "GRAN: Init CurrentTSO (in schedule) = %p", 
739                 CurrentTSO);
740     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
741     
742     if (RtsFlags.GranFlags.Light) {
743         /* Save current time; GranSim Light only */
744         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
745     }      
746 #endif
747 }
748
749 /* -----------------------------------------------------------------------------
750  * schedulePushWork()
751  *
752  * Push work to other Capabilities if we have some.
753  * -------------------------------------------------------------------------- */
754
755 #if defined(THREADED_RTS)
756 static void
757 schedulePushWork(Capability *cap USED_IF_THREADS, 
758                  Task *task      USED_IF_THREADS)
759 {
760     Capability *free_caps[n_capabilities], *cap0;
761     nat i, n_free_caps;
762
763     // migration can be turned off with +RTS -qg
764     if (!RtsFlags.ParFlags.migrate) return;
765
766     // Check whether we have more threads on our run queue, or sparks
767     // in our pool, that we could hand to another Capability.
768     if ((emptyRunQueue(cap) || cap->run_queue_hd->_link == END_TSO_QUEUE)
769         && sparkPoolSizeCap(cap) < 2) {
770         return;
771     }
772
773     // First grab as many free Capabilities as we can.
774     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
775         cap0 = &capabilities[i];
776         if (cap != cap0 && tryGrabCapability(cap0,task)) {
777             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
778                 // it already has some work, we just grabbed it at 
779                 // the wrong moment.  Or maybe it's deadlocked!
780                 releaseCapability(cap0);
781             } else {
782                 free_caps[n_free_caps++] = cap0;
783             }
784         }
785     }
786
787     // we now have n_free_caps free capabilities stashed in
788     // free_caps[].  Share our run queue equally with them.  This is
789     // probably the simplest thing we could do; improvements we might
790     // want to do include:
791     //
792     //   - giving high priority to moving relatively new threads, on 
793     //     the gournds that they haven't had time to build up a
794     //     working set in the cache on this CPU/Capability.
795     //
796     //   - giving low priority to moving long-lived threads
797
798     if (n_free_caps > 0) {
799         StgTSO *prev, *t, *next;
800         rtsBool pushed_to_all;
801
802         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
803
804         i = 0;
805         pushed_to_all = rtsFalse;
806
807         if (cap->run_queue_hd != END_TSO_QUEUE) {
808             prev = cap->run_queue_hd;
809             t = prev->_link;
810             prev->_link = END_TSO_QUEUE;
811             for (; t != END_TSO_QUEUE; t = next) {
812                 next = t->_link;
813                 t->_link = END_TSO_QUEUE;
814                 if (t->what_next == ThreadRelocated
815                     || t->bound == task // don't move my bound thread
816                     || tsoLocked(t)) {  // don't move a locked thread
817                     setTSOLink(cap, prev, t);
818                     prev = t;
819                 } else if (i == n_free_caps) {
820                     pushed_to_all = rtsTrue;
821                     i = 0;
822                     // keep one for us
823                     setTSOLink(cap, prev, t);
824                     prev = t;
825                 } else {
826                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
827                     appendToRunQueue(free_caps[i],t);
828                     if (t->bound) { t->bound->cap = free_caps[i]; }
829                     t->cap = free_caps[i];
830                     i++;
831                 }
832             }
833             cap->run_queue_tl = prev;
834         }
835
836         // If there are some free capabilities that we didn't push any
837         // threads to, then try to push a spark to each one.
838         if (!pushed_to_all) {
839             StgClosure *spark;
840             // i is the next free capability to push to
841             for (; i < n_free_caps; i++) {
842                 if (emptySparkPoolCap(free_caps[i])) {
843                     spark = findSpark(cap);
844                     if (spark != NULL) {
845                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
846                         newSpark(&(free_caps[i]->r), spark);
847                     }
848                 }
849             }
850         }
851
852         // release the capabilities
853         for (i = 0; i < n_free_caps; i++) {
854             task->cap = free_caps[i];
855             releaseCapability(free_caps[i]);
856         }
857     }
858     task->cap = cap; // reset to point to our Capability.
859 }
860 #endif
861
862 /* ----------------------------------------------------------------------------
863  * Start any pending signal handlers
864  * ------------------------------------------------------------------------- */
865
866 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
867 static void
868 scheduleStartSignalHandlers(Capability *cap)
869 {
870     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
871         // safe outside the lock
872         startSignalHandlers(cap);
873     }
874 }
875 #else
876 static void
877 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
878 {
879 }
880 #endif
881
882 /* ----------------------------------------------------------------------------
883  * Check for blocked threads that can be woken up.
884  * ------------------------------------------------------------------------- */
885
886 static void
887 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
888 {
889 #if !defined(THREADED_RTS)
890     //
891     // Check whether any waiting threads need to be woken up.  If the
892     // run queue is empty, and there are no other tasks running, we
893     // can wait indefinitely for something to happen.
894     //
895     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
896     {
897         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
898     }
899 #endif
900 }
901
902
903 /* ----------------------------------------------------------------------------
904  * Check for threads woken up by other Capabilities
905  * ------------------------------------------------------------------------- */
906
907 static void
908 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
909 {
910 #if defined(THREADED_RTS)
911     // Any threads that were woken up by other Capabilities get
912     // appended to our run queue.
913     if (!emptyWakeupQueue(cap)) {
914         ACQUIRE_LOCK(&cap->lock);
915         if (emptyRunQueue(cap)) {
916             cap->run_queue_hd = cap->wakeup_queue_hd;
917             cap->run_queue_tl = cap->wakeup_queue_tl;
918         } else {
919             setTSOLink(cap, cap->run_queue_tl, cap->wakeup_queue_hd);
920             cap->run_queue_tl = cap->wakeup_queue_tl;
921         }
922         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
923         RELEASE_LOCK(&cap->lock);
924     }
925 #endif
926 }
927
928 /* ----------------------------------------------------------------------------
929  * Check for threads blocked on BLACKHOLEs that can be woken up
930  * ------------------------------------------------------------------------- */
931 static void
932 scheduleCheckBlackHoles (Capability *cap)
933 {
934     if ( blackholes_need_checking ) // check without the lock first
935     {
936         ACQUIRE_LOCK(&sched_mutex);
937         if ( blackholes_need_checking ) {
938             checkBlackHoles(cap);
939             blackholes_need_checking = rtsFalse;
940         }
941         RELEASE_LOCK(&sched_mutex);
942     }
943 }
944
945 /* ----------------------------------------------------------------------------
946  * Detect deadlock conditions and attempt to resolve them.
947  * ------------------------------------------------------------------------- */
948
949 static void
950 scheduleDetectDeadlock (Capability *cap, Task *task)
951 {
952
953 #if defined(PARALLEL_HASKELL)
954     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
955     return;
956 #endif
957
958     /* 
959      * Detect deadlock: when we have no threads to run, there are no
960      * threads blocked, waiting for I/O, or sleeping, and all the
961      * other tasks are waiting for work, we must have a deadlock of
962      * some description.
963      */
964     if ( emptyThreadQueues(cap) )
965     {
966 #if defined(THREADED_RTS)
967         /* 
968          * In the threaded RTS, we only check for deadlock if there
969          * has been no activity in a complete timeslice.  This means
970          * we won't eagerly start a full GC just because we don't have
971          * any threads to run currently.
972          */
973         if (recent_activity != ACTIVITY_INACTIVE) return;
974 #endif
975
976         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
977
978         // Garbage collection can release some new threads due to
979         // either (a) finalizers or (b) threads resurrected because
980         // they are unreachable and will therefore be sent an
981         // exception.  Any threads thus released will be immediately
982         // runnable.
983         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
984
985         recent_activity = ACTIVITY_DONE_GC;
986         // disable timer signals (see #1623)
987         stopTimer();
988         
989         if ( !emptyRunQueue(cap) ) return;
990
991 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
992         /* If we have user-installed signal handlers, then wait
993          * for signals to arrive rather then bombing out with a
994          * deadlock.
995          */
996         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
997             debugTrace(DEBUG_sched,
998                        "still deadlocked, waiting for signals...");
999
1000             awaitUserSignals();
1001
1002             if (signals_pending()) {
1003                 startSignalHandlers(cap);
1004             }
1005
1006             // either we have threads to run, or we were interrupted:
1007             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1008
1009             return;
1010         }
1011 #endif
1012
1013 #if !defined(THREADED_RTS)
1014         /* Probably a real deadlock.  Send the current main thread the
1015          * Deadlock exception.
1016          */
1017         if (task->tso) {
1018             switch (task->tso->why_blocked) {
1019             case BlockedOnSTM:
1020             case BlockedOnBlackHole:
1021             case BlockedOnException:
1022             case BlockedOnMVar:
1023                 throwToSingleThreaded(cap, task->tso, 
1024                                       (StgClosure *)nonTermination_closure);
1025                 return;
1026             default:
1027                 barf("deadlock: main thread blocked in a strange way");
1028             }
1029         }
1030         return;
1031 #endif
1032     }
1033 }
1034
1035 /* ----------------------------------------------------------------------------
1036  * Process an event (GRAN only)
1037  * ------------------------------------------------------------------------- */
1038
1039 #if defined(GRAN)
1040 static StgTSO *
1041 scheduleProcessEvent(rtsEvent *event)
1042 {
1043     StgTSO *t;
1044
1045     if (RtsFlags.GranFlags.Light)
1046       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1047
1048     /* adjust time based on time-stamp */
1049     if (event->time > CurrentTime[CurrentProc] &&
1050         event->evttype != ContinueThread)
1051       CurrentTime[CurrentProc] = event->time;
1052     
1053     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1054     if (!RtsFlags.GranFlags.Light)
1055       handleIdlePEs();
1056
1057     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1058
1059     /* main event dispatcher in GranSim */
1060     switch (event->evttype) {
1061       /* Should just be continuing execution */
1062     case ContinueThread:
1063       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1064       /* ToDo: check assertion
1065       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1066              run_queue_hd != END_TSO_QUEUE);
1067       */
1068       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1069       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1070           procStatus[CurrentProc]==Fetching) {
1071         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1072               CurrentTSO->id, CurrentTSO, CurrentProc);
1073         goto next_thread;
1074       } 
1075       /* Ignore ContinueThreads for completed threads */
1076       if (CurrentTSO->what_next == ThreadComplete) {
1077         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1078               CurrentTSO->id, CurrentTSO, CurrentProc);
1079         goto next_thread;
1080       } 
1081       /* Ignore ContinueThreads for threads that are being migrated */
1082       if (PROCS(CurrentTSO)==Nowhere) { 
1083         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1084               CurrentTSO->id, CurrentTSO, CurrentProc);
1085         goto next_thread;
1086       }
1087       /* The thread should be at the beginning of the run queue */
1088       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1089         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1090               CurrentTSO->id, CurrentTSO, CurrentProc);
1091         break; // run the thread anyway
1092       }
1093       /*
1094       new_event(proc, proc, CurrentTime[proc],
1095                 FindWork,
1096                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1097       goto next_thread; 
1098       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1099       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1100
1101     case FetchNode:
1102       do_the_fetchnode(event);
1103       goto next_thread;             /* handle next event in event queue  */
1104       
1105     case GlobalBlock:
1106       do_the_globalblock(event);
1107       goto next_thread;             /* handle next event in event queue  */
1108       
1109     case FetchReply:
1110       do_the_fetchreply(event);
1111       goto next_thread;             /* handle next event in event queue  */
1112       
1113     case UnblockThread:   /* Move from the blocked queue to the tail of */
1114       do_the_unblock(event);
1115       goto next_thread;             /* handle next event in event queue  */
1116       
1117     case ResumeThread:  /* Move from the blocked queue to the tail of */
1118       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1119       event->tso->gran.blocktime += 
1120         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1121       do_the_startthread(event);
1122       goto next_thread;             /* handle next event in event queue  */
1123       
1124     case StartThread:
1125       do_the_startthread(event);
1126       goto next_thread;             /* handle next event in event queue  */
1127       
1128     case MoveThread:
1129       do_the_movethread(event);
1130       goto next_thread;             /* handle next event in event queue  */
1131       
1132     case MoveSpark:
1133       do_the_movespark(event);
1134       goto next_thread;             /* handle next event in event queue  */
1135       
1136     case FindWork:
1137       do_the_findwork(event);
1138       goto next_thread;             /* handle next event in event queue  */
1139       
1140     default:
1141       barf("Illegal event type %u\n", event->evttype);
1142     }  /* switch */
1143     
1144     /* This point was scheduler_loop in the old RTS */
1145
1146     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1147
1148     TimeOfLastEvent = CurrentTime[CurrentProc];
1149     TimeOfNextEvent = get_time_of_next_event();
1150     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1151     // CurrentTSO = ThreadQueueHd;
1152
1153     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1154                          TimeOfNextEvent));
1155
1156     if (RtsFlags.GranFlags.Light) 
1157       GranSimLight_leave_system(event, &ActiveTSO); 
1158
1159     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1160
1161     IF_DEBUG(gran, 
1162              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1163
1164     /* in a GranSim setup the TSO stays on the run queue */
1165     t = CurrentTSO;
1166     /* Take a thread from the run queue. */
1167     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1168
1169     IF_DEBUG(gran, 
1170              debugBelch("GRAN: About to run current thread, which is\n");
1171              G_TSO(t,5));
1172
1173     context_switch = 0; // turned on via GranYield, checking events and time slice
1174
1175     IF_DEBUG(gran, 
1176              DumpGranEvent(GR_SCHEDULE, t));
1177
1178     procStatus[CurrentProc] = Busy;
1179 }
1180 #endif // GRAN
1181
1182 /* ----------------------------------------------------------------------------
1183  * Send pending messages (PARALLEL_HASKELL only)
1184  * ------------------------------------------------------------------------- */
1185
1186 #if defined(PARALLEL_HASKELL)
1187 static StgTSO *
1188 scheduleSendPendingMessages(void)
1189 {
1190     StgSparkPool *pool;
1191     rtsSpark spark;
1192     StgTSO *t;
1193
1194 # if defined(PAR) // global Mem.Mgmt., omit for now
1195     if (PendingFetches != END_BF_QUEUE) {
1196         processFetches();
1197     }
1198 # endif
1199     
1200     if (RtsFlags.ParFlags.BufferTime) {
1201         // if we use message buffering, we must send away all message
1202         // packets which have become too old...
1203         sendOldBuffers(); 
1204     }
1205 }
1206 #endif
1207
1208 /* ----------------------------------------------------------------------------
1209  * Activate spark threads (PARALLEL_HASKELL only)
1210  * ------------------------------------------------------------------------- */
1211
1212 #if defined(PARALLEL_HASKELL)
1213 static void
1214 scheduleActivateSpark(void)
1215 {
1216 #if defined(SPARKS)
1217   ASSERT(emptyRunQueue());
1218 /* We get here if the run queue is empty and want some work.
1219    We try to turn a spark into a thread, and add it to the run queue,
1220    from where it will be picked up in the next iteration of the scheduler
1221    loop.
1222 */
1223
1224       /* :-[  no local threads => look out for local sparks */
1225       /* the spark pool for the current PE */
1226       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1227       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1228           pool->hd < pool->tl) {
1229         /* 
1230          * ToDo: add GC code check that we really have enough heap afterwards!!
1231          * Old comment:
1232          * If we're here (no runnable threads) and we have pending
1233          * sparks, we must have a space problem.  Get enough space
1234          * to turn one of those pending sparks into a
1235          * thread... 
1236          */
1237
1238         spark = findSpark(rtsFalse);            /* get a spark */
1239         if (spark != (rtsSpark) NULL) {
1240           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1241           IF_PAR_DEBUG(fish, // schedule,
1242                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1243                              tso->id, tso, advisory_thread_count));
1244
1245           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1246             IF_PAR_DEBUG(fish, // schedule,
1247                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1248                             spark));
1249             return rtsFalse; /* failed to generate a thread */
1250           }                  /* otherwise fall through & pick-up new tso */
1251         } else {
1252           IF_PAR_DEBUG(fish, // schedule,
1253                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1254                              spark_queue_len(pool)));
1255           return rtsFalse;  /* failed to generate a thread */
1256         }
1257         return rtsTrue;  /* success in generating a thread */
1258   } else { /* no more threads permitted or pool empty */
1259     return rtsFalse;  /* failed to generateThread */
1260   }
1261 #else
1262   tso = NULL; // avoid compiler warning only
1263   return rtsFalse;  /* dummy in non-PAR setup */
1264 #endif // SPARKS
1265 }
1266 #endif // PARALLEL_HASKELL
1267
1268 /* ----------------------------------------------------------------------------
1269  * Get work from a remote node (PARALLEL_HASKELL only)
1270  * ------------------------------------------------------------------------- */
1271     
1272 #if defined(PARALLEL_HASKELL)
1273 static rtsBool
1274 scheduleGetRemoteWork(rtsBool *receivedFinish)
1275 {
1276   ASSERT(emptyRunQueue());
1277
1278   if (RtsFlags.ParFlags.BufferTime) {
1279         IF_PAR_DEBUG(verbose, 
1280                 debugBelch("...send all pending data,"));
1281         {
1282           nat i;
1283           for (i=1; i<=nPEs; i++)
1284             sendImmediately(i); // send all messages away immediately
1285         }
1286   }
1287 # ifndef SPARKS
1288         //++EDEN++ idle() , i.e. send all buffers, wait for work
1289         // suppress fishing in EDEN... just look for incoming messages
1290         // (blocking receive)
1291   IF_PAR_DEBUG(verbose, 
1292                debugBelch("...wait for incoming messages...\n"));
1293   *receivedFinish = processMessages(); // blocking receive...
1294
1295         // and reenter scheduling loop after having received something
1296         // (return rtsFalse below)
1297
1298 # else /* activate SPARKS machinery */
1299 /* We get here, if we have no work, tried to activate a local spark, but still
1300    have no work. We try to get a remote spark, by sending a FISH message.
1301    Thread migration should be added here, and triggered when a sequence of 
1302    fishes returns without work. */
1303         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1304
1305       /* =8-[  no local sparks => look for work on other PEs */
1306         /*
1307          * We really have absolutely no work.  Send out a fish
1308          * (there may be some out there already), and wait for
1309          * something to arrive.  We clearly can't run any threads
1310          * until a SCHEDULE or RESUME arrives, and so that's what
1311          * we're hoping to see.  (Of course, we still have to
1312          * respond to other types of messages.)
1313          */
1314         rtsTime now = msTime() /*CURRENT_TIME*/;
1315         IF_PAR_DEBUG(verbose, 
1316                      debugBelch("--  now=%ld\n", now));
1317         IF_PAR_DEBUG(fish, // verbose,
1318              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1319                  (last_fish_arrived_at!=0 &&
1320                   last_fish_arrived_at+delay > now)) {
1321                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1322                      now, last_fish_arrived_at+delay, 
1323                      last_fish_arrived_at,
1324                      delay);
1325              });
1326   
1327         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1328             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1329           if (last_fish_arrived_at==0 ||
1330               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1331             /* outstandingFishes is set in sendFish, processFish;
1332                avoid flooding system with fishes via delay */
1333     next_fish_to_send_at = 0;  
1334   } else {
1335     /* ToDo: this should be done in the main scheduling loop to avoid the
1336              busy wait here; not so bad if fish delay is very small  */
1337     int iq = 0; // DEBUGGING -- HWL
1338     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1339     /* send a fish when ready, but process messages that arrive in the meantime */
1340     do {
1341       if (PacketsWaiting()) {
1342         iq++; // DEBUGGING
1343         *receivedFinish = processMessages();
1344       }
1345       now = msTime();
1346     } while (!*receivedFinish || now<next_fish_to_send_at);
1347     // JB: This means the fish could become obsolete, if we receive
1348     // work. Better check for work again? 
1349     // last line: while (!receivedFinish || !haveWork || now<...)
1350     // next line: if (receivedFinish || haveWork )
1351
1352     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1353       return rtsFalse;  // NB: this will leave scheduler loop
1354                         // immediately after return!
1355                           
1356     IF_PAR_DEBUG(fish, // verbose,
1357                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1358
1359   }
1360
1361     // JB: IMHO, this should all be hidden inside sendFish(...)
1362     /* pe = choosePE(); 
1363        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1364                 NEW_FISH_HUNGER);
1365
1366     // Global statistics: count no. of fishes
1367     if (RtsFlags.ParFlags.ParStats.Global &&
1368          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1369            globalParStats.tot_fish_mess++;
1370            }
1371     */ 
1372
1373   /* delayed fishes must have been sent by now! */
1374   next_fish_to_send_at = 0;  
1375   }
1376       
1377   *receivedFinish = processMessages();
1378 # endif /* SPARKS */
1379
1380  return rtsFalse;
1381  /* NB: this function always returns rtsFalse, meaning the scheduler
1382     loop continues with the next iteration; 
1383     rationale: 
1384       return code means success in finding work; we enter this function
1385       if there is no local work, thus have to send a fish which takes
1386       time until it arrives with work; in the meantime we should process
1387       messages in the main loop;
1388  */
1389 }
1390 #endif // PARALLEL_HASKELL
1391
1392 /* ----------------------------------------------------------------------------
1393  * PAR/GRAN: Report stats & debugging info(?)
1394  * ------------------------------------------------------------------------- */
1395
1396 #if defined(PAR) || defined(GRAN)
1397 static void
1398 scheduleGranParReport(void)
1399 {
1400   ASSERT(run_queue_hd != END_TSO_QUEUE);
1401
1402   /* Take a thread from the run queue, if we have work */
1403   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1404
1405     /* If this TSO has got its outport closed in the meantime, 
1406      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1407      * It has to be marked as TH_DEAD for this purpose.
1408      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1409
1410 JB: TODO: investigate wether state change field could be nuked
1411      entirely and replaced by the normal tso state (whatnext
1412      field). All we want to do is to kill tsos from outside.
1413      */
1414
1415     /* ToDo: write something to the log-file
1416     if (RTSflags.ParFlags.granSimStats && !sameThread)
1417         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1418
1419     CurrentTSO = t;
1420     */
1421     /* the spark pool for the current PE */
1422     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1423
1424     IF_DEBUG(scheduler, 
1425              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1426                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1427
1428     IF_PAR_DEBUG(fish,
1429              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1430                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1431
1432     if (RtsFlags.ParFlags.ParStats.Full && 
1433         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1434         (emitSchedule || // forced emit
1435          (t && LastTSO && t->id != LastTSO->id))) {
1436       /* 
1437          we are running a different TSO, so write a schedule event to log file
1438          NB: If we use fair scheduling we also have to write  a deschedule 
1439              event for LastTSO; with unfair scheduling we know that the
1440              previous tso has blocked whenever we switch to another tso, so
1441              we don't need it in GUM for now
1442       */
1443       IF_PAR_DEBUG(fish, // schedule,
1444                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1445
1446       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1447                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1448       emitSchedule = rtsFalse;
1449     }
1450 }     
1451 #endif
1452
1453 /* ----------------------------------------------------------------------------
1454  * After running a thread...
1455  * ------------------------------------------------------------------------- */
1456
1457 static void
1458 schedulePostRunThread (StgTSO *t)
1459 {
1460     // We have to be able to catch transactions that are in an
1461     // infinite loop as a result of seeing an inconsistent view of
1462     // memory, e.g. 
1463     //
1464     //   atomically $ do
1465     //       [a,b] <- mapM readTVar [ta,tb]
1466     //       when (a == b) loop
1467     //
1468     // and a is never equal to b given a consistent view of memory.
1469     //
1470     if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1471         if (!stmValidateNestOfTransactions (t -> trec)) {
1472             debugTrace(DEBUG_sched | DEBUG_stm,
1473                        "trec %p found wasting its time", t);
1474             
1475             // strip the stack back to the
1476             // ATOMICALLY_FRAME, aborting the (nested)
1477             // transaction, and saving the stack of any
1478             // partially-evaluated thunks on the heap.
1479             throwToSingleThreaded_(&capabilities[0], t, 
1480                                    NULL, rtsTrue, NULL);
1481             
1482             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1483         }
1484     }
1485
1486 #if defined(PAR)
1487     /* HACK 675: if the last thread didn't yield, make sure to print a 
1488        SCHEDULE event to the log file when StgRunning the next thread, even
1489        if it is the same one as before */
1490     LastTSO = t; 
1491     TimeOfLastYield = CURRENT_TIME;
1492 #endif
1493
1494   /* some statistics gathering in the parallel case */
1495
1496 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1497   switch (ret) {
1498     case HeapOverflow:
1499 # if defined(GRAN)
1500       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1501       globalGranStats.tot_heapover++;
1502 # elif defined(PAR)
1503       globalParStats.tot_heapover++;
1504 # endif
1505       break;
1506
1507      case StackOverflow:
1508 # if defined(GRAN)
1509       IF_DEBUG(gran, 
1510                DumpGranEvent(GR_DESCHEDULE, t));
1511       globalGranStats.tot_stackover++;
1512 # elif defined(PAR)
1513       // IF_DEBUG(par, 
1514       // DumpGranEvent(GR_DESCHEDULE, t);
1515       globalParStats.tot_stackover++;
1516 # endif
1517       break;
1518
1519     case ThreadYielding:
1520 # if defined(GRAN)
1521       IF_DEBUG(gran, 
1522                DumpGranEvent(GR_DESCHEDULE, t));
1523       globalGranStats.tot_yields++;
1524 # elif defined(PAR)
1525       // IF_DEBUG(par, 
1526       // DumpGranEvent(GR_DESCHEDULE, t);
1527       globalParStats.tot_yields++;
1528 # endif
1529       break; 
1530
1531     case ThreadBlocked:
1532 # if defined(GRAN)
1533         debugTrace(DEBUG_sched, 
1534                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1535                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1536                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1537                if (t->block_info.closure!=(StgClosure*)NULL)
1538                  print_bq(t->block_info.closure);
1539                debugBelch("\n"));
1540
1541       // ??? needed; should emit block before
1542       IF_DEBUG(gran, 
1543                DumpGranEvent(GR_DESCHEDULE, t)); 
1544       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1545       /*
1546         ngoq Dogh!
1547       ASSERT(procStatus[CurrentProc]==Busy || 
1548               ((procStatus[CurrentProc]==Fetching) && 
1549               (t->block_info.closure!=(StgClosure*)NULL)));
1550       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1551           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1552             procStatus[CurrentProc]==Fetching)) 
1553         procStatus[CurrentProc] = Idle;
1554       */
1555 # elif defined(PAR)
1556 //++PAR++  blockThread() writes the event (change?)
1557 # endif
1558     break;
1559
1560   case ThreadFinished:
1561     break;
1562
1563   default:
1564     barf("parGlobalStats: unknown return code");
1565     break;
1566     }
1567 #endif
1568 }
1569
1570 /* -----------------------------------------------------------------------------
1571  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1572  * -------------------------------------------------------------------------- */
1573
1574 static rtsBool
1575 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1576 {
1577     // did the task ask for a large block?
1578     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1579         // if so, get one and push it on the front of the nursery.
1580         bdescr *bd;
1581         lnat blocks;
1582         
1583         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1584         
1585         debugTrace(DEBUG_sched,
1586                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1587                    (long)t->id, whatNext_strs[t->what_next], blocks);
1588     
1589         // don't do this if the nursery is (nearly) full, we'll GC first.
1590         if (cap->r.rCurrentNursery->link != NULL ||
1591             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1592                                                // if the nursery has only one block.
1593             
1594             ACQUIRE_SM_LOCK
1595             bd = allocGroup( blocks );
1596             RELEASE_SM_LOCK
1597             cap->r.rNursery->n_blocks += blocks;
1598             
1599             // link the new group into the list
1600             bd->link = cap->r.rCurrentNursery;
1601             bd->u.back = cap->r.rCurrentNursery->u.back;
1602             if (cap->r.rCurrentNursery->u.back != NULL) {
1603                 cap->r.rCurrentNursery->u.back->link = bd;
1604             } else {
1605 #if !defined(THREADED_RTS)
1606                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1607                        g0s0 == cap->r.rNursery);
1608 #endif
1609                 cap->r.rNursery->blocks = bd;
1610             }             
1611             cap->r.rCurrentNursery->u.back = bd;
1612             
1613             // initialise it as a nursery block.  We initialise the
1614             // step, gen_no, and flags field of *every* sub-block in
1615             // this large block, because this is easier than making
1616             // sure that we always find the block head of a large
1617             // block whenever we call Bdescr() (eg. evacuate() and
1618             // isAlive() in the GC would both have to do this, at
1619             // least).
1620             { 
1621                 bdescr *x;
1622                 for (x = bd; x < bd + blocks; x++) {
1623                     x->step = cap->r.rNursery;
1624                     x->gen_no = 0;
1625                     x->flags = 0;
1626                 }
1627             }
1628             
1629             // This assert can be a killer if the app is doing lots
1630             // of large block allocations.
1631             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1632             
1633             // now update the nursery to point to the new block
1634             cap->r.rCurrentNursery = bd;
1635             
1636             // we might be unlucky and have another thread get on the
1637             // run queue before us and steal the large block, but in that
1638             // case the thread will just end up requesting another large
1639             // block.
1640             pushOnRunQueue(cap,t);
1641             return rtsFalse;  /* not actually GC'ing */
1642         }
1643     }
1644     
1645     debugTrace(DEBUG_sched,
1646                "--<< thread %ld (%s) stopped: HeapOverflow",
1647                (long)t->id, whatNext_strs[t->what_next]);
1648
1649 #if defined(GRAN)
1650     ASSERT(!is_on_queue(t,CurrentProc));
1651 #elif defined(PARALLEL_HASKELL)
1652     /* Currently we emit a DESCHEDULE event before GC in GUM.
1653        ToDo: either add separate event to distinguish SYSTEM time from rest
1654        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1655     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1656         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1657                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1658         emitSchedule = rtsTrue;
1659     }
1660 #endif
1661       
1662     if (context_switch) {
1663         // Sometimes we miss a context switch, e.g. when calling
1664         // primitives in a tight loop, MAYBE_GC() doesn't check the
1665         // context switch flag, and we end up waiting for a GC.
1666         // See #1984, and concurrent/should_run/1984
1667         context_switch = 0;
1668         addToRunQueue(cap,t);
1669     } else {
1670         pushOnRunQueue(cap,t);
1671     }
1672     return rtsTrue;
1673     /* actual GC is done at the end of the while loop in schedule() */
1674 }
1675
1676 /* -----------------------------------------------------------------------------
1677  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1678  * -------------------------------------------------------------------------- */
1679
1680 static void
1681 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1682 {
1683     debugTrace (DEBUG_sched,
1684                 "--<< thread %ld (%s) stopped, StackOverflow", 
1685                 (long)t->id, whatNext_strs[t->what_next]);
1686
1687     /* just adjust the stack for this thread, then pop it back
1688      * on the run queue.
1689      */
1690     { 
1691         /* enlarge the stack */
1692         StgTSO *new_t = threadStackOverflow(cap, t);
1693         
1694         /* The TSO attached to this Task may have moved, so update the
1695          * pointer to it.
1696          */
1697         if (task->tso == t) {
1698             task->tso = new_t;
1699         }
1700         pushOnRunQueue(cap,new_t);
1701     }
1702 }
1703
1704 /* -----------------------------------------------------------------------------
1705  * Handle a thread that returned to the scheduler with ThreadYielding
1706  * -------------------------------------------------------------------------- */
1707
1708 static rtsBool
1709 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1710 {
1711     // Reset the context switch flag.  We don't do this just before
1712     // running the thread, because that would mean we would lose ticks
1713     // during GC, which can lead to unfair scheduling (a thread hogs
1714     // the CPU because the tick always arrives during GC).  This way
1715     // penalises threads that do a lot of allocation, but that seems
1716     // better than the alternative.
1717     context_switch = 0;
1718     
1719     /* put the thread back on the run queue.  Then, if we're ready to
1720      * GC, check whether this is the last task to stop.  If so, wake
1721      * up the GC thread.  getThread will block during a GC until the
1722      * GC is finished.
1723      */
1724 #ifdef DEBUG
1725     if (t->what_next != prev_what_next) {
1726         debugTrace(DEBUG_sched,
1727                    "--<< thread %ld (%s) stopped to switch evaluators", 
1728                    (long)t->id, whatNext_strs[t->what_next]);
1729     } else {
1730         debugTrace(DEBUG_sched,
1731                    "--<< thread %ld (%s) stopped, yielding",
1732                    (long)t->id, whatNext_strs[t->what_next]);
1733     }
1734 #endif
1735     
1736     IF_DEBUG(sanity,
1737              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1738              checkTSO(t));
1739     ASSERT(t->_link == END_TSO_QUEUE);
1740     
1741     // Shortcut if we're just switching evaluators: don't bother
1742     // doing stack squeezing (which can be expensive), just run the
1743     // thread.
1744     if (t->what_next != prev_what_next) {
1745         return rtsTrue;
1746     }
1747     
1748 #if defined(GRAN)
1749     ASSERT(!is_on_queue(t,CurrentProc));
1750       
1751     IF_DEBUG(sanity,
1752              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1753              checkThreadQsSanity(rtsTrue));
1754
1755 #endif
1756
1757     addToRunQueue(cap,t);
1758
1759 #if defined(GRAN)
1760     /* add a ContinueThread event to actually process the thread */
1761     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1762               ContinueThread,
1763               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1764     IF_GRAN_DEBUG(bq, 
1765                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1766                   G_EVENTQ(0);
1767                   G_CURR_THREADQ(0));
1768 #endif
1769     return rtsFalse;
1770 }
1771
1772 /* -----------------------------------------------------------------------------
1773  * Handle a thread that returned to the scheduler with ThreadBlocked
1774  * -------------------------------------------------------------------------- */
1775
1776 static void
1777 scheduleHandleThreadBlocked( StgTSO *t
1778 #if !defined(GRAN) && !defined(DEBUG)
1779     STG_UNUSED
1780 #endif
1781     )
1782 {
1783 #if defined(GRAN)
1784     IF_DEBUG(scheduler,
1785              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1786                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1787              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1788     
1789     // ??? needed; should emit block before
1790     IF_DEBUG(gran, 
1791              DumpGranEvent(GR_DESCHEDULE, t)); 
1792     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1793     /*
1794       ngoq Dogh!
1795       ASSERT(procStatus[CurrentProc]==Busy || 
1796       ((procStatus[CurrentProc]==Fetching) && 
1797       (t->block_info.closure!=(StgClosure*)NULL)));
1798       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1799       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1800       procStatus[CurrentProc]==Fetching)) 
1801       procStatus[CurrentProc] = Idle;
1802     */
1803 #elif defined(PAR)
1804     IF_DEBUG(scheduler,
1805              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1806                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1807     IF_PAR_DEBUG(bq,
1808                  
1809                  if (t->block_info.closure!=(StgClosure*)NULL) 
1810                  print_bq(t->block_info.closure));
1811     
1812     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1813     blockThread(t);
1814     
1815     /* whatever we schedule next, we must log that schedule */
1816     emitSchedule = rtsTrue;
1817     
1818 #else /* !GRAN */
1819
1820       // We don't need to do anything.  The thread is blocked, and it
1821       // has tidied up its stack and placed itself on whatever queue
1822       // it needs to be on.
1823
1824     // ASSERT(t->why_blocked != NotBlocked);
1825     // Not true: for example,
1826     //    - in THREADED_RTS, the thread may already have been woken
1827     //      up by another Capability.  This actually happens: try
1828     //      conc023 +RTS -N2.
1829     //    - the thread may have woken itself up already, because
1830     //      threadPaused() might have raised a blocked throwTo
1831     //      exception, see maybePerformBlockedException().
1832
1833 #ifdef DEBUG
1834     if (traceClass(DEBUG_sched)) {
1835         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1836                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1837         printThreadBlockage(t);
1838         debugTraceEnd();
1839     }
1840 #endif
1841     
1842     /* Only for dumping event to log file 
1843        ToDo: do I need this in GranSim, too?
1844        blockThread(t);
1845     */
1846 #endif
1847 }
1848
1849 /* -----------------------------------------------------------------------------
1850  * Handle a thread that returned to the scheduler with ThreadFinished
1851  * -------------------------------------------------------------------------- */
1852
1853 static rtsBool
1854 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1855 {
1856     /* Need to check whether this was a main thread, and if so,
1857      * return with the return value.
1858      *
1859      * We also end up here if the thread kills itself with an
1860      * uncaught exception, see Exception.cmm.
1861      */
1862     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1863                (unsigned long)t->id, whatNext_strs[t->what_next]);
1864
1865 #if defined(GRAN)
1866       endThread(t, CurrentProc); // clean-up the thread
1867 #elif defined(PARALLEL_HASKELL)
1868       /* For now all are advisory -- HWL */
1869       //if(t->priority==AdvisoryPriority) ??
1870       advisory_thread_count--; // JB: Caution with this counter, buggy!
1871       
1872 # if defined(DIST)
1873       if(t->dist.priority==RevalPriority)
1874         FinishReval(t);
1875 # endif
1876     
1877 # if defined(EDENOLD)
1878       // the thread could still have an outport... (BUG)
1879       if (t->eden.outport != -1) {
1880       // delete the outport for the tso which has finished...
1881         IF_PAR_DEBUG(eden_ports,
1882                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1883                               t->eden.outport, t->id));
1884         deleteOPT(t);
1885       }
1886       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1887       if (t->eden.epid != -1) {
1888         IF_PAR_DEBUG(eden_ports,
1889                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1890                            t->id, t->eden.epid));
1891         removeTSOfromProcess(t);
1892       }
1893 # endif 
1894
1895 # if defined(PAR)
1896       if (RtsFlags.ParFlags.ParStats.Full &&
1897           !RtsFlags.ParFlags.ParStats.Suppressed) 
1898         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1899
1900       //  t->par only contains statistics: left out for now...
1901       IF_PAR_DEBUG(fish,
1902                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1903                               t->id,t,t->par.sparkname));
1904 # endif
1905 #endif // PARALLEL_HASKELL
1906
1907       //
1908       // Check whether the thread that just completed was a bound
1909       // thread, and if so return with the result.  
1910       //
1911       // There is an assumption here that all thread completion goes
1912       // through this point; we need to make sure that if a thread
1913       // ends up in the ThreadKilled state, that it stays on the run
1914       // queue so it can be dealt with here.
1915       //
1916
1917       if (t->bound) {
1918
1919           if (t->bound != task) {
1920 #if !defined(THREADED_RTS)
1921               // Must be a bound thread that is not the topmost one.  Leave
1922               // it on the run queue until the stack has unwound to the
1923               // point where we can deal with this.  Leaving it on the run
1924               // queue also ensures that the garbage collector knows about
1925               // this thread and its return value (it gets dropped from the
1926               // step->threads list so there's no other way to find it).
1927               appendToRunQueue(cap,t);
1928               return rtsFalse;
1929 #else
1930               // this cannot happen in the threaded RTS, because a
1931               // bound thread can only be run by the appropriate Task.
1932               barf("finished bound thread that isn't mine");
1933 #endif
1934           }
1935
1936           ASSERT(task->tso == t);
1937
1938           if (t->what_next == ThreadComplete) {
1939               if (task->ret) {
1940                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1941                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1942               }
1943               task->stat = Success;
1944           } else {
1945               if (task->ret) {
1946                   *(task->ret) = NULL;
1947               }
1948               if (sched_state >= SCHED_INTERRUPTING) {
1949                   task->stat = Interrupted;
1950               } else {
1951                   task->stat = Killed;
1952               }
1953           }
1954 #ifdef DEBUG
1955           removeThreadLabel((StgWord)task->tso->id);
1956 #endif
1957           return rtsTrue; // tells schedule() to return
1958       }
1959
1960       return rtsFalse;
1961 }
1962
1963 /* -----------------------------------------------------------------------------
1964  * Perform a heap census
1965  * -------------------------------------------------------------------------- */
1966
1967 static rtsBool
1968 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1969 {
1970     // When we have +RTS -i0 and we're heap profiling, do a census at
1971     // every GC.  This lets us get repeatable runs for debugging.
1972     if (performHeapProfile ||
1973         (RtsFlags.ProfFlags.profileInterval==0 &&
1974          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1975         return rtsTrue;
1976     } else {
1977         return rtsFalse;
1978     }
1979 }
1980
1981 /* -----------------------------------------------------------------------------
1982  * Perform a garbage collection if necessary
1983  * -------------------------------------------------------------------------- */
1984
1985 static Capability *
1986 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1987 {
1988     StgTSO *t;
1989     rtsBool heap_census;
1990 #ifdef THREADED_RTS
1991     static volatile StgWord waiting_for_gc;
1992     rtsBool was_waiting;
1993     nat i;
1994 #endif
1995
1996 #ifdef THREADED_RTS
1997     // In order to GC, there must be no threads running Haskell code.
1998     // Therefore, the GC thread needs to hold *all* the capabilities,
1999     // and release them after the GC has completed.  
2000     //
2001     // This seems to be the simplest way: previous attempts involved
2002     // making all the threads with capabilities give up their
2003     // capabilities and sleep except for the *last* one, which
2004     // actually did the GC.  But it's quite hard to arrange for all
2005     // the other tasks to sleep and stay asleep.
2006     //
2007         
2008     was_waiting = cas(&waiting_for_gc, 0, 1);
2009     if (was_waiting) {
2010         do {
2011             debugTrace(DEBUG_sched, "someone else is trying to GC...");
2012             if (cap) yieldCapability(&cap,task);
2013         } while (waiting_for_gc);
2014         return cap;  // NOTE: task->cap might have changed here
2015     }
2016
2017     for (i=0; i < n_capabilities; i++) {
2018         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
2019         if (cap != &capabilities[i]) {
2020             Capability *pcap = &capabilities[i];
2021             // we better hope this task doesn't get migrated to
2022             // another Capability while we're waiting for this one.
2023             // It won't, because load balancing happens while we have
2024             // all the Capabilities, but even so it's a slightly
2025             // unsavoury invariant.
2026             task->cap = pcap;
2027             context_switch = 1;
2028             waitForReturnCapability(&pcap, task);
2029             if (pcap != &capabilities[i]) {
2030                 barf("scheduleDoGC: got the wrong capability");
2031             }
2032         }
2033     }
2034
2035     waiting_for_gc = rtsFalse;
2036 #endif
2037
2038     // so this happens periodically:
2039     if (cap) scheduleCheckBlackHoles(cap);
2040     
2041     IF_DEBUG(scheduler, printAllThreads());
2042
2043     /*
2044      * We now have all the capabilities; if we're in an interrupting
2045      * state, then we should take the opportunity to delete all the
2046      * threads in the system.
2047      */
2048     if (sched_state >= SCHED_INTERRUPTING) {
2049         deleteAllThreads(&capabilities[0]);
2050         sched_state = SCHED_SHUTTING_DOWN;
2051     }
2052     
2053     heap_census = scheduleNeedHeapProfile(rtsTrue);
2054
2055     /* everybody back, start the GC.
2056      * Could do it in this thread, or signal a condition var
2057      * to do it in another thread.  Either way, we need to
2058      * broadcast on gc_pending_cond afterward.
2059      */
2060 #if defined(THREADED_RTS)
2061     debugTrace(DEBUG_sched, "doing GC");
2062 #endif
2063     GarbageCollect(force_major || heap_census);
2064     
2065     if (heap_census) {
2066         debugTrace(DEBUG_sched, "performing heap census");
2067         heapCensus();
2068         performHeapProfile = rtsFalse;
2069     }
2070
2071 #if defined(THREADED_RTS)
2072     // release our stash of capabilities.
2073     for (i = 0; i < n_capabilities; i++) {
2074         if (cap != &capabilities[i]) {
2075             task->cap = &capabilities[i];
2076             releaseCapability(&capabilities[i]);
2077         }
2078     }
2079     if (cap) {
2080         task->cap = cap;
2081     } else {
2082         task->cap = NULL;
2083     }
2084 #endif
2085
2086 #if defined(GRAN)
2087     /* add a ContinueThread event to continue execution of current thread */
2088     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2089               ContinueThread,
2090               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2091     IF_GRAN_DEBUG(bq, 
2092                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2093                   G_EVENTQ(0);
2094                   G_CURR_THREADQ(0));
2095 #endif /* GRAN */
2096
2097     return cap;
2098 }
2099
2100 /* ---------------------------------------------------------------------------
2101  * Singleton fork(). Do not copy any running threads.
2102  * ------------------------------------------------------------------------- */
2103
2104 pid_t
2105 forkProcess(HsStablePtr *entry
2106 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2107             STG_UNUSED
2108 #endif
2109            )
2110 {
2111 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2112     Task *task;
2113     pid_t pid;
2114     StgTSO* t,*next;
2115     Capability *cap;
2116     nat s;
2117     
2118 #if defined(THREADED_RTS)
2119     if (RtsFlags.ParFlags.nNodes > 1) {
2120         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2121         stg_exit(EXIT_FAILURE);
2122     }
2123 #endif
2124
2125     debugTrace(DEBUG_sched, "forking!");
2126     
2127     // ToDo: for SMP, we should probably acquire *all* the capabilities
2128     cap = rts_lock();
2129     
2130     // no funny business: hold locks while we fork, otherwise if some
2131     // other thread is holding a lock when the fork happens, the data
2132     // structure protected by the lock will forever be in an
2133     // inconsistent state in the child.  See also #1391.
2134     ACQUIRE_LOCK(&sched_mutex);
2135     ACQUIRE_LOCK(&cap->lock);
2136     ACQUIRE_LOCK(&cap->running_task->lock);
2137
2138     pid = fork();
2139     
2140     if (pid) { // parent
2141         
2142         RELEASE_LOCK(&sched_mutex);
2143         RELEASE_LOCK(&cap->lock);
2144         RELEASE_LOCK(&cap->running_task->lock);
2145
2146         // just return the pid
2147         rts_unlock(cap);
2148         return pid;
2149         
2150     } else { // child
2151         
2152 #if defined(THREADED_RTS)
2153         initMutex(&sched_mutex);
2154         initMutex(&cap->lock);
2155         initMutex(&cap->running_task->lock);
2156 #endif
2157
2158         // Now, all OS threads except the thread that forked are
2159         // stopped.  We need to stop all Haskell threads, including
2160         // those involved in foreign calls.  Also we need to delete
2161         // all Tasks, because they correspond to OS threads that are
2162         // now gone.
2163
2164         for (s = 0; s < total_steps; s++) {
2165           for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
2166             if (t->what_next == ThreadRelocated) {
2167                 next = t->_link;
2168             } else {
2169                 next = t->global_link;
2170                 // don't allow threads to catch the ThreadKilled
2171                 // exception, but we do want to raiseAsync() because these
2172                 // threads may be evaluating thunks that we need later.
2173                 deleteThread_(cap,t);
2174             }
2175           }
2176         }
2177         
2178         // Empty the run queue.  It seems tempting to let all the
2179         // killed threads stay on the run queue as zombies to be
2180         // cleaned up later, but some of them correspond to bound
2181         // threads for which the corresponding Task does not exist.
2182         cap->run_queue_hd = END_TSO_QUEUE;
2183         cap->run_queue_tl = END_TSO_QUEUE;
2184
2185         // Any suspended C-calling Tasks are no more, their OS threads
2186         // don't exist now:
2187         cap->suspended_ccalling_tasks = NULL;
2188
2189         // Empty the threads lists.  Otherwise, the garbage
2190         // collector may attempt to resurrect some of these threads.
2191         for (s = 0; s < total_steps; s++) {
2192             all_steps[s].threads = END_TSO_QUEUE;
2193         }
2194
2195         // Wipe the task list, except the current Task.
2196         ACQUIRE_LOCK(&sched_mutex);
2197         for (task = all_tasks; task != NULL; task=task->all_link) {
2198             if (task != cap->running_task) {
2199 #if defined(THREADED_RTS)
2200                 initMutex(&task->lock); // see #1391
2201 #endif
2202                 discardTask(task);
2203             }
2204         }
2205         RELEASE_LOCK(&sched_mutex);
2206
2207 #if defined(THREADED_RTS)
2208         // Wipe our spare workers list, they no longer exist.  New
2209         // workers will be created if necessary.
2210         cap->spare_workers = NULL;
2211         cap->returning_tasks_hd = NULL;
2212         cap->returning_tasks_tl = NULL;
2213 #endif
2214
2215         // On Unix, all timers are reset in the child, so we need to start
2216         // the timer again.
2217         initTimer();
2218         startTimer();
2219
2220         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2221         rts_checkSchedStatus("forkProcess",cap);
2222         
2223         rts_unlock(cap);
2224         hs_exit();                      // clean up and exit
2225         stg_exit(EXIT_SUCCESS);
2226     }
2227 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2228     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2229     return -1;
2230 #endif
2231 }
2232
2233 /* ---------------------------------------------------------------------------
2234  * Delete all the threads in the system
2235  * ------------------------------------------------------------------------- */
2236    
2237 static void
2238 deleteAllThreads ( Capability *cap )
2239 {
2240     // NOTE: only safe to call if we own all capabilities.
2241
2242     StgTSO* t, *next;
2243     nat s;
2244
2245     debugTrace(DEBUG_sched,"deleting all threads");
2246     for (s = 0; s < total_steps; s++) {
2247       for (t = all_steps[s].threads; t != END_TSO_QUEUE; t = next) {
2248         if (t->what_next == ThreadRelocated) {
2249             next = t->_link;
2250         } else {
2251             next = t->global_link;
2252             deleteThread(cap,t);
2253         }
2254       }
2255     }      
2256
2257     // The run queue now contains a bunch of ThreadKilled threads.  We
2258     // must not throw these away: the main thread(s) will be in there
2259     // somewhere, and the main scheduler loop has to deal with it.
2260     // Also, the run queue is the only thing keeping these threads from
2261     // being GC'd, and we don't want the "main thread has been GC'd" panic.
2262
2263 #if !defined(THREADED_RTS)
2264     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2265     ASSERT(sleeping_queue == END_TSO_QUEUE);
2266 #endif
2267 }
2268
2269 /* -----------------------------------------------------------------------------
2270    Managing the suspended_ccalling_tasks list.
2271    Locks required: sched_mutex
2272    -------------------------------------------------------------------------- */
2273
2274 STATIC_INLINE void
2275 suspendTask (Capability *cap, Task *task)
2276 {
2277     ASSERT(task->next == NULL && task->prev == NULL);
2278     task->next = cap->suspended_ccalling_tasks;
2279     task->prev = NULL;
2280     if (cap->suspended_ccalling_tasks) {
2281         cap->suspended_ccalling_tasks->prev = task;
2282     }
2283     cap->suspended_ccalling_tasks = task;
2284 }
2285
2286 STATIC_INLINE void
2287 recoverSuspendedTask (Capability *cap, Task *task)
2288 {
2289     if (task->prev) {
2290         task->prev->next = task->next;
2291     } else {
2292         ASSERT(cap->suspended_ccalling_tasks == task);
2293         cap->suspended_ccalling_tasks = task->next;
2294     }
2295     if (task->next) {
2296         task->next->prev = task->prev;
2297     }
2298     task->next = task->prev = NULL;
2299 }
2300
2301 /* ---------------------------------------------------------------------------
2302  * Suspending & resuming Haskell threads.
2303  * 
2304  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2305  * its capability before calling the C function.  This allows another
2306  * task to pick up the capability and carry on running Haskell
2307  * threads.  It also means that if the C call blocks, it won't lock
2308  * the whole system.
2309  *
2310  * The Haskell thread making the C call is put to sleep for the
2311  * duration of the call, on the susepended_ccalling_threads queue.  We
2312  * give out a token to the task, which it can use to resume the thread
2313  * on return from the C function.
2314  * ------------------------------------------------------------------------- */
2315    
2316 void *
2317 suspendThread (StgRegTable *reg)
2318 {
2319   Capability *cap;
2320   int saved_errno;
2321   StgTSO *tso;
2322   Task *task;
2323 #if mingw32_HOST_OS
2324   StgWord32 saved_winerror;
2325 #endif
2326
2327   saved_errno = errno;
2328 #if mingw32_HOST_OS
2329   saved_winerror = GetLastError();
2330 #endif
2331
2332   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2333    */
2334   cap = regTableToCapability(reg);
2335
2336   task = cap->running_task;
2337   tso = cap->r.rCurrentTSO;
2338
2339   debugTrace(DEBUG_sched, 
2340              "thread %lu did a safe foreign call", 
2341              (unsigned long)cap->r.rCurrentTSO->id);
2342
2343   // XXX this might not be necessary --SDM
2344   tso->what_next = ThreadRunGHC;
2345
2346   threadPaused(cap,tso);
2347
2348   if ((tso->flags & TSO_BLOCKEX) == 0)  {
2349       tso->why_blocked = BlockedOnCCall;
2350       tso->flags |= TSO_BLOCKEX;
2351       tso->flags &= ~TSO_INTERRUPTIBLE;
2352   } else {
2353       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2354   }
2355
2356   // Hand back capability
2357   task->suspended_tso = tso;
2358
2359   ACQUIRE_LOCK(&cap->lock);
2360
2361   suspendTask(cap,task);
2362   cap->in_haskell = rtsFalse;
2363   releaseCapability_(cap);
2364   
2365   RELEASE_LOCK(&cap->lock);
2366
2367 #if defined(THREADED_RTS)
2368   /* Preparing to leave the RTS, so ensure there's a native thread/task
2369      waiting to take over.
2370   */
2371   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2372 #endif
2373
2374   errno = saved_errno;
2375 #if mingw32_HOST_OS
2376   SetLastError(saved_winerror);
2377 #endif
2378   return task;
2379 }
2380
2381 StgRegTable *
2382 resumeThread (void *task_)
2383 {
2384     StgTSO *tso;
2385     Capability *cap;
2386     Task *task = task_;
2387     int saved_errno;
2388 #if mingw32_HOST_OS
2389     StgWord32 saved_winerror;
2390 #endif
2391
2392     saved_errno = errno;
2393 #if mingw32_HOST_OS
2394     saved_winerror = GetLastError();
2395 #endif
2396
2397     cap = task->cap;
2398     // Wait for permission to re-enter the RTS with the result.
2399     waitForReturnCapability(&cap,task);
2400     // we might be on a different capability now... but if so, our
2401     // entry on the suspended_ccalling_tasks list will also have been
2402     // migrated.
2403
2404     // Remove the thread from the suspended list
2405     recoverSuspendedTask(cap,task);
2406
2407     tso = task->suspended_tso;
2408     task->suspended_tso = NULL;
2409     tso->_link = END_TSO_QUEUE; // no write barrier reqd
2410     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2411     
2412     if (tso->why_blocked == BlockedOnCCall) {
2413         awakenBlockedExceptionQueue(cap,tso);
2414         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2415     }
2416     
2417     /* Reset blocking status */
2418     tso->why_blocked  = NotBlocked;
2419     
2420     cap->r.rCurrentTSO = tso;
2421     cap->in_haskell = rtsTrue;
2422     errno = saved_errno;
2423 #if mingw32_HOST_OS
2424     SetLastError(saved_winerror);
2425 #endif
2426
2427     /* We might have GC'd, mark the TSO dirty again */
2428     dirty_TSO(cap,tso);
2429
2430     IF_DEBUG(sanity, checkTSO(tso));
2431
2432     return &cap->r;
2433 }
2434
2435 /* ---------------------------------------------------------------------------
2436  * scheduleThread()
2437  *
2438  * scheduleThread puts a thread on the end  of the runnable queue.
2439  * This will usually be done immediately after a thread is created.
2440  * The caller of scheduleThread must create the thread using e.g.
2441  * createThread and push an appropriate closure
2442  * on this thread's stack before the scheduler is invoked.
2443  * ------------------------------------------------------------------------ */
2444
2445 void
2446 scheduleThread(Capability *cap, StgTSO *tso)
2447 {
2448     // The thread goes at the *end* of the run-queue, to avoid possible
2449     // starvation of any threads already on the queue.
2450     appendToRunQueue(cap,tso);
2451 }
2452
2453 void
2454 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2455 {
2456 #if defined(THREADED_RTS)
2457     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2458                               // move this thread from now on.
2459     cpu %= RtsFlags.ParFlags.nNodes;
2460     if (cpu == cap->no) {
2461         appendToRunQueue(cap,tso);
2462     } else {
2463         migrateThreadToCapability_lock(&capabilities[cpu],tso);
2464     }
2465 #else
2466     appendToRunQueue(cap,tso);
2467 #endif
2468 }
2469
2470 Capability *
2471 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2472 {
2473     Task *task;
2474
2475     // We already created/initialised the Task
2476     task = cap->running_task;
2477
2478     // This TSO is now a bound thread; make the Task and TSO
2479     // point to each other.
2480     tso->bound = task;
2481     tso->cap = cap;
2482
2483     task->tso = tso;
2484     task->ret = ret;
2485     task->stat = NoStatus;
2486
2487     appendToRunQueue(cap,tso);
2488
2489     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2490
2491 #if defined(GRAN)
2492     /* GranSim specific init */
2493     CurrentTSO = m->tso;                // the TSO to run
2494     procStatus[MainProc] = Busy;        // status of main PE
2495     CurrentProc = MainProc;             // PE to run it on
2496 #endif
2497
2498     cap = schedule(cap,task);
2499
2500     ASSERT(task->stat != NoStatus);
2501     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2502
2503     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2504     return cap;
2505 }
2506
2507 /* ----------------------------------------------------------------------------
2508  * Starting Tasks
2509  * ------------------------------------------------------------------------- */
2510
2511 #if defined(THREADED_RTS)
2512 void
2513 workerStart(Task *task)
2514 {
2515     Capability *cap;
2516
2517     // See startWorkerTask().
2518     ACQUIRE_LOCK(&task->lock);
2519     cap = task->cap;
2520     RELEASE_LOCK(&task->lock);
2521
2522     // set the thread-local pointer to the Task:
2523     taskEnter(task);
2524
2525     // schedule() runs without a lock.
2526     cap = schedule(cap,task);
2527
2528     // On exit from schedule(), we have a Capability.
2529     releaseCapability(cap);
2530     workerTaskStop(task);
2531 }
2532 #endif
2533
2534 /* ---------------------------------------------------------------------------
2535  * initScheduler()
2536  *
2537  * Initialise the scheduler.  This resets all the queues - if the
2538  * queues contained any threads, they'll be garbage collected at the
2539  * next pass.
2540  *
2541  * ------------------------------------------------------------------------ */
2542
2543 void 
2544 initScheduler(void)
2545 {
2546 #if defined(GRAN)
2547   nat i;
2548   for (i=0; i<=MAX_PROC; i++) {
2549     run_queue_hds[i]      = END_TSO_QUEUE;
2550     run_queue_tls[i]      = END_TSO_QUEUE;
2551     blocked_queue_hds[i]  = END_TSO_QUEUE;
2552     blocked_queue_tls[i]  = END_TSO_QUEUE;
2553     ccalling_threadss[i]  = END_TSO_QUEUE;
2554     blackhole_queue[i]    = END_TSO_QUEUE;
2555     sleeping_queue        = END_TSO_QUEUE;
2556   }
2557 #elif !defined(THREADED_RTS)
2558   blocked_queue_hd  = END_TSO_QUEUE;
2559   blocked_queue_tl  = END_TSO_QUEUE;
2560   sleeping_queue    = END_TSO_QUEUE;
2561 #endif
2562
2563   blackhole_queue   = END_TSO_QUEUE;
2564
2565   context_switch = 0;
2566   sched_state    = SCHED_RUNNING;
2567   recent_activity = ACTIVITY_YES;
2568
2569 #if defined(THREADED_RTS)
2570   /* Initialise the mutex and condition variables used by
2571    * the scheduler. */
2572   initMutex(&sched_mutex);
2573 #endif
2574   
2575   ACQUIRE_LOCK(&sched_mutex);
2576
2577   /* A capability holds the state a native thread needs in
2578    * order to execute STG code. At least one capability is
2579    * floating around (only THREADED_RTS builds have more than one).
2580    */
2581   initCapabilities();
2582
2583   initTaskManager();
2584
2585 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2586   initSparkPools();
2587 #endif
2588
2589 #if defined(THREADED_RTS)
2590   /*
2591    * Eagerly start one worker to run each Capability, except for
2592    * Capability 0.  The idea is that we're probably going to start a
2593    * bound thread on Capability 0 pretty soon, so we don't want a
2594    * worker task hogging it.
2595    */
2596   { 
2597       nat i;
2598       Capability *cap;
2599       for (i = 1; i < n_capabilities; i++) {
2600           cap = &capabilities[i];
2601           ACQUIRE_LOCK(&cap->lock);
2602           startWorkerTask(cap, workerStart);
2603           RELEASE_LOCK(&cap->lock);
2604       }
2605   }
2606 #endif
2607
2608   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2609
2610   RELEASE_LOCK(&sched_mutex);
2611 }
2612
2613 void
2614 exitScheduler(
2615     rtsBool wait_foreign
2616 #if !defined(THREADED_RTS)
2617                          __attribute__((unused))
2618 #endif
2619 )
2620                /* see Capability.c, shutdownCapability() */
2621 {
2622     Task *task = NULL;
2623
2624 #if defined(THREADED_RTS)
2625     ACQUIRE_LOCK(&sched_mutex);
2626     task = newBoundTask();
2627     RELEASE_LOCK(&sched_mutex);
2628 #endif
2629
2630     // If we haven't killed all the threads yet, do it now.
2631     if (sched_state < SCHED_SHUTTING_DOWN) {
2632         sched_state = SCHED_INTERRUPTING;
2633         scheduleDoGC(NULL,task,rtsFalse);    
2634     }
2635     sched_state = SCHED_SHUTTING_DOWN;
2636
2637 #if defined(THREADED_RTS)
2638     { 
2639         nat i;
2640         
2641         for (i = 0; i < n_capabilities; i++) {
2642             shutdownCapability(&capabilities[i], task, wait_foreign);
2643         }
2644         boundTaskExiting(task);
2645         stopTaskManager();
2646     }
2647 #else
2648     freeCapability(&MainCapability);
2649 #endif
2650 }
2651
2652 void
2653 freeScheduler( void )
2654 {
2655     freeTaskManager();
2656     if (n_capabilities != 1) {
2657         stgFree(capabilities);
2658     }
2659 #if defined(THREADED_RTS)
2660     closeMutex(&sched_mutex);
2661 #endif
2662 }
2663
2664 /* -----------------------------------------------------------------------------
2665    performGC
2666
2667    This is the interface to the garbage collector from Haskell land.
2668    We provide this so that external C code can allocate and garbage
2669    collect when called from Haskell via _ccall_GC.
2670    -------------------------------------------------------------------------- */
2671
2672 static void
2673 performGC_(rtsBool force_major)
2674 {
2675     Task *task;
2676     // We must grab a new Task here, because the existing Task may be
2677     // associated with a particular Capability, and chained onto the 
2678     // suspended_ccalling_tasks queue.
2679     ACQUIRE_LOCK(&sched_mutex);
2680     task = newBoundTask();
2681     RELEASE_LOCK(&sched_mutex);
2682     scheduleDoGC(NULL,task,force_major);
2683     boundTaskExiting(task);
2684 }
2685
2686 void
2687 performGC(void)
2688 {
2689     performGC_(rtsFalse);
2690 }
2691
2692 void
2693 performMajorGC(void)
2694 {
2695     performGC_(rtsTrue);
2696 }
2697
2698 /* -----------------------------------------------------------------------------
2699    Stack overflow
2700
2701    If the thread has reached its maximum stack size, then raise the
2702    StackOverflow exception in the offending thread.  Otherwise
2703    relocate the TSO into a larger chunk of memory and adjust its stack
2704    size appropriately.
2705    -------------------------------------------------------------------------- */
2706
2707 static StgTSO *
2708 threadStackOverflow(Capability *cap, StgTSO *tso)
2709 {
2710   nat new_stack_size, stack_words;
2711   lnat new_tso_size;
2712   StgPtr new_sp;
2713   StgTSO *dest;
2714
2715   IF_DEBUG(sanity,checkTSO(tso));
2716
2717   // don't allow throwTo() to modify the blocked_exceptions queue
2718   // while we are moving the TSO:
2719   lockClosure((StgClosure *)tso);
2720
2721   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2722       // NB. never raise a StackOverflow exception if the thread is
2723       // inside Control.Exceptino.block.  It is impractical to protect
2724       // against stack overflow exceptions, since virtually anything
2725       // can raise one (even 'catch'), so this is the only sensible
2726       // thing to do here.  See bug #767.
2727
2728       debugTrace(DEBUG_gc,
2729                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2730                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2731       IF_DEBUG(gc,
2732                /* If we're debugging, just print out the top of the stack */
2733                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2734                                                 tso->sp+64)));
2735
2736       // Send this thread the StackOverflow exception
2737       unlockTSO(tso);
2738       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2739       return tso;
2740   }
2741
2742   /* Try to double the current stack size.  If that takes us over the
2743    * maximum stack size for this thread, then use the maximum instead.
2744    * Finally round up so the TSO ends up as a whole number of blocks.
2745    */
2746   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2747   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2748                                        TSO_STRUCT_SIZE)/sizeof(W_);
2749   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2750   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2751
2752   debugTrace(DEBUG_sched, 
2753              "increasing stack size from %ld words to %d.",
2754              (long)tso->stack_size, new_stack_size);
2755
2756   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2757   TICK_ALLOC_TSO(new_stack_size,0);
2758
2759   /* copy the TSO block and the old stack into the new area */
2760   memcpy(dest,tso,TSO_STRUCT_SIZE);
2761   stack_words = tso->stack + tso->stack_size - tso->sp;
2762   new_sp = (P_)dest + new_tso_size - stack_words;
2763   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2764
2765   /* relocate the stack pointers... */
2766   dest->sp         = new_sp;
2767   dest->stack_size = new_stack_size;
2768         
2769   /* Mark the old TSO as relocated.  We have to check for relocated
2770    * TSOs in the garbage collector and any primops that deal with TSOs.
2771    *
2772    * It's important to set the sp value to just beyond the end
2773    * of the stack, so we don't attempt to scavenge any part of the
2774    * dead TSO's stack.
2775    */
2776   tso->what_next = ThreadRelocated;
2777   setTSOLink(cap,tso,dest);
2778   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2779   tso->why_blocked = NotBlocked;
2780
2781   IF_PAR_DEBUG(verbose,
2782                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2783                      tso->id, tso, tso->stack_size);
2784                /* If we're debugging, just print out the top of the stack */
2785                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2786                                                 tso->sp+64)));
2787   
2788   unlockTSO(dest);
2789   unlockTSO(tso);
2790
2791   IF_DEBUG(sanity,checkTSO(dest));
2792 #if 0
2793   IF_DEBUG(scheduler,printTSO(dest));
2794 #endif
2795
2796   return dest;
2797 }
2798
2799 static StgTSO *
2800 threadStackUnderflow (Task *task STG_UNUSED, StgTSO *tso)
2801 {
2802     bdescr *bd, *new_bd;
2803     lnat new_tso_size_w, tso_size_w;
2804     StgTSO *new_tso;
2805
2806     tso_size_w = tso_sizeW(tso);
2807
2808     if (tso_size_w < MBLOCK_SIZE_W || 
2809         (nat)(tso->stack + tso->stack_size - tso->sp) > tso->stack_size / 4) 
2810     {
2811         return tso;
2812     }
2813
2814     // don't allow throwTo() to modify the blocked_exceptions queue
2815     // while we are moving the TSO:
2816     lockClosure((StgClosure *)tso);
2817
2818     new_tso_size_w = round_to_mblocks(tso_size_w/2);
2819
2820     debugTrace(DEBUG_sched, "thread %ld: reducing TSO size from %lu words to %lu",
2821                tso->id, tso_size_w, new_tso_size_w);
2822
2823     bd = Bdescr((StgPtr)tso);
2824     new_bd = splitLargeBlock(bd, new_tso_size_w / BLOCK_SIZE_W);
2825     new_bd->free = bd->free;
2826     bd->free = bd->start + TSO_STRUCT_SIZEW;
2827
2828     new_tso = (StgTSO *)new_bd->start;
2829     memcpy(new_tso,tso,TSO_STRUCT_SIZE);
2830     new_tso->stack_size = new_tso_size_w - TSO_STRUCT_SIZEW;
2831
2832     tso->what_next = ThreadRelocated;
2833     tso->_link = new_tso; // no write barrier reqd: same generation
2834
2835     // The TSO attached to this Task may have moved, so update the
2836     // pointer to it.
2837     if (task->tso == tso) {
2838         task->tso = new_tso;
2839     }
2840
2841     unlockTSO(new_tso);
2842     unlockTSO(tso);
2843
2844     IF_DEBUG(sanity,checkTSO(new_tso));
2845
2846     return new_tso;
2847 }
2848
2849 /* ---------------------------------------------------------------------------
2850    Interrupt execution
2851    - usually called inside a signal handler so it mustn't do anything fancy.   
2852    ------------------------------------------------------------------------ */
2853
2854 void
2855 interruptStgRts(void)
2856 {
2857     sched_state = SCHED_INTERRUPTING;
2858     context_switch = 1;
2859     wakeUpRts();
2860 }
2861
2862 /* -----------------------------------------------------------------------------
2863    Wake up the RTS
2864    
2865    This function causes at least one OS thread to wake up and run the
2866    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2867    an external event has arrived that may need servicing (eg. a
2868    keyboard interrupt).
2869
2870    In the single-threaded RTS we don't do anything here; we only have
2871    one thread anyway, and the event that caused us to want to wake up
2872    will have interrupted any blocking system call in progress anyway.
2873    -------------------------------------------------------------------------- */
2874
2875 void
2876 wakeUpRts(void)
2877 {
2878 #if defined(THREADED_RTS)
2879     // This forces the IO Manager thread to wakeup, which will
2880     // in turn ensure that some OS thread wakes up and runs the
2881     // scheduler loop, which will cause a GC and deadlock check.
2882     ioManagerWakeup();
2883 #endif
2884 }
2885
2886 /* -----------------------------------------------------------------------------
2887  * checkBlackHoles()
2888  *
2889  * Check the blackhole_queue for threads that can be woken up.  We do
2890  * this periodically: before every GC, and whenever the run queue is
2891  * empty.
2892  *
2893  * An elegant solution might be to just wake up all the blocked
2894  * threads with awakenBlockedQueue occasionally: they'll go back to
2895  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2896  * doesn't give us a way to tell whether we've actually managed to
2897  * wake up any threads, so we would be busy-waiting.
2898  *
2899  * -------------------------------------------------------------------------- */
2900
2901 static rtsBool
2902 checkBlackHoles (Capability *cap)
2903 {
2904     StgTSO **prev, *t;
2905     rtsBool any_woke_up = rtsFalse;
2906     StgHalfWord type;
2907
2908     // blackhole_queue is global:
2909     ASSERT_LOCK_HELD(&sched_mutex);
2910
2911     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2912
2913     // ASSUMES: sched_mutex
2914     prev = &blackhole_queue;
2915     t = blackhole_queue;
2916     while (t != END_TSO_QUEUE) {
2917         ASSERT(t->why_blocked == BlockedOnBlackHole);
2918         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2919         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2920             IF_DEBUG(sanity,checkTSO(t));
2921             t = unblockOne(cap, t);
2922             // urk, the threads migrate to the current capability
2923             // here, but we'd like to keep them on the original one.
2924             *prev = t;
2925             any_woke_up = rtsTrue;
2926         } else {
2927             prev = &t->_link;
2928             t = t->_link;
2929         }
2930     }
2931
2932     return any_woke_up;
2933 }
2934
2935 /* -----------------------------------------------------------------------------
2936    Deleting threads
2937
2938    This is used for interruption (^C) and forking, and corresponds to
2939    raising an exception but without letting the thread catch the
2940    exception.
2941    -------------------------------------------------------------------------- */
2942
2943 static void 
2944 deleteThread (Capability *cap, StgTSO *tso)
2945 {
2946     // NOTE: must only be called on a TSO that we have exclusive
2947     // access to, because we will call throwToSingleThreaded() below.
2948     // The TSO must be on the run queue of the Capability we own, or 
2949     // we must own all Capabilities.
2950
2951     if (tso->why_blocked != BlockedOnCCall &&
2952         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2953         throwToSingleThreaded(cap,tso,NULL);
2954     }
2955 }
2956
2957 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2958 static void 
2959 deleteThread_(Capability *cap, StgTSO *tso)
2960 { // for forkProcess only:
2961   // like deleteThread(), but we delete threads in foreign calls, too.
2962
2963     if (tso->why_blocked == BlockedOnCCall ||
2964         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2965         unblockOne(cap,tso);
2966         tso->what_next = ThreadKilled;
2967     } else {
2968         deleteThread(cap,tso);
2969     }
2970 }
2971 #endif
2972
2973 /* -----------------------------------------------------------------------------
2974    raiseExceptionHelper
2975    
2976    This function is called by the raise# primitve, just so that we can
2977    move some of the tricky bits of raising an exception from C-- into
2978    C.  Who knows, it might be a useful re-useable thing here too.
2979    -------------------------------------------------------------------------- */
2980
2981 StgWord
2982 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2983 {
2984     Capability *cap = regTableToCapability(reg);
2985     StgThunk *raise_closure = NULL;
2986     StgPtr p, next;
2987     StgRetInfoTable *info;
2988     //
2989     // This closure represents the expression 'raise# E' where E
2990     // is the exception raise.  It is used to overwrite all the
2991     // thunks which are currently under evaluataion.
2992     //
2993
2994     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2995     // LDV profiling: stg_raise_info has THUNK as its closure
2996     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2997     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2998     // 1 does not cause any problem unless profiling is performed.
2999     // However, when LDV profiling goes on, we need to linearly scan
3000     // small object pool, where raise_closure is stored, so we should
3001     // use MIN_UPD_SIZE.
3002     //
3003     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3004     //                                 sizeofW(StgClosure)+1);
3005     //
3006
3007     //
3008     // Walk up the stack, looking for the catch frame.  On the way,
3009     // we update any closures pointed to from update frames with the
3010     // raise closure that we just built.
3011     //
3012     p = tso->sp;
3013     while(1) {
3014         info = get_ret_itbl((StgClosure *)p);
3015         next = p + stack_frame_sizeW((StgClosure *)p);
3016         switch (info->i.type) {
3017             
3018         case UPDATE_FRAME:
3019             // Only create raise_closure if we need to.
3020             if (raise_closure == NULL) {
3021                 raise_closure = 
3022                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3023                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3024                 raise_closure->payload[0] = exception;
3025             }
3026             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3027             p = next;
3028             continue;
3029
3030         case ATOMICALLY_FRAME:
3031             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3032             tso->sp = p;
3033             return ATOMICALLY_FRAME;
3034             
3035         case CATCH_FRAME:
3036             tso->sp = p;
3037             return CATCH_FRAME;
3038
3039         case CATCH_STM_FRAME:
3040             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3041             tso->sp = p;
3042             return CATCH_STM_FRAME;
3043             
3044         case STOP_FRAME:
3045             tso->sp = p;
3046             return STOP_FRAME;
3047
3048         case CATCH_RETRY_FRAME:
3049         default:
3050             p = next; 
3051             continue;
3052         }
3053     }
3054 }
3055
3056
3057 /* -----------------------------------------------------------------------------
3058    findRetryFrameHelper
3059
3060    This function is called by the retry# primitive.  It traverses the stack
3061    leaving tso->sp referring to the frame which should handle the retry.  
3062
3063    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3064    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3065
3066    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3067    create) because retries are not considered to be exceptions, despite the
3068    similar implementation.
3069
3070    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3071    not be created within memory transactions.
3072    -------------------------------------------------------------------------- */
3073
3074 StgWord
3075 findRetryFrameHelper (StgTSO *tso)
3076 {
3077   StgPtr           p, next;
3078   StgRetInfoTable *info;
3079
3080   p = tso -> sp;
3081   while (1) {
3082     info = get_ret_itbl((StgClosure *)p);
3083     next = p + stack_frame_sizeW((StgClosure *)p);
3084     switch (info->i.type) {
3085       
3086     case ATOMICALLY_FRAME:
3087         debugTrace(DEBUG_stm,
3088                    "found ATOMICALLY_FRAME at %p during retry", p);
3089         tso->sp = p;
3090         return ATOMICALLY_FRAME;
3091       
3092     case CATCH_RETRY_FRAME:
3093         debugTrace(DEBUG_stm,
3094                    "found CATCH_RETRY_FRAME at %p during retrry", p);
3095         tso->sp = p;
3096         return CATCH_RETRY_FRAME;
3097       
3098     case CATCH_STM_FRAME: {
3099         StgTRecHeader *trec = tso -> trec;
3100         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3101         debugTrace(DEBUG_stm,
3102                    "found CATCH_STM_FRAME at %p during retry", p);
3103         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3104         stmAbortTransaction(tso -> cap, trec);
3105         stmFreeAbortedTRec(tso -> cap, trec);
3106         tso -> trec = outer;
3107         p = next; 
3108         continue;
3109     }
3110       
3111
3112     default:
3113       ASSERT(info->i.type != CATCH_FRAME);
3114       ASSERT(info->i.type != STOP_FRAME);
3115       p = next; 
3116       continue;
3117     }
3118   }
3119 }
3120
3121 /* -----------------------------------------------------------------------------
3122    resurrectThreads is called after garbage collection on the list of
3123    threads found to be garbage.  Each of these threads will be woken
3124    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3125    on an MVar, or NonTermination if the thread was blocked on a Black
3126    Hole.
3127
3128    Locks: assumes we hold *all* the capabilities.
3129    -------------------------------------------------------------------------- */
3130
3131 void
3132 resurrectThreads (StgTSO *threads)
3133 {
3134     StgTSO *tso, *next;
3135     Capability *cap;
3136     step *step;
3137
3138     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3139         next = tso->global_link;
3140
3141         step = Bdescr((P_)tso)->step;
3142         tso->global_link = step->threads;
3143         step->threads = tso;
3144
3145         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3146         
3147         // Wake up the thread on the Capability it was last on
3148         cap = tso->cap;
3149         
3150         switch (tso->why_blocked) {
3151         case BlockedOnMVar:
3152         case BlockedOnException:
3153             /* Called by GC - sched_mutex lock is currently held. */
3154             throwToSingleThreaded(cap, tso,
3155                                   (StgClosure *)BlockedOnDeadMVar_closure);
3156             break;
3157         case BlockedOnBlackHole:
3158             throwToSingleThreaded(cap, tso,
3159                                   (StgClosure *)nonTermination_closure);
3160             break;
3161         case BlockedOnSTM:
3162             throwToSingleThreaded(cap, tso,
3163                                   (StgClosure *)BlockedIndefinitely_closure);
3164             break;
3165         case NotBlocked:
3166             /* This might happen if the thread was blocked on a black hole
3167              * belonging to a thread that we've just woken up (raiseAsync
3168              * can wake up threads, remember...).
3169              */
3170             continue;
3171         default:
3172             barf("resurrectThreads: thread blocked in a strange way");
3173         }
3174     }
3175 }
3176
3177 /* -----------------------------------------------------------------------------
3178    performPendingThrowTos is called after garbage collection, and
3179    passed a list of threads that were found to have pending throwTos
3180    (tso->blocked_exceptions was not empty), and were blocked.
3181    Normally this doesn't happen, because we would deliver the
3182    exception directly if the target thread is blocked, but there are
3183    small windows where it might occur on a multiprocessor (see
3184    throwTo()).
3185
3186    NB. we must be holding all the capabilities at this point, just
3187    like resurrectThreads().
3188    -------------------------------------------------------------------------- */
3189
3190 void
3191 performPendingThrowTos (StgTSO *threads)
3192 {
3193     StgTSO *tso, *next;
3194     Capability *cap;
3195     step *step;
3196
3197     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3198         next = tso->global_link;
3199
3200         step = Bdescr((P_)tso)->step;
3201         tso->global_link = step->threads;
3202         step->threads = tso;
3203
3204         debugTrace(DEBUG_sched, "performing blocked throwTo to thread %lu", (unsigned long)tso->id);
3205         
3206         cap = tso->cap;
3207         maybePerformBlockedException(cap, tso);
3208     }
3209 }