kill the PAR/GRAN debug flags
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #if defined(GRAN) || defined(PARALLEL_HASKELL)
35 # include "GranSimRts.h"
36 # include "GranSim.h"
37 # include "ParallelRts.h"
38 # include "Parallel.h"
39 # include "ParallelDebug.h"
40 # include "FetchMe.h"
41 # include "HLC.h"
42 #endif
43 #include "Sparks.h"
44 #include "Capability.h"
45 #include "Task.h"
46 #include "AwaitEvent.h"
47 #if defined(mingw32_HOST_OS)
48 #include "win32/IOManager.h"
49 #endif
50 #include "Trace.h"
51 #include "RaiseAsync.h"
52 #include "Threads.h"
53 #include "ThrIOManager.h"
54
55 #ifdef HAVE_SYS_TYPES_H
56 #include <sys/types.h>
57 #endif
58 #ifdef HAVE_UNISTD_H
59 #include <unistd.h>
60 #endif
61
62 #include <string.h>
63 #include <stdlib.h>
64 #include <stdarg.h>
65
66 #ifdef HAVE_ERRNO_H
67 #include <errno.h>
68 #endif
69
70 // Turn off inlining when debugging - it obfuscates things
71 #ifdef DEBUG
72 # undef  STATIC_INLINE
73 # define STATIC_INLINE static
74 #endif
75
76 /* -----------------------------------------------------------------------------
77  * Global variables
78  * -------------------------------------------------------------------------- */
79
80 #if defined(GRAN)
81
82 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
83 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
84
85 /* 
86    In GranSim we have a runnable and a blocked queue for each processor.
87    In order to minimise code changes new arrays run_queue_hds/tls
88    are created. run_queue_hd is then a short cut (macro) for
89    run_queue_hds[CurrentProc] (see GranSim.h).
90    -- HWL
91 */
92 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
93 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
94 StgTSO *ccalling_threadss[MAX_PROC];
95 /* We use the same global list of threads (all_threads) in GranSim as in
96    the std RTS (i.e. we are cheating). However, we don't use this list in
97    the GranSim specific code at the moment (so we are only potentially
98    cheating).  */
99
100 #else /* !GRAN */
101
102 #if !defined(THREADED_RTS)
103 // Blocked/sleeping thrads
104 StgTSO *blocked_queue_hd = NULL;
105 StgTSO *blocked_queue_tl = NULL;
106 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
107 #endif
108
109 /* Threads blocked on blackholes.
110  * LOCK: sched_mutex+capability, or all capabilities
111  */
112 StgTSO *blackhole_queue = NULL;
113 #endif
114
115 /* The blackhole_queue should be checked for threads to wake up.  See
116  * Schedule.h for more thorough comment.
117  * LOCK: none (doesn't matter if we miss an update)
118  */
119 rtsBool blackholes_need_checking = rtsFalse;
120
121 /* Linked list of all threads.
122  * Used for detecting garbage collected threads.
123  * LOCK: sched_mutex+capability, or all capabilities
124  */
125 StgTSO *all_threads = NULL;
126
127 /* flag set by signal handler to precipitate a context switch
128  * LOCK: none (just an advisory flag)
129  */
130 int context_switch = 0;
131
132 /* flag that tracks whether we have done any execution in this time slice.
133  * LOCK: currently none, perhaps we should lock (but needs to be
134  * updated in the fast path of the scheduler).
135  */
136 nat recent_activity = ACTIVITY_YES;
137
138 /* if this flag is set as well, give up execution
139  * LOCK: none (changes once, from false->true)
140  */
141 rtsBool sched_state = SCHED_RUNNING;
142
143 #if defined(GRAN)
144 StgTSO *CurrentTSO;
145 #endif
146
147 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
148  *  exists - earlier gccs apparently didn't.
149  *  -= chak
150  */
151 StgTSO dummy_tso;
152
153 /*
154  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
155  * in an MT setting, needed to signal that a worker thread shouldn't hang around
156  * in the scheduler when it is out of work.
157  */
158 rtsBool shutting_down_scheduler = rtsFalse;
159
160 /*
161  * This mutex protects most of the global scheduler data in
162  * the THREADED_RTS runtime.
163  */
164 #if defined(THREADED_RTS)
165 Mutex sched_mutex;
166 #endif
167
168 #if defined(PARALLEL_HASKELL)
169 StgTSO *LastTSO;
170 rtsTime TimeOfLastYield;
171 rtsBool emitSchedule = rtsTrue;
172 #endif
173
174 #if !defined(mingw32_HOST_OS)
175 #define FORKPROCESS_PRIMOP_SUPPORTED
176 #endif
177
178 /* -----------------------------------------------------------------------------
179  * static function prototypes
180  * -------------------------------------------------------------------------- */
181
182 static Capability *schedule (Capability *initialCapability, Task *task);
183
184 //
185 // These function all encapsulate parts of the scheduler loop, and are
186 // abstracted only to make the structure and control flow of the
187 // scheduler clearer.
188 //
189 static void schedulePreLoop (void);
190 #if defined(THREADED_RTS)
191 static void schedulePushWork(Capability *cap, Task *task);
192 #endif
193 static void scheduleStartSignalHandlers (Capability *cap);
194 static void scheduleCheckBlockedThreads (Capability *cap);
195 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
196 static void scheduleCheckBlackHoles (Capability *cap);
197 static void scheduleDetectDeadlock (Capability *cap, Task *task);
198 #if defined(GRAN)
199 static StgTSO *scheduleProcessEvent(rtsEvent *event);
200 #endif
201 #if defined(PARALLEL_HASKELL)
202 static StgTSO *scheduleSendPendingMessages(void);
203 static void scheduleActivateSpark(void);
204 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
205 #endif
206 #if defined(PAR) || defined(GRAN)
207 static void scheduleGranParReport(void);
208 #endif
209 static void schedulePostRunThread(void);
210 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
211 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
212                                          StgTSO *t);
213 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
214                                     nat prev_what_next );
215 static void scheduleHandleThreadBlocked( StgTSO *t );
216 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
217                                              StgTSO *t );
218 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
219 static Capability *scheduleDoGC(Capability *cap, Task *task,
220                                 rtsBool force_major);
221
222 static rtsBool checkBlackHoles(Capability *cap);
223
224 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
225
226 static void deleteThread (Capability *cap, StgTSO *tso);
227 static void deleteAllThreads (Capability *cap);
228
229 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
230 static void deleteThread_(Capability *cap, StgTSO *tso);
231 #endif
232
233 #if defined(PARALLEL_HASKELL)
234 StgTSO * createSparkThread(rtsSpark spark);
235 StgTSO * activateSpark (rtsSpark spark);  
236 #endif
237
238 #ifdef DEBUG
239 static char *whatNext_strs[] = {
240   "(unknown)",
241   "ThreadRunGHC",
242   "ThreadInterpret",
243   "ThreadKilled",
244   "ThreadRelocated",
245   "ThreadComplete"
246 };
247 #endif
248
249 /* -----------------------------------------------------------------------------
250  * Putting a thread on the run queue: different scheduling policies
251  * -------------------------------------------------------------------------- */
252
253 STATIC_INLINE void
254 addToRunQueue( Capability *cap, StgTSO *t )
255 {
256 #if defined(PARALLEL_HASKELL)
257     if (RtsFlags.ParFlags.doFairScheduling) { 
258         // this does round-robin scheduling; good for concurrency
259         appendToRunQueue(cap,t);
260     } else {
261         // this does unfair scheduling; good for parallelism
262         pushOnRunQueue(cap,t);
263     }
264 #else
265     // this does round-robin scheduling; good for concurrency
266     appendToRunQueue(cap,t);
267 #endif
268 }
269
270 /* ---------------------------------------------------------------------------
271    Main scheduling loop.
272
273    We use round-robin scheduling, each thread returning to the
274    scheduler loop when one of these conditions is detected:
275
276       * out of heap space
277       * timer expires (thread yields)
278       * thread blocks
279       * thread ends
280       * stack overflow
281
282    GRAN version:
283      In a GranSim setup this loop iterates over the global event queue.
284      This revolves around the global event queue, which determines what 
285      to do next. Therefore, it's more complicated than either the 
286      concurrent or the parallel (GUM) setup.
287
288    GUM version:
289      GUM iterates over incoming messages.
290      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
291      and sends out a fish whenever it has nothing to do; in-between
292      doing the actual reductions (shared code below) it processes the
293      incoming messages and deals with delayed operations 
294      (see PendingFetches).
295      This is not the ugliest code you could imagine, but it's bloody close.
296
297    ------------------------------------------------------------------------ */
298
299 static Capability *
300 schedule (Capability *initialCapability, Task *task)
301 {
302   StgTSO *t;
303   Capability *cap;
304   StgThreadReturnCode ret;
305 #if defined(GRAN)
306   rtsEvent *event;
307 #elif defined(PARALLEL_HASKELL)
308   StgTSO *tso;
309   GlobalTaskId pe;
310   rtsBool receivedFinish = rtsFalse;
311 # if defined(DEBUG)
312   nat tp_size, sp_size; // stats only
313 # endif
314 #endif
315   nat prev_what_next;
316   rtsBool ready_to_gc;
317 #if defined(THREADED_RTS)
318   rtsBool first = rtsTrue;
319 #endif
320   
321   cap = initialCapability;
322
323   // Pre-condition: this task owns initialCapability.
324   // The sched_mutex is *NOT* held
325   // NB. on return, we still hold a capability.
326
327   debugTrace (DEBUG_sched, 
328               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
329               task, initialCapability);
330
331   schedulePreLoop();
332
333   // -----------------------------------------------------------
334   // Scheduler loop starts here:
335
336 #if defined(PARALLEL_HASKELL)
337 #define TERMINATION_CONDITION        (!receivedFinish)
338 #elif defined(GRAN)
339 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
340 #else
341 #define TERMINATION_CONDITION        rtsTrue
342 #endif
343
344   while (TERMINATION_CONDITION) {
345
346 #if defined(GRAN)
347       /* Choose the processor with the next event */
348       CurrentProc = event->proc;
349       CurrentTSO = event->tso;
350 #endif
351
352 #if defined(THREADED_RTS)
353       if (first) {
354           // don't yield the first time, we want a chance to run this
355           // thread for a bit, even if there are others banging at the
356           // door.
357           first = rtsFalse;
358           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
359       } else {
360           // Yield the capability to higher-priority tasks if necessary.
361           yieldCapability(&cap, task);
362       }
363 #endif
364       
365 #if defined(THREADED_RTS)
366       schedulePushWork(cap,task);
367 #endif
368
369     // Check whether we have re-entered the RTS from Haskell without
370     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
371     // call).
372     if (cap->in_haskell) {
373           errorBelch("schedule: re-entered unsafely.\n"
374                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
375           stg_exit(EXIT_FAILURE);
376     }
377
378     // The interruption / shutdown sequence.
379     // 
380     // In order to cleanly shut down the runtime, we want to:
381     //   * make sure that all main threads return to their callers
382     //     with the state 'Interrupted'.
383     //   * clean up all OS threads assocated with the runtime
384     //   * free all memory etc.
385     //
386     // So the sequence for ^C goes like this:
387     //
388     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
389     //     arranges for some Capability to wake up
390     //
391     //   * all threads in the system are halted, and the zombies are
392     //     placed on the run queue for cleaning up.  We acquire all
393     //     the capabilities in order to delete the threads, this is
394     //     done by scheduleDoGC() for convenience (because GC already
395     //     needs to acquire all the capabilities).  We can't kill
396     //     threads involved in foreign calls.
397     // 
398     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
399     //
400     //   * sched_state := SCHED_SHUTTING_DOWN
401     //
402     //   * all workers exit when the run queue on their capability
403     //     drains.  All main threads will also exit when their TSO
404     //     reaches the head of the run queue and they can return.
405     //
406     //   * eventually all Capabilities will shut down, and the RTS can
407     //     exit.
408     //
409     //   * We might be left with threads blocked in foreign calls, 
410     //     we should really attempt to kill these somehow (TODO);
411     
412     switch (sched_state) {
413     case SCHED_RUNNING:
414         break;
415     case SCHED_INTERRUPTING:
416         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
417 #if defined(THREADED_RTS)
418         discardSparksCap(cap);
419 #endif
420         /* scheduleDoGC() deletes all the threads */
421         cap = scheduleDoGC(cap,task,rtsFalse);
422         break;
423     case SCHED_SHUTTING_DOWN:
424         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
425         // If we are a worker, just exit.  If we're a bound thread
426         // then we will exit below when we've removed our TSO from
427         // the run queue.
428         if (task->tso == NULL && emptyRunQueue(cap)) {
429             return cap;
430         }
431         break;
432     default:
433         barf("sched_state: %d", sched_state);
434     }
435
436 #if defined(THREADED_RTS)
437     // If the run queue is empty, take a spark and turn it into a thread.
438     {
439         if (emptyRunQueue(cap)) {
440             StgClosure *spark;
441             spark = findSpark(cap);
442             if (spark != NULL) {
443                 debugTrace(DEBUG_sched,
444                            "turning spark of closure %p into a thread",
445                            (StgClosure *)spark);
446                 createSparkThread(cap,spark);     
447             }
448         }
449     }
450 #endif // THREADED_RTS
451
452     scheduleStartSignalHandlers(cap);
453
454     // Only check the black holes here if we've nothing else to do.
455     // During normal execution, the black hole list only gets checked
456     // at GC time, to avoid repeatedly traversing this possibly long
457     // list each time around the scheduler.
458     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
459
460     scheduleCheckWakeupThreads(cap);
461
462     scheduleCheckBlockedThreads(cap);
463
464     scheduleDetectDeadlock(cap,task);
465 #if defined(THREADED_RTS)
466     cap = task->cap;    // reload cap, it might have changed
467 #endif
468
469     // Normally, the only way we can get here with no threads to
470     // run is if a keyboard interrupt received during 
471     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
472     // Additionally, it is not fatal for the
473     // threaded RTS to reach here with no threads to run.
474     //
475     // win32: might be here due to awaitEvent() being abandoned
476     // as a result of a console event having been delivered.
477     if ( emptyRunQueue(cap) ) {
478 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
479         ASSERT(sched_state >= SCHED_INTERRUPTING);
480 #endif
481         continue; // nothing to do
482     }
483
484 #if defined(PARALLEL_HASKELL)
485     scheduleSendPendingMessages();
486     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
487         continue;
488
489 #if defined(SPARKS)
490     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
491 #endif
492
493     /* If we still have no work we need to send a FISH to get a spark
494        from another PE */
495     if (emptyRunQueue(cap)) {
496         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
497         ASSERT(rtsFalse); // should not happen at the moment
498     }
499     // from here: non-empty run queue.
500     //  TODO: merge above case with this, only one call processMessages() !
501     if (PacketsWaiting()) {  /* process incoming messages, if
502                                 any pending...  only in else
503                                 because getRemoteWork waits for
504                                 messages as well */
505         receivedFinish = processMessages();
506     }
507 #endif
508
509 #if defined(GRAN)
510     scheduleProcessEvent(event);
511 #endif
512
513     // 
514     // Get a thread to run
515     //
516     t = popRunQueue(cap);
517
518 #if defined(GRAN) || defined(PAR)
519     scheduleGranParReport(); // some kind of debuging output
520 #else
521     // Sanity check the thread we're about to run.  This can be
522     // expensive if there is lots of thread switching going on...
523     IF_DEBUG(sanity,checkTSO(t));
524 #endif
525
526 #if defined(THREADED_RTS)
527     // Check whether we can run this thread in the current task.
528     // If not, we have to pass our capability to the right task.
529     {
530         Task *bound = t->bound;
531       
532         if (bound) {
533             if (bound == task) {
534                 debugTrace(DEBUG_sched,
535                            "### Running thread %lu in bound thread", (unsigned long)t->id);
536                 // yes, the Haskell thread is bound to the current native thread
537             } else {
538                 debugTrace(DEBUG_sched,
539                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
540                 // no, bound to a different Haskell thread: pass to that thread
541                 pushOnRunQueue(cap,t);
542                 continue;
543             }
544         } else {
545             // The thread we want to run is unbound.
546             if (task->tso) { 
547                 debugTrace(DEBUG_sched,
548                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
549                 // no, the current native thread is bound to a different
550                 // Haskell thread, so pass it to any worker thread
551                 pushOnRunQueue(cap,t);
552                 continue; 
553             }
554         }
555     }
556 #endif
557
558     cap->r.rCurrentTSO = t;
559     
560     /* context switches are initiated by the timer signal, unless
561      * the user specified "context switch as often as possible", with
562      * +RTS -C0
563      */
564     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
565         && !emptyThreadQueues(cap)) {
566         context_switch = 1;
567     }
568          
569 run_thread:
570
571     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
572                               (long)t->id, whatNext_strs[t->what_next]);
573
574     startHeapProfTimer();
575
576     // Check for exceptions blocked on this thread
577     maybePerformBlockedException (cap, t);
578
579     // ----------------------------------------------------------------------
580     // Run the current thread 
581
582     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
583     ASSERT(t->cap == cap);
584
585     prev_what_next = t->what_next;
586
587     errno = t->saved_errno;
588 #if mingw32_HOST_OS
589     SetLastError(t->saved_winerror);
590 #endif
591
592     cap->in_haskell = rtsTrue;
593
594     dirtyTSO(t);
595
596 #if defined(THREADED_RTS)
597     if (recent_activity == ACTIVITY_DONE_GC) {
598         // ACTIVITY_DONE_GC means we turned off the timer signal to
599         // conserve power (see #1623).  Re-enable it here.
600         nat prev;
601         prev = xchg(&recent_activity, ACTIVITY_YES);
602         if (prev == ACTIVITY_DONE_GC) {
603             startTimer();
604         }
605     } else {
606         recent_activity = ACTIVITY_YES;
607     }
608 #endif
609
610     switch (prev_what_next) {
611         
612     case ThreadKilled:
613     case ThreadComplete:
614         /* Thread already finished, return to scheduler. */
615         ret = ThreadFinished;
616         break;
617         
618     case ThreadRunGHC:
619     {
620         StgRegTable *r;
621         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
622         cap = regTableToCapability(r);
623         ret = r->rRet;
624         break;
625     }
626     
627     case ThreadInterpret:
628         cap = interpretBCO(cap);
629         ret = cap->r.rRet;
630         break;
631         
632     default:
633         barf("schedule: invalid what_next field");
634     }
635
636     cap->in_haskell = rtsFalse;
637
638     // The TSO might have moved, eg. if it re-entered the RTS and a GC
639     // happened.  So find the new location:
640     t = cap->r.rCurrentTSO;
641
642     // We have run some Haskell code: there might be blackhole-blocked
643     // threads to wake up now.
644     // Lock-free test here should be ok, we're just setting a flag.
645     if ( blackhole_queue != END_TSO_QUEUE ) {
646         blackholes_need_checking = rtsTrue;
647     }
648     
649     // And save the current errno in this thread.
650     // XXX: possibly bogus for SMP because this thread might already
651     // be running again, see code below.
652     t->saved_errno = errno;
653 #if mingw32_HOST_OS
654     // Similarly for Windows error code
655     t->saved_winerror = GetLastError();
656 #endif
657
658 #if defined(THREADED_RTS)
659     // If ret is ThreadBlocked, and this Task is bound to the TSO that
660     // blocked, we are in limbo - the TSO is now owned by whatever it
661     // is blocked on, and may in fact already have been woken up,
662     // perhaps even on a different Capability.  It may be the case
663     // that task->cap != cap.  We better yield this Capability
664     // immediately and return to normaility.
665     if (ret == ThreadBlocked) {
666         debugTrace(DEBUG_sched,
667                    "--<< thread %lu (%s) stopped: blocked",
668                    (unsigned long)t->id, whatNext_strs[t->what_next]);
669         continue;
670     }
671 #endif
672
673     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
674     ASSERT(t->cap == cap);
675
676     // ----------------------------------------------------------------------
677     
678     // Costs for the scheduler are assigned to CCS_SYSTEM
679     stopHeapProfTimer();
680 #if defined(PROFILING)
681     CCCS = CCS_SYSTEM;
682 #endif
683     
684     schedulePostRunThread();
685
686     ready_to_gc = rtsFalse;
687
688     switch (ret) {
689     case HeapOverflow:
690         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
691         break;
692
693     case StackOverflow:
694         scheduleHandleStackOverflow(cap,task,t);
695         break;
696
697     case ThreadYielding:
698         if (scheduleHandleYield(cap, t, prev_what_next)) {
699             // shortcut for switching between compiler/interpreter:
700             goto run_thread; 
701         }
702         break;
703
704     case ThreadBlocked:
705         scheduleHandleThreadBlocked(t);
706         break;
707
708     case ThreadFinished:
709         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
710         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
711         break;
712
713     default:
714       barf("schedule: invalid thread return code %d", (int)ret);
715     }
716
717     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
718       cap = scheduleDoGC(cap,task,rtsFalse);
719     }
720   } /* end of while() */
721 }
722
723 /* ----------------------------------------------------------------------------
724  * Setting up the scheduler loop
725  * ------------------------------------------------------------------------- */
726
727 static void
728 schedulePreLoop(void)
729 {
730 #if defined(GRAN) 
731     /* set up first event to get things going */
732     /* ToDo: assign costs for system setup and init MainTSO ! */
733     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
734               ContinueThread, 
735               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
736     
737     debugTrace (DEBUG_gran,
738                 "GRAN: Init CurrentTSO (in schedule) = %p", 
739                 CurrentTSO);
740     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
741     
742     if (RtsFlags.GranFlags.Light) {
743         /* Save current time; GranSim Light only */
744         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
745     }      
746 #endif
747 }
748
749 /* -----------------------------------------------------------------------------
750  * schedulePushWork()
751  *
752  * Push work to other Capabilities if we have some.
753  * -------------------------------------------------------------------------- */
754
755 #if defined(THREADED_RTS)
756 static void
757 schedulePushWork(Capability *cap USED_IF_THREADS, 
758                  Task *task      USED_IF_THREADS)
759 {
760     Capability *free_caps[n_capabilities], *cap0;
761     nat i, n_free_caps;
762
763     // migration can be turned off with +RTS -qg
764     if (!RtsFlags.ParFlags.migrate) return;
765
766     // Check whether we have more threads on our run queue, or sparks
767     // in our pool, that we could hand to another Capability.
768     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
769         && sparkPoolSizeCap(cap) < 2) {
770         return;
771     }
772
773     // First grab as many free Capabilities as we can.
774     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
775         cap0 = &capabilities[i];
776         if (cap != cap0 && tryGrabCapability(cap0,task)) {
777             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
778                 // it already has some work, we just grabbed it at 
779                 // the wrong moment.  Or maybe it's deadlocked!
780                 releaseCapability(cap0);
781             } else {
782                 free_caps[n_free_caps++] = cap0;
783             }
784         }
785     }
786
787     // we now have n_free_caps free capabilities stashed in
788     // free_caps[].  Share our run queue equally with them.  This is
789     // probably the simplest thing we could do; improvements we might
790     // want to do include:
791     //
792     //   - giving high priority to moving relatively new threads, on 
793     //     the gournds that they haven't had time to build up a
794     //     working set in the cache on this CPU/Capability.
795     //
796     //   - giving low priority to moving long-lived threads
797
798     if (n_free_caps > 0) {
799         StgTSO *prev, *t, *next;
800         rtsBool pushed_to_all;
801
802         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
803
804         i = 0;
805         pushed_to_all = rtsFalse;
806
807         if (cap->run_queue_hd != END_TSO_QUEUE) {
808             prev = cap->run_queue_hd;
809             t = prev->link;
810             prev->link = END_TSO_QUEUE;
811             for (; t != END_TSO_QUEUE; t = next) {
812                 next = t->link;
813                 t->link = END_TSO_QUEUE;
814                 if (t->what_next == ThreadRelocated
815                     || t->bound == task // don't move my bound thread
816                     || tsoLocked(t)) {  // don't move a locked thread
817                     prev->link = t;
818                     prev = t;
819                 } else if (i == n_free_caps) {
820                     pushed_to_all = rtsTrue;
821                     i = 0;
822                     // keep one for us
823                     prev->link = t;
824                     prev = t;
825                 } else {
826                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
827                     appendToRunQueue(free_caps[i],t);
828                     if (t->bound) { t->bound->cap = free_caps[i]; }
829                     t->cap = free_caps[i];
830                     i++;
831                 }
832             }
833             cap->run_queue_tl = prev;
834         }
835
836         // If there are some free capabilities that we didn't push any
837         // threads to, then try to push a spark to each one.
838         if (!pushed_to_all) {
839             StgClosure *spark;
840             // i is the next free capability to push to
841             for (; i < n_free_caps; i++) {
842                 if (emptySparkPoolCap(free_caps[i])) {
843                     spark = findSpark(cap);
844                     if (spark != NULL) {
845                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
846                         newSpark(&(free_caps[i]->r), spark);
847                     }
848                 }
849             }
850         }
851
852         // release the capabilities
853         for (i = 0; i < n_free_caps; i++) {
854             task->cap = free_caps[i];
855             releaseCapability(free_caps[i]);
856         }
857     }
858     task->cap = cap; // reset to point to our Capability.
859 }
860 #endif
861
862 /* ----------------------------------------------------------------------------
863  * Start any pending signal handlers
864  * ------------------------------------------------------------------------- */
865
866 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
867 static void
868 scheduleStartSignalHandlers(Capability *cap)
869 {
870     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
871         // safe outside the lock
872         startSignalHandlers(cap);
873     }
874 }
875 #else
876 static void
877 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
878 {
879 }
880 #endif
881
882 /* ----------------------------------------------------------------------------
883  * Check for blocked threads that can be woken up.
884  * ------------------------------------------------------------------------- */
885
886 static void
887 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
888 {
889 #if !defined(THREADED_RTS)
890     //
891     // Check whether any waiting threads need to be woken up.  If the
892     // run queue is empty, and there are no other tasks running, we
893     // can wait indefinitely for something to happen.
894     //
895     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
896     {
897         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
898     }
899 #endif
900 }
901
902
903 /* ----------------------------------------------------------------------------
904  * Check for threads woken up by other Capabilities
905  * ------------------------------------------------------------------------- */
906
907 static void
908 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
909 {
910 #if defined(THREADED_RTS)
911     // Any threads that were woken up by other Capabilities get
912     // appended to our run queue.
913     if (!emptyWakeupQueue(cap)) {
914         ACQUIRE_LOCK(&cap->lock);
915         if (emptyRunQueue(cap)) {
916             cap->run_queue_hd = cap->wakeup_queue_hd;
917             cap->run_queue_tl = cap->wakeup_queue_tl;
918         } else {
919             cap->run_queue_tl->link = cap->wakeup_queue_hd;
920             cap->run_queue_tl = cap->wakeup_queue_tl;
921         }
922         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
923         RELEASE_LOCK(&cap->lock);
924     }
925 #endif
926 }
927
928 /* ----------------------------------------------------------------------------
929  * Check for threads blocked on BLACKHOLEs that can be woken up
930  * ------------------------------------------------------------------------- */
931 static void
932 scheduleCheckBlackHoles (Capability *cap)
933 {
934     if ( blackholes_need_checking ) // check without the lock first
935     {
936         ACQUIRE_LOCK(&sched_mutex);
937         if ( blackholes_need_checking ) {
938             checkBlackHoles(cap);
939             blackholes_need_checking = rtsFalse;
940         }
941         RELEASE_LOCK(&sched_mutex);
942     }
943 }
944
945 /* ----------------------------------------------------------------------------
946  * Detect deadlock conditions and attempt to resolve them.
947  * ------------------------------------------------------------------------- */
948
949 static void
950 scheduleDetectDeadlock (Capability *cap, Task *task)
951 {
952
953 #if defined(PARALLEL_HASKELL)
954     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
955     return;
956 #endif
957
958     /* 
959      * Detect deadlock: when we have no threads to run, there are no
960      * threads blocked, waiting for I/O, or sleeping, and all the
961      * other tasks are waiting for work, we must have a deadlock of
962      * some description.
963      */
964     if ( emptyThreadQueues(cap) )
965     {
966 #if defined(THREADED_RTS)
967         /* 
968          * In the threaded RTS, we only check for deadlock if there
969          * has been no activity in a complete timeslice.  This means
970          * we won't eagerly start a full GC just because we don't have
971          * any threads to run currently.
972          */
973         if (recent_activity != ACTIVITY_INACTIVE) return;
974 #endif
975
976         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
977
978         // Garbage collection can release some new threads due to
979         // either (a) finalizers or (b) threads resurrected because
980         // they are unreachable and will therefore be sent an
981         // exception.  Any threads thus released will be immediately
982         // runnable.
983         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
984
985         recent_activity = ACTIVITY_DONE_GC;
986         // disable timer signals (see #1623)
987         stopTimer();
988         
989         if ( !emptyRunQueue(cap) ) return;
990
991 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
992         /* If we have user-installed signal handlers, then wait
993          * for signals to arrive rather then bombing out with a
994          * deadlock.
995          */
996         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
997             debugTrace(DEBUG_sched,
998                        "still deadlocked, waiting for signals...");
999
1000             awaitUserSignals();
1001
1002             if (signals_pending()) {
1003                 startSignalHandlers(cap);
1004             }
1005
1006             // either we have threads to run, or we were interrupted:
1007             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1008         }
1009 #endif
1010
1011 #if !defined(THREADED_RTS)
1012         /* Probably a real deadlock.  Send the current main thread the
1013          * Deadlock exception.
1014          */
1015         if (task->tso) {
1016             switch (task->tso->why_blocked) {
1017             case BlockedOnSTM:
1018             case BlockedOnBlackHole:
1019             case BlockedOnException:
1020             case BlockedOnMVar:
1021                 throwToSingleThreaded(cap, task->tso, 
1022                                       (StgClosure *)NonTermination_closure);
1023                 return;
1024             default:
1025                 barf("deadlock: main thread blocked in a strange way");
1026             }
1027         }
1028         return;
1029 #endif
1030     }
1031 }
1032
1033 /* ----------------------------------------------------------------------------
1034  * Process an event (GRAN only)
1035  * ------------------------------------------------------------------------- */
1036
1037 #if defined(GRAN)
1038 static StgTSO *
1039 scheduleProcessEvent(rtsEvent *event)
1040 {
1041     StgTSO *t;
1042
1043     if (RtsFlags.GranFlags.Light)
1044       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1045
1046     /* adjust time based on time-stamp */
1047     if (event->time > CurrentTime[CurrentProc] &&
1048         event->evttype != ContinueThread)
1049       CurrentTime[CurrentProc] = event->time;
1050     
1051     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1052     if (!RtsFlags.GranFlags.Light)
1053       handleIdlePEs();
1054
1055     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1056
1057     /* main event dispatcher in GranSim */
1058     switch (event->evttype) {
1059       /* Should just be continuing execution */
1060     case ContinueThread:
1061       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1062       /* ToDo: check assertion
1063       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1064              run_queue_hd != END_TSO_QUEUE);
1065       */
1066       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1067       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1068           procStatus[CurrentProc]==Fetching) {
1069         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1070               CurrentTSO->id, CurrentTSO, CurrentProc);
1071         goto next_thread;
1072       } 
1073       /* Ignore ContinueThreads for completed threads */
1074       if (CurrentTSO->what_next == ThreadComplete) {
1075         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1076               CurrentTSO->id, CurrentTSO, CurrentProc);
1077         goto next_thread;
1078       } 
1079       /* Ignore ContinueThreads for threads that are being migrated */
1080       if (PROCS(CurrentTSO)==Nowhere) { 
1081         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1082               CurrentTSO->id, CurrentTSO, CurrentProc);
1083         goto next_thread;
1084       }
1085       /* The thread should be at the beginning of the run queue */
1086       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1087         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1088               CurrentTSO->id, CurrentTSO, CurrentProc);
1089         break; // run the thread anyway
1090       }
1091       /*
1092       new_event(proc, proc, CurrentTime[proc],
1093                 FindWork,
1094                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1095       goto next_thread; 
1096       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1097       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1098
1099     case FetchNode:
1100       do_the_fetchnode(event);
1101       goto next_thread;             /* handle next event in event queue  */
1102       
1103     case GlobalBlock:
1104       do_the_globalblock(event);
1105       goto next_thread;             /* handle next event in event queue  */
1106       
1107     case FetchReply:
1108       do_the_fetchreply(event);
1109       goto next_thread;             /* handle next event in event queue  */
1110       
1111     case UnblockThread:   /* Move from the blocked queue to the tail of */
1112       do_the_unblock(event);
1113       goto next_thread;             /* handle next event in event queue  */
1114       
1115     case ResumeThread:  /* Move from the blocked queue to the tail of */
1116       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1117       event->tso->gran.blocktime += 
1118         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1119       do_the_startthread(event);
1120       goto next_thread;             /* handle next event in event queue  */
1121       
1122     case StartThread:
1123       do_the_startthread(event);
1124       goto next_thread;             /* handle next event in event queue  */
1125       
1126     case MoveThread:
1127       do_the_movethread(event);
1128       goto next_thread;             /* handle next event in event queue  */
1129       
1130     case MoveSpark:
1131       do_the_movespark(event);
1132       goto next_thread;             /* handle next event in event queue  */
1133       
1134     case FindWork:
1135       do_the_findwork(event);
1136       goto next_thread;             /* handle next event in event queue  */
1137       
1138     default:
1139       barf("Illegal event type %u\n", event->evttype);
1140     }  /* switch */
1141     
1142     /* This point was scheduler_loop in the old RTS */
1143
1144     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1145
1146     TimeOfLastEvent = CurrentTime[CurrentProc];
1147     TimeOfNextEvent = get_time_of_next_event();
1148     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1149     // CurrentTSO = ThreadQueueHd;
1150
1151     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1152                          TimeOfNextEvent));
1153
1154     if (RtsFlags.GranFlags.Light) 
1155       GranSimLight_leave_system(event, &ActiveTSO); 
1156
1157     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1158
1159     IF_DEBUG(gran, 
1160              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1161
1162     /* in a GranSim setup the TSO stays on the run queue */
1163     t = CurrentTSO;
1164     /* Take a thread from the run queue. */
1165     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1166
1167     IF_DEBUG(gran, 
1168              debugBelch("GRAN: About to run current thread, which is\n");
1169              G_TSO(t,5));
1170
1171     context_switch = 0; // turned on via GranYield, checking events and time slice
1172
1173     IF_DEBUG(gran, 
1174              DumpGranEvent(GR_SCHEDULE, t));
1175
1176     procStatus[CurrentProc] = Busy;
1177 }
1178 #endif // GRAN
1179
1180 /* ----------------------------------------------------------------------------
1181  * Send pending messages (PARALLEL_HASKELL only)
1182  * ------------------------------------------------------------------------- */
1183
1184 #if defined(PARALLEL_HASKELL)
1185 static StgTSO *
1186 scheduleSendPendingMessages(void)
1187 {
1188     StgSparkPool *pool;
1189     rtsSpark spark;
1190     StgTSO *t;
1191
1192 # if defined(PAR) // global Mem.Mgmt., omit for now
1193     if (PendingFetches != END_BF_QUEUE) {
1194         processFetches();
1195     }
1196 # endif
1197     
1198     if (RtsFlags.ParFlags.BufferTime) {
1199         // if we use message buffering, we must send away all message
1200         // packets which have become too old...
1201         sendOldBuffers(); 
1202     }
1203 }
1204 #endif
1205
1206 /* ----------------------------------------------------------------------------
1207  * Activate spark threads (PARALLEL_HASKELL only)
1208  * ------------------------------------------------------------------------- */
1209
1210 #if defined(PARALLEL_HASKELL)
1211 static void
1212 scheduleActivateSpark(void)
1213 {
1214 #if defined(SPARKS)
1215   ASSERT(emptyRunQueue());
1216 /* We get here if the run queue is empty and want some work.
1217    We try to turn a spark into a thread, and add it to the run queue,
1218    from where it will be picked up in the next iteration of the scheduler
1219    loop.
1220 */
1221
1222       /* :-[  no local threads => look out for local sparks */
1223       /* the spark pool for the current PE */
1224       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1225       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1226           pool->hd < pool->tl) {
1227         /* 
1228          * ToDo: add GC code check that we really have enough heap afterwards!!
1229          * Old comment:
1230          * If we're here (no runnable threads) and we have pending
1231          * sparks, we must have a space problem.  Get enough space
1232          * to turn one of those pending sparks into a
1233          * thread... 
1234          */
1235
1236         spark = findSpark(rtsFalse);            /* get a spark */
1237         if (spark != (rtsSpark) NULL) {
1238           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1239           IF_PAR_DEBUG(fish, // schedule,
1240                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1241                              tso->id, tso, advisory_thread_count));
1242
1243           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1244             IF_PAR_DEBUG(fish, // schedule,
1245                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1246                             spark));
1247             return rtsFalse; /* failed to generate a thread */
1248           }                  /* otherwise fall through & pick-up new tso */
1249         } else {
1250           IF_PAR_DEBUG(fish, // schedule,
1251                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1252                              spark_queue_len(pool)));
1253           return rtsFalse;  /* failed to generate a thread */
1254         }
1255         return rtsTrue;  /* success in generating a thread */
1256   } else { /* no more threads permitted or pool empty */
1257     return rtsFalse;  /* failed to generateThread */
1258   }
1259 #else
1260   tso = NULL; // avoid compiler warning only
1261   return rtsFalse;  /* dummy in non-PAR setup */
1262 #endif // SPARKS
1263 }
1264 #endif // PARALLEL_HASKELL
1265
1266 /* ----------------------------------------------------------------------------
1267  * Get work from a remote node (PARALLEL_HASKELL only)
1268  * ------------------------------------------------------------------------- */
1269     
1270 #if defined(PARALLEL_HASKELL)
1271 static rtsBool
1272 scheduleGetRemoteWork(rtsBool *receivedFinish)
1273 {
1274   ASSERT(emptyRunQueue());
1275
1276   if (RtsFlags.ParFlags.BufferTime) {
1277         IF_PAR_DEBUG(verbose, 
1278                 debugBelch("...send all pending data,"));
1279         {
1280           nat i;
1281           for (i=1; i<=nPEs; i++)
1282             sendImmediately(i); // send all messages away immediately
1283         }
1284   }
1285 # ifndef SPARKS
1286         //++EDEN++ idle() , i.e. send all buffers, wait for work
1287         // suppress fishing in EDEN... just look for incoming messages
1288         // (blocking receive)
1289   IF_PAR_DEBUG(verbose, 
1290                debugBelch("...wait for incoming messages...\n"));
1291   *receivedFinish = processMessages(); // blocking receive...
1292
1293         // and reenter scheduling loop after having received something
1294         // (return rtsFalse below)
1295
1296 # else /* activate SPARKS machinery */
1297 /* We get here, if we have no work, tried to activate a local spark, but still
1298    have no work. We try to get a remote spark, by sending a FISH message.
1299    Thread migration should be added here, and triggered when a sequence of 
1300    fishes returns without work. */
1301         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1302
1303       /* =8-[  no local sparks => look for work on other PEs */
1304         /*
1305          * We really have absolutely no work.  Send out a fish
1306          * (there may be some out there already), and wait for
1307          * something to arrive.  We clearly can't run any threads
1308          * until a SCHEDULE or RESUME arrives, and so that's what
1309          * we're hoping to see.  (Of course, we still have to
1310          * respond to other types of messages.)
1311          */
1312         rtsTime now = msTime() /*CURRENT_TIME*/;
1313         IF_PAR_DEBUG(verbose, 
1314                      debugBelch("--  now=%ld\n", now));
1315         IF_PAR_DEBUG(fish, // verbose,
1316              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1317                  (last_fish_arrived_at!=0 &&
1318                   last_fish_arrived_at+delay > now)) {
1319                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1320                      now, last_fish_arrived_at+delay, 
1321                      last_fish_arrived_at,
1322                      delay);
1323              });
1324   
1325         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1326             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1327           if (last_fish_arrived_at==0 ||
1328               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1329             /* outstandingFishes is set in sendFish, processFish;
1330                avoid flooding system with fishes via delay */
1331     next_fish_to_send_at = 0;  
1332   } else {
1333     /* ToDo: this should be done in the main scheduling loop to avoid the
1334              busy wait here; not so bad if fish delay is very small  */
1335     int iq = 0; // DEBUGGING -- HWL
1336     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1337     /* send a fish when ready, but process messages that arrive in the meantime */
1338     do {
1339       if (PacketsWaiting()) {
1340         iq++; // DEBUGGING
1341         *receivedFinish = processMessages();
1342       }
1343       now = msTime();
1344     } while (!*receivedFinish || now<next_fish_to_send_at);
1345     // JB: This means the fish could become obsolete, if we receive
1346     // work. Better check for work again? 
1347     // last line: while (!receivedFinish || !haveWork || now<...)
1348     // next line: if (receivedFinish || haveWork )
1349
1350     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1351       return rtsFalse;  // NB: this will leave scheduler loop
1352                         // immediately after return!
1353                           
1354     IF_PAR_DEBUG(fish, // verbose,
1355                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1356
1357   }
1358
1359     // JB: IMHO, this should all be hidden inside sendFish(...)
1360     /* pe = choosePE(); 
1361        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1362                 NEW_FISH_HUNGER);
1363
1364     // Global statistics: count no. of fishes
1365     if (RtsFlags.ParFlags.ParStats.Global &&
1366          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1367            globalParStats.tot_fish_mess++;
1368            }
1369     */ 
1370
1371   /* delayed fishes must have been sent by now! */
1372   next_fish_to_send_at = 0;  
1373   }
1374       
1375   *receivedFinish = processMessages();
1376 # endif /* SPARKS */
1377
1378  return rtsFalse;
1379  /* NB: this function always returns rtsFalse, meaning the scheduler
1380     loop continues with the next iteration; 
1381     rationale: 
1382       return code means success in finding work; we enter this function
1383       if there is no local work, thus have to send a fish which takes
1384       time until it arrives with work; in the meantime we should process
1385       messages in the main loop;
1386  */
1387 }
1388 #endif // PARALLEL_HASKELL
1389
1390 /* ----------------------------------------------------------------------------
1391  * PAR/GRAN: Report stats & debugging info(?)
1392  * ------------------------------------------------------------------------- */
1393
1394 #if defined(PAR) || defined(GRAN)
1395 static void
1396 scheduleGranParReport(void)
1397 {
1398   ASSERT(run_queue_hd != END_TSO_QUEUE);
1399
1400   /* Take a thread from the run queue, if we have work */
1401   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1402
1403     /* If this TSO has got its outport closed in the meantime, 
1404      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1405      * It has to be marked as TH_DEAD for this purpose.
1406      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1407
1408 JB: TODO: investigate wether state change field could be nuked
1409      entirely and replaced by the normal tso state (whatnext
1410      field). All we want to do is to kill tsos from outside.
1411      */
1412
1413     /* ToDo: write something to the log-file
1414     if (RTSflags.ParFlags.granSimStats && !sameThread)
1415         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1416
1417     CurrentTSO = t;
1418     */
1419     /* the spark pool for the current PE */
1420     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1421
1422     IF_DEBUG(scheduler, 
1423              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1424                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1425
1426     IF_PAR_DEBUG(fish,
1427              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1428                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1429
1430     if (RtsFlags.ParFlags.ParStats.Full && 
1431         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1432         (emitSchedule || // forced emit
1433          (t && LastTSO && t->id != LastTSO->id))) {
1434       /* 
1435          we are running a different TSO, so write a schedule event to log file
1436          NB: If we use fair scheduling we also have to write  a deschedule 
1437              event for LastTSO; with unfair scheduling we know that the
1438              previous tso has blocked whenever we switch to another tso, so
1439              we don't need it in GUM for now
1440       */
1441       IF_PAR_DEBUG(fish, // schedule,
1442                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1443
1444       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1445                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1446       emitSchedule = rtsFalse;
1447     }
1448 }     
1449 #endif
1450
1451 /* ----------------------------------------------------------------------------
1452  * After running a thread...
1453  * ------------------------------------------------------------------------- */
1454
1455 static void
1456 schedulePostRunThread(void)
1457 {
1458 #if defined(PAR)
1459     /* HACK 675: if the last thread didn't yield, make sure to print a 
1460        SCHEDULE event to the log file when StgRunning the next thread, even
1461        if it is the same one as before */
1462     LastTSO = t; 
1463     TimeOfLastYield = CURRENT_TIME;
1464 #endif
1465
1466   /* some statistics gathering in the parallel case */
1467
1468 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1469   switch (ret) {
1470     case HeapOverflow:
1471 # if defined(GRAN)
1472       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1473       globalGranStats.tot_heapover++;
1474 # elif defined(PAR)
1475       globalParStats.tot_heapover++;
1476 # endif
1477       break;
1478
1479      case StackOverflow:
1480 # if defined(GRAN)
1481       IF_DEBUG(gran, 
1482                DumpGranEvent(GR_DESCHEDULE, t));
1483       globalGranStats.tot_stackover++;
1484 # elif defined(PAR)
1485       // IF_DEBUG(par, 
1486       // DumpGranEvent(GR_DESCHEDULE, t);
1487       globalParStats.tot_stackover++;
1488 # endif
1489       break;
1490
1491     case ThreadYielding:
1492 # if defined(GRAN)
1493       IF_DEBUG(gran, 
1494                DumpGranEvent(GR_DESCHEDULE, t));
1495       globalGranStats.tot_yields++;
1496 # elif defined(PAR)
1497       // IF_DEBUG(par, 
1498       // DumpGranEvent(GR_DESCHEDULE, t);
1499       globalParStats.tot_yields++;
1500 # endif
1501       break; 
1502
1503     case ThreadBlocked:
1504 # if defined(GRAN)
1505         debugTrace(DEBUG_sched, 
1506                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1507                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1508                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1509                if (t->block_info.closure!=(StgClosure*)NULL)
1510                  print_bq(t->block_info.closure);
1511                debugBelch("\n"));
1512
1513       // ??? needed; should emit block before
1514       IF_DEBUG(gran, 
1515                DumpGranEvent(GR_DESCHEDULE, t)); 
1516       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1517       /*
1518         ngoq Dogh!
1519       ASSERT(procStatus[CurrentProc]==Busy || 
1520               ((procStatus[CurrentProc]==Fetching) && 
1521               (t->block_info.closure!=(StgClosure*)NULL)));
1522       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1523           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1524             procStatus[CurrentProc]==Fetching)) 
1525         procStatus[CurrentProc] = Idle;
1526       */
1527 # elif defined(PAR)
1528 //++PAR++  blockThread() writes the event (change?)
1529 # endif
1530     break;
1531
1532   case ThreadFinished:
1533     break;
1534
1535   default:
1536     barf("parGlobalStats: unknown return code");
1537     break;
1538     }
1539 #endif
1540 }
1541
1542 /* -----------------------------------------------------------------------------
1543  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1544  * -------------------------------------------------------------------------- */
1545
1546 static rtsBool
1547 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1548 {
1549     // did the task ask for a large block?
1550     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1551         // if so, get one and push it on the front of the nursery.
1552         bdescr *bd;
1553         lnat blocks;
1554         
1555         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1556         
1557         debugTrace(DEBUG_sched,
1558                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1559                    (long)t->id, whatNext_strs[t->what_next], blocks);
1560     
1561         // don't do this if the nursery is (nearly) full, we'll GC first.
1562         if (cap->r.rCurrentNursery->link != NULL ||
1563             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1564                                                // if the nursery has only one block.
1565             
1566             ACQUIRE_SM_LOCK
1567             bd = allocGroup( blocks );
1568             RELEASE_SM_LOCK
1569             cap->r.rNursery->n_blocks += blocks;
1570             
1571             // link the new group into the list
1572             bd->link = cap->r.rCurrentNursery;
1573             bd->u.back = cap->r.rCurrentNursery->u.back;
1574             if (cap->r.rCurrentNursery->u.back != NULL) {
1575                 cap->r.rCurrentNursery->u.back->link = bd;
1576             } else {
1577 #if !defined(THREADED_RTS)
1578                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1579                        g0s0 == cap->r.rNursery);
1580 #endif
1581                 cap->r.rNursery->blocks = bd;
1582             }             
1583             cap->r.rCurrentNursery->u.back = bd;
1584             
1585             // initialise it as a nursery block.  We initialise the
1586             // step, gen_no, and flags field of *every* sub-block in
1587             // this large block, because this is easier than making
1588             // sure that we always find the block head of a large
1589             // block whenever we call Bdescr() (eg. evacuate() and
1590             // isAlive() in the GC would both have to do this, at
1591             // least).
1592             { 
1593                 bdescr *x;
1594                 for (x = bd; x < bd + blocks; x++) {
1595                     x->step = cap->r.rNursery;
1596                     x->gen_no = 0;
1597                     x->flags = 0;
1598                 }
1599             }
1600             
1601             // This assert can be a killer if the app is doing lots
1602             // of large block allocations.
1603             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1604             
1605             // now update the nursery to point to the new block
1606             cap->r.rCurrentNursery = bd;
1607             
1608             // we might be unlucky and have another thread get on the
1609             // run queue before us and steal the large block, but in that
1610             // case the thread will just end up requesting another large
1611             // block.
1612             pushOnRunQueue(cap,t);
1613             return rtsFalse;  /* not actually GC'ing */
1614         }
1615     }
1616     
1617     debugTrace(DEBUG_sched,
1618                "--<< thread %ld (%s) stopped: HeapOverflow\n", 
1619                (long)t->id, whatNext_strs[t->what_next]);
1620
1621 #if defined(GRAN)
1622     ASSERT(!is_on_queue(t,CurrentProc));
1623 #elif defined(PARALLEL_HASKELL)
1624     /* Currently we emit a DESCHEDULE event before GC in GUM.
1625        ToDo: either add separate event to distinguish SYSTEM time from rest
1626        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1627     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1628         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1629                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1630         emitSchedule = rtsTrue;
1631     }
1632 #endif
1633       
1634     if (context_switch) {
1635         // Sometimes we miss a context switch, e.g. when calling
1636         // primitives in a tight loop, MAYBE_GC() doesn't check the
1637         // context switch flag, and we end up waiting for a GC.
1638         // See #1984, and concurrent/should_run/1984
1639         context_switch = 0;
1640         addToRunQueue(cap,t);
1641     } else {
1642         pushOnRunQueue(cap,t);
1643     }
1644     return rtsTrue;
1645     /* actual GC is done at the end of the while loop in schedule() */
1646 }
1647
1648 /* -----------------------------------------------------------------------------
1649  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1650  * -------------------------------------------------------------------------- */
1651
1652 static void
1653 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1654 {
1655     debugTrace (DEBUG_sched,
1656                 "--<< thread %ld (%s) stopped, StackOverflow", 
1657                 (long)t->id, whatNext_strs[t->what_next]);
1658
1659     /* just adjust the stack for this thread, then pop it back
1660      * on the run queue.
1661      */
1662     { 
1663         /* enlarge the stack */
1664         StgTSO *new_t = threadStackOverflow(cap, t);
1665         
1666         /* The TSO attached to this Task may have moved, so update the
1667          * pointer to it.
1668          */
1669         if (task->tso == t) {
1670             task->tso = new_t;
1671         }
1672         pushOnRunQueue(cap,new_t);
1673     }
1674 }
1675
1676 /* -----------------------------------------------------------------------------
1677  * Handle a thread that returned to the scheduler with ThreadYielding
1678  * -------------------------------------------------------------------------- */
1679
1680 static rtsBool
1681 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1682 {
1683     // Reset the context switch flag.  We don't do this just before
1684     // running the thread, because that would mean we would lose ticks
1685     // during GC, which can lead to unfair scheduling (a thread hogs
1686     // the CPU because the tick always arrives during GC).  This way
1687     // penalises threads that do a lot of allocation, but that seems
1688     // better than the alternative.
1689     context_switch = 0;
1690     
1691     /* put the thread back on the run queue.  Then, if we're ready to
1692      * GC, check whether this is the last task to stop.  If so, wake
1693      * up the GC thread.  getThread will block during a GC until the
1694      * GC is finished.
1695      */
1696 #ifdef DEBUG
1697     if (t->what_next != prev_what_next) {
1698         debugTrace(DEBUG_sched,
1699                    "--<< thread %ld (%s) stopped to switch evaluators", 
1700                    (long)t->id, whatNext_strs[t->what_next]);
1701     } else {
1702         debugTrace(DEBUG_sched,
1703                    "--<< thread %ld (%s) stopped, yielding",
1704                    (long)t->id, whatNext_strs[t->what_next]);
1705     }
1706 #endif
1707     
1708     IF_DEBUG(sanity,
1709              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1710              checkTSO(t));
1711     ASSERT(t->link == END_TSO_QUEUE);
1712     
1713     // Shortcut if we're just switching evaluators: don't bother
1714     // doing stack squeezing (which can be expensive), just run the
1715     // thread.
1716     if (t->what_next != prev_what_next) {
1717         return rtsTrue;
1718     }
1719     
1720 #if defined(GRAN)
1721     ASSERT(!is_on_queue(t,CurrentProc));
1722       
1723     IF_DEBUG(sanity,
1724              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1725              checkThreadQsSanity(rtsTrue));
1726
1727 #endif
1728
1729     addToRunQueue(cap,t);
1730
1731 #if defined(GRAN)
1732     /* add a ContinueThread event to actually process the thread */
1733     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1734               ContinueThread,
1735               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1736     IF_GRAN_DEBUG(bq, 
1737                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1738                   G_EVENTQ(0);
1739                   G_CURR_THREADQ(0));
1740 #endif
1741     return rtsFalse;
1742 }
1743
1744 /* -----------------------------------------------------------------------------
1745  * Handle a thread that returned to the scheduler with ThreadBlocked
1746  * -------------------------------------------------------------------------- */
1747
1748 static void
1749 scheduleHandleThreadBlocked( StgTSO *t
1750 #if !defined(GRAN) && !defined(DEBUG)
1751     STG_UNUSED
1752 #endif
1753     )
1754 {
1755 #if defined(GRAN)
1756     IF_DEBUG(scheduler,
1757              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1758                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1759              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1760     
1761     // ??? needed; should emit block before
1762     IF_DEBUG(gran, 
1763              DumpGranEvent(GR_DESCHEDULE, t)); 
1764     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1765     /*
1766       ngoq Dogh!
1767       ASSERT(procStatus[CurrentProc]==Busy || 
1768       ((procStatus[CurrentProc]==Fetching) && 
1769       (t->block_info.closure!=(StgClosure*)NULL)));
1770       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1771       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1772       procStatus[CurrentProc]==Fetching)) 
1773       procStatus[CurrentProc] = Idle;
1774     */
1775 #elif defined(PAR)
1776     IF_DEBUG(scheduler,
1777              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1778                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1779     IF_PAR_DEBUG(bq,
1780                  
1781                  if (t->block_info.closure!=(StgClosure*)NULL) 
1782                  print_bq(t->block_info.closure));
1783     
1784     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1785     blockThread(t);
1786     
1787     /* whatever we schedule next, we must log that schedule */
1788     emitSchedule = rtsTrue;
1789     
1790 #else /* !GRAN */
1791
1792       // We don't need to do anything.  The thread is blocked, and it
1793       // has tidied up its stack and placed itself on whatever queue
1794       // it needs to be on.
1795
1796     // ASSERT(t->why_blocked != NotBlocked);
1797     // Not true: for example,
1798     //    - in THREADED_RTS, the thread may already have been woken
1799     //      up by another Capability.  This actually happens: try
1800     //      conc023 +RTS -N2.
1801     //    - the thread may have woken itself up already, because
1802     //      threadPaused() might have raised a blocked throwTo
1803     //      exception, see maybePerformBlockedException().
1804
1805 #ifdef DEBUG
1806     if (traceClass(DEBUG_sched)) {
1807         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1808                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1809         printThreadBlockage(t);
1810         debugTraceEnd();
1811     }
1812 #endif
1813     
1814     /* Only for dumping event to log file 
1815        ToDo: do I need this in GranSim, too?
1816        blockThread(t);
1817     */
1818 #endif
1819 }
1820
1821 /* -----------------------------------------------------------------------------
1822  * Handle a thread that returned to the scheduler with ThreadFinished
1823  * -------------------------------------------------------------------------- */
1824
1825 static rtsBool
1826 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1827 {
1828     /* Need to check whether this was a main thread, and if so,
1829      * return with the return value.
1830      *
1831      * We also end up here if the thread kills itself with an
1832      * uncaught exception, see Exception.cmm.
1833      */
1834     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1835                (unsigned long)t->id, whatNext_strs[t->what_next]);
1836
1837 #if defined(GRAN)
1838       endThread(t, CurrentProc); // clean-up the thread
1839 #elif defined(PARALLEL_HASKELL)
1840       /* For now all are advisory -- HWL */
1841       //if(t->priority==AdvisoryPriority) ??
1842       advisory_thread_count--; // JB: Caution with this counter, buggy!
1843       
1844 # if defined(DIST)
1845       if(t->dist.priority==RevalPriority)
1846         FinishReval(t);
1847 # endif
1848     
1849 # if defined(EDENOLD)
1850       // the thread could still have an outport... (BUG)
1851       if (t->eden.outport != -1) {
1852       // delete the outport for the tso which has finished...
1853         IF_PAR_DEBUG(eden_ports,
1854                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1855                               t->eden.outport, t->id));
1856         deleteOPT(t);
1857       }
1858       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1859       if (t->eden.epid != -1) {
1860         IF_PAR_DEBUG(eden_ports,
1861                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1862                            t->id, t->eden.epid));
1863         removeTSOfromProcess(t);
1864       }
1865 # endif 
1866
1867 # if defined(PAR)
1868       if (RtsFlags.ParFlags.ParStats.Full &&
1869           !RtsFlags.ParFlags.ParStats.Suppressed) 
1870         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1871
1872       //  t->par only contains statistics: left out for now...
1873       IF_PAR_DEBUG(fish,
1874                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1875                               t->id,t,t->par.sparkname));
1876 # endif
1877 #endif // PARALLEL_HASKELL
1878
1879       //
1880       // Check whether the thread that just completed was a bound
1881       // thread, and if so return with the result.  
1882       //
1883       // There is an assumption here that all thread completion goes
1884       // through this point; we need to make sure that if a thread
1885       // ends up in the ThreadKilled state, that it stays on the run
1886       // queue so it can be dealt with here.
1887       //
1888
1889       if (t->bound) {
1890
1891           if (t->bound != task) {
1892 #if !defined(THREADED_RTS)
1893               // Must be a bound thread that is not the topmost one.  Leave
1894               // it on the run queue until the stack has unwound to the
1895               // point where we can deal with this.  Leaving it on the run
1896               // queue also ensures that the garbage collector knows about
1897               // this thread and its return value (it gets dropped from the
1898               // all_threads list so there's no other way to find it).
1899               appendToRunQueue(cap,t);
1900               return rtsFalse;
1901 #else
1902               // this cannot happen in the threaded RTS, because a
1903               // bound thread can only be run by the appropriate Task.
1904               barf("finished bound thread that isn't mine");
1905 #endif
1906           }
1907
1908           ASSERT(task->tso == t);
1909
1910           if (t->what_next == ThreadComplete) {
1911               if (task->ret) {
1912                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1913                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1914               }
1915               task->stat = Success;
1916           } else {
1917               if (task->ret) {
1918                   *(task->ret) = NULL;
1919               }
1920               if (sched_state >= SCHED_INTERRUPTING) {
1921                   task->stat = Interrupted;
1922               } else {
1923                   task->stat = Killed;
1924               }
1925           }
1926 #ifdef DEBUG
1927           removeThreadLabel((StgWord)task->tso->id);
1928 #endif
1929           return rtsTrue; // tells schedule() to return
1930       }
1931
1932       return rtsFalse;
1933 }
1934
1935 /* -----------------------------------------------------------------------------
1936  * Perform a heap census
1937  * -------------------------------------------------------------------------- */
1938
1939 static rtsBool
1940 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1941 {
1942     // When we have +RTS -i0 and we're heap profiling, do a census at
1943     // every GC.  This lets us get repeatable runs for debugging.
1944     if (performHeapProfile ||
1945         (RtsFlags.ProfFlags.profileInterval==0 &&
1946          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1947         return rtsTrue;
1948     } else {
1949         return rtsFalse;
1950     }
1951 }
1952
1953 /* -----------------------------------------------------------------------------
1954  * Perform a garbage collection if necessary
1955  * -------------------------------------------------------------------------- */
1956
1957 static Capability *
1958 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1959 {
1960     StgTSO *t;
1961     rtsBool heap_census;
1962 #ifdef THREADED_RTS
1963     static volatile StgWord waiting_for_gc;
1964     rtsBool was_waiting;
1965     nat i;
1966 #endif
1967
1968 #ifdef THREADED_RTS
1969     // In order to GC, there must be no threads running Haskell code.
1970     // Therefore, the GC thread needs to hold *all* the capabilities,
1971     // and release them after the GC has completed.  
1972     //
1973     // This seems to be the simplest way: previous attempts involved
1974     // making all the threads with capabilities give up their
1975     // capabilities and sleep except for the *last* one, which
1976     // actually did the GC.  But it's quite hard to arrange for all
1977     // the other tasks to sleep and stay asleep.
1978     //
1979         
1980     was_waiting = cas(&waiting_for_gc, 0, 1);
1981     if (was_waiting) {
1982         do {
1983             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1984             if (cap) yieldCapability(&cap,task);
1985         } while (waiting_for_gc);
1986         return cap;  // NOTE: task->cap might have changed here
1987     }
1988
1989     for (i=0; i < n_capabilities; i++) {
1990         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1991         if (cap != &capabilities[i]) {
1992             Capability *pcap = &capabilities[i];
1993             // we better hope this task doesn't get migrated to
1994             // another Capability while we're waiting for this one.
1995             // It won't, because load balancing happens while we have
1996             // all the Capabilities, but even so it's a slightly
1997             // unsavoury invariant.
1998             task->cap = pcap;
1999             context_switch = 1;
2000             waitForReturnCapability(&pcap, task);
2001             if (pcap != &capabilities[i]) {
2002                 barf("scheduleDoGC: got the wrong capability");
2003             }
2004         }
2005     }
2006
2007     waiting_for_gc = rtsFalse;
2008 #endif
2009
2010     /* Kick any transactions which are invalid back to their
2011      * atomically frames.  When next scheduled they will try to
2012      * commit, this commit will fail and they will retry.
2013      */
2014     { 
2015         StgTSO *next;
2016
2017         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2018             if (t->what_next == ThreadRelocated) {
2019                 next = t->link;
2020             } else {
2021                 next = t->global_link;
2022                 
2023                 // This is a good place to check for blocked
2024                 // exceptions.  It might be the case that a thread is
2025                 // blocked on delivering an exception to a thread that
2026                 // is also blocked - we try to ensure that this
2027                 // doesn't happen in throwTo(), but it's too hard (or
2028                 // impossible) to close all the race holes, so we
2029                 // accept that some might get through and deal with
2030                 // them here.  A GC will always happen at some point,
2031                 // even if the system is otherwise deadlocked.
2032                 maybePerformBlockedException (&capabilities[0], t);
2033
2034                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2035                     if (!stmValidateNestOfTransactions (t -> trec)) {
2036                         debugTrace(DEBUG_sched | DEBUG_stm,
2037                                    "trec %p found wasting its time", t);
2038                         
2039                         // strip the stack back to the
2040                         // ATOMICALLY_FRAME, aborting the (nested)
2041                         // transaction, and saving the stack of any
2042                         // partially-evaluated thunks on the heap.
2043                         throwToSingleThreaded_(&capabilities[0], t, 
2044                                                NULL, rtsTrue, NULL);
2045                         
2046 #ifdef REG_R1
2047                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2048 #endif
2049                     }
2050                 }
2051             }
2052         }
2053     }
2054     
2055     // so this happens periodically:
2056     if (cap) scheduleCheckBlackHoles(cap);
2057     
2058     IF_DEBUG(scheduler, printAllThreads());
2059
2060     /*
2061      * We now have all the capabilities; if we're in an interrupting
2062      * state, then we should take the opportunity to delete all the
2063      * threads in the system.
2064      */
2065     if (sched_state >= SCHED_INTERRUPTING) {
2066         deleteAllThreads(&capabilities[0]);
2067         sched_state = SCHED_SHUTTING_DOWN;
2068     }
2069     
2070     heap_census = scheduleNeedHeapProfile(rtsTrue);
2071
2072     /* everybody back, start the GC.
2073      * Could do it in this thread, or signal a condition var
2074      * to do it in another thread.  Either way, we need to
2075      * broadcast on gc_pending_cond afterward.
2076      */
2077 #if defined(THREADED_RTS)
2078     debugTrace(DEBUG_sched, "doing GC");
2079 #endif
2080     GarbageCollect(force_major || heap_census);
2081     
2082     if (heap_census) {
2083         debugTrace(DEBUG_sched, "performing heap census");
2084         heapCensus();
2085         performHeapProfile = rtsFalse;
2086     }
2087
2088 #if defined(THREADED_RTS)
2089     // release our stash of capabilities.
2090     for (i = 0; i < n_capabilities; i++) {
2091         if (cap != &capabilities[i]) {
2092             task->cap = &capabilities[i];
2093             releaseCapability(&capabilities[i]);
2094         }
2095     }
2096     if (cap) {
2097         task->cap = cap;
2098     } else {
2099         task->cap = NULL;
2100     }
2101 #endif
2102
2103 #if defined(GRAN)
2104     /* add a ContinueThread event to continue execution of current thread */
2105     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2106               ContinueThread,
2107               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2108     IF_GRAN_DEBUG(bq, 
2109                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2110                   G_EVENTQ(0);
2111                   G_CURR_THREADQ(0));
2112 #endif /* GRAN */
2113
2114     return cap;
2115 }
2116
2117 /* ---------------------------------------------------------------------------
2118  * Singleton fork(). Do not copy any running threads.
2119  * ------------------------------------------------------------------------- */
2120
2121 pid_t
2122 forkProcess(HsStablePtr *entry
2123 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2124             STG_UNUSED
2125 #endif
2126            )
2127 {
2128 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2129     Task *task;
2130     pid_t pid;
2131     StgTSO* t,*next;
2132     Capability *cap;
2133     
2134 #if defined(THREADED_RTS)
2135     if (RtsFlags.ParFlags.nNodes > 1) {
2136         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2137         stg_exit(EXIT_FAILURE);
2138     }
2139 #endif
2140
2141     debugTrace(DEBUG_sched, "forking!");
2142     
2143     // ToDo: for SMP, we should probably acquire *all* the capabilities
2144     cap = rts_lock();
2145     
2146     // no funny business: hold locks while we fork, otherwise if some
2147     // other thread is holding a lock when the fork happens, the data
2148     // structure protected by the lock will forever be in an
2149     // inconsistent state in the child.  See also #1391.
2150     ACQUIRE_LOCK(&sched_mutex);
2151     ACQUIRE_LOCK(&cap->lock);
2152     ACQUIRE_LOCK(&cap->running_task->lock);
2153
2154     pid = fork();
2155     
2156     if (pid) { // parent
2157         
2158         RELEASE_LOCK(&sched_mutex);
2159         RELEASE_LOCK(&cap->lock);
2160         RELEASE_LOCK(&cap->running_task->lock);
2161
2162         // just return the pid
2163         rts_unlock(cap);
2164         return pid;
2165         
2166     } else { // child
2167         
2168 #if defined(THREADED_RTS)
2169         initMutex(&sched_mutex);
2170         initMutex(&cap->lock);
2171         initMutex(&cap->running_task->lock);
2172 #endif
2173
2174         // Now, all OS threads except the thread that forked are
2175         // stopped.  We need to stop all Haskell threads, including
2176         // those involved in foreign calls.  Also we need to delete
2177         // all Tasks, because they correspond to OS threads that are
2178         // now gone.
2179
2180         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2181             if (t->what_next == ThreadRelocated) {
2182                 next = t->link;
2183             } else {
2184                 next = t->global_link;
2185                 // don't allow threads to catch the ThreadKilled
2186                 // exception, but we do want to raiseAsync() because these
2187                 // threads may be evaluating thunks that we need later.
2188                 deleteThread_(cap,t);
2189             }
2190         }
2191         
2192         // Empty the run queue.  It seems tempting to let all the
2193         // killed threads stay on the run queue as zombies to be
2194         // cleaned up later, but some of them correspond to bound
2195         // threads for which the corresponding Task does not exist.
2196         cap->run_queue_hd = END_TSO_QUEUE;
2197         cap->run_queue_tl = END_TSO_QUEUE;
2198
2199         // Any suspended C-calling Tasks are no more, their OS threads
2200         // don't exist now:
2201         cap->suspended_ccalling_tasks = NULL;
2202
2203         // Empty the all_threads list.  Otherwise, the garbage
2204         // collector may attempt to resurrect some of these threads.
2205         all_threads = END_TSO_QUEUE;
2206
2207         // Wipe the task list, except the current Task.
2208         ACQUIRE_LOCK(&sched_mutex);
2209         for (task = all_tasks; task != NULL; task=task->all_link) {
2210             if (task != cap->running_task) {
2211 #if defined(THREADED_RTS)
2212                 initMutex(&task->lock); // see #1391
2213 #endif
2214                 discardTask(task);
2215             }
2216         }
2217         RELEASE_LOCK(&sched_mutex);
2218
2219 #if defined(THREADED_RTS)
2220         // Wipe our spare workers list, they no longer exist.  New
2221         // workers will be created if necessary.
2222         cap->spare_workers = NULL;
2223         cap->returning_tasks_hd = NULL;
2224         cap->returning_tasks_tl = NULL;
2225 #endif
2226
2227         // On Unix, all timers are reset in the child, so we need to start
2228         // the timer again.
2229         initTimer();
2230         startTimer();
2231
2232         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2233         rts_checkSchedStatus("forkProcess",cap);
2234         
2235         rts_unlock(cap);
2236         hs_exit();                      // clean up and exit
2237         stg_exit(EXIT_SUCCESS);
2238     }
2239 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2240     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2241     return -1;
2242 #endif
2243 }
2244
2245 /* ---------------------------------------------------------------------------
2246  * Delete all the threads in the system
2247  * ------------------------------------------------------------------------- */
2248    
2249 static void
2250 deleteAllThreads ( Capability *cap )
2251 {
2252     // NOTE: only safe to call if we own all capabilities.
2253
2254     StgTSO* t, *next;
2255     debugTrace(DEBUG_sched,"deleting all threads");
2256     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2257         if (t->what_next == ThreadRelocated) {
2258             next = t->link;
2259         } else {
2260             next = t->global_link;
2261             deleteThread(cap,t);
2262         }
2263     }      
2264
2265     // The run queue now contains a bunch of ThreadKilled threads.  We
2266     // must not throw these away: the main thread(s) will be in there
2267     // somewhere, and the main scheduler loop has to deal with it.
2268     // Also, the run queue is the only thing keeping these threads from
2269     // being GC'd, and we don't want the "main thread has been GC'd" panic.
2270
2271 #if !defined(THREADED_RTS)
2272     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2273     ASSERT(sleeping_queue == END_TSO_QUEUE);
2274 #endif
2275 }
2276
2277 /* -----------------------------------------------------------------------------
2278    Managing the suspended_ccalling_tasks list.
2279    Locks required: sched_mutex
2280    -------------------------------------------------------------------------- */
2281
2282 STATIC_INLINE void
2283 suspendTask (Capability *cap, Task *task)
2284 {
2285     ASSERT(task->next == NULL && task->prev == NULL);
2286     task->next = cap->suspended_ccalling_tasks;
2287     task->prev = NULL;
2288     if (cap->suspended_ccalling_tasks) {
2289         cap->suspended_ccalling_tasks->prev = task;
2290     }
2291     cap->suspended_ccalling_tasks = task;
2292 }
2293
2294 STATIC_INLINE void
2295 recoverSuspendedTask (Capability *cap, Task *task)
2296 {
2297     if (task->prev) {
2298         task->prev->next = task->next;
2299     } else {
2300         ASSERT(cap->suspended_ccalling_tasks == task);
2301         cap->suspended_ccalling_tasks = task->next;
2302     }
2303     if (task->next) {
2304         task->next->prev = task->prev;
2305     }
2306     task->next = task->prev = NULL;
2307 }
2308
2309 /* ---------------------------------------------------------------------------
2310  * Suspending & resuming Haskell threads.
2311  * 
2312  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2313  * its capability before calling the C function.  This allows another
2314  * task to pick up the capability and carry on running Haskell
2315  * threads.  It also means that if the C call blocks, it won't lock
2316  * the whole system.
2317  *
2318  * The Haskell thread making the C call is put to sleep for the
2319  * duration of the call, on the susepended_ccalling_threads queue.  We
2320  * give out a token to the task, which it can use to resume the thread
2321  * on return from the C function.
2322  * ------------------------------------------------------------------------- */
2323    
2324 void *
2325 suspendThread (StgRegTable *reg)
2326 {
2327   Capability *cap;
2328   int saved_errno;
2329   StgTSO *tso;
2330   Task *task;
2331 #if mingw32_HOST_OS
2332   StgWord32 saved_winerror;
2333 #endif
2334
2335   saved_errno = errno;
2336 #if mingw32_HOST_OS
2337   saved_winerror = GetLastError();
2338 #endif
2339
2340   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2341    */
2342   cap = regTableToCapability(reg);
2343
2344   task = cap->running_task;
2345   tso = cap->r.rCurrentTSO;
2346
2347   debugTrace(DEBUG_sched, 
2348              "thread %lu did a safe foreign call", 
2349              (unsigned long)cap->r.rCurrentTSO->id);
2350
2351   // XXX this might not be necessary --SDM
2352   tso->what_next = ThreadRunGHC;
2353
2354   threadPaused(cap,tso);
2355
2356   if ((tso->flags & TSO_BLOCKEX) == 0)  {
2357       tso->why_blocked = BlockedOnCCall;
2358       tso->flags |= TSO_BLOCKEX;
2359       tso->flags &= ~TSO_INTERRUPTIBLE;
2360   } else {
2361       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2362   }
2363
2364   // Hand back capability
2365   task->suspended_tso = tso;
2366
2367   ACQUIRE_LOCK(&cap->lock);
2368
2369   suspendTask(cap,task);
2370   cap->in_haskell = rtsFalse;
2371   releaseCapability_(cap);
2372   
2373   RELEASE_LOCK(&cap->lock);
2374
2375 #if defined(THREADED_RTS)
2376   /* Preparing to leave the RTS, so ensure there's a native thread/task
2377      waiting to take over.
2378   */
2379   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2380 #endif
2381
2382   errno = saved_errno;
2383 #if mingw32_HOST_OS
2384   SetLastError(saved_winerror);
2385 #endif
2386   return task;
2387 }
2388
2389 StgRegTable *
2390 resumeThread (void *task_)
2391 {
2392     StgTSO *tso;
2393     Capability *cap;
2394     Task *task = task_;
2395     int saved_errno;
2396 #if mingw32_HOST_OS
2397     StgWord32 saved_winerror;
2398 #endif
2399
2400     saved_errno = errno;
2401 #if mingw32_HOST_OS
2402     saved_winerror = GetLastError();
2403 #endif
2404
2405     cap = task->cap;
2406     // Wait for permission to re-enter the RTS with the result.
2407     waitForReturnCapability(&cap,task);
2408     // we might be on a different capability now... but if so, our
2409     // entry on the suspended_ccalling_tasks list will also have been
2410     // migrated.
2411
2412     // Remove the thread from the suspended list
2413     recoverSuspendedTask(cap,task);
2414
2415     tso = task->suspended_tso;
2416     task->suspended_tso = NULL;
2417     tso->link = END_TSO_QUEUE;
2418     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2419     
2420     if (tso->why_blocked == BlockedOnCCall) {
2421         awakenBlockedExceptionQueue(cap,tso);
2422         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2423     }
2424     
2425     /* Reset blocking status */
2426     tso->why_blocked  = NotBlocked;
2427     
2428     cap->r.rCurrentTSO = tso;
2429     cap->in_haskell = rtsTrue;
2430     errno = saved_errno;
2431 #if mingw32_HOST_OS
2432     SetLastError(saved_winerror);
2433 #endif
2434
2435     /* We might have GC'd, mark the TSO dirty again */
2436     dirtyTSO(tso);
2437
2438     IF_DEBUG(sanity, checkTSO(tso));
2439
2440     return &cap->r;
2441 }
2442
2443 /* ---------------------------------------------------------------------------
2444  * scheduleThread()
2445  *
2446  * scheduleThread puts a thread on the end  of the runnable queue.
2447  * This will usually be done immediately after a thread is created.
2448  * The caller of scheduleThread must create the thread using e.g.
2449  * createThread and push an appropriate closure
2450  * on this thread's stack before the scheduler is invoked.
2451  * ------------------------------------------------------------------------ */
2452
2453 void
2454 scheduleThread(Capability *cap, StgTSO *tso)
2455 {
2456     // The thread goes at the *end* of the run-queue, to avoid possible
2457     // starvation of any threads already on the queue.
2458     appendToRunQueue(cap,tso);
2459 }
2460
2461 void
2462 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2463 {
2464 #if defined(THREADED_RTS)
2465     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2466                               // move this thread from now on.
2467     cpu %= RtsFlags.ParFlags.nNodes;
2468     if (cpu == cap->no) {
2469         appendToRunQueue(cap,tso);
2470     } else {
2471         migrateThreadToCapability_lock(&capabilities[cpu],tso);
2472     }
2473 #else
2474     appendToRunQueue(cap,tso);
2475 #endif
2476 }
2477
2478 Capability *
2479 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2480 {
2481     Task *task;
2482
2483     // We already created/initialised the Task
2484     task = cap->running_task;
2485
2486     // This TSO is now a bound thread; make the Task and TSO
2487     // point to each other.
2488     tso->bound = task;
2489     tso->cap = cap;
2490
2491     task->tso = tso;
2492     task->ret = ret;
2493     task->stat = NoStatus;
2494
2495     appendToRunQueue(cap,tso);
2496
2497     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2498
2499 #if defined(GRAN)
2500     /* GranSim specific init */
2501     CurrentTSO = m->tso;                // the TSO to run
2502     procStatus[MainProc] = Busy;        // status of main PE
2503     CurrentProc = MainProc;             // PE to run it on
2504 #endif
2505
2506     cap = schedule(cap,task);
2507
2508     ASSERT(task->stat != NoStatus);
2509     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2510
2511     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2512     return cap;
2513 }
2514
2515 /* ----------------------------------------------------------------------------
2516  * Starting Tasks
2517  * ------------------------------------------------------------------------- */
2518
2519 #if defined(THREADED_RTS)
2520 void
2521 workerStart(Task *task)
2522 {
2523     Capability *cap;
2524
2525     // See startWorkerTask().
2526     ACQUIRE_LOCK(&task->lock);
2527     cap = task->cap;
2528     RELEASE_LOCK(&task->lock);
2529
2530     // set the thread-local pointer to the Task:
2531     taskEnter(task);
2532
2533     // schedule() runs without a lock.
2534     cap = schedule(cap,task);
2535
2536     // On exit from schedule(), we have a Capability.
2537     releaseCapability(cap);
2538     workerTaskStop(task);
2539 }
2540 #endif
2541
2542 /* ---------------------------------------------------------------------------
2543  * initScheduler()
2544  *
2545  * Initialise the scheduler.  This resets all the queues - if the
2546  * queues contained any threads, they'll be garbage collected at the
2547  * next pass.
2548  *
2549  * ------------------------------------------------------------------------ */
2550
2551 void 
2552 initScheduler(void)
2553 {
2554 #if defined(GRAN)
2555   nat i;
2556   for (i=0; i<=MAX_PROC; i++) {
2557     run_queue_hds[i]      = END_TSO_QUEUE;
2558     run_queue_tls[i]      = END_TSO_QUEUE;
2559     blocked_queue_hds[i]  = END_TSO_QUEUE;
2560     blocked_queue_tls[i]  = END_TSO_QUEUE;
2561     ccalling_threadss[i]  = END_TSO_QUEUE;
2562     blackhole_queue[i]    = END_TSO_QUEUE;
2563     sleeping_queue        = END_TSO_QUEUE;
2564   }
2565 #elif !defined(THREADED_RTS)
2566   blocked_queue_hd  = END_TSO_QUEUE;
2567   blocked_queue_tl  = END_TSO_QUEUE;
2568   sleeping_queue    = END_TSO_QUEUE;
2569 #endif
2570
2571   blackhole_queue   = END_TSO_QUEUE;
2572   all_threads       = END_TSO_QUEUE;
2573
2574   context_switch = 0;
2575   sched_state    = SCHED_RUNNING;
2576   recent_activity = ACTIVITY_YES;
2577
2578 #if defined(THREADED_RTS)
2579   /* Initialise the mutex and condition variables used by
2580    * the scheduler. */
2581   initMutex(&sched_mutex);
2582 #endif
2583   
2584   ACQUIRE_LOCK(&sched_mutex);
2585
2586   /* A capability holds the state a native thread needs in
2587    * order to execute STG code. At least one capability is
2588    * floating around (only THREADED_RTS builds have more than one).
2589    */
2590   initCapabilities();
2591
2592   initTaskManager();
2593
2594 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2595   initSparkPools();
2596 #endif
2597
2598 #if defined(THREADED_RTS)
2599   /*
2600    * Eagerly start one worker to run each Capability, except for
2601    * Capability 0.  The idea is that we're probably going to start a
2602    * bound thread on Capability 0 pretty soon, so we don't want a
2603    * worker task hogging it.
2604    */
2605   { 
2606       nat i;
2607       Capability *cap;
2608       for (i = 1; i < n_capabilities; i++) {
2609           cap = &capabilities[i];
2610           ACQUIRE_LOCK(&cap->lock);
2611           startWorkerTask(cap, workerStart);
2612           RELEASE_LOCK(&cap->lock);
2613       }
2614   }
2615 #endif
2616
2617   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2618
2619   RELEASE_LOCK(&sched_mutex);
2620 }
2621
2622 void
2623 exitScheduler(
2624     rtsBool wait_foreign
2625 #if !defined(THREADED_RTS)
2626                          __attribute__((unused))
2627 #endif
2628 )
2629                /* see Capability.c, shutdownCapability() */
2630 {
2631     Task *task = NULL;
2632
2633 #if defined(THREADED_RTS)
2634     ACQUIRE_LOCK(&sched_mutex);
2635     task = newBoundTask();
2636     RELEASE_LOCK(&sched_mutex);
2637 #endif
2638
2639     // If we haven't killed all the threads yet, do it now.
2640     if (sched_state < SCHED_SHUTTING_DOWN) {
2641         sched_state = SCHED_INTERRUPTING;
2642         scheduleDoGC(NULL,task,rtsFalse);    
2643     }
2644     sched_state = SCHED_SHUTTING_DOWN;
2645
2646 #if defined(THREADED_RTS)
2647     { 
2648         nat i;
2649         
2650         for (i = 0; i < n_capabilities; i++) {
2651             shutdownCapability(&capabilities[i], task, wait_foreign);
2652         }
2653         boundTaskExiting(task);
2654         stopTaskManager();
2655     }
2656 #else
2657     freeCapability(&MainCapability);
2658 #endif
2659 }
2660
2661 void
2662 freeScheduler( void )
2663 {
2664     freeTaskManager();
2665     if (n_capabilities != 1) {
2666         stgFree(capabilities);
2667     }
2668 #if defined(THREADED_RTS)
2669     closeMutex(&sched_mutex);
2670 #endif
2671 }
2672
2673 /* -----------------------------------------------------------------------------
2674    performGC
2675
2676    This is the interface to the garbage collector from Haskell land.
2677    We provide this so that external C code can allocate and garbage
2678    collect when called from Haskell via _ccall_GC.
2679    -------------------------------------------------------------------------- */
2680
2681 static void
2682 performGC_(rtsBool force_major)
2683 {
2684     Task *task;
2685     // We must grab a new Task here, because the existing Task may be
2686     // associated with a particular Capability, and chained onto the 
2687     // suspended_ccalling_tasks queue.
2688     ACQUIRE_LOCK(&sched_mutex);
2689     task = newBoundTask();
2690     RELEASE_LOCK(&sched_mutex);
2691     scheduleDoGC(NULL,task,force_major);
2692     boundTaskExiting(task);
2693 }
2694
2695 void
2696 performGC(void)
2697 {
2698     performGC_(rtsFalse);
2699 }
2700
2701 void
2702 performMajorGC(void)
2703 {
2704     performGC_(rtsTrue);
2705 }
2706
2707 /* -----------------------------------------------------------------------------
2708    Stack overflow
2709
2710    If the thread has reached its maximum stack size, then raise the
2711    StackOverflow exception in the offending thread.  Otherwise
2712    relocate the TSO into a larger chunk of memory and adjust its stack
2713    size appropriately.
2714    -------------------------------------------------------------------------- */
2715
2716 static StgTSO *
2717 threadStackOverflow(Capability *cap, StgTSO *tso)
2718 {
2719   nat new_stack_size, stack_words;
2720   lnat new_tso_size;
2721   StgPtr new_sp;
2722   StgTSO *dest;
2723
2724   IF_DEBUG(sanity,checkTSO(tso));
2725
2726   // don't allow throwTo() to modify the blocked_exceptions queue
2727   // while we are moving the TSO:
2728   lockClosure((StgClosure *)tso);
2729
2730   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2731       // NB. never raise a StackOverflow exception if the thread is
2732       // inside Control.Exceptino.block.  It is impractical to protect
2733       // against stack overflow exceptions, since virtually anything
2734       // can raise one (even 'catch'), so this is the only sensible
2735       // thing to do here.  See bug #767.
2736
2737       debugTrace(DEBUG_gc,
2738                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2739                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2740       IF_DEBUG(gc,
2741                /* If we're debugging, just print out the top of the stack */
2742                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2743                                                 tso->sp+64)));
2744
2745       // Send this thread the StackOverflow exception
2746       unlockTSO(tso);
2747       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2748       return tso;
2749   }
2750
2751   /* Try to double the current stack size.  If that takes us over the
2752    * maximum stack size for this thread, then use the maximum instead.
2753    * Finally round up so the TSO ends up as a whole number of blocks.
2754    */
2755   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2756   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2757                                        TSO_STRUCT_SIZE)/sizeof(W_);
2758   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2759   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2760
2761   debugTrace(DEBUG_sched, 
2762              "increasing stack size from %ld words to %d.",
2763              (long)tso->stack_size, new_stack_size);
2764
2765   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2766   TICK_ALLOC_TSO(new_stack_size,0);
2767
2768   /* copy the TSO block and the old stack into the new area */
2769   memcpy(dest,tso,TSO_STRUCT_SIZE);
2770   stack_words = tso->stack + tso->stack_size - tso->sp;
2771   new_sp = (P_)dest + new_tso_size - stack_words;
2772   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2773
2774   /* relocate the stack pointers... */
2775   dest->sp         = new_sp;
2776   dest->stack_size = new_stack_size;
2777         
2778   /* Mark the old TSO as relocated.  We have to check for relocated
2779    * TSOs in the garbage collector and any primops that deal with TSOs.
2780    *
2781    * It's important to set the sp value to just beyond the end
2782    * of the stack, so we don't attempt to scavenge any part of the
2783    * dead TSO's stack.
2784    */
2785   tso->what_next = ThreadRelocated;
2786   tso->link = dest;
2787   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2788   tso->why_blocked = NotBlocked;
2789
2790   IF_PAR_DEBUG(verbose,
2791                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2792                      tso->id, tso, tso->stack_size);
2793                /* If we're debugging, just print out the top of the stack */
2794                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2795                                                 tso->sp+64)));
2796   
2797   unlockTSO(dest);
2798   unlockTSO(tso);
2799
2800   IF_DEBUG(sanity,checkTSO(dest));
2801 #if 0
2802   IF_DEBUG(scheduler,printTSO(dest));
2803 #endif
2804
2805   return dest;
2806 }
2807
2808 /* ---------------------------------------------------------------------------
2809    Interrupt execution
2810    - usually called inside a signal handler so it mustn't do anything fancy.   
2811    ------------------------------------------------------------------------ */
2812
2813 void
2814 interruptStgRts(void)
2815 {
2816     sched_state = SCHED_INTERRUPTING;
2817     context_switch = 1;
2818     wakeUpRts();
2819 }
2820
2821 /* -----------------------------------------------------------------------------
2822    Wake up the RTS
2823    
2824    This function causes at least one OS thread to wake up and run the
2825    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2826    an external event has arrived that may need servicing (eg. a
2827    keyboard interrupt).
2828
2829    In the single-threaded RTS we don't do anything here; we only have
2830    one thread anyway, and the event that caused us to want to wake up
2831    will have interrupted any blocking system call in progress anyway.
2832    -------------------------------------------------------------------------- */
2833
2834 void
2835 wakeUpRts(void)
2836 {
2837 #if defined(THREADED_RTS)
2838     // This forces the IO Manager thread to wakeup, which will
2839     // in turn ensure that some OS thread wakes up and runs the
2840     // scheduler loop, which will cause a GC and deadlock check.
2841     ioManagerWakeup();
2842 #endif
2843 }
2844
2845 /* -----------------------------------------------------------------------------
2846  * checkBlackHoles()
2847  *
2848  * Check the blackhole_queue for threads that can be woken up.  We do
2849  * this periodically: before every GC, and whenever the run queue is
2850  * empty.
2851  *
2852  * An elegant solution might be to just wake up all the blocked
2853  * threads with awakenBlockedQueue occasionally: they'll go back to
2854  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2855  * doesn't give us a way to tell whether we've actually managed to
2856  * wake up any threads, so we would be busy-waiting.
2857  *
2858  * -------------------------------------------------------------------------- */
2859
2860 static rtsBool
2861 checkBlackHoles (Capability *cap)
2862 {
2863     StgTSO **prev, *t;
2864     rtsBool any_woke_up = rtsFalse;
2865     StgHalfWord type;
2866
2867     // blackhole_queue is global:
2868     ASSERT_LOCK_HELD(&sched_mutex);
2869
2870     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2871
2872     // ASSUMES: sched_mutex
2873     prev = &blackhole_queue;
2874     t = blackhole_queue;
2875     while (t != END_TSO_QUEUE) {
2876         ASSERT(t->why_blocked == BlockedOnBlackHole);
2877         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2878         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2879             IF_DEBUG(sanity,checkTSO(t));
2880             t = unblockOne(cap, t);
2881             // urk, the threads migrate to the current capability
2882             // here, but we'd like to keep them on the original one.
2883             *prev = t;
2884             any_woke_up = rtsTrue;
2885         } else {
2886             prev = &t->link;
2887             t = t->link;
2888         }
2889     }
2890
2891     return any_woke_up;
2892 }
2893
2894 /* -----------------------------------------------------------------------------
2895    Deleting threads
2896
2897    This is used for interruption (^C) and forking, and corresponds to
2898    raising an exception but without letting the thread catch the
2899    exception.
2900    -------------------------------------------------------------------------- */
2901
2902 static void 
2903 deleteThread (Capability *cap, StgTSO *tso)
2904 {
2905     // NOTE: must only be called on a TSO that we have exclusive
2906     // access to, because we will call throwToSingleThreaded() below.
2907     // The TSO must be on the run queue of the Capability we own, or 
2908     // we must own all Capabilities.
2909
2910     if (tso->why_blocked != BlockedOnCCall &&
2911         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2912         throwToSingleThreaded(cap,tso,NULL);
2913     }
2914 }
2915
2916 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2917 static void 
2918 deleteThread_(Capability *cap, StgTSO *tso)
2919 { // for forkProcess only:
2920   // like deleteThread(), but we delete threads in foreign calls, too.
2921
2922     if (tso->why_blocked == BlockedOnCCall ||
2923         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
2924         unblockOne(cap,tso);
2925         tso->what_next = ThreadKilled;
2926     } else {
2927         deleteThread(cap,tso);
2928     }
2929 }
2930 #endif
2931
2932 /* -----------------------------------------------------------------------------
2933    raiseExceptionHelper
2934    
2935    This function is called by the raise# primitve, just so that we can
2936    move some of the tricky bits of raising an exception from C-- into
2937    C.  Who knows, it might be a useful re-useable thing here too.
2938    -------------------------------------------------------------------------- */
2939
2940 StgWord
2941 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
2942 {
2943     Capability *cap = regTableToCapability(reg);
2944     StgThunk *raise_closure = NULL;
2945     StgPtr p, next;
2946     StgRetInfoTable *info;
2947     //
2948     // This closure represents the expression 'raise# E' where E
2949     // is the exception raise.  It is used to overwrite all the
2950     // thunks which are currently under evaluataion.
2951     //
2952
2953     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
2954     // LDV profiling: stg_raise_info has THUNK as its closure
2955     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
2956     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
2957     // 1 does not cause any problem unless profiling is performed.
2958     // However, when LDV profiling goes on, we need to linearly scan
2959     // small object pool, where raise_closure is stored, so we should
2960     // use MIN_UPD_SIZE.
2961     //
2962     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
2963     //                                 sizeofW(StgClosure)+1);
2964     //
2965
2966     //
2967     // Walk up the stack, looking for the catch frame.  On the way,
2968     // we update any closures pointed to from update frames with the
2969     // raise closure that we just built.
2970     //
2971     p = tso->sp;
2972     while(1) {
2973         info = get_ret_itbl((StgClosure *)p);
2974         next = p + stack_frame_sizeW((StgClosure *)p);
2975         switch (info->i.type) {
2976             
2977         case UPDATE_FRAME:
2978             // Only create raise_closure if we need to.
2979             if (raise_closure == NULL) {
2980                 raise_closure = 
2981                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
2982                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
2983                 raise_closure->payload[0] = exception;
2984             }
2985             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
2986             p = next;
2987             continue;
2988
2989         case ATOMICALLY_FRAME:
2990             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
2991             tso->sp = p;
2992             return ATOMICALLY_FRAME;
2993             
2994         case CATCH_FRAME:
2995             tso->sp = p;
2996             return CATCH_FRAME;
2997
2998         case CATCH_STM_FRAME:
2999             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3000             tso->sp = p;
3001             return CATCH_STM_FRAME;
3002             
3003         case STOP_FRAME:
3004             tso->sp = p;
3005             return STOP_FRAME;
3006
3007         case CATCH_RETRY_FRAME:
3008         default:
3009             p = next; 
3010             continue;
3011         }
3012     }
3013 }
3014
3015
3016 /* -----------------------------------------------------------------------------
3017    findRetryFrameHelper
3018
3019    This function is called by the retry# primitive.  It traverses the stack
3020    leaving tso->sp referring to the frame which should handle the retry.  
3021
3022    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3023    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3024
3025    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3026    create) because retries are not considered to be exceptions, despite the
3027    similar implementation.
3028
3029    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3030    not be created within memory transactions.
3031    -------------------------------------------------------------------------- */
3032
3033 StgWord
3034 findRetryFrameHelper (StgTSO *tso)
3035 {
3036   StgPtr           p, next;
3037   StgRetInfoTable *info;
3038
3039   p = tso -> sp;
3040   while (1) {
3041     info = get_ret_itbl((StgClosure *)p);
3042     next = p + stack_frame_sizeW((StgClosure *)p);
3043     switch (info->i.type) {
3044       
3045     case ATOMICALLY_FRAME:
3046         debugTrace(DEBUG_stm,
3047                    "found ATOMICALLY_FRAME at %p during retry", p);
3048         tso->sp = p;
3049         return ATOMICALLY_FRAME;
3050       
3051     case CATCH_RETRY_FRAME:
3052         debugTrace(DEBUG_stm,
3053                    "found CATCH_RETRY_FRAME at %p during retrry", p);
3054         tso->sp = p;
3055         return CATCH_RETRY_FRAME;
3056       
3057     case CATCH_STM_FRAME: {
3058         StgTRecHeader *trec = tso -> trec;
3059         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3060         debugTrace(DEBUG_stm,
3061                    "found CATCH_STM_FRAME at %p during retry", p);
3062         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3063         stmAbortTransaction(tso -> cap, trec);
3064         stmFreeAbortedTRec(tso -> cap, trec);
3065         tso -> trec = outer;
3066         p = next; 
3067         continue;
3068     }
3069       
3070
3071     default:
3072       ASSERT(info->i.type != CATCH_FRAME);
3073       ASSERT(info->i.type != STOP_FRAME);
3074       p = next; 
3075       continue;
3076     }
3077   }
3078 }
3079
3080 /* -----------------------------------------------------------------------------
3081    resurrectThreads is called after garbage collection on the list of
3082    threads found to be garbage.  Each of these threads will be woken
3083    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3084    on an MVar, or NonTermination if the thread was blocked on a Black
3085    Hole.
3086
3087    Locks: assumes we hold *all* the capabilities.
3088    -------------------------------------------------------------------------- */
3089
3090 void
3091 resurrectThreads (StgTSO *threads)
3092 {
3093     StgTSO *tso, *next;
3094     Capability *cap;
3095
3096     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3097         next = tso->global_link;
3098         tso->global_link = all_threads;
3099         all_threads = tso;
3100         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3101         
3102         // Wake up the thread on the Capability it was last on
3103         cap = tso->cap;
3104         
3105         switch (tso->why_blocked) {
3106         case BlockedOnMVar:
3107         case BlockedOnException:
3108             /* Called by GC - sched_mutex lock is currently held. */
3109             throwToSingleThreaded(cap, tso,
3110                                   (StgClosure *)BlockedOnDeadMVar_closure);
3111             break;
3112         case BlockedOnBlackHole:
3113             throwToSingleThreaded(cap, tso,
3114                                   (StgClosure *)NonTermination_closure);
3115             break;
3116         case BlockedOnSTM:
3117             throwToSingleThreaded(cap, tso,
3118                                   (StgClosure *)BlockedIndefinitely_closure);
3119             break;
3120         case NotBlocked:
3121             /* This might happen if the thread was blocked on a black hole
3122              * belonging to a thread that we've just woken up (raiseAsync
3123              * can wake up threads, remember...).
3124              */
3125             continue;
3126         default:
3127             barf("resurrectThreads: thread blocked in a strange way");
3128         }
3129     }
3130 }