Fix whitespace in TcTyDecls
[ghc-hetmet.git] / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2006
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #define KEEP_LOCKCLOSURE
11 #include "Rts.h"
12 #include "SchedAPI.h"
13 #include "RtsUtils.h"
14 #include "RtsFlags.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Printer.h"
23 #include "RtsSignals.h"
24 #include "Sanity.h"
25 #include "Stats.h"
26 #include "STM.h"
27 #include "Timer.h"
28 #include "Prelude.h"
29 #include "ThreadLabels.h"
30 #include "LdvProfile.h"
31 #include "Updates.h"
32 #include "Proftimer.h"
33 #include "ProfHeap.h"
34 #if defined(GRAN) || defined(PARALLEL_HASKELL)
35 # include "GranSimRts.h"
36 # include "GranSim.h"
37 # include "ParallelRts.h"
38 # include "Parallel.h"
39 # include "ParallelDebug.h"
40 # include "FetchMe.h"
41 # include "HLC.h"
42 #endif
43 #include "Sparks.h"
44 #include "Capability.h"
45 #include "Task.h"
46 #include "AwaitEvent.h"
47 #if defined(mingw32_HOST_OS)
48 #include "win32/IOManager.h"
49 #endif
50 #include "Trace.h"
51 #include "RaiseAsync.h"
52 #include "Threads.h"
53 #include "ThrIOManager.h"
54
55 #ifdef HAVE_SYS_TYPES_H
56 #include <sys/types.h>
57 #endif
58 #ifdef HAVE_UNISTD_H
59 #include <unistd.h>
60 #endif
61
62 #include <string.h>
63 #include <stdlib.h>
64 #include <stdarg.h>
65
66 #ifdef HAVE_ERRNO_H
67 #include <errno.h>
68 #endif
69
70 // Turn off inlining when debugging - it obfuscates things
71 #ifdef DEBUG
72 # undef  STATIC_INLINE
73 # define STATIC_INLINE static
74 #endif
75
76 /* -----------------------------------------------------------------------------
77  * Global variables
78  * -------------------------------------------------------------------------- */
79
80 #if defined(GRAN)
81
82 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
83 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
84
85 /* 
86    In GranSim we have a runnable and a blocked queue for each processor.
87    In order to minimise code changes new arrays run_queue_hds/tls
88    are created. run_queue_hd is then a short cut (macro) for
89    run_queue_hds[CurrentProc] (see GranSim.h).
90    -- HWL
91 */
92 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
93 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
94 StgTSO *ccalling_threadss[MAX_PROC];
95 /* We use the same global list of threads (all_threads) in GranSim as in
96    the std RTS (i.e. we are cheating). However, we don't use this list in
97    the GranSim specific code at the moment (so we are only potentially
98    cheating).  */
99
100 #else /* !GRAN */
101
102 #if !defined(THREADED_RTS)
103 // Blocked/sleeping thrads
104 StgTSO *blocked_queue_hd = NULL;
105 StgTSO *blocked_queue_tl = NULL;
106 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
107 #endif
108
109 /* Threads blocked on blackholes.
110  * LOCK: sched_mutex+capability, or all capabilities
111  */
112 StgTSO *blackhole_queue = NULL;
113 #endif
114
115 /* The blackhole_queue should be checked for threads to wake up.  See
116  * Schedule.h for more thorough comment.
117  * LOCK: none (doesn't matter if we miss an update)
118  */
119 rtsBool blackholes_need_checking = rtsFalse;
120
121 /* Linked list of all threads.
122  * Used for detecting garbage collected threads.
123  * LOCK: sched_mutex+capability, or all capabilities
124  */
125 StgTSO *all_threads = NULL;
126
127 /* flag set by signal handler to precipitate a context switch
128  * LOCK: none (just an advisory flag)
129  */
130 int context_switch = 0;
131
132 /* flag that tracks whether we have done any execution in this time slice.
133  * LOCK: currently none, perhaps we should lock (but needs to be
134  * updated in the fast path of the scheduler).
135  */
136 nat recent_activity = ACTIVITY_YES;
137
138 /* if this flag is set as well, give up execution
139  * LOCK: none (changes once, from false->true)
140  */
141 rtsBool sched_state = SCHED_RUNNING;
142
143 #if defined(GRAN)
144 StgTSO *CurrentTSO;
145 #endif
146
147 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
148  *  exists - earlier gccs apparently didn't.
149  *  -= chak
150  */
151 StgTSO dummy_tso;
152
153 /*
154  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
155  * in an MT setting, needed to signal that a worker thread shouldn't hang around
156  * in the scheduler when it is out of work.
157  */
158 rtsBool shutting_down_scheduler = rtsFalse;
159
160 /*
161  * This mutex protects most of the global scheduler data in
162  * the THREADED_RTS runtime.
163  */
164 #if defined(THREADED_RTS)
165 Mutex sched_mutex;
166 #endif
167
168 #if defined(PARALLEL_HASKELL)
169 StgTSO *LastTSO;
170 rtsTime TimeOfLastYield;
171 rtsBool emitSchedule = rtsTrue;
172 #endif
173
174 #if !defined(mingw32_HOST_OS)
175 #define FORKPROCESS_PRIMOP_SUPPORTED
176 #endif
177
178 /* -----------------------------------------------------------------------------
179  * static function prototypes
180  * -------------------------------------------------------------------------- */
181
182 static Capability *schedule (Capability *initialCapability, Task *task);
183
184 //
185 // These function all encapsulate parts of the scheduler loop, and are
186 // abstracted only to make the structure and control flow of the
187 // scheduler clearer.
188 //
189 static void schedulePreLoop (void);
190 #if defined(THREADED_RTS)
191 static void schedulePushWork(Capability *cap, Task *task);
192 #endif
193 static void scheduleStartSignalHandlers (Capability *cap);
194 static void scheduleCheckBlockedThreads (Capability *cap);
195 static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
196 static void scheduleCheckBlackHoles (Capability *cap);
197 static void scheduleDetectDeadlock (Capability *cap, Task *task);
198 #if defined(GRAN)
199 static StgTSO *scheduleProcessEvent(rtsEvent *event);
200 #endif
201 #if defined(PARALLEL_HASKELL)
202 static StgTSO *scheduleSendPendingMessages(void);
203 static void scheduleActivateSpark(void);
204 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
205 #endif
206 #if defined(PAR) || defined(GRAN)
207 static void scheduleGranParReport(void);
208 #endif
209 static void schedulePostRunThread(void);
210 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
211 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
212                                          StgTSO *t);
213 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
214                                     nat prev_what_next );
215 static void scheduleHandleThreadBlocked( StgTSO *t );
216 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
217                                              StgTSO *t );
218 static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
219 static Capability *scheduleDoGC(Capability *cap, Task *task,
220                                 rtsBool force_major);
221
222 static rtsBool checkBlackHoles(Capability *cap);
223
224 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
225
226 static void deleteThread (Capability *cap, StgTSO *tso);
227 static void deleteAllThreads (Capability *cap);
228
229 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
230 static void deleteThread_(Capability *cap, StgTSO *tso);
231 #endif
232
233 #if defined(PARALLEL_HASKELL)
234 StgTSO * createSparkThread(rtsSpark spark);
235 StgTSO * activateSpark (rtsSpark spark);  
236 #endif
237
238 #ifdef DEBUG
239 static char *whatNext_strs[] = {
240   "(unknown)",
241   "ThreadRunGHC",
242   "ThreadInterpret",
243   "ThreadKilled",
244   "ThreadRelocated",
245   "ThreadComplete"
246 };
247 #endif
248
249 /* -----------------------------------------------------------------------------
250  * Putting a thread on the run queue: different scheduling policies
251  * -------------------------------------------------------------------------- */
252
253 STATIC_INLINE void
254 addToRunQueue( Capability *cap, StgTSO *t )
255 {
256 #if defined(PARALLEL_HASKELL)
257     if (RtsFlags.ParFlags.doFairScheduling) { 
258         // this does round-robin scheduling; good for concurrency
259         appendToRunQueue(cap,t);
260     } else {
261         // this does unfair scheduling; good for parallelism
262         pushOnRunQueue(cap,t);
263     }
264 #else
265     // this does round-robin scheduling; good for concurrency
266     appendToRunQueue(cap,t);
267 #endif
268 }
269
270 /* ---------------------------------------------------------------------------
271    Main scheduling loop.
272
273    We use round-robin scheduling, each thread returning to the
274    scheduler loop when one of these conditions is detected:
275
276       * out of heap space
277       * timer expires (thread yields)
278       * thread blocks
279       * thread ends
280       * stack overflow
281
282    GRAN version:
283      In a GranSim setup this loop iterates over the global event queue.
284      This revolves around the global event queue, which determines what 
285      to do next. Therefore, it's more complicated than either the 
286      concurrent or the parallel (GUM) setup.
287
288    GUM version:
289      GUM iterates over incoming messages.
290      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
291      and sends out a fish whenever it has nothing to do; in-between
292      doing the actual reductions (shared code below) it processes the
293      incoming messages and deals with delayed operations 
294      (see PendingFetches).
295      This is not the ugliest code you could imagine, but it's bloody close.
296
297    ------------------------------------------------------------------------ */
298
299 static Capability *
300 schedule (Capability *initialCapability, Task *task)
301 {
302   StgTSO *t;
303   Capability *cap;
304   StgThreadReturnCode ret;
305 #if defined(GRAN)
306   rtsEvent *event;
307 #elif defined(PARALLEL_HASKELL)
308   StgTSO *tso;
309   GlobalTaskId pe;
310   rtsBool receivedFinish = rtsFalse;
311 # if defined(DEBUG)
312   nat tp_size, sp_size; // stats only
313 # endif
314 #endif
315   nat prev_what_next;
316   rtsBool ready_to_gc;
317 #if defined(THREADED_RTS)
318   rtsBool first = rtsTrue;
319 #endif
320   
321   cap = initialCapability;
322
323   // Pre-condition: this task owns initialCapability.
324   // The sched_mutex is *NOT* held
325   // NB. on return, we still hold a capability.
326
327   debugTrace (DEBUG_sched, 
328               "### NEW SCHEDULER LOOP (task: %p, cap: %p)",
329               task, initialCapability);
330
331   schedulePreLoop();
332
333   // -----------------------------------------------------------
334   // Scheduler loop starts here:
335
336 #if defined(PARALLEL_HASKELL)
337 #define TERMINATION_CONDITION        (!receivedFinish)
338 #elif defined(GRAN)
339 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
340 #else
341 #define TERMINATION_CONDITION        rtsTrue
342 #endif
343
344   while (TERMINATION_CONDITION) {
345
346 #if defined(GRAN)
347       /* Choose the processor with the next event */
348       CurrentProc = event->proc;
349       CurrentTSO = event->tso;
350 #endif
351
352 #if defined(THREADED_RTS)
353       if (first) {
354           // don't yield the first time, we want a chance to run this
355           // thread for a bit, even if there are others banging at the
356           // door.
357           first = rtsFalse;
358           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
359       } else {
360           // Yield the capability to higher-priority tasks if necessary.
361           yieldCapability(&cap, task);
362       }
363 #endif
364       
365 #if defined(THREADED_RTS)
366       schedulePushWork(cap,task);
367 #endif
368
369     // Check whether we have re-entered the RTS from Haskell without
370     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
371     // call).
372     if (cap->in_haskell) {
373           errorBelch("schedule: re-entered unsafely.\n"
374                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
375           stg_exit(EXIT_FAILURE);
376     }
377
378     // The interruption / shutdown sequence.
379     // 
380     // In order to cleanly shut down the runtime, we want to:
381     //   * make sure that all main threads return to their callers
382     //     with the state 'Interrupted'.
383     //   * clean up all OS threads assocated with the runtime
384     //   * free all memory etc.
385     //
386     // So the sequence for ^C goes like this:
387     //
388     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
389     //     arranges for some Capability to wake up
390     //
391     //   * all threads in the system are halted, and the zombies are
392     //     placed on the run queue for cleaning up.  We acquire all
393     //     the capabilities in order to delete the threads, this is
394     //     done by scheduleDoGC() for convenience (because GC already
395     //     needs to acquire all the capabilities).  We can't kill
396     //     threads involved in foreign calls.
397     // 
398     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
399     //
400     //   * sched_state := SCHED_SHUTTING_DOWN
401     //
402     //   * all workers exit when the run queue on their capability
403     //     drains.  All main threads will also exit when their TSO
404     //     reaches the head of the run queue and they can return.
405     //
406     //   * eventually all Capabilities will shut down, and the RTS can
407     //     exit.
408     //
409     //   * We might be left with threads blocked in foreign calls, 
410     //     we should really attempt to kill these somehow (TODO);
411     
412     switch (sched_state) {
413     case SCHED_RUNNING:
414         break;
415     case SCHED_INTERRUPTING:
416         debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
417 #if defined(THREADED_RTS)
418         discardSparksCap(cap);
419 #endif
420         /* scheduleDoGC() deletes all the threads */
421         cap = scheduleDoGC(cap,task,rtsFalse);
422         break;
423     case SCHED_SHUTTING_DOWN:
424         debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
425         // If we are a worker, just exit.  If we're a bound thread
426         // then we will exit below when we've removed our TSO from
427         // the run queue.
428         if (task->tso == NULL && emptyRunQueue(cap)) {
429             return cap;
430         }
431         break;
432     default:
433         barf("sched_state: %d", sched_state);
434     }
435
436 #if defined(THREADED_RTS)
437     // If the run queue is empty, take a spark and turn it into a thread.
438     {
439         if (emptyRunQueue(cap)) {
440             StgClosure *spark;
441             spark = findSpark(cap);
442             if (spark != NULL) {
443                 debugTrace(DEBUG_sched,
444                            "turning spark of closure %p into a thread",
445                            (StgClosure *)spark);
446                 createSparkThread(cap,spark);     
447             }
448         }
449     }
450 #endif // THREADED_RTS
451
452     scheduleStartSignalHandlers(cap);
453
454     // Only check the black holes here if we've nothing else to do.
455     // During normal execution, the black hole list only gets checked
456     // at GC time, to avoid repeatedly traversing this possibly long
457     // list each time around the scheduler.
458     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
459
460     scheduleCheckWakeupThreads(cap);
461
462     scheduleCheckBlockedThreads(cap);
463
464     scheduleDetectDeadlock(cap,task);
465 #if defined(THREADED_RTS)
466     cap = task->cap;    // reload cap, it might have changed
467 #endif
468
469     // Normally, the only way we can get here with no threads to
470     // run is if a keyboard interrupt received during 
471     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
472     // Additionally, it is not fatal for the
473     // threaded RTS to reach here with no threads to run.
474     //
475     // win32: might be here due to awaitEvent() being abandoned
476     // as a result of a console event having been delivered.
477     if ( emptyRunQueue(cap) ) {
478 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
479         ASSERT(sched_state >= SCHED_INTERRUPTING);
480 #endif
481         continue; // nothing to do
482     }
483
484 #if defined(PARALLEL_HASKELL)
485     scheduleSendPendingMessages();
486     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
487         continue;
488
489 #if defined(SPARKS)
490     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
491 #endif
492
493     /* If we still have no work we need to send a FISH to get a spark
494        from another PE */
495     if (emptyRunQueue(cap)) {
496         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
497         ASSERT(rtsFalse); // should not happen at the moment
498     }
499     // from here: non-empty run queue.
500     //  TODO: merge above case with this, only one call processMessages() !
501     if (PacketsWaiting()) {  /* process incoming messages, if
502                                 any pending...  only in else
503                                 because getRemoteWork waits for
504                                 messages as well */
505         receivedFinish = processMessages();
506     }
507 #endif
508
509 #if defined(GRAN)
510     scheduleProcessEvent(event);
511 #endif
512
513     // 
514     // Get a thread to run
515     //
516     t = popRunQueue(cap);
517
518 #if defined(GRAN) || defined(PAR)
519     scheduleGranParReport(); // some kind of debuging output
520 #else
521     // Sanity check the thread we're about to run.  This can be
522     // expensive if there is lots of thread switching going on...
523     IF_DEBUG(sanity,checkTSO(t));
524 #endif
525
526 #if defined(THREADED_RTS)
527     // Check whether we can run this thread in the current task.
528     // If not, we have to pass our capability to the right task.
529     {
530         Task *bound = t->bound;
531       
532         if (bound) {
533             if (bound == task) {
534                 debugTrace(DEBUG_sched,
535                            "### Running thread %lu in bound thread", (unsigned long)t->id);
536                 // yes, the Haskell thread is bound to the current native thread
537             } else {
538                 debugTrace(DEBUG_sched,
539                            "### thread %lu bound to another OS thread", (unsigned long)t->id);
540                 // no, bound to a different Haskell thread: pass to that thread
541                 pushOnRunQueue(cap,t);
542                 continue;
543             }
544         } else {
545             // The thread we want to run is unbound.
546             if (task->tso) { 
547                 debugTrace(DEBUG_sched,
548                            "### this OS thread cannot run thread %lu", (unsigned long)t->id);
549                 // no, the current native thread is bound to a different
550                 // Haskell thread, so pass it to any worker thread
551                 pushOnRunQueue(cap,t);
552                 continue; 
553             }
554         }
555     }
556 #endif
557
558     cap->r.rCurrentTSO = t;
559     
560     /* context switches are initiated by the timer signal, unless
561      * the user specified "context switch as often as possible", with
562      * +RTS -C0
563      */
564     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
565         && !emptyThreadQueues(cap)) {
566         context_switch = 1;
567     }
568          
569 run_thread:
570
571     debugTrace(DEBUG_sched, "-->> running thread %ld %s ...", 
572                               (long)t->id, whatNext_strs[t->what_next]);
573
574     startHeapProfTimer();
575
576     // Check for exceptions blocked on this thread
577     maybePerformBlockedException (cap, t);
578
579     // ----------------------------------------------------------------------
580     // Run the current thread 
581
582     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
583     ASSERT(t->cap == cap);
584
585     prev_what_next = t->what_next;
586
587     errno = t->saved_errno;
588 #if mingw32_HOST_OS
589     SetLastError(t->saved_winerror);
590 #endif
591
592     cap->in_haskell = rtsTrue;
593
594     dirtyTSO(t);
595
596 #if defined(THREADED_RTS)
597     if (recent_activity == ACTIVITY_DONE_GC) {
598         // ACTIVITY_DONE_GC means we turned off the timer signal to
599         // conserve power (see #1623).  Re-enable it here.
600         nat prev;
601         prev = xchg(&recent_activity, ACTIVITY_YES);
602         if (prev == ACTIVITY_DONE_GC) {
603             startTimer();
604         }
605     } else {
606         recent_activity = ACTIVITY_YES;
607     }
608 #endif
609
610     switch (prev_what_next) {
611         
612     case ThreadKilled:
613     case ThreadComplete:
614         /* Thread already finished, return to scheduler. */
615         ret = ThreadFinished;
616         break;
617         
618     case ThreadRunGHC:
619     {
620         StgRegTable *r;
621         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
622         cap = regTableToCapability(r);
623         ret = r->rRet;
624         break;
625     }
626     
627     case ThreadInterpret:
628         cap = interpretBCO(cap);
629         ret = cap->r.rRet;
630         break;
631         
632     default:
633         barf("schedule: invalid what_next field");
634     }
635
636     cap->in_haskell = rtsFalse;
637
638     // The TSO might have moved, eg. if it re-entered the RTS and a GC
639     // happened.  So find the new location:
640     t = cap->r.rCurrentTSO;
641
642     // We have run some Haskell code: there might be blackhole-blocked
643     // threads to wake up now.
644     // Lock-free test here should be ok, we're just setting a flag.
645     if ( blackhole_queue != END_TSO_QUEUE ) {
646         blackholes_need_checking = rtsTrue;
647     }
648     
649     // And save the current errno in this thread.
650     // XXX: possibly bogus for SMP because this thread might already
651     // be running again, see code below.
652     t->saved_errno = errno;
653 #if mingw32_HOST_OS
654     // Similarly for Windows error code
655     t->saved_winerror = GetLastError();
656 #endif
657
658 #if defined(THREADED_RTS)
659     // If ret is ThreadBlocked, and this Task is bound to the TSO that
660     // blocked, we are in limbo - the TSO is now owned by whatever it
661     // is blocked on, and may in fact already have been woken up,
662     // perhaps even on a different Capability.  It may be the case
663     // that task->cap != cap.  We better yield this Capability
664     // immediately and return to normaility.
665     if (ret == ThreadBlocked) {
666         debugTrace(DEBUG_sched,
667                    "--<< thread %lu (%s) stopped: blocked",
668                    (unsigned long)t->id, whatNext_strs[t->what_next]);
669         continue;
670     }
671 #endif
672
673     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
674     ASSERT(t->cap == cap);
675
676     // ----------------------------------------------------------------------
677     
678     // Costs for the scheduler are assigned to CCS_SYSTEM
679     stopHeapProfTimer();
680 #if defined(PROFILING)
681     CCCS = CCS_SYSTEM;
682 #endif
683     
684     schedulePostRunThread();
685
686     ready_to_gc = rtsFalse;
687
688     switch (ret) {
689     case HeapOverflow:
690         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
691         break;
692
693     case StackOverflow:
694         scheduleHandleStackOverflow(cap,task,t);
695         break;
696
697     case ThreadYielding:
698         if (scheduleHandleYield(cap, t, prev_what_next)) {
699             // shortcut for switching between compiler/interpreter:
700             goto run_thread; 
701         }
702         break;
703
704     case ThreadBlocked:
705         scheduleHandleThreadBlocked(t);
706         break;
707
708     case ThreadFinished:
709         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
710         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
711         break;
712
713     default:
714       barf("schedule: invalid thread return code %d", (int)ret);
715     }
716
717     if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
718       cap = scheduleDoGC(cap,task,rtsFalse);
719     }
720   } /* end of while() */
721
722   debugTrace(PAR_DEBUG_verbose,
723              "== Leaving schedule() after having received Finish");
724 }
725
726 /* ----------------------------------------------------------------------------
727  * Setting up the scheduler loop
728  * ------------------------------------------------------------------------- */
729
730 static void
731 schedulePreLoop(void)
732 {
733 #if defined(GRAN) 
734     /* set up first event to get things going */
735     /* ToDo: assign costs for system setup and init MainTSO ! */
736     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
737               ContinueThread, 
738               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
739     
740     debugTrace (DEBUG_gran,
741                 "GRAN: Init CurrentTSO (in schedule) = %p", 
742                 CurrentTSO);
743     IF_DEBUG(gran, G_TSO(CurrentTSO, 5));
744     
745     if (RtsFlags.GranFlags.Light) {
746         /* Save current time; GranSim Light only */
747         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
748     }      
749 #endif
750 }
751
752 /* -----------------------------------------------------------------------------
753  * schedulePushWork()
754  *
755  * Push work to other Capabilities if we have some.
756  * -------------------------------------------------------------------------- */
757
758 #if defined(THREADED_RTS)
759 static void
760 schedulePushWork(Capability *cap USED_IF_THREADS, 
761                  Task *task      USED_IF_THREADS)
762 {
763     Capability *free_caps[n_capabilities], *cap0;
764     nat i, n_free_caps;
765
766     // migration can be turned off with +RTS -qg
767     if (!RtsFlags.ParFlags.migrate) return;
768
769     // Check whether we have more threads on our run queue, or sparks
770     // in our pool, that we could hand to another Capability.
771     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
772         && sparkPoolSizeCap(cap) < 2) {
773         return;
774     }
775
776     // First grab as many free Capabilities as we can.
777     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
778         cap0 = &capabilities[i];
779         if (cap != cap0 && tryGrabCapability(cap0,task)) {
780             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
781                 // it already has some work, we just grabbed it at 
782                 // the wrong moment.  Or maybe it's deadlocked!
783                 releaseCapability(cap0);
784             } else {
785                 free_caps[n_free_caps++] = cap0;
786             }
787         }
788     }
789
790     // we now have n_free_caps free capabilities stashed in
791     // free_caps[].  Share our run queue equally with them.  This is
792     // probably the simplest thing we could do; improvements we might
793     // want to do include:
794     //
795     //   - giving high priority to moving relatively new threads, on 
796     //     the gournds that they haven't had time to build up a
797     //     working set in the cache on this CPU/Capability.
798     //
799     //   - giving low priority to moving long-lived threads
800
801     if (n_free_caps > 0) {
802         StgTSO *prev, *t, *next;
803         rtsBool pushed_to_all;
804
805         debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
806
807         i = 0;
808         pushed_to_all = rtsFalse;
809
810         if (cap->run_queue_hd != END_TSO_QUEUE) {
811             prev = cap->run_queue_hd;
812             t = prev->link;
813             prev->link = END_TSO_QUEUE;
814             for (; t != END_TSO_QUEUE; t = next) {
815                 next = t->link;
816                 t->link = END_TSO_QUEUE;
817                 if (t->what_next == ThreadRelocated
818                     || t->bound == task // don't move my bound thread
819                     || tsoLocked(t)) {  // don't move a locked thread
820                     prev->link = t;
821                     prev = t;
822                 } else if (i == n_free_caps) {
823                     pushed_to_all = rtsTrue;
824                     i = 0;
825                     // keep one for us
826                     prev->link = t;
827                     prev = t;
828                 } else {
829                     debugTrace(DEBUG_sched, "pushing thread %lu to capability %d", (unsigned long)t->id, free_caps[i]->no);
830                     appendToRunQueue(free_caps[i],t);
831                     if (t->bound) { t->bound->cap = free_caps[i]; }
832                     t->cap = free_caps[i];
833                     i++;
834                 }
835             }
836             cap->run_queue_tl = prev;
837         }
838
839         // If there are some free capabilities that we didn't push any
840         // threads to, then try to push a spark to each one.
841         if (!pushed_to_all) {
842             StgClosure *spark;
843             // i is the next free capability to push to
844             for (; i < n_free_caps; i++) {
845                 if (emptySparkPoolCap(free_caps[i])) {
846                     spark = findSpark(cap);
847                     if (spark != NULL) {
848                         debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
849                         newSpark(&(free_caps[i]->r), spark);
850                     }
851                 }
852             }
853         }
854
855         // release the capabilities
856         for (i = 0; i < n_free_caps; i++) {
857             task->cap = free_caps[i];
858             releaseCapability(free_caps[i]);
859         }
860     }
861     task->cap = cap; // reset to point to our Capability.
862 }
863 #endif
864
865 /* ----------------------------------------------------------------------------
866  * Start any pending signal handlers
867  * ------------------------------------------------------------------------- */
868
869 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
870 static void
871 scheduleStartSignalHandlers(Capability *cap)
872 {
873     if (RtsFlags.MiscFlags.install_signal_handlers && signals_pending()) {
874         // safe outside the lock
875         startSignalHandlers(cap);
876     }
877 }
878 #else
879 static void
880 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
881 {
882 }
883 #endif
884
885 /* ----------------------------------------------------------------------------
886  * Check for blocked threads that can be woken up.
887  * ------------------------------------------------------------------------- */
888
889 static void
890 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
891 {
892 #if !defined(THREADED_RTS)
893     //
894     // Check whether any waiting threads need to be woken up.  If the
895     // run queue is empty, and there are no other tasks running, we
896     // can wait indefinitely for something to happen.
897     //
898     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
899     {
900         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
901     }
902 #endif
903 }
904
905
906 /* ----------------------------------------------------------------------------
907  * Check for threads woken up by other Capabilities
908  * ------------------------------------------------------------------------- */
909
910 static void
911 scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
912 {
913 #if defined(THREADED_RTS)
914     // Any threads that were woken up by other Capabilities get
915     // appended to our run queue.
916     if (!emptyWakeupQueue(cap)) {
917         ACQUIRE_LOCK(&cap->lock);
918         if (emptyRunQueue(cap)) {
919             cap->run_queue_hd = cap->wakeup_queue_hd;
920             cap->run_queue_tl = cap->wakeup_queue_tl;
921         } else {
922             cap->run_queue_tl->link = cap->wakeup_queue_hd;
923             cap->run_queue_tl = cap->wakeup_queue_tl;
924         }
925         cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
926         RELEASE_LOCK(&cap->lock);
927     }
928 #endif
929 }
930
931 /* ----------------------------------------------------------------------------
932  * Check for threads blocked on BLACKHOLEs that can be woken up
933  * ------------------------------------------------------------------------- */
934 static void
935 scheduleCheckBlackHoles (Capability *cap)
936 {
937     if ( blackholes_need_checking ) // check without the lock first
938     {
939         ACQUIRE_LOCK(&sched_mutex);
940         if ( blackholes_need_checking ) {
941             checkBlackHoles(cap);
942             blackholes_need_checking = rtsFalse;
943         }
944         RELEASE_LOCK(&sched_mutex);
945     }
946 }
947
948 /* ----------------------------------------------------------------------------
949  * Detect deadlock conditions and attempt to resolve them.
950  * ------------------------------------------------------------------------- */
951
952 static void
953 scheduleDetectDeadlock (Capability *cap, Task *task)
954 {
955
956 #if defined(PARALLEL_HASKELL)
957     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
958     return;
959 #endif
960
961     /* 
962      * Detect deadlock: when we have no threads to run, there are no
963      * threads blocked, waiting for I/O, or sleeping, and all the
964      * other tasks are waiting for work, we must have a deadlock of
965      * some description.
966      */
967     if ( emptyThreadQueues(cap) )
968     {
969 #if defined(THREADED_RTS)
970         /* 
971          * In the threaded RTS, we only check for deadlock if there
972          * has been no activity in a complete timeslice.  This means
973          * we won't eagerly start a full GC just because we don't have
974          * any threads to run currently.
975          */
976         if (recent_activity != ACTIVITY_INACTIVE) return;
977 #endif
978
979         debugTrace(DEBUG_sched, "deadlocked, forcing major GC...");
980
981         // Garbage collection can release some new threads due to
982         // either (a) finalizers or (b) threads resurrected because
983         // they are unreachable and will therefore be sent an
984         // exception.  Any threads thus released will be immediately
985         // runnable.
986         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/);
987
988         recent_activity = ACTIVITY_DONE_GC;
989         // disable timer signals (see #1623)
990         stopTimer();
991         
992         if ( !emptyRunQueue(cap) ) return;
993
994 #if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS)
995         /* If we have user-installed signal handlers, then wait
996          * for signals to arrive rather then bombing out with a
997          * deadlock.
998          */
999         if ( RtsFlags.MiscFlags.install_signal_handlers && anyUserHandlers() ) {
1000             debugTrace(DEBUG_sched,
1001                        "still deadlocked, waiting for signals...");
1002
1003             awaitUserSignals();
1004
1005             if (signals_pending()) {
1006                 startSignalHandlers(cap);
1007             }
1008
1009             // either we have threads to run, or we were interrupted:
1010             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
1011         }
1012 #endif
1013
1014 #if !defined(THREADED_RTS)
1015         /* Probably a real deadlock.  Send the current main thread the
1016          * Deadlock exception.
1017          */
1018         if (task->tso) {
1019             switch (task->tso->why_blocked) {
1020             case BlockedOnSTM:
1021             case BlockedOnBlackHole:
1022             case BlockedOnException:
1023             case BlockedOnMVar:
1024                 throwToSingleThreaded(cap, task->tso, 
1025                                       (StgClosure *)NonTermination_closure);
1026                 return;
1027             default:
1028                 barf("deadlock: main thread blocked in a strange way");
1029             }
1030         }
1031         return;
1032 #endif
1033     }
1034 }
1035
1036 /* ----------------------------------------------------------------------------
1037  * Process an event (GRAN only)
1038  * ------------------------------------------------------------------------- */
1039
1040 #if defined(GRAN)
1041 static StgTSO *
1042 scheduleProcessEvent(rtsEvent *event)
1043 {
1044     StgTSO *t;
1045
1046     if (RtsFlags.GranFlags.Light)
1047       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1048
1049     /* adjust time based on time-stamp */
1050     if (event->time > CurrentTime[CurrentProc] &&
1051         event->evttype != ContinueThread)
1052       CurrentTime[CurrentProc] = event->time;
1053     
1054     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1055     if (!RtsFlags.GranFlags.Light)
1056       handleIdlePEs();
1057
1058     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1059
1060     /* main event dispatcher in GranSim */
1061     switch (event->evttype) {
1062       /* Should just be continuing execution */
1063     case ContinueThread:
1064       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1065       /* ToDo: check assertion
1066       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1067              run_queue_hd != END_TSO_QUEUE);
1068       */
1069       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1070       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1071           procStatus[CurrentProc]==Fetching) {
1072         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1073               CurrentTSO->id, CurrentTSO, CurrentProc);
1074         goto next_thread;
1075       } 
1076       /* Ignore ContinueThreads for completed threads */
1077       if (CurrentTSO->what_next == ThreadComplete) {
1078         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1079               CurrentTSO->id, CurrentTSO, CurrentProc);
1080         goto next_thread;
1081       } 
1082       /* Ignore ContinueThreads for threads that are being migrated */
1083       if (PROCS(CurrentTSO)==Nowhere) { 
1084         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1085               CurrentTSO->id, CurrentTSO, CurrentProc);
1086         goto next_thread;
1087       }
1088       /* The thread should be at the beginning of the run queue */
1089       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1090         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1091               CurrentTSO->id, CurrentTSO, CurrentProc);
1092         break; // run the thread anyway
1093       }
1094       /*
1095       new_event(proc, proc, CurrentTime[proc],
1096                 FindWork,
1097                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1098       goto next_thread; 
1099       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1100       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1101
1102     case FetchNode:
1103       do_the_fetchnode(event);
1104       goto next_thread;             /* handle next event in event queue  */
1105       
1106     case GlobalBlock:
1107       do_the_globalblock(event);
1108       goto next_thread;             /* handle next event in event queue  */
1109       
1110     case FetchReply:
1111       do_the_fetchreply(event);
1112       goto next_thread;             /* handle next event in event queue  */
1113       
1114     case UnblockThread:   /* Move from the blocked queue to the tail of */
1115       do_the_unblock(event);
1116       goto next_thread;             /* handle next event in event queue  */
1117       
1118     case ResumeThread:  /* Move from the blocked queue to the tail of */
1119       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1120       event->tso->gran.blocktime += 
1121         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1122       do_the_startthread(event);
1123       goto next_thread;             /* handle next event in event queue  */
1124       
1125     case StartThread:
1126       do_the_startthread(event);
1127       goto next_thread;             /* handle next event in event queue  */
1128       
1129     case MoveThread:
1130       do_the_movethread(event);
1131       goto next_thread;             /* handle next event in event queue  */
1132       
1133     case MoveSpark:
1134       do_the_movespark(event);
1135       goto next_thread;             /* handle next event in event queue  */
1136       
1137     case FindWork:
1138       do_the_findwork(event);
1139       goto next_thread;             /* handle next event in event queue  */
1140       
1141     default:
1142       barf("Illegal event type %u\n", event->evttype);
1143     }  /* switch */
1144     
1145     /* This point was scheduler_loop in the old RTS */
1146
1147     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1148
1149     TimeOfLastEvent = CurrentTime[CurrentProc];
1150     TimeOfNextEvent = get_time_of_next_event();
1151     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1152     // CurrentTSO = ThreadQueueHd;
1153
1154     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1155                          TimeOfNextEvent));
1156
1157     if (RtsFlags.GranFlags.Light) 
1158       GranSimLight_leave_system(event, &ActiveTSO); 
1159
1160     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1161
1162     IF_DEBUG(gran, 
1163              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1164
1165     /* in a GranSim setup the TSO stays on the run queue */
1166     t = CurrentTSO;
1167     /* Take a thread from the run queue. */
1168     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1169
1170     IF_DEBUG(gran, 
1171              debugBelch("GRAN: About to run current thread, which is\n");
1172              G_TSO(t,5));
1173
1174     context_switch = 0; // turned on via GranYield, checking events and time slice
1175
1176     IF_DEBUG(gran, 
1177              DumpGranEvent(GR_SCHEDULE, t));
1178
1179     procStatus[CurrentProc] = Busy;
1180 }
1181 #endif // GRAN
1182
1183 /* ----------------------------------------------------------------------------
1184  * Send pending messages (PARALLEL_HASKELL only)
1185  * ------------------------------------------------------------------------- */
1186
1187 #if defined(PARALLEL_HASKELL)
1188 static StgTSO *
1189 scheduleSendPendingMessages(void)
1190 {
1191     StgSparkPool *pool;
1192     rtsSpark spark;
1193     StgTSO *t;
1194
1195 # if defined(PAR) // global Mem.Mgmt., omit for now
1196     if (PendingFetches != END_BF_QUEUE) {
1197         processFetches();
1198     }
1199 # endif
1200     
1201     if (RtsFlags.ParFlags.BufferTime) {
1202         // if we use message buffering, we must send away all message
1203         // packets which have become too old...
1204         sendOldBuffers(); 
1205     }
1206 }
1207 #endif
1208
1209 /* ----------------------------------------------------------------------------
1210  * Activate spark threads (PARALLEL_HASKELL only)
1211  * ------------------------------------------------------------------------- */
1212
1213 #if defined(PARALLEL_HASKELL)
1214 static void
1215 scheduleActivateSpark(void)
1216 {
1217 #if defined(SPARKS)
1218   ASSERT(emptyRunQueue());
1219 /* We get here if the run queue is empty and want some work.
1220    We try to turn a spark into a thread, and add it to the run queue,
1221    from where it will be picked up in the next iteration of the scheduler
1222    loop.
1223 */
1224
1225       /* :-[  no local threads => look out for local sparks */
1226       /* the spark pool for the current PE */
1227       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1228       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1229           pool->hd < pool->tl) {
1230         /* 
1231          * ToDo: add GC code check that we really have enough heap afterwards!!
1232          * Old comment:
1233          * If we're here (no runnable threads) and we have pending
1234          * sparks, we must have a space problem.  Get enough space
1235          * to turn one of those pending sparks into a
1236          * thread... 
1237          */
1238
1239         spark = findSpark(rtsFalse);            /* get a spark */
1240         if (spark != (rtsSpark) NULL) {
1241           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1242           IF_PAR_DEBUG(fish, // schedule,
1243                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1244                              tso->id, tso, advisory_thread_count));
1245
1246           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1247             IF_PAR_DEBUG(fish, // schedule,
1248                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1249                             spark));
1250             return rtsFalse; /* failed to generate a thread */
1251           }                  /* otherwise fall through & pick-up new tso */
1252         } else {
1253           IF_PAR_DEBUG(fish, // schedule,
1254                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1255                              spark_queue_len(pool)));
1256           return rtsFalse;  /* failed to generate a thread */
1257         }
1258         return rtsTrue;  /* success in generating a thread */
1259   } else { /* no more threads permitted or pool empty */
1260     return rtsFalse;  /* failed to generateThread */
1261   }
1262 #else
1263   tso = NULL; // avoid compiler warning only
1264   return rtsFalse;  /* dummy in non-PAR setup */
1265 #endif // SPARKS
1266 }
1267 #endif // PARALLEL_HASKELL
1268
1269 /* ----------------------------------------------------------------------------
1270  * Get work from a remote node (PARALLEL_HASKELL only)
1271  * ------------------------------------------------------------------------- */
1272     
1273 #if defined(PARALLEL_HASKELL)
1274 static rtsBool
1275 scheduleGetRemoteWork(rtsBool *receivedFinish)
1276 {
1277   ASSERT(emptyRunQueue());
1278
1279   if (RtsFlags.ParFlags.BufferTime) {
1280         IF_PAR_DEBUG(verbose, 
1281                 debugBelch("...send all pending data,"));
1282         {
1283           nat i;
1284           for (i=1; i<=nPEs; i++)
1285             sendImmediately(i); // send all messages away immediately
1286         }
1287   }
1288 # ifndef SPARKS
1289         //++EDEN++ idle() , i.e. send all buffers, wait for work
1290         // suppress fishing in EDEN... just look for incoming messages
1291         // (blocking receive)
1292   IF_PAR_DEBUG(verbose, 
1293                debugBelch("...wait for incoming messages...\n"));
1294   *receivedFinish = processMessages(); // blocking receive...
1295
1296         // and reenter scheduling loop after having received something
1297         // (return rtsFalse below)
1298
1299 # else /* activate SPARKS machinery */
1300 /* We get here, if we have no work, tried to activate a local spark, but still
1301    have no work. We try to get a remote spark, by sending a FISH message.
1302    Thread migration should be added here, and triggered when a sequence of 
1303    fishes returns without work. */
1304         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1305
1306       /* =8-[  no local sparks => look for work on other PEs */
1307         /*
1308          * We really have absolutely no work.  Send out a fish
1309          * (there may be some out there already), and wait for
1310          * something to arrive.  We clearly can't run any threads
1311          * until a SCHEDULE or RESUME arrives, and so that's what
1312          * we're hoping to see.  (Of course, we still have to
1313          * respond to other types of messages.)
1314          */
1315         rtsTime now = msTime() /*CURRENT_TIME*/;
1316         IF_PAR_DEBUG(verbose, 
1317                      debugBelch("--  now=%ld\n", now));
1318         IF_PAR_DEBUG(fish, // verbose,
1319              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1320                  (last_fish_arrived_at!=0 &&
1321                   last_fish_arrived_at+delay > now)) {
1322                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1323                      now, last_fish_arrived_at+delay, 
1324                      last_fish_arrived_at,
1325                      delay);
1326              });
1327   
1328         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1329             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1330           if (last_fish_arrived_at==0 ||
1331               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1332             /* outstandingFishes is set in sendFish, processFish;
1333                avoid flooding system with fishes via delay */
1334     next_fish_to_send_at = 0;  
1335   } else {
1336     /* ToDo: this should be done in the main scheduling loop to avoid the
1337              busy wait here; not so bad if fish delay is very small  */
1338     int iq = 0; // DEBUGGING -- HWL
1339     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1340     /* send a fish when ready, but process messages that arrive in the meantime */
1341     do {
1342       if (PacketsWaiting()) {
1343         iq++; // DEBUGGING
1344         *receivedFinish = processMessages();
1345       }
1346       now = msTime();
1347     } while (!*receivedFinish || now<next_fish_to_send_at);
1348     // JB: This means the fish could become obsolete, if we receive
1349     // work. Better check for work again? 
1350     // last line: while (!receivedFinish || !haveWork || now<...)
1351     // next line: if (receivedFinish || haveWork )
1352
1353     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1354       return rtsFalse;  // NB: this will leave scheduler loop
1355                         // immediately after return!
1356                           
1357     IF_PAR_DEBUG(fish, // verbose,
1358                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1359
1360   }
1361
1362     // JB: IMHO, this should all be hidden inside sendFish(...)
1363     /* pe = choosePE(); 
1364        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1365                 NEW_FISH_HUNGER);
1366
1367     // Global statistics: count no. of fishes
1368     if (RtsFlags.ParFlags.ParStats.Global &&
1369          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1370            globalParStats.tot_fish_mess++;
1371            }
1372     */ 
1373
1374   /* delayed fishes must have been sent by now! */
1375   next_fish_to_send_at = 0;  
1376   }
1377       
1378   *receivedFinish = processMessages();
1379 # endif /* SPARKS */
1380
1381  return rtsFalse;
1382  /* NB: this function always returns rtsFalse, meaning the scheduler
1383     loop continues with the next iteration; 
1384     rationale: 
1385       return code means success in finding work; we enter this function
1386       if there is no local work, thus have to send a fish which takes
1387       time until it arrives with work; in the meantime we should process
1388       messages in the main loop;
1389  */
1390 }
1391 #endif // PARALLEL_HASKELL
1392
1393 /* ----------------------------------------------------------------------------
1394  * PAR/GRAN: Report stats & debugging info(?)
1395  * ------------------------------------------------------------------------- */
1396
1397 #if defined(PAR) || defined(GRAN)
1398 static void
1399 scheduleGranParReport(void)
1400 {
1401   ASSERT(run_queue_hd != END_TSO_QUEUE);
1402
1403   /* Take a thread from the run queue, if we have work */
1404   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1405
1406     /* If this TSO has got its outport closed in the meantime, 
1407      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1408      * It has to be marked as TH_DEAD for this purpose.
1409      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1410
1411 JB: TODO: investigate wether state change field could be nuked
1412      entirely and replaced by the normal tso state (whatnext
1413      field). All we want to do is to kill tsos from outside.
1414      */
1415
1416     /* ToDo: write something to the log-file
1417     if (RTSflags.ParFlags.granSimStats && !sameThread)
1418         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1419
1420     CurrentTSO = t;
1421     */
1422     /* the spark pool for the current PE */
1423     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1424
1425     IF_DEBUG(scheduler, 
1426              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1427                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1428
1429     IF_PAR_DEBUG(fish,
1430              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1431                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1432
1433     if (RtsFlags.ParFlags.ParStats.Full && 
1434         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1435         (emitSchedule || // forced emit
1436          (t && LastTSO && t->id != LastTSO->id))) {
1437       /* 
1438          we are running a different TSO, so write a schedule event to log file
1439          NB: If we use fair scheduling we also have to write  a deschedule 
1440              event for LastTSO; with unfair scheduling we know that the
1441              previous tso has blocked whenever we switch to another tso, so
1442              we don't need it in GUM for now
1443       */
1444       IF_PAR_DEBUG(fish, // schedule,
1445                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1446
1447       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1448                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1449       emitSchedule = rtsFalse;
1450     }
1451 }     
1452 #endif
1453
1454 /* ----------------------------------------------------------------------------
1455  * After running a thread...
1456  * ------------------------------------------------------------------------- */
1457
1458 static void
1459 schedulePostRunThread(void)
1460 {
1461 #if defined(PAR)
1462     /* HACK 675: if the last thread didn't yield, make sure to print a 
1463        SCHEDULE event to the log file when StgRunning the next thread, even
1464        if it is the same one as before */
1465     LastTSO = t; 
1466     TimeOfLastYield = CURRENT_TIME;
1467 #endif
1468
1469   /* some statistics gathering in the parallel case */
1470
1471 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1472   switch (ret) {
1473     case HeapOverflow:
1474 # if defined(GRAN)
1475       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1476       globalGranStats.tot_heapover++;
1477 # elif defined(PAR)
1478       globalParStats.tot_heapover++;
1479 # endif
1480       break;
1481
1482      case StackOverflow:
1483 # if defined(GRAN)
1484       IF_DEBUG(gran, 
1485                DumpGranEvent(GR_DESCHEDULE, t));
1486       globalGranStats.tot_stackover++;
1487 # elif defined(PAR)
1488       // IF_DEBUG(par, 
1489       // DumpGranEvent(GR_DESCHEDULE, t);
1490       globalParStats.tot_stackover++;
1491 # endif
1492       break;
1493
1494     case ThreadYielding:
1495 # if defined(GRAN)
1496       IF_DEBUG(gran, 
1497                DumpGranEvent(GR_DESCHEDULE, t));
1498       globalGranStats.tot_yields++;
1499 # elif defined(PAR)
1500       // IF_DEBUG(par, 
1501       // DumpGranEvent(GR_DESCHEDULE, t);
1502       globalParStats.tot_yields++;
1503 # endif
1504       break; 
1505
1506     case ThreadBlocked:
1507 # if defined(GRAN)
1508         debugTrace(DEBUG_sched, 
1509                    "--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1510                    t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1511                    (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1512                if (t->block_info.closure!=(StgClosure*)NULL)
1513                  print_bq(t->block_info.closure);
1514                debugBelch("\n"));
1515
1516       // ??? needed; should emit block before
1517       IF_DEBUG(gran, 
1518                DumpGranEvent(GR_DESCHEDULE, t)); 
1519       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1520       /*
1521         ngoq Dogh!
1522       ASSERT(procStatus[CurrentProc]==Busy || 
1523               ((procStatus[CurrentProc]==Fetching) && 
1524               (t->block_info.closure!=(StgClosure*)NULL)));
1525       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1526           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1527             procStatus[CurrentProc]==Fetching)) 
1528         procStatus[CurrentProc] = Idle;
1529       */
1530 # elif defined(PAR)
1531 //++PAR++  blockThread() writes the event (change?)
1532 # endif
1533     break;
1534
1535   case ThreadFinished:
1536     break;
1537
1538   default:
1539     barf("parGlobalStats: unknown return code");
1540     break;
1541     }
1542 #endif
1543 }
1544
1545 /* -----------------------------------------------------------------------------
1546  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1547  * -------------------------------------------------------------------------- */
1548
1549 static rtsBool
1550 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1551 {
1552     // did the task ask for a large block?
1553     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1554         // if so, get one and push it on the front of the nursery.
1555         bdescr *bd;
1556         lnat blocks;
1557         
1558         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1559         
1560         debugTrace(DEBUG_sched,
1561                    "--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1562                    (long)t->id, whatNext_strs[t->what_next], blocks);
1563     
1564         // don't do this if the nursery is (nearly) full, we'll GC first.
1565         if (cap->r.rCurrentNursery->link != NULL ||
1566             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1567                                                // if the nursery has only one block.
1568             
1569             ACQUIRE_SM_LOCK
1570             bd = allocGroup( blocks );
1571             RELEASE_SM_LOCK
1572             cap->r.rNursery->n_blocks += blocks;
1573             
1574             // link the new group into the list
1575             bd->link = cap->r.rCurrentNursery;
1576             bd->u.back = cap->r.rCurrentNursery->u.back;
1577             if (cap->r.rCurrentNursery->u.back != NULL) {
1578                 cap->r.rCurrentNursery->u.back->link = bd;
1579             } else {
1580 #if !defined(THREADED_RTS)
1581                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1582                        g0s0 == cap->r.rNursery);
1583 #endif
1584                 cap->r.rNursery->blocks = bd;
1585             }             
1586             cap->r.rCurrentNursery->u.back = bd;
1587             
1588             // initialise it as a nursery block.  We initialise the
1589             // step, gen_no, and flags field of *every* sub-block in
1590             // this large block, because this is easier than making
1591             // sure that we always find the block head of a large
1592             // block whenever we call Bdescr() (eg. evacuate() and
1593             // isAlive() in the GC would both have to do this, at
1594             // least).
1595             { 
1596                 bdescr *x;
1597                 for (x = bd; x < bd + blocks; x++) {
1598                     x->step = cap->r.rNursery;
1599                     x->gen_no = 0;
1600                     x->flags = 0;
1601                 }
1602             }
1603             
1604             // This assert can be a killer if the app is doing lots
1605             // of large block allocations.
1606             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1607             
1608             // now update the nursery to point to the new block
1609             cap->r.rCurrentNursery = bd;
1610             
1611             // we might be unlucky and have another thread get on the
1612             // run queue before us and steal the large block, but in that
1613             // case the thread will just end up requesting another large
1614             // block.
1615             pushOnRunQueue(cap,t);
1616             return rtsFalse;  /* not actually GC'ing */
1617         }
1618     }
1619     
1620     debugTrace(DEBUG_sched,
1621                "--<< thread %ld (%s) stopped: HeapOverflow\n", 
1622                (long)t->id, whatNext_strs[t->what_next]);
1623
1624 #if defined(GRAN)
1625     ASSERT(!is_on_queue(t,CurrentProc));
1626 #elif defined(PARALLEL_HASKELL)
1627     /* Currently we emit a DESCHEDULE event before GC in GUM.
1628        ToDo: either add separate event to distinguish SYSTEM time from rest
1629        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1630     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1631         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1632                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1633         emitSchedule = rtsTrue;
1634     }
1635 #endif
1636       
1637     if (context_switch) {
1638         // Sometimes we miss a context switch, e.g. when calling
1639         // primitives in a tight loop, MAYBE_GC() doesn't check the
1640         // context switch flag, and we end up waiting for a GC.
1641         // See #1984, and concurrent/should_run/1984
1642         context_switch = 0;
1643         addToRunQueue(cap,t);
1644     } else {
1645         pushOnRunQueue(cap,t);
1646     }
1647     return rtsTrue;
1648     /* actual GC is done at the end of the while loop in schedule() */
1649 }
1650
1651 /* -----------------------------------------------------------------------------
1652  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1653  * -------------------------------------------------------------------------- */
1654
1655 static void
1656 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1657 {
1658     debugTrace (DEBUG_sched,
1659                 "--<< thread %ld (%s) stopped, StackOverflow", 
1660                 (long)t->id, whatNext_strs[t->what_next]);
1661
1662     /* just adjust the stack for this thread, then pop it back
1663      * on the run queue.
1664      */
1665     { 
1666         /* enlarge the stack */
1667         StgTSO *new_t = threadStackOverflow(cap, t);
1668         
1669         /* The TSO attached to this Task may have moved, so update the
1670          * pointer to it.
1671          */
1672         if (task->tso == t) {
1673             task->tso = new_t;
1674         }
1675         pushOnRunQueue(cap,new_t);
1676     }
1677 }
1678
1679 /* -----------------------------------------------------------------------------
1680  * Handle a thread that returned to the scheduler with ThreadYielding
1681  * -------------------------------------------------------------------------- */
1682
1683 static rtsBool
1684 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1685 {
1686     // Reset the context switch flag.  We don't do this just before
1687     // running the thread, because that would mean we would lose ticks
1688     // during GC, which can lead to unfair scheduling (a thread hogs
1689     // the CPU because the tick always arrives during GC).  This way
1690     // penalises threads that do a lot of allocation, but that seems
1691     // better than the alternative.
1692     context_switch = 0;
1693     
1694     /* put the thread back on the run queue.  Then, if we're ready to
1695      * GC, check whether this is the last task to stop.  If so, wake
1696      * up the GC thread.  getThread will block during a GC until the
1697      * GC is finished.
1698      */
1699 #ifdef DEBUG
1700     if (t->what_next != prev_what_next) {
1701         debugTrace(DEBUG_sched,
1702                    "--<< thread %ld (%s) stopped to switch evaluators", 
1703                    (long)t->id, whatNext_strs[t->what_next]);
1704     } else {
1705         debugTrace(DEBUG_sched,
1706                    "--<< thread %ld (%s) stopped, yielding",
1707                    (long)t->id, whatNext_strs[t->what_next]);
1708     }
1709 #endif
1710     
1711     IF_DEBUG(sanity,
1712              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1713              checkTSO(t));
1714     ASSERT(t->link == END_TSO_QUEUE);
1715     
1716     // Shortcut if we're just switching evaluators: don't bother
1717     // doing stack squeezing (which can be expensive), just run the
1718     // thread.
1719     if (t->what_next != prev_what_next) {
1720         return rtsTrue;
1721     }
1722     
1723 #if defined(GRAN)
1724     ASSERT(!is_on_queue(t,CurrentProc));
1725       
1726     IF_DEBUG(sanity,
1727              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1728              checkThreadQsSanity(rtsTrue));
1729
1730 #endif
1731
1732     addToRunQueue(cap,t);
1733
1734 #if defined(GRAN)
1735     /* add a ContinueThread event to actually process the thread */
1736     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1737               ContinueThread,
1738               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1739     IF_GRAN_DEBUG(bq, 
1740                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1741                   G_EVENTQ(0);
1742                   G_CURR_THREADQ(0));
1743 #endif
1744     return rtsFalse;
1745 }
1746
1747 /* -----------------------------------------------------------------------------
1748  * Handle a thread that returned to the scheduler with ThreadBlocked
1749  * -------------------------------------------------------------------------- */
1750
1751 static void
1752 scheduleHandleThreadBlocked( StgTSO *t
1753 #if !defined(GRAN) && !defined(DEBUG)
1754     STG_UNUSED
1755 #endif
1756     )
1757 {
1758 #if defined(GRAN)
1759     IF_DEBUG(scheduler,
1760              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1761                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1762              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1763     
1764     // ??? needed; should emit block before
1765     IF_DEBUG(gran, 
1766              DumpGranEvent(GR_DESCHEDULE, t)); 
1767     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1768     /*
1769       ngoq Dogh!
1770       ASSERT(procStatus[CurrentProc]==Busy || 
1771       ((procStatus[CurrentProc]==Fetching) && 
1772       (t->block_info.closure!=(StgClosure*)NULL)));
1773       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1774       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1775       procStatus[CurrentProc]==Fetching)) 
1776       procStatus[CurrentProc] = Idle;
1777     */
1778 #elif defined(PAR)
1779     IF_DEBUG(scheduler,
1780              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1781                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1782     IF_PAR_DEBUG(bq,
1783                  
1784                  if (t->block_info.closure!=(StgClosure*)NULL) 
1785                  print_bq(t->block_info.closure));
1786     
1787     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1788     blockThread(t);
1789     
1790     /* whatever we schedule next, we must log that schedule */
1791     emitSchedule = rtsTrue;
1792     
1793 #else /* !GRAN */
1794
1795       // We don't need to do anything.  The thread is blocked, and it
1796       // has tidied up its stack and placed itself on whatever queue
1797       // it needs to be on.
1798
1799     // ASSERT(t->why_blocked != NotBlocked);
1800     // Not true: for example,
1801     //    - in THREADED_RTS, the thread may already have been woken
1802     //      up by another Capability.  This actually happens: try
1803     //      conc023 +RTS -N2.
1804     //    - the thread may have woken itself up already, because
1805     //      threadPaused() might have raised a blocked throwTo
1806     //      exception, see maybePerformBlockedException().
1807
1808 #ifdef DEBUG
1809     if (traceClass(DEBUG_sched)) {
1810         debugTraceBegin("--<< thread %lu (%s) stopped: ", 
1811                         (unsigned long)t->id, whatNext_strs[t->what_next]);
1812         printThreadBlockage(t);
1813         debugTraceEnd();
1814     }
1815 #endif
1816     
1817     /* Only for dumping event to log file 
1818        ToDo: do I need this in GranSim, too?
1819        blockThread(t);
1820     */
1821 #endif
1822 }
1823
1824 /* -----------------------------------------------------------------------------
1825  * Handle a thread that returned to the scheduler with ThreadFinished
1826  * -------------------------------------------------------------------------- */
1827
1828 static rtsBool
1829 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1830 {
1831     /* Need to check whether this was a main thread, and if so,
1832      * return with the return value.
1833      *
1834      * We also end up here if the thread kills itself with an
1835      * uncaught exception, see Exception.cmm.
1836      */
1837     debugTrace(DEBUG_sched, "--++ thread %lu (%s) finished", 
1838                (unsigned long)t->id, whatNext_strs[t->what_next]);
1839
1840 #if defined(GRAN)
1841       endThread(t, CurrentProc); // clean-up the thread
1842 #elif defined(PARALLEL_HASKELL)
1843       /* For now all are advisory -- HWL */
1844       //if(t->priority==AdvisoryPriority) ??
1845       advisory_thread_count--; // JB: Caution with this counter, buggy!
1846       
1847 # if defined(DIST)
1848       if(t->dist.priority==RevalPriority)
1849         FinishReval(t);
1850 # endif
1851     
1852 # if defined(EDENOLD)
1853       // the thread could still have an outport... (BUG)
1854       if (t->eden.outport != -1) {
1855       // delete the outport for the tso which has finished...
1856         IF_PAR_DEBUG(eden_ports,
1857                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1858                               t->eden.outport, t->id));
1859         deleteOPT(t);
1860       }
1861       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1862       if (t->eden.epid != -1) {
1863         IF_PAR_DEBUG(eden_ports,
1864                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1865                            t->id, t->eden.epid));
1866         removeTSOfromProcess(t);
1867       }
1868 # endif 
1869
1870 # if defined(PAR)
1871       if (RtsFlags.ParFlags.ParStats.Full &&
1872           !RtsFlags.ParFlags.ParStats.Suppressed) 
1873         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1874
1875       //  t->par only contains statistics: left out for now...
1876       IF_PAR_DEBUG(fish,
1877                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1878                               t->id,t,t->par.sparkname));
1879 # endif
1880 #endif // PARALLEL_HASKELL
1881
1882       //
1883       // Check whether the thread that just completed was a bound
1884       // thread, and if so return with the result.  
1885       //
1886       // There is an assumption here that all thread completion goes
1887       // through this point; we need to make sure that if a thread
1888       // ends up in the ThreadKilled state, that it stays on the run
1889       // queue so it can be dealt with here.
1890       //
1891
1892       if (t->bound) {
1893
1894           if (t->bound != task) {
1895 #if !defined(THREADED_RTS)
1896               // Must be a bound thread that is not the topmost one.  Leave
1897               // it on the run queue until the stack has unwound to the
1898               // point where we can deal with this.  Leaving it on the run
1899               // queue also ensures that the garbage collector knows about
1900               // this thread and its return value (it gets dropped from the
1901               // all_threads list so there's no other way to find it).
1902               appendToRunQueue(cap,t);
1903               return rtsFalse;
1904 #else
1905               // this cannot happen in the threaded RTS, because a
1906               // bound thread can only be run by the appropriate Task.
1907               barf("finished bound thread that isn't mine");
1908 #endif
1909           }
1910
1911           ASSERT(task->tso == t);
1912
1913           if (t->what_next == ThreadComplete) {
1914               if (task->ret) {
1915                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1916                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1917               }
1918               task->stat = Success;
1919           } else {
1920               if (task->ret) {
1921                   *(task->ret) = NULL;
1922               }
1923               if (sched_state >= SCHED_INTERRUPTING) {
1924                   task->stat = Interrupted;
1925               } else {
1926                   task->stat = Killed;
1927               }
1928           }
1929 #ifdef DEBUG
1930           removeThreadLabel((StgWord)task->tso->id);
1931 #endif
1932           return rtsTrue; // tells schedule() to return
1933       }
1934
1935       return rtsFalse;
1936 }
1937
1938 /* -----------------------------------------------------------------------------
1939  * Perform a heap census
1940  * -------------------------------------------------------------------------- */
1941
1942 static rtsBool
1943 scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1944 {
1945     // When we have +RTS -i0 and we're heap profiling, do a census at
1946     // every GC.  This lets us get repeatable runs for debugging.
1947     if (performHeapProfile ||
1948         (RtsFlags.ProfFlags.profileInterval==0 &&
1949          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1950         return rtsTrue;
1951     } else {
1952         return rtsFalse;
1953     }
1954 }
1955
1956 /* -----------------------------------------------------------------------------
1957  * Perform a garbage collection if necessary
1958  * -------------------------------------------------------------------------- */
1959
1960 static Capability *
1961 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
1962 {
1963     StgTSO *t;
1964     rtsBool heap_census;
1965 #ifdef THREADED_RTS
1966     static volatile StgWord waiting_for_gc;
1967     rtsBool was_waiting;
1968     nat i;
1969 #endif
1970
1971 #ifdef THREADED_RTS
1972     // In order to GC, there must be no threads running Haskell code.
1973     // Therefore, the GC thread needs to hold *all* the capabilities,
1974     // and release them after the GC has completed.  
1975     //
1976     // This seems to be the simplest way: previous attempts involved
1977     // making all the threads with capabilities give up their
1978     // capabilities and sleep except for the *last* one, which
1979     // actually did the GC.  But it's quite hard to arrange for all
1980     // the other tasks to sleep and stay asleep.
1981     //
1982         
1983     was_waiting = cas(&waiting_for_gc, 0, 1);
1984     if (was_waiting) {
1985         do {
1986             debugTrace(DEBUG_sched, "someone else is trying to GC...");
1987             if (cap) yieldCapability(&cap,task);
1988         } while (waiting_for_gc);
1989         return cap;  // NOTE: task->cap might have changed here
1990     }
1991
1992     for (i=0; i < n_capabilities; i++) {
1993         debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
1994         if (cap != &capabilities[i]) {
1995             Capability *pcap = &capabilities[i];
1996             // we better hope this task doesn't get migrated to
1997             // another Capability while we're waiting for this one.
1998             // It won't, because load balancing happens while we have
1999             // all the Capabilities, but even so it's a slightly
2000             // unsavoury invariant.
2001             task->cap = pcap;
2002             context_switch = 1;
2003             waitForReturnCapability(&pcap, task);
2004             if (pcap != &capabilities[i]) {
2005                 barf("scheduleDoGC: got the wrong capability");
2006             }
2007         }
2008     }
2009
2010     waiting_for_gc = rtsFalse;
2011 #endif
2012
2013     /* Kick any transactions which are invalid back to their
2014      * atomically frames.  When next scheduled they will try to
2015      * commit, this commit will fail and they will retry.
2016      */
2017     { 
2018         StgTSO *next;
2019
2020         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2021             if (t->what_next == ThreadRelocated) {
2022                 next = t->link;
2023             } else {
2024                 next = t->global_link;
2025                 
2026                 // This is a good place to check for blocked
2027                 // exceptions.  It might be the case that a thread is
2028                 // blocked on delivering an exception to a thread that
2029                 // is also blocked - we try to ensure that this
2030                 // doesn't happen in throwTo(), but it's too hard (or
2031                 // impossible) to close all the race holes, so we
2032                 // accept that some might get through and deal with
2033                 // them here.  A GC will always happen at some point,
2034                 // even if the system is otherwise deadlocked.
2035                 maybePerformBlockedException (&capabilities[0], t);
2036
2037                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
2038                     if (!stmValidateNestOfTransactions (t -> trec)) {
2039                         debugTrace(DEBUG_sched | DEBUG_stm,
2040                                    "trec %p found wasting its time", t);
2041                         
2042                         // strip the stack back to the
2043                         // ATOMICALLY_FRAME, aborting the (nested)
2044                         // transaction, and saving the stack of any
2045                         // partially-evaluated thunks on the heap.
2046                         throwToSingleThreaded_(&capabilities[0], t, 
2047                                                NULL, rtsTrue, NULL);
2048                         
2049 #ifdef REG_R1
2050                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2051 #endif
2052                     }
2053                 }
2054             }
2055         }
2056     }
2057     
2058     // so this happens periodically:
2059     if (cap) scheduleCheckBlackHoles(cap);
2060     
2061     IF_DEBUG(scheduler, printAllThreads());
2062
2063     /*
2064      * We now have all the capabilities; if we're in an interrupting
2065      * state, then we should take the opportunity to delete all the
2066      * threads in the system.
2067      */
2068     if (sched_state >= SCHED_INTERRUPTING) {
2069         deleteAllThreads(&capabilities[0]);
2070         sched_state = SCHED_SHUTTING_DOWN;
2071     }
2072     
2073     heap_census = scheduleNeedHeapProfile(rtsTrue);
2074
2075     /* everybody back, start the GC.
2076      * Could do it in this thread, or signal a condition var
2077      * to do it in another thread.  Either way, we need to
2078      * broadcast on gc_pending_cond afterward.
2079      */
2080 #if defined(THREADED_RTS)
2081     debugTrace(DEBUG_sched, "doing GC");
2082 #endif
2083     GarbageCollect(force_major || heap_census);
2084     
2085     if (heap_census) {
2086         debugTrace(DEBUG_sched, "performing heap census");
2087         heapCensus();
2088         performHeapProfile = rtsFalse;
2089     }
2090
2091 #if defined(THREADED_RTS)
2092     // release our stash of capabilities.
2093     for (i = 0; i < n_capabilities; i++) {
2094         if (cap != &capabilities[i]) {
2095             task->cap = &capabilities[i];
2096             releaseCapability(&capabilities[i]);
2097         }
2098     }
2099     if (cap) {
2100         task->cap = cap;
2101     } else {
2102         task->cap = NULL;
2103     }
2104 #endif
2105
2106 #if defined(GRAN)
2107     /* add a ContinueThread event to continue execution of current thread */
2108     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2109               ContinueThread,
2110               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2111     IF_GRAN_DEBUG(bq, 
2112                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2113                   G_EVENTQ(0);
2114                   G_CURR_THREADQ(0));
2115 #endif /* GRAN */
2116
2117     return cap;
2118 }
2119
2120 /* ---------------------------------------------------------------------------
2121  * Singleton fork(). Do not copy any running threads.
2122  * ------------------------------------------------------------------------- */
2123
2124 pid_t
2125 forkProcess(HsStablePtr *entry
2126 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2127             STG_UNUSED
2128 #endif
2129            )
2130 {
2131 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2132     Task *task;
2133     pid_t pid;
2134     StgTSO* t,*next;
2135     Capability *cap;
2136     
2137 #if defined(THREADED_RTS)
2138     if (RtsFlags.ParFlags.nNodes > 1) {
2139         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2140         stg_exit(EXIT_FAILURE);
2141     }
2142 #endif
2143
2144     debugTrace(DEBUG_sched, "forking!");
2145     
2146     // ToDo: for SMP, we should probably acquire *all* the capabilities
2147     cap = rts_lock();
2148     
2149     // no funny business: hold locks while we fork, otherwise if some
2150     // other thread is holding a lock when the fork happens, the data
2151     // structure protected by the lock will forever be in an
2152     // inconsistent state in the child.  See also #1391.
2153     ACQUIRE_LOCK(&sched_mutex);
2154     ACQUIRE_LOCK(&cap->lock);
2155     ACQUIRE_LOCK(&cap->running_task->lock);
2156
2157     pid = fork();
2158     
2159     if (pid) { // parent
2160         
2161         RELEASE_LOCK(&sched_mutex);
2162         RELEASE_LOCK(&cap->lock);
2163         RELEASE_LOCK(&cap->running_task->lock);
2164
2165         // just return the pid
2166         rts_unlock(cap);
2167         return pid;
2168         
2169     } else { // child
2170         
2171 #if defined(THREADED_RTS)
2172         initMutex(&sched_mutex);
2173         initMutex(&cap->lock);
2174         initMutex(&cap->running_task->lock);
2175 #endif
2176
2177         // Now, all OS threads except the thread that forked are
2178         // stopped.  We need to stop all Haskell threads, including
2179         // those involved in foreign calls.  Also we need to delete
2180         // all Tasks, because they correspond to OS threads that are
2181         // now gone.
2182
2183         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2184             if (t->what_next == ThreadRelocated) {
2185                 next = t->link;
2186             } else {
2187                 next = t->global_link;
2188                 // don't allow threads to catch the ThreadKilled
2189                 // exception, but we do want to raiseAsync() because these
2190                 // threads may be evaluating thunks that we need later.
2191                 deleteThread_(cap,t);
2192             }
2193         }
2194         
2195         // Empty the run queue.  It seems tempting to let all the
2196         // killed threads stay on the run queue as zombies to be
2197         // cleaned up later, but some of them correspond to bound
2198         // threads for which the corresponding Task does not exist.
2199         cap->run_queue_hd = END_TSO_QUEUE;
2200         cap->run_queue_tl = END_TSO_QUEUE;
2201
2202         // Any suspended C-calling Tasks are no more, their OS threads
2203         // don't exist now:
2204         cap->suspended_ccalling_tasks = NULL;
2205
2206         // Empty the all_threads list.  Otherwise, the garbage
2207         // collector may attempt to resurrect some of these threads.
2208         all_threads = END_TSO_QUEUE;
2209
2210         // Wipe the task list, except the current Task.
2211         ACQUIRE_LOCK(&sched_mutex);
2212         for (task = all_tasks; task != NULL; task=task->all_link) {
2213             if (task != cap->running_task) {
2214 #if defined(THREADED_RTS)
2215                 initMutex(&task->lock); // see #1391
2216 #endif
2217                 discardTask(task);
2218             }
2219         }
2220         RELEASE_LOCK(&sched_mutex);
2221
2222 #if defined(THREADED_RTS)
2223         // Wipe our spare workers list, they no longer exist.  New
2224         // workers will be created if necessary.
2225         cap->spare_workers = NULL;
2226         cap->returning_tasks_hd = NULL;
2227         cap->returning_tasks_tl = NULL;
2228 #endif
2229
2230         // On Unix, all timers are reset in the child, so we need to start
2231         // the timer again.
2232         initTimer();
2233         startTimer();
2234
2235         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2236         rts_checkSchedStatus("forkProcess",cap);
2237         
2238         rts_unlock(cap);
2239         hs_exit();                      // clean up and exit
2240         stg_exit(EXIT_SUCCESS);
2241     }
2242 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2243     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2244     return -1;
2245 #endif
2246 }
2247
2248 /* ---------------------------------------------------------------------------
2249  * Delete all the threads in the system
2250  * ------------------------------------------------------------------------- */
2251    
2252 static void
2253 deleteAllThreads ( Capability *cap )
2254 {
2255     // NOTE: only safe to call if we own all capabilities.
2256
2257     StgTSO* t, *next;
2258     debugTrace(DEBUG_sched,"deleting all threads");
2259     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2260         if (t->what_next == ThreadRelocated) {
2261             next = t->link;
2262         } else {
2263             next = t->global_link;
2264             deleteThread(cap,t);
2265         }
2266     }      
2267
2268     // The run queue now contains a bunch of ThreadKilled threads.  We
2269     // must not throw these away: the main thread(s) will be in there
2270     // somewhere, and the main scheduler loop has to deal with it.
2271     // Also, the run queue is the only thing keeping these threads from
2272     // being GC'd, and we don't want the "main thread has been GC'd" panic.
2273
2274 #if !defined(THREADED_RTS)
2275     ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2276     ASSERT(sleeping_queue == END_TSO_QUEUE);
2277 #endif
2278 }
2279
2280 /* -----------------------------------------------------------------------------
2281    Managing the suspended_ccalling_tasks list.
2282    Locks required: sched_mutex
2283    -------------------------------------------------------------------------- */
2284
2285 STATIC_INLINE void
2286 suspendTask (Capability *cap, Task *task)
2287 {
2288     ASSERT(task->next == NULL && task->prev == NULL);
2289     task->next = cap->suspended_ccalling_tasks;
2290     task->prev = NULL;
2291     if (cap->suspended_ccalling_tasks) {
2292         cap->suspended_ccalling_tasks->prev = task;
2293     }
2294     cap->suspended_ccalling_tasks = task;
2295 }
2296
2297 STATIC_INLINE void
2298 recoverSuspendedTask (Capability *cap, Task *task)
2299 {
2300     if (task->prev) {
2301         task->prev->next = task->next;
2302     } else {
2303         ASSERT(cap->suspended_ccalling_tasks == task);
2304         cap->suspended_ccalling_tasks = task->next;
2305     }
2306     if (task->next) {
2307         task->next->prev = task->prev;
2308     }
2309     task->next = task->prev = NULL;
2310 }
2311
2312 /* ---------------------------------------------------------------------------
2313  * Suspending & resuming Haskell threads.
2314  * 
2315  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2316  * its capability before calling the C function.  This allows another
2317  * task to pick up the capability and carry on running Haskell
2318  * threads.  It also means that if the C call blocks, it won't lock
2319  * the whole system.
2320  *
2321  * The Haskell thread making the C call is put to sleep for the
2322  * duration of the call, on the susepended_ccalling_threads queue.  We
2323  * give out a token to the task, which it can use to resume the thread
2324  * on return from the C function.
2325  * ------------------------------------------------------------------------- */
2326    
2327 void *
2328 suspendThread (StgRegTable *reg)
2329 {
2330   Capability *cap;
2331   int saved_errno;
2332   StgTSO *tso;
2333   Task *task;
2334 #if mingw32_HOST_OS
2335   StgWord32 saved_winerror;
2336 #endif
2337
2338   saved_errno = errno;
2339 #if mingw32_HOST_OS
2340   saved_winerror = GetLastError();
2341 #endif
2342
2343   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2344    */
2345   cap = regTableToCapability(reg);
2346
2347   task = cap->running_task;
2348   tso = cap->r.rCurrentTSO;
2349
2350   debugTrace(DEBUG_sched, 
2351              "thread %lu did a safe foreign call", 
2352              (unsigned long)cap->r.rCurrentTSO->id);
2353
2354   // XXX this might not be necessary --SDM
2355   tso->what_next = ThreadRunGHC;
2356
2357   threadPaused(cap,tso);
2358
2359   if ((tso->flags & TSO_BLOCKEX) == 0)  {
2360       tso->why_blocked = BlockedOnCCall;
2361       tso->flags |= TSO_BLOCKEX;
2362       tso->flags &= ~TSO_INTERRUPTIBLE;
2363   } else {
2364       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2365   }
2366
2367   // Hand back capability
2368   task->suspended_tso = tso;
2369
2370   ACQUIRE_LOCK(&cap->lock);
2371
2372   suspendTask(cap,task);
2373   cap->in_haskell = rtsFalse;
2374   releaseCapability_(cap);
2375   
2376   RELEASE_LOCK(&cap->lock);
2377
2378 #if defined(THREADED_RTS)
2379   /* Preparing to leave the RTS, so ensure there's a native thread/task
2380      waiting to take over.
2381   */
2382   debugTrace(DEBUG_sched, "thread %lu: leaving RTS", (unsigned long)tso->id);
2383 #endif
2384
2385   errno = saved_errno;
2386 #if mingw32_HOST_OS
2387   SetLastError(saved_winerror);
2388 #endif
2389   return task;
2390 }
2391
2392 StgRegTable *
2393 resumeThread (void *task_)
2394 {
2395     StgTSO *tso;
2396     Capability *cap;
2397     Task *task = task_;
2398     int saved_errno;
2399 #if mingw32_HOST_OS
2400     StgWord32 saved_winerror;
2401 #endif
2402
2403     saved_errno = errno;
2404 #if mingw32_HOST_OS
2405     saved_winerror = GetLastError();
2406 #endif
2407
2408     cap = task->cap;
2409     // Wait for permission to re-enter the RTS with the result.
2410     waitForReturnCapability(&cap,task);
2411     // we might be on a different capability now... but if so, our
2412     // entry on the suspended_ccalling_tasks list will also have been
2413     // migrated.
2414
2415     // Remove the thread from the suspended list
2416     recoverSuspendedTask(cap,task);
2417
2418     tso = task->suspended_tso;
2419     task->suspended_tso = NULL;
2420     tso->link = END_TSO_QUEUE;
2421     debugTrace(DEBUG_sched, "thread %lu: re-entering RTS", (unsigned long)tso->id);
2422     
2423     if (tso->why_blocked == BlockedOnCCall) {
2424         awakenBlockedExceptionQueue(cap,tso);
2425         tso->flags &= ~(TSO_BLOCKEX | TSO_INTERRUPTIBLE);
2426     }
2427     
2428     /* Reset blocking status */
2429     tso->why_blocked  = NotBlocked;
2430     
2431     cap->r.rCurrentTSO = tso;
2432     cap->in_haskell = rtsTrue;
2433     errno = saved_errno;
2434 #if mingw32_HOST_OS
2435     SetLastError(saved_winerror);
2436 #endif
2437
2438     /* We might have GC'd, mark the TSO dirty again */
2439     dirtyTSO(tso);
2440
2441     IF_DEBUG(sanity, checkTSO(tso));
2442
2443     return &cap->r;
2444 }
2445
2446 /* ---------------------------------------------------------------------------
2447  * scheduleThread()
2448  *
2449  * scheduleThread puts a thread on the end  of the runnable queue.
2450  * This will usually be done immediately after a thread is created.
2451  * The caller of scheduleThread must create the thread using e.g.
2452  * createThread and push an appropriate closure
2453  * on this thread's stack before the scheduler is invoked.
2454  * ------------------------------------------------------------------------ */
2455
2456 void
2457 scheduleThread(Capability *cap, StgTSO *tso)
2458 {
2459     // The thread goes at the *end* of the run-queue, to avoid possible
2460     // starvation of any threads already on the queue.
2461     appendToRunQueue(cap,tso);
2462 }
2463
2464 void
2465 scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
2466 {
2467 #if defined(THREADED_RTS)
2468     tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
2469                               // move this thread from now on.
2470     cpu %= RtsFlags.ParFlags.nNodes;
2471     if (cpu == cap->no) {
2472         appendToRunQueue(cap,tso);
2473     } else {
2474         migrateThreadToCapability_lock(&capabilities[cpu],tso);
2475     }
2476 #else
2477     appendToRunQueue(cap,tso);
2478 #endif
2479 }
2480
2481 Capability *
2482 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2483 {
2484     Task *task;
2485
2486     // We already created/initialised the Task
2487     task = cap->running_task;
2488
2489     // This TSO is now a bound thread; make the Task and TSO
2490     // point to each other.
2491     tso->bound = task;
2492     tso->cap = cap;
2493
2494     task->tso = tso;
2495     task->ret = ret;
2496     task->stat = NoStatus;
2497
2498     appendToRunQueue(cap,tso);
2499
2500     debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)tso->id);
2501
2502 #if defined(GRAN)
2503     /* GranSim specific init */
2504     CurrentTSO = m->tso;                // the TSO to run
2505     procStatus[MainProc] = Busy;        // status of main PE
2506     CurrentProc = MainProc;             // PE to run it on
2507 #endif
2508
2509     cap = schedule(cap,task);
2510
2511     ASSERT(task->stat != NoStatus);
2512     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2513
2514     debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)task->tso->id);
2515     return cap;
2516 }
2517
2518 /* ----------------------------------------------------------------------------
2519  * Starting Tasks
2520  * ------------------------------------------------------------------------- */
2521
2522 #if defined(THREADED_RTS)
2523 void
2524 workerStart(Task *task)
2525 {
2526     Capability *cap;
2527
2528     // See startWorkerTask().
2529     ACQUIRE_LOCK(&task->lock);
2530     cap = task->cap;
2531     RELEASE_LOCK(&task->lock);
2532
2533     // set the thread-local pointer to the Task:
2534     taskEnter(task);
2535
2536     // schedule() runs without a lock.
2537     cap = schedule(cap,task);
2538
2539     // On exit from schedule(), we have a Capability.
2540     releaseCapability(cap);
2541     workerTaskStop(task);
2542 }
2543 #endif
2544
2545 /* ---------------------------------------------------------------------------
2546  * initScheduler()
2547  *
2548  * Initialise the scheduler.  This resets all the queues - if the
2549  * queues contained any threads, they'll be garbage collected at the
2550  * next pass.
2551  *
2552  * ------------------------------------------------------------------------ */
2553
2554 void 
2555 initScheduler(void)
2556 {
2557 #if defined(GRAN)
2558   nat i;
2559   for (i=0; i<=MAX_PROC; i++) {
2560     run_queue_hds[i]      = END_TSO_QUEUE;
2561     run_queue_tls[i]      = END_TSO_QUEUE;
2562     blocked_queue_hds[i]  = END_TSO_QUEUE;
2563     blocked_queue_tls[i]  = END_TSO_QUEUE;
2564     ccalling_threadss[i]  = END_TSO_QUEUE;
2565     blackhole_queue[i]    = END_TSO_QUEUE;
2566     sleeping_queue        = END_TSO_QUEUE;
2567   }
2568 #elif !defined(THREADED_RTS)
2569   blocked_queue_hd  = END_TSO_QUEUE;
2570   blocked_queue_tl  = END_TSO_QUEUE;
2571   sleeping_queue    = END_TSO_QUEUE;
2572 #endif
2573
2574   blackhole_queue   = END_TSO_QUEUE;
2575   all_threads       = END_TSO_QUEUE;
2576
2577   context_switch = 0;
2578   sched_state    = SCHED_RUNNING;
2579   recent_activity = ACTIVITY_YES;
2580
2581 #if defined(THREADED_RTS)
2582   /* Initialise the mutex and condition variables used by
2583    * the scheduler. */
2584   initMutex(&sched_mutex);
2585 #endif
2586   
2587   ACQUIRE_LOCK(&sched_mutex);
2588
2589   /* A capability holds the state a native thread needs in
2590    * order to execute STG code. At least one capability is
2591    * floating around (only THREADED_RTS builds have more than one).
2592    */
2593   initCapabilities();
2594
2595   initTaskManager();
2596
2597 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2598   initSparkPools();
2599 #endif
2600
2601 #if defined(THREADED_RTS)
2602   /*
2603    * Eagerly start one worker to run each Capability, except for
2604    * Capability 0.  The idea is that we're probably going to start a
2605    * bound thread on Capability 0 pretty soon, so we don't want a
2606    * worker task hogging it.
2607    */
2608   { 
2609       nat i;
2610       Capability *cap;
2611       for (i = 1; i < n_capabilities; i++) {
2612           cap = &capabilities[i];
2613           ACQUIRE_LOCK(&cap->lock);
2614           startWorkerTask(cap, workerStart);
2615           RELEASE_LOCK(&cap->lock);
2616       }
2617   }
2618 #endif
2619
2620   trace(TRACE_sched, "start: %d capabilities", n_capabilities);
2621
2622   RELEASE_LOCK(&sched_mutex);
2623 }
2624
2625 void
2626 exitScheduler(
2627     rtsBool wait_foreign
2628 #if !defined(THREADED_RTS)
2629                          __attribute__((unused))
2630 #endif
2631 )
2632                /* see Capability.c, shutdownCapability() */
2633 {
2634     Task *task = NULL;
2635
2636 #if defined(THREADED_RTS)
2637     ACQUIRE_LOCK(&sched_mutex);
2638     task = newBoundTask();
2639     RELEASE_LOCK(&sched_mutex);
2640 #endif
2641
2642     // If we haven't killed all the threads yet, do it now.
2643     if (sched_state < SCHED_SHUTTING_DOWN) {
2644         sched_state = SCHED_INTERRUPTING;
2645         scheduleDoGC(NULL,task,rtsFalse);    
2646     }
2647     sched_state = SCHED_SHUTTING_DOWN;
2648
2649 #if defined(THREADED_RTS)
2650     { 
2651         nat i;
2652         
2653         for (i = 0; i < n_capabilities; i++) {
2654             shutdownCapability(&capabilities[i], task, wait_foreign);
2655         }
2656         boundTaskExiting(task);
2657         stopTaskManager();
2658     }
2659 #else
2660     freeCapability(&MainCapability);
2661 #endif
2662 }
2663
2664 void
2665 freeScheduler( void )
2666 {
2667     freeTaskManager();
2668     if (n_capabilities != 1) {
2669         stgFree(capabilities);
2670     }
2671 #if defined(THREADED_RTS)
2672     closeMutex(&sched_mutex);
2673 #endif
2674 }
2675
2676 /* ---------------------------------------------------------------------------
2677    Where are the roots that we know about?
2678
2679         - all the threads on the runnable queue
2680         - all the threads on the blocked queue
2681         - all the threads on the sleeping queue
2682         - all the thread currently executing a _ccall_GC
2683         - all the "main threads"
2684      
2685    ------------------------------------------------------------------------ */
2686
2687 /* This has to be protected either by the scheduler monitor, or by the
2688         garbage collection monitor (probably the latter).
2689         KH @ 25/10/99
2690 */
2691
2692 void
2693 GetRoots( evac_fn evac )
2694 {
2695     nat i;
2696     Capability *cap;
2697     Task *task;
2698
2699 #if defined(GRAN)
2700     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2701         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2702             evac((StgClosure **)&run_queue_hds[i]);
2703         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2704             evac((StgClosure **)&run_queue_tls[i]);
2705         
2706         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2707             evac((StgClosure **)&blocked_queue_hds[i]);
2708         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2709             evac((StgClosure **)&blocked_queue_tls[i]);
2710         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2711             evac((StgClosure **)&ccalling_threads[i]);
2712     }
2713
2714     markEventQueue();
2715
2716 #else /* !GRAN */
2717
2718     for (i = 0; i < n_capabilities; i++) {
2719         cap = &capabilities[i];
2720         evac((StgClosure **)(void *)&cap->run_queue_hd);
2721         evac((StgClosure **)(void *)&cap->run_queue_tl);
2722 #if defined(THREADED_RTS)
2723         evac((StgClosure **)(void *)&cap->wakeup_queue_hd);
2724         evac((StgClosure **)(void *)&cap->wakeup_queue_tl);
2725 #endif
2726         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2727              task=task->next) {
2728             debugTrace(DEBUG_sched,
2729                        "evac'ing suspended TSO %lu", (unsigned long)task->suspended_tso->id);
2730             evac((StgClosure **)(void *)&task->suspended_tso);
2731         }
2732
2733     }
2734     
2735
2736 #if !defined(THREADED_RTS)
2737     evac((StgClosure **)(void *)&blocked_queue_hd);
2738     evac((StgClosure **)(void *)&blocked_queue_tl);
2739     evac((StgClosure **)(void *)&sleeping_queue);
2740 #endif 
2741 #endif
2742
2743     // evac((StgClosure **)&blackhole_queue);
2744
2745 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2746     markSparkQueue(evac);
2747 #endif
2748     
2749 #if defined(RTS_USER_SIGNALS)
2750     // mark the signal handlers (signals should be already blocked)
2751     if (RtsFlags.MiscFlags.install_signal_handlers) {
2752         markSignalHandlers(evac);
2753     }
2754 #endif
2755 }
2756
2757 /* -----------------------------------------------------------------------------
2758    performGC
2759
2760    This is the interface to the garbage collector from Haskell land.
2761    We provide this so that external C code can allocate and garbage
2762    collect when called from Haskell via _ccall_GC.
2763    -------------------------------------------------------------------------- */
2764
2765 static void
2766 performGC_(rtsBool force_major)
2767 {
2768     Task *task;
2769     // We must grab a new Task here, because the existing Task may be
2770     // associated with a particular Capability, and chained onto the 
2771     // suspended_ccalling_tasks queue.
2772     ACQUIRE_LOCK(&sched_mutex);
2773     task = newBoundTask();
2774     RELEASE_LOCK(&sched_mutex);
2775     scheduleDoGC(NULL,task,force_major);
2776     boundTaskExiting(task);
2777 }
2778
2779 void
2780 performGC(void)
2781 {
2782     performGC_(rtsFalse);
2783 }
2784
2785 void
2786 performMajorGC(void)
2787 {
2788     performGC_(rtsTrue);
2789 }
2790
2791 /* -----------------------------------------------------------------------------
2792    Stack overflow
2793
2794    If the thread has reached its maximum stack size, then raise the
2795    StackOverflow exception in the offending thread.  Otherwise
2796    relocate the TSO into a larger chunk of memory and adjust its stack
2797    size appropriately.
2798    -------------------------------------------------------------------------- */
2799
2800 static StgTSO *
2801 threadStackOverflow(Capability *cap, StgTSO *tso)
2802 {
2803   nat new_stack_size, stack_words;
2804   lnat new_tso_size;
2805   StgPtr new_sp;
2806   StgTSO *dest;
2807
2808   IF_DEBUG(sanity,checkTSO(tso));
2809
2810   // don't allow throwTo() to modify the blocked_exceptions queue
2811   // while we are moving the TSO:
2812   lockClosure((StgClosure *)tso);
2813
2814   if (tso->stack_size >= tso->max_stack_size && !(tso->flags & TSO_BLOCKEX)) {
2815       // NB. never raise a StackOverflow exception if the thread is
2816       // inside Control.Exceptino.block.  It is impractical to protect
2817       // against stack overflow exceptions, since virtually anything
2818       // can raise one (even 'catch'), so this is the only sensible
2819       // thing to do here.  See bug #767.
2820
2821       debugTrace(DEBUG_gc,
2822                  "threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)",
2823                  (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2824       IF_DEBUG(gc,
2825                /* If we're debugging, just print out the top of the stack */
2826                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2827                                                 tso->sp+64)));
2828
2829       // Send this thread the StackOverflow exception
2830       unlockTSO(tso);
2831       throwToSingleThreaded(cap, tso, (StgClosure *)stackOverflow_closure);
2832       return tso;
2833   }
2834
2835   /* Try to double the current stack size.  If that takes us over the
2836    * maximum stack size for this thread, then use the maximum instead.
2837    * Finally round up so the TSO ends up as a whole number of blocks.
2838    */
2839   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2840   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2841                                        TSO_STRUCT_SIZE)/sizeof(W_);
2842   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2843   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2844
2845   debugTrace(DEBUG_sched, 
2846              "increasing stack size from %ld words to %d.",
2847              (long)tso->stack_size, new_stack_size);
2848
2849   dest = (StgTSO *)allocateLocal(cap,new_tso_size);
2850   TICK_ALLOC_TSO(new_stack_size,0);
2851
2852   /* copy the TSO block and the old stack into the new area */
2853   memcpy(dest,tso,TSO_STRUCT_SIZE);
2854   stack_words = tso->stack + tso->stack_size - tso->sp;
2855   new_sp = (P_)dest + new_tso_size - stack_words;
2856   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2857
2858   /* relocate the stack pointers... */
2859   dest->sp         = new_sp;
2860   dest->stack_size = new_stack_size;
2861         
2862   /* Mark the old TSO as relocated.  We have to check for relocated
2863    * TSOs in the garbage collector and any primops that deal with TSOs.
2864    *
2865    * It's important to set the sp value to just beyond the end
2866    * of the stack, so we don't attempt to scavenge any part of the
2867    * dead TSO's stack.
2868    */
2869   tso->what_next = ThreadRelocated;
2870   tso->link = dest;
2871   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2872   tso->why_blocked = NotBlocked;
2873
2874   IF_PAR_DEBUG(verbose,
2875                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2876                      tso->id, tso, tso->stack_size);
2877                /* If we're debugging, just print out the top of the stack */
2878                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2879                                                 tso->sp+64)));
2880   
2881   unlockTSO(dest);
2882   unlockTSO(tso);
2883
2884   IF_DEBUG(sanity,checkTSO(dest));
2885 #if 0
2886   IF_DEBUG(scheduler,printTSO(dest));
2887 #endif
2888
2889   return dest;
2890 }
2891
2892 /* ---------------------------------------------------------------------------
2893    Interrupt execution
2894    - usually called inside a signal handler so it mustn't do anything fancy.   
2895    ------------------------------------------------------------------------ */
2896
2897 void
2898 interruptStgRts(void)
2899 {
2900     sched_state = SCHED_INTERRUPTING;
2901     context_switch = 1;
2902     wakeUpRts();
2903 }
2904
2905 /* -----------------------------------------------------------------------------
2906    Wake up the RTS
2907    
2908    This function causes at least one OS thread to wake up and run the
2909    scheduler loop.  It is invoked when the RTS might be deadlocked, or
2910    an external event has arrived that may need servicing (eg. a
2911    keyboard interrupt).
2912
2913    In the single-threaded RTS we don't do anything here; we only have
2914    one thread anyway, and the event that caused us to want to wake up
2915    will have interrupted any blocking system call in progress anyway.
2916    -------------------------------------------------------------------------- */
2917
2918 void
2919 wakeUpRts(void)
2920 {
2921 #if defined(THREADED_RTS)
2922     // This forces the IO Manager thread to wakeup, which will
2923     // in turn ensure that some OS thread wakes up and runs the
2924     // scheduler loop, which will cause a GC and deadlock check.
2925     ioManagerWakeup();
2926 #endif
2927 }
2928
2929 /* -----------------------------------------------------------------------------
2930  * checkBlackHoles()
2931  *
2932  * Check the blackhole_queue for threads that can be woken up.  We do
2933  * this periodically: before every GC, and whenever the run queue is
2934  * empty.
2935  *
2936  * An elegant solution might be to just wake up all the blocked
2937  * threads with awakenBlockedQueue occasionally: they'll go back to
2938  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
2939  * doesn't give us a way to tell whether we've actually managed to
2940  * wake up any threads, so we would be busy-waiting.
2941  *
2942  * -------------------------------------------------------------------------- */
2943
2944 static rtsBool
2945 checkBlackHoles (Capability *cap)
2946 {
2947     StgTSO **prev, *t;
2948     rtsBool any_woke_up = rtsFalse;
2949     StgHalfWord type;
2950
2951     // blackhole_queue is global:
2952     ASSERT_LOCK_HELD(&sched_mutex);
2953
2954     debugTrace(DEBUG_sched, "checking threads blocked on black holes");
2955
2956     // ASSUMES: sched_mutex
2957     prev = &blackhole_queue;
2958     t = blackhole_queue;
2959     while (t != END_TSO_QUEUE) {
2960         ASSERT(t->why_blocked == BlockedOnBlackHole);
2961         type = get_itbl(UNTAG_CLOSURE(t->block_info.closure))->type;
2962         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
2963             IF_DEBUG(sanity,checkTSO(t));
2964             t = unblockOne(cap, t);
2965             // urk, the threads migrate to the current capability
2966             // here, but we'd like to keep them on the original one.
2967             *prev = t;
2968             any_woke_up = rtsTrue;
2969         } else {
2970             prev = &t->link;
2971             t = t->link;
2972         }
2973     }
2974
2975     return any_woke_up;
2976 }
2977
2978 /* -----------------------------------------------------------------------------
2979    Deleting threads
2980
2981    This is used for interruption (^C) and forking, and corresponds to
2982    raising an exception but without letting the thread catch the
2983    exception.
2984    -------------------------------------------------------------------------- */
2985
2986 static void 
2987 deleteThread (Capability *cap, StgTSO *tso)
2988 {
2989     // NOTE: must only be called on a TSO that we have exclusive
2990     // access to, because we will call throwToSingleThreaded() below.
2991     // The TSO must be on the run queue of the Capability we own, or 
2992     // we must own all Capabilities.
2993
2994     if (tso->why_blocked != BlockedOnCCall &&
2995         tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2996         throwToSingleThreaded(cap,tso,NULL);
2997     }
2998 }
2999
3000 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3001 static void 
3002 deleteThread_(Capability *cap, StgTSO *tso)
3003 { // for forkProcess only:
3004   // like deleteThread(), but we delete threads in foreign calls, too.
3005
3006     if (tso->why_blocked == BlockedOnCCall ||
3007         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
3008         unblockOne(cap,tso);
3009         tso->what_next = ThreadKilled;
3010     } else {
3011         deleteThread(cap,tso);
3012     }
3013 }
3014 #endif
3015
3016 /* -----------------------------------------------------------------------------
3017    raiseExceptionHelper
3018    
3019    This function is called by the raise# primitve, just so that we can
3020    move some of the tricky bits of raising an exception from C-- into
3021    C.  Who knows, it might be a useful re-useable thing here too.
3022    -------------------------------------------------------------------------- */
3023
3024 StgWord
3025 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3026 {
3027     Capability *cap = regTableToCapability(reg);
3028     StgThunk *raise_closure = NULL;
3029     StgPtr p, next;
3030     StgRetInfoTable *info;
3031     //
3032     // This closure represents the expression 'raise# E' where E
3033     // is the exception raise.  It is used to overwrite all the
3034     // thunks which are currently under evaluataion.
3035     //
3036
3037     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
3038     // LDV profiling: stg_raise_info has THUNK as its closure
3039     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3040     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3041     // 1 does not cause any problem unless profiling is performed.
3042     // However, when LDV profiling goes on, we need to linearly scan
3043     // small object pool, where raise_closure is stored, so we should
3044     // use MIN_UPD_SIZE.
3045     //
3046     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3047     //                                 sizeofW(StgClosure)+1);
3048     //
3049
3050     //
3051     // Walk up the stack, looking for the catch frame.  On the way,
3052     // we update any closures pointed to from update frames with the
3053     // raise closure that we just built.
3054     //
3055     p = tso->sp;
3056     while(1) {
3057         info = get_ret_itbl((StgClosure *)p);
3058         next = p + stack_frame_sizeW((StgClosure *)p);
3059         switch (info->i.type) {
3060             
3061         case UPDATE_FRAME:
3062             // Only create raise_closure if we need to.
3063             if (raise_closure == NULL) {
3064                 raise_closure = 
3065                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3066                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3067                 raise_closure->payload[0] = exception;
3068             }
3069             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3070             p = next;
3071             continue;
3072
3073         case ATOMICALLY_FRAME:
3074             debugTrace(DEBUG_stm, "found ATOMICALLY_FRAME at %p", p);
3075             tso->sp = p;
3076             return ATOMICALLY_FRAME;
3077             
3078         case CATCH_FRAME:
3079             tso->sp = p;
3080             return CATCH_FRAME;
3081
3082         case CATCH_STM_FRAME:
3083             debugTrace(DEBUG_stm, "found CATCH_STM_FRAME at %p", p);
3084             tso->sp = p;
3085             return CATCH_STM_FRAME;
3086             
3087         case STOP_FRAME:
3088             tso->sp = p;
3089             return STOP_FRAME;
3090
3091         case CATCH_RETRY_FRAME:
3092         default:
3093             p = next; 
3094             continue;
3095         }
3096     }
3097 }
3098
3099
3100 /* -----------------------------------------------------------------------------
3101    findRetryFrameHelper
3102
3103    This function is called by the retry# primitive.  It traverses the stack
3104    leaving tso->sp referring to the frame which should handle the retry.  
3105
3106    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3107    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3108
3109    We skip CATCH_STM_FRAMEs (aborting and rolling back the nested tx that they
3110    create) because retries are not considered to be exceptions, despite the
3111    similar implementation.
3112
3113    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3114    not be created within memory transactions.
3115    -------------------------------------------------------------------------- */
3116
3117 StgWord
3118 findRetryFrameHelper (StgTSO *tso)
3119 {
3120   StgPtr           p, next;
3121   StgRetInfoTable *info;
3122
3123   p = tso -> sp;
3124   while (1) {
3125     info = get_ret_itbl((StgClosure *)p);
3126     next = p + stack_frame_sizeW((StgClosure *)p);
3127     switch (info->i.type) {
3128       
3129     case ATOMICALLY_FRAME:
3130         debugTrace(DEBUG_stm,
3131                    "found ATOMICALLY_FRAME at %p during retry", p);
3132         tso->sp = p;
3133         return ATOMICALLY_FRAME;
3134       
3135     case CATCH_RETRY_FRAME:
3136         debugTrace(DEBUG_stm,
3137                    "found CATCH_RETRY_FRAME at %p during retrry", p);
3138         tso->sp = p;
3139         return CATCH_RETRY_FRAME;
3140       
3141     case CATCH_STM_FRAME: {
3142         StgTRecHeader *trec = tso -> trec;
3143         StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3144         debugTrace(DEBUG_stm,
3145                    "found CATCH_STM_FRAME at %p during retry", p);
3146         debugTrace(DEBUG_stm, "trec=%p outer=%p", trec, outer);
3147         stmAbortTransaction(tso -> cap, trec);
3148         stmFreeAbortedTRec(tso -> cap, trec);
3149         tso -> trec = outer;
3150         p = next; 
3151         continue;
3152     }
3153       
3154
3155     default:
3156       ASSERT(info->i.type != CATCH_FRAME);
3157       ASSERT(info->i.type != STOP_FRAME);
3158       p = next; 
3159       continue;
3160     }
3161   }
3162 }
3163
3164 /* -----------------------------------------------------------------------------
3165    resurrectThreads is called after garbage collection on the list of
3166    threads found to be garbage.  Each of these threads will be woken
3167    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3168    on an MVar, or NonTermination if the thread was blocked on a Black
3169    Hole.
3170
3171    Locks: assumes we hold *all* the capabilities.
3172    -------------------------------------------------------------------------- */
3173
3174 void
3175 resurrectThreads (StgTSO *threads)
3176 {
3177     StgTSO *tso, *next;
3178     Capability *cap;
3179
3180     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3181         next = tso->global_link;
3182         tso->global_link = all_threads;
3183         all_threads = tso;
3184         debugTrace(DEBUG_sched, "resurrecting thread %lu", (unsigned long)tso->id);
3185         
3186         // Wake up the thread on the Capability it was last on
3187         cap = tso->cap;
3188         
3189         switch (tso->why_blocked) {
3190         case BlockedOnMVar:
3191         case BlockedOnException:
3192             /* Called by GC - sched_mutex lock is currently held. */
3193             throwToSingleThreaded(cap, tso,
3194                                   (StgClosure *)BlockedOnDeadMVar_closure);
3195             break;
3196         case BlockedOnBlackHole:
3197             throwToSingleThreaded(cap, tso,
3198                                   (StgClosure *)NonTermination_closure);
3199             break;
3200         case BlockedOnSTM:
3201             throwToSingleThreaded(cap, tso,
3202                                   (StgClosure *)BlockedIndefinitely_closure);
3203             break;
3204         case NotBlocked:
3205             /* This might happen if the thread was blocked on a black hole
3206              * belonging to a thread that we've just woken up (raiseAsync
3207              * can wake up threads, remember...).
3208              */
3209             continue;
3210         default:
3211             barf("resurrectThreads: thread blocked in a strange way");
3212         }
3213     }
3214 }